Skip to content

Commit

Permalink
fixup! aa
Browse files Browse the repository at this point in the history
  • Loading branch information
reswqa committed Sep 24, 2024
1 parent bfef897 commit 75e1998
Show file tree
Hide file tree
Showing 12 changed files with 15 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.Configuration;

import java.io.Serializable;

Expand Down Expand Up @@ -68,9 +67,6 @@ public IterationRuntimeContext getIterationRuntimeContext() {
// Default life cycle methods
// --------------------------------------------------------------------------------------------

@Override
public void open(Configuration parameters) throws Exception {}

@Override
public void open(OpenContext openContext) throws Exception {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;

/**
* An base interface for all rich user-defined functions. This class defines methods for the life
Expand All @@ -30,51 +29,6 @@
@Public
public interface RichFunction extends Function {

/**
* Initialization method for the function. It is called before the actual working methods (like
* <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that are
* part of an iteration, this method will be invoked at the beginning of each iteration
* superstep.
*
* <p>The configuration object passed to the function can be used for configuration and
* initialization. The configuration contains all parameters that were configured on the
* function in the program composition.
*
* <pre>{@code
* public class MyFilter extends RichFilterFunction<String> {
*
* private String searchString;
*
* public void open(Configuration parameters) {
* this.searchString = parameters.getString("foo");
* }
*
* public boolean filter(String value) {
* return value.equals(searchString);
* }
* }
* }</pre>
*
* <p>By default, this method does nothing.
*
* @param parameters The configuration containing the parameters attached to the contract.
* @throws Exception Implementations may forward exceptions, which are caught by the runtime.
* When the runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
* @see org.apache.flink.configuration.Configuration
* @deprecated This method is deprecated since Flink 1.19. The users are recommended to
* implement {@code open(OpenContext openContext)} and implement {@code open(Configuration
* parameters)} with an empty body instead. 1. If you implement {@code open(OpenContext
* openContext)}, the {@code open(OpenContext openContext)} will be invoked and the {@code
* open(Configuration parameters)} won't be invoked. 2. If you don't implement {@code
* open(OpenContext openContext)}, the {@code open(Configuration parameters)} will be
* invoked in the default implementation of the {@code open(OpenContext openContext)}.
* @see <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231">
* FLIP-344: Remove parameter in RichFunction#open </a>
*/
@Deprecated
void open(Configuration parameters) throws Exception;

/**
* Initialization method for the function. It is called before the actual working methods (like
* <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,20 @@

package org.apache.flink.state.api.functions;

import java.util.Set;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.Set;

/**
* A function that processes keys from a restored operator
*
* <p>For every key {@link #readKey(Object, Context, Collector)} is invoked. This can produce zero
* or more elements as output.
*
* <p><b>NOTE:</b> State descriptors must be eagerly registered in {@code open(Configuration)}. Any
* <p><b>NOTE:</b> State descriptors must be eagerly registered in {@code open(OpenContext)}. Any
* attempt to dynamically register states inside of {@code readKey} will result in a {@code
* RuntimeException}.
*
Expand All @@ -51,15 +50,6 @@ public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunct

private static final long serialVersionUID = 3873843034140417407L;

/**
* Initialization method for the function. It is called before {@link #readKey(Object, Context,
* Collector)} and thus suitable for one time setup work.
*
* <p>This is the only method that my register state descriptors within a {@code
* KeyedStateReaderFunction}.
*/
public abstract void open(Configuration parameters) throws Exception;

/**
* Process one key from the restored state backend.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
Expand Down Expand Up @@ -110,12 +109,6 @@ public void open(OpenContext openContext) {
state = getRuntimeContext().getState(stateDescriptor);
}

@Override
public void open(Configuration parameters) throws Exception {
throw new UnsupportedOperationException(
"This method is deprecated and shouldn't be invoked. Please use open(OpenContext) instead.");
}

@Override
public void readKey(String key, Context ctx, Collector<Tuple2<String, String>> out)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,6 @@ public void open(OpenContext openContext) {
state = getRuntimeContext().getState(valueState);
}

@Override
public void open(Configuration parameters) {
throw new UnsupportedOperationException(
"This method is deprecated and shouldn't be invoked. Please use open(OpenContext) instead.");
}

@Override
public void readKey(Integer key, Context ctx, Collector<Pojo> out) throws Exception {
Pojo pojo = new Pojo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,6 @@ public void open(OpenContext openContext) {
state = getRuntimeContext().getState(stateDescriptor);
}

@Override
public void open(Configuration parameters) throws Exception {
throw new UnsupportedOperationException(
"This method is deprecated and shouldn't be invoked. Please use open(OpenContext) instead.");
}

@Override
public void readKey(
Integer key, KeyedStateReaderFunction.Context ctx, Collector<Integer> out)
Expand All @@ -284,12 +278,6 @@ public void open(OpenContext openContext) {
state = getRuntimeContext().getState(stateDescriptor);
}

@Override
public void open(Configuration parameters) throws Exception {
throw new UnsupportedOperationException(
"This method is deprecated and shouldn't be invoked. Please use open(OpenContext) instead.");
}

@Override
public void readKey(
Integer key, KeyedStateReaderFunction.Context ctx, Collector<Integer> out)
Expand All @@ -306,12 +294,6 @@ public void open(OpenContext openContext) {
getRuntimeContext().getState(stateDescriptor);
}

@Override
public void open(Configuration parameters) throws Exception {
throw new UnsupportedOperationException(
"This method is deprecated and shouldn't be invoked. Please use open(OpenContext) instead.");
}

@Override
public void readKey(
Integer key, KeyedStateReaderFunction.Context ctx, Collector<Integer> out)
Expand Down Expand Up @@ -360,12 +342,6 @@ public void open(OpenContext openContext) {
state = getRuntimeContext().getState(stateDescriptor);
}

@Override
public void open(Configuration parameters) throws Exception {
throw new UnsupportedOperationException(
"This method is deprecated and shouldn't be invoked. Please use open(OpenContext) instead.");
}

@Override
public void readKey(
Integer key, KeyedStateReaderFunction.Context ctx, Collector<Integer> out)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.flink.table.planner.codegen

import org.apache.flink.api.common.functions.DefaultOpenContext
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.planner.codegen.CodeGenUtils._
import org.apache.flink.table.planner.codegen.Indenter.toISC
Expand Down Expand Up @@ -180,7 +181,7 @@ object CollectorCodeGenerator {
s"""
|$collectorTerm = new ${generatedCollector.getClassName}();
|$collectorTerm.setRuntimeContext(getRuntimeContext());
|$collectorTerm.open(new ${className[Configuration]}());
|$collectorTerm.open(new ${className[DefaultOpenContext]}());
|""".stripMargin
ctx.addReusableOpenStatement(openCollector)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.flink.table.planner.codegen

import org.apache.flink.api.common.functions.{MapFunction, OpenContext, RichMapFunction}
import org.apache.flink.api.common.functions.{DefaultOpenContext, MapFunction, OpenContext, RichMapFunction}
import org.apache.flink.configuration.{Configuration, PipelineOptions, ReadableConfig}
import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.data.{DecimalData, GenericRowData, TimestampData}
Expand All @@ -33,7 +33,6 @@ import org.apache.flink.table.planner.utils.Logging
import org.apache.flink.table.planner.utils.TimestampStringUtils.fromLocalDateTime
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.logical.RowType

import org.apache.calcite.avatica.util.ByteString
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeName
Expand Down Expand Up @@ -102,12 +101,9 @@ class ExpressionReducer(
throw new TableException("RichMapFunction[GenericRowData, GenericRowData] required here")
}

val parameters = toScala(tableConfig.getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS))
.map(Configuration.fromMap)
.getOrElse(new Configuration)
val reduced =
try {
richMapFunction.open(parameters)
richMapFunction.open(DefaultOpenContext.INSTANCE)
// execute
richMapFunction.map(EMPTY_ROW)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ object FunctionCodeGenerator {
${ctx.reuseConstructorCode(funcName)}

@Override
public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception {
public void open(${classOf[OpenContext].getCanonicalName} context) throws Exception {
${ctx.reuseOpenCode()}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
*/
package org.apache.flink.table.planner.codegen

import org.apache.flink.api.common.functions.DefaultOpenContext
import org.apache.flink.configuration.{Configuration, ReadableConfig}
import org.apache.flink.metrics.Gauge
import org.apache.flink.table.data.{RowData, TimestampData}
import org.apache.flink.table.data.utils.JoinedRowData
import org.apache.flink.table.planner.codegen.CodeGenUtils._
import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.{generateCollect, INPUT_SELECTION}
import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.{INPUT_SELECTION, generateCollect}
import org.apache.flink.table.runtime.generated.{GeneratedJoinCondition, GeneratedProjection}
import org.apache.flink.table.runtime.hashtable.{LongHashPartition, LongHybridHashTable, ProbeIterator}
import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
Expand Down Expand Up @@ -157,7 +158,7 @@ object LongHashJoinGenerator {
val condRefs = ctx.addReusableObject(condFunc.getReferences, "condRefs")
ctx.addReusableInitStatement(s"condFunc = new ${condFunc.getClassName}($condRefs);")
ctx.addReusableOpenStatement(s"condFunc.setRuntimeContext(getRuntimeContext());")
ctx.addReusableOpenStatement(s"condFunc.open(new ${className[Configuration]}());")
ctx.addReusableOpenStatement(s"condFunc.open(new ${className[DefaultOpenContext]}());")
ctx.addReusableCloseStatement(s"condFunc.close();")

val leftIsBuildTerm = newName(ctx, "leftIsBuild")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.table.planner.expressions.utils

import org.apache.flink.api.common.{TaskInfo, TaskInfoImpl}
import org.apache.flink.api.common.functions.{MapFunction, RichFunction, RichMapFunction}
import org.apache.flink.api.common.functions.{DefaultOpenContext, MapFunction, OpenContext, RichFunction, RichMapFunction}
import org.apache.flink.api.common.functions.util.RuntimeUDFContext
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.configuration.Configuration
Expand All @@ -45,7 +45,6 @@ import org.apache.flink.table.types.{AbstractDataType, DataType}
import org.apache.flink.table.types.logical.{RowType, VarCharType}
import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.types.Row

import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.logical.LogicalCalc
Expand All @@ -58,9 +57,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}

import javax.annotation.Nullable

import java.util.Collections

import scala.collection.JavaConverters._
import scala.collection.mutable

Expand Down Expand Up @@ -241,7 +238,7 @@ abstract class ExpressionTestBase(isStreaming: Boolean = true) {
Collections.emptyMap(),
null)
richMapper.setRuntimeContext(t)
richMapper.open(new Configuration())
richMapper.open(DefaultOpenContext.INSTANCE)
}

val testRow = if (containsLegacyTypes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
Expand Down Expand Up @@ -332,7 +332,7 @@ public static final class TestingPreFilterCondition extends AbstractRichFunction
private static final long serialVersionUID = 1L;

@Override
public void open(Configuration parameters) throws Exception {
public void open(OpenContext context) throws Exception {
// do nothing
}

Expand Down

0 comments on commit 75e1998

Please sign in to comment.