
tolitius / lasync

Licence: other
making executor service tougher

Programming Languages

clojure
4091 projects
java
68154 projects - #9 most used programming language

Projects that are alternatives of or similar to lasync

gbp
Golang Best Practices (GBP™)
Stars: ✭ 25 (-34.21%)
Mutual labels:  concurrency
nested scheduler
Shard for creating separate groups of fibers in a hierarchical way and to collect results and errors in a structured way.
Stars: ✭ 20 (-47.37%)
Mutual labels:  concurrency
drone-stm32-map
STM32 peripheral mappings for Drone, an Embedded Operating System.
Stars: ✭ 16 (-57.89%)
Mutual labels:  concurrency
concurrent-ll
concurrent linked list implementation
Stars: ✭ 66 (+73.68%)
Mutual labels:  concurrency
archery
Abstract over the atomicity of reference-counting pointers in rust
Stars: ✭ 107 (+181.58%)
Mutual labels:  concurrency
pen
The parallel, concurrent, and functional programming language for scalable software development
Stars: ✭ 394 (+936.84%)
Mutual labels:  concurrency
workerpool
A workerpool that can be expanded and shrunk dynamically.
Stars: ✭ 55 (+44.74%)
Mutual labels:  concurrency
Hunch
Hunch provides functions like All, First, Retry, Waterfall etc. that make asynchronous flow control more intuitive.
Stars: ✭ 94 (+147.37%)
Mutual labels:  concurrency
rockgo
A game server framework under development, based on the Entity Component System (ECS) pattern.
Stars: ✭ 617 (+1523.68%)
Mutual labels:  concurrency
django-concurrency-talk
🎭 Database Integrity in Django: Safely Handling Critical Data in Distributed Systems
Stars: ✭ 49 (+28.95%)
Mutual labels:  concurrency
java-red
Effective Concurrency Modules for Java
Stars: ✭ 25 (-34.21%)
Mutual labels:  concurrency
haskell-simple-concurrency
Small examples of concurrency in Haskell.
Stars: ✭ 75 (+97.37%)
Mutual labels:  concurrency
linked-blocking-multi-queue
A concurrent collection that extends the existing Java concurrent collection library, offering an optionally-bounded blocking "multi-queue" based on linked nodes.
Stars: ✭ 41 (+7.89%)
Mutual labels:  concurrency
Actors.jl
Concurrent computing in Julia based on the Actor Model
Stars: ✭ 95 (+150%)
Mutual labels:  concurrency
skywalker
A package to allow one to concurrently go through a filesystem with ease
Stars: ✭ 87 (+128.95%)
Mutual labels:  concurrency
go-left-right
A faster RWLock primitive in Go, 2-3 times faster than RWMutex. A Go implementation of concurrency control algorithm in paper <Left-Right - A Concurrency Control Technique with Wait-Free Population Oblivious Reads>
Stars: ✭ 42 (+10.53%)
Mutual labels:  concurrency
batching-toposort
Efficiently sort interdependent tasks into a sequence of concurrently-executable batches
Stars: ✭ 21 (-44.74%)
Mutual labels:  concurrency
noroutine
Goroutine analogue for Node.js, spreads I/O-bound routine calls to utilize thread pool (worker_threads) using balancer with event loop utilization. 🌱
Stars: ✭ 86 (+126.32%)
Mutual labels:  concurrency
bascomtask
Lightweight parallel Java tasks
Stars: ✭ 49 (+28.95%)
Mutual labels:  concurrency
YACLib
Yet Another Concurrency Library
Stars: ✭ 193 (+407.89%)
Mutual labels:  concurrency

limited async

an executor service (a.k.a. smart pool of threads) that is backed by an ArrayLimitedQueue or a LinkedLimitedQueue.


why

the purpose of this library is to be able to block on ".submit" / ".execute" whenever the queue's task limit is reached. Here is why:

if a regular BlockingQueue is used, a ThreadPoolExecutor calls the queue's "offer" method, which does not block: it inserts a task and returns true, or returns false if the queue is "capacity-restricted" and its capacity has been reached.

while this behavior is useful, there are cases where we do need to block and wait until a ThreadPoolExecutor has a thread available to work on the task.

depending on the use case, this back pressure can be very useful. One example is off-heap storage being read and processed by a ThreadPoolExecutor: there is no need, and it is sometimes completely undesirable, to use the JVM heap for something that is already available off heap. Another good use is described in "Creating a NotifyingBlockingThreadPoolExecutor".
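
for contrast, here is a minimal sketch of the stock JDK behavior using plain java.util.concurrent classes (the pool and queue sizes are arbitrary, picked only to make the rejection easy to trigger):

(import '[java.util.concurrent ThreadPoolExecutor ArrayBlockingQueue
          TimeUnit RejectedExecutionException])

;; 1 thread and a queue of capacity 1: the first task runs, the second
;; sits in the queue, the third is rejected right away -- "offer"
;; returned false, nothing blocked or waited
(def jdk-pool (ThreadPoolExecutor. 1 1 0 TimeUnit/MILLISECONDS
                                   (ArrayBlockingQueue. 1)))

(dotimes [_ 3]
  (try
    (.execute jdk-pool #(Thread/sleep 1000))
    (catch RejectedExecutionException e
      (println "task rejected: no back pressure here"))))

(.shutdown jdk-pool)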

how to

to create a pool with a limited number of threads and a backing queue limit:

(ns sample.project
  (:require [lasync.core :as lasync]))

(def pool (lasync/pool))

that is pretty much it. The pool is a regular ExecutorService that can have tasks submitted to it:

(.submit pool #(+ 41 1))

there is also a submit function that wraps this call and returns a future:

show=> (lasync/submit pool #(+ 41 1))
#object[java.util.concurrent.FutureTask 0x6d1ce6d3 "java.util.concurrent.FutureTask@6d1ce6d3"]
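
since what comes back is a plain java.util.concurrent.FutureTask, it can be deref'ed (or .get'd) for the result:

user=> @(lasync/submit pool #(+ 41 1))
42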

there is also an execute function that does not return a future; hence exceptions will be caught and reported by the default exception handler.
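
for instance (just a sketch, with a division by zero to force an exception):

;; there is no future to deref here; the ArithmeticException is
;; caught and reported by the pool's default exception handler
(lasync/execute pool #(/ 1 0))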

number of threads

by default lasync will create "available cores * 2 + 42" threads:

(defn- number-of-threads []
  (+ (* 2 available-cores) 42))

but the number can be changed by:

user=> (def pool (lasync/pool {:threads 42}))
#'user/pool
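
the effect can be confirmed with the stats described further down (a quick sketch):

user=> (:maximumPoolSize (lasync/stats pool))
42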

queue size

the default queue backing lasync's pool is an ArrayLimitedQueue with a default capacity of 1024 items. But all defaults are there to be customized: the queue size is what limits the pool and enables the back pressure. Use :limit to tune that knob:

(def pool (lasync/pool {:limit 65535}))

stats

a pool is no good when it is a black box. lasync lets you unbox its stats whenever you need them:

user=> (lasync/stats pool)

{:largestPoolSize 0,
 :queueCurrentSize 0,
 :activeCount 0,
 :terminating false,
 :poolSize 0,
 :taskCount 0,
 :completedTaskCount 0,
 :class java.util.concurrent.ThreadPoolExecutor,
 :terminated false,
 :keepAliveTimeMs 60000,
 :allowsCoreThreadTimeOut false,
 :corePoolSize 66,
 :maximumPoolSize 66,
 :shutdown false}
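
since this is just a Clojure map, the usual map functions apply; for example, to watch only a couple of the numbers:

user=> (select-keys (lasync/stats pool) [:activeCount :queueCurrentSize])
{:activeCount 0, :queueCurrentSize 0}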

show me

to see lasync in action:

lein repl
user=> (require '[show :refer [rock-on]])
user=> (rock-on 69)  ;; Woodstock'69
INFO: pool q-size: 4, submitted: 1
INFO: pool q-size: 4, submitted: 3
INFO: pool q-size: 4, submitted: 2
INFO: pool q-size: 4, submitted: 0
INFO: pool q-size: 4, submitted: 4
INFO: pool q-size: 4, submitted: 5
INFO: pool q-size: 4, submitted: 6
INFO: pool q-size: 4, submitted: 7
...
...
INFO: pool q-size: 4, submitted: 62
INFO: pool q-size: 3, submitted: 60
INFO: pool q-size: 4, submitted: 63
INFO: pool q-size: 3, submitted: 65
INFO: pool q-size: 3, submitted: 64
INFO: pool q-size: 2, submitted: 66
INFO: pool q-size: 1, submitted: 67
INFO: pool q-size: 0, submitted: 68

here the lasync show was rocking on a 4-core box (which it picked up on), so regardless of how many tasks are pushed at it, the queue max size always stays at 4, and lasync applies that back pressure whenever the task queue limit is reached. In fact the "blocking" can be seen in action: since each task sleeps for a second, the work can be visually seen being processed 4 tasks at a time, pause, next 4, pause, etc.

here is the code behind the show

do check out the (expire-core-threads 69) and (use-max-threads 69) from the examples as well

tweaking other knobs

queue implementation

while ArrayLimitedQueue fits most use cases, a custom or a different queue can be configured via :queue:

(def pool (lasync/pool {:queue (LinkedLimitedQueue. 128)}))
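
note that the queue class needs to be imported first; assuming the limited queues live in the lasync.limitq package (the package name here is an assumption, check the sources), that would look something like:

(ns sample.project
  (:require [lasync.core :as lasync])
  (:import [lasync.limitq LinkedLimitedQueue]))   ;; package path is an assumption

(def pool (lasync/pool {:queue (LinkedLimitedQueue. 128)}))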

thread factory

by default lasync's thread factory tries to have reasonable defaults, but if you want to make your own, it is simply a matter of reify'ing the ThreadFactory interface:

(def tpool (reify
             ThreadFactory
             (newThread [_ runnable] ...)))

(def pool (lasync/pool {:threads 10 :thread-factory tpool}))
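
for illustration, a complete factory that names its threads might look like this (a sketch; the factory var, thread name and daemon flag are just examples):

(import '[java.util.concurrent ThreadFactory])

(def named-tpool
  (reify ThreadFactory
    (newThread [_ runnable]
      (doto (Thread. runnable)
        (.setName "lasync-worker")   ;; example thread name
        (.setDaemon true)))))        ;; example: daemon threads

(def pool (lasync/pool {:threads 10 :thread-factory named-tpool}))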

rejected execution handler

lasync takes an optional rejected-fn that will be called on every RejectedExecutionException. The default function is:

(defn default-rejected-fn [runnable _]
  (throw (RejectedExecutionException.
           (str "rejected execution: " runnable))))

but it can be replaced with a custom one (the second param is an executor, it is ignored in this case):

(defn log-rejected [runnable _]
  (error runnable "was rejected"))

(def pool (lasync/pool {:threads 10 :rejected-fn log-rejected}))
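
one way to see the custom fn fire (a sketch, assuming rejections after shutdown go through the same rejected-fn path): tasks submitted to a pool that has already been shut down are rejected:

(lasync/shutdown pool)
(lasync/submit pool #(+ 1 1))   ;; log-rejected should be called here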

unDefault it

(def tpool (reify ThreadFactory
                 (newThread [_ runnable] ...)))

(defn log-rejected [runnable _]
  (error runnable "was rejected"))

(def lp (lasync/pool {:threads 42
                      :thread-factory tpool
                      :limit 101010101
                      :rejected-fn log-rejected}))

shut it down

when you are done with a pool, it is a good idea to shut it down:

(lasync/shutdown pool)

you can also wait for tasks to complete (timeout in ms):

(lasync/await-termination pool 5000)
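
put together, a typical teardown is just those two calls in sequence:

;; stop accepting new tasks, then give in-flight tasks
;; up to 5 seconds to finish
(lasync/shutdown pool)
(lasync/await-termination pool 5000)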

license

copyright © 2021 tolitius

distributed under the Eclipse Public License, the same as Clojure.
