Skip to content

Commit

Permalink
Hot fix: [server] Use thread safe map for versioned stats in SN read …
Browse files Browse the repository at this point in the history
…quota (#1415) (#1416)

[server] Use thread safe map for versioned stats in SN read quota (#1415)

Int2ObjectOpenHashMap is not thread-safe, and using it for ServerReadQuotaUsageStats could result in unexpected behavior under race conditions, since the map is accessed and modified by multiple triggers and threads:

Read traffic
CV/routing change events
Version creation and deletion events
Added a test and confirmed that it hangs/times out with Int2ObjectOpenHashMap but succeeds with a thread-safe map such as VeniceConcurrentHashMap.

Also made a minor defensive code change in getReadQuotaUsageRatio to avoid an NPE, since the map could change between the containsKey check and the subsequent get.
  • Loading branch information
xunyin8 authored Jan 3, 2025
1 parent a15f3c5 commit fd8ada1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricConfig;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import io.tehuti.metrics.stats.AsyncGauge;
import io.tehuti.metrics.stats.Count;
import io.tehuti.metrics.stats.Rate;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -31,7 +30,8 @@ public class ServerReadQuotaUsageStats extends AbstractVeniceStats {
private final Sensor rejectedKPS; // rejected key per second
private final Sensor allowedUnintentionallyKPS; // allowed KPS unintentionally due to error or insufficient info
private final Sensor usageRatioSensor; // requested kps divided by nodes quota responsibility
private final Int2ObjectMap<ServerReadQuotaVersionedStats> versionedStats = new Int2ObjectOpenHashMap<>();
private final VeniceConcurrentHashMap<Integer, ServerReadQuotaVersionedStats> versionedStats =
new VeniceConcurrentHashMap<>();
private final AtomicInteger currentVersion = new AtomicInteger(0);
private final AtomicInteger backupVersion = new AtomicInteger(0);
private final Time time;
Expand Down Expand Up @@ -144,10 +144,10 @@ final Double getVersionedRequestedKPS(int version) {
*/
final Double getReadQuotaUsageRatio() {
int version = currentVersion.get();
if (version < 1 || !versionedStats.containsKey(version)) {
ServerReadQuotaVersionedStats stats = versionedStats.get(version);
if (version < 1 || stats == null) {
return Double.NaN;
}
ServerReadQuotaVersionedStats stats = versionedStats.get(version);
long nodeKpsResponsibility = stats.getNodeKpsResponsibility();
if (nodeKpsResponsibility < 1) {
return Double.NaN;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package com.linkedin.venice.stats;

import com.linkedin.venice.utils.Time;
import io.tehuti.metrics.MetricsRepository;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -50,4 +57,25 @@ public void testGetReadQuotaMetricsWithNoVersionOrRecordings() {
Assert.assertTrue(stats.getVersionedRequestedQPS(currentVersion) > 0);
Assert.assertTrue(stats.getVersionedRequestedKPS(currentVersion) > 0);
}

/**
 * Verifies that {@code recordAllowed} can be called concurrently from many threads without hanging.
 *
 * A non-thread safe map like Int2ObjectOpenHashMap could cause the threads to busy loop inside the internal find
 * method when a race condition happens, in which case this test fails by timing out instead of completing.
 *
 * @throws ExecutionException if any of the recording tasks throws
 * @throws InterruptedException if the test thread is interrupted while waiting for the tasks
 * @throws TimeoutException if the tasks do not all finish within 10 seconds
 */
@Test(timeOut = 10 * Time.MS_PER_SECOND)
public void testVersionedStatsThreadSafe() throws ExecutionException, InterruptedException, TimeoutException {
  MetricsRepository metricsRepository = new MetricsRepository();
  String storeName = "test-store";
  ServerReadQuotaUsageStats stats = new ServerReadQuotaUsageStats(metricsRepository, storeName);
  ExecutorService service = Executors.newFixedThreadPool(100);
  try {
    CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[100];
    for (int j = 0; j < 100; j++) {
      completableFutures[j] = CompletableFuture.runAsync(() -> {
        // Every task walks the same range of first-argument values so that the threads contend with each other.
        // NOTE(review): the first argument is presumably the store version - confirm against recordAllowed.
        for (int i = 0; i < 100000; i++) {
          stats.recordAllowed(i, 1);
        }
      }, service);
    }
    CompletableFuture.allOf(completableFutures).get(10, TimeUnit.SECONDS);
  } finally {
    // Always release the pool, including on failure or timeout, so its 100 worker threads do not leak into
    // subsequent tests. shutdownNow only interrupts the workers; a thread stuck in a busy loop may not react.
    service.shutdownNow();
  }
}
}

0 comments on commit fd8ada1

Please sign in to comment.