
Commit 0307767

KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics (apache#9780)
The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets of its internal topic to know whether it is caught up. `KafkaBasedLog` uses its consumer both to get the end offsets and to consume the records from the topic. However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer's `fetch.max.wait.ms` setting (default 500 ms) expires and the consumer returns no more records. In other words, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it is already caught up to the end. Instead, we want `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up.

The best way to do this is to have the `Kafka*BackingStore` classes use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffsets` broker API, so the functionality is ultimately the same, but we don't have to block on any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, so all three instances of `KafkaBasedLog` should share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change the `Kafka*BackingStore` classes to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to use only the new constructors, which gives downstream users time to adapt.

These changes are implemented as follows:

1. Add a `KafkaBasedLog` constructor that accepts a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so the admin instance should be created only when needed. At the same time, change the existing init function parameter from a no-arg function to one that accepts an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (via the deprecated constructor, which is no longer used in AK), the consumer is still used to get the latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters plus an admin supplier, and deprecate the old constructor. When each class instantiates its `KafkaBasedLog` instance, it passes the admin supplier and an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (which is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore` objects, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use the deprecated constructors.
8. Add unit tests for the new functionality.

Author: Randall Hauch <[email protected]>
Reviewer: Konstantine Karantasis <[email protected]>
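The central mechanism described above is reading end offsets through the admin client instead of the consumer. As a rough illustration only (a standalone helper with an invented class and method name, not the `TopicAdmin` method added by this commit), the Admin API's `listOffsets` call resolves the latest offset per partition without touching the consumer:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative sketch; not the TopicAdmin.endOffsets logic introduced by the commit.
public class EndOffsetsExample {
    // Resolve the latest (end) offset for each partition via the admin ListOffsets API,
    // so the caller never blocks on an in-flight consumer poll (fetch.max.wait.ms).
    public static Map<TopicPartition, Long> latestOffsets(Admin admin, Collection<TopicPartition> partitions)
            throws ExecutionException, InterruptedException {
        Map<TopicPartition, OffsetSpec> request = partitions.stream()
                .collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
        ListOffsetsResult result = admin.listOffsets(request);
        return result.all().get().entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
    }
}

Because `listOffsets` goes straight to the brokers' ListOffsets API, it returns as soon as that round trip completes, regardless of any poll the consumer has in flight.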
1 parent 6c00d8e commit 0307767

14 files changed: +808 -64 lines

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java (+13 -4)

@@ -36,6 +36,7 @@
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 
+import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -233,20 +234,28 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        // Create the admin client to be shared by all backing stores for this herder
+        Map<String, Object> adminProps = new HashMap<>(config.originals());
+        ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId);
+        SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
         offsetBackingStore.configure(distributedConfig);
         Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
         statusBackingStore.configure(distributedConfig);
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 distributedConfig,
-                configTransformer);
+                configTransformer,
+                sharedAdmin);
+        // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
+        // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
+        // tracking the various shared admin objects in this class.
         Herder herder = new DistributedHerder(distributedConfig, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY);
+                advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);
         herders.put(sourceAndTarget, herder);
     }
 
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java (+14 -4)

@@ -36,12 +36,14 @@
 import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.ConnectUtils;
+import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -101,7 +103,12 @@ public Connect startConnect(Map<String, String> workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        // Create the admin client to be shared by all backing stores.
+        Map<String, Object> adminProps = new HashMap<>(config.originals());
+        ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
+        SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
+
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
@@ -112,17 +119,20 @@ public Connect startConnect(Map<String, String> workerProps) {
         WorkerConfigTransformer configTransformer = worker.configTransformer();
 
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
         statusBackingStore.configure(config);
 
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 config,
-                configTransformer);
+                configTransformer,
+                sharedAdmin);
 
+        // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
+        // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
         DistributedHerder herder = new DistributedHerder(config, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl.toString(), connectorClientConfigOverridePolicy);
+                advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);
 
         final Connect connect = new Connect(herder, rest);
         log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
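The `SharedTopicAdmin` class created above is one of the 14 changed files, but its diff is not reproduced in this excerpt. Based on the commit description (item 3 of the list above), it is an admin supplier that lazily creates the shared `TopicAdmin` and closes it when the supplier itself is closed. A minimal sketch of that shape, under the assumption of a hypothetical class name `LazySharedAdmin` (this is not the committed implementation):

import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

import org.apache.kafka.connect.util.TopicAdmin;

// Approximate shape only; the real SharedTopicAdmin in this commit may differ in detail.
public class LazySharedAdmin implements Supplier<TopicAdmin>, AutoCloseable {
    private final Map<String, Object> adminProps;
    private TopicAdmin admin;   // created lazily on first use

    public LazySharedAdmin(Map<String, Object> adminProps) {
        this.adminProps = Objects.requireNonNull(adminProps);
    }

    @Override
    public synchronized TopicAdmin get() {
        if (admin == null) {
            admin = new TopicAdmin(adminProps);   // create the shared admin client on demand
        }
        return admin;
    }

    @Override
    public synchronized void close() {
        if (admin != null) {
            admin.close();   // closes the underlying Admin client
            admin = null;
        }
    }
}

Because every `Kafka*BackingStore` receives the same supplier, the underlying Admin client is created at most once per worker and closed exactly once when the herder shuts down.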

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java (+34 -3)

@@ -28,6 +28,7 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
@@ -66,6 +67,7 @@
 import javax.crypto.SecretKey;
 import javax.ws.rs.core.Response;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -138,6 +140,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     private final Time time;
     private final HerderMetrics herderMetrics;
+    private final List<AutoCloseable> uponShutdown;
 
     private final String workerGroupId;
     private final int workerSyncTimeoutMs;
@@ -185,16 +188,33 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     private final DistributedConfig config;
 
+    /**
+     * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs)
+     * that have the same group ID.
+     *
+     * @param config the configuration for the worker; may not be null
+     * @param time the clock to use; may not be null
+     * @param worker the {@link Worker} instance to use; may not be null
+     * @param kafkaClusterId the identifier of the Kafka cluster to use for internal topics; may not be null
+     * @param statusBackingStore the backing store for statuses; may not be null
+     * @param configBackingStore the backing store for connector configurations; may not be null
+     * @param restUrl the URL of this herder's REST API; may not be null
+     * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
+     *                                            in connector configurations; may not be null
+     * @param uponShutdown any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
+     *                     after all services and resources owned by this herder are stopped
+     */
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
                              String kafkaClusterId,
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl,
-                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                             AutoCloseable... uponShutdown) {
         this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(),
-             time, connectorClientConfigOverridePolicy);
+             time, connectorClientConfigOverridePolicy, uponShutdown);
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
 
@@ -209,7 +229,8 @@ public DistributedHerder(DistributedConfig config,
                       String restUrl,
                       ConnectMetrics metrics,
                       Time time,
-                      ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                      ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                      AutoCloseable... uponShutdown) {
         super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
 
         this.time = time;
@@ -223,6 +244,7 @@ public DistributedHerder(DistributedConfig config,
         this.keySignatureVerificationAlgorithms = config.getList(DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
         this.keyGenerator = config.getInternalRequestKeyGenerator();
         this.isTopicTrackingEnabled = config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.uponShutdown = Arrays.asList(uponShutdown);
 
         String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
         String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
@@ -676,6 +698,15 @@ public void halt() {
             }
         }
     }
 
+    @Override
+    protected void stopServices() {
+        try {
+            super.stopServices();
+        } finally {
+            this.uponShutdown.forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "<unknown>"));
+        }
+    }
+
     @Override
     public void stop() {
         log.info("Herder stopping");

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java (+20 -13)

@@ -62,6 +62,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
 /**
  * <p>
@@ -224,6 +225,7 @@ public static String COMMIT_TASKS_KEY(String connectorName) {
     // Connector and task configs: name or id -> config map
     private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
     private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+    private final Supplier<TopicAdmin> topicAdminSupplier;
 
     // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
     // is in an inconsistent state and we cannot safely use them until they have been refreshed.
@@ -241,11 +243,17 @@ public static String COMMIT_TASKS_KEY(String connectorName) {
 
     private final WorkerConfigTransformer configTransformer;
 
+    @Deprecated
     public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) {
+        this(converter, config, configTransformer, null);
+    }
+
+    public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier) {
         this.lock = new Object();
         this.started = false;
         this.converter = converter;
         this.offset = -1;
+        this.topicAdminSupplier = adminSupplier;
 
         this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
         if (this.topic == null || this.topic.trim().length() == 0)
@@ -471,6 +479,7 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
+        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                 ? ((DistributedConfig) config).configStorageTopicSettings()
                 : Collections.emptyMap();
@@ -481,27 +490,25 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
                 .replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG))
                 .build();
 
-        return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
+        return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
     private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                               Map<String, Object> consumerProps,
                                                               Callback<ConsumerRecord<String, byte[]>> consumedCallback,
-                                                              final NewTopic topicDescription, final Map<String, Object> adminProps) {
-        Runnable createTopics = () -> {
+                                                              final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
+        java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
             log.debug("Creating admin client to manage Connect internal config topic");
-            try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-                // Create the topic if it doesn't exist
-                Set<String> newTopics = admin.createTopics(topicDescription);
-                if (!newTopics.contains(topic)) {
-                    // It already existed, so check that the topic cleanup policy is compact only and not delete
-                    log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
-                    admin.verifyTopicCleanupPolicyOnlyCompact(topic,
-                            DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations");
-                }
+            // Create the topic if it doesn't exist
+            Set<String> newTopics = admin.createTopics(topicDescription);
+            if (!newTopics.contains(topic)) {
+                // It already existed, so check that the topic cleanup policy is compact only and not delete
+                log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                        DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations");
             }
         };
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
     }
 
     @SuppressWarnings("unchecked")

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java (+24 -13)

@@ -41,11 +41,13 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
 /**
  * <p>
@@ -62,6 +64,16 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
 
     private KafkaBasedLog<byte[], byte[]> offsetLog;
     private HashMap<ByteBuffer, ByteBuffer> data;
+    private final Supplier<TopicAdmin> topicAdminSupplier;
+
+    @Deprecated
+    public KafkaOffsetBackingStore() {
+        this.topicAdminSupplier = null;
+    }
+
+    public KafkaOffsetBackingStore(Supplier<TopicAdmin> topicAdmin) {
+        this.topicAdminSupplier = Objects.requireNonNull(topicAdmin);
+    }
 
     @Override
     public void configure(final WorkerConfig config) {
@@ -86,6 +98,7 @@ public void configure(final WorkerConfig config) {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
+        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                 ? ((DistributedConfig) config).offsetStorageTopicSettings()
                 : Collections.emptyMap();
@@ -96,27 +109,25 @@ public void configure(final WorkerConfig config) {
                 .replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG))
                 .build();
 
-        offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps);
+        offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier);
     }
 
     private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                               Map<String, Object> consumerProps,
                                                               Callback<ConsumerRecord<byte[], byte[]>> consumedCallback,
-                                                              final NewTopic topicDescription, final Map<String, Object> adminProps) {
-        Runnable createTopics = () -> {
+                                                              final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
+        java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
            log.debug("Creating admin client to manage Connect internal offset topic");
-            try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-                // Create the topic if it doesn't exist
-                Set<String> newTopics = admin.createTopics(topicDescription);
-                if (!newTopics.contains(topic)) {
-                    // It already existed, so check that the topic cleanup policy is compact only and not delete
-                    log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
-                    admin.verifyTopicCleanupPolicyOnlyCompact(topic,
-                            DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
-                }
+            // Create the topic if it doesn't exist
+            Set<String> newTopics = admin.createTopics(topicDescription);
+            if (!newTopics.contains(topic)) {
+                // It already existed, so check that the topic cleanup policy is compact only and not delete
+                log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                        DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
            }
         };
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
    }
 
    @Override
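For code outside Apache Kafka that builds a `KafkaBasedLog` directly, the new constructor shown in the two `createKafkaBasedLog` methods above takes a `Supplier<TopicAdmin>` plus an initializer that receives the shared admin. A hedged usage sketch; the class name, method name, partition/replication settings, and no-op callback below are placeholders for illustration, not code from this commit:

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.apache.kafka.connect.util.TopicAdmin;

import java.util.Map;
import java.util.function.Supplier;

public class KafkaBasedLogUsageSketch {
    public static KafkaBasedLog<byte[], byte[]> buildLog(Map<String, Object> adminProps,
                                                         Map<String, Object> producerProps,
                                                         Map<String, Object> consumerProps,
                                                         String topic) {
        // Shared, lazily created admin client; also serves as the Supplier<TopicAdmin> argument.
        Supplier<TopicAdmin> sharedAdmin = new SharedTopicAdmin(adminProps);

        // Record callback; a real caller would apply each consumed record to its in-memory state here.
        Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> { };

        // The initializer now receives the shared TopicAdmin instead of creating (and closing) its own.
        java.util.function.Consumer<TopicAdmin> initializer = admin ->
                admin.createTopics(new NewTopic(topic, 1, (short) 1));  // ensure the internal topic exists

        return new KafkaBasedLog<>(topic, producerProps, consumerProps, sharedAdmin,
                consumedCallback, Time.SYSTEM, initializer);
    }
}

As the commit message notes, callers that stay on the deprecated constructor without an admin supplier keep the old behavior: the consumer is still used to fetch the latest offsets.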
