diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java index 9a93d7b88..05d4a84fe 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java @@ -37,6 +37,7 @@ import org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader; import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit; import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplitSerializer; +import org.apache.flink.connector.kafka.source.KafkaConsumerFactory; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; @@ -87,6 +88,7 @@ public class DynamicKafkaSource private final OffsetsInitializer stoppingOffsetsInitializer; private final Properties properties; private final Boundedness boundedness; + private final KafkaConsumerFactory kafkaConsumerFactory; DynamicKafkaSource( KafkaStreamSubscriber kafkaStreamSubscriber, @@ -95,7 +97,8 @@ public class DynamicKafkaSource OffsetsInitializer startingOffsetsInitializer, OffsetsInitializer stoppingOffsetsInitializer, Properties properties, - Boundedness boundedness) { + Boundedness boundedness, + KafkaConsumerFactory kafkaConsumerFactory) { this.kafkaStreamSubscriber = kafkaStreamSubscriber; this.deserializationSchema = deserializationSchema; this.properties = properties; @@ -103,6 +106,7 @@ public class DynamicKafkaSource this.startingOffsetsInitializer = startingOffsetsInitializer; this.stoppingOffsetsInitializer = stoppingOffsetsInitializer; 
this.boundedness = boundedness; + this.kafkaConsumerFactory = kafkaConsumerFactory; } /** @@ -134,7 +138,11 @@ public Boundedness getBoundedness() { @Override public SourceReader createReader( SourceReaderContext readerContext) { - return new DynamicKafkaSourceReader<>(readerContext, deserializationSchema, properties); + return new DynamicKafkaSourceReader<>( + readerContext, + deserializationSchema, + properties, + kafkaConsumerFactory); } /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java index eab37c4ee..f283a2546 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java @@ -24,6 +24,7 @@ import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber; import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber; import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.StreamPatternSubscriber; +import org.apache.flink.connector.kafka.source.KafkaConsumerFactory; import org.apache.flink.connector.kafka.source.KafkaSourceOptions; import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; @@ -33,6 +34,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +54,7 @@ 
public class DynamicKafkaSourceBuilder { private OffsetsInitializer stoppingOffsetsInitializer; private Boundedness boundedness; private final Properties props; + private KafkaConsumerFactory kafkaConsumerFactory; DynamicKafkaSourceBuilder() { this.kafkaStreamSubscriber = null; @@ -61,6 +64,7 @@ public class DynamicKafkaSourceBuilder { this.stoppingOffsetsInitializer = new NoStoppingOffsetsInitializer(); this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED; this.props = new Properties(); + this.kafkaConsumerFactory = KafkaConsumer::new; } /** @@ -201,6 +205,13 @@ public DynamicKafkaSourceBuilder setClientIdPrefix(String prefix) { return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), prefix); } + public DynamicKafkaSourceBuilder setKafkaConsumerFactory(KafkaConsumerFactory kafkaConsumerFactory) { + Preconditions.checkNotNull( + kafkaConsumerFactory, "kafkaConsumerFactory can not be null."); + this.kafkaConsumerFactory = kafkaConsumerFactory; + return this; + } + /** * Construct the source with the configuration that was set. 
* @@ -217,7 +228,8 @@ public DynamicKafkaSource build() { startingOffsetsInitializer, stoppingOffsetsInitializer, props, - boundedness); + boundedness, + kafkaConsumerFactory); } // Below are utility methods, code and structure are mostly copied over from KafkaSourceBuilder diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java index 8220ea14c..c9652ea84 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java @@ -35,6 +35,7 @@ import org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroup; import org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager; import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit; +import org.apache.flink.connector.kafka.source.KafkaConsumerFactory; import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil; import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics; import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter; @@ -90,6 +91,7 @@ public class DynamicKafkaSourceReader implements SourceReader> clusterReaderMap; private final Map clustersProperties; private final List pendingSplits; + private final KafkaConsumerFactory kafkaConsumerFactory; private MultipleFuturesAvailabilityHelper availabilityHelper; private boolean isActivelyConsumingSplits; @@ -99,7 +101,8 @@ public class DynamicKafkaSourceReader implements SourceReader deserializationSchema, - Properties properties) { + Properties properties, + KafkaConsumerFactory kafkaConsumerFactory) { this.readerContext = readerContext; this.clusterReaderMap = 
new TreeMap<>(); this.deserializationSchema = deserializationSchema; @@ -116,6 +119,7 @@ public DynamicKafkaSourceReader( this.isActivelyConsumingSplits = false; this.restartingReaders = new AtomicBoolean(); this.clustersProperties = new HashMap<>(); + this.kafkaConsumerFactory = kafkaConsumerFactory; } /** @@ -458,7 +462,8 @@ public UserCodeClassLoader getUserCodeClassLoader() { readerSpecificProperties, readerContext, kafkaSourceReaderMetrics, - kafkaClusterId), + kafkaClusterId, + kafkaConsumerFactory), (ignore) -> {}), recordEmitter, toConfiguration(readerSpecificProperties), diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/KafkaPartitionSplitReaderWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/KafkaPartitionSplitReaderWrapper.java index cc250eace..21069a060 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/KafkaPartitionSplitReaderWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/KafkaPartitionSplitReaderWrapper.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.kafka.source.KafkaConsumerFactory; import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics; import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader; @@ -43,8 +44,9 @@ public KafkaPartitionSplitReaderWrapper( Properties props, SourceReaderContext context, KafkaSourceReaderMetrics kafkaSourceReaderMetrics, - String kafkaClusterId) { - super(props, context, kafkaSourceReaderMetrics); + String kafkaClusterId, + KafkaConsumerFactory kafkaConsumerFactory) { + super(props, context, kafkaSourceReaderMetrics, kafkaConsumerFactory); this.kafkaClusterId 
= kafkaClusterId; } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaConsumerFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaConsumerFactory.java new file mode 100644 index 000000000..800d5fcf9 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaConsumerFactory.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kafka.source; + +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.util.Properties; + +public interface KafkaConsumerFactory extends java.io.Serializable { + KafkaConsumer<byte[], byte[]> get(Properties properties); +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java index 39302751c..e993f6b3e 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java @@ -114,6 +114,7 @@ public class KafkaSource private final Properties props; // Client rackId callback private final SerializableSupplier rackIdSupplier; + private final KafkaConsumerFactory kafkaConsumerFactory; KafkaSource( KafkaSubscriber subscriber, @@ -122,7 +123,8 @@ Boundedness boundedness, KafkaRecordDeserializationSchema deserializationSchema, Properties props, - SerializableSupplier rackIdSupplier) { + SerializableSupplier rackIdSupplier, + KafkaConsumerFactory kafkaConsumerFactory) { this.subscriber = subscriber; this.startingOffsetsInitializer = startingOffsetsInitializer; this.stoppingOffsetsInitializer = stoppingOffsetsInitializer; @@ -130,6 +132,7 @@ public class KafkaSource this.deserializationSchema = deserializationSchema; this.props = props; this.rackIdSupplier = rackIdSupplier; + this.kafkaConsumerFactory = kafkaConsumerFactory; } /** @@ -182,7 +185,8 @@ public UserCodeClassLoader getUserCodeClassLoader() { kafkaSourceReaderMetrics, Optional.ofNullable(rackIdSupplier) .map(Supplier::get) - .orElse(null)); + .orElse(null), + kafkaConsumerFactory); KafkaRecordEmitter recordEmitter = new KafkaRecordEmitter<>(deserializationSchema); return new KafkaSourceReader<>( diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java index 78a4b0b60..d76a71def 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java @@ -26,9 +26,11 @@ import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator; import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.SerializableSupplier; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; @@ -107,6 +109,7 @@ public class KafkaSourceBuilder { protected Properties props; // Client rackId supplier private SerializableSupplier rackIdSupplier; + private KafkaConsumerFactory kafkaConsumerFactory; KafkaSourceBuilder() { this.subscriber = null; @@ -116,6 +119,7 @@ public class KafkaSourceBuilder { this.deserializationSchema = null; this.props = new Properties(); this.rackIdSupplier = null; + this.kafkaConsumerFactory = KafkaConsumer::new; } /** @@ -423,6 +427,13 @@ public KafkaSourceBuilder setProperties(Properties props) { return this; } + public KafkaSourceBuilder setKafkaConsumerFactory(KafkaConsumerFactory kafkaConsumerFactory) { + Preconditions.checkNotNull( + kafkaConsumerFactory, "kafkaConsumerFactory can not be null."); + this.kafkaConsumerFactory = kafkaConsumerFactory; + return this; + } + /** * Build the {@link 
KafkaSource}. * @@ -438,7 +449,8 @@ public KafkaSource build() { boundedness, deserializationSchema, props, - rackIdSupplier); + rackIdSupplier, + kafkaConsumerFactory); } // ------------- private helpers -------------- diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java index 23956f5d5..a15f70331 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java @@ -25,6 +25,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.kafka.source.KafkaConsumerFactory; import org.apache.flink.connector.kafka.source.KafkaSourceOptions; import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; @@ -79,22 +80,24 @@ public class KafkaPartitionSplitReader public KafkaPartitionSplitReader( Properties props, SourceReaderContext context, - KafkaSourceReaderMetrics kafkaSourceReaderMetrics) { - this(props, context, kafkaSourceReaderMetrics, null); + KafkaSourceReaderMetrics kafkaSourceReaderMetrics, + KafkaConsumerFactory kafkaConsumerFactory) { + this(props, context, kafkaSourceReaderMetrics, null, kafkaConsumerFactory); } public KafkaPartitionSplitReader( Properties props, SourceReaderContext context, KafkaSourceReaderMetrics kafkaSourceReaderMetrics, - String rackIdSupplier) { + String rackIdSupplier, + KafkaConsumerFactory kafkaConsumerFactory) { this.subtaskId = context.getIndexOfSubtask(); 
this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics; Properties consumerProps = new Properties(); consumerProps.putAll(props); consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, createConsumerClientId(props)); setConsumerClientRack(consumerProps, rackIdSupplier); - this.consumer = new KafkaConsumer<>(consumerProps); + this.consumer = kafkaConsumerFactory.get(consumerProps); this.stoppingOffsets = new HashMap<>(); this.groupId = consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java index 5094e0151..fa4ac5bb5 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java @@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableList; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; @@ -271,7 +272,8 @@ private DynamicKafkaSourceReader createReaderWithoutStart( return new DynamicKafkaSourceReader<>( context, KafkaRecordDeserializationSchema.valueOnly(IntegerDeserializer.class), - properties); + properties, + KafkaConsumer::new); } private SourceReader startReader( diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java index b592a6917..d625c3287 100644 
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -435,7 +436,8 @@ private KafkaPartitionSplitReader createReader( props, new TestingReaderContext(new Configuration(), sourceReaderMetricGroup), kafkaSourceReaderMetrics, - rackId); + rackId, + KafkaConsumer::new); } private Map assignSplits(