bsm / Sarama Cluster

Licence: mit
Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 [DEPRECATED]

Programming Languages

go
31211 projects - #10 most used programming language

Projects that are alternatives of or similar to Sarama Cluster

Kafka Zk Restapi
RESTful API for Kafka and ZooKeeper that handles topic and consumer group administration, metric (offset/lag/message) collection, and monitoring
Stars: ✭ 121 (-87.51%)
Mutual labels:  consumer, kafka
Qmq
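QMQ is a message middleware widely used inside Qunar. Since its creation in 2012 it has been applied across all of Qunar's business scenarios, including order scenarios closely tied to transactions as well as high-throughput scenarios such as quote search.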
Stars: ✭ 2,420 (+149.74%)
Mutual labels:  consumer, kafka
Kafka Flow
KafkaFlow is a .NET framework to consume and produce Kafka messages with multi-threading support. It's very simple to use and very extendable. You just need to install, configure, start/stop the bus with your app and create a middleware/handler to process the messages.
Stars: ✭ 118 (-87.82%)
Mutual labels:  consumer, kafka
Anotherkafkamonitor Akm
Another app used to monitor the progress of Kafka producers and consumers
Stars: ✭ 36 (-96.28%)
Mutual labels:  consumer, kafka
Kafka Go
Kafka library in Go
Stars: ✭ 4,200 (+333.44%)
Mutual labels:  consumer, kafka
Node Sinek
🎩 Most advanced high level Node.js Kafka client
Stars: ✭ 262 (-72.96%)
Mutual labels:  consumer, kafka
Remora
Kafka consumer lag-checking application for monitoring, written in Scala and Akka HTTP; a wrapper around the Kafka consumer group command. Integrations with CloudWatch and Datadog. Authentication recently added.
Stars: ✭ 183 (-81.11%)
Mutual labels:  consumer, kafka
Librdkafka
The Apache Kafka C/C++ library
Stars: ✭ 5,617 (+479.67%)
Mutual labels:  consumer, kafka
Kminion
KMinion is a feature-rich Prometheus exporter for Apache Kafka written in Go. It is lightweight and highly configurable so that it will meet your requirements.
Stars: ✭ 274 (-71.72%)
Mutual labels:  consumer, kafka
Qbusbridge
The Apache Kafka Client SDK
Stars: ✭ 272 (-71.93%)
Mutual labels:  consumer, kafka
Zerocode
A community-developed, free, open source, microservices API automation and load testing framework built using JUnit core runners for Http REST, SOAP, Security, Database, Kafka and much more. Zerocode Open Source enables you to create, change, orchestrate and maintain your automated test cases declaratively with absolute ease.
Stars: ✭ 482 (-50.26%)
Mutual labels:  consumer, kafka
Aidp
Weibo Ad Infrastructure Data Processor: a data-processing framework built around a Kafka consumer with an embedded Lua scripting language
Stars: ✭ 20 (-97.94%)
Mutual labels:  consumer, kafka
Requery
Store and run queries on a database to help the system manager of a Django website
Stars: ✭ 12 (-98.76%)
Mutual labels:  deprecated
Tweetmap
A real time Tweet Trend Map and Sentiment Analysis web application with kafka, Angular, Spring Boot, Flink, Elasticsearch, Kibana, Docker and Kubernetes deployed on the cloud
Stars: ✭ 28 (-97.11%)
Mutual labels:  kafka
Grunt Csswring
DEPRECATED. Minify CSS files using PostCSS-based CSSWring
Stars: ✭ 12 (-98.76%)
Mutual labels:  deprecated
Javaok
A must-read for Java backend developers: key technical points along the Java learning roadmap.
Stars: ✭ 867 (-10.53%)
Mutual labels:  kafka
Kafka End 2 End Encryption
De-/Serialization wrapper for Kafka to accomplish end to end encryption
Stars: ✭ 32 (-96.7%)
Mutual labels:  kafka
Overwatch Js
Overwatch NodeJS API: retrieve information about heroes/players from the official Overwatch website
Stars: ✭ 27 (-97.21%)
Mutual labels:  deprecated
Python Kafka Elasticsearch
Simple learning project pushing CSV data into Kafka then indexing the data in ElasticSearch
Stars: ✭ 11 (-98.86%)
Mutual labels:  kafka
Rom Kafka
Apache Kafka support for Ruby Object Mapper
Stars: ✭ 11 (-98.86%)
Mutual labels:  kafka

Sarama Cluster

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).

DEPRECATION NOTICE

Please note that since https://github.com/Shopify/sarama/pull/1099 was merged and released (>= v1.19.0) this library is officially deprecated. The native implementation supports a variety of use cases that are not available through this library.

Documentation

Documentation and examples are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster

Examples

Consumers have two modes of operation. In the default multiplexed mode, messages (and errors) from multiple topics and partitions are all passed to a single channel:

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, enable errors and notifications
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume errors
	go func() {
		for err := range consumer.Errors() {
			log.Printf("Error: %s\n", err.Error())
		}
	}()

	// consume notifications
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("Rebalanced: %+v\n", ntf)
		}
	}()

	// consume messages, watch signals
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if !ok {
				return // channel closed: the consumer has shut down
			}
			fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
			consumer.MarkOffset(msg, "") // mark message as processed
		case <-signals:
			return
		}
	}
}

Users who require access to individual partitions can use the partitioned mode, which exposes partition-level consumers:

package main

import (
	"fmt"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, set mode to ConsumerModePartitions
	config := cluster.NewConfig()
	config.Group.Mode = cluster.ConsumerModePartitions

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume partitions
	for {
		select {
		case part, ok := <-consumer.Partitions():
			if !ok {
				return
			}

			// start a separate goroutine to consume messages
			go func(pc cluster.PartitionConsumer) {
				for msg := range pc.Messages() {
					fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
					consumer.MarkOffset(msg, "")	// mark message as processed
				}
			}(part)
		case <-signals:
			return
		}
	}
}

Running tests

You need to install Ginkgo & Gomega to run tests. Please see http://onsi.github.io/ginkgo for more details.

To run tests, call:

$ make test

Troubleshooting

Consumer not receiving any messages?

By default, sarama's Config.Consumer.Offsets.Initial is set to sarama.OffsetNewest. This means that a brand new consumer that has never committed any offsets to Kafka will only receive messages written after it starts consuming; messages already in the topic are skipped.

If a consumer with no committed offsets should instead receive all messages, starting from the oldest one still available in the topic, set Config.Consumer.Offsets.Initial to sarama.OffsetOldest.
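
To make the rule above concrete, here is a toy model in Go of how the starting offset is chosen. It does not call sarama: the partition type and the startOffset function are hypothetical names invented for illustration, and the two constants only mirror the values of sarama.OffsetNewest and sarama.OffsetOldest. Treat it as a sketch of the behaviour described above, not as the library's implementation.

```go
package main

import "fmt"

// These constants mirror the values of sarama.OffsetNewest and
// sarama.OffsetOldest. They are redeclared here only so that the
// sketch compiles without the sarama dependency.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// partition is a hypothetical stand-in for the state of one topic
// partition as seen by a consumer group.
type partition struct {
	oldest       int64 // offset of the oldest message still retained
	next         int64 // offset the next produced message will receive
	committed    int64 // offset the group resumes from, if hasCommitted
	hasCommitted bool  // whether the group has ever committed an offset
}

// startOffset models where a group member begins reading. The initial
// setting only matters when no offset has been committed yet.
func startOffset(p partition, initial int64) int64 {
	if p.hasCommitted {
		return p.committed // resume; the initial setting is ignored
	}
	if initial == offsetOldest {
		return p.oldest // replay everything still in the partition
	}
	return p.next // offsetNewest: only messages produced from now on
}

func main() {
	fresh := partition{oldest: 0, next: 100}
	fmt.Println(startOffset(fresh, offsetNewest)) // 100: existing messages are skipped
	fmt.Println(startOffset(fresh, offsetOldest)) // 0: start from the beginning

	resumed := partition{oldest: 0, next: 100, committed: 42, hasCommitted: true}
	fmt.Println(startOffset(resumed, offsetOldest)) // 42: committed offset wins
}
```

In a real consumer the equivalent change is a single assignment made before calling cluster.NewConsumer, for example config.Consumer.Offsets.Initial = sarama.OffsetOldest, with github.com/Shopify/sarama imported alongside the cluster package.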