All Projects → blendle → Pg2kafka

blendle / Pg2kafka

Licence: isc
Ship changes in Postgres 🐘 to Kafka 📖

Programming Languages

go
31211 projects - #10 most used programming language
golang
3204 projects

Projects that are alternatives of or similar to Pg2kafka

Ksql
The database purpose-built for stream processing applications.
Stars: ✭ 4,668 (+7552.46%)
Mutual labels:  kafka, stream-processing
Pmacct
pmacct is a small set of multi-purpose passive network monitoring tools [NetFlow IPFIX sFlow libpcap BGP BMP RPKI IGP Streaming Telemetry].
Stars: ✭ 677 (+1009.84%)
Mutual labels:  kafka, postgresql
Kafka Streams
equivalent to kafka-streams 🐙 for nodejs ✨🐢🚀✨
Stars: ✭ 613 (+904.92%)
Mutual labels:  kafka, stream-processing
Micronaut Microservices Poc
Very simplified insurance sales system made in a microservices architecture using Micronaut
Stars: ✭ 394 (+545.9%)
Mutual labels:  kafka, postgresql
Streamsx.messaging
This toolkit is focused on interacting with popular messaging systems such as Kafka, JMS, XMS, and MQTT. After release v5.4.2 the complete toolkit will be deprecated. See the README.md file for hints to alternative toolkits.
Stars: ✭ 31 (-49.18%)
Mutual labels:  kafka, stream-processing
Awesome Kafka
A list about Apache Kafka
Stars: ✭ 397 (+550.82%)
Mutual labels:  kafka, stream-processing
Faust
Python Stream Processing
Stars: ✭ 5,899 (+9570.49%)
Mutual labels:  kafka, stream-processing
Watermill
Building event-driven applications the easy way in Go.
Stars: ✭ 3,504 (+5644.26%)
Mutual labels:  kafka, stream-processing
Hazelcast Jet
Distributed Stream and Batch Processing
Stars: ✭ 855 (+1301.64%)
Mutual labels:  kafka, stream-processing
Es Cqrs Shopping Cart
A resilient and scalable shopping cart system designed using Event Sourcing (ES) and Command Query Responsibility Segregation (CQRS)
Stars: ✭ 19 (-68.85%)
Mutual labels:  kafka, postgresql
Datafaker
Datafaker is a large-scale test data and flow test data generation tool. Datafaker fakes data and inserts to varied data sources. 测试数据生成工具
Stars: ✭ 327 (+436.07%)
Mutual labels:  kafka, postgresql
Storm Dynamic Spout
A framework for building spouts for Apache Storm and a Kafka based spout for dynamically skipping messages to be processed later.
Stars: ✭ 40 (-34.43%)
Mutual labels:  kafka, stream-processing
Benthos
Fancy stream processing made operationally mundane
Stars: ✭ 3,705 (+5973.77%)
Mutual labels:  kafka, stream-processing
Kasper
Kasper is a lightweight library for processing Kafka topics.
Stars: ✭ 413 (+577.05%)
Mutual labels:  kafka, stream-processing
Devicehive Java Server
DeviceHive Java Server
Stars: ✭ 241 (+295.08%)
Mutual labels:  kafka, postgresql
Go Streams
A lightweight stream processing library for Go
Stars: ✭ 615 (+908.2%)
Mutual labels:  kafka, stream-processing
Devops Bash Tools
550+ DevOps Bash Scripts - AWS, GCP, Kubernetes, Kafka, Docker, APIs, Hadoop, SQL, PostgreSQL, MySQL, Hive, Impala, Travis CI, Jenkins, Concourse, GitHub, GitLab, BitBucket, Azure DevOps, TeamCity, Spotify, MP3, LDAP, Code/Build Linting, pkg mgmt for Linux, Mac, Python, Perl, Ruby, NodeJS, Golang, Advanced dotfiles: .bashrc, .vimrc, .gitconfig, .screenrc, .tmux.conf, .psqlrc ...
Stars: ✭ 226 (+270.49%)
Mutual labels:  kafka, postgresql
Storagetapper
StorageTapper is a scalable realtime MySQL change data streaming, logical backup and logical replication service
Stars: ✭ 232 (+280.33%)
Mutual labels:  kafka, postgresql
Nakadi
A distributed event bus that implements a RESTful API abstraction on top of Kafka-like queues
Stars: ✭ 734 (+1103.28%)
Mutual labels:  kafka, postgresql
Ksql Recipes Try It At Home
Files needed to try out KSQL Recipes for yourself
Stars: ✭ 33 (-45.9%)
Mutual labels:  kafka, stream-processing

pg2kafka

This service adds triggers to a given table in your Postgres database after taking a snapshot of it's initial representation, and tracks changes to that table to deliver them to a Kafka topic.

