Skip to content

Commit db47c6e

Browse files
Warren Zhu authored and Gengliang Wang committed
[SPARK-32125][UI] Support get taskList by status in Web UI and SHS Rest API
### What changes were proposed in this pull request? Support fetching the taskList filtered by status, as below: ``` /applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList?status=failed ``` ### Why are the changes needed? When there are a large number of tasks in one stage, the current API makes it hard to get the taskList by status. ### Does this PR introduce _any_ user-facing change? Yes. Updated the monitoring doc. ### How was this patch tested? Added tests in `HistoryServerSuite`. Closes apache#28942 from warrenzhu25/SPARK-32125. Authored-by: Warren Zhu <[email protected]> Signed-off-by: Gengliang Wang <[email protected]>
1 parent c28a6fa commit db47c6e

File tree

8 files changed

+1666
-7
lines changed

8 files changed

+1666
-7
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.status.api.v1;
19+
20+
import org.apache.spark.util.EnumUtil;
21+
22+
public enum TaskStatus {
23+
RUNNING,
24+
KILLED,
25+
FAILED,
26+
SUCCESS,
27+
UNKNOWN;
28+
29+
public static TaskStatus fromString(String str) {
30+
return EnumUtil.parseIgnoreCase(TaskStatus.class, str);
31+
}
32+
}

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

