
FrancoisChabot / variadic_future

License: Apache-2.0
Variadic, completion-based futures for C++17

Programming Languages

C++
36643 projects - #6 most used programming language
CMake
9771 projects

Projects that are alternatives to or similar to variadic_future

Swiftcoroutine
Swift coroutines for iOS, macOS and Linux.
Stars: ✭ 690 (+1582.93%)
Mutual labels:  multithreading, futures
parallel-dfs-dag
A parallel implementation of DFS for Directed Acyclic Graphs (https://research.nvidia.com/publication/parallel-depth-first-search-directed-acyclic-graphs)
Stars: ✭ 29 (-29.27%)
Mutual labels:  multithreading, futures
BeauRoutine
Coroutine and tweening framework for Unity3D
Stars: ✭ 88 (+114.63%)
Mutual labels:  futures
akshare
AKShare is an elegant and simple financial data interface library for Python, built for human beings! (An open-source financial data interface library.)
Stars: ✭ 5,155 (+12473.17%)
Mutual labels:  futures
TaskManager
A C++14 Task Manager / Scheduler
Stars: ✭ 81 (+97.56%)
Mutual labels:  multithreading
kafka-workers
Kafka Workers is a client library which unifies records consuming from Kafka and processing them by user-defined WorkerTasks.
Stars: ✭ 30 (-26.83%)
Mutual labels:  multithreading
wasm-bindgen-rayon
An adapter for enabling Rayon-based concurrency on the Web with WebAssembly.
Stars: ✭ 257 (+526.83%)
Mutual labels:  multithreading
java-multithread
Code written for the Multithreading with Java course on the RinaldoDev YouTube channel.
Stars: ✭ 24 (-41.46%)
Mutual labels:  multithreading
ray tutorial
An introductory tutorial about leveraging Ray core features for distributed patterns.
Stars: ✭ 67 (+63.41%)
Mutual labels:  futures
triple-buffer
Implementation of triple buffering in Rust
Stars: ✭ 66 (+60.98%)
Mutual labels:  multithreading
ThreadPinning.jl
Pinning Julia threads to cores
Stars: ✭ 23 (-43.9%)
Mutual labels:  multithreading
HTTPStaticServer
Bare-bones static HTTP server written in a single C file
Stars: ✭ 24 (-41.46%)
Mutual labels:  multithreading
finam-export
Python client library to download historical data from finam.ru
Stars: ✭ 84 (+104.88%)
Mutual labels:  futures
workerpoolxt
Concurrency limiting goroutine pool without upper limit on queue length. Extends github.com/gammazero/workerpool
Stars: ✭ 15 (-63.41%)
Mutual labels:  multithreading
python3-concurrency
A theoretical companion to a Python3 crawler series. It first studies I/O models, implementing TCP servers and clients in Python under blocking I/O, non-blocking I/O, and I/O multiplexing, then compares the efficiency of synchronous I/O (sequential downloads, multiprocess, multithreaded) against asynchronous I/O (asyncio).
Stars: ✭ 49 (+19.51%)
Mutual labels:  futures
swift-futures
Demand-driven asynchronous programming in Swift
Stars: ✭ 32 (-21.95%)
Mutual labels:  futures
SoftLight
A shader-based Software Renderer Using The LightSky Framework.
Stars: ✭ 2 (-95.12%)
Mutual labels:  multithreading
ideas4
An Additional 100 Ideas for Computing https://samsquire.github.io/ideas4/
Stars: ✭ 26 (-36.59%)
Mutual labels:  multithreading
disruptor-billing-example
Example LMAX Disruptor spring-boot project that uses disruptor-spring-manager framework
Stars: ✭ 56 (+36.59%)
Mutual labels:  multithreading
ObviousAwait
🧵 Expressive aliases to ConfigureAwait(true) and ConfigureAwait(false)
Stars: ✭ 55 (+34.15%)
Mutual labels:  multithreading


Variadic futures

High-performance variadic completion-based futures for C++17.

  • No external dependency
  • Header-only
  • Lockless

Why?

This was needed to properly implement Easy gRPC, and it was an interesting exercise.

What

Completion-based futures are a non-blocking, callback-based, synchronization mechanism that hides the callback logic from the asynchronous code, while properly handling error conditions.

A fairly common pattern is to have some long operation perform a callback upon its completion. At first glance, this seems pretty straightforward:

void do_something(int x, int y, std::function<void(int)> on_complete);

void foo() {
  do_something(1, 12, [](int val) {
    std::cout << val << "\n";
  });
}

However, there are a few hidden complexities at play here. The code within do_something() has to make decisions about what to do with on_complete. Should on_complete be called inline or put in a work pool? Can we accept a default-constructed on_complete? What should we do with error conditions? The path of least resistance leads to writing code with no error handling whatsoever...

With Futures, these decisions are delegated to the caller of do_something(), which prevents do_something() from having to know much about the context within which it is operating. Error handling is also not optional, so you will never have an error dropped on the floor.

Future<int> do_something(int x, int y);

void foo() {
  do_something(1, 12).finally([](expected<int> val) {
    if(val.has_value()) {
      std::cout << *val << "\n";
    }
  });
}

It looks essentially the same, but now implementing do_something() is a lot more straightforward, less error-prone, and supports many more operation modes out of the box.

Once you start combining things, you can express some fairly complicated synchronization relationships in a clear and concise manner:

Future<void> foo() {
  Future<int> fut_a = do_something_that_produces_an_int();
  Future<bool> fut_b = do_something_that_produces_a_bool();
 
  // Create a future that triggers once both fut_a and fut_b are ready
  Future<int, bool> combined_fut = join(fut_a, fut_b);

  // This callback will only be invoked if both fut_a and fut_b are successfully fulfilled.
  // Otherwise, the failure gets automatically propagated to the resulting future.
  Future<void> result = combined_fut.then([](int a, bool b) {
    std::cout << a << " - " << b;
  });
  

  return result;
}

Documentation

You can find the auto-generated API reference here.

Installation

  • Make the contents of the include directory available to your project.
  • Have a look at var_future/config.h and make changes as needed.
  • If you are from the future, you may want to use std::expected instead of expected_lite.

Usage

Prerequisites

I am assuming you are already familiar with the expected<> concept/syntax. aom::expected<T> is simply a std::expected<T, std::exception_ptr>.

Consuming futures

Let's say that you are using a function that happens to return a Future<...>, and you want to execute a callback when the values become available:

Future<int, float> get_value_eventually();

The Future<int, float> will eventually be fulfilled with an int and a float, or failed with one or more std::exception_ptr, up to one per field.

The simplest thing you can do is call finally() on it. This registers a callback that will be invoked once both values are either fulfilled or failed:

auto f = get_value_eventually();

f.finally([](expected<int> v, expected<float> f) { 
  if(v.has_value() && f.has_value()) {
    std::cout << "values are " << *v << " and " << *f << "\n"; 
  }
 });

Alternatively, if you want to create a future that is completed once the callback has completed, you can use then_expect().

Like finally(), then_expect() invokes its callback when all values are either fulfilled or failed. However, this time, the return value of the callback is used to populate a Future<T> (even if the callback returns void). If the callback happens to throw an exception (for example, by invoking value() on an expected containing an error), then that exception becomes the result's failure.

Rules:

  • If the callback returns a Future<T>, that produces a Future<T>.
  • If the callback returns an expected<T>, that produces a Future<T>.
  • If the callback returns segmented(T, U), that produces a Future<T, U>.
  • Otherwise, if the callback returns T, that produces a Future<T>.

auto f = get_value_eventually();

Future<float> result = f.then_expect([](expected<int> v, expected<float> f) {
  // Reminder: expected::value() throws an exception if it contains an error.
  return f.value() * v.value(); 
 });

Finally, this pattern of propagating a future's failure as the failure of its callback's result is so common that a third method does that all at once: then().

Here, if f contains one or more failures, then the callback is never invoked at all, and the first error is immediately propagated as the result's failure.

The same return value rules as then_expect() apply.

auto f = get_value_eventually();

Future<float> result = f.then([](int v, float f) {
  return f * v; 
 });

In short:

             error-handling   error-propagating
chains       then_expect()    then()
terminates   finally()        N/A

Void fields

If a callback attached to then_expect() or then() returns void, that produces a Future<void>.

Future<>::then() has special handling of void fields: they are omitted entirely from the callback arguments:

Future<void> f_a;
Future<void, int> f_b;
Future<float, void, int> f_c;

f_a.then([](){});
f_b.then([](int v){});
f_c.then([](float f, int v){});

The Executor

The callback can either

  1. Be executed directly wherever the future is fulfilled (immediate)
  2. Be posted to a work pool to be executed by some worker (deferred)

Immediate mode is used by default: just pass your callback to your chosen method and you are done.

N.B. If the future is already fulfilled by the time a callback is attached in immediate mode, the callback is invoked immediately, in the thread that attaches it.

For deferred mode, you need to pass your queue (or an adapter) as the first parameter to the method. The queue only needs to be some type that implements void push(T&&) where T is a Callable<void()>.

struct Queue {
  // In almost all cases, this needs to be thread-safe.
  void push(std::function<void()> cb);
};

void foo(Queue& queue) {
  get_value_eventually()
    .then([](int v){ return v * v;})             
    .finally(queue, [](expected<int> v) {
      if(v.has_value()) {
        std::cerr << "final value: " << *v << "\n";
      }
    });
}

Producing futures

Futures can be created by Future::then() or Future::then_expect(), but the chain has to start somewhere.

Promises

Promise<Ts...> is a lightweight interface you can use to create a future that will eventually be fulfilled (or failed).

Promise<int> prom;
Future<int> fut = prom.get_future();

std::thread thread([p=std::move(prom)](){ 
  p.set_value(3); 
});

async

async() will post the passed operation to the queue, and return a future to the value returned by that function.

aom::Future<double> fut = aom::async(queue, [](){ return 12.0; });

Joining futures

You can wait on multiple futures at the same time using the join() function.

#include "var_future/future.h"

void foo() {
  aom::Future<int> fut_a = ...;
  aom::Future<int> fut_b = ...;

  aom::Future<int, int> combined = join(fut_a, fut_b);

  combined.finally([](aom::expected<int> a, aom::expected<int> b){
    //Do something with a and/or b;
  });
}

Posting callbacks to an ASIO context.

This example shows how to use ASIO, but the same idea can be applied to other contexts easily.

#include "asio.hpp"
#include "var_future/future.h"

// This can be any type that has a thread-safe push(Callable<void()>) method
struct Work_queue {
  template<typename T>
  void push(T&& cb) {
    asio::post(ctx_, std::forward<T>(cb));
  }

  asio::io_context& ctx_;
};

int int_generating_operation();

void foo() {
  asio::io_context io_ctx;
  Work_queue asio_adapter{io_ctx};

  // Queue the operation in the asio context, and get a future to the result.
  aom::Future<int> fut = aom::async(asio_adapter, int_generating_operation);

  // push the execution of this callback in io_context when ready.
  fut.finally(asio_adapter, [](aom::expected<int> v) {
    //Do something with v;
  });
}

get_std_future()

Future<> provides get_std_future(), as well as get(), which is exactly equivalent to get_std_future().get(), as a convenience for explicit synchronization.

This was added primarily to simplify writing unit tests, and using it extensively in other contexts is probably a bit of a code smell. If you find yourself reaching for it a lot, perhaps you should just be using std::future<> directly instead.

Future<int> f1 = ...;
std::future<int> x_f = f1.get_std_future();

Future<int> f2 = ...;
int x = f2.get();

Future Streams

Warning: The stream API and performance are not nearly as mature and tested as Future<>/Promise<>.

Producing Future streams

aom::Stream_future<int> get_stream() {
   aom::Stream_promise<int> prom;
   auto result = prom.get_future();
   
   std::thread worker([p = std::move(prom)]() mutable {
     p.push(1);
     p.push(2);
     p.push(3);
     p.push(4);
     
     // If p is destroyed, the stream is implicitly failed.
     p.complete();
   });

   worker.detach();

   return result;
}

Consuming Future streams

 auto all_done = get_stream().for_each([](int v) {
   std::cout << v << "\n";
 }).then([](){
   std::cout << "all done!\n";
 });
 
 all_done.get();

Performance notes

The library assumes that, more often than not, a callback is attached to the future before a value or error is produced, and it is tuned accordingly. Everything will still work if the value arrives before the callback, just perhaps not as fast as possible.

The library also assumes that it is much more likely that a future will be fulfilled successfully rather than failed.

FAQs

Is there a std::shared_future<> equivalent?

Not yet. It can be added if someone has a use for it; we just don't want to add features that would not be used anywhere.

Why is there no terminating+error propagating method?

We admit that it would be nice to just do fut.finally([](int a, float b){ ... }), but the problem is that errors would have nowhere to go. Making the path of least resistance drop errors on the floor by default is a recipe for disaster in the long run.
