From 3d406fa6d42301db3fc8719221c38dda9a5f81ab Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sun, 1 Sep 2024 15:54:17 +0800 Subject: [PATCH 1/5] optimize state --- .../DynamicKafkaSourceEnumerator.java | 17 +++---- .../enumerator/KafkaSourceEnumState.java | 2 +- .../KafkaSourceEnumStateSerializer.java | 45 ++++++------------- .../enumerator/KafkaSourceEnumerator.java | 2 +- 4 files changed, 22 insertions(+), 44 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java index 20e8b9238..edca3baf5 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java @@ -35,6 +35,7 @@ import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator; +import org.apache.flink.connector.kafka.source.enumerator.TopicPartitionAndAssignmentStatus; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; @@ -298,24 +299,18 @@ private void onHandleSubscribedStreamsFetch(Set fetchedKafkaStreams final Set activeTopics = activeClusterTopics.getValue(); // filter out removed topics - Set activeAssignedPartitions = - kafkaSourceEnumState.assignedPartitions().stream() - .filter(tp -> activeTopics.contains(tp.topic())) - .collect(Collectors.toSet()); - Set activeUnassignedInitialPartitions = - kafkaSourceEnumState.unassignedInitialPartitions().stream() - .filter(tp -> activeTopics.contains(tp.topic())) - .collect(Collectors.toSet()); + Set partitions = kafkaSourceEnumState.partitions().stream() + .filter(tp -> activeTopics.contains(tp.topicPartition().topic())) + .collect(Collectors.toSet()); newKafkaSourceEnumState = new KafkaSourceEnumState( - activeAssignedPartitions, - activeUnassignedInitialPartitions, + partitions, kafkaSourceEnumState.initialDiscoveryFinished()); } else { newKafkaSourceEnumState = new KafkaSourceEnumState( - Collections.emptySet(), Collections.emptySet(), false); + Collections.emptySet(),false); } // restarts enumerator from state using only the active topic partitions, to avoid diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java index 70c435ee3..9370dada4 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java @@ -32,7 +32,7 @@ public class KafkaSourceEnumState { /** Partitions with status: ASSIGNED or UNASSIGNED_INITIAL. */ private final Set partitions; /** - * this flag will be marked as true if inital partitions are discovered after enumerator starts. + * this flag will be marked as true if initial partitions are discovered after enumerator starts. */ private final boolean initialDiscoveryFinished; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java index 0ea4d9f65..7a4a9ba6f 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java @@ -89,9 +89,7 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I case CURRENT_VERSION: return deserializeTopicPartitionAndAssignmentStatus(serialized); case VERSION_1: - final Set assignedPartitions = - deserializeTopicPartitions(serialized); - return new KafkaSourceEnumState(assignedPartitions, new HashSet<>(), true); + return deserializeTopicPartitions(serialized, AssignmentStatus.ASSIGNED); case VERSION_0: Map> currentPartitionAssignment = SerdeUtils.deserializeSplitAssignments( @@ -113,53 +111,38 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I } } - private static Set deserializeTopicPartitions(byte[] serializedTopicPartitions) + private static KafkaSourceEnumState deserializeTopicPartitions(byte[] serializedTopicPartitions, + AssignmentStatus assignmentStatus) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedTopicPartitions); DataInputStream in = new DataInputStream(bais)) { - final int numPartitions = in.readInt(); - Set topicPartitions = new HashSet<>(numPartitions); - for (int i = 0; i < numPartitions; i++) { - final String topic = in.readUTF(); - final int partition = in.readInt(); - topicPartitions.add(new TopicPartition(topic, partition)); - } - if (in.available() > 0) { - throw new IOException("Unexpected trailing bytes in serialized topic partitions"); - } - - return topicPartitions; - } - } - - private static KafkaSourceEnumState deserializeTopicPartitionAndAssignmentStatus( - byte[] serialized) throws IOException { - - try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); - DataInputStream in = new DataInputStream(bais)) { - final int numPartitions = in.readInt(); Set partitions = new HashSet<>(numPartitions); - for (int i = 0; i < numPartitions; i++) { final String topic = in.readUTF(); final int partition = in.readInt(); - final int statusCode = in.readInt(); + if(assignmentStatus == null){ + final int statusCode = in.readInt(); + assignmentStatus = AssignmentStatus.ofStatusCode(statusCode); + } partitions.add( new TopicPartitionAndAssignmentStatus( new TopicPartition(topic, partition), - AssignmentStatus.ofStatusCode(statusCode))); + assignmentStatus)); } - final boolean initialDiscoveryFinished = in.readBoolean(); if (in.available() > 0) { throw new IOException("Unexpected trailing bytes in serialized topic partitions"); } - - return new KafkaSourceEnumState(partitions, initialDiscoveryFinished); + return new KafkaSourceEnumState(partitions, true); } } + private static KafkaSourceEnumState deserializeTopicPartitionAndAssignmentStatus( + byte[] serialized) throws IOException { + return deserializeTopicPartitions(serialized, null); + } + @VisibleForTesting public static byte[] serializeTopicPartitions(Collection topicPartitions) throws IOException { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java index 02323a74f..10025fa2a 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java @@ -112,7 +112,7 @@ public KafkaSourceEnumerator( properties, context, boundedness, - new KafkaSourceEnumState(Collections.emptySet(), Collections.emptySet(), false)); + new KafkaSourceEnumState(Collections.emptySet(), false)); } public KafkaSourceEnumerator( From 103bc39ad38643cbf750c6f405a22bab5e565700 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sun, 1 Sep 2024 16:00:34 +0800 Subject: [PATCH 2/5] optimize state --- .../KafkaSourceEnumStateSerializer.java | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java index 7a4a9ba6f..da61a4c09 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java @@ -112,24 +112,21 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I } private static KafkaSourceEnumState deserializeTopicPartitions(byte[] serializedTopicPartitions, - AssignmentStatus assignmentStatus) + AssignmentStatus status) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedTopicPartitions); DataInputStream in = new DataInputStream(bais)) { final int numPartitions = in.readInt(); + Set topicPartitions = new HashSet<>(numPartitions); Set partitions = new HashSet<>(numPartitions); for (int i = 0; i < numPartitions; i++) { final String topic = in.readUTF(); final int partition = in.readInt(); - if(assignmentStatus == null){ - final int statusCode = in.readInt(); - assignmentStatus = AssignmentStatus.ofStatusCode(statusCode); - } partitions.add( new TopicPartitionAndAssignmentStatus( new TopicPartition(topic, partition), - assignmentStatus)); + status)); } if (in.available() > 0) { throw new IOException("Unexpected trailing bytes in serialized topic partitions"); @@ -140,7 +137,29 @@ private static KafkaSourceEnumState deserializeTopicPartitions(byte[] serialized private static KafkaSourceEnumState deserializeTopicPartitionAndAssignmentStatus( byte[] serialized) throws IOException { - return deserializeTopicPartitions(serialized, null); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + DataInputStream in = new DataInputStream(bais)) { + + final int numPartitions = in.readInt(); + Set partitions = new HashSet<>(numPartitions); + + for (int i = 0; i < numPartitions; i++) { + final String topic = in.readUTF(); + final int partition = in.readInt(); + final int statusCode = in.readInt(); + partitions.add( + new TopicPartitionAndAssignmentStatus( + new TopicPartition(topic, partition), + AssignmentStatus.ofStatusCode(statusCode))); + } + final boolean initialDiscoveryFinished = in.readBoolean(); + if (in.available() > 0) { + throw new IOException("Unexpected trailing bytes in serialized topic partitions"); + } + + return new KafkaSourceEnumState(partitions, initialDiscoveryFinished); + } } @VisibleForTesting From b7035721be3e51d15d340ddf81e3d33738b53ab3 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sun, 1 Sep 2024 16:01:25 +0800 Subject: [PATCH 3/5] optimize state --- .../kafka/source/enumerator/KafkaSourceEnumStateSerializer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java index da61a4c09..bd524c67b 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java @@ -118,7 +118,6 @@ private static KafkaSourceEnumState deserializeTopicPartitions(byte[] serialized DataInputStream in = new DataInputStream(bais)) { final int numPartitions = in.readInt(); - Set topicPartitions = new HashSet<>(numPartitions); Set partitions = new HashSet<>(numPartitions); for (int i = 0; i < numPartitions; i++) { final String topic = in.readUTF(); From 3b640cc52b90abf4351d0ba46b90031fdc3e2a20 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Thu, 12 Sep 2024 10:44:43 +0800 Subject: [PATCH 4/5] fix style --- .../enumerator/DynamicKafkaSourceEnumerator.java | 14 ++++++-------- .../source/enumerator/KafkaSourceEnumState.java | 3 ++- .../enumerator/KafkaSourceEnumStateSerializer.java | 8 +++----- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java index edca3baf5..b61cee403 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java @@ -299,18 +299,16 @@ private void onHandleSubscribedStreamsFetch(Set fetchedKafkaStreams final Set activeTopics = activeClusterTopics.getValue(); // filter out removed topics - Set partitions = kafkaSourceEnumState.partitions().stream() - .filter(tp -> activeTopics.contains(tp.topicPartition().topic())) - .collect(Collectors.toSet()); + Set partitions = + kafkaSourceEnumState.partitions().stream() + .filter(tp -> activeTopics.contains(tp.topicPartition().topic())) + .collect(Collectors.toSet()); newKafkaSourceEnumState = new KafkaSourceEnumState( - partitions, - kafkaSourceEnumState.initialDiscoveryFinished()); + partitions, kafkaSourceEnumState.initialDiscoveryFinished()); } else { - newKafkaSourceEnumState = - new KafkaSourceEnumState( - Collections.emptySet(),false); + newKafkaSourceEnumState = new KafkaSourceEnumState(Collections.emptySet(), false); } // restarts enumerator from state using only the active topic partitions, to avoid diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java index 9370dada4..66ceeeb8a 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java @@ -32,7 +32,8 @@ public class KafkaSourceEnumState { /** Partitions with status: ASSIGNED or UNASSIGNED_INITIAL. */ private final Set partitions; /** - * this flag will be marked as true if initial partitions are discovered after enumerator starts. + * this flag will be marked as true if initial partitions are discovered after enumerator + * starts. */ private final boolean initialDiscoveryFinished; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java index bd524c67b..817cfeb6f 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java @@ -111,9 +111,8 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I } } - private static KafkaSourceEnumState deserializeTopicPartitions(byte[] serializedTopicPartitions, - AssignmentStatus status) - throws IOException { + private static KafkaSourceEnumState deserializeTopicPartitions( + byte[] serializedTopicPartitions, AssignmentStatus status) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedTopicPartitions); DataInputStream in = new DataInputStream(bais)) { @@ -124,8 +123,7 @@ private static KafkaSourceEnumState deserializeTopicPartitions(byte[] serialized final int partition = in.readInt(); partitions.add( new TopicPartitionAndAssignmentStatus( - new TopicPartition(topic, partition), - status)); + new TopicPartition(topic, partition), status)); } if (in.available() > 0) { throw new IOException("Unexpected trailing bytes in serialized topic partitions"); From cc4ac6c53434cb1857fa78ba11162168e46fcd23 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Thu, 12 Sep 2024 11:11:55 +0800 Subject: [PATCH 5/5] rename deserializeTopicPartitions--->deserializeAssignedTopicPartitions --- .../source/enumerator/KafkaSourceEnumStateSerializer.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java index 817cfeb6f..f8dc17deb 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java @@ -89,7 +89,7 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I case CURRENT_VERSION: return deserializeTopicPartitionAndAssignmentStatus(serialized); case VERSION_1: - return deserializeTopicPartitions(serialized, AssignmentStatus.ASSIGNED); + return deserializeAssignedTopicPartitions(serialized); case VERSION_0: Map> currentPartitionAssignment = SerdeUtils.deserializeSplitAssignments( @@ -111,8 +111,8 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I } } - private static KafkaSourceEnumState deserializeTopicPartitions( - byte[] serializedTopicPartitions, AssignmentStatus status) throws IOException { + private static KafkaSourceEnumState deserializeAssignedTopicPartitions( + byte[] serializedTopicPartitions) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedTopicPartitions); DataInputStream in = new DataInputStream(bais)) { @@ -123,7 +123,7 @@ private static KafkaSourceEnumState deserializeTopicPartitions( final int partition = in.readInt(); partitions.add( new TopicPartitionAndAssignmentStatus( - new TopicPartition(topic, partition), status)); + new TopicPartition(topic, partition), AssignmentStatus.ASSIGNED)); } if (in.available() > 0) { throw new IOException("Unexpected trailing bytes in serialized topic partitions");