diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java index 9383c9f1f29..ca2a241f9c1 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java @@ -1,6 +1,7 @@ package datadog.trace.bootstrap.instrumentation.decorator; import static datadog.trace.api.cache.RadixTreeCache.UNSET_STATUS; +import static datadog.trace.api.datastreams.DataStreamsContext.fromTags; import static datadog.trace.api.gateway.Events.EVENTS; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig; import static datadog.trace.bootstrap.instrumentation.decorator.http.HttpResourceDecorator.HTTP_RESOURCE_DECORATOR; @@ -148,7 +149,7 @@ public AgentSpan startSpan( } AgentPropagation.ContextVisitor getter = getter(); if (null != carrier && null != getter) { - tracer().getDataStreamsMonitoring().setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0); + tracer().getDataStreamsMonitoring().setCheckpoint(span, fromTags(SERVER_PATHWAY_EDGE_TAGS)); } return span; } diff --git a/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java b/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java index c5b0cee8e9b..5b22d0d927b 100644 --- a/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java +++ b/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.armeria.grpc.server; +import static datadog.trace.api.datastreams.DataStreamsContext.fromTags; import static datadog.trace.api.gateway.Events.EVENTS; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; @@ -71,7 +72,7 @@ public ServerCall.Listener interceptCall( AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0); + .setCheckpoint(span, fromTags(SERVER_PATHWAY_EDGE_TAGS)); RequestContext reqContext = span.getRequestContext(); if (reqContext != null) { diff --git a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkClientDecorator.java b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkClientDecorator.java index 088649206c2..59b2c306f93 100644 --- a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkClientDecorator.java +++ b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkClientDecorator.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.aws.v0; +import static datadog.trace.api.datastreams.DataStreamsContext.create; import static datadog.trace.bootstrap.instrumentation.api.ResourceNamePriorities.RPC_COMMAND_NAME; import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT; @@ -264,7 +265,7 @@ public AgentSpan onServiceResponse( AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, sortedTags, 0, responseSize); + .setCheckpoint(span, create(sortedTags, 0, responseSize)); } if ("PutObjectRequest".equalsIgnoreCase(awsOperation) @@ -285,7 +286,7 @@ public AgentSpan onServiceResponse( AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, sortedTags, 0, payloadSize); + .setCheckpoint(span, create(sortedTags, 0, payloadSize)); } } } diff --git a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/TracingRequestHandler.java b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/TracingRequestHandler.java index 463263a9ce0..080980b8a02 100644 --- a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/TracingRequestHandler.java +++ b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/TracingRequestHandler.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.aws.v0; +import static datadog.trace.api.datastreams.DataStreamsContext.create; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.XRAY_TRACING_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.blackholeSpan; @@ -19,6 +20,7 @@ import datadog.context.propagation.Propagators; import datadog.trace.api.Config; import datadog.trace.api.datastreams.AgentDataStreamsMonitoring; +import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.PathwayContext; import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; @@ -122,8 +124,8 @@ public void afterResponse(final Request request, final Response response) AgentDataStreamsMonitoring dataStreamsMonitoring = AgentTracer.get().getDataStreamsMonitoring(); PathwayContext pathwayContext = dataStreamsMonitoring.newPathwayContext(); - pathwayContext.setCheckpoint( - sortedTags, dataStreamsMonitoring::add, arrivalTime.getTime()); + DataStreamsContext context = create(sortedTags, arrivalTime.getTime(), 0); + pathwayContext.setCheckpoint(context, dataStreamsMonitoring::add); if (!span.context().getPathwayContext().isStarted()) { span.context().mergePathwayContext(pathwayContext); } diff --git a/dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/AwsSdkClientDecorator.java b/dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/AwsSdkClientDecorator.java index d80bc2df4da..3e9912a3dae 100644 --- a/dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/AwsSdkClientDecorator.java +++ b/dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/AwsSdkClientDecorator.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.aws.v2; +import static datadog.trace.api.datastreams.DataStreamsContext.create; import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN; import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT; import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG; @@ -361,7 +362,8 @@ public AgentSpan onSdkResponse( AgentTracer.get().getDataStreamsMonitoring(); PathwayContext pathwayContext = dataStreamsMonitoring.newPathwayContext(); pathwayContext.setCheckpoint( - sortedTags, dataStreamsMonitoring::add, arrivalTime.toEpochMilli()); + create(sortedTags, arrivalTime.toEpochMilli(), 0), + dataStreamsMonitoring::add); if (!span.context().getPathwayContext().isStarted()) { span.context().mergePathwayContext(pathwayContext); } @@ -391,7 +393,7 @@ public AgentSpan onSdkResponse( AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, sortedTags, 0, responseSize); + .setCheckpoint(span, create(sortedTags, 0, responseSize)); } if ("PutObject".equalsIgnoreCase(awsOperation)) { @@ -411,7 +413,7 @@ public AgentSpan onSdkResponse( AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, sortedTags, 0, payloadSize); + .setCheckpoint(span, create(sortedTags, 0, payloadSize)); } } } diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java index ab7067801df..0cd6f6cbcdf 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.aws.v1.sqs; +import static datadog.trace.api.datastreams.DataStreamsContext.create; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; @@ -90,7 +91,7 @@ protected void startNewMessageSpan(Message message) { sortedTags.put(DIRECTION_TAG, DIRECTION_IN); sortedTags.put(TOPIC_TAG, urlFileName(queueUrl)); sortedTags.put(TYPE_TAG, "sqs"); - AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, 0, 0); + AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, create(sortedTags, 0, 0)); CONSUMER_DECORATE.afterStart(span); CONSUMER_DECORATE.onConsume(span, queueUrl); diff --git a/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java b/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java index 2bff193532b..503d5efa5b2 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.aws.v2.sqs; +import static datadog.trace.api.datastreams.DataStreamsContext.create; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; @@ -92,7 +93,7 @@ protected void startNewMessageSpan(Message message) { sortedTags.put(DIRECTION_TAG, DIRECTION_IN); sortedTags.put(TOPIC_TAG, urlFileName(queueUrl)); sortedTags.put(TYPE_TAG, "sqs"); - AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, 0, 0); + AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, create(sortedTags, 0, 0)); CONSUMER_DECORATE.afterStart(span); CONSUMER_DECORATE.onConsume(span, queueUrl, requestId); diff --git a/dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PubSubDecorator.java b/dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PubSubDecorator.java index 31c430def97..f966ca8b3c1 100644 --- a/dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PubSubDecorator.java +++ b/dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PubSubDecorator.java @@ -13,6 +13,7 @@ import datadog.trace.api.Functions; import datadog.trace.api.cache.DDCache; import datadog.trace.api.cache.DDCaches; +import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.naming.SpanNaming; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; @@ -141,9 +142,10 @@ public AgentSpan onConsume(final PubsubMessage message, final String subscriptio .getDataStreamsMonitoring() .setCheckpoint( span, - sortedTags, - publishTime.getSeconds() * 1_000 + publishTime.getNanos() / (int) 1e6, - message.getSerializedSize()); + DataStreamsContext.create( + sortedTags, + publishTime.getSeconds() * 1_000 + publishTime.getNanos() / (int) 1e6, + message.getSerializedSize())); afterStart(span); span.setResourceName( CONSUMER_RESOURCE_NAME_CACHE.computeIfAbsent(parsedSubscription, CONSUMER_PREFIX)); diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java index aaf5385619d..954d7458231 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.grpc.server; +import static datadog.trace.api.datastreams.DataStreamsContext.fromTags; import static datadog.trace.api.gateway.Events.EVENTS; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; @@ -70,7 +71,7 @@ public ServerCall.Listener interceptCall( AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0); + .setCheckpoint(span, fromTags(SERVER_PATHWAY_EDGE_TAGS)); RequestContext reqContext = span.getRequestContext(); if (reqContext != null) { diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java index e7a35eccc9a..610ad20a2ca 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java @@ -1,6 +1,6 @@ package datadog.trace.instrumentation.kafka_clients; -import static datadog.trace.api.datastreams.DataStreamsContext.fromKafka; +import static datadog.trace.api.datastreams.DataStreamsContext.create; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious; @@ -113,7 +113,7 @@ protected void startNewRecordSpan(ConsumerRecord val) { if (STREAMING_CONTEXT.isDisabledForTopic(val.topic())) { AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, sortedTags, val.timestamp(), payloadSize); + .setCheckpoint(span, create(sortedTags, val.timestamp(), payloadSize)); } else { // when we're in a streaming context we want to consume only from source topics if (STREAMING_CONTEXT.isSourceTopic(val.topic())) { @@ -122,7 +122,7 @@ protected void startNewRecordSpan(ConsumerRecord val) { // some other instance of the application, breaking the context propagation // for DSM users Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); - DataStreamsContext dsmContext = fromKafka(sortedTags, val.timestamp(), payloadSize); + DataStreamsContext dsmContext = create(sortedTags, val.timestamp(), payloadSize); dsmPropagator.inject(span.with(dsmContext), val.headers(), SETTER); } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java index 942cac3e990..1c127778141 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java @@ -1,6 +1,6 @@ package datadog.trace.instrumentation.kafka_clients38; -import static datadog.trace.api.datastreams.DataStreamsContext.fromKafka; +import static datadog.trace.api.datastreams.DataStreamsContext.create; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious; @@ -113,7 +113,7 @@ protected void startNewRecordSpan(ConsumerRecord val) { if (StreamingContext.STREAMING_CONTEXT.isDisabledForTopic(val.topic())) { AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, sortedTags, val.timestamp(), payloadSize); + .setCheckpoint(span, create(sortedTags, val.timestamp(), payloadSize)); } else { // when we're in a streaming context we want to consume only from source topics if (StreamingContext.STREAMING_CONTEXT.isSourceTopic(val.topic())) { @@ -122,7 +122,7 @@ protected void startNewRecordSpan(ConsumerRecord val) { // some other instance of the application, breaking the context propagation // for DSM users Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); - DataStreamsContext dsmContext = fromKafka(sortedTags, val.timestamp(), payloadSize); + DataStreamsContext dsmContext = create(sortedTags, val.timestamp(), payloadSize); dsmPropagator.inject(span.with(dsmContext), val.headers(), SETTER); } } diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java index 51c663e9b57..33d62f4cdb2 100644 --- a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java @@ -1,7 +1,7 @@ package datadog.trace.instrumentation.kafka_streams; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; -import static datadog.trace.api.datastreams.DataStreamsContext.fromKafka; +import static datadog.trace.api.datastreams.DataStreamsContext.create; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; @@ -265,11 +265,11 @@ public static void start( if (STREAMING_CONTEXT.isDisabledForTopic(record.topic())) { AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, sortedTags, record.timestamp, payloadSize); + .setCheckpoint(span, create(sortedTags, record.timestamp, payloadSize)); } else { if (STREAMING_CONTEXT.isSourceTopic(record.topic())) { Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); - DataStreamsContext dsmContext = fromKafka(sortedTags, record.timestamp, payloadSize); + DataStreamsContext dsmContext = create(sortedTags, record.timestamp, payloadSize); dsmPropagator.inject(span.with(dsmContext), record, SR_SETTER); } } @@ -347,11 +347,11 @@ public static void start( if (STREAMING_CONTEXT.isDisabledForTopic(record.topic())) { AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, sortedTags, record.timestamp(), payloadSize); + .setCheckpoint(span, create(sortedTags, record.timestamp(), payloadSize)); } else { if (STREAMING_CONTEXT.isSourceTopic(record.topic())) { Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); - DataStreamsContext dsmContext = fromKafka(sortedTags, record.timestamp(), payloadSize); + DataStreamsContext dsmContext = create(sortedTags, record.timestamp(), payloadSize); dsmPropagator.inject(span.with(dsmContext), record, PR_SETTER); } } diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java index da98323640e..baf0985199c 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.rabbitmq.amqp; +import static datadog.trace.api.datastreams.DataStreamsContext.create; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; @@ -253,7 +254,7 @@ public static AgentScope startReceivingSpan( sortedTags.put(TYPE_TAG, "rabbitmq"); AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, sortedTags, produceMillis, 0); + .setCheckpoint(span, create(sortedTags, produceMillis, 0)); } CONSUMER_DECORATE.afterStart(span); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index d54c392a47b..577a2e82160 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -86,7 +86,6 @@ import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.core.monitor.MonitoringImpl; import datadog.trace.core.monitor.TracerHealthMetrics; -import datadog.trace.core.propagation.CorePropagation; import datadog.trace.core.propagation.ExtractedContext; import datadog.trace.core.propagation.HttpCodec; import datadog.trace.core.propagation.PropagationTags; @@ -228,7 +227,7 @@ public static CoreTracerBuilder builder() { private final SortedSet interceptors = new ConcurrentSkipListSet<>(Comparator.comparingInt(TraceInterceptor::priority)); - private final CorePropagation propagation; + private final AgentPropagation propagation; private final boolean logs128bTraceIdEnabled; private final InstrumentationGateway instrumentationGateway; @@ -714,8 +713,8 @@ private CoreTracer( sharedCommunicationObjects.whenReady(this.dataStreamsMonitoring::start); - // Store all propagators to propagation -- only DSM injection left - this.propagation = new CorePropagation(this.dataStreamsMonitoring.injector()); + // TODO Need to be removed + this.propagation = new AgentPropagation() {}; // Register context propagators HttpCodec.Extractor tracingExtractor = diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java deleted file mode 100644 index f831d1cf10d..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java +++ /dev/null @@ -1,64 +0,0 @@ -package datadog.trace.core.datastreams; - -import datadog.trace.api.TraceConfig; -import datadog.trace.api.time.TimeSource; -import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; -import datadog.trace.bootstrap.instrumentation.api.TagContext; -import datadog.trace.core.propagation.HttpCodec; -import java.util.function.Supplier; - -public class DataStreamContextExtractor implements HttpCodec.Extractor { - private final HttpCodec.Extractor delegate; - private final TimeSource timeSource; - private final Supplier traceConfigSupplier; - private final long hashOfKnownTags; - private final String serviceNameOverride; - - public DataStreamContextExtractor( - HttpCodec.Extractor delegate, - TimeSource timeSource, - Supplier traceConfigSupplier, - long hashOfKnownTags, - String serviceNameOverride) { - this.delegate = delegate; - this.timeSource = timeSource; - this.traceConfigSupplier = traceConfigSupplier; - this.hashOfKnownTags = hashOfKnownTags; - this.serviceNameOverride = serviceNameOverride; - } - - @Override - public TagContext extract(C carrier, AgentPropagation.ContextVisitor getter) { - // Delegate the default HTTP extraction - TagContext extracted = this.delegate.extract(carrier, getter); - - if (extracted != null) { - boolean shouldExtractPathwayContext = - extracted.getTraceConfig() == null - ? traceConfigSupplier.get().isDataStreamsEnabled() - : extracted.getTraceConfig().isDataStreamsEnabled(); - - if (shouldExtractPathwayContext) { - DefaultPathwayContext pathwayContext = - DefaultPathwayContext.extract( - carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride); - - extracted.withPathwayContext(pathwayContext); - } - - return extracted; - } else if (traceConfigSupplier.get().isDataStreamsEnabled()) { - DefaultPathwayContext pathwayContext = - DefaultPathwayContext.extract( - carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride); - - if (pathwayContext != null) { - extracted = new TagContext(); - extracted.withPathwayContext(pathwayContext); - return extracted; - } - } - - return null; - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java deleted file mode 100644 index 2994aa9518e..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java +++ /dev/null @@ -1,90 +0,0 @@ -package datadog.trace.core.datastreams; - -import static datadog.trace.api.DDTags.PATHWAY_HASH; -import static datadog.trace.api.datastreams.PathwayContext.PROPAGATION_KEY_BASE64; - -import datadog.trace.api.datastreams.PathwayContext; -import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import java.io.IOException; -import java.util.LinkedHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DataStreamContextInjector { - private static final Logger LOGGER = LoggerFactory.getLogger(DataStreamContextInjector.class); - private final DataStreamsMonitoring dataStreamsMonitoring; - - public DataStreamContextInjector(DataStreamsMonitoring dataStreamsMonitoring) { - this.dataStreamsMonitoring = dataStreamsMonitoring; - } - - public void injectPathwayContext( - AgentSpan span, - C carrier, - AgentPropagation.Setter setter, - LinkedHashMap sortedTags) { - injectPathwayContext(span, carrier, setter, sortedTags, 0, 0, true); - } - - public void injectPathwayContext( - AgentSpan span, - C carrier, - AgentPropagation.Setter setter, - LinkedHashMap sortedTags, - long defaultTimestamp, - long payloadSizeBytes) { - injectPathwayContext( - span, carrier, setter, sortedTags, defaultTimestamp, payloadSizeBytes, true); - } - - /** Same as injectPathwayContext, but the stats collected in the StatsPoint are not sent. */ - public void injectPathwayContextWithoutSendingStats( - AgentSpan span, - C carrier, - AgentPropagation.Setter setter, - LinkedHashMap sortedTags) { - injectPathwayContext(span, carrier, setter, sortedTags, 0, 0, false); - } - - private void injectPathwayContext( - AgentSpan span, - C carrier, - AgentPropagation.Setter setter, - LinkedHashMap sortedTags, - long defaultTimestamp, - long payloadSizeBytes, - boolean sendCheckpoint) { - PathwayContext pathwayContext = span.context().getPathwayContext(); - if (pathwayContext == null - || (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled())) { - return; - } - pathwayContext.setCheckpoint( - sortedTags, - sendCheckpoint ? dataStreamsMonitoring::add : pathwayContext::saveStats, - defaultTimestamp, - payloadSizeBytes); - - boolean injected = injectPathwayContext(pathwayContext, carrier, setter); - - if (injected && pathwayContext.getHash() != 0) { - span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayContext.getHash())); - } - } - - private static boolean injectPathwayContext( - PathwayContext pathwayContext, C carrier, AgentPropagation.Setter setter) { - try { - String encodedContext = pathwayContext.encode(); - if (encodedContext != null) { - LOGGER.debug("Injecting pathway context {}", pathwayContext); - setter.set(carrier, PROPAGATION_KEY_BASE64, encodedContext); - return true; - } - } catch (IOException e) { - LOGGER.debug("Unable to set encode pathway context", e); - } - return false; - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsMonitoring.java index 8f7082a4270..82d5af02f04 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsMonitoring.java @@ -6,7 +6,6 @@ import datadog.trace.api.experimental.DataStreamsContextCarrier; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; -import datadog.trace.core.propagation.HttpCodec; public interface DataStreamsMonitoring extends AgentDataStreamsMonitoring, AutoCloseable { void start(); @@ -18,21 +17,6 @@ public interface DataStreamsMonitoring extends AgentDataStreamsMonitoring, AutoC */ Propagator propagator(); - /** - * Get a context extractor that support {@link PathwayContext} extraction. - * - * @param delegate The extractor to delegate the common trace context extraction. - * @return An extractor with DSM context extraction. - */ - HttpCodec.Extractor extractor(HttpCodec.Extractor delegate); - - /** - * Gets a context injector to propagate {@link PathwayContext}. - * - * @return A context injector if supported, {@code null} otherwise. - */ - DataStreamContextInjector injector(); - /** * Injects DSM {@link PathwayContext} into a span {@link AgentSpanContext}. * diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsPropagator.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsPropagator.java index 9769d55e573..65de974d367 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsPropagator.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsPropagator.java @@ -10,11 +10,13 @@ import datadog.trace.api.TraceConfig; import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.PathwayContext; +import datadog.trace.api.datastreams.StatsPoint; import datadog.trace.api.time.TimeSource; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.TagContext; import java.io.IOException; +import java.util.function.Consumer; import java.util.function.Supplier; import javax.annotation.Nullable; import javax.annotation.ParametersAreNonnullByDefault; @@ -54,15 +56,10 @@ public void inject(Context context, C carrier, CarrierSetter setter) { return; } - // TODO Allow set checkpoint to use DsmContext as parameter? - pathwayContext.setCheckpoint( - dsmContext.sortedTags(), - dsmContext.sendCheckpoint() ? dataStreamsMonitoring::add : pathwayContext::saveStats, - dsmContext.defaultTimestamp(), - dsmContext.payloadSizeBytes()); - + Consumer pointConsumer = + dsmContext.sendCheckpoint() ? this.dataStreamsMonitoring::add : pathwayContext::saveStats; + pathwayContext.setCheckpoint(dsmContext, pointConsumer); boolean injected = injectPathwayContext(pathwayContext, carrier, setter); - if (injected && pathwayContext.getHash() != 0) { span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayContext.getHash())); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index a537a9fb25c..ae392ff94ed 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -1,6 +1,7 @@ package datadog.trace.core.datastreams; import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V01_DATASTREAMS_ENDPOINT; +import static datadog.trace.api.datastreams.DataStreamsContext.fromTags; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN; import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT; @@ -19,6 +20,7 @@ import datadog.trace.api.TraceConfig; import datadog.trace.api.WellKnownTags; import datadog.trace.api.datastreams.Backlog; +import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.InboxItem; import datadog.trace.api.datastreams.NoopPathwayContext; import datadog.trace.api.datastreams.PathwayContext; @@ -33,7 +35,6 @@ import datadog.trace.common.metrics.Sink; import datadog.trace.core.DDSpan; import datadog.trace.core.DDTraceCoreInfo; -import datadog.trace.core.propagation.HttpCodec; import datadog.trace.util.AgentTaskScheduler; import java.util.ArrayList; import java.util.Collections; @@ -68,8 +69,8 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even private final long hashOfKnownTags; private final Supplier traceConfigSupplier; private final long bucketDurationNanos; - private final DataStreamContextInjector injector; private final Thread thread; + private final DataStreamsPropagator propagator; private AgentTaskScheduler.Scheduled cancellation; private volatile long nextFeatureCheck; private volatile boolean supportsDataStreams = false; @@ -128,11 +129,18 @@ public DefaultDataStreamsMonitoring( this.hashOfKnownTags = DefaultPathwayContext.getBaseHash(wellKnownTags); this.payloadWriter = payloadWriter; this.bucketDurationNanos = bucketDurationNanos; - this.injector = new DataStreamContextInjector(this); thread = newAgentThread(DATA_STREAMS_MONITORING, new InboxProcessor()); sink.register(this); schemaSamplers = new ConcurrentHashMap<>(); + + this.propagator = + new DataStreamsPropagator( + this, + this.traceConfigSupplier, + this.timeSource, + this.hashOfKnownTags, + getThreadServiceName()); } @Override @@ -203,23 +211,7 @@ public PathwayContext newPathwayContext() { @Override public Propagator propagator() { - return new DataStreamsPropagator( - this, - this.traceConfigSupplier, - this.timeSource, - this.hashOfKnownTags, - getThreadServiceName()); - } - - @Override - public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) { - return new DataStreamContextExtractor( - delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceName()); - } - - @Override - public DataStreamContextInjector injector() { - return this.injector; + return this.propagator; } @Override @@ -249,14 +241,10 @@ public void trackBacklog(LinkedHashMap sortedTags, long value) { } @Override - public void setCheckpoint( - AgentSpan span, - LinkedHashMap sortedTags, - long defaultTimestamp, - long payloadSizeBytes) { + public void setCheckpoint(AgentSpan span, DataStreamsContext context) { PathwayContext pathwayContext = span.context().getPathwayContext(); if (pathwayContext != null) { - pathwayContext.setCheckpoint(sortedTags, this::add, defaultTimestamp, payloadSizeBytes); + pathwayContext.setCheckpoint(context, this::add); } } @@ -280,7 +268,7 @@ public void setConsumeCheckpoint(String type, String source, DataStreamsContextC sortedTags.put(TOPIC_TAG, source); sortedTags.put(TYPE_TAG, type); - setCheckpoint(span, sortedTags, 0, 0); + setCheckpoint(span, fromTags(sortedTags)); } public void setProduceCheckpoint( @@ -304,8 +292,9 @@ public void setProduceCheckpoint( sortedTags.put(TOPIC_TAG, target); sortedTags.put(TYPE_TAG, type); - this.injector.injectPathwayContext( - span, carrier, DataStreamsContextCarrierAdapter.INSTANCE, sortedTags); + DataStreamsContext dsmContext = fromTags(sortedTags); + this.propagator.inject( + span.with(dsmContext), carrier, DataStreamsContextCarrierAdapter.INSTANCE); } @Override diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index 7db93af2518..aad8be72b5f 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -10,6 +10,7 @@ import datadog.context.propagation.CarrierVisitor; import datadog.trace.api.Config; import datadog.trace.api.WellKnownTags; +import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.PathwayContext; import datadog.trace.api.datastreams.StatsPoint; import datadog.trace.api.time.TimeSource; @@ -105,37 +106,21 @@ public long getHash() { } @Override - public void setCheckpoint( - LinkedHashMap sortedTags, Consumer pointConsumer) { - setCheckpoint(sortedTags, pointConsumer, 0, 0); - } - - @Override - public void setCheckpoint( - LinkedHashMap sortedTags, - Consumer pointConsumer, - long defaultTimestamp) { - setCheckpoint(sortedTags, pointConsumer, defaultTimestamp, 0); - } - - @Override - public void setCheckpoint( - LinkedHashMap sortedTags, - Consumer pointConsumer, - long defaultTimestamp, - long payloadSizeBytes) { + public void setCheckpoint(DataStreamsContext context, Consumer pointConsumer) { long startNanos = timeSource.getCurrentTimeNanos(); long nanoTicks = timeSource.getNanoTicks(); lock.lock(); try { // So far, each tag key has only one tag value, so we're initializing the capacity to match // the number of tag keys for now. We should revisit this later if it's no longer the case. + LinkedHashMap sortedTags = context.sortedTags(); List allTags = new ArrayList<>(sortedTags.size()); PathwayHashBuilder pathwayHashBuilder = new PathwayHashBuilder(hashOfKnownTags, serviceNameOverride); DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder(); if (!started) { + long defaultTimestamp = context.defaultTimestamp(); if (defaultTimestamp == 0) { pathwayStartNanos = startNanos; pathwayStartNanoTicks = nanoTicks; @@ -196,7 +181,7 @@ public void setCheckpoint( startNanos, pathwayLatencyNano, edgeLatencyNano, - payloadSizeBytes, + context.payloadSizeBytes(), serviceNameOverride); edgeStartNanoTicks = nanoTicks; hash = newHash; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java b/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java deleted file mode 100644 index 90f43736c38..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java +++ /dev/null @@ -1,45 +0,0 @@ -package datadog.trace.core.propagation; - -import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import datadog.trace.core.datastreams.DataStreamContextInjector; -import java.util.LinkedHashMap; - -public class CorePropagation implements AgentPropagation { - private final DataStreamContextInjector dataStreamContextInjector; - - /** - * Constructor - * - * @param dataStreamContextInjector The DSM context injector, as a specific object until generic - * context injection is available. - */ - public CorePropagation(DataStreamContextInjector dataStreamContextInjector) { - this.dataStreamContextInjector = dataStreamContextInjector; - } - - @Override - public void injectPathwayContext( - AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags) { - this.dataStreamContextInjector.injectPathwayContext(span, carrier, setter, sortedTags); - } - - @Override - public void injectPathwayContext( - AgentSpan span, - C carrier, - Setter setter, - LinkedHashMap sortedTags, - long defaultTimestamp, - long payloadSizeBytes) { - this.dataStreamContextInjector.injectPathwayContext( - span, carrier, setter, sortedTags, defaultTimestamp, payloadSizeBytes); - } - - @Override - public void injectPathwayContextWithoutSendingStats( - AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags) { - this.dataStreamContextInjector.injectPathwayContextWithoutSendingStats( - span, carrier, setter, sortedTags); - } -} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy index c50a5145c62..3761944b6ba 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy @@ -5,20 +5,22 @@ import datadog.trace.api.Config import datadog.trace.api.DDTraceId import datadog.trace.api.TraceConfig import datadog.trace.api.WellKnownTags +import datadog.trace.api.datastreams.StatsPoint import datadog.trace.api.time.ControllableTimeSource import datadog.trace.bootstrap.instrumentation.api.AgentPropagation -import datadog.trace.api.datastreams.PathwayContext -import datadog.trace.api.datastreams.StatsPoint -import datadog.trace.bootstrap.instrumentation.api.TagContext +import datadog.trace.bootstrap.instrumentation.api.AgentSpan import datadog.trace.common.metrics.Sink import datadog.trace.core.propagation.ExtractedContext -import datadog.trace.core.propagation.HttpCodec import datadog.trace.core.test.DDCoreSpecification import java.util.function.Consumer +import static datadog.context.Context.root import static datadog.trace.api.TracePropagationStyle.DATADOG import static datadog.trace.api.config.GeneralConfig.PRIMARY_TAG +import static datadog.trace.api.datastreams.DataStreamsContext.create +import static datadog.trace.api.datastreams.DataStreamsContext.fromTags +import static datadog.trace.api.datastreams.PathwayContext.PROPAGATION_KEY_BASE64 import static java.util.concurrent.TimeUnit.MILLISECONDS class DefaultPathwayContextTest extends DDCoreSpecification { @@ -49,7 +51,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(50) - context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["type": "internal"])), pointConsumer) then: context.isStarted() @@ -64,10 +66,10 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(50) - context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["type": "internal"])), pointConsumer) timeSource.advance(25) context.setCheckpoint( - new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) + fromTags(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"])), pointConsumer) then: context.isStarted() @@ -91,7 +93,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(25) context.setCheckpoint( - new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer, 0, 72) + create(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), 0, 72), + pointConsumer) then: context.isStarted() @@ -111,13 +114,13 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(50) - context.setCheckpoint(new LinkedHashMap<>(["direction": "out", "type": "kafka"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["direction": "out", "type": "kafka"])), pointConsumer) timeSource.advance(25) context.setCheckpoint( - new LinkedHashMap<>(["direction": "in", "group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) + fromTags(new LinkedHashMap<>(["direction": "in", "group": "group", "topic": "topic", "type": "kafka"])), pointConsumer) timeSource.advance(30) context.setCheckpoint( - new LinkedHashMap<>(["direction": "in", "group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) + fromTags(new LinkedHashMap<>(["direction": "in", "group": "group", "topic": "topic", "type": "kafka"])), pointConsumer) then: context.isStarted() @@ -163,12 +166,12 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(new LinkedHashMap<>(["type": "s3", "ds.namespace": "my_bucket", "ds.name": "my_object.csv", "direction": "in"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["type": "s3", "ds.namespace": "my_bucket", "ds.name": "my_object.csv", "direction": "in"])), pointConsumer) def encoded = context.encode() timeSource.advance(MILLISECONDS.toNanos(2)) def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) - context.setCheckpoint(new LinkedHashMap<>(["type": "s3", "ds.namespace": "my_bucket", "ds.name": "my_object.csv", "direction": "out"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["type": "s3", "ds.namespace": "my_bucket", "ds.name": "my_object.csv", "direction": "out"])), pointConsumer) then: decodedContext.isStarted() @@ -189,12 +192,12 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["type": "internal"])), pointConsumer) def encoded = context.encode() timeSource.advance(MILLISECONDS.toNanos(2)) def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) - context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"])), pointConsumer) then: decodedContext.isStarted() @@ -216,7 +219,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def context = new DefaultPathwayContext(timeSource, baseHash, null) def timeFromQueue = timeSource.getCurrentTimeMillis() - 200 when: - context.setCheckpoint(["type": "internal"], pointConsumer, timeFromQueue) + context.setCheckpoint(create(["type": "internal"], timeFromQueue, 0), pointConsumer) then: context.isStarted() pointConsumer.points.size() == 1 @@ -238,13 +241,13 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["type": "internal"])), pointConsumer) def encoded = context.encode() timeSource.advance(MILLISECONDS.toNanos(1)) def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) - context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"])), pointConsumer) then: decodedContext.isStarted() @@ -263,7 +266,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { timeSource.advance(MILLISECONDS.toNanos(2)) def secondDecode = DefaultPathwayContext.decode(timeSource, baseHash, null, secondEncode) timeSource.advance(MILLISECONDS.toNanos(30)) - context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"])), pointConsumer) then: secondDecode.isStarted() @@ -287,14 +290,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["type": "internal"])), pointConsumer) def encoded = context.encode() - Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] + Map carrier = [(PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] timeSource.advance(MILLISECONDS.toNanos(1)) def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(25)) - context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"])), pointConsumer) then: decodedContext.isStarted() @@ -310,11 +313,11 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: def secondEncode = decodedContext.encode() - carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode] + carrier = [(PROPAGATION_KEY_BASE64): secondEncode] timeSource.advance(MILLISECONDS.toNanos(2)) def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(30)) - context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"])), pointConsumer) then: secondDecode.isStarted() @@ -338,14 +341,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["type": "internal"])), pointConsumer) def encoded = context.encode() - Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] + Map carrier = [(PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] timeSource.advance(MILLISECONDS.toNanos(1)) def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(25)) - context.setCheckpoint(new LinkedHashMap<>(["topic": "topic", "type": "sqs"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["topic": "topic", "type": "sqs"])), pointConsumer) then: decodedContext.isStarted() @@ -361,11 +364,11 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: def secondEncode = decodedContext.encode() - carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode] + carrier = [(PROPAGATION_KEY_BASE64): secondEncode] timeSource.advance(MILLISECONDS.toNanos(2)) def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(30)) - context.setCheckpoint(new LinkedHashMap<>(["topic": "topicB", "type": "sqs"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["topic": "topicB", "type": "sqs"])), pointConsumer) then: secondDecode.isStarted() @@ -387,11 +390,11 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(50) - context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["type": "internal"])), pointConsumer) timeSource.advance(25) - context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "type"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "type"])), pointConsumer) timeSource.advance(25) - context.setCheckpoint(new LinkedHashMap<>(), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>()), pointConsumer) then: context.isStarted() @@ -446,16 +449,24 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def context = new DefaultPathwayContext(timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["type": "internal"])), pointConsumer) def encoded = context.encode() - Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] + Map carrier = [(PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] def contextVisitor = new Base64MapContextVisitor() + def spanContext = new ExtractedContext(DDTraceId.ONE, 1, 0, null, 0, null, null, null, null, localTraceConfig, DATADOG) + def baseContext = AgentSpan.fromSpanContext(spanContext).storeInto(root()) + def propagator = dataStreams.propagator() + when: - def extractor = new FakeExtractor() - extractor.traceConfig = localTraceConfig - def decorated = dataStreams.extractor(extractor) - def extracted = decorated.extract(carrier, contextVisitor) + def extractedContext = propagator.extract(baseContext, carrier, contextVisitor) + def extractedSpan = AgentSpan.fromContext(extractedContext) + + then: + extractedSpan != null + + when: + def extracted = extractedSpan.context() then: extracted != null @@ -492,24 +503,25 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def context = new DefaultPathwayContext(timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["type": "internal"])), pointConsumer) def encoded = context.encode() - Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] + Map carrier = [(PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] def contextVisitor = new Base64MapContextVisitor() - def extractor = new NullExtractor() - def decorated = dataStreams.extractor(extractor) + def propagator = dataStreams.propagator() when: - def extracted = decorated.extract(carrier, contextVisitor) + def extractedContext = propagator.extract(root(), carrier, contextVisitor) + def extractedSpan = AgentSpan.fromContext(extractedContext) then: - if (globalDsmEnabled) { + extractedSpan != null + def extracted = extractedSpan.context() extracted != null extracted.pathwayContext != null extracted.pathwayContext.isStarted() } else { - extracted == null + extractedSpan == null } where: @@ -533,15 +545,24 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def context = new DefaultPathwayContext(timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) + context.setCheckpoint(fromTags(new LinkedHashMap<>(["type": "internal"])), pointConsumer) def encoded = context.encode() - Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] + Map carrier = [(PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] def contextVisitor = new Base64MapContextVisitor() - def extractor = new FakeExtractor() - def decorated = dataStreams.extractor(extractor) + def spanContext = new ExtractedContext(DDTraceId.ONE, 1, 0, null, 0, null, null, null, null, null, DATADOG) + def baseContext = AgentSpan.fromSpanContext(spanContext).storeInto(root()) + def propagator = dataStreams.propagator() + + + when: + def extractedContext = propagator.extract(baseContext, carrier, contextVisitor) + def extractedSpan = AgentSpan.fromContext(extractedContext) + + then: + extractedSpan != null when: - def extracted = decorated.extract(carrier, contextVisitor) + def extracted = extractedSpan.context() then: extracted != null @@ -573,30 +594,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification { Map carrier = ["someotherkey": "someothervalue"] def contextVisitor = new Base64MapContextVisitor() - def extractor = new NullExtractor() - def decorated = dataStreams.extractor(extractor) + def propagator = dataStreams.propagator() when: - def extracted = decorated.extract(carrier, contextVisitor) + def extractedContext = propagator.extract(root(), carrier, contextVisitor) + def extractedSpan = AgentSpan.fromContext(extractedContext) then: - extracted == null - } - - class FakeExtractor implements HttpCodec.Extractor { - TraceConfig traceConfig - - @Override - TagContext extract(C carrier, AgentPropagation.ContextVisitor getter) { - return new ExtractedContext(DDTraceId.ONE, 1, 0, null, 0, null, null, null, null, traceConfig, DATADOG) - } - } - - class NullExtractor implements HttpCodec.Extractor { - @Override - TagContext extract(C carrier, AgentPropagation.ContextVisitor getter) { - return null - } + extractedSpan == null } class Base64MapContextVisitor implements AgentPropagation.ContextVisitor> { diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java index d1683696700..e6ddac36bac 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java @@ -13,19 +13,9 @@ public interface AgentDataStreamsMonitoring extends DataStreamsCheckpointer { * Sets data streams checkpoint, used for both produce and consume operations. * * @param span active span - * @param sortedTags alphabetically sorted tags for the checkpoint (direction, queue type etc) - * @param defaultTimestamp unix timestamp to use as a start of the pathway if this is the first - * checkpoint in the chain. Zero should be passed if we can't extract the timestamp from the - * message / payload itself (for instance: produce operations; http produce / consume etc). - * Value will be ignored for checkpoints happening not at the start of the pipeline. - * @param payloadSizeBytes size of the message (body + headers) in bytes. Zero should be passed if - * the size cannot be evaluated. + * @param context the data streams context */ - void setCheckpoint( - AgentSpan span, - LinkedHashMap sortedTags, - long defaultTimestamp, - long payloadSizeBytes); + void setCheckpoint(AgentSpan span, DataStreamsContext context); PathwayContext newPathwayContext(); diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsContext.java b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsContext.java index 8cd0d6c33e0..ff9118df05d 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsContext.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsContext.java @@ -18,11 +18,29 @@ public static DataStreamsContext fromContext(Context context) { return context.get(CONTEXT_KEY); } + /** + * Creates a DSM context. + * + * @param sortedTags alphabetically sorted tags for the checkpoint (direction, queue type etc) + * @return the created context. + */ public static DataStreamsContext fromTags(LinkedHashMap sortedTags) { return new DataStreamsContext(sortedTags, 0, 0, true); } - public static DataStreamsContext fromKafka( + /** + * Creates a DSM context. + * + * @param sortedTags alphabetically sorted tags for the checkpoint (direction, queue type etc) + * @param defaultTimestamp unix timestamp to use as a start of the pathway if this is the first + * checkpoint in the chain. Zero should be passed if we can't extract the timestamp from the + * message / payload itself (for instance: produce operations; http produce / consume etc). + * Value will be ignored for checkpoints happening not at the start of the pipeline. + * @param payloadSizeBytes size of the message (body + headers) in bytes. Zero should be passed if + * the size cannot be evaluated. + * @return the created context. + */ + public static DataStreamsContext create( LinkedHashMap sortedTags, long defaultTimestamp, long payloadSizeBytes) { return new DataStreamsContext(sortedTags, defaultTimestamp, payloadSizeBytes, true); } diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java index 88737bad3b3..020b492639d 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java @@ -13,11 +13,7 @@ public class NoopDataStreamsMonitoring implements AgentDataStreamsMonitoring { public void trackBacklog(LinkedHashMap sortedTags, long value) {} @Override - public void setCheckpoint( - AgentSpan span, - LinkedHashMap sortedTags, - long defaultTimestamp, - long payloadSizeBytes) {} + public void setCheckpoint(AgentSpan span, DataStreamsContext context) {} @Override public PathwayContext newPathwayContext() { diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopPathwayContext.java b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopPathwayContext.java index 84ccaac93c9..08e28ac83b0 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopPathwayContext.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopPathwayContext.java @@ -1,6 +1,5 @@ package datadog.trace.api.datastreams; -import java.util.LinkedHashMap; import java.util.function.Consumer; public class NoopPathwayContext implements PathwayContext { @@ -17,21 +16,7 @@ public long getHash() { } @Override - public void setCheckpoint( - LinkedHashMap sortedTags, - Consumer pointConsumer, - long defaultTimestamp, - long payloadSizeBytes) {} - - @Override - public void setCheckpoint( - LinkedHashMap sortedTags, - Consumer pointConsumer, - long defaultTimestamp) {} - - @Override - public void setCheckpoint( - LinkedHashMap sortedTags, Consumer pointConsumer) {} + public void setCheckpoint(DataStreamsContext context, Consumer pointConsumer) {} @Override public void saveStats(StatsPoint point) {} diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/PathwayContext.java b/internal-api/src/main/java/datadog/trace/api/datastreams/PathwayContext.java index da3c5bbca1e..f5a373e70ec 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/PathwayContext.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/PathwayContext.java @@ -1,7 +1,6 @@ package datadog.trace.api.datastreams; import java.io.IOException; -import java.util.LinkedHashMap; import java.util.function.Consumer; public interface PathwayContext { @@ -12,19 +11,7 @@ public interface PathwayContext { long getHash(); - void setCheckpoint( - LinkedHashMap sortedTags, - Consumer pointConsumer, - long defaultTimestamp, - long payloadSizeBytes); - - void setCheckpoint( - LinkedHashMap sortedTags, - Consumer pointConsumer, - long defaultTimestamp); - - // The input tags should be sorted. - void setCheckpoint(LinkedHashMap sortedTags, Consumer pointConsumer); + void setCheckpoint(DataStreamsContext context, Consumer pointConsumer); void saveStats(StatsPoint point); diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java index 0fd8cbca94f..091c7465260 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java @@ -8,7 +8,6 @@ import datadog.context.propagation.CarrierVisitor; import datadog.context.propagation.Concern; import datadog.context.propagation.Propagators; -import java.util.LinkedHashMap; import java.util.function.BiConsumer; import javax.annotation.ParametersAreNonnullByDefault; @@ -20,21 +19,6 @@ public interface AgentPropagation { // TODO into the span context for now. Remove priority after the migration is complete. Concern DSM_CONCERN = withPriority("data-stream-monitoring", 110); - // The input tags should be sorted. - void injectPathwayContext( - AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags); - - void injectPathwayContext( - AgentSpan span, - C carrier, - Setter setter, - LinkedHashMap sortedTags, - long defaultTimestamp, - long payloadSizeBytes); - - void injectPathwayContextWithoutSendingStats( - AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags); - interface Setter extends CarrierSetter { void set(C carrier, String key, String value); } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java index 7208023ca93..fdc201c10c4 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java @@ -21,7 +21,6 @@ import datadog.trace.context.TraceScope; import java.nio.ByteBuffer; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -560,23 +559,6 @@ public void updatePreferredServiceName(String serviceName) { static class NoopAgentPropagation implements AgentPropagation { static final NoopAgentPropagation INSTANCE = new NoopAgentPropagation(); - - @Override - public void injectPathwayContext( - AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags) {} - - @Override - public void injectPathwayContext( - AgentSpan span, - C carrier, - Setter setter, - LinkedHashMap sortedTags, - long defaultTimestamp, - long payloadSizeBytes) {} - - @Override - public void injectPathwayContextWithoutSendingStats( - AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags) {} } static class NoopContinuation implements AgentScope.Continuation { diff --git a/internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsContextTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsContextTest.groovy index 8d3240ff8b4..a565e0ed5e0 100644 --- a/internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsContextTest.groovy +++ b/internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsContextTest.groovy @@ -29,7 +29,7 @@ class DataStreamsContextTest extends Specification { when: def timestamp = 123L def payloadSize = 456L - dsmContext = DataStreamsContext.fromKafka(tags, timestamp, payloadSize) + dsmContext = DataStreamsContext.create(tags, timestamp, payloadSize) then: dsmContext.sortedTags() == tags