RTBHOUSE / kafka-workers

License: Apache-2.0
Kafka Workers is a client library that unifies consuming records from Kafka and processing them with user-defined WorkerTasks.

Programming Languages

Java
68154 projects; #9 most used programming language

Projects that are alternatives to or similar to kafka-workers

MultiHttp
This is a high-performance, very useful multi-curl tool written in PHP. A super handy concurrent cURL tool! (httpful, restful, concurrency)
Stars: ✭ 79 (+163.33%)
Mutual labels:  parallel, multithreading
java-multithread
Code written for the Multithreading with Java course on the RinaldoDev YouTube channel.
Stars: ✭ 24 (-20%)
Mutual labels:  parallel, multithreading
Taskflow
A General-purpose Parallel and Heterogeneous Task Programming System
Stars: ✭ 6,128 (+20326.67%)
Mutual labels:  parallel, multithreading
wasm-bindgen-rayon
An adapter for enabling Rayon-based concurrency on the Web with WebAssembly.
Stars: ✭ 257 (+756.67%)
Mutual labels:  parallel, multithreading
Napajs
Napa.js: a multi-threaded JavaScript runtime
Stars: ✭ 8,945 (+29716.67%)
Mutual labels:  parallel, multithreading
Corium
Corium is a modern scripting language which combines simple, safe and efficient programming.
Stars: ✭ 18 (-40%)
Mutual labels:  parallel, multithreading
Hamsters.js
100% Vanilla Javascript Multithreading & Parallel Execution Library
Stars: ✭ 517 (+1623.33%)
Mutual labels:  parallel, multithreading
pblat
parallelized blat with multithreading support
Stars: ✭ 34 (+13.33%)
Mutual labels:  parallel, multithreading
Pelagia
An automatic parallelization (lock-free multithreading) tool developed by Surparallel Open Source. Pelagia is an embedded key-value database: small, fast, and highly reliable, implemented in ANSI C.
Stars: ✭ 1,132 (+3673.33%)
Mutual labels:  parallel, multithreading
Openmp Examples
openmp examples
Stars: ✭ 64 (+113.33%)
Mutual labels:  parallel, multithreading
Poshrsjob
Provides an alternative to PSjobs with greater performance and less overhead to run commands in the background, freeing up the console and allowing throttling on the jobs.
Stars: ✭ 447 (+1390%)
Mutual labels:  parallel, multithreading
noroutine
Goroutine analogue for Node.js, spreads I/O-bound routine calls to utilize thread pool (worker_threads) using balancer with event loop utilization. 🌱
Stars: ✭ 86 (+186.67%)
Mutual labels:  parallel, multithreading
Ems
Extended Memory Semantics - Persistent shared object memory and parallelism for Node.js and Python
Stars: ✭ 552 (+1740%)
Mutual labels:  parallel, multithreading
pooljs
Browser computing unleashed!
Stars: ✭ 17 (-43.33%)
Mutual labels:  parallel, multithreading
thread-pool
BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library
Stars: ✭ 1,043 (+3376.67%)
Mutual labels:  parallel, multithreading
Boting
Simple but powerful Telegram Bot library
Stars: ✭ 21 (-30%)
Mutual labels:  multithreading
parallel
PARALLEL: Stata module for parallel computing
Stars: ✭ 97 (+223.33%)
Mutual labels:  parallel
sphere-mt
C/C++ Multithreading Programming Course Materials.
Stars: ✭ 17 (-43.33%)
Mutual labels:  multithreading
product-sp
An open source, cloud-native streaming data integration and analytics product optimized for agile digital businesses
Stars: ✭ 80 (+166.67%)
Mutual labels:  stream-processing
SoftLight
A shader-based software renderer using the LightSky framework.
Stars: ✭ 2 (-93.33%)
Mutual labels:  multithreading

Kafka Workers

Kafka Workers is a client library that unifies consuming records from Kafka and processing them with user-defined tasks. It provides:

  • a higher level of distribution thanks to sub-partitioning defined by WorkerPartitioner,
  • tighter control of offset commits to Kafka via RecordStatusObserver,
  • the possibility to pause and resume processing by a WorkerTask for a given partition,
  • at-least-once state and output semantics,
  • backpressure,
  • processing timeouts,
  • failure handling.

Motivation

The main issue to address was a threading model with better resource utilization, in terms of:

  • decoupling consumption and processing with record buffering (for better performance),
  • a higher level of distribution for processing (higher than for consumption, and not always limited by the partition count).

The second point stems from a recurring scenario: for various reasons we want to keep the partition count fixed and not too high, because it gives us good distribution and efficiency for day-to-day processing. The problem occurs when we need to (re)process a stream with a huge consumer lag (for example, after a failure). In such a scenario we would like to temporarily bump up the number of partitions for the data already stored in Kafka, to better utilize CPUs and/or external APIs. This is what we call sub-partitioning. What matters for correct distributed processing is that the stream of records from one TopicPartition may now be reordered, but records with the same WorkerSubpartition remain ordered relative to each other. Assuming that the user-defined subpartitioner depends on the record key, these streams remain partially ordered, and records with the same key are processed sequentially.

The second requirement was the possibility to pause and resume processing for a given partition. It was required in particular by one of our components, whose role is to merge given pairs of partitions from different topics. The merging is time-aware, so from time to time we need to "wait" for a related partition without stopping the others. Moreover, this feature provides the backpressure mechanism needed by the aforementioned separation of consumption and processing.

Last but not least were simplicity (a lightweight library with an almost pure Kafka Consumer-like API, no processing cluster, no external dependencies, and no translation of messages to/from an internal data format) and genericity (we managed to replace a mix of technologies such as Storm, Kafka Streams/Connect, Flume, and Logstash with a single unified solution).

Version

The current version is 1.1.9.

Requirements

You need Java 11 and Apache Kafka 2.0 or later to use this library.

Installation

Releases are distributed via the Maven repository:

<dependency>
    <groupId>com.rtbhouse</groupId>
    <artifactId>kafka-workers</artifactId>
    <version>1.1.9</version>
</dependency>
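
For Gradle builds, the same coordinates can be declared as follows; this is a straightforward translation of the Maven snippet above, assuming a Groovy DSL build script:

implementation 'com.rtbhouse:kafka-workers:1.1.9'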

Usage

To use Kafka Workers you should implement the following interfaces:

public interface WorkerTask<K, V> {

    void init(WorkerSubpartition subpartition, WorkersConfig config);

    boolean accept(WorkerRecord<K, V> record);

    void process(WorkerRecord<K, V> record, RecordStatusObserver observer);

    void punctuate(long punctuateTime);

    void close();
}

A WorkerTask is a user-defined task associated with one WorkerSubpartition. The most crucial methods are accept() and process(). The first checks whether a given WorkerRecord, peeked from the head of the WorkerSubpartition's internal queue, can be passed to the process() method. The second processes the WorkerRecord just polled from that queue. Processing can be done synchronously or asynchronously, but in both cases one of the RecordStatusObserver's methods, onSuccess() or onFailure(), has to be called; not calling either of them for a configurable amount of time is treated as a failure. Additionally, the punctuate() method allows maintenance tasks to run at a configurable interval, regardless of whether there are records to process. The accept(), process(), and punctuate() methods are executed sequentially in a single thread, so no synchronization between them is necessary. Moreover, init() and close() are synchronized with accept(), process(), and punctuate() internally by Kafka Workers, so no additional user synchronization is needed for these calls either.
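
For illustration, here is a minimal synchronous WorkerTask sketch. It is hypothetical: the record.value() accessor and the onFailure(Exception) signature are assumptions modeled on Kafka's consumer API (the interface above names the methods but not all their details), and imports from the com.rtbhouse.kafka.workers.api packages are omitted because their exact locations are assumed.

public class LoggingWorkerTask implements WorkerTask<String, String> {

    private WorkerSubpartition subpartition;

    @Override
    public void init(WorkerSubpartition subpartition, WorkersConfig config) {
        // one task instance serves exactly one subpartition
        this.subpartition = subpartition;
    }

    @Override
    public boolean accept(WorkerRecord<String, String> record) {
        // always ready; returning false would leave the record at the queue head
        return true;
    }

    @Override
    public void process(WorkerRecord<String, String> record, RecordStatusObserver observer) {
        try {
            System.out.println(subpartition + ": " + record.value()); // record.value() is assumed
            observer.onSuccess(); // must be reported, otherwise the processing timeout fires
        } catch (RuntimeException e) {
            observer.onFailure(e); // onFailure(Exception) signature is assumed
        }
    }

    @Override
    public void punctuate(long punctuateTime) {
        // periodic maintenance hook; nothing to do in this sketch
    }

    @Override
    public void close() {
        // release any resources acquired in init()
    }
}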

public interface WorkerPartitioner<K, V> {

    int subpartition(ConsumerRecord<K, V> consumerRecord);

    int count(TopicPartition topicPartition);
}

The user-defined partitioner is used for additional sub-partitioning, which can give a better distribution of processing. This means that the stream of records from one TopicPartition may be reordered during processing, but records with the same WorkerSubpartition remain ordered relative to each other. It also leads to a slightly more complex offset-committing policy, which Kafka Workers provides to ensure at-least-once delivery.
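
As a sketch of how such a partitioner might look, the hypothetical implementation below hashes record keys into a fixed number of subpartitions per TopicPartition, so records sharing a key are processed sequentially. The subpartition count and the null-key fallback are illustrative assumptions, not part of the library:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

public class KeyHashWorkerPartitioner<K, V> implements WorkerPartitioner<K, V> {

    private static final int SUBPARTITIONS_PER_PARTITION = 4; // illustrative choice

    @Override
    public int subpartition(ConsumerRecord<K, V> consumerRecord) {
        K key = consumerRecord.key();
        if (key == null) {
            return 0; // route keyless records to a fixed subpartition (assumption)
        }
        // Math.floorMod keeps the result non-negative for negative hash codes
        return Math.floorMod(key.hashCode(), SUBPARTITIONS_PER_PARTITION);
    }

    @Override
    public int count(TopicPartition topicPartition) {
        return SUBPARTITIONS_PER_PARTITION;
    }
}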

Usage example:

    Properties properties = new Properties();
    properties.setProperty("consumer.topics", "my-topic");
    properties.setProperty("consumer.kafka.bootstrap.servers", "localhost:9192");
    properties.setProperty("consumer.kafka.group.id", "my-workers");
    properties.setProperty("consumer.kafka.key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("consumer.kafka.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaWorkers<String, String> kafkaWorkers = new KafkaWorkers<>(
        new WorkersConfig(properties),
        new MyWorkerTaskFactory<>(),
        new MyWorkerPartitioner<>(),
        new MyShutdownCallback());

    Runtime.getRuntime().addShutdownHook(new Thread(kafkaWorkers::shutdown));
    kafkaWorkers.start();
    KafkaWorkers.Status status = kafkaWorkers.waitForShutdown();

Internals

Internally, one Kafka Workers instance launches one consumer thread, one punctuator thread, and a configurable number of worker threads. Each thread can execute one or more WorkerTasks, and each WorkerTask processes WorkerRecords from an internal queue associated with a given WorkerSubpartition. Through its offsets state, Kafka Workers ensures that only continuously processed offsets are committed.

[Diagram: Kafka Workers architecture]

Configuration

Each option is listed as name (type, default): description.

  • consumer.topics (list): A list of Kafka topics read by the ConsumerThread.
  • consumer.commit.interval.ms (long, default 10000): The frequency in milliseconds at which processed offsets are committed to Kafka.
  • consumer.processing.timeout.ms (long, default 300000): The timeout in milliseconds for a record to be successfully processed.
  • consumer.poll.timeout.ms (long, default 1000): The time in milliseconds spent waiting in poll if data is not available in the buffer. If 0, poll returns immediately with any records currently available in the buffer, otherwise it returns empty.
  • consumer.commit.retries (int, default 3): The number of retries in case of a retriable commit-failure exception.
  • consumer.kafka: Used as a prefix for the internal Kafka consumer configuration. Usage example:
        consumer.kafka.bootstrap.servers = localhost:9192
        consumer.kafka.group.id = my-workers
        consumer.kafka.key.deserializer = org.apache.kafka.common.serialization.BytesDeserializer
        consumer.kafka.value.deserializer = org.apache.kafka.common.serialization.BytesDeserializer
  • worker.threads.num (int, default 1): The number of WorkerThreads per Kafka Workers instance.
  • worker.sleep.ms (long, default 1000): The time in milliseconds a WorkerThread waits when no task accepts a record.
  • worker.processing.guarantee (String, default at_least_once): Specifies worker processing guarantees. Possible values:
      • none: logs and skips records whose processing fails, so failures do not cause message retransmission and may result in data loss.
      • at_least_once: shuts Kafka Workers down on a record processing failure, which enforces message retransmission upon restart and may cause data duplication.
  • worker.task: Can be used as a prefix for internal task configuration.
  • punctuator.interval.ms (long, default 1000): The frequency in milliseconds at which the punctuate() method is called.
  • queue.total.size.heap.ratio (double in the (0, 1) range, default 0.5): Defines how large a part of the heap can be used for input queues (0.5 of the heap size by default). This total memory size for all queues is divided into individual queue sizes. For example, for an 8G heap and a 0.5 ratio there will be 4G for all queues; if there are 32 subpartitions, each of them will get a 128M input queue. Input record sizes are calculated using the record.key.weigher and record.value.weigher classes.
  • record.key.weigher (class): A class implementing the com.rtbhouse.kafka.workers.api.record.weigher.Weigher interface which measures the size in bytes of input record keys. It is used to compute the total size of input records and input queues.
  • record.value.weigher (class): A class implementing the com.rtbhouse.kafka.workers.api.record.weigher.Weigher interface which measures the size in bytes of input record values. It is used to compute the total size of input records and input queues.
  • queue.resume.ratio (double, default 0.9): The minimum ratio of used to total queue size for partition resuming.
  • metric.reporters (list, default ""): A list of classes to use as metrics reporters. Implementing the org.apache.kafka.common.metrics.MetricsReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.
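
To tie these options together, here is a hypothetical properties snippet combining several of the settings above; the keys come from the table, while the values are purely illustrative:

    consumer.topics = my-topic
    consumer.kafka.bootstrap.servers = localhost:9192
    consumer.kafka.group.id = my-workers
    consumer.kafka.key.deserializer = org.apache.kafka.common.serialization.BytesDeserializer
    consumer.kafka.value.deserializer = org.apache.kafka.common.serialization.BytesDeserializer
    worker.threads.num = 8
    worker.processing.guarantee = at_least_once
    punctuator.interval.ms = 5000
    queue.total.size.heap.ratio = 0.25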

Use cases

At RTB House we use Kafka Workers for all components in our processing infrastructure. For more details, please check out our tech blog posts.

We have adopted Kafka Workers for all our use cases: Kafka-to-Kafka processing as well as streaming to HDFS, BigQuery, Aerospike, Elastic, etc. It helps us utilize different types of resources and works successfully at a very large scale. So far we have open sourced only the core part, but we plan to do the same with our "connectors". The diagram below shows the high-level architecture of our current processing infrastructure:

[Diagram: our real-time data processing]
