
Commit 6856dc5

Reverted instrumentation-specific code; switched to thread local
1 parent 0e9f3b6

8 files changed: +24 -152 lines changed

dd-java-agent/instrumentation/spark-executor/build.gradle (-11)

@@ -33,24 +33,13 @@ ext {
 dependencies {
   compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
   compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
-  compileOnly group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.12', version: "2.4.0"

   baseTestImplementation group: 'org.apache.spark', name: "spark-core_2.12", version: "2.4.0"
   baseTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: "2.4.0"
-  baseTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: "2.4.0"
-  baseTestImplementation group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.12', version: "2.4.0"
-  testImplementation group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.0'
-  testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.0'
-  testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.4.0.RELEASE'
-  testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.4.0.RELEASE'

   latest212DepTestImplementation group: 'org.apache.spark', name: "spark-core_2.12", version: '3.+'
   latest212DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: '3.+'
-  latest212DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: "3.+"
-  latest212DepTestImplementation group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.12', version: "2.4.0"

   latest213DepTestImplementation group: 'org.apache.spark', name: "spark-core_2.13", version: '3.+'
   latest213DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.13", version: '3.+'
-  latest212DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.13", version: "3.+"
-  latest212DepTestImplementation group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.13', version: "3.+"
 }

dd-java-agent/instrumentation/spark-executor/src/baseTest/groovy/SparkExecutorTest.groovy (-72)

@@ -1,38 +1,17 @@
 import datadog.trace.agent.test.AgentTestRunner
 import datadog.trace.bootstrap.instrumentation.api.Tags
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.spark.api.java.function.VoidFunction2
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.RowFactory
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.types.StructType
-import org.junit.ClassRule
-import org.springframework.kafka.core.DefaultKafkaProducerFactory
-import org.springframework.kafka.test.EmbeddedKafkaBroker
-import org.springframework.kafka.test.rule.EmbeddedKafkaRule
-import org.springframework.kafka.test.utils.KafkaTestUtils
-import spock.lang.Shared
-

 class SparkExecutorTest extends AgentTestRunner {
-  static final SOURCE_TOPIC = "source"
-  static final SINK_TOPIC = "sink"
-
-  @Shared
-  @ClassRule
-  EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, false, 1, SOURCE_TOPIC, SINK_TOPIC)
-  EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka

   @Override
   void configurePreAgent() {
     super.configurePreAgent()
     injectSysConfig("dd.integration.spark-executor.enabled", "true")
-    injectSysConfig("dd.integration.spark.enabled", "true")
-    injectSysConfig("dd.integration.kafka.enabled", "true")
-    injectSysConfig("dd.data.streams.enabled", "true")
-    injectSysConfig("dd.trace.debug", "true")
   }

   private Dataset<Row> generateSampleDataframe(SparkSession spark) {
@@ -44,57 +23,6 @@ class SparkExecutorTest extends AgentTestRunner {
     spark.createDataFrame(rows, structType)
   }

-  def "test dsm service name override"() {
-    setup:
-    def sparkSession = SparkSession.builder()
-      .config("spark.master", "local[2]")
-      .config("spark.driver.bindAddress", "localhost")
-      // .config("spark.sql.shuffle.partitions", "2")
-      .appName("test-app")
-      .getOrCreate()
-
-    def producerProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
-    def producer = new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer()
-
-    when:
-    for (int i = 0; i < 100; i++) {
-      producer.send(new ProducerRecord<>(SOURCE_TOPIC, i, i.toString()))
-    }
-    producer.flush()
-
-    def df = sparkSession
-      .readStream()
-      .format("kafka")
-      .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString())
-      .option("startingOffsets", "earliest")
-      .option("failOnDataLoss", "false")
-      .option("subscribe", SOURCE_TOPIC)
-      .load()
-
-    def query = df
-      .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
-      .writeStream()
-      .format("kafka")
-      .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString())
-      .option("checkpointLocation", "/tmp/" + System.currentTimeMillis().toString())
-      .option("topic", SINK_TOPIC)
-      .trigger(Trigger.Once())
-      .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
-        @Override
-        void call(Dataset<Row> rowDataset, Long aLong) throws Exception {
-          rowDataset.show()
-          rowDataset.write()
-        }
-      })
-      .start()
-
-    query.processAllAvailable()
-
-    then:
-    query.stop()
-    producer.close()
-  }
-
   def "generate spark task run spans"() {
     setup:
     def sparkSession = SparkSession.builder()

dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java (+2 -37)

@@ -1,33 +1,15 @@
 package datadog.trace.instrumentation.spark;

 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
-import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
 import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
 import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator;
-import datadog.trace.util.MethodHandles;
-import java.lang.invoke.MethodHandle;
-import java.util.Properties;
 import org.apache.spark.executor.Executor;
 import org.apache.spark.executor.TaskMetrics;

 public class SparkExecutorDecorator extends BaseDecorator {
   public static final CharSequence SPARK_TASK = UTF8BytesString.create("spark.task");
   public static final CharSequence SPARK = UTF8BytesString.create("spark");
   public static SparkExecutorDecorator DECORATE = new SparkExecutorDecorator();
-  private final String propSparkAppName = "spark.app.name";
-  private static final String TASK_DESCRIPTION_CLASSNAME =
-      "org.apache.spark.scheduler.TaskDescription";
-  private static final MethodHandle propertiesField_mh = getFieldGetter();
-
-  private static MethodHandle getFieldGetter() {
-    try {
-      return new MethodHandles(Executor.class.getClassLoader())
-          .privateFieldGetter(TASK_DESCRIPTION_CLASSNAME, "properties");
-    } catch (Throwable ignored) {
-      // should be already logged
-    }
-    return null;
-  }

   @Override
   protected String[] instrumentationNames() {
@@ -44,29 +26,12 @@ protected CharSequence component() {
     return SPARK;
   }

-  public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner, Object taskDescription) {
+  public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner) {
     span.setTag("task_id", taskRunner.taskId());
     span.setTag("task_thread_name", taskRunner.threadName());
-
-    if (taskDescription != null && propertiesField_mh != null) {
-      try {
-        Properties props = (Properties) propertiesField_mh.invoke(taskDescription);
-        if (props != null) {
-          String appName = props.getProperty(propSparkAppName);
-          if (appName != null) {
-            AgentTracer.get()
-                .getDataStreamsMonitoring()
-                .setThreadServiceName(taskRunner.getThreadId(), appName);
-          }
-        }
-      } catch (Throwable ignored) {
-      }
-    }
   }

   public void onTaskEnd(AgentSpan span, Executor.TaskRunner taskRunner) {
-    AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName(taskRunner.getThreadId());
-
     // task is set by spark in run() by deserializing the task binary coming from the driver
     if (taskRunner.task() == null) {
       return;
@@ -85,7 +50,7 @@ public void onTaskEnd(AgentSpan span, Executor.TaskRunner taskRunner) {
       span.setTag("app_attempt_id", taskRunner.task().appAttemptId().get());
     }
     span.setTag(
-        "application_name", taskRunner.task().localProperties().getProperty(propSparkAppName));
+        "application_name", taskRunner.task().localProperties().getProperty("spark.app.name"));

     TaskMetrics metrics = taskRunner.task().metrics();
     span.setMetric("spark.executor_deserialize_time", metrics.executorDeserializeTime());
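
For reference, the reverted hook read the private "properties" field of Spark's TaskDescription through a MethodHandle before the task ran. A rough JDK-only sketch of that removed lookup, assuming nothing about the tracer's internal datadog.trace.util.MethodHandles helper (the class below is illustrative, not the tracer's API):

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Field;

// Illustrative reconstruction of the removed private-field lookup using only
// the JDK; the real code used datadog.trace.util.MethodHandles instead.
final class TaskDescriptionPropertiesGetter {
  static MethodHandle create(ClassLoader sparkClassLoader) {
    try {
      Class<?> taskDescription =
          Class.forName("org.apache.spark.scheduler.TaskDescription", false, sparkClassLoader);
      Field properties = taskDescription.getDeclaredField("properties");
      properties.setAccessible(true); // may be denied under strict access control
      return MethodHandles.lookup().unreflectGetter(properties);
    } catch (Throwable ignored) {
      return null; // mirror the removed code: callers treat null as "unavailable"
    }
  }
}

Dropping this path removes both the reflective field access and the per-thread cleanup it forced onTaskEnd to perform.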

dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java (+2 -4)

@@ -52,13 +52,11 @@ public void methodAdvice(MethodTransformer transformer) {

   public static final class RunAdvice {
     @Advice.OnMethodEnter(suppress = Throwable.class)
-    public static AgentScope enter(
-        @Advice.FieldValue("taskDescription") final Object taskDescription,
-        @Advice.This Executor.TaskRunner taskRunner) {
+    public static AgentScope enter(@Advice.This Executor.TaskRunner taskRunner) {
       final AgentSpan span = startSpan("spark-executor", SPARK_TASK);

       DECORATE.afterStart(span);
-      DECORATE.onTaskStart(span, taskRunner, taskDescription);
+      DECORATE.onTaskStart(span, taskRunner);

       return activateSpan(span);
     }

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java (+12 -15)

@@ -75,8 +75,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
   private volatile boolean agentSupportsDataStreams = false;
   private volatile boolean configSupportsDataStreams = false;
   private final ConcurrentHashMap<String, SchemaSampler> schemaSamplers;
-  private static final ConcurrentHashMap<Long, String> threadServiceNames =
-      new ConcurrentHashMap<>();
+  private static final ThreadLocal<String> serviceNameOverride = new ThreadLocal<>();

   public DefaultDataStreamsMonitoring(
       Config config,
@@ -188,29 +187,28 @@ public void setProduceCheckpoint(String type, String target) {
   }

   @Override
-  public void setThreadServiceName(Long threadId, String serviceName) {
-    // setting service name to null == removing the value
+  public void setThreadServiceName(String serviceName) {
     if (serviceName == null) {
-      clearThreadServiceName(threadId);
+      clearThreadServiceName();
       return;
     }

-    threadServiceNames.put(threadId, serviceName);
+    serviceNameOverride.set(serviceName);
   }

   @Override
-  public void clearThreadServiceName(Long threadId) {
-    threadServiceNames.remove(threadId);
+  public void clearThreadServiceName() {
+    serviceNameOverride.remove();
   }

-  private static String getThreadServiceNameOverride() {
-    return threadServiceNames.getOrDefault(Thread.currentThread().getId(), null);
+  private static String getThreadServiceName() {
+    return serviceNameOverride.get();
   }

   @Override
   public PathwayContext newPathwayContext() {
     if (configSupportsDataStreams) {
-      return new DefaultPathwayContext(timeSource, hashOfKnownTags, getThreadServiceNameOverride());
+      return new DefaultPathwayContext(timeSource, hashOfKnownTags, getThreadServiceName());
     } else {
       return AgentTracer.NoopPathwayContext.INSTANCE;
     }
@@ -219,7 +217,7 @@ public PathwayContext newPathwayContext() {
   @Override
   public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) {
     return new DataStreamContextExtractor(
-        delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceNameOverride());
+        delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceName());
   }

   @Override
@@ -236,7 +234,7 @@ public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrie
           DataStreamsContextCarrierAdapter.INSTANCE,
           this.timeSource,
           this.hashOfKnownTags,
-          getThreadServiceNameOverride());
+          getThreadServiceName());
       ((DDSpan) span).context().mergePathwayContext(pathwayContext);
     }
   }
@@ -250,8 +248,7 @@ public void trackBacklog(LinkedHashMap<String, String> sortedTags, long value) {
       }
       tags.add(tag);
     }
-    inbox.offer(
-        new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceNameOverride()));
+    inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceName()));
   }

   @Override
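
This file is the crux of the switch: the override previously lived in a static ConcurrentHashMap keyed by thread ID, which callers had to pass around and remember to clean up under the same ID; a ThreadLocal gives each thread its own slot with no ID bookkeeping. A minimal standalone sketch of the pattern, assuming (as the change does) that set, get, and clear all happen on the worker thread itself — the class name is illustrative, not part of the tracer:

// Minimal sketch of the ThreadLocal pattern adopted above (illustrative class).
final class ThreadServiceNameOverride {
  private static final ThreadLocal<String> OVERRIDE = new ThreadLocal<>();

  static void set(String serviceName) {
    if (serviceName == null) {
      clear(); // like the real method: null means "remove the override"
    } else {
      OVERRIDE.set(serviceName);
    }
  }

  static void clear() {
    OVERRIDE.remove(); // remove(), not set(null): no stale entries on pooled threads
  }

  static String get() {
    return OVERRIDE.get(); // null when this thread has no active override
  }
}

The trade-off: a ThreadLocal only works because Spark's TaskRunner.run() does its DSM work on the thread that set the override, whereas the old map allowed cross-thread writes but leaked entries whenever clearThreadServiceName was never called for a given ID.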

dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy (+2 -2)

@@ -79,14 +79,14 @@ class DataStreamsWritingTest extends DDCoreSpecification {
     when:
     def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig })
     dataStreams.start()
-    dataStreams.setThreadServiceName(Thread.currentThread().getId(), serviceNameOverride)
+    dataStreams.setThreadServiceName(serviceNameOverride)
     dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, serviceNameOverride))
     dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 130)
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
     // force flush
     dataStreams.report()
     dataStreams.close()
-    dataStreams.clearThreadServiceName(Thread.currentThread().getId())
+    dataStreams.clearThreadServiceName()
     then:
     conditions.eventually {
       assert requestBodies.size() == 1

internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java (+4 -9)

@@ -45,17 +45,12 @@ void setCheckpoint(

   /**
    * setServiceNameOverride is used override service name for all DataStreams payloads produced
-   * within given thread
+   * within Thread.currentThread()
    *
-   * @param threadId thread Id
    * @param serviceName new service name to use for DSM checkpoints.
    */
-  void setThreadServiceName(Long threadId, String serviceName);
+  void setThreadServiceName(String serviceName);

-  /**
-   * clearThreadServiceName clears up threadId -> Service name mapping
-   *
-   * @param threadId thread Id
-   */
-  void clearThreadServiceName(Long threadId);
+  /** clearThreadServiceName clears up service name override for Thread.currentThread() */
+  void clearThreadServiceName();
 }
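
With the narrowed interface, a caller scopes the override by bracketing work on the current thread. A hedged sketch of that usage — the wrapper class and the Runnable it runs are hypothetical; only the AgentTracer.get().getDataStreamsMonitoring() entry point and the two interface methods come from this commit:

import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;

// Hypothetical helper showing the intended set/clear discipline.
final class ScopedServiceNameExample {
  static void runWithOverride(String serviceName, Runnable task) {
    AgentDataStreamsMonitoring dsm = AgentTracer.get().getDataStreamsMonitoring();
    dsm.setThreadServiceName(serviceName); // affects only this thread's DSM payloads
    try {
      task.run(); // checkpoints and backlogs emitted here pick up the override
    } finally {
      dsm.clearThreadServiceName(); // always clear: worker threads are usually pooled
    }
  }
}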

internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java (+2 -2)

@@ -1135,10 +1135,10 @@ public Schema getSchema(String schemaName, SchemaIterator iterator) {
     public void setProduceCheckpoint(String type, String target) {}

     @Override
-    public void setThreadServiceName(Long threadId, String serviceName) {}
+    public void setThreadServiceName(String serviceName) {}

     @Override
-    public void clearThreadServiceName(Long threadId) {}
+    public void clearThreadServiceName() {}

     @Override
     public void setConsumeCheckpoint(
