Kafka Connect Oracle

kafka-connect-oracle is a Kafka source connector that captures all row-based DML changes from an Oracle database and streams them to Kafka. The change data capture logic is based on the Oracle LogMiner solution.

Only committed changes are pulled from Oracle, covering Insert, Update, and Delete operations. Every streamed message carries the full "sql_redo" statement as well as the statement's parsed fields and values, each stored with the proper field type in the schema.

Messages carry both the old (before change) and new (after change) values of the row fields for DML operations. An insert operation has only the new row values, tagged as "data". An update operation has the new data tagged as "data" and also contains the old row values, tagged as "before". A delete operation contains only the old data, tagged as "before".
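
For orientation, the following is a minimal, hypothetical JDBC sketch of the LogMiner flow the connector builds on; it is not the connector's actual code. It assumes a release before Oracle 19c (where CONTINUOUS_MINE is still available), a dictionary read from the online catalog, and placeholder connection settings.

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class LogMinerSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; requires the Oracle JDBC driver (ojdbc).
        String url = "jdbc:oracle:thin:@//10.1.X.X:1521/testdb";
        try (Connection conn = DriverManager.getConnection(url, "kminer", "kminerpass")) {
            // Start LogMiner at a given SCN. COMMITTED_DATA_ONLY mirrors the
            // connector's rule of emitting committed changes only;
            // CONTINUOUS_MINE works only on releases before Oracle 19c.
            try (CallableStatement start = conn.prepareCall(
                    "BEGIN DBMS_LOGMNR.START_LOGMNR("
                    + "STARTSCN => ?, "
                    + "OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG"
                    + " + DBMS_LOGMNR.COMMITTED_DATA_ONLY"
                    + " + DBMS_LOGMNR.CONTINUOUS_MINE); END;")) {
                start.setLong(1, 768889966828L); // e.g. the current SCN
                start.execute();
            }
            // Read the DML rows that the connector turns into Kafka records.
            try (Statement st = conn.createStatement();
                 ResultSet rs = st.executeQuery(
                     "SELECT SCN, OPERATION, SEG_OWNER, TABLE_NAME, SQL_REDO"
                     + " FROM V$LOGMNR_CONTENTS"
                     + " WHERE OPERATION IN ('INSERT','UPDATE','DELETE')")) {
                while (rs.next()) {
                    System.out.printf("%d %s %s.%s: %s%n",
                            rs.getLong("SCN"), rs.getString("OPERATION"),
                            rs.getString("SEG_OWNER"), rs.getString("TABLE_NAME"),
                            rs.getString("SQL_REDO"));
                }
            }
            // Close the LogMiner session.
            try (CallableStatement end = conn.prepareCall("BEGIN DBMS_LOGMNR.END_LOGMNR; END;")) {
                end.execute();
            }
        }
    }
}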

News

  • Ability to capture only the specified operations (INSERT / UPDATE / DELETE).
  • DDL capture support. DDL statements can be captured via this connector. If no topic configuration property is set in the properties file, captured DDL statements are published to a topic named <db.name.alias>.<SEGMENT_OWNER>."_GENERIC_DDL"; if the topic property is set, they are published to that topic together with the other statements.
  • Partial rollback detection has been implemented.
  • With newer releases of Oracle Database such as 19c, the CONTINUOUS_MINE option is desupported and LogMiner has lost the ability to mine redo and archive logs continuously. The first release of this connector was based on that option. The connector can now capture all changed data (DML changes) without CONTINUOUS_MINE on new Oracle releases. All tests for this change were done on single instances; RAC support is in progress.
  • The table blacklist configuration property can be used to exclude specified tables or schemas from capture.

Sample Data

Insert:

{    
    "SCN": 768889966828,
    "SEG_OWNER": "TEST",
    "TABLE_NAME": "TEST4",
    "TIMESTAMP": 1537958606000,
    "SQL_REDO": "insert into \"TEST\".\"TEST4\"(\"ID\",\"NAME\",\"PROCESS_DATE\",\"CDC_TIMESTAMP\") values (78238,NULL,NULL,TIMESTAMP ' 2018-09-26 10:43:26.643')",
    "OPERATION": "INSERT",
    "data": {
        "ID": 78238.0,
        "NAME": null,
        "PROCESS_DATE": null,
        "CDC_TIMESTAMP": 1537947806643
    },
    "before": null 
}

Update:

{
    "SCN": 768889969452,
    "SEG_OWNER": "TEST",
    "TABLE_NAME": "TEST4",
    "TIMESTAMP": 1537964106000,
    "SQL_REDO": "update \"TEST\".\"TEST4\" set \"NAME\" = 'XaQCZKDINhTQBMevBZGGDjfPAsGqTUlCTyLThpmZ' where \"ID\" = 78238 and \"NAME\" IS NULL and \"PROCESS_DATE\" IS NULL and \"CDC_TIMESTAMP\" = TIMESTAMP ' 2018-09-26 10:43:26.643'",
    "OPERATION": "UPDATE",
    "data": {
        "ID": 78238.0,
        "NAME": "XaQCZKDINhTQBMevBZGGDjfPAsGqTUlCTyLThpmZ",
        "PROCESS_DATE": null,
        "CDC_TIMESTAMP": 1537947806643
    },
    "before": {
        "ID": 78238.0,
        "NAME": null,
        "PROCESS_DATE": null,
        "CDC_TIMESTAMP": 1537947806643
    }
}

Delete:

{
    "SCN": 768889969632,
    "SEG_OWNER": "TEST",
    "TABLE_NAME": "TEST4",
    "TIMESTAMP": 1537964142000,
    "SQL_REDO": "delete from \"TEST\".\"TEST4\" where \"ID\" = 78238 and \"NAME\" = 'XaQCZKDINhTQBMevBZGGDjfPAsGqTUlCTyLThpmZ' and \"PROCESS_DATE\" IS NULL and \"CDC_TIMESTAMP\" = TIMESTAMP ' 2018-09-26 10:43:26.643'",
    "OPERATION": "DELETE",
    "data": null,
    "before": {
        "ID": 78238.0,
        "NAME": "XaQCZKDINhTQBMevBZGGDjfPAsGqTUlCTyLThpmZ",
        "PROCESS_DATE": null,
        "CDC_TIMESTAMP": 1537947806643
    }
}
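
To illustrate how such records might be consumed downstream, here is a minimal, hypothetical Java consumer sketch (it needs kafka-clients and jackson-databind on the classpath). It assumes the cdctest topic from the example configuration below, a broker at localhost:9092, and that the Connect worker uses the JSON converter with schemas.enable=false, so values arrive as plain JSON like the samples above; none of these settings come from this project.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CdcConsumerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "cdc-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        ObjectMapper mapper = new ObjectMapper();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("cdctest"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    JsonNode msg = mapper.readTree(record.value());
                    // "data" holds the new row image and "before" the old one;
                    // either may be null depending on the operation type.
                    System.out.printf("%s on %s.%s new=%s old=%s%n",
                            msg.get("OPERATION").asText(),
                            msg.get("SEG_OWNER").asText(),
                            msg.get("TABLE_NAME").asText(),
                            msg.get("data"), msg.get("before"));
                }
            }
        }
    }
}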

Setting Up

The database must be in archivelog mode and supplemental logging must be enabled.

On the database server

sqlplus / as sysdba    
SQL>shutdown immediate
SQL>startup mount
SQL>alter database archivelog;
SQL>alter database open;

Enable supplemental logging

sqlplus / as sysdba    
SQL>alter database add supplemental log data (all) columns;
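
Both settings can be verified by querying V$DATABASE. The following is a small, hypothetical JDBC check; the connection URL and credentials are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ArchiveLogCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; requires the Oracle JDBC driver (ojdbc).
        String url = "jdbc:oracle:thin:@//10.1.X.X:1521/testdb";
        try (Connection conn = DriverManager.getConnection(url, "system", "password");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                 "SELECT LOG_MODE, SUPPLEMENTAL_LOG_DATA_ALL FROM V$DATABASE")) {
            if (rs.next()) {
                // Expect LOG_MODE = ARCHIVELOG and SUPPLEMENTAL_LOG_DATA_ALL = YES.
                System.out.println("LOG_MODE = " + rs.getString(1));
                System.out.println("SUPPLEMENTAL_LOG_DATA_ALL = " + rs.getString(2));
            }
        }
    }
}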

To execute the connector successfully, it must be started with a privileged Oracle user. If the given user has the DBA role, this step can be skipped. Otherwise, execute the following script to create a privileged user:

create role logmnr_role;
grant create session to logmnr_role;
grant execute_catalog_role, select any transaction, select any dictionary to logmnr_role;
create user kminer identified by kminerpass;
grant logmnr_role to kminer;
alter user kminer quota unlimited on users;

In a multitenant configuration, the privileged Oracle user must be a "common user", and some multitenant-specific grants are required:

create role c##logmnr_role;
grant create session to c##logmnr_role;
grant execute_catalog_role, select any transaction, select any dictionary, logmining to c##logmnr_role;
create user c##kminer identified by kminerpass;
grant c##logmnr_role to c##kminer;
alter user c##kminer quota unlimited on users set container_data = all container = current;

Configuration

Configuration Properties

| Name | Type | Description |
| --- | --- | --- |
| name | String | Connector name. |
| connector.class | String | The name of the Java class for this connector. |
| db.name.alias | String | An identifier for the database, such as Test, Dev, Prod, or any name that identifies it. This name is used as the prefix of topic and schema names. |
| tasks.max | Integer | Maximum number of tasks to create. This connector uses a single task. |
| topic | String | Name of the topic that messages are written to. If set, all messages are written to this topic; if not set, a topic is created dynamically for each database table. |
| db.name | String | Service name or SID of the database to connect to. The database service name is used in most cases. |
| db.hostname | String | IP address or hostname of the Oracle database server. |
| db.port | Integer | Port number of the Oracle database server. |
| db.user | String | Name of the database user used to connect to the database and to start and execute LogMiner. This user must have the privileges described above. |
| db.user.password | String | Password of the database user. |
| db.fetch.size | Integer | Sets the Oracle row fetch size. |
| table.whitelist | String | A comma-separated list of database schemas or table names to capture. Use <SCHEMA_NAME>.* to capture a whole schema and <SCHEMA_NAME>.<TABLE_NAME> to capture a single table. |
| parse.dml.data | Boolean | If true, each captured DML statement is parsed into fields and values. If false, only the SQL DML statement is published. |
| reset.offset | Boolean | If true, the offset is set to the current SCN of the database when the connector starts. If false, the connector starts from the last offset value. |
| start.scn | Long | If set, the offset is set to this value and LogMiner starts at this SCN. Use this property to start the connector from a specific SCN. |
| multitenant | Boolean | If true, multitenant support is enabled. If false, a single-instance configuration is used. |
| table.blacklist | String | A comma-separated list of database schemas or table names to exclude from capture. Use <SCHEMA_NAME>.* for a whole schema and <SCHEMA_NAME>.<TABLE_NAME> for a single table. |
| dml.types | String | A comma-separated list of DML operations (INSERT, UPDATE, DELETE). If not specified, all DML operations are replicated; if specified, only the listed operations are captured. |

Example Config

name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=test
tasks.max=1
topic=cdctest
db.name=testdb
db.hostname=10.1.X.X
db.port=1521
db.user=kminer
db.user.password=kminerpass
db.fetch.size=1
table.whitelist=TEST.*,TEST2.TABLE2
table.blacklist=TEST2.TABLE3
parse.dml.data=true
reset.offset=false
multitenant=false
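
The properties file above is read directly by standalone mode (see Building and Running below). If Kafka Connect runs in distributed mode instead, the same settings can be submitted as JSON to the worker's REST API; the sketch below uses Java 11's HttpClient and assumes a worker listening on localhost:8083:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitConnector {
    public static void main(String[] args) throws Exception {
        // Same settings as the properties file above, wrapped in the
        // {"name": ..., "config": {...}} envelope the Connect REST API expects.
        String body = "{"
            + "\"name\": \"oracle-logminer-connector\","
            + "\"config\": {"
            + "\"connector.class\": \"com.ecer.kafka.connect.oracle.OracleSourceConnector\","
            + "\"db.name.alias\": \"test\","
            + "\"tasks.max\": \"1\","
            + "\"topic\": \"cdctest\","
            + "\"db.name\": \"testdb\","
            + "\"db.hostname\": \"10.1.X.X\","
            + "\"db.port\": \"1521\","
            + "\"db.user\": \"kminer\","
            + "\"db.user.password\": \"kminerpass\","
            + "\"db.fetch.size\": \"1\","
            + "\"table.whitelist\": \"TEST.*,TEST2.TABLE2\","
            + "\"table.blacklist\": \"TEST2.TABLE3\","
            + "\"parse.dml.data\": \"true\","
            + "\"reset.offset\": \"false\","
            + "\"multitenant\": \"false\""
            + "}}";
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8083/connectors")) // assumed worker address
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}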

Building and Running

mvn clean package

Copy kafka-connect-oracle-1.0.jar and lib/ojdbc7.jar to the $KAFKA_HOME/lib folder. If the Confluent Platform is used for the Kafka cluster, copy ojdbc7.jar and kafka-connect-oracle-1.0.jar to the $CONFLUENT_HOME/share/java/kafka-connect-jdbc folder.

Copy the config/OracleSourceConnector.properties file to $KAFKA_HOME/config. For Confluent, copy the properties file to $CONFLUENT_HOME/etc/kafka-connect-jdbc.

To start the connector:

cd $KAFKA_HOME
./bin/connect-standalone.sh ./config/connect-standalone.properties ./config/OracleSourceConnector.properties

To start the connector with Avro serialization:

cd $CONFLUENT_HOME

./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/OracleSourceConnector.properties 

Do not forget to populate connect-standalone.properties or connect-avro-standalone.properties with the appropriate hostnames and ports.

Todo:

1. Implementation for DDL operations
2. Support for other Oracle-specific data types
3. New implementations
4. Performance tuning
5. Initial data load
6. Bug fixes