All Projects → jobcloud → php-kafka-lib

jobcloud / php-kafka-lib

Licence: MIT license
PHP Kafka producer / consumer library with PHP Avro support, based on php-rdkafka

Programming Languages

PHP
23972 projects - #3 most used programming language

Projects that are alternatives of or similar to php-kafka-lib

rocketmq
RocketMQ client for go supportting producer and consumer.
Stars: ✭ 29 (-23.68%)
Mutual labels:  consumer, producer
pulsar-flex
Pulsar Flex is a modern Apache Pulsar client for Node.js, developed to be independent of C++.
Stars: ✭ 43 (+13.16%)
Mutual labels:  consumer, producer
youtube-dl-nas
youtube download queue websocket server with login for private NAS.
Stars: ✭ 136 (+257.89%)
Mutual labels:  consumer, producer
Kafka Go
Kafka library in Go
Stars: ✭ 4,200 (+10952.63%)
Mutual labels:  consumer, producer
Qmq
QMQ是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景; 也包括报价搜索等高吞吐量场景。
Stars: ✭ 2,420 (+6268.42%)
Mutual labels:  consumer, producer
Insulator
A client UI to inspect Kafka topics, consume, produce and much more
Stars: ✭ 53 (+39.47%)
Mutual labels:  avro, consumer
Confluent Kafka Go
Confluent's Apache Kafka Golang client
Stars: ✭ 3,047 (+7918.42%)
Mutual labels:  consumer, producer
frizzle
The magic message bus
Stars: ✭ 14 (-63.16%)
Mutual labels:  consumer, producer
messaging-polyglot
RabbitMQ Messaging Polyglot with Java, ColdFusion, CommandBox, Groovy and more
Stars: ✭ 18 (-52.63%)
Mutual labels:  consumer
srclient
Golang Client for Schema Registry
Stars: ✭ 188 (+394.74%)
Mutual labels:  avro
retail-banking
Consumer Banking Application
Stars: ✭ 25 (-34.21%)
Mutual labels:  consumer
parquet-flinktacular
How to use Parquet in Flink
Stars: ✭ 29 (-23.68%)
Mutual labels:  avro
avrow
Avrow is a pure Rust implementation of the avro specification https://avro.apache.org/docs/current/spec.html with Serde support.
Stars: ✭ 27 (-28.95%)
Mutual labels:  avro
node-bunnymq
BunnyMQ is an amqp.node wrapper to ease common AMQP usages (RPC, pub/sub, channel/connection handling etc.).
Stars: ✭ 20 (-47.37%)
Mutual labels:  consumer
kafka-avro-confluent
Kafka De/Serializer using avro and Confluent's Schema Registry
Stars: ✭ 18 (-52.63%)
Mutual labels:  avro
rabbitmq-consumer
A configurable RabbitMQ consumer made in Rust, useful for a stable and reliable CLI commands processor.
Stars: ✭ 25 (-34.21%)
Mutual labels:  consumer
columnify
Make record oriented data to columnar format.
Stars: ✭ 28 (-26.32%)
Mutual labels:  avro
kafka-scala-examples
Examples of Avro, Kafka, Schema Registry, Kafka Streams, Interactive Queries, KSQL, Kafka Connect in Scala
Stars: ✭ 53 (+39.47%)
Mutual labels:  avro
parquet-extra
A collection of Apache Parquet add-on modules
Stars: ✭ 30 (-21.05%)
Mutual labels:  avro
tamer
Standalone alternatives to Kafka Connect Connectors
Stars: ✭ 42 (+10.53%)
Mutual labels:  avro

php-kafka-lib

CircleCI Maintainability Test Coverage Latest Stable Version Latest Unstable Version

Description

This is a library that makes it easier to use Kafka in your PHP project.

This library relies on arnaud-lb/php-rdkafka
Avro support relies on flix-tech/avro-serde-php
The documentation of the php extension,
can help out to understand the internals of this library.

