Debezium MySQL CDC with Kafka (KRaft)
1. Download Correct Debezium SQL Server Plugin (v3.3)
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/3.3.0.Final/debezium-connector-sqlserver-3.3.0.Final-plugin.tar.gz
Extract plugin And Move to /opt/kafka/connectors
tar -xzf debezium-connector-sqlserver-3.3.0.Final-plugin.tar.gz -C /opt/kafka/connectors
Restart your Kafka Connect service:
systemctl restart kafka-connect
Create SQL Server User With Required Permissions
CREATE DATABASE DebeziumTest;
USE DebeziumTest;
EXEC sys.sp_cdc_enable_db;
CREATE TABLE dbo.Customers (
CustomerID INT IDENTITY(1,1) PRIMARY KEY,
FirstName NVARCHAR(100),
LastName NVARCHAR(100),
Email NVARCHAR(255),
CreatedAt DATETIME DEFAULT GETDATE()
);
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'Customers',
@role_name = NULL,
@supports_net_changes = 0;
INSERT INTO dbo.Customers (FirstName, LastName, Email)
VALUES
('Golam', 'Kibria', 'golam@example.com'),
('John', 'Doe', 'john.doe@example.com'),
('Jane', 'Smith', 'jane.smith@example.com');
UPDATE dbo.Customers
SET Email = 'golam.kibria@updated.com'
WHERE CustomerID = 1;
SELECT name, is_cdc_enabled FROM sys.databases WHERE name='DebeziumTest';
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name='Customers';
USE master;
CREATE LOGIN debezium WITH PASSWORD = 'StrongPassword!123';
USE DebeziumTest;
CREATE USER debezium FOR LOGIN debezium;
ALTER ROLE db_owner ADD MEMBER debezium;
Register debezium connector
curl -X POST http://192.168.1.226:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "mssql-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "192.168.1.225",
"database.port": "1433",
"database.user": "debezium",
"database.password": "StrongPassword!123",
"database.names": "DebeziumTest",
"database.server.name": "sqlserver",
"database.encrypt": "false",
"topic.prefix": "mssql",
"snapshot.mode": "initial",
"table.include.list": "DebeziumTest.dbo.Customers",
"schema.history.internal.kafka.bootstrap.servers": "192.168.1.226:9092",
"schema.history.internal.kafka.topic": "schema-changes.mssql",
"include.schema.changes": "true",
"decimal.handling.mode": "string",
"time.precision.mode": "adaptive_time_microseconds"
}
}'
. Verify Connector
curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/mssql-connector/status
. Kafka Topics Created
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list