+12-4
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,8 @@ private[spark] class AppStatusStore(
386386
stageAttemptId: Int,
387387
offset: Int,
388388
length: Int,
389-
sortBy: v1.TaskSorting): Seq[v1.TaskData] = {
389+
sortBy: v1.TaskSorting,
390+
statuses: JList[v1.TaskStatus]): Seq[v1.TaskData] = {
390391
val (indexName, ascending) = sortBy match {
391392
case v1.TaskSorting.ID =>
392393
(None, true)
@@ -395,7 +396,7 @@ private[spark] class AppStatusStore(
395396
case v1.TaskSorting.DECREASING_RUNTIME =>
396397
(Some(TaskIndexNames.EXEC_RUN_TIME), false)
397398
}
398-
taskList(stageId, stageAttemptId, offset, length, indexName, ascending)
399+
taskList(stageId, stageAttemptId, offset, length, indexName, ascending, statuses)
399400
}
400401

401402
def taskList(
@@ -404,7 +405,8 @@ private[spark] class AppStatusStore(
404405
offset: Int,
405406
length: Int,
406407
sortBy: Option[String],
407-
ascending: Boolean): Seq[v1.TaskData] = {
408+
ascending: Boolean,
409+
statuses: JList[v1.TaskStatus] = List().asJava): Seq[v1.TaskData] = {
408410
val stageKey = Array(stageId, stageAttemptId)
409411
val base = store.view(classOf[TaskDataWrapper])
410412
val indexed = sortBy match {
@@ -417,7 +419,13 @@ private[spark] class AppStatusStore(
417419
}
418420

419421
val ordered = if (ascending) indexed else indexed.reverse()
420-
val taskDataWrapperIter = ordered.skip(offset).max(length).asScala
422+
val taskDataWrapperIter = if (statuses != null && !statuses.isEmpty) {
423+
val statusesStr = statuses.asScala.map(_.toString).toSet
424+
ordered.asScala.filter(s => statusesStr.contains(s.status)).slice(offset, offset + length)
425+
} else {
426+
ordered.skip(offset).max(length).asScala
427+
}
428+
421429
constructTaskDataList(taskDataWrapperIter)
422430
}
423431

core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,9 @@ private[v1] class StagesResource extends BaseAppResource {
9696
@PathParam("stageAttemptId") stageAttemptId: Int,
9797
@DefaultValue("0") @QueryParam("offset") offset: Int,
9898
@DefaultValue("20") @QueryParam("length") length: Int,
99-
@DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = {
100-
withUI(_.store.taskList(stageId, stageAttemptId, offset, length, sortBy))
99+
@DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting,
100+
@QueryParam("status") statuses: JList[TaskStatus]): Seq[TaskData] = {
101+
withUI(_.store.taskList(stageId, stageAttemptId, offset, length, sortBy, statuses))
101102
}
102103

103104
// This api needs to stay formatted exactly as it is below, since, it is being used by the
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
[ {
2+
"taskId" : 1,
3+
"index" : 1,
4+
"attempt" : 0,
5+
"launchTime" : "2015-05-06T13:03:06.502GMT",
6+
"duration" : 421,
7+
"executorId" : "driver",
8+
"host" : "localhost",
9+
"status" : "SUCCESS",
10+
"taskLocality" : "PROCESS_LOCAL",
11+
"speculative" : false,
12+
"accumulatorUpdates" : [ ],
13+
"taskMetrics" : {
14+
"executorDeserializeTime" : 31,
15+
"executorDeserializeCpuTime" : 0,
16+
"executorRunTime" : 350,
17+
"executorCpuTime" : 0,
18+
"resultSize" : 2010,
19+
"jvmGcTime" : 7,
20+
"resultSerializationTime" : 0,
21+
"memoryBytesSpilled" : 0,
22+
"diskBytesSpilled" : 0,
23+
"peakExecutionMemory" : 0,
24+
"inputMetrics" : {
25+
"bytesRead" : 60488,
26+
"recordsRead" : 10000
27+
},
28+
"outputMetrics" : {
29+
"bytesWritten" : 0,
30+
"recordsWritten" : 0
31+
},
32+
"shuffleReadMetrics" : {
33+
"remoteBlocksFetched" : 0,
34+
"localBlocksFetched" : 0,
35+
"fetchWaitTime" : 0,
36+
"remoteBytesRead" : 0,
37+
"remoteBytesReadToDisk" : 0,
38+
"localBytesRead" : 0,
39+
"recordsRead" : 0
40+
},
41+
"shuffleWriteMetrics" : {
42+
"bytesWritten" : 1710,
43+
"writeTime" : 3934399,
44+
"recordsWritten" : 10
45+
}
46+
},
47+
"executorLogs" : { },
48+
"schedulerDelay" : 40,
49+
"gettingResultTime" : 0
50+
}, {
51+
"taskId" : 2,
52+
"index" : 2,
53+
"attempt" : 0,
54+
"launchTime" : "2015-05-06T13:03:06.503GMT",
55+
"duration" : 419,
56+
"executorId" : "driver",
57+
"host" : "localhost",
58+
"status" : "SUCCESS",
59+
"taskLocality" : "PROCESS_LOCAL",
60+
"speculative" : false,
61+
"accumulatorUpdates" : [ ],
62+
"taskMetrics" : {
63+
"executorDeserializeTime" : 32,
64+
"executorDeserializeCpuTime" : 0,
65+
"executorRunTime" : 348,
66+
"executorCpuTime" : 0,
67+
"resultSize" : 2010,
68+
"jvmGcTime" : 7,
69+
"resultSerializationTime" : 2,
70+
"memoryBytesSpilled" : 0,
71+
"diskBytesSpilled" : 0,
72+
"peakExecutionMemory" : 0,
73+
"inputMetrics" : {
74+
"bytesRead" : 60488,
75+
"recordsRead" : 10000
76+
},
77+
"outputMetrics" : {
78+
"bytesWritten" : 0,
79+
"recordsWritten" : 0
80+
},
81+
"shuffleReadMetrics" : {
82+
"remoteBlocksFetched" : 0,
83+
"localBlocksFetched" : 0,
84+
"fetchWaitTime" : 0,
85+
"remoteBytesRead" : 0,
86+
"remoteBytesReadToDisk" : 0,
87+
"localBytesRead" : 0,
88+
"recordsRead" : 0
89+
},
90+
"shuffleWriteMetrics" : {
91+
"bytesWritten" : 1710,
92+
"writeTime" : 89885,
93+
"recordsWritten" : 10
94+
}
95+
},
96+
"executorLogs" : { },
97+
"schedulerDelay" : 37,
98+
"gettingResultTime" : 0
99+
} ]

0 commit comments

Comments
 (0)