Skip to content

Commit dd9253f

Browse files
authored
KAFKA-374: Implement an error handler to address transient database errors. (#159)
* work in progress. * KAFKA-374: Implement an error handler to address specific scenarios.
1 parent 2ffd620 commit dd9253f

File tree

4 files changed

+85
-15
lines changed

4 files changed

+85
-15
lines changed

src/main/java/com/mongodb/kafka/connect/sink/MongoProcessedSinkRecordData.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ private <T> Optional<T> tryProcess(final Supplier<Optional<T>> supplier) {
108108
if (config.logErrors()) {
109109
LOGGER.error("Unable to process record {}", sinkRecord, e);
110110
}
111-
if (!config.tolerateErrors()) {
111+
if (!(config.tolerateErrors() || config.tolerateDataErrors())) {
112112
throw e;
113113
}
114114
}

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public enum UuidBsonFormat {
7979

8080
public enum ErrorTolerance {
8181
NONE,
82+
DATA,
8283
ALL;
8384

8485
public String value() {
@@ -323,7 +324,8 @@ public String value() {
323324
public static final String ERRORS_TOLERANCE_DOC =
324325
"Behavior for tolerating errors during connector operation. 'none' is the default value "
325326
+ "and signals that any error will result in an immediate connector task failure; 'all' "
326-
+ "changes the behavior to skip over problematic records.";
327+
+ "changes the behavior to skip over problematic records,"
328+
+ "'data' will try for network/server unreachable errors.";
327329

328330
public static final String OVERRIDE_ERRORS_TOLERANCE_CONFIG = "mongo.errors.tolerance";
329331
public static final String OVERRIDE_ERRORS_TOLERANCE_DOC =
@@ -465,7 +467,7 @@ public String getTopic() {
465467
}
466468

467469
boolean logErrors() {
468-
return !tolerateErrors()
470+
return !(tolerateErrors() || tolerateDataErrors())
469471
|| ConfigHelper.getOverrideOrFallback(
470472
this,
471473
AbstractConfig::getBoolean,
@@ -483,6 +485,16 @@ boolean tolerateErrors() {
483485
return ErrorTolerance.valueOf(errorsTolerance.toUpperCase()).equals(ErrorTolerance.ALL);
484486
}
485487

488+
boolean tolerateDataErrors() {
489+
String errorsTolerance =
490+
ConfigHelper.getOverrideOrFallback(
491+
this,
492+
AbstractConfig::getString,
493+
OVERRIDE_ERRORS_TOLERANCE_CONFIG,
494+
ERRORS_TOLERANCE_CONFIG);
495+
return ErrorTolerance.valueOf(errorsTolerance.toUpperCase()).equals(ErrorTolerance.DATA);
496+
}
497+
486498
public boolean isTimeseries() {
487499
return !getString(TIMESERIES_TIMEFIELD_CONFIG).trim().isEmpty();
488500
}

src/main/java/com/mongodb/kafka/connect/sink/StartedMongoSinkTask.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -166,14 +166,17 @@ private void bulkWriteBatch(final List<MongoProcessedSinkRecordData> batch) {
166166
} catch (RuntimeException e) {
167167
statistics.getBatchWritesFailed().sample(writeTime.getElapsedTime().toMillis());
168168
statistics.getRecordsFailed().sample(batch.size());
169+
if (config.tolerateDataErrors() && !(e instanceof MongoBulkWriteException)) {
170+
throw new DataException("non Data Error, fail the connector.", e);
171+
}
169172
handleTolerableWriteException(
170173
batch.stream()
171174
.map(MongoProcessedSinkRecordData::getSinkRecord)
172175
.collect(Collectors.toList()),
173176
bulkWriteOrdered,
174177
e,
175178
config.logErrors(),
176-
config.tolerateErrors());
179+
config.tolerateErrors() || config.tolerateDataErrors());
177180
}
178181
checkRateLimit(config);
179182
}

src/test/java/com/mongodb/kafka/connect/sink/StartedMongoSinkTaskTest.java

+66-11
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,7 @@ void put() {
133133
client.capturedBulkWrites().get(DEFAULT_NAMESPACE), errorReporter.reported());
134134
}
135135

136-
@Test
137-
void putTolerateAllPostProcessingError() {
138-
properties.put(MongoSinkTopicConfig.ERRORS_TOLERANCE_CONFIG, ErrorTolerance.ALL.value());
139-
MongoSinkConfig config = new MongoSinkConfig(properties);
136+
void testToleratePostProcessingError(final MongoSinkConfig config) {
140137
client.configureCapturing(DEFAULT_NAMESPACE);
141138
task = new StartedMongoSinkTask(config, client.mongoClient(), errorReporter);
142139
RecordsAndExpectations recordsAndExpectations =
@@ -153,6 +150,21 @@ void putTolerateAllPostProcessingError() {
153150
client.capturedBulkWrites().get(DEFAULT_NAMESPACE), errorReporter.reported());
154151
}
155152

153+
@Test
154+
void putTolerateAllPostProcessingError() {
155+
properties.put(MongoSinkTopicConfig.ERRORS_TOLERANCE_CONFIG, ErrorTolerance.ALL.value());
156+
MongoSinkConfig config = new MongoSinkConfig(properties);
157+
testToleratePostProcessingError(config);
158+
}
159+
160+
@Test
161+
void putTolerateDataPostProcessingError() {
162+
properties.put(
163+
MongoSinkTopicConfig.OVERRIDE_ERRORS_TOLERANCE_CONFIG, ErrorTolerance.DATA.value());
164+
MongoSinkConfig config = new MongoSinkConfig(properties);
165+
testToleratePostProcessingError(config);
166+
}
167+
156168
/**
157169
* {@link StartedMongoSinkTask#put(Collection)} must report not only {@linkplain
158170
* #putTolerateAllPostProcessingError() post-processing exceptions} and {@link
@@ -229,6 +241,34 @@ void putTolerateNoneWriteError() {
229241
client.capturedBulkWrites().get(DEFAULT_NAMESPACE), errorReporter.reported());
230242
}
231243

244+
@Test
245+
void putTolerateDataWriteError() {
246+
properties.put(
247+
MongoSinkTopicConfig.OVERRIDE_ERRORS_TOLERANCE_CONFIG, ErrorTolerance.DATA.value());
248+
MongoSinkConfig config = new MongoSinkConfig(properties);
249+
client.configureCapturing(
250+
DEFAULT_NAMESPACE,
251+
collection ->
252+
when(collection.bulkWrite(anyList(), any(BulkWriteOptions.class)))
253+
// batch1
254+
.thenThrow(new MongoCommandException(new BsonDocument(), new ServerAddress()))
255+
// batch2
256+
.thenReturn(BulkWriteResult.unacknowledged()));
257+
task = new StartedMongoSinkTask(config, client.mongoClient(), errorReporter);
258+
RecordsAndExpectations recordsAndExpectations =
259+
new RecordsAndExpectations(
260+
asList(
261+
// batch1
262+
Records.simpleValid(TEST_TOPIC, 0),
263+
// batch2
264+
Records.simpleValid(TEST_TOPIC2, 1)),
265+
singletonList(0),
266+
emptyList());
267+
assertThrows(DataException.class, () -> task.put(recordsAndExpectations.records()));
268+
recordsAndExpectations.assertExpectations(
269+
client.capturedBulkWrites().get(DEFAULT_NAMESPACE), errorReporter.reported());
270+
}
271+
232272
@Test
233273
void putTolerateAllOrderedWriteError() {
234274
properties.put(MongoSinkTopicConfig.ERRORS_TOLERANCE_CONFIG, ErrorTolerance.ALL.value());
@@ -281,13 +321,7 @@ void putTolerateAllOrderedWriteError() {
281321
client.capturedBulkWrites().get(DEFAULT_NAMESPACE), errorReporter.reported());
282322
}
283323

284-
@Test
285-
void putTolerateAllUnorderedWriteError() {
286-
properties.put(MongoSinkTopicConfig.ERRORS_TOLERANCE_CONFIG, ErrorTolerance.ALL.value());
287-
boolean bulkWriteOrdered = false;
288-
properties.put(
289-
MongoSinkTopicConfig.BULK_WRITE_ORDERED_CONFIG, String.valueOf(bulkWriteOrdered));
290-
MongoSinkConfig config = new MongoSinkConfig(properties);
324+
void testTolerateOrderedWriteError(final MongoSinkConfig config, final boolean bulkWriteOrdered) {
291325
client.configureCapturing(
292326
DEFAULT_NAMESPACE,
293327
collection ->
@@ -336,6 +370,27 @@ void putTolerateAllUnorderedWriteError() {
336370
client.capturedBulkWrites().get(DEFAULT_NAMESPACE), errorReporter.reported());
337371
}
338372

373+
@Test
374+
void putTolerateAllUnorderedWriteError() {
375+
properties.put(MongoSinkTopicConfig.ERRORS_TOLERANCE_CONFIG, ErrorTolerance.ALL.value());
376+
boolean bulkWriteOrdered = false;
377+
properties.put(
378+
MongoSinkTopicConfig.BULK_WRITE_ORDERED_CONFIG, String.valueOf(bulkWriteOrdered));
379+
MongoSinkConfig config = new MongoSinkConfig(properties);
380+
testTolerateOrderedWriteError(config, bulkWriteOrdered);
381+
}
382+
383+
@Test
384+
void putTolerateDataUnorderedWriteError() {
385+
properties.put(
386+
MongoSinkTopicConfig.OVERRIDE_ERRORS_TOLERANCE_CONFIG, ErrorTolerance.DATA.value());
387+
boolean bulkWriteOrdered = false;
388+
properties.put(
389+
MongoSinkTopicConfig.BULK_WRITE_ORDERED_CONFIG, String.valueOf(bulkWriteOrdered));
390+
MongoSinkConfig config = new MongoSinkConfig(properties);
391+
testTolerateOrderedWriteError(config, bulkWriteOrdered);
392+
}
393+
339394
@SuppressWarnings("unchecked")
340395
private static <T> T cast(final Object o) {
341396
return (T) o;

0 commit comments

Comments
 (0)