From 6064cc2b36fcd8179e9ae2d97579e8da6fa6c902 Mon Sep 17 00:00:00 2001 From: Joel Hanson Date: Thu, 6 Feb 2025 15:23:49 +0530 Subject: [PATCH] feat: support of errors.tolerance - Added a IT for error tolerance - Added conditions with in JSONRecordBuilder for error tolerance --- pom.xml | 6 ++++ .../builders/JsonRecordBuilderIT.java | 30 +++++++++++++++++++ .../connect/mqsource/MQSourceTask.java | 2 ++ .../mqsource/builders/JsonRecordBuilder.java | 22 ++++++++++++-- 4 files changed, 58 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 94e50e4..092d0f9 100644 --- a/pom.xml +++ b/pom.xml @@ -54,6 +54,12 @@ 3.7.1 provided + + org.apache.kafka + connect-runtime + 3.7.1 + provided + javax.jms diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java index fd8f055..b0e1a6d 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -27,6 +28,7 @@ import javax.jms.MapMessage; import javax.jms.TextMessage; +import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.source.SourceRecord; import org.junit.Test; @@ -102,4 +104,32 @@ public void buildFromJmsMapMessage() throws Exception { // verify the exception assertEquals("Unsupported JMS message type", exc.getMessage()); } + + @Test + public void buildFromJmsTestJsonError() throws Exception { + // create MQ message + final TextMessage message = getJmsContext().createTextMessage("Not a valid json string"); + + // use the builder to convert it to a Kafka record + final JsonRecordBuilder builder = new JsonRecordBuilder(); + final DataException exec = assertThrows(DataException.class, () -> builder.toSourceRecord(getJmsContext(), topic, isJMS, message)); + System.out.println(exec); + System.out.println(exec.getMessage()); + } + + @Test + public void buildFromJmsTestErrorTolerance() throws Exception { + // create MQ message + final TextMessage message = getJmsContext().createTextMessage("Not a valid json string"); + + // use the builder to convert it to a Kafka record + final JsonRecordBuilder builder = new JsonRecordBuilder(); + final HashMap config = new HashMap(); + config.put("errors.tolerance", "all"); + builder.configure(config); + final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message); + assertNull(record.key()); + assertNull(record.valueSchema()); + verifyJsonMap((Map) record.value()); + } } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java index 8126e48..7f3a501 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java @@ -363,6 +363,8 @@ private List internalPoll() throws InterruptedException, JMSRuntim final List sourceRecordList = messageList.stream() .peek(saveMessageID(msgIds)) .map(message -> reader.toSourceRecord(message, sourceQueueConfig.isMqMessageBodyJms(), sourceOffset, sourceQueuePartition)) + // TODO: comment this for errors.tolerance + .filter(sourceRecord -> sourceRecord != null) .collect(Collectors.toList()); // In RE-DELIVER we already have a state on the queue diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java index 5ff72ac..b2f6b28 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java @@ -18,6 +18,9 @@ import static java.nio.charset.StandardCharsets.UTF_8; import java.util.HashMap; +import java.util.Locale; +import java.util.Map; + import javax.jms.BytesMessage; import javax.jms.JMSContext; import javax.jms.JMSException; @@ -26,7 +29,8 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.json.JsonConverter; - +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.errors.ToleranceType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +42,7 @@ public class JsonRecordBuilder extends BaseRecordBuilder { private static final Logger log = LoggerFactory.getLogger(JsonRecordBuilder.class); private JsonConverter converter; + private ToleranceType toleranceType; public JsonRecordBuilder() { log.info("Building records using com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); @@ -51,6 +56,13 @@ public JsonRecordBuilder() { converter.configure(m, false); } + @Override + public void configure(final Map props) { + super.configure(props); + final String errorTolerance = props.getOrDefault(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, ConnectorConfig.ERRORS_TOLERANCE_DEFAULT.toString()); + toleranceType = ToleranceType.valueOf(errorTolerance.toUpperCase(Locale.ROOT)); + } + /** * Gets the value schema to use for the Kafka Connect SourceRecord. * @@ -78,6 +90,12 @@ public SchemaAndValue getValue(final JMSContext context, final String topic, fin throw new RecordBuilderException("Unsupported JMS message type"); } - return converter.toConnectData(topic, payload); + try { + return converter.toConnectData(topic, payload); + } catch (final Exception e) { + if (toleranceType == ToleranceType.NONE) + throw e; + } + return null; } } \ No newline at end of file