soabase / soabase-stages

Licence: Apache-2.0 License
A tiny library that makes staged/pipelined CompletableFutures much easier to create and manage

Programming Languages

java
68154 projects - #9 most used programming language

Projects that are alternatives of or similar to soabase-stages

Brightfutures
Write great asynchronous code in Swift using futures and promises
Stars: ✭ 1,890 (+8117.39%)
Mutual labels:  concurrency, futures
java-red
Effective Concurrency Modules for Java
Stars: ✭ 25 (+8.7%)
Mutual labels:  concurrency, futures
Smol
A small and fast async runtime for Rust
Stars: ✭ 2,206 (+9491.3%)
Mutual labels:  concurrency, futures
Transient
A full stack, reactive architecture for general purpose programming. Algebraic and monadically composable primitives for concurrency, parallelism, event handling, transactions, multithreading, Web, and distributed computing with complete de-inversion of control (No callbacks, no blocking, pure state)
Stars: ✭ 617 (+2582.61%)
Mutual labels:  concurrency, threading
thread-pool
A modern thread pool implementation based on C++20
Stars: ✭ 104 (+352.17%)
Mutual labels:  concurrency, threading
Agency
Execution primitives for C++
Stars: ✭ 127 (+452.17%)
Mutual labels:  concurrency, threading
Threadly
A library of tools to assist with safe concurrent Java development. Provides unique priority-based thread pools and ways to distribute threaded work safely.
Stars: ✭ 196 (+752.17%)
Mutual labels:  concurrency, threading
Python3 Concurrency Pics 02
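Crawls every image on www.mzitu.com; to date there are 5,162 albums containing more than 165,000 pictures of beautiful women. The asynchronous version, built with asyncio and aiohttp, finishes the crawl in under 2 hours. Album directories are created by date for more sensible storage. The console shows only the download progress bar; details are saved to a log file. Exceptions are handled without terminating the crawler. Failed requests are downloaded automatically the next time the crawler runs.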
Stars: ✭ 275 (+1095.65%)
Mutual labels:  concurrency, futures
python3-concurrency
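Theoretical validation for a Python 3 web-crawler series. It first studies I/O models, implementing TCP servers and clients in Python under the blocking I/O, nonblocking I/O, and I/O multiplexing models. It then studies the efficiency difference between synchronous I/O (sequential download, multi-process concurrency, multi-thread concurrency) and asynchronous I/O (asyncio).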
Stars: ✭ 49 (+113.04%)
Mutual labels:  concurrency, futures
thread-pool
BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library
Stars: ✭ 1,043 (+4434.78%)
Mutual labels:  concurrency, threading
Funfix
Functional Programming Library for JavaScript, TypeScript and Flow ✨⚡️
Stars: ✭ 596 (+2491.3%)
Mutual labels:  concurrency, futures
swift-futures
Demand-driven asynchronous programming in Swift
Stars: ✭ 32 (+39.13%)
Mutual labels:  concurrency, futures
Thread Pool
Thread pool implementation using c++11 threads
Stars: ✭ 417 (+1713.04%)
Mutual labels:  concurrency, futures
Floyd
The Floyd programming language
Stars: ✭ 133 (+478.26%)
Mutual labels:  concurrency, threading
Concurrencpp
Modern concurrency for C++. Tasks, executors, timers and C++20 coroutines to rule them all
Stars: ✭ 340 (+1378.26%)
Mutual labels:  concurrency, threading
Pht
A new threading extension for PHP
Stars: ✭ 175 (+660.87%)
Mutual labels:  concurrency, threading
swoole-futures
⏳ Futures, Streams & Async/Await for PHP's Swoole asynchronous run-time.
Stars: ✭ 100 (+334.78%)
Mutual labels:  concurrency, futures
async-oneshot
A fast, small, full-featured, no-std compatible oneshot channel
Stars: ✭ 55 (+139.13%)
Mutual labels:  concurrency, futures
futureproof
Bulletproof concurrent.futures
Stars: ✭ 36 (+56.52%)
Mutual labels:  concurrency, futures
think-async
🌿 Exploring cooperative concurrency primitives in Python
Stars: ✭ 178 (+673.91%)
Mutual labels:  concurrency, threading

soabase-stages

A tiny library that makes staged/pipelined CompletableFutures much easier to create and manage.

Use Cases

  • You have a sequence of tasks that pipeline or chain
  • These tasks can be executed synchronously or asynchronously
  • You might need to abort/cancel the chain in the middle
  • You might need to provide a timeout for the tasks

Most of this can be done with Java 8's CompletableFuture/CompletionStage today, but timeouts must be added manually (built-in support arrives only with Java 9's orTimeout() and completeOnTimeout()), and aborting a chain partway through is not supported. The CompletableFuture/CompletionStage API is also awkward and difficult to use.
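To make "added manually" concrete, here is a minimal sketch of a hand-rolled timeout that works on Java 8, using only JDK classes. The class and method names (ManualTimeout, withTimeout, demo) are made up for illustration; this is not how soabase-stages implements its timeouts.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ManualTimeout {
    // Returns a future that mirrors `source` but fails with a TimeoutException
    // if `source` has not completed within `timeout`.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> source,
                                                Duration timeout,
                                                ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // Timer that fails the result once the timeout elapses.
        ScheduledFuture<?> timer = scheduler.schedule(() -> {
            result.completeExceptionally(new TimeoutException("timed out after " + timeout));
        }, timeout.toMillis(), TimeUnit.MILLISECONDS);
        // Whichever side finishes first decides the outcome; later attempts are no-ops.
        source.whenComplete((value, error) -> {
            timer.cancel(false);
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });
        return result;
    }

    // Hypothetical demo: applies a 50 ms timeout and maps a failure to a marker string.
    static String demo(CompletableFuture<String> source) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            return withTimeout(source, Duration.ofMillis(50), scheduler)
                .exceptionally(error -> "timed out")
                .join();
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Note that the underlying task keeps running after the timeout fires; only the returned future is completed early.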

StagedFuture

StagedFuture simplifies CompletableFuture/CompletionStage so that you can write code like this:

StagedFuture.async(executor)
    .thenIf(() -> queryDatabaseFor("something"))
        .withTimeout(Duration.ofSeconds(25))
    .thenIf(record -> applyRecord(record)) // chain aborts if no record found
    .thenIf(result -> returnNextRecord(result))
    .whenSucceeded(nextResult -> handleResult(nextResult))
    .whenAborted(() -> handleAbort())
    .whenFailed(e -> handleFailure(e));

Benefits Over Raw CompletableFuture/CompletionStage

  • Can easily set timeouts and timeouts with default values for tasks (without waiting for Java 9)
  • Allows setting the executor once instead of with each chained call
  • Allows a task to signal that the remainder of the chain should be canceled
  • Simplified API

Note: you can easily access the managed CompletionStage when needed by calling StagedFuture#unwrap().

Using Stages

Stages is available from Maven Central. Use your favorite build tool and specify:

GroupId            ArtifactId
io.soabase.stages  soabase-stages

Change Log

Starting a chain

As with the builders in CompletableFuture, you start a chain using the builders in StagedFuture. There are synchronous and asynchronous builders:

  • StagedFuture.sync() - starts a StagedFuture chain that executes tasks synchronously
  • StagedFuture.async(executor) - starts a StagedFuture chain that executes tasks asynchronously using the given executor
  • StagedFuture.asyncPool() - starts a StagedFuture chain that executes tasks asynchronously using the ForkJoin pool
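For readers who know the raw API, the three builders can be thought of as choosing among thenApply, thenApplyAsync with an executor, and thenApplyAsync without one. The sketch below shows only that analogy with JDK calls; the class and method names are hypothetical, and it makes no claim about how StagedFuture is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ExecutionModes {
    // Comparable to a synchronous chain: on an already-completed future,
    // thenApply runs the function on the calling thread.
    static String syncThread() {
        return CompletableFuture.completedFuture("start")
            .thenApply(value -> Thread.currentThread().getName())
            .join();
    }

    // Comparable to an asynchronous chain with an executor: the function runs
    // on a thread owned by the executor passed to thenApplyAsync.
    static String executorThread() {
        ExecutorService executor =
            Executors.newSingleThreadExecutor(task -> new Thread(task, "stage-worker"));
        try {
            return CompletableFuture.completedFuture("start")
                .thenApplyAsync(value -> Thread.currentThread().getName(), executor)
                .join();
        } finally {
            executor.shutdown();
        }
    }

    // Comparable to an asynchronous chain on the default pool: without an
    // executor argument, thenApplyAsync uses CompletableFuture's default
    // executor, which is normally ForkJoinPool.commonPool().
    static String defaultPoolThread() {
        return CompletableFuture.completedFuture("start")
            .thenApplyAsync(value -> Thread.currentThread().getName())
            .join();
    }
}
```

With the raw API the choice has to be repeated on every chained call, which is the repetition the builders remove.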

Adding tasks to the chain

Tasks are added to the chain using one of the "thenIf" methods. The first task added is specified via a supplier and subsequent tasks are specified via functions that take the result of the previous task:

Initial Task

  • thenIf(Supplier<Optional<U>> proc) - Executes the given task synchronously or asynchronously, depending on how the StagedFuture was built. The task returns an Optional that indicates whether the next stage can execute. If Optional.empty() is returned, the entire StagedFuture chain is considered aborted, no further tasks execute, and the StagedFuture.whenAborted() completer is called.

Subsequent Tasks

  • thenIf(Function<T, Optional<U>> proc) - If the chain has not been aborted and has not failed, the result of the current task is passed to this new task synchronously or asynchronously, depending on how the StagedFuture was built. The task returns an Optional that indicates whether the next stage can execute. If Optional.empty() is returned, the entire StagedFuture chain is considered aborted, no further tasks execute, and the StagedFuture.whenAborted() completer is called.
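The abort rule described above can be approximated with plain JDK types by carrying an Optional through the chain and using Optional.flatMap so that later steps are skipped once a step returns empty. This is a sketch under that assumption; OptionalChain, its thenIf helper, and run are illustrative names, not part of the library.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

final class OptionalChain {
    // Adds a step that runs only if the previous step produced a value.
    // An empty Optional flows through unchanged, so later steps are skipped.
    static <T, U> CompletableFuture<Optional<U>> thenIf(
            CompletableFuture<Optional<T>> previous,
            Function<T, Optional<U>> task) {
        return previous.thenApply(result -> result.flatMap(task));
    }

    // Hypothetical demo: records which steps actually executed.
    static String run(Optional<String> lookupResult) {
        List<String> executed = new CopyOnWriteArrayList<>();
        CompletableFuture<Optional<String>> lookup = CompletableFuture.supplyAsync(() -> {
            executed.add("lookup");
            return lookupResult;
        });
        CompletableFuture<Optional<Integer>> length = thenIf(lookup, record -> {
            executed.add("length");
            return Optional.of(record.length());
        });
        CompletableFuture<Optional<String>> report = thenIf(length, n -> {
            executed.add("report");
            return Optional.of("length=" + n);
        });
        return report.join().orElse("aborted") + " after " + String.join(",", executed);
    }
}
```

Calling run(Optional.of("abc")) executes all three steps, while run(Optional.empty()) executes only the lookup and ends in the aborted state.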

Timeouts

The "thenIf" methods (see above) can optionally be assigned a timeout, or a timeout and a default value:

  • thenIf(X).withTimeout(Duration timeout) - Sets a timeout for this stage's task. If the given timeout elapses before the task completes, this stage is completed exceptionally with a TimeoutException.
  • thenIf(X).withTimeout(Duration timeout, Supplier<T> defaultValue) - Sets a timeout for this stage's task. If the given timeout elapses before the task completes, this stage is completed with the given default value.

Completers

At any point in the chain, you can add handlers for successful completions, failures or aborts:

  • whenSucceeded(Consumer<T> handler) - if the chain completes successfully the handler is called.
  • whenSucceededYield(Function<T, U> handler) - same as whenSucceeded() but allows mapping the return type.
  • whenAborted(Runnable handler) - if the chain is aborted (i.e. one of the thenIf() tasks returns empty) the handler is called.
  • whenFailed(Consumer<Throwable> handler) - if there is an exception or failure in the chain the handler is called.
  • whenFinal(Runnable handler) - calls the handler when the chain completes in any way (success, abort, exception, etc.).
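Since the managed stage is a CompletionStage<Optional<T>>, the three outcomes can also be told apart by hand with handle(). The sketch below shows one way to classify them using only JDK calls; Outcomes, describe, and demo are hypothetical names, and the library's completers may work differently internally.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

final class Outcomes {
    // Classifies a finished stage as succeeded, aborted (empty Optional), or failed.
    static <T> String describe(CompletionStage<Optional<T>> stage) {
        return stage.handle((result, error) -> {
            if (error != null) {
                // Dependent stages may see the original exception wrapped in a CompletionException.
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause()
                    : error;
                return "failed: " + cause.getMessage();
            }
            return result.map(value -> "succeeded: " + value).orElse("aborted");
        }).toCompletableFuture().join();
    }

    // Hypothetical demo covering all three outcomes.
    static String demo() {
        CompletableFuture<Optional<String>> success =
            CompletableFuture.completedFuture(Optional.of("record"));
        CompletableFuture<Optional<String>> aborted =
            CompletableFuture.completedFuture(Optional.empty());
        CompletableFuture<Optional<String>> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        return describe(success) + " | " + describe(aborted) + " | " + describe(failed);
    }
}
```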

Chaining Other Stages

You can include external stages into the chain:

  • thenStageIf(Function<T, CompletionStage<Optional<U>>> stage) - executes the given stage asynchronously as the next task in the chain. If the stage returns an empty Optional the chain is aborted.
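In raw CompletableFuture terms, chaining an external stage resembles thenCompose with a guard for the empty case. The following is a rough sketch under that assumption; StageComposition, its thenStageIf helper, and fetchProfile are invented for the example and do not reflect the library's source.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class StageComposition {
    // Runs the external stage only if the previous step produced a value;
    // otherwise the empty Optional is passed along and the stage is never invoked.
    static <T, U> CompletableFuture<Optional<U>> thenStageIf(
            CompletableFuture<Optional<T>> previous,
            Function<T, CompletionStage<Optional<U>>> stage) {
        return previous.<Optional<U>>thenCompose(result -> {
            if (result.isPresent()) {
                return stage.apply(result.get());
            }
            return CompletableFuture.completedFuture(Optional.<U>empty());
        });
    }

    // Hypothetical external service that already returns its own CompletionStage.
    static CompletionStage<Optional<String>> fetchProfile(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            if (userId > 0) {
                return Optional.of("profile-" + userId);
            }
            return Optional.<String>empty();
        });
    }

    // Hypothetical demo: feeds a user id (or nothing) into the external stage.
    static String demo(Optional<Integer> userId) {
        CompletableFuture<Optional<Integer>> id = CompletableFuture.completedFuture(userId);
        return thenStageIf(id, StageComposition::fetchProfile).join().orElse("aborted");
    }
}
```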

Access The Internal CompletionStage

You can access the internally managed CompletionStage via:

  • unwrap() - returns the CompletionStage<Optional<T>>.

Tracing

The tasks submitted to StagedFuture can optionally be traced via the Tracing interface. The library comes with an SLF4J tracer and a System.out tracer. You can also write your own. Pass an instance of the tracer to the StagedFuture builder. E.g.

StagedFuture.async(executor, Tracing.debug(logger))
    .thenIf(...)
    ...

Cancelable Tracer

The special purpose tracer, Cancelable, can be used to enable canceling a running chain. It keeps track of the threads in use by the StagedFuture it is associated with. At any time you can call cancelChain(boolean) to interrupt currently running tasks and prevent new tasks from running. E.g.

Cancelable cancelable = new Cancelable();
StagedFuture.async(executor, cancelable)
    .thenIf(() -> worker("1"))
    .thenIf(s -> hangingWorker("2"))
    .thenIf(s -> worker("3"))
    .thenIf(s -> worker("4"));

cancelable.cancel(true);    // hangingWorker() gets interrupted 
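Thread tracking matters because CompletableFuture.cancel(true) on its own does not interrupt the thread running a task. The sketch below illustrates the general technique of recording worker threads and interrupting them on cancel. ThreadTrackingCanceler and its methods are hypothetical and are not the library's Cancelable class.

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class ThreadTrackingCanceler {
    private final Set<Thread> running = ConcurrentHashMap.newKeySet();
    private volatile boolean canceled;

    // Wraps a task so that its thread is tracked while it runs and so that
    // it is skipped (empty result) once cancel() has been called.
    <T> Supplier<Optional<T>> track(Supplier<Optional<T>> task) {
        return () -> {
            if (canceled) {
                return Optional.empty();
            }
            Thread current = Thread.currentThread();
            running.add(current);
            try {
                return task.get();
            } finally {
                running.remove(current);
            }
        };
    }

    // Prevents new tasks from running and optionally interrupts running ones.
    void cancel(boolean interruptRunning) {
        canceled = true;
        if (interruptRunning) {
            running.forEach(Thread::interrupt);
        }
    }

    // Hypothetical demo: one task hangs until interrupted, a later one is skipped.
    static String demo() {
        ThreadTrackingCanceler canceler = new ThreadTrackingCanceler();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        try {
            CompletableFuture<Optional<String>> hanging = CompletableFuture.supplyAsync(
                canceler.<String>track(() -> {
                    started.countDown();
                    try {
                        Thread.sleep(60_000);
                        return Optional.of("finished");
                    } catch (InterruptedException e) {
                        return Optional.of("interrupted");
                    }
                }), executor);
            started.await(); // wait until the hanging task is running and tracked
            canceler.cancel(true);
            CompletableFuture<Optional<String>> later = CompletableFuture.supplyAsync(
                canceler.<String>track(() -> Optional.of("ran")), executor);
            return hanging.join().orElse("skipped") + "," + later.join().orElse("skipped");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "demo interrupted";
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Interruption is cooperative: a task that never blocks and never checks its interrupt status will keep running even after cancel(true).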

Manual Wrappers

The CompletionStage wrappers that StagedFuture uses internally can be used directly without having to use StagedFuture.

Timeout

The Timeout class has methods that wrap a CompletionStage, adding timeouts and timeouts with default values. It roughly emulates the timeout features added to CompletableFuture in Java 9.
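For comparison, these are the two Java 9 CompletableFuture methods in question, orTimeout and completeOnTimeout, shown on a future that never completes by itself. The example requires Java 9 or later and uses only the JDK; the wrapper class Java9Timeouts is just a container for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class Java9Timeouts {
    // orTimeout completes the future exceptionally with a TimeoutException
    // if it is still pending after the given delay.
    static String failOnTimeout() {
        CompletableFuture<String> never = new CompletableFuture<>();
        try {
            return never.orTimeout(50, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException e) {
            // join() wraps the TimeoutException in a CompletionException.
            return e.getCause() instanceof TimeoutException ? "timed out" : "failed";
        }
    }

    // completeOnTimeout completes the future with a fallback value instead.
    static String defaultOnTimeout() {
        CompletableFuture<String> never = new CompletableFuture<>();
        return never.completeOnTimeout("default", 50, TimeUnit.MILLISECONDS).join();
    }
}
```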

Aborted

The Aborted class has methods that wrap CompletionStage and call given handlers when the stage completes with an empty Optional.