diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md index a58b5a0807dcf..e4d8c3b9c13d6 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md @@ -160,7 +160,7 @@ public class CountWindowAverage extends RichFlatMapFunction, } @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { ValueStateDescriptor> descriptor = new ValueStateDescriptor<>( "average", // the state name diff --git a/docs/content.zh/docs/learn-flink/etl.md b/docs/content.zh/docs/learn-flink/etl.md index 591d6899ef9f9..b5e87ba44c1ed 100644 --- a/docs/content.zh/docs/learn-flink/etl.md +++ b/docs/content.zh/docs/learn-flink/etl.md @@ -236,7 +236,7 @@ minutesByStartCell 对其中的每一个接口,Flink 同样提供了一个所谓 "rich" 的变体,如 `RichFlatMapFunction`,其中增加了以下方法,包括: -- `open(Configuration c)` +- `open(OpenContext context)` - `close()` - `getRuntimeContext()` @@ -280,7 +280,7 @@ public static class Deduplicator extends RichFlatMapFunction { ValueState keyHasBeenSeen; @Override - public void open(Configuration conf) { + public void open(OpenContext ctx) { ValueStateDescriptor desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN); keyHasBeenSeen = getRuntimeContext().getState(desc); } @@ -373,7 +373,7 @@ public static class ControlFunction extends RichCoFlatMapFunction blocked; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { blocked = getRuntimeContext() .getState(new ValueStateDescriptor<>("blocked", Boolean.class)); } diff --git a/docs/content.zh/docs/learn-flink/event_driven.md b/docs/content.zh/docs/learn-flink/event_driven.md index 9037bc153ff7d..f95cf42eb16ea 100644 --- a/docs/content.zh/docs/learn-flink/event_driven.md +++ b/docs/content.zh/docs/learn-flink/event_driven.md @@ -79,7 +79,7 @@ public static class PseudoWindow extends @Override // 在初始化期间调用一次。 - public void open(Configuration conf) { + public void open(OpenContext ctx) { . . . } @@ -126,7 +126,7 @@ public static class PseudoWindow extends private transient MapState sumOfTips; @Override -public void open(Configuration conf) { +public void open(OpenContext ctx) { MapStateDescriptor sumDesc = new MapStateDescriptor<>("sumOfTips", Long.class, Float.class); diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md index e461044dddf8c..b9ab7201dcefd 100644 --- a/docs/content.zh/docs/ops/metrics.md +++ b/docs/content.zh/docs/ops/metrics.md @@ -52,7 +52,7 @@ public class MyMapper extends RichMapFunction { private transient Counter counter; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); @@ -116,7 +116,7 @@ public class MyMapper extends RichMapFunction { private transient Counter counter; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCustomCounter", new CustomCounter()); @@ -173,7 +173,7 @@ public class MyMapper extends RichMapFunction { private transient int valueToExpose = 0; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { getRuntimeContext() .getMetricGroup() .gauge("MyGauge", new Gauge() { @@ -247,7 +247,7 @@ public class MyMapper extends RichMapFunction { private transient Histogram histogram; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { this.histogram = getRuntimeContext() .getMetricGroup() .histogram("myHistogram", new MyHistogram()); @@ -307,7 +307,7 @@ public class MyMapper extends RichMapFunction { private transient Histogram histogram; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { com.codahale.metrics.Histogram dropwizardHistogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500)); @@ -366,7 +366,7 @@ public class MyMapper extends RichMapFunction { private transient Meter meter; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { this.meter = getRuntimeContext() .getMetricGroup() .meter("myMeter", new MyMeter()); @@ -440,7 +440,7 @@ public class MyMapper extends RichMapFunction { private transient Meter meter; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter(); this.meter = getRuntimeContext() diff --git a/docs/content/docs/dev/datastream/fault-tolerance/state.md b/docs/content/docs/dev/datastream/fault-tolerance/state.md index 97115f7bba0da..2486212055ed1 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/state.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/state.md @@ -192,7 +192,7 @@ public class CountWindowAverage extends RichFlatMapFunction, } @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { ValueStateDescriptor> descriptor = new ValueStateDescriptor<>( "average", // the state name diff --git a/docs/content/docs/learn-flink/etl.md b/docs/content/docs/learn-flink/etl.md index aa1423edb3597..113bc6baf5760 100644 --- a/docs/content/docs/learn-flink/etl.md +++ b/docs/content/docs/learn-flink/etl.md @@ -274,7 +274,7 @@ Abstract Method pattern. For each of these interfaces, Flink also provides a so-called "rich" variant, e.g., `RichFlatMapFunction`, which has some additional methods, including: -- `open(Configuration c)` +- `open(OpenContext context)` - `close()` - `getRuntimeContext()` @@ -329,7 +329,7 @@ public static class Deduplicator extends RichFlatMapFunction { ValueState keyHasBeenSeen; @Override - public void open(Configuration conf) { + public void open(OpenContext ctx) { ValueStateDescriptor desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN); keyHasBeenSeen = getRuntimeContext().getState(desc); } @@ -447,7 +447,7 @@ public static class ControlFunction extends RichCoFlatMapFunction blocked; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { blocked = getRuntimeContext() .getState(new ValueStateDescriptor<>("blocked", Boolean.class)); } diff --git a/docs/content/docs/learn-flink/event_driven.md b/docs/content/docs/learn-flink/event_driven.md index e03d4598e20b5..173b0829d31f0 100644 --- a/docs/content/docs/learn-flink/event_driven.md +++ b/docs/content/docs/learn-flink/event_driven.md @@ -75,7 +75,7 @@ public static class PseudoWindow extends @Override // Called once during initialization. - public void open(Configuration conf) { + public void open(OpenContext ctx) { . . . } @@ -116,7 +116,7 @@ Things to be aware of: private transient MapState sumOfTips; @Override -public void open(Configuration conf) { +public void open(OpenContext ctx) { MapStateDescriptor sumDesc = new MapStateDescriptor<>("sumOfTips", Long.class, Float.class); diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md index 5ab5cb2118087..3347bd48c0a03 100644 --- a/docs/content/docs/ops/metrics.md +++ b/docs/content/docs/ops/metrics.md @@ -52,7 +52,7 @@ public class MyMapper extends RichMapFunction { private transient Counter counter; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); @@ -116,7 +116,7 @@ public class MyMapper extends RichMapFunction { private transient Counter counter; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCustomCounter", new CustomCounter()); @@ -173,7 +173,7 @@ public class MyMapper extends RichMapFunction { private transient int valueToExpose = 0; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { getRuntimeContext() .getMetricGroup() .gauge("MyGauge", new Gauge() { @@ -247,7 +247,7 @@ public class MyMapper extends RichMapFunction { private transient Histogram histogram; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { this.histogram = getRuntimeContext() .getMetricGroup() .histogram("myHistogram", new MyHistogram()); @@ -307,7 +307,7 @@ public class MyMapper extends RichMapFunction { private transient Histogram histogram; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { com.codahale.metrics.Histogram dropwizardHistogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500)); @@ -366,7 +366,7 @@ public class MyMapper extends RichMapFunction { private transient Meter meter; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { this.meter = getRuntimeContext() .getMetricGroup() .meter("myMeter", new MyMeter()); @@ -440,7 +440,7 @@ public class MyMapper extends RichMapFunction { private transient Meter meter; @Override - public void open(Configuration config) { + public void open(OpenContext ctx) { com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter(); this.meter = getRuntimeContext() diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java index 7a25c4abc7206..d4ae07068d327 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java @@ -19,7 +19,6 @@ package org.apache.flink.api.common.functions; import org.apache.flink.annotation.Public; -import org.apache.flink.configuration.Configuration; import java.io.Serializable; @@ -69,7 +68,7 @@ public IterationRuntimeContext getIterationRuntimeContext() { // -------------------------------------------------------------------------------------------- @Override - public void open(Configuration parameters) throws Exception {} + public void open(OpenContext openContext) throws Exception {} @Override public void close() throws Exception {} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java index e99e68a078d81..42ef2a8af4081 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java @@ -41,7 +41,7 @@ * * private Map map; * - * public void open(Configuration cfg) throws Exception { + * public void open(OpenContext ctx) throws Exception { * getRuntimeContext().getBroadcastVariableWithInitializer("mapvar", * new BroadcastVariableInitializer, Map>() { * diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java index 4ff5484b3b087..ff07bfc181693 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java @@ -18,12 +18,12 @@ package org.apache.flink.api.common.functions; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Public; /** * The {@link OpenContext} interface provides necessary information required by the {@link * RichFunction} when it is opened. The {@link OpenContext} is currently empty because it can be * used to add more methods without affecting the signature of {@code RichFunction#open}. */ -@PublicEvolving +@Public public interface OpenContext {} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java index ae83fb30f2bd2..f1aa70a97d953 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; /** * An base interface for all rich user-defined functions. This class defines methods for the life @@ -30,51 +29,6 @@ @Public public interface RichFunction extends Function { - /** - * Initialization method for the function. It is called before the actual working methods (like - * map or join) and thus suitable for one time setup work. For functions that are - * part of an iteration, this method will be invoked at the beginning of each iteration - * superstep. - * - *

The configuration object passed to the function can be used for configuration and - * initialization. The configuration contains all parameters that were configured on the - * function in the program composition. - * - *

{@code
-     * public class MyFilter extends RichFilterFunction {
-     *
-     *     private String searchString;
-     *
-     *     public void open(Configuration parameters) {
-     *         this.searchString = parameters.getString("foo");
-     *     }
-     *
-     *     public boolean filter(String value) {
-     *         return value.equals(searchString);
-     *     }
-     * }
-     * }
- * - *

By default, this method does nothing. - * - * @param parameters The configuration containing the parameters attached to the contract. - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. - * When the runtime catches an exception, it aborts the task and lets the fail-over logic - * decide whether to retry the task execution. - * @see org.apache.flink.configuration.Configuration - * @deprecated This method is deprecated since Flink 1.19. The users are recommended to - * implement {@code open(OpenContext openContext)} and implement {@code open(Configuration - * parameters)} with an empty body instead. 1. If you implement {@code open(OpenContext - * openContext)}, the {@code open(OpenContext openContext)} will be invoked and the {@code - * open(Configuration parameters)} won't be invoked. 2. If you don't implement {@code - * open(OpenContext openContext)}, the {@code open(Configuration parameters)} will be - * invoked in the default implementation of the {@code open(OpenContext openContext)}. - * @see - * FLIP-344: Remove parameter in RichFunction#open - */ - @Deprecated - void open(Configuration parameters) throws Exception; - /** * Initialization method for the function. It is called before the actual working methods (like * map or join) and thus suitable for one time setup work. For functions that are @@ -100,14 +54,6 @@ public interface RichFunction extends Function { * } * } * - *

By default, this method does nothing. - * - *

1. If you implement {@code open(OpenContext openContext)}, the {@code open(OpenContext - * openContext)} will be invoked and the {@code open(Configuration parameters)} won't be - * invoked. 2. If you don't implement {@code open(OpenContext openContext)}, the {@code - * open(Configuration parameters)} will be invoked in the default implementation of the {@code - * open(OpenContext openContext)}. - * * @param openContext The context containing information about the context in which the function * is opened. * @throws Exception Implementations may forward exceptions, which are caught by the runtime. @@ -115,9 +61,7 @@ public interface RichFunction extends Function { * decide whether to retry the task execution. */ @PublicEvolving - default void open(OpenContext openContext) throws Exception { - open(new Configuration()); - } + void open(OpenContext openContext) throws Exception; /** * Tear-down method for the user code. It is called after the last call to the main working diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index eb94f65a2f84b..b36c0adaa2ad7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -232,7 +232,7 @@ C getBroadcastVariableWithInitializer( * * private ValueState state; * - * public void open(Configuration cfg) { + * public void open(OpenContext ctx) { * state = getRuntimeContext().getState( * new ValueStateDescriptor("count", LongSerializer.INSTANCE, 0L)); * } @@ -269,7 +269,7 @@ C getBroadcastVariableWithInitializer( * * private ListState state; * - * public void open(Configuration cfg) { + * public void open(OpenContext ctx) { * state = getRuntimeContext().getListState( * new ListStateDescriptor<>("myState", MyType.class)); * } @@ -310,7 +310,7 @@ C getBroadcastVariableWithInitializer( * * private ReducingState state; * - * public void open(Configuration cfg) { + * public void open(OpenContext ctx) { * state = getRuntimeContext().getReducingState( * new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class)); * } @@ -348,7 +348,7 @@ C getBroadcastVariableWithInitializer( * * private AggregatingState state; * - * public void open(Configuration cfg) { + * public void open(OpenContext ctx) { * state = getRuntimeContext().getAggregatingState( * new AggregatingStateDescriptor<>("sum", aggregateFunction, Long.class)); * } @@ -388,7 +388,7 @@ AggregatingState getAggregatingState( * * private MapState state; * - * public void open(Configuration cfg) { + * public void open(OpenContext ctx) { * state = getRuntimeContext().getMapState( * new MapStateDescriptor<>("sum", MyType.class, Long.class)); * } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/WithConfigurationOpenContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/WithConfigurationOpenContext.java new file mode 100644 index 0000000000000..7ce3cbcaa30fa --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/WithConfigurationOpenContext.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; + +/** A special {@link OpenContext} for passing configuration to udf. */ +@PublicEvolving +public class WithConfigurationOpenContext implements OpenContext { + private final Configuration configuration; + + public WithConfigurationOpenContext(Configuration configuration) { + this.configuration = configuration; + } + + public Configuration getConfiguration() { + return configuration; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java index 18e2d6670311f..8ce0d9c2c647f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java @@ -46,7 +46,7 @@ public interface KeyedStateStore { * * private ValueState state; * - * public void open(Configuration cfg) { + * public void open(OpenContext ctx) { * state = getRuntimeContext().getState( * new ValueStateDescriptor("count", LongSerializer.INSTANCE, 0L)); * } @@ -83,7 +83,7 @@ public interface KeyedStateStore { * * private ListState state; * - * public void open(Configuration cfg) { + * public void open(OpenContext ctx) { * state = getRuntimeContext().getListState( * new ListStateDescriptor<>("myState", MyType.class)); * } @@ -124,7 +124,7 @@ public interface KeyedStateStore { * * private ReducingState state; * - * public void open(Configuration cfg) { + * public void open(OpenContext ctx) { * state = getRuntimeContext().getReducingState( * new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class)); * } @@ -162,7 +162,7 @@ public interface KeyedStateStore { * * private AggregatingState state; * - * public void open(Configuration cfg) { + * public void open(OpenContext ctx) { * state = getRuntimeContext().getAggregatingState( * new AggregatingStateDescriptor<>("sum", aggregateFunction, Long.class)); * } @@ -202,7 +202,7 @@ AggregatingState getAggregatingState( * * private MapState state; * - * public void open(Configuration cfg) { + * public void open(OpenContext ctx) { * state = getRuntimeContext().getMapState( * new MapStateDescriptor<>("sum", MyType.class, Long.class)); * } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java index 4aaf582ecc5db..e5f609500d2f1 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Preconditions; /** @@ -69,12 +68,6 @@ public IterationRuntimeContext getIterationRuntimeContext() { @Override public void open(OpenContext openContext) throws Exception {} - @Override - public void open(Configuration parameters) throws Exception { - throw new UnsupportedOperationException( - "This method is deprecated and shouldn't be invoked. Please use open(OpenContext) instead."); - } - @Override public void close() throws Exception {} } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java index 5bbf8c981fafa..652d9253d4513 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.OpenContext; -import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; import java.util.Set; @@ -32,7 +31,7 @@ *

For every key {@link #readKey(Object, Context, Collector)} is invoked. This can produce zero * or more elements as output. * - *

NOTE: State descriptors must be eagerly registered in {@code open(Configuration)}. Any + *

NOTE: State descriptors must be eagerly registered in {@code open(OpenContext)}. Any * attempt to dynamically register states inside of {@code readKey} will result in a {@code * RuntimeException}. * @@ -51,15 +50,6 @@ public abstract class KeyedStateReaderFunction extends AbstractRichFunct private static final long serialVersionUID = 3873843034140417407L; - /** - * Initialization method for the function. It is called before {@link #readKey(Object, Context, - * Collector)} and thus suitable for one time setup work. - * - *

This is the only method that my register state descriptors within a {@code - * KeyedStateReaderFunction}. - */ - public abstract void open(Configuration parameters) throws Exception; - /** * Process one key from the restored state backend. * diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java index 17845da02efa6..ce4094a4d0b88 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.runtime.state.StateBackend; @@ -110,12 +109,6 @@ public void open(OpenContext openContext) { state = getRuntimeContext().getState(stateDescriptor); } - @Override - public void open(Configuration parameters) throws Exception { - throw new UnsupportedOperationException( - "This method is deprecated and shouldn't be invoked. Please use open(OpenContext) instead."); - } - @Override public void readKey(String key, Context ctx, Collector> out) throws Exception { diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java index 75f9f6f3deefe..de3bdcd5f3afa 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java @@ -112,12 +112,6 @@ public void open(OpenContext openContext) { state = getRuntimeContext().getState(valueState); } - @Override - public void open(Configuration parameters) { - throw new UnsupportedOperationException( - "This method is deprecated and shouldn't be invoked. Please use open(OpenContext) instead."); - } - @Override public void readKey(Integer key, Context ctx, Collector out) throws Exception { Pojo pojo = new Pojo(); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java index c287cdb5598c7..9b9f845bba175 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java @@ -262,12 +262,6 @@ public void open(OpenContext openContext) { state = getRuntimeContext().getState(stateDescriptor); } - @Override - public void open(Configuration parameters) throws Exception { - throw new UnsupportedOperationException( - "This method is deprecated and shouldn't be invoked. Please use open(OpenContext) instead."); - } - @Override public void readKey( Integer key, KeyedStateReaderFunction.Context ctx, Collector out) @@ -284,12 +278,6 @@ public void open(OpenContext openContext) { state = getRuntimeContext().getState(stateDescriptor); } - @Override - public void open(Configuration parameters) throws Exception { - throw new UnsupportedOperationException( - "This method is deprecated and shouldn't be invoked. Please use open(OpenContext) instead."); - } - @Override public void readKey( Integer key, KeyedStateReaderFunction.Context ctx, Collector out) @@ -306,12 +294,6 @@ public void open(OpenContext openContext) { getRuntimeContext().getState(stateDescriptor); } - @Override - public void open(Configuration parameters) throws Exception { - throw new UnsupportedOperationException( - "This method is deprecated and shouldn't be invoked. Please use open(OpenContext) instead."); - } - @Override public void readKey( Integer key, KeyedStateReaderFunction.Context ctx, Collector out) @@ -360,12 +342,6 @@ public void open(OpenContext openContext) { state = getRuntimeContext().getState(stateDescriptor); } - @Override - public void open(Configuration parameters) throws Exception { - throw new UnsupportedOperationException( - "This method is deprecated and shouldn't be invoked. Please use open(OpenContext) instead."); - } - @Override public void readKey( Integer key, KeyedStateReaderFunction.Context ctx, Collector out) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index 658e026f63093..9388d88cb15cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -1496,7 +1496,7 @@ public static void openUserCode(Function stub, Configuration parameters) throws FunctionUtils.openFunction(stub, DefaultOpenContext.INSTANCE); } catch (Throwable t) { throw new Exception( - "The user defined 'open(Configuration)' method in " + "The user defined 'open(OpenContext)' method in " + stub.getClass().toString() + " caused an exception: " + t.getMessage(), diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java index af3a2c7a9ff57..f46855cc5599c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java @@ -131,7 +131,7 @@ * * private ValueState count; * - * public void open(Configuration cfg) throws Exception { + * public void open(OpenContext ctx) throws Exception { * count = getRuntimeContext().getState(new ValueStateDescriptor<>("myCount", Long.class)); * } * diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java index c0894e5241447..f7c8b2c55e911 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java @@ -19,6 +19,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.io.CleanupWhenUnsuccessful; import org.apache.flink.api.common.io.OutputFormat; @@ -57,26 +58,10 @@ public OutputFormatSinkFunction(OutputFormat format) { this.format = format; } - /** - * Initialization method for the {@link OutputFormatSinkFunction}. - * - * @param parameters The configuration containing the parameters attached to the contract. - * @throws Exception if an error happens. - * @deprecated This method is deprecated since Flink 1.19. The users are recommended to - * implement {@code open(OpenContext openContext)} and override {@code open(Configuration - * parameters)} with an empty body instead. 1. If you implement {@code open(OpenContext - * openContext)}, the {@code open(OpenContext openContext)} will be invoked and the {@code - * open(Configuration parameters)} won't be invoked. 2. If you don't implement {@code - * open(OpenContext openContext)}, the {@code open(Configuration parameters)} will be - * invoked in the default implementation of the {@code open(OpenContext openContext)}. - * @see - * FLIP-344: Remove parameter in RichFunction#open - */ - @Deprecated @Override - public void open(Configuration parameters) throws Exception { + public void open(OpenContext openContext) throws Exception { RuntimeContext context = getRuntimeContext(); - format.configure(parameters); + format.configure(new Configuration()); int indexInSubtaskGroup = context.getTaskInfo().getIndexOfThisSubtask(); int currentNumberOfSubtasks = context.getTaskInfo().getNumberOfParallelSubtasks(); format.open( diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java index e9c7b546e9184..5ece13b2a54bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java @@ -19,8 +19,8 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter; -import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; /** @@ -69,25 +69,9 @@ public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) { writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr); } - /** - * Initialization method for the {@link PrintSinkFunction}. - * - * @param parameters The configuration containing the parameters attached to the contract. - * @throws Exception if an error happens. - * @deprecated This method is deprecated since Flink 1.19. The users are recommended to - * implement {@code open(OpenContext openContext)} and override {@code open(Configuration - * parameters)} with an empty body instead. 1. If you implement {@code open(OpenContext - * openContext)}, the {@code open(OpenContext openContext)} will be invoked and the {@code - * open(Configuration parameters)} won't be invoked. 2. If you don't implement {@code - * open(OpenContext openContext)}, the {@code open(Configuration parameters)} will be - * invoked in the default implementation of the {@code open(OpenContext openContext)}. - * @see - * FLIP-344: Remove parameter in RichFunction#open - */ - @Deprecated @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); + public void open(OpenContext openContext) throws Exception { + super.open(openContext); StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); writer.open( context.getTaskInfo().getIndexOfThisSubtask(), diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java index 40162f3ca357f..d673652646694 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.api.functions.sink; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.configuration.Configuration; import org.apache.flink.util.SerializableObject; import org.slf4j.Logger; @@ -129,25 +129,8 @@ public SocketClientSink( // ------------------------------------------------------------------------ // Life cycle // ------------------------------------------------------------------------ - - /** - * Initialize the connection with the Socket in the server. - * - * @param parameters The configuration containing the parameters attached to the contract. - * @throws Exception if an error happens. - * @deprecated This method is deprecated since Flink 1.19. The users are recommended to - * implement {@code open(OpenContext openContext)} and override {@code open(Configuration - * parameters)} with an empty body instead. 1. If you implement {@code open(OpenContext - * openContext)}, the {@code open(OpenContext openContext)} will be invoked and the {@code - * open(Configuration parameters)} won't be invoked. 2. If you don't implement {@code - * open(OpenContext openContext)}, the {@code open(Configuration parameters)} will be - * invoked in the default implementation of the {@code open(OpenContext openContext)}. - * @see - * FLIP-344: Remove parameter in RichFunction#open - */ - @Deprecated @Override - public void open(Configuration parameters) throws Exception { + public void open(OpenContext openContext) throws Exception { try { synchronized (lock) { createConnection(); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java index 7896221295176..a44e60c7e0917 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.functions.source; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.util.SplittableIterator; import java.util.Iterator; @@ -46,24 +46,8 @@ public FromSplittableIteratorFunction(SplittableIterator iterator) { this.fullIterator = iterator; } - /** - * Initialization method for the {@link FromSplittableIteratorFunction}. - * - * @param parameters The configuration containing the parameters attached to the contract. - * @throws Exception if an error happens. - * @deprecated This method is deprecated since Flink 1.19. The users are recommended to - * implement {@code open(OpenContext openContext)} and override {@code open(Configuration - * parameters)} with an empty body instead. 1. If you implement {@code open(OpenContext - * openContext)}, the {@code open(OpenContext openContext)} will be invoked and the {@code - * open(Configuration parameters)} won't be invoked. 2. If you don't implement {@code - * open(OpenContext openContext)}, the {@code open(Configuration parameters)} will be - * invoked in the default implementation of the {@code open(OpenContext openContext)}. - * @see - * FLIP-344: Remove parameter in RichFunction#open - */ - @Deprecated @Override - public void open(Configuration parameters) throws Exception { + public void open(OpenContext openContext) throws Exception { int numberOfSubTasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); int indexofThisSubTask = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); iterator = fullIterator.split(numberOfSubTasks)[indexofThisSubTask]; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java index e91f7e9fc03f7..bf76308424d2f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java @@ -114,7 +114,7 @@ class AbstractUdfStreamOperatorLifecycleTest { private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" - + ", open[class org.apache.flink.configuration.Configuration], open[interface org.apache.flink.api.common.functions.OpenContext], setRuntimeContext[interface " + + ", open[interface org.apache.flink.api.common.functions.OpenContext], setRuntimeContext[interface " + "org.apache.flink.api.common.functions.RuntimeContext]]"; private static final List ACTUAL_ORDER_TRACKING = diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java index aa18c5608efd4..5facea46ab485 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java @@ -20,7 +20,9 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.externalresource.ExternalResourceInfo; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.WithConfigurationOpenContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; @@ -62,10 +64,16 @@ public class FunctionContext { public FunctionContext( @Nullable RuntimeContext context, @Nullable ClassLoader userClassLoader, - @Nullable Configuration jobParameters) { + @Nullable OpenContext openContext) { this.context = context; this.userClassLoader = userClassLoader; - this.jobParameters = jobParameters != null ? jobParameters.toMap() : null; + if (openContext instanceof WithConfigurationOpenContext) { + Configuration configuration = + ((WithConfigurationOpenContext) openContext).getConfiguration(); + this.jobParameters = configuration.toMap(); + } else { + this.jobParameters = null; + } } public FunctionContext(RuntimeContext context) { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala index bd9e4d8f2534c..5fcede82e7d6f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.table.planner.codegen +import org.apache.flink.api.common.functions.{DefaultOpenContext, OpenContext} import org.apache.flink.configuration.Configuration import org.apache.flink.table.planner.codegen.CodeGenUtils._ import org.apache.flink.table.planner.codegen.Indenter.toISC @@ -74,7 +75,7 @@ object CollectorCodeGenerator { } @Override - public void open(${className[Configuration]} parameters) throws Exception { + public void open(${className[OpenContext]} openContext) throws Exception { ${ctx.reuseOpenCode()} } @@ -142,7 +143,7 @@ object CollectorCodeGenerator { } @Override - public void open(${className[Configuration]} parameters) throws Exception { + public void open(${className[OpenContext]} openContext) throws Exception { ${ctx.reuseOpenCode()} } @@ -180,7 +181,7 @@ object CollectorCodeGenerator { s""" |$collectorTerm = new ${generatedCollector.getClassName}(); |$collectorTerm.setRuntimeContext(getRuntimeContext()); - |$collectorTerm.open(new ${className[Configuration]}()); + |$collectorTerm.open(new ${className[DefaultOpenContext]}()); |""".stripMargin ctx.addReusableOpenStatement(openCollector) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala index 63616c0a409a2..11a4ac1f15b97 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala @@ -17,7 +17,7 @@ */ package org.apache.flink.table.planner.codegen -import org.apache.flink.api.common.functions.{MapFunction, OpenContext, RichMapFunction} +import org.apache.flink.api.common.functions.{DefaultOpenContext, MapFunction, OpenContext, RichMapFunction, WithConfigurationOpenContext} import org.apache.flink.configuration.{Configuration, PipelineOptions, ReadableConfig} import org.apache.flink.table.api.{TableConfig, TableException} import org.apache.flink.table.data.{DecimalData, GenericRowData, TimestampData} @@ -107,7 +107,7 @@ class ExpressionReducer( .getOrElse(new Configuration) val reduced = try { - richMapFunction.open(parameters) + richMapFunction.open(new WithConfigurationOpenContext(parameters)) // execute richMapFunction.map(EMPTY_ROW) } catch { @@ -315,7 +315,7 @@ class ConstantCodeGeneratorContext(tableConfig: ReadableConfig, classLoader: Cla super.addReusableFunction( function, classOf[FunctionContext], - Seq("null", "this.getClass().getClassLoader()", "parameters")) + Seq("null", "this.getClass().getClassLoader()", "openContext")) } override def addReusableConverter(dataType: DataType, classLoaderTerm: String = null): String = { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala index 76a5c4ca46033..3b5943d6ab631 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala @@ -154,7 +154,7 @@ object FunctionCodeGenerator { ${ctx.reuseConstructorCode(funcName)} @Override - public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception { + public void open(${classOf[OpenContext].getCanonicalName} openContext) throws Exception { ${ctx.reuseOpenCode()} } @@ -230,7 +230,7 @@ object FunctionCodeGenerator { ${ctx.reuseConstructorCode(funcName)} @Override - public void open(${className[Configuration]} parameters) throws Exception { + public void open(${className[OpenContext]} openContext) throws Exception { ${ctx.reuseOpenCode()} } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala index fdc7ac8036b4b..4256c90b5960d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.table.planner.codegen +import org.apache.flink.api.common.functions.DefaultOpenContext import org.apache.flink.configuration.{Configuration, ReadableConfig} import org.apache.flink.metrics.Gauge import org.apache.flink.table.data.{RowData, TimestampData} @@ -157,7 +158,7 @@ object LongHashJoinGenerator { val condRefs = ctx.addReusableObject(condFunc.getReferences, "condRefs") ctx.addReusableInitStatement(s"condFunc = new ${condFunc.getClassName}($condRefs);") ctx.addReusableOpenStatement(s"condFunc.setRuntimeContext(getRuntimeContext());") - ctx.addReusableOpenStatement(s"condFunc.open(new ${className[Configuration]}());") + ctx.addReusableOpenStatement(s"condFunc.open(new ${className[DefaultOpenContext]}());") ctx.addReusableCloseStatement(s"condFunc.close();") val leftIsBuildTerm = newName(ctx, "leftIsBuild") diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala index 45a615707f836..5aa5b6c8d2055 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala @@ -17,7 +17,7 @@ */ package org.apache.flink.table.planner.codegen -import org.apache.flink.api.common.functions.{FlatMapFunction, Function} +import org.apache.flink.api.common.functions.{FlatMapFunction, Function, OpenContext} import org.apache.flink.configuration.{Configuration, ReadableConfig} import org.apache.flink.streaming.api.functions.async.AsyncFunction import org.apache.flink.table.api.ValidationException @@ -381,7 +381,7 @@ object LookupJoinCodeGenerator { } @Override - public void open(${className[Configuration]} parameters) throws Exception { + public void open(${className[OpenContext]} openContext) throws Exception { ${ctx.reuseOpenCode()} } @@ -492,7 +492,7 @@ object LookupJoinCodeGenerator { } @Override - public void open(${className[Configuration]} parameters) throws Exception { + public void open(${className[OpenContext]} openContext) throws Exception { ${ctx.reuseOpenCode()} } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala index e15c2c8a20514..18f3337902fbc 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.codegen import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier +import org.apache.flink.api.common.functions.OpenContext import org.apache.flink.configuration.{Configuration, ReadableConfig} import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction} import org.apache.flink.table.planner.calcite.FlinkTypeFactory @@ -87,7 +88,7 @@ object WatermarkGeneratorCodeGenerator { } @Override - public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception { + public void open(${classOf[OpenContext].getCanonicalName} openContext) throws Exception { ${ctx.reuseOpenCode()} } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala index 5ad355e19ee30..f04496d4d9061 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala @@ -17,7 +17,7 @@ */ package org.apache.flink.table.planner.codegen.calls -import org.apache.flink.api.common.functions.{AbstractRichFunction, RichFunction} +import org.apache.flink.api.common.functions.{AbstractRichFunction, OpenContext, RichFunction} import org.apache.flink.configuration.{Configuration, ReadableConfig} import org.apache.flink.table.api.{DataTypes, TableException} import org.apache.flink.table.api.Expressions.callSql @@ -630,7 +630,7 @@ object BridgingFunctionGenUtil { | ${ctx.reuseInitCode()} | } | - | public void open(${className[Configuration]} parameters) throws Exception { + | public void open(${className[OpenContext]} openContext) throws Exception { | ${ctx.reuseOpenCode()} | } | diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala index ffbdfec752c02..3f49c9bf11663 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.planner.expressions.utils import org.apache.flink.api.common.{TaskInfo, TaskInfoImpl} -import org.apache.flink.api.common.functions.{MapFunction, RichFunction, RichMapFunction} +import org.apache.flink.api.common.functions.{DefaultOpenContext, MapFunction, OpenContext, RichFunction, RichMapFunction} import org.apache.flink.api.common.functions.util.RuntimeUDFContext import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.configuration.Configuration @@ -241,7 +241,7 @@ abstract class ExpressionTestBase(isStreaming: Boolean = true) { Collections.emptyMap(), null) richMapper.setRuntimeContext(t) - richMapper.open(new Configuration()) + richMapper.open(DefaultOpenContext.INSTANCE) } val testRow = if (containsLegacyTypes) { diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java index 0b8f2a6fc352f..2851a100cd3c7 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java @@ -20,8 +20,8 @@ import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -332,7 +332,7 @@ public static final class TestingPreFilterCondition extends AbstractRichFunction private static final long serialVersionUID = 1L; @Override - public void open(Configuration parameters) throws Exception { + public void open(OpenContext context) throws Exception { // do nothing } diff --git a/pom.xml b/pom.xml index 907b938e62da5..596e9ca4946e8 100644 --- a/pom.xml +++ b/pom.xml @@ -2339,6 +2339,19 @@ under the License. org.apache.flink.configuration.Configuration#getBytes(java.lang.String,byte[]) org.apache.flink.configuration.Configuration#setBytes(java.lang.String,byte[]) + + org.apache.flink.api.common.functions.AbstractRichFunction#open(org.apache.flink.configuration.Configuration) + org.apache.flink.api.common.functions.RichCoGroupFunction + org.apache.flink.api.common.functions.RichCrossFunction + org.apache.flink.api.common.functions.RichFilterFunction + org.apache.flink.api.common.functions.RichFlatJoinFunction + org.apache.flink.api.common.functions.RichFlatMapFunction + org.apache.flink.api.common.functions.RichGroupCombineFunction + org.apache.flink.api.common.functions.RichGroupReduceFunction + org.apache.flink.api.common.functions.RichJoinFunction + org.apache.flink.api.common.functions.RichMapFunction + org.apache.flink.api.common.functions.RichMapPartitionFunction + org.apache.flink.api.common.functions.RichReduceFunction org.apache.flink.configuration.ConfigConstants