Skip to content

Commit b333ed0

Browse files
committed
[SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing
### What changes were proposed in this pull request?

Ignore internal accumulators that use unrecognized types rather than crashing so that an event log containing such accumulators can still be converted to JSON and logged.

### Why are the changes needed?

A user may use internal accumulators by adding the `internal.metrics.` prefix to the accumulator name to hide sensitive information from the UI (accumulators other than internal ones are shown in the Spark UI). However, `org.apache.spark.util.JsonProtocol.accumValueToJson` assumes an internal accumulator has only 3 possible types: `int`, `long`, and `java.util.List[(BlockId, BlockStatus)]`. When an internal accumulator uses an unexpected type, it will crash. An event log that contains such an accumulator will be dropped because it cannot be converted to JSON, and it will cause weird UI issues when rendered in the Spark History Server. For example, if `SparkListenerTaskEnd` is dropped because of this issue, the user will see the task as still running even though it has finished. It's better to make `accumValueToJson` more robust because the accumulator name is chosen by the user.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

The new unit tests.

Closes apache#28744 from zsxwing/fix-internal-accum.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent a42af81 commit b333ed0

File tree

2 files changed

+63
-5
lines changed

2 files changed

+63
-5
lines changed

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -363,12 +363,22 @@ private[spark] object JsonProtocol {
363363
case v: Long => JInt(v)
364364
// An internal accumulator is expected to be one of 3 types: int, long, or the blocks
365365
// accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]`. Other types are ignored.
366-
case v =>
367-
JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map {
368-
case (id, status) =>
369-
("Block ID" -> id.toString) ~
370-
("Status" -> blockStatusToJson(status))
366+
case v: java.util.List[_] =>
367+
JArray(v.asScala.toList.flatMap {
368+
case (id: BlockId, status: BlockStatus) =>
369+
Some(
370+
("Block ID" -> id.toString) ~
371+
("Status" -> blockStatusToJson(status))
372+
)
373+
case _ =>
374+
// Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should
375+
// not crash.
376+
None
371377
})
378+
case _ =>
379+
// Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should not
380+
// crash.
381+
JNothing
372382
}
373383
} else {
374384
// For all external accumulators, just use strings

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,54 @@ class JsonProtocolSuite extends SparkFunSuite {
507507
testAccumValue(Some("anything"), 123, JString("123"))
508508
}
509509

510+
/** Create an AccumulableInfo and verify we can serialize and deserialize it. */
511+
private def testAccumulableInfo(
512+
name: String,
513+
value: Option[Any],
514+
expectedValue: Option[Any]): Unit = {
515+
val isInternal = name.startsWith(InternalAccumulator.METRICS_PREFIX)
516+
val accum = AccumulableInfo(
517+
123L,
518+
Some(name),
519+
update = value,
520+
value = value,
521+
internal = isInternal,
522+
countFailedValues = false)
523+
val json = JsonProtocol.accumulableInfoToJson(accum)
524+
val newAccum = JsonProtocol.accumulableInfoFromJson(json)
525+
assert(newAccum == accum.copy(update = expectedValue, value = expectedValue))
526+
}
527+
528+
test("SPARK-31923: unexpected value type of internal accumulator") {
529+
// Because a user may use `METRICS_PREFIX` in an accumulator name, we should test unexpected
530+
// types to make sure we don't crash.
531+
import InternalAccumulator.METRICS_PREFIX
532+
testAccumulableInfo(
533+
METRICS_PREFIX + "fooString",
534+
value = Some("foo"),
535+
expectedValue = None)
536+
testAccumulableInfo(
537+
METRICS_PREFIX + "fooList",
538+
value = Some(java.util.Arrays.asList("string")),
539+
expectedValue = Some(java.util.Collections.emptyList())
540+
)
541+
val blocks = Seq(
542+
(TestBlockId("block1"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)),
543+
(TestBlockId("block2"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L)))
544+
testAccumulableInfo(
545+
METRICS_PREFIX + "fooList",
546+
value = Some(java.util.Arrays.asList(
547+
"string",
548+
blocks(0),
549+
blocks(1))),
550+
expectedValue = Some(blocks.asJava)
551+
)
552+
testAccumulableInfo(
553+
METRICS_PREFIX + "fooSet",
554+
value = Some(Set("foo")),
555+
expectedValue = None)
556+
}
557+
510558
test("SPARK-30936: forwards compatibility - ignore unknown fields") {
511559
val expected = TestListenerEvent("foo", 123)
512560
val unknownFieldsJson =

0 commit comments

Comments (0)