diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java index 20e8b9238..b61cee403 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java @@ -35,6 +35,7 @@ import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator; +import org.apache.flink.connector.kafka.source.enumerator.TopicPartitionAndAssignmentStatus; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; @@ -298,24 +299,16 @@ private void onHandleSubscribedStreamsFetch(Set fetchedKafkaStreams final Set activeTopics = activeClusterTopics.getValue(); // filter out removed topics - Set activeAssignedPartitions = - kafkaSourceEnumState.assignedPartitions().stream() - .filter(tp -> activeTopics.contains(tp.topic())) - .collect(Collectors.toSet()); - Set activeUnassignedInitialPartitions = - kafkaSourceEnumState.unassignedInitialPartitions().stream() - .filter(tp -> activeTopics.contains(tp.topic())) + Set partitions = + kafkaSourceEnumState.partitions().stream() + .filter(tp -> activeTopics.contains(tp.topicPartition().topic())) .collect(Collectors.toSet()); newKafkaSourceEnumState = new KafkaSourceEnumState( - activeAssignedPartitions, - activeUnassignedInitialPartitions, - kafkaSourceEnumState.initialDiscoveryFinished()); + partitions, kafkaSourceEnumState.initialDiscoveryFinished()); } else { - newKafkaSourceEnumState = - new KafkaSourceEnumState( - Collections.emptySet(), Collections.emptySet(), false); + newKafkaSourceEnumState = new KafkaSourceEnumState(Collections.emptySet(), false); } // restarts enumerator from state using only the active topic partitions, to avoid diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java index 70c435ee3..66ceeeb8a 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java @@ -32,7 +32,8 @@ public class KafkaSourceEnumState { /** Partitions with status: ASSIGNED or UNASSIGNED_INITIAL. */ private final Set partitions; /** - * this flag will be marked as true if inital partitions are discovered after enumerator starts. + * this flag will be marked as true if initial partitions are discovered after enumerator + * starts. */ private final boolean initialDiscoveryFinished; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java index 0ea4d9f65..f8dc17deb 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java @@ -89,9 +89,7 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I case CURRENT_VERSION: return deserializeTopicPartitionAndAssignmentStatus(serialized); case VERSION_1: - final Set assignedPartitions = - deserializeTopicPartitions(serialized); - return new KafkaSourceEnumState(assignedPartitions, new HashSet<>(), true); + return deserializeAssignedTopicPartitions(serialized); case VERSION_0: Map> currentPartitionAssignment = SerdeUtils.deserializeSplitAssignments( @@ -113,23 +111,24 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I } } - private static Set deserializeTopicPartitions(byte[] serializedTopicPartitions) - throws IOException { + private static KafkaSourceEnumState deserializeAssignedTopicPartitions( + byte[] serializedTopicPartitions) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedTopicPartitions); DataInputStream in = new DataInputStream(bais)) { final int numPartitions = in.readInt(); - Set topicPartitions = new HashSet<>(numPartitions); + Set partitions = new HashSet<>(numPartitions); for (int i = 0; i < numPartitions; i++) { final String topic = in.readUTF(); final int partition = in.readInt(); - topicPartitions.add(new TopicPartition(topic, partition)); + partitions.add( + new TopicPartitionAndAssignmentStatus( + new TopicPartition(topic, partition), AssignmentStatus.ASSIGNED)); } if (in.available() > 0) { throw new IOException("Unexpected trailing bytes in serialized topic partitions"); } - - return topicPartitions; + return new KafkaSourceEnumState(partitions, true); } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java index 02323a74f..10025fa2a 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java @@ -112,7 +112,7 @@ public KafkaSourceEnumerator( properties, context, boundedness, - new KafkaSourceEnumState(Collections.emptySet(), Collections.emptySet(), false)); + new KafkaSourceEnumState(Collections.emptySet(), false)); } public KafkaSourceEnumerator(