Commit c3cedcd

Add regular expressions in topic mapping (#141)
1 parent 523cc7a commit c3cedcd

5 files changed: 578 additions & 129 deletions

config/checkstyle/suppressions.xml

Lines changed: 1 addition & 0 deletions
@@ -40,4 +40,5 @@
     <suppress checks="[a-zA-Z0-9]*" files="com[\\/]mongodb[\\/]kafka[\\/]connect[\\/]avro[\\/]" />
     <suppress checks="[a-zA-Z0-9]*" files="com[\\/]mongodb[\\/]kafka[\\/]connect[\\/]embedded[\\/]" />
     <suppress checks="[a-zA-Z0-9]*" files="com[\\/]mongodb[\\/]kafka[\\/]connect[\\/]mongodb[\\/]" />
+    <suppress checks="MethodLength" files=".*DefaultTopicMapperTest.java"/>
 </suppressions>

src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java

Lines changed: 70 additions & 9 deletions
@@ -184,15 +184,76 @@ public class MongoSourceConfig extends AbstractConfig {
   public static final String TOPIC_NAMESPACE_MAP_CONFIG = "topic.namespace.map";
   private static final String TOPIC_NAMESPACE_MAP_DISPLAY = "The namespace to topic map";
   private static final String TOPIC_NAMESPACE_MAP_DOC =
-      "A json map that maps change stream document namespaces to topics.\n"
-          + "For example: `{\"db\": \"dbTopic\", \"db.coll\": \"dbCollTopic\"}` will map all "
-          + "change stream documents from the `db` database to `dbTopic.<collectionName>` apart from"
-          + "any documents from the `db.coll` namespace which map to the `dbCollTopic` topic.\n"
-          + "If you want to map all messages to a single topic use `*`: "
-          + "For example: `{\"*\": \"everyThingTopic\", \"db.coll\": \"exceptionToTheRuleTopic\"}` "
-          + "will map all change stream documents to the `everyThingTopic` apart from the `db.coll` "
-          + "messages."
-          + "Note: Any prefix and suffix configuration will still apply.";
+      "A JSON object specifying how to map a MongoDB change stream document namespace"
+          + " to a Kafka topic name. Used by the `DefaultTopicMapper`."
+          + " MongoDB change stream document namespace is"
+          + " a database name optionally concatenated with a collection name, separated by full stop '.'."
+          + "\nThe name in each JSON name/value pair is a namespace pattern,"
+          + " the value is a string representing the corresponding topic name template."
+          + " Pairs are ordered. When there are multiple pairs with equal namespace patterns (duplicates),"
+          + " they are deduplicated and only one pair is taken into account:"
+          + " its position is taken from the first pair among duplicates,"
+          + " its topic name template is taken from the last pair among duplicates."
+          + " After deduplication, pairs with an empty topic name template are ignored when computing topic names."
+          + " Note that a topic name computed based on this configuration is then decorated"
+          + " using the `topic.prefix` and `topic.suffix` configuration properties."
+          + "\nThere are three kinds of pairs:"
+          + "\n- Simple. The namespace pattern must not contain solidus '/' and can be either of the following:"
+          + "\n - A namespace with a collection name, in which case it matches only that namespace."
+          + " The topic name template is interpreted as the topic name."
+          + "\n - A namespace with only a database name, in which case it matches any namespace having that database name."
+          + " The matching namespace may either have a collection name, or not:"
+          + "\n - If there is a collection name, then the topic name is computed"
+          + " by concatenating the topic name template and the collection name from the matching namespace, separated by `topic.separator`."
+          + "\n - If there is no collection name, then the topic name template is interpreted as the topic name."
+          + "\n- Regex. The namespace pattern starts with solidus '/',"
+          + " followed by a regular expression with the syntax and behavior as per `java.util.regex.Pattern`."
+          + " The topic name is computed by doing variable expansion on the topic name template."
+          + " The following variables are supported:"
+          + "\n - `db` The database name from the matching namespace."
+          + "\n - `sep` The value of the `topic.separator` configuration property."
+          + "\n - `coll` The collection name from the matching namespace, or an empty string if there is no collection name."
+          + "\n - `sep_coll` The value of the `coll` variable"
+          + " prefixed with the value of `sep` if and only if the value of `coll` is not empty."
+          + "\n - `coll_sep` The value of the `coll` variable"
+          + " suffixed with the value of `sep` if and only if the value of `coll` is not empty."
+          + "\n - `sep_coll_sep` The value of the `coll` variable"
+          + " prefixed and suffixed with the value of `sep` if and only if the value of `coll` is not empty."
+          + "\n To be expanded, a variable must be enclosed between curly brackets '{' and '}', for example '{db}'."
+          + " The characters '{' and '}' are not allowed to be used in the topic name template for any other purpose."
+          + " Note that variable names are case-sensitive."
+          + "\nBe careful when creating a namespace pattern with characters that need escaping according to the JSON syntax."
+          + " For example, if you want to match full stop '.', the regex syntax requires escaping it as '\\.'."
+          + " However, reverse solidus '\\' itself must be escaped as '\\\\' according to the JSON syntax."
+          + " Consequently, to match '.' you need to write '\\\\.'."
+          + "\n- Wildcard. The namespace pattern is asterisk '*' and matches any namespace."
+          + " The topic name template is interpreted as the topic name."
+          + "\n The matching order:"
+          + "\n1. Simple pairs with a collection name in the namespace pattern."
+          + "\n2. Simple pairs without a collection name in the namespace pattern."
+          + "\n3. Regex pairs in order."
+          + "\n4. The wildcard pair."
+          + "\n Matching stops as soon as the first match is found. If no matches are found,"
+          + " the topic name is computed solely based on the namespace. The namespace may either have a collection name, or not:"
+          + "\n- If there is a collection name, then the topic name is computed"
+          + " by concatenating the database name and the collection name, separated by `topic.separator`."
+          + "\n- If there is no collection name, then the database name is used as the topic name."
+          + "\nExamples (`topic.separator` is assumed to be '-'):"
+          + "\n1. '{\"myDb\": \"topicTwo\", \"myDb.myColl\": \"topicOne\"}'"
+          + " The 'myDb.myColl' namespace is mapped to the 'topicOne' topic name."
+          + " All other namespaces with the 'myDb' database name are mapped"
+          + " either to the 'topicTwo-<collection name>' topic name, if they have a collection name,"
+          + " or to the 'topicTwo' topic otherwise."
+          + " All other namespaces are mapped"
+          + " either to the '<database name>-<collection name>' topic name, if they have a collection name,"
+          + " or to the '<database name>' topic otherwise."
+          + "\n2. '{\"/myDb(?:\\\\..*)?\": \"topicTwo{sep_coll}\", \"*\": \"topicThree\", \"myDb.myColl\": \"topicOne\"}'"
+          + " The regex namespace pattern matches any namespace with the 'myDb' database name."
+          + " The 'myDb.myColl' namespace is mapped to the 'topicOne' topic name."
+          + " All other namespaces with the 'myDb' database name are mapped"
+          + " either to the 'topicTwo-<collection name>' topic name, if they have a collection name,"
+          + " or to the 'topicTwo' topic otherwise."
+          + " All other namespaces are mapped to the 'topicThree' topic name.";
   private static final String TOPIC_NAMESPACE_MAP_DEFAULT = EMPTY_STRING;
 
   public static final String PIPELINE_CONFIG = "pipeline";
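
Note on regex pairs: the variable expansion rules in the new `TOPIC_NAMESPACE_MAP_DOC` text can be illustrated with a small sketch. This is not the connector's `DefaultTopicMapper`; the class `TopicTemplateSketch` and its methods are hypothetical. Two points are assumptions rather than documented behavior: the regex is taken to match the whole namespace (which would explain why example 2 needs `(?:\\..*)?`), and an unknown variable name is rejected with an exception. The sketch does not validate stray `{` or `}` characters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; not the connector's DefaultTopicMapper.
final class TopicTemplateSketch {
  // Matches a variable reference such as {db}; group 1 is the variable name.
  private static final Pattern VARIABLE = Pattern.compile("\\{([^{}]*)\\}");

  // Assumption: a regex pair matches only if the regex matches the whole namespace.
  static boolean matchesNamespace(String regex, String namespace) {
    return Pattern.compile(regex).matcher(namespace).matches();
  }

  // Expands the documented variables in a topic name template.
  // `coll` is the empty string when the namespace has no collection name.
  static String expand(String template, String db, String coll, String sep) {
    boolean hasColl = !coll.isEmpty();
    Map<String, String> vars = new HashMap<>();
    vars.put("db", db);
    vars.put("sep", sep);
    vars.put("coll", coll);
    vars.put("sep_coll", hasColl ? sep + coll : "");
    vars.put("coll_sep", hasColl ? coll + sep : "");
    vars.put("sep_coll_sep", hasColl ? sep + coll + sep : "");

    Matcher m = VARIABLE.matcher(template);
    StringBuffer out = new StringBuffer();
    while (m.find()) {
      String value = vars.get(m.group(1)); // lookup is case-sensitive
      if (value == null) {
        // Assumption: unsupported names are treated as a configuration error.
        throw new IllegalArgumentException("Unsupported variable: " + m.group(1));
      }
      // quoteReplacement keeps '$' and '\' in values from being interpreted.
      m.appendReplacement(out, Matcher.quoteReplacement(value));
    }
    m.appendTail(out);
    return out.toString();
  }

  public static void main(String[] args) {
    // The regex itself is myDb(?:\..*)? ; the backslash is doubled once in Java
    // source, and it must likewise be doubled when written inside JSON.
    String regex = "myDb(?:\\..*)?";
    System.out.println(matchesNamespace(regex, "myDb.myColl"));
    System.out.println(expand("topicTwo{sep_coll}", "myDb", "myColl", "-"));
    System.out.println(expand("topicTwo{sep_coll}", "myDb", "", "-"));
  }
}
```

Expanding in a single pass over the template, rather than calling `String.replace` once per variable, avoids re-expanding text that came from a database or collection name.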

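Note on deduplication and matching order: the sketch below shows which pair the documented rules would select for a namespace, using the configuration from example 2 with one duplicate added. It is a hypothetical illustration, not the connector's code; `MatchingOrderSketch`, `selectPattern`, and `isUsable` are invented names, and whole-namespace regex matching is an assumption. The sketch returns the selected namespace pattern rather than the final topic name, so topic name computation is out of scope here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical illustration only; not the connector's DefaultTopicMapper.
final class MatchingOrderSketch {
  // Returns the namespace pattern of the pair selected for the namespace,
  // or empty if no pair matches. `coll` is "" when there is no collection name.
  static Optional<String> selectPattern(Map<String, String> pairs, String db, String coll) {
    String namespace = coll.isEmpty() ? db : db + "." + coll;
    // 1. Simple pair whose pattern includes a collection name (exact match).
    if (!coll.isEmpty() && isUsable(pairs, namespace)) {
      return Optional.of(namespace);
    }
    // 2. Simple pair whose pattern is only a database name.
    if (isUsable(pairs, db)) {
      return Optional.of(db);
    }
    // 3. Regex pairs in configuration order; the leading '/' is not part of the regex.
    for (String pattern : pairs.keySet()) {
      if (pattern.startsWith("/")
          && isUsable(pairs, pattern)
          && Pattern.matches(pattern.substring(1), namespace)) {
        return Optional.of(pattern);
      }
    }
    // 4. The wildcard pair.
    return isUsable(pairs, "*") ? Optional.of("*") : Optional.empty();
  }

  // Pairs with an empty topic name template are ignored.
  static boolean isUsable(Map<String, String> pairs, String pattern) {
    String template = pairs.get(pattern);
    return template != null && !template.isEmpty();
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps a key at the position of its first insertion while
    // put() overwrites the value, which mirrors the deduplication rule.
    Map<String, String> pairs = new LinkedHashMap<>();
    pairs.put("/myDb(?:\\..*)?", "firstTemplate");
    pairs.put("*", "topicThree");
    pairs.put("myDb.myColl", "topicOne");
    pairs.put("/myDb(?:\\..*)?", "topicTwo{sep_coll}"); // duplicate: last template wins

    System.out.println(pairs);
    System.out.println(selectPattern(pairs, "myDb", "myColl"));
    System.out.println(selectPattern(pairs, "myDb", "otherColl"));
    System.out.println(selectPattern(pairs, "otherDb", "anyColl"));
  }
}
```

When `selectPattern` returns empty, the documented fallback applies: the topic name is derived from the namespace alone.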