diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index a975f31ed210c..e13873e2633e4 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.util.SharedTopicAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,20 +234,28 @@ private void addHerder(SourceAndTarget sourceAndTarget) { plugins.compareAndSwapWithDelegatingLoader(); DistributedConfig distributedConfig = new DistributedConfig(workerProps); String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig); - KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); + // Create the admin client to be shared by all backing stores for this herder + Map adminProps = new HashMap<>(config.originals()); + ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId); + SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps); + KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin); offsetBackingStore.configure(distributedConfig); Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY); WorkerConfigTransformer configTransformer = worker.configTransformer(); Converter internalValueConverter = worker.getInternalValueConverter(); - StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); + StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin); statusBackingStore.configure(distributedConfig); ConfigBackingStore configBackingStore = new KafkaConfigBackingStore( internalValueConverter, distributedConfig, - configTransformer); + configTransformer, + sharedAdmin); + // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the + // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than + // tracking the various shared admin objects in this class. Herder herder = new DistributedHerder(distributedConfig, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, - advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY); + advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin); herders.put(sourceAndTarget, herder); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index 22c1ad82d6138..8d93e795911b9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -36,12 +36,14 @@ import org.apache.kafka.connect.storage.KafkaStatusBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectUtils; +import org.apache.kafka.connect.util.SharedTopicAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URI; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Map; /** @@ -101,7 +103,12 @@ public Connect startConnect(Map workerProps) { URI advertisedUrl = rest.advertisedUrl(); String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); - KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); + // Create the admin client to be shared by all backing stores. + Map adminProps = new HashMap<>(config.originals()); + ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId); + SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps); + + KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin); offsetBackingStore.configure(config); ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin( @@ -112,17 +119,20 @@ public Connect startConnect(Map workerProps) { WorkerConfigTransformer configTransformer = worker.configTransformer(); Converter internalValueConverter = worker.getInternalValueConverter(); - StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); + StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin); statusBackingStore.configure(config); ConfigBackingStore configBackingStore = new KafkaConfigBackingStore( internalValueConverter, config, - configTransformer); + configTransformer, + sharedAdmin); + // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the + // herder is stopped. This is easier than having to track and own the lifecycle ourselves. DistributedHerder herder = new DistributedHerder(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, - advertisedUrl.toString(), connectorClientConfigOverridePolicy); + advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin); final Connect connect = new Connect(herder, rest); log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 7c6e8a6ab94c9..16dfbf970212c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.errors.AlreadyExistsException; @@ -66,6 +67,7 @@ import javax.crypto.SecretKey; import javax.ws.rs.core.Response; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -138,6 +140,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private final Time time; private final HerderMetrics herderMetrics; + private final List uponShutdown; private final String workerGroupId; private final int workerSyncTimeoutMs; @@ -185,6 +188,22 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private final DistributedConfig config; + /** + * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs) + * that have the same group ID. + * + * @param config the configuration for the worker; may not be null + * @param time the clock to use; may not be null + * @param worker the {@link Worker} instance to use; may not be null + * @param kafkaClusterId the identifier of the Kafka cluster to use for internal topics; may not be null + * @param statusBackingStore the backing store for statuses; may not be null + * @param configBackingStore the backing store for connector configurations; may not be null + * @param restUrl the URL of this herder's REST API; may not be null + * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden + * in connector configurations; may not be null + * @param uponShutdown any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped}, + * after all services and resources owned by this herder are stopped + */ public DistributedHerder(DistributedConfig config, Time time, Worker worker, @@ -192,9 +211,10 @@ public DistributedHerder(DistributedConfig config, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String restUrl, - ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) { + ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, + AutoCloseable... uponShutdown) { this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), - time, connectorClientConfigOverridePolicy); + time, connectorClientConfigOverridePolicy, uponShutdown); configBackingStore.setUpdateListener(new ConfigUpdateListener()); } @@ -209,7 +229,8 @@ public DistributedHerder(DistributedConfig config, String restUrl, ConnectMetrics metrics, Time time, - ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) { + ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, + AutoCloseable... uponShutdown) { super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy); this.time = time; @@ -223,6 +244,7 @@ public DistributedHerder(DistributedConfig config, this.keySignatureVerificationAlgorithms = config.getList(DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG); this.keyGenerator = config.getInternalRequestKeyGenerator(); this.isTopicTrackingEnabled = config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); + this.uponShutdown = Arrays.asList(uponShutdown); String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig; @@ -676,6 +698,15 @@ public void halt() { } } + @Override + protected void stopServices() { + try { + super.stopServices(); + } finally { + this.uponShutdown.forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "")); + } + } + @Override public void stop() { log.info("Herder stopping"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 685c3e023eb4d..d4e6358e2ea99 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -62,6 +62,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; /** *

