Skip to content

Commit c952998

Browse files
committed
[FLINK-34549][API] Expose MetricGroup via RuntimeContext
1 parent d8cedd1 commit c952998

File tree

7 files changed

+38
-6
lines changed

7 files changed

+38
-6
lines changed

flink-core-api/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ under the License.
4040
<version>${project.version}</version>
4141
</dependency>
4242

43+
<dependency>
44+
<groupId>org.apache.flink</groupId>
45+
<artifactId>flink-metrics-core</artifactId>
46+
<version>${project.version}</version>
47+
</dependency>
48+
49+
4350
<!-- ================== test dependencies ================== -->
4451

4552
<dependency>

flink-core/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,6 @@ under the License.
7171
<version>${project.version}</version>
7272
</dependency>
7373

74-
<dependency>
75-
<groupId>org.apache.flink</groupId>
76-
<artifactId>flink-metrics-core</artifactId>
77-
<version>${project.version}</version>
78-
</dependency>
79-
8074
<!-- ArchUnit test dependencies -->
8175

8276
<dependency>

flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.datastream.api.context;
2020

2121
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.metrics.MetricGroup;
2223

2324
/**
2425
* A RuntimeContext contains information about the context in which process functions are executed.
@@ -33,4 +34,7 @@ public interface RuntimeContext {
3334

3435
/** Get the {@link TaskInfo} of this process function. */
3536
TaskInfo getTaskInfo();
37+
38+
/** Get the metric group of this process function. */
39+
MetricGroup getMetricGroup();
3640
}

flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.datastream.api.context.NonPartitionedContext;
2323
import org.apache.flink.datastream.api.context.TaskInfo;
2424
import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
25+
import org.apache.flink.metrics.MetricGroup;
2526

2627
/** The default implementation of {@link NonPartitionedContext}. */
2728
public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> {
@@ -45,4 +46,9 @@ public JobInfo getJobInfo() {
4546
public TaskInfo getTaskInfo() {
4647
return context.getTaskInfo();
4748
}
49+
50+
@Override
51+
public MetricGroup getMetricGroup() {
52+
return context.getMetricGroup();
53+
}
4854
}

flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.datastream.api.context.ProcessingTimeManager;
2424
import org.apache.flink.datastream.api.context.RuntimeContext;
2525
import org.apache.flink.datastream.api.context.TaskInfo;
26+
import org.apache.flink.metrics.MetricGroup;
2627

2728
import java.util.function.Consumer;
2829
import java.util.function.Supplier;
@@ -64,4 +65,9 @@ public DefaultStateManager getStateManager() {
6465
public ProcessingTimeManager getProcessingTimeManager() {
6566
return processingTimeManager;
6667
}
68+
69+
@Override
70+
public MetricGroup getMetricGroup() {
71+
return context.getMetricGroup();
72+
}
6773
}

flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.datastream.api.context.JobInfo;
2222
import org.apache.flink.datastream.api.context.RuntimeContext;
2323
import org.apache.flink.datastream.api.context.TaskInfo;
24+
import org.apache.flink.metrics.MetricGroup;
2425
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
2526

2627
/** The default implementation of {@link RuntimeContext}. */
@@ -29,13 +30,16 @@ public class DefaultRuntimeContext implements RuntimeContext {
2930

3031
private final DefaultTaskInfo taskInfo;
3132

33+
private final MetricGroup metricGroup;
34+
3235
public DefaultRuntimeContext(
3336
StreamingRuntimeContext operatorContext,
3437
int parallelism,
3538
int maxParallelism,
3639
String taskName) {
3740
this.jobInfo = new DefaultJobInfo(operatorContext);
3841
this.taskInfo = new DefaultTaskInfo(parallelism, maxParallelism, taskName);
42+
this.metricGroup = operatorContext.getMetricGroup();
3943
}
4044

4145
@Override
@@ -47,4 +51,9 @@ public JobInfo getJobInfo() {
4751
public TaskInfo getTaskInfo() {
4852
return taskInfo;
4953
}
54+
55+
@Override
56+
public MetricGroup getMetricGroup() {
57+
return metricGroup;
58+
}
5059
}

flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.datastream.api.context.TaskInfo;
2323
import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
2424
import org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction;
25+
import org.apache.flink.metrics.MetricGroup;
2526

2627
/** The default implementation of {@link TwoOutputNonPartitionedContext}. */
2728
public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2>
@@ -47,4 +48,9 @@ public JobInfo getJobInfo() {
4748
public TaskInfo getTaskInfo() {
4849
return context.getTaskInfo();
4950
}
51+
52+
@Override
53+
public MetricGroup getMetricGroup() {
54+
return context.getMetricGroup();
55+
}
5056
}

0 commit comments

Comments
 (0)