Skip to content

Commit

Permalink
[server] Fix NPE when collecting heartbeat_delay_ms and QuotaRcuPerSe…
Browse files Browse the repository at this point in the history
…condAllowed stats (#894)

We noticed NullPointerExceptions (NPEs) in deployed instances, in the metric collection code paths of two metric classes: "HeartbeatStatReporter" and "ServerQuotaTokenBucketStats". This commit handles "null" gracefully and prevents these NPEs.
  • Loading branch information
nisargthakkar authored Mar 12, 2024
1 parent 4037bf9 commit 9216ee4
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 45 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,7 @@ ext.createDiffFile = { ->
// da-vinci-client
':!clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java',
':!clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java',
':!clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStatReporter.java',

// venice-client
':!clients/venice-client/src/main/java/com/linkedin/venice/fastclient/factory/ClientFactory.java',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,31 +92,29 @@ protected VeniceVersionedStats<STATS, STATS_REPORTER> addStore(String storeName)
}

protected void updateStatsVersionInfo(String storeName, List<Version> existingVersions, int newCurrentVersion) {
VeniceVersionedStats<STATS, STATS_REPORTER> versionedDIVStats = getVersionedStats(storeName);
VeniceVersionedStats<STATS, STATS_REPORTER> versionedStats = getVersionedStats(storeName);

if (newCurrentVersion != versionedDIVStats.getCurrentVersion()) {
versionedDIVStats.setCurrentVersion(newCurrentVersion);
if (newCurrentVersion != versionedStats.getCurrentVersion()) {
versionedStats.setCurrentVersion(newCurrentVersion);
}

List<Integer> existingVersionNumbers =
existingVersions.stream().map(Version::getNumber).collect(Collectors.toList());

// remove old versions except version 0. Version 0 is the default version when a store is created. Since no one will
// report to it, it is always "empty". We use it to reset reporters. eg. when a topic goes from in-flight to
// current,
// we reset in-flight reporter to version 0.
versionedDIVStats.getAllVersionNumbers()
// current, we reset in-flight reporter to version 0.
versionedStats.getAllVersionNumbers()
.stream()
.filter(versionNum -> !existingVersionNumbers.contains(versionNum) && versionNum != NON_EXISTING_VERSION)
.forEach(versionedDIVStats::removeVersion);
.forEach(versionedStats::removeVersion);

int futureVersion = NON_EXISTING_VERSION;
int backupVersion = NON_EXISTING_VERSION;
for (Version version: existingVersions) {
int versionNum = version.getNumber();

// add this version to stats if it is absent
versionedDIVStats.addVersion(versionNum);
versionedStats.addVersion(versionNum);

VersionStatus status = version.getStatus();
if (status == VersionStatus.STARTED || status == VersionStatus.PUSHED) {
Expand All @@ -130,20 +128,11 @@ protected void updateStatsVersionInfo(String storeName, List<Version> existingVe
if (futureVersion < versionNum) {
futureVersion = versionNum;
}
} else {
// check past version
if (status == VersionStatus.ONLINE && versionNum != newCurrentVersion) {
if (backupVersion != 0) {
LOGGER.warn("There are more than 1 backup versions. Something might be wrong. Store: {}", storeName);
}

backupVersion = versionNum;
}
}
}

if (futureVersion != versionedDIVStats.getFutureVersion()) {
versionedDIVStats.setFutureVersion(futureVersion);
if (futureVersion != versionedStats.getFutureVersion()) {
versionedStats.setFutureVersion(futureVersion);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public HeartbeatMonitoringService(
metricsRepository,
metadataRepository,
() -> new HeartbeatStat(new MetricConfig(), regionNames),
(aMetricsRepository, s) -> new HeartbeatStatReporter(aMetricsRepository, s, regionNames),
(aMetricsRepository, storeName) -> new HeartbeatStatReporter(aMetricsRepository, storeName, regionNames),
leaderHeartbeatTimeStamps,
followerHeartbeatTimeStamps);
}
Expand Down Expand Up @@ -162,7 +162,6 @@ public void stopInner() throws Exception {
*/
public void recordLeaderHeartbeat(String store, int version, int partition, String region, Long timestamp) {
// Thin wrapper: forwards every argument unchanged to recordHeartbeat, passing leaderHeartbeatTimeStamps as the
// final argument.
recordHeartbeat(store, version, partition, region, timestamp, leaderHeartbeatTimeStamps);

}

/**
Expand Down Expand Up @@ -224,10 +223,12 @@ protected void recordLags(
protected void record() {
recordLags(
leaderHeartbeatTimeStamps,
((storeName, version, region, lag) -> versionStatsReporter.recordLeaderLag(storeName, version, region, lag)));
((storeName, version, region, heartbeatTs) -> versionStatsReporter
.recordLeaderLag(storeName, version, region, heartbeatTs)));
recordLags(
followerHeartbeatTimeStamps,
((storeName, version, region, lag) -> versionStatsReporter.recordFollowerLag(storeName, version, region, lag)));
((storeName, version, region, heartbeatTs) -> versionStatsReporter
.recordFollowerLag(storeName, version, region, heartbeatTs)));
}

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.davinci.stats.ingestion.heartbeat;

import static com.linkedin.venice.stats.StatsErrorCode.NULL_INGESTION_STATS;

import com.linkedin.davinci.stats.AbstractVeniceStatsReporter;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.stats.AsyncGauge;
Expand All @@ -15,22 +17,36 @@ public class HeartbeatStatReporter extends AbstractVeniceStatsReporter<Heartbeat
public HeartbeatStatReporter(MetricsRepository metricsRepository, String storeName, Set<String> regions) {
super(metricsRepository, storeName);
for (String region: regions) {
registerSensor(
new AsyncGauge(
(ignored, ignored2) -> getStats().getLeaderLag(region).getMax(),
LEADER_METRIC_PREFIX + region + MAX));
registerSensor(
new AsyncGauge(
(ignored, ignored2) -> getStats().getFollowerLag(region).getMax(),
FOLLOWER_METRIC_PREFIX + region + MAX));
registerSensor(
new AsyncGauge(
(ignored, ignored2) -> getStats().getLeaderLag(region).getAvg(),
LEADER_METRIC_PREFIX + region + AVG));
registerSensor(
new AsyncGauge(
(ignored, ignored2) -> getStats().getFollowerLag(region).getAvg(),
FOLLOWER_METRIC_PREFIX + region + AVG));
registerSensor(new AsyncGauge((ignored, ignored2) -> {
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}
return getStats().getLeaderLag(region).getMax();
}, LEADER_METRIC_PREFIX + region + MAX));

registerSensor(new AsyncGauge((ignored, ignored2) -> {
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}

return getStats().getFollowerLag(region).getMax();
}, FOLLOWER_METRIC_PREFIX + region + MAX));

registerSensor(new AsyncGauge((ignored, ignored2) -> {
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}

return getStats().getLeaderLag(region).getAvg();
}, LEADER_METRIC_PREFIX + region + AVG));

registerSensor(new AsyncGauge((ignored, ignored2) -> {
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}

return getStats().getFollowerLag(region).getAvg();
}, FOLLOWER_METRIC_PREFIX + region + AVG));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ public HeartbeatVersionedStats(
this.followerMonitors = followerMonitors;
}

public void recordLeaderLag(String storeName, int version, String region, long lag) {
getStats(storeName, version).recordLeaderLag(region, lag);
public void recordLeaderLag(String storeName, int version, String region, long heartbeatTs) {
getStats(storeName, version).recordLeaderLag(region, heartbeatTs);
}

public void recordFollowerLag(String storeName, int version, String region, long lag) {
getStats(storeName, version).recordFollowerLag(region, lag);
public void recordFollowerLag(String storeName, int version, String region, long heartbeatTs) {
getStats(storeName, version).recordFollowerLag(region, heartbeatTs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,12 @@ public TokenBucket getBucketForStore(String storeName) {
if (storeName.equals(AbstractVeniceAggStats.STORE_NAME_FOR_TOTAL_STAT)) {
return storageNodeBucket;
} else {
int currentVersion = storeRepository.getStore(storeName).getCurrentVersion();
Store store = storeRepository.getStore(storeName);
if (store == null) {
return null;
}

int currentVersion = store.getCurrentVersion();
String topic = Version.composeKafkaTopic(storeName, currentVersion);
return storeVersionBuckets.get(topic);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import com.linkedin.venice.grpc.GrpcErrorCodes;
Expand All @@ -34,7 +35,9 @@
import com.linkedin.venice.protocols.VeniceServerResponse;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.routerapi.ReplicaState;
import com.linkedin.venice.stats.AbstractVeniceAggStats;
import com.linkedin.venice.stats.AggServerQuotaUsageStats;
import com.linkedin.venice.throttle.TokenBucket;
import com.linkedin.venice.utils.Utils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpResponseStatus;
Expand Down Expand Up @@ -460,6 +463,38 @@ public void grpcTestStoreLevelStorageNodeReadQuotaEnabled() {
assertEquals(blocked.get(), 0);
}

/**
 * Exercises {@code quotaEnforcer.getBucketForStore} for three kinds of store name:
 * a store known to the mocked store repository, the aggregate name
 * {@code AbstractVeniceAggStats.STORE_NAME_FOR_TOTAL_STAT}, and a name that was never stubbed.
 * The first two lookups are checked via {@code getStaleTokenCount()}; the last one must yield {@code null}.
 */
@Test
public void testGetBucketForStore() {
String storeName = "testStore";
String topic = Version.composeKafkaTopic(storeName, 1);
Version version = mock(Version.class);
doReturn(topic).when(version).kafkaTopicName();
// NOTE(review): presumably the arguments mean current version = 1 and read quota = 100 — confirm against
// setUpStoreMock.
Store store = setUpStoreMock(storeName, 1, Collections.singletonList(version), 100, true);
doReturn(store).when(storeRepository).getStore(storeName);
doReturn(Collections.singletonList(store)).when(storeRepository).getAllStores();

Instance thisInstance = mock(Instance.class);
doReturn(thisNodeId).when(thisInstance).getNodeId();

Partition partition = setUpPartitionMock(topic, thisInstance, true, 0);
doReturn(0).when(partition).getId();

PartitionAssignment pa = setUpPartitionAssignmentMock(topic, Collections.singletonList(partition));

// Feed the assignment to the enforcer before looking up the store's bucket.
quotaEnforcer.onCustomizedViewChange(pa);
TokenBucket bucketForStore = quotaEnforcer.getBucketForStore(storeName);
// Expected stale token count = quota (100) * enforcementCapacityMultiple (5) * enforcementInterval (10)
assertEquals(bucketForStore.getStaleTokenCount(), 100 * 5 * 10);

// Expected stale token count of the total bucket = node capacity (10) * enforcementCapacityMultiple (5) *
// enforcementInterval (10)
TokenBucket totalBuckets = quotaEnforcer.getBucketForStore(AbstractVeniceAggStats.STORE_NAME_FOR_TOTAL_STAT);
assertEquals(totalBuckets.getStaleTokenCount(), nodeCapacity * 5 * 10);

// A store name with no stubbed store in the repository should yield a null TokenBucket
TokenBucket bucketForInvalidStore = quotaEnforcer.getBucketForStore("incorrect_store");
assertNull(bucketForInvalidStore);
}

/**
* After appropriate setup, this test ensures we can read the initial capacity of the TokenBucket, cannot read
* beyond that, then increments time to allow for a bucket refill, and again ensures we can read the amount that was
Expand Down

0 comments on commit 9216ee4

Please sign in to comment.