Requirements

  • php: ^7.3|^8.0
  • ext-rdkafka: >=4.0.0
  • librdkafka: >=0.11.6 (if you use <librdkafka:1.x please define your own error callback)

⚠️ To use the transactional producer you'll need:

  • ext-rdkafka: >=4.1.0
  • librdkafka: >=1.4

Installation

composer require jobcloud/php-kafka-lib "~1.0"

Enable Avro support

If you need Avro support, run:

composer require flix-tech/avro-serde-php "~1.4"

Usage

Producer

Kafka

Simple example
<?php

use Jobcloud\Kafka\Message\KafkaProducerMessage;
use Jobcloud\Kafka\Producer\KafkaProducerBuilder;

$producer = KafkaProducerBuilder::create()
    ->withAdditionalBroker('localhost:9092')
    ->build();

$message = KafkaProducerMessage::create('test-topic', 0)
            ->withKey('asdf-asdf-asfd-asdf')
            ->withBody('some test message payload')
            ->withHeaders([ 'key' => 'value' ]);

$producer->produce($message);

// Shutdown producer, flush messages that are in queue. Give up after 20s
$result = $producer->flush(20000);
Transactional producer (needs >=php-rdkafka:4.1 and >=librdkafka:1.4)
<?php

use Jobcloud\Kafka\Message\KafkaProducerMessage;
use Jobcloud\Kafka\Producer\KafkaProducerBuilder;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionRetryException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionAbortException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionFatalException;

$producer = KafkaProducerBuilder::create()
    ->withAdditionalBroker('localhost:9092')
    ->build();

$message = KafkaProducerMessage::create('test-topic', 0)
            ->withKey('asdf-asdf-asfd-asdf')
            ->withBody('some test message payload')
            ->withHeaders([ 'key' => 'value' ]);
try {
    $producer->beginTransaction(10000);
    $producer->produce($message);
    $producer->commitTransaction(10000);
} catch (KafkaProducerTransactionRetryException $e) {
    // something went wrong but you can retry the failed call (either beginTransaction or commitTransaction)
} catch (KafkaProducerTransactionAbortException $e) {
    // you need to call $producer->abortTransaction(10000); and try again
} catch (KafkaProducerTransactionFatalException $e) {
    // something went very wrong, re-create your producer, otherwise you could jeopardize the idempotency guarantees
}

// Shutdown producer, flush messages that are in queue. Give up after 20s
$result = $producer->flush(20000);
Avro Producer

To create an avro prodcuer add the avro encoder.

<?php

use FlixTech\AvroSerializer\Objects\RecordSerializer;
use Jobcloud\Kafka\Message\KafkaProducerMessage;
use Jobcloud\Kafka\Message\Encoder\AvroEncoder;
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry;
use Jobcloud\Kafka\Producer\KafkaProducerBuilder;
use Jobcloud\Kafka\Message\KafkaAvroSchema;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;

$cachedRegistry = new CachedRegistry(
    new BlockingRegistry(
        new PromisingRegistry(
            new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081'])
        )
    ),
    new AvroObjectCacheAdapter()
);

$registry = new AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);

//if no version is defined, latest version will be used
//if no schema definition is defined, the appropriate version will be fetched form the registry
$registry->addBodySchemaMappingForTopic(
    'test-topic',
    new KafkaAvroSchema('bodySchemaName' /*, int $version, AvroSchema $definition */)
);
$registry->addKeySchemaMappingForTopic(
    'test-topic',
    new KafkaAvroSchema('keySchemaName' /*, int $version, AvroSchema $definition */)
);

// if you are only encoding key or value, you can pass that mode as additional third argument
// per default both key and body will get encoded
$encoder = new AvroEncoder($registry, $recordSerializer /*, AvroEncoderInterface::ENCODE_BODY */);

$producer = KafkaProducerBuilder::create()
    ->withAdditionalBroker('kafka:9092')
    ->withEncoder($encoder)
    ->build();

$schemaName = 'testSchema';
$version = 1;
$message = KafkaProducerMessage::create('test-topic', 0)
            ->withKey('asdf-asdf-asfd-asdf')
            ->withBody(['name' => 'someName'])
            ->withHeaders([ 'key' => 'value' ]);

