Integrating Oracle Database with Debezium and Kafka Connect for CDC (Change Data Capture)
Introduction:
Change Data Capture (CDC) is a pivotal component in modern data architectures, enabling real-time data synchronization and analysis across different systems. Oracle Database, being a cornerstone of many enterprises, often requires seamless integration with CDC solutions. In this article, we will delve into the integration of Oracle Database with Debezium and Kafka Connect, two popular open-source tools that facilitate CDC.
Why Debezium and Kafka Connect?
Debezium and Kafka Connect offer a robust, scalable solution for CDC. Debezium captures row-level changes in databases and streams them to Apache Kafka topics, while Kafka Connect provides connectors to easily integrate data sources and sinks with Kafka. This combination allows real-time data propagation and enables downstream applications such as analytics, monitoring, and data warehousing.
Prerequisites:
Before proceeding, ensure you have the following:
- Docker installed on your system.
- Oracle Database installed and configured.
- Apache Kafka and Apache ZooKeeper installed and configured.
- Debezium and Kafka Connect installed and configured.
- Appropriate permissions and access to the Oracle Database and Kafka infrastructure.
GitHub repo:
https://github.com/royalihasan/dockerized-setup-kafka-connect-oracle-debezium-stack/tree/master
How to set up Oracle in Docker
First, we need to create an account on the Oracle website, since the Database image is distributed through the Oracle Container Registry.
Step 1: Log in to the Oracle Container Registry
Run the following command to log in to the Oracle Container Registry. You will be prompted to enter your Oracle Cloud credentials (username and password):
docker login container-registry.oracle.com
Step 2: Pull the Oracle Database image:
docker pull container-registry.oracle.com/database/enterprise:latest
Docker Compose File
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_BROKER_ID: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    container_name: schema-registry
    ports:
      - 8081:8081
    depends_on:
      - zookeeper
      - kafka
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181

  debezium:
    image: debezium/connect:1.9
    container_name: connect
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      LD_LIBRARY_PATH: '/kafka/external_libs/instantclient_19_6/'
    depends_on: [ kafka, zookeeper, schema-registry ]
    ports:
      - 8083:8083

  # Source DBs
  oracle:
    image: container-registry.oracle.com/database/enterprise:latest # 21c
    container_name: oracle
    environment:
      - ORACLE_PWD=top_secret
    ports:
      - 1521:1521

  # Sink DBs
  postgres:
    # *-----------------------------*
    # To connect to the DB:
    # docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
    # *-----------------------------*
    image: postgres:latest
    container_name: postgres
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    ports:
      - 5432:5432
1. Prepare the Database for CDC
Step 1: Create a recovery area directory
First, open a bash shell inside the Oracle container:
docker exec -it oracle bash
Then create the recovery_area directory:
cd /opt/oracle/oradata
mkdir -p recovery_area
Step 2: Set up LogMiner
Run the following script inside the Oracle container:
curl https://raw.githubusercontent.com/royalihasan/dockerized-setup-kafka-connect-oracle-debezium-stack/master/src/main/resources/01_db_setup_scripts/01_logminer-setup.sh | sh
Step 3: Create a sample database
Paste this command into the Oracle bash shell:
curl https://raw.githubusercontent.com/royalihasan/dockerized-setup-kafka-connect-oracle-debezium-stack/master/src/main/resources/01_db_setup_scripts/inventory.sql | sqlplus debezium/dbz@//localhost:1521/orclpdb1
Step 4: Confirm that the tables were created successfully.
a. Start a sqlplus prompt:
docker exec -it oracle bash -c 'sleep 1; sqlplus debezium/dbz@//localhost:1521/orclpdb1'
b. Run a SELECT statement on one of the tables we created:
SELECT *
FROM CUSTOMERS;
2. Prepare Debezium Connect for CDC
Step 1: Install the required drivers
The Oracle JDBC driver must be on the Connect worker's classpath, so first access the bash terminal of the Connect container:
docker exec -it connect bash
Then change to the libs directory: cd libs
Step 2: Pull the driver jar with curl:
curl https://maven.xwiki.org/externals/com/oracle/jdbc/ojdbc8/12.2.0.1/ojdbc8-12.2.0.1.jar -o ojdbc8-12.2.0.1.jar
Step 3: Restart the Debezium Connect service
Note: use Docker to restart it, e.g. docker restart connect.
Hurrah, all configuration is done!
3. Create an Oracle Source Connector for Testing
Send a POST request to:
http://localhost:8083/connectors
with this payload:
{
  "name": "oracle-customer-source-connector-00",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "database.hostname": "oracle",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.server.name": "test",
    "database.history.kafka.topic": "history",
    "database.dbname": "ORCLCDB",
    "database.connection.adapter": "LogMiner",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "table.include.list": "DEBEZIUM.CUSTOMERS",
    "database.schema": "DEBEZIUM",
    "database.pdb.name": "ORCLPDB1",
    "snapshot.mode": "schema_only",
    "include.schema.changes": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
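If you prefer scripting over a REST client, the same POST can be made from Python. This is a minimal sketch using only the standard library; the endpoint http://localhost:8083 comes from the compose file, and the config here is abridged — use the full payload above when registering for real.

```python
import json
import urllib.request

# Connector payload mirroring the JSON above (config abridged for brevity).
payload = {
    "name": "oracle-customer-source-connector-00",
    "config": {
        "connector.class": "io.debezium.connector.oracle.OracleConnector",
        "database.hostname": "oracle",
        "database.port": "1521",
        "database.user": "c##dbzuser",
        "database.password": "dbz",
        "database.server.name": "test",
        "database.pdb.name": "ORCLPDB1",
        "table.include.list": "DEBEZIUM.CUSTOMERS",
        "snapshot.mode": "schema_only",
    },
}

def build_register_request(base_url: str, payload: dict) -> urllib.request.Request:
    """Build the POST request that Kafka Connect's /connectors endpoint expects."""
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_register_request("http://localhost:8083", payload)
# urllib.request.urlopen(req)  # send it once the Connect worker is up
```

A successful registration returns HTTP 201 with the connector configuration echoed back.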
Now check that the connector is working by running this in your bash shell:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
The output should look like this:
source | oracle-customer-source-connector-00 | RUNNING | RUNNING | io.debezium.connector.oracle.OracleConnector
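If jq is not available, the same summary can be produced with a short Python script. This is a sketch that assumes the standard shape of Connect's /connectors?expand=info&expand=status response (one top-level key per connector, each with info and status objects):

```python
def summarize(status_json: dict) -> list[str]:
    """Flatten Connect's ?expand=info&expand=status response into one
    'type | name | connector state | task states | class' line per connector."""
    rows = []
    for name, entry in sorted(status_json.items()):
        parts = [
            entry["info"]["type"],
            name,
            entry["status"]["connector"]["state"],
            *[task["state"] for task in entry["status"]["tasks"]],
            entry["info"]["config"]["connector.class"],
        ]
        rows.append(" | ".join(parts))
    return rows

# Example response captured from /connectors?expand=info&expand=status:
sample = {
    "oracle-customer-source-connector-00": {
        "info": {
            "type": "source",
            "config": {"connector.class": "io.debezium.connector.oracle.OracleConnector"},
        },
        "status": {"connector": {"state": "RUNNING"}, "tasks": [{"state": "RUNNING"}]},
    }
}
for line in summarize(sample):
    print(line)
```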
Step 2: Listen to the topic and read messages
Open your terminal and paste this command:
docker run --tty --network resources_default confluentinc/cp-kafkacat kafkacat -b kafka:9092 -C -s key=s -s value=avro -r http://schema-registry:8081 -t test.DEBEZIUM.CUSTOMERS
Note: resources_default is the Docker network name; replace it with your own network's name.
4. Verify That CDC Is Working
Step 1: Access sqlplus in the Oracle container
docker exec -it oracle bash -c 'sleep 1; sqlplus debezium/dbz@//localhost:1521/orclpdb1'
Step 2: Enable autocommit:
SET AUTOCOMMIT ON
Step 3: Perform some changes
INSERT:
INSERT INTO CUSTOMERS
VALUES (NULL, 'Peter', 'Parker', 'peter.parker@marvel.com');
UPDATE:
UPDATE CUSTOMERS
SET email = 'new_email@gmail.com'
WHERE id = 1041;
DELETE:
DELETE
FROM CUSTOMERS
WHERE id = 1024;
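Each of these statements makes Debezium emit a change event to the test.DEBEZIUM.CUSTOMERS topic. The event payload can be modelled roughly as below; this is a simplified sketch (real events also carry source metadata, schemas, and timestamps, and the column values here are illustrative):

```python
# Simplified model of Debezium's change-event payload ("envelope").
# op codes: c = create (INSERT), u = update, d = delete, r = snapshot read.
def make_event(op, before=None, after=None):
    assert op in ("c", "u", "d", "r")
    return {"op": op, "before": before, "after": after}

insert_event = make_event("c", after={"ID": 1042, "FIRST_NAME": "Peter"})
update_event = make_event(
    "u",
    before={"ID": 1041, "EMAIL": "old_email@gmail.com"},
    after={"ID": 1041, "EMAIL": "new_email@gmail.com"},
)
delete_event = make_event("d", before={"ID": 1024})
# After a delete event Debezium also sends a tombstone (the same key with a
# null value) so that log-compacted topics can eventually drop the row.
```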
Now check the changes in the terminal where we are listening to the topic.
5. Create a Postgres Connector as a Sink
Note: your Postgres container should be up and running.
Step 1: Create the sink connector
Send a POST request to http://localhost:8083/connectors with this payload:
{
  "name": "jdbc-postgres-sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "connection.username": "postgres",
    "connection.password": "postgres",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "auto.create": "true",
    "primary.key.mode": "record_key",
    "schema.evolution": "basic",
    "database.time_zone": "UTC",
    "topics": "test.DEBEZIUM.CUSTOMERS",
    "table.name.format": "customers"
  }
}
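Note how the sink's topics value matches the source topic consumed earlier. In Debezium 1.x the change topic name is built from the source connector's settings as <database.server.name>.<schema>.<table>, which this small check illustrates:

```python
# Debezium 1.x topic naming: <database.server.name>.<schema>.<table>
server_name = "test"             # database.server.name in the source connector
schema, table = "DEBEZIUM", "CUSTOMERS"
topic = f"{server_name}.{schema}.{table}"
print(topic)  # test.DEBEZIUM.CUSTOMERS
```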
Step 2: Check that the connector is working:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
The output should look like this:
source | oracle-customer-source-connector-00 | RUNNING | RUNNING | io.debezium.connector.oracle.OracleConnector
sink | jdbc-postgres-sink-connector | RUNNING | RUNNING | io.debezium.connector.jdbc.JdbcSinkConnector
Now check the changes in the Postgres DB:
1. First, access the Postgres container and log in:
docker exec -it postgres psql -U postgres
2. List the tables with \dt, then query the customers table:
SELECT *
FROM customers;
The same data that was in the Oracle DB should now be in Postgres.
Hurrah, congrats, you are done!