Skip to content

Commit

Permalink
[API] Expose OperatorMetricGroup via RuntimeContext
Browse files Browse the repository at this point in the history
Move the dependency of flink-metric-core from flink-core to flink-core-api
  • Loading branch information
reswqa committed Feb 28, 2024
1 parent 9d6751a commit 673c731
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 6 deletions.
6 changes: 6 additions & 0 deletions flink-core-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ under the License.
<artifactId>flink-annotations</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
6 changes: 0 additions & 6 deletions flink-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,6 @@ under the License.
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-core</artifactId>
<version>${project.version}</version>
</dependency>

<!-- ArchUnit test dependencies -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.process.api.context;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.metrics.groups.OperatorMetricGroup;

/**
* A RuntimeContext contains information about the context in which process functions are executed.
Expand All @@ -41,6 +42,9 @@ public interface RuntimeContext {
/** Get the {@link ProcessingTimeManager} of this process function. */
ProcessingTimeManager getProcessingTimeManager();

/** Get the metric group of this process function. */
OperatorMetricGroup getMetricGroup();

/** Get the {@link TimestampManager} of this process function. */
TimestampManager getTimestampManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.process.impl.context;

import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.process.api.context.JobInfo;
import org.apache.flink.process.api.context.NonPartitionedContext;
import org.apache.flink.process.api.context.ProcessingTimeManager;
Expand Down Expand Up @@ -61,6 +62,11 @@ public ProcessingTimeManager getProcessingTimeManager() {
return UnsupportedProcessingTimeManager.INSTANCE;
}

@Override
public OperatorMetricGroup getMetricGroup() {
return context.getMetricGroup();
}

@Override
public TimestampManager getTimestampManager() {
return context.getTimestampManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.process.impl.context;

import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.process.api.context.JobInfo;
import org.apache.flink.process.api.context.ProcessingTimeManager;
import org.apache.flink.process.api.context.RuntimeContext;
Expand All @@ -37,6 +38,8 @@ public class DefaultRuntimeContext implements RuntimeContext {

private final ProcessingTimeManager processingTimeManager;

private final OperatorMetricGroup metricGroup;

private final DefaultTimestampManager timestampManager;

public DefaultRuntimeContext(
Expand All @@ -50,6 +53,7 @@ public DefaultRuntimeContext(
this.taskInfo = new DefaultTaskInfo(parallelism, maxParallelism, taskName);
this.stateManager = new DefaultStateManager(currentKeySupplier);
this.processingTimeManager = processingTimeManager;
this.metricGroup = operatorContext.getMetricGroup();
this.timestampManager = new DefaultTimestampManager();
}

Expand All @@ -72,6 +76,11 @@ public ProcessingTimeManager getProcessingTimeManager() {
return processingTimeManager;
}

@Override
public OperatorMetricGroup getMetricGroup() {
return metricGroup;
}

@Override
public DefaultTimestampManager getTimestampManager() {
return timestampManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.process.impl.context;

import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.process.api.context.JobInfo;
import org.apache.flink.process.api.context.ProcessingTimeManager;
import org.apache.flink.process.api.context.StateManager;
Expand Down Expand Up @@ -61,6 +62,11 @@ public ProcessingTimeManager getProcessingTimeManager() {
return UnsupportedProcessingTimeManager.INSTANCE;
}

@Override
public OperatorMetricGroup getMetricGroup() {
return context.getMetricGroup();
}

@Override
public TimestampManager getTimestampManager() {
return context.getTimestampManager();
Expand Down

0 comments on commit 673c731

Please sign in to comment.