Skip to content

Commit f623f54

Browse files
ash211 authored and
robert3005 committed
SPARK-18364 Add relevant measurements for Timer metrics (apache#236)
1 parent a87a7e1 commit f623f54

File tree

2 files changed

+90
-4
lines changed

2 files changed

+90
-4
lines changed

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,27 @@
1919

2020
import com.codahale.metrics.*;
2121
import com.google.common.annotations.VisibleForTesting;
22+
import com.google.common.collect.ImmutableMap;
2223
import org.apache.hadoop.metrics2.MetricsCollector;
2324
import org.apache.hadoop.metrics2.MetricsInfo;
2425
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
2526
import org.apache.hadoop.metrics2.MetricsSource;
2627

2728
import java.util.Map;
29+
import java.util.concurrent.TimeUnit;
2830

2931
/**
3032
* Modeled off of YARN's NodeManagerMetrics.
3133
*/
3234
public class YarnShuffleServiceMetrics implements MetricsSource {
3335

36+
// Conversion factor from the dropwizard-metrics default of nanoseconds into milliseconds, to
37+
// match how MetricsServlet serializes times (to milliseconds), as configured via the
38+
// MetricsModule passed into its Jackson ObjectMapper. Without this factor applied, the Timer
39+
// metrics from ExternalShuffleBlockHandler#ShuffleMetrics with "Millis" suffixes would be
40+
// misleading, as they would otherwise contain values in nanoseconds.
41+
// Value: the number of nanoseconds in one millisecond; snapshot values are divided by it.
private static final double rateFactor = (double) TimeUnit.MILLISECONDS.toNanos(1L);
42+
3443
private final MetricSet metricSet;
3544

3645
public YarnShuffleServiceMetrics(MetricSet metricSet) {
@@ -52,13 +61,50 @@ public void getMetrics(MetricsCollector collector, boolean all) {
5261
}
5362
}
5463

64+
/**
 * Adds one gauge to {@code builder} for each summary statistic read from {@code snapshot}.
 *
 * The statistics are "median", "mean", "75th", "95th", "98th", "99th", "999th", "min" and
 * "max". Each gauge is named {@code name + "_" + statistic} and described as
 * "{statistic} of {metricType} {name}". Every value is divided by {@code rateFactor}
 * (nanoseconds per millisecond) before it is handed to the builder.
 *
 * @param snapshot   the snapshot whose statistics are reported
 * @param builder    the Hadoop metrics record builder that receives the gauges
 * @param name       base name of the metric, used as the prefix of each gauge name
 * @param metricType human-readable kind of metric (the visible caller passes "timer"),
 *                   used only in the gauge description
 */
private static void addSnapshotToMetricRecordBuilder(Snapshot snapshot,
65+
MetricsRecordBuilder builder,
66+
String name,
67+
String metricType) {
68+
69+
// Statistics that the snapshot exposes as double values, keyed by gauge-name suffix.
ImmutableMap<String, Double> doubleValues = ImmutableMap.<String, Double>builder()
70+
.put("median", snapshot.getMedian())
71+
.put("mean", snapshot.getMean())
72+
.put("75th", snapshot.get75thPercentile())
73+
.put("95th", snapshot.get95thPercentile())
74+
.put("98th", snapshot.get98thPercentile())
75+
.put("99th", snapshot.get99thPercentile())
76+
.put("999th", snapshot.get999thPercentile())
77+
.build();
78+
79+
// Statistics that the snapshot exposes as long values, keyed by gauge-name suffix.
ImmutableMap<String, Long> longValues = ImmutableMap.<String, Long>builder()
80+
.put("min", snapshot.getMin())
81+
.put("max", snapshot.getMax())
82+
.build();
83+
84+
// Report each double-valued statistic, scaled by rateFactor.
for (Map.Entry<String, Double> entry : doubleValues.entrySet()) {
85+
builder.addGauge(
86+
new ShuffleServiceMetricsInfo(name + "_" + entry.getKey(),
87+
entry.getKey() + " of " + metricType + " " + name),
88+
entry.getValue() / rateFactor);
89+
}
90+
91+
// Report each long-valued statistic; dividing by the double rateFactor yields a double,
// so these are also passed to addGauge as doubles.
for (Map.Entry<String, Long> entry : longValues.entrySet()) {
92+
builder.addGauge(
93+
new ShuffleServiceMetricsInfo(name + "_" + entry.getKey(),
94+
entry.getKey() + " of " + metricType + " " + name),
95+
entry.getValue() / rateFactor);
96+
}
97+
98+
}
99+
55100
@VisibleForTesting
56101
public static void collectMetric(
57102
MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {
58103

59104
// The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics
60105
if (metric instanceof Timer) {
61106
Timer t = (Timer) metric;
107+
Snapshot snapshot = t.getSnapshot();
62108
metricsRecordBuilder
63109
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name),
64110
t.getCount())
@@ -73,6 +119,7 @@ public static void collectMetric(
73119
t.getOneMinuteRate())
74120
.addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name),
75121
t.getMeanRate());
122+
addSnapshotToMetricRecordBuilder(snapshot, metricsRecordBuilder, name, "timer");
76123
} else if (metric instanceof Meter) {
77124
Meter m = (Meter) metric;
78125
metricsRecordBuilder

resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616
*/
1717
package org.apache.spark.network.yarn
1818

19-
import org.apache.hadoop.metrics2.MetricsRecordBuilder
19+
import java.util.concurrent.TimeUnit
20+
21+
import com.codahale.metrics.Timer
22+
import org.apache.hadoop.metrics2.{MetricsInfo, MetricsRecordBuilder}
23+
import org.mockito.ArgumentCaptor
2024
import org.mockito.Matchers._
2125
import org.mockito.Mockito.{mock, times, verify, when}
2226
import org.scalatest.Matchers
@@ -44,8 +48,7 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
4448

4549
// these two metrics have the same effect on the collector
4650
for (testname <- Seq("openBlockRequestLatencyMillis",
47-
"registerExecutorRequestLatencyMillis",
48-
"blockTransferRateBytes")) {
51+
"registerExecutorRequestLatencyMillis")) {
4952
test(s"$testname - collector receives correct types") {
5053
val builder = mock(classOf[MetricsRecordBuilder])
5154
when(builder.addCounter(any(), anyLong())).thenReturn(builder)
@@ -55,10 +58,22 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
5558
metrics.getMetrics.get(testname))
5659

5760
verify(builder).addCounter(anyObject(), anyLong())
58-
verify(builder, times(4)).addGauge(anyObject(), anyDouble())
61+
verify(builder, times(13)).addGauge(anyObject(), anyDouble())
5962
}
6063
}
6164

