🌊Change Data Capture (CDC) with Docker, Apache Kafka, Debezium, and Apache Spark Streaming

Abdelbarre Chafik · Published in Towards Dev · 6 min read · Apr 17, 2024

Introduction

Change Data Capture (CDC) is a pivotal technique in data engineering, facilitating real-time data integration and analysis. In this article, we’ll delve into setting up a CDC pipeline using Docker Compose, Apache Kafka, Debezium, and Apache Spark Streaming. The architecture involves capturing changes from a MySQL database, processing them with Debezium, publishing to Kafka, and finally reading the data from Kafka using Spark Streaming for further analysis.

Components Overview

  • MySQL: The source database where changes are made and tracked.
  • Debezium: Responsible for capturing changes from MySQL and publishing them to Kafka topics.
  • Apache Kafka: Acts as a distributed messaging system for handling the CDC events.
  • Kafka UI: a simple tool that makes your data flows observable, helping you find and troubleshoot issues faster and deliver optimal performance.
  • Zookeeper: Used by Kafka for coordination and synchronization.
  • Apache Spark: Utilized for real-time processing of data streams from Kafka.

Docker Compose Setup

The Docker Compose file provided orchestrates the deployment of all necessary services:

  • MySQL: Configured with sample credentials for demonstration purposes.
  • Debezium Connect: Set up to capture changes from MySQL and publish them to Kafka.
  • Kafka: Serves as the messaging backbone for CDC events.
  • Zookeeper: Ensures coordination and synchronization for Kafka.
  • Kafka UI: added to visually inspect the messages in the Kafka broker.
  • Apache Spark: Added to the architecture for real-time processing of data streams from Kafka.

In this example, the file is named docker-compose-spark-kafka-mysql.yaml:

version: '3'
services:

  # ----------------- #
  #   Apache Spark    #
  # ----------------- #
  spark:
    container_name: spark-master
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=master
    ports:
      - '8080:8080'
      - '4040:4040'
      - '7077:7077'
    volumes:
      - ./data:/data
      - ./src:/src

  spark-worker:
    container_name: spark-worker
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_EXECUTOR_MEMORY=4G
      - SPARK_WORKER_CORES=4
    volumes:
      - ./data:/data
      - ./src:/src

  # ----------------- #
  # Apache Zookeeper  #
  # ----------------- #
  zookeeper:
    container_name: zookeeper
    image: quay.io/debezium/zookeeper:2.0
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888

  # ----------------- #
  #   Apache Kafka    #
  # ----------------- #
  kafka:
    container_name: kafka
    image: quay.io/debezium/kafka:2.0
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181

  # ----------------- #
  #     Kafka UI      #
  # ----------------- #
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 10000:10000
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
    volumes:
      - ./kui/config.yml:/etc/kafkaui/dynamic_config.yaml

  # ----------------- #
  #       MySQL       #
  # ----------------- #
  mysql:
    container_name: mysql
    image: quay.io/debezium/example-mysql:2.0
    ports:
      - 3307:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw

  # ----------------- #
  #     Debezium      #
  # ----------------- #
  connect:
    container_name: debezium
    image: quay.io/debezium/connect:2.0
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses

Architecture

  • MySQL → CDC → Debezium → Kafka (Zookeeper) → Apache Spark Streaming
  • MySQL: The source database where changes occur.
  • Change Data Capture (CDC): Utilizes Debezium to capture and monitor database changes.
  • Debezium: Captures the changes in MySQL and publishes them to Kafka topics.
  • Kafka: Acts as the distributed messaging system where the CDC events are stored and distributed to consumers.
  • Zookeeper: Coordinates and synchronizes Kafka brokers.
  • Kafka UI: gives a visual view of the Kafka topics, messages, and consumers used in this pipeline.
  • Apache Spark Streaming: Consumes data streams from Kafka for real-time processing and analysis.

Implementation and Testing

Start the stack with Docker Compose using the following command:

docker-compose -f docker-compose-spark-kafka-mysql.yaml up -d

All services should come up and show as running; you can double-check their status in Docker Desktop.
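If you prefer the terminal, the standard Compose status and log commands give the same view (connect is the Debezium/Kafka Connect service name defined in the Compose file above):

docker-compose -f docker-compose-spark-kafka-mysql.yaml ps
docker-compose -f docker-compose-spark-kafka-mysql.yaml logs -f connect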

Let's move on to setting up the database using the MySQL root user:

docker-compose -f docker-compose-spark-kafka-mysql.yaml exec mysql bash -c 'mysql -u root -pdebezium'

After that, check the existing users by running this command:

SELECT user,host FROM mysql.user;

Create a database named cdc:
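A single statement is enough here, run in the root session (cdc is the database name used for the table created below):

CREATE DATABASE cdc;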

In another terminal (or in the same one, after exiting with CTRL+D → Bye 😁), access MySQL as another user:

docker-compose -f docker-compose-spark-kafka-mysql.yaml exec mysql bash -c 'mysql -u debezium -pdbz'

After that, create the table:

CREATE TABLE cdc.example (
  customerId int,
  customerFName varchar(255),
  customerLName varchar(255),
  customerCity varchar(255)
);

USE cdc;

SHOW tables;
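The update step later in this article targets customerId = 2, so insert a couple of rows first. The values below are purely illustrative sample data:

INSERT INTO example (customerId, customerFName, customerLName, customerCity) VALUES
  (1, 'Amine', 'Alami', 'Rabat'),
  (2, 'Sara', 'Idrissi', 'Rabat');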

Create the Debezium connector:
Add the connector configuration JSON to the conf folder as register-mysql.json (the file referenced by the curl command below), like shown below:

{"name": "data-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"topic.prefix": "dbserver1",
"database.server.id": "184054",
"database.include.list": "data",
"debezium.sink.kafka.topic": "debezium-mysql-events",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.data"
}
}
Then register the connector with the Kafka Connect REST API:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @./conf/register-mysql.json
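A successful registration returns HTTP 201 Created and echoes back the connector configuration. You can also double-check at any time by listing the registered connectors (a standard Kafka Connect REST endpoint):

curl -H "Accept:application/json" http://localhost:8083/connectors/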

Debezium is now configured to monitor MySQL and publish change events to Kafka topics. To see those events, consume the table's topic with the Kafka console consumer:

docker-compose -f docker-compose-spark-kafka-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --topic dbserver1.cdc.example
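You can also list the topics Debezium has created so far (the Kafka CLI scripts live under /kafka/bin in this image, next to the console consumer used above):

docker-compose -f docker-compose-spark-kafka-mysql.yaml exec kafka /kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka:9092 \
  --list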

Then update one record in the database by running this command:

UPDATE example SET customerCity = 'Casablanca' WHERE customerId = 2;

You can use https://jsoncrack.com/editor to explore the change event JSON and confirm the update to this field.
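For orientation, the update arrives as a Debezium change event. Trimmed down to its payload (the full message also carries a schema section, and the source block holds more fields than shown), it looks roughly like this, with row values matching the illustrative sample data inserted earlier:

{
  "payload": {
    "before": { "customerId": 2, "customerFName": "Sara", "customerLName": "Idrissi", "customerCity": "Rabat" },
    "after": { "customerId": 2, "customerFName": "Sara", "customerLName": "Idrissi", "customerCity": "Casablanca" },
    "source": { "connector": "mysql", "db": "cdc", "table": "example" },
    "op": "u",
    "ts_ms": 1713340800000
  }
}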

Let's move on to consuming this data using Spark (PySpark) 😉

In your src folder, add real_time_pipeline.py (the file referenced by the spark-submit command further down), as shown below:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create Spark Session
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Define Kafka source: subscribe to the topic Debezium writes for the cdc.example table
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "dbserver1.cdc.example") \
    .option("startingOffsets", "earliest") \
    .load()

# Decode the key and value columns from bytes to strings
df = df.withColumn("key", col("key").cast("string"))
df = df.withColumn("value", col("value").cast("string"))

# Start processing the stream and print each micro-batch to the console
query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .start()

query.awaitTermination()
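The value column above is still the raw Debezium JSON string. If you want actual columns, a minimal sketch like the one below extends the df defined above, assuming the Connect worker's default JSON converter with schemas enabled (hence the payload envelope); it is an optional addition, not part of the original job. You can point the same console sink at parsed instead of df.

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

# Schema of one row of cdc.example, as it appears inside the Debezium envelope
row_schema = StructType([
    StructField("customerId", IntegerType()),
    StructField("customerFName", StringType()),
    StructField("customerLName", StringType()),
    StructField("customerCity", StringType()),
])

# Only the parts of the envelope we care about; other fields are ignored by from_json
envelope_schema = StructType([
    StructField("payload", StructType([
        StructField("before", row_schema),
        StructField("after", row_schema),
        StructField("op", StringType()),
        StructField("ts_ms", LongType()),
    ])),
])

# Parse the JSON value, then flatten the "after" image into plain columns
parsed = df.withColumn("event", from_json(col("value"), envelope_schema)) \
    .select(col("event.payload.op").alias("op"),
            col("event.payload.after").alias("after")) \
    .select("op", "after.*")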

Then run spark-submit to launch the job (replace the placeholder with your Spark master container ID or name; the Compose file above names it spark-master):

docker exec -it {use_your_spark_docker_id_container} bash -c 'spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/real_time_pipeline.py'

Kafka UI

Add the config file as kui/config.yml (the path mounted in the Compose file above), as shown below:

server:
  port: 10000
kafka:
  clusters:
    - name: kafka
      bootstrapServers: kafka:9092

To access the Kafka UI, open http://localhost:10000/.

We can also see the consumers there.

Github repo

For the full project code, please visit:
https://github.com/abdelbarre/cdc_mysql_debezium_spark_delta

Conclusion

By integrating Apache Spark Streaming into the CDC pipeline, organizations can leverage real-time data processing capabilities to extract valuable insights from streaming data. This architecture, coupled with Docker Compose for easy deployment and management, provides a scalable and efficient solution for real-time data integration and analysis.
!! Happy Learning !! ✌️
