karafka / Waterdrop

Licence: lgpl-3.0
WaterDrop is a standalone Karafka component library for generating Kafka messages

Programming Languages

ruby

Projects that are alternatives of or similar to Waterdrop

Karafka
Framework for Apache Kafka based Ruby and Rails applications development.
Stars: ✭ 1,223 (+799.26%)
Mutual labels:  apache-kafka, kafka, rubygem, rubygems
Kt
Kafka command line tool that likes JSON
Stars: ✭ 799 (+487.5%)
Mutual labels:  apache-kafka, kafka
Materialize Sass
Materializecss rubygem for Rails Asset Pipeline / Sprockets
Stars: ✭ 785 (+477.21%)
Mutual labels:  rubygem, rubygems
Kowl
Apache Kafka Web UI for exploring messages, consumers, configurations and more with a focus on a good UI & UX.
Stars: ✭ 1,036 (+661.76%)
Mutual labels:  apache-kafka, kafka
Kq
Kafka-based Job Queue for Python
Stars: ✭ 530 (+289.71%)
Mutual labels:  apache-kafka, kafka
Librdkafka
The Apache Kafka C/C++ library
Stars: ✭ 5,617 (+4030.15%)
Mutual labels:  apache-kafka, kafka
Kafka Workshop
Materials (slides and code) for Kafka and Kafka Streams Workshop
Stars: ✭ 44 (-67.65%)
Mutual labels:  apache-kafka, kafka
Kafka Connect Hdfs
Kafka Connect HDFS connector
Stars: ✭ 400 (+194.12%)
Mutual labels:  apache-kafka, kafka
Kukulcan
A REPL for Apache Kafka
Stars: ✭ 103 (-24.26%)
Mutual labels:  apache-kafka, kafka
Kattlo Cli
Kattlo CLI Project
Stars: ✭ 58 (-57.35%)
Mutual labels:  apache-kafka, kafka
Debezium
Change data capture for a variety of databases. Please log issues at https://issues.redhat.com/browse/DBZ.
Stars: ✭ 5,937 (+4265.44%)
Mutual labels:  apache-kafka, kafka
Slimmessagebus
Lightweight message bus interface for .NET (pub/sub and request-response) with transport plugins for popular message brokers.
Stars: ✭ 120 (-11.76%)
Mutual labels:  apache-kafka, kafka
Agile data code 2
Code for Agile Data Science 2.0, O'Reilly 2017, Second Edition
Stars: ✭ 413 (+203.68%)
Mutual labels:  apache-kafka, kafka
Kafka Storm Starter
Code examples that show how to integrate Apache Kafka 0.8+ with Apache Storm 0.9+ and Apache Spark Streaming 1.1+, while using Apache Avro as the data serialization format.
Stars: ✭ 728 (+435.29%)
Mutual labels:  apache-kafka, kafka
Cppkafka
Modern C++ Apache Kafka client library (wrapper for librdkafka)
Stars: ✭ 413 (+203.68%)
Mutual labels:  apache-kafka, kafka
Storm Dynamic Spout
A framework for building spouts for Apache Storm and a Kafka based spout for dynamically skipping messages to be processed later.
Stars: ✭ 40 (-70.59%)
Mutual labels:  apache-kafka, kafka
Kafka Sprout
🚀 Web GUI for Kafka Cluster Management
Stars: ✭ 388 (+185.29%)
Mutual labels:  apache-kafka, kafka
Awesome Kafka
A list about Apache Kafka
Stars: ✭ 397 (+191.91%)
Mutual labels:  apache-kafka, kafka
Pykafka
Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.
Stars: ✭ 1,079 (+693.38%)
Mutual labels:  apache-kafka, kafka
Meetup
Meetup repository run by the Kafka Korea user group
Stars: ✭ 106 (-22.06%)
Mutual labels:  apache-kafka, kafka

WaterDrop

Note: Documentation presented here refers to WaterDrop 2.0.0.

WaterDrop 2.0 does not work with Karafka 1.*. It is meant to work either as a standalone producer outside of the Karafka 1.* ecosystem or as a part of the not yet released Karafka 2.0.*.

Please refer to this branch and its documentation for details about WaterDrop 1.* usage.

Join the chat at https://gitter.im/karafka/karafka

Gem used to send messages to Kafka in an easy way with an extra validation layer. It is a part of the Karafka ecosystem.

It:

  • Is thread safe
  • Supports sync producing
  • Supports async producing
  • Supports buffering
  • Supports producing messages to multiple clusters
  • Supports multiple delivery policies
  • Works with Kafka 1.0+ and Ruby 2.5+

Installation

gem install waterdrop

or add this to your Gemfile:

gem 'waterdrop'

and run

bundle install

Setup

WaterDrop is a complex tool with multiple configuration options. To keep everything organized, all the configuration options are divided into two groups:

  • WaterDrop options - options directly related to the Karafka framework and its components
  • Kafka driver options - options related to Kafka

To apply all those configuration options, you need to create a producer instance and use the #setup method:

producer = WaterDrop::Producer.new

producer.setup do |config|
  config.deliver = true
  config.kafka = {
    'bootstrap.servers': 'localhost:9092',
    'request.required.acks': 1
  }
end

or you can do the same while initializing the producer:

producer = WaterDrop::Producer.new do |config|
  config.deliver = true
  config.kafka = {
    'bootstrap.servers': 'localhost:9092',
    'request.required.acks': 1
  }
end

WaterDrop configuration options

Option            Description
----------------  ---------------------------------------------------------------
id                id of the producer for instrumentation and logging
logger            Logger that we want to use
deliver           Should we send messages to Kafka or just fake the delivery
max_wait_timeout  Waits that long for the delivery report or raises an error
wait_timeout      Waits that long before re-check of delivery report availability

Kafka configuration options

You can create producers with different Kafka settings. The available configuration options are documented at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.

Usage

Please refer to the documentation in case you're interested in the more advanced API.

Basic usage

To send Kafka messages, just create a producer and use it:

producer = WaterDrop::Producer.new

producer.setup do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
end

producer.produce_sync(topic: 'my-topic', payload: 'my message')

# or for async
producer.produce_async(topic: 'my-topic', payload: 'my message')

# or in batches
producer.produce_many_sync(
  [
    { topic: 'my-topic', payload: 'my message' },
    { topic: 'my-topic', payload: 'my message' }
  ]
)

# batches can also be sent asynchronously
producer.produce_many_async(
  [
    { topic: 'my-topic', payload: 'my message' },
    { topic: 'my-topic', payload: 'my message' }
  ]
)

# Don't forget to close the producer once you're done to flush the internal buffers, etc
producer.close

Each message that you want to publish will have its value checked.

Here are all the things you can provide in the message hash:

Option     Required  Value type     Description
---------  --------  -------------  ------------------------------------------------------
topic      true      String         The Kafka topic that should be written to
payload    true      String         Data you want to send to Kafka
key        false     String         The key that should be set in the Kafka message
partition  false     Integer        A specific partition number that should be written to
timestamp  false     Time, Integer  The timestamp that should be set on the message
headers    false     Hash           Headers for the message

Keep in mind that the message you want to send should be either binary or stringified (to_s, to_json, etc).
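
As a rough illustration of the table above, the following plain-Ruby sketch checks a message hash against the listed fields before it would be handed to a producer. The MESSAGE_SCHEMA constant and the message_errors helper are hypothetical names made up for this example. They are not part of WaterDrop, whose own validation layer may behave differently.

```ruby
require 'json'

# Hypothetical schema mirroring the table above: field => [required?, allowed types]
MESSAGE_SCHEMA = {
  topic:     [true,  [String]],
  payload:   [true,  [String]],
  key:       [false, [String]],
  partition: [false, [Integer]],
  timestamp: [false, [Time, Integer]],
  headers:   [false, [Hash]]
}.freeze

# Returns a list of human-readable problems; an empty list means the hash looks valid.
def message_errors(message)
  errors = []

  MESSAGE_SCHEMA.each do |field, (required, types)|
    unless message.key?(field)
      errors << "#{field} is required" if required
      next
    end

    value = message[field]
    next if types.any? { |type| value.is_a?(type) }

    errors << "#{field} must be #{types.join(' or ')}"
  end

  errors
end

# Non-string data has to be stringified first, for example with to_json
payload = { id: 1, name: 'example' }.to_json

p message_errors(topic: 'my-topic', payload: payload)
p message_errors(topic: 'my-topic', payload: { id: 1 })
p message_errors(payload: 'my message', partition: '0')
```

The first call passes because the payload was converted with to_json. The other two report the raw Hash payload, the missing topic and the String partition.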

Buffering

WaterDrop producers support buffering of messages, which means that you can easily implement periodic flushing for long-running processes as well as buffer several messages to be flushed at the same moment:

producer = WaterDrop::Producer.new

producer.setup do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
end

time = Time.now - 10

while time < Time.now
  time += 1
  producer.buffer(topic: 'times', payload: Time.now.to_s)
end

puts "The messages buffer size #{producer.messages.size}"
producer.flush_sync
puts "The messages buffer size #{producer.messages.size}"

producer.close
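
For the long-running case mentioned above, periodic flushing could look roughly like the sketch below. It relies only on the buffer, flush_sync and messages calls shown in this section. StandInProducer and buffer_with_periodic_flush are hypothetical names that exist only so the example runs without a Kafka cluster; with WaterDrop you would pass a configured producer instead.

```ruby
# Hypothetical stand-in that mimics the three producer calls used in this section,
# so the sketch runs without a Kafka cluster.
class StandInProducer
  attr_reader :messages, :flushed

  def initialize
    @messages = []
    @flushed = []
  end

  def buffer(message)
    @messages << message
  end

  def flush_sync
    @flushed.concat(@messages)
    @messages.clear
  end
end

# Buffers each message and flushes whenever at least `interval` seconds
# have passed since the previous flush. Returns the number of flushes performed.
def buffer_with_periodic_flush(producer, messages, interval:)
  flushes = 0
  last_flush = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  messages.each do |message|
    producer.buffer(message)

    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    next if now - last_flush < interval

    producer.flush_sync
    flushes += 1
    last_flush = now
  end

  # Flush whatever is left so nothing stays in the buffer
  unless producer.messages.empty?
    producer.flush_sync
    flushes += 1
  end

  flushes
end

producer = StandInProducer.new
events = Array.new(5) { |i| { topic: 'times', payload: "event #{i}" } }

buffer_with_periodic_flush(producer, events, interval: 3600)
puts "The messages buffer size #{producer.messages.size}"
```

A monotonic clock is used for the interval check so that system clock adjustments do not trigger or delay a flush.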

Instrumentation

After #setup is done, each producer has a custom monitor to which you can subscribe.

producer = WaterDrop::Producer.new

producer.setup do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
end

producer.monitor.subscribe('message.produced_async') do |event|
  puts "A message was produced to '#{event[:message][:topic]}' topic!"
end

producer.produce_async(topic: 'events', payload: 'data')

producer.close

See WaterDrop::Instrumentation::Monitor::EVENTS for the list of all the supported events.

Usage statistics

WaterDrop may be configured to emit internal metrics at a fixed interval by setting the Kafka statistics.interval.ms configuration property to a value greater than 0. Once that is done, emitted statistics are available after subscribing to the statistics.emitted publisher event.

The statistics include all of the metrics from librdkafka (full list here) as well as the diff of those against the previously emitted values.

For several attributes like txmsgs, librdkafka publishes only the totals. To make it easier to track progress (for example, the number of messages sent between two statistics.emitted events), WaterDrop diffs all the numeric values against the previously available numbers. Each of those diffs is available under the same key as the metric, with an additional _d suffix:

producer = WaterDrop::Producer.new do |config|
  config.kafka = {
    'bootstrap.servers': 'localhost:9092',
    'statistics.interval.ms': 2_000 # emit statistics every 2 seconds
  }
end

producer.monitor.subscribe('statistics.emitted') do |event|
  sum = event[:statistics]['txmsgs']
  diff = event[:statistics]['txmsgs_d']

  p "Sent messages: #{sum}"
  p "Messages sent from last statistics report: #{diff}"
end

sleep(2)

# Sent messages: 0
# Messages sent from last statistics report: 0

20.times { producer.produce_async(topic: 'events', payload: 'data') }

# Sent messages: 20
# Messages sent from last statistics report: 20

sleep(2)

20.times { producer.produce_async(topic: 'events', payload: 'data') }

# Sent messages: 40
# Messages sent from last statistics report: 20

sleep(2)

# Sent messages: 40
# Messages sent from last statistics report: 0

producer.close

Note: The metrics returned may not be completely consistent between brokers, toppars and totals, due to the internal asynchronous nature of librdkafka. E.g., the top level tx total may be less than the sum of the broker tx values which it represents.
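
To make the _d convention concrete, here is a plain-Ruby sketch of how such deltas could be derived from two consecutive statistics hashes. with_deltas is a hypothetical helper written for this example. It handles only top-level numeric values and is not WaterDrop's actual implementation.

```ruby
# Adds a "<key>_d" entry for every numeric value in `current`, holding the
# difference from the same key in `previous` (treated as 0 when absent).
def with_deltas(previous, current)
  current.each_with_object({}) do |(key, value), result|
    result[key] = value
    next unless value.is_a?(Numeric)

    result["#{key}_d"] = value - previous.fetch(key, 0)
  end
end

first  = with_deltas({}, { 'name' => 'producer-1', 'txmsgs' => 20 })
second = with_deltas(first, { 'name' => 'producer-1', 'txmsgs' => 40 })

p first
p second
```

Non-numeric values such as the name are copied through without a _d counterpart, while txmsgs_d is 20 in both results, matching the totals of 20 and 40 in the example above.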

Forking and potential memory problems

If you work with forked processes, make sure you don't use the producer before the fork. It is fine to configure the producer first, then fork, and only use it afterwards.

To tackle this obstacle related to rdkafka, WaterDrop adds a finalizer to each of the producers to close the rdkafka client before the Ruby process shuts down. Due to the nature of finalizers, this implementation prevents producers from being GCed (except upon VM shutdown) and can cause memory leaks if you don't use persistent/long-lived producers in a long-running process or if you don't call the #close method of a producer when it is no longer needed. Creating a producer instance for each message is a rather bad idea anyway, so we recommend against it.
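
One way to make sure #close is always called for a producer that is not kept around is to wrap its use in a block with ensure. The with_producer helper and the ClosableStandIn class below are hypothetical and written only for this sketch; with WaterDrop you would pass in a configured producer instead.

```ruby
# Hypothetical stand-in exposing only the calls used below, so the sketch runs
# without a Kafka cluster.
class ClosableStandIn
  attr_reader :sent

  def initialize
    @sent = []
    @closed = false
  end

  def produce_sync(message)
    raise 'producer already closed' if @closed

    @sent << message
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# Yields the producer and closes it afterwards, even when the block raises.
def with_producer(producer)
  yield producer
ensure
  producer.close
end

producer = ClosableStandIn.new

with_producer(producer) do |current|
  current.produce_sync(topic: 'my-topic', payload: 'my message')
end

puts "closed: #{producer.closed?}"
```

Because the close call sits in an ensure clause, the producer is released even if producing raises an error.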

Note on contributions

First, thank you for considering contributing to WaterDrop! It's people like you that make the open source community such a great community!

Each pull request must pass all the RSpec specs and meet our quality requirements.

To check if everything is as it should be, we use Coditsu, which combines multiple linters and code analyzers for both code and documentation. Once you're done with your changes, submit a pull request.

Coditsu will automatically check your work against our quality standards. You can find your commit check results on the builds page of the WaterDrop repository.