It consists of two parts:

  • A schema in your DB containing an outbound_event_queue table and all the necessary functions and triggers to take snapshots and track changes.
  • A small executable that reads from said table, and ships them to Kafka.

pg2kafka is still in early development, it is not advised to use this in production. If you run into issues, please open an issue.

We use this as a way to reliably get data out of our hosted PostgreSQL databases where we cannot use systems like debezium or bottled water since we do not have access to the WAL logs and cannot install native extensions or run binaries on the database host machine.

The following SQL statements are used to send updates to the relevant topic:

  • INSERT
  • UPDATE
  • DELETE

Usage

Connect pg2kafka to the database you want to stream changes from, and set the PERFORM_MIGRATIONS env var to true, this will create a schema pg2kafka in said DB and will set up an outbound_event_queue table there, together with the necessary functions and triggers to start exporting data.

In order to start tracking changes for a table, you need to execute the pg2kafka.setup function with the table name and a column to use as external ID. The external ID will be what's used as a partitioning key in Kafka, this ensures that messages for a given entity will always end up in order, on the same partition. The example below will add the trigger to the products table and use its sku column as the external ID.

Let's say we have a database called shop_test:

$ createdb shop_test

It contains a table products:

CREATE TABLE products (
  id BIGSERIAL,
  sku TEXT,
  name TEXT
);

And it already has some data in it:

INSERT INTO products (sku, name) VALUES ('CM01-R', 'Red Coffee Mug');
INSERT INTO products (sku, name) VALUES ('CM01-B', 'Blue Coffee Mug');

Given that we've already connected pg2kafka to it, and it has ran it's migrations, we can start tracking changes to the products table:

SELECT pg2kafka.setup('products', 'sku');

This will create snapshots of the current data in that table:

{
  "uuid": "ea76e080-6acd-413a-96b3-131a42ab1002",
  "external_id": "CM01-B",
  "statement": "SNAPSHOT",
  "data": {
    "id": 2,
    "sku": "CM01-B",
    "name": "Blue Coffee Mug"
  },
  "created_at": "2017-11-02T16:14:36.709116Z"
}
{
  "uuid": "e1c0008d-6b7a-455a-afa6-c1c2eebd65d3",
  "external_id": "CM01-R",
  "statement": "SNAPSHOT",
  "data": {
    "id": 1,
    "sku": "CM01-R",
    "name": "Red Coffee Mug"
  },
  "created_at": "2017-11-02T16:14:36.709116Z"
}

Now once you start making changes to your table, you should start seeing events come in on the pg2kafka.shop_test.products topic:

UPDATE products SET name = 'Big Red Coffee Mug' WHERE sku = 'CM01-R';
{
  "uuid": "d6521ce5-4068-45e4-a9ad-c0949033a55b",
  "external_id": "CM01-R",
  "statement": "UPDATE",
  "data": {
    "name": "Big Red Coffee Mug"
  },
  "created_at": "2017-11-02T16:15:13.94077Z"
}

The producer topics are all in the form of pg2kafka.$database_name.$table_name, you need to make sure that this topic exists, or else pg2kafka will crash.

You can optionally prepend a namespace to the Kafka topic, by setting the TOPIC_NAMESPACE environment variable. When doing this, the final topic name would be pg2kafka.$namespace.$database_name.$table_name.

Cleanup

If you decide not to use pg2kafka anymore you can cleanup the Database triggers using the following command:

DROP SCHEMA pg2kafka CASCADE;

Development

Setup

Golang

You will need Go 1.9.

PostgreSQL

Set up a database and expose a connection string to it as an env variable, for local development we also specify sslmode=disable.

$ createdb pg2kafka_test
$ export DATABASE_URL="postgres://user:[email protected]/pg2kafka_test?sslmode=disable"

Kafka

Install Kafka if you don't already have it running. This is not required to run the tests, but it is required if you want to run pg2kafka locally against a real Kafka.

Create a topic for the table you want to track in your database:

kafka-topics \
  --zookeeper localhost:2181 \
  --create \
  --topic pg2kafka.pg2kafka_test.users \
  --replication-factor 1 \
  --partitions 3

Then export the Kafka host as an URL so pg2kafka can use it:

$ export KAFKA_BROKER="localhost:9092"

Running the service locally

Make sure you export the DATABASE_URL and KAFKA_BROKER, and also export PERFORM_MIGRATIONS=true.

$ go run main.go

To run the service without using Kafka, you can set a DRY_RUN=true flag, which will produce the messages to stdout.

Running tests

The only thing required for the tests to run is that you've set up a database and exposed a connection string to it as DATABASE_URL. All the necessary schemas, tables and triggers will be created by the tests.

$ ./script/test

License

pg2kafka is released under the ISC license. See LICENSE for details.

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].