Skip to content

Commit

Permalink
[FLINK-36390][core] Remove deprecated open(Configuration) method in R…
Browse files Browse the repository at this point in the history
…ichFunction
  • Loading branch information
reswqa committed Sep 27, 2024
1 parent be4549d commit f181e17
Show file tree
Hide file tree
Showing 38 changed files with 136 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>,
}

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
Expand Down
6 changes: 3 additions & 3 deletions docs/content.zh/docs/learn-flink/etl.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ minutesByStartCell

对其中的每一个接口,Flink 同样提供了一个所谓 "rich" 的变体,如 `RichFlatMapFunction`,其中增加了以下方法,包括:

- `open(Configuration c)`
- `open(OpenContext context)`
- `close()`
- `getRuntimeContext()`

Expand Down Expand Up @@ -280,7 +280,7 @@ public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> keyHasBeenSeen;

@Override
public void open(Configuration conf) {
public void open(OpenContext ctx) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
keyHasBeenSeen = getRuntimeContext().getState(desc);
}
Expand Down Expand Up @@ -373,7 +373,7 @@ public static class ControlFunction extends RichCoFlatMapFunction<String, String
private ValueState<Boolean> blocked;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
blocked = getRuntimeContext()
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
Expand Down
4 changes: 2 additions & 2 deletions docs/content.zh/docs/learn-flink/event_driven.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static class PseudoWindow extends

