izhangzhihao / Real-time-Data-Warehouse

Licence: other
Real-time Data Warehouse with Apache Flink & Apache Kafka & Apache Hudi

Programming Languages

Dockerfile, Shell

Projects that are alternatives to or similar to Real-time-Data-Warehouse

debezium-incubator
Previously used repository for new Debezium modules and connectors in incubation phase (archived)
Stars: ✭ 89 (+71.15%)
Mutual labels:  cdc, change-data-capture, debezium
southpaw
⚾ Streaming left joins in Kafka for change data capture
Stars: ✭ 48 (-7.69%)
Mutual labels:  cdc, change-data-capture, debezium
scylla-cdc-source-connector
A Kafka source connector capturing Scylla CDC changes
Stars: ✭ 19 (-63.46%)
Mutual labels:  cdc, change-data-capture, debezium
azure-sql-db-change-stream-debezium
SQL Server Change Stream sample using Debezium
Stars: ✭ 74 (+42.31%)
Mutual labels:  cdc, change-data-capture, debezium
Debezium
Change data capture for a variety of databases. Please log issues at https://issues.redhat.com/browse/DBZ.
Stars: ✭ 5,937 (+11317.31%)
Mutual labels:  cdc, change-data-capture, debezium
LarkMidTable
LarkMidTable is a one-stop open-source data middle platform covering platform infrastructure, data governance, data development, monitoring and alerting, data services, and data visualization, built to efficiently empower data front ends and deliver data services.
Stars: ✭ 873 (+1578.85%)
Mutual labels:  flink, flink-sql
litemall-dw
A big data project based on the open-source Litemall e-commerce project, covering front-end event tracking (OpenResty + Lua) and back-end tracking, a five-layer data warehouse, real-time computing, and user profiling. The big data platform runs on CDH 6.3.2 (scripted with Vagrant + Ansible) and also includes Azkaban workflows.
Stars: ✭ 36 (-30.77%)
Mutual labels:  flink, spark-sql
OLAP-cube
is a hypercube of data
Stars: ✭ 23 (-55.77%)
Mutual labels:  data-warehouse, data-warehousing
flink-connector-kudu
A flink-connector-kudu based on the Apache Bahir Kudu connector, supporting Flink 1.11.x DynamicTableSource/Sink, range partitioning, and more.
Stars: ✭ 40 (-23.08%)
Mutual labels:  flink, flink-sql
debezium.github.io
Source for the Debezium website; Please log issues in our tracker at https://issues.redhat.com/projects/DBZ/.
Stars: ✭ 34 (-34.62%)
Mutual labels:  change-data-capture, debezium
dlink
Dinky is an out of the box one-stop real-time computing platform dedicated to the construction and practice of Unified Streaming & Batch and Unified Data Lake & Data Warehouse. Based on Apache Flink, Dinky provides the ability to connect many big data frameworks including OLAP and Data Lake.
Stars: ✭ 1,535 (+2851.92%)
Mutual labels:  flink, datalake
pgcapture
A scalable Netflix DBLog implementation for PostgreSQL
Stars: ✭ 94 (+80.77%)
Mutual labels:  cdc, change-data-capture
Realtime
Listen to your PostgreSQL database in realtime via websockets. Built with Elixir.
Stars: ✭ 4,278 (+8126.92%)
Mutual labels:  cdc, change-data-capture
flink-demo
Flink Demo
Stars: ✭ 39 (-25%)
Mutual labels:  flink, flink-sql
oracdc
Oracle database CDC (Change Data Capture)
Stars: ✭ 51 (-1.92%)
Mutual labels:  cdc, change-data-capture
TiBigData
TiDB connectors for Flink/Hive/Presto
Stars: ✭ 192 (+269.23%)
Mutual labels:  flink, cdc
kafka-connect-http
Kafka Connect connector that enables Change Data Capture from JSON/HTTP APIs into Kafka.
Stars: ✭ 81 (+55.77%)
Mutual labels:  cdc, change-data-capture
MySqlCdc
MySQL/MariaDB binlog replication client for .NET
Stars: ✭ 71 (+36.54%)
Mutual labels:  cdc, change-data-capture
OpenLogReplicator
Open Source Oracle database CDC written purely in C++. Reads transactions directly from database redo log files and streams in JSON or Protobuf format to: Kafka, RocketMQ, flat file, network stream (plain TCP/IP or ZeroMQ)
Stars: ✭ 112 (+115.38%)
Mutual labels:  cdc, change-data-capture
dt-sql-parser
SQL Parsers for BigData, built with antlr4.
Stars: ✭ 135 (+159.62%)
Mutual labels:  spark-sql, flink-sql

Real-time Data Warehouse

Real-time Data Warehouse using: Flink & Kafka | Flink & Hudi | Spark & Delta | Flink & Hudi & E-commerce

(Demo overview diagram)

Getting the setup up and running

docker compose build

docker compose up -d

Check that everything is really up and running:

docker compose ps
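
If a service is not healthy, its logs are usually the quickest way to find out why; for example, for the Kafka container used throughout this walkthrough (any other service name listed by docker compose ps works the same way):

docker compose logs -f kafka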

You should be able to access the Flink Web UI (http://localhost:8081), as well as Kibana (http://localhost:5601).
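
A quick way to verify this from the command line (a sketch that assumes the compose file publishes the services on these default host ports):

# Both should eventually report a 200 status code once the containers are fully up
curl -s -o /dev/null -w "Flink UI: %{http_code}\n" http://localhost:8081
curl -s -o /dev/null -w "Kibana:   %{http_code}\n" http://localhost:5601/api/status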

Postgres

Start the Postgres client to have a look at the source tables and run some DML statements later:

docker compose exec postgres env PGOPTIONS="--search_path=claims" bash -c 'psql -U $POSTGRES_USER postgres'

What tables are we dealing with?

SELECT * FROM information_schema.tables WHERE table_schema = 'claims';

Debezium

Start the Debezium Postgres connectors using the configurations provided in the register-postgres.json and register-postgres-members.json files:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-members.json

Check that the connector is running:

curl http://localhost:8083/connectors/claims-connector/status # | jq
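
You can also list every connector registered with Kafka Connect; after the two registration calls above, both connectors should appear (this uses the standard Connect REST API on port 8083):

curl http://localhost:8083/connectors # | jq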

The first time it connects to a Postgres server, Debezium takes a consistent snapshot of all database schemas, so you should see that the pre-existing records in the accident_claims table have already been pushed into your Kafka topic:

docker compose exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic pg_claims.claims.accident_claims

ℹ️ Have a quick read about the structure of these events in the Debezium documentation.
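
To see which topics Debezium has created so far, you can list them from inside the Kafka container; a small sketch, assuming the Kafka scripts live under /kafka/bin as in the consumer command above:

docker compose exec kafka /kafka/bin/kafka-topics.sh \
    --bootstrap-server kafka:9092 \
    --list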

Is it working?

In the tab you used to start the Postgres client, you can now run some DML statements to see that the changes are propagated all the way to your Kafka topic:

INSERT INTO accident_claims (claim_total, claim_total_receipt, claim_currency, member_id, accident_date, accident_type, accident_detail, claim_date, claim_status)
VALUES (500, 'PharetraMagnaVestibulum.tiff', 'AUD', 321, '2020-08-01 06:43:03', 'Collision', 'Blue Ringed Octopus', '2020-08-10 09:39:31', 'INITIAL');
UPDATE accident_claims
SET claim_total_receipt = 'CorrectReceipt.pdf'
WHERE claim_id = 1001;
DELETE
FROM accident_claims
WHERE claim_id = 1001;

In the output of your Kafka console consumer, you should now see three consecutive events with op values equal to c (an insert event), u (an update event) and d (a delete event).
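
If you only want to check the op values, you can re-run the consumer without the key output and pipe it through jq. This is a sketch, not part of the original walkthrough: it assumes jq is installed on the host; the '.payload.op // .op' fallback covers both a bare Debezium payload and a Connect envelope with an embedded schema, and -T stops docker compose from allocating a TTY so the output pipes cleanly:

docker compose exec -T kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --topic pg_claims.claims.accident_claims \
  | jq -r '.payload.op // .op'   # c = insert, u = update, d = delete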

Flink connectors

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/
https://flink-packages.org/categories/connectors
https://github.com/knaufk/flink-faker/

Datasource ingestion

Start the Flink SQL Client:

docker compose exec sql-client ./sql-client.sh

OR

docker compose exec sql-client ./sql-client-submit.sh

Test the Hudi connector with a quickstart table:

CREATE TABLE t1(
  uuid VARCHAR(20), -- you can use the 'PRIMARY KEY NOT ENFORCED' syntax to mark this field as the record key
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = '/data/t1',
  'write.tasks' = '1', -- default is 4 ,required more resource
  'compaction.tasks' = '1', -- default is 10 ,required more resource
  'table.type' = 'COPY_ON_WRITE', -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
  'read.tasks' = '1', -- default is 4 ,required more resource
  'read.streaming.enabled' = 'true',  -- this option enable the streaming read
  'read.streaming.start-commit' = '20210712134429', -- specifies the start commit instant time
  'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);

-- insert data using values
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

SELECT * FROM t1;

Register a Postgres catalog so that you can access the metadata of the external tables over JDBC:

CREATE CATALOG datasource WITH (
    'type'='jdbc',
    'property-version'='1',
    'base-url'='jdbc:postgresql://postgres:5432/',
    'default-database'='postgres',
    'username'='postgres',
    'password'='postgres'
);
CREATE DATABASE IF NOT EXISTS datasource;
CREATE TABLE datasource.accident_claims WITH (
    'connector' = 'kafka',
    'topic' = 'pg_claims.claims.accident_claims',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'accident_claims-consumer-group',
    'format' = 'debezium-json',
    'scan.startup.mode' = 'earliest-offset'
) LIKE datasource.postgres.`claims.accident_claims` (EXCLUDING ALL);

OR generate data with the datagen connector:

CREATE TABLE datasource.accident_claims (
    claim_id            BIGINT,
    claim_total         DOUBLE,
    claim_total_receipt VARCHAR(50),
    claim_currency      VARCHAR(3),
    member_id           INT,
    accident_date       DATE,
    accident_type       VARCHAR(20),
    accident_detail     VARCHAR(20),
    claim_date          DATE,
    claim_status        VARCHAR(10),
    ts_created          TIMESTAMP(3),
    ts_updated          TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100'
);

and the members table:

CREATE TABLE datasource.members WITH (
    'connector' = 'kafka',
    'topic' = 'pg_claims.claims.members',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'members-consumer-group',
    'format' = 'debezium-json',
    'scan.startup.mode' = 'earliest-offset'
) LIKE datasource.postgres.`claims.members` (EXCLUDING OPTIONS);

OR generate data with the datagen connector:

CREATE TABLE datasource.members (
    id                BIGINT,
    first_name        VARCHAR(50),
    last_name         VARCHAR(50),
    address           VARCHAR(50),
    address_city      VARCHAR(10),
    address_country   VARCHAR(10),
    insurance_company VARCHAR(25),
    insurance_number  VARCHAR(50),
    ts_created        TIMESTAMP(3),
    ts_updated        TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100'
);

Check data:

SELECT * FROM datasource.accident_claims;
SELECT * FROM datasource.members;

DWD

Create a database in the DWD layer:

CREATE DATABASE IF NOT EXISTS dwd;
CREATE TABLE dwd.accident_claims
(
    claim_id            BIGINT,
    claim_total         DOUBLE,
    claim_total_receipt VARCHAR(50),
    claim_currency      VARCHAR(3),
    member_id           INT,
    accident_date       DATE,
    accident_type       VARCHAR(20),
    accident_detail     VARCHAR(20),
    claim_date          DATE,
    claim_status        VARCHAR(10),
    ts_created          TIMESTAMP(3),
    ts_updated          TIMESTAMP(3),
    ds                  DATE,
    PRIMARY KEY (claim_id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
  'connector'='hudi',
  'path' = '/data/dwd/accident_claims',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'write.batch.size' = '1',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'compaction.delta_seconds' = '60',
  'write.precombine.field' = 'ts_updated',
  'read.tasks' = '1',
  'read.streaming.check-interval' = '5',
  'read.streaming.start-commit' = '20210712134429',
  'index.bootstrap.enabled' = 'true'
);
CREATE TABLE dwd.members
(
    id                BIGINT,
    first_name        VARCHAR(50),
    last_name         VARCHAR(50),
    address           VARCHAR(50),
    address_city      VARCHAR(10),
    address_country   VARCHAR(10),
    insurance_company VARCHAR(25),
    insurance_number  VARCHAR(50),
    ts_created        TIMESTAMP(3),
    ts_updated        TIMESTAMP(3),
    ds                DATE,
    PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
      'connector'='hudi',
      'path'='/data/dwd/members',
      'table.type' = 'MERGE_ON_READ',
      'read.streaming.enabled' = 'true',
      'write.batch.size' = '1',
      'write.tasks' = '1',
      'compaction.tasks' = '1',
      'compaction.delta_seconds' = '60',
      'write.precombine.field' = 'ts_updated',
      'read.tasks' = '1',
      'read.streaming.check-interval' = '5',
      'read.streaming.start-commit' = '20210712134429',
      'index.bootstrap.enabled' = 'true'
);

and submit continuous queries to the Flink cluster that write the data from the datasource tables into the dwd (Hudi) tables:

INSERT INTO dwd.accident_claims
SELECT claim_id,
       claim_total,
       claim_total_receipt,
       claim_currency,
       member_id,
       CAST (accident_date as DATE),
       accident_type,
       accident_detail,
       CAST (claim_date as DATE),
       claim_status,
       CAST (ts_created as TIMESTAMP),
       CAST (ts_updated as TIMESTAMP),
       claim_date
       --CAST (SUBSTRING(claim_date, 0, 9) as DATE)
FROM datasource.accident_claims;
INSERT INTO dwd.members
SELECT id,
       first_name,
       last_name,
       address,
       address_city,
       address_country,
       insurance_company,
       insurance_number,
       CAST (ts_created as TIMESTAMP),
       CAST (ts_updated as TIMESTAMP),
       ts_created
       --CAST (SUBSTRING(ts_created, 0, 9) as DATE)
FROM datasource.members;

Check data:

SELECT * FROM dwd.accident_claims;
SELECT * FROM dwd.members;

DWB

Create a database in the DWB layer:

CREATE DATABASE IF NOT EXISTS dwb;
CREATE TABLE dwb.accident_claims
(
    claim_id            BIGINT,
    claim_total         DOUBLE,
    claim_total_receipt VARCHAR(50),
    claim_currency      VARCHAR(3),
    member_id           INT,
    accident_date       DATE,
    accident_type       VARCHAR(20),
    accident_detail     VARCHAR(20),
    claim_date          DATE,
    claim_status        VARCHAR(10),
    ts_created          TIMESTAMP(3),
    ts_updated          TIMESTAMP(3),
    ds                  DATE,
    PRIMARY KEY (claim_id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
  'connector'='hudi',
  'path' = '/data/dwb/accident_claims',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'write.batch.size' = '1',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'compaction.delta_seconds' = '60',
  'write.precombine.field' = 'ts_updated',
  'read.tasks' = '1',
  'read.streaming.check-interval' = '5',
  'read.streaming.start-commit' = '20210712134429',
  'index.bootstrap.enabled' = 'true'
);
CREATE TABLE dwb.members
(
    id                BIGINT,
    first_name        VARCHAR(50),
    last_name         VARCHAR(50),
    address           VARCHAR(50),
    address_city      VARCHAR(10),
    address_country   VARCHAR(10),
    insurance_company VARCHAR(25),
    insurance_number  VARCHAR(50),
    ts_created        TIMESTAMP(3),
    ts_updated        TIMESTAMP(3),
    ds                DATE,
    PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
      'connector'='hudi',
      'path'='/data/dwb/members',
      'table.type' = 'MERGE_ON_READ',
      'read.streaming.enabled' = 'true',
      'write.batch.size' = '1',
      'write.tasks' = '1',
      'compaction.tasks' = '1',
      'compaction.delta_seconds' = '60',
      'write.precombine.field' = 'ts_updated',
      'read.tasks' = '1',
      'read.streaming.check-interval' = '5',
      'read.streaming.start-commit' = '20210712134429',
      'index.bootstrap.enabled' = 'true'
);
INSERT INTO dwb.accident_claims
SELECT claim_id,
       claim_total,
       claim_total_receipt,
       claim_currency,
       member_id,
       accident_date,
       accident_type,
       accident_detail,
       claim_date,
       claim_status,
       ts_created,
       ts_updated,
       ds
FROM dwd.accident_claims;
INSERT INTO dwb.members
SELECT id,
       first_name,
       last_name,
       address,
       address_city,
       address_country,
       insurance_company,
       insurance_number,
       ts_created,
       ts_updated,
       ds
FROM dwd.members;

Check data:

SELECT * FROM dwb.accident_claims;
SELECT * FROM dwb.members;

DWS

Create a database in the DWS layer:

CREATE DATABASE IF NOT EXISTS dws;
CREATE TABLE dws.insurance_costs
(
    es_key            STRING PRIMARY KEY NOT ENFORCED,
    insurance_company STRING,
    accident_detail   STRING,
    accident_agg_cost DOUBLE
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://elasticsearch:9200',
    'index' = 'agg_insurance_costs'
);

and submit a continuous query to the Flink cluster that writes the aggregated insurance costs per insurance_company, bucketed by accident_detail (in other words: which animals are causing the most harm in terms of cost):

INSERT INTO dws.insurance_costs
SELECT UPPER(SUBSTRING(m.insurance_company, 0, 4) || '_' || SUBSTRING(ac.accident_detail, 0, 4)) es_key,
       m.insurance_company,
       ac.accident_detail,
       SUM(ac.claim_total) AS accident_agg_cost
FROM dwb.accident_claims ac
         JOIN dwb.members m
              ON ac.member_id = m.id
WHERE ac.claim_status <> 'DENIED'
GROUP BY m.insurance_company, ac.accident_detail;

Finally, create a simple dashboard in Kibana to visualize the aggregated insurance costs.
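
Before building the dashboard, you can confirm that aggregated documents are actually landing in the target index; a minimal check, assuming Elasticsearch is published on localhost:9200:

curl 'http://localhost:9200/agg_insurance_costs/_search?pretty&size=3'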