@@ -224,6 +225,7 @@ public static String COMMIT_TASKS_KEY(String connectorName) { // Connector and task configs: name or id -> config map private final Map> connectorConfigs = new HashMap<>(); private final Map> taskConfigs = new HashMap<>(); + private final Supplier topicAdminSupplier; // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data // is in an inconsistent state and we cannot safely use them until they have been refreshed. @@ -241,11 +243,17 @@ public static String COMMIT_TASKS_KEY(String connectorName) { private final WorkerConfigTransformer configTransformer; + @Deprecated public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) { + this(converter, config, configTransformer, null); + } + + public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer, Supplier adminSupplier) { this.lock = new Object(); this.started = false; this.converter = converter; this.offset = -1; + this.topicAdminSupplier = adminSupplier; this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG); if (this.topic == null || this.topic.trim().length() == 0) @@ -471,6 +479,7 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo Map adminProps = new HashMap<>(originals); ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId); + Supplier adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps); Map topicSettings = config instanceof DistributedConfig ? ((DistributedConfig) config).configStorageTopicSettings() : Collections.emptyMap(); @@ -481,27 +490,25 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo .replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)) .build(); - return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps); + return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier); } private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, Map consumerProps, Callback> consumedCallback, - final NewTopic topicDescription, final Map adminProps) { - Runnable createTopics = () -> { + final NewTopic topicDescription, Supplier adminSupplier) { + java.util.function.Consumer createTopics = admin -> { log.debug("Creating admin client to manage Connect internal config topic"); - try (TopicAdmin admin = new TopicAdmin(adminProps)) { - // Create the topic if it doesn't exist - Set newTopics = admin.createTopics(topicDescription); - if (!newTopics.contains(topic)) { - // It already existed, so check that the topic cleanup policy is compact only and not delete - log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT); - admin.verifyTopicCleanupPolicyOnlyCompact(topic, - DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations"); - } + // Create the topic if it doesn't exist + Set newTopics = admin.createTopics(topicDescription); + if (!newTopics.contains(topic)) { + // It already existed, so check that the topic cleanup policy is compact only and not delete + log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT); + admin.verifyTopicCleanupPolicyOnlyCompact(topic, + DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations"); } }; - return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics); + return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics); } @SuppressWarnings("unchecked") diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index a7e7c3b1aca07..26b47f996b18a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -41,11 +41,13 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; /** *

@@ -62,6 +64,16 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { private KafkaBasedLog offsetLog; private HashMap data; + private final Supplier topicAdminSupplier; + + @Deprecated + public KafkaOffsetBackingStore() { + this.topicAdminSupplier = null; + } + + public KafkaOffsetBackingStore(Supplier topicAdmin) { + this.topicAdminSupplier = Objects.requireNonNull(topicAdmin); + } @Override public void configure(final WorkerConfig config) { @@ -86,6 +98,7 @@ public void configure(final WorkerConfig config) { Map adminProps = new HashMap<>(originals); ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId); + Supplier adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps); Map topicSettings = config instanceof DistributedConfig ? ((DistributedConfig) config).offsetStorageTopicSettings() : Collections.emptyMap(); @@ -96,27 +109,25 @@ public void configure(final WorkerConfig config) { .replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)) .build(); - offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps); + offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier); } private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, Map consumerProps, Callback> consumedCallback, - final NewTopic topicDescription, final Map adminProps) { - Runnable createTopics = () -> { + final NewTopic topicDescription, Supplier adminSupplier) { + java.util.function.Consumer createTopics = admin -> { log.debug("Creating admin client to manage Connect internal offset topic"); - try (TopicAdmin admin = new TopicAdmin(adminProps)) { - // Create the topic if it doesn't exist - Set newTopics = admin.createTopics(topicDescription); - if (!newTopics.contains(topic)) { - // It already existed, so check that the topic cleanup policy is compact only and not delete - log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT); - admin.verifyTopicCleanupPolicyOnlyCompact(topic, - DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets"); - } + // Create the topic if it doesn't exist + Set newTopics = admin.createTopics(topicDescription); + if (!newTopics.contains(topic)) { + // It already existed, so check that the topic cleanup policy is compact only and not delete + log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT); + admin.verifyTopicCleanupPolicyOnlyCompact(topic, + DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets"); } }; - return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics); + return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics); } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index b1c6a2f514f83..efa405f3a4b9b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -61,6 +61,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; /** * StatusBackingStore implementation which uses a compacted topic for storage @@ -128,17 +129,24 @@ public class KafkaStatusBackingStore implements StatusBackingStore { protected final Table> tasks; protected final Map> connectors; protected final ConcurrentMap> topics; + private final Supplier topicAdminSupplier; private String statusTopic; private KafkaBasedLog kafkaLog; private int generation; + @Deprecated public KafkaStatusBackingStore(Time time, Converter converter) { + this(time, converter, null); + } + + public KafkaStatusBackingStore(Time time, Converter converter, Supplier topicAdminSupplier) { this.time = time; this.converter = converter; this.tasks = new Table<>(); this.connectors = new HashMap<>(); this.topics = new ConcurrentHashMap<>(); + this.topicAdminSupplier = topicAdminSupplier; } // visible for testing @@ -169,6 +177,7 @@ public void configure(final WorkerConfig config) { Map adminProps = new HashMap<>(originals); ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId); + Supplier adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps); Map topicSettings = config instanceof DistributedConfig ? ((DistributedConfig) config).statusStorageTopicSettings() @@ -181,27 +190,25 @@ public void configure(final WorkerConfig config) { .build(); Callback> readCallback = (error, record) -> read(record); - this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminProps); + this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier); } private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, Map consumerProps, Callback> consumedCallback, - final NewTopic topicDescription, final Map adminProps) { - Runnable createTopics = () -> { + final NewTopic topicDescription, Supplier adminSupplier) { + java.util.function.Consumer createTopics = admin -> { log.debug("Creating admin client to manage Connect internal status topic"); - try (TopicAdmin admin = new TopicAdmin(adminProps)) { - // Create the topic if it doesn't exist - Set newTopics = admin.createTopics(topicDescription); - if (!newTopics.contains(topic)) { - // It already existed, so check that the topic cleanup policy is compact only and not delete - log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT); - admin.verifyTopicCleanupPolicyOnlyCompact(topic, - DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connector and task statuses"); - } + // Create the topic if it doesn't exist + Set newTopics = admin.createTopics(topicDescription); + if (!newTopics.contains(topic)) { + // It already existed, so check that the topic cleanup policy is compact only and not delete + log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT); + admin.verifyTopicCleanupPolicyOnlyCompact(topic, + DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connector and task statuses"); } }; - return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, time, createTopics); + return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, time, createTopics); } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index 49c6cf203435b..6a2a787578e84 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -41,10 +41,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; /** @@ -79,13 +81,15 @@ public class KafkaBasedLog { private final Map producerConfigs; private final Map consumerConfigs; private final Callback> consumedCallback; + private final Supplier topicAdminSupplier; private Consumer consumer; private Producer producer; + private TopicAdmin admin; private Thread thread; private boolean stopRequested; private Queue> readLogEndOffsetCallbacks; - private Runnable initializer; + private java.util.function.Consumer initializer; /** * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until @@ -103,27 +107,63 @@ public class KafkaBasedLog { * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log * @param time Time interface * @param initializer the component that should be run when this log is {@link #start() started}; may be null + * @deprecated Replaced by {@link #KafkaBasedLog(String, Map, Map, Supplier, Callback, Time, java.util.function.Consumer)} */ + @Deprecated public KafkaBasedLog(String topic, Map producerConfigs, Map consumerConfigs, Callback> consumedCallback, Time time, Runnable initializer) { + this(topic, producerConfigs, consumerConfigs, () -> null, consumedCallback, time, initializer != null ? admin -> initializer.run() : null); + } + + /** + * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until + * {@link #start()} is invoked. + * + * @param topic the topic to treat as a log + * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must + * contain compatible serializer settings for the generic types used on this class. Some + * setting, such as the number of acks, will be overridden to ensure correct behavior of this + * class. + * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must + * contain compatible serializer settings for the generic types used on this class. Some + * setting, such as the auto offset reset policy, will be overridden to ensure correct + * behavior of this class. + * @param topicAdminSupplier supplier function for an admin client, the lifecycle of which is expected to be controlled + * by the calling component; may not be null + * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log + * @param time Time interface + * @param initializer the function that should be run when this log is {@link #start() started}; may be null + */ + public KafkaBasedLog(String topic, + Map producerConfigs, + Map consumerConfigs, + Supplier topicAdminSupplier, + Callback> consumedCallback, + Time time, + java.util.function.Consumer initializer) { this.topic = topic; this.producerConfigs = producerConfigs; this.consumerConfigs = consumerConfigs; + this.topicAdminSupplier = Objects.requireNonNull(topicAdminSupplier); this.consumedCallback = consumedCallback; this.stopRequested = false; this.readLogEndOffsetCallbacks = new ArrayDeque<>(); this.time = time; - this.initializer = initializer != null ? initializer : () -> { }; + this.initializer = initializer != null ? initializer : admin -> { }; } public void start() { log.info("Starting KafkaBasedLog with topic " + topic); - initializer.run(); + // Create the topic admin client and initialize the topic ... + admin = topicAdminSupplier.get(); // may be null + initializer.accept(admin); + + // Then create the producer and consumer producer = createProducer(); consumer = createConsumer(); @@ -189,6 +229,9 @@ public void stop() { log.error("Failed to stop KafkaBasedLog consumer", e); } + // do not close the admin client, since we don't own it + admin = null; + log.info("Stopped KafkaBasedLog for topic " + topic); } @@ -278,7 +321,29 @@ private void readToLogEnd() { log.trace("Reading to end of offset log"); Set assignment = consumer.assignment(); - Map endOffsets = consumer.endOffsets(assignment); + Map endOffsets; + // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions. + // That is because it's possible that the consumer is already blocked waiting for new records to appear, when + // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least + // one more record becomes available, meaning we can't even check whether we're at the end offset. + // Since all we're trying to do here is get the end offset, we should use the supplied admin client + // (if available) + // (which prevents 'consumer.endOffsets(...)' + // from + + // Deprecated constructors do not provide an admin supplier, so the admin is potentially null. + if (admin != null) { + // Use the admin client to immediately find the end offsets for the assigned topic partitions. + // Unlike using the consumer + endOffsets = admin.endOffsets(assignment); + } else { + // The admin may be null if older deprecated constructor is used, though AK Connect currently always provides an admin client. + // Using the consumer is not ideal, because when the topic has low volume, the 'poll(...)' method called from the + // work thread may have blocked the consumer while waiting for more records (even when there are none). + // In such cases, this call to the consumer to simply find the end offsets will block even though we might already be + // at the end offset. + endOffsets = consumer.endOffsets(assignment); + } log.trace("Reading to end of log offsets {}", endOffsets); while (!endOffsets.isEmpty()) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java new file mode 100644 index 0000000000000..a99514e2edade --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.connect.errors.ConnectException; + +/** + * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers. + * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin} + * instance until this SharedAdmin is closed via {@link #close()} or {@link #close(Duration)}. + * + *

The owner of this object is responsible for ensuring that either {@link #close()} or {@link #close(Duration)} + * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this + * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and {@link #topicAdmin()} methods, + * nor any previously returned {@link TopicAdmin} instances may be used. + * + *

This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object, + * until this object is closed, at which point it cannot be used anymore + */ +public class SharedTopicAdmin implements AutoCloseable, Supplier { + + // Visible for testing + static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMillis(Long.MAX_VALUE); + + private final Map adminProps; + private final AtomicReference admin = new AtomicReference<>(); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Function, TopicAdmin> factory; + + public SharedTopicAdmin(Map adminProps) { + this(adminProps, TopicAdmin::new); + } + + // Visible for testing + SharedTopicAdmin(Map adminProps, Function, TopicAdmin> factory) { + this.adminProps = Objects.requireNonNull(adminProps); + this.factory = Objects.requireNonNull(factory); + } + + /** + * Get the shared {@link TopicAdmin} instance. + * + * @return the shared instance; never null + * @throws ConnectException if this object has already been closed + */ + @Override + public TopicAdmin get() { + return topicAdmin(); + } + + /** + * Get the shared {@link TopicAdmin} instance. + * + * @return the shared instance; never null + * @throws ConnectException if this object has already been closed + */ + public TopicAdmin topicAdmin() { + return admin.updateAndGet(this::createAdmin); + } + + /** + * Get the string containing the list of bootstrap server addresses to the Kafka broker(s) to which + * the admin client connects. + * + * @return the bootstrap servers as a string; never null + */ + public String bootstrapServers() { + return adminProps.getOrDefault(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "").toString(); + } + + /** + * Close the underlying {@link TopicAdmin} instance, if one has been created, and prevent new ones from being created. + * + *

Once this method is called, the {@link #get()} and {@link #topicAdmin()} methods, + * nor any previously returned {@link TopicAdmin} instances may be used. + */ + @Override + public void close() { + close(DEFAULT_CLOSE_DURATION); + } + + /** + * Close the underlying {@link TopicAdmin} instance, if one has been created, and prevent new ones from being created. + * + *

Once this method is called, the {@link #get()} and {@link #topicAdmin()} methods, + * nor any previously returned {@link TopicAdmin} instances may be used. + * + * @param timeout the maximum time to wait while the underlying admin client is closed; may not be null + */ + public void close(Duration timeout) { + Objects.requireNonNull(timeout); + if (this.closed.compareAndSet(false, true)) { + TopicAdmin admin = this.admin.getAndSet(null); + if (admin != null) { + admin.close(timeout); + } + } + } + + @Override + public String toString() { + return "admin client for brokers at " + bootstrapServers(); + } + + /** + * Method used to create a {@link TopicAdmin} instance. This method must be side-effect free, since it is called from within + * the {@link AtomicReference#updateAndGet(UnaryOperator)}. + * + * @param existing the existing instance; may be null + * @return the + */ + protected TopicAdmin createAdmin(TopicAdmin existing) { + if (closed.get()) { + throw new ConnectException("The " + this + " has already been closed and cannot be used."); + } + if (existing != null) { + return existing; + } + return factory.apply(adminProps); + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index 7b208e4301714..9a7907bcdafff 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -23,14 +23,20 @@ import org.apache.kafka.clients.admin.CreateTopicsOptions; import org.apache.kafka.clients.admin.DescribeConfigsOptions; import org.apache.kafka.clients.admin.DescribeTopicsOptions; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicExistsException; @@ -53,6 +59,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -283,6 +290,14 @@ public TopicAdmin(Map adminConfig) { } /** + * Get the {@link Admin} client used by this topic admin object. + * @return the Kafka admin instance; never null + */ + public Admin admin() { + return admin; + } + + /** * Attempt to create the topic described by the given definition, returning true if the topic was created or false * if the topic already existed. * @@ -630,6 +645,58 @@ public Map describeTopicConfigs(String... topicNames) { return result; } + /** + * Fetch the most recent offset for each of the supplied {@link TopicPartition} objects. + * + * @param partitions the topic partitions + * @return the map of offset for each topic partition, or an empty map if the supplied partitions + * are null or empty + * @throws RetriableException if a retriable error occurs, the operation takes too long, or the + * thread is interrupted while attempting to perform this operation + * @throws ConnectException if a non retriable error occurs + */ + public Map endOffsets(Set partitions) { + if (partitions == null || partitions.isEmpty()) { + return Collections.emptyMap(); + } + Map offsetSpecMap = partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())); + ListOffsetsResult resultFuture = admin.listOffsets(offsetSpecMap); + // Get the individual result for each topic partition so we have better error messages + Map result = new HashMap<>(); + for (TopicPartition partition : partitions) { + try { + ListOffsetsResultInfo info = resultFuture.partitionResult(partition).get(); + result.put(partition, info.offset()); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + String topic = partition.topic(); + if (cause instanceof AuthorizationException) { + String msg = String.format("Not authorized to get the end offsets for topic '%s' on brokers at %s", topic, bootstrapServers()); + throw new ConnectException(msg, e); + } else if (cause instanceof UnsupportedVersionException) { + // Should theoretically never happen, because this method is the same as what the consumer uses and therefore + // should exist in the broker since before the admin client was added + String msg = String.format("API to get the get the end offsets for topic '%s' is unsupported on brokers at %s", topic, bootstrapServers()); + throw new ConnectException(msg, e); + } else if (cause instanceof TimeoutException) { + String msg = String.format("Timed out while waiting to get end offsets for topic '%s' on brokers at %s", topic, bootstrapServers()); + throw new RetriableException(msg, e); + } else if (cause instanceof LeaderNotAvailableException) { + String msg = String.format("Unable to get end offsets during leader election for topic '%s' on brokers at %s", topic, bootstrapServers()); + throw new RetriableException(msg, e); + } else { + String msg = String.format("Error while getting end offsets for topic '%s' on brokers at %s", topic, bootstrapServers()); + throw new ConnectException(msg, e); + } + } catch (InterruptedException e) { + Thread.interrupted(); + String msg = String.format("Interrupted while attempting to read end offsets for topic '%s' on brokers at %s", partition.topic(), bootstrapServers()); + throw new RetriableException(msg, e); + } + } + return result; + } + @Override public void close() { admin.close(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 5bddcf76e3c10..e31a03f11b614 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -75,6 +75,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -189,6 +190,7 @@ public class DistributedHerderTest { @Mock private Plugins plugins; @Mock private PluginClassLoader pluginLoader; @Mock private DelegatingClassLoader delegatingLoader; + private CountDownLatch shutdownCalled = new CountDownLatch(1); private ConfigBackingStore.UpdateListener configUpdateListener; private WorkerRebalanceListener rebalanceListener; @@ -206,6 +208,7 @@ public void setUp() throws Exception { metrics = new MockConnectMetrics(time); worker = PowerMock.createMock(Worker.class); EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE); + AutoCloseable uponShutdown = () -> shutdownCalled.countDown(); // Default to the old protocol unless specified otherwise connectProtocolVersion = CONNECT_PROTOCOL_V0; @@ -213,7 +216,8 @@ public void setUp() throws Exception { herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig"}, new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID, - statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy); + statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy, + new AutoCloseable[]{uponShutdown}); configUpdateListener = herder.new ConfigUpdateListener(); rebalanceListener = herder.new RebalanceListener(time); @@ -2211,6 +2215,13 @@ public void testThreadNames() { getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("StartAndStopExecutor")); } + @Test + public void testHerderStopServicesClosesUponShutdown() { + assertEquals(1, shutdownCalled.getCount()); + herder.stopServices(); + assertEquals(0, shutdownCalled.getCount()); + } + private void expectRebalance(final long offset, final List assignedConnectors, final List assignedTasks) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index eca291e5a8856..4504d3997675c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TestFuture; +import org.apache.kafka.connect.util.TopicAdmin; import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.Before; @@ -55,6 +56,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -140,7 +142,7 @@ public class KafkaConfigBackingStoreTest { private Capture capturedTopic = EasyMock.newCapture(); private Capture> capturedProducerProps = EasyMock.newCapture(); private Capture> capturedConsumerProps = EasyMock.newCapture(); - private Capture> capturedAdminProps = EasyMock.newCapture(); + private Capture> capturedAdminSupplier = EasyMock.newCapture(); private Capture capturedNewTopic = EasyMock.newCapture(); private Capture>> capturedConsumedCallback = EasyMock.newCapture(); @@ -893,7 +895,7 @@ private void expectConfigure() throws Exception { PowerMock.expectPrivate(configStorage, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps), EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback), - EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps)) + EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier)) .andReturn(storeLog); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java index 0d7c13352a29f..d21606809ccb1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.Before; @@ -49,6 +50,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -105,7 +107,7 @@ public class KafkaOffsetBackingStoreTest { private Capture capturedTopic = EasyMock.newCapture(); private Capture> capturedProducerProps = EasyMock.newCapture(); private Capture> capturedConsumerProps = EasyMock.newCapture(); - private Capture> capturedAdminProps = EasyMock.newCapture(); + private Capture> capturedAdminSupplier = EasyMock.newCapture(); private Capture capturedNewTopic = EasyMock.newCapture(); private Capture>> capturedConsumedCallback = EasyMock.newCapture(); @@ -370,7 +372,7 @@ public void testSetFailure() throws Exception { private void expectConfigure() throws Exception { PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps), EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback), - EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps)) + EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier)) .andReturn(storeLog); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java new file mode 100644 index 0000000000000..f5ac6a730fa99 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; + +import org.apache.kafka.connect.errors.ConnectException; +import org.junit.Rule; +import org.mockito.Mock; +import org.junit.Before; +import org.junit.Test; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import static org.apache.kafka.connect.util.SharedTopicAdmin.DEFAULT_CLOSE_DURATION; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class SharedTopicAdminTest { + + private static final Map EMPTY_CONFIG = Collections.emptyMap(); + + @Rule + public MockitoRule rule = MockitoJUnit.rule(); + + @Mock private TopicAdmin mockTopicAdmin; + @Mock private Function, TopicAdmin> factory; + private SharedTopicAdmin sharedAdmin; + + @Before + public void beforeEach() { + when(factory.apply(anyMap())).thenReturn(mockTopicAdmin); + sharedAdmin = new SharedTopicAdmin(EMPTY_CONFIG, factory::apply); + } + + @Test + public void shouldCloseWithoutBeingUsed() { + // When closed before being used + sharedAdmin.close(); + // Then should not create or close admin + verifyTopicAdminCreatesAndCloses(0); + } + + @Test + public void shouldCloseAfterTopicAdminUsed() { + // When used and then closed + assertSame(mockTopicAdmin, sharedAdmin.topicAdmin()); + sharedAdmin.close(); + // Then should have created and closed just one admin + verifyTopicAdminCreatesAndCloses(1); + } + + @Test + public void shouldCloseAfterTopicAdminUsedMultipleTimes() { + // When used many times and then closed + for (int i = 0; i != 10; ++i) { + assertSame(mockTopicAdmin, sharedAdmin.topicAdmin()); + } + sharedAdmin.close(); + // Then should have created and closed just one admin + verifyTopicAdminCreatesAndCloses(1); + } + + @Test + public void shouldCloseWithDurationAfterTopicAdminUsed() { + // When used and then closed with a custom timeout + Duration timeout = Duration.ofSeconds(1); + assertSame(mockTopicAdmin, sharedAdmin.topicAdmin()); + sharedAdmin.close(timeout); + // Then should have created and closed just one admin using the supplied timeout + verifyTopicAdminCreatesAndCloses(1, timeout); + } + + @Test + public void shouldFailToGetTopicAdminAfterClose() { + // When closed + sharedAdmin.close(); + // Then using the admin should fail + assertThrows(ConnectException.class, () -> sharedAdmin.topicAdmin()); + } + + private void verifyTopicAdminCreatesAndCloses(int count) { + verifyTopicAdminCreatesAndCloses(count, DEFAULT_CLOSE_DURATION); + } + + private void verifyTopicAdminCreatesAndCloses(int count, Duration expectedDuration) { + verify(factory, times(count)).apply(anyMap()); + verify(mockTopicAdmin, times(count)).close(eq(expectedDuration)); + } +} \ No newline at end of file diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index 646da7135543f..9ba0b1d0aabff 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -26,6 +26,8 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigResource; @@ -36,19 +38,26 @@ import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; import org.apache.kafka.common.message.DescribeConfigsResponseData; +import org.apache.kafka.common.message.ListOffsetsResponseData; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; import org.apache.kafka.common.message.MetadataResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DescribeConfigsResponse; +import org.apache.kafka.common.requests.ListOffsetsResponse; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.RetriableException; import org.junit.Test; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -456,17 +465,273 @@ public void verifyingGettingTopicCleanupPolicies() { } } + @Test + public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + Long offset = null; // response should use error + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, offset)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + ConnectException e = assertThrows(ConnectException.class, () -> { + admin.endOffsets(tps); + }); + assertTrue(e.getMessage().contains("Not authorized to get the end offsets")); + } + } + + @Test + public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + Long offset = null; // response should use error + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + ConnectException e = assertThrows(ConnectException.class, () -> { + admin.endOffsets(tps); + }); + assertTrue(e.getMessage().contains("is unsupported on brokers")); + } + } + + @Test + public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + Long offset = null; // response should use error + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + RetriableException e = assertThrows(RetriableException.class, () -> { + admin.endOffsets(tps); + }); + assertTrue(e.getMessage().contains("Timed out while waiting")); + } + } + + @Test + public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + Long offset = null; // response should use error + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithUnknownError(tp1, offset)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + ConnectException e = assertThrows(ConnectException.class, () -> { + admin.endOffsets(tps); + }); + assertTrue(e.getMessage().contains("Error while getting end offsets for topic")); + } + } + + @Test + public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() { + String topicName = "myTopic"; + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + Map offsets = admin.endOffsets(Collections.emptySet()); + assertTrue(offsets.isEmpty()); + } + } + + @Test + public void endOffsetsShouldReturnOffsetsForOnePartition() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + long offset = 1000L; + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + Map offsets = admin.endOffsets(tps); + assertEquals(1, offsets.size()); + assertEquals(Long.valueOf(offset), offsets.get(tp1)); + } + } + + @Test + public void endOffsetsShouldReturnOffsetsForMultiplePartitions() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + TopicPartition tp2 = new TopicPartition(topicName, 1); + Set tps = new HashSet<>(Arrays.asList(tp1, tp2)); + long offset1 = 1001; + long offset2 = 1002; + Cluster cluster = createCluster(1, topicName, 2); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset1, tp2, offset2)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + Map offsets = admin.endOffsets(tps); + assertEquals(2, offsets.size()); + assertEquals(Long.valueOf(offset1), offsets.get(tp1)); + assertEquals(Long.valueOf(offset2), offsets.get(tp2)); + } + } + + @Test + public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set tps = Collections.singleton(tp1); + long offset = 1000; + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, null)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + ConnectException e = assertThrows(ConnectException.class, () -> { + admin.endOffsets(tps); + }); + assertTrue(e.getMessage().contains("Not authorized to get the end offsets")); + } + } + private Cluster createCluster(int numNodes) { + return createCluster(numNodes, "unused", 0); + } + + private Cluster createCluster(int numNodes, String topicName, int partitions) { + Node[] nodeArray = new Node[numNodes]; HashMap nodes = new HashMap<>(); for (int i = 0; i < numNodes; ++i) { - nodes.put(i, new Node(i, "localhost", 8121 + i)); - } - Cluster cluster = new Cluster("mockClusterId", nodes.values(), - Collections.emptySet(), Collections.emptySet(), - Collections.emptySet(), nodes.get(0)); + nodeArray[i] = new Node(i, "localhost", 8121 + i); + nodes.put(i, nodeArray[i]); + } + Node leader = nodeArray[0]; + List pInfos = new ArrayList<>(); + for (int i = 0; i < partitions; ++i) { + pInfos.add(new PartitionInfo(topicName, i, leader, nodeArray, nodeArray)); + } + Cluster cluster = new Cluster( + "mockClusterId", + nodes.values(), + pInfos, + Collections.emptySet(), + Collections.emptySet(), + leader); return cluster; } + private MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) { + List metadata = new ArrayList<>(); + for (String topic : cluster.topics()) { + List pms = new ArrayList<>(); + for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) { + MetadataResponseData.MetadataResponsePartition pm = new MetadataResponseData.MetadataResponsePartition() + .setErrorCode(error.code()) + .setPartitionIndex(pInfo.partition()) + .setLeaderId(pInfo.leader().id()) + .setLeaderEpoch(234) + .setReplicaNodes(Arrays.stream(pInfo.replicas()).map(Node::id).collect(Collectors.toList())) + .setIsrNodes(Arrays.stream(pInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toList())) + .setOfflineReplicas(Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList())); + pms.add(pm); + } + MetadataResponseTopic tm = new MetadataResponseTopic() + .setErrorCode(error.code()) + .setName(topic) + .setIsInternal(false) + .setPartitions(pms); + metadata.add(tm); + } + return MetadataResponse.prepareResponse(true, + 0, + cluster.nodes(), + cluster.clusterResource().clusterId(), + cluster.controller().id(), + metadata, + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED); + } + + private ListOffsetsResponse listOffsetsResultWithUnknownError(TopicPartition tp1, Long offset1) { + return listOffsetsResult( + new ApiError(Errors.UNKNOWN_SERVER_ERROR, "Unknown error"), + Collections.singletonMap(tp1, offset1) + ); + } + + private ListOffsetsResponse listOffsetsResultWithTimeout(TopicPartition tp1, Long offset1) { + return listOffsetsResult( + new ApiError(Errors.REQUEST_TIMED_OUT, "Request timed out"), + Collections.singletonMap(tp1, offset1) + ); + } + + private ListOffsetsResponse listOffsetsResultWithUnsupportedVersion(TopicPartition tp1, Long offset1) { + return listOffsetsResult( + new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), + Collections.singletonMap(tp1, offset1) + ); + } + + private ListOffsetsResponse listOffsetsResultWithClusterAuthorizationException(TopicPartition tp1, Long offset1) { + return listOffsetsResult( + new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), + Collections.singletonMap(tp1, offset1) + ); + } + + private ListOffsetsResponse listOffsetsResult(TopicPartition tp1, Long offset1) { + return listOffsetsResult(null, Collections.singletonMap(tp1, offset1)); + } + + private ListOffsetsResponse listOffsetsResult(TopicPartition tp1, Long offset1, TopicPartition tp2, Long offset2) { + Map offsetsByPartitions = new HashMap<>(); + offsetsByPartitions.put(tp1, offset1); + offsetsByPartitions.put(tp2, offset2); + return listOffsetsResult(null, offsetsByPartitions); + } + + /** + * Create a ListOffsetResponse that exposes the supplied error and includes offsets for the supplied partitions. + * @param error the error; may be null if an unknown error should be used + * @param offsetsByPartitions offset for each partition, where offset is null signals the error should be used + * @return the response + */ + private ListOffsetsResponse listOffsetsResult(ApiError error, Map offsetsByPartitions) { + if (error == null) error = new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "unknown topic"); + List tpResponses = new ArrayList<>(); + for (TopicPartition partition : offsetsByPartitions.keySet()) { + Long offset = offsetsByPartitions.get(partition); + ListOffsetsTopicResponse topicResponse; + if (offset == null) { + topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(partition, error.error(), -1L, 0, 321); + } else { + topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(partition, Errors.NONE, -1L, offset, 321); + } + tpResponses.add(topicResponse); + } + ListOffsetsResponseData responseData = new ListOffsetsResponseData() + .setThrottleTimeMs(0) + .setTopics(tpResponses); + + return new ListOffsetsResponse(responseData); + } + private CreateTopicsResponse createTopicResponseWithUnsupportedVersion(NewTopic... topics) { return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), topics); }