File tree 6 files changed +32
-6
lines changed
flink-process-function-parent
flink-process-function/src/main/java/org/apache/flink/process/impl/context
flink-process-function-api/src/main/java/org/apache/flink/process/api/context
6 files changed +32
-6
lines changed Original file line number Diff line number Diff line change @@ -40,6 +40,13 @@ under the License.
40
40
<version >${project.version} </version >
41
41
</dependency >
42
42
43
+ <dependency >
44
+ <groupId >org.apache.flink</groupId >
45
+ <artifactId >flink-metrics-core</artifactId >
46
+ <version >${project.version} </version >
47
+ </dependency >
48
+
49
+
43
50
<!-- ================== test dependencies ================== -->
44
51
45
52
<dependency >
Original file line number Diff line number Diff line change @@ -71,12 +71,6 @@ under the License.
71
71
<version >${project.version} </version >
72
72
</dependency >
73
73
74
- <dependency >
75
- <groupId >org.apache.flink</groupId >
76
- <artifactId >flink-metrics-core</artifactId >
77
- <version >${project.version} </version >
78
- </dependency >
79
-
80
74
<!-- ArchUnit test dependencies -->
81
75
82
76
<dependency >
Original file line number Diff line number Diff line change 19
19
package org .apache .flink .process .api .context ;
20
20
21
21
import org .apache .flink .annotation .Experimental ;
22
+ import org .apache .flink .metrics .groups .OperatorMetricGroup ;
22
23
23
24
/**
24
25
* A RuntimeContext contains information about the context in which process functions are executed.
@@ -41,6 +42,9 @@ public interface RuntimeContext {
41
42
/** Get the {@link ProcessingTimeManager} of this process function. */
42
43
ProcessingTimeManager getProcessingTimeManager ();
43
44
45
+ /** Get the metric group of this process function. */
46
+ OperatorMetricGroup getMetricGroup ();
47
+
44
48
/** Get the {@link TimestampManager} of this process function. */
45
49
TimestampManager getTimestampManager ();
46
50
}
Original file line number Diff line number Diff line change 18
18
19
19
package org .apache .flink .process .impl .context ;
20
20
21
+ import org .apache .flink .metrics .groups .OperatorMetricGroup ;
21
22
import org .apache .flink .process .api .context .JobInfo ;
22
23
import org .apache .flink .process .api .context .NonPartitionedContext ;
23
24
import org .apache .flink .process .api .context .ProcessingTimeManager ;
@@ -61,6 +62,11 @@ public ProcessingTimeManager getProcessingTimeManager() {
61
62
return UnsupportedProcessingTimeManager .INSTANCE ;
62
63
}
63
64
65
+ @ Override
66
+ public OperatorMetricGroup getMetricGroup () {
67
+ return context .getMetricGroup ();
68
+ }
69
+
64
70
@ Override
65
71
public TimestampManager getTimestampManager () {
66
72
return context .getTimestampManager ();
Original file line number Diff line number Diff line change 18
18
19
19
package org .apache .flink .process .impl .context ;
20
20
21
+ import org .apache .flink .metrics .groups .OperatorMetricGroup ;
21
22
import org .apache .flink .process .api .context .JobInfo ;
22
23
import org .apache .flink .process .api .context .ProcessingTimeManager ;
23
24
import org .apache .flink .process .api .context .RuntimeContext ;
@@ -38,6 +39,8 @@ public class DefaultRuntimeContext implements RuntimeContext {
38
39
39
40
private final ProcessingTimeManager processingTimeManager ;
40
41
42
+ private final OperatorMetricGroup metricGroup ;
43
+
41
44
private final DefaultTimestampManager timestampManager ;
42
45
43
46
public DefaultRuntimeContext (
@@ -52,6 +55,7 @@ public DefaultRuntimeContext(
52
55
this .taskInfo = new DefaultTaskInfo (parallelism , maxParallelism , taskName );
53
56
this .stateManager = new DefaultStateManager (currentKeySupplier , currentKeySetter );
54
57
this .processingTimeManager = processingTimeManager ;
58
+ this .metricGroup = operatorContext .getMetricGroup ();
55
59
this .timestampManager = new DefaultTimestampManager ();
56
60
}
57
61
@@ -74,6 +78,11 @@ public ProcessingTimeManager getProcessingTimeManager() {
74
78
return processingTimeManager ;
75
79
}
76
80
81
+ @ Override
82
+ public OperatorMetricGroup getMetricGroup () {
83
+ return metricGroup ;
84
+ }
85
+
77
86
@ Override
78
87
public DefaultTimestampManager getTimestampManager () {
79
88
return timestampManager ;
Original file line number Diff line number Diff line change 18
18
19
19
package org .apache .flink .process .impl .context ;
20
20
21
+ import org .apache .flink .metrics .groups .OperatorMetricGroup ;
21
22
import org .apache .flink .process .api .context .JobInfo ;
22
23
import org .apache .flink .process .api .context .ProcessingTimeManager ;
23
24
import org .apache .flink .process .api .context .StateManager ;
@@ -63,6 +64,11 @@ public ProcessingTimeManager getProcessingTimeManager() {
63
64
return UnsupportedProcessingTimeManager .INSTANCE ;
64
65
}
65
66
67
+ @ Override
68
+ public OperatorMetricGroup getMetricGroup () {
69
+ return context .getMetricGroup ();
70
+ }
71
+
66
72
@ Override
67
73
public TimestampManager getTimestampManager () {
68
74
return context .getTimestampManager ();
You can’t perform that action at this time.
0 commit comments