Skip to content

Commit 67c6e5a

Browse files
committed
[da-vinci][server][controller][router][fc][tc][test]: Close async OTel
instruments properly with a uniform lifecycle Stats classes that own ASYNC_GAUGE / ASYNC_COUNTER wrappers previously had no consistent way to deregister the SDK-side callbacks at shutdown, so the OTel SDK kept polling deleted stores and dropped instruments. This change introduces a single close pattern and applies it uniformly across every stats class that owns metric wrappers. Lifecycle primitives (venice-client-common): - CompositeCloseable: idempotent LIFO registry of Closeables. - AbstractStatsCloseable: base providing the statsCloseables field and a default close() that drains it. - MetricEntityStateUtils.closeQuietly / closeAndClear. - MetricEntityState* and AsyncMetricEntityState* factories now take a CompositeCloseable so each wrapper self-registers and gets closed (and async callbacks deregistered) when its owner shuts down. Uniform adoption across stats and owner classes: - AbstractVeniceAggVersionedStats extends AbstractStatsCloseable; close() unregisters the metadata listener then drains the registry. - Per-store PerStoreEntry inner classes (AggVersionedDIVStats, AggVersionedDaVinciRecordTransformerStats, ServerMetadataServiceStats, NativeMetadataRepositoryStats, BackupVersionOptimizationServiceStats) extend AbstractStatsCloseable so handleStoreDeleted closes the per-store wrappers and deregisters their SDK callbacks. - AbstractVeniceService-based owners (HelixParticipation, KafkaStoreIngestion, AggKafkaConsumer, AdaptiveThrottlerSignal, StoreBuffer, HeartbeatMonitoring) hold their own statsCloseables and drain it from stopInner / clear / shutdown. - VeniceServer, HelixVeniceClusterResources, PartitionedProducerExecutor, NativeMetadataRepository, ErrorPartitionResetTask, DaVinciBackend, and the two VeniceChangelogConsumer* classes extend AbstractStatsCloseable. Simplifications discovered while standardizing: - Drop the StoreDataChangedListener pattern from BackupVersionOptimizationServiceStats and ServerMetadataServiceStats (sync COUNTERs don't need SDK-callback deregistration). - Revert StoreVersionOtelStats to its original Map<String, AtomicReference<VersionInfo>> design; handleStoreDeleted resets to NON_EXISTING. Delete the now-unused StoreDataChangedListenerRegistration. - Remove the volatile boolean closed flags (4 classes) — bounded post-close leak is acceptable and matches sibling classes. Helper consolidation: - OpenTelemetryMetricsSetup.buildStoreDimensionsMap(baseDims, storeName) replaces 5 private helpers plus 3 inline patterns and sanitizes the store name consistently. - MetricsRepositoryUtils.createOtelEnabledRepository / createOtelDisabledRepository replace OTel test-setup boilerplate across ~13 test classes.
1 parent 510b37f commit 67c6e5a

149 files changed

Lines changed: 3601 additions & 1388 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,12 @@
6565
import com.linkedin.venice.service.AbstractVeniceService;
6666
import com.linkedin.venice.service.ICProvider;
6767
import com.linkedin.venice.stats.TehutiUtils;
68+
import com.linkedin.venice.stats.metrics.AbstractStatsCloseable;
6869
import com.linkedin.venice.utils.DaemonThreadFactory;
6970
import com.linkedin.venice.utils.LogContext;
7071
import com.linkedin.venice.utils.Utils;
7172
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
7273
import io.tehuti.metrics.MetricsRepository;
73-
import java.io.Closeable;
7474
import java.util.Collections;
7575
import java.util.HashSet;
7676
import java.util.Map;
@@ -100,7 +100,7 @@
100100
* the shared behavior of this class. Regular clients participate in version swaps while version-specific
101101
* clients subscribe to a fixed version and ignore version swap events.
102102
*/
103-
public class DaVinciBackend implements Closeable {
103+
public class DaVinciBackend extends AbstractStatsCloseable {
104104
private static final Logger LOGGER = LogManager.getLogger(DaVinciBackend.class);
105105

106106
// Client type tracking for version-specific vs regular clients
@@ -189,16 +189,18 @@ public DaVinciBackend(
189189
configLoader.getVeniceClusterConfig().getClusterName());
190190

191191
// OTel per-store version gauge
192-
storeVersionOtelStats = StoreVersionOtelStats
193-
.create(metricsRepository, configLoader.getVeniceClusterConfig().getClusterName(), storeRepository);
194-
195-
rocksDBMemoryStats = backendConfig.isDatabaseMemoryStatsEnabled()
196-
? new RocksDBMemoryStats(
197-
metricsRepository,
198-
"RocksDBMemoryStats",
199-
backendConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled(),
200-
configLoader.getVeniceClusterConfig().getClusterName())
201-
: null;
192+
storeVersionOtelStats = statsCloseables.register(
193+
StoreVersionOtelStats
194+
.create(metricsRepository, configLoader.getVeniceClusterConfig().getClusterName(), storeRepository));
195+
196+
rocksDBMemoryStats = statsCloseables.register(
197+
backendConfig.isDatabaseMemoryStatsEnabled()
198+
? new RocksDBMemoryStats(
199+
metricsRepository,
200+
"RocksDBMemoryStats",
201+
backendConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled(),
202+
configLoader.getVeniceClusterConfig().getClusterName())
203+
: null);
202204

203205
/**
204206
* The constructor of {@link #storageService} will take care of unused store/store version cleanup.
@@ -298,8 +300,11 @@ public DaVinciBackend(
298300
ingestionService.start();
299301

300302
if (BlobTransferUtils.isBlobTransferManagerEnabled(backendConfig)) {
301-
aggVersionedBlobTransferStats =
302-
new AggVersionedBlobTransferStats(metricsRepository, storeRepository, configLoader.getVeniceServerConfig());
303+
aggVersionedBlobTransferStats = statsCloseables.register(
304+
new AggVersionedBlobTransferStats(
305+
metricsRepository,
306+
storeRepository,
307+
configLoader.getVeniceServerConfig()));
303308
aggBlobTransferStats =
304309
new AggBlobTransferStats(aggVersionedBlobTransferStats, ingestionService.getHostLevelIngestionStats());
305310
P2PBlobTransferConfig p2PBlobTransferConfig = new P2PBlobTransferConfig(
@@ -434,9 +439,7 @@ public synchronized void close() {
434439
cacheBackend.ifPresent(
435440
objectCacheBackend -> storeRepository
436441
.unregisterStoreDataChangedListener(objectCacheBackend.getCacheInvalidatingStoreChangeListener()));
437-
if (storeVersionOtelStats != null) {
438-
storeVersionOtelStats.close();
439-
}
442+
super.close();
440443
ExecutorService storeBackendCloseExecutor = Executors.newCachedThreadPool(
441444
new DaemonThreadFactory(
442445
"DaVinciBackend-StoreBackend-Close",

clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerDaVinciRecordTransformerImpl.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.linkedin.venice.pubsub.api.PubSubPosition;
2828
import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition;
2929
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
30+
import com.linkedin.venice.stats.metrics.AbstractStatsCloseable;
3031
import com.linkedin.venice.utils.DaemonThreadFactory;
3132
import com.linkedin.venice.utils.ExceptionUtils;
3233
import com.linkedin.venice.utils.LogContext;
@@ -64,7 +65,7 @@
6465
import org.apache.logging.log4j.Logger;
6566

6667

67-
public class VeniceChangelogConsumerDaVinciRecordTransformerImpl<K, V>
68+
public class VeniceChangelogConsumerDaVinciRecordTransformerImpl<K, V> extends AbstractStatsCloseable
6869
implements StatefulVeniceChangelogConsumer<K, V>, VeniceChangelogConsumer<K, V> {
6970
private static final Logger LOGGER = LogManager.getLogger(VeniceChangelogConsumerDaVinciRecordTransformerImpl.class);
7071
private long START_TIMEOUT_IN_SECONDS = 60;
@@ -184,10 +185,11 @@ public VeniceChangelogConsumerDaVinciRecordTransformerImpl(
184185
}
185186

186187
if (changelogClientConfig.getInnerClientConfig().getMetricsRepository() != null) {
187-
this.changeCaptureStats = new BasicConsumerStats(
188-
changelogClientConfig.getInnerClientConfig().getMetricsRepository(),
189-
"vcc-" + changelogClientConfig.getConsumerName(),
190-
storeName);
188+
this.changeCaptureStats = statsCloseables.register(
189+
new BasicConsumerStats(
190+
changelogClientConfig.getInnerClientConfig().getMetricsRepository(),
191+
"vcc-" + changelogClientConfig.getConsumerName(),
192+
storeName));
191193
} else {
192194
changeCaptureStats = null;
193195
}
@@ -439,12 +441,15 @@ public void resume() {
439441
this.resume(Collections.emptySet());
440442
}
441443

444+
@Override
442445
public void close() {
443446
try {
444447
this.stop();
445448
} catch (Exception e) {
446449
LOGGER.error("Close failed for VeniceChangelogConsumer", e);
447450
throw new RuntimeException(e);
451+
} finally {
452+
super.close();
448453
}
449454
}
450455

clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import com.linkedin.venice.serialization.avro.AvroSpecificStoreDeserializerCache;
5858
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
5959
import com.linkedin.venice.serializer.RecordDeserializer;
60+
import com.linkedin.venice.stats.metrics.AbstractStatsCloseable;
6061
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
6162
import com.linkedin.venice.utils.ByteUtils;
6263
import com.linkedin.venice.utils.DaemonThreadFactory;
@@ -102,7 +103,7 @@
102103
import org.apache.logging.log4j.Logger;
103104

104105

105-
public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsumer<K, V> {
106+
public class VeniceChangelogConsumerImpl<K, V> extends AbstractStatsCloseable implements VeniceChangelogConsumer<K, V> {
106107
private static final Logger LOGGER = LogManager.getLogger(VeniceChangelogConsumerImpl.class);
107108
private static final int MAX_SUBSCRIBE_RETRIES = 5;
108109
private static final String ROCKSDB_BUFFER_FOLDER = "rocksdb-chunk-buffer";
@@ -250,10 +251,11 @@ public VeniceChangelogConsumerImpl(
250251
}
251252

252253
if (changelogClientConfig.getInnerClientConfig().getMetricsRepository() != null) {
253-
this.changeCaptureStats = new BasicConsumerStats(
254-
changelogClientConfig.getInnerClientConfig().getMetricsRepository(),
255-
"vcc-" + changelogClientConfig.getConsumerName(),
256-
storeName);
254+
this.changeCaptureStats = statsCloseables.register(
255+
new BasicConsumerStats(
256+
changelogClientConfig.getInnerClientConfig().getMetricsRepository(),
257+
"vcc-" + changelogClientConfig.getConsumerName(),
258+
storeName));
257259
} else {
258260
changeCaptureStats = null;
259261
}
@@ -1169,18 +1171,22 @@ public void close() {
11691171
LOGGER.info("Closing Changelog Consumer with name: {}", changelogClientConfig.getConsumerName());
11701172
subscriptionLock.writeLock().lock();
11711173
try {
1172-
this.unsubscribeAll();
1173-
pubSubConsumer.close();
1174-
heartbeatReporterThread.interrupt();
1175-
seekExecutorService.shutdown();
1176-
compressorFactory.close();
1177-
1178-
if (rocksDBStorageEngineFactory != null) {
1179-
rocksDBStorageEngineFactory.close();
1180-
}
1174+
try {
1175+
this.unsubscribeAll();
1176+
pubSubConsumer.close();
1177+
heartbeatReporterThread.interrupt();
1178+
seekExecutorService.shutdown();
1179+
compressorFactory.close();
1180+
1181+
if (rocksDBStorageEngineFactory != null) {
1182+
rocksDBStorageEngineFactory.close();
1183+
}
11811184

1182-
veniceChangelogConsumerClientFactory.deregisterClient(changelogClientConfig.getConsumerName());
1183-
LOGGER.info("Closed Changelog Consumer with name: {}", changelogClientConfig.getConsumerName());
1185+
veniceChangelogConsumerClientFactory.deregisterClient(changelogClientConfig.getConsumerName());
1186+
LOGGER.info("Closed Changelog Consumer with name: {}", changelogClientConfig.getConsumerName());
1187+
} finally {
1188+
super.close();
1189+
}
11841190
} finally {
11851191
subscriptionLock.writeLock().unlock();
11861192
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/stats/BasicConsumerStats.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
7474
BasicConsumerTehutiMetricName.MAX_PARTITION_LAG,
7575
Collections.singletonList(new Max()),
7676
baseDimensionsMap,
77-
baseAttributes);
77+
baseAttributes,
78+
resources);
7879

7980
minimumConsumingVersionMetric = MetricEntityStateBase.create(
8081
BasicConsumerMetricEntity.CURRENT_CONSUMING_VERSION.getMetricEntity(),
@@ -83,7 +84,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
8384
BasicConsumerTehutiMetricName.MINIMUM_CONSUMING_VERSION,
8485
Collections.singletonList(new Gauge()),
8586
baseDimensionsMap,
86-
baseAttributes);
87+
baseAttributes,
88+
resources);
8789

8890
maximumConsumingVersionMetric = MetricEntityStateBase.create(
8991
BasicConsumerMetricEntity.CURRENT_CONSUMING_VERSION.getMetricEntity(),
@@ -92,7 +94,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
9294
BasicConsumerTehutiMetricName.MAXIMUM_CONSUMING_VERSION,
9395
Collections.singletonList(new Gauge()),
9496
baseDimensionsMap,
95-
baseAttributes);
97+
baseAttributes,
98+
resources);
9699

97100
recordsConsumedCountMetric = MetricEntityStateBase.create(
98101
BasicConsumerMetricEntity.RECORDS_CONSUMED_COUNT.getMetricEntity(),
@@ -101,7 +104,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
101104
BasicConsumerTehutiMetricName.RECORDS_CONSUMED,
102105
Arrays.asList(new Avg(), new Max(), new Rate()),
103106
baseDimensionsMap,
104-
baseAttributes);
107+
baseAttributes,
108+
resources);
105109

106110
pollSuccessCountMetric = MetricEntityStateOneEnum.create(
107111
BasicConsumerMetricEntity.POLL_COUNT.getMetricEntity(),
@@ -110,7 +114,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
110114
BasicConsumerTehutiMetricName.POLL_SUCCESS_COUNT,
111115
Collections.singletonList(new Rate()),
112116
baseDimensionsMap,
113-
VeniceResponseStatusCategory.class);
117+
VeniceResponseStatusCategory.class,
118+
resources);
114119

115120
pollFailCountMetric = MetricEntityStateOneEnum.create(
116121
BasicConsumerMetricEntity.POLL_COUNT.getMetricEntity(),
@@ -119,7 +124,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
119124
BasicConsumerTehutiMetricName.POLL_FAIL_COUNT,
120125
Collections.singletonList(new Rate()),
121126
baseDimensionsMap,
122-
VeniceResponseStatusCategory.class);
127+
VeniceResponseStatusCategory.class,
128+
resources);
123129

124130
versionSwapSuccessCountMetric = MetricEntityStateOneEnum.create(
125131
BasicConsumerMetricEntity.VERSION_SWAP_COUNT.getMetricEntity(),
@@ -128,7 +134,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
128134
BasicConsumerTehutiMetricName.VERSION_SWAP_SUCCESS_COUNT,
129135
Collections.singletonList(new Total()),
130136
baseDimensionsMap,
131-
VeniceResponseStatusCategory.class);
137+
VeniceResponseStatusCategory.class,
138+
resources);
132139

133140
versionSwapFailCountMetric = MetricEntityStateOneEnum.create(
134141
BasicConsumerMetricEntity.VERSION_SWAP_COUNT.getMetricEntity(),
@@ -137,7 +144,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
137144
BasicConsumerTehutiMetricName.VERSION_SWAP_FAIL_COUNT,
138145
Collections.singletonList(new Total()),
139146
baseDimensionsMap,
140-
VeniceResponseStatusCategory.class);
147+
VeniceResponseStatusCategory.class,
148+
resources);
141149

142150
chunkedRecordSuccessCountMetric = MetricEntityStateOneEnum.create(
143151
BasicConsumerMetricEntity.CHUNKED_RECORD_COUNT.getMetricEntity(),
@@ -146,7 +154,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
146154
BasicConsumerTehutiMetricName.CHUNKED_RECORD_SUCCESS_COUNT,
147155
Collections.singletonList(new Rate()),
148156
baseDimensionsMap,
149-
VeniceResponseStatusCategory.class);
157+
VeniceResponseStatusCategory.class,
158+
resources);
150159

151160
chunkedRecordFailCountMetric = MetricEntityStateOneEnum.create(
152161
BasicConsumerMetricEntity.CHUNKED_RECORD_COUNT.getMetricEntity(),
@@ -155,7 +164,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
155164
BasicConsumerTehutiMetricName.CHUNKED_RECORD_FAIL_COUNT,
156165
Collections.singletonList(new Rate()),
157166
baseDimensionsMap,
158-
VeniceResponseStatusCategory.class);
167+
VeniceResponseStatusCategory.class,
168+
resources);
159169

160170
/*
161171
* Record default value for version swap metrics so the UP_DOWN_COUNTER in OTEL will emit a default 0.

clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
3535
import com.linkedin.venice.service.AbstractVeniceService;
3636
import com.linkedin.venice.stats.HelixMessageChannelStats;
37+
import com.linkedin.venice.stats.metrics.CompositeCloseable;
3738
import com.linkedin.venice.status.StatusMessageHandler;
3839
import com.linkedin.venice.utils.DaemonThreadFactory;
3940
import com.linkedin.venice.utils.HelixUtils;
@@ -90,6 +91,8 @@ public class HelixParticipationService extends AbstractVeniceService
9091
private VeniceOfflinePushMonitorAccessor veniceOfflinePushMonitorAccessor;
9192
private BlobTransferManager<Void> blobTransferManager;
9293
private final HeartbeatMonitoringService heartbeatMonitoringService;
94+
/** Stats fields owned by this class; drained by {@link #stopInner()}. */
95+
private final CompositeCloseable statsCloseables = new CompositeCloseable();
9396

9497
// This is ONLY for testing purpose.
9598
public ThreadPoolExecutor getLeaderFollowerHelixStateTransitionThreadPool() {
@@ -193,20 +196,22 @@ public boolean startInner() {
193196
config.getMaxLeaderFollowerStateTransitionThreadNumber(),
194197
"Venice-L/F-state-transition");
195198
// register stats that tracks the thread pool
196-
ParticipantStateTransitionStats stateTransitionStats = new ParticipantStateTransitionStats(
197-
metricsRepository,
198-
leaderFollowerHelixStateTransitionThreadPool,
199-
"Venice_L/F_ST_thread_pool");
199+
ParticipantStateTransitionStats stateTransitionStats = statsCloseables.register(
200+
new ParticipantStateTransitionStats(
201+
metricsRepository,
202+
leaderFollowerHelixStateTransitionThreadPool,
203+
"Venice_L/F_ST_thread_pool"));
200204

201205
if (config.getLeaderFollowerThreadPoolStrategy()
202206
.equals(LeaderFollowerPartitionStateModelFactory.LeaderFollowerThreadPoolStrategy.DUAL_POOL_STRATEGY)) {
203207
ThreadPoolExecutor futureVersionThreadPool = initHelixStateTransitionThreadPool(
204208
config.getMaxFutureVersionLeaderFollowerStateTransitionThreadNumber(),
205209
"venice-L/F-state-transition-future-version");
206-
ParticipantStateTransitionStats futureVersionStateTransitionStats = new ParticipantStateTransitionStats(
207-
metricsRepository,
208-
futureVersionThreadPool,
209-
"Venice_L/F_ST_thread_pool_future_version");
210+
ParticipantStateTransitionStats futureVersionStateTransitionStats = statsCloseables.register(
211+
new ParticipantStateTransitionStats(
212+
metricsRepository,
213+
futureVersionThreadPool,
214+
"Venice_L/F_ST_thread_pool_future_version"));
210215
leaderFollowerParticipantModelFactory = new LeaderFollowerPartitionStateModelDualPoolFactory(
211216
ingestionBackend,
212217
veniceConfigLoader,
@@ -301,6 +306,7 @@ public void stopInner() throws IOException {
301306
zkClient.close();
302307
LOGGER.info("Closed ZkClient.");
303308
}
309+
statsCloseables.close();
304310
LOGGER.info("Finished stopping HelixParticipation service.");
305311
}
306312

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AdaptiveThrottlerSignalService.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.linkedin.davinci.stats.ingestion.heartbeat.AggregatedHeartbeatLagEntry;
88
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
99
import com.linkedin.venice.service.AbstractVeniceService;
10+
import com.linkedin.venice.stats.metrics.CompositeCloseable;
1011
import com.linkedin.venice.throttle.VeniceAdaptiveThrottler;
1112
import com.linkedin.venice.utils.DaemonThreadFactory;
1213
import io.tehuti.Metric;
@@ -49,6 +50,8 @@ public class AdaptiveThrottlerSignalService extends AbstractVeniceService {
4950
private boolean nonCurrentLeaderMaxHeartbeatLagSignal = false;
5051
private boolean nonCurrentFollowerMaxHeartbeatLagSignal = false;
5152
private final AdaptiveThrottlingServiceStats adaptiveThrottlingServiceStats;
53+
/** Stats fields owned by this class; drained by {@link #stopInner()}. */
54+
private final CompositeCloseable statsCloseables = new CompositeCloseable();
5255

5356
public AdaptiveThrottlerSignalService(
5457
VeniceServerConfig veniceServerConfig,
@@ -63,8 +66,8 @@ public AdaptiveThrottlerSignalService(
6366
this.updateService = Executors.newSingleThreadScheduledExecutor(
6467
new DaemonThreadFactory("AdaptiveThrottlerSignalService", veniceServerConfig.getLogContext()));
6568
this.heartbeatMonitoringService = heartbeatMonitoringService;
66-
this.adaptiveThrottlingServiceStats =
67-
new AdaptiveThrottlingServiceStats(metricsRepository, veniceServerConfig.getClusterName());
69+
this.adaptiveThrottlingServiceStats = statsCloseables
70+
.register(new AdaptiveThrottlingServiceStats(metricsRepository, veniceServerConfig.getClusterName()));
6871
}
6972

7073
public void registerThrottler(VeniceAdaptiveThrottler adaptiveIngestionThrottler) {
@@ -171,6 +174,7 @@ public boolean startInner() throws Exception {
171174
@Override
172175
public void stopInner() throws Exception {
173176
updateService.shutdownNow();
177+
statsCloseables.close();
174178
}
175179

176180
List<VeniceAdaptiveThrottler> getThrottlerList() {

0 commit comments

Comments
 (0)