Commit cdfa328

[FLINK-32416] Fix flaky tests by ensuring test utilities produce records consistently and by cleaning up the no-more-splits notification so that it is reliably sent. This closes #79
1 parent eaeb781 commit cdfa328

6 files changed: +63 -32 lines changed


flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java

Lines changed: 28 additions & 21 deletions

@@ -96,6 +96,7 @@ public class DynamicKafkaSourceEnumerator
     private int kafkaMetadataServiceDiscoveryFailureCount;
     private Map<String, Set<String>> latestClusterTopicsMap;
     private Set<KafkaStream> latestKafkaStreams;
+    private boolean firstDiscoveryComplete;

     public DynamicKafkaSourceEnumerator(
             KafkaStreamSubscriber kafkaStreamSubscriber,
@@ -151,6 +152,7 @@ public DynamicKafkaSourceEnumerator(
                         DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD,
                         Integer::parseInt);
         this.kafkaMetadataServiceDiscoveryFailureCount = 0;
+        this.firstDiscoveryComplete = false;

         this.kafkaMetadataService = kafkaMetadataService;
         this.stoppableKafkaEnumContextProxyFactory = stoppableKafkaEnumContextProxyFactory;
@@ -212,32 +214,27 @@ public void start() {

     private void handleNoMoreSplits() {
         if (Boundedness.BOUNDED.equals(boundedness)) {
-            enumContext.runInCoordinatorThread(
-                    () -> {
-                        boolean allEnumeratorsHaveSignalledNoMoreSplits = true;
-                        for (StoppableKafkaEnumContextProxy context :
-                                clusterEnumContextMap.values()) {
-                            allEnumeratorsHaveSignalledNoMoreSplits =
-                                    allEnumeratorsHaveSignalledNoMoreSplits
-                                            && context.isNoMoreSplits();
-                        }
-
-                        if (allEnumeratorsHaveSignalledNoMoreSplits) {
-                            logger.info(
-                                    "Signal no more splits to all readers: {}",
-                                    enumContext.registeredReaders().keySet());
-                            enumContext
-                                    .registeredReaders()
-                                    .keySet()
-                                    .forEach(enumContext::signalNoMoreSplits);
-                        }
-                    });
+            boolean allEnumeratorsHaveSignalledNoMoreSplits = true;
+            for (StoppableKafkaEnumContextProxy context : clusterEnumContextMap.values()) {
+                allEnumeratorsHaveSignalledNoMoreSplits =
+                        allEnumeratorsHaveSignalledNoMoreSplits && context.isNoMoreSplits();
+            }
+
+            if (firstDiscoveryComplete && allEnumeratorsHaveSignalledNoMoreSplits) {
+                logger.info(
+                        "Signal no more splits to all readers: {}",
+                        enumContext.registeredReaders().keySet());
+                enumContext.registeredReaders().keySet().forEach(enumContext::signalNoMoreSplits);
+            } else {
+                logger.info("Not ready to notify no more splits to readers.");
+            }
         }
     }

     // --------------- private methods for metadata discovery ---------------

     private void onHandleSubscribedStreamsFetch(Set<KafkaStream> fetchedKafkaStreams, Throwable t) {
+        firstDiscoveryComplete = true;
         Set<KafkaStream> handledFetchKafkaStreams =
                 handleFetchSubscribedStreamsError(fetchedKafkaStreams, t);

@@ -370,9 +367,19 @@ private KafkaSourceEnumerator createEnumeratorWithAssignedTopicPartitions(
             Set<String> topics,
             KafkaSourceEnumState kafkaSourceEnumState,
             Properties fetchedProperties) {
+        final Runnable signalNoMoreSplitsCallback;
+        if (Boundedness.BOUNDED.equals(boundedness)) {
+            signalNoMoreSplitsCallback = this::handleNoMoreSplits;
+        } else {
+            signalNoMoreSplitsCallback = null;
+        }
+
         StoppableKafkaEnumContextProxy context =
                 stoppableKafkaEnumContextProxyFactory.create(
-                        enumContext, kafkaClusterId, kafkaMetadataService);
+                        enumContext,
+                        kafkaClusterId,
+                        kafkaMetadataService,
+                        signalNoMoreSplitsCallback);

         Properties consumerProps = new Properties();
         KafkaPropertiesUtil.copyProperties(fetchedProperties, consumerProps);
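
For orientation, here is a minimal, self-contained sketch of the pattern this hunk implements: each per-cluster context reports that its split discovery is finished and invokes an idempotent callback on the parent, which signals readers only once the first metadata discovery has completed and every cluster is done. Coordinator and ClusterContext below are hypothetical stand-ins, not the connector's API.

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the enumerator and StoppableKafkaEnumContextProxy;
// a sketch of the "aggregate per-cluster signals, then notify readers once"
// pattern, not the connector's actual classes.
class ClusterContext {
    private final Runnable onNoMoreSplits;
    private boolean noMoreSplits = false;

    ClusterContext(Runnable onNoMoreSplits) {
        this.onNoMoreSplits = onNoMoreSplits;
    }

    void signalNoMoreSplits() {
        noMoreSplits = true;
        onNoMoreSplits.run(); // idempotent callback into the parent coordinator
    }

    boolean isNoMoreSplits() {
        return noMoreSplits;
    }
}

class Coordinator {
    private final List<ClusterContext> clusters = new ArrayList<>();
    private boolean firstDiscoveryComplete = false;

    ClusterContext registerCluster() {
        ClusterContext context = new ClusterContext(this::handleNoMoreSplits);
        clusters.add(context);
        return context;
    }

    void completeFirstDiscovery() {
        firstDiscoveryComplete = true;
    }

    // Safe to call repeatedly: only acts once every cluster has finished and the
    // first metadata discovery has completed, mirroring the guard in the hunk above.
    void handleNoMoreSplits() {
        boolean allDone =
                firstDiscoveryComplete
                        && clusters.stream().allMatch(ClusterContext::isNoMoreSplits);
        System.out.println(allDone ? "signal no more splits to readers" : "not ready yet");
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        ClusterContext clusterA = coordinator.registerCluster();
        ClusterContext clusterB = coordinator.registerCluster();
        coordinator.completeFirstDiscovery();
        clusterA.signalNoMoreSplits(); // prints "not ready yet": clusterB still discovering
        clusterB.signalNoMoreSplits(); // prints "signal no more splits to readers"
    }
}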

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java

Lines changed: 24 additions & 5 deletions

@@ -34,6 +34,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -69,6 +71,7 @@ public class StoppableKafkaEnumContextProxy
     private final KafkaMetadataService kafkaMetadataService;
     private final SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext;
     private final ScheduledExecutorService subEnumeratorWorker;
+    private final Runnable signalNoMoreSplitsCallback;
     private boolean noMoreSplits = false;
     private volatile boolean isClosing;

@@ -79,17 +82,20 @@ public class StoppableKafkaEnumContextProxy
      * KafkaSourceEnumerator
      * @param kafkaMetadataService the Kafka metadata service to facilitate error handling
      * @param enumContext the underlying enumerator context
+     * @param signalNoMoreSplitsCallback the callback when signal no more splits is invoked
      */
     public StoppableKafkaEnumContextProxy(
             String kafkaClusterId,
             KafkaMetadataService kafkaMetadataService,
-            SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext) {
+            SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext,
+            @Nullable Runnable signalNoMoreSplitsCallback) {
         this.kafkaClusterId = kafkaClusterId;
         this.kafkaMetadataService = kafkaMetadataService;
         this.enumContext = enumContext;
         this.subEnumeratorWorker =
                 Executors.newScheduledThreadPool(
                         1, new ExecutorThreadFactory(kafkaClusterId + "-enum-worker"));
+        this.signalNoMoreSplitsCallback = signalNoMoreSplitsCallback;
         this.isClosing = false;
     }

@@ -147,8 +153,14 @@ public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignmen

     @Override
     public void signalNoMoreSplits(int subtask) {
-        // there are no more splits for this cluster
+        // There are no more splits for this cluster, but we need to wait until all clusters are
+        // finished with their respective split discoveries. In the Kafka Source, this is called in
+        // the coordinator thread, ensuring thread safety, for all source readers at the same time.
         noMoreSplits = true;
+        if (signalNoMoreSplitsCallback != null) {
+            // Thread safe idempotent callback
+            signalNoMoreSplitsCallback.run();
+        }
     }

     /** Execute the one time callables in the coordinator. */
@@ -286,12 +298,19 @@ public interface StoppableKafkaEnumContextProxyFactory {
        StoppableKafkaEnumContextProxy create(
                SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext,
                String kafkaClusterId,
-                KafkaMetadataService kafkaMetadataService);
+                KafkaMetadataService kafkaMetadataService,
+                Runnable signalNoMoreSplitsCallback);

        static StoppableKafkaEnumContextProxyFactory getDefaultFactory() {
-            return (enumContext, kafkaClusterId, kafkaMetadataService) ->
+            return (enumContext,
+                    kafkaClusterId,
+                    kafkaMetadataService,
+                    signalNoMoreSplitsCallback) ->
                    new StoppableKafkaEnumContextProxy(
-                            kafkaClusterId, kafkaMetadataService, enumContext);
+                            kafkaClusterId,
+                            kafkaMetadataService,
+                            enumContext,
+                            signalNoMoreSplitsCallback);
        }
    }
}

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java

Lines changed: 2 additions & 2 deletions

@@ -132,8 +132,8 @@ public void start() {

     @Override
     public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
-        // do not return end of input if no more splits has not yet been signaled
-        if (!isNoMoreSplits && clusterReaderMap.isEmpty()) {
+        // at startup, do not return end of input if metadata event has not been received
+        if (clusterReaderMap.isEmpty()) {
            return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
        }

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java

Lines changed: 4 additions & 2 deletions

@@ -919,11 +919,13 @@ private DynamicKafkaSourceEnumState getCheckpointState() throws Throwable {

     private static class TestKafkaEnumContextProxyFactory
             implements StoppableKafkaEnumContextProxy.StoppableKafkaEnumContextProxyFactory {
+
         @Override
         public StoppableKafkaEnumContextProxy create(
                 SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext,
                 String kafkaClusterId,
-                KafkaMetadataService kafkaMetadataService) {
+                KafkaMetadataService kafkaMetadataService,
+                Runnable signalNoMoreSplitsCallback) {
             return new TestKafkaEnumContextProxy(
                     kafkaClusterId,
                     kafkaMetadataService,
@@ -939,7 +941,7 @@ public TestKafkaEnumContextProxy(
                 String kafkaClusterId,
                 KafkaMetadataService kafkaMetadataService,
                 MockSplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext) {
-            super(kafkaClusterId, kafkaMetadataService, enumContext);
+            super(kafkaClusterId, kafkaMetadataService, enumContext, null);
             this.enumContext = enumContext;
         }

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxyTest.java

Lines changed: 2 additions & 1 deletion

@@ -150,7 +150,8 @@ private StoppableKafkaEnumContextProxy createStoppableKafkaEnumContextProxy(
         return new StoppableKafkaEnumContextProxy(
                 contextKafkaCluster,
                 new MockKafkaMetadataService(Collections.singleton(mockStream)),
-                enumContext);
+                enumContext,
+                null);
    }

    // this modeled after `KafkaSourceEnumerator` topic partition subscription to throw the same

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java

Lines changed: 3 additions & 1 deletion

@@ -204,6 +204,7 @@ public static <K, V> void produceToKafka(
             throws Throwable {
         Properties props = new Properties();
         props.putAll(clusterProperties);
+        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
         props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
         props.setProperty(
                 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
@@ -219,8 +220,9 @@ public static <K, V> void produceToKafka(
                 };
         try (KafkaProducer<K, V> producer = new KafkaProducer<>(props)) {
             for (ProducerRecord<K, V> record : records) {
-                producer.send(record, callback).get();
+                producer.send(record, callback);
             }
+            producer.flush();
         }
         if (sendingError.get() != null) {
             throw sendingError.get();
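
The helper now follows a standard Kafka producer recipe for deterministic test data: acks=all so each record is acknowledged only after full replication, asynchronous send to preserve batching, and a single flush() before closing instead of blocking on every record's future. A minimal standalone sketch under those assumptions; the bootstrap server address and topic name are placeholders:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// A minimal sketch of the produce-then-flush pattern; "localhost:9092" and
// "test-topic" are placeholder values, not from the commit.
public class ProduceFlushSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // acks=all: the send is acknowledged only once all in-sync replicas
        // have the record, so tests read back a consistent view.
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        props.setProperty(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // Asynchronous send: no per-record get(), so batching stays effective.
                producer.send(new ProducerRecord<>("test-topic", "key-" + i, "value-" + i));
            }
            // Block once until every buffered record has been acknowledged.
            producer.flush();
        }
    }
}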
