YACLib
Yet Another Concurrency Library
About YACLib
YACLib is a lightweight C++ library for concurrent and parallel task execution that strives to satisfy the following properties:
- Zero cost abstractions
- Easy to use
- Easy to build
- Good test coverage
For more details, see our design document and documentation.
Getting started
For a quick start, just paste this code into your CMakeLists.txt file.
include(FetchContent)
FetchContent_Declare(yaclib
GIT_REPOSITORY https://github.com/YACLib/YACLib.git
GIT_TAG main
)
FetchContent_MakeAvailable(yaclib)
link_libraries(yaclib)
For more details, see the install guide.
For more details about 'yaclib_std' or fault injection, see the docs.
Examples
Here are short examples of some YACLib features. For details, see the documentation.
Asynchronous pipeline
yaclib::FairThreadPool cpu_tp{/*threads=*/4};
yaclib::FairThreadPool io_tp{/*threads=*/1};
yaclib::Run(cpu_tp, [] { // on cpu_tp
return 42;
}).ThenInline([](int r) { // called directly after 'return 42', without Submit to cpu_tp
return r + 1;
}).Then([](int r) { // on cpu_tp
return std::to_string(r);
}).Detach(io_tp, [](std::string&& r) { // on io_tp
std::cout << "Pipeline result: <" << r << ">" << std::endl; // 43
});
We guarantee that no more than one allocation is made for each step of the pipeline.
Then/Detach can each be combined with any of three placements: an explicit IExecutor, the previous step's IExecutor, or Inline.
Also, Future/Promise don't contain shared atomic counters!
C++20 coroutine
yaclib::Future<int> task42() {
co_return 42;
}
yaclib::Future<int> task43() {
auto value = co_await task42();
co_return value + 1;
}
You can combine Future coroutine code with Future callback code at zero cost. That allows using YACLib for a smooth transition from C++17 to C++20 with coroutines.
Also, a Future coroutine doesn't make an additional allocation for the Future itself, only the coroutine frame allocation made by the compiler, which can be optimized.
And finally, co_await doesn't require allocation, so you can combine async operations without allocating.
Lazy pipeline
auto task = yaclib::Schedule(tp1, [] {
return 1;
}).Then([] (int x) {
return x * 2;
});
task.Run(); // Run task on tp1
Same as the asynchronous pipeline, but it starts only after Run/ToFuture/Get. Task can also be used as a coroutine return type.
Also, running a Task that returns a Future doesn't allocate. It doesn't need synchronization either, so it is even faster than the asynchronous pipeline.
Thread pool
yaclib::FairThreadPool tp{/*threads=*/4};
Submit(tp, [] {
// some computations...
});
Submit(tp, [] {
// some computations...
});
tp.Stop();
tp.Wait();
Strand, Serial executor
yaclib::FairThreadPool tp{/*threads=*/4};
// decorate the thread pool so that submitted tasks are serialized:
auto strand = yaclib::MakeStrand(&tp);
std::size_t counter = 0;
for (std::size_t i = 0; i < 100; ++i) {
Submit(tp, [&] {
Submit(*strand, [&] {
// serialized by Strand, no data race!
++counter;
});
});
}
This is much more efficient than a mutex because:
- we don't block the thread pool's threads.
- we execute critical sections in batches (the idea is known as flat-combining).
The strand implementation is also lock-free and efficient, with no additional allocations.
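The batching idea can be sketched with the standard library alone. The `MiniStrand` class below is a hypothetical illustration, not YACLib's implementation: submitters push tasks onto a lock-free stack, and whichever thread finds the strand idle becomes the executor and drains whole batches. Unlike the real strand, this sketch allocates a node per task, runs batches inline on the submitting thread rather than on an underlying executor, and uses default sequentially consistent atomics for simplicity.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical illustration type; not part of YACLib.
class MiniStrand {
 public:
  void Submit(std::function<void()> fn) {
    Node* node = new Node{std::move(fn), nullptr};
    Node* old = head_.load();
    do {
      // The idle marker and an empty stack both mean "nothing queued before us".
      node->next = (old == &idle_) ? nullptr : old;
    } while (!head_.compare_exchange_weak(old, node));
    if (old == &idle_) {
      Drain();  // nobody was running: this thread becomes the executor
    }
  }

 private:
  struct Node {
    std::function<void()> fn;
    Node* next = nullptr;
  };

  void Drain() {
    for (;;) {
      Node* batch = head_.exchange(nullptr);  // grab every queued task at once
      Node* fifo = nullptr;
      while (batch != nullptr) {  // reverse the LIFO stack into submission order
        Node* next = batch->next;
        batch->next = fifo;
        fifo = batch;
        batch = next;
      }
      while (fifo != nullptr) {  // run the batch with no other thread inside
        Node* next = fifo->next;
        fifo->fn();
        delete fifo;
        fifo = next;
      }
      Node* expected = nullptr;
      if (head_.compare_exchange_strong(expected, &idle_)) {
        return;  // nothing arrived while we were running: go idle
      }
    }
  }

  Node idle_;  // sentinel whose address marks the "idle" state
  std::atomic<Node*> head_{&idle_};
};

std::size_t RunStrandDemo() {
  MiniStrand strand;
  std::size_t counter = 0;  // plain variable, protected only by the strand
  std::vector<std::thread> threads;
  for (int t = 0; t < 4; ++t) {
    threads.emplace_back([&] {
      for (int i = 0; i < 1000; ++i) {
        strand.Submit([&] { ++counter; });
      }
    });
  }
  for (auto& th : threads) {
    th.join();
  }
  assert(counter == 4000);
  return counter;
}
```

Threads that lose the race to become the executor return from `Submit` right after a single push, which is why submitters are never blocked on the critical section.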
Mutex
yaclib::FairThreadPool tp{4};
yaclib::Mutex<> m;
std::size_t counter = 0;
auto compute = [&] (std::size_t index) -> yaclib::Future<> {
co_await On(tp);
// ... some computation with index ...
auto guard = co_await m.Guard();
// ... co_await On(other thread pool) ...
counter += index;
};
for (std::size_t i = 0; i < 100; ++i) {
compute(i);
}
First, this is the only correct mutex implementation for C++20 coroutines as far as I know (cppcoro, libunifex, and folly::coro implement Unlock incorrectly: it serializes the code after Unlock).
Second, Mutex inherits all the Strand benefits.
Rescheduling
yaclib::Future<> bar(yaclib::IExecutor& cpu, yaclib::IExecutor& io) {
co_await On(cpu);
// ... some heavy computation ...
co_await On(io);
// ... some io computation ...
}
This is really zero-cost: it just suspends the coroutine and submits its resumption to another executor, with no synchronization inside the coroutine and no allocations anywhere.
WhenAll
yaclib::FairThreadPool tp{/*threads=*/4};
std::vector<yaclib::Future<int>> fs;
// Run parallel computations
for (std::size_t i = 0; i < 5; ++i) {
fs.push_back(yaclib::Run(tp, [i]() -> int {
return random() * i;
}));
}
// Will be ready when all futures are ready
yaclib::Future<std::vector<int>> all = WhenAll(fs.begin(), fs.size());
std::vector<int> unique_ints = std::move(all).Then([](std::vector<int> ints) {
ints.erase(std::unique(ints.begin(), ints.end()), ints.end());
return ints;
}).Get().Ok();
WhenAll makes no more than 3 allocations regardless of input size.
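To see why the allocation count of such a combinator can stay fixed as the input grows, here is a minimal standard-library sketch. `MiniAllCombinator` is a hypothetical name and is not how YACLib implements WhenAll: one shared state holds a pre-sized result vector and an atomic counter, each producer writes only its own slot, and the last one to finish fires the callback.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <numeric>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical illustration type; not YACLib's WhenAll implementation.
template <typename T>
class MiniAllCombinator {
 public:
  MiniAllCombinator(std::size_t count, std::function<void(std::vector<T>)> on_all)
      : results_(count), remaining_{count}, on_all_{std::move(on_all)} {}

  // Each producer writes only its own slot, so the slots need no locking.
  void Set(std::size_t index, T value) {
    results_[index] = std::move(value);
    // The last producer to arrive observes all earlier writes and fires the callback.
    if (remaining_.fetch_sub(1) == 1) {
      on_all_(std::move(results_));
    }
  }

 private:
  std::vector<T> results_;              // sized once, up front
  std::atomic<std::size_t> remaining_;  // producers that have not finished yet
  std::function<void(std::vector<T>)> on_all_;
};

int RunWhenAllDemo() {
  int sum = 0;
  MiniAllCombinator<int> all{5, [&](std::vector<int> values) {
    sum = std::accumulate(values.begin(), values.end(), 0);
  }};
  std::vector<std::thread> threads;
  for (int i = 0; i < 5; ++i) {
    threads.emplace_back([&all, i] { all.Set(static_cast<std::size_t>(i), i * 10); });
  }
  for (auto& t : threads) {
    t.join();
  }
  assert(sum == 100);  // 0 + 10 + 20 + 30 + 40
  return sum;
}
```

The only per-input cost here is the size of the result vector, not the number of allocations.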
WhenAny
yaclib::FairThreadPool tp{/*threads=*/4};
std::vector<yaclib::Future<int>> fs;
// Run parallel computations
for (std::size_t i = 0; i < 5; ++i) {
fs.push_back(yaclib::Run(tp, [i] {
// connect with one of the database shards
return i;
}));
}
// Will be ready when any future is ready
WhenAny(fs.begin(), fs.size()).Detach([](int i) {
// some work with database
});
WhenAny makes no more than 2 allocations regardless of input size.
Future unwrapping
yaclib::FairThreadPool tp_output{/*threads=*/1};
yaclib::FairThreadPool tp_compute{/*threads=CPU cores*/};
auto future = yaclib::Run(tp_output, [] {
std::cout << "Outer task" << std::endl;
return yaclib::Run(tp_compute, [] { return 42; });
}).Then(/*tp_compute*/ [](int result) {
result *= 13;
return yaclib::Run(tp_output, [result] {
std::cout << "Result = " << result << std::endl;
});
});
Sometimes one async function needs to return the result of another. Waiting on that result would work, but it would block the thread until the task completes.
Future unwrapping solves this problem: when an async function returns a Future object, the inner Future "replaces" the outer Future instead of being stored as its result. This means that the outer Future completes when the inner Future finishes and acquires the result of the inner Future.
It also doesn't require additional allocations.
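At the type level, unwrapping means a continuation that returns a future of `T` yields a future of `T`, not a future of a future. The sketch below shows only that type computation, under stated assumptions: `MiniFuture`, `Unwrap`, and `ThenResult` are hypothetical names introduced for illustration and do not reflect YACLib's internals.

```cpp
#include <type_traits>

// Hypothetical stand-in for a future type; not YACLib's Future.
template <typename T>
struct MiniFuture {};

// A plain return type is kept as is...
template <typename T>
struct Unwrap {
  using type = T;
};

// ...while a returned MiniFuture<T> is flattened to T.
template <typename T>
struct Unwrap<MiniFuture<T>> {
  using type = T;
};

// Type produced by attaching continuation F to a future that yields Arg.
template <typename F, typename Arg>
using ThenResult = MiniFuture<typename Unwrap<std::invoke_result_t<F, Arg>>::type>;

// Two sample continuations (declarations only; they are never called).
struct ReturnsInt {
  int operator()(int) const;
};
struct ReturnsFuture {
  MiniFuture<int> operator()(int) const;
};

// Both continuations produce MiniFuture<int>; the nested future is flattened.
constexpr bool kPlainIsWrapped =
    std::is_same_v<ThenResult<ReturnsInt, int>, MiniFuture<int>>;
constexpr bool kNestedIsFlattened =
    std::is_same_v<ThenResult<ReturnsFuture, int>, MiniFuture<int>>;
static_assert(kPlainIsWrapped && kNestedIsFlattened, "continuation results are flattened");
```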
Timed wait
yaclib::FairThreadPool tp{/*threads=*/4};
yaclib::Future<int> f1 = yaclib::Run(tp, [] { return 42; });
yaclib::Future<double> f2 = yaclib::Run(tp, [] { return 15.0; });
WaitFor(10ms, f1, f2); // or Wait / WaitUntil
if (f1.Ready()) {
Process(std::as_const(f1).Get());
yaclib::Result<int> res1 = std::as_const(f1).Get();
assert(f1.Valid()); // f1 valid here
}
if (f2.Ready()) {
Process(std::move(f2).Get());
assert(!f2.Valid()); // f2 invalid here
}
We support Wait/WaitFor/WaitUntil.
None of them allocate, and we have an optimized path for a single Future (used in Future::Get()).
WaitGroup
yaclib::WaitGroup wg{1};
yaclib::FairThreadPool tp;
wg.Add(2/*default=1*/);
Submit(tp, [&] { // capture wg by reference
wg.Done();
});
Submit(tp, [&] { // capture wg by reference
wg.Done();
});
yaclib::Future<int> f1 = yaclib::Run(tp, [] {...});
wg.Attach(f1); // Done is called automatically when the Future becomes Ready
yaclib::Future<> f2 = yaclib::Run(tp, [] {...});
wg.Consume(std::move(f2)); // Done is called automatically when the Future becomes Ready
auto coro = [&] () -> yaclib::Future<> {
co_await On(tp);
co_await wg; // alias for co_await wg.Await(CurrentThreadPool());
std::cout << f1.Touch().Ok(); // Valid access to Result of Ready Future
};
auto coro_f = coro();
wg.Done(/*default=1*/);
wg.Wait();
It is as efficient as a simple atomic counter in an intrusive pointer, and it doesn't require any allocation.
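The Add/Done/Wait counting protocol used above can be sketched with standard primitives. `MiniWaitGroup` is a hypothetical illustration and not YACLib's WaitGroup: it uses a mutex and a condition variable for clarity rather than the atomic-counter design described above, and it has no Attach, Consume, or coroutine support.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Hypothetical illustration type; not YACLib's WaitGroup.
class MiniWaitGroup {
 public:
  explicit MiniWaitGroup(std::size_t count = 0) : count_{count} {}

  void Add(std::size_t n = 1) {
    std::lock_guard<std::mutex> lock{m_};
    count_ += n;
  }

  void Done(std::size_t n = 1) {
    std::lock_guard<std::mutex> lock{m_};
    count_ -= n;
    if (count_ == 0) {
      cv_.notify_all();  // wake every thread blocked in Wait()
    }
  }

  void Wait() {
    std::unique_lock<std::mutex> lock{m_};
    cv_.wait(lock, [&] { return count_ == 0; });
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::size_t count_;
};

int RunWaitGroupDemo() {
  MiniWaitGroup wg{1};  // the initial 1 keeps the count above zero until the owner is ready
  int a = 0;
  int b = 0;
  wg.Add(2);
  std::thread t1{[&] { a = 1; wg.Done(); }};
  std::thread t2{[&] { b = 2; wg.Done(); }};
  wg.Done();        // release the initial count
  wg.Wait();        // returns once both workers have called Done()
  int sum = a + b;  // safe: Done() and Wait() synchronize through the mutex
  t1.join();
  t2.join();
  assert(sum == 3);
  return sum;
}
```

Starting the counter at 1 and calling `Done()` once all work has been registered prevents the count from reaching zero too early, which matches the `WaitGroup wg{1}` pattern in the example above.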
Exception recovering
yaclib::FairThreadPool tp{/*threads=*/4};
auto f = yaclib::Run(tp, [] {
if (random() % 2) {
throw std::runtime_error{"1"};
}
return 42;
}).Then([](int y) {
if (random() % 2) {
throw std::runtime_error{"2"};
}
return y + 15;
}).Then([](int z) { // Will not run if we have any error
return z * 2;
}).Then([](std::exception_ptr e) { // Recover from exceptions
try {
std::rethrow_exception(e);
} catch (const std::runtime_error& e) {
std::cout << e.what() << std::endl;
}
return 10; // Some default value
});
int x = std::move(f).Get().Value();
Error recovering
yaclib::FairThreadPool tp{/*threads=*/4};
auto f = yaclib::Run<std::error_code>(tp, [] {
if (random() % 2) {
return std::make_error_code(1);
}
return 42;
}).Then([](int y) {
if (random() % 2) {
return std::make_error_code(2);
}
return y + 15;
}).Then([](int z) { // Will not run if we have any error
return z * 2;
}).Then([](std::error_code ec) { // Recover from error codes
std::cout << ec.value() << std::endl;
return 10; // Some default value
});
int x = std::move(f).Get().Value();
Use Result for smart recovering
yaclib::FairThreadPool tp{/*threads=*/4};
auto f = yaclib::Run(tp, [] {
if (random() % 2) {
return std::make_error_code(1);
}
return 42;
}).Then([](int y) {
if (random() % 2) {
throw std::runtime_error{"2"};
}
return y + 15;
}).Then([](yaclib::Result<int>&& z) {
if (!z) {
return 10; // Some default value
}
return std::move(z).Value();
});
int x = std::move(f).Get().Value();
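The three recovery styles above rely on a result that is either a value, an exception, or an error code. The sketch below shows one way such a type could be modeled with `std::variant`. `MiniResult`, `CaptureInt`, and `ValueOr` are hypothetical names for illustration; YACLib's Result has its own definition and interface.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <system_error>
#include <utility>
#include <variant>

// Hypothetical illustration type; not YACLib's Result.
template <typename T>
class MiniResult {
 public:
  MiniResult(T value) : state_{std::in_place_index<0>, std::move(value)} {}
  MiniResult(std::exception_ptr e) : state_{std::in_place_index<1>, std::move(e)} {}
  MiniResult(std::error_code ec) : state_{std::in_place_index<2>, ec} {}

  // True only when a value is stored.
  explicit operator bool() const { return state_.index() == 0; }

  T& Value() { return std::get<0>(state_); }

 private:
  std::variant<T, std::exception_ptr, std::error_code> state_;
};

// Runs one pipeline step and turns a thrown exception into an error state.
template <typename F>
MiniResult<int> CaptureInt(F&& step) {
  try {
    return MiniResult<int>{step()};
  } catch (...) {
    return MiniResult<int>{std::current_exception()};
  }
}

// Recovery in the style of the last example: use a default on any failure.
int ValueOr(MiniResult<int> r, int fallback) {
  return r ? r.Value() : fallback;
}

int RunResultDemo() {
  int ok = ValueOr(CaptureInt([] { return 42; }), 10);
  int thrown = ValueOr(CaptureInt([]() -> int { throw std::runtime_error{"2"}; }), 10);
  int coded = ValueOr(MiniResult<int>{std::make_error_code(std::errc::timed_out)}, 10);
  assert(ok == 42 && thrown == 10 && coded == 10);
  return ok + thrown + coded;
}
```

A callback that accepts the whole result, like the `yaclib::Result<int>&&` parameter above, can handle both failure kinds in one place instead of needing separate exception and error-code handlers.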
Requirements
YACLib is a static library that uses CMake as its build system and requires a compiler with C++17 or newer.
If the library doesn't compile on some compiler satisfying this condition, please create an issue. Pull requests with fixes are welcome!
We can also try to support older standards. If you are interested in it, check this discussion.
We test the following configurations:
Compiler\OS | Linux | Windows | macOS | Android |
---|---|---|---|---|
GCC | ||||
Clang | ||||
AppleClang | — | — | — | |
MSVC | — | — | — |
MinGW worked in CI earlier; check this.
Releases
YACLib follows the Abseil Live at Head philosophy (update to the latest commit from the main branch as often as possible).
So we recommend using the latest commit in the main branch in your projects.
This is safe because we suggest compiling YACLib from source, and each commit in main goes through dozens of test runs in various configurations. Our test coverage is 100%. Put simply, we run tests on the Cartesian product of possible configurations:
os x compiler x stdlib x sanitizer x fault injection backend
However, we realize this philosophy doesn't work for every project, so we also provide Releases.
We don't believe in SemVer (check this), but we use a year.month.day[.patch] versioning approach.
I'll release a new version if you ask, or when I decide we have important changes or enough of them.
Contributing
We are always open to issues and pull requests. Check our good first issues.
For more details you can check the following links:
Thanks
- Roman Lipovsky for an incredible course about concurrency, which gave us a lot of ideas for this library, and for showing us how important it is to test concurrent code correctly.
- Paul E. McKenney for an incredible book about parallel programming, which gave me a lot of insight into memory models and how they relate to what's going on in hardware.
Contacts
You can contact us by email: [email protected]
Or join our Discord Server
License
YACLib is made available under the MIT License. See the LICENSE file for details.
We would be glad if you let us know that you're using our library.