diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java index 7eb7be21528..56fff233264 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java @@ -22,6 +22,8 @@ public abstract class HttpClientDecorator extends UriBasedClientDecorator { public static final LinkedHashMap CLIENT_PATHWAY_EDGE_TAGS; + public static final boolean SHOULD_INSTRUMENT_DATA_STREAMS = + Config.get().isHttpDataStreamsEnabled(); static { CLIENT_PATHWAY_EDGE_TAGS = new LinkedHashMap<>(2); diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java index d4b57da9808..135f2df7577 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java @@ -73,6 +73,8 @@ public abstract class HttpServerDecorator getter = getter(); - if (null != carrier && null != getter) { + if (null != carrier && null != getter && SHOULD_INSTRUMENT_DATA_STREAMS) { tracer().getDataStreamsMonitoring().setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0); } return span; diff --git a/dd-java-agent/instrumentation/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpSingleRequestInstrumentation.java b/dd-java-agent/instrumentation/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpSingleRequestInstrumentation.java index 08fce4d798d..f19fd48fbcb 100644 --- a/dd-java-agent/instrumentation/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpSingleRequestInstrumentation.java +++ b/dd-java-agent/instrumentation/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpSingleRequestInstrumentation.java @@ -79,9 +79,11 @@ public static AgentScope methodEnter( if (request != null) { propagate().inject(span, request, headers); - propagate() - .injectPathwayContext( - span, request, headers, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request, headers, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } // Request is immutable, so we have to assign new value once we update headers request = headers.getRequest(); } diff --git a/dd-java-agent/instrumentation/akka-http/akka-http-10.6/src/main/java11/datadog/trace/instrumentation/akkahttp106/SingleRequestAdvice.java b/dd-java-agent/instrumentation/akka-http/akka-http-10.6/src/main/java11/datadog/trace/instrumentation/akkahttp106/SingleRequestAdvice.java index 8e987bc8ec8..dbe46124392 100644 --- a/dd-java-agent/instrumentation/akka-http/akka-http-10.6/src/main/java11/datadog/trace/instrumentation/akkahttp106/SingleRequestAdvice.java +++ b/dd-java-agent/instrumentation/akka-http/akka-http-10.6/src/main/java11/datadog/trace/instrumentation/akkahttp106/SingleRequestAdvice.java @@ -29,9 +29,11 @@ public static AgentScope methodEnter( if (request != null) { propagate().inject(span, request, headers); - propagate() - .injectPathwayContext( - span, request, headers, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request, headers, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } // Request is immutable, so we have to assign new value once we update headers request = headers.getRequest(); } diff --git a/dd-java-agent/instrumentation/apache-httpasyncclient-4/src/main/java/datadog/trace/instrumentation/apachehttpasyncclient/DelegatingRequestProducer.java b/dd-java-agent/instrumentation/apache-httpasyncclient-4/src/main/java/datadog/trace/instrumentation/apachehttpasyncclient/DelegatingRequestProducer.java index c39f1d54fa3..fd5bc3fc6ff 100644 --- a/dd-java-agent/instrumentation/apache-httpasyncclient-4/src/main/java/datadog/trace/instrumentation/apachehttpasyncclient/DelegatingRequestProducer.java +++ b/dd-java-agent/instrumentation/apache-httpasyncclient-4/src/main/java/datadog/trace/instrumentation/apachehttpasyncclient/DelegatingRequestProducer.java @@ -35,8 +35,10 @@ public HttpRequest generateRequest() throws IOException, HttpException { DECORATE.onRequest(span, new HostAndRequestAsHttpUriRequest(delegate.getTarget(), request)); propagate().inject(span, request, SETTER); - propagate() - .injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } return request; } diff --git a/dd-java-agent/instrumentation/apache-httpclient-4/src/main/java/datadog/trace/instrumentation/apachehttpclient/HelperMethods.java b/dd-java-agent/instrumentation/apache-httpclient-4/src/main/java/datadog/trace/instrumentation/apachehttpclient/HelperMethods.java index 959510b9699..5c4fd04bd78 100644 --- a/dd-java-agent/instrumentation/apache-httpclient-4/src/main/java/datadog/trace/instrumentation/apachehttpclient/HelperMethods.java +++ b/dd-java-agent/instrumentation/apache-httpclient-4/src/main/java/datadog/trace/instrumentation/apachehttpclient/HelperMethods.java @@ -46,9 +46,11 @@ private static AgentScope activateHttpSpan(final HttpUriRequest request) { // AWS calls are often signed, so we can't add headers without breaking the signature. if (!awsClientCall) { propagate().inject(span, request, SETTER); - propagate() - .injectPathwayContext( - span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } } return scope; diff --git a/dd-java-agent/instrumentation/apache-httpclient-5/src/main/java/datadog/trace/instrumentation/apachehttpclient5/DelegatingRequestChannel.java b/dd-java-agent/instrumentation/apache-httpclient-5/src/main/java/datadog/trace/instrumentation/apachehttpclient5/DelegatingRequestChannel.java index 36ae6cfc5c6..989da40d517 100644 --- a/dd-java-agent/instrumentation/apache-httpclient-5/src/main/java/datadog/trace/instrumentation/apachehttpclient5/DelegatingRequestChannel.java +++ b/dd-java-agent/instrumentation/apache-httpclient-5/src/main/java/datadog/trace/instrumentation/apachehttpclient5/DelegatingRequestChannel.java @@ -28,8 +28,10 @@ public void sendRequest(HttpRequest request, EntityDetails entityDetails, HttpCo DECORATE.onRequest(span, request); propagate().inject(span, request, SETTER); - propagate() - .injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } delegate.sendRequest(request, entityDetails, context); } } diff --git a/dd-java-agent/instrumentation/apache-httpclient-5/src/main/java/datadog/trace/instrumentation/apachehttpclient5/HelperMethods.java b/dd-java-agent/instrumentation/apache-httpclient-5/src/main/java/datadog/trace/instrumentation/apachehttpclient5/HelperMethods.java index ebbde1b1ba6..5bf4d6c809b 100644 --- a/dd-java-agent/instrumentation/apache-httpclient-5/src/main/java/datadog/trace/instrumentation/apachehttpclient5/HelperMethods.java +++ b/dd-java-agent/instrumentation/apache-httpclient-5/src/main/java/datadog/trace/instrumentation/apachehttpclient5/HelperMethods.java @@ -46,9 +46,11 @@ private static AgentScope activateHttpSpan(final HttpRequest request) { // AWS calls are often signed, so we can't add headers without breaking the signature. if (!awsClientCall) { propagate().inject(span, request, SETTER); - propagate() - .injectPathwayContext( - span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } } return scope; diff --git a/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/client/ClientCallImplInstrumentation.java b/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/client/ClientCallImplInstrumentation.java index f5e70e8e9e5..89a2114f3f5 100644 --- a/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/client/ClientCallImplInstrumentation.java +++ b/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/client/ClientCallImplInstrumentation.java @@ -6,6 +6,7 @@ import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; import static datadog.trace.instrumentation.armeria.grpc.client.GrpcClientDecorator.CLIENT_PATHWAY_EDGE_TAGS; +import static datadog.trace.instrumentation.armeria.grpc.client.GrpcClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS; import static datadog.trace.instrumentation.armeria.grpc.client.GrpcClientDecorator.DECORATE; import static datadog.trace.instrumentation.armeria.grpc.client.GrpcClientDecorator.GRPC_MESSAGE; import static datadog.trace.instrumentation.armeria.grpc.client.GrpcClientDecorator.OPERATION_NAME; @@ -19,10 +20,12 @@ import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.agent.tooling.muzzle.Reference; +import datadog.trace.api.Config; import datadog.trace.api.InstrumenterConfig; import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator; import io.grpc.ClientCall; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -121,7 +124,9 @@ public static AgentScope before( span = InstrumentationContext.get(ClientCall.class, AgentSpan.class).get(call); if (null != span) { propagate().inject(span, headers, SETTER); - propagate().injectPathwayContext(span, headers, SETTER, CLIENT_PATHWAY_EDGE_TAGS); + if (SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate().injectPathwayContext(span, headers, SETTER, CLIENT_PATHWAY_EDGE_TAGS); + } return activateSpan(span); } } diff --git a/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/client/GrpcClientDecorator.java b/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/client/GrpcClientDecorator.java index 060f88eaa43..04ab0c7b634 100644 --- a/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/client/GrpcClientDecorator.java +++ b/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/client/GrpcClientDecorator.java @@ -38,6 +38,8 @@ private static LinkedHashMap createClientPathwaySortedTags() { public static final LinkedHashMap CLIENT_PATHWAY_EDGE_TAGS = createClientPathwaySortedTags(); + public static final boolean SHOULD_INSTRUMENT_DATA_STREAMS = + Config.get().isGrpcDataStreamsEnabled(); public static final GrpcClientDecorator DECORATE = new GrpcClientDecorator(); diff --git a/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java b/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java index 8e60fecaf30..6d5583abea9 100644 --- a/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java +++ b/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java @@ -45,6 +45,8 @@ public class TracingServerInterceptor implements ServerInterceptor { public static final TracingServerInterceptor INSTANCE = new TracingServerInterceptor(); private static final Set IGNORED_METHODS = Config.get().getGrpcIgnoredInboundMethods(); + private static final boolean SHOULD_INSTRUMENT_DATA_STREAMS = + Config.get().isGrpcDataStreamsEnabled(); private TracingServerInterceptor() {} @@ -69,9 +71,11 @@ public ServerCall.Listener interceptCall( final AgentSpan span = startSpan(DECORATE.instrumentationNames()[0], GRPC_SERVER, spanContext).setMeasured(true); - AgentTracer.get() - .getDataStreamsMonitoring() - .setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0); + if (SHOULD_INSTRUMENT_DATA_STREAMS) { + AgentTracer.get() + .getDataStreamsMonitoring() + .setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0); + } RequestContext reqContext = span.getRequestContext(); if (reqContext != null) { diff --git a/dd-java-agent/instrumentation/commons-httpclient-2/src/main/java/datadog/trace/instrumentation/commonshttpclient/CommonsHttpClientInstrumentation.java b/dd-java-agent/instrumentation/commons-httpclient-2/src/main/java/datadog/trace/instrumentation/commonshttpclient/CommonsHttpClientInstrumentation.java index 984bd66edd3..ee592842e57 100644 --- a/dd-java-agent/instrumentation/commons-httpclient-2/src/main/java/datadog/trace/instrumentation/commonshttpclient/CommonsHttpClientInstrumentation.java +++ b/dd-java-agent/instrumentation/commons-httpclient-2/src/main/java/datadog/trace/instrumentation/commonshttpclient/CommonsHttpClientInstrumentation.java @@ -66,9 +66,11 @@ public static AgentScope methodEnter(@Advice.Argument(1) final HttpMethod httpMe DECORATE.afterStart(span); DECORATE.onRequest(span, httpMethod); propagate().inject(span, httpMethod, SETTER); - propagate() - .injectPathwayContext( - span, httpMethod, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, httpMethod, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } return scope; } diff --git a/dd-java-agent/instrumentation/google-http-client/src/main/java/datadog/trace/instrumentation/googlehttpclient/GoogleHttpClientDecorator.java b/dd-java-agent/instrumentation/google-http-client/src/main/java/datadog/trace/instrumentation/googlehttpclient/GoogleHttpClientDecorator.java index 2bbf86d63a6..d1bc03564d4 100644 --- a/dd-java-agent/instrumentation/google-http-client/src/main/java/datadog/trace/instrumentation/googlehttpclient/GoogleHttpClientDecorator.java +++ b/dd-java-agent/instrumentation/google-http-client/src/main/java/datadog/trace/instrumentation/googlehttpclient/GoogleHttpClientDecorator.java @@ -38,8 +38,10 @@ public AgentSpan prepareSpan(AgentSpan span, HttpRequest request) { DECORATE.afterStart(span); DECORATE.onRequest(span, request); propagate().inject(span, request, SETTER); - propagate() - .injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } return span; } diff --git a/dd-java-agent/instrumentation/grizzly-client-1.9/src/main/java/datadog/trace/instrumentation/grizzly/client/AsyncHttpClientInstrumentation.java b/dd-java-agent/instrumentation/grizzly-client-1.9/src/main/java/datadog/trace/instrumentation/grizzly/client/AsyncHttpClientInstrumentation.java index f514b230d52..f5cb4cadd2f 100644 --- a/dd-java-agent/instrumentation/grizzly-client-1.9/src/main/java/datadog/trace/instrumentation/grizzly/client/AsyncHttpClientInstrumentation.java +++ b/dd-java-agent/instrumentation/grizzly-client-1.9/src/main/java/datadog/trace/instrumentation/grizzly/client/AsyncHttpClientInstrumentation.java @@ -67,9 +67,11 @@ public static void onEnter( DECORATE.afterStart(span); DECORATE.onRequest(span, request); propagate().inject(span, request, SETTER); - propagate() - .injectPathwayContext( - span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } handler = new AsyncHandlerAdapter<>(span, parentSpan, handler); } } diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/ClientCallImplInstrumentation.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/ClientCallImplInstrumentation.java index f0234c39b95..de6412738be 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/ClientCallImplInstrumentation.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/ClientCallImplInstrumentation.java @@ -4,6 +4,7 @@ import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; import static datadog.trace.instrumentation.grpc.client.GrpcClientDecorator.CLIENT_PATHWAY_EDGE_TAGS; +import static datadog.trace.instrumentation.grpc.client.GrpcClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS; import static datadog.trace.instrumentation.grpc.client.GrpcClientDecorator.DECORATE; import static datadog.trace.instrumentation.grpc.client.GrpcInjectAdapter.SETTER; import static net.bytebuddy.matcher.ElementMatchers.isConstructor; @@ -16,6 +17,7 @@ import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator; import io.grpc.ClientCall; import io.grpc.Grpc; import io.grpc.Metadata; @@ -93,7 +95,9 @@ public static AgentScope before( span = InstrumentationContext.get(ClientCall.class, AgentSpan.class).get(call); if (null != span) { propagate().inject(span, headers, SETTER); - propagate().injectPathwayContext(span, headers, SETTER, CLIENT_PATHWAY_EDGE_TAGS); + if (SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate().injectPathwayContext(span, headers, SETTER, CLIENT_PATHWAY_EDGE_TAGS); + } return activateSpan(span); } return null; diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcClientDecorator.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcClientDecorator.java index cded71d6c8f..b541a65e450 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcClientDecorator.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcClientDecorator.java @@ -38,6 +38,8 @@ private static LinkedHashMap createClientPathwaySortedTags() { public static final LinkedHashMap CLIENT_PATHWAY_EDGE_TAGS = createClientPathwaySortedTags(); + public static final boolean SHOULD_INSTRUMENT_DATA_STREAMS = + Config.get().isGrpcDataStreamsEnabled(); public static final GrpcClientDecorator DECORATE = new GrpcClientDecorator(); diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java index 276524a098a..2262770c5ab 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java @@ -45,6 +45,8 @@ public class TracingServerInterceptor implements ServerInterceptor { public static final TracingServerInterceptor INSTANCE = new TracingServerInterceptor(); private static final Set IGNORED_METHODS = Config.get().getGrpcIgnoredInboundMethods(); + private static final boolean SHOULD_INSTRUMENT_DATA_STREAMS = + Config.get().isGrpcDataStreamsEnabled(); private TracingServerInterceptor() {} @@ -68,9 +70,11 @@ public ServerCall.Listener interceptCall( CallbackProvider cbp = tracer.getCallbackProvider(RequestContextSlot.APPSEC); final AgentSpan span = startSpan(GRPC_SERVER, spanContext).setMeasured(true); - AgentTracer.get() - .getDataStreamsMonitoring() - .setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0); + if (SHOULD_INSTRUMENT_DATA_STREAMS) { + AgentTracer.get() + .getDataStreamsMonitoring() + .setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0); + } RequestContext reqContext = span.getRequestContext(); if (reqContext != null) { diff --git a/dd-java-agent/instrumentation/http-url-connection/src/main/java/datadog/trace/instrumentation/http_url_connection/HttpUrlConnectionInstrumentation.java b/dd-java-agent/instrumentation/http-url-connection/src/main/java/datadog/trace/instrumentation/http_url_connection/HttpUrlConnectionInstrumentation.java index 63cf53d9c3d..ffb4b034fbf 100644 --- a/dd-java-agent/instrumentation/http-url-connection/src/main/java/datadog/trace/instrumentation/http_url_connection/HttpUrlConnectionInstrumentation.java +++ b/dd-java-agent/instrumentation/http-url-connection/src/main/java/datadog/trace/instrumentation/http_url_connection/HttpUrlConnectionInstrumentation.java @@ -85,9 +85,11 @@ public static HttpUrlState methodEnter( final AgentSpan span = state.start(thiz); if (!connected) { propagate().inject(span, thiz, SETTER); - propagate() - .injectPathwayContext( - span, thiz, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, thiz, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } } } return state; diff --git a/dd-java-agent/instrumentation/java-http-client/src/main/java11/datadog/trace/instrumentation/httpclient/HeadersAdvice.java b/dd-java-agent/instrumentation/java-http-client/src/main/java11/datadog/trace/instrumentation/httpclient/HeadersAdvice.java index fb625651681..905b27dcdfe 100644 --- a/dd-java-agent/instrumentation/java-http-client/src/main/java11/datadog/trace/instrumentation/httpclient/HeadersAdvice.java +++ b/dd-java-agent/instrumentation/java-http-client/src/main/java11/datadog/trace/instrumentation/httpclient/HeadersAdvice.java @@ -19,9 +19,11 @@ public static void methodExit(@Advice.Return(readOnly = false) HttpHeaders heade final Map> headerMap = new HashMap<>(headers.map()); final AgentSpan span = activeSpan(); propagate().inject(span, headerMap, SETTER); - propagate() - .injectPathwayContext( - span, headerMap, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, headerMap, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } headers = HttpHeaders.of(headerMap, KEEP); } } diff --git a/dd-java-agent/instrumentation/jax-rs-client-1.1/src/main/java/datadog/trace/instrumentation/jaxrs/v1/JaxRsClientV1Instrumentation.java b/dd-java-agent/instrumentation/jax-rs-client-1.1/src/main/java/datadog/trace/instrumentation/jaxrs/v1/JaxRsClientV1Instrumentation.java index d148b0e627b..5e78775f67d 100644 --- a/dd-java-agent/instrumentation/jax-rs-client-1.1/src/main/java/datadog/trace/instrumentation/jaxrs/v1/JaxRsClientV1Instrumentation.java +++ b/dd-java-agent/instrumentation/jax-rs-client-1.1/src/main/java/datadog/trace/instrumentation/jaxrs/v1/JaxRsClientV1Instrumentation.java @@ -76,9 +76,11 @@ public static AgentScope onEnter( request.getProperties().put(DD_SPAN_ATTRIBUTE, span); propagate().inject(span, request.getHeaders(), SETTER); - propagate() - .injectPathwayContext( - span, request.getHeaders(), SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request.getHeaders(), SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } return activateSpan(span); } return null; diff --git a/dd-java-agent/instrumentation/jax-rs-client-2.0/src/main/java/datadog/trace/instrumentation/jaxrs/ClientTracingFilter.java b/dd-java-agent/instrumentation/jax-rs-client-2.0/src/main/java/datadog/trace/instrumentation/jaxrs/ClientTracingFilter.java index 22b9373a8fd..cb8f969c227 100644 --- a/dd-java-agent/instrumentation/jax-rs-client-2.0/src/main/java/datadog/trace/instrumentation/jaxrs/ClientTracingFilter.java +++ b/dd-java-agent/instrumentation/jax-rs-client-2.0/src/main/java/datadog/trace/instrumentation/jaxrs/ClientTracingFilter.java @@ -29,12 +29,14 @@ public void filter(final ClientRequestContext requestContext) { DECORATE.onRequest(span, requestContext); propagate().inject(span, requestContext.getHeaders(), SETTER); - propagate() - .injectPathwayContext( - span, - requestContext.getHeaders(), - SETTER, - HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, + requestContext.getHeaders(), + SETTER, + HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } requestContext.setProperty(SPAN_PROPERTY_NAME, span); } diff --git a/dd-java-agent/instrumentation/jetty-client/jetty-client-10.0/src/main/java11/datadog/trace/instrumentation/jetty_client10/SendAdvice.java b/dd-java-agent/instrumentation/jetty-client/jetty-client-10.0/src/main/java11/datadog/trace/instrumentation/jetty_client10/SendAdvice.java index fb6073f9c40..0949739dfa1 100644 --- a/dd-java-agent/instrumentation/jetty-client/jetty-client-10.0/src/main/java11/datadog/trace/instrumentation/jetty_client10/SendAdvice.java +++ b/dd-java-agent/instrumentation/jetty-client/jetty-client-10.0/src/main/java11/datadog/trace/instrumentation/jetty_client10/SendAdvice.java @@ -26,8 +26,10 @@ public static AgentSpan methodEnter( DECORATE.afterStart(span); DECORATE.onRequest(span, request); propagate().inject(span, request, SETTER); - propagate() - .injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } return span; } diff --git a/dd-java-agent/instrumentation/jetty-client/jetty-client-12.0/src/main/java17/datadog/trace/instrumentation/jetty_client12/SendAdvice.java b/dd-java-agent/instrumentation/jetty-client/jetty-client-12.0/src/main/java17/datadog/trace/instrumentation/jetty_client12/SendAdvice.java index 2e09929c4a6..6ce74c09105 100644 --- a/dd-java-agent/instrumentation/jetty-client/jetty-client-12.0/src/main/java17/datadog/trace/instrumentation/jetty_client12/SendAdvice.java +++ b/dd-java-agent/instrumentation/jetty-client/jetty-client-12.0/src/main/java17/datadog/trace/instrumentation/jetty_client12/SendAdvice.java @@ -21,8 +21,10 @@ public static AgentScope methodEnter(@Advice.This final HttpRequest request) { JettyClientDecorator.DECORATE.afterStart(span); JettyClientDecorator.DECORATE.onRequest(span, request); propagate().inject(span, request, SETTER); - propagate() - .injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } return activateSpan(span); } diff --git a/dd-java-agent/instrumentation/jetty-client/jetty-client-9.1/src/main/java/datadog/trace/instrumentation/jetty_client91/JettyClientInstrumentation.java b/dd-java-agent/instrumentation/jetty-client/jetty-client-9.1/src/main/java/datadog/trace/instrumentation/jetty_client91/JettyClientInstrumentation.java index 89e038f5b98..205ea11be6a 100644 --- a/dd-java-agent/instrumentation/jetty-client/jetty-client-9.1/src/main/java/datadog/trace/instrumentation/jetty_client91/JettyClientInstrumentation.java +++ b/dd-java-agent/instrumentation/jetty-client/jetty-client-9.1/src/main/java/datadog/trace/instrumentation/jetty_client91/JettyClientInstrumentation.java @@ -92,9 +92,11 @@ public static AgentSpan methodEnter( DECORATE.afterStart(span); DECORATE.onRequest(span, request); propagate().inject(span, request, SETTER); - propagate() - .injectPathwayContext( - span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } return span; } diff --git a/dd-java-agent/instrumentation/kafka-connect-0.11/build.gradle b/dd-java-agent/instrumentation/kafka-connect-0.11/build.gradle new file mode 100644 index 00000000000..2ee24aefad3 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-connect-0.11/build.gradle @@ -0,0 +1,34 @@ +muzzle { + pass { + group = "org.apache.kafka" + module = "connect-runtime" + versions = "[0.11.0.0,)" + assertInverse = true + } +} + +apply from: "$rootDir/gradle/java.gradle" + +dependencies { + compileOnly group: 'org.apache.kafka', name: 'connect-runtime', version: '0.11.0.0' + + testImplementation group: 'org.apache.kafka', name: 'connect-runtime', version: '0.11.0.0' + testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.3' + testImplementation group: 'org.assertj', name: 'assertj-core', version: '2.9.+' + testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.19.0' + testImplementation(testFixtures(project(':dd-java-agent:agent-iast'))) + + + // IAST testing dependencies + testRuntimeOnly project(':dd-java-agent:instrumentation:iast-instrumenter') + testRuntimeOnly project(':dd-java-agent:instrumentation:java-lang') + testRuntimeOnly project(':dd-java-agent:instrumentation:java-io') + testRuntimeOnly project(':dd-java-agent:instrumentation:jackson-core') + testRuntimeOnly project(':dd-java-agent:instrumentation:jackson-core:jackson-core-2.8') + testImplementation(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.10') +} + +configurations.testRuntimeClasspath { + // spock-core depends on assertj version that is not compatible with kafka-clients + resolutionStrategy.force 'org.assertj:assertj-core:2.9.1' +} diff --git a/dd-java-agent/instrumentation/kafka-connect-0.11/gradle.lockfile b/dd-java-agent/instrumentation/kafka-connect-0.11/gradle.lockfile new file mode 100644 index 00000000000..eb091a85891 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-connect-0.11/gradle.lockfile @@ -0,0 +1,258 @@ +# This is a Gradle generated file for dependency locking. +# Manual edits can break the build and are not advised. +# This file is expected to be part of source control. +cafe.cryptography:curve25519-elisabeth:0.1.0=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +cafe.cryptography:ed25519-elisabeth:0.1.0=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +ch.qos.logback:logback-classic:1.2.3=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +ch.qos.logback:logback-core:1.2.3=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.101tec:zkclient:0.10=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.beust:jcommander:1.78=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +com.blogspot.mydailyjava:weak-lock-free:0.17=compileClasspath,iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.datadoghq.okhttp3:okhttp:3.12.15=compileClasspath,iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.datadoghq.okio:okio:1.17.6=compileClasspath,iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.datadoghq:dd-javac-plugin-client:0.1.7=compileClasspath,iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,muzzleBootstrap,muzzleTooling,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.datadoghq:java-dogstatsd-client:4.4.0=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +com.datadoghq:sketches-java:0.8.3=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +com.fasterxml.jackson.core:jackson-annotations:2.13.3=latestDepTestRuntimeClasspath +com.fasterxml.jackson.core:jackson-annotations:2.15.3=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +com.fasterxml.jackson.core:jackson-annotations:2.9.10=latestDepTestCompileClasspath,testCompileClasspath,testRuntimeClasspath +com.fasterxml.jackson.core:jackson-core:2.13.3=latestDepTestRuntimeClasspath +com.fasterxml.jackson.core:jackson-core:2.15.3=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +com.fasterxml.jackson.core:jackson-core:2.9.10=latestDepTestCompileClasspath,testCompileClasspath,testRuntimeClasspath +com.fasterxml.jackson.core:jackson-databind:2.13.3=latestDepTestRuntimeClasspath +com.fasterxml.jackson.core:jackson-databind:2.15.3=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +com.fasterxml.jackson.core:jackson-databind:2.9.10=latestDepTestCompileClasspath,testCompileClasspath,testRuntimeClasspath +com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.13.3=latestDepTestRuntimeClasspath +com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.13.3=latestDepTestRuntimeClasspath +com.fasterxml.jackson.module:jackson-module-scala_2.13:2.13.3=latestDepTestRuntimeClasspath +com.fasterxml.jackson:jackson-bom:2.13.3=latestDepTestRuntimeClasspath +com.github.javaparser:javaparser-core:3.25.1=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.github.jnr:jffi:1.3.13=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-a64asm:1.0.0=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-constants:0.10.4=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-enxio:0.32.17=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-ffi:2.2.16=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-posix:3.1.19=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-unixsocket:0.38.22=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-x86asm:1.0.2=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +com.github.luben:zstd-jni:1.5.2-1=latestDepTestRuntimeClasspath +com.github.luben:zstd-jni:1.5.6-3=iastLatestDepTest3RuntimeClasspath +com.github.spotbugs:spotbugs-annotations:4.2.0=compileClasspath,iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.github.spotbugs:spotbugs-annotations:4.7.3=spotbugs +com.github.spotbugs:spotbugs:4.7.3=spotbugs +com.github.stefanbirkner:system-rules:1.19.0=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.google.auto.service:auto-service-annotations:1.0-rc7=annotationProcessor,compileClasspath,iastLatestDepTest3AnnotationProcessor,iastLatestDepTest3CompileClasspath,latestDepTestAnnotationProcessor,latestDepTestCompileClasspath,testAnnotationProcessor,testCompileClasspath +com.google.auto.service:auto-service:1.0-rc7=annotationProcessor,iastLatestDepTest3AnnotationProcessor,latestDepTestAnnotationProcessor,testAnnotationProcessor +com.google.auto:auto-common:0.10=annotationProcessor,iastLatestDepTest3AnnotationProcessor,latestDepTestAnnotationProcessor,testAnnotationProcessor +com.google.code.findbugs:jsr305:3.0.2=annotationProcessor,compileClasspath,iastLatestDepTest3AnnotationProcessor,iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestAnnotationProcessor,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,spotbugs,testAnnotationProcessor,testCompileClasspath,testRuntimeClasspath +com.google.code.gson:gson:2.9.1=spotbugs +com.google.errorprone:error_prone_annotations:2.2.0=annotationProcessor,iastLatestDepTest3AnnotationProcessor,latestDepTestAnnotationProcessor,testAnnotationProcessor +com.google.guava:failureaccess:1.0.1=annotationProcessor,iastLatestDepTest3AnnotationProcessor,latestDepTestAnnotationProcessor,testAnnotationProcessor +com.google.guava:guava:20.0=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.google.guava:guava:27.0.1-jre=annotationProcessor,iastLatestDepTest3AnnotationProcessor,latestDepTestAnnotationProcessor,testAnnotationProcessor +com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava=annotationProcessor,iastLatestDepTest3AnnotationProcessor,latestDepTestAnnotationProcessor,testAnnotationProcessor +com.google.j2objc:j2objc-annotations:1.1=annotationProcessor,iastLatestDepTest3AnnotationProcessor,latestDepTestAnnotationProcessor,testAnnotationProcessor +com.google.re2j:re2j:1.7=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +com.squareup.moshi:moshi:1.11.0=compileClasspath,iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.squareup.okhttp3:logging-interceptor:3.12.12=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.squareup.okhttp3:okhttp:3.12.12=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.squareup.okio:okio:1.17.5=compileClasspath,iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.thoughtworks.paranamer:paranamer:2.8=latestDepTestRuntimeClasspath +com.thoughtworks.qdox:qdox:1.12.1=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +com.typesafe.scala-logging:scala-logging_2.13:3.9.4=latestDepTestRuntimeClasspath +com.yammer.metrics:metrics-core:2.2.0=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +commons-cli:commons-cli:1.4=latestDepTestRuntimeClasspath +commons-codec:commons-codec:1.15=spotbugs +commons-fileupload:commons-fileupload:1.5=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +commons-io:commons-io:2.11.0=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +commons-logging:commons-logging:1.2=testCompileClasspath,testRuntimeClasspath +de.thetaphi:forbiddenapis:3.1=compileClasspath +info.picocli:picocli:4.6.3=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +io.dropwizard.metrics:metrics-core:4.1.12.1=latestDepTestRuntimeClasspath +io.micrometer:micrometer-commons:1.13.4=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +io.micrometer:micrometer-observation:1.13.4=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +io.netty:netty-buffer:4.1.63.Final=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +io.netty:netty-codec:4.1.63.Final=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +io.netty:netty-common:4.1.63.Final=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +io.netty:netty-handler:4.1.63.Final=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +io.netty:netty-resolver:4.1.63.Final=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +io.netty:netty-transport-native-epoll:4.1.63.Final=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +io.netty:netty-transport-native-unix-common:4.1.63.Final=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +io.netty:netty-transport:4.1.63.Final=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +io.sqreen:libsqreen:11.0.1=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +javax.activation:activation:1.1=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +javax.servlet:javax.servlet-api:3.1.0=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +javax.xml.bind:jaxb-api:2.2.3=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +javax.xml.stream:stax-api:1.0-2=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +jaxen:jaxen:1.2.0=spotbugs +jline:jline:2.14.6=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +junit:junit-dep:4.11=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +junit:junit:4.13.2=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +log4j:log4j:1.2.16=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,testCompileClasspath,testRuntimeClasspath +net.bytebuddy:byte-buddy-agent:1.14.18=compileClasspath,iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +net.bytebuddy:byte-buddy:1.14.18=compileClasspath,iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +net.java.dev.jna:jna-platform:5.8.0=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +net.java.dev.jna:jna:5.8.0=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +net.jcip:jcip-annotations:1.0=compileClasspath,iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,spotbugs,testCompileClasspath,testRuntimeClasspath +net.jpountz.lz4:lz4:1.3.0=compileClasspath,testCompileClasspath,testRuntimeClasspath +net.sf.jopt-simple:jopt-simple:5.0.3=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,testCompileClasspath,testRuntimeClasspath +net.sf.jopt-simple:jopt-simple:5.0.4=latestDepTestRuntimeClasspath +net.sf.saxon:Saxon-HE:11.4=spotbugs +net.sourceforge.argparse4j:argparse4j:0.7.0=latestDepTestRuntimeClasspath +org.apache.ant:ant-antlr:1.10.12=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +org.apache.ant:ant-antlr:1.9.15=codenarc +org.apache.ant:ant-junit:1.10.12=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +org.apache.ant:ant-junit:1.9.15=codenarc +org.apache.ant:ant-launcher:1.10.12=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +org.apache.ant:ant:1.10.12=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.bcel:bcel:6.5.0=spotbugs +org.apache.commons:commons-lang3:3.12.0=spotbugs +org.apache.commons:commons-text:1.10.0=spotbugs +org.apache.httpcomponents.client5:httpclient5:5.1.3=spotbugs +org.apache.httpcomponents.core5:httpcore5-h2:5.1.3=spotbugs +org.apache.httpcomponents.core5:httpcore5:5.1.3=spotbugs +org.apache.kafka:kafka-clients:0.11.0.0=compileClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.kafka:kafka-clients:3.2.3=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.apache.kafka:kafka-clients:3.8.0=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +org.apache.kafka:kafka-metadata:3.2.3=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.apache.kafka:kafka-raft:3.2.3=latestDepTestRuntimeClasspath +org.apache.kafka:kafka-server-common:3.2.3=latestDepTestRuntimeClasspath +org.apache.kafka:kafka-storage-api:3.2.3=latestDepTestRuntimeClasspath +org.apache.kafka:kafka-storage:3.2.3=latestDepTestRuntimeClasspath +org.apache.kafka:kafka-streams-test-utils:3.2.3=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.apache.kafka:kafka-streams:3.2.3=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.apache.kafka:kafka_2.11:0.11.0.0=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.kafka:kafka_2.13:3.2.3=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.apache.logging.log4j:log4j-api:2.19.0=spotbugs +org.apache.logging.log4j:log4j-core:2.19.0=spotbugs +org.apache.yetus:audience-annotations:0.5.0=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.apache.zookeeper:zookeeper-jute:3.6.3=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.apache.zookeeper:zookeeper:3.4.10=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.zookeeper:zookeeper:3.6.3=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.apiguardian:apiguardian-api:1.1.2=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.assertj:assertj-core:2.9.1=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.assertj:assertj-core:3.19.0=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.bitbucket.b_c:jose4j:0.7.9=latestDepTestRuntimeClasspath +org.checkerframework:checker-qual:2.5.2=annotationProcessor,iastLatestDepTest3AnnotationProcessor,latestDepTestAnnotationProcessor,testAnnotationProcessor +org.codehaus.groovy:groovy-all:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-ant:2.5.14=codenarc +org.codehaus.groovy:groovy-ant:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-astbuilder:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-cli-picocli:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-console:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-datetime:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-docgenerator:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-groovydoc:2.5.14=codenarc +org.codehaus.groovy:groovy-groovydoc:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-groovysh:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-jmx:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-json:2.5.14=codenarc +org.codehaus.groovy:groovy-json:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-jsr223:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-macro:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-nio:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-servlet:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-sql:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-swing:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-templates:2.5.14=codenarc +org.codehaus.groovy:groovy-templates:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-test-junit5:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-test:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-testng:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-xml:2.5.14=codenarc +org.codehaus.groovy:groovy-xml:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy:2.5.14=codenarc +org.codehaus.groovy:groovy:3.0.17=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.codehaus.mojo:animal-sniffer-annotations:1.17=annotationProcessor,iastLatestDepTest3AnnotationProcessor,latestDepTestAnnotationProcessor,testAnnotationProcessor +org.codenarc:CodeNarc:2.2.0=codenarc +org.dom4j:dom4j:2.1.3=spotbugs +org.eclipse.jetty:jetty-http:9.4.56.v20240826=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.eclipse.jetty:jetty-io:9.4.56.v20240826=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.eclipse.jetty:jetty-server:9.4.56.v20240826=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.eclipse.jetty:jetty-util:9.4.56.v20240826=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.gmetrics:GMetrics:1.1=codenarc +org.hamcrest:hamcrest-core:1.3=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.hamcrest:hamcrest:2.2=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.jctools:jctools-core:3.3.0=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +org.junit.jupiter:junit-jupiter-api:5.9.2=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.junit.jupiter:junit-jupiter-engine:5.9.2=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +org.junit.platform:junit-platform-commons:1.9.2=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.junit.platform:junit-platform-engine:1.9.2=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.junit.platform:junit-platform-launcher:1.9.2=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +org.junit.platform:junit-platform-runner:1.9.2=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +org.junit.platform:junit-platform-suite-api:1.9.2=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +org.junit.platform:junit-platform-suite-commons:1.9.2=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +org.junit:junit-bom:5.9.1=spotbugs +org.junit:junit-bom:5.9.2=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.lz4:lz4-java:1.8.0=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath +org.mockito:mockito-core:2.19.0=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.objenesis:objenesis:3.3=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opentest4j:opentest4j:1.2.0=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.ow2.asm:asm-analysis:9.2=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +org.ow2.asm:asm-analysis:9.4=spotbugs +org.ow2.asm:asm-commons:9.2=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +org.ow2.asm:asm-commons:9.4=spotbugs +org.ow2.asm:asm-tree:9.2=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +org.ow2.asm:asm-tree:9.4=spotbugs +org.ow2.asm:asm-util:9.2=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +org.ow2.asm:asm-util:9.4=spotbugs +org.ow2.asm:asm:9.2=iastLatestDepTest3RuntimeClasspath,instrumentPluginClasspath,latestDepTestRuntimeClasspath,muzzleTooling,runtimeClasspath,testRuntimeClasspath +org.ow2.asm:asm:9.4=spotbugs +org.rocksdb:rocksdbjni:6.29.4.1=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.scala-lang.modules:scala-collection-compat_2.13:2.6.0=latestDepTestRuntimeClasspath +org.scala-lang.modules:scala-java8-compat_2.13:1.0.2=latestDepTestRuntimeClasspath +org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.scala-lang:scala-library:2.11.11=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.scala-lang:scala-library:2.13.8=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.scala-lang:scala-reflect:2.13.8=latestDepTestRuntimeClasspath +org.slf4j:jcl-over-slf4j:1.7.30=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.slf4j:jul-to-slf4j:1.7.30=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.slf4j:log4j-over-slf4j:1.7.30=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.slf4j:slf4j-api:1.7.30=compileClasspath,iastLatestDepTest3CompileClasspath,instrumentPluginClasspath,latestDepTestCompileClasspath,muzzleBootstrap,muzzleTooling,runtimeClasspath,testCompileClasspath +org.slf4j:slf4j-api:1.7.32=testRuntimeClasspath +org.slf4j:slf4j-api:1.7.36=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath +org.slf4j:slf4j-api:2.0.0=spotbugs,spotbugsSlf4j +org.slf4j:slf4j-simple:2.0.0=spotbugsSlf4j +org.spockframework:spock-core:2.2-groovy-3.0=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.spockframework:spock-junit4:2.2-groovy-3.0=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,latestDepTestCompileClasspath,latestDepTestRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.springframework.kafka:spring-kafka-test:1.3.3.RELEASE=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.springframework.kafka:spring-kafka-test:2.9.13=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.springframework.kafka:spring-kafka:1.3.3.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework.kafka:spring-kafka:2.9.13=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.springframework.kafka:spring-kafka:3.2.4=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +org.springframework.retry:spring-retry:1.2.2.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework.retry:spring-retry:1.3.4=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.springframework.retry:spring-retry:2.0.9=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +org.springframework:spring-aop:4.3.14.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-aop:5.3.29=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.springframework:spring-aop:6.1.13=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +org.springframework:spring-beans:4.3.14.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-beans:5.3.29=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.springframework:spring-beans:6.1.13=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +org.springframework:spring-context:4.3.14.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-context:5.3.29=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.springframework:spring-context:6.1.13=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +org.springframework:spring-core:4.3.14.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-core:5.3.29=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.springframework:spring-core:6.1.13=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +org.springframework:spring-expression:4.3.14.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-expression:5.3.29=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.springframework:spring-expression:6.1.13=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +org.springframework:spring-jcl:5.3.29=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.springframework:spring-jcl:6.1.13=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +org.springframework:spring-messaging:4.3.14.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-messaging:5.3.29=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.springframework:spring-messaging:6.1.13=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +org.springframework:spring-test:4.3.14.RELEASE=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.springframework:spring-test:5.3.29=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.springframework:spring-tx:4.3.14.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-tx:5.3.29=latestDepTestCompileClasspath,latestDepTestRuntimeClasspath +org.springframework:spring-tx:6.1.13=iastLatestDepTest3CompileClasspath,iastLatestDepTest3RuntimeClasspath +org.testng:testng:7.5=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +org.webjars:jquery:3.5.1=iastLatestDepTest3RuntimeClasspath,latestDepTestRuntimeClasspath,testRuntimeClasspath +org.xerial.snappy:snappy-java:1.1.10.5=iastLatestDepTest3RuntimeClasspath +org.xerial.snappy:snappy-java:1.1.2.6=compileClasspath,testCompileClasspath,testRuntimeClasspath +org.xerial.snappy:snappy-java:1.1.8.4=latestDepTestRuntimeClasspath +org.xmlresolver:xmlresolver:4.4.3=spotbugs +xml-apis:xml-apis:1.4.01=spotbugs +empty=spotbugsPlugins diff --git a/dd-java-agent/instrumentation/kafka-connect-0.11/src/main/java/datadog/trace/instrumentation/kafka_connect/ConnectWorkerInstrumentation.java b/dd-java-agent/instrumentation/kafka-connect-0.11/src/main/java/datadog/trace/instrumentation/kafka_connect/ConnectWorkerInstrumentation.java new file mode 100644 index 00000000000..0ca323c6eca --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-connect-0.11/src/main/java/datadog/trace/instrumentation/kafka_connect/ConnectWorkerInstrumentation.java @@ -0,0 +1,65 @@ +package datadog.trace.instrumentation.kafka_connect; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import net.bytebuddy.matcher.ElementMatchers; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.apache.kafka.connect.runtime.TaskStatus.Listener; + +import java.util.Arrays; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.*; + +@AutoService(InstrumenterModule.class) +public final class ConnectWorkerInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForTypeHierarchy { + + static final String TARGET_TYPE = "org.apache.kafka.connect.runtime.WorkerTask"; + + public ConnectWorkerInstrumentation() { + super("kafka", "kafka-connect"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".TaskListener", + }; + } + + @Override + public String hierarchyMarkerType() { + return TARGET_TYPE; + } + + @Override + public ElementMatcher hierarchyMatcher() { + return extendsClass(named(hierarchyMarkerType())); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isConstructor().and(takesArgument(0, named("org.apache.kafka.connect.util.ConnectorTaskId"))) + .and(takesArgument(1, named("org.apache.kafka.connect.runtime.TaskStatus$Listener"))), + ConnectWorkerInstrumentation.class.getName() + "$ConstructorAdvice"); + } + + public static class ConstructorAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void wrap( + @Advice.Argument(value = 0, readOnly = true) ConnectorTaskId id, + @Advice.Argument(value = 1, readOnly = false) Listener statusListener + ) { + statusListener = new TaskListener(statusListener); + System.out.println("building worker task!!" + id.connector()); + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-connect-0.11/src/main/java/datadog/trace/instrumentation/kafka_connect/TaskListener.java b/dd-java-agent/instrumentation/kafka-connect-0.11/src/main/java/datadog/trace/instrumentation/kafka_connect/TaskListener.java new file mode 100644 index 00000000000..e19bc15d083 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-connect-0.11/src/main/java/datadog/trace/instrumentation/kafka_connect/TaskListener.java @@ -0,0 +1,41 @@ +package datadog.trace.instrumentation.kafka_connect; + +import org.apache.kafka.connect.runtime.TaskStatus.Listener; +import org.apache.kafka.connect.util.ConnectorTaskId; + +public class TaskListener implements Listener{ + final private Listener delegate; + public TaskListener(Listener delegate) { + this.delegate = delegate; + + } + @Override + public void onStartup(ConnectorTaskId connectorTaskId) { + System.out.println("start up" + connectorTaskId.connector()); + delegate.onStartup(connectorTaskId); + } + + @Override + public void onPause(ConnectorTaskId connectorTaskId) { + System.out.println("pause" + connectorTaskId.connector()); + delegate.onPause(connectorTaskId); + } + + @Override + public void onResume(ConnectorTaskId connectorTaskId) { + System.out.println("resume" + connectorTaskId.connector()); + delegate.onResume(connectorTaskId); + } + + @Override + public void onFailure(ConnectorTaskId connectorTaskId, Throwable throwable) { + System.out.println("failure" + connectorTaskId.connector()); + delegate.onFailure(connectorTaskId, throwable); + } + + @Override + public void onShutdown(ConnectorTaskId connectorTaskId) { + System.out.println("shutdown" + connectorTaskId.connector()); + delegate.onShutdown(connectorTaskId); + } +} diff --git a/dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/KafkaClientTestBase.groovy new file mode 100644 index 00000000000..f17c4ad8fb5 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -0,0 +1,1323 @@ +import datadog.trace.common.writer.ListWriter + +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan + +import datadog.trace.agent.test.asserts.TraceAssert +import datadog.trace.agent.test.naming.VersionedNamingTestBase +import datadog.trace.api.Config +import datadog.trace.api.DDTags +import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags +import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.core.DDSpan +import datadog.trace.core.datastreams.StatsGroup +import datadog.trace.test.util.Flaky +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.StringSerializer +import org.junit.Rule +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.BatchMessageListener +import org.springframework.kafka.listener.KafkaMessageListenerContainer +import org.springframework.kafka.listener.MessageListener +import org.springframework.kafka.test.rule.KafkaEmbedded +import org.springframework.kafka.test.utils.ContainerTestUtils +import org.springframework.kafka.test.utils.KafkaTestUtils + +import java.util.concurrent.ExecutionException +import java.util.concurrent.Future +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +abstract class KafkaClientTestBase extends VersionedNamingTestBase { + static final SHARED_TOPIC = "shared.topic" + + @Rule + KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC) + + @Override + void configurePreAgent() { + super.configurePreAgent() + + injectSysConfig("dd.kafka.e2e.duration.enabled", "true") + } + + public static final LinkedHashMap PRODUCER_PATHWAY_EDGE_TAGS + + // filter out Kafka poll, since the function is called in a loop, giving inconsistent results + final ListWriter.Filter dropKafkaPoll = new ListWriter.Filter() { + @Override + boolean accept(List trace) { + return !(trace.size() == 1 && + trace.get(0).getResourceName().toString().equals("kafka.poll")) + } + } + + final ListWriter.Filter dropEmptyKafkaPoll = new ListWriter.Filter() { + @Override + boolean accept(List trace) { + return !(trace.size() == 1 && + trace.get(0).getResourceName().toString().equals("kafka.poll") && + trace.get(0).getTag(InstrumentationTags.KAFKA_RECORDS_COUNT).equals(0)) + } + } + + // TraceID, start times & names changed based on the configuration, so overriding the sort to give consistent test results + private static class SortKafkaTraces implements Comparator> { + @Override + int compare(List o1, List o2) { + return rootSpanTrace(o1) - rootSpanTrace(o2) + } + + int rootSpanTrace(List trace) { + assert !trace.isEmpty() + def rootSpan = trace.get(0).localRootSpan + switch (rootSpan.operationName.toString()) { + case "parent": + return 3 + case "kafka.poll": + return 2 + default: + return 1 + } + } + } + + + static { + PRODUCER_PATHWAY_EDGE_TAGS = new LinkedHashMap<>(3) + PRODUCER_PATHWAY_EDGE_TAGS.put("direction", "out") + PRODUCER_PATHWAY_EDGE_TAGS.put("topic", SHARED_TOPIC) + PRODUCER_PATHWAY_EDGE_TAGS.put("type", "kafka") + } + + def setup() { + TEST_WRITER.setFilter(dropKafkaPoll) + } + + @Override + int version() { + 0 + } + + @Override + String operation() { + return null + } + + String operationForProducer() { + "kafka.produce" + } + + String operationForConsumer() { + "kafka.consume" + } + + String serviceForTimeInQueue() { + "kafka" + } + + abstract boolean hasQueueSpan() + + abstract boolean splitByDestination() + + @Override + protected boolean isDataStreamsEnabled() { + return true + } + + def "test extracting avro schema"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + Producer producer = new KafkaProducer<>(senderProps, new StringSerializer(), new AvroMockSerializer()) + + when: + AvroMock message = new AvroMock("{\"name\":\"test\"}") + runUnderTrace("parent") { + producer.send(new ProducerRecord(SHARED_TOPIC, message)) { meta, ex -> + assert activeScope().isAsyncPropagating() + if (ex == null) { + runUnderTrace("producer callback") {} + } else { + runUnderTrace("producer exception: " + ex) {} + } + } + blockUntilChildSpansFinished(2) + } + if (isDataStreamsEnabled()) { + TEST_DATA_STREAMS_WRITER.waitForGroups(1) + TEST_DATA_STREAMS_WRITER.waitForBacklogs(1) + } + + then: + // check that the message was received + assertTraces(1, SORT_TRACES_BY_ID) { + trace(3) { + basicSpan(it, "parent") + basicSpan(it, "producer callback", span(0)) + producerSpan(it, senderProps, span(0), false, false, "{\"name\":\"test\"}") + } + } + + cleanup: + producer.close() + } + + def "test kafka produce and consume"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + if (isDataStreamsEnabled()) { + senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000) + } + TEST_WRITER.setFilter(dropEmptyKafkaPoll) + KafkaProducer producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer()) + String clusterId = "" + if (isDataStreamsEnabled()) { + producer.flush() + clusterId = producer.metadata.cluster.clusterResource().clusterId() + while (clusterId == null || clusterId.isEmpty()) { + Thread.sleep(1500) + clusterId = producer.metadata.cluster.clusterResource().clusterId() + } + } + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties = containerProperties() + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) + + when: + String greeting = "Hello Spring Kafka Sender!" + runUnderTrace("parent") { + producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex -> + assert activeScope().isAsyncPropagating() + if (ex == null) { + runUnderTrace("producer callback") {} + } else { + runUnderTrace("producer exception: " + ex) {} + } + } + blockUntilChildSpansFinished(2) + } + if (isDataStreamsEnabled()) { + TEST_DATA_STREAMS_WRITER.waitForGroups(2) + // wait for produce offset 0, commit offset 0 on partition 0 and 1, and commit offset 1 on 1 partition. + TEST_DATA_STREAMS_WRITER.waitForBacklogs(4) + } + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + received.value() == greeting + received.key() == null + + int nTraces = isDataStreamsEnabled() ? 3 : 2 + int produceTraceIdx = nTraces - 1 + TEST_WRITER.waitForTraces(nTraces) + def traces = (Arrays.asList(TEST_WRITER.toArray()) as List>) + Collections.sort(traces, new SortKafkaTraces()) + assertTraces(nTraces, new SortKafkaTraces()) { + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, span(1)) + queueSpan(it, trace(produceTraceIdx)[2]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(produceTraceIdx)[2]) + } + } + if (isDataStreamsEnabled()) { + trace(1, { + pollSpan(it) + }) + } + trace(3) { + basicSpan(it, "parent") + basicSpan(it, "producer callback", span(0)) + producerSpan(it, senderProps, span(0), false) + } + } + def headers = received.headers() + headers.iterator().hasNext() + new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${traces[produceTraceIdx][2].traceId}" + new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${traces[produceTraceIdx][2].spanId}" + + if (isDataStreamsEnabled()) { + StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } + verifyAll(first) { + edgeTags == ["direction:out", "kafka_cluster_id:$clusterId", "topic:$SHARED_TOPIC".toString(), "type:kafka"] + edgeTags.size() == 4 + } + + StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } + verifyAll(second) { + edgeTags == [ + "direction:in", + "group:sender", + "kafka_cluster_id:$clusterId", + "topic:$SHARED_TOPIC".toString(), + "type:kafka" + ] + edgeTags.size() == 5 + } + List produce = [ + "kafka_cluster_id:$clusterId", + "partition:"+received.partition(), + "topic:"+SHARED_TOPIC, + "type:kafka_produce" + ] + List commit = [ + "consumer_group:sender", + "kafka_cluster_id:$clusterId", + "partition:"+received.partition(), + "topic:"+SHARED_TOPIC, + "type:kafka_commit" + ] + verifyAll(TEST_DATA_STREAMS_WRITER.backlogs) { + contains(new AbstractMap.SimpleEntry, Long>(commit, 1).toString()) + contains(new AbstractMap.SimpleEntry, Long>(produce, 0).toString()) + } + } + + cleanup: + producer.close() + container?.stop() + } + + def "test producing message too large"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + // set a low max request size, so that we can crash it + senderProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10) + Producer producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer()) + + when: + String greeting = "Hello Spring Kafka" + Future future = producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex -> + } + future.get() + then: + thrown ExecutionException + cleanup: + producer.close() + } + + def "test spring kafka template produce and consume"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + if (isDataStreamsEnabled()) { + senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000) + } + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + String clusterId = null + if (isDataStreamsEnabled()) { + clusterId = waitForKafkaMetadataUpdate(kafkaTemplate) + } + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties = containerProperties() + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) + + when: + String greeting = "Hello Spring Kafka Sender!" + runUnderTrace("parent") { + kafkaTemplate.send(SHARED_TOPIC, greeting).addCallback({ + runUnderTrace("producer callback") {} + }, { ex -> + runUnderTrace("producer exception: " + ex) {} + }) + blockUntilChildSpansFinished(2) + } + if (isDataStreamsEnabled()) { + TEST_DATA_STREAMS_WRITER.waitForGroups(2) + // wait for produce offset 0, commit offset 0 on partition 0 and 1, and commit offset 1 on 1 partition. + TEST_DATA_STREAMS_WRITER.waitForBacklogs(4) + } + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + received.value() == greeting + received.key() == null + + assertTraces(2, SORT_TRACES_BY_ID) { + trace(3) { + basicSpan(it, "parent") + basicSpan(it, "producer callback", span(0)) + producerSpan(it, senderProps, span(0), false) + } + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(1)[1]) + queueSpan(it, trace(0)[2]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[2]) + } + } + } + + def headers = received.headers() + headers.iterator().hasNext() + new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${TEST_WRITER[0][2].traceId}" + new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${TEST_WRITER[0][2].spanId}" + + if (isDataStreamsEnabled()) { + StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } + verifyAll(first) { + edgeTags == [ + "direction:out", + "kafka_cluster_id:$clusterId".toString(), + "topic:$SHARED_TOPIC".toString(), + "type:kafka" + ] + edgeTags.size() == 4 + } + + StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } + verifyAll(second) { + edgeTags == [ + "direction:in", + "group:sender", + "kafka_cluster_id:$clusterId".toString(), + "topic:$SHARED_TOPIC".toString(), + "type:kafka" + ] + edgeTags.size() == 5 + } + List produce = [ + "kafka_cluster_id:$clusterId".toString(), + "partition:"+received.partition(), + "topic:"+SHARED_TOPIC, + "type:kafka_produce" + ] + List commit = [ + "consumer_group:sender", + "kafka_cluster_id:$clusterId".toString(), + "partition:"+received.partition(), + "topic:"+SHARED_TOPIC, + "type:kafka_commit" + ] + verifyAll(TEST_DATA_STREAMS_WRITER.backlogs) { + contains(new AbstractMap.SimpleEntry, Long>(commit, 1).toString()) + contains(new AbstractMap.SimpleEntry, Long>(produce, 0).toString()) + } + } + + cleanup: + producerFactory.stop() + container?.stop() + } + + def "test pass through tombstone"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties = containerProperties() + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) + + when: + kafkaTemplate.send(SHARED_TOPIC, null) + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + received.value() == null + received.key() == null + + assertTraces(2, SORT_TRACES_BY_ID) { + trace(1) { + producerSpan(it, senderProps, null, false, true) + } + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(1)[1], 0..0, true) + queueSpan(it, trace(0)[0]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[0], 0..0, true) + } + } + } + + cleanup: + producerFactory.stop() + container?.stop() + } + + def "test records(TopicPartition) kafka consume"() { + setup: + // set up the Kafka consumer properties + def kafkaPartition = 0 + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + def consumer = new KafkaConsumer(consumerProperties) + + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer(senderProps) + + consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition))) + + when: + def greeting = "Hello from MockConsumer!" + producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, greeting)) + + then: + TEST_WRITER.waitForTraces(1) + def pollResult = KafkaTestUtils.getRecords(consumer) + + def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator() + + def first = null + if (recs.hasNext()) { + first = recs.next() + } + + then: + recs.hasNext() == false + first.value() == greeting + first.key() == null + + assertTraces(2, SORT_TRACES_BY_ID) { + trace(1) { + producerSpan(it, senderProps) + } + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(1)[1]) + queueSpan(it, trace(0)[0]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[0]) + } + } + } + + cleanup: + consumer.close() + producer.close() + + } + + def "test records(TopicPartition).subList kafka consume"() { + setup: + // set up the Kafka consumer properties + def kafkaPartition = 0 + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + def consumer = new KafkaConsumer(consumerProperties) + + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer(senderProps) + + consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition))) + + when: + def greeting = "Hello from MockConsumer!" + producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, greeting)) + + then: + TEST_WRITER.waitForTraces(1) + def pollResult = KafkaTestUtils.getRecords(consumer) + + def records = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)) + def recs = records.subList(0, records.size()).iterator() + + def first = null + if (recs.hasNext()) { + first = recs.next() + } + + then: + recs.hasNext() == false + first.value() == greeting + first.key() == null + + assertTraces(2, SORT_TRACES_BY_ID) { + trace(1) { + producerSpan(it, senderProps) + } + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(1)[1]) + queueSpan(it, trace(0)[0]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[0]) + } + } + } + + cleanup: + consumer.close() + producer.close() + + } + + def "test records(TopicPartition).forEach kafka consume"() { + setup: + // set up the Kafka consumer properties + def kafkaPartition = 0 + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + def consumer = new KafkaConsumer(consumerProperties) + + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer(senderProps) + + consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition))) + + when: + def greeting = "Hello from MockConsumer!" + producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, greeting)) + + then: + TEST_WRITER.waitForTraces(1) + def pollResult = KafkaTestUtils.getRecords(consumer) + + def records = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)) + + def last = null + records.forEach { + last = it + assert activeSpan() != null + } + + then: + records.size() == 1 + last.value() == greeting + last.key() == null + + assertTraces(2, SORT_TRACES_BY_ID) { + trace(1) { + producerSpan(it, senderProps) + } + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(1)[1]) + queueSpan(it, trace(0)[0]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[0]) + } + } + } + + cleanup: + consumer.close() + producer.close() + + } + + def "test iteration backwards over ConsumerRecords"() { + setup: + def kafkaPartition = 0 + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + def consumer = new KafkaConsumer(consumerProperties) + + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer(senderProps) + + consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition))) + + when: + def greetings = ["msg 1", "msg 2", "msg 3"] + greetings.each { + producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, it)) + } + + then: + TEST_WRITER.waitForTraces(3) + def pollRecords = KafkaTestUtils.getRecords(consumer) + + def listIter = + pollRecords.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).listIterator() + + then: + def receivedSet = greetings.toSet() + while (listIter.hasNext()) { + listIter.next() + } + while (listIter.hasPrevious()) { + def record = listIter.previous() + receivedSet.remove(record.value()) + assert record.key() == null + } + receivedSet.isEmpty() + + assertTraces(9, SORT_TRACES_BY_ID) { + + // producing traces + trace(1) { + producerSpan(it, senderProps) + } + trace(1) { + producerSpan(it, senderProps) + } + trace(1) { + producerSpan(it, senderProps) + } + + // iterating to the end of ListIterator: + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(3)[1], 0..0) + queueSpan(it, trace(0)[0]) + } + trace(2) { + consumerSpan(it, consumerProperties, trace(4)[1], 1..1) + queueSpan(it, trace(1)[0]) + } + trace(2) { + consumerSpan(it, consumerProperties, trace(5)[1], 2..2) + queueSpan(it, trace(2)[0]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[0], 0..0) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(1)[0], 1..1) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(2)[0], 2..2) + } + } + + // backwards iteration over ListIterator to the beginning + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(6)[1], 2..2) + queueSpan(it, trace(2)[0]) + } + trace(2) { + consumerSpan(it, consumerProperties, trace(7)[1], 1..1) + queueSpan(it, trace(1)[0]) + } + trace(2) { + consumerSpan(it, consumerProperties, trace(8)[1], 0..0) + queueSpan(it, trace(0)[0]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(2)[0], 2..2) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(1)[0], 1..1) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[0], 0..0) + } + } + } + + cleanup: + consumer.close() + producer.close() + + } + + @Flaky("Repeatedly fails with a partition set to 1 but expects 0 https://github.com/DataDog/dd-trace-java/issues/3864") + def "test spring kafka template produce and batch consume"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + if (isDataStreamsEnabled()) { + senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000) + } + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + String clusterId = null + if (isDataStreamsEnabled()) { + clusterId = waitForKafkaMetadataUpdate(kafkaTemplate) + } + + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + def containerProperties = containerProperties() + + + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + def records = new LinkedBlockingQueue>() + container.setupMessageListener(new BatchMessageListener() { + @Override + void onMessage(List> consumerRecords) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + consumerRecords.each { + records.add(it) + } + } + }) + container.start() + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) + + when: + List greetings = ["msg 1", "msg 2", "msg 3"] + runUnderTrace("parent") { + for (g in greetings) { + kafkaTemplate.send(SHARED_TOPIC, g).addCallback({ + runUnderTrace("producer callback") {} + }, { ex -> + runUnderTrace("producer exception: " + ex) {} + }) + } + blockUntilChildSpansFinished(2 * greetings.size()) + } + if (isDataStreamsEnabled()) { + TEST_DATA_STREAMS_WRITER.waitForGroups(2) + // wait for produce offset 0, commit offset 0 on partition 0 and 1, and commit offset 1 on 1 partition. + TEST_DATA_STREAMS_WRITER.waitForBacklogs(4) + } + + then: + def receivedSet = greetings.toSet() + greetings.eachWithIndex { g, i -> + def received = records.poll(5, TimeUnit.SECONDS) + receivedSet.remove(received.value()) //maybe received out of order in case several partitions + assert received.key() == null + + def headers = received.headers() + assert headers.iterator().hasNext() + + } + assert receivedSet.isEmpty() + + assertTraces(4, SORT_TRACES_BY_ID) { + trace(7) { + basicSpan(it, "parent") + basicSpan(it, "producer callback", span(0)) + producerSpan(it, senderProps, span(0), false) + basicSpan(it, "producer callback", span(0)) + producerSpan(it, senderProps, span(0), false) + basicSpan(it, "producer callback", span(0)) + producerSpan(it, senderProps, span(0), false) + } + + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(1)[1], 0..0) + queueSpan(it, trace(0)[6]) + } + trace(2) { + consumerSpan(it, consumerProperties, trace(2)[1], 0..1) + queueSpan(it, trace(0)[4]) + } + trace(2) { + consumerSpan(it, consumerProperties, trace(3)[1], 0..1) + queueSpan(it, trace(0)[2]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[6], 0..0) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[4], 0..1) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[2], 0..1) + } + } + } + + if (isDataStreamsEnabled()) { + StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } + verifyAll(first) { + edgeTags == ["direction:out", "kafka_cluster_id:$clusterId", "topic:$SHARED_TOPIC".toString(), "type:kafka"] + edgeTags.size() == 4 + } + + StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } + verifyAll(second) { + edgeTags == [ + "direction:in", + "group:sender", + "kafka_cluster_id:$clusterId", + "topic:$SHARED_TOPIC".toString(), + "type:kafka" + ] + edgeTags.size() == 5 + } + } + + cleanup: + producerFactory.stop() + container?.stop() + } + + def "test kafka client header propagation manual config"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties = containerProperties() + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records.add(record) + if (isDataStreamsEnabled()) { + // even if header propagation is disabled, we want data streams to work. + TEST_DATA_STREAMS_WRITER.waitForGroups(2) + } + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) + + when: + String message = "Testing without headers" + injectSysConfig("kafka.client.propagation.enabled", value) + kafkaTemplate.send(SHARED_TOPIC, message) + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + + received.headers().iterator().hasNext() == expected + + cleanup: + producerFactory.stop() + container?.stop() + + where: + value | expected + "false" | false + "true" | true + } + + def containerProperties() { + try { + // Different class names for test and latestDepTest. + return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC) + } catch (ClassNotFoundException | NoClassDefFoundError e) { + return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC) + } + } + + def producerSpan( + TraceAssert trace, + Map config, + DDSpan parentSpan = null, + boolean partitioned = true, + boolean tombstone = false, + String schema = null + ) { + trace.span { + serviceName service() + operationName operationForProducer() + resourceName "Produce Topic $SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + if (partitioned) { + "$InstrumentationTags.PARTITION" { it >= 0 } + } + if (tombstone) { + "$InstrumentationTags.TOMBSTONE" true + } + if ({isDataStreamsEnabled()}) { + "$DDTags.PATHWAY_HASH" { String } + if (schema != null) { + "$DDTags.SCHEMA_DEFINITION" schema + "$DDTags.SCHEMA_WEIGHT" 1 + "$DDTags.SCHEMA_TYPE" "avro" + "$DDTags.SCHEMA_OPERATION" "serialization" + "$DDTags.SCHEMA_ID" "10810872322569724838" + } + } + peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS) + defaultTags() + } + } + } + + def queueSpan( + TraceAssert trace, + DDSpan parentSpan = null + ) { + trace.span { + serviceName splitByDestination() ? "$SHARED_TOPIC" : serviceForTimeInQueue() + operationName "kafka.deliver" + resourceName "$SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_BROKER + defaultTags(true) + } + } + } + + def consumerSpan( + TraceAssert trace, + Map config, + DDSpan parentSpan = null, + Range offset = 0..0, + boolean tombstone = false, + boolean distributedRootSpan = !hasQueueSpan() + ) { + trace.span { + serviceName service() + operationName operationForConsumer() + resourceName "Consume Topic $SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "$InstrumentationTags.PARTITION" { it >= 0 } + "$InstrumentationTags.OFFSET" { offset.containsWithinBounds(it as int) } + "$InstrumentationTags.CONSUMER_GROUP" "sender" + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) + "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 } + "$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { it >= 0 } + if (tombstone) { + "$InstrumentationTags.TOMBSTONE" true + } + if ({isDataStreamsEnabled()}) { + "$DDTags.PATHWAY_HASH" { String } + } + defaultTags(distributedRootSpan) + } + } + } + + def pollSpan( + TraceAssert trace, + int recordCount = 1, + DDSpan parentSpan = null, + Range offset = 0..0, + boolean tombstone = false, + boolean distributedRootSpan = !hasQueueSpan() + ) { + trace.span { + serviceName Config.get().getServiceName() + operationName "kafka.poll" + resourceName "kafka.poll" + errored false + measured false + tags { + "$InstrumentationTags.KAFKA_RECORDS_COUNT" recordCount + defaultTags(true) + } + } + } + + def waitForKafkaMetadataUpdate(KafkaTemplate kafkaTemplate) { + kafkaTemplate.flush() + Producer wrappedProducer = kafkaTemplate.getTheProducer() + assert(wrappedProducer instanceof DefaultKafkaProducerFactory.CloseSafeProducer) + Producer producer = wrappedProducer.delegate + assert(producer instanceof KafkaProducer) + String clusterId = producer.metadata.cluster.clusterResource().clusterId() + while (clusterId == null || clusterId.isEmpty()) { + Thread.sleep(1500) + clusterId = producer.metadata.cluster.clusterResource().clusterId() + } + return clusterId + } +} + +abstract class KafkaClientForkedTest extends KafkaClientTestBase { + @Override + void configurePreAgent() { + super.configurePreAgent() + injectSysConfig("dd.kafka.legacy.tracing.enabled", "false") + injectSysConfig("dd.service", "KafkaClientTest") + } + + @Override + boolean hasQueueSpan() { + return true + } + + @Override + boolean splitByDestination() { + return false + } +} + +class KafkaClientV0ForkedTest extends KafkaClientForkedTest { + @Override + String service() { + return "KafkaClientTest" + } +} + +class KafkaClientV1ForkedTest extends KafkaClientForkedTest { + @Override + int version() { + 1 + } + + @Override + String service() { + return "KafkaClientTest" + } + + @Override + String operationForProducer() { + "kafka.send" + } + + @Override + String operationForConsumer() { + return "kafka.process" + } + + @Override + String serviceForTimeInQueue() { + "kafka-queue" + } + + @Override + boolean hasQueueSpan() { + false + } +} + +class KafkaClientSplitByDestinationForkedTest extends KafkaClientTestBase { + @Override + void configurePreAgent() { + super.configurePreAgent() + injectSysConfig("dd.service", "KafkaClientTest") + injectSysConfig("dd.kafka.legacy.tracing.enabled", "false") + injectSysConfig("dd.message.broker.split-by-destination", "true") + } + + @Override + String service() { + return "KafkaClientTest" + } + + @Override + boolean hasQueueSpan() { + return true + } + + @Override + boolean splitByDestination() { + return true + } +} + +abstract class KafkaClientLegacyTracingForkedTest extends KafkaClientTestBase { + @Override + void configurePreAgent() { + super.configurePreAgent() + injectSysConfig("dd.service", "KafkaClientLegacyTest") + injectSysConfig("dd.kafka.legacy.tracing.enabled", "true") + } + + @Override + String service() { + return "kafka" + } + + @Override + boolean hasQueueSpan() { + return false + } + + @Override + boolean splitByDestination() { + return false + } +} + +class KafkaClientLegacyTracingV0ForkedTest extends KafkaClientLegacyTracingForkedTest{ + + +} + +class KafkaClientLegacyTracingV1ForkedTest extends KafkaClientLegacyTracingForkedTest{ + + @Override + int version() { + 1 + } + + @Override + String operationForProducer() { + "kafka.send" + } + + @Override + String operationForConsumer() { + return "kafka.process" + } + + @Override + String service() { + return Config.get().getServiceName() + } +} + +class KafkaClientDataStreamsDisabledForkedTest extends KafkaClientTestBase { + @Override + void configurePreAgent() { + super.configurePreAgent() + injectSysConfig("dd.service", "KafkaClientDataStreamsDisabledForkedTest") + injectSysConfig("dd.kafka.legacy.tracing.enabled", "true") + } + + @Override + String service() { + return "kafka" + } + + @Override + boolean hasQueueSpan() { + return false + } + + @Override + boolean splitByDestination() { + return false + } + + @Override + boolean isDataStreamsEnabled() { + return false + } +} diff --git a/dd-java-agent/instrumentation/netty-3.8/src/main/java/datadog/trace/instrumentation/netty38/client/HttpClientRequestTracingHandler.java b/dd-java-agent/instrumentation/netty-3.8/src/main/java/datadog/trace/instrumentation/netty38/client/HttpClientRequestTracingHandler.java index 9c6adf8aba6..4f217f95b49 100644 --- a/dd-java-agent/instrumentation/netty-3.8/src/main/java/datadog/trace/instrumentation/netty38/client/HttpClientRequestTracingHandler.java +++ b/dd-java-agent/instrumentation/netty-3.8/src/main/java/datadog/trace/instrumentation/netty38/client/HttpClientRequestTracingHandler.java @@ -66,9 +66,11 @@ public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent m } propagate().inject(span, request.headers(), SETTER); - propagate() - .injectPathwayContext( - span, request.headers(), SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request.headers(), SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } channelTraceContext.setClientSpan(span); diff --git a/dd-java-agent/instrumentation/netty-4.0/src/main/java/datadog/trace/instrumentation/netty40/client/HttpClientRequestTracingHandler.java b/dd-java-agent/instrumentation/netty-4.0/src/main/java/datadog/trace/instrumentation/netty40/client/HttpClientRequestTracingHandler.java index c6ee72f517d..38aec17f669 100644 --- a/dd-java-agent/instrumentation/netty-4.0/src/main/java/datadog/trace/instrumentation/netty40/client/HttpClientRequestTracingHandler.java +++ b/dd-java-agent/instrumentation/netty-4.0/src/main/java/datadog/trace/instrumentation/netty40/client/HttpClientRequestTracingHandler.java @@ -89,9 +89,11 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann // AWS calls are often signed, so we can't add headers without breaking the signature. if (!awsClientCall) { propagate().inject(span, request.headers(), SETTER); - propagate() - .injectPathwayContext( - span, request.headers(), SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request.headers(), SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } } ctx.channel().attr(SPAN_ATTRIBUTE_KEY).set(span); diff --git a/dd-java-agent/instrumentation/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/client/HttpClientRequestTracingHandler.java b/dd-java-agent/instrumentation/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/client/HttpClientRequestTracingHandler.java index a21522f1a45..90d6c740318 100644 --- a/dd-java-agent/instrumentation/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/client/HttpClientRequestTracingHandler.java +++ b/dd-java-agent/instrumentation/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/client/HttpClientRequestTracingHandler.java @@ -90,9 +90,11 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann // AWS calls are often signed, so we can't add headers without breaking the signature. if (!awsClientCall) { propagate().inject(span, request.headers(), SETTER); - propagate() - .injectPathwayContext( - span, request.headers(), SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request.headers(), SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } } ctx.channel().attr(SPAN_ATTRIBUTE_KEY).set(span); diff --git a/dd-java-agent/instrumentation/okhttp-2/src/main/java/datadog/trace/instrumentation/okhttp2/TracingInterceptor.java b/dd-java-agent/instrumentation/okhttp-2/src/main/java/datadog/trace/instrumentation/okhttp2/TracingInterceptor.java index dcea94a5cd7..9725305a248 100644 --- a/dd-java-agent/instrumentation/okhttp-2/src/main/java/datadog/trace/instrumentation/okhttp2/TracingInterceptor.java +++ b/dd-java-agent/instrumentation/okhttp-2/src/main/java/datadog/trace/instrumentation/okhttp2/TracingInterceptor.java @@ -27,9 +27,11 @@ public Response intercept(final Chain chain) throws IOException { final Request.Builder requestBuilder = chain.request().newBuilder(); propagate().inject(span, requestBuilder, SETTER); - propagate() - .injectPathwayContext( - span, requestBuilder, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, requestBuilder, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } final Response response; try { diff --git a/dd-java-agent/instrumentation/okhttp-3/src/main/java/datadog/trace/instrumentation/okhttp3/TracingInterceptor.java b/dd-java-agent/instrumentation/okhttp-3/src/main/java/datadog/trace/instrumentation/okhttp3/TracingInterceptor.java index b2862aa9f73..ebb50787272 100644 --- a/dd-java-agent/instrumentation/okhttp-3/src/main/java/datadog/trace/instrumentation/okhttp3/TracingInterceptor.java +++ b/dd-java-agent/instrumentation/okhttp-3/src/main/java/datadog/trace/instrumentation/okhttp3/TracingInterceptor.java @@ -30,9 +30,11 @@ public Response intercept(final Chain chain) throws IOException { final Request.Builder requestBuilder = chain.request().newBuilder(); propagate().inject(span, requestBuilder, SETTER); - propagate() - .injectPathwayContext( - span, requestBuilder, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, requestBuilder, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } final Response response; try { diff --git a/dd-java-agent/instrumentation/pekko-http-1.0/src/main/java/datadog/trace/instrumentation/pekkohttp/PekkoHttpSingleRequestInstrumentation.java b/dd-java-agent/instrumentation/pekko-http-1.0/src/main/java/datadog/trace/instrumentation/pekkohttp/PekkoHttpSingleRequestInstrumentation.java index 5f1f324c334..eb67d177873 100644 --- a/dd-java-agent/instrumentation/pekko-http-1.0/src/main/java/datadog/trace/instrumentation/pekkohttp/PekkoHttpSingleRequestInstrumentation.java +++ b/dd-java-agent/instrumentation/pekko-http-1.0/src/main/java/datadog/trace/instrumentation/pekkohttp/PekkoHttpSingleRequestInstrumentation.java @@ -80,9 +80,11 @@ public static AgentScope methodEnter( if (request != null) { propagate().inject(span, request, headers); - propagate() - .injectPathwayContext( - span, request, headers, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request, headers, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } // Request is immutable, so we have to assign new value once we update headers request = headers.getRequest(); } diff --git a/dd-java-agent/instrumentation/play-ws-1/src/main/java/datadog/trace/instrumentation/playws1/PlayWSClientInstrumentation.java b/dd-java-agent/instrumentation/play-ws-1/src/main/java/datadog/trace/instrumentation/playws1/PlayWSClientInstrumentation.java index 4f7401034b8..aef8f65fa82 100644 --- a/dd-java-agent/instrumentation/play-ws-1/src/main/java/datadog/trace/instrumentation/playws1/PlayWSClientInstrumentation.java +++ b/dd-java-agent/instrumentation/play-ws-1/src/main/java/datadog/trace/instrumentation/playws1/PlayWSClientInstrumentation.java @@ -30,9 +30,11 @@ public static AgentSpan methodEnter( DECORATE.afterStart(span); DECORATE.onRequest(span, request); propagate().inject(span, request, SETTER); - propagate() - .injectPathwayContext( - span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } if (asyncHandler instanceof StreamedAsyncHandler) { asyncHandler = new StreamedAsyncHandlerWrapper((StreamedAsyncHandler) asyncHandler, span); diff --git a/dd-java-agent/instrumentation/play-ws-2.1/src/main/java/datadog/trace/instrumentation/playws21/PlayWSClientInstrumentation.java b/dd-java-agent/instrumentation/play-ws-2.1/src/main/java/datadog/trace/instrumentation/playws21/PlayWSClientInstrumentation.java index 796e1f049dd..81197ce2e81 100644 --- a/dd-java-agent/instrumentation/play-ws-2.1/src/main/java/datadog/trace/instrumentation/playws21/PlayWSClientInstrumentation.java +++ b/dd-java-agent/instrumentation/play-ws-2.1/src/main/java/datadog/trace/instrumentation/playws21/PlayWSClientInstrumentation.java @@ -30,9 +30,11 @@ public static AgentSpan methodEnter( DECORATE.afterStart(span); DECORATE.onRequest(span, request); propagate().inject(span, request, SETTER); - propagate() - .injectPathwayContext( - span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } if (asyncHandler instanceof StreamedAsyncHandler) { asyncHandler = new StreamedAsyncHandlerWrapper((StreamedAsyncHandler) asyncHandler, span); diff --git a/dd-java-agent/instrumentation/play-ws-2/src/main/java/datadog/trace/instrumentation/playws2/PlayWSClientInstrumentation.java b/dd-java-agent/instrumentation/play-ws-2/src/main/java/datadog/trace/instrumentation/playws2/PlayWSClientInstrumentation.java index e795667d7f3..c4ce0479da1 100644 --- a/dd-java-agent/instrumentation/play-ws-2/src/main/java/datadog/trace/instrumentation/playws2/PlayWSClientInstrumentation.java +++ b/dd-java-agent/instrumentation/play-ws-2/src/main/java/datadog/trace/instrumentation/playws2/PlayWSClientInstrumentation.java @@ -30,9 +30,11 @@ public static AgentSpan methodEnter( DECORATE.afterStart(span); DECORATE.onRequest(span, request); propagate().inject(span, request, SETTER); - propagate() - .injectPathwayContext( - span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } if (asyncHandler instanceof StreamedAsyncHandler) { asyncHandler = new StreamedAsyncHandlerWrapper((StreamedAsyncHandler) asyncHandler, span); diff --git a/dd-java-agent/instrumentation/servlet/src/main/java/datadog/trace/instrumentation/servlet/dispatcher/RequestDispatcherInstrumentation.java b/dd-java-agent/instrumentation/servlet/src/main/java/datadog/trace/instrumentation/servlet/dispatcher/RequestDispatcherInstrumentation.java index 1d88985971e..b1f37f65f0d 100644 --- a/dd-java-agent/instrumentation/servlet/src/main/java/datadog/trace/instrumentation/servlet/dispatcher/RequestDispatcherInstrumentation.java +++ b/dd-java-agent/instrumentation/servlet/src/main/java/datadog/trace/instrumentation/servlet/dispatcher/RequestDispatcherInstrumentation.java @@ -121,9 +121,11 @@ public static AgentScope start( // In case we lose context, inject trace into to the request. propagate().inject(span, request, SETTER); - propagate() - .injectPathwayContext( - span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + if (HttpClientDecorator.SHOULD_INSTRUMENT_DATA_STREAMS) { + propagate() + .injectPathwayContext( + span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS); + } // temporarily replace from request to avoid spring resource name bubbling up: requestSpan = request.getAttribute(DD_SPAN_ATTRIBUTE); diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java index 6b7688b9b91..6d2b61d1e8d 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java @@ -152,6 +152,8 @@ public final class TraceInstrumentationConfig { public static final String JAX_RS_ADDITIONAL_ANNOTATIONS = "trace.jax-rs.additional.annotations"; /** If set, the instrumentation will set its resource name on the local root too. */ public static final String AXIS_PROMOTE_RESOURCE_NAME = "trace.axis.promote.resource-name"; + public static final String HTTP_DATA_STREAMS_ENABLED = "data.streams.http.enabled"; + public static final String GRPC_DATA_STREAMS_ENABLED = "data.streams.http.enabled"; private TraceInstrumentationConfig() {} } diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 9a8a6b6dad5..99deb54732c 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -167,6 +167,7 @@ public static String getHostName() { private final boolean httpClientTagQueryString; private final boolean httpClientTagHeaders; private final boolean httpClientSplitByDomain; + private final boolean httpDataStreamsEnabled; private final boolean dbClientSplitByInstance; private final boolean dbClientSplitByInstanceTypeSuffix; private final boolean dbClientSplitByHost; @@ -463,6 +464,7 @@ public static String getHostName() { private final boolean grpcServerTrimPackageResource; private final BitSet grpcServerErrorStatuses; private final BitSet grpcClientErrorStatuses; + private final boolean grpcDataStreamsEnabled; private final boolean cwsEnabled; private final int cwsTlsRefresh; @@ -812,6 +814,9 @@ private Config(final ConfigProvider configProvider, final InstrumenterConfig ins httpClientSplitByDomain = configProvider.getBoolean( HTTP_CLIENT_HOST_SPLIT_BY_DOMAIN, DEFAULT_HTTP_CLIENT_SPLIT_BY_DOMAIN); + httpDataStreamsEnabled = + configProvider.getBoolean( + HTTP_DATA_STREAMS_ENABLED, false); dbClientSplitByInstance = configProvider.getBoolean( @@ -1597,6 +1602,7 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) grpcClientErrorStatuses = configProvider.getIntegerRange( GRPC_CLIENT_ERROR_STATUSES, DEFAULT_GRPC_CLIENT_ERROR_STATUSES); + grpcDataStreamsEnabled = configProvider.getBoolean(GRPC_DATA_STREAMS_ENABLED, false); hystrixTagsEnabled = configProvider.getBoolean(HYSTRIX_TAGS_ENABLED, false); hystrixMeasuredEnabled = configProvider.getBoolean(HYSTRIX_MEASURED_ENABLED, false); @@ -2015,6 +2021,10 @@ public boolean isHttpClientSplitByDomain() { return httpClientSplitByDomain; } + public boolean isHttpDataStreamsEnabled() { + return httpDataStreamsEnabled; + } + public boolean isDbClientSplitByInstance() { return dbClientSplitByInstance; } @@ -3176,6 +3186,10 @@ public BitSet getGrpcClientErrorStatuses() { return grpcClientErrorStatuses; } + public boolean isGrpcDataStreamsEnabled() { + return grpcDataStreamsEnabled; + } + public boolean isCouchbaseInternalSpansEnabled() { return couchbaseInternalSpansEnabled; } diff --git a/settings.gradle b/settings.gradle index f2c5f11f343..685e71e4b61 100644 --- a/settings.gradle +++ b/settings.gradle @@ -337,6 +337,7 @@ include ':dd-java-agent:instrumentation:kafka-clients-0.11' include 'dd-java-agent:instrumentation:kafka-clients-3.8' include ':dd-java-agent:instrumentation:kafka-streams-0.11' include ':dd-java-agent:instrumentation:kafka-streams-1.0' +include ':dd-java-agent:instrumentation:kafka-connect-0.11' include ':dd-java-agent:instrumentation:karate' include ':dd-java-agent:instrumentation:kotlin-coroutines' include ':dd-java-agent:instrumentation:kotlin-coroutines:coroutines-1.3'