Skip to content

Commit 63c7b4e

Browse files
authored
[Fix][Connector-V2] Fix kafka format_error_handle_way not work (#7838)
1 parent 258f931 commit 63c7b4e

File tree

11 files changed

+60
-12
lines changed

11 files changed

+60
-12
lines changed

seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ public final class Handover<T> implements Closeable {
3030
new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
3131
private Throwable error;
3232

33-
public boolean isEmpty() {
33+
public boolean isEmpty() throws Exception {
34+
if (error != null) {
35+
rethrowException(error, error.getMessage());
36+
}
3437
return blockingQueue.isEmpty();
3538
}
3639

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.common;
19+
20+
import org.junit.jupiter.api.Assertions;
21+
import org.junit.jupiter.api.Test;
22+
23+
public class HandoverTest {
24+
25+
@Test
26+
public void testThrowExceptionWhenQueueIsEmtpy() {
27+
Handover<Object> handover = new Handover<>();
28+
handover.reportError(new RuntimeException("test"));
29+
Assertions.assertThrows(RuntimeException.class, handover::isEmpty);
30+
}
31+
}

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
3333

34-
import java.io.IOException;
3534
import java.util.Map;
3635

3736
public class KafkaRecordEmitter
@@ -71,13 +70,14 @@ public void emitRecord(
7170
// consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset
7271
// for the next run
7372
splitState.setCurrentOffset(consumerRecord.offset() + 1);
74-
} catch (IOException e) {
73+
} catch (Exception e) {
7574
if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) {
7675
logger.warn(
7776
"Deserialize message failed, skip this message, message: {}",
7877
new String(consumerRecord.value()));
78+
} else {
79+
throw e;
7980
}
80-
throw e;
8181
}
8282
}
8383

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ public void testSourceKafkaJsonFormatErrorHandleWaySkipToConsole(TestContainer c
291291
DEFAULT_FORMAT,
292292
DEFAULT_FIELD_DELIMITER,
293293
null);
294-
generateTestData(row -> serializer.serializeRow(row), 0, 100);
294+
generateTestData(serializer::serializeRow, 0, 100);
295295
Container.ExecResult execResult =
296296
container.executeJob(
297297
"/kafka/kafkasource_format_error_handle_way_skip_to_console.conf");
@@ -308,11 +308,11 @@ public void testSourceKafkaJsonFormatErrorHandleWayFailToConsole(TestContainer c
308308
DEFAULT_FORMAT,
309309
DEFAULT_FIELD_DELIMITER,
310310
null);
311-
generateTestData(row -> serializer.serializeRow(row), 0, 100);
311+
generateTestData(serializer::serializeRow, 0, 100);
312312
Container.ExecResult execResult =
313313
container.executeJob(
314314
"/kafka/kafkasource_format_error_handle_way_fail_to_console.conf");
315-
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
315+
Assertions.assertEquals(1, execResult.getExitCode(), execResult.getStderr());
316316
}
317317

318318
@TestTemplate

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ source {
3737
result_table_name = "kafka_table"
3838
start_mode = "earliest"
3939
format_error_handle_way = fail
40+
format = text
4041
schema = {
4142
fields {
4243
id = bigint

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ source {
3737
result_table_name = "kafka_table"
3838
start_mode = "earliest"
3939
format_error_handle_way = skip
40+
format = text
4041
schema = {
4142
fields {
4243
id = bigint

seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/SeaTunnelInputPartitionReader.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ public SeaTunnelInputPartitionReader(ParallelBatchPartitionReader partitionReade
3434

3535
@Override
3636
public boolean next() throws IOException {
37-
return partitionReader.next();
37+
try {
38+
return partitionReader.next();
39+
} catch (Exception e) {
40+
throw new RuntimeException(e);
41+
}
3842
}
3943

4044
@Override

seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/ParallelBatchPartitionReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ protected String getEnumeratorThreadName() {
8484
return String.format("parallel-split-enumerator-executor-%s", subtaskId);
8585
}
8686

87-
public boolean next() throws IOException {
87+
public boolean next() throws Exception {
8888
prepare();
8989
while (running && handover.isEmpty()) {
9090
try {

seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ protected String getEnumeratorThreadName() {
8484
return String.format("parallel-split-enumerator-executor-%s", subtaskId);
8585
}
8686

87-
public boolean next() throws IOException {
87+
public boolean next() throws Exception {
8888
prepare();
8989
while (running && handover.isEmpty()) {
9090
try {

seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ public SeaTunnelBatchPartitionReader(ParallelBatchPartitionReader partitionReade
3232

3333
@Override
3434
public boolean next() throws IOException {
35-
return partitionReader.next();
35+
try {
36+
return partitionReader.next();
37+
} catch (Exception e) {
38+
throw new RuntimeException(e);
39+
}
3640
}
3741

3842
@Override

seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatchPartitionReader.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ public SeaTunnelMicroBatchPartitionReader(ParallelBatchPartitionReader partition
3434

3535
@Override
3636
public boolean next() throws IOException {
37-
return partitionReader.next();
37+
try {
38+
return partitionReader.next();
39+
} catch (Exception e) {
40+
throw new RuntimeException(e);
41+
}
3842
}
3943

4044
@Override

0 commit comments

Comments
 (0)