65+
// blockTransferRateBytes is checked on its own: it is expected to produce one counter and
// four gauges, whereas the latency metrics tested in the loop above now expect 13 gauges.
test(s"blockTransferRateBytes - collector receives correct types") {
66+
val builder = mock(classOf[MetricsRecordBuilder])
67+
// Return the mock itself from addCounter/addGauge so chained builder calls keep working.
when(builder.addCounter(any(), anyLong())).thenReturn(builder)
68+
when(builder.addGauge(any(), anyDouble())).thenReturn(builder)
69+
70+
YarnShuffleServiceMetrics.collectMetric(builder, "blockTransferRateBytes",
71+
metrics.getMetrics.get("blockTransferRateBytes"))
72+
73+
// Exactly one counter and four double gauges are expected for this metric.
verify(builder).addCounter(anyObject(), anyLong())
74+
verify(builder, times(4)).addGauge(anyObject(), anyDouble())
75+
}
76+
6277
// this metric writes only one gauge to the collector
6378
test("registeredExecutorsSize - collector receives correct types") {
6479
val builder = mock(classOf[MetricsRecordBuilder])
@@ -71,4 +86,28 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
7186
// only one
7287
verify(builder).addGauge(anyObject(), anyInt())
7388
}
89+
90+
// Records a single 1 ms sample on the timer, collects it, and expects every gauge whose name
// does not contain "rate" to be reported as 1.0, i.e. in milliseconds rather than nanoseconds.
test("openBlockRequestLatencyMillis has correct units") {
91+
val builder = mock(classOf[MetricsRecordBuilder])
92+
// Return the mock itself from addCounter/addGauge so chained builder calls keep working.
when(builder.addCounter(any(), anyLong())).thenReturn(builder)
93+
when(builder.addGauge(any(), anyDouble())).thenReturn(builder)
94+
95+
// Feed exactly one observation of 1 millisecond into the timer under test.
metrics.getMetrics.get("openBlockRequestLatencyMillis").asInstanceOf[Timer]
96+
.update(1, TimeUnit.MILLISECONDS)
97+
98+
YarnShuffleServiceMetrics.collectMetric(builder, "openBlockRequestLatencyMillis",
99+
metrics.getMetrics.get("openBlockRequestLatencyMillis"))
100+
101+
// Capture the descriptor and value of every addGauge call; 13 calls are expected.
val descriptorCaptor = ArgumentCaptor.forClass(classOf[MetricsInfo])
102+
val doubleCaptor = ArgumentCaptor.forClass(classOf[java.lang.Double])
103+
verify(builder, times(13)).addGauge(descriptorCaptor.capture(), doubleCaptor.capture())
104+
105+
// Pair each captured descriptor with its captured value, in call order.
descriptorCaptor.getAllValues.asScala.zip(doubleCaptor.getAllValues.asScala).foreach {
106+
case (arg: MetricsInfo, double: java.lang.Double) =>
107+
arg.name() match {
108+
case s if !s.contains("rate") => double shouldEqual 1.0 // still in milliseconds
109+
// Gauges with "rate" in their name are not checked here.
case _ => None
110+
}
111+
}
112+
}
74113
}

0 commit comments

Comments
 (0)