lensesio / kafka-connect-kcql-smt

Licence: Apache-2.0 license
Kafka Connect SMT (Single Message Transformation) with SQL syntax, using Apache Calcite for the SQL parsing

Programming language: Scala

Projects that are alternatives to or similar to kafka-connect-kcql-smt

kafka-connect-examples
Kafka Connect Examples
Stars: ✭ 36 (+33.33%)
Mutual labels:  kafka-connect
cassandra.realtime
Different ways to process data into Cassandra in realtime with technologies such as Kafka, Spark, Akka, Flink
Stars: ✭ 25 (-7.41%)
Mutual labels:  kafka-connect
registryless-avro-converter
An avro converter for Kafka Connect without a Schema Registry
Stars: ✭ 45 (+66.67%)
Mutual labels:  kafka-connect
kafka-connect-http
Kafka Connect connector that enables Change Data Capture from JSON/HTTP APIs into Kafka.
Stars: ✭ 81 (+200%)
Mutual labels:  kafka-connect
bigquery-kafka-connect
☁️ nodejs kafka connect connector for Google BigQuery
Stars: ✭ 17 (-37.04%)
Mutual labels:  kafka-connect
kafka-junit
Enables you to start and stop a fully-fledged embedded Kafka cluster from within JUnit and provides a rich set of convenient accessors and fault injectors through a lean API. Supports working against external clusters as well.
Stars: ✭ 38 (+40.74%)
Mutual labels:  kafka-connect
kafka-connect-redis
📕 Kafka Connect source and sink connector for Redis
Stars: ✭ 35 (+29.63%)
Mutual labels:  kafka-connect
kafka-connect-datagen
A Kafka Connect source connector that generates data for tests
Stars: ✭ 27 (+0%)
Mutual labels:  kafka-connect
kafka-connect-arangodb
🥑 Kafka connect sink connector for ArangoDB
Stars: ✭ 22 (-18.52%)
Mutual labels:  kafka-connect
kafka-with-springboot
Demonstrations for Kafka with Spring Boot
Stars: ✭ 17 (-37.04%)
Mutual labels:  kafka-connect
MongoDb-Sink-Connector
Kafka MongoDb sink connector
Stars: ✭ 19 (-29.63%)
Mutual labels:  kafka-connect
kafkacli
CLI and Go Clients to manage Kafka components (Kafka Connect & SchemaRegistry)
Stars: ✭ 28 (+3.7%)
Mutual labels:  kafka-connect
scylla-cdc-source-connector
A Kafka source connector capturing Scylla CDC changes
Stars: ✭ 19 (-29.63%)
Mutual labels:  kafka-connect
kafka-connect-cosmosdb
Kafka Connect connectors for Azure Cosmos DB
Stars: ✭ 28 (+3.7%)
Mutual labels:  kafka-connect
docker-kafka-connect
Docker Image for kafka-connect
Stars: ✭ 16 (-40.74%)
Mutual labels:  kafka-connect
fast-data-connect-cluster
Create Kafka-Connect clusters with docker . You put the Kafka, we put the Connect.
Stars: ✭ 25 (-7.41%)
Mutual labels:  kafka-connect
connor
A commandline tool for resetting Kafka Connect source connector offsets.
Stars: ✭ 17 (-37.04%)
Mutual labels:  kafka-connect
kafka-connect-splunk
Kafka Connect connector for receiving data and writing data to Splunk.
Stars: ✭ 25 (-7.41%)
Mutual labels:  kafka-connect
kafka-connect-ftp
A Kafka Connect Source for FTP servers - Monitors files on an FTP server and feeds changes into Kafka
Stars: ✭ 46 (+70.37%)
Mutual labels:  kafka-connect
maxwell-sink
consume maxwell generated message from kafka,export it to another mysql.
Stars: ✭ 16 (-40.74%)
Mutual labels:  kafka-connect

Kafka Connect SQL Single Message Transform

Use SQL to drive the transformation of a Kafka message's key and/or value when using Kafka Connect. Before SMTs, you needed a Kafka Streams application to read each message from the source topic, apply the transformation, and write the result to a new topic. We developed a Kafka Streams library (available on GitHub) to make simple Kafka Streams transformations easy to express.

However, with an SMT the extra topic is no longer required for Kafka Connect!

Why

Sources or sinks might produce or consume data whose shape does not match what you want:

  • you want to pick specific fields from a Kafka topic for the sink
  • you might want to flatten the message structure
  • you might want to rename fields
  • (coming soon) you might want to filter messages

And you want to express all of this with a simple syntax. This is where the SQL SMT helps!

Configuration

Configuration                  Type    Required  Description
connect.transforms.sql.key     String  No        Semicolon-separated SQL statements targeting the key of a Kafka message
connect.transforms.sql.value   String  No        Semicolon-separated SQL statements targeting the value of a Kafka message

Each SQL statement maps a topic (the one named in its FROM clause) to the transformation applied to it. Every message on the specified topics gets the matching transformation.

Example configuration

connect.transforms.sql.value=SELECT ingredients.name as fieldName,ingredients.*, ingredients.sugar as fieldSugar FROM topic1 withstructure;SELECT name, address.street.name as streetName, address.street2.name as streetName2 FROM topic2
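To illustrate how a property value like the one above maps topics to transformations, here is a minimal Scala sketch that splits it into one statement per topic. It assumes statements are separated by ";" and that the topic is the identifier following FROM. SqlConfig and statementsByTopic are hypothetical names, not part of this project's API; the project itself parses the SQL with Apache Calcite rather than a regular expression.

```scala
import scala.util.matching.Regex

// Hypothetical helper: NOT the transform's real config parser.
object SqlConfig {
  // Captures the topic name after FROM (case-insensitive); Kafka topic names
  // use letters, digits, '.', '_' and '-'.
  private val FromTopic: Regex = """(?i)\bFROM\s+([A-Za-z0-9._-]+)""".r

  /** Splits a semicolon-separated property value into topic -> SQL statement. */
  def statementsByTopic(value: String): Map[String, String] =
    value
      .split(";")
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { sql =>
        val topic = FromTopic
          .findFirstMatchIn(sql)
          .map(_.group(1))
          .getOrElse(throw new IllegalArgumentException(s"No FROM clause in: $sql"))
        topic -> sql
      }
      .toMap // assumption: if a topic appears twice, the later statement wins
}
```

Applied to the example configuration, this yields two entries, keyed by topic1 and topic2. How the real transform handles a topic listed twice is not stated in this README.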

Supported Kafka Connect payloads

In most cases the payload sent over Kafka is Avro. That is not always true for existing systems, where the payload is often JSON. The transform therefore handles more than just Avro. Supported payload types (applying to both key and value):

Schema type    Input            Output schema    Output
Type.STRUCT    Struct           Type.STRUCT      Struct
Type.BYTES     JSON (byte[])    Schema.BYTES     JSON (byte[])
Type.STRING    JSON (String)    Schema.STRING    JSON (String)
NULL           JSON (byte[])    NULL             JSON (byte[])
NULL           JSON (String)    NULL             JSON (String)
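The table can be read as a dispatch on (schema type, value). Below is a minimal Scala sketch of that routing under loud assumptions: Payload, StructPayload, JsonBytes, JsonString and PayloadRouting are hypothetical names, the schema type is passed as a plain string, a Struct is modelled as a Map, and None stands for the NULL rows (records without a schema). The real transform works with Kafka Connect's Schema and Struct classes instead.

```scala
// Hypothetical model of the payload rows in the table; not the project's types.
sealed trait Payload
final case class StructPayload(fields: Map[String, Any]) extends Payload
final case class JsonBytes(bytes: Array[Byte]) extends Payload
final case class JsonString(json: String) extends Payload

object PayloadRouting {
  /** schemaType is None when the record carries no schema (the NULL rows). */
  def classify(schemaType: Option[String], value: Any): Either[String, Payload] =
    (schemaType, value) match {
      // A Struct is modelled here as a Map of field name -> value.
      case (Some("STRUCT"), m: Map[_, _]) =>
        Right(StructPayload(m.map { case (k, v) => (k.toString, v: Any) }))
      // JSON as raw bytes, with a BYTES schema or no schema at all.
      case (Some("BYTES") | None, b: Array[Byte]) => Right(JsonBytes(b))
      // JSON as a string, with a STRING schema or no schema at all.
      case (Some("STRING") | None, s: String) => Right(JsonString(s))
      case (t, v) =>
        val kind = Option(v).map(_.getClass.getSimpleName).getOrElse("null")
        Left(s"Unsupported payload: schema=$t, value=$kind")
    }
}
```

Anything outside the table's rows, such as an INT32 schema, falls through to an error in this sketch.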

SQL

We use Apache Calcite for the SQL parsing. Calcite's SQL support is extensive, but for now the transform only handles simple SQL identifiers (nested structures are supported). More is planned, such as WHERE conditions and possibly SQL operations (field concatenation, for example). Syntax:

SELECT ...
FROM TOPIC
[WITHSTRUCTURE]
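The grammar above is small enough to sketch with regular expressions. The Scala code below is a hypothetical stand-in for the Calcite-based parser, shown only to make the shape of a statement concrete: a comma-separated list of dotted field paths (optionally ending in .* and optionally aliased with AS), a topic name, and an optional WITHSTRUCTURE flag. SelectParser, Field and Query are illustrative names, not this project's classes.

```scala
// Hypothetical regex-based parser; the real transform uses Apache Calcite.
object SelectParser {
  // A projected field: a dotted path such as "address.street.name" (or "x.*"),
  // plus an optional alias given with AS.
  final case class Field(path: String, alias: Option[String])
  final case class Query(fields: List[Field], topic: String, withStructure: Boolean)

  // SELECT <list> FROM <topic> [WITHSTRUCTURE], case-insensitive, may span lines.
  private val Statement =
    """(?is)^\s*SELECT\s+(.+?)\s+FROM\s+([A-Za-z0-9._-]+)(\s+WITHSTRUCTURE)?\s*$""".r
  // One select item: "<path>" or "<path> AS <alias>".
  private val Item = """(?i)^(\S+)(?:\s+AS\s+(\S+))?$""".r

  def parse(sql: String): Either[String, Query] = sql match {
    case Statement(select, topic, ws) =>
      // Splitting on ',' is safe only because no functions are supported yet.
      val fields: List[Either[String, Field]] =
        select.split(",").map(_.trim).filter(_.nonEmpty).toList.map {
          case Item(path, alias) => Right(Field(path, Option(alias)))
          case other             => Left(s"Unsupported select item: $other")
        }
      fields.collectFirst { case Left(err) => err } match {
        case Some(err) => Left(err)
        // An unmatched optional group is null, so ws != null means WITHSTRUCTURE.
        case None => Right(Query(fields.collect { case Right(f) => f }, topic, ws != null))
      }
    case _ => Left(s"Not a SELECT ... FROM topic [WITHSTRUCTURE] statement: $sql")
  }
}
```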

The SQL in this Kafka Connect SMT has two modes:

  • Flatten the structure. The general syntax is: SELECT ... FROM TOPIC_A
-- rename fields and pick only first-level fields
SELECT calories as C, vegan as V, name as fieldName FROM topic

-- cherry-pick fields at different levels of the structure
SELECT name, address.street.name as streetName FROM topic

-- select all fields of a nested level, plus a renamed nested field
SELECT name, address.street.*, address.street2.name as streetName2 FROM topic
  • Retain the structure. The syntax is SELECT ... FROM TOPIC_A WITHSTRUCTURE. Notice the WITHSTRUCTURE keyword.
-- select everything unchanged (no real gain, shown for completeness)
SELECT * FROM topic withstructure

-- rename a field
SELECT *, name as fieldName FROM topic withstructure

-- rename a complex field
SELECT *, ingredients as stuff FROM topic withstructure

-- select a single field
SELECT vegan FROM topic withstructure

-- select only nested fields, renaming some of them
SELECT ingredients.name as fieldName, ingredients.sugar as fieldSugar, ingredients.* FROM topic withstructure
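A minimal Scala sketch of the flatten mode, assuming the record is a nested Map rather than a Kafka Connect Struct. Flatten, select and lookup are hypothetical names. Two behaviours here are assumptions, not documented behaviour of this project: an unaliased nested field takes the name of its last path segment, and name collisions (for example a top-level name next to the children of address.street.*) are left unresolved.

```scala
// Hypothetical flatten-mode sketch over nested Maps; not the project's code.
object Flatten {
  type Record = Map[String, Any]

  // Follows a dotted path such as List("address", "street", "name") through nested maps.
  private def lookup(record: Record, path: List[String]): Option[Any] = path match {
    case Nil => Some(record)
    case key :: rest =>
      record.get(key).flatMap {
        case nested: Map[_, _] if rest.nonEmpty =>
          lookup(nested.map { case (k, v) => (k.toString, v: Any) }, rest)
        case leaf if rest.isEmpty => Some(leaf)
        case _                    => None // path goes through a non-map value
      }
  }

  /** Applies (path, alias) selections and returns the flat fields in order.
    * "x.y.*" (or "*") copies every child of x.y (or of the root) to the top level.
    * Assumption: duplicate output names are kept as-is, not resolved. */
  def select(record: Record, fields: List[(String, Option[String])]): List[(String, Any)] =
    fields.flatMap { case (path, alias) =>
      if (path == "*" || path.endsWith(".*")) {
        val prefix = path.stripSuffix("*").stripSuffix(".").split('.').filter(_.nonEmpty).toList
        lookup(record, prefix).toList.flatMap {
          case m: Map[_, _] => m.toList.map { case (k, v) => (k.toString, v: Any) }
          case _            => Nil
        }
      } else {
        val parts = path.split('.').toList
        // Assumption: without an alias, use the last path segment as the name.
        lookup(record, parts).map(v => alias.getOrElse(parts.last) -> v).toList
      }
    }
}
```

Missing paths are simply skipped in this sketch; the real transform may report an error instead.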

Not supported

Applying SQL to the value that uses fields from the message key or the message metadata. Coming soon!

2.0 (2020-01-01)

  • Updated to Scala 2.12

0.1 (2017-05-16)

  • first release

Building

Requires Gradle 5.0 to build.

To build

gradle compile

To test

gradle test

You can also use the gradle wrapper

./gradlew build

To view dependency trees

gradle dependencies