Skip to content

Commit 1d97142

Browse files
authored
FFM-6392 - Java SDK - Investigate thread contention MetricsProcessor.pushToQueue (#133)
What: - Remove the synchronized keyword from pushToQueue (also renamed to registerEvaluation). - Remove the synchronized keyword from runOneIteration; a new method was added to drain frequencyMap atomically, so synchronized is no longer needed there. - Remove debug logs that caused excessive string concatenation in pushToQueue, as reported by the profiler. - Add a new stress test that places load on this method from multiple threads, for easier profiling. Why: The platform team is reporting contention around MetricsProcessor.pushToQueue in JFR when using a large number of targets and flags. Testing: Tested manually with the new JUnit stress test added for MetricsProcessor, using the list of targets/flags used by platform.
1 parent d7ed6fe commit 1d97142

File tree

5 files changed

+314
-69
lines changed

5 files changed

+314
-69
lines changed

src/main/java/io/harness/cf/client/api/InnerClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ public JsonObject jsonVariation(
374374
public void processEvaluation(
375375
@NonNull FeatureConfig featureConfig, Target target, @NonNull Variation variation) {
376376
if (this.options.isAnalyticsEnabled()) {
377-
metricsProcessor.pushToQueue(target, featureConfig.getFeature(), variation);
377+
metricsProcessor.registerEvaluation(target, featureConfig.getFeature(), variation);
378378
}
379379
}
380380

src/main/java/io/harness/cf/client/api/MetricsProcessor.java

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,46 @@
1616
@Slf4j
1717
class MetricsProcessor extends AbstractScheduledService {
1818

19+
static class FrequencyMap<K> {
20+
21+
private final AtomicLongMap<K> map;
22+
23+
FrequencyMap() {
24+
map = AtomicLongMap.create();
25+
}
26+
27+
void increment(K key) {
28+
map.incrementAndGet(key);
29+
}
30+
31+
long get(K key) {
32+
return map.get(key);
33+
}
34+
35+
int size() {
36+
return map.size();
37+
}
38+
39+
long sum() {
40+
return map.sum();
41+
}
42+
43+
Map<K, Long> drainToMap() {
44+
// Guava doesn't have a function to atomically drain an AtomicLongMap.
45+
// Here we need to atomically set each key to zero as we transfer it to the new map else we
46+
// see missed evaluations
47+
final HashMap<K, Long> result = new HashMap<>();
48+
map.asMap()
49+
.forEach(
50+
(k, v) -> {
51+
final long oldVal = map.getAndUpdate(k, old -> 0);
52+
result.put(k, oldVal);
53+
});
54+
map.removeAllZeros();
55+
return result;
56+
}
57+
}
58+
1959
private static final String FEATURE_NAME_ATTRIBUTE = "featureName";
2060
private static final String VARIATION_IDENTIFIER_ATTRIBUTE = "variationIdentifier";
2161
private static final String TARGET_ATTRIBUTE = "target";
@@ -36,7 +76,7 @@ class MetricsProcessor extends AbstractScheduledService {
3676

3777
private final Connector connector;
3878
private final BaseConfig config;
39-
private final AtomicLongMap<MetricEvent> frequencyMap;
79+
private final FrequencyMap<MetricEvent> frequencyMap;
4080
private final Set<Target> uniqueTargetSet;
4181
private final ScheduledExecutorService executor;
4282

@@ -50,26 +90,32 @@ public MetricsProcessor(
5090
this.config = config;
5191
this.executor = executor();
5292

53-
this.frequencyMap = AtomicLongMap.create();
93+
this.frequencyMap = new FrequencyMap<>();
5494
this.uniqueTargetSet = ConcurrentHashMap.newKeySet();
5595

5696
callback.onMetricsReady();
5797
}
5898

59-
// push the incoming data to the queue
60-
public synchronized void pushToQueue(Target target, String featureName, Variation variation) {
99+
/**
 * Forwards its arguments unchanged to {@code registerEvaluation(target, featureName, variation)}.
 *
 * @param target passed through to {@code registerEvaluation}
 * @param featureName passed through to {@code registerEvaluation}
 * @param variation passed through to {@code registerEvaluation}
 * @deprecated The name of this method no longer makes sense since we moved to a map; it is kept
 *     for source compatibility. Call {@code registerEvaluation} instead.
 */
@Deprecated
100+
public void pushToQueue(Target target, String featureName, Variation variation) {
101+
registerEvaluation(target, featureName, variation);
102+
}
61103

62-
if (frequencyMap.size() > config.getBufferSize()) {
63-
log.warn(
64-
"Metric frequency map exceeded buffer size ({} > {}), force flushing",
65-
frequencyMap.size(),
66-
config.getBufferSize());
104+
void registerEvaluation(Target target, String featureName, Variation variation) {
105+
106+
final int freqMapSize = frequencyMap.size();
107+
108+
if (freqMapSize > config.getBufferSize()) {
109+
if (log.isWarnEnabled()) {
110+
log.warn(
111+
"Metric frequency map exceeded buffer size ({} > {}), force flushing",
112+
freqMapSize,
113+
config.getBufferSize());
114+
}
67115
// If the map is starting to grow too much then push the events now and reset the counters
68116
executor.submit(this::runOneIteration);
69117
}
70118

71-
log.debug("Flag: " + featureName + " Target: " + target + " Variation: " + variation);
72-
73119
Target metricTarget = globalTarget;
74120

75121
if (target != null) {
@@ -79,7 +125,7 @@ public synchronized void pushToQueue(Target target, String featureName, Variatio
79125
}
80126
}
81127

82-
frequencyMap.incrementAndGet(new MetricEvent(featureName, metricTarget, variation));
128+
frequencyMap.increment(new MetricEvent(featureName, metricTarget, variation));
83129
}
84130

85131
/** This method sends the metrics data to the analytics server and resets the cache */
@@ -202,17 +248,15 @@ private String getVersion() {
202248
}
203249

204250
@Override
205-
protected synchronized void runOneIteration() {
251+
protected void runOneIteration() {
206252
if (log.isDebugEnabled()) {
207253
log.debug(
208-
"Drain metrics queue : frequencyMap size={} uniqueTargetSet size={} metric events pending={}",
254+
"Drain metrics queue : frequencyMap size={} uniqueTargetSet size={}",
209255
frequencyMap.size(),
210-
uniqueTargetSet.size(),
211-
frequencyMap.sum());
256+
uniqueTargetSet.size());
212257
}
213-
sendDataAndResetCache(new HashMap<>(frequencyMap.asMap()), new HashSet<>(uniqueTargetSet));
258+
sendDataAndResetCache(frequencyMap.drainToMap(), new HashSet<>(uniqueTargetSet));
214259

215-
frequencyMap.clear();
216260
uniqueTargetSet.clear();
217261
}
218262

@@ -238,6 +282,8 @@ public void close() {
238282
log.info("Closing MetricsProcessor");
239283
}
240284

285+
/**
 * Submits {@code runOneIteration} to this processor's executor. The {@code Future} returned by
 * {@code submit} is discarded, so this method does not wait for the submitted task.
 *
 * <p>Package private.
 */
286+
241287
synchronized void flushQueue() {
242288
executor.submit(this::runOneIteration);
243289
}

src/test/java/io/harness/cf/client/api/CfClientTest.java

Lines changed: 1 addition & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,12 @@
1010
import com.google.gson.JsonObject;
1111
import io.harness.cf.client.api.dispatchers.TestWebServerDispatcher;
1212
import io.harness.cf.client.api.dispatchers.UnimplementedStreamDispatcher;
13+
import io.harness.cf.client.api.testutils.DummyConnector;
1314
import io.harness.cf.client.common.Cache;
14-
import io.harness.cf.client.connector.Connector;
15-
import io.harness.cf.client.connector.ConnectorException;
16-
import io.harness.cf.client.connector.Service;
17-
import io.harness.cf.client.connector.Updater;
1815
import io.harness.cf.client.dto.Target;
19-
import io.harness.cf.model.FeatureConfig;
20-
import io.harness.cf.model.Metrics;
21-
import io.harness.cf.model.Segment;
2216
import java.io.IOException;
2317
import java.net.URISyntaxException;
2418
import java.time.Duration;
25-
import java.util.Collections;
2619
import java.util.List;
2720
import java.util.Set;
2821
import java.util.concurrent.CountDownLatch;
@@ -367,46 +360,4 @@ public List<String> keys() {
367360
return null;
368361
}
369362
}
370-
371-
static class DummyConnector implements Connector {
372-
373-
@Override
374-
public String authenticate() throws ConnectorException {
375-
return "dummy";
376-
}
377-
378-
@Override
379-
public void setOnUnauthorized(Runnable runnable) {}
380-
381-
@Override
382-
public List<FeatureConfig> getFlags() throws ConnectorException {
383-
return Collections.emptyList();
384-
}
385-
386-
@Override
387-
public FeatureConfig getFlag(@NonNull String identifier) throws ConnectorException {
388-
return null;
389-
}
390-
391-
@Override
392-
public List<Segment> getSegments() throws ConnectorException {
393-
return Collections.emptyList();
394-
}
395-
396-
@Override
397-
public Segment getSegment(@NonNull String identifier) throws ConnectorException {
398-
return null;
399-
}
400-
401-
@Override
402-
public void postMetrics(Metrics metrics) throws ConnectorException {}
403-
404-
@Override
405-
public Service stream(Updater updater) throws ConnectorException {
406-
return null;
407-
}
408-
409-
@Override
410-
public void close() {}
411-
}
412363
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package io.harness.cf.client.api;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
5+
import io.harness.cf.client.api.testutils.DummyConnector;
6+
import io.harness.cf.client.dto.Target;
7+
import io.harness.cf.model.FeatureConfig;
8+
import io.harness.cf.model.Variation;
9+
import java.io.IOException;
10+
import java.nio.file.Files;
11+
import java.nio.file.Paths;
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
import java.util.concurrent.*;
15+
import java.util.concurrent.atomic.LongAdder;
16+
import java.util.stream.Stream;
17+
import lombok.AllArgsConstructor;
18+
import lombok.EqualsAndHashCode;
19+
import lombok.NonNull;
20+
import org.junit.jupiter.api.Disabled;
21+
import org.junit.jupiter.api.Test;
22+
23+
/*
24+
* This stress test is disabled by default and needs to be run manually. Make sure to set TARGETS_FILE and
25+
* FLAGS_FILE from the production files in ff-sdk-testgrid or create files with at least 18K unique targets
26+
* and 400 unique flags if you want to run this test.
27+
*
28+
* If running in IntelliJ you can attach JFR from a terminal after starting the test using something like:
29+
*
30+
* jcmd $(jcmd | grep -i junit | cut -f 1 -d " ") JFR.start duration=10m filename=flight.jfr
31+
*/
32+
@Disabled
33+
class MetricsProcessorStressTest {
34+
35+
boolean RUN_PERPETUALLY = false; // useful for longer tests, note if true, test will never exit
36+
boolean DUMP_POSTED_METRICS = false;
37+
int NUM_THREADS = 32;
38+
int VARIATION_COUNT = 4;
39+
String TARGETS_FILE = "/tmp/prod2_targets.txt";
40+
String FLAGS_FILE = "/tmp/flags.txt";
41+
42+
@Test
43+
void testRegisterEvaluationContention() throws Exception {
44+
45+
final DummyConnector dummyConnector = new DummyConnector(DUMP_POSTED_METRICS);
46+
47+
final MetricsProcessor metricsProcessor =
48+
new MetricsProcessor(
49+
dummyConnector,
50+
BaseConfig.builder()
51+
// .globalTargetEnabled(false)
52+
.build(),
53+
new DummyMetricsCallback());
54+
55+
metricsProcessor.start();
56+
57+
System.out.println("Loading...");
58+
59+
final List<String> targets = loadFile(TARGETS_FILE);
60+
final List<String> flags = loadFile(FLAGS_FILE);
61+
62+
System.out.printf("Loaded %d targets\n", targets.size());
63+
System.out.printf("Loaded %d flags\n", flags.size());
64+
65+
final ConcurrentLinkedQueue<TargetAndFlag> targetAndFlags =
66+
createFlagTargetVariationPermutations(flags, targets);
67+
68+
System.out.printf("Starting...processing %d flags/targets\n", targetAndFlags.size());
69+
70+
final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
71+
final LongAdder totalProcessed = new LongAdder();
72+
73+
for (int i = 0; i < NUM_THREADS; i++) {
74+
final int threadNum = i;
75+
executor.submit(
76+
() -> {
77+
Thread.currentThread().setName("THREAD" + threadNum);
78+
System.out.println("start thread " + Thread.currentThread().getName());
79+
80+
TargetAndFlag next;
81+
82+
while ((next = targetAndFlags.poll()) != null) {
83+
final Target target = Target.builder().identifier(next.target).build();
84+
final FeatureConfig feature = FeatureConfig.builder().feature(next.flag).build();
85+
final Variation variation =
86+
Variation.builder()
87+
.identifier(next.variation)
88+
.value(next.variation + "Value")
89+
.build();
90+
91+
metricsProcessor.registerEvaluation(target, feature.getFeature(), variation);
92+
93+
if (RUN_PERPETUALLY) {
94+
targetAndFlags.add(next);
95+
}
96+
97+
totalProcessed.increment();
98+
}
99+
100+
System.out.printf("thread %s finished\n", Thread.currentThread().getName());
101+
});
102+
}
103+
104+
while (targetAndFlags.size() > 0) {
105+
System.out.printf(
106+
"target/flags/variations processed %d, map size %d, pending evaluations=%d \n",
107+
totalProcessed.sum(),
108+
metricsProcessor.getQueueSize(),
109+
metricsProcessor.getPendingMetricsToBeSent());
110+
111+
Thread.sleep(1000);
112+
}
113+
114+
metricsProcessor.runOneIteration();
115+
116+
assertEquals(flags.size() * targets.size() * VARIATION_COUNT, (int) totalProcessed.sum());
117+
assertEquals(
118+
flags.size() * targets.size() * VARIATION_COUNT,
119+
dummyConnector.getTotalMetricEvaluations());
120+
}
121+
122+
private List<String> loadFile(String filename) throws IOException {
123+
final List<String> map = new ArrayList<>();
124+
try (Stream<String> stream = Files.lines(Paths.get(filename))) {
125+
stream.forEach(map::add);
126+
}
127+
return map;
128+
}
129+
130+
private ConcurrentLinkedQueue<TargetAndFlag> createFlagTargetVariationPermutations(
131+
List<String> flags, List<String> targets) {
132+
final ConcurrentLinkedQueue<TargetAndFlag> targetAndFlags = new ConcurrentLinkedQueue<>();
133+
134+
for (String flag : flags) {
135+
for (String target : targets) {
136+
for (int v = 0; v < VARIATION_COUNT; v++) { // variations per flag/target combination
137+
targetAndFlags.add(new TargetAndFlag(target, flag, "variation" + v));
138+
}
139+
}
140+
}
141+
return targetAndFlags;
142+
}
143+
144+
@EqualsAndHashCode
145+
@AllArgsConstructor
146+
static class TargetAndFlag {
147+
String target, flag, variation;
148+
}
149+
150+
static class DummyMetricsCallback implements MetricsCallback {
151+
@Override
152+
public void onMetricsReady() {
153+
System.out.println("onMetricsReady");
154+
}
155+
156+
@Override
157+
public void onMetricsError(@NonNull String error) {
158+
System.out.println("onMetricsError " + error);
159+
}
160+
161+
@Override
162+
public void onMetricsFailure() {
163+
System.out.println("onMetricsFailure");
164+
}
165+
}
166+
}

0 commit comments

Comments
 (0)