
sevenmind / Kaufmann_ex

License: MIT
Kafka-backed service library.


Projects that are alternatives of or similar to KaufmannEx

Schema Registry
Confluent Schema Registry for Kafka
Stars: ✭ 1,647 (+1815.12%)
Mutual labels:  schema, kafka, avro
Schema Registry
A CLI and Go client for Kafka Schema Registry
Stars: ✭ 105 (+22.09%)
Mutual labels:  schema, kafka, avro
Divolte Collector
Divolte Collector
Stars: ✭ 264 (+206.98%)
Mutual labels:  kafka, avro
Schema Registry Ui
Web tool for Avro Schema Registry |
Stars: ✭ 358 (+316.28%)
Mutual labels:  kafka, avro
Kafka Storm Starter
Code examples that show how to integrate Apache Kafka 0.8+ with Apache Storm 0.9+ and Apache Spark Streaming 1.1+, while using Apache Avro as the data serialization format.
Stars: ✭ 728 (+746.51%)
Mutual labels:  kafka, avro
schema-registry
📙 json & avro http schema registry backed by Kafka
Stars: ✭ 23 (-73.26%)
Mutual labels:  schema, avro
avro ex
An Avro Library that emphasizes testability and ease of use.
Stars: ✭ 47 (-45.35%)
Mutual labels:  schema, avro
Pmacct
pmacct is a small set of multi-purpose passive network monitoring tools [NetFlow IPFIX sFlow libpcap BGP BMP RPKI IGP Streaming Telemetry].
Stars: ✭ 677 (+687.21%)
Mutual labels:  kafka, avro
sbt-avro
Plugin SBT to Generate Scala classes from Apache Avro schemas hosted on a remote Confluent Schema Registry.
Stars: ✭ 15 (-82.56%)
Mutual labels:  schema, avro
Tweetmap
A real time Tweet Trend Map and Sentiment Analysis web application with kafka, Angular, Spring Boot, Flink, Elasticsearch, Kibana, Docker and Kubernetes deployed on the cloud
Stars: ✭ 28 (-67.44%)
Mutual labels:  microservice, kafka
Quarkus Microservices Poc
Very simplified shop sales system made in a microservices architecture using quarkus
Stars: ✭ 16 (-81.4%)
Mutual labels:  microservice, kafka
Go Kafka Avro
A library provides consumer/producer to work with kafka, avro and schema registry
Stars: ✭ 39 (-54.65%)
Mutual labels:  kafka, avro
avro turf
A library that makes it easier to use the Avro serialization format from Ruby.
Stars: ✭ 130 (+51.16%)
Mutual labels:  schema, avro
avrow
Avrow is a pure Rust implementation of the avro specification https://avro.apache.org/docs/current/spec.html with Serde support.
Stars: ✭ 27 (-68.6%)
Mutual labels:  schema, avro
Insulator
A client UI to inspect Kafka topics, consume, produce and much more
Stars: ✭ 53 (-38.37%)
Mutual labels:  schema, avro
srclient
Golang Client for Schema Registry
Stars: ✭ 188 (+118.6%)
Mutual labels:  schema, avro
Cap
Distributed transaction solution in micro-service base on eventually consistency, also an eventbus with Outbox pattern
Stars: ✭ 5,208 (+5955.81%)
Mutual labels:  microservice, kafka
Event Sourcing Castanha
An Event Sourcing service template with DDD, TDD and SOLID. It has High Cohesion and Loose Coupling, it's a good start for your next Microservice application.
Stars: ✭ 68 (-20.93%)
Mutual labels:  microservice, kafka
Mongoke
Instant Graphql for MongoDb (active branch is golang, rewrite in process)
Stars: ✭ 203 (+136.05%)
Mutual labels:  microservice, schema
Yivnet
Yivnet is a microservice game server base on go-kit
Stars: ✭ 237 (+175.58%)
Mutual labels:  microservice, kafka

KaufmannEx


Check out our blog post about KaufmannEx

The goal of KaufmannEx is to provide a simple-to-use library for building Kafka-based microservices.

It should be simple and fast to write new microservices with Avro or JSON event schemas (or any other encoding).

It ties together KafkaEx, AvroEx, and Schemex.

KaufmannEx exists to make it easy to consume Avro-encoded messages from a Kafka broker in a parallel, controlled manner.

Installation

The package can be installed by adding kaufmann_ex to your list of dependencies in mix.exs:

def deps do
  [
    {:kaufmann_ex, "~> 0.4.0-dev"}
  ]
end

Documentation can be generated with ExDoc and published on HexDocs. Once published, the docs can be found at https://hexdocs.pm/kaufmann_ex.

Usage

KaufmannEx is under very active development, so setup is a little more involved than is ideal at the moment.

KaufmannEx needs:

  • an entry in mix.exs
  • a place in your Supervision Tree
  • configuration in config.exs
  • an event_handler_mod
  • schemas

Supervision Tree

To use KaufmannEx, start by adding it to your supervision tree:

defmodule Sample.Application do
  use Application
  require Logger

  def start(_type, _args) do
    children = [
      KaufmannEx.Supervisor
    ]

    opts = [strategy: :one_for_one, name: Sample.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Config.exs

KaufmannEx expects configuration in your Application Config.

KaufmannEx depends on KafkaEx, which must also be configured.

A config.exs may include something like this:

config :kaufmann_ex,
  consumer_group: "Consumer-Group",
  default_topics: ["default-topic"],
  event_handler_mod: MyApp.EventHandler,
  producer_mod: KaufmannEx.Publisher,
  schema_path: "priv/schemas",
  schema_registry_uri: "http://localhost:8081",
  transcoder: [
    default: KaufmannEx.Transcoder.SevenAvro,
    json: KaufmannEx.Transcoder.Json
  ]

config :kafka_ex,
  brokers: [
    {
      "localhost",
      9092
    }
  ],
  consumer_group: "Consumer-Group",
  commit_threshold: 10,
  commit_interval: 100,
  sync_timeout: 10_000

event_handler_mod

KaufmannEx expects an event handler module implementing the given_event/1 callback:

defmodule MyApp.EventHandler do
  use KaufmannEx.EventHandler
  alias KaufmannEx.Schemas.Event

  @behaviour KaufmannEx.EventHandler

  @impl true
  @impl true
  def given_event(%Event{name: "test.command", payload: payload}) do
    message_body = do_some_work(payload)
    # Bind the destination topic; the original example left this unbound
    topic = "default-topic"

    {:reply, [{"test.event", message_body, topic}]}
  end

  # Returning an error tuple causes an error event to be emitted
  def given_event(%Event{name: "this.event.returns.error", payload: _payload}) do
    {:error, :unhandled_event}
  end
end

Events

KaufmannEx assumes every event has a matching Avro or JSON event schema.

All Avro events are expected to include a meta metadata key.

If an event raises an exception, an error event is emitted with "event.error.#{event.name}" as the event name. Events that raise exceptions are not retried or persisted beyond emitting this error event. If specific handling of failing events is important to you, implement a dead-letter service or similar.
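A downstream consumer could react to these error events with an ordinary handler clause. This is a hypothetical sketch, not part of the library: the error-event name follows the pattern above, while the DeadLetter module and the payload shape are assumptions.

```elixir
# Hypothetical: react to the error event emitted when "test.command" raises.
def given_event(%Event{name: "event.error.test.command", payload: payload}) do
  # Hand the failed payload to an assumed dead-letter store for later inspection.
  DeadLetter.store("test.command", payload)
end
```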

Internals

KaufmannEx uses KafkaEx.ConsumerGroup to subscribe to Kafka topics. Events are consumed by a kafka_ex_gen_stage_consumer stage and passed to a Flow event handler, KaufmannEx.Consumer.Flow.

Release Tasks

There are a few release tasks intended for use as Distillery custom commands. Distillery's custom commands don't provide the environment we're used to with mix tasks, so extra configuration and care is needed.

migrate_schemas

Migrate Schemas will attempt to register all schemas in the implementing project's priv/schemas directory.

reinit_service

This task is intended to be used to recover idempotent services from a catastrophic failure or substantial architectural change.

ReInit Service will reset the configured consumer group to the earliest available Kafka offset. It will then consume all events from the Kafka broker until the specified offset is reached (or all events are consumed).

By default message publication is disabled during reinitialization. This can be overridden in KaufmannEx.ReleaseTasks.reinit_service/4.

Configuration & Use

These tasks are intended for use with Distillery in a release environment. In these examples the application is named Sample.

release_tasks.ex

defmodule Sample.ReleaseTasks do
  def migrate_schemas do
    Application.load(:kaufmann_ex)

    KaufmannEx.ReleaseTasks.migrate_schemas(:sample)
  end

  def reinit_service do
    Application.load(:kaufmann_ex)
    KaufmannEx.ReleaseTasks.reinit_service(:sample)  
  end
end

rel/config.exs

...

release :sample do
  set(
    commands: [
      migrate_schemas: "rel/commands/migrate_schemas.sh",
      reinit_service: "rel/commands/reinit_service.sh"
    ]
  )
end

rel/commands/migrate_schemas.sh

#!/bin/sh

$RELEASE_ROOT_DIR/bin/sample command Elixir.Sample.ReleaseTasks migrate_schemas
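rel/commands/reinit_service.sh is referenced in rel/config.exs above but not shown; by analogy with migrate_schemas.sh it would look like this:

```shell
#!/bin/sh

$RELEASE_ROOT_DIR/bin/sample command Elixir.Sample.ReleaseTasks reinit_service
```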

Common questions

When are offsets committed? If a node goes down, will it lose messages?

It is possible to lose events when a node goes down, but we try to prevent that from happening.

  1. The backpressure in KaufmannEx prevents pulling more events than there is capacity to process.
  2. KaufmannEx uses the default KafkaEx GenConsumer asynchronous offset-commit behavior: offsets are committed asynchronously on a timer or event count.

Ideally the kafka_ex :commit_threshold should be set somewhat larger than the kaufmann_ex :max_demand (the defaults are 100 and 50, respectively). This makes it less likely that the node will be processing already-committed messages when it goes down.
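That guidance can be expressed as a config fragment (hypothetical values, matching the stated defaults):

```elixir
# Commit offsets less often than the stage demands events, so commits
# trail processing and a crash re-processes rather than skips messages.
config :kaufmann_ex, max_demand: 50       # default
config :kafka_ex, commit_threshold: 100   # default; keep > max_demand
```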

Can order of delivery be guaranteed?

Kafka can only guarantee event ordering within a single partition. KaufmannEx will consume events in the order published, but event processing is not guaranteed to be sequential. KaufmannEx (like Kafka in general) cannot provide strong consistency.

Telemetry

KaufmannEx emits the following internal :telemetry events:

[:kaufmann_ex, :schemas, :decode]
[:kaufmann_ex, :event_handler, :handle_event]
[:kaufmann_ex, :schema, :encode]
[:kaufmann_ex, :publisher, :publish]

KaufmannEx provides an optional logger demonstrating consumption of these events, KaufmannEx.TelemetryLogger. It can be started by adding KaufmannEx.TelemetryLogger to your supervision tree.
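If the bundled logger doesn't fit, a handler can be attached directly with the :telemetry library. A minimal sketch, assuming the event names listed above; the measurement and metadata shapes are assumptions:

```elixir
# Attach one handler to all four KaufmannEx telemetry events.
:telemetry.attach_many(
  "my-app-kaufmann-metrics",
  [
    [:kaufmann_ex, :schemas, :decode],
    [:kaufmann_ex, :event_handler, :handle_event],
    [:kaufmann_ex, :schema, :encode],
    [:kaufmann_ex, :publisher, :publish]
  ],
  fn event, measurements, metadata, _config ->
    # Forward each event's measurements to your metrics system of choice.
    IO.inspect({event, measurements, metadata}, label: "kaufmann_ex telemetry")
  end,
  nil
)
```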
