Integrating Oracle Database with Debezium and Kafka Connect for CDC (Change Data Capture)

Roy Ali Hasan
Feb 23, 2024

Introduction:

Change Data Capture (CDC) is a pivotal component in modern data architectures, enabling real-time data synchronization and analysis across different systems. Oracle Database, being a cornerstone of many enterprises, often requires seamless integration with CDC solutions. In this article, we will delve into the integration of Oracle Database with Debezium and Kafka Connect, two popular open-source tools that facilitate CDC.

Why Debezium and Kafka Connect?

Debezium and Kafka Connect offer a robust and scalable solution for CDC. Debezium captures row-level changes in databases and streams them to Apache Kafka topics, while Kafka Connect provides connectors to easily integrate data sources and sinks with Kafka. This combination allows for real-time data propagation and enables various downstream applications such as analytics, monitoring, and data warehousing.

Prerequisites:

Before proceeding, ensure you have Docker installed on your system, along with:

  1. Oracle Database installed and configured.
  2. Apache Kafka and Apache ZooKeeper installed and configured.
  3. Debezium and Kafka Connect installed and configured.
  4. Appropriate permissions and access to Oracle Database and Kafka infrastructure.

GitHub Repo Link:

https://github.com/royalihasan/dockerized-setup-kafka-connect-oracle-debezium-stack/tree/master

How to set up Oracle in Docker

First, we need an account on the Oracle website, since the Oracle Container Registry requires a login to pull the database image.

Explore the Oracle Container Registry here:

https://container-registry.oracle.com/ords/f?p=113:1:12072228597753:::1:P1_BUSINESS_AREA:3&cs=3DJqjikERfD7UMHbPVeK9JupLb8ZJGXhY5Ra1QnGxoSGflLIn9kpnfZAchrg1tDZS6Sy3xJ7WZmc5-ickXjiZFA

Step 1: Log in to the Oracle Container Registry

Run the following command to log in to the Oracle Container Registry. You will be prompted to enter your Oracle Cloud credentials (username and password):

docker login container-registry.oracle.com

Step 2: Pull the Oracle Database image

docker pull container-registry.oracle.com/database/enterprise:latest

Docker Compose File

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_BROKER_ID: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    container_name: schema-registry
    ports:
      - 8081:8081
    depends_on:
      - zookeeper
      - kafka
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181

  debezium:
    image: debezium/connect:1.9
    container_name: connect
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      LD_LIBRARY_PATH: '/kafka/external_libs/instantclient_19_6/'
    depends_on: [ kafka, zookeeper, schema-registry ]
    ports:
      - 8083:8083

  # Source DB's
  oracle:
    image: container-registry.oracle.com/database/enterprise:latest # 21C
    container_name: oracle
    environment:
      - ORACLE_PWD=top_secret
    ports:
      - 1521:1521

  # Sinks DB's
  postgres:
    # *-----------------------------*
    # To connect to the DB:
    # docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
    # *-----------------------------*
    image: postgres:latest
    container_name: postgres
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    ports:
      - 5432:5432
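With the compose file in place, you can bring up the whole stack. A quick sketch, assuming the file is saved as docker-compose.yml in your working directory (note that the Oracle container can take several minutes to finish initializing on first start):

# Start ZooKeeper, Kafka, Schema Registry, Debezium Connect, Oracle, and Postgres in the background
docker compose up -d

# Check that all containers are up (the Oracle image takes a while on first boot)
docker compose ps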

1. Prepare the Database for CDC

Step 1: Create a recovery area folder

First, open a bash shell inside the Oracle container:

docker exec -it oracle bash

Create a recovery_area Directory

cd /opt/oracle/oradata
mkdir -p recovery_area
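Equivalently, if you would rather not open an interactive shell, the directory can be created in one shot from the host (assuming the container name oracle from the compose file above):

docker exec oracle mkdir -p /opt/oracle/oradata/recovery_area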

Step 2: Set up LogMiner

Run the following script from inside the Oracle container's bash shell:

curl https://raw.githubusercontent.com/royalihasan/dockerized-setup-kafka-connect-oracle-debezium-stack/master/src/main/resources/01_db_setup_scripts/01_logminer-setup.sh | sh
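If you want to see what the script is doing before piping it into a shell, it follows the standard Debezium LogMiner prerequisites: enable archive logging (using the recovery_area directory created above), turn on supplemental logging, and create the c##dbzuser account that the connector configuration below uses. The following is only a rough, illustrative sketch of those core steps; the actual script in the repo is more complete (tablespaces, additional grants, and the debezium sample-schema user):

# Illustration only: run inside the Oracle container as the oracle OS user.
sqlplus / as sysdba <<'EOF'
ALTER SYSTEM SET db_recovery_file_dest_size = 10G;
ALTER SYSTEM SET db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' SCOPE=spfile;
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
CREATE USER c##dbzuser IDENTIFIED BY dbz CONTAINER=ALL;
GRANT CREATE SESSION, LOGMINING, SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
EOF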

Step 3: Create a sample database

Paste this command into the Oracle bash shell:

curl https://raw.githubusercontent.com/royalihasan/dockerized-setup-kafka-connect-oracle-debezium-stack/master/src/main/resources/01_db_setup_scripts/inventory.sql | sqlplus debezium/dbz@//localhost:1521/orclpdb1

Step 4: Confirm that the tables were created successfully.

a. Start an sqlplus prompt:

docker exec -it oracle bash -c 'sleep 1; sqlplus debezium/dbz@//localhost:1521/orclpdb1'

b. Run a SELECT statement on one of the tables we created:

SELECT *
FROM CUSTOMERS;

2. Prepare Debezium Connect for CDC

Step 1: Install the required JDBC driver

The Debezium Connect image does not ship with the Oracle JDBC driver, so it has to be added manually. First, you need to access the bash terminal of the Connect container (named connect in the compose file).

Change the directory to the Kafka Connect libs folder: cd libs
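Assuming the Connect container is named connect, as in the compose file above, these two steps look like this:

# Open a shell inside the Kafka Connect (Debezium) container
docker exec -it connect bash

# The image's working directory is /kafka, so the connector jars live under ./libs
cd libs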

Step 2: Download the driver jar with curl:

curl https://maven.xwiki.org/externals/com/oracle/jdbc/ojdbc8/12.2.0.1/ojdbc8-12.2.0.1.jar -o ojdbc8-12.2.0.1.jar
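As a quick sanity check that the jar landed where the Connect worker will pick it up, list it from the same libs directory:

ls -l ojdbc8-12.2.0.1.jar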

Step 3: Restart the Debezium Connect service

Note: Use Docker to restart the Connect container so it picks up the new driver.
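For example, using the container name from the compose file:

docker restart connect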

Hurrah, all configurations are done!

3. Create an Oracle Source Connector for Testing

Step 1: Send a POST request to http://localhost:8083/connectors with the following payload:

{
  "name": "oracle-customer-source-connector-00",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "database.hostname": "oracle",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.server.name": "test",
    "database.history.kafka.topic": "history",
    "database.dbname": "ORCLCDB",
    "database.connection.adapter": "LogMiner",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "table.include.list": "DEBEZIUM.CUSTOMERS",
    "database.schema": "DEBEZIUM",
    "database.pdb.name": "ORCLPDB1",
    "snapshot.mode": "schema_only",
    "include.schema.changes": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
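Any REST client works (Postman, for example). From the command line, assuming the payload above is saved as oracle-source.json, the request looks like this:

curl -s -X POST -H "Content-Type: application/json" \
  --data @oracle-source.json \
  http://localhost:8083/connectors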

Now check that the connector is running by using this command in your bash shell:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort

The output should look like this:

source | oracle-customer-source-connector-00 | RUNNING | RUNNING | io.debezium.connector.oracle.OracleConnector

Step 2: Listen to the topic and read the messages

Open a terminal on your host and paste this command:

docker run --tty --network resources_default confluentinc/cp-kafkacat kafkacat -b kafka:9092 -C -s key=s -s value=avro -r http://schema-registry:8081 -t test.DEBEZIUM.CUSTOMERS

Note: resources_default is the Docker network name; replace it with the name of the network your stack is running on.

4. Verify That CDC Is Working

Step 1: Open an sqlplus session in the Oracle container

docker exec -it oracle bash -c 'sleep 1; sqlplus debezium/dbz@//localhost:1521/orclpdb1'

Step 2: Enable autocommit with this SQL*Plus command

SET AUTOCOMMIT ON;

Step 3: Perform some changes

INSERT:

INSERT INTO CUSTOMERS
VALUES (NULL, 'Peter', 'Parker', 'peter.parker@marvel.com');

UPDATE:

UPDATE CUSTOMERS
SET email = 'new_email@gmail.com'
WHERE id = 1041;

DELETE:

DELETE
FROM CUSTOMERS
WHERE id = 1024;

Now check the changes in the terminal where we are listening to our topic.

5. Create a Postgres Connector as a Sink

Note: Your Postgres container should be up and running.

Step 1: Create the sink connector by sending a POST request to this URL: http://localhost:8083/connectors, using the following payload:

{
  "name": "jdbc-postgres-sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "connection.username": "postgres",
    "connection.password": "postgres",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "auto.create": "true",
    "primary.key.mode": "record_key",
    "schema.evolution": "basic",
    "database.time_zone": "UTC",
    "topics": "test.DEBEZIUM.CUSTOMERS",
    "table.name.format": "customers"
  }
}
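As with the source connector, this can be posted with curl, assuming the payload is saved as postgres-sink.json:

curl -s -X POST -H "Content-Type: application/json" \
  --data @postgres-sink.json \
  http://localhost:8083/connectors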

Step 2: Check that the connector is working

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort

The output should look like this:

source | oracle-customer-source-connector-00 | RUNNING | RUNNING | io.debezium.connector.oracle.OracleConnector
sink | jdbc-postgres-sink-connector | RUNNING | RUNNING | io.debezium.connector.jdbc.JdbcSinkConnector

Now check the changes in the Postgres DB.

1. First, access the Postgres container's bash shell and log in with psql:

psql -U postgres
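Or, in a single command from the host (this mirrors the hint in the compose file comments):

docker exec -it postgres psql -U postgres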

2. List the tables in the database with \dt, then query the replicated table:

SELECT *
FROM customers;

You should see the same data that was in the Oracle DB.

Hurrah, congratulations, you are done!
