Skip to content

Commit 1659cd3

Browse files
Add fallbacks when retrieving databricks ids (#5910)
Add fallbacks when extracting databricks_job_id and databricks_task_run_id by searching in other properties when spark.databricks.job.id and spark.databricks.job.runId are not present: - spark.jobGroup.id, which has the pattern <scheduler_id>_job-<job_id>-run-<task_run_id>-action-<action_id> - spark.databricks.workload.id, which has the pattern <org_id>-<job_id>-<task_run_id> For databricks_job_run_id, it will use: - the new property spark.databricks.job.parentRunId as the default - the current method of extracting the id from the clusterName of job clusters as a fallback. The properties spark.databricks.job.id and spark.databricks.job.runId set by Databricks are missing in some customer environments. The property spark.databricks.job.parentRunId is a more robust way to extract the databricks_job_run_id.
1 parent 050e5c9 commit 1659cd3

File tree

3 files changed

+138
-20
lines changed

3 files changed

+138
-20
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/DatadogSparkListener.java

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,9 @@ private void addDatabricksSpecificTags(
201201
captureJobParameters(builder, properties);
202202

203203
if (properties != null) {
204-
String databricksJobId = properties.getProperty("spark.databricks.job.id");
204+
String databricksJobId = getDatabricksJobId(properties);
205205
String databricksJobRunId = getDatabricksJobRunId(properties, databricksClusterName);
206-
207-
// spark.databricks.job.runId is the runId of the task, not of the Job
208-
String databricksTaskRunId = properties.getProperty("spark.databricks.job.runId");
206+
String databricksTaskRunId = getDatabricksTaskRunId(properties);
209207

210208
// ids to link those spans to databricks job/task traces
211209
builder.withTag("databricks_job_id", databricksJobId);
@@ -769,11 +767,48 @@ private long stageSpanKey(int stageId, int attemptId) {
769767
return ((long) stageId << 32) + attemptId;
770768
}
771769

770+
@SuppressForbidden // split with one-char String use a fast-path without regex usage
771+
private static String getDatabricksJobId(Properties properties) {
772+
String jobId = properties.getProperty("spark.databricks.job.id");
773+
if (jobId != null) {
774+
return jobId;
775+
}
776+
777+
// First fallback, use spark.jobGroup.id with the pattern
778+
// <scheduler_id>_job-<job_id>-run-<task_run_id>-action-<action_id>
779+
String jobGroupId = properties.getProperty("spark.jobGroup.id");
780+
if (jobGroupId != null) {
781+
int startIndex = jobGroupId.indexOf("job-");
782+
int endIndex = jobGroupId.indexOf("-run", startIndex);
783+
if (startIndex != -1 && endIndex != -1) {
784+
return jobGroupId.substring(startIndex + 4, endIndex);
785+
}
786+
}
787+
788+
// Second fallback, use spark.databricks.workload.id with pattern
789+
// <org_id>-<job_id>-<task_run_id>
790+
String workloadId = properties.getProperty("spark.databricks.workload.id");
791+
if (workloadId != null) {
792+
String[] parts = workloadId.split("-");
793+
if (parts.length > 1) {
794+
return parts[1];
795+
}
796+
}
797+
798+
return null;
799+
}
800+
772801
@SuppressForbidden // split with one-char String use a fast-path without regex usage
773802
private static String getDatabricksJobRunId(
774803
Properties jobProperties, String databricksClusterName) {
775-
String clusterName =
776-
(String) jobProperties.get("spark.databricks.clusterUsageTags.clusterName");
804+
String jobRunId = jobProperties.getProperty("spark.databricks.job.parentRunId");
805+
if (jobRunId != null) {
806+
return jobRunId;
807+
}
808+
809+
// Fallback, extract the jobRunId from the cluster name for job clusters having the pattern
810+
// job-<job_id>-run-<job_run_id>
811+
String clusterName = jobProperties.getProperty("spark.databricks.clusterUsageTags.clusterName");
777812

778813
// Using the databricksClusterName as fallback, if not present in jobProperties
779814
clusterName = (clusterName == null) ? databricksClusterName : clusterName;
@@ -791,6 +826,38 @@ private static String getDatabricksJobRunId(
791826
return null;
792827
}
793828

829+
@SuppressForbidden // split with one-char String use a fast-path without regex usage
830+
private static String getDatabricksTaskRunId(Properties properties) {
831+
// spark.databricks.job.runId is the runId of the task, not of the Job
832+
String taskRunId = properties.getProperty("spark.databricks.job.runId");
833+
if (taskRunId != null) {
834+
return taskRunId;
835+
}
836+
837+
// First fallback, use spark.jobGroup.id with the pattern
838+
// <scheduler_id>_job-<job_id>-run-<task_run_id>-action-<action_id>
839+
String jobGroupId = properties.getProperty("spark.jobGroup.id");
840+
if (jobGroupId != null) {
841+
int startIndex = jobGroupId.indexOf("run-");
842+
int endIndex = jobGroupId.indexOf("-action", startIndex);
843+
if (startIndex != -1 && endIndex != -1) {
844+
return jobGroupId.substring(startIndex + 4, endIndex);
845+
}
846+
}
847+
848+
// Second fallback, use spark.databricks.workload.id with pattern
849+
// <org_id>-<job_id>-<task_run_id>
850+
String workloadId = properties.getProperty("spark.databricks.workload.id");
851+
if (workloadId != null) {
852+
String[] parts = workloadId.split("-");
853+
if (parts.length > 2) {
854+
return parts[2];
855+
}
856+
}
857+
858+
return null;
859+
}
860+
794861
private String stackTraceToString(Throwable e) {
795862
StringWriter stringWriter = new StringWriter();
796863
e.printStackTrace(new PrintWriter(stringWriter));

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,10 @@ class SparkConfAllowList {
6666
"spark.databricks.clusterUsageTags.sparkVersion",
6767
"spark.databricks.clusterUsageTags.workerEnvironmentId",
6868
"spark.databricks.env",
69+
"spark.databricks.job.parentRunId",
6970
"spark.databricks.job.type",
7071
"spark.databricks.sparkContextId",
72+
"spark.databricks.workload.name",
7173
"spark.job.description",
7274
"spark.jobGroup.id",
7375
"spark.sql.execution.id",

dd-java-agent/instrumentation/spark/src/test/groovy/SparkTest.groovy

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -223,63 +223,112 @@ class SparkTest extends AgentTestRunner {
223223
.config("spark.default.parallelism", "2") // Small parallelism to speed up tests
224224
.config("spark.sql.shuffle.partitions", "2")
225225
.config("spark.databricks.sparkContextId", "some_id")
226-
.config("spark.databricks.clusterUsageTags.clusterName", "job-1234-run-5678-Job_cluster")
226+
.config("spark.databricks.clusterUsageTags.clusterName", "job-1234-run-8765-Job_cluster")
227227
.getOrCreate()
228228

229229
sparkSession.sparkContext().setLocalProperty("spark.databricks.job.id", "1234")
230230
sparkSession.sparkContext().setLocalProperty("spark.databricks.job.runId", "9012")
231+
sparkSession.sparkContext().setLocalProperty("spark.jobGroup.id", "0000_job-3456-run-7890-action-0000")
232+
sparkSession.sparkContext().setLocalProperty("spark.databricks.workload.id", "01-123-456")
233+
sparkSession.sparkContext().setLocalProperty("spark.databricks.job.parentRunId", "5678")
234+
sparkSession.sparkContext().setLocalProperty("spark.databricks.clusterUsageTags.clusterName", "job-1234-run-901-Job_cluster")
231235
TestSparkComputation.generateTestSparkComputation(sparkSession)
232236

233237
sparkSession.sparkContext().setLocalProperty("spark.databricks.job.id", null)
234238
sparkSession.sparkContext().setLocalProperty("spark.databricks.job.runId", null)
239+
sparkSession.sparkContext().setLocalProperty("spark.databricks.job.parentRunId", null)
240+
TestSparkComputation.generateTestSparkComputation(sparkSession)
241+
242+
sparkSession.sparkContext().setLocalProperty("spark.jobGroup.id", null)
243+
sparkSession.sparkContext().setLocalProperty("spark.databricks.job.parentRunId", null)
244+
sparkSession.sparkContext().setLocalProperty("spark.databricks.clusterUsageTags.clusterName", null)
245+
TestSparkComputation.generateTestSparkComputation(sparkSession)
246+
247+
sparkSession.sparkContext().setLocalProperty("spark.databricks.workload.id", null)
235248
TestSparkComputation.generateTestSparkComputation(sparkSession)
236249

237250
expect:
238-
assertTraces(2) {
251+
assertTraces(4) {
239252
trace(3) {
240253
span {
241254
operationName "spark.job"
242-
resourceName "count at TestSparkComputation.java:17"
243255
spanType "spark"
244-
errored false
245256
traceId 8944764253919609482G
246257
parentSpanId 15104224823446433673G
258+
assert span.tags["databricks_job_id"] == "1234"
259+
assert span.tags["databricks_job_run_id"] == "5678"
260+
assert span.tags["databricks_task_run_id"] == "9012"
261+
}
262+
span {
263+
operationName "spark.stage"
264+
spanType "spark"
265+
childOf(span(0))
266+
}
267+
span {
268+
operationName "spark.stage"
269+
spanType "spark"
270+
childOf(span(0))
271+
}
272+
}
273+
trace(3) {
274+
span {
275+
operationName "spark.job"
276+
spanType "spark"
277+
traceId 5240384461065211484G
278+
parentSpanId 14128229261586201946G
279+
assert span.tags["databricks_job_id"] == "3456"
280+
assert span.tags["databricks_job_run_id"] == "901"
281+
assert span.tags["databricks_task_run_id"] == "7890"
282+
}
283+
span {
284+
operationName "spark.stage"
285+
spanType "spark"
286+
childOf(span(0))
287+
}
288+
span {
289+
operationName "spark.stage"
290+
spanType "spark"
291+
childOf(span(0))
292+
}
293+
}
294+
trace(3) {
295+
span {
296+
operationName "spark.job"
297+
spanType "spark"
298+
traceId 2235374731114184741G
299+
parentSpanId 8956125882166502063G
300+
assert span.tags["databricks_job_id"] == "123"
301+
assert span.tags["databricks_job_run_id"] == "8765"
302+
assert span.tags["databricks_task_run_id"] == "456"
247303
}
248304
span {
249305
operationName "spark.stage"
250-
resourceName "count at TestSparkComputation.java:17"
251306
spanType "spark"
252-
errored false
253307
childOf(span(0))
254308
}
255309
span {
256310
operationName "spark.stage"
257-
resourceName "distinct at TestSparkComputation.java:17"
258311
spanType "spark"
259-
errored false
260312
childOf(span(0))
261313
}
262314
}
263315
trace(3) {
264316
span {
265317
operationName "spark.job"
266-
resourceName "count at TestSparkComputation.java:17"
267318
spanType "spark"
268-
errored false
269319
parent()
320+
assert span.tags["databricks_job_id"] == null
321+
assert span.tags["databricks_job_run_id"] == "8765"
322+
assert span.tags["databricks_task_run_id"] == null
270323
}
271324
span {
272325
operationName "spark.stage"
273-
resourceName "count at TestSparkComputation.java:17"
274326
spanType "spark"
275-
errored false
276327
childOf(span(0))
277328
}
278329
span {
279330
operationName "spark.stage"
280-
resourceName "distinct at TestSparkComputation.java:17"
281331
spanType "spark"
282-
errored false
283332
childOf(span(0))
284333
}
285334
}

0 commit comments

Comments
 (0)