
confluentinc / Parallel Consumer

Licence: apache-2.0
Parallel Apache Kafka client wrapper with client side queueing, a simpler consumer/producer API with key concurrency and extendable non-blocking IO processing.

Programming Languages

java
68154 projects - #9 most used programming language

Projects that are alternatives of or similar to Parallel Consumer

Jocko
Kafka implemented in Golang with built-in coordination (No ZK dep, single binary install, Cloud Native)
Stars: ✭ 4,445 (+2786.36%)
Mutual labels:  kafka, messaging
Rafka
Kafka proxy with a simple API, speaking the Redis protocol
Stars: ✭ 49 (-68.18%)
Mutual labels:  kafka, kafka-consumer
Kq
Kafka-based Job Queue for Python
Stars: ✭ 530 (+244.16%)
Mutual labels:  kafka, kafka-consumer
Kafka Webview
Full-featured web-based Apache Kafka consumer UI
Stars: ✭ 319 (+107.14%)
Mutual labels:  kafka, kafka-consumer
Kukulcan
A REPL for Apache Kafka
Stars: ✭ 103 (-33.12%)
Mutual labels:  kafka, kafka-consumer
Racecar
Racecar: a simple framework for Kafka consumers in Ruby
Stars: ✭ 327 (+112.34%)
Mutual labels:  kafka, kafka-consumer
Hermes
Fast and reliable message broker built on top of Kafka.
Stars: ✭ 665 (+331.82%)
Mutual labels:  kafka, messaging
Strimzi Kafka Operator
Apache Kafka running on Kubernetes
Stars: ✭ 2,833 (+1739.61%)
Mutual labels:  kafka, messaging
Kafka Php
kafka php client
Stars: ✭ 1,340 (+770.13%)
Mutual labels:  kafka, kafka-consumer
Karafka
Framework for Apache Kafka based Ruby and Rails applications development.
Stars: ✭ 1,223 (+694.16%)
Mutual labels:  kafka, kafka-consumer
Trubka
A CLI tool for Kafka
Stars: ✭ 296 (+92.21%)
Mutual labels:  kafka, kafka-consumer
Azure Event Hubs For Kafka
Azure Event Hubs for Apache Kafka Ecosystems
Stars: ✭ 124 (-19.48%)
Mutual labels:  kafka, messaging
Qbusbridge
The Apache Kafka Client SDK
Stars: ✭ 272 (+76.62%)
Mutual labels:  kafka, kafka-consumer
Ockam
End-to-end encrypted messaging and mutual authentication between cloud and edge-device applications
Stars: ✭ 395 (+156.49%)
Mutual labels:  kafka, messaging
blaster
Web hooks for message queues
Stars: ✭ 14 (-90.91%)
Mutual labels:  messaging, kafka-consumer
Librdkafka
The Apache Kafka C/C++ library
Stars: ✭ 5,617 (+3547.4%)
Mutual labels:  kafka, kafka-consumer
Laravel Queue
Laravel Enqueue message queue extension. Supports AMQP, Amazon SQS, Kafka, Google PubSub, Redis, STOMP, Gearman, Beanstalk and others
Stars: ✭ 155 (+0.65%)
Mutual labels:  kafka, messaging
Decaton
High throughput asynchronous task processing on Apache Kafka
Stars: ✭ 187 (+21.43%)
Mutual labels:  kafka, kafka-consumer
Kattlo Cli
Kattlo CLI Project
Stars: ✭ 58 (-62.34%)
Mutual labels:  kafka, kafka-consumer
Slimmessagebus
Lightweight message bus interface for .NET (pub/sub and request-response) with transport plugins for popular message brokers.
Stars: ✭ 120 (-22.08%)
Mutual labels:  kafka, messaging

//
// STOP!!! Make sure you're editing the TEMPLATE version of the README, in /src/docs/README.adoc
//
// Do NOT edit /README.adoc as your changes will be overwritten when the template is rendered again during
// process-sources.
//
// Changes made to this template, must then be rendered to the base readme, by running mvn process-sources
//
// To render the README directly, run mvn asciidoc-template::build
//

= parallel-consumer (beta)
:icons:
:toc: macro
:toclevels: 3
:numbered: 1
:sectlinks: true
:sectanchors: true

:github_name: parallel-consumer
:base_url: https://github.com/confluentinc/{github_name}
:issues_link: {base_url}/issues

// dynamic include base for editing in IDEA
:project_root: ./

// uncomment the following if not using IDEA or having issues, for editing the template to see the includes
// note that with this line not commented out, the rendering of the root level asciidoc file will be incorrect (i.e.
// leave it commented out when committing work)
//:project_root: ../../

ifdef::env-github[]
:tip-caption: 💡
:note-caption: ℹ️
:important-caption: ❗️
:caution-caption: 🔥
:warning-caption: ⚠️
endif::[]

image:https://maven-badges.herokuapp.com/maven-central/io.confluent.parallelconsumer/parallel-consumer-parent/badge.svg?style=flat[link=https://mvnrepository.com/artifact/io.confluent.parallelconsumer/parallel-consumer-parent,Latest Parallel Consumer on Maven Central]

image:https://github.com/confluentinc/parallel-consumer/actions/workflows/maven.yml/badge.svg[Java 8 Unit Test GitHub] + ^(^^full^ ^test^ ^suite^ ^currently^ ^running^ ^only^ ^on^ ^Confluent^ ^internal^ ^CI^ ^server^^)^

// travis badges temporarily disabled as travis isn't running CI currently //image:https://travis-ci.com/astubbs/parallel-consumer.svg?branch=master["Build Status", link="https://travis-ci.com/astubbs/parallel-consumer"] image:https://codecov.io/gh/astubbs/parallel-consumer/branch/master/graph/badge.svg["Coverage",https://codecov.io/gh/astubbs/parallel-consumer]

Parallel Apache Kafka client wrapper with client side queueing, a simpler consumer/producer API with key concurrency and extendable non-blocking IO processing.

WARNING: This is not a Confluent supported product. It is an experimental alpha stage accelerator. See the Support and Issues section for more information.

[[intro]]
This library lets you process messages in parallel via a single Kafka Consumer, meaning you can increase consumer parallelism without increasing the number of partitions in the topic you intend to process. For many use cases, this improves both throughput and latency by reducing load on your brokers. It also opens up new use cases, such as extreme parallelism, external data enrichment, and queuing.

.Consume many messages concurrently with a single consumer instance:
[source,java,indent=0]
----
    parallelConsumer.poll(record ->
            log.info("Concurrently processing a record: {}", record)
    );
----

toc::[]

== Motivation

=== Why would I need this?

The unit of parallelism in Kafka's consumers is the partition, but sometimes you want to break away from this approach and manage parallelism yourself using threads, rather than new Consumer instances. Notable use cases include:

  • Where partition counts are difficult to change and you need more parallelism than the current configuration allows.

  • You wish to avoid over provisioning partitions in topics due to unknown future requirements.

  • You wish to reduce the broker-side resource utilization associated with highly-parallel consumer groups.

  • You need queue-like semantics that use message level acknowledgment, for example to process a work queue with short- and long-running tasks.

When reading the sections below, keep in mind that the unit of concurrency, and thus performance, is restricted by the number of partitions (the degree of sharding / concurrency). Currently, you can't adjust the number of partitions in your Kafka topics without jumping through a lot of hoops, or breaking your key ordering.

==== Before

.The slow consumer situation with the raw Apache Kafka Consumer client
image::https://lucid.app/publicSegments/view/98ad200f-97b2-479b-930c-2805491b2ce7/image.png[align="center"]

==== After

.Example usage of the Parallel Consumer
image::https://lucid.app/publicSegments/view/2cb3b7e2-bfdf-4e78-8247-22ec394de965/image.png[align="center"]

=== Background

The core Kafka consumer client gives you a batch of messages to process one at a time. Processing these in parallel on thread pools is difficult, particularly when considering offset management and strong ordering guarantees. You also need to manage your consume loop, and commit transactions properly if using Exactly Once semantics.

This wrapper library for the Apache Kafka Java client handles all of this for you; you just supply your processing function.

Another common situation where concurrent processing of messages is advantageous is the "competing consumers" pattern, which is often addressed in traditional messaging systems with a shared queue. Kafka doesn't provide native queue support, so a slow message can block the messages behind it in the same partition. If <<ordering-guarantees,log ordering>> isn't a concern, this can be an unwelcome bottleneck. The Parallel Consumer provides a solution to this problem.

In addition, the <<http-with-vertx,Vert.x extension>> to this library supplies non-blocking interfaces, allowing higher still levels of concurrency with a further simplified interface.

=== FAQ

[qanda]
Why not just run more consumers?::
The typical way to address performance issues in a Kafka system is to increase the number of consumers reading from a topic. This is effective in many situations, but falls short in many others.

  • Primarily: You cannot use more consumers than you have partitions available to read from. For example, if you have a topic with five partitions, you cannot use a group with more than five consumers to read from it.
  • Running extra consumers has resource implications - each consumer takes up resources on both the client and broker side. Each consumer adds overhead in terms of memory, CPU, and network bandwidth.
  • Large consumer groups (especially many large groups) can cause a lot of strain on the consumer group coordination system, such as rebalance storms.
  • Even with several partitions, you cannot achieve the performance levels obtainable by per-key ordered or unordered concurrent processing.
  • A single slow or failing message will still block all messages behind the problematic one, i.e. the entire partition. The process may recover, but the latency of all the messages behind the problematic one will be severely impacted.

Why not run more consumers within your application instance?::

  • This is in some respects a slightly easier way of running more consumer instances, and in others more complicated. However, you are still bound by all the per-consumer restrictions described above.

Why not use the Vert.x library yourself in your processing loop?::

  • Vert.x is used in this library to provide a non-blocking IO system in the message processing step. Using Vert.x directly with ordered processing requires dealing with the complicated, and not straightforward, aspect of handling offset commits with Vert.x's asynchronous processing system.

Unordered processing with Vert.x is somewhat easier, however offset management is still quite complicated, and the Parallel Consumer also provides optimizations for message-level acknowledgment in this case. This library handles offset commits for both ordered and unordered processing cases.

=== Scenarios

Below are some real world use cases which illustrate concrete situations where the described advantages massively improve performance.

  • Slow consumer systems in transactional systems (online vs offline or reporting systems)
** Notification system:
*** A notification processing system which sends push notifications to a user, to acknowledge a two-factor authentication request on their mobile and authorise a login to a website, requires optimal end-to-end latency for a good user experience.
*** A specific message in this queue uncharacteristically takes a long time to process, because the third party system is sometimes unpredictably slow to respond, holding up the processing of ALL other notifications for other users that sit behind this message in the same partition.
*** Using key order concurrent processing allows notifications to proceed while this message either slowly succeeds, or times out and retries.
** Slow GPS tracking system (slow HTTP service interfaces that can scale horizontally)
*** GPS tracking messages from 100,000 different field devices pour through at a high rate into an input topic.
*** For each message, the GPS location coordinates are checked to be within allowed ranges using a legacy HTTP service, dictated by business rules behind the service.
*** The service takes 50ms to process each message, however it can be scaled out horizontally without restriction.
*** The input topic only has 10 partitions and for various reasons (see above) cannot be changed.
*** With the vanilla consumer, messages on each partition must be consumed one after the other, in serial order.
*** The maximum rate of message processing is then: 1 second / 50 ms × 10 partitions = 200 messages per second.
*** By using this library, the 10 partitions can all be processed in key order: 1 second / 50 ms × 100,000 keys = 2,000,000 messages per second (see the arithmetic sketch after this list).
*** While the HTTP system probably cannot handle 2,000,000 messages per second, more importantly, your system is no longer the bottleneck.

** Slow CPU-bound model processing for fraud prediction
*** Consider a system where message data is passed through a fraud prediction model which takes CPU cycles, instead of an external system being slow.
*** We can easily scale the number of CPUs on the virtual machine where the processing is run, but we choose not to scale the partitions or consumers (see above).
*** By deploying onto machines with far more CPUs available, we can run our prediction model massively parallel, increasing our throughput and reducing our end-to-end response times.

  • Spiky load with latency sensitive non-functional requirements
** An upstream system regularly floods our input topic daily at close of business with settlement totals data from retail outlets.
*** Situations like this are common where systems are designed to comfortably handle the average daytime load, but are not provisioned to handle sudden increases in traffic, as these don't happen often enough to justify the increased spending on processing capacity that would otherwise remain idle.
*** Without adjusting the available partitions or the number of running consumers, we can reduce our maximum end-to-end latency and increase throughput, to get our global daily outlet reports to division managers so action can be taken before close of business.
** Natural consumer behaviour
*** Consider scenarios where bursts of data flooding input topics are generated by sudden user behaviour, such as sales or television events ("Oprah" moments).
*** For example, an evening, prime-time game show on TV where users send in quiz answers on their devices. The end-to-end latency of the responses to these answers needs to be as low as technically possible, even if the processing step is quick.
*** Instead of a vanilla client where each user response waits in a virtual queue with others to be processed, this library allows every single response to be processed in parallel.
  • Legacy partition structure
** In any existing setup where we need higher performance, either in throughput or latency, and there are not enough partitions for the needed concurrency level, the tool can be applied.
  • Partition overloaded brokers
** Clusters with under-provisioned hardware and with too many partitions already - where we cannot expand partitions even if we were able to.
** Similar to the above, but from the operations perspective: our system is already over partitioned, perhaps in order to support existing parallel workloads which aren't using the tool (and so need large numbers of partitions).
** We encourage our development teams to migrate to the tool, and then begin a process of actually lowering the number of partitions, in order to reduce operational complexity, improve reliability and perhaps save on infrastructure costs.
  • Server side resources are controlled by a different team we can't influence
** The cluster our team is working with is not in our control; we cannot change the partition setup, or perhaps even the consumer layout.
** We can use the tool ourselves to improve our system's performance without touching the cluster / topic setup.
  • Kafka Streams app that has a slow stage
** We use Kafka Streams for our message processing, but one of its steps has the characteristics described above and we need better performance. We can break out of Kafka Streams into the tool for that step, as described below, then return to the Kafka Streams context.
  • Provisioning extra machines (either virtual machines or real machines) to run multiple clients has a cost; using this library instead avoids the need for extra instances to be deployed in any respect.
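
To make the arithmetic in the scenarios above concrete, here is a minimal, self-contained sketch of the throughput ceilings from the GPS tracking example (the numbers are illustrative and taken from the scenario, not from the library):

[source,java]
----
// Illustrative throughput arithmetic from the GPS tracking scenario above.
public class ThroughputMath {
    public static void main(String[] args) {
        double processingTimeMs = 50;   // per-message latency of the HTTP range check
        int partitions = 10;            // serial units with the vanilla consumer
        int uniqueKeys = 100_000;       // concurrent units with per-key ordering

        double perUnitRate = 1000.0 / processingTimeMs; // 20 msg/s per serial unit

        System.out.printf("Vanilla consumer ceiling: %,.0f msg/s%n", perUnitRate * partitions); // 200
        System.out.printf("Key-ordered ceiling:      %,.0f msg/s%n", perUnitRate * uniqueKeys); // 2,000,000
    }
}
----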

== Feature List

  • Have massively parallel consumption processing without running hundreds or thousands of:
** Kafka consumer clients
** topic partitions

without operational burden or harming the cluster's performance

  • Client side queueing system on top of the Apache Kafka consumer
** Efficient individual message acknowledgement system (without local or third party external system state storage) to massively reduce (and usually completely eliminate) message replay upon failure - see the <<offset_map>> section for more details
  • Per key concurrent processing, per partition and unordered message processing
  • Offsets committed correctly, in order, of only processed messages, regardless of concurrency level or retries
  • Vert.x non-blocking library integration (HTTP currently)
  • Fair partition traversal
  • Zero~ dependencies (Slf4j and Lombok) for the core module
  • Java 8 compatibility
  • Throttle control and broker liveliness management
  • Clean draining shutdown cycle

//image:https://codecov.io/gh/astubbs/parallel-consumer/branch/master/graph/badge.svg["Coverage",https://codecov.io/gh/astubbs/parallel-consumer] //image:https://travis-ci.com/astubbs/parallel-consumer.svg?branch=master["Build Status", link="https://travis-ci.com/astubbs/parallel-consumer"]

And more <<roadmap,to come>>!

== Performance

In the best case, you don't care about ordering at all, in which case the achievable degree of concurrency is set simply by the max thread and concurrency settings, or, with the Vert.x extension, by the Vert.x verticle being used - e.g. non-blocking HTTP calls.

For example, instead of having to run 1,000 consumers to process 1,000 messages at the same time, we can process all 1,000 concurrently on a single consumer instance.

More typically though, you probably still want the per-key ordering guarantees that Kafka provides. For this there is the per-key ordering setting. This prevents the library from processing messages with the same key concurrently or out of order.

This massively reduces message processing latency, regardless of partition count, for spiky workloads where there is good key distribution. E.g. 100,000 "users" all trigger an action at once. As long as the processing layer can handle the load horizontally (e.g. an auto-scaling web service), per-message latency will be massively decreased, potentially down to the time for processing a single message, if the integration point can handle the concurrency.

For example, if you have a key set of 10,000 unique keys, and you need to call an HTTP endpoint to process each one, you can use the per-key ordering setting, and in the best case the system will process 10,000 at the same time, using the non-blocking Vert.x HTTP client library. The user just has to provide a function that extracts the HTTP call parameters from the message and constructs the HTTP request object.

=== Illustrative Performance Example

(see link:./parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/VolumeTests.java[VolumeTests.java])

The performance comparison results below, although based on real measurements, are for illustrative purposes - to show how the performance of the tool relates to instance counts, partition counts and key distribution, and how it compares to the vanilla client. Actual results will vary wildly depending upon the setup being deployed into.

For example, if you have hundreds of thousands of keys in your topic, randomly distributed, even with hundreds of partitions, with only a handful of instances of this wrapper deployed, you will probably see many orders of magnitude performance improvements - massively outperforming dozens of vanilla Kafka consumer clients.

.Time taken to process a large number of messages with a single Parallel Consumer vs a single Kafka Consumer, for different key space sizes. As the number of unique keys in the data set increases, the key ordered Parallel Consumer performance starts to approach that of the unordered Parallel Consumer. The raw Kafka consumer performance remains unaffected by the key distribution.
image::https://docs.google.com/spreadsheets/d/e/2PACX-1vQffkAFG-_BzH-LKfGCVnytdzAHiCNIrixM6X2vF8cqw2YVz6KyW3LBXTB-lVazMAJxW0UDuFILKvtK/pubchart?oid=1691474082&format=image[align="center"]

.Consumer group size effect on total processing time vs a single Parallel Consumer. As instances are added to the consumer group, its performance starts to approach that of the single instance Parallel Consumer. Key ordering is faster than partition ordering, with unordered being the fastest.
image::https://docs.google.com/spreadsheets/d/e/2PACX-1vQffkAFG-_BzH-LKfGCVnytdzAHiCNIrixM6X2vF8cqw2YVz6KyW3LBXTB-lVazMAJxW0UDuFILKvtK/pubchart?oid=938493158&format=image[align="center"]

.Consumer group size effect on message latency vs a single Parallel Consumer. As instances are added to the consumer group, its performance starts to approach that of the single instance Parallel Consumer.
image::https://docs.google.com/spreadsheets/d/e/2PACX-1vQffkAFG-_BzH-LKfGCVnytdzAHiCNIrixM6X2vF8cqw2YVz6KyW3LBXTB-lVazMAJxW0UDuFILKvtK/pubchart?oid=1161363385&format=image[align="center"]

As an illustrative example of relative performance, given:

  • A random processing time between 0 and 5ms
  • 10,000 messages to process
  • A single partition (simplifies comparison - a topic with 5 partitions is the same as 1 partition with a keyspace of 5)
  • Default ParallelConsumerOptions
** maxUncommittedMessagesToHandle = 1000
** maxConcurrency = 100
** numberOfThreads = 16

.Comparative performance of order modes and key spaces
[cols="1,1,1,3", options="header"]
|===
|Ordering |Number of keys |Duration |Note

|Partition |20 (not relevant) |22.221s |This is the same as a single partition with a single normal serial consumer, as we can see: 2.5ms avg processing time * 10,000 msg / 1000ms = ~25s.

|Key |1 |26.743s |Same as above

|Key |2 |13.576s |

|Key |5 |5.916s |

|Key |10 |3.310s |

|Key |20 |2.242s |

|Key |50 |2.204s |

|Key |100 |2.178s |

|Key |1,000 |2.056s |

|Key |10,000 |2.128s |As the key space is the same as the number of messages, this is similar (but restricted by max concurrency settings) to having a single consumer instance and partition per key. 10,000 msgs * avg processing time 2.5ms = ~2.5s.

|Unordered |20 (not relevant) |2.829s |As there is no order restriction, this is similar (but restricted by max concurrency settings) to having a single consumer instance and partition per key. 10,000 msgs * avg processing time 2.5ms = ~2.5s.
|===

== Support and Issues

WARNING: This library is experimental, and Confluent does not currently offer support for this library.

If you encounter any issues, or have any suggestions or feature requests, please create an issue in the {issues_link}[GitHub issue tracker]. Issues will be dealt with on a good faith, best efforts basis, by the small team maintaining this library.

We also encourage participation, so if you have any feature ideas etc, please get in touch, and we will help you work on submitting a PR!

NOTE: We are very interested to hear about your experiences!

If you have questions, head over to the https://launchpass.com/confluentcommunity[Confluent Slack community], or raise an https://github.com/confluentinc/parallel-consumer/issues[issue] on GitHub.

== License

This library is copyright Confluent Inc, and licensed under the Apache License Version 2.0.

== Usage

=== Maven

This project is available in Maven Central, https://repo1.maven.org/maven2/io/confluent/parallelconsumer/[repo1].

The latest version can be seen https://search.maven.org/artifact/io.confluent.parallelconsumer/parallel-consumer-core[here].

Where ${project.version} is the version to be used:

.Core Module Dependency
[source,xml,indent=0]
----
<dependency>
    <groupId>io.confluent.parallelconsumer</groupId>
    <artifactId>parallel-consumer-core</artifactId>
    <version>${project.version}</version>
</dependency>
----

.Vert.x Module Dependency
[source,xml,indent=0]
----
<dependency>
    <groupId>io.confluent.parallelconsumer</groupId>
    <artifactId>parallel-consumer-vertx</artifactId>
    <version>${project.version}</version>
</dependency>
----

=== Common Preparation

.Setup the client
[source,java,indent=0]
----
    Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // <1>
    Producer<String, String> kafkaProducer = getKafkaProducer();

    var options = ParallelConsumerOptions.<String, String>builder()
            .ordering(KEY) // <2>
            .maxConcurrency(1000) // <3>
            .consumer(kafkaConsumer)
            .producer(kafkaProducer)
            .build();

    ParallelStreamProcessor<String, String> eosStreamProcessor =
            ParallelStreamProcessor.createEosStreamProcessor(options);

    eosStreamProcessor.subscribe(of(inputTopic)); // <4>

    return eosStreamProcessor;
----

<1> Set up your clients as per normal. A Producer is only required if using the produce flows.
<2> Choose your ordering type, KEY in this case. This ensures maximum concurrency, while ensuring messages are processed and committed in KEY order, making sure no offset is committed unless all offsets before it in its partition are completed also.
<3> The maximum number of concurrent processing operations to be performing at any given time.
<4> Subscribe to your topics.

NOTE: Because the library coordinates offsets, enable.auto.commit must be disabled.
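
For reference, a minimal sketch of a consumer setup that satisfies this requirement, using the standard Apache Kafka client API (the broker address, group id and deserializers here are placeholder choices):

[source,java]
----
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-parallel-consumer-group"); // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // required - the library manages offsets itself
Consumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
----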

After this setup, one then has the choice of interfaces:

  • ParallelStreamProcessor
  • VertxParallelStreamProcessor
  • JStreamParallelStreamProcessor
  • JStreamVertxParallelStreamProcessor

There is another interface, ParallelConsumer, which these build upon; however, there is currently no direct implementation. See {issues_link}/12[issue #12], and the ParallelConsumer JavaDoc:

[source,java]
----
/**
 * Asynchronous / concurrent message consumer for Kafka.
 * <p>
 * Currently there is no direct implementation, only the {@link ParallelStreamProcessor} version (see {@link
 * ParallelEoSStreamProcessor}), but there may be in the future.
 *
 * @param <K> consume / produce key type
 * @param <V> consume / produce value type
 * @see ParallelEoSStreamProcessor
 * @see #poll(Consumer)
 */
----

=== Core

==== Simple Message Process

This is the only thing you need to do, in order to get massively concurrent processing in your code.

.Usage - print message content out to the console in parallel
[source,java,indent=0]
----
    parallelConsumer.poll(record ->
            log.info("Concurrently processing a record: {}", record)
    );
----

See the link:./parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java[core example] project, and its test.

==== Process and Produce a Response Message

This interface allows you to process your message, then publish back to the broker zero, one or more result messages. You can also optionally provide a callback function to be run after the message(s) are successfully published to the broker.

.Usage - process a message, then produce a response message
[source,java,indent=0]
----
    parallelConsumer.pollAndProduce(record -> {
                var result = processBrokerRecord(record);
                return new ProducerRecord<>(outputTopic, record.key(), result.payload);
            }, consumeProduceResult -> {
                log.debug("Message {} saved to broker at offset {}",
                        consumeProduceResult.getOut(),
                        consumeProduceResult.getMeta().offset());
            }
    );
----

==== Callbacks vs Streams

You have the option to either use callbacks to be notified of events, or use the Streaming versions of the API, which use the java.util.stream.Stream system:

  • JStreamParallelStreamProcessor
  • JStreamVertxParallelStreamProcessor

In future versions, we plan to look at supporting other streaming systems like https://github.com/ReactiveX/RxJava[RxJava] via modules.

[[http-with-vertx]]
=== HTTP with the Vert.x Module

.Call an HTTP endpoint for each message usage
[source,java,indent=0]
----
    var resultStream = parallelConsumer.vertxHttpReqInfoStream(record -> {
        log.info("Concurrently constructing and returning RequestInfo from record: {}", record);
        Map<String, String> params = UniMaps.of("recordKey", record.key(), "payload", record.value());
        return new RequestInfo("localhost", port, "/api", params); // <1>
    });
----

<1> Simply return an object representing the request; the Vert.x HTTP engine will handle the rest, using its non-blocking engine.

See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-vertx/src/main/java/io/confluent/parallelconsumer/examples/vertx/VertxApp.java[Vert.x example] project, and its test.

[[streams-usage-code]]
=== Kafka Streams Concurrent Processing

Use your Streams app to process your data first, then send anything that needs to be processed concurrently to an output topic, to be consumed by the parallel consumer.

.Example usage with Kafka Streams
image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fbf/image.png[Kafka Streams Usage, align="center"]

.Preprocess in Kafka Streams, then process concurrently
[source,java,indent=0]
----

void run() {
    preprocess(); // <1>
    concurrentProcess(); // <2>
}

void preprocess() {
    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream(inputTopic)
            .mapValues((key, value) -> {
                log.info("Streams preprocessing key: {} value: {}", key, value);
                return String.valueOf(value.length());
            })
            .to(outputTopicName);

    startStreams(builder.build());
}

void startStreams(Topology topology) {
    streams = new KafkaStreams(topology, getStreamsProperties());
    streams.start();
}

void concurrentProcess() {
    setupParallelConsumer();

    parallelConsumer.poll(record -> {
        log.info("Concurrently processing a record: {}", record);
        messageCount.getAndIncrement();
    });
}
----

<1> Set up your Kafka Streams stage as per normal, performing any type of preprocessing in Kafka Streams.
<2> For the slow consumer part of your topology, drop down into the parallel consumer, and use massive concurrency.

See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/StreamsApp.java[Kafka Streams example] project, and its test.

[[ordering-guarantees]]
== Ordering Guarantees

The user has the option to choose either ordered or unordered message processing.

In both ordered and unordered processing, the system will only commit offsets for messages which have been successfully processed.

CAUTION: Unordered processing could cause problems for third party integration where ordering by key is required.

CAUTION: Beware of third party systems which are not idempotent, or are key order sensitive.
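
The mode is selected via the ordering option on the options builder, as in the Common Preparation example above (a sketch; ProcessingOrder is the enum backing the KEY value used there):

[source,java]
----
var options = ParallelConsumerOptions.<String, String>builder()
        .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED) // or PARTITION, or KEY
        .consumer(kafkaConsumer)
        .build();
----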

IMPORTANT: The below diagrams represent a single iteration of the system and a very small number of input partitions and messages.

=== Vanilla Kafka Consumer Operation

Given this input topic with three partitions and a series of messages:

.Input topic
image::https://lucid.app/publicSegments/view/37d13382-3067-4c93-b521-7e43f2295fff/image.png[align="center"]

The normal Kafka client operates in the following manner. Note that typically offset commits are not performed after processing a single message; it is illustrated this way for comparison with the single pass concurrent methods below. Usually many messages are committed in a single go, which is much more efficient, but for our illustrative purposes that is not really relevant, as we are demonstrating sequential vs concurrent processing of messages.

.Normal execution of the raw Kafka client
image::https://lucid.app/publicSegments/view/0365890d-e8ff-4a06-b24a-8741175dacc3/image.png[align="center"]

=== Unordered

Unordered processing places no restriction on the order of multiple messages processed per partition, allowing for the highest level of concurrency.

This is the fastest option.

.Unordered concurrent processing of messages
image::https://lucid.app/publicSegments/view/aab5d743-de05-46d0-8c1e-0646d7d2946f/image.png[align="center"]

=== Ordered by Partition

At most only one message from any given input partition will be in flight at any given time. This means that concurrent processing is restricted to the number of input partitions.

The advantage of this ordered processing mode is that, for an assignment of 1,000 partitions to a single consumer, you do not need to run 1,000 consumer instances or threads to process the partitions in parallel.

Note that for a given partition, a slow processing message will prevent messages behind it from being processed. However, messages in other partitions assigned to the consumer will continue processing.

This option is most like normal operation, except if the consumer is assigned more than one partition, it is free to process all partitions in parallel.

.Partition ordered concurrent processing of messages
image::https://lucid.app/publicSegments/view/30ad8632-e8fe-4e05-8afd-a2b6b3bab309/image.png[align="center"]

=== Ordered by Key

Most similar to ordered by partition, this mode ensures processing is ordered by key (per partition).

The advantage of this mode is that while a given input topic may not have many partitions, it may have a large number of unique keys. Each of these key -> message sets can actually be processed concurrently, bringing concurrent processing down to the per-key level, without having to increase the number of input partitions, whilst keeping strong ordering by key.

As usual, the offset tracking will be correct, regardless of the ordering of unique keys on the partition or adjacency to the committed offset, such that after failure or rebalance, the system will not replay messages already marked as successful.

This option provides the performance of maximum concurrency, while maintaining message processing order per key, which is sufficient for many applications.

.Key ordering concurrent processing of messages
image::https://lucid.app/publicSegments/view/f7a05e99-24e6-4ea3-b3d0-978e306aa568/image.png[align="center"]

=== Retries and Ordering

Even during retries, offsets will always be committed only after successful processing, and in order.
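
A record is marked as failed by throwing from the user function; the library will then retry it later, and its offset is withheld from commits until it eventually succeeds. A minimal sketch (callFlakyService is a hypothetical downstream call):

[source,java]
----
parallelConsumer.poll(record -> {
    boolean ok = callFlakyService(record); // hypothetical downstream call
    if (!ok) {
        // Throwing marks the record as failed - it will be retried later, and its
        // offset will not be committed until the record eventually succeeds.
        throw new RuntimeException("Transient failure, will retry offset: " + record.offset());
    }
});
----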

== Result Models

  • Void

Processing is complete simply when your provided function finishes, and the offsets are committed.

  • Streaming User Results

When your function is actually run, a result object will be streamed back to your client code, with information about the operation completion.

  • Streaming Message Publishing Results

After your operation completes, you can also choose to publish a result message back to Kafka. The message publishing metadata can be streamed back to your client code.

== Commit Mode

The system gives you three choices for how to do offset commits. The simplest two are the Consumer commit modes: synchronous and asynchronous. The transactional mode is explained in the next section.

Asynchronous mode is faster, as it doesn't block the control loop.

Synchronous will block the processing loop until a successful commit response is received; however, Asynchronous will still be capped by the max processing settings in the `Options` class.

If you're used to using the auto commit mode in the normal Kafka consumer, you can think of the Asynchronous mode as being similar to it. We suggest starting with this mode, and it is the default.
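
The commit mode is also chosen on the options builder; a sketch, assuming the CommitMode enum names in the library:

[source,java]
----
var options = ParallelConsumerOptions.<String, String>builder()
        .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS) // the default
        // .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC)
        // .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)
        .consumer(kafkaConsumer)
        .build();
----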

=== Apache Kafka EoS Transaction Model

There is also the option to use Kafka's Exactly Once Semantics (EoS) system. This causes all messages produced as a result of processing a message to be committed within a transaction, along with their source offset. This means that even under failure, the results will exist exactly once in the Kafka output topic. If, as a part of your processing, you create side effects in other systems, the usual idempotency requirements apply when breaking out of the EoS Kafka boundary.

NOTE: As with the synchronous commit mode, this will also block the processing loop until a successful transaction completes.

CAUTION: This cannot be true for any externally integrated third party system, unless that system is idempotent.

For implementation details, see the Transactional System Architecture section.

[[streams-usage]]
== Using with Kafka Streams

Kafka Streams (KS) doesn't yet support parallel processing of messages (https://cwiki.apache.org/confluence/display/KAFKA/KIP-311%3A+Async+processing+with+dynamic+scheduling+in+Kafka+Streams[KIP-311], https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams[KIP-408]). However, any given preprocessing can be done in KS, preparing the messages. One can then use this library to consume from an input topic produced by KS, to process the messages in parallel.

For a code example, see the <<streams-usage-code,Kafka Streams Concurrent Processing>> section.

.Example usage with Kafka Streams
image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fbf/image.png[Kafka Streams Usage, align="center"]

[[roadmap]]
== Roadmap

For released changes, see the link:CHANGELOG.adoc[CHANGELOG].

=== Short Term - What we're working on nowish ⏰

  • Depth first or breadth first partition traversal
  • JavaRX and other streaming modules

=== Medium Term - What's up next ⏲

=== Long Term - The future ☁️

  • Apache Kafka KIP?
  • Callbacks only once the offset has been committed

== Usage Requirements

  • Client side
** JDK 8
** SLF4J
** Apache Kafka (AK) client libraries 2.5
** Supports all features of the AK client (e.g. security setups, schema registry etc.)
** For use with Streams, see the <<streams-usage,Using with Kafka Streams>> section
** For use with Connect:
*** Source: simply consume from the topic that your Connect plugin is publishing to
*** Sink: use the poll and produce style API, and publish the records to the topic that the connector is sinking from
  • Server side
** Should work with any cluster that the linked AK client library works with
*** If using EoS/transactions, needs a cluster setup that supports EoS/transactions

== Development Information

=== Requirements

=== Notes

The unit test code is set to run at a very high frequency, which can make reading debug logs difficult (or impossible). If you want to debug the code or view the main logs, consider changing the following constants:

// replace with code inclusion from readme branch
.ParallelEoSStreamProcessorTestBase
[source]
----
ParallelEoSStreamProcessorTestBase#DEFAULT_BROKER_POLL_FREQUENCY_MS
ParallelEoSStreamProcessorTestBase#DEFAULT_COMMIT_INTERVAL_MAX_MS
----

=== Readme

The README uses a special https://github.com/whelk-io/asciidoc-template-maven-plugin/pull/25[custom maven processor plugin] to import live code blocks into the root readme, so that GitHub can show the real code as includes in the README. This is because GitHub https://github.com/github/markup/issues/1095[doesn't properly support the include directive].

The source of truth readme is in link:{project_root}/src/docs/README.adoc[].

=== Maven targets

[qanda]
Compile and run all tests:: mvn verify

Run tests excluding the integration tests:: mvn test

Run all tests:: mvn verify

Run any goal skipping tests (replace <goalName> e.g. install):: mvn <goalName> -DskipTests

See what profiles are active:: mvn help:active-profiles

See what plugins or dependencies are available to be updated:: mvn versions:display-plugin-updates versions:display-property-updates versions:display-dependency-updates

Run a specific integration test method in a submodule project, skipping unit tests:: mvn -Dit.test=TransactionAndCommitModeTest#testLowMaxPoll -DskipUTs=true verify -DfailIfNoTests=false --projects parallel-consumer-core

Run git bisect to find a bad commit; edit the Maven command in bisect.sh, then run::
+
[source,bash]
----
git bisect start good bad
git bisect run ./bisect.sh
----

=== Testing

The project has good automated test coverage of all features, including integration tests running against a real Kafka broker and a real database. If you want to run the tests yourself, clone the repository and run the command `mvn test`. The tests require an active Docker server on localhost.

==== Integration Testing with TestContainers

//https://github.com/confluentinc/schroedinger#integration-testing-with-testcontainers

We use the excellent https://testcontainers.org[Testcontainers] library for integration testing with JUnit.

To speed up test execution, you can enable container reuse across test runs by setting the following in your https://www.testcontainers.org/features/configuration/[`~/.testcontainers.properties` file]:

[source]
----
testcontainers.reuse.enable=true
----

This will leave the container running after the JUnit test is complete for reuse by subsequent runs.

NOTE: The container will only be left running if it is not explicitly stopped by the JUnit rule. For this reason, we use a variant of the https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers[singleton container pattern] instead of the JUnit rule.

Testcontainers detects if a container is reusable by hashing the container creation parameters from the JUnit test. If an existing container is not reusable, a new container will be created, but the old container will not be removed.
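
A minimal sketch of the singleton container pattern with reuse enabled (the image tag is a placeholder, and the class name is illustrative, not the project's actual test base):

[source,java]
----
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public abstract class BrokerIntegrationTestBase {

    // Shared across all test classes in the JVM.
    static final KafkaContainer KAFKA =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0")) // placeholder tag
                    .withReuse(true); // pairs with testcontainers.reuse.enable=true

    static {
        // Started once, and deliberately never stopped, so Testcontainers can reuse it next run.
        KAFKA.start();
    }
}
----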

[cols="1,3", options="header"]
|===
|Target |Description

|testcontainers-list
|List all containers labeled as testcontainers

|testcontainers-clean
|Remove all containers labeled as testcontainers
|===

.Stop and remove all containers labeled with org.testcontainers=true
[source,bash]
----
docker container ls --filter 'label=org.testcontainers=true' --format '{{.ID}}' \
  | xargs docker container rm --force
----

.List all containers labeled with org.testcontainers=true
[source,bash]
----
docker container ls --filter 'label=org.testcontainers=true'
----

NOTE: testcontainers-clean removes all Docker containers on your system with the org.testcontainers=true label (including the most recent container, which may be reusable).

See https://github.com/testcontainers/testcontainers-java/pull/1781[this testcontainers PR] for details on the reusable containers feature.

== Implementation Details

=== Core Architecture

Concurrency is controlled by the size of the thread pool (the worker pool in the diagram). Work is performed in a blocking manner by the user's submitted lambda functions.

These are the main sub systems:

  • controller thread
  • broker poller thread
  • work pool thread
  • work management
  • offset map manipulation

Each thread collaborates with the others through thread safe Java collections.

.Core Architecture. Threads are represented by letters and colours, with their steps in sequential numbers.
image::https://lucid.app/publicSegments/view/320d924a-6517-4c54-a72e-b1c4b22e59ed/image.png[Core Architecture, align="center"]

=== Vert.x Architecture

The Vert.x module is an optional extension to the core module. As depicted in the diagram, the architecture extends the core architecture.

Instead of the work thread pool count being the degree of concurrency, it is controlled by a max parallel requests setting, and work is performed asynchronously on the Vert.x engine by a core count aligned Vert.x managed thread pool using Vert.x asynchronous IO plugins (https://vertx.io/docs/vertx-core/java/#_verticles[verticles]).

.Vert.x Architecture
image::https://lucid.app/publicSegments/view/509df410-5997-46be-98e7-ac7f241780b4/image.png[Vert.x Architecture, align="center"]

=== Transactional System Architecture

image::https://lucid.app/publicSegments/view/7480d948-ed7d-4370-a308-8ec12e6b453b/image.png[]

[[offset_map]]
=== Offset Map

Unlike a traditional queue, messages are not deleted on an acknowledgement. However, offsets are tracked per message, per consumer group - there is no message replay for successful messages, even over clean restarts.

Across a system failure, only completed messages whose completion was not stored in the last committed offset payload will be replayed. This is not an exactly once guarantee, as message replay cannot be prevented across failures.

CAUTION: Note that Kafka's Exactly Once Semantics (EoS) (transactional processing) also does not prevent duplicate message replay - it provides effectively-once result messages in Kafka topics. Messages may still be replayed when using EoS. This is an important consideration when using it, especially when integrating with third party systems, which is a very common pattern for utilising this project.

As mentioned previously, offsets are always committed in the correct order and only once all previous messages have been successfully processed; regardless of <<ordering-guarantees,ordering mode>> selected. We call this the "highest committable offset".

However, because messages can be processed out of order, messages beyond the highest committable offset must also be tracked for success, and not replayed upon restart or failure. To achieve this, the system goes a step further than normal Kafka offset commits.

When messages beyond the highest committable offset are successfully processed:

. they are stored as such in an internal memory map
. when the system next commits offsets
. if there are any messages beyond the highest committable offset which have been marked as succeeded
.. the offset map is serialised, encoded into a base 64 string, and added to the commit message metadata
. upon restore, if needed, the system then deserializes this offset map and loads it back into memory
. when each message is polled into the system
.. it checks whether it has already been completed
.. at which point it is then skipped

This ensures that no message is reprocessed if it has been previously completed.

IMPORTANT: Successful messages beyond the highest committable offset are still recorded as such in a specially constructed metadata payload stored alongside the Kafka committed offset. These messages are not replayed upon restore/restart.

The offset map is compressed in parallel using two different compression techniques - run length encoding and bitmap encoding. The sizes of the compressed maps are then compared, and the smallest is chosen for serialization. If both serialised formats are significantly large, they are then both compressed using zstd compression, and if that results in a smaller serialization, the compressed form is used instead.
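
A sketch of that selection logic (illustrative only - the encoder and compressor helpers are hypothetical, not the library's actual classes):

[source,java]
----
// Pick the smallest serialized form of the incomplete-offsets map.
byte[] bitmap = encodeAsBitSet(incompleteOffsets);       // hypothetical encoder
byte[] runLength = encodeAsRunLength(incompleteOffsets); // hypothetical encoder

byte[] smallest = (bitmap.length <= runLength.length) ? bitmap : runLength;

// If the winner is still large, try zstd, keeping it only if it actually shrinks the payload.
byte[] zstd = zstdCompress(smallest); // hypothetical wrapper around a zstd library
byte[] chosen = (zstd.length < smallest.length) ? zstd : smallest;

// The chosen encoding is base-64 encoded into the commit metadata alongside the committed offset.
String metadata = Base64.getEncoder().encodeToString(chosen);
----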

==== Storage Notes

  • Runtime data model creates a list of incomplete offsets
  • Continuously builds a full complete / not complete bitmap from the base offset to be committed
  • Dynamically switching storage
** Encodes into a BitSet and a RunLength, then compresses both using zstd, then uses the smallest, tagging it as such in the encoded string
** Which is smallest can depend on the size and information density of the offset map
*** Smaller maps fit better into uncompressed BitSets (30 entry map bitset: compressed: 13 bytes, uncompressed: 4 bytes)
*** Larger maps with continuous sections are usually better in compressed RunLength
*** For completely random offset maps, compressed and uncompressed BitSets are roughly the same (2000 entries, uncompressed bitset: 250, compressed: 259, compressed byte array: 477)
*** For very large maps (20,000 entries), a compressed BitSet seems to be significantly smaller again, if random
  • Gets stored along with the base offset for each partition, in the offset commit-sync metadata string
  • The offset commit metadata has a hardcoded limit of 4096 bytes per partition (@see kafka.coordinator.group.OffsetConfig#DefaultMaxMetadataSize = 4096)
** Because of this, if our map doesn't fit into this limit, we have to drop it and not use it, losing the shorter replay benefits. However, with runlength encoding and typical offset patterns, this should be quite rare.
*** Work is being done on continuous and predictive space requirements, which will optionally prevent the system from continuing past the point where it can't proceed without dropping the encoded map information, by introducing local backpressure - see https://github.com/confluentinc/parallel-consumer/issues/53[Exact continuous offset encoding for precise offset payload size back pressure].
** Whether the map fits into the metadata depends on message acknowledgement patterns in the use case and the numbers of messages involved, as well as the information density of the map (i.e. a single not yet completed message among 4000 completed ones will produce a tiny map, which can cover very large numbers of messages)

== Attribution

http://www.apache.org/[Apache®], http://kafka.apache.org/[Apache Kafka], and http://kafka.apache.org/[Kafka®] are either registered trademarks or trademarks of the http://www.apache.org/[Apache Software Foundation] in the United States and/or other countries.

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].