🌊Change Data Capture (CDC) with Docker, Apache Kafka, Debezium, and Apache Spark Streaming
Introduction
Change Data Capture (CDC) is a pivotal technique in data engineering, enabling real-time data integration and analysis. In this article, we'll set up a CDC pipeline using Docker Compose, Apache Kafka, Debezium, and Apache Spark Streaming. The pipeline captures changes from a MySQL database with Debezium, publishes them to Kafka, and finally reads them from Kafka with Spark Streaming for further analysis.
Components Overview
- MySQL: The source database where changes are made and tracked.
- Debezium: Responsible for capturing changes from MySQL and publishing them to Kafka topics.
- Apache Kafka: Acts as a distributed messaging system for handling the CDC events.
- Kafka UI: A web UI for Apache Kafka that makes your data flows observable and helps you find and troubleshoot issues faster.
- Zookeeper: Used by Kafka for coordination and synchronization.
- Apache Spark: Utilized for real-time processing of data streams from Kafka.
Docker Compose Setup
The Docker Compose file provided orchestrates the deployment of all necessary services:
- MySQL: Configured with sample credentials for demonstration purposes.
- Debezium Connect: Set up to capture changes from MySQL and publish them to Kafka.
- Kafka: Serves as the messaging backbone for CDC events.
- Zookeeper: Ensures coordination and synchronization for Kafka.
- Kafka UI: Added to visually inspect the messages in the Kafka broker.
- Apache Spark: Added to the architecture for real-time processing of data streams from Kafka.
In this example, the Compose file is named docker-compose-spark-kafka-mysql.yaml:
```yaml
version: '3'
services:
  # ----------------- #
  # Apache Spark      #
  # ----------------- #
  spark:
    container_name: spark-master
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=master
    ports:
      - '8080:8080'
      - '4040:4040'
      - '7077:7077'
    volumes:
      - ./data:/data
      - ./src:/src
  spark-worker:
    container_name: spark-worker
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_EXECUTOR_MEMORY=4G
      - SPARK_WORKER_CORES=4
    volumes:
      - ./data:/data
      - ./src:/src
  # ----------------- #
  # Apache Zookeeper  #
  # ----------------- #
  zookeeper:
    container_name: zookeeper
    image: quay.io/debezium/zookeeper:2.0
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  # ----------------- #
  # Apache Kafka      #
  # ----------------- #
  kafka:
    container_name: kafka
    image: quay.io/debezium/kafka:2.0
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  # ----------------- #
  # Kafka UI          #
  # ----------------- #
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 10000:10000
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
    volumes:
      - ./kui/config.yml:/etc/kafkaui/dynamic_config.yaml
  # ----------------- #
  # MySQL             #
  # ----------------- #
  mysql:
    container_name: mysql
    image: quay.io/debezium/example-mysql:2.0
    ports:
      - 3307:3306   # host port 3307 -> MySQL's default port 3306 inside the container
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  # ----------------- #
  # Debezium          #
  # ----------------- #
  connect:
    container_name: debezium
    image: quay.io/debezium/connect:2.0
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
```
Architecture
- MySQL → CDC → Debezium → Kafka (Zookeeper) → Apache Spark Streaming
- MySQL: The source database where changes occur.
- Change Data Capture (CDC): Utilizes Debezium to capture and monitor database changes.
- Debezium: Captures the changes in MySQL and publishes them to Kafka topics.
- Kafka: Acts as the distributed messaging system where the CDC events are stored and distributed to consumers.
- Zookeeper: Coordinates and synchronizes Kafka brokers.
- Kafka UI: Provides a web view of the Kafka brokers, topics, messages, and consumers.
- Apache Spark Streaming: Consumes data streams from Kafka for real-time processing and analysis.
Implementation and Testing
Run Docker Compose with the following command:
```bash
docker-compose -f docker-compose-spark-kafka-mysql.yaml up -d
```
Once the command finishes, each container should be reported as started. You can double-check this in Docker Desktop.
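Before moving on, it can be useful to confirm that the published ports accept connections. The following is a minimal, hypothetical helper (not part of the original project) that relies only on Python's standard socket module; the port list is an assumption copied from the host-side mappings in the Compose file above.

```python
import socket

# Host-side ports published by docker-compose-spark-kafka-mysql.yaml (assumed)
SERVICES = {
    "spark-master UI": 8080,
    "zookeeper": 2181,
    "kafka": 9092,
    "kafka-ui": 10000,
    "mysql": 3307,
    "debezium connect": 8083,
}


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unreachable, ...
        return False


def check_services(host: str = "localhost") -> dict:
    """Map each service name to whether its published port is reachable."""
    return {name: is_port_open(host, port) for name, port in SERVICES.items()}


if __name__ == "__main__":
    for name, ok in check_services().items():
        print(f"{name:18} {'up' if ok else 'DOWN'}")
```

Run it from the host after `docker-compose up -d`; a service reported as DOWN often just needs a few more seconds to start. An open port only shows that something is listening, not that the service has finished initializing.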
Next, let's set up the database as the MySQL root user:
```bash
docker-compose -f docker-compose-spark-kafka-mysql.yaml exec mysql bash -c 'mysql -u root -pdebezium'
```
Then check the existing users:
```sql
SELECT user, host FROM mysql.user;
```
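Next, create a database named cdc (`CREATE DATABASE cdc;`). The debezium user in this image may only hold replication-related privileges, so you may also need to grant it access to the new database from this root session, for example with `GRANT ALL PRIVILEGES ON cdc.* TO 'debezium'@'%';`.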
In another terminal (or in the same one after exiting with CTRL+D → Bye 😁), connect to MySQL as the debezium user:
```bash
docker-compose -f docker-compose-spark-kafka-mysql.yaml exec mysql bash -c 'mysql -u debezium -pdbz'
```
Then create the table and check that it exists:
```sql
CREATE TABLE cdc.example(
    customerId int,
    customerFName varchar(255),
    customerLName varchar(255),
    customerCity varchar(255)
);
USE cdc;
SHOW TABLES;
```
Create the Debezium connector:
Add the connector configuration below to the conf folder as register-mysql.json. Set database.include.list to the database created above (cdc). Debezium names each table's topic <topic.prefix>.<database>.<table>, so changes to cdc.example are published to dbserver1.cdc.example.
```json
{
  "name": "data-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "topic.prefix": "dbserver1",
    "database.server.id": "184054",
    "database.include.list": "cdc",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.cdc"
  }
}
```
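Because the console consumer and the Spark job have to subscribe to exactly the topic Debezium writes to, it can help to derive that name from the connector configuration. This hypothetical helper assumes Debezium's default naming scheme <topic.prefix>.<database>.<table>, and it treats database.include.list as a plain comma-separated list of names, although Debezium actually interprets the entries as regular expressions.

```python
def expected_topics(connector: dict, tables: list) -> list:
    """Derive Debezium MySQL data topic names for 'database.table' entries.

    Assumes the default naming scheme <topic.prefix>.<database>.<table>.
    Include-list entries are compared as exact names (a simplification:
    Debezium treats them as regular expressions).
    """
    config = connector["config"]
    prefix = config["topic.prefix"]
    included = {
        name.strip()
        for name in config.get("database.include.list", "").split(",")
        if name.strip()
    }
    topics = []
    for qualified in tables:
        database, table = qualified.split(".", 1)
        # A database outside the include list is never captured, so its topic stays empty
        if included and database not in included:
            raise ValueError(f"database {database!r} is not in database.include.list")
        topics.append(f"{prefix}.{database}.{table}")
    return topics
```

For example, with topic.prefix set to dbserver1 and database.include.list set to cdc, `expected_topics(connector, ["cdc.example"])` returns `["dbserver1.cdc.example"]`; a database missing from the include list raises ValueError instead of silently producing a topic that will never receive events.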
Then register it with the Kafka Connect REST API:
```bash
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @./conf/register-mysql.json
```
After running this command, Kafka Connect should reply with `HTTP/1.1 201 Created` and echo the connector configuration.
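If you prefer to script this step, the same request can be sent from Python with the standard library. The sketch below is an assumed alternative to curl, not part of the original project: it posts conf/register-mysql.json to the Kafka Connect REST API on localhost:8083 and reads back `GET /connectors/<name>/status`, whose response reports a connector state and one state per task (for example RUNNING or FAILED).

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST port published by the Compose file


def register_connector(path: str, base_url: str = CONNECT_URL) -> dict:
    """POST a connector definition (name + config) to Kafka Connect."""
    with open(path, "rb") as f:
        body = f.read()
    request = urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def connector_status(name: str, base_url: str = CONNECT_URL) -> dict:
    """GET /connectors/<name>/status from Kafka Connect."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def is_running(status: dict) -> bool:
    """True when the connector and all of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and bool(tasks)
        and all(task.get("state") == "RUNNING" for task in tasks)
    )
```

For example, call `register_connector("conf/register-mysql.json")` once, then poll `is_running(connector_status("data-mysql-connector"))` until it returns True. HTTP errors returned by Kafka Connect surface as `urllib.error.HTTPError`.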
With the connector running, Debezium monitors MySQL and publishes every row change to Kafka. To watch the events, start a console consumer on the table's topic:
```bash
docker-compose -f docker-compose-spark-kafka-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.cdc.example
```
Then update a record from the MySQL session (the table starts empty, so insert a few rows first to make sure a row with customerId = 2 exists):
```sql
UPDATE example SET customerCity = 'Casablanca' WHERE customerId = 2;
```
You can paste the event printed by the consumer into https://jsoncrack.com/editor to visualize it and confirm the change to the customerCity field.
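As an alternative to a visual editor, a few lines of Python can pull out what changed. This sketch assumes the Kafka message value is a Debezium JSON envelope, either with an embedded schema (`{"schema": ..., "payload": ...}`) or as a bare payload, where the payload holds `before`, `after`, and `op` ('c' create, 'u' update, 'd' delete, 'r' snapshot read). The function name is hypothetical, and the sample values in the usage note are hand-written for illustration, not captured output.

```python
import json


def parse_change_event(value):
    """Summarize one Debezium change event given the message value as a JSON string.

    Returns None for tombstones (null values); otherwise a dict with the
    operation, the before/after row images, and the fields whose values differ.
    """
    if value is None:
        return None
    event = json.loads(value)
    if event is None:
        return None
    # Envelope with an embedded schema, or a bare payload
    payload = event.get("payload", event)
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    changed = {
        field: (before.get(field), after.get(field))
        for field in sorted(set(before) | set(after))
        if before.get(field) != after.get(field)
    }
    return {"op": payload.get("op"), "before": before, "after": after, "changed": changed}
```

For an update event whose before image has customerCity "Rabat" and whose after image has "Casablanca", the `changed` entry is `{"customerCity": ("Rabat", "Casablanca")}`.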
Now let's consume this data with Spark (PySpark) 😉
In your src folder, add consumer_pyspark_code_.py as shown below:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create (or reuse) the Spark session
spark = SparkSession.builder.appName("cdc-consumer").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Define the Kafka source. Debezium writes changes to cdc.example to the
# topic <topic.prefix>.<database>.<table> = dbserver1.cdc.example
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "dbserver1.cdc.example") \
    .option("startingOffsets", "earliest") \
    .load()

# Kafka delivers key and value as bytes; decode them to strings
df = df.withColumn("key", col("key").cast("string"))
df = df.withColumn("value", col("value").cast("string"))

# Print each micro-batch to the console without truncating the JSON
query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .start()

# Block until the streaming query is stopped
query.awaitTermination()
```
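You can then submit the job with spark-submit, pulling in the Kafka connector package built for Spark 3.3 and Scala 2.12 (spark-master is the container name set in the Compose file):
```bash
docker exec -it spark-master bash -c 'spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/consumer_pyspark_code_.py'
```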
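The console sink only prints raw events. A common next step is to apply them to a current-state view of the table: creates, snapshot reads, and updates upsert the after image, while deletes remove the row identified by the before image. The pure-Python sketch below illustrates those semantics on already-parsed Debezium payloads. Keying rows by customerId is an assumption, since the example table declares no primary key, and `apply_changes` is a hypothetical name. In Spark, the same logic would typically live in a function passed to `writeStream.foreachBatch`, which receives each micro-batch as a DataFrame.

```python
from typing import Dict, Iterable


def apply_changes(state: Dict[int, dict], payloads: Iterable[dict], key: str = "customerId") -> Dict[int, dict]:
    """Apply Debezium change payloads, in order, to an in-memory table.

    state maps key -> current row; payloads are Debezium 'payload' dicts.
    """
    for payload in payloads:
        op = payload.get("op")
        if op in ("c", "r", "u"):
            before = payload.get("before")
            row = payload["after"]
            # If an update changed the key itself, drop the row stored under the old key
            if before and before.get(key) != row[key]:
                state.pop(before.get(key), None)
            state[row[key]] = row
        elif op == "d":
            state.pop(payload["before"][key], None)
        # Other operations (e.g. 't' for truncate) are ignored in this sketch
    return state
```

Inside a foreachBatch function you would first parse the value column (for example with `from_json`) and then write the resulting state to a sink of your choice; that integration is left out here.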
Kafka UI:
Add the following config.yml to the kui subfolder (the Compose file mounts ./kui/config.yml into the kafka-ui container):
```yaml
server:
  port: 10000
kafka:
  clusters:
    - name: kafka
      bootstrapServers: kafka:9092
```
The Kafka UI is then available at http://localhost:10000/.
From there you can also inspect the topics, their messages, and the consumers.
GitHub repo
For the full project code, visit:
https://github.com/abdelbarre/cdc_mysql_debezium_spark_delta
Conclusion
By integrating Apache Spark Streaming into the CDC pipeline, organizations can leverage real-time data processing capabilities to extract valuable insights from streaming data. This architecture, coupled with Docker Compose for easy deployment and management, provides a scalable and efficient solution for real-time data integration and analysis.
!! Happy Learning !! ✌️