diff --git a/README.md b/README.md index 081de5f..d1b3cee 100644 --- a/README.md +++ b/README.md @@ -1,32 +1,82 @@ # Conduit Connector MongoDB -## General - The [MongoDB](https://www.mongodb.com/) connector is one of Conduit plugins. It provides both, a source and a destination MongoDB connector. -### Prerequisites + +## Source -- [Go](https://go.dev/) 1.23+ -- [MongoDB](https://www.mongodb.com/) [replica set](https://www.mongodb.com/docs/manual/replication/) ( - at least single-node) - or [sharded cluster](https://www.mongodb.com/docs/manual/sharding/) - with [WiredTiger](https://www.mongodb.com/docs/manual/core/wiredtiger/) - storage engine -- [Docker](https://www.docker.com/) -- (optional) [golangci-lint](https://github.com/golangci/golangci-lint) v1.55.2 +The MongoDB Source Connector connects to a MongoDB with the provided `uri`, `db` +and `collection` and starts creating records for each change detected in a +collection. -### How to build it +Upon starting, the Source takes a snapshot of a given collection in the +database, then switches into CDC mode. In CDC mode, the plugin reads events from +a [Change Stream](https://www.mongodb.com/docs/manual/changeStreams/). In order +for this to work correctly, your MongoDB instance must +meet [the criteria](https://www.mongodb.com/docs/manual/changeStreams/#availability) +specified on the official website. -Run `make build`. +### Snapshot Capture -### Development +When the connector first starts, snapshot mode is enabled. The connector reads +all rows of a collection in batches using +a [cursor-based](https://www.mongodb.com/docs/drivers/go/current/fundamentals/crud/read-operations/cursor/) +pagination, +limiting the rows by `batchSize`. The connector stores the last processed +element value of an `orderingColumn` in a position, so the snapshot process can +be paused and resumed without losing data. Once all rows in that initial +snapshot are read the connector switches into CDC mode. -Run `make install-tools` to install all the required tools. +This behavior is enabled by default, but can be turned off by adding +`"snapshot": false` to the Source configuration. -Run `make test` to run all the units and `make test-integration` to run all the -integration tests, which require Docker to be installed and running. The command -will handle starting and stopping docker container for you. +### Change Data Capture + +The connector implements CDC features for MongoDB by using a Change Stream that +listens to changes in the configured collection. Every detected change is +converted into a record and returned in the call to `Read`. If there is no +available record when `Read` is called, the connector returns +`sdk.ErrBackoffRetry` error. + +The connector stores a `resumeToken` of every Change Stream event in a position, +so the CDC process is resumble. + +> **Warning** +> +> [Azure CosmosDB for MongoDB](https://learn.microsoft.com/en-us/azure/cosmos-db/mongodb/change-streams) +> has very limited support for Change Streams, so they cannot be used for CDC. +> If CDC is not possible, like in the case with CosmosDB, the connector only +> supports detecting insert operations by polling for new documents. + +### Key handling + +The connector always uses the `_id` field as a key. + +If the `_id` field is `bson.ObjectID` the connector converts it to a string when +transferring a record to a destination, otherwise, it leaves it unchanged. + +## Destination + +The MongoDB Destination takes a `opencdc.Record` and parses it into a valid +MongoDB query. The Destination is designed to handle different payloads and +keys. Because of this, each record is individually parsed and written. + +### Collection name + +If a record contains an `opencdc.collection` property in its metadata it will be +written in that collection, otherwise it will fall back to use the `collection` +configured in the connector. Thus, a Destination can support multiple +collections in the same connector, as long as the user has proper access to +those collections. + +### Key handling + +The connector uses all keys from an `opencdc.Record` when updating and deleting +documents. + +If the `_id` field can be converted to a `bson.ObjectID`, the connector converts +it, otherwise, it uses it as it is. ### Source Configuration Parameters @@ -248,4 +298,16 @@ pipelines: ``` +### How to build it + +Run `make build`. + +### Development + +Run `make install-tools` to install all the required tools. + +Run `make test` to run all the units and `make test-integration` to run all the +integration tests, which require Docker to be installed and running. The command +will handle starting and stopping docker container for you. + ![scarf pixel](https://static.scarf.sh/a.png?x-pxid=528a9760-d573-4524-8f65-74a5e4d402e8) diff --git a/source/iterator/cdc.go b/source/iterator/cdc.go index 2c1a7eb..8aafe00 100644 --- a/source/iterator/cdc.go +++ b/source/iterator/cdc.go @@ -39,12 +39,12 @@ const ( var changeStreamMatchPipeline = bson.D{ { Key: "$match", Value: bson.M{ - "operationType": bson.M{"$in": []string{ - operationTypeInsert, - operationTypeUpdate, - operationTypeDelete, - }}, - }, + "operationType": bson.M{"$in": []string{ + operationTypeInsert, + operationTypeUpdate, + operationTypeDelete, + }}, + }, }, }