Skip to content

Commit af32ff2

Browse files
committed
refactored IdStrategy implementations
thereby introduced customisation options to add further strategies by means of config property documentation / readme updates
1 parent 6af5c17 commit af32ff2

14 files changed

+158
-130
lines changed

README.md

+42-24
Large diffs are not rendered by default.

config/MongoDbSinkConnector.properties

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ mongodb.max.num.retries=3
2525
mongodb.retries.defer.timeout=5000
2626
mongodb.value.projection.type=none
2727
mongodb.value.projection.list=
28-
mongodb.document.id.strategy=objectid
28+
mongodb.document.id.strategy=at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy
29+
mongodb.document.id.strategies=
2930
mongodb.key.projection.type=none
3031
mongodb.key.projection.list=
3132
mongodb.field.renamer.mapping=[]

src/main/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkConnectorConfig.java

+58-41
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@
3232
import org.apache.kafka.common.config.ConfigDef.Type;
3333
import org.apache.kafka.common.config.ConfigException;
3434
import org.apache.kafka.connect.errors.ConnectException;
35+
import org.bson.conversions.Bson;
3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
3738

3839
import java.io.IOException;
40+
import java.lang.reflect.InvocationTargetException;
3941
import java.util.*;
4042
import java.util.stream.Collectors;
4143

@@ -47,17 +49,6 @@ public enum FieldProjectionTypes {
4749
WHITELIST
4850
}
4951

50-
public enum IdStrategyModes {
51-
OBJECTID,
52-
UUID,
53-
KAFKAMETA,
54-
FULLKEY,
55-
PARTIALKEY,
56-
PARTIALVALUE,
57-
PROVIDEDINKEY,
58-
PROVIDEDINVALUE
59-
}
60-
6152
public static final String FIELD_LIST_SPLIT_CHAR = ",";
6253

