diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index 497e691f1d2e2..4b63bdf86d467 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.util.SharedTopicAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,20 +234,27 @@ private void addHerder(SourceAndTarget sourceAndTarget) { plugins.compareAndSwapWithDelegatingLoader(); DistributedConfig distributedConfig = new DistributedConfig(workerProps); String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig); - KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); + // Create the admin client to be shared by all backing stores for this herder + Map adminProps = new HashMap<>(config.originals()); + SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps); + KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin); offsetBackingStore.configure(distributedConfig); Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY); WorkerConfigTransformer configTransformer = worker.configTransformer(); Converter internalValueConverter = worker.getInternalValueConverter(); - StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); + StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin); statusBackingStore.configure(distributedConfig); ConfigBackingStore configBackingStore = new KafkaConfigBackingStore( internalValueConverter, distributedConfig, - configTransformer); + configTransformer, + sharedAdmin); + // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the + // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than + // tracking the various shared admin objects in this class. Herder herder = new DistributedHerder(distributedConfig, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, - advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY); + advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin); herders.put(sourceAndTarget, herder); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index 22c1ad82d6138..5150dd55c7709 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -36,12 +36,14 @@ import org.apache.kafka.connect.storage.KafkaStatusBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectUtils; +import org.apache.kafka.connect.util.SharedTopicAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URI; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Map; /** @@ -101,7 +103,11 @@ public Connect startConnect(Map workerProps) { URI advertisedUrl = rest.advertisedUrl(); String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); - KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); + // Create the admin client to be shared by all backing stores. + Map adminProps = new HashMap<>(config.originals()); + SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps); + + KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin); offsetBackingStore.configure(config); ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin( @@ -112,17 +118,20 @@ public Connect startConnect(Map workerProps) { WorkerConfigTransformer configTransformer = worker.configTransformer(); Converter internalValueConverter = worker.getInternalValueConverter(); - StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); + StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin); statusBackingStore.configure(config); ConfigBackingStore configBackingStore = new KafkaConfigBackingStore( internalValueConverter, config, - configTransformer); + configTransformer, + sharedAdmin); + // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the + // herder is stopped. This is easier than having to track and own the lifecycle ourselves. DistributedHerder herder = new DistributedHerder(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, - advertisedUrl.toString(), connectorClientConfigOverridePolicy); + advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin); final Connect connect = new Connect(herder, rest); log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 896fccd69fe6a..6e8b6d7d57eb1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; @@ -67,6 +68,7 @@ import javax.crypto.SecretKey; import javax.ws.rs.core.Response; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -139,6 +141,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private final Time time; private final HerderMetrics herderMetrics; + private final List uponShutdown; private final String workerGroupId; private final int workerSyncTimeoutMs; @@ -186,6 +189,22 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private final DistributedConfig config; + /** + * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs) + * that have the same group ID. + * + * @param config the configuration for the worker; may not be null + * @param time the clock to use; may not be null + * @param worker the {@link Worker} instance to use; may not be null + * @param kafkaClusterId the identifier of the Kafka cluster to use for internal topics; may not be null + * @param statusBackingStore the backing store for statuses; may not be null + * @param configBackingStore the backing store for connector configurations; may not be null + * @param restUrl the URL of this herder's REST API; may not be null + * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden + * in connector configurations; may not be null + * @param uponShutdown any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped}, + * after all services and resources owned by this herder are stopped + */ public DistributedHerder(DistributedConfig config, Time time, Worker worker, @@ -193,9 +212,10 @@ public DistributedHerder(DistributedConfig config, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String restUrl, - ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) { + ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, + AutoCloseable... uponShutdown) { this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), - time, connectorClientConfigOverridePolicy); + time, connectorClientConfigOverridePolicy, uponShutdown); configBackingStore.setUpdateListener(new ConfigUpdateListener()); } @@ -210,7 +230,8 @@ public DistributedHerder(DistributedConfig config, String restUrl, ConnectMetrics metrics, Time time, - ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) { + ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, + AutoCloseable... uponShutdown) { super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy); this.time = time; @@ -224,6 +245,7 @@ public DistributedHerder(DistributedConfig config, this.keySignatureVerificationAlgorithms = config.getList(DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG); this.keyGenerator = config.getInternalRequestKeyGenerator(); this.isTopicTrackingEnabled = config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); + this.uponShutdown = Arrays.asList(uponShutdown); String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig; @@ -621,6 +643,15 @@ public void halt() { } } + @Override + protected void stopServices() { + try { + super.stopServices(); + } finally { + this.uponShutdown.forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "")); + } + } + @Override public void stop() { log.info("Herder stopping"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index e279ed07bfebd..6a48c85ba28bc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -59,6 +59,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; /** *

