From 0338995d7330d2f806155c6736afb39d96ecfbcf Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Tue, 9 Feb 2021 11:09:41 -0600 Subject: [PATCH] KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics (#9780) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic. However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end. Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity. Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance. The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes. These changes are implemented as follows: 1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets. 2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance. 3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed. 4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic. 5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.) 6. Change `MirrorMaker` similarly to `ConnectDistributed`. 7. Change existing unit tests to no longer use deprecated constructors. 8. Add unit tests for new functionality. Author: Randall Hauch Reviewer: Konstantine Karantasis --- .../kafka/connect/mirror/MirrorMaker.java | 16 ++- .../kafka/connect/cli/ConnectDistributed.java | 17 ++- .../distributed/DistributedHerder.java | 37 ++++- .../storage/KafkaConfigBackingStore.java | 39 +++--- .../storage/KafkaOffsetBackingStore.java | 43 +++--- .../storage/KafkaStatusBackingStore.java | 37 ++--- .../kafka/connect/util/KafkaBasedLog.java | 77 +++++++++-- .../kafka/connect/util/SharedTopicAdmin.java | 128 ++++++++++++++++++ .../apache/kafka/connect/util/TopicAdmin.java | 71 +++++++++- .../distributed/DistributedHerderTest.java | 22 ++- .../storage/KafkaConfigBackingStoreTest.java | 6 +- .../storage/KafkaOffsetBackingStoreTest.java | 6 +- .../connect/util/SharedTopicAdminTest.java | 95 +++++++++++++ .../kafka/connect/util/TopicAdminTest.java | 118 +++++++++++++++- 14 files changed, 630 insertions(+), 82 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index 497e691f1d2e2..4b63bdf86d467 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.util.SharedTopicAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,20 +234,27 @@ private void addHerder(SourceAndTarget sourceAndTarget) { plugins.compareAndSwapWithDelegatingLoader(); DistributedConfig distributedConfig = new DistributedConfig(workerProps); String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig); - KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); + // Create the admin client to be shared by all backing stores for this herder + Map adminProps = new HashMap<>(config.originals()); + SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps); + KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin); offsetBackingStore.configure(distributedConfig); Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY); WorkerConfigTransformer configTransformer = worker.configTransformer(); Converter internalValueConverter = worker.getInternalValueConverter(); - StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); + StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin); statusBackingStore.configure(distributedConfig); ConfigBackingStore configBackingStore = new KafkaConfigBackingStore( internalValueConverter, distributedConfig, - configTransformer); + configTransformer, + sharedAdmin); + // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the + // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than + // tracking the various shared admin objects in this class. Herder herder = new DistributedHerder(distributedConfig, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, - advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY); + advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin); herders.put(sourceAndTarget, herder); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index 22c1ad82d6138..5150dd55c7709 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -36,12 +36,14 @@ import org.apache.kafka.connect.storage.KafkaStatusBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectUtils; +import org.apache.kafka.connect.util.SharedTopicAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URI; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Map; /** @@ -101,7 +103,11 @@ public Connect startConnect(Map workerProps) { URI advertisedUrl = rest.advertisedUrl(); String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); - KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); + // Create the admin client to be shared by all backing stores. + Map adminProps = new HashMap<>(config.originals()); + SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps); + + KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin); offsetBackingStore.configure(config); ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin( @@ -112,17 +118,20 @@ public Connect startConnect(Map workerProps) { WorkerConfigTransformer configTransformer = worker.configTransformer(); Converter internalValueConverter = worker.getInternalValueConverter(); - StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); + StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin); statusBackingStore.configure(config); ConfigBackingStore configBackingStore = new KafkaConfigBackingStore( internalValueConverter, config, - configTransformer); + configTransformer, + sharedAdmin); + // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the + // herder is stopped. This is easier than having to track and own the lifecycle ourselves. DistributedHerder herder = new DistributedHerder(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, - advertisedUrl.toString(), connectorClientConfigOverridePolicy); + advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin); final Connect connect = new Connect(herder, rest); log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 896fccd69fe6a..6e8b6d7d57eb1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; @@ -67,6 +68,7 @@ import javax.crypto.SecretKey; import javax.ws.rs.core.Response; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -139,6 +141,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private final Time time; private final HerderMetrics herderMetrics; + private final List uponShutdown; private final String workerGroupId; private final int workerSyncTimeoutMs; @@ -186,6 +189,22 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private final DistributedConfig config; + /** + * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs) + * that have the same group ID. + * + * @param config the configuration for the worker; may not be null + * @param time the clock to use; may not be null + * @param worker the {@link Worker} instance to use; may not be null + * @param kafkaClusterId the identifier of the Kafka cluster to use for internal topics; may not be null + * @param statusBackingStore the backing store for statuses; may not be null + * @param configBackingStore the backing store for connector configurations; may not be null + * @param restUrl the URL of this herder's REST API; may not be null + * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden + * in connector configurations; may not be null + * @param uponShutdown any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped}, + * after all services and resources owned by this herder are stopped + */ public DistributedHerder(DistributedConfig config, Time time, Worker worker, @@ -193,9 +212,10 @@ public DistributedHerder(DistributedConfig config, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String restUrl, - ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) { + ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, + AutoCloseable... uponShutdown) { this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), - time, connectorClientConfigOverridePolicy); + time, connectorClientConfigOverridePolicy, uponShutdown); configBackingStore.setUpdateListener(new ConfigUpdateListener()); } @@ -210,7 +230,8 @@ public DistributedHerder(DistributedConfig config, String restUrl, ConnectMetrics metrics, Time time, - ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) { + ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, + AutoCloseable... uponShutdown) { super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy); this.time = time; @@ -224,6 +245,7 @@ public DistributedHerder(DistributedConfig config, this.keySignatureVerificationAlgorithms = config.getList(DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG); this.keyGenerator = config.getInternalRequestKeyGenerator(); this.isTopicTrackingEnabled = config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); + this.uponShutdown = Arrays.asList(uponShutdown); String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig; @@ -621,6 +643,15 @@ public void halt() { } } + @Override + protected void stopServices() { + try { + super.stopServices(); + } finally { + this.uponShutdown.forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "")); + } + } + @Override public void stop() { log.info("Herder stopping"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index e279ed07bfebd..6a48c85ba28bc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -59,6 +59,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; /** *

@@ -221,6 +222,7 @@ public static String COMMIT_TASKS_KEY(String connectorName) { // Connector and task configs: name or id -> config map private final Map> connectorConfigs = new HashMap<>(); private final Map> taskConfigs = new HashMap<>(); + private final Supplier topicAdminSupplier; // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data // is in an inconsistent state and we cannot safely use them until they have been refreshed. @@ -238,11 +240,17 @@ public static String COMMIT_TASKS_KEY(String connectorName) { private final WorkerConfigTransformer configTransformer; + @Deprecated public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) { + this(converter, config, configTransformer, null); + } + + public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer, Supplier adminSupplier) { this.lock = new Object(); this.started = false; this.converter = converter; this.offset = -1; + this.topicAdminSupplier = adminSupplier; this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG); if (this.topic == null || this.topic.trim().length() == 0) @@ -463,29 +471,26 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map adminProps = new HashMap<>(originals); - NewTopic topicDescription = TopicAdmin.defineTopic(topic). - compacted(). - partitions(1). - replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)). - build(); - - return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps); + Supplier adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps); + NewTopic topicDescription = TopicAdmin.defineTopic(topic) + .compacted() + .partitions(1) + .replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)) + .build(); + + return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier); } private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, Map consumerProps, Callback> consumedCallback, - final NewTopic topicDescription, final Map adminProps) { - Runnable createTopics = new Runnable() { - @Override - public void run() { - log.debug("Creating admin client to manage Connect internal config topic"); - try (TopicAdmin admin = new TopicAdmin(adminProps)) { - admin.createTopics(topicDescription); - } - } + final NewTopic topicDescription, Supplier adminSupplier) { + java.util.function.Consumer createTopics = admin -> { + log.debug("Creating admin client to manage Connect internal config topic"); + // Create the topic if it doesn't exist + admin.createTopics(topicDescription); }; - return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics); + return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics); } @SuppressWarnings("unchecked") diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index 22bcb7a5ae2e6..4cc1a4912ecc5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -38,10 +38,12 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; /** *

@@ -58,6 +60,16 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { private KafkaBasedLog offsetLog; private HashMap data; + private final Supplier topicAdminSupplier; + + @Deprecated + public KafkaOffsetBackingStore() { + this.topicAdminSupplier = null; + } + + public KafkaOffsetBackingStore(Supplier topicAdmin) { + this.topicAdminSupplier = Objects.requireNonNull(topicAdmin); + } @Override public void configure(final WorkerConfig config) { @@ -78,29 +90,26 @@ public void configure(final WorkerConfig config) { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map adminProps = new HashMap<>(originals); - NewTopic topicDescription = TopicAdmin.defineTopic(topic). - compacted(). - partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)). - replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)). - build(); - - offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps); + Supplier adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps); + NewTopic topicDescription = TopicAdmin.defineTopic(topic) + .compacted() + .partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)) + .replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)) + .build(); + + offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier); } private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, Map consumerProps, Callback> consumedCallback, - final NewTopic topicDescription, final Map adminProps) { - Runnable createTopics = new Runnable() { - @Override - public void run() { - log.debug("Creating admin client to manage Connect internal offset topic"); - try (TopicAdmin admin = new TopicAdmin(adminProps)) { - admin.createTopics(topicDescription); - } - } + final NewTopic topicDescription, Supplier adminSupplier) { + java.util.function.Consumer createTopics = admin -> { + log.debug("Creating admin client to manage Connect internal offset topic"); + // Create the topic if it doesn't exist + admin.createTopics(topicDescription); }; - return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics); + return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics); } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index c8eace72934a5..bdd6b5123ba71 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -59,6 +59,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; /** * StatusBackingStore implementation which uses a compacted topic for storage @@ -126,17 +127,24 @@ public class KafkaStatusBackingStore implements StatusBackingStore { protected final Table> tasks; protected final Map> connectors; protected final ConcurrentMap> topics; + private final Supplier topicAdminSupplier; private String statusTopic; private KafkaBasedLog kafkaLog; private int generation; + @Deprecated public KafkaStatusBackingStore(Time time, Converter converter) { + this(time, converter, null); + } + + public KafkaStatusBackingStore(Time time, Converter converter, Supplier topicAdminSupplier) { this.time = time; this.converter = converter; this.tasks = new Table<>(); this.connectors = new HashMap<>(); this.topics = new ConcurrentHashMap<>(); + this.topicAdminSupplier = topicAdminSupplier; } // visible for testing @@ -163,11 +171,12 @@ public void configure(final WorkerConfig config) { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map adminProps = new HashMap<>(originals); - NewTopic topicDescription = TopicAdmin.defineTopic(statusTopic). - compacted(). - partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)). - replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)). - build(); + Supplier adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps); + NewTopic topicDescription = TopicAdmin.defineTopic(statusTopic) + .compacted() + .partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)) + .replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)) + .build(); Callback> readCallback = new Callback>() { @Override @@ -175,23 +184,19 @@ public void onCompletion(Throwable error, ConsumerRecord record) read(record); } }; - this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminProps); + this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier); } private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, Map consumerProps, Callback> consumedCallback, - final NewTopic topicDescription, final Map adminProps) { - Runnable createTopics = new Runnable() { - @Override - public void run() { - log.debug("Creating admin client to manage Connect internal status topic"); - try (TopicAdmin admin = new TopicAdmin(adminProps)) { - admin.createTopics(topicDescription); - } - } + final NewTopic topicDescription, Supplier adminSupplier) { + java.util.function.Consumer createTopics = admin -> { + log.debug("Creating admin client to manage Connect internal status topic"); + // Create the topic if it doesn't exist + admin.createTopics(topicDescription); }; - return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, time, createTopics); + return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, time, createTopics); } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index 5248715aa6293..6a2a787578e84 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -41,10 +41,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; /** @@ -79,13 +81,15 @@ public class KafkaBasedLog { private final Map producerConfigs; private final Map consumerConfigs; private final Callback> consumedCallback; + private final Supplier topicAdminSupplier; private Consumer consumer; private Producer producer; + private TopicAdmin admin; private Thread thread; private boolean stopRequested; private Queue> readLogEndOffsetCallbacks; - private Runnable initializer; + private java.util.function.Consumer initializer; /** * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until @@ -103,31 +107,63 @@ public class KafkaBasedLog { * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log * @param time Time interface * @param initializer the component that should be run when this log is {@link #start() started}; may be null + * @deprecated Replaced by {@link #KafkaBasedLog(String, Map, Map, Supplier, Callback, Time, java.util.function.Consumer)} */ + @Deprecated public KafkaBasedLog(String topic, Map producerConfigs, Map consumerConfigs, Callback> consumedCallback, Time time, Runnable initializer) { + this(topic, producerConfigs, consumerConfigs, () -> null, consumedCallback, time, initializer != null ? admin -> initializer.run() : null); + } + + /** + * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until + * {@link #start()} is invoked. + * + * @param topic the topic to treat as a log + * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must + * contain compatible serializer settings for the generic types used on this class. Some + * setting, such as the number of acks, will be overridden to ensure correct behavior of this + * class. + * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must + * contain compatible serializer settings for the generic types used on this class. Some + * setting, such as the auto offset reset policy, will be overridden to ensure correct + * behavior of this class. + * @param topicAdminSupplier supplier function for an admin client, the lifecycle of which is expected to be controlled + * by the calling component; may not be null + * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log + * @param time Time interface + * @param initializer the function that should be run when this log is {@link #start() started}; may be null + */ + public KafkaBasedLog(String topic, + Map producerConfigs, + Map consumerConfigs, + Supplier topicAdminSupplier, + Callback> consumedCallback, + Time time, + java.util.function.Consumer initializer) { this.topic = topic; this.producerConfigs = producerConfigs; this.consumerConfigs = consumerConfigs; + this.topicAdminSupplier = Objects.requireNonNull(topicAdminSupplier); this.consumedCallback = consumedCallback; this.stopRequested = false; this.readLogEndOffsetCallbacks = new ArrayDeque<>(); this.time = time; - this.initializer = initializer != null ? initializer : new Runnable() { - @Override - public void run() { - } - }; + this.initializer = initializer != null ? initializer : admin -> { }; } public void start() { log.info("Starting KafkaBasedLog with topic " + topic); - initializer.run(); + // Create the topic admin client and initialize the topic ... + admin = topicAdminSupplier.get(); // may be null + initializer.accept(admin); + + // Then create the producer and consumer producer = createProducer(); consumer = createConsumer(); @@ -193,6 +229,9 @@ public void stop() { log.error("Failed to stop KafkaBasedLog consumer", e); } + // do not close the admin client, since we don't own it + admin = null; + log.info("Stopped KafkaBasedLog for topic " + topic); } @@ -282,7 +321,29 @@ private void readToLogEnd() { log.trace("Reading to end of offset log"); Set assignment = consumer.assignment(); - Map endOffsets = consumer.endOffsets(assignment); + Map endOffsets; + // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions. + // That is because it's possible that the consumer is already blocked waiting for new records to appear, when + // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least + // one more record becomes available, meaning we can't even check whether we're at the end offset. + // Since all we're trying to do here is get the end offset, we should use the supplied admin client + // (if available) + // (which prevents 'consumer.endOffsets(...)' + // from + + // Deprecated constructors do not provide an admin supplier, so the admin is potentially null. + if (admin != null) { + // Use the admin client to immediately find the end offsets for the assigned topic partitions. + // Unlike using the consumer + endOffsets = admin.endOffsets(assignment); + } else { + // The admin may be null if older deprecated constructor is used, though AK Connect currently always provides an admin client. + // Using the consumer is not ideal, because when the topic has low volume, the 'poll(...)' method called from the + // work thread may have blocked the consumer while waiting for more records (even when there are none). + // In such cases, this call to the consumer to simply find the end offsets will block even though we might already be + // at the end offset. + endOffsets = consumer.endOffsets(assignment); + } log.trace("Reading to end of log offsets {}", endOffsets); while (!endOffsets.isEmpty()) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java new file mode 100644 index 0000000000000..4fda12cb6bcb2 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.connect.errors.ConnectException; + +/** + * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers. + * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin} + * instance until this SharedAdmin is closed via {@link #close()}. + * + *

The owner of this object is responsible for ensuring that either {@link #close()} + * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this + * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and {@link #topicAdmin()} methods, + * nor any previously returned {@link TopicAdmin} instances may be used. + * + *

This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object, + * until this object is closed, at which point it cannot be used anymore + */ +public class SharedTopicAdmin implements AutoCloseable, Supplier { + + private final Map adminProps; + private final AtomicReference admin = new AtomicReference<>(); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Function, TopicAdmin> factory; + + public SharedTopicAdmin(Map adminProps) { + this(adminProps, TopicAdmin::new); + } + + // Visible for testing + SharedTopicAdmin(Map adminProps, Function, TopicAdmin> factory) { + this.adminProps = Objects.requireNonNull(adminProps); + this.factory = Objects.requireNonNull(factory); + } + + /** + * Get the shared {@link TopicAdmin} instance. + * + * @return the shared instance; never null + * @throws ConnectException if this object has already been closed + */ + @Override + public TopicAdmin get() { + return topicAdmin(); + } + + /** + * Get the shared {@link TopicAdmin} instance. + * + * @return the shared instance; never null + * @throws ConnectException if this object has already been closed + */ + public TopicAdmin topicAdmin() { + return admin.updateAndGet(this::createAdmin); + } + + /** + * Get the string containing the list of bootstrap server addresses to the Kafka broker(s) to which + * the admin client connects. + * + * @return the bootstrap servers as a string; never null + */ + public String bootstrapServers() { + return adminProps.getOrDefault(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "").toString(); + } + + /** + * Close the underlying {@link TopicAdmin} instance, if one has been created, and prevent new ones from being created. + * + *

Once this method is called, the {@link #get()} and {@link #topicAdmin()} methods, + * nor any previously returned {@link TopicAdmin} instances may be used. + */ + @Override + public void close() { + if (this.closed.compareAndSet(false, true)) { + TopicAdmin admin = this.admin.getAndSet(null); + if (admin != null) { + admin.close(); + } + } + } + + @Override + public String toString() { + return "admin client for brokers at " + bootstrapServers(); + } + + /** + * Method used to create a {@link TopicAdmin} instance. This method must be side-effect free, since it is called from within + * the {@link AtomicReference#updateAndGet(UnaryOperator)}. + * + * @param existing the existing instance; may be null + * @return the + */ + protected TopicAdmin createAdmin(TopicAdmin existing) { + if (closed.get()) { + throw new ConnectException("The " + this + " has already been closed and cannot be used."); + } + if (existing != null) { + return existing; + } + return factory.apply(adminProps); + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index e644e805ea613..e94561e91b5d5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -19,15 +19,22 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsOptions; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.ClusterAuthorizationException; -import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.RetriableException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +44,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; /** * Utility to simplify creating and managing topics via the {@link Admin}. @@ -175,6 +184,14 @@ public TopicAdmin(Map adminConfig) { this.adminConfig = adminConfig != null ? adminConfig : Collections.emptyMap(); } + /** + * Get the {@link Admin} client used by this topic admin object. + * @return the Kafka admin instance; never null + */ + public Admin admin() { + return admin; + } + /** * Attempt to create the topic described by the given definition, returning true if the topic was created or false * if the topic already existed. @@ -268,6 +285,58 @@ public Set createTopics(NewTopic... topics) { return newlyCreatedTopicNames; } + /** + * Fetch the most recent offset for each of the supplied {@link TopicPartition} objects. + * + * @param partitions the topic partitions + * @return the map of offset for each topic partition, or an empty map if the supplied partitions + * are null or empty + * @throws RetriableException if a retriable error occurs, the operation takes too long, or the + * thread is interrupted while attempting to perform this operation + * @throws ConnectException if a non retriable error occurs + */ + public Map endOffsets(Set partitions) { + if (partitions == null || partitions.isEmpty()) { + return Collections.emptyMap(); + } + Map offsetSpecMap = partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())); + ListOffsetsResult resultFuture = admin.listOffsets(offsetSpecMap); + // Get the individual result for each topic partition so we have better error messages + Map result = new HashMap<>(); + for (TopicPartition partition : partitions) { + try { + ListOffsetsResultInfo info = resultFuture.partitionResult(partition).get(); + result.put(partition, info.offset()); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + String topic = partition.topic(); + if (cause instanceof AuthorizationException) { + String msg = String.format("Not authorized to get the end offsets for topic '%s' on brokers at %s", topic, bootstrapServers()); + throw new ConnectException(msg, e); + } else if (cause instanceof UnsupportedVersionException) { + // Should theoretically never happen, because this method is the same as what the consumer uses and therefore + // should exist in the broker since before the admin client was added + String msg = String.format("API to get the get the end offsets for topic '%s' is unsupported on brokers at %s", topic, bootstrapServers()); + throw new ConnectException(msg, e); + } else if (cause instanceof TimeoutException) { + String msg = String.format("Timed out while waiting to get end offsets for topic '%s' on brokers at %s", topic, bootstrapServers()); + throw new RetriableException(msg, e); + } else if (cause instanceof LeaderNotAvailableException) { + String msg = String.format("Unable to get end offsets during leader election for topic '%s' on brokers at %s", topic, bootstrapServers()); + throw new RetriableException(msg, e); + } else { + String msg = String.format("Error while getting end offsets for topic '%s' on brokers at %s", topic, bootstrapServers()); + throw new ConnectException(msg, e); + } + } catch (InterruptedException e) { + Thread.interrupted(); + String msg = String.format("Interrupted while attempting to read end offsets for topic '%s' on brokers at %s", partition.topic(), bootstrapServers()); + throw new RetriableException(msg, e); + } + } + return result; + } + @Override public void close() { admin.close(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 04e18acd39023..2a96850d77744 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -78,6 +78,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -182,12 +183,10 @@ public class DistributedHerderTest { @Mock private Worker worker; @Mock private WorkerConfigTransformer transformer; @Mock private Callback> putConnectorCallback; - @Mock - private Plugins plugins; - @Mock - private PluginClassLoader pluginLoader; - @Mock - private DelegatingClassLoader delegatingLoader; + @Mock private Plugins plugins; + @Mock private PluginClassLoader pluginLoader; + @Mock private DelegatingClassLoader delegatingLoader; + private CountDownLatch shutdownCalled = new CountDownLatch(1); private ConfigBackingStore.UpdateListener configUpdateListener; private WorkerRebalanceListener rebalanceListener; @@ -205,6 +204,7 @@ public void setUp() throws Exception { metrics = new MockConnectMetrics(time); worker = PowerMock.createMock(Worker.class); EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE); + AutoCloseable uponShutdown = () -> shutdownCalled.countDown(); // Default to the old protocol unless specified otherwise connectProtocolVersion = CONNECT_PROTOCOL_V0; @@ -212,7 +212,8 @@ public void setUp() throws Exception { herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"}, new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID, - statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy); + statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy, + new AutoCloseable[]{uponShutdown}); configUpdateListener = herder.new ConfigUpdateListener(); rebalanceListener = herder.new RebalanceListener(time); @@ -2101,6 +2102,13 @@ public void testThreadNames() { getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("StartAndStopExecutor")); } + @Test + public void testHerderStopServicesClosesUponShutdown() { + assertEquals(1, shutdownCalled.getCount()); + herder.stopServices(); + assertEquals(0, shutdownCalled.getCount()); + } + private void expectRebalance(final long offset, final List assignedConnectors, final List assignedTasks) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index 9a5569004cb3e..85f0e2ad6311a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TestFuture; +import org.apache.kafka.connect.util.TopicAdmin; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; @@ -56,6 +57,7 @@ import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -141,7 +143,7 @@ public class KafkaConfigBackingStoreTest { private Capture capturedTopic = EasyMock.newCapture(); private Capture> capturedProducerProps = EasyMock.newCapture(); private Capture> capturedConsumerProps = EasyMock.newCapture(); - private Capture> capturedAdminProps = EasyMock.newCapture(); + private Capture> capturedAdminSupplier = EasyMock.newCapture(); private Capture capturedNewTopic = EasyMock.newCapture(); private Capture>> capturedConsumedCallback = EasyMock.newCapture(); @@ -886,7 +888,7 @@ private void expectConfigure() throws Exception { PowerMock.expectPrivate(configStorage, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps), EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback), - EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps)) + EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier)) .andReturn(storeLog); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java index c04363b974605..05e5b5f2a93d2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; @@ -49,6 +50,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -105,7 +107,7 @@ public class KafkaOffsetBackingStoreTest { private Capture capturedTopic = EasyMock.newCapture(); private Capture> capturedProducerProps = EasyMock.newCapture(); private Capture> capturedConsumerProps = EasyMock.newCapture(); - private Capture> capturedAdminProps = EasyMock.newCapture(); + private Capture> capturedAdminSupplier = EasyMock.newCapture(); private Capture capturedNewTopic = EasyMock.newCapture(); private Capture>> capturedConsumedCallback = EasyMock.newCapture(); @@ -380,7 +382,7 @@ public void onCompletion(Throwable error, Void result) { private void expectConfigure() throws Exception { PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps), EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback), - EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps)) + EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier)) .andReturn(storeLog); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java new file mode 100644 index 0000000000000..9e1886a20a0a3 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; + +import org.apache.kafka.connect.errors.ConnectException; +import org.junit.Rule; +import org.mockito.Mock; +import org.junit.Before; +import org.junit.Test; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class SharedTopicAdminTest { + + private static final Map EMPTY_CONFIG = Collections.emptyMap(); + + @Rule + public MockitoRule rule = MockitoJUnit.rule(); + + @Mock private TopicAdmin mockTopicAdmin; + @Mock private Function, TopicAdmin> factory; + private SharedTopicAdmin sharedAdmin; + + @Before + public void beforeEach() { + when(factory.apply(anyMap())).thenReturn(mockTopicAdmin); + sharedAdmin = new SharedTopicAdmin(EMPTY_CONFIG, factory::apply); + } + + @Test + public void shouldCloseWithoutBeingUsed() { + // When closed before being used + sharedAdmin.close(); + // Then should not create or close admin + verifyTopicAdminCreatesAndCloses(0); + } + + @Test + public void shouldCloseAfterTopicAdminUsed() { + // When used and then closed + assertSame(mockTopicAdmin, sharedAdmin.topicAdmin()); + sharedAdmin.close(); + // Then should have created and closed just one admin + verifyTopicAdminCreatesAndCloses(1); + } + + @Test + public void shouldCloseAfterTopicAdminUsedMultipleTimes() { + // When used many times and then closed + for (int i = 0; i != 10; ++i) { + assertSame(mockTopicAdmin, sharedAdmin.topicAdmin()); + } + sharedAdmin.close(); + // Then should have created and closed just one admin + verifyTopicAdminCreatesAndCloses(1); + } + + @Test + public void shouldFailToGetTopicAdminAfterClose() { + // When closed + sharedAdmin.close(); + // Then using the admin should fail + assertThrows(ConnectException.class, () -> sharedAdmin.topicAdmin()); + } + + private void verifyTopicAdminCreatesAndCloses(int count) { + verify(factory, times(count)).apply(anyMap()); + verify(mockTopicAdmin, times(count)).close(); + } +} \ No newline at end of file diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index cfcab32c7754c..0b5be7c69c518 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -17,28 +17,43 @@ package org.apache.kafka.connect.util; import org.apache.kafka.clients.NodeApiVersions; -import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientUnitTestEnv; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.connect.errors.ConnectException; import org.junit.Test; import java.util.Collections; import java.util.HashMap; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TopicAdminTest { @@ -129,6 +144,107 @@ public void shouldReturnFalseWhenSuppliedNullTopicDescription() { } } + @SuppressWarnings("unchecked") + private KafkaFuture mockFuture() { + return (KafkaFuture) mock(KafkaFuture.class); + } + + private Admin expectAdminListOffsetsFailure(Throwable expected) throws Exception { + // When the admin client lists offsets + Admin mockAdmin = mock(Admin.class); + ListOffsetsResult results = mock(ListOffsetsResult.class); + when(mockAdmin.listOffsets(anyMap())).thenReturn(results); + // and throws an exception via the future.get() + ExecutionException execException = new ExecutionException(expected); + KafkaFuture future = mockFuture(); + when(future.get()).thenThrow(execException); + when(results.partitionResult(any(TopicPartition.class))).thenReturn(future); + return mockAdmin; + } + + @Test(expected = ConnectException.class) + public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() throws Exception { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + + // When the admin client lists offsets throws an exception + Admin mockAdmin = expectAdminListOffsetsFailure(new AuthorizationException("failed")); + + // Then the topic admin should throw exception + TopicAdmin admin = new TopicAdmin(null, mockAdmin); + admin.endOffsets(tps); + } + + @Test(expected = ConnectException.class) + public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() throws Exception { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + + // When the admin client lists offsets throws an exception + Admin mockAdmin = expectAdminListOffsetsFailure(new UnsupportedVersionException("failed")); + + // Then the topic admin should throw exception + TopicAdmin admin = new TopicAdmin(null, mockAdmin); + admin.endOffsets(tps); + } + + @Test(expected = ConnectException.class) + public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() throws Exception { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + + // When the admin client lists offsets throws an exception + Admin mockAdmin = expectAdminListOffsetsFailure(new TimeoutException("failed")); + + // Then the topic admin should throw exception + TopicAdmin admin = new TopicAdmin(null, mockAdmin); + admin.endOffsets(tps); + } + + @Test(expected = ConnectException.class) + public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() throws Exception { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + + // When the admin client lists offsets throws an exception + Admin mockAdmin = expectAdminListOffsetsFailure(new RuntimeException("failed")); + + // Then the topic admin should throw exception + TopicAdmin admin = new TopicAdmin(null, mockAdmin); + admin.endOffsets(tps); + } + + @Test + public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() throws Exception { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + + // Then the topic admin should return immediately + Admin mockAdmin = mock(Admin.class); + TopicAdmin admin = new TopicAdmin(null, mockAdmin); + Map offsets = admin.endOffsets(Collections.emptySet()); + assertTrue(offsets.isEmpty()); + } + + @Test(expected = ConnectException.class) + public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() throws Exception { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + + // When the admin client lists offsets throws an exception + Admin mockAdmin = expectAdminListOffsetsFailure(new AuthorizationException("failed")); + + // Then the topic admin should throw exception + TopicAdmin admin = new TopicAdmin(null, mockAdmin); + admin.endOffsets(tps); + } + private Cluster createCluster(int numNodes) { HashMap nodes = new HashMap<>(); for (int i = 0; i < numNodes; ++i) {