
jpzk / Mockedstreams

Licence: apache-2.0
Scala DSL for Unit-Testing Processing Topologies in Kafka Streams

Programming Languages

scala

Projects that are alternatives of or similar to Mockedstreams

Willa
A Clojure DSL for Kafka Streams
Stars: ✭ 97 (-47.28%)
Mutual labels:  kafka, kafka-streams
Seldon Server
Machine Learning Platform and Recommendation Engine built on Kubernetes
Stars: ✭ 1,435 (+679.89%)
Mutual labels:  kafka, kafka-streams
Logisland
Scalable stream processing platform for advanced real-time analytics on top of Kafka and Spark. LogIsland also supports MQTT and Kafka Streams (Flink is on the roadmap). The platform does complex event processing and is suitable for time-series analysis. A large set of valuable, ready-to-use processors, data sources and sinks is available.
Stars: ✭ 97 (-47.28%)
Mutual labels:  kafka, kafka-streams
Examples
Demo applications and code examples for Confluent Platform and Apache Kafka
Stars: ✭ 571 (+210.33%)
Mutual labels:  kafka, kafka-streams
Azkarra Streams
🚀 Azkarra is a lightweight java framework to make it easy to develop, deploy and manage cloud-native streaming microservices based on Apache Kafka Streams.
Stars: ✭ 146 (-20.65%)
Mutual labels:  kafka, kafka-streams
Kattlo Cli
Kattlo CLI Project
Stars: ✭ 58 (-68.48%)
Mutual labels:  kafka, kafka-streams
Kukulcan
A REPL for Apache Kafka
Stars: ✭ 103 (-44.02%)
Mutual labels:  kafka, kafka-streams
Demo Scene
👾Scripts and samples to support Confluent Demos and Talks. ⚠️Might be rough around the edges ;-) 👉For automated tutorials and QA'd code, see https://github.com/confluentinc/examples/
Stars: ✭ 806 (+338.04%)
Mutual labels:  kafka, kafka-streams
Supersafebank
Sample Event Sourcing implementation with .NET Core
Stars: ✭ 142 (-22.83%)
Mutual labels:  kafka, unit-testing
Kafka Tutorials
Kafka Tutorials microsite
Stars: ✭ 144 (-21.74%)
Mutual labels:  kafka, kafka-streams
Tsujun
Yet another Web UI for KSQL
Stars: ✭ 45 (-75.54%)
Mutual labels:  kafka, kafka-streams
Streamline
StreamLine - Streaming Analytics
Stars: ✭ 151 (-17.93%)
Mutual labels:  kafka, kafka-streams
Kafka Workshop
Materials (slides and code) for Kafka and Kafka Streams Workshop
Stars: ✭ 44 (-76.09%)
Mutual labels:  kafka, kafka-streams
Kspp
A high-performance, real-time C++ Kafka streams framework (C++17)
Stars: ✭ 80 (-56.52%)
Mutual labels:  kafka, kafka-streams
Cp Docker Images
[DEPRECATED] Docker images for Confluent Platform.
Stars: ✭ 975 (+429.89%)
Mutual labels:  kafka, kafka-streams
Java Kafka Client
OpenTracing Instrumentation for Apache Kafka Client
Stars: ✭ 101 (-45.11%)
Mutual labels:  kafka, kafka-streams
Faust
Python Stream Processing
Stars: ✭ 5,899 (+3105.98%)
Mutual labels:  kafka, kafka-streams
Kafka Streams Machine Learning Examples
This project contains examples which demonstrate how to deploy analytic models to mission-critical, scalable production environments leveraging Apache Kafka and its Streams API. Models are built with Python, H2O, TensorFlow, Keras, DeepLearning4 and other technologies.
Stars: ✭ 661 (+259.24%)
Mutual labels:  kafka, kafka-streams
Awesome Kafka
A collection of kafka-resources
Stars: ✭ 116 (-36.96%)
Mutual labels:  kafka, kafka-streams
A Kafka Story
Kafka ecosystem ... but step by step!
Stars: ✭ 148 (-19.57%)
Mutual labels:  kafka, kafka-streams

Mocked Streams


Documentation is located at http://mockedstreams.madewithtea.com/

Mocked Streams 3.9.0 (git) is a library for Scala 2.12 and 2.13 that allows you to unit-test processing topologies of Kafka Streams applications (for Apache Kafka >= 0.10.1) without Zookeeper and Kafka brokers. Furthermore, you can use your favourite Scala testing framework, e.g. ScalaTest or Specs2. Mocked Streams is available on Maven Central, so you just have to add the following to your SBT dependencies:

libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.9.0" % "test"
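
For reference, a minimal build.sbt for such a test setup might look as follows; the Scala, Kafka and ScalaTest versions are illustrative and not mandated by Mocked Streams:

scalaVersion := "2.13.4"

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka-streams-scala" % "2.7.0",
  "com.madewithtea"  %% "mockedstreams"       % "3.9.0" % "test",
  "org.scalatest"    %% "scalatest"           % "3.2.9" % "test"
)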

A Java 8 port of Mocked Streams is Mockafka.

Apache Kafka Compatibility

Mocked Streams Version    Apache Kafka Version
3.9.0                     2.7.0.0
3.8.0                     2.6.1.0
3.7.0                     2.5.0.0
3.6.0                     2.4.1.0
3.5.2                     2.4.0.0
3.5.1                     2.4.0.0
3.5.0                     2.4.0.0
3.4.0                     2.3.0.0
3.3.0                     2.2.0.0
3.2.0                     2.1.1.0
3.1.0                     2.1.0.0
2.2.0                     2.1.0.0
2.1.0                     2.0.0.0
2.0.0                     2.0.0.0
1.8.0                     1.1.1.0
1.7.0                     1.1.0.0
1.6.0                     1.0.1.0
1.5.0                     1.0.0.0
1.4.0                     0.11.0.1
1.3.0                     0.11.0.0
1.2.1                     0.10.2.1
1.2.0                     0.10.2.0
1.1.0                     0.10.1.1
1.0.0                     0.10.1.0

Simple Example

It wraps the org.apache.kafka.streams.TopologyTestDriver class, but adds more syntactic sugar to keep your test code simple:

import com.madewithtea.mockedstreams.MockedStreams

val input = Seq(("x", "v1"), ("y", "v2"))
val exp = Seq(("x", "V1"), ("y", "V2"))
val strings = Serdes.String()

MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .input("topic-in", strings, strings, input)
  .output("topic-out", strings, strings) shouldEqual exp

Multiple Input / Output Example and State

It also allows you to have multiple input and output streams. If your topology uses state stores, you need to declare them using .stores(stores: Seq[String]):

import com.madewithtea.mockedstreams.MockedStreams

val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .input("in-a", strings, ints, inputA)
  .input("in-b", strings, ints, inputB)
  .stores(Seq("store-name"))

mstreams.output("out-a", strings, ints) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints) shouldEqual(expectedB)

Record order and multiple emissions

The records provided to the mocked stream are submitted to the topology during the test in the order in which they appear in the fixture. You can also submit records to the same topics multiple times, at different points in your scenario.

This can be handy for validating whether your topology's behaviour depends on the order in which the records are received and processed.

In the example below, two records are first submitted to topic A, then three to topic B, then one more to topic A.

val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2)))
val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5)))
val secondInputForTopicA = Seq(("y", int(4)))

val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))

val builder = MockedStreams()
  .topology(topologyTables) // Scala DSL
  .input(InputATopic, strings, ints, firstInputForTopicA)
  .input(InputBTopic, strings, ints, firstInputForTopicB)
  .input(InputATopic, strings, ints, secondInputForTopicA)
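
The expected output above would then be checked as usual; assuming the (hypothetical) topologyTables topology writes its result to a single output topic, for instance:

// OutputTopic is illustrative; use whatever topic topologyTables actually writes to.
builder.output(OutputTopic, strings, ints) shouldEqual expectedOutput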

State Store

When you define your state stores via .stores(stores: Seq[String]) (available since 1.2), you can verify the state store content via the .stateTable(name: String) method:

import com.madewithtea.mockedstreams.MockedStreams

val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .input("in-a", strings, ints, inputA)
  .input("in-b", strings, ints, inputB)
  .stores(Seq("store-name"))

mstreams.stateTable("store-name") shouldEqual Map('a' -> 1)
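
Note that the store name passed to .stores has to match a store that the topology actually creates. As a sketch (not part of the original example), a named key-value store could be declared with the Kafka Streams Scala DSL via Materialized, e.g. in a topology that sums the Int values per key into a store called "store-name":

import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Materialized

// Hypothetical topology: sums the Int values per key into the "store-name" store
// and forwards the running sums to "out-a".
def topologySum(builder: StreamsBuilder): Unit =
  builder
    .stream[String, Int]("in-a")
    .groupByKey
    .reduce(_ + _)(Materialized.as[String, Int, ByteArrayKeyValueStore]("store-name"))
    .toStream
    .to("out-a")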

Window State Store

When you define your state stores via .stores(stores: Seq[String]) (available since 1.2) and add a timestamp extractor to the config, you can verify the window state store content via the .windowStateTable(name: String, key: K) method:

import com.madewithtea.mockedstreams.MockedStreams

val props = new Properties
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  classOf[TimestampExtractors.CustomTimestampExtractor].getName)

val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .input("in-a", strings, ints, inputA)
  .stores(Seq("store-name"))
  .config(props)

mstreams.windowStateTable("store-name", "x") shouldEqual someMapX
mstreams.windowStateTable("store-name", "y") shouldEqual someMapY
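
TimestampExtractors.CustomTimestampExtractor above is not part of Mocked Streams; any org.apache.kafka.streams.processor.TimestampExtractor can be configured. As a minimal sketch, assuming the record value (an Int) carries the event time, such an extractor could look like this:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.processor.TimestampExtractor

// Illustrative extractor: uses the Int record value as the event timestamp,
// falling back to the record's own timestamp for anything else.
class CustomTimestampExtractor extends TimestampExtractor {
  override def extract(record: ConsumerRecord[AnyRef, AnyRef], partitionTime: Long): Long =
    record.value() match {
      case i: java.lang.Integer => i.toLong
      case _                    => record.timestamp()
    }
}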

Adding Timestamps

With .input, the timestamps of the input records default to 0. This prevents, for example, testing the join windows of Kafka Streams, since records with different timestamps cannot be produced. Using .inputWithTime, however, allows adding timestamps, as in the following example:

val inputA = Seq(
  ("x", int(1), 1000L),
  ("x", int(1), 1001L),
  ("x", int(1), 1002L)
)

val builder = MockedStreams()
  .topology(topology1WindowOutput) // Scala DSL
  .inputWithTime(InputCTopic, strings, ints, inputA)
  .stores(Seq(StoreName))
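
The topology1WindowOutput topology referenced above is not shown in the original example. As a hedged sketch, a windowed topology whose store could then be inspected with .windowStateTable might count records per key in one-second tumbling windows, for instance:

import java.time.Duration
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.{ByteArrayWindowStore, StreamsBuilder}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Materialized

val InputCTopic = "topic-c"    // illustrative topic name
val StoreName = "window-store" // illustrative store name

// Hypothetical topology: counts records per key in 1-second tumbling windows,
// materialized into a window store that the test inspects via .windowStateTable.
def topology1WindowOutput(builder: StreamsBuilder): Unit =
  builder
    .stream[String, Int](InputCTopic)
    .groupByKey
    .windowedBy(TimeWindows.of(Duration.ofSeconds(1)))
    .count()(Materialized.as[String, Long, ByteArrayWindowStore](StoreName))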

Custom Streams Configuration

Sometimes you need to pass a custom configuration to Kafka Streams:

import com.madewithtea.mockedstreams.MockedStreams

val props = new Properties
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[CustomExtractor].getName)

val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .config(props)
  .input("in-a", strings, ints, inputA)
  .input("in-b", strings, ints, inputB)
  .stores(Seq("store-name"))

mstreams.output("out-a", strings, ints) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints) shouldEqual(expectedB)

Companies using Mocked Streams
