Debezium MySQL Connector Setup for Search Sync
Problem Statement
You need the Debezium MySQL connector to stream row-level changes from a MySQL primary into Kafka topics so a downstream sink keeps your search index current, and it either refuses to start or starts but emits nothing. The failure is almost always upstream of Kafka Connect: the binary log is configured for statement-based replication, the GTID set has gaps, or the capture user lacks REPLICATION SLAVE. This guide covers the exact MySQL-side prerequisites and connector configuration for a search-sync pipeline, one part of the broader change data capture setup within your data ingestion and synchronization pipelines. Kafka Connect mechanics, sink upserts, and DLQ handling are covered in the Debezium pipeline guide; here we stay on MySQL-specific terrain.
Prerequisites
- MySQL 8.0+ (or 5.7) running as a primary, reachable from Kafka Connect on port 3306.
- Kafka Connect in distributed mode with the Debezium MySQL plugin (debezium-connector-mysql) on the plugin path.
- A Kafka broker set and, for the schema history topic, a topic the connector can create or write to.
- Server-side write access to my.cnf and permission to restart the instance once.
- The ability to create a dedicated MySQL user with global replication privileges.
Diagnosis / Context
The Debezium MySQL connector reads the binary log (binlog), MySQL’s ordered record of changes used for replication; it does not poll tables. Binary logging itself must be enabled (log_bin=ON, the default on MySQL 8.0 but not on 5.7), and three further settings gate whether the binlog is usable for CDC. binlog_format must be ROW so each event carries row images rather than the SQL text; STATEMENT or MIXED produce events Debezium cannot reconstruct into per-row change documents. binlog_row_image must be FULL so updates and deletes carry every column; MINIMAL omits unchanged columns and breaks search documents that depend on fields the writer did not touch. Finally, every server in the topology needs a unique numeric server-id, and the connector itself registers as a replica under its own database.server.id.
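To make the MINIMAL failure concrete, here is a simplified model, not Debezium's real event format. It assumes a hypothetical sink that overwrites the whole indexed document with each event's after image, and it models MINIMAL as carrying only the columns an UPDATE changed. The names after_image and apply_upsert are illustrative.

```python
# Simplified model (not Debezium's real event format): a hypothetical sink
# that replaces the whole indexed document with each event's after image.
COLUMNS = ("id", "name", "price", "brand")


def after_image(row: dict, changed: set, row_image: str) -> dict:
    """Columns an UPDATE event carries under the given binlog_row_image."""
    if row_image == "FULL":
        return dict(row)  # every column, changed or not
    if row_image == "MINIMAL":
        # Modelled as: only the columns the statement actually changed.
        return {col: val for col, val in row.items() if col in changed}
    raise ValueError(f"unsupported binlog_row_image: {row_image}")


def apply_upsert(index: dict, doc_id, image: dict) -> None:
    """Overwrite the stored document; columns absent from the image become None."""
    index[doc_id] = {col: image.get(col) for col in COLUMNS}


if __name__ == "__main__":
    row = {"id": 42, "name": "Desk lamp", "price": 31, "brand": "Acme"}
    for mode in ("FULL", "MINIMAL"):
        index = {}
        apply_upsert(index, 42, after_image(row, {"price"}, mode))
        print(mode, index[42])
```

Run as a script, the FULL document keeps every field, while the MINIMAL one keeps only price and nulls out name and brand.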
When a prerequisite is missing, the connector task fails fast. A typical task trace in the Connect log:
io.debezium.DebeziumException: The MySQL server is not configured to use a ROW binlog_format,
which is required for this connector to work properly. Change the MySQL configuration to use a
binlog_format=ROW and restart the connector.
at io.debezium.connector.mysql.MySqlConnectorTask.validateBinlogConfiguration(...)
Verify the live server state before touching the connector:
-- Run as an admin against the source MySQL primary.
SHOW VARIABLES WHERE Variable_name IN (
'log_bin', 'binlog_format', 'binlog_row_image', 'server_id', 'gtid_mode', 'enforce_gtid_consistency'
);
-- Expected:
-- log_bin ON
-- binlog_format ROW
-- binlog_row_image FULL
-- server_id <non-zero, unique>
-- gtid_mode ON
-- enforce_gtid_consistency ON
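The expected values can also be checked programmatically before you register the connector. This is a sketch under stated assumptions: cdc_problems is a hypothetical helper that takes the (Variable_name, Value) pairs the query returns, and it treats gtid_mode as required even though this guide calls GTID optional.

```python
# Hypothetical pre-flight check. Pass it the (Variable_name, Value) rows the
# SHOW VARIABLES query returns, e.g. from a DB-API cursor's fetchall().
EXPECTED = {
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
    "gtid_mode": "ON",  # recommended rather than strictly required
    "enforce_gtid_consistency": "ON",
}


def cdc_problems(rows) -> list:
    """Return a list of problems; an empty list means the binlog looks CDC-ready."""
    # Option files may spell names with '-' where SHOW VARIABLES uses '_'
    # (server-id vs server_id), so normalize before comparing.
    settings = {str(n).replace("-", "_").lower(): str(v).upper() for n, v in rows}
    problems = []
    for name, expected in EXPECTED.items():
        actual = settings.get(name)
        if actual != expected:
            problems.append(f"{name} is {actual!r}, expected {expected!r}")
    server_id = settings.get("server_id", "0")
    if not server_id.isdigit() or int(server_id) == 0:
        problems.append(f"server_id is {server_id!r}, expected a non-zero integer")
    return problems


if __name__ == "__main__":
    live = [("binlog_format", "MIXED"), ("binlog_row_image", "FULL"),
            ("server_id", "184054"), ("gtid_mode", "ON"),
            ("enforce_gtid_consistency", "ON")]
    for problem in cdc_problems(live):
        print(problem)
```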
Enabling GTID (Global Transaction Identifier) mode is optional but strongly recommended. It lets the connector record its offset as a set of globally unique transaction IDs rather than a fragile (binlog file, position) tuple, and that set survives binlog rotation and primary failover far more gracefully. Without GTID, a planned failover that promotes a replica leaves the connector pointing at a binlog coordinate that does not exist on the new primary, and recovery means a manual offset edit or a full re-snapshot. With GTID, the connector resumes from the transaction set, which is valid on any member of the topology.
The data path is short but every hop has a MySQL-specific failure mode. The binlog is the source of truth; the connector reads it as a registered replica; the schema history topic shadows DDL so events stay decodable; and the flattened payload lands on a per-table Kafka topic for the search sink to upsert.
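The per-table topic names follow Debezium's <topic.prefix>.<database>.<table> convention, so you can derive what the search sink must subscribe to from the topic.prefix and table.include.list values in the step 3 config. A minimal sketch: change_topics is a hypothetical helper, and it treats include-list entries as literal names, although the real property accepts regular expressions.

```python
def change_topics(topic_prefix: str, table_include_list: str) -> list:
    """Map each db.table entry to its <topic.prefix>.<database>.<table> topic."""
    topics = []
    for entry in table_include_list.split(","):
        # Assumes literal names; the real property accepts regular expressions.
        database, table = entry.strip().split(".", 1)
        topics.append(f"{topic_prefix}.{database}.{table}")
    return topics


if __name__ == "__main__":
    print(change_topics("search.mysql", "shop.products,shop.variants"))
```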
Solution Steps
1. Configure the binary log in my.cnf
[mysqld]
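# Binary logging must be on (default on MySQL 8.0, off on 5.7); the value is the binlog base name.
log_bin = mysql-bin
# Row-based logging is mandatory; per-row before/after images.
binlog_format = ROW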
# FULL captures every column on every event so deletes/updates are complete.
binlog_row_image = FULL
# Unique across every server in the replication topology.
server-id = 184054
# Keep binlogs long enough that the connector can recover after downtime.
# binlog_expire_logs_seconds is MySQL 8.0+; on 5.7 use expire_logs_days = 7 instead.
binlog_expire_logs_seconds = 604800 # 7 days
# GTID: stable offsets across rotation and failover.
gtid_mode = ON
enforce_gtid_consistency = ON
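Restart the instance once for these settings to take effect, then confirm with the SHOW VARIABLES query above. Note that gtid_mode cannot jump from OFF to ON on a running server: the online path steps through OFF_PERMISSIVE and ON_PERMISSIVE one change at a time, with enforce_gtid_consistency already ON. On a standalone dev box, a restart is simplest.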
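On a running server, the stepwise path can be generated rather than typed from memory. The sketch below follows the order of MySQL's documented online procedure (enforce_gtid_consistency through WARN to ON, then gtid_mode through OFF_PERMISSIVE and ON_PERMISSIVE to ON). gtid_enable_steps is a hypothetical helper; in a replication topology each step must be applied on every server before the next one, which it does not model.

```python
GTID_LADDER = ["OFF", "OFF_PERMISSIVE", "ON_PERMISSIVE", "ON"]


def gtid_enable_steps(current_mode: str, enforce_consistency: str) -> list:
    """SET GLOBAL statements that walk gtid_mode up to ON one step at a time."""
    steps = []
    if enforce_consistency.upper() != "ON":
        if enforce_consistency.upper() == "OFF":
            # WARN first: violating statements are logged instead of rejected.
            steps.append("SET @@GLOBAL.enforce_gtid_consistency = WARN;")
        steps.append("SET @@GLOBAL.enforce_gtid_consistency = ON;")
    start = GTID_LADDER.index(current_mode.upper())
    for mode in GTID_LADDER[start + 1:]:
        if mode == "ON":
            # Anonymous (non-GTID) transactions must drain before the last step.
            steps.append("-- wait until ONGOING_ANONYMOUS_TRANSACTION_COUNT is 0")
        steps.append(f"SET @@GLOBAL.gtid_mode = {mode};")
    return steps


if __name__ == "__main__":
    print("\n".join(gtid_enable_steps("OFF", "OFF")))
```

SET GLOBAL changes do not survive a restart, so keep gtid_mode and enforce_gtid_consistency in my.cnf as shown above once the final step succeeds.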
2. Create a dedicated capture user
The connector needs global privileges to read the binlog and the table snapshots, but nothing that lets it write data.
CREATE USER 'cdc_search'@'%' IDENTIFIED BY 'change_me_in_secret_store';
-- SELECT reads rows for the initial snapshot; SHOW DATABASES lets it discover schemas; RELOAD/LOCK TABLES allow the snapshot's consistency locks.
GRANT SELECT, RELOAD, SHOW DATABASES, LOCK TABLES ON *.* TO 'cdc_search'@'%';
-- REPLICATION SLAVE lets the user read the binlog stream itself.
-- REPLICATION CLIENT lets it run SHOW MASTER STATUS / SHOW BINARY LOGS.
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_search'@'%';
FLUSH PRIVILEGES;
REPLICATION SLAVE is the grant most often forgotten — without it the snapshot succeeds but the streaming phase dies the moment it tries to attach to the binlog.
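Because a missing REPLICATION SLAVE only surfaces once streaming starts, it is worth checking SHOW GRANTS FOR 'cdc_search'@'%' up front. A sketch, assuming the output spells privilege names the way the GRANT statements above do; missing_privileges is a hypothetical helper that only counts global (*.*) grants, since that is where the connector's privileges are granted.

```python
import re

REQUIRED_PRIVILEGES = {
    "SELECT", "RELOAD", "SHOW DATABASES", "LOCK TABLES",
    "REPLICATION SLAVE", "REPLICATION CLIENT",
}


def missing_privileges(show_grants_rows) -> set:
    """Required global privileges absent from SHOW GRANTS output lines."""
    granted = set()
    for line in show_grants_rows:
        match = re.match(r"GRANT (.+?) ON \*\.\* TO ", line, re.IGNORECASE)
        if not match:
            continue  # schema- or table-level grants do not satisfy the connector
        for priv in match.group(1).split(","):
            granted.add(priv.strip().upper())
    if "ALL PRIVILEGES" in granted:
        return set()
    return REQUIRED_PRIVILEGES - granted


if __name__ == "__main__":
    rows = ["GRANT SELECT, RELOAD, SHOW DATABASES, LOCK TABLES, "
            "REPLICATION CLIENT ON *.* TO `cdc_search`@`%`"]
    print(missing_privileges(rows))
```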
3. Register the connector
Post the config to the Kafka Connect REST API. The MySQL connector requires a schema history topic: an internal Kafka topic where it records every DDL statement it observes, so it can deserialize binlog events recorded under an older table shape after a restart.
{
"name": "mysql-cdc-search-sync",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "prod-mysql.internal",
"database.port": "3306",
"database.user": "cdc_search",
"database.password": "${env:DB_PASS}",
"database.server.id": "184055",
"topic.prefix": "search.mysql",
"database.include.list": "shop",
"table.include.list": "shop.products,shop.variants",
"schema.history.internal.kafka.topic": "schema-changes.search.mysql",
"schema.history.internal.kafka.bootstrap.servers": "kafka-broker-1:9092,kafka-broker-2:9092",
"snapshot.mode": "initial",
"snapshot.locking.mode": "minimal",
"include.schema.changes": "false",
"heartbeat.interval.ms": "5000",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}
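database.server.id here is the connector’s identity as a replica and must differ from the primary’s server-id and from any other connector or real replica. The ExtractNewRecordState transform flattens Debezium’s nested envelope down to the row payload your search sink expects; refine that shape further during data normalization and cleaning before it reaches the index, and align the resulting fields with your search index mapping. For the password, Kafka Connect only substitutes placeholders of the form ${provider:[path:]key} backed by a config provider registered on the worker, so a bare ${DB_PASS} would reach MySQL literally. To read the password from the worker’s environment, reference it as ${env:DB_PASS} and enable EnvVarConfigProvider (Kafka 3.5+) with config.providers=env and config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider.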
curl -X POST http://kafka-connect:8083/connectors \
-H "Content-Type: application/json" \
-d @mysql-cdc-search-sync.json
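If you register connectors from a deployment script instead of curl, the same POST can carry a guard against the most common replica-identity mistake. A minimal sketch using only the Python standard library; connector_payload and register are hypothetical names, and the caller is assumed to know the primary's server-id (184054 in the my.cnf above) plus any other replica IDs in the topology.

```python
import json
import urllib.request


def connector_payload(name: str, config: dict, primary_server_id: int, other_ids=()) -> bytes:
    """Check database.server.id is unused, then build the POST /connectors body."""
    server_id = int(config["database.server.id"])
    taken = {int(primary_server_id), *map(int, other_ids)}
    if server_id in taken:
        raise ValueError(f"database.server.id {server_id} collides with an existing server-id")
    # Kafka Connect config values are strings, so stringify them up front.
    body = {"name": name, "config": {k: str(v) for k, v in config.items()}}
    return json.dumps(body).encode("utf-8")


def register(connect_url: str, payload: bytes) -> dict:
    """POST the definition; urlopen raises urllib.error.HTTPError on 4xx/5xx."""
    req = urllib.request.Request(
        f"{connect_url}/connectors",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


# Usage (not run here, needs a reachable Connect worker):
# register("http://kafka-connect:8083",
#          connector_payload("mysql-cdc-search-sync", config, primary_server_id=184054))
```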
4. Choose the right snapshot mode
snapshot.mode=initial performs a one-time consistent scan of the included tables and emits one read event (op "r") per row to seed the index, then switches to streaming. Use it the first time you index a table. If your search index is already populated from another source and you only need ongoing changes, use schema_only (renamed no_data in newer releases): the connector reads the schema to populate the history topic but skips the row scan and starts streaming from the current binlog position. schema_only_recovery (renamed recovery in newer releases) is a repair mode: run it once when the schema history topic is lost or corrupted but the connector offset is still valid, to rebuild history without re-snapshotting data.
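The choice reduces to a small decision table. A sketch with a hypothetical choose_snapshot_mode helper: new_names switches between the older mode names and the renamed ones, and the recovery branch assumes the stored offset is still valid, as that mode requires.

```python
def choose_snapshot_mode(index_populated: bool, history_lost: bool,
                         new_names: bool = True) -> str:
    """Map the situations described above to a snapshot.mode value.

    Assumes the connector's stored offset is still valid whenever history_lost
    is True; the recovery mode rebuilds history, it cannot repair an offset.
    """
    if history_lost:
        return "recovery" if new_names else "schema_only_recovery"
    if index_populated:
        # Index already seeded elsewhere: capture schema, skip the row scan.
        return "no_data" if new_names else "schema_only"
    return "initial"
```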
For tables too large to snapshot in one consistent pass without holding read locks or exhausting connector memory, use an incremental snapshot instead. Start the connector with snapshot.mode=schema_only (no_data in newer releases) so it begins streaming immediately, set signal.data.collection to a signal table (columns id, type, and data), size the chunks with incremental.snapshot.chunk.size, and trigger the backfill by inserting an execute-snapshot signal row that names the tables you want. Incremental snapshots interleave chunked, watermark-bounded reads with live streaming, so search indexing of historical rows proceeds without pausing change capture. That matters when a single products table holds tens of millions of rows and a stop-the-world snapshot would block the index for hours. Each chunk is keyed on the primary key, so a snapshot that fails midway resumes from the last completed chunk rather than restarting.
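Debezium's incremental snapshots build on the watermark approach from Netflix's DBLog: rows read for a chunk are dropped if streaming saw a change to the same key between the low and high watermarks, because the streamed event is newer. The model below is a simplification (an in-memory dict stands in for the table, and the watermarks are implicit); incremental_chunk is a hypothetical name, and the last key it returns is the resume point described above.

```python
def incremental_chunk(table: dict, after_key, chunk_size: int, stream_events: list):
    """Emit one chunk of an incremental snapshot (simplified DBLog-style model).

    table: primary key -> row, as read between the low and high watermark.
    stream_events: (key, op) binlog events seen between the two watermarks.
    Returns (rows_to_emit_as_reads, last_key); last_key is the resume point.
    """
    keys = sorted(k for k in table if after_key is None or k > after_key)[:chunk_size]
    if not keys:
        return [], after_key  # snapshot complete
    buffer = {k: table[k] for k in keys}
    for key, _op in stream_events:
        # Streaming already delivered a newer version of this key; drop the read.
        buffer.pop(key, None)
    return [buffer[k] for k in keys if k in buffer], keys[-1]


if __name__ == "__main__":
    products = {1: "a", 2: "b", 3: "c", 4: "d", 5: "e"}
    last = None
    while True:
        rows, new_last = incremental_chunk(products, last, 2, [])
        if new_last == last:
            break
        print(rows, "resume after", new_last)
        last = new_last
```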
5. Key connector settings
| Name | Default | Type | Effect |
|---|---|---|---|
| database.server.id | (none) | integer | Unique replica identity the connector registers under; must differ from the primary’s server-id and every other replica. |
| snapshot.mode | initial | enum | initial seeds the index with a full scan then streams; schema_only skips the data scan; schema_only_recovery rebuilds lost history without re-snapshotting. |
| schema.history.internal.kafka.topic | (none) | string | Internal topic where observed DDL is recorded so old-shape binlog events stay decodable. |
| snapshot.locking.mode | minimal | enum | Controls how aggressively the snapshot locks tables; none avoids global locks but requires no concurrent DDL. |
Verification
Confirm the task is RUNNING and that events are flowing.
# 1. Both connector and task should report RUNNING.
curl -s http://kafka-connect:8083/connectors/mysql-cdc-search-sync/status | jq '.connector.state, .tasks[].state'
# Expected:
# "RUNNING"
# "RUNNING"
# 2. After the snapshot, the table topic holds one record per snapshotted row; read the first one.
kafka-console-consumer.sh --bootstrap-server kafka-broker-1:9092 \
--topic search.mysql.shop.products --from-beginning --max-messages 1 | jq .
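In automation, the RUNNING check from step 1 is easier to act on when it also surfaces the failure reason. A sketch: unhealthy is a hypothetical helper that takes the parsed JSON from GET /connectors/<name>/status, reading the same connector.state and tasks[].state fields as the jq filter above, plus the trace field that failed tasks carry.

```python
def unhealthy(status: dict) -> list:
    """Problems found in a GET /connectors/<name>/status response."""
    problems = []
    if status["connector"]["state"] != "RUNNING":
        problems.append(f"connector is {status['connector']['state']}")
    tasks = status.get("tasks", [])
    if not tasks:
        problems.append("no tasks assigned")
    for task in tasks:
        if task["state"] != "RUNNING":
            # FAILED tasks carry a Java stack trace; its first line names the cause.
            first_line = task.get("trace", "").splitlines()[:1]
            problems.append(f"task {task['id']} is {task['state']}: {' '.join(first_line)}")
    return problems
```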
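Next, confirm that live changes stream through. Start a consumer on the table topic first, because --offset latest only returns records written after it attaches; reading partition 0 assumes the topic has a single partition:
kafka-console-consumer.sh --bootstrap-server kafka-broker-1:9092 \
--topic search.mysql.shop.products --offset latest --partition 0 --max-messages 1 | jq '.__op, .price'
# Expected once the UPDATE below runs: "u" (update), followed by the new price value.
# ExtractNewRecordState prefixes the fields named in add.fields with "__", so op arrives as __op.
With the consumer waiting, update a row on the primary from a second session; the change should typically arrive within a second:
UPDATE shop.products SET price = price + 1 WHERE id = 42;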
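On the sink side, each flattened record has to become either an upsert or a delete. A sketch, assuming the unwrap settings from step 3 (add.fields=op,source.ts_ms, which ExtractNewRecordState emits as __op and __source_ts_ms, and delete.handling.mode=rewrite, which adds a string __deleted field) and a primary-key column named id; index_action is a hypothetical helper.

```python
import json


def index_action(raw_value):
    """Translate one flattened change record into a search-index action.

    Assumes the step 3 unwrap settings and a primary-key column named id.
    """
    if raw_value is None or raw_value == "null":
        return None  # tombstone: nothing to index
    record = json.loads(raw_value)
    doc_id = record["id"]
    if record.get("__deleted") == "true" or record.get("__op") == "d":
        return ("delete", doc_id)
    # Strip Debezium metadata fields before the document reaches the index.
    doc = {k: v for k, v in record.items() if not k.startswith("__")}
    return ("upsert", doc_id, doc)
```

Keep __source_ts_ms aside instead of discarding it if your sink uses external versioning to reject out-of-order writes.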
Common Pitfalls
Snapshot blocks writes because of a global read lock
Root cause: the default snapshot.locking.mode=minimal briefly takes a global read lock (FLUSH TABLES WITH READ LOCK) to capture the binlog position consistently, and on a busy primary even a brief global lock stalls writers. On MySQL 8 with GTID and consistent snapshots available, set snapshot.locking.mode=none only if you can guarantee no DDL runs during the snapshot; otherwise snapshot from a read replica. Remediation: check the current value before changing it:
curl -s http://kafka-connect:8083/connectors/mysql-cdc-search-sync/config | jq '."snapshot.locking.mode"'
Connector deserialization fails after an upstream ALTER TABLE
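Root cause: the schema history topic is missing DDL the connector needs, so it cannot decode binlog events recorded under an older table shape. This happens when a DDL change was applied while the connector was down and the history never captured the intermediate statement, or when the history topic was created with finite retention and old DDL records aged out; Debezium expects this topic to have a single partition and infinite retention. The MySQL connector tracks DDL through the schema history topic, not through the binlog alone. Remediation: run a one-time recovery instead of a full re-snapshot. PUT /connectors/<name>/config replaces the entire configuration, so send every existing key with only snapshot.mode changed (the ... below stands for the rest of the config):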
curl -X PUT http://kafka-connect:8083/connectors/mysql-cdc-search-sync/config \
-H "Content-Type: application/json" \
-d '{"snapshot.mode":"schema_only_recovery", ...}'
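Because PUT replaces the whole configuration, filling in the ... by hand is error-prone. A safer pattern is to fetch the current config, change only the keys you need, and send the full result back. A sketch using the standard library; with_overrides, get_config, and put_config are hypothetical helpers, and the same approach works for the snapshot.locking.mode change in the previous pitfall.

```python
import json
import urllib.request


def with_overrides(current: dict, overrides: dict) -> dict:
    """Full config for PUT /connectors/<name>/config with selected keys replaced."""
    updated = dict(current)  # copy, so the fetched config stays untouched
    updated.update(overrides)
    return updated


def get_config(connect_url: str, name: str) -> dict:
    """GET /connectors/<name>/config and return the parsed JSON object."""
    with urllib.request.urlopen(f"{connect_url}/connectors/{name}/config") as resp:
        return json.load(resp)


def put_config(connect_url: str, name: str, config: dict) -> dict:
    """PUT the complete config; urlopen raises urllib.error.HTTPError on 4xx/5xx."""
    req = urllib.request.Request(
        f"{connect_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


# Usage (not run here, needs a reachable Connect worker):
# url, name = "http://kafka-connect:8083", "mysql-cdc-search-sync"
# current = get_config(url, name)
# put_config(url, name, with_overrides(current, {"snapshot.mode": "schema_only_recovery"}))
```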
Streaming dies with "binary log is not available" after downtime
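Root cause: the connector’s stored offset points at a binlog file the server has already purged, because binlog_expire_logs_seconds was too short relative to the outage. With GTID enabled, the connector can resume from any binlog file that still holds the transactions after its recorded GTID set, but if the server has purged transactions the connector never consumed, those changes are unrecoverable and force a re-snapshot. Setting snapshot.mode=when_needed lets the connector re-snapshot automatically when its stored offset is no longer available on the server. Remediation: confirm what the server still has, then re-snapshot if the offset is gone: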
SHOW BINARY LOGS;
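SELECT @@GLOBAL.gtid_purged; -- every GTID here must already be in the connector's offset GTID set; any missing one is lost
SELECT GTID_SUBSET(@@GLOBAL.gtid_purged, '<gtids value from the connector offset>'); -- 1 = resumable, 0 = re-snapshot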
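The same subset test can run offline, for example in a monitoring job that compares the server's gtid_purged with the GTID set stored in the connector's offset (assuming the offset records one, as it does when GTID mode is on). A sketch of what GTID_SUBSET computes, with hypothetical helper names; it handles untagged GTID sets only (uuid:interval[:interval] entries separated by commas) and compares merged intervals rather than expanding them into individual transactions.

```python
def _merge(intervals):
    """Sort and merge overlapping or adjacent (start, end) intervals."""
    merged = []
    for start, end in sorted(intervals):
        if merged and start <= merged[-1][1] + 1:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


def parse_gtid_set(text: str) -> dict:
    """Parse 'uuid:1-5:7,uuid2:1-3' into {uuid: merged [(start, end), ...]}."""
    result = {}
    for part in text.replace("\n", "").split(","):
        part = part.strip()
        if not part:
            continue
        uuid, *ranges = part.split(":")
        intervals = result.setdefault(uuid.lower(), [])
        for r in ranges:
            start, _, end = r.partition("-")
            intervals.append((int(start), int(end or start)))
    return {uuid: _merge(iv) for uuid, iv in result.items()}


def gtid_subset(subset: str, superset: str) -> bool:
    """True if every GTID in subset is also in superset (like MySQL's GTID_SUBSET)."""
    sup = parse_gtid_set(superset)
    for uuid, intervals in parse_gtid_set(subset).items():
        covering = sup.get(uuid, [])
        for start, end in intervals:
            # Merged intervals are disjoint, so one must cover the whole range.
            if not any(c <= start and end <= d for c, d in covering):
                return False
    return True
```

Called as gtid_subset(server_gtid_purged, connector_offset_gtids), a False result means the server purged transactions the connector never read.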
Related
- Building a CDC pipeline with Debezium — the Kafka Connect, sink, and DLQ mechanics this MySQL setup plugs into.
- Setting up the Debezium MongoDB connector — the change-stream equivalent for a document store source.
- Schema design and index mapping — shape the flattened row events into a well-typed search index.