[FLINK-36817] Introduce KafkaConsumerFactory #137
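Introduces a KafkaConsumerFactory interface and threads it through KafkaSource, DynamicKafkaSource, their builders, and the split readers, so that the KafkaConsumer created by KafkaPartitionSplitReader can be customized (for example for instrumentation or testing); the default factory remains KafkaConsumer::new.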

Open · wants to merge 3 commits into base: main
Changes from 2 commits

DynamicKafkaSource.java
@@ -37,6 +37,7 @@
import org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader;
import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplitSerializer;
import org.apache.flink.connector.kafka.source.KafkaConsumerFactory;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
@@ -87,6 +88,7 @@ public class DynamicKafkaSource<T>
private final OffsetsInitializer stoppingOffsetsInitializer;
private final Properties properties;
private final Boundedness boundedness;
private final KafkaConsumerFactory kafkaConsumerFactory;

DynamicKafkaSource(
KafkaStreamSubscriber kafkaStreamSubscriber,
@@ -95,14 +97,16 @@
OffsetsInitializer startingOffsetsInitializer,
OffsetsInitializer stoppingOffsetsInitializer,
Properties properties,
Boundedness boundedness) {
Boundedness boundedness,
KafkaConsumerFactory kafkaConsumerFactory) {
this.kafkaStreamSubscriber = kafkaStreamSubscriber;
this.deserializationSchema = deserializationSchema;
this.properties = properties;
this.kafkaMetadataService = kafkaMetadataService;
this.startingOffsetsInitializer = startingOffsetsInitializer;
this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
this.boundedness = boundedness;
this.kafkaConsumerFactory = kafkaConsumerFactory;
}

/**
@@ -134,7 +138,11 @@ public Boundedness getBoundedness() {
@Override
public SourceReader<T, DynamicKafkaSourceSplit> createReader(
SourceReaderContext readerContext) {
return new DynamicKafkaSourceReader<>(readerContext, deserializationSchema, properties);
return new DynamicKafkaSourceReader<>(
readerContext,
deserializationSchema,
properties,
kafkaConsumerFactory);
}

/**

DynamicKafkaSourceBuilder.java
@@ -24,6 +24,7 @@
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.StreamPatternSubscriber;
import org.apache.flink.connector.kafka.source.KafkaConsumerFactory;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
@@ -33,6 +34,7 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +54,7 @@ public class DynamicKafkaSourceBuilder<T> {
private OffsetsInitializer stoppingOffsetsInitializer;
private Boundedness boundedness;
private final Properties props;
private KafkaConsumerFactory kafkaConsumerFactory;

DynamicKafkaSourceBuilder() {
this.kafkaStreamSubscriber = null;
@@ -61,6 +64,7 @@ public class DynamicKafkaSourceBuilder<T> {
this.stoppingOffsetsInitializer = new NoStoppingOffsetsInitializer();
this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
this.props = new Properties();
this.kafkaConsumerFactory = KafkaConsumer::new;
}

/**
@@ -201,6 +205,13 @@ public DynamicKafkaSourceBuilder<T> setClientIdPrefix(String prefix) {
return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), prefix);
}

/**
 * Set a custom factory for creating the {@link KafkaConsumer} instances used by this source.
 * Defaults to {@code KafkaConsumer::new}.
 *
 * @param kafkaConsumerFactory the factory that creates Kafka consumers.
 * @return this DynamicKafkaSourceBuilder.
 */
public DynamicKafkaSourceBuilder<T> setKafkaConsumerFactory(
        KafkaConsumerFactory kafkaConsumerFactory) {
    Preconditions.checkNotNull(kafkaConsumerFactory, "kafkaConsumerFactory cannot be null.");
    this.kafkaConsumerFactory = kafkaConsumerFactory;
    return this;
}

/**
* Construct the source with the configuration that was set.
*
@@ -217,7 +228,8 @@ public DynamicKafkaSource<T> build() {
startingOffsetsInitializer,
stoppingOffsetsInitializer,
props,
boundedness);
boundedness,
kafkaConsumerFactory);
}

// Below are utility methods, code and structure are mostly copied over from KafkaSourceBuilder

DynamicKafkaSourceReader.java
@@ -35,6 +35,7 @@
import org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroup;
import org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager;
import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
import org.apache.flink.connector.kafka.source.KafkaConsumerFactory;
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
@@ -90,6 +91,7 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
private final NavigableMap<String, KafkaSourceReader<T>> clusterReaderMap;
private final Map<String, Properties> clustersProperties;
private final List<DynamicKafkaSourceSplit> pendingSplits;
private final KafkaConsumerFactory kafkaConsumerFactory;

private MultipleFuturesAvailabilityHelper availabilityHelper;
private boolean isActivelyConsumingSplits;
@@ -99,7 +101,8 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
public DynamicKafkaSourceReader(
SourceReaderContext readerContext,
KafkaRecordDeserializationSchema<T> deserializationSchema,
Properties properties) {
Properties properties,
KafkaConsumerFactory kafkaConsumerFactory) {
this.readerContext = readerContext;
this.clusterReaderMap = new TreeMap<>();
this.deserializationSchema = deserializationSchema;
@@ -116,6 +119,7 @@ public DynamicKafkaSourceReader(
this.isActivelyConsumingSplits = false;
this.restartingReaders = new AtomicBoolean();
this.clustersProperties = new HashMap<>();
this.kafkaConsumerFactory = kafkaConsumerFactory;
}

/**
@@ -458,7 +462,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
readerSpecificProperties,
readerContext,
kafkaSourceReaderMetrics,
kafkaClusterId),
kafkaClusterId,
kafkaConsumerFactory),
(ignore) -> {}),
recordEmitter,
toConfiguration(readerSpecificProperties),

KafkaPartitionSplitReaderWrapper.java
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.kafka.source.KafkaConsumerFactory;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;

@@ -43,8 +44,9 @@ public KafkaPartitionSplitReaderWrapper(
Properties props,
SourceReaderContext context,
KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
String kafkaClusterId) {
super(props, context, kafkaSourceReaderMetrics);
String kafkaClusterId,
KafkaConsumerFactory kafkaConsumerFactory) {
super(props, context, kafkaSourceReaderMetrics, kafkaConsumerFactory);
this.kafkaClusterId = kafkaClusterId;
}


KafkaConsumerFactory.java (new file)
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.kafka.source;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

/**
 * A factory for the {@link KafkaConsumer} instances created by the Kafka source split readers,
 * allowing consumer construction to be customized (the default is {@code KafkaConsumer::new}).
 */
public interface KafkaConsumerFactory {
KafkaConsumer<byte[], byte[]> get(Properties properties);

Author comment: I was debating whether it would be better to return the Consumer interface instead of the concrete implementation. Happy to change it if the community agrees that returning an interface is the better choice.

}
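
The comment above asks whether the factory should return the Consumer interface rather than the concrete KafkaConsumer. A sketch of that alternative (illustrative only, not part of this change); returning the interface would also let tests hand back a MockConsumer, and would presumably require the split reader to hold a Consumer-typed field:

import org.apache.kafka.clients.consumer.Consumer;

import java.util.Properties;

// Hypothetical variant: return the Consumer interface instead of KafkaConsumer.
public interface KafkaConsumerFactory {
    Consumer<byte[], byte[]> get(Properties properties);
}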

KafkaSource.java
@@ -114,6 +114,7 @@ public class KafkaSource<OUT>
private final Properties props;
// Client rackId callback
private final SerializableSupplier<String> rackIdSupplier;
private final KafkaConsumerFactory kafkaConsumerFactory;

KafkaSource(
KafkaSubscriber subscriber,
@@ -122,14 +123,16 @@
Boundedness boundedness,
KafkaRecordDeserializationSchema<OUT> deserializationSchema,
Properties props,
SerializableSupplier<String> rackIdSupplier) {
SerializableSupplier<String> rackIdSupplier,
KafkaConsumerFactory kafkaConsumerFactory) {
this.subscriber = subscriber;
this.startingOffsetsInitializer = startingOffsetsInitializer;
this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
this.boundedness = boundedness;
this.deserializationSchema = deserializationSchema;
this.props = props;
this.rackIdSupplier = rackIdSupplier;
this.kafkaConsumerFactory = kafkaConsumerFactory;
}

/**
@@ -182,7 +185,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
kafkaSourceReaderMetrics,
Optional.ofNullable(rackIdSupplier)
.map(Supplier::get)
.orElse(null));
.orElse(null),
kafkaConsumerFactory);
KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);

return new KafkaSourceReader<>(

KafkaSourceBuilder.java
@@ -26,9 +26,11 @@
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableSupplier;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
@@ -107,6 +109,7 @@ public class KafkaSourceBuilder<OUT> {
protected Properties props;
// Client rackId supplier
private SerializableSupplier<String> rackIdSupplier;
private KafkaConsumerFactory kafkaConsumerFactory;

KafkaSourceBuilder() {
this.subscriber = null;
@@ -116,6 +119,7 @@ public class KafkaSourceBuilder<OUT> {
this.deserializationSchema = null;
this.props = new Properties();
this.rackIdSupplier = null;
this.kafkaConsumerFactory = KafkaConsumer::new;
}

/**
@@ -423,6 +427,13 @@ public KafkaSourceBuilder<OUT> setProperties(Properties props) {
return this;
}

/**
 * Set a custom factory for creating the {@link KafkaConsumer} instances used by this source.
 * Defaults to {@code KafkaConsumer::new}.
 *
 * @param kafkaConsumerFactory the factory that creates Kafka consumers.
 * @return this KafkaSourceBuilder.
 */
public KafkaSourceBuilder<OUT> setKafkaConsumerFactory(
        KafkaConsumerFactory kafkaConsumerFactory) {
    Preconditions.checkNotNull(kafkaConsumerFactory, "kafkaConsumerFactory cannot be null.");
    this.kafkaConsumerFactory = kafkaConsumerFactory;
    return this;
}
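
A minimal usage sketch for the new setter (illustrative only; the bootstrap servers, topic, and group id below are placeholder values). The factory receives the fully resolved consumer properties and returns the KafkaConsumer that the split reader will use, so cross-cutting concerns such as logging or last-minute property adjustments can live in one place:

// Hypothetical example: log the effective client.id before creating the consumer.
KafkaConsumerFactory loggingFactory =
        consumerProps -> {
            System.out.println(
                    "Creating KafkaConsumer with client.id="
                            + consumerProps.getProperty("client.id"));
            return new KafkaConsumer<>(consumerProps);
        };

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("input-topic")
                .setGroupId("example-group")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setKafkaConsumerFactory(loggingFactory)
                .build();

Since the source is serialized and shipped to the task managers, the supplied factory presumably needs to be serializable in practice, which may be worth calling out in the interface's contract.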

/**
* Build the {@link KafkaSource}.
*
@@ -438,7 +449,8 @@ public KafkaSource<OUT> build() {
boundedness,
deserializationSchema,
props,
rackIdSupplier);
rackIdSupplier,
kafkaConsumerFactory);
}

// ------------- private helpers --------------

KafkaPartitionSplitReader.java
@@ -25,6 +25,7 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kafka.source.KafkaConsumerFactory;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
@@ -56,6 +57,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

@@ -79,22 +81,24 @@ public class KafkaPartitionSplitReader
public KafkaPartitionSplitReader(
Properties props,
SourceReaderContext context,
KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
this(props, context, kafkaSourceReaderMetrics, null);
KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
KafkaConsumerFactory kafkaConsumerFactory) {
this(props, context, kafkaSourceReaderMetrics, null, kafkaConsumerFactory);
}

public KafkaPartitionSplitReader(
Properties props,
SourceReaderContext context,
KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
String rackIdSupplier) {
String rackIdSupplier,
KafkaConsumerFactory kafkaConsumerFactory) {
this.subtaskId = context.getIndexOfSubtask();
this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
Properties consumerProps = new Properties();
consumerProps.putAll(props);
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, createConsumerClientId(props));
setConsumerClientRack(consumerProps, rackIdSupplier);
this.consumer = new KafkaConsumer<>(consumerProps);
this.consumer = kafkaConsumerFactory.get(consumerProps);
this.stoppingOffsets = new HashMap<>();
this.groupId = consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG);


Tests for DynamicKafkaSourceReader
@@ -33,6 +33,7 @@

import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -271,7 +272,8 @@ private DynamicKafkaSourceReader<Integer> createReaderWithoutStart(
return new DynamicKafkaSourceReader<>(
context,
KafkaRecordDeserializationSchema.valueOnly(IntegerDeserializer.class),
properties);
properties,
KafkaConsumer::new);
}

private SourceReader<Integer, DynamicKafkaSourceSplit> startReader(

Tests for KafkaPartitionSplitReader
@@ -38,6 +38,7 @@

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -435,7 +436,8 @@ private KafkaPartitionSplitReader createReader(
props,
new TestingReaderContext(new Configuration(), sourceReaderMetricGroup),
kafkaSourceReaderMetrics,
rackId);
rackId,
KafkaConsumer::new);
}
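
In tests, a factory can return an instrumented consumer instead of the default. A sketch of a capturing factory (hypothetical helper, not part of this change):

// Hypothetical test helper: records every consumer the reader creates so the
// test can later assert on it or interact with it directly.
List<KafkaConsumer<byte[], byte[]>> createdConsumers = new ArrayList<>();
KafkaConsumerFactory capturingFactory =
        consumerProps -> {
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
            createdConsumers.add(consumer);
            return consumer;
        };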

private Map<String, KafkaPartitionSplit> assignSplits(