@@ -221,6 +222,7 @@ public static String COMMIT_TASKS_KEY(String connectorName) { // Connector and task configs: name or id -> config map private final Map> connectorConfigs = new HashMap<>(); private final Map> taskConfigs = new HashMap<>(); + private final Supplier topicAdminSupplier; // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data // is in an inconsistent state and we cannot safely use them until they have been refreshed. @@ -238,11 +240,17 @@ public static String COMMIT_TASKS_KEY(String connectorName) { private final WorkerConfigTransformer configTransformer; + @Deprecated public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) { + this(converter, config, configTransformer, null); + } + + public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer, Supplier adminSupplier) { this.lock = new Object(); this.started = false; this.converter = converter; this.offset = -1; + this.topicAdminSupplier = adminSupplier; this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG); if (this.topic == null || this.topic.trim().length() == 0) @@ -463,29 +471,26 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map adminProps = new HashMap<>(originals); - NewTopic topicDescription = TopicAdmin.defineTopic(topic). - compacted(). - partitions(1). - replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)). - build(); - - return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps); + Supplier adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps); + NewTopic topicDescription = TopicAdmin.defineTopic(topic) + .compacted() + .partitions(1) + .replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)) + .build(); + + return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier); } private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, Map consumerProps, Callback> consumedCallback, - final NewTopic topicDescription, final Map adminProps) { - Runnable createTopics = new Runnable() { - @Override - public void run() { - log.debug("Creating admin client to manage Connect internal config topic"); - try (TopicAdmin admin = new TopicAdmin(adminProps)) { - admin.createTopics(topicDescription); - } - } + final NewTopic topicDescription, Supplier adminSupplier) { + java.util.function.Consumer createTopics = admin -> { + log.debug("Creating admin client to manage Connect internal config topic"); + // Create the topic if it doesn't exist + admin.createTopics(topicDescription); }; - return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics); + return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics); } @SuppressWarnings("unchecked") diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index 22bcb7a5ae2e6..4cc1a4912ecc5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -38,10 +38,12 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; /** *

@@ -58,6 +60,16 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { private KafkaBasedLog offsetLog; private HashMap data; + private final Supplier topicAdminSupplier; + + @Deprecated + public KafkaOffsetBackingStore() { + this.topicAdminSupplier = null; + } + + public KafkaOffsetBackingStore(Supplier topicAdmin) { + this.topicAdminSupplier = Objects.requireNonNull(topicAdmin); + } @Override public void configure(final WorkerConfig config) { @@ -78,29 +90,26 @@ public void configure(final WorkerConfig config) { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map adminProps = new HashMap<>(originals); - NewTopic topicDescription = TopicAdmin.defineTopic(topic). - compacted(). - partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)). - replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)). - build(); - - offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps); + Supplier adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps); + NewTopic topicDescription = TopicAdmin.defineTopic(topic) + .compacted() + .partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)) + .replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)) + .build(); + + offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier); } private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, Map consumerProps, Callback> consumedCallback, - final NewTopic topicDescription, final Map adminProps) { - Runnable createTopics = new Runnable() { - @Override - public void run() { - log.debug("Creating admin client to manage Connect internal offset topic"); - try (TopicAdmin admin = new TopicAdmin(adminProps)) { - admin.createTopics(topicDescription); - } - } + final NewTopic topicDescription, Supplier adminSupplier) { + java.util.function.Consumer createTopics = admin -> { + log.debug("Creating admin client to manage Connect internal offset topic"); + // Create the topic if it doesn't exist + admin.createTopics(topicDescription); }; - return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics); + return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics); } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index c8eace72934a5..bdd6b5123ba71 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -59,6 +59,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; /** * StatusBackingStore implementation which uses a compacted topic for storage @@ -126,17 +127,24 @@ public class KafkaStatusBackingStore implements StatusBackingStore { protected final Table> tasks; protected final Map> connectors; protected final ConcurrentMap> topics; + private final Supplier topicAdminSupplier; private String statusTopic; private KafkaBasedLog kafkaLog; private int generation; + @Deprecated public KafkaStatusBackingStore(Time time, Converter converter) { + this(time, converter, null); + } + + public KafkaStatusBackingStore(Time time, Converter converter, Supplier topicAdminSupplier) { this.time = time; this.converter = converter; this.tasks = new Table<>(); this.connectors = new HashMap<>(); this.topics = new ConcurrentHashMap<>(); + this.topicAdminSupplier = topicAdminSupplier; } // visible for testing @@ -163,11 +171,12 @@ public void configure(final WorkerConfig config) { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map adminProps = new HashMap<>(originals); - NewTopic topicDescription = TopicAdmin.defineTopic(statusTopic). - compacted(). - partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)). - replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)). - build(); + Supplier adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps); + NewTopic topicDescription = TopicAdmin.defineTopic(statusTopic) + .compacted() + .partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)) + .replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)) + .build(); Callback> readCallback = new Callback>() { @Override @@ -175,23 +184,19 @@ public void onCompletion(Throwable error, ConsumerRecord record) read(record); } }; - this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminProps); + this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier); } private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, Map consumerProps, Callback> consumedCallback, - final NewTopic topicDescription, final Map adminProps) { - Runnable createTopics = new Runnable() { - @Override - public void run() { - log.debug("Creating admin client to manage Connect internal status topic"); - try (TopicAdmin admin = new TopicAdmin(adminProps)) { - admin.createTopics(topicDescription); - } - } + final NewTopic topicDescription, Supplier adminSupplier) { + java.util.function.Consumer createTopics = admin -> { + log.debug("Creating admin client to manage Connect internal status topic"); + // Create the topic if it doesn't exist + admin.createTopics(topicDescription); }; - return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, time, createTopics); + return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, time, createTopics); } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index 5248715aa6293..6a2a787578e84 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -41,10 +41,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; /** @@ -79,13 +81,15 @@ public class KafkaBasedLog { private final Map producerConfigs; private final Map consumerConfigs; private final Callback> consumedCallback; + private final Supplier topicAdminSupplier; private Consumer consumer; private Producer producer; + private TopicAdmin admin; private Thread thread; private boolean stopRequested; private Queue> readLogEndOffsetCallbacks; - private Runnable initializer; + private java.util.function.Consumer initializer; /** * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until @@ -103,31 +107,63 @@ public class KafkaBasedLog { * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log * @param time Time interface * @param initializer the component that should be run when this log is {@link #start() started}; may be null + * @deprecated Replaced by {@link #KafkaBasedLog(String, Map, Map, Supplier, Callback, Time, java.util.function.Consumer)} */ + @Deprecated public KafkaBasedLog(String topic, Map producerConfigs, Map consumerConfigs, Callback> consumedCallback, Time time, Runnable initializer) { + this(topic, producerConfigs, consumerConfigs, () -> null, consumedCallback, time, initializer != null ? admin -> initializer.run() : null); + } + + /** + * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until + * {@link #start()} is invoked. + * + * @param topic the topic to treat as a log + * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must + * contain compatible serializer settings for the generic types used on this class. Some + * setting, such as the number of acks, will be overridden to ensure correct behavior of this + * class. + * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must + * contain compatible serializer settings for the generic types used on this class. Some + * setting, such as the auto offset reset policy, will be overridden to ensure correct + * behavior of this class. + * @param topicAdminSupplier supplier function for an admin client, the lifecycle of which is expected to be controlled + * by the calling component; may not be null + * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log + * @param time Time interface + * @param initializer the function that should be run when this log is {@link #start() started}; may be null + */ + public KafkaBasedLog(String topic, + Map producerConfigs, + Map consumerConfigs, + Supplier topicAdminSupplier, + Callback> consumedCallback, + Time time, + java.util.function.Consumer initializer) { this.topic = topic; this.producerConfigs = producerConfigs; this.consumerConfigs = consumerConfigs; + this.topicAdminSupplier = Objects.requireNonNull(topicAdminSupplier); this.consumedCallback = consumedCallback; this.stopRequested = false; this.readLogEndOffsetCallbacks = new ArrayDeque<>(); this.time = time; - this.initializer = initializer != null ? initializer : new Runnable() { - @Override - public void run() { - } - }; + this.initializer = initializer != null ? initializer : admin -> { }; } public void start() { log.info("Starting KafkaBasedLog with topic " + topic); - initializer.run(); + // Create the topic admin client and initialize the topic ... + admin = topicAdminSupplier.get(); // may be null + initializer.accept(admin); + + // Then create the producer and consumer producer = createProducer(); consumer = createConsumer(); @@ -193,6 +229,9 @@ public void stop() { log.error("Failed to stop KafkaBasedLog consumer", e); } + // do not close the admin client, since we don't own it + admin = null; + log.info("Stopped KafkaBasedLog for topic " + topic); } @@ -282,7 +321,29 @@ private void readToLogEnd() { log.trace("Reading to end of offset log"); Set assignment = consumer.assignment(); - Map endOffsets = consumer.endOffsets(assignment); + Map endOffsets; + // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions. + // That is because it's possible that the consumer is already blocked waiting for new records to appear, when + // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least + // one more record becomes available, meaning we can't even check whether we're at the end offset. + // Since all we're trying to do here is get the end offset, we should use the supplied admin client + // (if available) + // (which prevents 'consumer.endOffsets(...)' + // from + + // Deprecated constructors do not provide an admin supplier, so the admin is potentially null. + if (admin != null) { + // Use the admin client to immediately find the end offsets for the assigned topic partitions. + // Unlike using the consumer + endOffsets = admin.endOffsets(assignment); + } else { + // The admin may be null if older deprecated constructor is used, though AK Connect currently always provides an admin client. + // Using the consumer is not ideal, because when the topic has low volume, the 'poll(...)' method called from the + // work thread may have blocked the consumer while waiting for more records (even when there are none). + // In such cases, this call to the consumer to simply find the end offsets will block even though we might already be + // at the end offset. + endOffsets = consumer.endOffsets(assignment); + } log.trace("Reading to end of log offsets {}", endOffsets); while (!endOffsets.isEmpty()) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java new file mode 100644 index 0000000000000..4fda12cb6bcb2 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.connect.errors.ConnectException; + +/** + * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers. + * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin} + * instance until this SharedAdmin is closed via {@link #close()}. + * + *

The owner of this object is responsible for ensuring that either {@link #close()} + * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this + * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and {@link #topicAdmin()} methods, + * nor any previously returned {@link TopicAdmin} instances may be used. + * + *

This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object, + * until this object is closed, at which point it cannot be used anymore + */ +public class SharedTopicAdmin implements AutoCloseable, Supplier { + + private final Map adminProps; + private final AtomicReference admin = new AtomicReference<>(); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Function, TopicAdmin> factory; + + public SharedTopicAdmin(Map adminProps) { + this(adminProps, TopicAdmin::new); + } + + // Visible for testing + SharedTopicAdmin(Map adminProps, Function, TopicAdmin> factory) { + this.adminProps = Objects.requireNonNull(adminProps); + this.factory = Objects.requireNonNull(factory); + } + + /** + * Get the shared {@link TopicAdmin} instance. + * + * @return the shared instance; never null + * @throws ConnectException if this object has already been closed + */ + @Override + public TopicAdmin get() { + return topicAdmin(); + } + + /** + * Get the shared {@link TopicAdmin} instance. + * + * @return the shared instance; never null + * @throws ConnectException if this object has already been closed + */ + public TopicAdmin topicAdmin() { + return admin.updateAndGet(this::createAdmin); + } + + /** + * Get the string containing the list of bootstrap server addresses to the Kafka broker(s) to which + * the admin client connects. + * + * @return the bootstrap servers as a string; never null + */ + public String bootstrapServers() { + return adminProps.getOrDefault(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "").toString(); + } + + /** + * Close the underlying {@link TopicAdmin} instance, if one has been created, and prevent new ones from being created. + * + *

Once this method is called, the {@link #get()} and {@link #topicAdmin()} methods, + * nor any previously returned {@link TopicAdmin} instances may be used. + */ + @Override + public void close() { + if (this.closed.compareAndSet(false, true)) { + TopicAdmin admin = this.admin.getAndSet(null); + if (admin != null) { + admin.close(); + } + } + } + + @Override + public String toString() { + return "admin client for brokers at " + bootstrapServers(); + } + + /** + * Method used to create a {@link TopicAdmin} instance. This method must be side-effect free, since it is called from within + * the {@link AtomicReference#updateAndGet(UnaryOperator)}. + * + * @param existing the existing instance; may be null + * @return the + */ + protected TopicAdmin createAdmin(TopicAdmin existing) { + if (closed.get()) { + throw new ConnectException("The " + this + " has already been closed and cannot be used."); + } + if (existing != null) { + return existing; + } + return factory.apply(adminProps); + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index e644e805ea613..e94561e91b5d5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -19,15 +19,22 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsOptions; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.ClusterAuthorizationException; -import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.RetriableException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +44,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; /** * Utility to simplify creating and managing topics via the {@link Admin}. @@ -175,6 +184,14 @@ public TopicAdmin(Map adminConfig) { this.adminConfig = adminConfig != null ? adminConfig : Collections.emptyMap(); } + /** + * Get the {@link Admin} client used by this topic admin object. + * @return the Kafka admin instance; never null + */ + public Admin admin() { + return admin; + } + /** * Attempt to create the topic described by the given definition, returning true if the topic was created or false * if the topic already existed. @@ -268,6 +285,58 @@ public Set createTopics(NewTopic... topics) { return newlyCreatedTopicNames; } + /** + * Fetch the most recent offset for each of the supplied {@link TopicPartition} objects. + * + * @param partitions the topic partitions + * @return the map of offset for each topic partition, or an empty map if the supplied partitions + * are null or empty + * @throws RetriableException if a retriable error occurs, the operation takes too long, or the + * thread is interrupted while attempting to perform this operation + * @throws ConnectException if a non retriable error occurs + */ + public Map endOffsets(Set partitions) { + if (partitions == null || partitions.isEmpty()) { + return Collections.emptyMap(); + } + Map offsetSpecMap = partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())); + ListOffsetsResult resultFuture = admin.listOffsets(offsetSpecMap); + // Get the individual result for each topic partition so we have better error messages + Map result = new HashMap<>(); + for (TopicPartition partition : partitions) { + try { + ListOffsetsResultInfo info = resultFuture.partitionResult(partition).get(); + result.put(partition, info.offset()); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + String topic = partition.topic(); + if (cause instanceof AuthorizationException) { + String msg = String.format("Not authorized to get the end offsets for topic '%s' on brokers at %s", topic, bootstrapServers()); + throw new ConnectException(msg, e); + } else if (cause instanceof UnsupportedVersionException) { + // Should theoretically never happen, because this method is the same as what the consumer uses and therefore + // should exist in the broker since before the admin client was added + String msg = String.format("API to get the get the end offsets for topic '%s' is unsupported on brokers at %s", topic, bootstrapServers()); + throw new ConnectException(msg, e); + } else if (cause instanceof TimeoutException) { + String msg = String.format("Timed out while waiting to get end offsets for topic '%s' on brokers at %s", topic, bootstrapServers()); + throw new RetriableException(msg, e); + } else if (cause instanceof LeaderNotAvailableException) { + String msg = String.format("Unable to get end offsets during leader election for topic '%s' on brokers at %s", topic, bootstrapServers()); + throw new RetriableException(msg, e); + } else { + String msg = String.format("Error while getting end offsets for topic '%s' on brokers at %s", topic, bootstrapServers()); + throw new ConnectException(msg, e); + } + } catch (InterruptedException e) { + Thread.interrupted(); + String msg = String.format("Interrupted while attempting to read end offsets for topic '%s' on brokers at %s", partition.topic(), bootstrapServers()); + throw new RetriableException(msg, e); + } + } + return result; + } + @Override public void close() { admin.close(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 04e18acd39023..2a96850d77744 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -78,6 +78,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -182,12 +183,10 @@ public class DistributedHerderTest { @Mock private Worker worker; @Mock private WorkerConfigTransformer transformer; @Mock private Callback> putConnectorCallback; - @Mock - private Plugins plugins; - @Mock - private PluginClassLoader pluginLoader; - @Mock - private DelegatingClassLoader delegatingLoader; + @Mock private Plugins plugins; + @Mock private PluginClassLoader pluginLoader; + @Mock private DelegatingClassLoader delegatingLoader; + private CountDownLatch shutdownCalled = new CountDownLatch(1); private ConfigBackingStore.UpdateListener configUpdateListener; private WorkerRebalanceListener rebalanceListener; @@ -205,6 +204,7 @@ public void setUp() throws Exception { metrics = new MockConnectMetrics(time); worker = PowerMock.createMock(Worker.class); EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE); + AutoCloseable uponShutdown = () -> shutdownCalled.countDown(); // Default to the old protocol unless specified otherwise connectProtocolVersion = CONNECT_PROTOCOL_V0; @@ -212,7 +212,8 @@ public void setUp() throws Exception { herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"}, new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID, - statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy); + statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy, + new AutoCloseable[]{uponShutdown}); configUpdateListener = herder.new ConfigUpdateListener(); rebalanceListener = herder.new RebalanceListener(time); @@ -2101,6 +2102,13 @@ public void testThreadNames() { getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("StartAndStopExecutor")); } + @Test + public void testHerderStopServicesClosesUponShutdown() { + assertEquals(1, shutdownCalled.getCount()); + herder.stopServices(); + assertEquals(0, shutdownCalled.getCount()); + } + private void expectRebalance(final long offset, final List assignedConnectors, final List assignedTasks) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index 9a5569004cb3e..85f0e2ad6311a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TestFuture; +import org.apache.kafka.connect.util.TopicAdmin; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; @@ -56,6 +57,7 @@ import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -141,7 +143,7 @@ public class KafkaConfigBackingStoreTest { private Capture capturedTopic = EasyMock.newCapture(); private Capture> capturedProducerProps = EasyMock.newCapture(); private Capture> capturedConsumerProps = EasyMock.newCapture(); - private Capture> capturedAdminProps = EasyMock.newCapture(); + private Capture> capturedAdminSupplier = EasyMock.newCapture(); private Capture capturedNewTopic = EasyMock.newCapture(); private Capture>> capturedConsumedCallback = EasyMock.newCapture(); @@ -886,7 +888,7 @@ private void expectConfigure() throws Exception { PowerMock.expectPrivate(configStorage, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps), EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback), - EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps)) + EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier)) .andReturn(storeLog); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java index c04363b974605..05e5b5f2a93d2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; @@ -49,6 +50,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -105,7 +107,7 @@ public class KafkaOffsetBackingStoreTest { private Capture capturedTopic = EasyMock.newCapture(); private Capture> capturedProducerProps = EasyMock.newCapture(); private Capture> capturedConsumerProps = EasyMock.newCapture(); - private Capture> capturedAdminProps = EasyMock.newCapture(); + private Capture> capturedAdminSupplier = EasyMock.newCapture(); private Capture capturedNewTopic = EasyMock.newCapture(); private Capture>> capturedConsumedCallback = EasyMock.newCapture(); @@ -380,7 +382,7 @@ public void onCompletion(Throwable error, Void result) { private void expectConfigure() throws Exception { PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps), EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback), - EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps)) + EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier)) .andReturn(storeLog); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java new file mode 100644 index 0000000000000..9e1886a20a0a3 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; + +import org.apache.kafka.connect.errors.ConnectException; +import org.junit.Rule; +import org.mockito.Mock; +import org.junit.Before; +import org.junit.Test; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class SharedTopicAdminTest { + + private static final Map EMPTY_CONFIG = Collections.emptyMap(); + + @Rule + public MockitoRule rule = MockitoJUnit.rule(); + + @Mock private TopicAdmin mockTopicAdmin; + @Mock private Function, TopicAdmin> factory; + private SharedTopicAdmin sharedAdmin; + + @Before + public void beforeEach() { + when(factory.apply(anyMap())).thenReturn(mockTopicAdmin); + sharedAdmin = new SharedTopicAdmin(EMPTY_CONFIG, factory::apply); + } + + @Test + public void shouldCloseWithoutBeingUsed() { + // When closed before being used + sharedAdmin.close(); + // Then should not create or close admin + verifyTopicAdminCreatesAndCloses(0); + } + + @Test + public void shouldCloseAfterTopicAdminUsed() { + // When used and then closed + assertSame(mockTopicAdmin, sharedAdmin.topicAdmin()); + sharedAdmin.close(); + // Then should have created and closed just one admin + verifyTopicAdminCreatesAndCloses(1); + } + + @Test + public void shouldCloseAfterTopicAdminUsedMultipleTimes() { + // When used many times and then closed + for (int i = 0; i != 10; ++i) { + assertSame(mockTopicAdmin, sharedAdmin.topicAdmin()); + } + sharedAdmin.close(); + // Then should have created and closed just one admin + verifyTopicAdminCreatesAndCloses(1); + } + + @Test + public void shouldFailToGetTopicAdminAfterClose() { + // When closed + sharedAdmin.close(); + // Then using the admin should fail + assertThrows(ConnectException.class, () -> sharedAdmin.topicAdmin()); + } + + private void verifyTopicAdminCreatesAndCloses(int count) { + verify(factory, times(count)).apply(anyMap()); + verify(mockTopicAdmin, times(count)).close(); + } +} \ No newline at end of file diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index cfcab32c7754c..0b5be7c69c518 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -17,28 +17,43 @@ package org.apache.kafka.connect.util; import org.apache.kafka.clients.NodeApiVersions; -import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientUnitTestEnv; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.connect.errors.ConnectException; import org.junit.Test; import java.util.Collections; import java.util.HashMap; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TopicAdminTest { @@ -129,6 +144,107 @@ public void shouldReturnFalseWhenSuppliedNullTopicDescription() { } } + @SuppressWarnings("unchecked") + private KafkaFuture mockFuture() { + return (KafkaFuture) mock(KafkaFuture.class); + } + + private Admin expectAdminListOffsetsFailure(Throwable expected) throws Exception { + // When the admin client lists offsets + Admin mockAdmin = mock(Admin.class); + ListOffsetsResult results = mock(ListOffsetsResult.class); + when(mockAdmin.listOffsets(anyMap())).thenReturn(results); + // and throws an exception via the future.get() + ExecutionException execException = new ExecutionException(expected); + KafkaFuture future = mockFuture(); + when(future.get()).thenThrow(execException); + when(results.partitionResult(any(TopicPartition.class))).thenReturn(future); + return mockAdmin; + } + + @Test(expected = ConnectException.class) + public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() throws Exception { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + + // When the admin client lists offsets throws an exception + Admin mockAdmin = expectAdminListOffsetsFailure(new AuthorizationException("failed")); + + // Then the topic admin should throw exception + TopicAdmin admin = new TopicAdmin(null, mockAdmin); + admin.endOffsets(tps); + } + + @Test(expected = ConnectException.class) + public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() throws Exception { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + + // When the admin client lists offsets throws an exception + Admin mockAdmin = expectAdminListOffsetsFailure(new UnsupportedVersionException("failed")); + + // Then the topic admin should throw exception + TopicAdmin admin = new TopicAdmin(null, mockAdmin); + admin.endOffsets(tps); + } + + @Test(expected = ConnectException.class) + public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() throws Exception { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + + // When the admin client lists offsets throws an exception + Admin mockAdmin = expectAdminListOffsetsFailure(new TimeoutException("failed")); + + // Then the topic admin should throw exception + TopicAdmin admin = new TopicAdmin(null, mockAdmin); + admin.endOffsets(tps); + } + + @Test(expected = ConnectException.class) + public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() throws Exception { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + + // When the admin client lists offsets throws an exception + Admin mockAdmin = expectAdminListOffsetsFailure(new RuntimeException("failed")); + + // Then the topic admin should throw exception + TopicAdmin admin = new TopicAdmin(null, mockAdmin); + admin.endOffsets(tps); + } + + @Test + public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() throws Exception { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + + // Then the topic admin should return immediately + Admin mockAdmin = mock(Admin.class); + TopicAdmin admin = new TopicAdmin(null, mockAdmin); + Map offsets = admin.endOffsets(Collections.emptySet()); + assertTrue(offsets.isEmpty()); + } + + @Test(expected = ConnectException.class) + public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() throws Exception { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + + // When the admin client lists offsets throws an exception + Admin mockAdmin = expectAdminListOffsetsFailure(new AuthorizationException("failed")); + + // Then the topic admin should throw exception + TopicAdmin admin = new TopicAdmin(null, mockAdmin); + admin.endOffsets(tps); + } + private Cluster createCluster(int numNodes) { HashMap nodes = new HashMap<>(); for (int i = 0; i < numNodes; ++i) {