Skip to main content

Debezium MySQL CDC with Kafka (KRaft)

1. Download Correct Debezium SQL Server Plugin (v3.3)

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/3.3.0.Final/debezium-connector-sqlserver-3.3.0.Final-plugin.tar.gz

Extract plugin And Move to /opt/kafka/connectors

tar -xzf debezium-connector-sqlserver-3.3.0.Final-plugin.tar.gz -C /opt/kafka/connectors

Restart your Kafka Connect service:

systemctl restart kafka-connect

Create SQL Server User With Required Permissions

CREATE DATABASE DebeziumTest;
USE DebeziumTest;
EXEC sys.sp_cdc_enable_db;

CREATE TABLE dbo.Customers (
    CustomerID INT IDENTITY(1,1) PRIMARY KEY,
    FirstName NVARCHAR(100),
    LastName NVARCHAR(100),
    Email NVARCHAR(255),
    CreatedAt DATETIME DEFAULT GETDATE()
);

EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'Customers',
    @role_name = NULL,
    @supports_net_changes = 0;

INSERT INTO dbo.Customers (FirstName, LastName, Email)
VALUES
 ('Golam', 'Kibria', 'golam@example.com'),
 ('John', 'Doe', 'john.doe@example.com'),
 ('Jane', 'Smith', 'jane.smith@example.com');

UPDATE dbo.Customers
SET Email = 'golam.kibria@updated.com'
WHERE CustomerID = 1;

SELECT name, is_cdc_enabled FROM sys.databases WHERE name='DebeziumTest';

SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name='Customers';

USE master;
CREATE LOGIN debezium WITH PASSWORD = 'StrongPassword!123';

USE DebeziumTest;
CREATE USER debezium FOR LOGIN debezium;

ALTER ROLE db_owner ADD MEMBER debezium;



Register debezium connector

curl -X POST http://192.168.1.226:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mssql-connector",
    "config": {
      "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",

      "database.hostname": "192.168.1.225",
      "database.port": "1433",
      "database.user": "debezium",
      "database.password": "StrongPassword!123",

      "database.names": "DebeziumTest",
      "database.server.name": "sqlserver",
      "database.encrypt": "false",

      "topic.prefix": "mssql",

      "snapshot.mode": "initial",

      "table.include.list": "DebeziumTest.dbo.Customers",

     "schema.history.internal.kafka.bootstrap.servers": "192.168.1.226:9092",
     "schema.history.internal.kafka.topic": "schema-changes.mssql",

      "include.schema.changes": "true",

      "decimal.handling.mode": "string",
      "time.precision.mode": "adaptive_time_microseconds"
    }
  }'

. Verify Connector

curl http://localhost:8083/connectors

Check status

curl http://localhost:8083/connectors/mssql-connector/status

. Kafka Topics Created

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

Consume CDC Messages

kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.226:9092 \
  --topic mssql.sqlserver.DebeziumTest.dbo.Customers \
  --from-beginning

Delete Connector

curl -X DELETE http://localhost:8083/connectors/mssql-connector