diff --git a/kafka-connect-rest-plugin/src/main/java/com/tm/kafka/connect/rest/RestSinkConnectorConfig.java b/kafka-connect-rest-plugin/src/main/java/com/tm/kafka/connect/rest/RestSinkConnectorConfig.java index dbe601f..66df717 100644 --- a/kafka-connect-rest-plugin/src/main/java/com/tm/kafka/connect/rest/RestSinkConnectorConfig.java +++ b/kafka-connect-rest-plugin/src/main/java/com/tm/kafka/connect/rest/RestSinkConnectorConfig.java @@ -4,6 +4,7 @@ import com.tm.kafka.connect.rest.http.executor.RequestExecutor; import com.tm.kafka.connect.rest.http.handler.DefaultResponseHandler; import com.tm.kafka.connect.rest.http.handler.ResponseHandler; +import com.tm.kafka.connect.rest.errors.DLQReporter; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -16,7 +17,9 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.Properties; +import static com.tm.kafka.connect.rest.errors.DLQReporter.DLQ_TOPIC_CONFIG; import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; @@ -68,6 +71,11 @@ public class RestSinkConnectorConfig extends AbstractConfig { private static final String SINK_DATE_FORMAT_DOC = "Date format for interpolation. The default is MM-dd-yyyy HH:mm:ss.SSS"; private static final String SINK_DATE_FORMAT_DEFAULT = "MM-dd-yyyy HH:mm:ss.SSS"; + private static final String SINK_DLQ_KAFKA_ENABLE_CONFIG = "rest.deadletter.kafka.enabled"; + private static final String SINK_DLQ_KAFKA_ENABLE_DISPLAY = "HTTP request error to kafka"; + private static final String SINK_DLQ_KAFKA_ENABLE_DOC = "Enable deadletter queue support for error records"; + private static final Boolean SINK_DLQ_KAFKA_ENABLE_DEFAULT = false; + private static final long SINK_RETRY_BACKOFF_DEFAULT = 5000L; @@ -184,6 +192,16 @@ public static ConfigDef conf() { ++orderInGroup, ConfigDef.Width.NONE, SINK_DATE_FORMAT_DISPLAY) + + .define(SINK_DLQ_KAFKA_ENABLE_CONFIG, + Type.BOOLEAN, + SINK_DLQ_KAFKA_ENABLE_DEFAULT, + Importance.LOW, + SINK_DLQ_KAFKA_ENABLE_DOC, + group, + ++orderInGroup, + ConfigDef.Width.NONE, + SINK_DLQ_KAFKA_ENABLE_DISPLAY) ; } @@ -222,6 +240,17 @@ public ResponseHandler getResponseHandler() { ); } + public Boolean isDlqKafkaEnabled() { + return this.getBoolean(SINK_DLQ_KAFKA_ENABLE_CONFIG); + } + + public DLQReporter getDLQReporter() { + String topic = (String) this.originals().get(DLQ_TOPIC_CONFIG); + Properties props = new Properties(); + props.putAll(this.originalsWithPrefix("producer.")); + return new DLQReporter(topic, props); + } + private static class MethodRecommender implements ConfigDef.Recommender { @Override public List validValues(String name, Map connectorConfigs) { diff --git a/kafka-connect-rest-plugin/src/main/java/com/tm/kafka/connect/rest/RestSinkTask.java b/kafka-connect-rest-plugin/src/main/java/com/tm/kafka/connect/rest/RestSinkTask.java index 4b6d1c1..41a66d7 100644 --- a/kafka-connect-rest-plugin/src/main/java/com/tm/kafka/connect/rest/RestSinkTask.java +++ b/kafka-connect-rest-plugin/src/main/java/com/tm/kafka/connect/rest/RestSinkTask.java @@ -1,5 +1,6 @@ package com.tm.kafka.connect.rest; +import com.tm.kafka.connect.rest.errors.DLQReporter; import com.tm.kafka.connect.rest.http.Request; import com.tm.kafka.connect.rest.http.Response; import com.tm.kafka.connect.rest.http.executor.RequestExecutor; @@ -29,6 +30,7 @@ public class RestSinkTask extends SinkTask { private RequestExecutor executor; private ResponseHandler responseHandler; private String taskName = ""; + private DLQReporter errorReporter; @Override public void start(Map map) { @@ -40,6 +42,9 @@ public void start(Map map) { maxRetries = connectorConfig.getMaxRetries(); responseHandler = connectorConfig.getResponseHandler(); executor = connectorConfig.getRequestExecutor(); + if (connectorConfig.isDlqKafkaEnabled()) + errorReporter = connectorConfig.getDLQReporter(); + else errorReporter = null; } @Override @@ -74,6 +79,7 @@ public void put(Collection records) { } catch (RetriableException e) { log.error("HTTP call failed", e); increaseCounter(RETRIABLE_ERROR_METRIC, ctx); + if (retries == -1 && errorReporter != null) errorReporter.reportError(record, e); try { Thread.sleep(retryBackoff); log.error("Retrying"); @@ -83,6 +89,7 @@ public void put(Collection records) { } catch (Exception e) { log.error("HTTP call execution failed " + e.getMessage(), e); increaseCounter(UNRETRIABLE_ERROR_METRIC, ctx); + if (errorReporter != null) errorReporter.reportError(record, e); break; } } diff --git a/kafka-connect-rest-plugin/src/main/java/com/tm/kafka/connect/rest/errors/DLQReporter.java b/kafka-connect-rest-plugin/src/main/java/com/tm/kafka/connect/rest/errors/DLQReporter.java new file mode 100644 index 0000000..dc51e19 --- /dev/null +++ b/kafka-connect-rest-plugin/src/main/java/com/tm/kafka/connect/rest/errors/DLQReporter.java @@ -0,0 +1,102 @@ +package com.tm.kafka.connect.rest.errors; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.Properties; + +public class DLQReporter { + public static final String DLQ_TOPIC_CONFIG = "errors.deadletterqueue.topic.name"; + public static final String DEFAULT_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer"; + public static final String HEADER_PREFIX = "__connect.errors."; + public static final String ERROR_HEADER_EXCEPTION = HEADER_PREFIX + "exception.class.name"; + public static final String ERROR_HEADER_EXCEPTION_MESSAGE = HEADER_PREFIX + "exception.message"; + public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = HEADER_PREFIX + "exception.stacktrace"; + private static final Logger log = LoggerFactory.getLogger(DLQReporter.class); + private String dlqTopic; + private Producer dlqProducer; + + public DLQReporter(String topic, Properties properties) { + //set default serializers if required + if (!properties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_SERIALIZER); + if (!properties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_SERIALIZER); + this.dlqTopic = topic; + this.dlqProducer = new KafkaProducer<>(properties); + } + + public DLQReporter() { + } + + public void setDlqTopic(String dlqTopic) { + this.dlqTopic = dlqTopic; + } + + public void setDlqProducer(Producer dlqProducer) { + this.dlqProducer = dlqProducer; + } + + public void reportError(ConnectRecord msg, Exception e) { + ProducerRecord producerRecord; + if (msg.timestamp() == RecordBatch.NO_TIMESTAMP) { + producerRecord = new ProducerRecord<>(dlqTopic, null, toBytes(msg.key()), toBytes(msg.value())); + + } else { + producerRecord = new ProducerRecord<>(dlqTopic, null, msg.timestamp(), toBytes(msg.key()), toBytes(msg.value())); + } + populateErrorHeaders(producerRecord, e); + this.dlqProducer.send(producerRecord, (metadata, exception) -> { + if (exception != null) { + log.error("Could not produce message to dead letter queue. topic=" + dlqTopic, exception); + } + }); + } + + private byte[] toBytes(Object value) { + if (value != null) { + return ((String) value).getBytes(StandardCharsets.UTF_8); + } else { + return null; + } + } + + private void populateErrorHeaders(ProducerRecord producerRecord, Exception e) { + Headers headers = producerRecord.headers(); + if (e != null) { + headers.add(ERROR_HEADER_EXCEPTION, toBytes(e.getClass().getName())); + headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(e.getMessage())); + byte[] trace; + if ((trace = stacktrace(e)) != null) { + headers.add(ERROR_HEADER_EXCEPTION_STACK_TRACE, trace); + } + } + } + + private byte[] stacktrace(Throwable error) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + PrintStream stream = new PrintStream(bos, true, "UTF-8"); + error.printStackTrace(stream); + bos.close(); + return bos.toByteArray(); + } catch (IOException e) { + log.error("Could not serialize stacktrace.", e); + } + return null; + } + +} + + diff --git a/kafka-connect-rest-plugin/src/test/java/com/tm/kafka/connect/rest/errors/DLQReporterTest.java b/kafka-connect-rest-plugin/src/test/java/com/tm/kafka/connect/rest/errors/DLQReporterTest.java new file mode 100644 index 0000000..14b7b64 --- /dev/null +++ b/kafka-connect-rest-plugin/src/test/java/com/tm/kafka/connect/rest/errors/DLQReporterTest.java @@ -0,0 +1,54 @@ +package com.tm.kafka.connect.rest.errors; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.errors.RetriableException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.class) +public class DLQReporterTest { + @Mock + Producer producer; + + @Mock + ConnectRecord record; + + DLQReporter reporter; + + @Before + public void setUp() { + reporter = new DLQReporter(); + reporter.setDlqTopic("test-topic"); + reporter.setDlqProducer(producer); + } + + @Test + public void populateHeadersAndSendToKafka() throws Exception { + Exception e = new RetriableException("Test"); + ArgumentCaptor captor = ArgumentCaptor.forClass(ProducerRecord.class); + reporter.reportError(record, e); + verify(producer).send(captor.capture(), any()); + List HEADERS_KEY = Arrays.asList(DLQReporter.ERROR_HEADER_EXCEPTION, DLQReporter.ERROR_HEADER_EXCEPTION_MESSAGE, DLQReporter.ERROR_HEADER_EXCEPTION_STACK_TRACE); + ProducerRecord r = captor.getValue(); + Map errHeaders = new HashMap<>(); + r.headers().forEach(header -> errHeaders.put(header.key(), new String(header.value()))); + HEADERS_KEY.forEach(key -> Assert.assertTrue(errHeaders.containsKey(key))); + Assert.assertEquals("org.apache.kafka.connect.errors.RetriableException", errHeaders.get(DLQReporter.ERROR_HEADER_EXCEPTION)); + Assert.assertEquals("Test", errHeaders.get(DLQReporter.ERROR_HEADER_EXCEPTION_MESSAGE)); + } +}