Skip to content

Commit b233240

Browse files
authored
Split SymDB payload when too large (#7838)
SymDB payload cannot be larger than 50MB otherwise datadog agent will generate an error. When serializing the upload request we are now verifying that we are below this limit, otherwise we are splitting the payload first by uploading by jar scope, but if a jar scope is still large we are splitting by class scopes. If down to one class the payload is still too large we are dropping the upload.
1 parent 24ccbaa commit b233240

File tree

3 files changed

+251
-13
lines changed

3 files changed

+251
-13
lines changed

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java

Lines changed: 117 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.datadog.debugger.sink;
22

33
import com.datadog.debugger.symbol.Scope;
4+
import com.datadog.debugger.symbol.ScopeType;
45
import com.datadog.debugger.symbol.ServiceVersion;
56
import com.datadog.debugger.uploader.BatchUploader;
67
import com.datadog.debugger.util.MoshiHelper;
@@ -9,6 +10,8 @@
910
import datadog.trace.util.TagsHelper;
1011
import java.nio.charset.StandardCharsets;
1112
import java.util.ArrayList;
13+
import java.util.Arrays;
14+
import java.util.Collections;
1215
import java.util.List;
1316
import java.util.concurrent.ArrayBlockingQueue;
1417
import java.util.concurrent.BlockingQueue;
@@ -29,24 +32,30 @@ public class SymbolSink {
2932
+ "\"service\": \"%s\",%n"
3033
+ "\"runtimeId\": \"%s\"%n"
3134
+ "}";
35+
static final int MAX_SYMDB_UPLOAD_SIZE = 50 * 1024 * 1024;
3236

3337
private final String serviceName;
3438
private final String env;
3539
private final String version;
3640
private final BatchUploader symbolUploader;
41+
private final int maxPayloadSize;
3742
private final BatchUploader.MultiPartContent event;
3843
private final BlockingQueue<Scope> scopes = new ArrayBlockingQueue<>(CAPACITY);
3944
private final Stats stats = new Stats();
4045

4146
public SymbolSink(Config config) {
42-
this(config, new BatchUploader(config, config.getFinalDebuggerSymDBUrl(), RETRY_POLICY));
47+
this(
48+
config,
49+
new BatchUploader(config, config.getFinalDebuggerSymDBUrl(), RETRY_POLICY),
50+
MAX_SYMDB_UPLOAD_SIZE);
4351
}
4452

45-
SymbolSink(Config config, BatchUploader symbolUploader) {
53+
SymbolSink(Config config, BatchUploader symbolUploader, int maxPayloadSize) {
4654
this.serviceName = TagsHelper.sanitize(config.getServiceName());
4755
this.env = TagsHelper.sanitize(config.getEnv());
4856
this.version = TagsHelper.sanitize(config.getVersion());
4957
this.symbolUploader = symbolUploader;
58+
this.maxPayloadSize = maxPayloadSize;
5059
byte[] eventContent =
5160
String.format(
5261
EVENT_FORMAT, TagsHelper.sanitize(config.getServiceName()), config.getRuntimeId())
@@ -87,15 +96,119 @@ public void flush() {
8796
String json =
8897
SERVICE_VERSION_ADAPTER.toJson(
8998
new ServiceVersion(serviceName, env, version, "JAVA", scopesToSerialize));
90-
LOGGER.debug("Sending {} jar scopes size={}", scopesToSerialize.size(), json.length());
91-
updateStats(scopesToSerialize, json);
99+
if (json.length() > maxPayloadSize) {
100+
LOGGER.debug(
101+
"Upload split is required for {} scopes: {}/{}",
102+
scopesToSerialize.size(),
103+
json.length(),
104+
maxPayloadSize);
105+
splitAndSend(scopesToSerialize);
106+
} else {
107+
LOGGER.debug("Sending {} jar scopes size={}", scopesToSerialize.size(), json.length());
108+
updateStats(scopesToSerialize, json);
109+
LOGGER.debug("SymbolSink stats: {}", stats);
110+
symbolUploader.uploadAsMultipart(
111+
"",
112+
event,
113+
new BatchUploader.MultiPartContent(
114+
json.getBytes(StandardCharsets.UTF_8), "file", "file.json"));
115+
}
116+
}
117+
118+
/*
119+
* Try to split recursively the scopes to send in smaller chunks
120+
* first try by splitting the jar scopes, then by splitting the class scopes
121+
* stopped the recursion when the scopes are small enough to be sent or <2 classes in one scope
122+
*/
123+
private void splitAndSend(List<Scope> scopesToSerialize) {
124+
if (scopesToSerialize.size() > 1) {
125+
// try to split by jar scopes: one scope per request
126+
if (scopesToSerialize.size() < BatchUploader.MAX_ENQUEUED_REQUESTS) {
127+
for (Scope scope : scopesToSerialize) {
128+
String json =
129+
SERVICE_VERSION_ADAPTER.toJson(
130+
new ServiceVersion(
131+
serviceName, env, version, "JAVA", Collections.singletonList(scope)));
132+
if (json.length() > maxPayloadSize) {
133+
// this jar scope is still too big, split it by classes
134+
LOGGER.debug(
135+
"Upload split is required for jar scope {}: {}/{}",
136+
scope.getName(),
137+
json.length(),
138+
maxPayloadSize);
139+
splitAndSend(Collections.singletonList(scope));
140+
continue;
141+
}
142+
LOGGER.debug("Sending {} jar scope size={}", scope.getName(), json.length());
143+
doUpload(Collections.singletonList(scope), json);
144+
}
145+
} else {
146+
// split the list of jar scope in 2 list jar scopes with half of the jar scopes
147+
int half = scopesToSerialize.size() / 2;
148+
List<Scope> firstHalf = scopesToSerialize.subList(0, half);
149+
List<Scope> secondHalf = scopesToSerialize.subList(half, scopesToSerialize.size());
150+
LOGGER.debug("split jar scope list in 2: {} and {}", firstHalf.size(), secondHalf.size());
151+
String jsonFirstHalf =
152+
SERVICE_VERSION_ADAPTER.toJson(
153+
new ServiceVersion(serviceName, env, version, "JAVA", firstHalf));
154+
if (jsonFirstHalf.length() > maxPayloadSize) {
155+
LOGGER.warn(
156+
"Cannot split jar scope list in 2, first half is too big: {}",
157+
jsonFirstHalf.length());
158+
return;
159+
}
160+
doUpload(firstHalf, jsonFirstHalf);
161+
String jsonSecondHalf =
162+
SERVICE_VERSION_ADAPTER.toJson(
163+
new ServiceVersion(serviceName, env, version, "JAVA", secondHalf));
164+
if (jsonSecondHalf.length() > maxPayloadSize) {
165+
LOGGER.warn(
166+
"Cannot split jar scope list in 2, second half is too big: {}",
167+
jsonSecondHalf.length());
168+
return;
169+
}
170+
doUpload(secondHalf, jsonSecondHalf);
171+
}
172+
} else {
173+
Scope jarScope = scopesToSerialize.get(0);
174+
if (jarScope.getScopes() == null) {
175+
LOGGER.debug("No class scopes to send for jar scope {}", jarScope.getName());
176+
return;
177+
}
178+
if (jarScope.getScopes().size() < 2) {
179+
LOGGER.warn(
180+
"Cannot split jar scope with less than 2 classes scope: {}", jarScope.getName());
181+
return;
182+
}
183+
// split the jar scope in 2 jar scopes with half of the class scopes
184+
int half = jarScope.getScopes().size() / 2;
185+
List<Scope> firstHalf = jarScope.getScopes().subList(0, half);
186+
List<Scope> secondHalf = jarScope.getScopes().subList(half, jarScope.getScopes().size());
187+
LOGGER.debug(
188+
"split jar scope {} in 2 jar scopes: {} and {}",
189+
jarScope.getName(),
190+
firstHalf.size(),
191+
secondHalf.size());
192+
splitAndSend(
193+
Arrays.asList(
194+
createJarScope(jarScope.getName(), firstHalf),
195+
createJarScope(jarScope.getName(), secondHalf)));
196+
}
197+
}
198+
199+
private void doUpload(List<Scope> scopes, String json) {
200+
updateStats(scopes, json);
92201
symbolUploader.uploadAsMultipart(
93202
"",
94203
event,
95204
new BatchUploader.MultiPartContent(
96205
json.getBytes(StandardCharsets.UTF_8), "file", "file.json"));
97206
}
98207

208+
private static Scope createJarScope(String jarName, List<Scope> classScopes) {
209+
return Scope.builder(ScopeType.JAR, jarName, 0, 0).name(jarName).scopes(classScopes).build();
210+
}
211+
99212
private void updateStats(List<Scope> scopesToSerialize, String json) {
100213
for (Scope scope : scopesToSerialize) {
101214
stats.updateStats(scope.getScopes() != null ? scope.getScopes().size() : 0, json.length());

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/uploader/BatchUploader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public RetryPolicy(int maxFailures) {
7474
private static final String HEADER_DD_ENTITY_ID = "Datadog-Entity-ID";
7575
static final String HEADER_DD_API_KEY = "DD-API-KEY";
7676
static final int MAX_RUNNING_REQUESTS = 10;
77-
static final int MAX_ENQUEUED_REQUESTS = 20;
77+
public static final int MAX_ENQUEUED_REQUESTS = 20;
7878
static final int TERMINATION_TIMEOUT = 5;
7979

8080
private final String containerId;

dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java

Lines changed: 133 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.datadog.debugger.sink;
22

3+
import static com.datadog.debugger.sink.SymbolSink.MAX_SYMDB_UPLOAD_SIZE;
4+
import static java.util.Collections.singletonList;
35
import static org.junit.jupiter.api.Assertions.*;
46
import static org.mockito.Mockito.mock;
57
import static org.mockito.Mockito.when;
@@ -10,7 +12,9 @@
1012
import datadog.trace.api.Config;
1113
import java.util.ArrayList;
1214
import java.util.Arrays;
15+
import java.util.Collections;
1316
import java.util.List;
17+
import org.jetbrains.annotations.NotNull;
1418
import org.junit.jupiter.api.Test;
1519

1620
class SymbolSinkTest {
@@ -20,7 +24,7 @@ public void testSimpleFlush() {
2024
SymbolUploaderMock symbolUploaderMock = new SymbolUploaderMock();
2125
Config config = mock(Config.class);
2226
when(config.getServiceName()).thenReturn("service1");
23-
SymbolSink symbolSink = new SymbolSink(config, symbolUploaderMock);
27+
SymbolSink symbolSink = new SymbolSink(config, symbolUploaderMock, MAX_SYMDB_UPLOAD_SIZE);
2428
symbolSink.addScope(Scope.builder(ScopeType.JAR, null, 0, 0).build());
2529
symbolSink.flush();
2630
assertEquals(2, symbolUploaderMock.multiPartContents.size());
@@ -43,17 +47,13 @@ public void testMultiScopeFlush() {
4347
SymbolUploaderMock symbolUploaderMock = new SymbolUploaderMock();
4448
Config config = mock(Config.class);
4549
when(config.getServiceName()).thenReturn("service1");
46-
SymbolSink symbolSink = new SymbolSink(config, symbolUploaderMock);
50+
SymbolSink symbolSink = new SymbolSink(config, symbolUploaderMock, MAX_SYMDB_UPLOAD_SIZE);
4751
symbolSink.addScope(Scope.builder(ScopeType.JAR, "jar1.jar", 0, 0).build());
4852
symbolSink.addScope(Scope.builder(ScopeType.JAR, "jar2.jar", 0, 0).build());
4953
symbolSink.flush();
5054
// only 1 request because we are batching the scopes
5155
assertEquals(2, symbolUploaderMock.multiPartContents.size());
52-
BatchUploader.MultiPartContent eventContent = symbolUploaderMock.multiPartContents.get(0);
53-
assertEquals("event", eventContent.getPartName());
54-
BatchUploader.MultiPartContent symbolContent = symbolUploaderMock.multiPartContents.get(1);
55-
assertEquals("file", symbolContent.getPartName());
56-
String strContent = new String(symbolContent.getContent());
56+
String strContent = assertMultipartContent(symbolUploaderMock, 0);
5757
assertTrue(strContent.contains("\"source_file\":\"jar1.jar\""));
5858
assertTrue(strContent.contains("\"source_file\":\"jar2.jar\""));
5959
}
@@ -63,7 +63,7 @@ public void testQueueFull() {
6363
SymbolUploaderMock symbolUploaderMock = new SymbolUploaderMock();
6464
Config config = mock(Config.class);
6565
when(config.getServiceName()).thenReturn("service1");
66-
SymbolSink symbolSink = new SymbolSink(config, symbolUploaderMock);
66+
SymbolSink symbolSink = new SymbolSink(config, symbolUploaderMock, MAX_SYMDB_UPLOAD_SIZE);
6767
for (int i = 0; i < SymbolSink.CAPACITY; i++) {
6868
symbolSink.addScope(Scope.builder(ScopeType.JAR, "jar1.jar", 0, 0).build());
6969
}
@@ -81,6 +81,131 @@ public void testQueueFull() {
8181
.contains("\"source_file\":\"jar2.jar\""));
8282
}
8383

84+
@Test
85+
public void splitByJarScopes() {
86+
SymbolUploaderMock symbolUploaderMock = new SymbolUploaderMock();
87+
Config config = mock(Config.class);
88+
when(config.getServiceName()).thenReturn("service1");
89+
SymbolSink symbolSink = new SymbolSink(config, symbolUploaderMock, 1024);
90+
final int NUM_JAR_SCOPES = 10;
91+
for (int i = 0; i < NUM_JAR_SCOPES; i++) {
92+
symbolSink.addScope(
93+
Scope.builder(ScopeType.JAR, "jar" + i + ".jar", 0, 0)
94+
.scopes(singletonList(Scope.builder(ScopeType.CLASS, "class" + i, 0, 0).build()))
95+
.build());
96+
}
97+
symbolSink.flush();
98+
// split upload request per jar scope
99+
assertEquals(NUM_JAR_SCOPES * 2, symbolUploaderMock.multiPartContents.size());
100+
for (int i = 0; i < NUM_JAR_SCOPES * 2; i += 2) {
101+
String strContent = assertMultipartContent(symbolUploaderMock, i);
102+
assertTrue(strContent.contains("\"source_file\":\"jar" + (i / 2) + ".jar\""));
103+
}
104+
}
105+
106+
@Test
107+
public void splitTootManyJarScopes() {
108+
SymbolUploaderMock symbolUploaderMock = new SymbolUploaderMock();
109+
Config config = mock(Config.class);
110+
when(config.getServiceName()).thenReturn("service1");
111+
SymbolSink symbolSink = new SymbolSink(config, symbolUploaderMock, 2048);
112+
final int NUM_JAR_SCOPES = 21;
113+
for (int i = 0; i < NUM_JAR_SCOPES; i++) {
114+
symbolSink.addScope(
115+
Scope.builder(ScopeType.JAR, "jar" + i + ".jar", 0, 0)
116+
.scopes(singletonList(Scope.builder(ScopeType.CLASS, "class" + i, 0, 0).build()))
117+
.build());
118+
}
119+
symbolSink.flush();
120+
// split upload request per half jar scopes
121+
assertEquals(2 * 2, symbolUploaderMock.multiPartContents.size());
122+
String strContent1 = assertMultipartContent(symbolUploaderMock, 0);
123+
assertTrue(strContent1.contains("\"source_file\":\"jar0.jar\""));
124+
assertTrue(strContent1.contains("\"source_file\":\"jar9.jar\""));
125+
String strContent2 = assertMultipartContent(symbolUploaderMock, 2);
126+
assertTrue(strContent2.contains("\"source_file\":\"jar10.jar\""));
127+
assertTrue(strContent2.contains("\"source_file\":\"jar20.jar\""));
128+
}
129+
130+
@Test
131+
public void splitByClassScopes() {
132+
SymbolUploaderMock symbolUploaderMock = new SymbolUploaderMock();
133+
Config config = mock(Config.class);
134+
when(config.getServiceName()).thenReturn("service1");
135+
SymbolSink symbolSink = new SymbolSink(config, symbolUploaderMock, 1024);
136+
final int NUM_CLASS_SCOPES = 10;
137+
List<Scope> classScopes = new ArrayList<>();
138+
for (int i = 0; i < NUM_CLASS_SCOPES; i++) {
139+
classScopes.add(
140+
Scope.builder(ScopeType.CLASS, "class" + i, 0, 0)
141+
.scopes(
142+
Collections.singletonList(
143+
Scope.builder(ScopeType.METHOD, "class" + i, 0, 0)
144+
.name("method" + i)
145+
.build()))
146+
.build());
147+
}
148+
symbolSink.addScope(
149+
Scope.builder(ScopeType.JAR, "jar1.jar", 0, 0)
150+
.name("jar1.jar")
151+
.scopes(classScopes)
152+
.build());
153+
symbolSink.flush();
154+
// split upload request per jar scope
155+
final int EXPECTED_REQUESTS = 4;
156+
assertEquals(EXPECTED_REQUESTS * 2, symbolUploaderMock.multiPartContents.size());
157+
List<List<String>> expectedSourceFiles =
158+
Arrays.asList(
159+
Arrays.asList("class0", "class1"),
160+
Arrays.asList("class2", "class3", "class4"),
161+
Arrays.asList("class5", "class6"),
162+
Arrays.asList("class7", "class8", "class9"));
163+
for (int i = 0; i < EXPECTED_REQUESTS * 2; i += 2) {
164+
String strContent = assertMultipartContent(symbolUploaderMock, i);
165+
for (String sourceFile : expectedSourceFiles.get(i / 2)) {
166+
assertTrue(strContent.contains("\"source_file\":\"" + sourceFile + "\""));
167+
}
168+
}
169+
}
170+
171+
@Test
172+
public void splitByClassScopesImpossible() {
173+
SymbolUploaderMock symbolUploaderMock = new SymbolUploaderMock();
174+
Config config = mock(Config.class);
175+
when(config.getServiceName()).thenReturn("service1");
176+
SymbolSink symbolSink = new SymbolSink(config, symbolUploaderMock, 1);
177+
final int NUM_CLASS_SCOPES = 10;
178+
List<Scope> classScopes = new ArrayList<>();
179+
for (int i = 0; i < NUM_CLASS_SCOPES; i++) {
180+
classScopes.add(
181+
Scope.builder(ScopeType.CLASS, "class" + i, 0, 0)
182+
.scopes(
183+
Collections.singletonList(
184+
Scope.builder(ScopeType.METHOD, "class" + i, 0, 0)
185+
.name("method" + i)
186+
.build()))
187+
.build());
188+
}
189+
symbolSink.addScope(
190+
Scope.builder(ScopeType.JAR, "jar1.jar", 0, 0)
191+
.name("jar1.jar")
192+
.scopes(classScopes)
193+
.build());
194+
symbolSink.flush();
195+
// no request to upload because we cannot split the jar scope
196+
assertTrue(symbolUploaderMock.multiPartContents.isEmpty());
197+
}
198+
199+
@NotNull
200+
private static String assertMultipartContent(SymbolUploaderMock symbolUploaderMock, int index) {
201+
BatchUploader.MultiPartContent eventContent = symbolUploaderMock.multiPartContents.get(index);
202+
assertEquals("event", eventContent.getPartName());
203+
BatchUploader.MultiPartContent symbolContent =
204+
symbolUploaderMock.multiPartContents.get(index + 1);
205+
assertEquals("file", symbolContent.getPartName());
206+
return new String(symbolContent.getContent());
207+
}
208+
84209
static class SymbolUploaderMock extends BatchUploader {
85210
final List<MultiPartContent> multiPartContents = new ArrayList<>();
86211

0 commit comments

Comments
 (0)