
pivovarit / Parallel Collectors

License: Apache-2.0
Parallel Collectors is a toolkit easing parallel collection processing in Java using Stream API.


Projects that are alternatives of or similar to Parallel Collectors

simplecov-parallel
Parallelism support for SimpleCov, currently only for CircleCI 1.0
Stars: ✭ 31 (-91.09%)
Mutual labels:  parallelism
inter-operator-scheduler
[MLSys 2021] IOS: Inter-Operator Scheduler for CNN Acceleration
Stars: ✭ 133 (-61.78%)
Mutual labels:  parallelism
Rabbitmq Sharding
Sharded logical queues for RabbitMQ: a queue type which provides improved parallelism and throughput at the cost of total ordering
Stars: ✭ 279 (-19.83%)
Mutual labels:  parallelism
golang-101
🍺 In-depth internals, my personal notes, example codes and projects. Includes - Thousands of codes, OOP, Concurrency, Parallelism, Goroutines, Mutexes & Wait Groups, Testing in Go, Go tool chain, Backend web development, Some projects including Log file parser using bufio.Scanner, Spam Masker, Retro led clock, Console animations, Dictionary pro…
Stars: ✭ 61 (-82.47%)
Mutual labels:  parallelism
parallel stream
A parallelized stream implementation for Elixir
Stars: ✭ 86 (-75.29%)
Mutual labels:  parallelism
vercors
The VerCors verification toolset for verifying parallel and concurrent software
Stars: ✭ 30 (-91.38%)
Mutual labels:  parallelism
NPB-CPP
NAS Parallel Benchmark Kernels in C/C++. The parallel versions are in FastFlow, TBB, and OpenMP.
Stars: ✭ 18 (-94.83%)
Mutual labels:  parallelism
Async Techniques Python Course
Async Techniques and Examples in Python Course
Stars: ✭ 314 (-9.77%)
Mutual labels:  parallelism
knapsack pro-ruby
Knapsack Pro gem splits tests across parallel CI nodes and makes sure that tests will run in optimal time on each node.
Stars: ✭ 101 (-70.98%)
Mutual labels:  parallelism
Concurrency Glossary
🦑 Informal definitions of terms used in concurrency modeling
Stars: ✭ 276 (-20.69%)
Mutual labels:  parallelism
parquet2
Fastest and safest Rust implementation of parquet. `unsafe` free. Integration-tested against pyarrow
Stars: ✭ 157 (-54.89%)
Mutual labels:  parallelism
Async-Channel
Python async multi-task communication library. Used by OctoBot project.
Stars: ✭ 13 (-96.26%)
Mutual labels:  parallelism
parallel python
Code for Python Parallel Programming Cookbook by Giancarlo Zaccone
Stars: ✭ 29 (-91.67%)
Mutual labels:  parallelism
gemini
Sci-Fi galaxy simulation with heavy procedural generation focus
Stars: ✭ 25 (-92.82%)
Mutual labels:  parallelism
Bild
Image processing algorithms in pure Go
Stars: ✭ 3,431 (+885.92%)
Mutual labels:  parallelism
detox
distributed tox (tox plugin to run testenvs in parallel)
Stars: ✭ 48 (-86.21%)
Mutual labels:  parallelism
Paraphrase
Multi-core suitable Forth-like language
Stars: ✭ 27 (-92.24%)
Mutual labels:  parallelism
Datafuse
Datafuse is a free Cloud-Native Analytics DBMS (inspired by ClickHouse) implemented in Rust
Stars: ✭ 327 (-6.03%)
Mutual labels:  parallelism
Weave
A state-of-the-art multithreading runtime: message-passing based, fast, scalable, ultra-low overhead
Stars: ✭ 305 (-12.36%)
Mutual labels:  parallelism
Onetbb
oneAPI Threading Building Blocks (oneTBB)
Stars: ✭ 3,284 (+843.68%)
Mutual labels:  parallelism

Java Stream API Parallel Collectors - overcoming limitations of standard Parallel Streams

Parallel Collectors is a toolkit easing parallel collection processing in Java using the Stream API... but without the limitations imposed by standard Parallel Streams.

list.stream()
  .collect(parallel(i -> foo(i), toList(), executor, parallelism))
    .orTimeout(1000, MILLISECONDS)
    .thenAcceptAsync(System.out::println, otherExecutor)
    .thenRun(() -> System.out.println("Finished!"));

They are:

  • lightweight (yes, you could achieve the same with Project Reactor, but that's often a hammer way too big for the job)
  • powerful (the combined power of the Stream API and CompletableFutures makes it possible to specify timeouts, compose with other CompletableFutures, or just perform the whole processing asynchronously)
  • configurable (it's possible to provide your own Executor and parallelism level)
  • non-blocking (no need to block the calling thread while waiting for the result to arrive)
  • short-circuiting (if one of the operations raises an exception, the remaining tasks will get interrupted)
  • non-invasive (they are just custom implementations of the Collector interface, no magic inside, zero dependencies)
  • versatile (missing an API for your use case? process the resulting Stream with the full power of the Stream API by reusing already available Collectors)

Maven Dependencies

<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>2.5.0</version>
</dependency>

Gradle

compile 'com.pivovarit:parallel-collectors:2.5.0'

Philosophy

Parallel Collectors are unopinionated by design, so it's up to their users to use them responsibly, which involves things like:

  • proper configuration of a provided Executor and its lifecycle management
  • choosing the appropriate parallelism level
  • making sure that the tool is applied in the right context

Make sure to read the API documentation before using these in production.

Basic API

The main entry point is the com.pivovarit.collectors.ParallelCollectors class, which follows the convention established by java.util.stream.Collectors and features static factory methods returning custom java.util.stream.Collector implementations spiced up with parallel processing capabilities.

By design, it's obligatory to supply a custom Executor instance and manage its lifecycle.

All parallel collectors are one-off and must not be reused.

Available Collectors:

  • CompletableFuture<Collection<T>> parallel(Function, Collector, Executor, parallelism)

  • CompletableFuture<Stream<T>> parallel(Function, Executor, parallelism)

  • Stream<T> parallelToStream(Function, Executor, parallelism)

  • Stream<T> parallelToOrderedStream(Function, Executor, parallelism)
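
For instance, the Stream-returning parallel variant yields a CompletableFuture<Stream<T>> that can be processed further once all results arrive - a minimal sketch, assuming foo and executor are defined as in the earlier snippets:

CompletableFuture<Stream<String>> futureStream = list.stream()
  .collect(parallel(i -> foo(i), executor, 4)); // parallelism of 4

// process the results once all of them have arrived
futureStream.thenAccept(results -> results.forEach(System.out::println));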

Batching Collectors

By default, all ExecutorService threads compete for each task separately, which results in a basic form of work stealing. This isn't free, but it can decrease overall processing time when subtasks vary in duration.

However, if the processing time of all subtasks is similar, it might be better to distribute tasks in batches to avoid excessive contention.

Batching alternatives are available under the ParallelCollectors.Batching namespace.
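
For example, a minimal sketch of the batching variant, reusing the hypothetical foo and executor from the earlier snippets:

CompletableFuture<List<String>> result = list.stream()
  .collect(ParallelCollectors.Batching.parallel(i -> foo(i), toList(), executor, 4)); // work is distributed in 4 batches instead of task-by-task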

Leveraging CompletableFuture

Parallel Collectors™ expose results wrapped in CompletableFuture instances, which provides great flexibility and makes it possible to work with them in a non-blocking fashion:

CompletableFuture<List<String>> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), executor));

This makes it possible to conveniently apply callbacks, and compose with other CompletableFutures:

list.stream()
  .collect(parallel(i -> foo(i), toSet(), executor))
  .thenAcceptAsync(System.out::println, otherExecutor)
  .thenRun(() -> System.out.println("Finished!"));

Or simply join() if you want to block the calling thread and wait for the result:

List<String> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), executor))
  .join();

What's more, since JDK 9, you can easily provide your own timeout.
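
For example, a sketch using CompletableFuture#orTimeout (available since JDK 9), again assuming foo and executor are defined elsewhere:

List<String> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), executor))
  .orTimeout(10, TimeUnit.SECONDS) // complete exceptionally if results don't arrive within 10 seconds
  .join();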

Examples

1. Apply i -> foo(i) in parallel on a custom Executor and collect to List

Executor executor = ...

CompletableFuture<List<String>> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), executor));

2. Apply i -> foo(i) in parallel on a custom Executor with max parallelism of 4 and collect to Set

Executor executor = ...

CompletableFuture<Set<String>> result = list.stream()
  .collect(parallel(i -> foo(i), toSet(), executor, 4));

3. Apply i -> foo(i) in parallel on a custom Executor and collect to LinkedList

Executor executor = ...

CompletableFuture<List<String>> result = list.stream()
  .collect(parallel(i -> foo(i), toCollection(LinkedList::new), executor));

4. Apply i -> foo(i) in parallel on a custom Executor and stream results in completion order

Executor executor = ...

list.stream()
  .collect(parallelToStream(i -> foo(i), executor))
  .forEach(i -> ...);

5. Apply i -> foo(i) in parallel on a custom Executor and stream results in original order

Executor executor = ...

list.stream()
  .collect(parallelToOrderedStream(i -> foo(i), executor))
  .forEach(i -> ...);

Rationale

The Stream API is a great tool for collection processing, especially if you need to parallelize the execution of CPU-intensive tasks, for example:

public static void parallelSetAll(int[] array, IntUnaryOperator generator) {
    Objects.requireNonNull(generator);
    IntStream.range(0, array.length).parallel().forEach(i -> { array[i] = generator.applyAsInt(i); });
}

However, Parallel Streams execute tasks on a shared ForkJoinPool instance.

Unfortunately, it's not the best choice for running blocking operations, even when using ManagedBlocker - as explained here by Tagir Valeev - since this could easily lead to saturation of the common pool and performance degradation of everything that uses it.

For example:

List<String> result = list.parallelStream()
  .map(i -> foo(i)) // runs implicitly on ForkJoinPool.commonPool()
  .collect(Collectors.toList());

In order to avoid such problems, the solution is to isolate blocking tasks and run them on a separate thread pool... but there's a catch.

Sadly, Streams can only run parallel computations on the common ForkJoinPool, which effectively restricts their applicability to CPU-bound jobs.

However, there's a trick that allows running parallel Streams in a custom FJP instance... but it's not considered reliable:

Note, however, that this technique of submitting a task to a fork-join pool to run the parallel stream in that pool is an implementation "trick" and is not guaranteed to work. Indeed, the threads or thread pool that is used for execution of parallel streams is unspecified. By default, the common fork-join pool is used, but in different environments, different thread pools might end up being used.

Says Stuart Marks on StackOverflow.

Not to mention that this approach was seriously flawed before JDK 10 - if a Stream was targeted at another pool, splitting would still adhere to the parallelism of the common pool, and not that of the targeted pool [JDK8190974].
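
For reference, the trick looks roughly like the sketch below - shown for illustration only, since, as explained above, it relies on unspecified behavior:

ForkJoinPool customPool = new ForkJoinPool(4);
try {
    List<String> result = customPool.submit(() ->
        list.parallelStream()
          .map(i -> foo(i))
          .collect(Collectors.toList()))
      .join(); // parallel stages are expected to run on customPool, but this is not guaranteed
} finally {
    customPool.shutdown();
}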

Dependencies

None - the library is implemented using core Java libraries.

Limitations

The upstream Stream is always evaluated as a whole, even if the downstream operation is short-circuiting. This means that none of these collectors should be used for working with infinite streams.

This limitation is imposed by the design of the Collector API.

Good Practices

  • Consider providing reasonable timeouts for CompletableFutures in order not to block for an unreasonably long time when something goes wrong (how-to)
  • Name your thread pools - it makes debugging easier (how-to)
  • Limit the size of the working queue of your thread pool (source)
  • Limit the level of parallelism (source)
  • A no-longer-used ExecutorService should be shut down to allow reclamation of its resources - several of these practices are illustrated in the sketch after this list
  • Keep in mind that CompletableFuture#then(Apply|Combine|Consume|Run|Accept) might be executed by the calling thread. If this is not suitable, use CompletableFuture#then(Apply|Combine|Consume|Run|Accept)Async instead, and provide a custom executor instance.
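
A sketch of an Executor configured along these lines - the thread name prefix, pool size, and queue capacity below are illustrative, not recommendations:

AtomicInteger threadCounter = new AtomicInteger();
ThreadFactory namedThreads =
  task -> new Thread(task, "parallel-collectors-" + threadCounter.incrementAndGet());

ExecutorService executor = new ThreadPoolExecutor(
  4, 4,                          // limited level of parallelism
  0L, TimeUnit.MILLISECONDS,
  new ArrayBlockingQueue<>(100), // bounded working queue
  namedThreads);                 // named threads for easier debugging

try {
    // ... use with Parallel Collectors ...
} finally {
    executor.shutdown();         // shut down once no longer needed
}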

Words of Caution

Even if this tool makes it easy to parallelize things, it doesn't always mean that you should. Parallelism comes with a price that can often be higher than not using it at all. Threads are expensive to create, maintain, and switch between, and you can only create a limited number of them.

It's essential to follow up on the root cause and double-check if parallelism is the way to go.

It often turns out that the root cause can be addressed by using a simple JOIN statement, batching, reorganizing your data... or even just by choosing a different API method.


See CHANGELOG.MD for a complete version history.
