From 21c0b2df63a9b3f65a31297f9670311329ce27f5 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Bempel Date: Tue, 5 Nov 2024 18:29:37 +0100 Subject: [PATCH 1/8] Fix memory leak in Exception Replay (#7885) The weak map stateByThrowable keep the throwable as the key but this exception is also strong referenced by snapshots stored inside the ThrowableState in CapturedThrowable and in locals and extensions for @exception. Fixing by storing weak reference inside the CapturedThrowable and clearing the other ref for @exception at commit time --- .../trace/bootstrap/debugger/CapturedContext.java | 11 ++++++++--- .../debugger/exception/ExceptionProbeManager.java | 13 ++++++++++--- .../com/datadog/debugger/probe/ExceptionProbe.java | 13 +++++++++++++ .../exception/DefaultExceptionDebuggerTest.java | 8 ++++++++ .../ExceptionProbeInstrumentationTest.java | 2 -- 5 files changed, 39 insertions(+), 8 deletions(-) diff --git a/dd-java-agent/agent-debugger/debugger-bootstrap/src/main/java/datadog/trace/bootstrap/debugger/CapturedContext.java b/dd-java-agent/agent-debugger/debugger-bootstrap/src/main/java/datadog/trace/bootstrap/debugger/CapturedContext.java index 6e278662543..4205eb5aff0 100644 --- a/dd-java-agent/agent-debugger/debugger-bootstrap/src/main/java/datadog/trace/bootstrap/debugger/CapturedContext.java +++ b/dd-java-agent/agent-debugger/debugger-bootstrap/src/main/java/datadog/trace/bootstrap/debugger/CapturedContext.java @@ -9,6 +9,7 @@ import datadog.trace.bootstrap.debugger.util.Redaction; import datadog.trace.bootstrap.debugger.util.TimeoutChecker; import datadog.trace.bootstrap.debugger.util.WellKnownClasses; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -183,6 +184,10 @@ public ValueReferenceResolver withExtensions(Map extensions) { return new CapturedContext(this, extensions); } + public void removeExtension(String name) { + extensions.remove(name); + } + private void addExtension(String name, Object value) { extensions.put(name, value); } @@ -642,7 +647,7 @@ public TimeoutChecker getTimeoutChecker() { public static class CapturedThrowable { private final String type; private final String message; - private final transient Throwable throwable; + private final transient WeakReference throwable; /* * Need to exclude stacktrace from equals/hashCode computation. @@ -663,7 +668,7 @@ public CapturedThrowable( this.type = type; this.message = message; this.stacktrace = new ArrayList<>(stacktrace); - this.throwable = t; + this.throwable = new WeakReference<>(t); } public String getType() { @@ -679,7 +684,7 @@ public List getStacktrace() { } public Throwable getThrowable() { - return throwable; + return throwable.get(); } private static List captureFrames(StackTraceElement[] stackTrace) { diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/exception/ExceptionProbeManager.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/exception/ExceptionProbeManager.java index d509073c30e..c020731a74d 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/exception/ExceptionProbeManager.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/exception/ExceptionProbeManager.java @@ -146,11 +146,14 @@ boolean shouldCaptureException(String fingerprint, Clock clock) { public void addSnapshot(Snapshot snapshot) { Throwable throwable = snapshot.getCaptures().getReturn().getCapturedThrowable().getThrowable(); + if (throwable == null) { + LOGGER.debug("Snapshot has no throwable: {}", snapshot.getId()); + return; + } throwable = ExceptionHelper.getInnerMostThrowable(throwable); if (throwable == null) { - LOGGER.debug( - "Unable to find root cause of exception: {}", - snapshot.getCaptures().getReturn().getCapturedThrowable().getThrowable().toString()); + throwable = snapshot.getCaptures().getReturn().getCapturedThrowable().getThrowable(); + LOGGER.debug("Unable to find root cause of exception: {}", String.valueOf(throwable)); return; } ThrowableState state = @@ -172,6 +175,10 @@ void updateLastCapture(String fingerprint, Clock clock) { fingerprints.put(fingerprint, Instant.now(clock)); } + boolean hasExceptionStateTracked() { + return !snapshotsByThrowable.isEmpty(); + } + public static class ThrowableState { private final String exceptionId; private List snapshots; diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/ExceptionProbe.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/ExceptionProbe.java index c068aa278cb..6bc46117409 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/ExceptionProbe.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/ExceptionProbe.java @@ -13,6 +13,7 @@ import datadog.trace.bootstrap.debugger.ProbeId; import datadog.trace.bootstrap.debugger.ProbeImplementation; import datadog.trace.bootstrap.debugger.ProbeLocation; +import datadog.trace.bootstrap.debugger.el.ValueReferences; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +68,10 @@ public void evaluate( return; } Throwable throwable = context.getCapturedThrowable().getThrowable(); + if (throwable == null) { + LOGGER.debug("Throwable cleared by GC"); + return; + } Throwable innerMostThrowable = getInnerMostThrowable(throwable); String fingerprint = Fingerprinter.fingerprint(innerMostThrowable, exceptionProbeManager.getClassNameFilter()); @@ -107,6 +112,9 @@ public void commit( * - DebuggerContext.commit() */ snapshot.recordStackTrace(4); + // clear any strong ref to the exception before adding the snapshot to avoid leaking snapshots + // inside the stateByThrowable map + clearExceptionRefs(snapshot); // add snapshot for later to wait for triggering point (ExceptionDebugger::handleException) exceptionProbeManager.addSnapshot(snapshot); LOGGER.debug( @@ -117,6 +125,11 @@ public void commit( } } + private void clearExceptionRefs(Snapshot snapshot) { + snapshot.getCaptures().getReturn().getLocals().remove(ValueReferences.EXCEPTION_REF); + snapshot.getCaptures().getReturn().removeExtension(ValueReferences.EXCEPTION_EXTENSION_NAME); + } + @Override public void buildLocation(InstrumentationResult result) { String type = where.getTypeName(); diff --git a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/exception/DefaultExceptionDebuggerTest.java b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/exception/DefaultExceptionDebuggerTest.java index 4262d0fb796..a531bc022d3 100644 --- a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/exception/DefaultExceptionDebuggerTest.java +++ b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/exception/DefaultExceptionDebuggerTest.java @@ -143,6 +143,14 @@ public void nestedException() { expectedFrameIndex, "com.datadog.debugger.exception.DefaultExceptionDebuggerTest", "createTest1Exception"); + // make sure we are not leaking references + exception = null; // release strong reference + System.gc(); + // calling ExceptionProbeManager#hasExceptionStateTracked() will call WeakIdentityHashMap#size() + // through isEmpty() an will purge stale entries + assertWithTimeout( + () -> !exceptionDebugger.getExceptionProbeManager().hasExceptionStateTracked(), + Duration.ofSeconds(30)); } @Test diff --git a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/exception/ExceptionProbeInstrumentationTest.java b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/exception/ExceptionProbeInstrumentationTest.java index 59e181fcc4a..5342486296d 100644 --- a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/exception/ExceptionProbeInstrumentationTest.java +++ b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/exception/ExceptionProbeInstrumentationTest.java @@ -131,7 +131,6 @@ public void instrumentAndCaptureSnapshots() throws Exception { Snapshot snapshot0 = listener.snapshots.get(0); assertProbeId(probeIdsByMethodName, "processWithException", snapshot0.getProbe().getId()); assertEquals("oops", snapshot0.getCaptures().getReturn().getCapturedThrowable().getMessage()); - assertTrue(snapshot0.getCaptures().getReturn().getLocals().containsKey("@exception")); ProbeLocation location = snapshot0.getProbe().getLocation(); assertEquals( location.getType() + "." + location.getMethod(), snapshot0.getStack().get(0).getFunction()); @@ -212,7 +211,6 @@ public void recursive() throws Exception { assertProbeId(probeIdsByMethodName, "fiboException", snapshot0.getProbe().getId()); assertEquals( "oops fibo", snapshot0.getCaptures().getReturn().getCapturedThrowable().getMessage()); - assertTrue(snapshot0.getCaptures().getReturn().getLocals().containsKey("@exception")); assertEquals("1", getValue(snapshot0.getCaptures().getReturn().getArguments().get("n"))); Snapshot snapshot1 = listener.snapshots.get(1); assertEquals("2", getValue(snapshot1.getCaptures().getReturn().getArguments().get("n"))); From a177616ed30653f1d7921943659fda40a00f2e50 Mon Sep 17 00:00:00 2001 From: Bruce Bujon Date: Wed, 6 Nov 2024 10:52:31 +0100 Subject: [PATCH 2/8] feat(tooling): Add support for multiple commits PR backport (#7896) --- tooling/backport-pr-to-patch-release.sh | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tooling/backport-pr-to-patch-release.sh b/tooling/backport-pr-to-patch-release.sh index d56aa5b4453..12b23b723ae 100755 --- a/tooling/backport-pr-to-patch-release.sh +++ b/tooling/backport-pr-to-patch-release.sh @@ -50,8 +50,8 @@ git fetch --quiet git show-ref --verify --quiet "refs/remotes/origin/$PATCH_RELEASE_BRANCH" 1>/dev/null 2>&1 || { echo "Branch $PATCH_RELEASE_BRANCH does not exist"; exit 1; } # Check PR exists echo "- Checking PR exists" -PR_COMMIT=$(gh pr view "$PR_NUMBER" --json commits --jq '.commits[].oid') -if [ -z "$PR_COMMIT" ]; then +PR_COMMITS=$(gh pr view "$PR_NUMBER" --json commits --jq '.commits[].oid') +if [ -z "$PR_COMMITS" ]; then echo "PR $PR_NUMBER does not exist" exit 1 fi @@ -68,8 +68,10 @@ git pull # Create a new branch for the backport BRANCH_NAME="$USER/backport-pr-$PR_NUMBER" git checkout -b "$BRANCH_NAME" -# Cherry-pick PR commit -git cherry-pick "$PR_COMMIT" +# Cherry-pick PR commits +for PR_COMMIT in $PR_COMMITS; do + git cherry-pick -x "$PR_COMMIT" +done # Push the branch git push -u origin "$BRANCH_NAME" --no-verify # Create a PR From 8233fc5ff8fae7272c7c6261aca65527b8e4bb67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Vidal=20Dom=C3=ADnguez?= <60353145+Mariovido@users.noreply.github.com> Date: Wed, 6 Nov 2024 10:56:31 +0100 Subject: [PATCH 3/8] Expand SSRF support in IAST to java.net.http.HttpClient (#7877) --- .../httpclient/JavaNetClientDecorator.java | 5 ++ .../iast-util/iast-util-11/build.gradle | 35 ++++++++++ .../springboot/controller/SsrfController.java | 40 +++++++++++ .../AbstractIast11SpringBootTest.groovy | 68 +++++++++++++++++++ .../AbstractIastServerSmokeTest.groovy | 1 + .../AbstractIastSpringBootTest.groovy | 12 ++-- .../springboot-java-11/build.gradle | 45 ++++++++++++ .../springboot/SpringbootApplication.java | 14 ++++ .../springboot/IastSpringBootSmokeTest.groovy | 6 ++ settings.gradle | 3 +- 10 files changed, 222 insertions(+), 7 deletions(-) create mode 100644 dd-smoke-tests/iast-util/iast-util-11/build.gradle create mode 100644 dd-smoke-tests/iast-util/iast-util-11/src/main/java/datadog/smoketest/springboot/controller/SsrfController.java create mode 100644 dd-smoke-tests/iast-util/iast-util-11/src/testFixtures/groovy/datadog/smoketest/AbstractIast11SpringBootTest.groovy create mode 100644 dd-smoke-tests/springboot-java-11/build.gradle create mode 100644 dd-smoke-tests/springboot-java-11/src/main/java/datadog/smoketest/springboot/SpringbootApplication.java create mode 100644 dd-smoke-tests/springboot-java-11/src/test/groovy/datadog/smoketest/springboot/IastSpringBootSmokeTest.groovy diff --git a/dd-java-agent/instrumentation/java-http-client/src/main/java11/datadog/trace/instrumentation/httpclient/JavaNetClientDecorator.java b/dd-java-agent/instrumentation/java-http-client/src/main/java11/datadog/trace/instrumentation/httpclient/JavaNetClientDecorator.java index e69a00915b4..c83dd5c5617 100644 --- a/dd-java-agent/instrumentation/java-http-client/src/main/java11/datadog/trace/instrumentation/httpclient/JavaNetClientDecorator.java +++ b/dd-java-agent/instrumentation/java-http-client/src/main/java11/datadog/trace/instrumentation/httpclient/JavaNetClientDecorator.java @@ -36,6 +36,11 @@ protected URI url(HttpRequest httpRequest) throws URISyntaxException { return httpRequest.uri(); } + @Override + protected Object sourceUrl(final HttpRequest request) { + return request.uri(); + } + @Override protected int status(HttpResponse httpResponse) { return httpResponse.statusCode(); diff --git a/dd-smoke-tests/iast-util/iast-util-11/build.gradle b/dd-smoke-tests/iast-util/iast-util-11/build.gradle new file mode 100644 index 00000000000..f6740371baa --- /dev/null +++ b/dd-smoke-tests/iast-util/iast-util-11/build.gradle @@ -0,0 +1,35 @@ +plugins { + id 'idea' + id 'java-test-fixtures' +} + + +apply from: "$rootDir/gradle/java.gradle" + +description = 'iast-smoke-tests-utils-java-11' + +idea { + module { + jdkName = '11' + } +} + +dependencies { + api project(':dd-smoke-tests') + compileOnly group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: '2.2.0.RELEASE' + + testFixturesImplementation testFixtures(project(":dd-smoke-tests:iast-util")) +} + +project.tasks.withType(AbstractCompile).configureEach { + setJavaVersion(it, 11) + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 + if (it instanceof JavaCompile) { + it.options.release.set(11) + } +} + +forbiddenApisMain { + failOnMissingClasses = false +} diff --git a/dd-smoke-tests/iast-util/iast-util-11/src/main/java/datadog/smoketest/springboot/controller/SsrfController.java b/dd-smoke-tests/iast-util/iast-util-11/src/main/java/datadog/smoketest/springboot/controller/SsrfController.java new file mode 100644 index 00000000000..832f2bad295 --- /dev/null +++ b/dd-smoke-tests/iast-util/iast-util-11/src/main/java/datadog/smoketest/springboot/controller/SsrfController.java @@ -0,0 +1,40 @@ +package datadog.smoketest.springboot.controller; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/ssrf") +public class SsrfController { + + @PostMapping("/java-net") + public String javaNet( + @RequestParam(value = "url", required = false) final String url, + @RequestParam(value = "async", required = false) final boolean async, + @RequestParam(value = "promise", required = false) final boolean promise) { + HttpClient httpClient = HttpClient.newBuilder().build(); + try { + HttpRequest httpRequest = HttpRequest.newBuilder().uri(new URI(url)).build(); + if (async) { + if (promise) { + httpClient.sendAsync( + httpRequest, + HttpResponse.BodyHandlers.ofString(), + (initiatingRequest, pushPromiseRequest, acceptor) -> {}); + } else { + httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString()); + } + } else { + httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString()); + } + } catch (Exception e) { + } + return "ok"; + } +} diff --git a/dd-smoke-tests/iast-util/iast-util-11/src/testFixtures/groovy/datadog/smoketest/AbstractIast11SpringBootTest.groovy b/dd-smoke-tests/iast-util/iast-util-11/src/testFixtures/groovy/datadog/smoketest/AbstractIast11SpringBootTest.groovy new file mode 100644 index 00000000000..2f7c6740121 --- /dev/null +++ b/dd-smoke-tests/iast-util/iast-util-11/src/testFixtures/groovy/datadog/smoketest/AbstractIast11SpringBootTest.groovy @@ -0,0 +1,68 @@ +package datadog.smoketest + +import okhttp3.FormBody +import okhttp3.Request + +import static datadog.trace.api.config.IastConfig.IAST_DEBUG_ENABLED +import static datadog.trace.api.config.IastConfig.IAST_DETECTION_MODE +import static datadog.trace.api.config.IastConfig.IAST_ENABLED + +abstract class AbstractIast11SpringBootTest extends AbstractIastServerSmokeTest { + + @Override + ProcessBuilder createProcessBuilder() { + String springBootShadowJar = System.getProperty('datadog.smoketest.springboot.shadowJar.path') + + List command = [] + command.add(javaPath()) + command.addAll(defaultJavaProperties) + command.addAll(iastJvmOpts()) + command.addAll((String[]) ['-jar', springBootShadowJar, "--server.port=${httpPort}"]) + ProcessBuilder processBuilder = new ProcessBuilder(command) + processBuilder.directory(new File(buildDirectory)) + // Spring will print all environment variables to the log, which may pollute it and affect log assertions. + processBuilder.environment().clear() + return processBuilder + } + + protected List iastJvmOpts() { + return [ + withSystemProperty(IAST_ENABLED, true), + withSystemProperty(IAST_DETECTION_MODE, 'FULL'), + withSystemProperty(IAST_DEBUG_ENABLED, true), + ] + } + + void 'ssrf is present (#path)'() { + setup: + final url = "http://localhost:${httpPort}/ssrf/${path}" + final body = new FormBody.Builder() + .add(parameter, value) + .add("async", async) + .add("promise", promise).build() + final request = new Request.Builder().url(url).post(body).build() + + when: + client.newCall(request).execute() + + then: + hasVulnerability { vul -> + if (vul.type != 'SSRF') { + return false + } + final parts = vul.evidence.valueParts + if (parameter == 'url') { + return parts.size() == 1 + && parts[0].value == value && parts[0].source.origin == 'http.request.parameter' && parts[0].source.name == parameter + } else { + throw new IllegalArgumentException("Parameter $parameter not supported") + } + } + + where: + path | parameter | value | async | promise + "java-net" | "url" | "https://dd.datad0g.com/" | "false" | "false" + "java-net" | "url" | "https://dd.datad0g.com/" | "true" | "false" + "java-net" | "url" | "https://dd.datad0g.com/" | "true" | "true" + } +} diff --git a/dd-smoke-tests/iast-util/src/testFixtures/groovy/datadog/smoketest/AbstractIastServerSmokeTest.groovy b/dd-smoke-tests/iast-util/src/testFixtures/groovy/datadog/smoketest/AbstractIastServerSmokeTest.groovy index 00f395ba278..c6b200197a5 100644 --- a/dd-smoke-tests/iast-util/src/testFixtures/groovy/datadog/smoketest/AbstractIastServerSmokeTest.groovy +++ b/dd-smoke-tests/iast-util/src/testFixtures/groovy/datadog/smoketest/AbstractIastServerSmokeTest.groovy @@ -119,6 +119,7 @@ abstract class AbstractIastServerSmokeTest extends AbstractServerSmokeTest { } final vulnerabilities = parseVulnerabilities(json) found.addAll(vulnerabilities) + return vulnerabilities.find(matcher) != null } } catch (SpockTimeoutError toe) { throw new AssertionError("No matching vulnerability found. Vulnerabilities found: ${new JsonBuilder(found).toPrettyString()}") diff --git a/dd-smoke-tests/iast-util/src/testFixtures/groovy/datadog/smoketest/AbstractIastSpringBootTest.groovy b/dd-smoke-tests/iast-util/src/testFixtures/groovy/datadog/smoketest/AbstractIastSpringBootTest.groovy index a81e41bd476..e5bd0a58e84 100644 --- a/dd-smoke-tests/iast-util/src/testFixtures/groovy/datadog/smoketest/AbstractIastSpringBootTest.groovy +++ b/dd-smoke-tests/iast-util/src/testFixtures/groovy/datadog/smoketest/AbstractIastSpringBootTest.groovy @@ -753,12 +753,12 @@ abstract class AbstractIastSpringBootTest extends AbstractIastServerSmokeTest { } where: - path | parameter | value | method | protocolSecure - "apache-httpclient4" | "url" | "https://dd.datad0g.com/" | "apacheHttpClient4" | false - "apache-httpclient4" | "host" | "dd.datad0g.com" | "apacheHttpClient4" | false - "commons-httpclient2" | "url" | "https://dd.datad0g.com/" | "commonsHttpClient2" | false - "okHttp2" | "url" | "https://dd.datad0g.com/" | "okHttp2" | false - "okHttp3" | "url" | "https://dd.datad0g.com/" | "okHttp3" | false + path | parameter | value | protocolSecure + "apache-httpclient4" | "url" | "https://dd.datad0g.com/" | true + "apache-httpclient4" | "host" | "dd.datad0g.com" | false + "commons-httpclient2" | "url" | "https://dd.datad0g.com/" | true + "okHttp2" | "url" | "https://dd.datad0g.com/" | true + "okHttp3" | "url" | "https://dd.datad0g.com/" | true } void 'test iast metrics stored in spans'() { diff --git a/dd-smoke-tests/springboot-java-11/build.gradle b/dd-smoke-tests/springboot-java-11/build.gradle new file mode 100644 index 00000000000..031e6615c5c --- /dev/null +++ b/dd-smoke-tests/springboot-java-11/build.gradle @@ -0,0 +1,45 @@ +plugins { + id 'java' + id 'org.springframework.boot' version '2.7.15' + id 'io.spring.dependency-management' version '1.0.15.RELEASE' + id 'java-test-fixtures' +} + +ext { + minJavaVersionForTests = JavaVersion.VERSION_11 +} + +apply from: "$rootDir/gradle/java.gradle" +description = 'SpringBoot Java 11 Smoke Tests.' + +repositories { + mavenCentral() +} + +dependencies { + implementation group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: '2.2.0.RELEASE' + + testImplementation project(':dd-smoke-tests') + testImplementation testFixtures(project(":dd-smoke-tests:iast-util:iast-util-11")) + testImplementation testFixtures(project(':dd-smoke-tests:iast-util')) + + implementation project(':dd-smoke-tests:iast-util:iast-util-11') +} + +project.tasks.withType(AbstractCompile).configureEach { + setJavaVersion(it, 11) + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 + if (it instanceof JavaCompile) { + it.options.release.set(11) + } +} + +forbiddenApisMain { + failOnMissingClasses = false +} + +tasks.withType(Test).configureEach { + dependsOn "bootJar" + jvmArgs "-Ddatadog.smoketest.springboot.shadowJar.path=${tasks.bootJar.archiveFile.get()}" +} diff --git a/dd-smoke-tests/springboot-java-11/src/main/java/datadog/smoketest/springboot/SpringbootApplication.java b/dd-smoke-tests/springboot-java-11/src/main/java/datadog/smoketest/springboot/SpringbootApplication.java new file mode 100644 index 00000000000..03cc9791085 --- /dev/null +++ b/dd-smoke-tests/springboot-java-11/src/main/java/datadog/smoketest/springboot/SpringbootApplication.java @@ -0,0 +1,14 @@ +package datadog.smoketest.springboot; + +import java.lang.management.ManagementFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SpringbootApplication { + + public static void main(final String[] args) { + SpringApplication.run(SpringbootApplication.class, args); + System.out.println("Started in " + ManagementFactory.getRuntimeMXBean().getUptime() + "ms"); + } +} diff --git a/dd-smoke-tests/springboot-java-11/src/test/groovy/datadog/smoketest/springboot/IastSpringBootSmokeTest.groovy b/dd-smoke-tests/springboot-java-11/src/test/groovy/datadog/smoketest/springboot/IastSpringBootSmokeTest.groovy new file mode 100644 index 00000000000..1d3ce88f806 --- /dev/null +++ b/dd-smoke-tests/springboot-java-11/src/test/groovy/datadog/smoketest/springboot/IastSpringBootSmokeTest.groovy @@ -0,0 +1,6 @@ +package datadog.smoketest.springboot + +import datadog.smoketest.AbstractIast11SpringBootTest + +class IastSpringBootSmokeTest extends AbstractIast11SpringBootTest { +} diff --git a/settings.gradle b/settings.gradle index 6e6bb98a524..95e5c24cda3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -139,6 +139,7 @@ include ':dd-smoke-tests:spring-security' include ':dd-smoke-tests:springboot' include ':dd-smoke-tests:springboot-freemarker' include ':dd-smoke-tests:springboot-grpc' +include ':dd-smoke-tests:springboot-java-11' include ':dd-smoke-tests:springboot-jetty-jsp' include ':dd-smoke-tests:springboot-mongo' include ':dd-smoke-tests:springboot-openliberty-20' @@ -162,6 +163,7 @@ include ':dd-smoke-tests:debugger-integration-tests' include ':dd-smoke-tests:datastreams:kafkaschemaregistry' include ':dd-smoke-tests:iast-propagation' include ':dd-smoke-tests:iast-util' +include ':dd-smoke-tests:iast-util:iast-util-11' // TODO this fails too often with a jgit failure, so disable until fixed //include ':dd-smoke-tests:debugger-integration-tests:latest-jdk-app' @@ -500,4 +502,3 @@ include ':dd-java-agent:benchmark' include ':dd-java-agent:benchmark-integration' include ':dd-java-agent:benchmark-integration:jetty-perftest' include ':dd-java-agent:benchmark-integration:play-perftest' - From a63909c793d38cffdfc71347dfc0f283bc2653d1 Mon Sep 17 00:00:00 2001 From: Jaroslav Bachorik Date: Wed, 6 Nov 2024 18:00:10 +0100 Subject: [PATCH 4/8] Use telemetry 'is_sensitive' attribute instead of redacting the crash stacktrace (#7899) --- .../java/com/datadog/crashtracking/CrashUploader.java | 1 + .../crashtracking/parsers/HotspotCrashLogParser.java | 9 ++------- .../com/datadog/crashtracking/CrashUploaderTest.java | 2 ++ .../resources/golden/sample-crash-for-telemetry-2.txt | 2 +- .../resources/golden/sample-crash-for-telemetry-3.txt | 2 +- .../test/resources/golden/sample-crash-for-telemetry.txt | 2 +- 6 files changed, 8 insertions(+), 10 deletions(-) diff --git a/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/CrashUploader.java b/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/CrashUploader.java index fc811ead8b7..2421b4a49a2 100644 --- a/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/CrashUploader.java +++ b/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/CrashUploader.java @@ -293,6 +293,7 @@ private RequestBody makeTelemetryRequestBody(@Nonnull String content) throws IOE writer.name("message").value(crashLog.toJson()); writer.name("level").value("ERROR"); writer.name("tags").value("severity:crash"); + writer.name("is_sensitive").value(true); writer.endObject(); writer.endArray(); writer.name("application"); diff --git a/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/parsers/HotspotCrashLogParser.java b/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/parsers/HotspotCrashLogParser.java index ff0d46167b5..1ee3b7efec1 100644 --- a/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/parsers/HotspotCrashLogParser.java +++ b/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/parsers/HotspotCrashLogParser.java @@ -188,13 +188,8 @@ public CrashLog parse(String crashLog) { state = State.DONE; } else { // Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code) - if (line.contains("libjvm.so") || line.contains("libjavaProfiler")) { - message.append(line).append('\n'); - frames.add(parseLine(line)); - } else { - message.append(line.charAt(0)).append(" [redacted]\n"); - frames.add(new StackFrame(null, 0, "[redacted]")); - } + message.append(line).append('\n'); + frames.add(parseLine(line)); } break; case DONE: diff --git a/dd-java-agent/agent-crashtracking/src/test/java/com/datadog/crashtracking/CrashUploaderTest.java b/dd-java-agent/agent-crashtracking/src/test/java/com/datadog/crashtracking/CrashUploaderTest.java index e86cab2d26b..2f5255a0440 100644 --- a/dd-java-agent/agent-crashtracking/src/test/java/com/datadog/crashtracking/CrashUploaderTest.java +++ b/dd-java-agent/agent-crashtracking/src/test/java/com/datadog/crashtracking/CrashUploaderTest.java @@ -77,6 +77,7 @@ public class CrashUploaderTest { @BeforeEach public void setup() throws IOException { server.start(); + System.out.println("Setting up test: " + server.getPort()); url = server.url(URL_PATH); when(config.getEnv()).thenReturn(ENV); @@ -170,6 +171,7 @@ public void testTelemetryHappyPath(String log) throws Exception { // payload: assertEquals("ERROR", event.get("payload").get(0).get("level").asText()); + assertTrue(event.get("payload").get(0).get("is_sensitive").asBoolean()); // we need to sanitize the UIID which keeps on changing String message = event.get("payload").get(0).get("message").asText(); CrashLog extracted = CrashLog.fromJson(message); diff --git a/dd-java-agent/agent-crashtracking/src/test/resources/golden/sample-crash-for-telemetry-2.txt b/dd-java-agent/agent-crashtracking/src/test/resources/golden/sample-crash-for-telemetry-2.txt index 9d54e13d6e5..789e3c47096 100644 --- a/dd-java-agent/agent-crashtracking/src/test/resources/golden/sample-crash-for-telemetry-2.txt +++ b/dd-java-agent/agent-crashtracking/src/test/resources/golden/sample-crash-for-telemetry-2.txt @@ -1 +1 @@ -{"error":{"is_crash":true,"kind":"SIGSEGV","message":"\n\nJRE version: OpenJDK Runtime Environment Temurin-22.0.1+8 (22.0.1+8) (build 22.0.1+8)\nJava VM: OpenJDK 64-Bit Server VM Temurin-22.0.1+8 (22.0.1+8, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-amd64)\nProblematic frame:\nC [libpthread.so.0+0x9cd5] __pthread_clockjoin_ex+0x255\n\nNative frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)\nC [redacted]\n","source_type":"crashtracking","stack":{"format":"CrashTrackerV1","frames":[{"function":"[redacted]","line":0}]}},"incomplete":false,"metadata":{"family":"java","library_name":"dd-trace-java","library_version":"1.40.0-SNAPSHOT~b4718bd887","tags":{}},"os_info":{"architecture":"aarch64","bitness":"64","os_type":"Mac OS X","version":{"Semantic":[14,6,1]}},"proc_info":{"pid":"576034"},"timestamp":"2024-09-20T13:19:06Z","uuid":"477a8d3f-d381-4352-a2a9-76eeefeef242","version_id":0} +{"error":{"is_crash":true,"kind":"SIGSEGV","message":"\n\nJRE version: OpenJDK Runtime Environment Temurin-22.0.1+8 (22.0.1+8) (build 22.0.1+8)\nJava VM: OpenJDK 64-Bit Server VM Temurin-22.0.1+8 (22.0.1+8, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-amd64)\nProblematic frame:\nC [libpthread.so.0+0x9cd5] __pthread_clockjoin_ex+0x255\n\nNative frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)\nC [libpthread.so.0+0x9cd5] __pthread_clockjoin_ex+0x255\n","source_type":"crashtracking","stack":{"format":"CrashTrackerV1","frames":[{"function":"__pthread_clockjoin_ex","line":0}]}},"incomplete":false,"metadata":{"family":"java","library_name":"dd-trace-java","library_version":"1.42.0-SNAPSHOT~aa29078ace","tags":{}},"os_info":{"architecture":"aarch64","bitness":"64","os_type":"Mac OS X","version":{"Semantic":[14,7,0]}},"proc_info":{"pid":"576034"},"timestamp":"2024-09-20T13:19:06Z","uuid":"363ac4ba-c104-4c8f-9aaa-5870486a8926","version_id":0} diff --git a/dd-java-agent/agent-crashtracking/src/test/resources/golden/sample-crash-for-telemetry-3.txt b/dd-java-agent/agent-crashtracking/src/test/resources/golden/sample-crash-for-telemetry-3.txt index bda3870f062..b91529dd658 100644 --- a/dd-java-agent/agent-crashtracking/src/test/resources/golden/sample-crash-for-telemetry-3.txt +++ b/dd-java-agent/agent-crashtracking/src/test/resources/golden/sample-crash-for-telemetry-3.txt @@ -1 +1 @@ -{"error":{"is_crash":true,"kind":"INVALID","message":"\n\n fatal error: OutOfMemory encountered: Java heap space\nJRE version: OpenJDK Runtime Environment (Zulu 8.70.0.23-CA-macos-aarch64) (8.0_372-b07) (build 1.8.0_372-b07)\nJava VM: OpenJDK 64-Bit Server VM (25.372-b07 mixed mode bsd-aarch64 compressed oops)\n\nNative frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)\nV [redacted]\nV [redacted]\nV [redacted]\nV [redacted]\nV [redacted]\nV [redacted]\nj [redacted]\nv [redacted]\nV [redacted]\nV [redacted]\nV [redacted]\nC [redacted]\nC [redacted]\nC [redacted]\n","source_type":"crashtracking","stack":{"format":"CrashTrackerV1","frames":[{"function":"[redacted]","line":0},{"function":"[redacted]","line":0},{"function":"[redacted]","line":0},{"function":"[redacted]","line":0},{"function":"[redacted]","line":0},{"function":"[redacted]","line":0},{"function":"[redacted]","line":0},{"function":"[redacted]","line":0},{"function":"[redacted]","line":0},{"function":"[redacted]","line":0},{"function":"[redacted]","line":0},{"function":"[redacted]","line":0},{"function":"[redacted]","line":0},{"function":"[redacted]","line":0}]}},"incomplete":false,"metadata":{"family":"java","library_name":"dd-trace-java","library_version":"1.40.0-SNAPSHOT~b4718bd887","tags":{}},"os_info":{"architecture":"aarch64","bitness":"64","os_type":"Mac OS X","version":{"Semantic":[14,6,1]}},"proc_info":{"pid":"96267"},"uuid":"9b651ca7-0671-4805-bd91-c83ba131ece9","version_id":0} +{"error":{"is_crash":true,"kind":"INVALID","message":"\n\n fatal error: OutOfMemory encountered: Java heap space\nJRE version: OpenJDK Runtime Environment (Zulu 8.70.0.23-CA-macos-aarch64) (8.0_372-b07) (build 1.8.0_372-b07)\nJava VM: OpenJDK 64-Bit Server VM (25.372-b07 mixed mode bsd-aarch64 compressed oops)\n\nNative frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)\nV [libjvm.dylib+0x565d30] VMError::report_and_die()+0x468\nV [libjvm.dylib+0x1941a0] report_vm_error(char const*, int, char const*, char const*)+0x5c\nV [libjvm.dylib+0x1943d8] report_java_out_of_memory(char const*)+0xfc\nV [libjvm.dylib+0x70430] CollectedHeap::common_mem_allocate_noinit(KlassHandle, unsigned long, Thread*)+0x128\nV [libjvm.dylib+0x53eba8] TypeArrayKlass::allocate_common(int, bool, Thread*)+0xfc\nV [libjvm.dylib+0x285b6c] InterpreterRuntime::newarray(JavaThread*, BasicType, int)+0x48\nj datadog.smoketest.crashtracking.CrashtrackingTestApplication.main([Ljava/lang/String;)V+105\nv ~StubRoutines::call_stub\nV [libjvm.dylib+0x28f86c] JavaCalls::call_helper(JavaValue*, methodHandle*, JavaCallArguments*, Thread*)+0x840\nV [libjvm.dylib+0x2d3b44] jni_invoke_static(JNIEnv_*, JavaValue*, _jobject*, JNICallType, _jmethodID*, JNI_ArgumentPusher*, Thread*)+0x294\nV [libjvm.dylib+0x2d7160] jni_CallStaticVoidMethod+0x188\nC [java+0x6404] JavaMain+0xa10\nC [libsystem_pthread.dylib+0x6f94] _pthread_start+0x88\nC [libsystem_pthread.dylib+0x1d34] thread_start+0x8\n","source_type":"crashtracking","stack":{"format":"CrashTrackerV1","frames":[{"function":"VMError::report_and_die()","line":0},{"function":"report_vm_error(char const*, int, char const*, char const*)","line":0},{"function":"report_java_out_of_memory(char const*)","line":0},{"function":"CollectedHeap::common_mem_allocate_noinit(KlassHandle, unsigned long, Thread*)","line":0},{"function":"TypeArrayKlass::allocate_common(int, bool, Thread*)","line":0},{"function":"InterpreterRuntime::newarray(JavaThread*, BasicType, int)","line":0},{"function":"datadog.smoketest.crashtracking.CrashtrackingTestApplication.main([Ljava/lang/String;)V","line":0},{"function":" ~StubRoutines::call_stub","line":0},{"function":"JavaCalls::call_helper(JavaValue*, methodHandle*, JavaCallArguments*, Thread*)","line":0},{"function":"jni_invoke_static(JNIEnv_*, JavaValue*, _jobject*, JNICallType, _jmethodID*, JNI_ArgumentPusher*, Thread*)","line":0},{"function":"jni_CallStaticVoidMethod","line":0},{"function":"JavaMain","line":0},{"function":"_pthread_start","line":0},{"function":"thread_start","line":0}]}},"incomplete":false,"metadata":{"family":"java","library_name":"dd-trace-java","library_version":"1.42.0-SNAPSHOT~aa29078ace","tags":{}},"os_info":{"architecture":"aarch64","bitness":"64","os_type":"Mac OS X","version":{"Semantic":[14,7,0]}},"proc_info":{"pid":"96267"},"uuid":"bfe4e4b4-b59f-4954-bee3-91f60e653e61","version_id":0} diff --git a/dd-java-agent/agent-crashtracking/src/test/resources/golden/sample-crash-for-telemetry.txt b/dd-java-agent/agent-crashtracking/src/test/resources/golden/sample-crash-for-telemetry.txt index 8d551d2d030..c7f887ecd0a 100644 --- a/dd-java-agent/agent-crashtracking/src/test/resources/golden/sample-crash-for-telemetry.txt +++ b/dd-java-agent/agent-crashtracking/src/test/resources/golden/sample-crash-for-telemetry.txt @@ -1 +1 @@ -{"error":{"is_crash":true,"kind":"SIGSEGV","message":"\n\nJRE version: OpenJDK Runtime Environment Zulu17.42+20-SA (17.0.7+7) (build 17.0.7+7-LTS)\nJava VM: OpenJDK 64-Bit Server VM Zulu17.42+20-SA (17.0.7+7-LTS, mixed mode, tiered, compressed oops, compressed class ptrs, g1 gc, linux-amd64)\nProblematic frame:\nV [libjvm.so+0x6b7187] vframeStreamForte::forte_next()+0x797\n\nNative frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)\nV [libjvm.so+0x6b7187] vframeStreamForte::forte_next()+0x797\nV [libjvm.so+0x6b79ad] forte_fill_call_trace_given_top(JavaThread*, ASGCT_CallTrace*, int, frame) [clone .isra.22]+0x7bd\nV [libjvm.so+0x6b8123] AsyncGetCallTrace+0x193\nC [libjavaProfiler586350205236920700.so+0x146c8] Profiler::getJavaTraceAsync(void*, ASGCT_CallFrame*, int, StackContext*, bool*) [clone .isra.531]+0xd8\nC [libjavaProfiler586350205236920700.so+0x1c18d] Profiler::recordSample(void*, unsigned long long, int, int, Event*)+0xa2d\nC [libjavaProfiler586350205236920700.so+0x1d128] WallClock::sharedSignalHandler(int, siginfo_t*, void*)+0x148\nC [redacted]\nV [libjvm.so+0x85f9e9] JfrStackTrace::record_safe(JavaThread*, int)+0x5a9\nV [libjvm.so+0x861650] JfrStackTraceRepository::record_for_leak_profiler(JavaThread*, int)+0x50\nV [libjvm.so+0xb1ecba] ObjectSampler::sample(HeapWordImpl**, unsigned long, JavaThread*)+0x10a\nV [libjvm.so+0x81abee] JfrAllocationTracer::JfrAllocationTracer(Klass const*, HeapWordImpl**, unsigned long, bool, JavaThread*)+0x5e\nV [libjvm.so+0x3afa20] AllocTracer::send_allocation_in_new_tlab(Klass*, HeapWordImpl**, unsigned long, unsigned long, JavaThread*)+0x30\nV [libjvm.so+0xa8f8ed] MemAllocator::allocate() const+0x13d\nV [libjvm.so+0x7e443c] InstanceKlass::allocate_objArray(int, int, JavaThread*)+0x13c\nV [libjvm.so+0xbe1b59] OptoRuntime::new_array_C(Klass*, int, JavaThread*)+0x129\n","source_type":"crashtracking","stack":{"format":"CrashTrackerV1","frames":[{"function":"vframeStreamForte::forte_next()","line":0},{"function":"forte_fill_call_trace_given_top(JavaThread*, ASGCT_CallTrace*, int, frame) [clone .isra.22]","line":0},{"function":"AsyncGetCallTrace","line":0},{"function":"Profiler::getJavaTraceAsync(void*, ASGCT_CallFrame*, int, StackContext*, bool*) [clone .isra.531]","line":0},{"function":"Profiler::recordSample(void*, unsigned long long, int, int, Event*)","line":0},{"function":"WallClock::sharedSignalHandler(int, siginfo_t*, void*)","line":0},{"function":"[redacted]","line":0},{"function":"JfrStackTrace::record_safe(JavaThread*, int)","line":0},{"function":"JfrStackTraceRepository::record_for_leak_profiler(JavaThread*, int)","line":0},{"function":"ObjectSampler::sample(HeapWordImpl**, unsigned long, JavaThread*)","line":0},{"function":"JfrAllocationTracer::JfrAllocationTracer(Klass const*, HeapWordImpl**, unsigned long, bool, JavaThread*)","line":0},{"function":"AllocTracer::send_allocation_in_new_tlab(Klass*, HeapWordImpl**, unsigned long, unsigned long, JavaThread*)","line":0},{"function":"MemAllocator::allocate() const","line":0},{"function":"InstanceKlass::allocate_objArray(int, int, JavaThread*)","line":0},{"function":"OptoRuntime::new_array_C(Klass*, int, JavaThread*)","line":0}]}},"incomplete":false,"metadata":{"family":"java","library_name":"dd-trace-java","library_version":"1.40.0-SNAPSHOT~b4718bd887","tags":{}},"os_info":{"architecture":"aarch64","bitness":"64","os_type":"Mac OS X","version":{"Semantic":[14,6,1]}},"proc_info":{"pid":"161958"},"timestamp":"2023-10-17T20:25:14+08:00","uuid":"a570abb4-60e6-4590-be2f-5f8445f664a9","version_id":0} +{"error":{"is_crash":true,"kind":"SIGSEGV","message":"\n\nJRE version: OpenJDK Runtime Environment Zulu17.42+20-SA (17.0.7+7) (build 17.0.7+7-LTS)\nJava VM: OpenJDK 64-Bit Server VM Zulu17.42+20-SA (17.0.7+7-LTS, mixed mode, tiered, compressed oops, compressed class ptrs, g1 gc, linux-amd64)\nProblematic frame:\nV [libjvm.so+0x6b7187] vframeStreamForte::forte_next()+0x797\n\nNative frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)\nV [libjvm.so+0x6b7187] vframeStreamForte::forte_next()+0x797\nV [libjvm.so+0x6b79ad] forte_fill_call_trace_given_top(JavaThread*, ASGCT_CallTrace*, int, frame) [clone .isra.22]+0x7bd\nV [libjvm.so+0x6b8123] AsyncGetCallTrace+0x193\nC [libjavaProfiler586350205236920700.so+0x146c8] Profiler::getJavaTraceAsync(void*, ASGCT_CallFrame*, int, StackContext*, bool*) [clone .isra.531]+0xd8\nC [libjavaProfiler586350205236920700.so+0x1c18d] Profiler::recordSample(void*, unsigned long long, int, int, Event*)+0xa2d\nC [libjavaProfiler586350205236920700.so+0x1d128] WallClock::sharedSignalHandler(int, siginfo_t*, void*)+0x148\nC [libpthread.so.0+0x12cf0]\nV [libjvm.so+0x85f9e9] JfrStackTrace::record_safe(JavaThread*, int)+0x5a9\nV [libjvm.so+0x861650] JfrStackTraceRepository::record_for_leak_profiler(JavaThread*, int)+0x50\nV [libjvm.so+0xb1ecba] ObjectSampler::sample(HeapWordImpl**, unsigned long, JavaThread*)+0x10a\nV [libjvm.so+0x81abee] JfrAllocationTracer::JfrAllocationTracer(Klass const*, HeapWordImpl**, unsigned long, bool, JavaThread*)+0x5e\nV [libjvm.so+0x3afa20] AllocTracer::send_allocation_in_new_tlab(Klass*, HeapWordImpl**, unsigned long, unsigned long, JavaThread*)+0x30\nV [libjvm.so+0xa8f8ed] MemAllocator::allocate() const+0x13d\nV [libjvm.so+0x7e443c] InstanceKlass::allocate_objArray(int, int, JavaThread*)+0x13c\nV [libjvm.so+0xbe1b59] OptoRuntime::new_array_C(Klass*, int, JavaThread*)+0x129\n","source_type":"crashtracking","stack":{"format":"CrashTrackerV1","frames":[{"function":"vframeStreamForte::forte_next()","line":0},{"function":"forte_fill_call_trace_given_top(JavaThread*, ASGCT_CallTrace*, int, frame) [clone .isra.22]","line":0},{"function":"AsyncGetCallTrace","line":0},{"function":"Profiler::getJavaTraceAsync(void*, ASGCT_CallFrame*, int, StackContext*, bool*) [clone .isra.531]","line":0},{"function":"Profiler::recordSample(void*, unsigned long long, int, int, Event*)","line":0},{"function":"WallClock::sharedSignalHandler(int, siginfo_t*, void*)","line":0},{"function":"libpthread.so.0+0x12cf0","line":0},{"function":"JfrStackTrace::record_safe(JavaThread*, int)","line":0},{"function":"JfrStackTraceRepository::record_for_leak_profiler(JavaThread*, int)","line":0},{"function":"ObjectSampler::sample(HeapWordImpl**, unsigned long, JavaThread*)","line":0},{"function":"JfrAllocationTracer::JfrAllocationTracer(Klass const*, HeapWordImpl**, unsigned long, bool, JavaThread*)","line":0},{"function":"AllocTracer::send_allocation_in_new_tlab(Klass*, HeapWordImpl**, unsigned long, unsigned long, JavaThread*)","line":0},{"function":"MemAllocator::allocate() const","line":0},{"function":"InstanceKlass::allocate_objArray(int, int, JavaThread*)","line":0},{"function":"OptoRuntime::new_array_C(Klass*, int, JavaThread*)","line":0}]}},"incomplete":false,"metadata":{"family":"java","library_name":"dd-trace-java","library_version":"1.42.0-SNAPSHOT~aa29078ace","tags":{}},"os_info":{"architecture":"aarch64","bitness":"64","os_type":"Mac OS X","version":{"Semantic":[14,7,0]}},"proc_info":{"pid":"161958"},"timestamp":"2023-10-17T20:25:14+08:00","uuid":"0d1c4f7c-9d41-481d-909a-343ad12257b2","version_id":0} From 8e1ab2bcd89e9b0eacd4bd96635cc1ef07b49599 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Thu, 7 Nov 2024 08:40:24 +0100 Subject: [PATCH 5/8] Test support for reactor-kafka (#7886) * Test support for reactor-kafka * leftover * Run test foked * dispose kafka receivers at test end * Test on 3.8 are just flaky --- .../kafka-clients-0.11/build.gradle | 5 + .../groovy/KafkaReactorForkedTest.groovy | 230 ++++++++++++++++++ .../test/groovy/KafkaClientTestBase.groovy | 43 ++-- .../test/groovy/KafkaReactorForkedTest.groovy | 218 +++++++++++++++++ 4 files changed, 475 insertions(+), 21 deletions(-) create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaReactorForkedTest.groovy create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle b/dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle index 0168abcdd42..5560d1f6c4f 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle @@ -19,10 +19,13 @@ dependencies { testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0' testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '1.3.3.RELEASE' testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '1.3.3.RELEASE' + testImplementation group: 'io.projectreactor.kafka', name: 'reactor-kafka', version: '1.0.0.RELEASE' testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.3' testImplementation group: 'org.assertj', name: 'assertj-core', version: '2.9.+' testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.19.0' testRuntimeOnly project(':dd-java-agent:instrumentation:spring-scheduling-3.1') + testRuntimeOnly project(':dd-java-agent:instrumentation:reactor-core-3.1') + testRuntimeOnly project(':dd-java-agent:instrumentation:reactive-streams') testImplementation(testFixtures(project(':dd-java-agent:agent-iast'))) @@ -38,6 +41,8 @@ dependencies { // This seems to help with jar compatibility hell. latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.+' latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.+' + // latest depending to kafka client 2.x -> to be fixed when this instrumentation will test 3.x as well + latestDepTestImplementation group: 'io.projectreactor.kafka', name: 'reactor-kafka', version: '1.3.21' latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.+' latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.+' latestDepTestImplementation group: 'org.assertj', name: 'assertj-core', version: '3.19.+' diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaReactorForkedTest.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaReactorForkedTest.groovy new file mode 100644 index 00000000000..0930c836fe8 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaReactorForkedTest.groovy @@ -0,0 +1,230 @@ +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.asserts.TraceAssert +import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags +import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.common.writer.ListWriter +import datadog.trace.core.DDSpan +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.producer.ProducerRecord +import org.junit.Rule +import org.springframework.kafka.test.EmbeddedKafkaBroker +import org.springframework.kafka.test.rule.EmbeddedKafkaRule +import org.springframework.kafka.test.utils.KafkaTestUtils +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOptions +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions +import reactor.kafka.sender.SenderRecord + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +class KafkaReactorForkedTest extends AgentTestRunner { + @Rule + EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, true, 4, KafkaClientTest.SHARED_TOPIC) + EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka + + + @Override + boolean useStrictTraceWrites() { + false + } + + def setup() { + TEST_WRITER.setFilter(new ListWriter.Filter() { + @Override + boolean accept(List trace) { + return !(trace.size() == 1 && + trace.get(0).getResourceName().toString().equals("kafka.poll")) + } + }) + } + + def "test reactive produce and consume"() { + setup: + def senderProps = KafkaTestUtils.producerProps(embeddedKafka) + if (isDataStreamsEnabled()) { + senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000) + } + + def kafkaSender = KafkaSender.create(SenderOptions.create(senderProps)) + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + def subscriptionReady = new CountDownLatch(embeddedKafka.getPartitionsPerTopic()) + + final KafkaReceiver kafkaReceiver = KafkaReceiver.create(ReceiverOptions. create(consumerProperties) + .subscription([KafkaClientTest.SHARED_TOPIC]) + .addAssignListener { + it.each { subscriptionReady.countDown() } + }) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + kafkaReceiver.receive() + // publish on another thread to be sure we're propagating that receive span correctly + .publishOn(Schedulers.parallel()) + .flatMap { receiverRecord -> + { + records.add(receiverRecord) + receiverRecord.receiverOffset().commit() + } + }.subscribe() + + + // wait until the container has the required number of assigned partitions + subscriptionReady.await() + + when: + String greeting = "Hello Reactor Kafka Sender!" + runUnderTrace("parent") { + kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(KafkaClientTest.SHARED_TOPIC, greeting), null))) + .doOnError { ex -> runUnderTrace("producer exception: " + ex) {} } + .doOnNext { runUnderTrace("producer callback") {} } + .blockFirst() + blockUntilChildSpansFinished(2) + } + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + received.value() == greeting + received.key() == null + + + assertTraces(2, SORT_TRACES_BY_START) { + trace(3) { + basicSpan(it, "parent") + basicSpan(it, "producer callback", span(0)) + producerSpan(it, senderProps, span(0)) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[2]) + } + } + } + + def "test reactive 100 msg produce and consume have only one parent"() { + setup: + def senderProps = KafkaTestUtils.producerProps(embeddedKafka) + if (isDataStreamsEnabled()) { + senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000) + } + + def kafkaSender = KafkaSender.create(SenderOptions.create(senderProps)) + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + def subscriptionReady = new CountDownLatch(embeddedKafka.getPartitionsPerTopic()) + + final KafkaReceiver kafkaReceiver = KafkaReceiver.create(ReceiverOptions. create(consumerProperties) + .subscription([KafkaClientTest.SHARED_TOPIC]) + .addAssignListener { + it.each { subscriptionReady.countDown() } + }) + + // create a thread safe queue to store the received message + kafkaReceiver.receive() + // publish on another thread to be sure we're propagating that receive span correctly + .publishOn(Schedulers.parallel()) + .flatMap { receiverRecord -> + { + receiverRecord.receiverOffset().commit() + } + } + .subscribeOn(Schedulers.parallel()) + .subscribe() + + + // wait until the container has the required number of assigned partitions + subscriptionReady.await() + + when: + String greeting = "Hello Reactor Kafka Sender!" + Flux.range(0, 100) + .flatMap { kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(KafkaClientTest.SHARED_TOPIC, greeting), null))) } + .publishOn(Schedulers.parallel()) + .subscribe() + then: + // check that the all the consume (100) and the send (100) are reported + TEST_WRITER.waitForTraces(200) + Map> traces = TEST_WRITER.inject([:]) { map, entry -> + def key = entry.get(0).getTraceId().toString() + map[key] = (map[key] ?: []) + entry + return map + } + traces.values().each { + assert it.size() == 2 + int produceIndex = 0 + int consumeIndex = 1 + if ("kafka.produce".contentEquals(it.get(1).getOperationName().toString())) { + produceIndex = 1 + consumeIndex = 0 + } + //assert that the consumer has the producer as parent and that the producer is root + assert it.get(consumeIndex).getParentId() == it.get(produceIndex).getSpanId() + assert it.get(produceIndex).getParentId() == 0 + } + } + + def producerSpan( + TraceAssert trace, + Map config, + DDSpan parentSpan = null) { + trace.span { + serviceName "kafka" + operationName "kafka.produce" + resourceName "Produce Topic $KafkaClientTest.SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS) + defaultTags() + } + } + } + + def consumerSpan( + TraceAssert trace, + Map config, + DDSpan parentSpan = null) { + trace.span { + serviceName "kafka" + operationName "kafka.consume" + resourceName "Consume Topic $KafkaClientTest.SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "$InstrumentationTags.PARTITION" { it >= 0 } + "$InstrumentationTags.OFFSET" { Integer } + "$InstrumentationTags.CONSUMER_GROUP" "sender" + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) + "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 } + defaultTags(true) + } + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index f17c4ad8fb5..cecf73f26d4 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -1,5 +1,3 @@ -import datadog.trace.common.writer.ListWriter - import static datadog.trace.agent.test.utils.TraceUtils.basicSpan import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope @@ -11,6 +9,7 @@ import datadog.trace.api.Config import datadog.trace.api.DDTags import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.common.writer.ListWriter import datadog.trace.core.DDSpan import datadog.trace.core.datastreams.StatsGroup import datadog.trace.test.util.Flaky @@ -34,6 +33,7 @@ import org.springframework.kafka.listener.MessageListener import org.springframework.kafka.test.rule.KafkaEmbedded import org.springframework.kafka.test.utils.ContainerTestUtils import org.springframework.kafka.test.utils.KafkaTestUtils +import spock.lang.Shared import java.util.concurrent.ExecutionException import java.util.concurrent.Future @@ -56,7 +56,8 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { public static final LinkedHashMap PRODUCER_PATHWAY_EDGE_TAGS // filter out Kafka poll, since the function is called in a loop, giving inconsistent results - final ListWriter.Filter dropKafkaPoll = new ListWriter.Filter() { + @Shared + static final ListWriter.Filter DROP_KAFKA_POLL = new ListWriter.Filter() { @Override boolean accept(List trace) { return !(trace.size() == 1 && @@ -103,7 +104,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } def setup() { - TEST_WRITER.setFilter(dropKafkaPoll) + TEST_WRITER.setFilter(DROP_KAFKA_POLL) } @Override @@ -299,15 +300,15 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } List produce = [ "kafka_cluster_id:$clusterId", - "partition:"+received.partition(), - "topic:"+SHARED_TOPIC, + "partition:" + received.partition(), + "topic:" + SHARED_TOPIC, "type:kafka_produce" ] List commit = [ "consumer_group:sender", "kafka_cluster_id:$clusterId", - "partition:"+received.partition(), - "topic:"+SHARED_TOPIC, + "partition:" + received.partition(), + "topic:" + SHARED_TOPIC, "type:kafka_commit" ] verifyAll(TEST_DATA_STREAMS_WRITER.backlogs) { @@ -452,15 +453,15 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } List produce = [ "kafka_cluster_id:$clusterId".toString(), - "partition:"+received.partition(), - "topic:"+SHARED_TOPIC, + "partition:" + received.partition(), + "topic:" + SHARED_TOPIC, "type:kafka_produce" ] List commit = [ "consumer_group:sender", "kafka_cluster_id:$clusterId".toString(), - "partition:"+received.partition(), - "topic:"+SHARED_TOPIC, + "partition:" + received.partition(), + "topic:" + SHARED_TOPIC, "type:kafka_commit" ] verifyAll(TEST_DATA_STREAMS_WRITER.backlogs) { @@ -1014,7 +1015,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { def producerSpan( TraceAssert trace, - Map config, + Map config, DDSpan parentSpan = null, boolean partitioned = true, boolean tombstone = false, @@ -1042,7 +1043,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { if (tombstone) { "$InstrumentationTags.TOMBSTONE" true } - if ({isDataStreamsEnabled()}) { + if ({ isDataStreamsEnabled() }) { "$DDTags.PATHWAY_HASH" { String } if (schema != null) { "$DDTags.SCHEMA_DEFINITION" schema @@ -1063,7 +1064,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { DDSpan parentSpan = null ) { trace.span { - serviceName splitByDestination() ? "$SHARED_TOPIC" : serviceForTimeInQueue() + serviceName splitByDestination() ? "$SHARED_TOPIC" : serviceForTimeInQueue() operationName "kafka.deliver" resourceName "$SHARED_TOPIC" spanType "queue" @@ -1084,7 +1085,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { def consumerSpan( TraceAssert trace, - Map config, + Map config, DDSpan parentSpan = null, Range offset = 0..0, boolean tombstone = false, @@ -1114,7 +1115,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { if (tombstone) { "$InstrumentationTags.TOMBSTONE" true } - if ({isDataStreamsEnabled()}) { + if ({ isDataStreamsEnabled() }) { "$DDTags.PATHWAY_HASH" { String } } defaultTags(distributedRootSpan) @@ -1146,9 +1147,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { def waitForKafkaMetadataUpdate(KafkaTemplate kafkaTemplate) { kafkaTemplate.flush() Producer wrappedProducer = kafkaTemplate.getTheProducer() - assert(wrappedProducer instanceof DefaultKafkaProducerFactory.CloseSafeProducer) + assert (wrappedProducer instanceof DefaultKafkaProducerFactory.CloseSafeProducer) Producer producer = wrappedProducer.delegate - assert(producer instanceof KafkaProducer) + assert (producer instanceof KafkaProducer) String clusterId = producer.metadata.cluster.clusterResource().clusterId() while (clusterId == null || clusterId.isEmpty()) { Thread.sleep(1500) @@ -1265,12 +1266,12 @@ abstract class KafkaClientLegacyTracingForkedTest extends KafkaClientTestBase { } } -class KafkaClientLegacyTracingV0ForkedTest extends KafkaClientLegacyTracingForkedTest{ +class KafkaClientLegacyTracingV0ForkedTest extends KafkaClientLegacyTracingForkedTest { } -class KafkaClientLegacyTracingV1ForkedTest extends KafkaClientLegacyTracingForkedTest{ +class KafkaClientLegacyTracingV1ForkedTest extends KafkaClientLegacyTracingForkedTest { @Override int version() { diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy new file mode 100644 index 00000000000..da6eaf91644 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy @@ -0,0 +1,218 @@ +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.asserts.TraceAssert +import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags +import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.core.DDSpan +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.producer.ProducerRecord +import org.junit.Rule +import org.springframework.kafka.test.rule.KafkaEmbedded +import org.springframework.kafka.test.utils.KafkaTestUtils +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOptions +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions +import reactor.kafka.sender.SenderRecord + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +class KafkaReactorForkedTest extends AgentTestRunner { + @Rule + // create 4 partitions for more parallelism + KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 4, KafkaClientTestBase.SHARED_TOPIC) + + @Override + boolean useStrictTraceWrites() { + false + } + + def setup() { + TEST_WRITER.setFilter(KafkaClientTestBase.DROP_KAFKA_POLL) + } + + def "test reactive produce and consume"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + + def kafkaSender = KafkaSender.create(SenderOptions.create(senderProps)) + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + def subscriptionReady = new CountDownLatch(embeddedKafka.getPartitionsPerTopic()) + + final KafkaReceiver kafkaReceiver = KafkaReceiver.create(ReceiverOptions. create(consumerProperties) + .subscription([KafkaClientTestBase.SHARED_TOPIC]) + .addAssignListener { + it.each { subscriptionReady.countDown() } + }) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + kafkaReceiver.receive() + // publish on another thread to be sure we're propagating that receive span correctly + .publishOn(Schedulers.parallel()) + .flatMap { receiverRecord -> + { + records.add(receiverRecord) + receiverRecord.receiverOffset().commit() + } + }.subscribe() + + + // wait until the container has the required number of assigned partitions + subscriptionReady.await() + + when: + String greeting = "Hello Reactor Kafka Sender!" + runUnderTrace("parent") { + kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(KafkaClientTestBase.SHARED_TOPIC, greeting), null))) + .doOnError { ex -> runUnderTrace("producer exception: " + ex) {} } + .doOnNext { runUnderTrace("producer callback") {} } + .blockFirst() + blockUntilChildSpansFinished(2) + } + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + received.value() == greeting + received.key() == null + + + assertTraces(2, SORT_TRACES_BY_START) { + trace(3) { + basicSpan(it, "parent") + basicSpan(it, "producer callback", span(0)) + producerSpan(it, senderProps, span(0)) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[2]) + } + } + } + + def "test reactive 100 msg produce and consume have only one parent"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + if (isDataStreamsEnabled()) { + senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000) + } + + def kafkaSender = KafkaSender.create(SenderOptions.create(senderProps)) + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + def subscriptionReady = new CountDownLatch(embeddedKafka.getPartitionsPerTopic()) + + final KafkaReceiver kafkaReceiver = KafkaReceiver.create(ReceiverOptions. create(consumerProperties) + .subscription([KafkaClientTestBase.SHARED_TOPIC]) + .addAssignListener { + it.each { subscriptionReady.countDown() } + }) + + // create a thread safe queue to store the received message + kafkaReceiver.receive() + // publish on another thread to be sure we're propagating that receive span correctly + .publishOn(Schedulers.parallel()) + .flatMap { receiverRecord -> + { + receiverRecord.receiverOffset().commit() + } + } + .subscribeOn(Schedulers.parallel()) + .subscribe() + + + // wait until the container has the required number of assigned partitions + subscriptionReady.await() + + when: + String greeting = "Hello Reactor Kafka Sender!" + Flux.range(0, 100) + .flatMap { kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(KafkaClientTestBase.SHARED_TOPIC, greeting), null))) } + .publishOn(Schedulers.parallel()) + .subscribe() + then: + // check that the all the consume (100) and the send (100) are reported + TEST_WRITER.waitForTraces(200) + Map> traces = TEST_WRITER.inject([:]) { map, entry -> + def key = entry.get(0).getTraceId().toString() + map[key] = (map[key] ?: []) + entry + return map + } + traces.values().each { + assert it.size() == 2 + int produceIndex = 0 + int consumeIndex = 1 + if ("kafka.produce".contentEquals(it.get(1).getOperationName().toString())) { + produceIndex = 1 + consumeIndex = 0 + } + //assert that the consumer has the producer as parent and that the producer is root + assert it.get(consumeIndex).getParentId() == it.get(produceIndex).getSpanId() + assert it.get(produceIndex).getParentId() == 0 + } + } + + def producerSpan( + TraceAssert trace, + Map config, + DDSpan parentSpan = null) { + trace.span { + serviceName "kafka" + operationName "kafka.produce" + resourceName "Produce Topic $KafkaClientTestBase.SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS) + defaultTags() + } + } + } + + def consumerSpan( + TraceAssert trace, + Map config, + DDSpan parentSpan = null) { + trace.span { + serviceName "kafka" + operationName "kafka.consume" + resourceName "Consume Topic $KafkaClientTestBase.SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "$InstrumentationTags.PARTITION" { it >= 0 } + "$InstrumentationTags.OFFSET" { Integer } + "$InstrumentationTags.CONSUMER_GROUP" "sender" + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) + "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 } + defaultTags(true) + } + } + } +} From 15e5861d70ab71764d30952920257c6f7088e1ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Gonz=C3=A1lez=20Garc=C3=ADa?= Date: Thu, 7 Nov 2024 11:05:35 +0100 Subject: [PATCH 6/8] Fix stack trace inconsistency between excluded frames in vulnerability location and metastruct stack trace (#7865) What Does This Do Parametrize StackUtils#generateUserCodeStackTrace with a predicate Add AbstractStackWalker#isNotDatadogTraceStackElement as default filter Update isNotDatadogTraceStackElement to also filter com.datadog.appsec Motivation We are filtering different stack frames according to their class to calculate the vulnerability location and the vulnerability stack trace that is reported via meta struct --- .../util/stacktrace/AbstractStackWalker.java | 4 ++- .../trace/util/stacktrace/StackUtils.java | 15 ++++----- .../trace/util/stacktrace/StackUtilsTest.java | 32 ++++++++++++++++--- 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/internal-api/src/main/java/datadog/trace/util/stacktrace/AbstractStackWalker.java b/internal-api/src/main/java/datadog/trace/util/stacktrace/AbstractStackWalker.java index 17c90ae02ab..fc248169641 100644 --- a/internal-api/src/main/java/datadog/trace/util/stacktrace/AbstractStackWalker.java +++ b/internal-api/src/main/java/datadog/trace/util/stacktrace/AbstractStackWalker.java @@ -18,6 +18,8 @@ final Stream doFilterStack(Stream stream) static boolean isNotDatadogTraceStackElement(final StackTraceElement el) { final String clazz = el.getClassName(); - return !clazz.startsWith("datadog.trace.") && !clazz.startsWith("com.datadog.iast."); + return !clazz.startsWith("datadog.trace.") + && !clazz.startsWith("com.datadog.iast.") + && !clazz.startsWith("com.datadog.appsec."); } } diff --git a/internal-api/src/main/java/datadog/trace/util/stacktrace/StackUtils.java b/internal-api/src/main/java/datadog/trace/util/stacktrace/StackUtils.java index 2cc833cc2a9..35ad5151054 100644 --- a/internal-api/src/main/java/datadog/trace/util/stacktrace/StackUtils.java +++ b/internal-api/src/main/java/datadog/trace/util/stacktrace/StackUtils.java @@ -60,19 +60,18 @@ public static E filterUntil( }); } - /** Function generates stack trace of the user code (excluding datadog classes) */ public static List generateUserCodeStackTrace() { + return generateUserCodeStackTrace(AbstractStackWalker::isNotDatadogTraceStackElement); + } + + /** Function generates stack trace of the user code (excluding datadog classes) */ + public static List generateUserCodeStackTrace( + final Predicate filterPredicate) { int stackCapacity = Config.get().getAppSecMaxStackTraceDepth(); List elements = StackWalkerFactory.INSTANCE.walk( stream -> - stream - .filter( - elem -> - !elem.getClassName().startsWith("com.datadog") - && !elem.getClassName().startsWith("datadog.trace")) - .limit(stackCapacity) - .collect(Collectors.toList())); + stream.filter(filterPredicate).limit(stackCapacity).collect(Collectors.toList())); return IntStream.range(0, elements.size()) .mapToObj(idx -> new StackTraceFrame(idx, elements.get(idx))) .collect(Collectors.toList()); diff --git a/internal-api/src/test/java/datadog/trace/util/stacktrace/StackUtilsTest.java b/internal-api/src/test/java/datadog/trace/util/stacktrace/StackUtilsTest.java index ef1bfeec3c6..1fb68fbd680 100644 --- a/internal-api/src/test/java/datadog/trace/util/stacktrace/StackUtilsTest.java +++ b/internal-api/src/test/java/datadog/trace/util/stacktrace/StackUtilsTest.java @@ -14,7 +14,12 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; public class StackUtilsTest { @@ -92,13 +97,30 @@ public void test_filter_until() { assertThat(noRemoval.getStackTrace()).isEqualTo(stack); } - @Test - public void test_generateUserCodeStackTrace() { - List userCodeStack = StackUtils.generateUserCodeStackTrace(); + private static Stream test_generateUserCodeStackTrace_Params() { + return Stream.of( + Arguments.of((Predicate) stack -> true, false), + Arguments.of( + (Predicate) stack -> !stack.getClassName().startsWith("org.junit"), + true)); + } + + @ParameterizedTest + @MethodSource("test_generateUserCodeStackTrace_Params") + public void test_generateUserCodeStackTrace( + final Predicate filter, final boolean expected) { + List userCodeStack = StackUtils.generateUserCodeStackTrace(filter); assertThat(userCodeStack).isNotNull(); + int junitFramesCounter = 0; for (StackTraceFrame frame : userCodeStack) { - assertThat(frame.getClass_name()).doesNotContain("com.datadog"); - assertThat(frame.getClass_name()).doesNotContain("datadog.trace"); + if (frame.getClass_name() != null && frame.getClass_name().startsWith("org.junit")) { + junitFramesCounter++; + } + } + if (expected) { + assertThat(junitFramesCounter).isEqualTo(0); + } else { + assertThat(junitFramesCounter).isGreaterThan(0); } } From 99355a3b6a0753005a38690dbe3f3481003c1241 Mon Sep 17 00:00:00 2001 From: Richard Startin Date: Thu, 7 Nov 2024 10:57:18 +0000 Subject: [PATCH 7/8] paranoid exception handling when setting profiling thread context (#7903) --- .../profiling/ddprof/DatadogProfiler.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/dd-java-agent/agent-profiling/profiling-ddprof/src/main/java/com/datadog/profiling/ddprof/DatadogProfiler.java b/dd-java-agent/agent-profiling/profiling-ddprof/src/main/java/com/datadog/profiling/ddprof/DatadogProfiler.java index c7144e8d891..c4c49d82731 100644 --- a/dd-java-agent/agent-profiling/profiling-ddprof/src/main/java/com/datadog/profiling/ddprof/DatadogProfiler.java +++ b/dd-java-agent/agent-profiling/profiling-ddprof/src/main/java/com/datadog/profiling/ddprof/DatadogProfiler.java @@ -329,7 +329,7 @@ public void setSpanContext(long spanId, long rootSpanId) { debugLogging(rootSpanId); try { profiler.setContext(spanId, rootSpanId); - } catch (IllegalStateException e) { + } catch (Throwable e) { log.debug("Failed to clear context", e); } } @@ -338,14 +338,18 @@ public void clearSpanContext() { debugLogging(0L); try { profiler.setContext(0L, 0L); - } catch (IllegalStateException e) { + } catch (Throwable e) { log.debug("Failed to set context", e); } } public boolean setContextValue(int offset, int encoding) { if (contextSetter != null && offset >= 0) { - return contextSetter.setContextValue(offset, encoding); + try { + return contextSetter.setContextValue(offset, encoding); + } catch (Throwable e) { + log.debug("Failed to set context", e); + } } return false; } @@ -353,7 +357,11 @@ public boolean setContextValue(int offset, int encoding) { public boolean setContextValue(int offset, CharSequence value) { if (contextSetter != null && offset >= 0) { int encoding = encode(value); - return contextSetter.setContextValue(offset, encoding); + try { + return contextSetter.setContextValue(offset, encoding); + } catch (Throwable e) { + log.debug("Failed to set context", e); + } } return false; } @@ -374,7 +382,11 @@ public boolean clearContextValue(String attribute) { public boolean clearContextValue(int offset) { if (contextSetter != null && offset >= 0) { - return contextSetter.clearContextValue(offset); + try { + return contextSetter.clearContextValue(offset); + } catch (Throwable t) { + log.debug("Failed to clear context", t); + } } return false; } From 060ab7250fd97c02074ef0c53e3df83c54aaea53 Mon Sep 17 00:00:00 2001 From: Bruce Bujon Date: Thu, 7 Nov 2024 13:14:05 +0100 Subject: [PATCH 8/8] fix(iast): Fix test name uniqueness (#7904) --- .../groovy/com/datadog/iast/sink/ApplicationModuleTest.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dd-java-agent/agent-iast/src/test/groovy/com/datadog/iast/sink/ApplicationModuleTest.groovy b/dd-java-agent/agent-iast/src/test/groovy/com/datadog/iast/sink/ApplicationModuleTest.groovy index 4c038069cf3..241c563c1fb 100644 --- a/dd-java-agent/agent-iast/src/test/groovy/com/datadog/iast/sink/ApplicationModuleTest.groovy +++ b/dd-java-agent/agent-iast/src/test/groovy/com/datadog/iast/sink/ApplicationModuleTest.groovy @@ -41,7 +41,7 @@ class ApplicationModuleTest extends IastModuleImplTestBase { 0 * reporter._ } - void 'check vulnerabilities'() { + void 'check vulnerabilities #path'() { given: final file = ClassLoader.getSystemResource(path) final realPath = file.path