
Parsely / Pykafka

Licence: other
Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.

Programming Languages

python
python

Projects that are alternatives of or similar to Pykafka

Wirbelsturm
Wirbelsturm is a Vagrant and Puppet based tool to perform 1-click local and remote deployments, with a focus on big data tech like Kafka.
Stars: ✭ 332 (-69.23%)
Mutual labels:  apache-kafka, kafka
Cppkafka
Modern C++ Apache Kafka client library (wrapper for librdkafka)
Stars: ✭ 413 (-61.72%)
Mutual labels:  apache-kafka, kafka
Kafka Sprout
🚀 Web GUI for Kafka Cluster Management
Stars: ✭ 388 (-64.04%)
Mutual labels:  apache-kafka, kafka
Storm Dynamic Spout
A framework for building spouts for Apache Storm and a Kafka based spout for dynamically skipping messages to be processed later.
Stars: ✭ 40 (-96.29%)
Mutual labels:  apache-kafka, kafka
Kq
Kafka-based Job Queue for Python
Stars: ✭ 530 (-50.88%)
Mutual labels:  apache-kafka, kafka
Cp All In One
docker-compose.yml files for cp-all-in-one , cp-all-in-one-community, cp-all-in-one-cloud
Stars: ✭ 239 (-77.85%)
Mutual labels:  apache-kafka, kafka
Kafka Connect Hdfs
Kafka Connect HDFS connector
Stars: ✭ 400 (-62.93%)
Mutual labels:  apache-kafka, kafka
Azkarra Streams
🚀 Azkarra is a lightweight java framework to make it easy to develop, deploy and manage cloud-native streaming microservices based on Apache Kafka Streams.
Stars: ✭ 146 (-86.47%)
Mutual labels:  apache-kafka, kafka
Kafka Workshop
Materials (slides and code) for Kafka and Kafka Streams Workshop
Stars: ✭ 44 (-95.92%)
Mutual labels:  apache-kafka, kafka
Debezium
Change data capture for a variety of databases. Please log issues at https://issues.redhat.com/browse/DBZ.
Stars: ✭ 5,937 (+450.23%)
Mutual labels:  apache-kafka, kafka
Kowl
Apache Kafka Web UI for exploring messages, consumers, configurations and more with a focus on a good UI & UX.
Stars: ✭ 1,036 (-3.99%)
Mutual labels:  apache-kafka, kafka
Kafka Storm Starter
Code examples that show to integrate Apache Kafka 0.8+ with Apache Storm 0.9+ and Apache Spark Streaming 1.1+, while using Apache Avro as the data serialization format.
Stars: ✭ 728 (-32.53%)
Mutual labels:  apache-kafka, kafka
Kafkactl
Command Line Tool for managing Apache Kafka
Stars: ✭ 177 (-83.6%)
Mutual labels:  apache-kafka, kafka
Kafka Ui
Open-Source Web GUI for Apache Kafka Management
Stars: ✭ 230 (-78.68%)
Mutual labels:  apache-kafka, kafka
Kop
Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
Stars: ✭ 159 (-85.26%)
Mutual labels:  apache-kafka, kafka
Awesome Kafka
A list about Apache Kafka
Stars: ✭ 397 (-63.21%)
Mutual labels:  apache-kafka, kafka
Oryx
Oryx 2: Lambda architecture on Apache Spark, Apache Kafka for real-time large scale machine learning
Stars: ✭ 1,785 (+65.43%)
Mutual labels:  apache-kafka, kafka
Kafka Tutorials
Kafka Tutorials microsite
Stars: ✭ 144 (-86.65%)
Mutual labels:  apache-kafka, kafka
Agile data code 2
Code for Agile Data Science 2.0, O'Reilly 2017, Second Edition
Stars: ✭ 413 (-61.72%)
Mutual labels:  apache-kafka, kafka
Librdkafka
The Apache Kafka C/C++ library
Stars: ✭ 5,617 (+420.57%)
Mutual labels:  apache-kafka, kafka

.. image:: https://travis-ci.com/Parsely/pykafka.svg?branch=master
   :target: https://travis-ci.com/Parsely/pykafka

PyKafka
=======

.. image:: http://i.imgur.com/ztYl4lG.jpg

PyKafka is a programmer-friendly Kafka client for Python. It includes Python implementations of Kafka producers and consumers, which are optionally backed by a C extension built on librdkafka_. It runs under Python 2.7+, Python 3.4+, and PyPy, and supports versions of Kafka 0.8.2 and newer.

.. _librdkafka: https://github.com/edenhill/librdkafka

PyKafka's primary goal is to provide a similar level of abstraction to the `JVM Kafka client`_ using idioms familiar to Python programmers and exposing the most Pythonic API possible.

You can install PyKafka from PyPI with

::

    $ pip install pykafka

or from conda-forge with

::

    $ conda install -c conda-forge pykafka

Full documentation and usage examples for PyKafka can be found on readthedocs_.

You can install PyKafka for local development and testing by cloning this repository and running

::

    $ python setup.py develop

.. _JVM Kafka client: https://github.com/apache/kafka/tree/0.8.2/clients/src/main/java/org/apache/kafka
.. _readthedocs: http://pykafka.readthedocs.org/en/latest/

Getting Started
===============

Assuming you have at least one Kafka instance running on localhost, you can use PyKafka to connect to it.

.. sourcecode:: python

    >>> from pykafka import KafkaClient
    >>> client = KafkaClient(hosts="127.0.0.1:9092,127.0.0.1:9093,...")

Or, for a TLS connection, you might write (see also the SslConfig docs for further details):

.. sourcecode:: python

    >>> from pykafka import KafkaClient, SslConfig
    >>> config = SslConfig(cafile='/your/ca.cert',
    ...                    certfile='/your/client.cert',  # optional
    ...                    keyfile='/your/client.key',  # optional
    ...                    password='unlock my client key please')  # optional
    >>> client = KafkaClient(hosts="127.0.0.1:<ssl-port>,...",
    ...                      ssl_config=config)

If the cluster you've connected to has any topics defined on it, you can list them with:

.. sourcecode:: python

    >>> client.topics
    >>> topic = client.topics['my.test']

Once you've got a Topic, you can create a Producer for it and start producing messages.

.. sourcecode:: python

    >>> with topic.get_sync_producer() as producer:
    ...     for i in range(4):
    ...         # messages must be bytes
    ...         producer.produce(('test message ' + str(i ** 2)).encode())

The example above produces to Kafka synchronously: the call only returns after we have confirmation that the message made it to the cluster.

To achieve higher throughput, we recommend using the Producer in asynchronous mode, so that produce() calls return immediately and the producer may opt to send messages in larger batches. The Producer collects produced messages in an internal queue for linger_ms before sending each batch. This delay can be removed or changed at the expense of efficiency with linger_ms, min_queued_messages, and other keyword arguments (see readthedocs_). You can still obtain delivery confirmation for messages via a queue interface, which can be enabled by setting delivery_reports=True. Here's a rough usage example:

.. sourcecode:: python

    >>> import queue
    >>> with topic.get_producer(delivery_reports=True) as producer:
    ...     count = 0
    ...     while True:
    ...         count += 1
    ...         # messages and partition keys must be bytes
    ...         producer.produce(b'test msg',
    ...                          partition_key='{}'.format(count).encode())
    ...         if count % 10 ** 5 == 0:  # adjust this or bring lots of RAM ;)
    ...             while True:
    ...                 try:
    ...                     msg, exc = producer.get_delivery_report(block=False)
    ...                     if exc is not None:
    ...                         print('Failed to deliver msg {}: {}'.format(
    ...                             msg.partition_key, repr(exc)))
    ...                     else:
    ...                         print('Successfully delivered msg {}'.format(
    ...                             msg.partition_key))
    ...                 except queue.Empty:
    ...                     break

Note that the delivery report queue is thread-local: it will only serve reports for messages which were produced from the current thread. Also, if you're using delivery_reports=True, failing to consume the delivery report queue will cause PyKafka's memory usage to grow unbounded.
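
Because reports are thread-local, each producing thread must drain its own report queue. Here's a minimal sketch of that pattern, assuming a single producer shared between threads; the function name and payloads are illustrative, not part of PyKafka's API:

.. sourcecode:: python

    import queue
    import threading

    producer = topic.get_producer(delivery_reports=True)

    def produce_and_confirm(n_messages):
        # Each thread sees only the reports for messages it produced itself.
        for i in range(n_messages):
            producer.produce('payload-{}'.format(i).encode())
        confirmed = 0
        while confirmed < n_messages:
            try:
                msg, exc = producer.get_delivery_report(block=True, timeout=5)
            except queue.Empty:
                break  # timed out waiting for a report
            else:
                if exc is None:
                    confirmed += 1

    threads = [threading.Thread(target=produce_and_confirm, args=(100,))
               for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    producer.stop()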

You can also consume messages from this topic using a Consumer instance.

.. sourcecode:: python

    >>> consumer = topic.get_simple_consumer()
    >>> for message in consumer:
    ...     if message is not None:
    ...         print(message.offset, message.value)
    0 b'test message 0'
    1 b'test message 1'
    2 b'test message 4'
    3 b'test message 9'

This SimpleConsumer doesn't scale - if you have two SimpleConsumers consuming the same topic, they will receive duplicate messages. To get around this, you can use the BalancedConsumer.

.. sourcecode:: python

    >>> balanced_consumer = topic.get_balanced_consumer(
    ...     consumer_group='testgroup',
    ...     auto_commit_enable=True,
    ...     zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot'
    ... )

You can have as many BalancedConsumer instances consuming a topic as that topic has partitions. If they are all connected to the same ZooKeeper instance, they will communicate with it to automatically balance the partitions between themselves. By default, the BalancedConsumer uses the "range" partition assignment strategy. The strategy is switchable via the membership_protocol keyword argument, and can be either an object exposed by pykafka.membershipprotocol or a custom instance of pykafka.membershipprotocol.GroupMembershipProtocol.
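
For example, to switch to the round-robin assignment strategy, you might write the following rough sketch (assuming RoundRobinProtocol is among the objects exposed by pykafka.membershipprotocol, as in recent releases):

.. sourcecode:: python

    >>> from pykafka.membershipprotocol import RoundRobinProtocol
    >>> balanced_consumer = topic.get_balanced_consumer(
    ...     consumer_group='testgroup',
    ...     zookeeper_connect='myZkClusterNode1.com:2181',
    ...     membership_protocol=RoundRobinProtocol)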

You can also use the Kafka 0.9 Group Membership API with the managed keyword argument on get_balanced_consumer.
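
With managed=True, group coordination is handled by the Kafka brokers themselves rather than ZooKeeper. A minimal sketch:

.. sourcecode:: python

    >>> managed_consumer = topic.get_balanced_consumer(
    ...     consumer_group='testgroup',
    ...     auto_commit_enable=True,
    ...     managed=True)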

Using the librdkafka extension
==============================

PyKafka includes a C extension that makes use of librdkafka to speed up producer and consumer operation.

To ensure the C extension is compiled, set the environment variable RDKAFKA_INSTALL=system during pip install or setup.py, e.g. RDKAFKA_INSTALL=system pip install pykafka; in this mode, setup will fail if the C extension cannot be compiled. Conversely, setting RDKAFKA_INSTALL='' explicitly specifies that the C extension should not be compiled. The current default behavior is to attempt to compile the extension, but not to fail the setup if compilation fails.

PyKafka requires librdkafka 0.9.1_ or newer; some system package managers may not have sufficiently recent versions. To use the librdkafka extension, you need to make sure the header files and shared library are somewhere Python can find them, both when you build the extension (which is taken care of by setup.py develop) and at run time. Typically, this means that you need to either install librdkafka in a place conventional for your system, or declare C_INCLUDE_PATH, LIBRARY_PATH, and LD_LIBRARY_PATH in your shell environment to point to the installation location of the librdkafka shared objects. You can find this location with locate librdkafka.so.

After that, all that's needed is to pass an extra parameter use_rdkafka=True to topic.get_producer(), topic.get_simple_consumer(), or topic.get_balanced_consumer(). Note that some configuration options may have different optimal values; it may be worthwhile to consult librdkafka's `configuration notes`_ for this.
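
For example, reusing the topic object from the Getting Started section above:

.. sourcecode:: python

    >>> producer = topic.get_producer(use_rdkafka=True)
    >>> consumer = topic.get_simple_consumer(use_rdkafka=True)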

.. _0.9.1: https://github.com/edenhill/librdkafka/releases/tag/0.9.1
.. _configuration notes: https://github.com/edenhill/librdkafka/blob/0.9.1/CONFIGURATION.md

Operational Tools
=================

PyKafka includes a small collection of `CLI tools`_ that can help with common tasks related to the administration of a Kafka cluster, including offset and lag monitoring and topic inspection. The full, up-to-date interface for these tools can be found by running

.. sourcecode:: sh

    $ python cli/kafka_tools.py --help

or after installing PyKafka via setuptools or pip:

.. sourcecode:: sh

    $ kafka-tools --help

.. _CLI tools: https://github.com/Parsely/pykafka/blob/master/pykafka/cli/kafka_tools.py

PyKafka or kafka-python?
========================

These are two different projects. See `the discussion here <https://github.com/Parsely/pykafka/issues/334>`_ for comparisons between the two projects.

Contributing
============

If you're interested in contributing code to PyKafka, a good place to start is the `"help wanted"`_ issue tag. We also recommend taking a look at the `contribution guide`_.

.. _"help wanted": https://github.com/Parsely/pykafka/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22

Support
=======

If you need help using PyKafka, there are a number of resources available. For usage questions or common recipes, check out the `StackOverflow tag`_. The `Google Group`_ can be useful for more in-depth questions or inquiries you'd like to send directly to the PyKafka maintainers. If you believe you've found a bug in PyKafka, please open a `github issue`_ after reading the `contribution guide`_.

.. _StackOverflow tag: https://stackoverflow.com/questions/tagged/pykafka
.. _github issue: https://github.com/Parsely/pykafka/issues
.. _Google Group: https://groups.google.com/forum/#!forum/pykafka-user
.. _contribution guide: https://github.com/Parsely/pykafka/blob/master/CONTRIBUTING.rst
