Licence: MIT license
The magic message bus

Frizzle


Frizzle is a magic message (Msg) bus designed for parallel processing with many goroutines.

  • Receive() messages from a configured Source
  • Do your processing, possibly Send() each Msg on to one or more Sink destinations
  • Ack() (or Fail()) the Msg to notify the Source that processing completed
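
The three steps above can be sketched as a small worker pool. This is a minimal illustration, not the project's example implementation: `bus` is a hand-copied subset of the Frizzle interface, and `textMsg`, `memBus`, and `process` are hypothetical in-memory stand-ins so the snippet runs without any message service.

```go
package main

import (
	"fmt"
	"sync"
)

// Msg is a subset of the Msg interface described in this README.
type Msg interface {
	ID() string
	Data() []byte
}

// bus is the subset of the Frizzle interface this sketch relies on.
type bus interface {
	Receive() <-chan Msg
	Send(m Msg, dest string) error
	Ack(m Msg) error
	Fail(m Msg) error
}

// textMsg and memBus are hypothetical stand-ins used only for illustration.
type textMsg struct{ id, body string }

func (m textMsg) ID() string   { return m.id }
func (m textMsg) Data() []byte { return []byte(m.body) }

type memBus struct {
	in     chan Msg
	mu     sync.Mutex
	sent   map[string][]string // destination -> IDs of Msgs sent there
	acked  int
	failed int
}

func newMemBus(msgs ...Msg) *memBus {
	b := &memBus{in: make(chan Msg, len(msgs)), sent: map[string][]string{}}
	for _, m := range msgs {
		b.in <- m
	}
	close(b.in) // workers stop once the buffered Msgs are drained
	return b
}

func (b *memBus) Receive() <-chan Msg { return b.in }
func (b *memBus) Ack(m Msg) error     { return b.count(&b.acked) }
func (b *memBus) Fail(m Msg) error    { return b.count(&b.failed) }

func (b *memBus) count(n *int) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	*n++
	return nil
}

func (b *memBus) Send(m Msg, dest string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.sent[dest] = append(b.sent[dest], m.ID())
	return nil
}

// process starts the given number of goroutines; each one receives a Msg,
// forwards it to dest, and then Acks it. Empty or unsendable Msgs are Failed.
func process(b bus, workers int, dest string) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range b.Receive() {
				if len(m.Data()) == 0 || b.Send(m, dest) != nil {
					b.Fail(m)
					continue
				}
				b.Ack(m)
			}
		}()
	}
	wg.Wait()
}

func main() {
	b := newMemBus(textMsg{"1", "hello"}, textMsg{"2", ""}, textMsg{"3", "world"})
	process(b, 2, "out")
	fmt.Println("acked:", b.acked, "failed:", b.failed, "sent:", len(b.sent["out"]))
}
```

Because `process` depends only on the `bus` interface, the same loop could in principle run against any value that provides those four methods.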

Getting Started

Start with the example implementation, which shows a simple, canonical Processor built on top of Frizzle and exercises most of the core functions.

High-level interface

// Frizzle is a Msg bus for rapidly configuring and processing messages between multiple message services.
type Frizzle interface {
	Receive() <-chan Msg
	Send(m Msg, dest string) error
	Ack(Msg) error
	Fail(Msg) error
	Events() <-chan Event
	AddOptions(...Option)
	FlushAndClose(timeout time.Duration) error
	Close() error
}

func Init(source Source, sink Sink, opts ...Option) Frizzle

The core of the repo is a Friz struct (returned by Init()) which implements Frizzle. The intent is for separate Source and Sink implementations (in separate repos) to be mixed and matched with the glue of Frizzle. A processing library can take a Frizzle input to allow easy re-use with multiple underlying message technologies. Friz also implements Source and Sink to allow chaining if needed.

Source and Sink Implementations

If you write a new implementation, we'd love to add it to our list!

Msg

A basic interface which can be extended:

// Msg encapsulates an immutable message passed around by Frizzle
type Msg interface {
	ID() string
	Data() []byte
	Timestamp() time.Time
}

A frizzle.SimpleMsg struct is provided for basic use cases.
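
As a rough sketch of how the interface might be extended, the snippet below defines a hypothetical `keyedMsg` that adds a `Key()` method. The `Msg` interface is copied locally so the example compiles on its own; `keyedMsg`, `newKeyedMsg`, and `keyOf` are illustrative names and are not part of frizzle.

```go
package main

import (
	"fmt"
	"time"
)

// Msg is copied from the interface shown above.
type Msg interface {
	ID() string
	Data() []byte
	Timestamp() time.Time
}

// keyedMsg is a hypothetical extension that carries a partition key.
type keyedMsg struct {
	id   string
	key  string
	data []byte
	ts   time.Time
}

// newKeyedMsg copies the payload so later changes to the caller's slice
// cannot alter the message, and stamps it with the current time.
func newKeyedMsg(id, key string, data []byte) keyedMsg {
	return keyedMsg{id: id, key: key, data: append([]byte(nil), data...), ts: time.Now()}
}

func (m keyedMsg) ID() string           { return m.id }
func (m keyedMsg) Timestamp() time.Time { return m.ts }
func (m keyedMsg) Key() string          { return m.key }

// Data returns a copy so callers cannot mutate the stored payload.
func (m keyedMsg) Data() []byte { return append([]byte(nil), m.data...) }

// keyOf returns the key when a Msg happens to carry one.
func keyOf(m Msg) (string, bool) {
	k, ok := m.(interface{ Key() string })
	if !ok {
		return "", false
	}
	return k.Key(), true
}

func main() {
	var m Msg = newKeyedMsg("42", "user-7", []byte("payload"))
	key, ok := keyOf(m)
	fmt.Println(m.ID(), string(m.Data()), key, ok)
}
```

Code that only knows about `Msg` keeps working, while code that needs the extra field can recover it with an interface assertion as in `keyOf`.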

Source and Sink

// Source defines a stream of incoming Msgs to be Received for processing,
// and reporting whether or not processing was successful.
type Source interface {
	Receive() <-chan Msg
	Ack(m Msg) error
	Fail(m Msg) error
	UnAcked() []Msg
	Stop() error
	Close() error
}

// Sink defines a message service where Msgs can be sent as part of processing.
type Sink interface {
	Send(m Msg, dest string) error
	Close() error
}
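
To make the Source contract more concrete, here is a minimal in-memory sketch. It assumes a Source may preload its Msgs and treat each one as unacked until `Ack` or `Fail` is called; real implementations will differ. `memMsg`, `memSource`, and `settle` are hypothetical names, and the interfaces are copied locally so the snippet is self-contained. A Sink would follow the same pattern with just `Send` and `Close`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Msg and Source are copied from the interfaces shown above.
type Msg interface {
	ID() string
	Data() []byte
	Timestamp() time.Time
}

type Source interface {
	Receive() <-chan Msg
	Ack(m Msg) error
	Fail(m Msg) error
	UnAcked() []Msg
	Stop() error
	Close() error
}

// memMsg is a hypothetical minimal Msg.
type memMsg struct {
	id, body string
	ts       time.Time
}

func (m memMsg) ID() string           { return m.id }
func (m memMsg) Data() []byte         { return []byte(m.body) }
func (m memMsg) Timestamp() time.Time { return m.ts }

// memSource is a hypothetical Source that preloads its Msgs and tracks
// each one as unacked until Ack or Fail is called for it.
type memSource struct {
	out     chan Msg
	mu      sync.Mutex
	unacked map[string]Msg
}

func newMemSource(payloads ...string) *memSource {
	s := &memSource{out: make(chan Msg, len(payloads)), unacked: map[string]Msg{}}
	for i, p := range payloads {
		m := memMsg{id: fmt.Sprint(i), body: p, ts: time.Now()}
		s.unacked[m.id] = m
		s.out <- m
	}
	close(s.out) // no more input; receivers drain the buffer and stop
	return s
}

func (s *memSource) Receive() <-chan Msg { return s.out }
func (s *memSource) Ack(m Msg) error     { return s.settle(m) }
func (s *memSource) Fail(m Msg) error    { return s.settle(m) }
func (s *memSource) Stop() error         { return nil }
func (s *memSource) Close() error        { return nil }

// settle removes a Msg from the unacked set and rejects unknown or repeated IDs.
func (s *memSource) settle(m Msg) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.unacked[m.ID()]; !ok {
		return fmt.Errorf("unknown or already settled Msg %q", m.ID())
	}
	delete(s.unacked, m.ID())
	return nil
}

func (s *memSource) UnAcked() []Msg {
	s.mu.Lock()
	defer s.mu.Unlock()
	msgs := make([]Msg, 0, len(s.unacked))
	for _, m := range s.unacked {
		msgs = append(msgs, m)
	}
	return msgs
}

func main() {
	var src Source = newMemSource("a", "b", "c")
	first := <-src.Receive()
	fmt.Println(src.Ack(first), len(src.UnAcked())) // first Ack succeeds; two Msgs remain unacked
	fmt.Println(src.Ack(first) != nil)              // a second Ack of the same Msg is rejected
}
```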

Options

Frizzle supports a variety of Option parameters for additional functionality to simplify your integration. These can be included with Init() or added using a friz.AddOptions() call. Note that AddOptions() updates the current friz and does not return anything.

Currently supported options:

  • Logger(log *zap.Logger) - Include a logger to report frizzle-internal logging.
  • Stats(stats StatsIncrementer) - Include a stats client for frizzle-internal metrics reporting. See Stats for what metrics are supported.
  • FailSink(s Sink, dest string) - Provide a Sink and destination (kafka topic, kinesis stream etc) where Fail()ed Msgs will be sent automatically.
  • MonitorProcessingRate(pollPeriod time.Duration) - Log the sum count of Acked and Failed Msgs every pollPeriod.
  • ReportAsyncErrors() - Launch a simple goroutine to monitor the Events() channel. All events are logged at Error or Warn level; any event that satisfies the error interface also has a stat recorded. Logging and stats are each disabled if Logger() or Stats(), respectively, has not been set.
    • This is very basic handling that does not account for specific Event types from Source/Sink implementations; developers should write an app-specific monitoring routine to parse and handle specific Event cases (for which this can be a helpful starting template).
  • HandleShutdown(appShutdown func()) - Monitor for SIGINT and SIGTERM; when either is received, call FlushAndClose() followed by the provided appShutdown.
  • WithTransformer(ft FrizTransformer) - Add a transformer to modify Msgs before they are sent or received. Currently only a "Simple Separator" Transformer is supported, which adds a specified record separator (such as a newline) before sending if it isn't already present, and removes the same separator on receive if it is present.
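
The README does not show how `Option` is defined, so the following is only a sketch of the common Go functional-options pattern that an API shaped like `Init(source, sink, opts...)` and `AddOptions(...)` could be built on. Everything here (`friz`, `FailDest`, `PollEvery`, `newFriz`) is hypothetical and is not frizzle's actual code.

```go
package main

import (
	"fmt"
	"time"
)

// friz is a hypothetical stand-in for a configurable message bus.
type friz struct {
	failDest   string
	pollPeriod time.Duration
}

// Option is assumed here to be a function that mutates the bus in place.
type Option func(*friz)

// FailDest and PollEvery are illustrative options, not real frizzle options.
func FailDest(dest string) Option {
	return func(f *friz) { f.failDest = dest }
}

func PollEvery(d time.Duration) Option {
	return func(f *friz) { f.pollPeriod = d }
}

// newFriz applies any options supplied at construction time.
func newFriz(opts ...Option) *friz {
	f := &friz{}
	f.AddOptions(opts...)
	return f
}

// AddOptions updates the receiver and returns nothing, matching the
// behaviour the README describes for friz.AddOptions().
func (f *friz) AddOptions(opts ...Option) {
	for _, opt := range opts {
		opt(f)
	}
}

func main() {
	f := newFriz(FailDest("dead-letter"))
	f.AddOptions(PollEvery(30 * time.Second))
	fmt.Println(f.failDest, f.pollPeriod)
}
```

With this pattern, options passed at construction and options added later go through the same code path, which is one plausible reason for offering both entry points.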

Events

Since Source and Sink implementations often send and receive Msgs in batches, they often learn about errors (or other important events) asynchronously. To support this, async events can be read from the channel returned by the Friz.Events() method. If a Source or Sink does not implement the Eventer interface, this functionality is skipped for it.

Caveats for using Events()

  • Frizzle Events must, at a minimum, implement String(); when consuming Events, a type switch is highly recommended so you can recover other relevant information.
    • A default: trap for unhandled cases is also highly recommended!
    • For a reference implementation of the same interface see here
  • A Friz's Events() channel will be closed after all underlying Source/Sink Events() channels are closed.
    • If a Friz is initialized without any Source/Sinks that implement Events(), the channel returned by Friz.Events() will be closed immediately.

Beyond the String() method that frizzle requires, frinesis currently returns only errors (no other event types), so every Event recovered from it also satisfies the error interface.
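
The type switch and default trap recommended above might look like the sketch below. The local `Event` interface encodes only the String() requirement stated in this section; `errEvent`, `statusEvent`, and `otherEvent` are hypothetical event types invented for the example, not types emitted by any real Source or Sink.

```go
package main

import (
	"errors"
	"fmt"
)

// Event reflects the minimum contract described above: a String() method.
type Event interface{ String() string }

// errEvent is a hypothetical Event that also satisfies the error interface.
type errEvent struct{ err error }

func (e errEvent) String() string { return e.err.Error() }
func (e errEvent) Error() string  { return e.err.Error() }

func newErrEvent(msg string) Event { return errEvent{errors.New(msg)} }

// statusEvent and otherEvent are further hypothetical Event types.
type statusEvent string

func (s statusEvent) String() string { return string(s) }

type otherEvent struct{}

func (otherEvent) String() string { return "mystery" }

// describe classifies an Event with a type switch and a default trap.
func describe(e Event) string {
	switch ev := e.(type) {
	case error:
		return "error: " + ev.Error()
	case statusEvent:
		return "status: " + ev.String()
	default:
		return "unhandled: " + e.String()
	}
}

// drain reads Events until the channel is closed.
func drain(events <-chan Event) []string {
	var out []string
	for e := range events {
		out = append(out, describe(e))
	}
	return out
}

func main() {
	events := make(chan Event, 3)
	events <- newErrEvent("broker unreachable")
	events <- statusEvent("rebalanced")
	events <- otherEvent{}
	close(events)
	for _, line := range drain(events) {
		fmt.Println(line)
	}
}
```

Ranging over the channel means the monitoring loop ends on its own once the Events() channel is closed.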

Transformers

Transformers provide a mechanism for making simple updates to a Msg prior to a Send() or Receive(). They are added at initialization but are otherwise transparent to the processor and the Source/Sink. This is useful when, for example, you need to apply a transform on one messaging platform but not another, and don't want to expose the core processing code to information about which platform is in use.

Frizzle supports adding Transformers with a WithTransformer() Option:

// WithTransformer returns an Option to add the provided FrizTransformer to a Frizzle
func WithTransformer(ft FrizTransformer) Option

// Transform is a type that modifies a Msg
type Transform func(Msg) Msg

// FrizTransformer provides a Transform to apply when a Msg is sent or received
type FrizTransformer interface {
	SendTransform() Transform
	ReceiveTransform() Transform
}

An example implementation that adds and removes a separator suffix on each Msg is included in transform.go. To reduce clutter we generally suggest implementing a new Transform in a separate repo, but we can consider adding high-utility ones here.
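
For orientation, a separator-style transformer could be sketched roughly as follows. This is not the code in transform.go: `sepTransformer`, `rawMsg`, `newRawMsg`, and `withData` are hypothetical, and the interfaces are copied locally so the snippet stands alone.

```go
package main

import (
	"bytes"
	"fmt"
	"time"
)

// Msg, Transform, and FrizTransformer are copied from the definitions above.
type Msg interface {
	ID() string
	Data() []byte
	Timestamp() time.Time
}

type Transform func(Msg) Msg

type FrizTransformer interface {
	SendTransform() Transform
	ReceiveTransform() Transform
}

// rawMsg is a hypothetical minimal Msg.
type rawMsg struct {
	id   string
	data []byte
	ts   time.Time
}

func (m rawMsg) ID() string           { return m.id }
func (m rawMsg) Data() []byte         { return m.data }
func (m rawMsg) Timestamp() time.Time { return m.ts }

func newRawMsg(id, body string) Msg {
	return rawMsg{id: id, data: []byte(body), ts: time.Now()}
}

// withData returns a new Msg with the given payload and m's ID and timestamp.
func withData(m Msg, data []byte) Msg {
	return rawMsg{id: m.ID(), data: data, ts: m.Timestamp()}
}

// sepTransformer is a hypothetical transformer that appends sep on send
// (only when missing) and strips it on receive (only when present).
type sepTransformer struct{ sep []byte }

func (t sepTransformer) SendTransform() Transform {
	return func(m Msg) Msg {
		if bytes.HasSuffix(m.Data(), t.sep) {
			return m
		}
		// Build a fresh slice so the original Msg's bytes are left untouched.
		data := append(append([]byte(nil), m.Data()...), t.sep...)
		return withData(m, data)
	}
}

func (t sepTransformer) ReceiveTransform() Transform {
	return func(m Msg) Msg {
		return withData(m, bytes.TrimSuffix(m.Data(), t.sep))
	}
}

func main() {
	var tr FrizTransformer = sepTransformer{sep: []byte("\n")}
	sent := tr.SendTransform()(newRawMsg("1", "hello"))
	back := tr.ReceiveTransform()(sent)
	fmt.Printf("%q %q\n", sent.Data(), back.Data())
}
```

Each transform returns a new Msg rather than editing the input, which keeps with the description of Msg as immutable.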

Prereqs / Build instructions

Go mod

As of Go 1.11, frizzle uses go mod for dependency management.

Install

$ git clone https://github.com/qntfy/frizzle.git
$ cd frizzle
$ go build

Running the tests

go test -v --cover ./...

Configuration

We recommend building Sources and Sinks to initialize using Viper, typically through environment variables (the client can do whatever it wants, as long as it provides a configured Viper object with the relevant values). The application might add a prefix before the variable names below.

Basic

| Variable | Required | Description | Default |
|---|---|---|---|
| BUFFER_SIZE | source (optional) | size of Input() channel buffer | 500 |
| MOCK | optional | mocks don't track sent or unacked Msgs, just return without error | false |
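
As a dependency-free illustration of how these two values and their defaults could be resolved, the sketch below reads them from environment variables with the standard library. It is a stand-in for Viper, not a use of it, and the `MYAPP_` prefix, `config`, and `loadConfig` are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// config holds the two basic values from the table above.
type config struct {
	BufferSize int
	Mock       bool
}

// loadConfig resolves <prefix>BUFFER_SIZE and <prefix>MOCK through lookup,
// falling back to the documented defaults (500 and false) when unset.
func loadConfig(prefix string, lookup func(string) (string, bool)) (config, error) {
	cfg := config{BufferSize: 500, Mock: false}
	if v, ok := lookup(prefix + "BUFFER_SIZE"); ok {
		n, err := strconv.Atoi(v)
		if err != nil || n < 0 {
			return cfg, fmt.Errorf("invalid %sBUFFER_SIZE %q", prefix, v)
		}
		cfg.BufferSize = n
	}
	if v, ok := lookup(prefix + "MOCK"); ok {
		b, err := strconv.ParseBool(v)
		if err != nil {
			return cfg, fmt.Errorf("invalid %sMOCK %q", prefix, v)
		}
		cfg.Mock = b
	}
	return cfg, nil
}

func main() {
	// "MYAPP_" is a made-up prefix; os.LookupEnv reads the real environment.
	cfg, err := loadConfig("MYAPP_", os.LookupEnv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```

Passing the lookup function in makes the defaults and validation easy to exercise in tests without touching the process environment.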

Stats

StatsIncrementer is a simple interface with just Increment(bucket string); based on github.com/alexcesaro/statsd but potentially compatible with a variety of metrics engines. When Stats() is set, Frizzle records the following metrics. If a Logger() has been set, each of the below also generates a Debug level log with the ID() of the Msg.

| Bucket | Description |
|---|---|
| ctr.rcv | count of Msgs received from Source |
| ctr.send | count of Msgs sent to Sink |
| ctr.ack | count of Msgs Ack'ed by application |
| ctr.fail | count of Msgs Fail'ed by application |
| ctr.failsink | count of Msgs sent to FailSink |
| ctr.error | count of errors from Events()* |

* only recorded if ReportAsyncErrors is running
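
Because StatsIncrementer needs only `Increment(bucket string)`, a test double is easy to write. The sketch below is one possible in-memory version; `memStats`, `newMemStats`, and `Count` are hypothetical names, and the interface is restated locally from the description above.

```go
package main

import (
	"fmt"
	"sync"
)

// StatsIncrementer restates the single-method interface described above.
type StatsIncrementer interface {
	Increment(bucket string)
}

// memStats is a hypothetical in-memory StatsIncrementer for tests or local runs.
type memStats struct {
	mu     sync.Mutex
	counts map[string]int
}

func newMemStats() *memStats {
	return &memStats{counts: map[string]int{}}
}

// Increment is safe to call from many goroutines at once.
func (s *memStats) Increment(bucket string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counts[bucket]++
}

// Count returns the current value of a bucket (zero if never incremented).
func (s *memStats) Count(bucket string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.counts[bucket]
}

func main() {
	stats := newMemStats()
	var inc StatsIncrementer = stats

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			inc.Increment("ctr.rcv")
		}()
	}
	wg.Wait()
	fmt.Println("ctr.rcv =", stats.Count("ctr.rcv"))
}
```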

Contributing

Contributions welcome! Take a look at open issues. New Source/Sink implementations should be added in separate repos. If you let us know (and link to a test demonstrating that it conforms to the interface), we are happy to link them here!