$producer->produce($message);

// Shutdown producer, flush messages that are in queue. Give up after 20s
$result = $producer->flush(20000);

NOTE: To improve producer latency you can install the pcntl extension.
The php-kafka-lib already has code in place, similarly described here:
https://github.com/arnaud-lb/php-rdkafka#performance--low-latency-settings

Consumer

Kafka High Level

<?php

use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;

$consumer = KafkaConsumerBuilder::create()
     ->withAdditionalConfig(
        [
            'compression.codec' => 'lz4',
            'auto.commit.interval.ms' => 500
        ]
    )
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withAdditionalSubscription('test-topic')
    ->build();

$consumer->subscribe();

while (true) {
    try {
        $message = $consumer->consume();
        // your business logic
        $consumer->commit($message);
    } catch (KafkaConsumerTimeoutException $e) {
        //no messages were read in a given time
    } catch (KafkaConsumerEndOfPartitionException $e) {
        //only occurs if enable.partition.eof is true (default: false)
    } catch (KafkaConsumerConsumeException $e) {
        // Failed
    }
}

Kafka Low Level

<?php

use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;

$consumer = KafkaConsumerBuilder::create()
     ->withAdditionalConfig(
        [
            'compression.codec' => 'lz4',
            'auto.commit.interval.ms' => 500
        ]
    )
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withAdditionalSubscription('test-topic')
    ->withConsumerType(KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL)
    ->build();

$consumer->subscribe();

while (true) {
    try {
        $message = $consumer->consume();
        // your business logic
        $consumer->commit($message);
    } catch (KafkaConsumerTimeoutException $e) {
        //no messages were read in a given time
    } catch (KafkaConsumerEndOfPartitionException $e) {
        //only occurs if enable.partition.eof is true (default: false)
    } catch (KafkaConsumerConsumeException $e) {
        // Failed
    } 
}

Avro Consumer

To create an avro consumer add the avro decoder.

<?php

use FlixTech\AvroSerializer\Objects\RecordSerializer;
use \Jobcloud\Messaging\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;
use Jobcloud\Kafka\Message\Decoder\AvroDecoder;
use Jobcloud\Kafka\Message\KafkaAvroSchema;
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;

$cachedRegistry = new CachedRegistry(
    new BlockingRegistry(
        new PromisingRegistry(
            new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081'])
        )
    ),
    new AvroObjectCacheAdapter()
);

$registry = new AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);

//if no version is defined, latest version will be used
//if no schema definition is defined, the appropriate version will be fetched form the registry
$registry->addBodySchemaMappingForTopic(
    'test-topic',
    new KafkaAvroSchema('bodySchema' , 9 /* , AvroSchema $definition */)
);
$registry->addKeySchemaMappingForTopic(
    'test-topic',
    new KafkaAvroSchema('keySchema' , 9 /* , AvroSchema $definition */)
);

// If you are only encoding / decoding key or value, only register the schema(s) you need.
// It is advised against doing that though, some tools might not play
// nice if you don't fully encode your message
$decoder = new AvroDecoder($registry, $recordSerializer);

$consumer = KafkaConsumerBuilder::create()
     ->withAdditionalConfig(
        [
            'compression.codec' => 'lz4',
            'auto.commit.interval.ms' => 500
        ]
    )
    ->withDecoder($decoder)
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withAdditionalSubscription('test-topic')
    ->build();

$consumer->subscribe();

while (true) {
    try {
        $message = $consumer->consume();
        // your business logic
        $consumer->commit($message);
    } catch (KafkaConsumerTimeoutException $e) {
        //no messages were read in a given time
    } catch (KafkaConsumerEndOfPartitionException $e) {
        //only occurs if enable.partition.eof is true (default: false)
    } catch (KafkaConsumerConsumeException $e) {
        // Failed
    } 
}

Additional information

Replaces messaging-lib
Check Migration.md for help to migrate.

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].