From 91fde469c09538b9df15297f63cc908ea848a13a Mon Sep 17 00:00:00 2001 From: tadashiya Date: Fri, 17 May 2024 12:55:24 +0900 Subject: [PATCH] Use TopicPartition.topic for metrics (#235) * Use TopicPartition.topic for metrics * Use variable and rename SubscriptionScope.topic to originTopic * Use topicPartition.topic() for ThreadUtilizationMetrics --- .../decaton/processor/runtime/ProcessorSubscription.java | 2 +- .../decaton/processor/runtime/SubscriptionBuilder.java | 2 +- .../processor/runtime/internal/AbstractSubPartitions.java | 6 ++++-- .../processor/runtime/internal/PartitionContext.java | 2 +- .../decaton/processor/runtime/internal/PartitionScope.java | 2 +- .../processor/runtime/internal/SubscriptionScope.java | 6 +++--- .../processor/runtime/internal/ThreadPoolSubPartitions.java | 6 ++++-- .../processor/runtime/ProcessorSubscriptionTest.java | 6 +++--- 8 files changed, 18 insertions(+), 14 deletions(-) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java index 79c121c0..54513c47 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java @@ -218,7 +218,7 @@ private void updateState(SubscriptionStateListener.State newState) { private Set subscribeTopics() { return Stream.concat( - Stream.of(Optional.of(scope.topic()), scope.retryTopic()) + Stream.of(Optional.of(scope.originTopic()), scope.retryTopic()) .filter(Optional::isPresent) .map(Optional::get), scope.shapingTopics().stream()).collect(Collectors.toSet()); diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java index 93325c49..77b42029 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java @@ -322,7 +322,7 @@ private QuotaApplier quotaApplier(SubscriptionScope scope) { (properties, new ByteArraySerializer(), new ByteArraySerializer())); return new QuotaApplierImpl( producerSupplier.apply(producerConfig), - perKeyQuotaConfig.callbackSupplier().apply(scope.topic()), + perKeyQuotaConfig.callbackSupplier().apply(scope.originTopic()), scope); } } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/AbstractSubPartitions.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/AbstractSubPartitions.java index 6cb8fafa..c9e27d5f 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/AbstractSubPartitions.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/AbstractSubPartitions.java @@ -29,6 +29,7 @@ import com.linecorp.decaton.processor.runtime.Property; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.TopicPartition; /** * This class is responsible for following portions: @@ -57,10 +58,11 @@ protected AbstractSubPartitions(PartitionScope scope, Processors processors) shutdownTimeoutMillis = scope.props().get( ProcessorProperties.CONFIG_PROCESSOR_THREADS_TERMINATION_TIMEOUT_MS); rateLimiter = new DynamicRateLimiter(rateProperty(scope)); + TopicPartition topicPartition = scope.topicPartition(); Metrics metrics = Metrics.withTags( "subscription", scope.subscriptionId(), - "topic", scope.topic(), - "partition", String.valueOf(scope.topicPartition().partition())); + "topic", topicPartition.topic(), + "partition", String.valueOf(topicPartition.partition())); taskMetrics = metrics.new TaskMetrics(); schedulerMetrics = metrics.new SchedulerMetrics(); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java index f32e0683..bd3f0f4a 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java @@ -103,7 +103,7 @@ private static SubPartitions createSubPartitions(PartitionScope scope, Processor scope.props().get(ProcessorProperties.CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS), metricsCtor.new CommitControlMetrics()); commitControl = new OutOfOrderCommitControl(scope.topicPartition(), capacity, offsetStateReaper); - if (scope.perKeyQuotaConfig().isPresent() && scope.topic().equals(scope.topicPartition().topic())) { + if (scope.perKeyQuotaConfig().isPresent() && scope.originTopic().equals(scope.topicPartition().topic())) { perKeyQuotaManager = PerKeyQuotaManager.create(scope); } else { perKeyQuotaManager = null; diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionScope.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionScope.java index 5053e6cc..6352095a 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionScope.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionScope.java @@ -29,7 +29,7 @@ public class PartitionScope extends SubscriptionScope { private final TopicPartition topicPartition; PartitionScope(SubscriptionScope parent, TopicPartition topicPartition) { - super(parent.subscriptionId(), parent.topic(), parent.subPartitionRuntime(), + super(parent.subscriptionId(), parent.originTopic(), parent.subPartitionRuntime(), parent.retryConfig(), parent.perKeyQuotaConfig(), parent.props(), parent.tracingProvider(), parent.maxPollRecords(), parent.subPartitionerSupplier()); this.topicPartition = topicPartition; diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/SubscriptionScope.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/SubscriptionScope.java index 10dd8246..d7cd7e17 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/SubscriptionScope.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/SubscriptionScope.java @@ -38,7 +38,7 @@ @Getter public class SubscriptionScope { private final String subscriptionId; - private final String topic; + private final String originTopic; private final SubPartitionRuntime subPartitionRuntime; private final Optional retryConfig; private final Optional perKeyQuotaConfig; @@ -48,11 +48,11 @@ public class SubscriptionScope { private final SubPartitionerSupplier subPartitionerSupplier; public Optional retryTopic() { - return retryConfig.map(conf -> conf.retryTopicOrDefault(topic)); + return retryConfig.map(conf -> conf.retryTopicOrDefault(originTopic)); } public Set shapingTopics() { - return perKeyQuotaConfig.map(conf -> conf.shapingTopicsSupplier().apply(topic)) + return perKeyQuotaConfig.map(conf -> conf.shapingTopicsSupplier().apply(originTopic)) .orElse(Collections.emptySet()); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ThreadPoolSubPartitions.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ThreadPoolSubPartitions.java index 5244ec7a..4d937102 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ThreadPoolSubPartitions.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ThreadPoolSubPartitions.java @@ -32,6 +32,7 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.TopicPartition; @Slf4j public class ThreadPoolSubPartitions extends AbstractSubPartitions { @@ -67,10 +68,11 @@ public void addTask(TaskRequest request) { SubPartition subPartition = subPartitions[threadId]; if (subPartition == null) { ThreadScope threadScope = new ThreadScope(scope, threadId); + TopicPartition topicPartition = threadScope.topicPartition(); ThreadUtilizationMetrics metrics = Metrics.withTags("subscription", threadScope.subscriptionId(), - "topic", threadScope.topic(), - "partition", String.valueOf(threadScope.topicPartition().partition()), + "topic", topicPartition.topic(), + "partition", String.valueOf(topicPartition.partition()), "subpartition", String.valueOf(threadId)) .new ThreadUtilizationMetrics(); ExecutorService executor = createExecutorService(threadScope, metrics); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java index d3a58488..f3feff3b 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java @@ -138,7 +138,7 @@ private static ProcessorSubscription subscription(Consumer consu DecatonProcessor processor) { SubscriptionScope scope = scope(tp.topic(), 0L); ProcessorsBuilder builder = - ProcessorsBuilder.consuming(scope.topic(), + ProcessorsBuilder.consuming(scope.originTopic(), (byte[] bytes) -> new DecatonTask<>( TaskMetadata.builder().build(), new String(bytes), bytes)); @@ -276,7 +276,7 @@ public synchronized ConsumerRecords poll(Duration timeout) { scope, consumer, NoopQuotaApplier.INSTANCE, - ProcessorsBuilder.consuming(scope.topic(), + ProcessorsBuilder.consuming(scope.originTopic(), (byte[] bytes) -> new DecatonTask<>( TaskMetadata.builder().build(), "dummy", bytes)) .thenProcess(processor) @@ -350,7 +350,7 @@ public synchronized void commitSync(Map offse scope, consumer, NoopQuotaApplier.INSTANCE, - ProcessorsBuilder.consuming(scope.topic(), + ProcessorsBuilder.consuming(scope.originTopic(), (byte[] bytes) -> new DecatonTask<>( TaskMetadata.builder().build(), "dummy", bytes)) .thenProcess((ctx, task) -> {