Debezium MongoDB Connector Setup for Search Sync
Problem Statement
You point the Debezium MongoDB connector at a standalone mongod to stream document changes into a search index. It either refuses to start or emits update events that carry only the changed fields, leaving your search documents half-empty. Both symptoms have the same root cause. MongoDB change data capture is built on change streams, which require a replica set and, for complete update payloads, an explicit post-image lookup. This guide covers the MongoDB-specific part of your broader change data capture setup for data ingestion and synchronization pipelines. The shared Kafka Connect, sink, and dead-letter mechanics are documented once in the Debezium pipeline guide; here we stay on document-store specifics.
Prerequisites
- MongoDB 5.0+ deployed as a replica set or sharded cluster — a standalone node has no oplog and exposes no change stream.
- Kafka Connect in distributed mode with the Debezium MongoDB plugin (debezium-connector-mongodb) on the plugin path.
- A Kafka cluster the connector can reach for its data topics and the Connect offset topic.
- A MongoDB user with read access to the captured collections plus cluster-level change-stream privileges.
- For post-image updates (capture.mode.full.update.type=post_image, as in the connector configuration in step 4), changeStreamPreAndPostImages enabled on each captured collection (MongoDB 6.0+).
Diagnosis / Context
A change stream is MongoDB’s real-time feed of data changes, exposed through the aggregation framework and backed internally by the oplog, the capped collection the replica set uses to replicate writes. The modern Debezium MongoDB connector consumes the change stream rather than tailing the oplog directly, which is why a replica set is mandatory: a standalone mongod keeps no oplog. Against a standalone node the connector fails immediately:
```text
org.apache.kafka.connect.errors.ConnectException: Unable to connect to MongoDB and validate
its configuration: The $changeStream stage is only supported on replica sets
```
Each change stream event carries a resume token, an opaque cursor position that the connector persists as its Kafka Connect offset. On restart the connector resumes the stream from the last committed token. No events are missed as long as the token still falls inside the oplog window. Delivery is at-least-once: after a crash, events emitted after the last offset commit can be sent again, so the sink should upsert by document _id. If the connector is down longer than the oplog's retention, the token expires and the connector must re-snapshot.
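The resume-or-resnapshot decision comes down to one comparison: the time a stored token points at versus the oldest entry still in the oplog. Below is a minimal sketch under stated assumptions. Real resume tokens are opaque and MongoDB itself decides whether it can resume, so `token_ts` and the `restart_action` helper are hypothetical stand-ins. `first_ts` and `last_ts` play the role of `tFirst` and `tLast` from `db.getReplicationInfo()`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OplogWindow:
    """Time span (epoch seconds) still covered by the oplog, like tFirst/tLast."""
    first_ts: int
    last_ts: int


def restart_action(token_ts: int, window: OplogWindow) -> str:
    """Hypothetical model of what a restarting connector can do.

    token_ts is the cluster time (epoch seconds) of the last committed resume
    token. A token older than the oldest oplog entry cannot be resumed.
    """
    if token_ts < window.first_ts:
        return "resnapshot"  # the oplog has rolled past the token
    return "resume"          # the token is still inside the window


window = OplogWindow(first_ts=1_700_000_000, last_ts=1_700_086_400)  # 24 h window
print(restart_action(1_700_050_000, window))  # resume
print(restart_action(1_699_990_000, window))  # resnapshot
```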
The second core issue is update completeness. By default, a change stream update event reports only the modified fields in updateDescription, not the full document. A search index almost always needs the whole document, so the connector must request the post-image (the state of the document after the change) via capture.mode=change_streams_update_full. With the default capture.mode.full.update.type=lookup, this sets fullDocument=updateLookup on the underlying stream, which re-reads the current document when the event is produced. With post_image, the connector instead returns the image MongoDB stored at write time.
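A minimal sketch shows why the capture mode matters to the index. The hypothetical `apply_to_index` helper models a sink that stores each event value as the new search document for its `_id`, which is how an upsert-by-id sink behaves. The sample documents are invented for illustration.

```python
def apply_to_index(index: dict, doc_id, event_value: dict) -> None:
    """Hypothetical sink: the event value replaces the stored search document."""
    index[doc_id] = dict(event_value)


stored = {"_id": 42, "name": "Mug", "sku": "MUG-1", "price": 24.99}

# capture.mode=change_streams: only updateDescription.updatedFields reaches the sink.
partial_event = {"price": 19.99}
# capture.mode=change_streams_update_full: the whole document reaches the sink.
full_event = {"_id": 42, "name": "Mug", "sku": "MUG-1", "price": 19.99}

partial_index = {42: dict(stored)}
apply_to_index(partial_index, 42, partial_event)

full_index = {42: dict(stored)}
apply_to_index(full_index, 42, full_event)

print(partial_index[42])  # {'price': 19.99}  (name and sku are gone)
print(full_index[42])     # {'_id': 42, 'name': 'Mug', 'sku': 'MUG-1', 'price': 19.99}
```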
```javascript
// Confirm you are on a replica set (run in mongosh):
rs.status().set          // -> "rs0"; a standalone node returns an error instead
db.getReplicationInfo()  // -> oplog size and time window
```
It is worth being precise about oplog versus change stream, because the distinction drives most MongoDB CDC decisions. The oplog is the low-level, internal replication log; older Debezium releases tailed it directly. Change streams are the supported abstraction layered on top of it. They survive primary elections, expose resume tokens, and respect collection-level filtering and pre/post-image configuration. The connector you deploy today should always use a change stream capture mode (capture.mode=change_streams*). The legacy oplog mode was removed in Debezium 2.0, and it never offered the post-image guarantees a search index depends on. The data path is short (change stream, connector, per-collection Kafka topic, sink), but each hop carries a document-store-specific concern.
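The per-collection hop can be made concrete. Change stream events carry their namespace in an `ns` field (`db` and `coll`), and Debezium's default topic naming appends that namespace to `topic.prefix`. That is why the verification step reads `search.mongo.shop.products`. The `topic_for_event` helper below is a hypothetical illustration of that mapping, not connector code.

```python
def topic_for_event(topic_prefix: str, change_event: dict) -> str:
    """Map a change stream event to its Kafka topic: <topic.prefix>.<db>.<collection>."""
    ns = change_event["ns"]  # change stream events expose {"db": ..., "coll": ...}
    return f"{topic_prefix}.{ns['db']}.{ns['coll']}"


event = {"operationType": "update", "ns": {"db": "shop", "coll": "products"}}
print(topic_for_event("search.mongo", event))  # search.mongo.shop.products
```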
Solution Steps
1. Confirm the replica set and oplog window
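```shell
# Run from a shell with mongosh installed. --eval prints only the last
# expression's value, so print each result explicitly.
mongosh "mongodb://prod-mongo:27017/?replicaSet=rs0" --quiet --eval '
  print(rs.status().set);              // -> rs0
  printjson(db.getReplicationInfo());  // logSizeMB, timeDiffHours, tFirst, tLast
'
```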
A short oplog window is the silent killer of resumability: if the window is hours and your maintenance can take longer, widen it (replSetResizeOplog) before relying on the connector. The resume token the connector persists is just a pointer into this window; it carries no data, so when the window slides past it the token is irrecoverable and the only path forward is a fresh snapshot, which re-indexes every captured document. Treat oplog sizing as a search-availability decision, not just a database one — an undersized oplog turns a routine connector restart into a full reindex that can saturate the search sink.
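Once you know the write rate, oplog sizing is simple arithmetic. Below is a minimal sketch. It assumes the oplog is already full, so its size divided by `timeDiffHours` from `db.getReplicationInfo()` approximates the hourly write rate. It also uses a hypothetical 1.5x headroom factor for bursts, and the helper names are illustrative.

```python
import math


def churn_mb_per_hour(log_size_mb: float, time_diff_hours: float) -> float:
    """Average oplog write rate: a full, capped oplog spans time_diff_hours."""
    if time_diff_hours <= 0:
        raise ValueError("time_diff_hours must be positive")
    return log_size_mb / time_diff_hours


def required_oplog_mb(churn: float, window_hours: float, headroom: float = 1.5) -> int:
    """Oplog size (MB) whose window covers window_hours of writes plus headroom."""
    if churn <= 0 or window_hours <= 0:
        raise ValueError("churn and window_hours must be positive")
    return math.ceil(churn * window_hours * headroom)


rate = churn_mb_per_hour(2048, 8)   # 256.0 MB/h
print(required_oplog_mb(rate, 24))  # 9216
```

The result feeds `replSetResizeOplog`, whose `size` is given in megabytes. For example, run `db.adminCommand({replSetResizeOplog: 1, size: 9216})` on each member. On MongoDB 4.4+ the same command also accepts `minRetentionHours`, which keeps oplog entries for at least that long regardless of size.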
On a sharded cluster, the connector opens the change stream through mongos rather than an individual replica set, and mongos merges the events from every shard into a single ordered stream. Point mongodb.connection.string at the router endpoints (a mongos URI carries no replicaSet parameter) and grant the capture user cluster-wide read. Do not connect directly to a single shard's replica set, or you will miss writes routed to other shards.
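Building the connection string by hand is where credentials and topology most often go wrong. Below is a minimal sketch using a hypothetical `mongo_uri` helper. It percent-encodes the user and password, which MongoDB URIs require for characters such as `@`, `:` and `/`. It adds `replicaSet` only for a replica set and leaves it off when the hosts are mongos routers. Host names are placeholders.

```python
from urllib.parse import quote_plus


def mongo_uri(user, password, hosts, replica_set=None):
    """Build a mongodb:// URI for the connector's mongodb.connection.string.

    hosts: "host:port" strings. Pass replica_set for a replica set; omit it
    for a sharded cluster, where the hosts are mongos routers.
    """
    credentials = f"{quote_plus(user)}:{quote_plus(password)}@"
    uri = f"mongodb://{credentials}{','.join(hosts)}/"
    if replica_set:
        uri += f"?replicaSet={quote_plus(replica_set)}"
    return uri


print(mongo_uri("cdc_search", "p@ss:w/rd", ["mongos-1:27017", "mongos-2:27017"]))
# mongodb://cdc_search:p%40ss%3Aw%2Frd@mongos-1:27017,mongos-2:27017/
print(mongo_uri("cdc_search", "secret", ["prod-mongo:27017"], replica_set="rs0"))
# mongodb://cdc_search:secret@prod-mongo:27017/?replicaSet=rs0
```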
2. Enable pre/post images on captured collections
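On MongoDB 6.0+, a collection can store change-stream pre- and post-images, at the cost of extra storage for the images. With capture.mode.full.update.type=post_image, the connector reads the post-image recorded at write time instead of re-reading the current document with a separate lookup. Each update event then reflects exactly that write. Enable it per collection: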
```shell
mongosh "mongodb://prod-mongo:27017/?replicaSet=rs0" --eval '
  db.getSiblingDB("shop").runCommand({
    collMod: "products",
    changeStreamPreAndPostImages: { enabled: true }
  });
'
```
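When several collections are captured, the same `collMod` can be generated from the connector's include list. Below is a minimal sketch using a hypothetical `collmod_commands` helper. It assumes each `collection.include.list` entry is a literal `<database>.<collection>` name. The real property accepts regular expressions, which this helper does not expand. The commented pymongo call shows one way you might send each command.

```python
def collmod_commands(collection_include_list: str):
    """Return (database, command) pairs enabling change-stream images.

    Treats each comma-separated entry as a literal <database>.<collection>.
    """
    commands = []
    for entry in collection_include_list.split(","):
        database, collection = entry.strip().split(".", 1)
        commands.append((database, {
            "collMod": collection,
            "changeStreamPreAndPostImages": {"enabled": True},
        }))
    return commands


for database, command in collmod_commands("shop.products,shop.variants"):
    print(database, command)
    # With pymongo: pymongo.MongoClient(uri)[database].command(command)
```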
3. Create a capture user
```shell
mongosh "mongodb://prod-mongo:27017/admin?replicaSet=rs0" --eval '
  db.createUser({
    user: "cdc_search",
    pwd: "change_me_in_secret_store",
    // read on the data DB; changeStream + find at cluster scope.
    roles: [
      { role: "read", db: "shop" },
      { role: "readAnyDatabase", db: "admin" }
    ]
  });
'
```
4. Register the connector
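The key MongoDB-specific knob is capture.mode. Setting it to change_streams_update_full makes update events carry the complete post-image, so the sink can index a whole document rather than a partial patch. ${DB_PASS} in the connection string is only a placeholder. curl posts the file verbatim, and Kafka Connect does not expand shell-style variables. Substitute the value before posting (for example with envsubst), or resolve it on the worker with a Kafka Connect config provider.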
```json
{
  "name": "mongo-cdc-search-sync",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.connection.string": "mongodb://cdc_search:${DB_PASS}@prod-mongo:27017/?replicaSet=rs0",
    "topic.prefix": "search.mongo",
    "database.include.list": "shop",
    "collection.include.list": "shop.products,shop.variants",
    "snapshot.mode": "initial",
    "capture.mode": "change_streams_update_full",
    "capture.mode.full.update.type": "post_image",
    "heartbeat.interval.ms": "5000",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.array.encoding": "array",
    "transforms.unwrap.flatten.struct": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
```
Note the MongoDB-specific transform: ExtractNewDocumentState, not the relational ExtractNewRecordState. It unwraps the change event envelope so that each record carries just the document state. It also serializes MongoDB types (ObjectId, Decimal128, dates) into plain JSON values the search sink understands. Push any further reshaping into data normalization and cleaning, and make the field types line up with your search index mapping before they reach the index.
Two settings on this transform matter specifically for search documents.

- array.encoding=array keeps MongoDB arrays as JSON arrays instead of converting them into documents with one field per array position. This is what you want for a keyword or text array field in the index.
- flatten.struct=true turns embedded subdocuments into top-level keys joined by flatten.struct.delimiter, which defaults to an underscore (address_city). These keys map cleanly onto flattened search fields. Leave it off only if your sink and mapping handle nested objects natively.

Deletes need handling too. delete.handling.mode=rewrite keeps each delete as a record flagged with a __deleted field, which the sink (or a step in front of it) can translate into a document removal. With the default handling, the transform drops delete events, so removed documents linger in the index.
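The effect of these settings on a single document takes only a few lines to sketch. This is a toy model, not Debezium's implementation. `flatten` mimics `flatten.struct=true` with an underscore delimiter and `array.encoding=array`, so arrays pass through untouched. The hypothetical `sink_action` shows how a consumer might route records flagged with `__deleted`. It accepts both a boolean and a string flag because the flag's type is an assumption here.

```python
def flatten(doc: dict, delimiter: str = "_", parent: str = "") -> dict:
    """Toy model of flatten.struct=true: nested subdocuments become top-level keys."""
    flat = {}
    for key, value in doc.items():
        name = f"{parent}{delimiter}{key}" if parent else key
        if isinstance(value, dict):
            flat.update(flatten(value, delimiter, name))  # recurse into subdocuments
        else:
            flat[name] = value  # scalars and arrays are kept as-is
    return flat


def sink_action(record: dict) -> str:
    """Hypothetical routing for delete.handling.mode=rewrite output."""
    return "delete" if record.get("__deleted") in (True, "true") else "upsert"


doc = {"_id": 42, "address": {"city": "Oslo", "geo": {"lat": 59.9}}, "tags": ["mug", "kitchen"]}
print(flatten(doc))
# {'_id': 42, 'address_city': 'Oslo', 'address_geo_lat': 59.9, 'tags': ['mug', 'kitchen']}
print(sink_action({"_id": 42, "__deleted": "true"}))  # delete
```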
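```shell
# Substitute ${DB_PASS} from the environment, then post the config from stdin.
envsubst < mongo-cdc-search-sync.json | curl -X POST http://kafka-connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @-
```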
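The same registration can be scripted where curl and envsubst are not convenient. Below is a minimal sketch that uses only the Python standard library. `load_payload` expands `${DB_PASS}`-style placeholders from the environment after parsing the JSON, so a password cannot break the JSON syntax, though it must already be URI-safe. `build_registration_request` builds the same `POST /connectors` call. Both helper names are hypothetical. The request is only built here; `urllib.request.urlopen(request)` would send it.

```python
import json
import os
import urllib.request


def expand_env(value):
    """Recursively expand $VAR / ${VAR} in string values; unknown names are left as-is."""
    if isinstance(value, str):
        return os.path.expandvars(value)
    if isinstance(value, dict):
        return {key: expand_env(item) for key, item in value.items()}
    return value


def load_payload(path: str) -> dict:
    with open(path, encoding="utf-8") as fh:
        return expand_env(json.load(fh))


def build_registration_request(connect_url: str, payload: dict) -> urllib.request.Request:
    """Equivalent of: curl -X POST <connect_url>/connectors -H 'Content-Type: application/json'."""
    return urllib.request.Request(
        f"{connect_url.rstrip('/')}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Usage (not executed here):
# request = build_registration_request("http://kafka-connect:8083",
#                                      load_payload("mongo-cdc-search-sync.json"))
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```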
5. Choose the right snapshot mode
snapshot.mode=initial reads every captured collection once to seed the search index, then switches to the live change stream. It records a resume token at the start, so no concurrent writes are lost. snapshot.mode=never skips the data scan entirely and begins streaming from the current token. Use it when the index is already populated and you only need to keep it current. As with the relational connectors, an incremental snapshot is the right tool for very large collections. It is triggered through Debezium's signaling mechanism rather than through snapshot.mode. It reads document ranges in chunks while change capture runs in parallel, so seeding a multi-million-document collection never blocks ongoing index updates. Because MongoDB documents are schemaless, there is no schema history topic to manage here. The absence of a fixed schema is also why the ExtractNewDocumentState flattening step does more work than its relational sibling.
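Chunked reads and live streaming can run side by side because of watermark-based deduplication. Debezium's incremental snapshots follow the DBLog approach: snapshot rows are discarded for any key that also changed between a chunk's low and high watermarks. The sketch below is a toy model of that rule for one chunk, with hypothetical names and invented data. It is not Debezium's implementation.

```python
def merge_chunk(chunk_rows: dict, window_events: list) -> dict:
    """Toy model of watermark deduplication for one incremental-snapshot chunk.

    chunk_rows: {_id: document} read from the collection for this chunk.
    window_events: (_id, document or None) change events streamed between the
    chunk's low and high watermarks. A streamed event is at least as new as the
    snapshot read, so the snapshot copy of that _id is dropped.
    """
    changed_ids = {doc_id for doc_id, _ in window_events}
    return {doc_id: doc for doc_id, doc in chunk_rows.items() if doc_id not in changed_ids}


chunk = {1: {"price": 10}, 2: {"price": 20}, 3: {"price": 30}}
events = [(2, {"price": 25}), (3, None)]  # _id 2 updated, _id 3 deleted mid-chunk
print(merge_chunk(chunk, events))  # {1: {'price': 10}}
```

The streamed events are emitted as they arrive. The index therefore ends up with the updated price for `_id` 2 and no document for `_id` 3, while `_id` 1 comes from the snapshot.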
6. Key connector settings
| Name | Default | Type | Effect |
|---|---|---|---|
| capture.mode | change_streams_update_full | enum | change_streams emits only changed fields on updates; change_streams_update_full requests the full document so updates carry the whole document. |
| snapshot.mode | initial | enum | initial scans collections to seed the index, then streams; never skips the scan and streams from the current resume token. |
| mongodb.connection.string | (none) | string | Connection URI. For a replica set, include replicaSet so the connector discovers members and opens a change stream; for a sharded cluster, list the mongos routers instead. |
| transforms.unwrap.flatten.struct | false | boolean | Flattens nested document structs into top-level fields joined by flatten.struct.delimiter (underscore by default), so the sink can map them to flat index fields. |
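The table's rules can be turned into a quick preflight check before you register the connector. Below is a minimal sketch. `preflight` is a hypothetical helper, not a Debezium or Kafka Connect feature. `via_mongos` is an assumed flag that tells it the hosts are mongos routers, where a missing `replicaSet` is expected.

```python
from urllib.parse import parse_qs, urlsplit


def preflight(config: dict, via_mongos: bool = False) -> list:
    """Return human-readable problems in a MongoDB connector config."""
    problems = []
    if not config.get("capture.mode", "").startswith("change_streams_update_full"):
        problems.append("capture.mode: set change_streams_update_full so updates carry whole documents")

    uri = config.get("mongodb.connection.string", "")
    if not uri.startswith(("mongodb://", "mongodb+srv://")):
        problems.append("mongodb.connection.string: missing or not a MongoDB URI")
    # mongodb+srv URIs are skipped: their DNS TXT record can supply replicaSet.
    elif uri.startswith("mongodb://") and not via_mongos:
        if "replicaSet" not in parse_qs(urlsplit(uri).query):
            problems.append("mongodb.connection.string: add replicaSet=<name> for a replica set")

    for name in filter(None, (t.strip() for t in config.get("transforms", "").split(","))):
        if config.get(f"transforms.{name}.type", "").endswith("ExtractNewRecordState"):
            problems.append(f"transforms.{name}.type: use ExtractNewDocumentState for MongoDB")
    return problems
```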
Verification
```shell
# 1. Connector and task should both be RUNNING.
curl -s http://kafka-connect:8083/connectors/mongo-cdc-search-sync/status | jq '.connector.state, .tasks[].state'
# Expected:
# "RUNNING"
# "RUNNING"

# 2. After the snapshot the topic holds one record per existing document; inspect the first.
kafka-console-consumer.sh --bootstrap-server kafka-broker-1:9092 \
  --topic search.mongo.shop.products --from-beginning --max-messages 1 | jq .
```
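Start a consumer on the tail of the topic first. A new console consumer without --from-beginning reads only records produced after it joins. Then apply a partial update in a second terminal and confirm the streamed event carries the full document, not just the changed field:
```shell
# Terminal 1: wait for the next record on the collection topic.
kafka-console-consumer.sh --bootstrap-server kafka-broker-1:9092 \
  --topic search.mongo.shop.products --max-messages 1 | jq 'keys'

# Terminal 2: apply a partial update.
mongosh "mongodb://prod-mongo:27017/shop?replicaSet=rs0" --eval '
  db.products.updateOne({ _id: 42 }, { $set: { price: 19.99 } });
'
# Expected in terminal 1: the full set of document fields (name, sku, price, ...), not just "price".
```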
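To make the completeness check fail loudly in a script rather than relying on a visual check, pipe one consumed record into the hypothetical `missing_fields` helper below. This minimal sketch assumes the plain JSON values produced by `value.converter.schemas.enable=false`, plus a field list you supply from your index mapping.

```python
import json


def missing_fields(record_line: str, expected_fields) -> set:
    """Return the expected search fields that a consumed record lacks."""
    value = json.loads(record_line)
    return set(expected_fields) - set(value)


full = '{"_id": 42, "name": "Mug", "sku": "MUG-1", "price": 19.99}'
partial = '{"price": 19.99}'
print(missing_fields(full, ["name", "sku", "price"]))             # set()
print(sorted(missing_fields(partial, ["name", "sku", "price"])))  # ['name', 'sku']
```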
Common Pitfalls
Update events index only the changed field
Root cause: capture.mode is set to change_streams (for example, carried over from an older configuration), so the connector forwards only updateDescription.updatedFields. The sink then overwrites the search document with a near-empty body. The fix is capture.mode=change_streams_update_full, which makes the change stream return the full document with every update. Remediation:
```shell
curl -s http://kafka-connect:8083/connectors/mongo-cdc-search-sync/config | jq '."capture.mode"'
# Must read "change_streams_update_full".
```
Connector re-snapshots after downtime instead of resuming
Root cause: an outage lasted longer than the oplog's retention, so the stored resume token fell outside the oplog window. MongoDB can no longer resume the change stream from that token, and a fresh snapshot is the only way to bring the index back in sync. Resume tokens are only as durable as the oplog behind them. Remediation: size the oplog for your worst-case maintenance window, then verify:
```shell
mongosh "mongodb://prod-mongo:27017/?replicaSet=rs0" --quiet --eval 'db.getReplicationInfo().timeDiffHours'
# Should comfortably exceed your longest expected connector downtime.
```
Post-image is missing for some updates
Root cause: capture.mode.full.update.type=post_image relies on changeStreamPreAndPostImages being enabled on the collection. MongoDB records post-images only for writes made after the option is enabled. A collection where it was never enabled, or a stream resumed at updates that predate enabling it, has no stored post-image to return, so the connector cannot emit a full document for those events. Remediation: confirm the collection option is set:
```shell
mongosh "mongodb://prod-mongo:27017/?replicaSet=rs0" --quiet --eval 'db.getSiblingDB("shop").runCommand({listCollections: 1, filter: {name: "products"}}).cursor.firstBatch[0].options'
# Look for: changeStreamPreAndPostImages: { enabled: true }
```
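Across many collections, the same check is easier to run over the full `listCollections` output. Below is a minimal sketch with a hypothetical helper. It takes the collection documents that `listCollections` returns (with pymongo, `client["shop"].list_collections()`) and reports the names whose options lack `changeStreamPreAndPostImages: { enabled: true }`. The sample input is invented.

```python
def collections_missing_images(collection_infos) -> list:
    """Names of collections without changeStreamPreAndPostImages enabled."""
    missing = []
    for info in collection_infos:
        images = info.get("options", {}).get("changeStreamPreAndPostImages", {})
        if not images.get("enabled", False):
            missing.append(info["name"])
    return missing


infos = [
    {"name": "products", "options": {"changeStreamPreAndPostImages": {"enabled": True}}},
    {"name": "variants", "options": {}},
]
print(collections_missing_images(infos))  # ['variants']
```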
Related
- Building a CDC pipeline with Debezium — the Kafka Connect, sink, and DLQ mechanics this MongoDB setup plugs into.
- Setting up the Debezium MySQL connector — the binlog-based equivalent for a relational source.
- Schema design and index mapping — turn flattened document events into a well-typed search index.