ciena / Afkak

Licence: apache-2.0
Kafka client written in Twisted Python

Afkak: Twisted Python Kafka Client

PyPI calver: YY.MM.MICRO Apache 2.0 Documentation

Afkak is a Twisted-native Apache Kafka client library. It provides support for:

  • Producing messages, with automatic batching and optional compression.
  • Consuming messages, with group coordination and automatic commit.

Learn more in the documentation, download from PyPI, or review the contribution guidelines. Please report any issues on GitHub.

Status

Afkak supports these Pythons:

  • CPython 2.7
  • CPython 3.5, 3.6, 3.7, and 3.8
  • PyPy and PyPy3 6.0+

We aim to support Kafka 1.1.x and later. Integration tests are run against these Kafka broker versions:

  • 0.9.0.1
  • 1.1.1

Testing against 2.0.0 is planned (see #45).

Newer broker releases will generally function, but not all Afkak features will work on older brokers. In particular, the coordinated consumer won’t work before Kafka 0.9.0.1. We don’t recommend deploying such old releases anyway, as they have serious bugs.
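
As a rough illustration of the version cutoff described above, the following sketch compares dotted broker version strings. The helper names `parse_version` and `supports_coordinated_consumer` are hypothetical and not part of Afkak; the 0.9.0.1 minimum is taken from the paragraph above.

```python
def parse_version(version):
    """Turn a dotted version string such as "0.9.0.1" into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


# Assumption: 0.9.0.1 is the first broker release on which the
# coordinated consumer works, as stated in the text above.
MIN_COORDINATED_CONSUMER = parse_version("0.9.0.1")


def supports_coordinated_consumer(broker_version):
    """Hypothetical helper: True if the broker is new enough for group coordination."""
    return parse_version(broker_version) >= MIN_COORDINATED_CONSUMER
```

Tuples compare element by element, so "1.1.1" sorts after "0.9.0.1" without any special handling of the differing lengths.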

Usage

High level

Note: This code is not meant to be runnable. See producer_example and consumer_example for runnable example code.

from afkak.client import KafkaClient
from afkak.consumer import Consumer
from afkak.producer import Producer
from afkak.common import (OFFSET_EARLIEST, PRODUCER_ACK_ALL_REPLICAS,
    PRODUCER_ACK_LOCAL_WRITE)

kClient = KafkaClient("localhost:9092")

# To send messages
producer = Producer(kClient)
d1 = producer.send_messages("my-topic", msgs=[b"some message"])
d2 = producer.send_messages("my-topic", msgs=[b"takes a list", b"of messages"])
# To get confirmations/errors on the sends, add callbacks to the returned deferreds
d1.addCallbacks(handleResponses, handleErrors)

# To wait for acknowledgements
# PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written to
#                         a local log before sending response
# [ the default ]
# PRODUCER_ACK_ALL_REPLICAS : server will block until the message is committed
#                            by all in sync replicas before sending a response
producer = Producer(kClient,
                    req_acks=PRODUCER_ACK_LOCAL_WRITE,  # imported from afkak.common above
                    ack_timeout=2000)

responseD = producer.send_messages("my-topic", msgs=[b"message"])

# Using Twisted's @inlineCallbacks:
responses = yield responseD
if responses:
    print(responses[0].error)
    print(responses[0].offset)

# To send messages in batch: You can use a producer with any of the
# partitioners for doing this. The following producer will collect
# messages in batch and send them to Kafka after 20 messages are
# collected or every 60 seconds (whichever comes first). You can
# also batch by number of bytes.
# Notes:
# * If the producer dies before the messages are sent, the callbacks on
#   the deferreds returned by send_messages() will not have been called,
#   so the caller can retry.
# * Calling producer.stop() before the messages are sent will errback()
#   the deferred(s) returned from the send_messages() call(s).
producer = Producer(kClient, batch_send=True,
                    batch_send_every_n=20,
                    batch_send_every_t=60)
responseD1 = producer.send_messages("my-topic", msgs=[b"message"])
responseD2 = producer.send_messages("my-topic", msgs=[b"message 2"])

# To consume messages
# define a function which takes a list of messages to process and
# possibly returns a deferred which fires when the processing is
# complete.
def processor_func(consumer, messages):
    # store_messages_in_database() may return a deferred
    result = store_messages_in_database(messages)
    # record last processed message
    consumer.commit()
    return result

the_partition = 3  # Consume only from partition 3.
consumer = Consumer(kClient, "my-topic", the_partition, processor_func)
d = consumer.start(OFFSET_EARLIEST)  # Start reading at earliest message
# The deferred returned by consumer.start() will fire when an error
# occurs that can't be handled by the consumer, or when consumer.stop()
# is called
yield d

consumer.stop()
kClient.close()
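
The "20 messages or 60 seconds, whichever comes first" rule from the batching comments above can be sketched without Kafka or Twisted. This is a toy model for illustration only, not Afkak's implementation: the class `BatchBuffer`, its `tick()` method, and the injected `clock` are all assumptions made for the example, and the timer here is measured from the oldest pending message.

```python
import time


class BatchBuffer:
    """Toy buffer that flushes after `every_n` messages or `every_t` seconds."""

    def __init__(self, flush, every_n=20, every_t=60.0, clock=time.monotonic):
        self._flush = flush        # callable that receives each batch (a list)
        self._every_n = every_n
        self._every_t = every_t
        self._clock = clock        # injectable so the example can fake time
        self._pending = []
        self._started = None       # when the oldest pending message was added

    def add(self, msg):
        if not self._pending:
            self._started = self._clock()
        self._pending.append(msg)
        if len(self._pending) >= self._every_n:
            self.flush()

    def tick(self):
        """Call periodically; flushes once the oldest message has waited long enough."""
        if self._pending and self._clock() - self._started >= self._every_t:
            self.flush()

    def flush(self):
        if self._pending:
            batch, self._pending = self._pending, []
            self._flush(batch)


# Usage with a fake clock: the count limit triggers the first batch,
# the time limit triggers the second.
fake_now = [0.0]
batches = []
buffer = BatchBuffer(batches.append, every_n=2, every_t=60.0,
                     clock=lambda: fake_now[0])
buffer.add(b"message")
buffer.add(b"message 2")   # second message reaches every_n, so the batch is sent
buffer.add(b"message 3")
fake_now[0] = 60.0
buffer.tick()              # 60 seconds have passed, so the lone message is sent
```

Injecting the clock keeps the example deterministic; in a Twisted application the periodic `tick()` would instead be driven by the reactor.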

Keyed messages

from afkak.client import KafkaClient
from afkak.producer import Producer
from afkak.partitioner import HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost:9092")

# Use the HashedPartitioner so that the producer will use the optional key
# argument on send_messages()
producer = Producer(kafka, partitioner_class=HashedPartitioner)
producer.send_messages("my-topic", "key1", [b"some message"])
producer.send_messages("my-topic", "key2", [b"this method"])
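
The idea behind keyed messages is that the same key always lands on the same partition. The sketch below shows that property in isolation. It is not Afkak's `HashedPartitioner`: the function `pick_partition` is hypothetical, and CRC32 is used only as a stand-in for whatever hash the real partitioner applies.

```python
import zlib


def pick_partition(key, partitions):
    """Map a bytes key to one entry of `partitions` using a stable hash."""
    # Assumption: CRC32 stands in for the partitioner's real hash function.
    return partitions[zlib.crc32(key) % len(partitions)]


partitions = [0, 1, 2, 3]
first = pick_partition(b"key1", partitions)
second = pick_partition(b"key1", partitions)  # same key, same partition as `first`
```

Because the hash depends only on the key, per-key ordering is preserved as long as the partition count does not change.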


Low level

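from afkak.client import KafkaClient
from afkak.common import ProduceRequest

kafka = KafkaClient("localhost:9092")
# KafkaProtocol is not imported in this snippet; it stands for the message encoder
req = ProduceRequest(topic="my-topic", partition=1,
    messages=[KafkaProtocol.encode_message(b"some message")])
# send_produce_request() is a method of the client and returns a deferred
resps = yield kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()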

resps[0].topic      # b"my-topic"
resps[0].partition  # 1
resps[0].error      # 0 (hopefully)
resps[0].offset     # offset of the first message sent in this request

Install

Afkak releases are available on PyPI.

Because the Afkak dependencies Twisted and python-snappy have binary extension modules you will need to install the Python development headers for the interpreter you wish to use:

Debian/Ubuntu:

sudo apt-get install build-essential python-dev python3-dev pypy-dev pypy3-dev libsnappy-dev

OS X:

brew install python pypy snappy
pip install virtualenv

Then Afkak can be installed with pip as usual:

pip install afkak

License

Copyright 2013, 2014, 2015 David Arthur under Apache License, v2.0. See LICENSE

Copyright 2014, 2015 Cyan, Inc. under Apache License, v2.0. See LICENSE

Copyright 2015, 2016, 2017, 2018, 2019 Ciena Corporation under Apache License, v2.0. See LICENSE

This project began as a port of the kafka-python library to Twisted.

See AUTHORS.md for the full contributor list.

Tests

In order to run Afkak's tests, you need to install the dependencies as specified in the install section.

The Afkak test suite uses Tox to run the tests on every supported Python version. The preferred way to run them is to first install Tox in a virtual environment:

make venv

Testing Strategy

Afkak has two types of tests:

  • Unit tests: fast tests that do no I/O.

  • Integration tests: tests that run against a real Kafka broker.

Run the unit tests

To run all unit tests in all the supported Python versions (requires all the versions to be installed in the system where the tests will run):

make toxu

Alternatively, you might want to run unit tests in a list of specific Python versions:

.env/bin/tox -e py27-unit-snappy,py35-unit-snappy

Contributors should run the unit tests on at least Python 2.7 and one Python 3 version before submitting a pull request.

Run the integration tests

The integration tests start a real local ZooKeeper instance and Kafka brokers, and send messages through them using the client.

The makefile knows how to download several versions of Kafka. This will run just the integration tests against Kafka 1.1.1:

KAFKA_VER=1.1.1 make toxi

Run all the tests against the default Kafka version:

make toxa

Run the integration tests against all the Kafka versions the Makefile knows about:

make toxik