readme
hariso committed Feb 12, 2025
1 parent 65545d9 commit 27a4440
Showing 2 changed files with 86 additions and 24 deletions.
98 changes: 80 additions & 18 deletions README.md
# Conduit Connector MongoDB

## General

The [MongoDB](https://www.mongodb.com/) connector is one of the Conduit plugins. It
provides both a source and a destination MongoDB connector.

<!-- readmegen:description -->
## Source

The MongoDB Source Connector connects to a MongoDB instance with the provided `uri`,
`db` and `collection`, and creates a record for each change detected in the
collection.
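
As a rough illustration of what those three settings map to at the driver level, here is a minimal sketch (not the connector's actual code) using the v1 Go driver, `go.mongodb.org/mongo-driver`; the URI, database, and collection values are placeholders:

```go
package main

import (
	"context"
	"log"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// openCollection mirrors the connector's `uri`, `db` and `collection` settings:
// it connects to MongoDB and returns a handle to the configured collection.
func openCollection(ctx context.Context, uri, db, collection string) (*mongo.Collection, error) {
	client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
	if err != nil {
		return nil, err
	}
	return client.Database(db).Collection(collection), nil
}

func main() {
	ctx := context.Background()
	// Example values; Change Streams require a replica set, even a single-node one.
	coll, err := openCollection(ctx, "mongodb://localhost:27017/?replicaSet=rs0", "mydb", "mycollection")
	if err != nil {
		log.Fatal(err)
	}
	_ = coll
}
```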

Upon starting, the Source takes a snapshot of a given collection in the
database, then switches into CDC mode. In CDC mode, the plugin reads events from
a [Change Stream](https://www.mongodb.com/docs/manual/changeStreams/). In order
for this to work correctly, your MongoDB instance must
meet [the criteria](https://www.mongodb.com/docs/manual/changeStreams/#availability)
specified on the official website.
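
For illustration, a minimal sketch of reading from a Change Stream with the v1 Go driver, matching only insert, update, and delete events; this is not the connector's implementation, and the function name is hypothetical:

```go
package example

import (
	"context"
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// watchCollection opens a Change Stream on coll and prints every matching event.
// coll can be obtained as in the previous sketch.
func watchCollection(ctx context.Context, coll *mongo.Collection) error {
	// Only the operations the source cares about.
	pipeline := mongo.Pipeline{
		{{Key: "$match", Value: bson.M{
			"operationType": bson.M{"$in": []string{"insert", "update", "delete"}},
		}}},
	}

	stream, err := coll.Watch(ctx, pipeline)
	if err != nil {
		return err
	}
	defer stream.Close(ctx)

	for stream.Next(ctx) {
		var event bson.M
		if err := stream.Decode(&event); err != nil {
			return err
		}
		fmt.Println(event["operationType"], event["documentKey"])
	}
	return stream.Err()
}
```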

### Snapshot Capture

When the connector first starts, snapshot mode is enabled. The connector reads
all documents in the collection in batches, using
[cursor-based](https://www.mongodb.com/docs/drivers/go/current/fundamentals/crud/read-operations/cursor/)
pagination and limiting each batch to `batchSize` documents. The connector stores
the last processed value of the `orderingColumn` in the record position, so the
snapshot process can be paused and resumed without losing data. Once all documents
in the initial snapshot are read, the connector switches into CDC mode.
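
A hedged sketch of the kind of cursor-based, batched read described above, keyed on `_id` for simplicity (the connector's actual ordering field is configurable via `orderingColumn`), and returning the last-seen value so the scan can resume; the helper is hypothetical:

```go
package example

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// readBatch fetches up to batchSize documents with an ordering value greater
// than lastSeen, and returns the documents plus the new resume position.
// Using `_id` as the ordering field is an assumption made for this sketch.
func readBatch(ctx context.Context, coll *mongo.Collection, lastSeen interface{}, batchSize int64) ([]bson.M, interface{}, error) {
	filter := bson.M{}
	if lastSeen != nil {
		filter["_id"] = bson.M{"$gt": lastSeen}
	}

	opts := options.Find().
		SetSort(bson.D{{Key: "_id", Value: 1}}).
		SetLimit(batchSize)

	cursor, err := coll.Find(ctx, filter, opts)
	if err != nil {
		return nil, lastSeen, err
	}
	defer cursor.Close(ctx)

	var docs []bson.M
	for cursor.Next(ctx) {
		var doc bson.M
		if err := cursor.Decode(&doc); err != nil {
			return nil, lastSeen, err
		}
		docs = append(docs, doc)
		lastSeen = doc["_id"] // persisted in the position so the snapshot can resume
	}
	return docs, lastSeen, cursor.Err()
}
```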

This behavior is enabled by default, but can be turned off by adding
`"snapshot": false` to the Source configuration.

### Change Data Capture

The connector implements CDC for MongoDB by using a Change Stream that
listens for changes in the configured collection. Every detected change is
converted into a record and returned from the call to `Read`. If no record is
available when `Read` is called, the connector returns an
`sdk.ErrBackoffRetry` error.
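
The non-blocking behaviour can be pictured with the driver's `TryNext`, which returns immediately when no event is buffered. The sketch below is illustrative only; it uses a local sentinel error standing in for `sdk.ErrBackoffRetry` from the Conduit connector SDK:

```go
package example

import (
	"context"
	"errors"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// errNoRecord stands in for sdk.ErrBackoffRetry, which the real connector
// returns when no change event is available yet.
var errNoRecord = errors.New("no record available, retry later")

// nextEvent returns the next buffered change event, or errNoRecord if the
// Change Stream currently has nothing to deliver.
func nextEvent(ctx context.Context, stream *mongo.ChangeStream) (bson.M, error) {
	if !stream.TryNext(ctx) {
		if err := stream.Err(); err != nil {
			return nil, err
		}
		return nil, errNoRecord
	}

	var event bson.M
	if err := stream.Decode(&event); err != nil {
		return nil, err
	}
	return event, nil
}
```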

The connector stores the `resumeToken` of every Change Stream event in the record
position, so the CDC process is resumable.
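
For illustration, the resume token of the current event is available from the driver and can be stored as the position; a later run can pass it back via `SetResumeAfter` to continue where it left off. This is a sketch under the v1 driver API, not the connector's code:

```go
package example

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// currentPosition returns the resume token of the last event read from the
// stream; the connector persists a token like this in the record position.
func currentPosition(stream *mongo.ChangeStream) bson.Raw {
	return stream.ResumeToken()
}

// resumeStream reopens a Change Stream after the given resume token.
func resumeStream(ctx context.Context, coll *mongo.Collection, token bson.Raw) (*mongo.ChangeStream, error) {
	opts := options.ChangeStream()
	if token != nil {
		opts.SetResumeAfter(token)
	}
	return coll.Watch(ctx, mongo.Pipeline{}, opts)
}
```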

> **Warning**
>
> [Azure CosmosDB for MongoDB](https://learn.microsoft.com/en-us/azure/cosmos-db/mongodb/change-streams)
> has very limited support for Change Streams, so they cannot be used for CDC.
> If CDC is not possible, as is the case with CosmosDB, the connector only
> supports detecting insert operations by polling for new documents.

### Key handling

The connector always uses the `_id` field as a key.

If the `_id` field is a `bson.ObjectID`, the connector converts it to a string when
transferring a record to a destination; otherwise, it leaves it unchanged.
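
A small sketch of that conversion; it uses the v1 driver's `primitive.ObjectID` (newer driver versions expose the same type as `bson.ObjectID`), and the helper name is hypothetical:

```go
package example

import (
	"go.mongodb.org/mongo-driver/bson/primitive"
)

// keyValue turns a document's `_id` into the value used as the record key:
// ObjectIDs become their hex string form, anything else is left as-is.
func keyValue(id interface{}) interface{} {
	if oid, ok := id.(primitive.ObjectID); ok {
		return oid.Hex()
	}
	return id
}
```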

## Destination

The MongoDB Destination takes an `opencdc.Record` and parses it into a valid
MongoDB query. The Destination is designed to handle different payloads and
keys. Because of this, each record is individually parsed and written.
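
As a rough picture of "parsed into a valid MongoDB query", the destination effectively routes each operation to the matching driver call. The sketch below is a simplification, not the connector's actual writer; the operation strings stand in for the record's operation, and the upsert on update is an assumption:

```go
package example

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// write applies a single change to the collection, based on the operation type.
// op, key and payload would come from the opencdc.Record being processed.
func write(ctx context.Context, coll *mongo.Collection, op string, key, payload bson.M) error {
	var err error
	switch op {
	case "create", "snapshot":
		_, err = coll.InsertOne(ctx, payload)
	case "update":
		_, err = coll.UpdateOne(ctx, key, bson.M{"$set": payload}, options.Update().SetUpsert(true))
	case "delete":
		_, err = coll.DeleteOne(ctx, key)
	}
	return err
}
```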

### Collection name

If a record contains an `opencdc.collection` property in its metadata, it will be
written to that collection; otherwise, the connector falls back to the `collection`
configured in the connector. Thus, a single Destination connector can write to
multiple collections, as long as the user has proper access to those collections.
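
A sketch of that fallback logic; the metadata key is the literal `opencdc.collection` string, and the helper below is hypothetical, not part of the connector:

```go
package example

// targetCollection returns the collection a record should be written to:
// the record's `opencdc.collection` metadata if present, otherwise the
// collection configured on the connector.
func targetCollection(metadata map[string]string, configuredCollection string) string {
	if c, ok := metadata["opencdc.collection"]; ok && c != "" {
		return c
	}
	return configuredCollection
}
```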

### Key handling

The connector uses all keys from an `opencdc.Record` when updating and deleting
documents.

If the `_id` field can be converted to a `bson.ObjectID`, the connector converts
it; otherwise, it uses the value as is.
<!-- /readmegen:description -->
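
For illustration, the reverse conversion on the destination side might look like this, again using the v1 driver's `primitive` package: a hex string that parses as an ObjectID is converted, anything else is used as-is. The helper name is hypothetical:

```go
package example

import (
	"go.mongodb.org/mongo-driver/bson/primitive"
)

// normalizeID converts a string `_id` into an ObjectID when possible,
// otherwise it returns the value unchanged.
func normalizeID(id interface{}) interface{} {
	if s, ok := id.(string); ok {
		if oid, err := primitive.ObjectIDFromHex(s); err == nil {
			return oid
		}
	}
	return id
}
```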

### Source Configuration Parameters

<!-- /readmegen:destination.parameters.yaml -->

### How to build it

Run `make build`.

### Development

Run `make install-tools` to install all the required tools.

Run `make test` to run all the unit tests and `make test-integration` to run all the
integration tests, which require Docker to be installed and running. The command
will handle starting and stopping the Docker containers for you.

![scarf pixel](https://static.scarf.sh/a.png?x-pxid=528a9760-d573-4524-8f65-74a5e4d402e8)
12 changes: 6 additions & 6 deletions source/iterator/cdc.go
```go
var changeStreamMatchPipeline = bson.D{
	{
		Key: "$match", Value: bson.M{
			"operationType": bson.M{"$in": []string{
				operationTypeInsert,
				operationTypeUpdate,
				operationTypeDelete,
			}},
		},
	},
}
```
