
manub / Scalatest Embedded Kafka

License: MIT
A library that provides an in-memory Kafka instance to run your tests against.


Projects that are alternatives of or similar to Scalatest Embedded Kafka

Awesome Kafka
A collection of kafka-resources
Stars: ✭ 116 (-60.27%)
Mutual labels:  kafka, kafka-streams
Streamline
StreamLine - Streaming Analytics
Stars: ✭ 151 (-48.29%)
Mutual labels:  kafka, kafka-streams
Kafka Tutorials
Kafka Tutorials microsite
Stars: ✭ 144 (-50.68%)
Mutual labels:  kafka, kafka-streams
Java Kafka Client
OpenTracing Instrumentation for Apache Kafka Client
Stars: ✭ 101 (-65.41%)
Mutual labels:  kafka, kafka-streams
Strimzi Kafka Operator
Apache Kafka running on Kubernetes
Stars: ✭ 2,833 (+870.21%)
Mutual labels:  kafka, kafka-streams
Kukulcan
A REPL for Apache Kafka
Stars: ✭ 103 (-64.73%)
Mutual labels:  kafka, kafka-streams
A Kafka Story
Kafka ecosystem ... but step by step!
Stars: ✭ 148 (-49.32%)
Mutual labels:  kafka, kafka-streams
Kspp
A high-performance, real-time C++ Kafka Streams framework (C++17)
Stars: ✭ 80 (-72.6%)
Mutual labels:  kafka, kafka-streams
Kafka Streams Scala
Thin Scala wrapper around Kafka Streams Java API
Stars: ✭ 192 (-34.25%)
Mutual labels:  kafka, kafka-streams
Mockedstreams
Scala DSL for Unit-Testing Processing Topologies in Kafka Streams
Stars: ✭ 184 (-36.99%)
Mutual labels:  kafka, kafka-streams
Kafkastreams Cep
Complex Event Processing on top of Kafka Streams
Stars: ✭ 257 (-11.99%)
Mutual labels:  kafka, kafka-streams
Kafka Ui
Open-Source Web GUI for Apache Kafka Management
Stars: ✭ 230 (-21.23%)
Mutual labels:  kafka, kafka-streams
Logisland
Scalable stream-processing platform for advanced real-time analytics on top of Kafka and Spark. LogIsland also supports MQTT and Kafka Streams (Flink is on the roadmap). The platform does complex event processing and is suitable for time-series analysis. A large set of valuable, ready-to-use processors, data sources, and sinks is available.
Stars: ✭ 97 (-66.78%)
Mutual labels:  kafka, kafka-streams
Seldon Server
Machine Learning Platform and Recommendation Engine built on Kubernetes
Stars: ✭ 1,435 (+391.44%)
Mutual labels:  kafka, kafka-streams
Willa
A Clojure DSL for Kafka Streams
Stars: ✭ 97 (-66.78%)
Mutual labels:  kafka, kafka-streams
Azkarra Streams
🚀 Azkarra is a lightweight java framework to make it easy to develop, deploy and manage cloud-native streaming microservices based on Apache Kafka Streams.
Stars: ✭ 146 (-50%)
Mutual labels:  kafka, kafka-streams
Examples
Demo applications and code examples for Confluent Platform and Apache Kafka
Stars: ✭ 571 (+95.55%)
Mutual labels:  kafka, kafka-streams
Kattlo Cli
Kattlo CLI Project
Stars: ✭ 58 (-80.14%)
Mutual labels:  kafka, kafka-streams
Kafka Streams Dotnet
.NET Stream Processing Library for Apache Kafka 🚀
Stars: ✭ 173 (-40.75%)
Mutual labels:  kafka, kafka-streams
Hivemq Mqtt Tensorflow Kafka Realtime Iot Machine Learning Training Inference
Real Time Big Data / IoT Machine Learning (Model Training and Inference) with HiveMQ (MQTT), TensorFlow IO and Apache Kafka - no additional data store like S3, HDFS or Spark required
Stars: ✭ 204 (-30.14%)
Mutual labels:  kafka, kafka-streams

Note: This repository is no longer maintained. New versions will be available at https://github.com/embeddedkafka/embedded-kafka and https://github.com/embeddedkafka/embedded-kafka-schema-registry respectively.

scalatest-embedded-kafka

A library that provides an in-memory Kafka instance to run your tests against.

As of version 1.1.1, the library no longer depends on ScalaTest and won't transitively bring ScalaTest into your build.

Inspired by https://github.com/chbatey/kafka-unit


Version compatibility matrix

scalatest-embedded-kafka is available on Bintray and Maven Central, compiled for both Scala 2.11 and 2.12. Scala 2.10 is supported up to version 0.10.0; Scala 2.12 is supported from 0.11.0 onwards, following the Apache Kafka release cycle.

Currently there's no support for Scala 2.13-Mx as Kafka artifacts are not published for these versions.

Starting from 1.0.0, versions match the version of Kafka they're built against. However, in the past there were some mismatches; the easiest way to find the matching Kafka version is to check the git history of the build.sbt file.

If you're using Kafka 1.1.0, please use version 1.1.0-kafka1.1-nosr; this version doesn't pull in the Confluent Schema Registry by default.
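
For example, to pin that build in your build.sbt:

libraryDependencies += "net.manub" %% "scalatest-embedded-kafka" % "1.1.0-kafka1.1-nosr" % "test"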

How to use

  • In your build.sbt file add the following dependency: "net.manub" %% "scalatest-embedded-kafka" % "2.0.0" % "test"
  • Have your class extend the EmbeddedKafka trait.
  • Enclose the code that needs a running instance of Kafka within the withRunningKafka closure.

An example, using ScalaTest:

import net.manub.embeddedkafka.EmbeddedKafka
import org.scalatest.WordSpec

class MySpec extends WordSpec with EmbeddedKafka {

  "runs with embedded kafka" should {

    "work" in {

      withRunningKafka {
        // ... code goes here
      }
    }
  }
}
  • In-memory Zookeeper and Kafka will be instantiated on ports 6000 and 6001 respectively, and automatically shut down at the end of the test.

Use without the withRunningKafka method

An EmbeddedKafka companion object is provided for usage without extending the EmbeddedKafka trait. Zookeeper and Kafka can be started and stopped programmatically. This is the recommended usage if you have more than one test in your file and you don't want to start and stop Kafka and Zookeeper on every test.

import net.manub.embeddedkafka.EmbeddedKafka
import org.scalatest.WordSpec

class MySpec extends WordSpec {

  "runs with embedded kafka" should {

    "work" in {
      EmbeddedKafka.start()

      // ... code goes here

      EmbeddedKafka.stop()
    }
  }
}

Please note that, in order to avoid Kafka instances not shutting down properly, it's recommended to call EmbeddedKafka.stop() in an after block or in similar teardown logic.
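
For instance, a minimal sketch using ScalaTest's standard BeforeAndAfterAll hooks (the spec name is illustrative):

import net.manub.embeddedkafka.EmbeddedKafka
import org.scalatest.{BeforeAndAfterAll, WordSpec}

class MySpec extends WordSpec with BeforeAndAfterAll {

  // start a single Kafka/Zookeeper pair once for the whole spec
  override def beforeAll(): Unit = EmbeddedKafka.start()

  // stop() runs after all tests, even when some of them fail
  override def afterAll(): Unit = EmbeddedKafka.stop()

  "runs with embedded kafka" should {
    "work" in {
      // ... code goes here, with Kafka already running
    }
  }
}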

Configuration

It's possible to change the ports on which Zookeeper and Kafka are started by providing an implicit EmbeddedKafkaConfig.

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.WordSpec

class MySpec extends WordSpec with EmbeddedKafka {

  "runs with embedded kafka on a specific port" should {

    "work" in {
      implicit val config = EmbeddedKafkaConfig(kafkaPort = 12345)

      withRunningKafka {
        // now a kafka broker is listening on port 12345
      }
    }
  }
}

If you want to run ZooKeeper and Kafka on arbitrary available ports, you can use the withRunningKafkaOnFoundPort method. This is useful to make tests more reliable, especially when running tests in parallel or on machines where other tests or services may be running with port numbers you can't control.

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{Matchers, WordSpec}

class MySpec extends WordSpec with Matchers with EmbeddedKafka {

  "runs with embedded kafka on arbitrary available ports" should {

    "work" in {
      val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)

      withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
        // now a kafka broker is listening on actualConfig.kafkaPort
        publishStringMessageToKafka("topic", "message")
        consumeFirstStringMessageFrom("topic") shouldBe "message"
      }
    }
  }
}

The same implicit EmbeddedKafkaConfig is used to define custom consumer or producer properties.

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.WordSpec

class MySpec extends WordSpec with EmbeddedKafka {

  "runs with custom producer and consumer properties" should {
    "work" in {
      val customBrokerConfig = Map("replica.fetch.max.bytes" -> "2000000",
        "message.max.bytes" -> "2000000")
        
      val customProducerConfig = Map("max.request.size" -> "2000000")
      val customConsumerConfig = Map("max.partition.fetch.bytes" -> "2000000")

      implicit val customKafkaConfig = EmbeddedKafkaConfig(
        customBrokerProperties = customBrokerConfig,
        customProducerProperties = customProducerConfig,
        customConsumerProperties = customConsumerConfig)

      withRunningKafka {
        // now a kafka broker is running with the custom broker, producer, and consumer properties
      }
    }
  }
}

This works for withRunningKafka, withRunningKafkaOnFoundPort, and EmbeddedKafka.start().

Also, it is possible to provide custom properties to the broker while starting Kafka: EmbeddedKafkaConfig has a customBrokerProperties field which can be used to provide extra properties as a Map[String, String]. Those properties will be added to the broker configuration. Be careful: some properties are set by the library itself, and in case of conflict the customBrokerProperties values take precedence. Please look at the source code to see which properties these are.

Utility methods

The EmbeddedKafka trait also provides some utility methods to interact with the embedded Kafka, in order to set preconditions or verifications in your specs:

def publishToKafka(topic: String, message: String): Unit

def consumeFirstMessageFrom(topic: String): String

def createCustomTopic(topic: String, topicConfig: Map[String,String], partitions: Int, replicationFactor: Int): Unit
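
For illustration, a minimal sketch that combines these methods (the spec and topic names are made up):

import net.manub.embeddedkafka.EmbeddedKafka
import org.scalatest.{Matchers, WordSpec}

class UtilityMethodsSpec extends WordSpec with Matchers with EmbeddedKafka {

  "the utility methods" should {
    "set preconditions and verify results" in {
      withRunningKafka {
        // create a compacted topic before exercising the code under test
        createCustomTopic(
          topic = "custom-topic",
          topicConfig = Map("cleanup.policy" -> "compact"),
          partitions = 2,
          replicationFactor = 1)

        publishToKafka("custom-topic", "message")
        consumeFirstMessageFrom("custom-topic") shouldBe "message"
      }
    }
  }
}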

Custom producers

It is possible to create producers for custom types in two ways:

  • Using the syntax aKafkaProducer thatSerializesValuesWith classOf[Serializer[V]]. This will return a KafkaProducer[String, V].
  • Using the syntax aKafkaProducer[V]. This will return a KafkaProducer[String, V], using an implicit Serializer[V].
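
For example, a sketch of both forms, using StringSerializer as a stand-in for your custom value serializer (the spec name is illustrative):

import net.manub.embeddedkafka.EmbeddedKafka
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.scalatest.WordSpec

class CustomProducerSpec extends WordSpec with EmbeddedKafka {

  "a custom producer" should {
    "publish values of type V" in {
      withRunningKafka {
        // explicit form: returns a KafkaProducer[String, String]
        val producer = aKafkaProducer thatSerializesValuesWith classOf[StringSerializer]
        // the implicit form, aKafkaProducer[String], would instead pick up an implicit Serializer[String]
        producer.send(new ProducerRecord("topic", "key", "value"))
        producer.close()
      }
    }
  }
}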

For more information about how to use the utility methods, you can either look at the Scaladocs or at the tests of this project.

Custom consumers

Use the Consumers trait to easily create consumers of arbitrary key-value types and manage their lifecycle (via the loan pattern).

  • For basic String consumption use Consumers.withStringConsumer { your code here }.
  • For arbitrary key and value types, expose implicit Deserializers for each type and use Consumers.withConsumer { your code here }.
  • If you just want to create a consumer and manage its lifecycle yourself then try Consumers.newConsumer().
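
A sketch of the String case, reusing the publishStringMessageToKafka helper shown earlier (a minimal illustration, assuming the Consumers trait lives in the net.manub.embeddedkafka package; polling and assertion details omitted):

import net.manub.embeddedkafka.{Consumers, EmbeddedKafka}
import org.scalatest.WordSpec

class CustomConsumerSpec extends WordSpec with EmbeddedKafka with Consumers {

  "the Consumers trait" should {
    "loan a managed string consumer" in {
      withRunningKafka {
        publishStringMessageToKafka("topic", "message")
        withStringConsumer { consumer =>
          // consumer is a KafkaConsumer[String, String]; it is closed for you when this block ends
          consumer.subscribe(java.util.Collections.singletonList("topic"))
        }
      }
    }
  }
}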

Easy message consumption

With ConsumerExtensions you can turn a consumer into a Scala lazy Stream of T and treat it as a collection for easy assertions.

  • Just import the extensions.
  • Bring an implicit ConsumerRecord[_, _] => T transform function into scope (some common functions are provided in Codecs).
  • On any KafkaConsumer instance you can now do:
import net.manub.embeddedkafka.ConsumerExtensions._
import net.manub.embeddedkafka.Codecs.stringKeyValueCrDecoder
...
consumer.consumeLazily[(String, String)]("from-this-topic").take(3).toList should be (Seq(
  "1" -> "one",
  "2" -> "two",
  "3" -> "three"
))

scalatest-embedded-kafka-streams

A library that builds on top of scalatest-embedded-kafka to offer easy testing of Kafka Streams.

It takes care of instantiating and starting your streams as well as closing them after running your test-case code.

How to use

  • In your build.sbt file add the following dependency: "net.manub" %% "scalatest-embedded-kafka-streams" % "2.0.0" % "test"
  • Have a look at the example test
  • For most cases, have your Spec extend the EmbeddedKafkaStreamsAllInOne trait. This offers both streams management and easy creation of consumers for asserting resulting messages in output/sink topics.
  • If you only want to use the streams management without the test consumers just have the Spec extend the EmbeddedKafkaStreams trait.
  • Use runStreamsWithStringConsumer to:
    • Create any topics that need to exist for the streams to operate (usually sources and sinks).
    • Pass the Topology that will be used to instantiate and start the Kafka Streams. This will be done while using the withRunningKafka closure internally so that your stream runs with an embedded Kafka and Zookeeper.
    • Pass the code block that needs a running instance of your streams. This is where your actual test code will sit. You can publish messages to your source topics and consume messages from your sink topics that the Kafka Streams should have generated. This method also offers a pre-instantiated consumer that can read String keys and values.
  • For more flexibility, use runStreams and withConsumer. This allows you to create your own consumers of custom types as seen in the example test.
import net.manub.embeddedkafka.Codecs._ // implicit ConsumerRecord decoders used by consumeLazily
import net.manub.embeddedkafka.ConsumerExtensions._
import net.manub.embeddedkafka.streams.EmbeddedKafkaStreamsAllInOne
import org.apache.kafka.streams.StreamsBuilder
import org.scalatest.{Matchers, WordSpec}

class MySpec extends WordSpec with Matchers with EmbeddedKafkaStreamsAllInOne {
  "my kafka stream" should {
    "be easy to test" in {
      val inputTopic = "input-topic"
      val outputTopic = "output-topic"
      // your code for building the stream goes here e.g.
      val streamBuilder = new StreamsBuilder
      streamBuilder.stream(inputTopic).to(outputTopic)
      // tell the stream test
      // 1. what topics need to be created before the stream starts
      // 2. the stream topology to be used for initializing and starting the stream
      runStreamsWithStringConsumer(
        topicsToCreate = Seq(inputTopic, outputTopic),
        topology = streamBuilder.build()
      ){ consumer =>
        // your test code goes here
        publishToKafka(inputTopic, key = "hello", message = "world")
        consumer.consumeLazily[(String, String)](outputTopic).head should be ("hello" -> "world")
      }
    }
  }
}

scalatest-embedded-schema-registry

If you need to serialize and deserialize messages using Avro, a Confluent Schema Registry instance can be provided to test your code.

How to use

  • In your build.sbt file add the following resolver: resolvers += "confluent" at "https://packages.confluent.io/maven/"
  • In your build.sbt file add the following dependency: "net.manub" %% "scalatest-embedded-schema-registry" % "2.0.0" % "test"
  • Have your test extend the EmbeddedKafkaWithSchemaRegistry trait.
  • Enclose the code that needs a running instance of Kafka within the withRunningKafka closure.
  • Provide an implicit EmbeddedKafkaConfigWithSchemaRegistryImpl.
class MySpec extends WordSpec with EmbeddedKafkaWithSchemaRegistry {

  "runs with embedded kafka and Schema Registry" should {

    "work" in {
      implicit val config = EmbeddedKafkaConfigWithSchemaRegistryImpl()

      withRunningKafka {
        // ... code goes here
      }
    }
  }
}
  • A Schema Registry server will be started and automatically shutdown at the end of the test.

Utility methods

The net.manub.embeddedkafka.avro.schemaregistry package object provides useful implicit converters for testing with Avro and Schema Registry.

Using streams

  • For most cases, have your Spec extend the EmbeddedKafkaStreamsWithSchemaRegistryAllInOne trait. This offers both streams management and easy creation of consumers for asserting resulting messages in output/sink topics.
  • If you only want to use the streams management without the test consumers just have the Spec extend the EmbeddedKafkaStreamsWithSchemaRegistry trait.
  • Build your own Topology and use runStreams to test it.
  • Have a look at the example test.
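
A rough sketch of how such a spec might be shaped, assuming runStreams accepts the same topicsToCreate and topology parameters as runStreamsWithStringConsumer above (the import paths are assumptions; topology and Avro serialization details are elided):

import net.manub.embeddedkafka.streams.EmbeddedKafkaStreamsWithSchemaRegistry
import org.apache.kafka.streams.StreamsBuilder
import org.scalatest.WordSpec

class MyAvroStreamSpec extends WordSpec with EmbeddedKafkaStreamsWithSchemaRegistry {

  "my avro stream" should {
    "run against embedded Kafka and Schema Registry" in {
      implicit val config = EmbeddedKafkaConfigWithSchemaRegistryImpl()

      val inputTopic = "input-topic"
      val outputTopic = "output-topic"
      val streamBuilder = new StreamsBuilder
      streamBuilder.stream(inputTopic).to(outputTopic)

      runStreams(topicsToCreate = Seq(inputTopic, outputTopic), topology = streamBuilder.build()) {
        // publish Avro messages to the source topic and assert on the sink topic here
      }
    }
  }
}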