niqdev / kafka-scala-examples

Licence: other
Examples of Avro, Kafka, Schema Registry, Kafka Streams, Interactive Queries, KSQL, Kafka Connect in Scala

kafka-scala-examples

Examples in Scala of Avro, Kafka, Schema Registry, Kafka Streams, Interactive Queries, KSQL, and Kafka Connect.

Local environment

# start locally
# - zookeeper
# - kafka
# - kafka-rest
# - kafka-ui
# - schema-registry
# - schema-registry-ui
# - ksql-server
# - ksql-cli
# - kafka-connect
# - kafka-connect-ui
docker-compose up

# (mac|linux) view kafka ui
[open|xdg-open] http://localhost:8000

# (mac|linux) view schema-registry ui
[open|xdg-open] http://localhost:8001

# (mac|linux) view kafka-connect ui
[open|xdg-open] http://localhost:8002

# cleanup
docker-compose down -v

If containers keep crashing, make sure Docker has enough memory and CPU available

# verify memory and cpu usage
docker ps -q | xargs docker stats --no-stream

# verify status
docker inspect <CONTAINER_NAME> | jq '.[].State'
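
The [open|xdg-open] notation above means: use open on macOS and xdg-open on Linux. A minimal sketch of picking the command automatically is shown below. The helper names browser_cmd and open_ui are hypothetical and not part of this repository; the ports are the ones listed above.

```shell
# Hypothetical helper: map a kernel name (as printed by `uname -s`)
# to the command that opens a URL in the default browser.
browser_cmd() {
  case "$1" in
    Darwin) echo "open" ;;
    Linux)  echo "xdg-open" ;;
    *)      echo "unsupported OS: $1" >&2; return 1 ;;
  esac
}

# Hypothetical wrapper: open one of the local UIs by port,
# e.g. `open_ui 8000` for kafka ui or `open_ui 8001` for schema-registry ui.
open_ui() {
  cmd=$(browser_cmd "$(uname -s)") || return 1
  "$cmd" "http://localhost:$1"
}
```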

avro

Description

Avro serialization and deserialization examples

Demo

# console
sbt avro/console

# generate avro classes
# avro/target/scala-2.12/classes/com/kafka/demo/User.class
sbt clean avro/compile

# test
sbt clean avro/test

kafka

Description

Kafka API examples: producers and consumers in two variants, original and cakesolutions

Demo

# access kafka
docker exec -it local-kafka bash

# create topic
# convention <MESSAGE_TYPE>.<DATASET_NAME>.<DATA_NAME>
# example [example.no-schema.original|example.no-schema.cakesolutions]
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>

# delete topic
kafka-topics --zookeeper zookeeper:2181 \
  --delete --topic <TOPIC_NAME>

# view topic
kafka-topics --zookeeper zookeeper:2181 --list 
kafka-topics --zookeeper zookeeper:2181 --describe --topic <TOPIC_NAME>

# view topic offset
kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list kafka:9092 \
  --time -1 \
  --topic <TOPIC_NAME>

# list consumer groups
kafka-consumer-groups --bootstrap-server kafka:9092 --list

# view consumer group offset
kafka-consumer-groups \
  --bootstrap-server kafka:9092 \
  --group <GROUP_NAME> \
  --describe

# reset consumer group offset
kafka-consumer-groups \
  --bootstrap-server kafka:9092 \
  --group <GROUP_NAME> \
  --topic <TOPIC_NAME> \
  --reset-offsets \
  --to-earliest \
  --execute

# console producer
kafka-console-producer --broker-list kafka:9092 --topic <TOPIC_NAME>
kafkacat -P -b 0 -t <TOPIC_NAME>

# console consumer
kafka-console-consumer --bootstrap-server kafka:9092 --topic <TOPIC_NAME> --from-beginning
kafkacat -C -b 0 -t <TOPIC_NAME>

# producer example
sbt "kafka/runMain com.kafka.demo.original.Producer"
sbt "kafka/runMain com.kafka.demo.cakesolutions.Producer"

# consumer example
sbt "kafka/runMain com.kafka.demo.original.Consumer"
sbt "kafka/runMain com.kafka.demo.cakesolutions.Consumer"

# test
sbt clean kafka/test
sbt "test:testOnly *KafkaSpec"
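
The topic naming convention <MESSAGE_TYPE>.<DATASET_NAME>.<DATA_NAME> used above can be sketched as two small shell helpers. Both function names are hypothetical, and the allowed character set in the check (letters, digits, underscore, hyphen) is an assumption chosen to fit the example names in this README, not a rule stated by the project.

```shell
# Hypothetical helper: build a topic name following the
# <MESSAGE_TYPE>.<DATASET_NAME>.<DATA_NAME> convention.
topic_name() {
  if [ "$#" -ne 3 ]; then
    echo "usage: topic_name MESSAGE_TYPE DATASET_NAME DATA_NAME" >&2
    return 1
  fi
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Hypothetical check: succeed only if the name has exactly three
# dot-separated, non-empty segments (assumed charset: letters, digits, '_', '-').
is_valid_topic() {
  printf '%s\n' "$1" | grep -Eq '^[A-Za-z0-9_-]+(\.[A-Za-z0-9_-]+){2}$'
}

topic_name example no-schema original   # prints: example.no-schema.original
```

The result can be passed as <TOPIC_NAME> to the kafka-topics commands above.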

schema-registry

Description

# register schema
# convention <TOPIC_NAME>-key or <TOPIC_NAME>-value
http -v POST :8081/subjects/example.with-schema.simple-value/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema='{"type":"string"}'

# import schema from file
http -v POST :8081/subjects/example.with-schema.user-value/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema=@avro/src/main/avro/user.avsc

# export schema to file
http :8081/subjects/example.with-schema.user-value/versions/latest \
  | jq -r '.schema|fromjson' \
  | tee avro/src/main/avro/user-latest.avsc

# list subjects
http -v :8081/subjects

# list subject's versions
http -v :8081/subjects/example.with-schema.simple-value/versions

# fetch by version
http -v :8081/subjects/example.with-schema.simple-value/versions/1

# fetch by id
http -v :8081/schemas/ids/1

# test compatibility
http -v POST :8081/compatibility/subjects/example.with-schema.simple-value/versions/latest \
  Accept:application/vnd.schemaregistry.v1+json \
  schema='{"type":"string"}'

# delete version
http -v DELETE :8081/subjects/example.with-schema.simple-value/versions/1

# delete latest version
http -v DELETE :8081/subjects/example.with-schema.simple-value/versions/latest

# delete subject
http -v DELETE :8081/subjects/example.with-schema.simple-value

# stringify
jq tostring avro/src/main/avro/user.avsc
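
Two details in the commands above are easy to miss: the subject is the topic name plus a -key or -value suffix, and the registry expects the schema as a JSON string inside a {"schema": "..."} body, which is what the HTTPie schema='...' argument and the jq tostring step produce. A minimal sketch without HTTPie or jq follows. The helper names are hypothetical, and schema_payload assumes a single-line schema.

```shell
# Hypothetical helper: derive the subject for a topic's key or value schema.
subject_name() {
  case "$2" in
    key|value) printf '%s-%s\n' "$1" "$2" ;;
    *) echo "second argument must be 'key' or 'value'" >&2; return 1 ;;
  esac
}

# Hypothetical helper: wrap a single-line schema in the {"schema":"..."}
# request body, escaping backslashes first and then double quotes.
schema_payload() {
  escaped=$(printf '%s' "$1" | sed 's/\\/\\\\/g; s/"/\\"/g')
  printf '{"schema":"%s"}\n' "$escaped"
}

# Usage sketch (assumes curl and the registry listening on localhost:8081):
# curl -s -X POST \
#   -H 'Content-Type: application/vnd.schemaregistry.v1+json' \
#   --data "$(schema_payload '{"type":"string"}')" \
#   "http://localhost:8081/subjects/$(subject_name example.with-schema.simple value)/versions"
```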

Demo

# generate SpecificRecord classes under "schema-registry/target/scala-2.12/src_managed/main/compiled_avro"
sbt clean schema-registry/avroScalaGenerateSpecific

# (optional) create schema
http -v POST :8081/subjects/example.with-schema.payment-key/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema='{"type":"string"}'
http -v POST :8081/subjects/example.with-schema.payment-value/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema=@schema-registry/src/main/avro/Payment.avsc

# access kafka
docker exec -it local-kafka bash

# (optional) create topic
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic example.with-schema.payment

# console producer (binary)
kafka-console-producer --broker-list kafka:9092 --topic example.with-schema.payment

# console consumer (binary)
kafka-console-consumer --bootstrap-server kafka:9092 --topic example.with-schema.payment

# access schema-registry
docker exec -it local-schema-registry bash

# avro console producer
# example "MyKey",{"id":"MyId","amount":10}
kafka-avro-console-producer --broker-list kafka:29092 \
  --topic example.with-schema.payment \
  --property schema.registry.url=http://schema-registry:8081 \
  --property parse.key=true \
  --property key.separator=, \
  --property key.schema='{"type":"string"}' \
  --property value.schema='{"namespace":"io.confluent.examples.clients.basicavro","type":"record","name":"Payment","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"}]}'

# avro console consumer
kafka-avro-console-consumer --bootstrap-server kafka:29092 \
  --topic example.with-schema.payment \
  --property schema.registry.url=http://schema-registry:8081 \
  --property schema.id.separator=: \
  --property print.key=true \
  --property print.schema.ids=true \
  --property key.separator=, \
  --from-beginning

# producer example (SpecificRecord)
sbt "schema-registry/runMain com.kafka.demo.specific.Producer"

# consumer example (SpecificRecord)
sbt "schema-registry/runMain com.kafka.demo.specific.Consumer"

# tests (SpecificRecord)
sbt "schema-registry/test:testOnly *KafkaSchemaRegistrySpecificSpec"

# producer example (GenericRecord)
sbt "schema-registry/runMain com.kafka.demo.generic.Producer"

# consumer example (GenericRecord)
sbt "schema-registry/runMain com.kafka.demo.generic.Consumer"

# tests (GenericRecord)
sbt "schema-registry/test:testOnly *KafkaSchemaRegistryGenericSpec"
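
With parse.key=true and key.separator=, the avro console producer splits each input line at the first separator, so the commas inside the JSON value in the example line "MyKey",{"id":"MyId","amount":10} are left alone. The sketch below imitates that split in plain shell to make the input format easier to reason about. The helper names are hypothetical, and they do not validate that a separator is present.

```shell
# Hypothetical helpers: split an input line at the FIRST occurrence of the
# separator, which is why a value containing commas stays intact.
# $1 = input line, $2 = separator
record_key()   { printf '%s\n' "${1%%"$2"*}"; }
record_value() { printf '%s\n' "${1#*"$2"}"; }

line='"MyKey",{"id":"MyId","amount":10}'
record_key "$line" ','     # prints: "MyKey"
record_value "$line" ','   # prints: {"id":"MyId","amount":10}
```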

TODO

  • generic + schema evolution
  • ovotech
  • multi-schema
  • formulation

kafka-streams

Description

Kafka Streams API examples

Demo-1

# access kafka
docker exec -it local-kafka bash

# create topic
# example [example.to-upper-case-app.input|example.to-upper-case-app.output]
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>

# ToUpperCaseApp example (input topic required)
sbt "streams/runMain com.kafka.demo.streams.ToUpperCaseApp"

# produce
kafka-console-producer --broker-list kafka:9092 \
  --topic example.to-upper-case-app.input

# consume
kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic example.to-upper-case-app.output

# test
sbt clean streams/test
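
Since ToUpperCaseApp requires its input topic to exist, it can help to create both example topics in one go. The sketch below only prints the kafka-topics commands from this demo for the two topic names; create_topic_cmd is a hypothetical helper. Piping the output to a shell inside the local-kafka container is one way to run them.

```shell
# Hypothetical helper: print (not run) the create command for one topic,
# using the same flags as the demo above.
create_topic_cmd() {
  printf 'kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --topic %s\n' "$1"
}

# Emit one command per example topic.
for suffix in input output; do
  create_topic_cmd "example.to-upper-case-app.$suffix"
done
```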

Demo-2

Tested with embedded-kafka and embedded-kafka-schema-registry

# access kafka
docker exec -it local-kafka bash

# create topic
# example [json.streams-json-to-avro-app.input|avro.streams-json-to-avro-app.output]
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>

# produce (default StringSerializer)
kafka-console-producer \
  --broker-list kafka:9092 \
  --property "parse.key=true" \
  --property "key.separator=:" \
  --property "key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer" \
  --property "value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer" \
  --topic <TOPIC_NAME>

# consume (default StringDeserializer)
kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property "print.key=true" \
  --property "key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer" \
  --property "value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer" \
  --topic <TOPIC_NAME>

# access schema-registry
docker exec -it local-schema-registry bash

# consume avro
kafka-avro-console-consumer --bootstrap-server kafka:29092 \
  --property schema.registry.url=http://schema-registry:8081 \
  --property schema.id.separator=: \
  --property print.key=true \
  --property print.schema.ids=true \
  --property key.separator=, \
  --from-beginning \
  --topic <TOPIC_NAME>

# JsonToAvroApp example (input topic required)
sbt "streams-json-avro/runMain com.kafka.demo.JsonToAvroApp"

# test
sbt clean streams-json-avro/test

Example

# json
mykey:{"valueInt":42,"valueString":"foo"}

# log
[json.streams-json-to-avro-app.input]: mykey, JsonModel(42,foo)
[avro.streams-json-to-avro-app.output]: KeyAvroModel(mykey), ValueAvroModel(42,FOO)
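
Judging from the log lines above, the app wraps the key and upper-cases valueString while keeping valueInt. The shell sketch below reproduces only the shape of that output for one key:json input line, as a way to predict what the consumer should show. It is an illustration with a hypothetical function name, not the application's code, and the sed-based field extraction assumes the flat JSON layout of the example.

```shell
# Hypothetical sketch: predict the output log entry for one "key:json" line.
expected_output() {
  key=${1%%:*}     # text before the first ':'
  json=${1#*:}     # text after the first ':'
  value_int=$(printf '%s\n' "$json" | sed 's/.*"valueInt":\([0-9-]*\).*/\1/')
  value_string=$(printf '%s\n' "$json" \
    | sed 's/.*"valueString":"\([^"]*\)".*/\1/' \
    | tr '[:lower:]' '[:upper:]')
  printf 'KeyAvroModel(%s), ValueAvroModel(%s,%s)\n' "$key" "$value_int" "$value_string"
}

expected_output 'mykey:{"valueInt":42,"valueString":"foo"}'
# prints: KeyAvroModel(mykey), ValueAvroModel(42,FOO)
```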

Demo-3

TODO

  • CatsKafkaStreamsApp

# run app
sbt -jvm-debug 5005 "cats-kafka-streams/runMain com.kafka.demo.CatsKafkaStreamsApp"

Demo-4

TODO

# run app
sbt -jvm-debug 5005 "zio-kafka-streams/runMain com.kafka.demo.ZioKafkaStreamsApp"

ksql

Description

Setup Kafka

# access kafka
docker exec -it local-kafka bash

# create topic
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic USER_PROFILE

# produce sample data
kafka-console-producer --broker-list kafka:9092 --topic USER_PROFILE << EOF
{"userid": 1000, "firstname": "Alison", "lastname": "Smith", "countrycode": "GB", "rating": 4.7}
EOF

# consume
kafka-console-consumer --bootstrap-server kafka:9092 --topic USER_PROFILE --from-beginning
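
To produce more than the single record above without a data generator, a small loop can emit several JSON lines with the same fields. This is a sketch only: user_profile_records is a hypothetical helper, and every value except the field names and the starting userid is made up for illustration.

```shell
# Hypothetical helper: print N sample records with the USER_PROFILE fields.
# $1 = number of records; userid starts at 1000 as in the sample above.
user_profile_records() {
  i=0
  while [ "$i" -lt "$1" ]; do
    printf '{"userid": %d, "firstname": "User%d", "lastname": "Test", "countrycode": "GB", "rating": 4.7}\n' \
      "$((1000 + i))" "$i"
    i=$((i + 1))
  done
}

# Usage sketch, inside the local-kafka container:
# user_profile_records 5 | kafka-console-producer --broker-list kafka:9092 --topic USER_PROFILE
```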

Access KSQL CLI

  • using the server

    # access ksql-server
    docker exec -it local-ksql-server bash
    
    # start ksql cli
    ksql http://ksql-server:8088
  • using a local instance

    # connect to local cli
    docker exec -it local-ksql-cli ksql http://ksql-server:8088
  • using a temporary instance

    # connect to remote server
    docker run --rm \
      --network=kafka-scala-examples_local_kafka_network \
      -it confluentinc/cp-ksql-cli http://ksql-server:8088

Execute SQL statements

-- create stream
CREATE STREAM user_profile (\
  userid INT, \
  firstname VARCHAR, \
  lastname VARCHAR, \
  countrycode VARCHAR, \
  rating DOUBLE \
  ) WITH (KAFKA_TOPIC = 'USER_PROFILE', VALUE_FORMAT = 'JSON');

-- verify stream
list streams;
describe user_profile;

-- query stream
SELECT userid, firstname, lastname, countrycode, rating FROM user_profile EMIT CHANGES;

Generate more records with ksql-datagen; both the console consumer and the running query should show them as they arrive

# generate data
docker run --rm \
  -v $(pwd)/local/ksql:/datagen \
  --network=kafka-scala-examples_local_kafka_network \
  -it confluentinc/ksql-examples ksql-datagen \
  bootstrap-server=kafka:29092 \
  schemaRegistryUrl=http://schema-registry:8081 \
  schema=datagen/user_profile.avro \
  format=json \
  topic=USER_PROFILE \
  key=userid \
  maxInterval=5000 \
  iterations=100

kafka-connect

Setup PostgreSQL locally

# create shared network
docker-compose up

# start postgres
docker-compose -f docker-compose.postgres.yml up

# (mac|linux) view postgres ui
# [schema=public|database=postgres|username=postgres|password=postgres]
[open|xdg-open] http://localhost:8080

Setup connectors

# list connector
http -v :8083/connectors

# init data to generate schema
cp local/connect/data/resources-0.txt.orig local/connect/data/resources-0.txt

# setup spooldir source connector
http -v --json POST :8083/connectors < local/connect/config/source-spooldir-connector.json

# ingest data
echo "{\"accountId\":\"123\",\"resourceType\":\"XXX\",\"value\":\"X1\"}" > local/connect/data/resources-1.txt

# setup jdbc sink connector
# topic = SCHEMA.DATABASE = "public.postgres"
http -v --json POST :8083/connectors < local/connect/config/sink-jdbc-connector.json

# verify data
docker exec -it local-postgres bash -c "psql -U postgres postgres"
select * from public.postgres;

# cleanup
docker-compose -f docker-compose.postgres.yml down -v
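
The ingest step above writes one JSON record with hand-escaped quotes. A small printf-based helper is a less error-prone way to build such lines. The sketch below uses the three field names from that step; resource_json is a hypothetical helper, and it does not escape quotes or backslashes inside the arguments.

```shell
# Hypothetical helper: build one resource record as a JSON line.
# $1 = accountId, $2 = resourceType, $3 = value (plain strings, not escaped)
resource_json() {
  printf '{"accountId":"%s","resourceType":"%s","value":"%s"}\n' "$1" "$2" "$3"
}

resource_json 123 XXX X1
# prints: {"accountId":"123","resourceType":"XXX","value":"X1"}

# Usage sketch, equivalent to the ingest step above:
# resource_json 123 XXX X1 > local/connect/data/resources-1.txt
```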
