
helins / kafka.clj

License: EPL-1.0
Clojure client for Kafka


Projects that are alternatives to or similar to kafka.clj

eav-bundle
A Symfony bundle for basic EAV management
Stars: ✭ 19 (-81%)
Mutual labels:  cqrs
SplitetFramework
Splitet is a Java-based Event Sourcing framework which can benefit teams planning a CQRS transition with a minimal learning curve and ease of adaptation.
Stars: ✭ 159 (+59%)
Mutual labels:  cqrs
cqrs-typescript
CQRS implementation in typescript
Stars: ✭ 29 (-71%)
Mutual labels:  cqrs
CQELight
CQELight is an enterprise-grade, extensible and customisable framework for creating software with CQRS, DDD & Event Sourcing patterns
Stars: ✭ 21 (-79%)
Mutual labels:  cqrs
dudulina
CQRS + Event Sourcing library for PHP
Stars: ✭ 53 (-47%)
Mutual labels:  cqrs
MQTT.jl
An asynchronous MQTT client library for julia
Stars: ✭ 15 (-85%)
Mutual labels:  broker
Plastic
This project provides encapsulation of things like Domain, Application Rules, Business Rules or Business Logic in Application.
Stars: ✭ 30 (-70%)
Mutual labels:  cqrs
commanded-ecto-projections
Read model projections for Commanded using Ecto
Stars: ✭ 68 (-32%)
Mutual labels:  cqrs
les
Go directly from an event storming to a working API: Event Markdown / Markup validation & NodeJS CQRS/ES application builder.
Stars: ✭ 48 (-52%)
Mutual labels:  cqrs
delta
DDD-centric event-sourcing library for the JVM
Stars: ✭ 15 (-85%)
Mutual labels:  cqrs
ftgogo
FTGOGO - event-driven architecture demonstration application using edat
Stars: ✭ 82 (-18%)
Mutual labels:  cqrs
sqrs
🚌SQRS is a JavaScript library for implementing CQRS pattern.
Stars: ✭ 23 (-77%)
Mutual labels:  cqrs
MediatrTutorial
CQRS implementation in ASP.NET Core using MediatR in .NET 5
Stars: ✭ 88 (-12%)
Mutual labels:  cqrs
Cart
No description or website provided.
Stars: ✭ 22 (-78%)
Mutual labels:  cqrs
MediatR.Extensions.Autofac.DependencyInjection
Autofac plug-in for MediatR.
Stars: ✭ 30 (-70%)
Mutual labels:  cqrs
cqrs-event-sourcing-example
Example of a list-making Web API using CQRS, Event Sourcing and DDD.
Stars: ✭ 28 (-72%)
Mutual labels:  cqrs
Distributed-eStore
Ecommerce SPA application with a microservices architecture implemented from scratch. Tech stack - Docker, Consul, Fabio, RabbitMQ, .Net Core, Mediatr, CQRS, React, Redux. .NET Core Microservices template, .NET React Redux, .NET RabbitMQ, .NET Distributed, Docker, .NET Core with Docker.
Stars: ✭ 99 (-1%)
Mutual labels:  cqrs
coreddd
A set of open-source .NET libraries helping with domain-driven design (DDD) and CQRS
Stars: ✭ 68 (-32%)
Mutual labels:  cqrs
implementing-cqrs-in-elixir
An introduction to implementing Command Query Responsibility Segregation (CQRS) in Elixir applications.
Stars: ✭ 17 (-83%)
Mutual labels:  cqrs
library-php
WIP: A comprehensive Domain-Driven Design example with problem space strategic analysis and various tactical patterns.
Stars: ✭ 73 (-27%)
Mutual labels:  cqrs

dvlopt.kafka


This Apache Kafka client library is a Clojure wrapper around the official Java libraries. It strives for a balance between being idiomatic and not being too clever. Users accustomed to the Java libraries will feel right at home, although familiarity with them is not a prerequisite.

It provides namespaces for handling consumers and producers, as well as for performing some administration. We also have the pleasure of announcing that Kafka Streams is fully supported. The user can create a mock Kafka Streams application which does not need a running Kafka cluster. This is perfect for learning the library, as well as for building applications at the REPL and seeing right away how they behave.

Ready for Kafka 2.1.0. Previously known as "Milena", the API of this iteration is considered stable unless something changes significantly in the Java libraries. In general, only the administration namespace is at risk.

We try to provide good documentation because many important concepts are poorly explained or confusing in the Java libraries. Feel free to provide feedback, contribute, and let us know if something is not clear.

Usage

First, read the fairly detailed API documentation, especially if you are not used to the Java libraries and their various concepts.

Alternatively, the documentation can be generated by running:

$ lein codox
$ cd doc/auto

Then, have a look at the following examples. Just so we are prepared, let us require all namespaces involved.

You can clone this repo and start a REPL; everything needed is already imported in the dev namespace.

;; For Kafka :

(require '[dvlopt.kafka       :as K]
         '[dvlopt.kafka.admin :as K.admin]
         '[dvlopt.kafka.in    :as K.in]
         '[dvlopt.kafka.out   :as K.out])


;; For Kafka Streams :

(require '[dvlopt.kstreams          :as KS]
         '[dvlopt.kstreams.mock     :as KS.mock]
         '[dvlopt.kstreams.topology :as KS.topology]
         '[dvlopt.kstreams.ctx      :as KS.ctx]
         '[dvlopt.kstreams.store    :as KS.store]
         '[dvlopt.kstreams.builder  :as KS.builder]
         '[dvlopt.kstreams.stream   :as KS.stream]
         '[dvlopt.kstreams.table    :as KS.table])

Administration

Creating topic "my-topic" using the dvlopt.kafka.admin namespace.

(with-open [admin (K.admin/admin)]
  (K.admin/create-topics admin
                         {"my-topic" {::K.admin/number-of-partitions 4
                                      ::K.admin/replication-factor   3
                                      ::K.admin/configuration        {"cleanup.policy" "compact"}}})
  (println "Existing topics : " (keys @(K.admin/topics admin
                                                       {::K/internal? false}))))

Producing records

Sending 25 records to "my-topic" using the dvlopt.kafka.out namespace.

(with-open [producer (K.out/producer {::K/nodes             [["localhost" 9092]]
                                      ::K/serializer.key    (K/serializers :long)
                                      ::K/serializer.value  :long
                                      ::K.out/configuration {"client.id" "my-producer"}})]
  (doseq [i (range 25)]
    (K.out/send producer
                {::K/topic "my-topic"
                 ::K/key   i
                 ::K/value (* 100 i)}
                (fn callback [exception metadata]
                  (println (format "Record %d : %s"
                                   i
                                   (if exception
                                     "FAILURE"
                                     "SUCCESS")))))))

Consuming records

Reading a batch of records from "my-topic" and manually committing the offset of where we are, using the dvlopt.kafka.in namespace.

(with-open [consumer (K.in/consumer {::K/nodes              [["localhost" 9092]]
                                     ::K/deserializer.key   :long
                                     ::K/deserializer.value :long
                                     ::K.in/configuration   {"auto.offset.reset" "earliest"
                                                             "enable.auto.commit" false
                                                             "group.id"           "my-group"}})]
  (K.in/register-for consumer
                     ["my-topic"])
  (doseq [record (K.in/poll consumer
                            {::K/timeout [5 :seconds]})]
    (println (format "Record %d @%d - Key = %d, Value = %d"
                     (::K/offset record)
                     (::K/timestamp record)
                     (::K/key record)
                     (::K/value record))))
  (K.in/commit-offsets consumer))
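
In a long-running service you would typically poll in a loop rather than read a single batch. Below is a minimal sketch built only from the functions shown above; process-record and running? are hypothetical placeholders you would supply yourself:

;; Polling-loop sketch - `process-record` and `running?` are hypothetical
;; placeholders, the Kafka calls are the same as in the example above.

(defn consume-loop
  [consumer process-record running?]
  (while (running?)
    (doseq [record (K.in/poll consumer
                              {::K/timeout [5 :seconds]})]
      (process-record record))
    (K.in/commit-offsets consumer)))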

Kafka Streams low-level API

A useless but simple example of grouping records into two categories based on their key, "odd" and "even", and continuously summing the values in each category.

First, we create a topology. We then add a source node fetching records from "my-input-topic". Those records are processed by "my-processor" which needs "my-store" in order to persist the current sum for each category. Finally, a sink node receives processed records and sends them to "my-output-topic".

For testing the topology, we create a mock Kafka Streams application which emulates a Kafka cluster. This is perfect for learning, testing, and fiddling at the REPL. We pipe a few records in to see how it behaves. Of course, you could also test it against a running cluster.

(def topology
     (-> (KS.topology/topology)
         (KS.topology/add-source "my-source"
                                 ["my-input-topic"]
                                 {::K/deserializer.key   :long
                                  ::K/deserializer.value :long
                                  ::KS/offset-reset      :earliest})
         (KS.topology/add-processor "my-processor"
                                    ["my-source"]
                                    {::KS/processor.init      (fn [ctx]
                                                                (KS.ctx/kv-store ctx
                                                                                 "my-store"))
                                     ::KS/processor.on-record (fn [ctx my-store record]
                                                                (println "Processing record : " record)
                                                                (let [key' (if (odd? (::K/key record))
                                                                             "odd"
                                                                             "even")
                                                                      sum  (+ (or (KS.store/kv-get my-store
                                                                                                   key')
                                                                                  0)
                                                                              (::K/value record))]
                                                                  (KS.store/kv-put my-store
                                                                                   key'
                                                                                   sum)
                                                                  (KS.ctx/forward ctx
                                                                                  {::K/key   key'
                                                                                   ::K/value sum})))})
         (KS.topology/add-store ["my-processor"]
                                {::K/deserializer.key   :string
                                 ::K/deserializer.value :long
                                 ::K/serializer.key     :string
                                 ::K/serializer.value   :long
                                 ::KS.store/name        "my-store"
                                 ::KS.store/type        :kv.in-memory
                                 ::KS.store/cache?      false})
         (KS.topology/add-sink "my-sink"
                               ["my-processor"]
                               "my-output-topic"
                               {::K/serializer.key   :string
                                ::K/serializer.value :long})))


;; This will run without a Kafka Cluster

(def mock-app
     (KS.mock/mock-app "KS-low-level-test"
                       topology))


;; We pipe a few records into our fake runtime

(dotimes [i 25]
  (KS.mock/pipe-record mock-app
                       {::K/topic "my-input-topic"
                        ::K/partition 0
                        ::K/offset    i
                        ::K/key       ((K/serializers :long) i)
                        ::K/value     ((K/serializers :long) i)}))


;; Run this a few times to see the result

(KS.mock/read-record mock-app
                     "my-output-topic"
                     {::K/deserializer.key   :string
                      ::K/deserializer.value :long})


;; If you wish to use a real Kafka cluster instead

(comment
    (def app
         (KS/app "my-app-1"
                 topology
                 {::K/nodes         [["localhost" 9092]]
                  ::KS/on-exception (fn [exception _thread]
                                      (println "Exception : " exception))}))


    (KS/start app))

Kafka Streams high-level API

The same example as before, but in a more functional style. In addition, values are aggregated in 2-second windows (it is best to run the producer example a few times first).

First, we need a builder. Then, we add a stream fetching records from "my-input-topic". Records are grouped into our two categories and each category is windowed into 2-second windows. Each window is then reduced in order to compute a sum. We can then build a topology out of our builder. It is always a good idea to have a look at the description of the built topology to get a better idea of what is created by the high-level API.

A window store is then retrieved and each window for each category is printed.

(def topology
     (let [builder (KS.builder/builder)]
       (-> builder
           (KS.builder/stream ["my-input-topic"]
                              {::K/deserializer.key   :long
                               ::K/deserializer.value :long
                               ::KS/offset-reset      :earliest})
           (KS.stream/group-by (fn [k v]
                                 (println (format "Grouping [%d %d]"
                                                  k
                                                  v))
                                 (if (odd? k)
                                   "odd"
                                   "even"))
                               {::K/deserializer.key   :string
                                ::K/deserializer.value :long
                                ::K/serializer.key     :string
                                ::K/serializer.value   :long})
           (KS.stream/window [2 :seconds])
           (KS.stream/reduce-windows (fn reduce-window [sum k v]
                                       (println (format "Adding value %d to sum %s for key '%s'"
                                                        v
                                                        sum
                                                        k))
                                       (+ sum
                                          v))
                                     (fn seed []
                                       0)
                                     {::K/deserializer.key   :string
                                      ::K/deserializer.value :long
                                      ::K/serializer.key     :string
                                      ::K/serializer.value   :long
                                      ::KS.store/name        "my-store"
                                      ::KS.store/type        :kv.in-memory
                                      ::KS.store/cache?      false}))
       (KS.topology/topology builder)))


;; Always interesting to see what the actual topology looks like.

(KS.topology/describe topology)


;; Just like in the previous example, we create a fake runtime and pipe a few records.

(def mock-app
     (KS.mock/mock-app "KS-high-level-test"
                       topology))


(dotimes [i 25]
  (KS.mock/pipe-record mock-app
                       {::K/topic "my-input-topic"
                        ::K/partition 0
                        ::K/offset    i
                        ::K/key       ((K/serializers :long) i)
                        ::K/value     ((K/serializers :long) i)}))


;; And here is how we can read from a store.

(def my-store
     (KS.mock/window-store mock-app
                           "my-store"))


(with-open [cursor (KS.store/ws-multi-range my-store)]
  (doseq [db-record (iterator-seq cursor)]
    (println (format "Aggregated key = '%s', time windows = [%d;%d), value = %d"
                     (::K/key db-record)
                     (::K/timestamp.from db-record)
                     (::K/timestamp.to db-record)
                     (::K/value db-record)))))

Testing

It is possible to create mock consumers (dvlopt.kafka.in.mock namespace) and mock producers (dvlopt.kafka.out.mock) for testing purposes. They can be used with the normal API without needing to contact a Kafka cluster, but not everything behaves strictly the same (cf. the documentation).
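
For instance, a mock producer can be exercised with the regular dvlopt.kafka.out functions. The constructor name below is only an assumption based on the namespace name; check dvlopt.kafka.out.mock and dvlopt.kafka.in.mock for the exact API and options:

(comment
  ;; Sketch only - `mock-producer` and its options map are assumed, the send
  ;; itself uses the normal API described above.
  (require '[dvlopt.kafka.out.mock :as K.out.mock])

  (with-open [producer (K.out.mock/mock-producer {::K/serializer.key   :long
                                                  ::K/serializer.value :long})]
    (K.out/send producer
                {::K/topic "my-topic"
                 ::K/key   42
                 ::K/value 4200})))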

Kafka Streams applications can get fairly complex. Providing a fake runtime, as in the examples above, is a great solution for unit tests. While building a new application at the REPL, you can gradually check whether everything seems to work as expected. Hence, this is a valuable tool.

License

Copyright © 2017 Adam Helinski

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.
