Skip to content

Commit a7f3780

Browse files
committed
[FLINK-34549][API] Expose MetricGroup via RuntimeContext
1 parent f20f938 commit a7f3780

File tree

11 files changed

+52
-11
lines changed

11 files changed

+52
-11
lines changed

flink-core-api/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ under the License.
4040
<version>${project.version}</version>
4141
</dependency>
4242

43+
<dependency>
44+
<groupId>org.apache.flink</groupId>
45+
<artifactId>flink-metrics-core</artifactId>
46+
<version>${project.version}</version>
47+
</dependency>
48+
49+
4350
<!-- ================== test dependencies ================== -->
4451

4552
<dependency>

flink-core/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,6 @@ under the License.
7171
<version>${project.version}</version>
7272
</dependency>
7373

74-
<dependency>
75-
<groupId>org.apache.flink</groupId>
76-
<artifactId>flink-metrics-core</artifactId>
77-
<version>${project.version}</version>
78-
</dependency>
79-
8074
<!-- ArchUnit test dependencies -->
8175

8276
<dependency>

flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.datastream.api.context;
2020

2121
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.metrics.MetricGroup;
2223

2324
/**
2425
* A RuntimeContext contains information about the context in which process functions are executed.
@@ -33,4 +34,7 @@ public interface RuntimeContext {
3334

3435
/** Get the {@link TaskInfo} of this process function. */
3536
TaskInfo getTaskInfo();
37+
38+
/** Get the metric group of this process function. */
39+
MetricGroup getMetricGroup();
3640
}

flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.datastream.api.context.NonPartitionedContext;
2323
import org.apache.flink.datastream.api.context.TaskInfo;
2424
import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
25+
import org.apache.flink.metrics.MetricGroup;
2526

2627
/** The default implementation of {@link NonPartitionedContext}. */
2728
public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> {
@@ -45,4 +46,9 @@ public JobInfo getJobInfo() {
4546
public TaskInfo getTaskInfo() {
4647
return context.getTaskInfo();
4748
}
49+
50+
@Override
51+
public MetricGroup getMetricGroup() {
52+
return context.getMetricGroup();
53+
}
4854
}

flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.datastream.api.context.ProcessingTimeManager;
2424
import org.apache.flink.datastream.api.context.RuntimeContext;
2525
import org.apache.flink.datastream.api.context.TaskInfo;
26+
import org.apache.flink.metrics.MetricGroup;
2627

2728
import java.util.function.Consumer;
2829
import java.util.function.Supplier;
@@ -64,4 +65,9 @@ public DefaultStateManager getStateManager() {
6465
public ProcessingTimeManager getProcessingTimeManager() {
6566
return processingTimeManager;
6667
}
68+
69+
@Override
70+
public MetricGroup getMetricGroup() {
71+
return context.getMetricGroup();
72+
}
6773
}

flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.datastream.api.context.JobInfo;
2222
import org.apache.flink.datastream.api.context.RuntimeContext;
2323
import org.apache.flink.datastream.api.context.TaskInfo;
24+
import org.apache.flink.metrics.MetricGroup;
2425
import org.apache.flink.runtime.jobgraph.JobType;
2526

2627
/** The default implementation of {@link RuntimeContext}. */
@@ -29,10 +30,18 @@ public class DefaultRuntimeContext implements RuntimeContext {
2930

3031
private final DefaultTaskInfo taskInfo;
3132

33+
private final MetricGroup metricGroup;
34+
3235
public DefaultRuntimeContext(
33-
String jobName, JobType jobType, int parallelism, int maxParallelism, String taskName) {
36+
String jobName,
37+
JobType jobType,
38+
int parallelism,
39+
int maxParallelism,
40+
String taskName,
41+
MetricGroup metricGroup) {
3442
this.jobInfo = new DefaultJobInfo(jobName, jobType);
3543
this.taskInfo = new DefaultTaskInfo(parallelism, maxParallelism, taskName);
44+
this.metricGroup = metricGroup;
3645
}
3746

3847
@Override
@@ -44,4 +53,9 @@ public JobInfo getJobInfo() {
4453
public TaskInfo getTaskInfo() {
4554
return taskInfo;
4655
}
56+
57+
@Override
58+
public MetricGroup getMetricGroup() {
59+
return metricGroup;
60+
}
4761
}

flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.datastream.api.context.TaskInfo;
2323
import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
2424
import org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction;
25+
import org.apache.flink.metrics.MetricGroup;
2526

2627
/** The default implementation of {@link TwoOutputNonPartitionedContext}. */
2728
public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2>
@@ -47,4 +48,9 @@ public JobInfo getJobInfo() {
4748
public TaskInfo getTaskInfo() {
4849
return context.getTaskInfo();
4950
}
51+
52+
@Override
53+
public MetricGroup getMetricGroup() {
54+
return context.getMetricGroup();
55+
}
5056
}

flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ public void open() throws Exception {
6464
operatorContext.getJobType(),
6565
taskInfo.getNumberOfParallelSubtasks(),
6666
taskInfo.getMaxNumberOfParallelSubtasks(),
67-
taskInfo.getTaskName());
67+
taskInfo.getTaskName(),
68+
operatorContext.getMetricGroup());
6869
partitionedContext =
6970
new DefaultPartitionedContext(
7071
context, this::currentKey, this::setCurrentKey, getProcessingTimeManager());

flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public void open() throws Exception {
6868
operatorContext.getJobType(),
6969
taskInfo.getNumberOfParallelSubtasks(),
7070
taskInfo.getMaxNumberOfParallelSubtasks(),
71-
taskInfo.getTaskName());
71+
taskInfo.getTaskName(),
72+
operatorContext.getMetricGroup());
7273
this.partitionedContext =
7374
new DefaultPartitionedContext(
7475
context, this::currentKey, this::setCurrentKey, getProcessingTimeManager());

flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public void open() throws Exception {
6868
operatorContext.getJobType(),
6969
taskInfo.getNumberOfParallelSubtasks(),
7070
taskInfo.getMaxNumberOfParallelSubtasks(),
71-
taskInfo.getTaskName());
71+
taskInfo.getTaskName(),
72+
operatorContext.getMetricGroup());
7273
this.partitionedContext =
7374
new DefaultPartitionedContext(
7475
context, this::currentKey, this::setCurrentKey, getProcessingTimeManager());

0 commit comments

Comments
 (0)