Skip to main content

Debezium MySQL CDC with Kafka (KRaft) – Full Setup Guide

Debezium MySQL CDC with Kafka (KRaft) – Full Setup Guide

Overview

This guide explains how to configure Debezium MySQL Change Data Capture (CDC) using:

  • Apache Kafka 4.x (KRaft mode)
  • Kafka Connect
  • Debezium MySQL connector
  • MySQL database at 192.168.1.226
  • Database name: mydb

1. Install Debezium Connector Plugin

Create plugin directory:

sudo mkdir -p /opt/kafka/connectors
sudo chown -R kafka:kafka /opt/kafka/connectors
cd /opt/kafka/connectors

Download Debezium MySQL plugin:

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.2.Final/debezium-connector-mysql-2.7.2.Final-plugin.tar.gz

Extract it:

tar -xzf debezium-connector-mysql-2.7.2.Final-plugin.tar.gz

2. Configure Kafka Connect

Create connect config:

sudo nano /opt/kafka/config/connect-distributed.properties

Paste:

bootstrap.servers=localhost:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

plugin.path=/opt/kafka/connectors
rest.port=8083

3. Kafka Connect Systemd Service

Create service:

sudo nano /etc/systemd/system/kafka-connect.service

Paste:

[Unit]
Description=Kafka Connect Distributed Service
After=network.target

[Service]
Type=simple
User=kafka
Group=kafka
Environment="KAFKA_HEAP_OPTS=-Xms512M -Xmx2G"
ExecStart=/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Restart=always
RestartSec=5
Environment="PATH=/usr/bin:/usr/local/bin:/opt/kafka/bin"

StandardOutput=journal
StandardError=journal
LimitNOFILE=100000

[Install]
WantedBy=multi-user.target

Reload:

sudo systemctl daemon-reload
sudo systemctl start kafka-connect
sudo systemctl enable kafka-connect

4. Prepare MySQL

Enable binlog and create user:

SET GLOBAL binlog_format = 'ROW';
SET GLOBAL binlog_row_image = 'FULL';

CREATE USER 'debezium'@'%' IDENTIFIED BY 'Debezium123!';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;

Test:

mysql -h 192.168.1.226 -u debezium -p

5. Register Debezium MySQL Connector (curl)

curl -X PUT http://192.168.1.226:8083/connectors/mysql-connector/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",

    "database.hostname": "192.168.1.225",
    "database.port": "3306",
    "database.user": "appuser",
    "database.password": "QAZWSX@786#Dhaka",
    "database.server.id": "184054",

    "topic.prefix": "mysqlserver1",
    "database.include.list": "mydb",

    "snapshot.mode": "initial",
    "include.schema.changes": "true",

    "schema.history.internal.kafka.bootstrap.servers": "192.168.1.226:9092",
    "schema.history.internal.kafka.topic": "schema-changes.mysql"
  }'


6. Verify Connector

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/mysql-connector/status

7. Kafka Topics Created

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

8. Consume CDC Messages

/opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic mysqlserver1.mydb.userscustomers \
  --from-beginning

9. Delete Connector

curl -X DELETE http://localhost:8083/connectors/mysql-connector

10. Restart Connector

curl -X POST http://localhost:8083/connectors/mysql-connector/restart

✔ Completed

You now have working end-to-end Debezium CDC with Kafka + MySQL.