Skip to content

Commit 90dd662

Browse files
authored
[server] Fix NPE for ServerReadQuotaUsageStats when store has no version or traffic (#1291)
* [server] Fix NPE for ServerReadQuotaUsageStats when store has no version or traffic. 1. Prevent an NPE in ServerReadQuotaUsageStats's async gauges when a store has no version or traffic. 2. Set the backup version from the store's version list in handleStoreChanged, since the old behavior did not set the correct backup version after a server restart.
1 parent 0ea8dce commit 90dd662

File tree

6 files changed

+65
-15
lines changed

6 files changed

+65
-15
lines changed

services/venice-server/src/main/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandler.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,9 +413,12 @@ public void handleStoreChanged(Store store) {
413413

414414
List<String> topics =
415415
store.getVersions().stream().map((version) -> version.kafkaTopicName()).collect(Collectors.toList());
416+
int currentVersion = store.getCurrentVersion();
417+
int backupVersion = 0;
416418
for (String topic: topics) {
417419
toBeRemovedTopics.remove(topic);
418420
customizedViewRepository.subscribeRoutingDataChange(topic, this);
421+
int versionNumber = Version.parseVersionFromKafkaTopicName(topic);
419422
try {
420423
/**
421424
* make sure we're up-to-date after registering as a listener
@@ -427,8 +430,12 @@ public void handleStoreChanged(Store store) {
427430
*
428431
*/
429432
this.onCustomizedViewChange(customizedViewRepository.getPartitionAssignments(topic));
433+
if (versionNumber != currentVersion && versionNumber > backupVersion
434+
&& VersionStatus.isBootstrapCompleted(store.getVersionStatus(versionNumber))) {
435+
backupVersion = versionNumber;
436+
}
430437
} catch (VeniceNoHelixResourceException e) {
431-
Version version = store.getVersion(Version.parseVersionFromKafkaTopicName(topic));
438+
Version version = store.getVersion(versionNumber);
432439
if (version != null && version.getStatus().equals(VersionStatus.ONLINE)) {
433440
/**
434441
* The store metadata believes this version is online, but the partition assignment is not in the
@@ -462,7 +469,12 @@ public void handleStoreChanged(Store store) {
462469
}
463470
}
464471
removeTopics(toBeRemovedTopics);
465-
stats.setCurrentVersion(store.getName(), store.getCurrentVersion());
472+
if (currentVersion > 0) {
473+
stats.setCurrentVersion(store.getName(), currentVersion);
474+
}
475+
if (backupVersion > 0) {
476+
stats.setBackupVersion(store.getName(), backupVersion);
477+
}
466478
}
467479

468480
private Set<String> getStoreTopics(String storeName) {

services/venice-server/src/main/java/com/linkedin/venice/stats/AggServerQuotaUsageStats.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,8 @@ public void setNodeQuotaResponsibility(String storeName, int version, long nodeK
3737
public void setCurrentVersion(String storeName, int version) {
3838
getStoreStats(storeName).setCurrentVersion(version);
3939
}
40+
41+
public void setBackupVersion(String storeName, int version) {
42+
getStoreStats(storeName).setBackupVersion(version);
43+
}
4044
}

services/venice-server/src/main/java/com/linkedin/venice/stats/ServerReadQuotaUsageStats.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,16 @@ public ServerReadQuotaUsageStats(MetricsRepository metricsRepository, String nam
6666

6767
public void setCurrentVersion(int version) {
6868
int oldCurrentVersion = currentVersion.get();
69-
if (version == oldCurrentVersion) {
69+
if (version != oldCurrentVersion) {
7070
// Defensive coding since set current version can be called multiple times with the same current version
71-
return;
71+
currentVersion.compareAndSet(oldCurrentVersion, version);
7272
}
73-
if (currentVersion.compareAndSet(oldCurrentVersion, version)) {
74-
// Old current version becomes the backup. This should work even if:
75-
// a) we rolled back current version
76-
// b) current version used to be 0
77-
backupVersion.set(oldCurrentVersion);
73+
}
74+
75+
public void setBackupVersion(int version) {
76+
int oldBackupVersion = backupVersion.get();
77+
if (version != oldBackupVersion) {
78+
backupVersion.compareAndSet(oldBackupVersion, version);
7879
}
7980
}
8081

@@ -114,24 +115,26 @@ private ServerReadQuotaVersionedStats getVersionedStats(int version) {
114115
return versionedStats.computeIfAbsent(version, (ignored) -> new ServerReadQuotaVersionedStats(time, metricConfig));
115116
}
116117

117-
private Double getVersionedRequestedQPS(int version) {
118+
// Package private for testing purpose
119+
final Double getVersionedRequestedQPS(int version) {
118120
if (version < 1) {
119121
return Double.NaN;
120122
}
121-
return versionedStats.get(version).getRequestedQPS();
123+
return getVersionedStats(version).getRequestedQPS();
122124
}
123125

124-
private Double getVersionedRequestedKPS(int version) {
126+
// Package private for testing purpose
127+
final Double getVersionedRequestedKPS(int version) {
125128
if (version < 1) {
126129
return Double.NaN;
127130
}
128-
return versionedStats.get(version).getRequestedKPS();
131+
return getVersionedStats(version).getRequestedKPS();
129132
}
130133

131134
/**
132135
* @return the ratio of the read quota usage to the node's quota responsibility
133136
*/
134-
private Double getReadQuotaUsageRatio() {
137+
final Double getReadQuotaUsageRatio() {
135138
int version = currentVersion.get();
136139
if (version < 1 || !versionedStats.containsKey(version)) {
137140
return Double.NaN;

services/venice-server/src/test/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandlerTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
4141
import com.linkedin.venice.meta.Store;
4242
import com.linkedin.venice.meta.Version;
43+
import com.linkedin.venice.meta.VersionStatus;
4344
import com.linkedin.venice.protocols.VeniceServerResponse;
4445
import com.linkedin.venice.read.RequestType;
4546
import com.linkedin.venice.response.VeniceReadResponseStatus;
@@ -831,6 +832,7 @@ private Store setUpStoreMock(
831832
doReturn(versionList).when(store).getVersions();
832833
doReturn(readQuota).when(store).getReadQuotaInCU();
833834
doReturn(readQuotaEnabled).when(store).isStorageNodeReadQuotaEnabled();
835+
doReturn(VersionStatus.ONLINE).when(store).getVersionStatus(anyInt());
834836
return store;
835837
}
836838

services/venice-server/src/test/java/com/linkedin/venice/stats/AggServerReadQuotaUsageStatsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public void testAggServerQuotaUsageStats() {
3030
String totalReadQuotaRequestedKPSString = ".total--current_quota_request_key_count.Gauge";
3131
long batchSize = 100;
3232
long batchSize2 = 200;
33-
aggServerQuotaUsageStats.setCurrentVersion(storeName, 1);
33+
aggServerQuotaUsageStats.setBackupVersion(storeName, 1);
3434
aggServerQuotaUsageStats.setCurrentVersion(storeName, 2);
3535
aggServerQuotaUsageStats.setCurrentVersion(storeName2, 1);
3636
aggServerQuotaUsageStats.recordAllowed(storeName, 1, batchSize);

services/venice-server/src/test/java/com/linkedin/venice/stats/ServerReadQuotaUsageStatsTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,33 @@ public void testGetReadQuotaUsageRatio() {
2121
Assert.assertTrue(
2222
metricsRepository.getMetric(".test-store--quota_requested_usage_ratio.Gauge").value() <= usageRatio);
2323
}
24+
25+
@Test
26+
public void testGetReadQuotaMetricsWithNoVersionOrRecordings() {
27+
MetricsRepository metricsRepository = new MetricsRepository();
28+
String storeName = "test-store";
29+
int currentVersion = 3;
30+
int backupVersion = 2;
31+
ServerReadQuotaUsageStats stats = new ServerReadQuotaUsageStats(metricsRepository, storeName);
32+
// Stats shouldn't fail if the store doesn't have any versions yet
33+
Assert.assertEquals(stats.getVersionedRequestedQPS(backupVersion), 0d);
34+
Assert.assertEquals(stats.getVersionedRequestedQPS(currentVersion), 0d);
35+
Assert.assertEquals(stats.getVersionedRequestedKPS(backupVersion), 0d);
36+
Assert.assertEquals(stats.getVersionedRequestedKPS(currentVersion), 0d);
37+
Assert.assertEquals(stats.getReadQuotaUsageRatio(), Double.NaN);
38+
// Stats shouldn't fail if there are no recordings yet
39+
stats.setCurrentVersion(currentVersion);
40+
stats.setBackupVersion(backupVersion);
41+
Assert.assertEquals(stats.getVersionedRequestedQPS(backupVersion), 0d);
42+
Assert.assertEquals(stats.getVersionedRequestedQPS(currentVersion), 0d);
43+
Assert.assertEquals(stats.getVersionedRequestedKPS(backupVersion), 0d);
44+
Assert.assertEquals(stats.getVersionedRequestedKPS(currentVersion), 0d);
45+
Assert.assertEquals(stats.getReadQuotaUsageRatio(), Double.NaN);
46+
// The replica receives some assignment and traffic for current version
47+
stats.setNodeQuotaResponsibility(currentVersion, 1000);
48+
stats.recordAllowed(currentVersion, 100);
49+
Assert.assertTrue(stats.getReadQuotaUsageRatio() > 0);
50+
Assert.assertTrue(stats.getVersionedRequestedQPS(currentVersion) > 0);
51+
Assert.assertTrue(stats.getVersionedRequestedKPS(currentVersion) > 0);
52+
}
2453
}

0 commit comments

Comments
 (0)