Change Data Capture (CDC) - MySQL
This post briefly documents the process of Change Data Capture (CDC) with MySQL.
Introduction
Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real-time to a downstream process or system.
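For instance, an insert into a `person` table is captured from the database log and delivered downstream as a structured change event. A simplified sketch of what such an event might look like (field names follow Debezium's envelope convention; values are illustrative):

```json
{
  "before": null,
  "after": { "id": 1, "first_name": "fn14", "email": "example@domain.com" },
  "source": { "db": "streaming_etl_db", "table": "person" },
  "op": "c",
  "ts_ms": 1660915320000
}
```

Here `op` indicates the operation type: `c` (create), `u` (update), or `d` (delete), with `before` and `after` carrying the row state on either side of the change.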
Minimum Software Requirements
- Docker (for running MySQL, Zookeeper, Kafka, Debezium, schema-registry, kafka-ui, ksqldb-server, ksqldb-cli)
- Docker Compose
- MySQL database.
- MySQL Workbench or any other MySQL database client/console.
mysql.cnf MySQL database configuration
[mysqld]
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row
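After restarting MySQL with this configuration, a quick sanity check from any MySQL client confirms that row-based binary logging is active:

```sql
-- Both must be enabled for Debezium to capture changes
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
```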
Sample Project Setup
- Clone the repository
$ git clone https://github.com/AnanthaRajuC/Streaming_ETL_pipeline_MySQL.git
$ cd Streaming_ETL_pipeline_MySQL
Running the application via docker compose
- Pull all required docker images
$ docker compose -f docker-compose.yaml pull
- Start up the environment
The first time that you do this, the Docker images will be pulled down from the remote server. This may take a while!
$ docker compose -f docker-compose.yaml up
Creating network "streaming_etl_pipeline_mysql_webproxy" with driver "bridge"
Creating zookeeper ... done
Creating kafka ... done
Creating debezium ... done
Creating cp-schema-registry ... done
Creating kafka-connect-01 ... done
Creating ksqldb-server ... done
Creating ksqldb-cli ... done
Accessing Kafka Topics via Kafka-UI
- Optionally, start Kafka UI, an open-source web UI for Apache Kafka management
$ docker run --name=kafka-ui --network=streaming_etl_pipeline_mysql_webproxy -p 8080:8080 -e KAFKA_CLUSTERS_0_NAME=local -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 -d provectuslabs/kafka-ui:latest
URL to access the Kafka-UI console: http://localhost:8080
- Make sure everything is up and running
$ docker ps
IMPORTANT: If any component does not show "Up" in the Status column (e.g., it says "Exit"), you must rectify this before continuing. As a first step, try re-issuing the docker compose -f docker-compose.yaml up -d command.
Accessing ksqlDB via ksqldb-cli
- Launch the ksqlDB CLI in another terminal window.
$ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Tear down the stack
$ docker stop kafka-ui
$ docker rm kafka-ui
$ docker compose -f docker-compose.yaml down
Stopping ksqldb-cli ... done
Stopping ksqldb-server ... done
Stopping kafka-connect-01 ... done
Stopping debezium ... done
Stopping kafka ... done
Stopping zookeeper ... done
Removing ksqldb-cli ... done
Removing ksqldb-server ... done
Removing kafka-connect-01 ... done
Removing cp-schema-registry ... done
Removing debezium ... done
Removing kafka ... done
Removing zookeeper ... done
Removing network streaming_etl_pipeline_mysql_webproxy
If you want to preserve the state of all containers, run docker compose -f docker-compose.yaml stop instead.
Initial MySQL preparation
Initial Data setup.
MySQL
- Declare schema, user and permissions.
-- create schema
CREATE SCHEMA streaming_etl_db;
-- use schema
USE streaming_etl_db;
-- Create user
CREATE USER 'debezium' IDENTIFIED WITH mysql_native_password BY 'Debezium@123#';
-- Grant privileges to user
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium';
-- Reload the grant tables in the mysql database enabling the changes to take effect without reloading or restarting mysql service
FLUSH PRIVILEGES;
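To confirm the account was created with the replication privileges Debezium needs, you can check from any MySQL client:

```sql
-- Verify the debezium user's privileges
SHOW GRANTS FOR 'debezium';
```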
- Declare Tables
CREATE TABLE `geo` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Unique ID for each entry.',
`uuid` VARCHAR(50) DEFAULT (uuid()),
`created_date_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Field representing the date the entity containing the field was created.',
`last_modified_date_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ,
`lat` varchar(255) DEFAULT NULL,
`lng` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='Geo location.';
CREATE TABLE `address` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Unique ID for each entry.',
`uuid` VARCHAR(50) DEFAULT (uuid()),
`created_date_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Field representing the date the entity containing the field was created.',
`last_modified_date_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ,
`city` varchar(255) DEFAULT NULL,
`zipcode` varchar(255) DEFAULT NULL,
`state` varchar(255) DEFAULT NULL,
`geo_id` bigint(20) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `FK_geo_id` (`geo_id`),
CONSTRAINT `FKC_geo_id` FOREIGN KEY (`geo_id`) REFERENCES `geo` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `person` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Unique ID for each entry.',
`uuid` VARCHAR(50) DEFAULT (uuid()),
`created_date_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Field representing the date the entity containing the field was created.',
`last_modified_date_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ,
`first_name` varchar(255) NOT NULL,
`last_name` varchar(255) DEFAULT NULL,
`email` varchar(255) DEFAULT NULL,
`gender` varchar(255) DEFAULT NULL,
`registration` datetime DEFAULT NULL,
`age` int DEFAULT NULL,
`address_id` bigint(20) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `FK_address_id` (`address_id`),
CONSTRAINT `FKC_address_id` FOREIGN KEY (`address_id`) REFERENCES `address` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
- Sample Data
INSERT INTO `streaming_etl_db`.`geo` (`lat`,`lng`) VALUES ('la14','lo14');
INSERT INTO `streaming_etl_db`.`address` (`city`,`zipcode`,`state`,`geo_id`) VALUES ('c14','z14','s14',1);
INSERT INTO `streaming_etl_db`.`person` (`first_name`,`last_name`,`email`,`gender`,`registration`,`age`,`address_id`) VALUES ('fn14','ln14','example@domain.com','M',now(),34,1);
- Select Statement
SELECT *
FROM streaming_etl_db.person p
LEFT JOIN streaming_etl_db.address a on a.id = p.address_id
LEFT JOIN streaming_etl_db.geo g on g.id = a.geo_id;
SELECT * FROM streaming_etl_db.person;
SELECT * FROM streaming_etl_db.address;
Debezium Registration
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 127.0.0.1:8083/connectors/ -d '{
"name": "streaming_ETL_pipeline_MySQL-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "172.17.0.1",
"database.port": "3306",
"database.user": "debezium",
"database.password": "Debezium@123#",
"database.server.name": "mysql",
"database.server.id": "223344",
"database.include.list": "streaming_etl_db",
"database.allowPublicKeyRetrieval": true,
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "mysql-streaming_etl_db-person",
"time.precision.mode": "connect",
"include.schema.changes": false,
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}'
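The ExtractNewRecordState ("unwrap") transform configured above flattens Debezium's change-event envelope so that downstream consumers receive only the new row state rather than the full before/after structure. A minimal sketch of the idea in Python (illustrative only; this is not Debezium's implementation):

```python
# Sketch of what Debezium's ExtractNewRecordState transform does:
# keep only the "after" row image from the change-event envelope.
def unwrap(event):
    """Return the new row state from a Debezium change envelope.

    For deletes, "after" is null, so this yields None
    (Debezium emits a tombstone record in that case).
    """
    return event.get("after")

insert_event = {
    "before": None,
    "after": {"id": 1, "first_name": "fn14", "last_name": "ln14"},
    "source": {"db": "streaming_etl_db", "table": "person"},
    "op": "c",  # c = create (insert)
}

print(unwrap(insert_event))  # {'id': 1, 'first_name': 'fn14', 'last_name': 'ln14'}
```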
After Debezium registration, check the connector status at the following URLs:
http://localhost:8083/connectors?expand=info&expand=status
http://localhost:8083/connectors/streaming_ETL_pipeline_MySQL-connector/status
Kafka UI
Person Topic
Accessing ksqlDB via ksqldb-cli
ksqlDB
- Check topics, streams and tables
show topics;
show streams;
show tables;
Streams
- Declare Streams
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM PERSON_STREAM (id bigint,uuid VARCHAR,created_date_time TIMESTAMP,last_modified_date_time TIMESTAMP,name VARCHAR,username VARCHAR,address_id bigint) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.person',VALUE_FORMAT='JSON');
CREATE STREAM ADDRESS_STREAM (id bigint,uuid VARCHAR,created_date_time TIMESTAMP,last_modified_date_time TIMESTAMP,city VARCHAR,street VARCHAR,suite VARCHAR,zipcode VARCHAR,geo_id bigint) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.address',VALUE_FORMAT='JSON');
SELECT * FROM PERSON_STREAM EMIT CHANGES LIMIT 1;
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|ID |UUID |CREATED_DATE_TIME |LAST_MODIFIED_DATE_TIME |NAME |USERNAME |ADDRESS_ID |
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|1 |ce8d2120-1f93-11ed-8647-0|2022-08-19T13:22:00.000 |2022-08-19T13:22:00.000 |d14 |dbz14 |1 |
| |c9a3cfadc50 | | | | | |
Limit Reached
Query terminated
DESCRIBE PERSON_STREAM;
SELECT * FROM PERSON_STREAM;
- stream-stream join
CREATE STREAM PERSON_ADDRESS_ENRICHED_STREAM WITH (FORMAT='JSON', KAFKA_TOPIC='person_address_enriched', PARTITIONS=1, REPLICAS=1) AS
SELECT
P.ID P_ID,
A.ID A_ID,
P.NAME NAME,
A.CITY CITY
FROM PERSON_STREAM P
LEFT OUTER JOIN ADDRESS_STREAM A WITHIN 1 HOURS GRACE PERIOD 30 MINUTES ON ((A.ID = P.ADDRESS_ID))
EMIT CHANGES;
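Stream-stream joins in ksqlDB are windowed (hence the WITHIN clause), so only person and address events arriving within an hour of each other are matched. The enriched stream can then be sampled just like the source streams:

```sql
SELECT * FROM PERSON_ADDRESS_ENRICHED_STREAM EMIT CHANGES LIMIT 1;
```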
Kafka Sink MySQL DB
CREATE SINK CONNECTOR SINK_PERSON_ADDRESS_ENRICHED_STREAM WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:mysql://172.17.0.1:3306/streaming_etl_db',
'connection.user' = 'debezium',
'connection.password' = 'Debezium@123#',
'topics' = 'PERSON_ADDRESS_ENRICHED_STREAM',
'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'key.converter.schemas.enable' = 'false',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false'
);
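Once the sink connector is running, the enriched records should land back in MySQL. By default the JDBC sink names the target table after the topic (and can create it automatically when its 'auto.create' option is enabled), so the results can be inspected from a MySQL client, assuming the table ends up in the streaming_etl_db schema:

```sql
-- Inspect the sink output; the table name follows the topic by default
SELECT * FROM streaming_etl_db.PERSON_ADDRESS_ENRICHED_STREAM;
```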
Accessing ksqlDB via ksqldb-cli
ksqlDB
- Check topics, streams and tables
show topics;
show streams;
show tables;
Tables
- Declare Tables
CREATE TABLE PERSON (id bigint PRIMARY KEY,uuid VARCHAR,created_date_time TIMESTAMP,last_modified_date_time TIMESTAMP,name VARCHAR,username VARCHAR,address_id bigint) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.person',VALUE_FORMAT='JSON');
CREATE TABLE ADDRESS (id bigint PRIMARY KEY,uuid VARCHAR,created_date_time TIMESTAMP,last_modified_date_time TIMESTAMP,city VARCHAR,street VARCHAR,suite VARCHAR,zipcode VARCHAR,geo_id bigint) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.address',VALUE_FORMAT='JSON');
SELECT * FROM PERSON EMIT CHANGES LIMIT 1;
SELECT * FROM ADDRESS EMIT CHANGES LIMIT 1;
Joins
SELECT
P.NAME,
A.CITY
FROM PERSON P
LEFT JOIN ADDRESS A on A.id = P.address_id
EMIT CHANGES
LIMIT 1;
SELECT
P.NAME,
A.CITY
FROM PERSON P
INNER JOIN ADDRESS A
ON A.id = P.address_id
EMIT CHANGES
LIMIT 1;
CREATE TABLE PERSON_ADDRESS_ENRICHED (P_ID bigint,A_ID bigint,NAME VARCHAR,CITY VARCHAR) WITH (KAFKA_TOPIC='person_address_enriched',VALUE_FORMAT='JSON');
- Others
DROP TABLE IF EXISTS PERSON;