6354
public static final String MONGODB_URI_SCHEME = "mongodb://";
@@ -75,7 +66,8 @@ public enum IdStrategyModes {
7566
public static final int MONGODB_RETRIES_DEFER_TIMEOUT_DEFAULT = 10000;
7667
public static final String MONGODB_VALUE_PROJECTION_TYPE_DEFAULT = "none";
7768
public static final String MONGODB_VALUE_PROJECTION_LIST_DEFAULT = "";
78-
public static final String MONGODB_DOCUMENT_ID_STRATEGY_DEFAULT = "objectid";
69+
public static final String MONGODB_DOCUMENT_ID_STRATEGY_DEFAULT = "at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy";
70+
public static final String MONGODB_DOCUMENT_ID_STRATEGIES_DEFAULT = "";
7971
public static final String MONGODB_KEY_PROJECTION_TYPE_DEFAULT = "none";
8072
public static final String MONGODB_KEY_PROJECTION_LIST_DEFAULT = "";
8173
public static final String MONGODB_FIELD_RENAMER_MAPPING_DEFAULT = "[]";
@@ -125,7 +117,10 @@ public enum IdStrategyModes {
125117
private static final String MONGODB_VALUE_PROJECTION_LIST_DOC = "comma separated list of field names for value projection";
126118

127119
public static final String MONGODB_DOCUMENT_ID_STRATEGY_CONF = "mongodb.document.id.strategy";
128-
private static final String MONGODB_DOCUMENT_ID_STRATEGY_CONF_DOC = "which strategy to use for a unique document id (_id)";
120+
private static final String MONGODB_DOCUMENT_ID_STRATEGY_CONF_DOC = "class name of strategy to use for generating a unique document id (_id)";
121+
122+
public static final String MONGODB_DOCUMENT_ID_STRATEGIES_CONF = "mongodb.document.id.strategies";
123+
private static final String MONGODB_DOCUMENT_ID_STRATEGIES_CONF_DOC = "comma separated list of custom strategy classes to register for usage";
129124

130125
public static final String MONGODB_KEY_PROJECTION_TYPE_CONF = "mongodb.key.projection.type";
131126
private static final String MONGODB_KEY_PROJECTION_TYPE_DOC = "whether or not and which key projection to use";
@@ -169,7 +164,8 @@ public static ConfigDef conf() {
169164
.define(MONGODB_RETRIES_DEFER_TIMEOUT_CONF, Type.INT, MONGODB_RETRIES_DEFER_TIMEOUT_DEFAULT, ConfigDef.Range.atLeast(0), Importance.MEDIUM, MONGODB_RETRIES_DEFER_TIME_OUT_DOC)
170165
.define(MONGODB_VALUE_PROJECTION_TYPE_CONF, Type.STRING, MONGODB_VALUE_PROJECTION_TYPE_DEFAULT, EnumValidator.in(FieldProjectionTypes.values()), Importance.LOW, MONGODB_VALUE_PROJECTION_TYPE_DOC)
171166
.define(MONGODB_VALUE_PROJECTION_LIST_CONF, Type.STRING, MONGODB_VALUE_PROJECTION_LIST_DEFAULT, Importance.LOW, MONGODB_VALUE_PROJECTION_LIST_DOC)
172-
.define(MONGODB_DOCUMENT_ID_STRATEGY_CONF, Type.STRING, MONGODB_DOCUMENT_ID_STRATEGY_DEFAULT, EnumValidator.in(IdStrategyModes.values()), Importance.HIGH, MONGODB_DOCUMENT_ID_STRATEGY_CONF_DOC)
167+
.define(MONGODB_DOCUMENT_ID_STRATEGY_CONF, Type.STRING, MONGODB_DOCUMENT_ID_STRATEGY_DEFAULT, Importance.HIGH, MONGODB_DOCUMENT_ID_STRATEGY_CONF_DOC)
168+
.define(MONGODB_DOCUMENT_ID_STRATEGIES_CONF, Type.STRING, MONGODB_DOCUMENT_ID_STRATEGIES_DEFAULT, Importance.LOW, MONGODB_DOCUMENT_ID_STRATEGIES_CONF_DOC)
173169
.define(MONGODB_KEY_PROJECTION_TYPE_CONF, Type.STRING, MONGODB_KEY_PROJECTION_TYPE_DEFAULT, EnumValidator.in(FieldProjectionTypes.values()), Importance.LOW, MONGODB_KEY_PROJECTION_TYPE_DOC)
174170
.define(MONGODB_KEY_PROJECTION_LIST_CONF, Type.STRING, MONGODB_KEY_PROJECTION_LIST_DEFAULT, Importance.LOW, MONGODB_KEY_PROJECTION_LIST_DOC)
175171
.define(MONGODB_FIELD_RENAMER_MAPPING, Type.STRING, MONGODB_FIELD_RENAMER_MAPPING_DEFAULT, Importance.LOW, MONGODB_FIELD_RENAMER_MAPPING_DOC)
@@ -372,29 +368,50 @@ private Set<String> buildProjectionList(String projectionType, String fieldList)
372368
throw new ConfigException("error: invalid settings for "+ projectionType);
373369
}
374370

375-
public AbstractIdStrategy getIdStrategy() {
376-
377-
IdStrategyModes mode = IdStrategyModes
378-
.valueOf(getString(MONGODB_DOCUMENT_ID_STRATEGY_CONF).toUpperCase());
379-
380-
switch (mode) {
381-
case OBJECTID:
382-
return new BsonOidStrategy();
383-
case UUID:
384-
return new UuidStrategy();
385-
case KAFKAMETA:
386-
return new KafkaMetaDataStrategy();
387-
case FULLKEY:
388-
return new FullKeyStrategy();
389-
case PARTIALKEY:
390-
return new PartialKeyStrategy(this.getKeyProjector());
391-
case PARTIALVALUE:
392-
return new PartialValueStrategy(this.getKeyProjector());
393-
case PROVIDEDINKEY:
394-
case PROVIDEDINVALUE:
395-
return new ProvidedStrategy(mode);
396-
default:
397-
throw new ConfigException("error: unexpected IdStrategyMode "+mode.name());
371+
public static Set<String> getPredefinedStrategyClassNames() {
372+
Set<String> strategies = new HashSet<String>();
373+
strategies.add(BsonOidStrategy.class.getName());
374+
strategies.add(FullKeyStrategy.class.getName());
375+
strategies.add(KafkaMetaDataStrategy.class.getName());
376+
strategies.add(PartialKeyStrategy.class.getName());
377+
strategies.add(PartialValueStrategy.class.getName());
378+
strategies.add(ProvidedInKeyStrategy.class.getName());
379+
strategies.add(ProvidedInValueStrategy.class.getName());
380+
strategies.add(UuidStrategy.class.getName());
381+
return strategies;
382+
}
383+
384+
public IdStrategy getIdStrategy() {
385+
386+
Set<String> predefinedStrategies = getPredefinedStrategyClassNames();
387+
388+
Set<String> customStrategies = Arrays.asList(getString(MONGODB_DOCUMENT_ID_STRATEGIES_CONF)
389+
.split(FIELD_LIST_SPLIT_CHAR))
390+
.stream().filter(s -> !s.isEmpty()).collect(Collectors.toSet());
391+
392+
predefinedStrategies.addAll(customStrategies);
393+
394+
String strategy = getString(MONGODB_DOCUMENT_ID_STRATEGY_CONF);
395+
396+
if(!predefinedStrategies.contains(strategy)) {
397+
throw new ConfigException("error: unkown id strategy "+strategy);
398+
}
399+
400+
try {
401+
if(strategy.equals(PartialKeyStrategy.class.getName())
402+
|| strategy.equals(PartialValueStrategy.class.getName())) {
403+
return (IdStrategy)Class.forName(strategy)
404+
.getConstructor(FieldProjector.class)
405+
.newInstance(this.getKeyProjector());
406+
}
407+
return (IdStrategy)Class.forName(strategy)
408+
.getConstructor().newInstance();
409+
} catch (ReflectiveOperationException e) {
410+
throw new ConfigException(e.getMessage(),e);
411+
} catch (ClassCastException e) {
412+
throw new ConfigException("error: specified class "+ strategy
413+
+ " violates the contract since it doesn't implement " +
414+
IdStrategy.class);
398415
}
399416

400417
}
@@ -405,14 +422,14 @@ public FieldProjector getKeyProjector() {
405422
.equalsIgnoreCase(FieldProjectionTypes.BLACKLIST.name())) {
406423

407424
if(getString(MONGODB_DOCUMENT_ID_STRATEGY_CONF).
408-
equalsIgnoreCase(IdStrategyModes.PARTIALVALUE.name())) {
425+
equals(PartialValueStrategy.class.getName())) {
409426

410427
return new BlacklistValueProjector(this,
411428
this.getKeyProjectionList(),cfg -> cfg.isUsingBlacklistKeyProjection());
412429
}
413430

414431
if(getString(MONGODB_DOCUMENT_ID_STRATEGY_CONF).
415-
equalsIgnoreCase(IdStrategyModes.PARTIALKEY.name())) {
432+
equals(PartialKeyStrategy.class.getName())) {
416433

417434
return new BlacklistKeyProjector(this,
418435
this.getKeyProjectionList(),cfg -> cfg.isUsingBlacklistKeyProjection());
@@ -423,14 +440,14 @@ public FieldProjector getKeyProjector() {
423440
.equalsIgnoreCase(FieldProjectionTypes.WHITELIST.name())) {
424441

425442
if(getString(MONGODB_DOCUMENT_ID_STRATEGY_CONF).
426-
equalsIgnoreCase(IdStrategyModes.PARTIALVALUE.name())) {
443+
equals(PartialValueStrategy.class.getName())) {
427444

428445
return new WhitelistValueProjector(this,
429446
this.getKeyProjectionList(),cfg -> cfg.isUsingWhitelistKeyProjection());
430447
}
431448

432449
if(getString(MONGODB_DOCUMENT_ID_STRATEGY_CONF).
433-
equalsIgnoreCase(IdStrategyModes.PARTIALKEY.name())) {
450+
equals(PartialKeyStrategy.class.getName())) {
434451

435452
return new WhitelistKeyProjector(this,
436453
this.getKeyProjectionList(),cfg -> cfg.isUsingWhitelistKeyProjection());

src/main/java/at/grahsl/kafka/connect/mongodb/processor/id/strategy/BsonOidStrategy.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,13 @@
1616

1717
package at.grahsl.kafka.connect.mongodb.processor.id.strategy;
1818

19-
import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
2019
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
2120
import org.apache.kafka.connect.sink.SinkRecord;
2221
import org.bson.BsonObjectId;
2322
import org.bson.BsonValue;
2423
import org.bson.types.ObjectId;
2524

26-
public class BsonOidStrategy extends AbstractIdStrategy {
27-
28-
public BsonOidStrategy() {
29-
super(MongoDbSinkConnectorConfig.IdStrategyModes.OBJECTID);
30-
}
25+
public class BsonOidStrategy implements IdStrategy {
3126

3227
@Override
3328
public BsonValue generateId(SinkDocument doc, SinkRecord orig) {

src/main/java/at/grahsl/kafka/connect/mongodb/processor/id/strategy/FullKeyStrategy.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,12 @@
1616

1717
package at.grahsl.kafka.connect.mongodb.processor.id.strategy;
1818

19-
import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
2019
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
2120
import org.apache.kafka.connect.sink.SinkRecord;
2221
import org.bson.BsonDocument;
2322
import org.bson.BsonValue;
2423

25-
public class FullKeyStrategy extends AbstractIdStrategy {
26-
27-
public FullKeyStrategy() {
28-
super(MongoDbSinkConnectorConfig.IdStrategyModes.FULLKEY);
29-
}
24+
public class FullKeyStrategy implements IdStrategy {
3025

3126
@Override
3227
public BsonValue generateId(SinkDocument doc, SinkRecord orig) {

src/main/java/at/grahsl/kafka/connect/mongodb/processor/id/strategy/KafkaMetaDataStrategy.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,15 @@
1616

1717
package at.grahsl.kafka.connect.mongodb.processor.id.strategy;
1818

19-
import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
2019
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
2120
import org.apache.kafka.connect.sink.SinkRecord;
2221
import org.bson.BsonString;
2322
import org.bson.BsonValue;
2423

25-
public class KafkaMetaDataStrategy extends AbstractIdStrategy {
24+
public class KafkaMetaDataStrategy implements IdStrategy {
2625

2726
public static final String DELIMITER = "#";
2827

29-
public KafkaMetaDataStrategy() {
30-
super(MongoDbSinkConnectorConfig.IdStrategyModes.KAFKAMETA);
31-
}
32-
3328
@Override
3429
public BsonValue generateId(SinkDocument doc, SinkRecord orig) {
3530

src/main/java/at/grahsl/kafka/connect/mongodb/processor/id/strategy/PartialKeyStrategy.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,17 @@
1616

1717
package at.grahsl.kafka.connect.mongodb.processor.id.strategy;
1818

19-
import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
2019
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
2120
import at.grahsl.kafka.connect.mongodb.processor.field.projection.FieldProjector;
2221
import org.apache.kafka.connect.sink.SinkRecord;
2322
import org.bson.BsonDocument;
2423
import org.bson.BsonValue;
2524

26-
public class PartialKeyStrategy extends AbstractIdStrategy {
25+
public class PartialKeyStrategy implements IdStrategy {
2726

2827
private FieldProjector fieldProjector;
2928

3029
public PartialKeyStrategy(FieldProjector fieldProjector) {
31-
super(MongoDbSinkConnectorConfig.IdStrategyModes.PARTIALKEY);
3230
this.fieldProjector = fieldProjector;
3331
}
3432

src/main/java/at/grahsl/kafka/connect/mongodb/processor/id/strategy/PartialValueStrategy.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,17 @@
1616

1717
package at.grahsl.kafka.connect.mongodb.processor.id.strategy;
1818

19-
import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
2019
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
2120
import at.grahsl.kafka.connect.mongodb.processor.field.projection.FieldProjector;
2221
import org.apache.kafka.connect.sink.SinkRecord;
2322
import org.bson.BsonDocument;
2423
import org.bson.BsonValue;
2524

26-
public class PartialValueStrategy extends AbstractIdStrategy {
25+
public class PartialValueStrategy implements IdStrategy {
2726

2827
private FieldProjector fieldProjector;
2928

3029
public PartialValueStrategy(FieldProjector fieldProjector) {
31-
super(MongoDbSinkConnectorConfig.IdStrategyModes.PARTIALVALUE);
3230
this.fieldProjector = fieldProjector;
3331
}
3432

src/main/java/at/grahsl/kafka/connect/mongodb/processor/id/strategy/AbstractIdStrategy.java src/main/java/at/grahsl/kafka/connect/mongodb/processor/id/strategy/ProvidedInKeyStrategy.java

+3-11
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,10 @@
1616

1717
package at.grahsl.kafka.connect.mongodb.processor.id.strategy;
1818

19-
import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
19+
public class ProvidedInKeyStrategy extends ProvidedStrategy {
2020

21-
public abstract class AbstractIdStrategy implements IdStrategy {
22-
23-
private MongoDbSinkConnectorConfig.IdStrategyModes mode;
24-
25-
public AbstractIdStrategy(MongoDbSinkConnectorConfig.IdStrategyModes mode) {
26-
this.mode = mode;
27-
}
28-
29-
public MongoDbSinkConnectorConfig.IdStrategyModes getMode() {
30-
return mode;
21+
public ProvidedInKeyStrategy() {
22+
super(ProvidedIn.KEY);
3123
}
3224

3325
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright (c) 2017. Hans-Peter Grahsl ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package at.grahsl.kafka.connect.mongodb.processor.id.strategy;
18+
19+
public class ProvidedInValueStrategy extends ProvidedStrategy {
20+
21+
public ProvidedInValueStrategy() {
22+
super(ProvidedIn.VALUE);
23+
}
24+
25+
}

src/main/java/at/grahsl/kafka/connect/mongodb/processor/id/strategy/ProvidedStrategy.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package at.grahsl.kafka.connect.mongodb.processor.id.strategy;
1818

19-
import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
2019
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
2120
import com.mongodb.DBCollection;
2221
import org.apache.kafka.connect.errors.DataException;
@@ -27,22 +26,29 @@
2726

2827
import java.util.Optional;
2928

30-
public class ProvidedStrategy extends AbstractIdStrategy {
29+
public class ProvidedStrategy implements IdStrategy {
3130

32-
public ProvidedStrategy(MongoDbSinkConnectorConfig.IdStrategyModes mode) {
33-
super(mode);
31+
protected enum ProvidedIn {
32+
KEY,
33+
VALUE
34+
}
35+
36+
protected ProvidedIn where;
37+
38+
public ProvidedStrategy(ProvidedIn where) {
39+
this.where = where;
3440
}
3541

3642
@Override
3743
public BsonValue generateId(SinkDocument doc, SinkRecord orig) {
3844

3945
Optional<BsonDocument> bd = Optional.empty();
4046

41-
if(getMode().equals(MongoDbSinkConnectorConfig.IdStrategyModes.PROVIDEDINKEY)) {
47+
if(where.equals(ProvidedIn.KEY)) {
4248
bd = doc.getKeyDoc();
4349
}
4450

45-
if(getMode().equals(MongoDbSinkConnectorConfig.IdStrategyModes.PROVIDEDINVALUE)) {
51+
if(where.equals(ProvidedIn.VALUE)) {
4652
bd = doc.getValueDoc();
4753
}
4854

src/main/java/at/grahsl/kafka/connect/mongodb/processor/id/strategy/UuidStrategy.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,14 @@
1616

1717
package at.grahsl.kafka.connect.mongodb.processor.id.strategy;
1818

19-
import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
2019
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
2120
import org.apache.kafka.connect.sink.SinkRecord;
2221
import org.bson.BsonString;
2322
import org.bson.BsonValue;
2423

2524
import java.util.UUID;
2625

27-
public class UuidStrategy extends AbstractIdStrategy {
28-
29-
public UuidStrategy() {
30-
super(MongoDbSinkConnectorConfig.IdStrategyModes.UUID);
31-
}
26+
public class UuidStrategy implements IdStrategy {
3227

3328
@Override
3429
public BsonValue generateId(SinkDocument doc, SinkRecord orig) {

0 commit comments

Comments
 (0)