From 673c731d386059215999ba70b54debbb46762bf4 Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Wed, 28 Feb 2024 17:22:41 +0800 Subject: [PATCH] [API] Expose OperatorMetricGroup via RuntimeContext Move the dependency of flink-metric-core from flink-core to flink-core-api --- flink-core-api/pom.xml | 6 ++++++ flink-core/pom.xml | 6 ------ .../apache/flink/process/api/context/RuntimeContext.java | 4 ++++ .../impl/context/DefaultNonPartitionedContext.java | 6 ++++++ .../process/impl/context/DefaultRuntimeContext.java | 9 +++++++++ .../context/DefaultTwoOutputNonPartitionedContext.java | 6 ++++++ 6 files changed, 31 insertions(+), 6 deletions(-) diff --git a/flink-core-api/pom.xml b/flink-core-api/pom.xml index bb98d0023791a..6b3fd4a3189da 100644 --- a/flink-core-api/pom.xml +++ b/flink-core-api/pom.xml @@ -39,6 +39,12 @@ under the License. flink-annotations ${project.version} + + + org.apache.flink + flink-metrics-core + ${project.version} + diff --git a/flink-core/pom.xml b/flink-core/pom.xml index 7c267f76fb025..4521e7464da39 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -71,12 +71,6 @@ under the License. ${project.version} - - org.apache.flink - flink-metrics-core - ${project.version} - - diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/context/RuntimeContext.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/context/RuntimeContext.java index b866797115a6e..f3bf954da311f 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/context/RuntimeContext.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/context/RuntimeContext.java @@ -19,6 +19,7 @@ package org.apache.flink.process.api.context; import org.apache.flink.annotation.Experimental; +import org.apache.flink.metrics.groups.OperatorMetricGroup; /** * A RuntimeContext contains information about the context in which process functions are executed. @@ -41,6 +42,9 @@ public interface RuntimeContext { /** Get the {@link ProcessingTimeManager} of this process function. */ ProcessingTimeManager getProcessingTimeManager(); + /** Get the metric group of this process function. */ + OperatorMetricGroup getMetricGroup(); + /** Get the {@link TimestampManager} of this process function. */ TimestampManager getTimestampManager(); } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultNonPartitionedContext.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultNonPartitionedContext.java index 0b2998825e143..47c1be254d79d 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultNonPartitionedContext.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultNonPartitionedContext.java @@ -18,6 +18,7 @@ package org.apache.flink.process.impl.context; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.process.api.context.JobInfo; import org.apache.flink.process.api.context.NonPartitionedContext; import org.apache.flink.process.api.context.ProcessingTimeManager; @@ -61,6 +62,11 @@ public ProcessingTimeManager getProcessingTimeManager() { return UnsupportedProcessingTimeManager.INSTANCE; } + @Override + public OperatorMetricGroup getMetricGroup() { + return context.getMetricGroup(); + } + @Override public TimestampManager getTimestampManager() { return context.getTimestampManager(); diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultRuntimeContext.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultRuntimeContext.java index ab426f7e042e5..3ff9eb9c77fa1 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultRuntimeContext.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultRuntimeContext.java @@ -18,6 +18,7 @@ package org.apache.flink.process.impl.context; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.process.api.context.JobInfo; import org.apache.flink.process.api.context.ProcessingTimeManager; import org.apache.flink.process.api.context.RuntimeContext; @@ -37,6 +38,8 @@ public class DefaultRuntimeContext implements RuntimeContext { private final ProcessingTimeManager processingTimeManager; + private final OperatorMetricGroup metricGroup; + private final DefaultTimestampManager timestampManager; public DefaultRuntimeContext( @@ -50,6 +53,7 @@ public DefaultRuntimeContext( this.taskInfo = new DefaultTaskInfo(parallelism, maxParallelism, taskName); this.stateManager = new DefaultStateManager(currentKeySupplier); this.processingTimeManager = processingTimeManager; + this.metricGroup = operatorContext.getMetricGroup(); this.timestampManager = new DefaultTimestampManager(); } @@ -72,6 +76,11 @@ public ProcessingTimeManager getProcessingTimeManager() { return processingTimeManager; } + @Override + public OperatorMetricGroup getMetricGroup() { + return metricGroup; + } + @Override public DefaultTimestampManager getTimestampManager() { return timestampManager; diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultTwoOutputNonPartitionedContext.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultTwoOutputNonPartitionedContext.java index e4e5ec4c94a2f..7f0cb1d8c46a8 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultTwoOutputNonPartitionedContext.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultTwoOutputNonPartitionedContext.java @@ -18,6 +18,7 @@ package org.apache.flink.process.impl.context; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.process.api.context.JobInfo; import org.apache.flink.process.api.context.ProcessingTimeManager; import org.apache.flink.process.api.context.StateManager; @@ -61,6 +62,11 @@ public ProcessingTimeManager getProcessingTimeManager() { return UnsupportedProcessingTimeManager.INSTANCE; } + @Override + public OperatorMetricGroup getMetricGroup() { + return context.getMetricGroup(); + } + @Override public TimestampManager getTimestampManager() { return context.getTimestampManager();