Skip to content

Commit 54d12b7

Browse files
authored
Mongo sink connector must tolerate the ErrantRecordReporter being not available (#100)
KAFKA-286
1 parent 31e57ec commit 54d12b7

File tree

6 files changed

+44
-22
lines changed

6 files changed

+44
-22
lines changed

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkRecordProcessor.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,19 @@
2222
import java.util.Collection;
2323
import java.util.List;
2424

25-
import org.apache.kafka.connect.sink.ErrantRecordReporter;
2625
import org.apache.kafka.connect.sink.SinkRecord;
2726
import org.slf4j.Logger;
2827
import org.slf4j.LoggerFactory;
2928

29+
import com.mongodb.kafka.connect.sink.dlq.ErrorReporter;
30+
3031
final class MongoSinkRecordProcessor {
3132
private static final Logger LOGGER = LoggerFactory.getLogger(MongoSinkRecordProcessor.class);
3233

3334
static List<List<MongoProcessedSinkRecordData>> orderedGroupByTopicAndNamespace(
3435
final Collection<SinkRecord> records,
3536
final MongoSinkConfig sinkConfig,
36-
final ErrantRecordReporter errorReporter) {
37+
final ErrorReporter errorReporter) {
3738
LOGGER.debug("Number of sink records to process: {}", records.size());
3839

3940
List<List<MongoProcessedSinkRecordData>> orderedProcessedSinkRecordData = new ArrayList<>();

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTask.java

+6-8
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525

2626
import java.util.Collection;
2727
import java.util.Map;
28-
import java.util.concurrent.CompletableFuture;
29-
import java.util.concurrent.Future;
3028

3129
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
3230
import org.apache.kafka.common.TopicPartition;
@@ -43,6 +41,7 @@
4341
import com.mongodb.client.MongoClients;
4442

4543
import com.mongodb.kafka.connect.Versions;
44+
import com.mongodb.kafka.connect.sink.dlq.ErrorReporter;
4645
import com.mongodb.kafka.connect.util.VisibleForTesting;
4746

4847
public class MongoSinkTask extends SinkTask {
@@ -119,13 +118,13 @@ public void stop() {
119118
}
120119
}
121120

122-
private ErrantRecordReporter createErrorReporter() {
123-
ErrantRecordReporter result = nopErrorReporter();
121+
private ErrorReporter createErrorReporter() {
122+
ErrorReporter result = nopErrorReporter();
124123
if (context != null) {
125124
try {
126125
ErrantRecordReporter errantRecordReporter = context.errantRecordReporter();
127126
if (errantRecordReporter != null) {
128-
result = errantRecordReporter;
127+
result = errantRecordReporter::report;
129128
} else {
130129
LOGGER.info("Errant record reporter not configured.");
131130
}
@@ -138,9 +137,8 @@ private ErrantRecordReporter createErrorReporter() {
138137
}
139138

140139
@VisibleForTesting(otherwise = PRIVATE)
141-
static ErrantRecordReporter nopErrorReporter() {
142-
Future<Void> completedFuture = CompletableFuture.completedFuture(null);
143-
return (record, e) -> completedFuture;
140+
static ErrorReporter nopErrorReporter() {
141+
return (record, e) -> {};
144142
}
145143

146144
private static MongoClient createMongoClient(final MongoSinkConfig sinkConfig) {

src/main/java/com/mongodb/kafka/connect/sink/StartedMongoSinkTask.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.stream.Collectors;
2828

2929
import org.apache.kafka.connect.errors.DataException;
30-
import org.apache.kafka.connect.sink.ErrantRecordReporter;
3130
import org.apache.kafka.connect.sink.SinkRecord;
3231
import org.slf4j.Logger;
3332
import org.slf4j.LoggerFactory;
@@ -42,19 +41,20 @@
4241
import com.mongodb.client.model.WriteModel;
4342

4443
import com.mongodb.kafka.connect.sink.dlq.AnalyzedBatchFailedWithBulkWriteException;
44+
import com.mongodb.kafka.connect.sink.dlq.ErrorReporter;
4545

4646
public final class StartedMongoSinkTask {
4747
private static final Logger LOGGER = LoggerFactory.getLogger(MongoSinkTask.class);
4848

4949
private final MongoSinkConfig sinkConfig;
5050
private final MongoClient mongoClient;
51-
private final ErrantRecordReporter errorReporter;
51+
private final ErrorReporter errorReporter;
5252
private final Set<MongoNamespace> checkedTimeseriesNamespaces;
5353

5454
StartedMongoSinkTask(
5555
final MongoSinkConfig sinkConfig,
5656
final MongoClient mongoClient,
57-
final ErrantRecordReporter errorReporter) {
57+
final ErrorReporter errorReporter) {
5858
this.sinkConfig = sinkConfig;
5959
this.mongoClient = mongoClient;
6060
this.errorReporter = errorReporter;

src/main/java/com/mongodb/kafka/connect/sink/dlq/AnalyzedBatchFailedWithBulkWriteException.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Map;
2727
import java.util.stream.IntStream;
2828

29-
import org.apache.kafka.connect.sink.ErrantRecordReporter;
3029
import org.apache.kafka.connect.sink.SinkRecord;
3130

3231
import com.mongodb.MongoBulkWriteException;
@@ -44,7 +43,7 @@
4443
public final class AnalyzedBatchFailedWithBulkWriteException {
4544
private final List<SinkRecord> batch;
4645
private final MongoBulkWriteException e;
47-
private final ErrantRecordReporter errorReporter;
46+
private final ErrorReporter errorReporter;
4847
private final Logger logger;
4948
private final Map<Integer, Map.Entry<SinkRecord, WriteException>> recordsFailedWithWriteError =
5049
new HashMap<>();
@@ -57,7 +56,7 @@ public AnalyzedBatchFailedWithBulkWriteException(
5756
final List<SinkRecord> batch,
5857
final boolean ordered,
5958
final MongoBulkWriteException e,
60-
final ErrantRecordReporter errorReporter,
59+
final ErrorReporter errorReporter,
6160
final Logger logger) {
6261
this.batch = batch;
6362
this.e = e;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.kafka.connect.sink.dlq;
17+
18+
import org.apache.kafka.connect.sink.ErrantRecordReporter;
19+
import org.apache.kafka.connect.sink.SinkRecord;
20+
21+
/**
22+
* This interface exists to avoid exposing our code to {@link ErrantRecordReporter}. This allows
23+
* tolerating situations when the class {@link ErrantRecordReporter} is not available.
24+
*/
25+
public interface ErrorReporter {
26+
void report(SinkRecord record, Exception e);
27+
}

src/test/java/com/mongodb/kafka/connect/sink/StartedMongoSinkTaskTest.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,13 @@
4848
import java.util.List;
4949
import java.util.Map;
5050
import java.util.Set;
51-
import java.util.concurrent.CompletableFuture;
52-
import java.util.concurrent.Future;
5351
import java.util.function.Consumer;
5452
import java.util.function.Function;
5553
import java.util.stream.Collectors;
5654
import java.util.stream.IntStream;
5755

5856
import org.apache.kafka.connect.data.Schema;
5957
import org.apache.kafka.connect.errors.DataException;
60-
import org.apache.kafka.connect.sink.ErrantRecordReporter;
6158
import org.apache.kafka.connect.sink.SinkRecord;
6259
import org.junit.jupiter.api.Assertions;
6360
import org.junit.jupiter.api.BeforeEach;
@@ -81,6 +78,7 @@
8178
import com.mongodb.client.model.WriteModel;
8279

8380
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.ErrorTolerance;
81+
import com.mongodb.kafka.connect.sink.dlq.ErrorReporter;
8482
import com.mongodb.kafka.connect.sink.dlq.WriteConcernException;
8583
import com.mongodb.kafka.connect.sink.dlq.WriteException;
8684
import com.mongodb.kafka.connect.sink.dlq.WriteSkippedException;
@@ -357,13 +355,12 @@ static boolean match(
357355
}
358356
}
359357

360-
private static final class InMemoryErrorReporter implements ErrantRecordReporter {
358+
private static final class InMemoryErrorReporter implements ErrorReporter {
361359
private final List<ReportedData> reported = new ArrayList<>();
362360

363361
@Override
364-
public Future<Void> report(final SinkRecord record, final Throwable e) {
362+
public void report(final SinkRecord record, final Exception e) {
365363
reported.add(new ReportedData(record, e));
366-
return CompletableFuture.completedFuture(null);
367364
}
368365

369366
List<ReportedData> reported() {

0 commit comments

Comments
 (0)