
rueian / pgcapture

License: Apache-2.0
A scalable Netflix DBLog implementation for PostgreSQL

Programming Languages

Go
31211 projects - #10 most used programming language
Python
139335 projects - #7 most used programming language
PLpgSQL
1095 projects
C
50402 projects - #5 most used programming language
Dockerfile
14818 projects
Makefile
30231 projects
Shell
77523 projects

Projects that are alternatives of or similar to pgcapture

MySqlCdc
MySQL/MariaDB binlog replication client for .NET
Stars: ✭ 71 (-24.47%)
Mutual labels:  replication, cdc, change-data-capture
redis-connect-dist
Real-Time Event Streaming & Change Data Capture
Stars: ✭ 21 (-77.66%)
Mutual labels:  replication, cdc
Realtime
Listen to your PostgreSQL database in realtime via websockets. Built with Elixir.
Stars: ✭ 4,278 (+4451.06%)
Mutual labels:  cdc, change-data-capture
walrus
Applying RLS to PostgreSQL WAL
Stars: ✭ 59 (-37.23%)
Mutual labels:  replication, cdc
debezium-incubator
Previously used repository for new Debezium modules and connectors in incubation phase (archived)
Stars: ✭ 89 (-5.32%)
Mutual labels:  cdc, change-data-capture
oracdc
Oracle database CDC (Change Data Capture)
Stars: ✭ 51 (-45.74%)
Mutual labels:  cdc, change-data-capture
pg2k4j
Postgresql To Kinesis For Java
Stars: ✭ 69 (-26.6%)
Mutual labels:  replication, cdc
kafka-connect-http
Kafka Connect connector that enables Change Data Capture from JSON/HTTP APIs into Kafka.
Stars: ✭ 81 (-13.83%)
Mutual labels:  cdc, change-data-capture
awesome-storage
A curated list of storage open source tools. Backups, redundancy, sharing, distribution, encryption, etc.
Stars: ✭ 324 (+244.68%)
Mutual labels:  backup, replication
radio
Redundant Array of Distributed Independent Objectstores, RADIO for short, performs synchronous mirroring and erasure coding across multiple object stores
Stars: ✭ 25 (-73.4%)
Mutual labels:  backup, replication
WAL-E
Continuous Archiving for Postgres
Stars: ✭ 3,313 (+3424.47%)
Mutual labels:  backup, replication
OpenLogReplicator
Open Source Oracle database CDC written purely in C++. Reads transactions directly from database redo log files and streams in JSON or Protobuf format to: Kafka, RocketMQ, flat file, network stream (plain TCP/IP or ZeroMQ)
Stars: ✭ 112 (+19.15%)
Mutual labels:  cdc, change-data-capture
scylla-cdc-source-connector
A Kafka source connector capturing Scylla CDC changes
Stars: ✭ 19 (-79.79%)
Mutual labels:  cdc, change-data-capture
Debezium
Change data capture for a variety of databases. Please log issues at https://issues.redhat.com/browse/DBZ.
Stars: ✭ 5,937 (+6215.96%)
Mutual labels:  cdc, change-data-capture
Real-time-Data-Warehouse
Real-time Data Warehouse with Apache Flink & Apache Kafka & Apache Hudi
Stars: ✭ 52 (-44.68%)
Mutual labels:  cdc, change-data-capture
azure-sql-db-change-stream-debezium
SQL Server Change Stream sample using Debezium
Stars: ✭ 74 (-21.28%)
Mutual labels:  cdc, change-data-capture
zap
Maintain and replicate ZFS snapshots
Stars: ✭ 48 (-48.94%)
Mutual labels:  backup, replication
Zrepl
One-stop ZFS backup & replication solution
Stars: ✭ 327 (+247.87%)
Mutual labels:  backup, replication
southpaw
⚾ Streaming left joins in Kafka for change data capture
Stars: ✭ 48 (-48.94%)
Mutual labels:  cdc, change-data-capture
libio
libio is a cross-platform, high-performance I/O library written in C. It provides the ability to write event-driven servers and applications with continuous code, without callback hell
Stars: ✭ 16 (-82.98%)
Mutual labels:  event-driven

pgcapture

A scalable Netflix DBLog implementation for PostgreSQL


(overview architecture diagram omitted; see the repository for the image)

Features

  • DDL commands are also captured
  • One unified gRPC Streaming API for consuming the latest changes and on-demand dumps
  • The changes and dumps are streamed in Postgres Binary Representation to save bandwidth

Improvements to Netflix DBLog

  • Dumps are neither queried from the source database nor injected into the source CDC stream. Instead, they are dumped from the logical replicas and injected into the selected downstreams by the gRPC gateway service.
  • Therefore, the on-demand dump process can be scaled out by adding more logical replicas and consumers. Most importantly, the dump process has no impact on the source database, nor on downstream consumers that don't need those dumps.
  • Primary keys of tables aren't limited to a single numeric column, because dumps are performed by PostgreSQL TID Scan instead of scans on the primary key (see the sketch below).
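
To make the TID-scan idea concrete, below is a small hypothetical sketch (not pgcapture's actual dump query) that reads one heap block of a table with the jackc/pgx driver. The table layout public.my_table(id, my_value), the dumpBlock helper, and the 64-tuple-per-block bound are illustrative assumptions only; the logical replicas issue their own dump statements.

package main

import (
    "context"
    "fmt"
    "log"
    "strings"

    "github.com/jackc/pgx/v4"
)

// dumpBlock probes the tuples stored in one heap block of my_table by ctid.
// "WHERE ctid = ANY(...)" is executed as a TID Scan, so no index and no
// particular primary-key shape is required.
func dumpBlock(ctx context.Context, conn *pgx.Conn, block, tuplesPerBlock int) error {
    tids := make([]string, 0, tuplesPerBlock)
    for line := 1; line <= tuplesPerBlock; line++ {
        tids = append(tids, fmt.Sprintf("'(%d,%d)'", block, line))
    }
    // ctid literals are interpolated for brevity; production code would bind parameters instead.
    sql := fmt.Sprintf(
        "SELECT id, my_value FROM public.my_table WHERE ctid = ANY(ARRAY[%s]::tid[])",
        strings.Join(tids, ","))

    rows, err := conn.Query(ctx, sql)
    if err != nil {
        return err
    }
    defer rows.Close()

    for rows.Next() {
        var id int32
        var value string
        if err := rows.Scan(&id, &value); err != nil {
            return err
        }
        log.Printf("dumped id=%d value=%q", id, value)
    }
    return rows.Err()
}

func main() {
    ctx := context.Background()
    conn, err := pgx.Connect(ctx, "postgres://127.0.0.1/postgres")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close(ctx)

    if err := dumpBlock(ctx, conn, 0, 64); err != nil {
        log.Fatal(err)
    }
}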

Use cases

  • Robust Microservice Event Sourcing
  • Data synchronization: moving data to other databases (e.g. for OLAP)
  • Upgrading PostgreSQL with minimal downtime

Dependencies

  • pglogical PostgreSQL extension
  • pgcapture PostgreSQL extension

See ./hack/postgres/Dockerfile for an installation guide.

Consume changes with Golang

package main

import (
    "context"

    "github.com/jackc/pgtype"
    "github.com/rueian/pgcapture/pkg/pgcapture"
    "google.golang.org/grpc"
)

// MyTable implements the pgcapture.Model interface
// and will be decoded from changes matching its TableName()
type MyTable struct {
    ID    pgtype.Int4 `pg:"id"`       // fields to be decoded must be pgtype structs,
    Value pgtype.Text `pg:"my_value"` // with a 'pg' tag specifying the column name mapping explicitly
}

func (t *MyTable) TableName() (schema, table string) {
    return "public", "my_table"
}

func (t MyTable) MarshalJSON() ([]byte, error) {
    return pgcapture.MarshalJSON(&t) // ignores unchanged TOAST fields
}

func main() {
    ctx := context.Background()

    conn, _ := grpc.Dial("127.0.0.1:1000", grpc.WithInsecure())
    defer conn.Close()

    consumer := pgcapture.NewConsumer(ctx, conn, pgcapture.ConsumerOption{
        // The URI identifies which change stream you want.
        // You can implement dblog.SourceResolver to customize the gateway behavior based on the URI.
        URI: "my_subscription_id",
    })
    defer consumer.Stop()
	
    consumer.Consume(map[pgcapture.Model]pgcapture.ModelHandlerFunc{
        &MyTable{}: func(change pgcapture.Change) error {
            row := change.New.(*MyTable)
            // handle the decoded change event here

            if row.Value.Status == pgtype.Undefined {
                // handle the unchanged TOAST field
            }

            return nil
        },
    })
}

Handling unchanged TOAST field

Since unchanged TOAST fields are not present in the change stream, the corresponding model fields remain in the undefined state and carry no value. Check the field status before using them.

pgcapture.MarshalJSON is a handy helper for implementing json.Marshaler that simply omits those undefined fields, as the example below shows.
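
As a short illustration building on the "Consume changes with Golang" example above (the handleMyTable name and the log output are just for demonstration), the handler below distinguishes an unchanged TOAST column from a NULL or a real value by inspecting the pgtype status, and re-encodes the row with json.Marshal so that MyTable.MarshalJSON drops the undefined field.

// continues the example above; additionally import "encoding/json" and "log".
func handleMyTable(change pgcapture.Change) error {
    row := change.New.(*MyTable)

    switch row.Value.Status {
    case pgtype.Undefined:
        // Unchanged TOAST column: it was not part of the change stream,
        // so nothing was decoded. Fetch the value elsewhere if you need it.
    case pgtype.Null:
        // The column was explicitly set to NULL.
    case pgtype.Present:
        // A real value was decoded into row.Value.String.
    }

    // Because MyTable.MarshalJSON delegates to pgcapture.MarshalJSON,
    // the undefined field is simply omitted from the JSON output.
    payload, err := json.Marshal(row)
    if err != nil {
        return err
    }
    log.Printf("change: %s", payload)
    return nil
}

handleMyTable can be registered directly in the consumer.Consume map in place of the inline handler shown earlier.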

Customize the dblog.SourceResolver

The provided gateway subcommand starts a gateway server with dblog.StaticAgentPulsarResolver, which reads a static URI-resolving config. However, it is recommended to implement your own dblog.SourceResolver based on the URI the consumer provides:

package main

import (
    "context"
    "net"
	
    "github.com/rueian/pgcapture/pkg/dblog"
    "github.com/rueian/pgcapture/pkg/pb"
    "github.com/rueian/pgcapture/pkg/source"
    "google.golang.org/grpc"
)

type MySourceResolver struct{}

func (m *MySourceResolver) Source(ctx context.Context, uri string) (source.RequeueSource, error) {
    // decide where to fetch the latest changes based on the uri
    panic("implement me")
}

func (m *MySourceResolver) Dumper(ctx context.Context, uri string) (dblog.SourceDumper, error) {
    // decide where to fetch on-demand dumps based on the uri
    panic("implement me")
}

func main() {
    // connect to the dump controller
    controlConn, _ := grpc.Dial("127.0.0.1:10001", grpc.WithInsecure())
	
    gateway := &dblog.Gateway{
        SourceResolver: &MySourceResolver{}, 
        DumpInfoPuller: &dblog.GRPCDumpInfoPuller{Client: pb.NewDBLogControllerClient(controlConn)},
    }
    serveGRPC(gateway, "0.0.0.0:10000") // a possible serveGRPC implementation is sketched below
}
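
The serveGRPC helper above is intentionally left to the user. A minimal sketch might look like the following; note that the registration function name (pb.RegisterDBLogGatewayServer) is an assumption and should be checked against the generated pb package of your pgcapture version, and "log" must be imported as well.

func serveGRPC(gateway *dblog.Gateway, addr string) {
    lis, err := net.Listen("tcp", addr)
    if err != nil {
        log.Fatal(err)
    }
    server := grpc.NewServer()
    // Register the gateway as the gRPC service implementation; the exact
    // generated registration function name is assumed here.
    pb.RegisterDBLogGatewayServer(server, gateway)
    if err := server.Serve(lis); err != nil {
        log.Fatal(err)
    }
}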