@Override
// 在初始化期间调用一次。
public void open(Configuration conf) {
public void open(OpenContext ctx) {
. . .
}

Expand Down Expand Up @@ -126,7 +126,7 @@ public static class PseudoWindow extends
private transient MapState<Long, Float> sumOfTips;

@Override
public void open(Configuration conf) {
public void open(OpenContext ctx) {

MapStateDescriptor<Long, Float> sumDesc =
new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
Expand Down
14 changes: 7 additions & 7 deletions docs/content.zh/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
Expand Down Expand Up @@ -116,7 +116,7 @@ public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter());
Expand Down Expand Up @@ -173,7 +173,7 @@ public class MyMapper extends RichMapFunction<String, String> {
private transient int valueToExpose = 0;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Integer>() {
Expand Down Expand Up @@ -247,7 +247,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
Expand Down Expand Up @@ -307,7 +307,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

Expand Down Expand Up @@ -366,7 +366,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
Expand Down Expand Up @@ -440,7 +440,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();

this.meter = getRuntimeContext()
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/dev/datastream/fault-tolerance/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>,
}

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
Expand Down
6 changes: 3 additions & 3 deletions docs/content/docs/learn-flink/etl.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ Abstract Method pattern.
For each of these interfaces, Flink also provides a so-called "rich" variant, e.g.,
`RichFlatMapFunction`, which has some additional methods, including:

- `open(Configuration c)`
- `open(OpenContext context)`
- `close()`
- `getRuntimeContext()`

Expand Down Expand Up @@ -329,7 +329,7 @@ public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> keyHasBeenSeen;

@Override
public void open(Configuration conf) {
public void open(OpenContext ctx) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
keyHasBeenSeen = getRuntimeContext().getState(desc);
}
Expand Down Expand Up @@ -447,7 +447,7 @@ public static class ControlFunction extends RichCoFlatMapFunction<String, String
private ValueState<Boolean> blocked;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
blocked = getRuntimeContext()
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
Expand Down
4 changes: 2 additions & 2 deletions docs/content/docs/learn-flink/event_driven.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static class PseudoWindow extends

@Override
// Called once during initialization.
public void open(Configuration conf) {
public void open(OpenContext ctx) {
. . .
}

Expand Down Expand Up @@ -116,7 +116,7 @@ Things to be aware of:
private transient MapState<Long, Float> sumOfTips;

@Override
public void open(Configuration conf) {
public void open(OpenContext ctx) {

MapStateDescriptor<Long, Float> sumDesc =
new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
Expand Down
14 changes: 7 additions & 7 deletions docs/content/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
Expand Down Expand Up @@ -116,7 +116,7 @@ public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter());
Expand Down Expand Up @@ -173,7 +173,7 @@ public class MyMapper extends RichMapFunction<String, String> {
private transient int valueToExpose = 0;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Integer>() {
Expand Down Expand Up @@ -247,7 +247,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
Expand Down Expand Up @@ -307,7 +307,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

Expand Down Expand Up @@ -366,7 +366,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
Expand Down Expand Up @@ -440,7 +440,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;

@Override
public void open(Configuration config) {
public void open(OpenContext ctx) {
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();

this.meter = getRuntimeContext()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.Configuration;

import java.io.Serializable;

Expand Down Expand Up @@ -69,7 +68,7 @@ public IterationRuntimeContext getIterationRuntimeContext() {
// --------------------------------------------------------------------------------------------

@Override
public void open(Configuration parameters) throws Exception {}
public void open(OpenContext openContext) throws Exception {}

@Override
public void close() throws Exception {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*
* private Map<Long, String> map;
*
* public void open(Configuration cfg) throws Exception {
* public void open(OpenContext ctx) throws Exception {
* getRuntimeContext().getBroadcastVariableWithInitializer("mapvar",
* new BroadcastVariableInitializer<Tuple2<Long, String>, Map<Long, String>>() {
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;

/**
* The {@link OpenContext} interface provides necessary information required by the {@link
* RichFunction} when it is opened. The {@link OpenContext} is currently empty because it can be
* used to add more methods without affecting the signature of {@code RichFunction#open}.
*/
@PublicEvolving
@Public
public interface OpenContext {}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;

/**
* An base interface for all rich user-defined functions. This class defines methods for the life
Expand All @@ -30,51 +29,6 @@
@Public
public interface RichFunction extends Function {

/**
* Initialization method for the function. It is called before the actual working methods (like
* <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that are
* part of an iteration, this method will be invoked at the beginning of each iteration
* superstep.
*
* <p>The configuration object passed to the function can be used for configuration and
* initialization. The configuration contains all parameters that were configured on the
* function in the program composition.
*
* <pre>{@code
* public class MyFilter extends RichFilterFunction<String> {
*
* private String searchString;
*
* public void open(Configuration parameters) {
* this.searchString = parameters.getString("foo");
* }
*
* public boolean filter(String value) {
* return value.equals(searchString);
* }
* }
* }</pre>
*
* <p>By default, this method does nothing.
*
* @param parameters The configuration containing the parameters attached to the contract.
* @throws Exception Implementations may forward exceptions, which are caught by the runtime.
* When the runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
* @see org.apache.flink.configuration.Configuration
* @deprecated This method is deprecated since Flink 1.19. The users are recommended to
* implement {@code open(OpenContext openContext)} and implement {@code open(Configuration
* parameters)} with an empty body instead. 1. If you implement {@code open(OpenContext
* openContext)}, the {@code open(OpenContext openContext)} will be invoked and the {@code
* open(Configuration parameters)} won't be invoked. 2. If you don't implement {@code
* open(OpenContext openContext)}, the {@code open(Configuration parameters)} will be
* invoked in the default implementation of the {@code open(OpenContext openContext)}.
* @see <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231">
* FLIP-344: Remove parameter in RichFunction#open </a>
*/
@Deprecated
void open(Configuration parameters) throws Exception;

/**
* Initialization method for the function. It is called before the actual working methods (like
* <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that are
Expand All @@ -100,24 +54,14 @@ public interface RichFunction extends Function {
* }
* }</pre>
*
* <p>By default, this method does nothing.
*
* <p>1. If you implement {@code open(OpenContext openContext)}, the {@code open(OpenContext
* openContext)} will be invoked and the {@code open(Configuration parameters)} won't be
* invoked. 2. If you don't implement {@code open(OpenContext openContext)}, the {@code
* open(Configuration parameters)} will be invoked in the default implementation of the {@code
* open(OpenContext openContext)}.
*
* @param openContext The context containing information about the context in which the function
* is opened.
* @throws Exception Implementations may forward exceptions, which are caught by the runtime.
* When the runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
*/
@PublicEvolving
default void open(OpenContext openContext) throws Exception {
open(new Configuration());
}
void open(OpenContext openContext) throws Exception;

/**
* Tear-down method for the user code. It is called after the last call to the main working
Expand Down
Loading

0 comments on commit f181e17

Please sign in to comment.