
averemee-si / oracdc

License: Apache-2.0
Oracle database CDC (Change Data Capture)

Programming Languages

java
68154 projects - #9 most used programming language
shell
77523 projects

Projects that are alternatives of or similar to oracdc

kafka-connect-http
Kafka Connect connector that enables Change Data Capture from JSON/HTTP APIs into Kafka.
Stars: ✭ 81 (+58.82%)
Mutual labels:  kafka-connect, cdc, change-data-capture
kafka-connect-iot-mqtt-connector-example
Internet of Things Integration Example => Apache Kafka + Kafka Connect + MQTT Connector + Sensor Data
Stars: ✭ 170 (+233.33%)
Mutual labels:  kafka-connect, confluent-kafka, confluent-platform
scylla-cdc-source-connector
A Kafka source connector capturing Scylla CDC changes
Stars: ✭ 19 (-62.75%)
Mutual labels:  kafka-connect, cdc, change-data-capture
OpenLogReplicator
Open Source Oracle database CDC written purely in C++. Reads transactions directly from database redo log files and streams in JSON or Protobuf format to: Kafka, RocketMQ, flat file, network stream (plain TCP/IP or ZeroMQ)
Stars: ✭ 112 (+119.61%)
Mutual labels:  cdc, oracle-database, change-data-capture
debezium-incubator
Previously used repository for new Debezium modules and connectors in incubation phase (archived)
Stars: ✭ 89 (+74.51%)
Mutual labels:  kafka-connect, cdc, change-data-capture
Debezium
Change data capture for a variety of databases. Please log issues at https://issues.redhat.com/browse/DBZ.
Stars: ✭ 5,937 (+11541.18%)
Mutual labels:  kafka-connect, cdc, change-data-capture
Amazon-SP-API-CSharp
.Net C# library for the new Amazon Selling Partner API
Stars: ✭ 95 (+86.27%)
Mutual labels:  amazon, amazon-web-services
aws-sqs-sns-client
AWS SNS SQS client UI
Stars: ✭ 26 (-49.02%)
Mutual labels:  amazon, amazon-web-services
aws-ses-template-manager
A simple application offering an interface for CRUD management of AWS SES templates across all compatible regions and with your own choice of credentials profile. A great GUI productivity tool that can be setup and run locally in seconds (see readme).
Stars: ✭ 23 (-54.9%)
Mutual labels:  amazon, amazon-web-services
golang-tts
Text-to-Speach golang package based in Amazon Polly service
Stars: ✭ 19 (-62.75%)
Mutual labels:  amazon, amazon-web-services
southpaw
⚾ Streaming left joins in Kafka for change data capture
Stars: ✭ 48 (-5.88%)
Mutual labels:  cdc, change-data-capture
terraform-emr-spark-example
An example Terraform project that will configure a Secure and Customizable Spark Cluster on Amazon EMR.
Stars: ✭ 43 (-15.69%)
Mutual labels:  amazon, amazon-web-services
Real-time-Data-Warehouse
Real-time Data Warehouse with Apache Flink & Apache Kafka & Apache Hudi
Stars: ✭ 52 (+1.96%)
Mutual labels:  cdc, change-data-capture
ercole-agent
Proactive Software Asset Management. Agent component
Stars: ✭ 24 (-52.94%)
Mutual labels:  oracle, oracle-database
pgcapture
A scalable Netflix DBLog implementation for PostgreSQL
Stars: ✭ 94 (+84.31%)
Mutual labels:  cdc, change-data-capture
okcli
An Oracle-DB command line client
Stars: ✭ 47 (-7.84%)
Mutual labels:  oracle, oracle-database
docker-apex-stack
Utility scripts for creating an Oracle Application Express stack as a Docker container.
Stars: ✭ 67 (+31.37%)
Mutual labels:  oracle, oracle-database
selling-partner-sdk
Amazon Selling Partner JAVA SDK SP API
Stars: ✭ 15 (-70.59%)
Mutual labels:  amazon, amazon-web-services
azure-sql-db-change-stream-debezium
SQL Server Change Stream sample using Debezium
Stars: ✭ 74 (+45.1%)
Mutual labels:  cdc, change-data-capture
oracle-jdbc-tester
A simple command line Java application to test JDBC connection to Oracle database
Stars: ✭ 37 (-27.45%)
Mutual labels:  oracle, oracle-database

oracdc

oracdc is a software package for near real-time data integration and replication in heterogeneous IT environments. oracdc consists of two Apache Kafka Source Connectors and a JDBC sink connector. oracdc provides data integration, transactional change data capture, and data replication between operational and analytical IT systems. Starting from Oracle RDBMS 12c, various Oracle tools for CDC and/or replication are deprecated, desupported, and replaced by Oracle GoldenGate. This project is not intended to be a 100% replacement for expensive Oracle GoldenGate licenses; however, it may help in many practical cases. The project was tested using an Oracle E-Business Suite customer instance, transferring various information (mostly INV, ONT, WSH, GL & XLA tables) for further reporting and analytics in a PostgreSQL database. We tested both short transactions (human data entry) and long transactions (various accounting programs) changing millions of rows in dozens of tables (some of the tables used: WSH_NEW_DELIVERIES, WSH_DELIVERY_ASSIGNMENTS, WSH_DELIVERY_DETAILS). The oracdc Source Connectors are compatible with Oracle RDBMS versions 10g, 11g, 12c, 18c, 19c, and 21c. If you need support for Oracle Database 9i, please send us an email at [email protected].

eu.solutions.a2.cdc.oracle.OraCdcLogMinerConnector

This Source Connector uses Oracle LogMiner as the source for data changes. The connector is designed to minimize the side effects of using Oracle LogMiner; even for Oracle RDBMS versions that support the DBMS_LOGMNR.CONTINUOUS_MINE feature, oracdc does not use it. Instead, oracdc reads V$LOGMNR_CONTENTS and saves rows with V$LOGMNR_CONTENTS.OPERATION in ('INSERT', 'DELETE', 'UPDATE') in Java off-heap memory structures provided by Chronicle Queue (a minimal illustration of this buffering pattern follows the list of supported configurations below). This approach minimizes the load on the Oracle database server but requires additional disk space on the server where oracdc is installed. When restarting, all in-progress transactions, records, and objects are preserved; their status is written to the file defined by the a2.persistent.state.file parameter. An example oracdc.state file is located in the etc directory. oracdc's eu.solutions.a2.cdc.oracle.OraCdcLogMinerConnector connects to the following configurations of Oracle RDBMS:

  1. Standalone instance, or Primary Database of an Oracle DataGuard/Oracle Active DataGuard cluster, i.e. V$DATABASE.OPEN_MODE = READ WRITE
  2. Physical Standby Database of an Oracle Active DataGuard cluster, i.e. V$DATABASE.OPEN_MODE = READ ONLY
  3. Physical Standby Database of an Oracle DataGuard cluster, i.e. V$DATABASE.OPEN_MODE = MOUNTED. In this mode, the physical standby database is used to retrieve data using LogMiner, and a connection to the primary database is used to perform a strictly limited number of queries against the data dictionary (ALL|CDB_OBJECTS, ALL|CDB_TABLES, and ALL|CDB_TAB_COLUMNS). This option allows you to promote a physical standby database to the source of replication, eliminates the LogMiner overhead on the primary database, and decreases the TCO of the Oracle Database.
  4. Distributed configuration, where the source database generates redo log files and also contains the dictionary, and the target database is a compatible mining database (see Figure 22-1 in Using LogMiner to Analyze Redo Log Files). N.B. Currently only the non-CDB distributed database configuration has been tested; tests for the CDB distributed database configuration are in progress.
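
Below is a minimal sketch, assuming the net.openhft:chronicle-queue dependency, of the off-heap buffering pattern described above: entries are appended to a memory-mapped Chronicle Queue on disk and read back later in order. The payload format and queue layout here are illustrative only and do not reflect oracdc's internal structures.

package com.example.oracdc;

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;

public class ChronicleQueueBufferSketch {

	public static void main(String[] args) {
		// Memory-mapped files are created under this directory, so disk space
		// must be sized accordingly (see the JMX metrics in the performance tips)
		try (ChronicleQueue queue = ChronicleQueue.singleBuilder("/tmp/oracdc-queue-demo").build()) {
			final ExcerptAppender appender = queue.acquireAppender();
			// In oracdc the buffered entries come from V$LOGMNR_CONTENTS;
			// here a simple text payload stands in for a captured change record
			appender.writeText("INSERT|SCOTT.DEPT|DEPTNO=50");
			appender.writeText("UPDATE|SCOTT.DEPT|DEPTNO=50");

			final ExcerptTailer tailer = queue.createTailer();
			String entry;
			while ((entry = tailer.readText()) != null) {
				System.out.println(entry);
			}
		}
	}
}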

Monitoring

eu.solutions.a2.cdc.oracle.OraCdcLogMinerConnector publishes a number of metrics about the connector's activities that can be monitored through JMX. For the complete list of metrics please refer to LOGMINER-METRICS.md
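
A minimal sketch of reading these metrics remotely over JMX is shown below. It assumes the Kafka Connect worker JVM exposes a remote JMX port (for example via the JMX_PORT environment variable); the host and port in the service URL are placeholders, and the sketch simply lists every MBean whose name mentions oracdc together with its readable attributes rather than assuming exact ObjectName patterns. See LOGMINER-METRICS.md for the authoritative metric names.

package com.example.oracdc;

import java.util.Set;

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class OraCdcJmxDump {

	public static void main(String[] args) throws Exception {
		// Placeholder host/port of the Kafka Connect worker running oracdc
		final JMXServiceURL url =
				new JMXServiceURL("service:jmx:rmi:///jndi/rmi://connect-worker:9999/jmxrmi");
		try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
			final MBeanServerConnection mbsc = connector.getMBeanServerConnection();
			final Set<ObjectName> names = mbsc.queryNames(null, null);
			for (final ObjectName name : names) {
				// Filter to the connector's MBeans without assuming an exact JMX domain
				if (!name.getCanonicalName().toLowerCase().contains("oracdc")) {
					continue;
				}
				System.out.println(name);
				for (final MBeanAttributeInfo attr : mbsc.getMBeanInfo(name).getAttributes()) {
					if (attr.isReadable()) {
						System.out.println("\t" + attr.getName() + " = "
								+ mbsc.getAttribute(name, attr.getName()));
					}
				}
			}
		}
	}
}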

Oracle Database SecureFiles and Large Objects including SYS.XMLTYPE

Oracle Database SecureFiles and Large Objects, i.e. LOBs, are supported from v0.9.7 onward when the a2.process.lobs parameter is set to true. The CLOB type is supported only for columns with DBA_LOBS.FORMAT='ENDIAN NEUTRAL'. If you need support for CLOB columns with DBA_LOBS.FORMAT='ENDIAN SPECIFIC' or XMLTYPE, please send us an email at [email protected]. SYS.XMLTYPE data are supported from v0.9.8.2 onward when the a2.process.lobs parameter is set to true. When processing LOBs and SYS.XMLTYPE, please do not forget to set the Apache Kafka parameters according to the size of the LOBs:

  1. For the Source connector: producer.max.request.size
  2. For the broker: replica.fetch.max.bytes and message.max.bytes

By default, CLOB, NCLOB, and SYS.XMLTYPE data are compressed using the LZ4 compression algorithm.

Large objects (BLOB/CLOB/NCLOB/XMLTYPE) transformation feature (oracdc 0.9.8.3+)

Apache Kafka is not meant to handle large messages over 1MB in size, but Oracle RDBMS is often used as storage for unstructured information too. To break this barrier we designed the LOB transformation feature. Imagine that you have a table with the following structure

describe APPLSYS.FND_LOBS
Name                                      Null?    Type
 ----------------------------------------- -------- ----------------
 FILE_ID                                   NOT NULL NUMBER
 FILE_NAME                                          VARCHAR2(256)
 FILE_CONTENT_TYPE                         NOT NULL VARCHAR2(256)
 FILE_DATA                                          BLOB
 UPLOAD_DATE                                        DATE
 EXPIRATION_DATE                                    DATE
 PROGRAM_NAME                                       VARCHAR2(32)
 PROGRAM_TAG                                        VARCHAR2(32)
 LANGUAGE                                           VARCHAR2(4)
 ORACLE_CHARSET                                     VARCHAR2(30)
 FILE_FORMAT                               NOT NULL VARCHAR2(10)

and you need the data in this table, including the BLOB column FILE_DATA, in a reporting subsystem implemented in a different database management system with limited large object support, such as Snowflake, and you have very strict constraints on traffic through the Apache Kafka brokers. The best way to solve the problem is Reddite quae sunt Caesaris Caesari et quae sunt Dei Deo: the RDBMS performs all data manipulation, while object storage is used for the large objects instead of storing them in the BLOB column FILE_DATA. To achieve this:

  1. Create a transformation implementation class
package com.example.oracdc;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import eu.solutions.a2.cdc.oracle.data.OraCdcLobTransformationsIntf;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
// plus imports for OraColumn and any other required oracdc classes

public class TransformScannedDataInBlobs implements OraCdcLobTransformationsIntf {

	@Override
	public Schema transformSchema(String pdbName, String tableOwner, String tableName, OraColumn lobColumn,
			SchemaBuilder valueSchema) {
		if ("FND_LOBS".equals(tableName) &&
				"FILE_DATA".equals(lobColumn.getColumnName())) {
			// Replace the BLOB column with a struct holding only a reference to the object store
			final SchemaBuilder transformedSchemaBuilder = SchemaBuilder
					.struct()
					.optional()
					.name(lobColumn.getColumnName())
					.version(1);
			transformedSchemaBuilder.field("S3_URL", Schema.OPTIONAL_STRING_SCHEMA);
			valueSchema.field(lobColumn.getColumnName(), transformedSchemaBuilder.build());
			return transformedSchemaBuilder.build();
		} else {
			// For all other LOB columns fall back to the default transformation
			return OraCdcLobTransformationsIntf.super.transformSchema(
					pdbName, tableOwner, tableName, lobColumn, valueSchema);
		}
	}

	@Override
	public Struct transformData(String pdbName, String tableOwner, String tableName, OraColumn lobColumn,
			byte[] content, Struct keyStruct, Schema valueSchema) {
		if ("FND_LOBS".equals(tableName) &&
				"FILE_DATA".equals(lobColumn.getColumnName())) {
			final Struct valueStruct = new Struct(valueSchema);
			// ...
			final String s3ObjectKey = <SOME_UNIQUE_S3_OBJECT_KEY_USING_KEY_STRUCT>;
			// ...
			// Upload the BLOB content to object storage and keep only its key in the Kafka message
			final S3Client s3Client = S3Client
									.builder()
									.region(<VALID_AWS_REGION>)
									.build();
			final PutObjectRequest por = PutObjectRequest
									.builder()
									.bucket(<VALID_S3_BUCKET>)
									.key(s3ObjectKey)
									.build();
			s3Client.putObject(por, RequestBody.fromBytes(content));
			// ...

			valueStruct.put("S3_URL", s3ObjectKey);
			return valueStruct;
		}
		// No transformation for other tables/columns
		return null;
	}
}

  2. Set required oracdc parameters
a2.process.lobs=true
a2.lob.transformation.class=com.example.oracdc.TransformScannedDataInBlobs

JSON Datatype support for Oracle RDBMS 21c +

Unfortunately, for operations on the JSON data type LogMiner returns the result shown below (the output is for a transaction which includes the JSON data type)

select SQL_REDO from V$LOGMNR_CONTENTS where XID='01000E0078020000';

SQL_REDO
--------------------------------------------------------------------------------
set transaction read write
Unsupported
Unsupported
Unsupported
Unsupported
commit

6 rows selected.

At the same time,

alter system dump logfile '/path-to-archived-log-file' scn min XXXXXXXXX scn max YYYYYYYYY;

shows the correct values in the redo block. Unfortunately, the JSON data type is listed neither in LogMiner's Supported Data Types and Table Storage Attributes nor in the Unsupported Data Types and Table Storage Attributes. We are watching the status of this issue.

DDL Support and schema evolution

The following Data Definition Language (DDL) clauses of the ALTER TABLE command are currently supported:

1. add column(s)
2. modify column(s)
3. drop column(s)
4. rename column
5. set unused column(s)

To ensure compatibility with Schema Evolution the following algorithm is used:

  1. When oracdc first encounters an operation on a table in the redo logs, information about the table is read from the data dictionary or from the JSON file specified by the a2.dictionary.file parameter. Two schemas are created: an immutable key schema with the unique identifier [PDB_NAME:]OWNER.TABLE_NAME.Key, version=1, and a mutable value schema with the unique identifier [PDB_NAME:]OWNER.TABLE_NAME.Value, version=1.
  2. After successful parsing of the DDL and comparison of the column definitions before and after the change, the value schema gets an incremented version number, as sketched below.
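
A minimal sketch of this versioning scheme using the Kafka Connect API is shown below; the table, column names, and datatypes are illustrative only.

package com.example.oracdc;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class SchemaEvolutionSketch {

	public static void main(String[] args) {
		// Immutable key schema, created once with version=1
		final Schema keySchema = SchemaBuilder.struct()
				.name("SCOTT.DEPT.Key")
				.version(1)
				.field("DEPTNO", Schema.INT8_SCHEMA)
				.build();

		// Mutable value schema, version=1 when the table is first encountered
		final Schema valueSchemaV1 = SchemaBuilder.struct()
				.name("SCOTT.DEPT.Value")
				.version(1)
				.field("DNAME", Schema.OPTIONAL_STRING_SCHEMA)
				.field("LOC", Schema.OPTIONAL_STRING_SCHEMA)
				.build();

		// After a successfully parsed ALTER TABLE ... ADD the value schema is
		// rebuilt with the new column and an incremented version number
		final Schema valueSchemaV2 = SchemaBuilder.struct()
				.name("SCOTT.DEPT.Value")
				.version(2)
				.field("DNAME", Schema.OPTIONAL_STRING_SCHEMA)
				.field("LOC", Schema.OPTIONAL_STRING_SCHEMA)
				.field("BUDGET", Schema.OPTIONAL_FLOAT64_SCHEMA)
				.build();

		System.out.println(keySchema.name() + " v" + keySchema.version());
		System.out.println(valueSchemaV1.name() + " v" + valueSchemaV1.version());
		System.out.println(valueSchemaV2.name() + " v" + valueSchemaV2.version());
	}
}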

RDBMS errors resiliency and connection retry back-off

oracdc is resilient to Oracle database shutdown and/or restart while performing the DBMS_LOGMNR.ADD_LOGFILE call or while waiting for a new archived redo log. ORA-17410 ("No more data to read from socket") is intercepted, and an attempt to reconnect is made after the fixed back-off time specified by the a2.connection.backoff parameter
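
Below is a minimal sketch, under the assumption that the failure surfaces as a java.sql.SQLException with vendor code 17410, of the fixed back-off reconnect pattern described above; the JDBC URL, credentials, and back-off value (standing in for the a2.connection.backoff parameter) are placeholders.

package com.example.oracdc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class ReconnectWithBackoffSketch {

	private static final int ORA_NO_MORE_DATA_FROM_SOCKET = 17410;

	// Placeholder connection settings
	private static final String URL = "jdbc:oracle:thin:@//oratest01:1521/KAFKA";
	private static final String USER = "ORACDC";
	private static final String PASSWORD = "ORACDC";

	public static void main(String[] args) throws Exception {
		final long backoffMs = 30_000L;  // stands in for a2.connection.backoff
		Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
		while (true) {
			try (Statement statement = connection.createStatement()) {
				// Stands in for DBMS_LOGMNR.ADD_LOGFILE or polling for a new archived log
				statement.execute("begin null; end;");
				Thread.sleep(1_000L);
			} catch (SQLException sqle) {
				if (sqle.getErrorCode() != ORA_NO_MORE_DATA_FROM_SOCKET) {
					throw sqle;
				}
				// The database was shut down or restarted: keep trying to reconnect,
				// waiting for the fixed back-off interval between attempts
				while (true) {
					Thread.sleep(backoffMs);
					try {
						connection = DriverManager.getConnection(URL, USER, PASSWORD);
						break;
					} catch (SQLException reconnectFailure) {
						// database not available yet, wait and retry
					}
				}
			}
		}
	}
}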

Kafka Connect distributed mode

Fully compatible from 0.9.9.2+, when a2.resiliency.type is set to fault-tolerant. In this case all offset information (SCN, RBA, and COMMIT_SCN for rows and transactions, versions for table definitions) is stored only in the standard Kafka Connect offsets
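
A minimal sketch of how such offsets are read back through the standard Kafka Connect offset storage when a source task restarts is given below; the source-partition and offset key names are illustrative assumptions, not oracdc's actual keys.

package com.example.oracdc;

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;

public class OffsetRestoreSketch {

	// Called with the SourceTaskContext that Kafka Connect passes to a SourceTask
	public static void restore(SourceTaskContext context) {
		// Hypothetical source partition describing the mined database
		final Map<String, String> partition =
				Collections.singletonMap("server", "KAFKA");
		final OffsetStorageReader reader = context.offsetStorageReader();
		final Map<String, Object> offset = reader.offset(partition);
		if (offset == null) {
			System.out.println("No stored offset, starting from the initial position");
			return;
		}
		// Hypothetical offset keys mirroring the values listed above
		System.out.println("SCN        = " + offset.get("SCN"));
		System.out.println("RBA        = " + offset.get("RBA"));
		System.out.println("COMMIT_SCN = " + offset.get("COMMIT_SCN"));
	}
}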

Known issues

  1. The INTERVAL data type family is not supported yet

Performance tips

  1. If you do not use archived logs as a source of database user activity audit information, consider setting the Oracle RDBMS hidden parameter _transaction_auditing to false after consulting Oracle Support Services
  2. Always try to set up supplemental logging at the table level, and not for all database objects
  3. Set proper file system parameters and sizing for the path where the Chronicle Queue objects reside
  4. Set proper open files hard and soft limits for the OS user running oracdc
  5. To determine the source of a bottleneck, set the a2.logminer.trace parameter to true and analyze waits on the Oracle RDBMS side using data from the trace file (tracefile_identifier='oracdc')
  6. For optimizing network transfer consider increasing the SDU (Ref.: Database Net Services Administrator's Guide, Chapter 14 "Optimizing Performance"). Also review Oracle Support Services Note 2652240.1 SDU Ignored By The JDBC Thin Client Connection. Example listener.ora with SDU set:
LISTENER =
(DESCRIPTION_LIST =
  (DESCRIPTION =
    (SDU = 2097152)
    (ADDRESS = (PROTOCOL = IPC)(KEY = EXTPROC1))
    (ADDRESS = (PROTOCOL = TCP)(HOST = 0.0.0.0)(PORT = 1521))
  )
)

Example jdbc connect string with SDU set:

jdbc:oracle:thin:@(description=(sdu=2097152)(address=(protocol=tcp)(host=oratest01)(port=1521))(connect_data=(server=dedicated)(service_name=KAFKA)))

While setting the SDU, always check the live settings using the listener trace set to the admin level. For example:

cd <LISTENER_TRACE_DIR>
lsnrctl set trc_level admin
grep nsconneg `lsnrctl show trc_file | grep "set to" | awk {'print $6'}`
  7. Use the oracdc JMX performance metrics (doc/LOGMINER-METRICS.md)
  8. Depending on the structure of your data, try increasing the value of the a2.fetch.size parameter (default fetch size - 32 rows)
  9. Although some documents recommend changing the _log_read_buffers & _log_read_buffer_size hidden parameters, we did not see serious improvement or degradation using different combinations of these parameters. For more information please read LogMiner Tuning: _log_read_buffers & _log_read_buffer_size
  10. oracdc uses the off-heap storage Chronicle Queue developed by Chronicle Software. To determine the required disk space, the size of Docker's tmpfs mounts, or the size of a k8s emptyDir needed to store the memory-mapped files, use the values of the following JMX metrics
MaxTransactionSizeMiB
MaxNumberOfTransInProcessingQueue
GiBWrittenUsingChronicleQueue

When setting the JVM parameters, pay attention to the Linux kernel parameter vm.max_map_count and to the JVM parameter -XX:MaxDirectMemorySize. (Ref.: I have issue with memory)

Distribution

  1. GitHub
  2. Confluent Hub
  3. AWS Marketplace

eu.solutions.a2.cdc.oracle.OraCdcSourceConnector

This Source Connector uses Oracle RDBMS materialized view logs as the source for data changes and materializes the Oracle RDBMS materialized view log at a heterogeneous database system. No materialized view should consume information from the materialized view logs which are used by oracdc. Unlike eu.solutions.a2.cdc.oracle.OraCdcLogMinerConnector, this Source Connector works with the BLOB and CLOB data types. If you need support for the Oracle Database LONG and/or LONG RAW data types, please send us an email at [email protected].

Getting Started

These instructions will get you a copy of the project up and running on any platform with JDK8+ support.

Prerequisites

Before using oracdc please check that the required Java 8+ is installed with

echo "Checking Java version"
java -version

Installing

Build with

mvn install

Oracle JDBC drivers

oracdc is shipped with Oracle JDBC 21.3.0.0, or you can copy the drivers from an Oracle RDBMS server

cp $ORACLE_HOME/jdbc/lib/ojdbc8.jar <JDBC directory> 
cp $ORACLE_HOME/ucp/lib/ucp.jar <JDBC directory> 
cp $ORACLE_HOME/jlib/oraclepki.jar <JDBC directory>
cp $ORACLE_HOME/jlib/osdt_core.jar <JDBC directory>
cp $ORACLE_HOME/jlib/osdt_cert.jar <JDBC directory>

or download the drivers ojdbc[JAVA-VERSION].jar, ucp.jar, oraclepki.jar, osdt_core.jar, and osdt_cert.jar from the JDBC and UCP Downloads page

Oracle Wallet

Please refer to the Oracle Database documentation for instructions on how to create and manage the Oracle Wallet and tnsnames.ora file. Oracle recommends that you create and manage the Wallet in a database environment using the mkstore command

mkstore -wrl $A2_CDC_HOME/wallets/ -create
mkstore -wrl $A2_CDC_HOME/wallets/ -createCredential R1229 SCOTT
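
A minimal sketch for verifying from a JDBC client that the wallet credentials work is shown below; it assumes the oraclepki, osdt_core, and osdt_cert jars listed above are on the classpath, that tnsnames.ora contains the R1229 alias used in the mkstore example, and that the directory paths are placeholders. oracle.net.tns_admin and oracle.net.wallet_location are standard Oracle JDBC connection properties.

package com.example.oracdc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class WalletConnectionCheck {

	public static void main(String[] args) throws Exception {
		final Properties props = new Properties();
		// Directory containing tnsnames.ora (placeholder path)
		props.setProperty("oracle.net.tns_admin", "/path/to/A2_CDC_HOME/wallets");
		// Directory containing the wallet created with mkstore (placeholder path)
		props.setProperty("oracle.net.wallet_location",
				"(SOURCE=(METHOD=FILE)(METHOD_DATA=(DIRECTORY=/path/to/A2_CDC_HOME/wallets)))");
		// No username/password in the URL: credentials come from the wallet entry for R1229
		try (Connection connection =
				DriverManager.getConnection("jdbc:oracle:thin:/@R1229", props)) {
			System.out.println("Connected as " + connection.getMetaData().getUserName());
		}
	}
}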

Running

Oracle LogMiner as CDC source (eu.solutions.a2.cdc.oracle.OraCdcLogMinerConnector)

The following steps need to be performed in order to prepare the Oracle database so the oracdc Connector can be used.

Enabling Oracle RDBMS ARCHIVELOG mode

Log in to SQL*Plus as SYSDBA and check the result of the query

select LOG_MODE from V$DATABASE

If the query returns ARCHIVELOG, it is enabled; skip ahead to Enabling supplemental logging. If the query returns NOARCHIVELOG:

shutdown immediate
startup mount
alter database archivelog;
alter database open;

To verify that ARCHIVELOG has been enabled run again

select LOG_MODE from V$DATABASE

This time it should return ARCHIVELOG

Enabling supplemental logging

Log in to SQL*Plus as SYSDBA. If you would like to enable supplemental logging for the whole database:

alter database add supplemental log data (ALL) columns;

Alternatively, to enable minimal supplemental logging at the database level and full supplemental logging only for selected tables (recommended):

alter database add supplemental log data;
alter table <OWNER>.<TABLE_NAME> add supplemental log data (ALL) columns; 

If using Amazon RDS for Oracle, please see the AWS Amazon Relational Database Service User Guide for the rdsadmin.rdsadmin_util.alter_supplemental_logging procedure.

To verify supplemental logging settings at database level:

select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, SUPPLEMENTAL_LOG_DATA_FK, SUPPLEMENTAL_LOG_DATA_ALL
from V$DATABASE;

To verify supplemental logging settings at table level:

select LOG_GROUP_NAME, TABLE_NAME, DECODE(ALWAYS, 'ALWAYS', 'Unconditional', NULL, 'Conditional') ALWAYS
from DBA_LOG_GROUPS;

Creating non-privileged Oracle user for running LogMiner

The instructions below are for CDB; for non-CDB (deprecated in 12c, desupported in 21c) you can use role and user names without the c## prefix. Log in as SYSDBA and enter the following commands to create a user with the privileges required for running oracdc with LogMiner as the CDC source. For CDB:

create user C##ORACDC identified by ORACDC
  default tablespace SYSAUX
  temporary tablespace TEMP
  quota unlimited on SYSAUX
CONTAINER=ALL;
alter user C##ORACDC SET CONTAINER_DATA=ALL CONTAINER=CURRENT;
grant
  CREATE SESSION,
  SET CONTAINER,
  SELECT ANY TRANSACTION,
  SELECT ANY DICTIONARY,
  EXECUTE_CATALOG_ROLE,
  LOGMINING
to C##ORACDC
CONTAINER=ALL;

For non-CDB or for connection to PDB in RDBMS 19.10+:

create user ORACDC identified by ORACDC
  default tablespace SYSAUX
  temporary tablespace TEMP
  quota unlimited on SYSAUX;
grant
  CREATE SESSION,
  SELECT ANY TRANSACTION,
  SELECT ANY DICTIONARY,
  EXECUTE_CATALOG_ROLE,
  LOGMINING
to ORACDC;

Options for connecting to Oracle Database

In the CDB architecture oracdc must connect to CDB$ROOT, but starting from Oracle Database 19c RU 10 and Oracle Database 21c you can choose to connect either to CDB$ROOT or to an individual PDB.

Additional configuration for physical standby database (V$DATABASE.OPEN_MODE = MOUNTED)

Connection to a physical standby database when the database is opened in MOUNTED mode is possible only for users with the SYSDBA privilege. To check for correct user settings, log in to SQL*Plus as SYSDBA and connect to the physical standby database. To verify that you are connected to the physical standby database enter

select OPEN_MODE, DATABASE_ROLE, DB_UNIQUE_NAME from V$DATABASE;

it should return MOUNTED and PHYSICAL STANDBY. Then enter:

select USERNAME from V$PWFILE_USERS where SYSDBA = 'TRUE';

For the user who will be used to connect to the physical standby database, create an Oracle Wallet. Please refer to the section Oracle Wallet above. To run oracdc in this mode the a2.standby.activate parameter must be set to true.

Additional configuration for distributed database

  1. Copy oracdc-kafka--standalone.jar or a2solutions-oracdc-kafka-.zip to the source and target (mining) database servers.
  2. On the source database server start eu.solutions.a2.cdc.oracle.utils.file.SourceDatabaseShipmentAgent with the --bind-address (IP address or hostname to listen on for incoming requests from the mining database server agent, default 0.0.0.0) and --port (TCP port to listen on for incoming requests from the mining database server agent, default 21521) parameters, for instance
java -cp oracdc-kafka-0.9.8-standalone.jar \
    eu.solutions.a2.cdc.oracle.utils.file.SourceDatabaseShipmentAgent \
        --port 21521 \
        --bind-address 192.168.7.101
  3. On the target (mining) database server start eu.solutions.a2.cdc.oracle.utils.file.TargetDatabaseShipmentAgent with the --bind-address (IP address or hostname to listen on for incoming requests from the oracdc connector, default 0.0.0.0), --port (TCP port to listen on for incoming requests from the oracdc connector, default 21521), --source-host (IP address or hostname of eu.solutions.a2.cdc.oracle.utils.file.SourceDatabaseShipmentAgent), --source-port (TCP port of eu.solutions.a2.cdc.oracle.utils.file.SourceDatabaseShipmentAgent), and --file-destination (existing directory to store redo log files) parameters, for instance
java -cp oracdc-kafka-0.9.8-standalone.jar \
    eu.solutions.a2.cdc.oracle.utils.file.TargetDatabaseShipmentAgent \
        --port 21521 \
        --bind-address 192.168.7.102 \
        --source-host 192.168.7.101 \
        --source-port 21521 \
        --file-destination /d00/oradata/archive
  4. Configure the oracdc connector with the a2.distributed.activate parameter set to true. Set the a2.jdbc.url/a2.jdbc.username/a2.jdbc.password or a2.wallet.location/a2.tns.admin/a2.tns.alias parameters to valid values for connecting to the source database. Set a2.distributed.wallet.location/a2.distributed.tns.admin/a2.distributed.tns.alias to valid values for connecting to the target (mining) database. Set a2.distributed.target.host and a2.distributed.target.port to the IP address/hostname and port where eu.solutions.a2.cdc.oracle.utils.file.TargetDatabaseShipmentAgent runs. Example parameter settings are in the logminer-source-distributed-db.properties file

Materialized View logs as CDC source (eu.solutions.a2.cdc.oracle.OraCdcSourceConnector)

Create materialized view logs over the replicated tables. These materialized view logs must be created with the following options: with primary key and/or with rowid, sequence, excluding new values, and without the commit scn option. You do not need to specify a column list when creating a materialized view log for use with oracdc; oracdc reads only the primary key value and/or the rowid of the row in the master table from the materialized view log. The table below describes how oracdc operates depending on the materialized view log settings.

SNAPSHOT LOG WITH       Master table access   Key for External system
Primary key             Primary key           Primary key
Primary key and ROWID   ROWID                 Primary key
ROWID                   ROWID                 ROWID (String, key name ORA_ROW_ID)

For example, to create materialized view logs for the SCOTT.DEPT and SCOTT.EMP tables:

connect scott/tiger
CREATE TABLE DEPT
       (DEPTNO NUMBER(2) CONSTRAINT PK_DEPT PRIMARY KEY,
        DNAME VARCHAR2(14) ,
        LOC VARCHAR2(13) );

CREATE TABLE EMP
       (EMPNO NUMBER(4) CONSTRAINT PK_EMP PRIMARY KEY,
        ENAME VARCHAR2(10),
        JOB VARCHAR2(9),
        MGR NUMBER(4),
        HIREDATE DATE,
        SAL NUMBER(7,2),
        COMM NUMBER(7,2),
        DEPTNO NUMBER(2) CONSTRAINT FK_DEPTNO REFERENCES DEPT);

create materialized view log on DEPT
  with primary key, sequence
  excluding new values;

create materialized view log on EMP
  with primary key, sequence
  excluding new values;

oracdc Connector parameters

For a description of the oracdc connector parameters please read KAFKA-CONNECT.md.

Message format

Every message is an envelope with two parts: a schema and a payload. The schema describes the structure of the payload, while the payload contains the actual data. The message format is similar to the format used by Debezium (a minimal sketch of the envelope is shown after the list), with the following important differences:

  1. the before part of the schema and payload is used only for the DELETE operation
  2. for the DELETE operation the schema and payload contain only the primary key definition and value, due to the limited information available from the materialized view log
  3. for INSERT and UPDATE only the after part of the payload contains values, due to the absence of the row's previous state in the materialized view log
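
Below is a minimal, schematic sketch of such an envelope built with the Kafka Connect API. The field names (before, after, op) follow the Debezium-style samples referenced in the next section, and the column set is illustrative; see MESSAGE-SAMPLES-DBZM-STYLE.md for the exact structures produced by oracdc.

package com.example.oracdc;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class EnvelopeSketch {

	public static void main(String[] args) {
		// Row-level schema with an illustrative column set
		final Schema rowSchema = SchemaBuilder.struct()
				.optional()
				.name("SCOTT.DEPT.Row")
				.field("DEPTNO", Schema.INT8_SCHEMA)
				.field("DNAME", Schema.OPTIONAL_STRING_SCHEMA)
				.build();

		// Envelope: the schema part describing before/after/op
		final Schema envelopeSchema = SchemaBuilder.struct()
				.name("SCOTT.DEPT.Envelope")
				.field("before", rowSchema)
				.field("after", rowSchema)
				.field("op", Schema.STRING_SCHEMA)
				.build();

		// Payload for an INSERT: only the after part carries values
		final Struct after = new Struct(rowSchema)
				.put("DEPTNO", (byte) 50)
				.put("DNAME", "MARKETING");
		final Struct insertPayload = new Struct(envelopeSchema)
				.put("after", after)
				.put("op", "c");

		// Payload for a DELETE: only the before part (primary key) is populated
		final Struct before = new Struct(rowSchema)
				.put("DEPTNO", (byte) 50);
		final Struct deletePayload = new Struct(envelopeSchema)
				.put("before", before)
				.put("op", "d");

		System.out.println(insertPayload);
		System.out.println(deletePayload);
	}
}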

Message examples

See the MESSAGE-SAMPLES-DBZM-STYLE.md for details

Oracle RDBMS specific information

The source structure of the payload contains various information about the Oracle RDBMS instance from the V$INSTANCE and V$DATABASE views. When using a materialized view log as the CDC source, the scn field for INSERT and UPDATE operations contains the actual ORA_ROWSCN pseudocolumn value for the given row in the master table; for the DELETE operation it contains the ORA_ROWSCN pseudocolumn value for the given row in the materialized view log.

Oracle RDBMS Type mapping

By default (when a2.oracdc.schemas is set to false) the oracdc source connectors are compatible with the Confluent JDBC Sink connector and use the datatype mapping below

Oracle RDBMS Type   JSON Type   Comment
DATE                int32       org.apache.kafka.connect.data.Date
TIMESTAMP%          int64       org.apache.kafka.connect.data.Timestamp
NUMBER              int8        NUMBER(1,0) & NUMBER(2,0)
NUMBER              int16       NUMBER(3,0) & NUMBER(4,0)
NUMBER              int32       NUMBER(5,0) & NUMBER(6,0) & NUMBER(7,0) & NUMBER(8,0)
NUMBER              int64       other integers between a billion and 1,000,000,000,000,000,000
NUMBER              float64     Oracle NUMBER without specified SCALE and PRECISION
NUMBER              bytes       org.apache.kafka.connect.data.Decimal - all other numerics
FLOAT               float64
BINARY_FLOAT        float32
BINARY_DOUBLE       float64
RAW                 bytes
BLOB                bytes
CHAR                string
NCHAR               string
VARCHAR2            string
NVARCHAR2           string
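
The sketch below shows how the Kafka Connect logical types named in the Comment column above are declared with the Connect API; the columns echo the EMP example table earlier in this document and are illustrative only.

package com.example.oracdc;

import java.math.BigDecimal;

import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Timestamp;

public class TypeMappingSketch {

	public static void main(String[] args) {
		final Schema valueSchema = SchemaBuilder.struct()
				.name("SCOTT.EMP.Value")
				// Oracle DATE -> int32 with the Date logical type
				.field("HIREDATE", Date.SCHEMA)
				// Oracle TIMESTAMP -> int64 with the Timestamp logical type
				.field("LAST_UPDATE", Timestamp.SCHEMA)
				// Oracle NUMBER(4,0) -> int16
				.field("EMPNO", Schema.INT16_SCHEMA)
				// Oracle NUMBER(7,2) -> bytes with the Decimal logical type (scale 2)
				.field("SAL", Decimal.schema(2))
				.build();

		for (final Field field : valueSchema.fields()) {
			System.out.println(field.name() + " -> " + field.schema().type()
					+ " (" + field.schema().name() + ")");
		}
		// Decimal values are carried as java.math.BigDecimal with the declared scale
		final BigDecimal salary = new BigDecimal("1250.00");
		System.out.println("SAL sample value: " + salary);
	}
}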

When a2.oracdc.schemas is set to true, oracdc uses its own extensions for the Oracle NUMBER (eu.solutions.a2.cdc.oracle.data.OraNumber) and TIMESTAMP WITH [LOCAL] TIMEZONE (eu.solutions.a2.cdc.oracle.data.OraTimestamp) datatypes.

When a2.process.lobs is set to true, oracdc uses its own extensions for the Oracle BLOB (eu.solutions.a2.cdc.oracle.data.OraBlob), CLOB (eu.solutions.a2.cdc.oracle.data.OraClob), NCLOB (eu.solutions.a2.cdc.oracle.data.OraNClob), and SYS.XMLTYPE (eu.solutions.a2.cdc.oracle.data.OraXmlBinary) datatypes.

Built With

  • Maven - Dependency Management

TODO

  • better resilience to RDBMS errors
  • oracdc as audit information source

Version history

0.9.0 (OCT-2019)

Initial release

0.9.1 (NOV-2019)

Oracle Wallet support for storing database credentials

0.9.2 (DEC-2019)

"with ROWID" materialized view log support

0.9.3 (FEB-2020)

Oracle LogMiner as CDC source. Removed AWS Kinesis support. New class hierarchy

0.9.3.1 (FEB-2020)

Removed dynamic invocation of Oracle JDBC. Ref.: Oracle Database client libraries for Java now on Maven Central

0.9.4 (MAR-2020)

Ability to run Oracle LogMiner on the physical standby database when V$DATABASE.OPEN_MODE = MOUNTED to reduce TCO

0.9.4.1 (MAR-2020)

Persistence across restarts. CDB fixes/20c readiness

0.9.5 (APR-2020)

Schema Editor GUI preview (java -cp <> eu.solutions.a2.cdc.oracle.schema.TableSchemaEditor). This GUI is required for more precise mapping between Oracle and Kafka Connect datatypes. See also the a2.dictionary.file parameter

0.9.6 (MAY-2020)

Initial data load support. See also the a2.initial.load parameter

0.9.6.1 (MAY-2020)

Partitioned tables support

0.9.6.2 (AUG-2020)

Oracle NUMBER datatype mapping fixes

0.9.6.3 (AUG-2020)

Kafka topic name configuration using the a2.topic.name.style & a2.topic.name.delimiter parameters

0.9.6.4 (SEP-2020)

Dynamic list of tables to mine using the a2.table.list.style parameter

0.9.7 (OCT-2020)

LOB support. See also the a2.process.lobs parameter

0.9.7.1 (OCT-2020)

Support for NCLOB

0.9.7.2 (NOV-2020)

VIP (Verified Integration Program) compliance

0.9.7.3 (NOV-2020)

Fix for a CDB column type detection issue

0.9.7.4 (NOV-2020)

Important fixes for CDB

0.9.7.5 (JAN-2021)

Fix for a Confluent Control Center 6.0+ sync issue with UCP PoolDataSource

0.9.7.6 (FEB-2021)

Protobuf Schema compatibility

0.9.7.7 (APR-2021)

Added more information about the source record (XID, ROWID, and COMMIT_SCN)

0.9.7.8 (MAY-2021)

MAY-21 features/fixes (fix for ORA-2396, lag added to JMX metrics, new fetch size parameter)

0.9.8 (JUL-2021)

Distributed database configuration

0.9.8.1 (AUG-2021)

RDBMS 21c compatibility

0.9.8.2 (OCT-2021)

SYS.XMLTYPE support and fixes for partitioned tables with BLOB/CLOB/NCLOB columns

0.9.8.3 (NOV-2021)

Large objects (BLOB/CLOB/NCLOB/XMLTYPE) transformation in the source connector

0.9.8.4 (NOV-2021)

CDB: support connection to CDB$ROOT or to an individual PDB

0.9.9 (JAN-2022)

DDL operations support for the LogMiner source

0.9.9.1 (JAN-2022)

ORA-1291 fixes and a new non-static connection pool for the LogMiner connector

0.9.9.2 (FEB-2022)

a2.resiliency.type = fault-tolerant to ensure 100% compatibility with Kafka Connect distributed mode

Authors

License

This project is licensed under the Apache License - see the LICENSE file for details

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].