Skip to content

Commit

Permalink
use openContext name
Browse files Browse the repository at this point in the history
  • Loading branch information
reswqa committed Sep 26, 2024
1 parent 6c3275e commit 4971601
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;

/** A special {@link OpenContext} for passing configuration to udf. */
@PublicEvolving
public class WithConfigurationOpenContext implements OpenContext {
private final Configuration configuration;

public WithConfigurationOpenContext(Configuration configuration) {
this.configuration = configuration;
}

public Configuration getConfiguration() {
return configuration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.WithConfigurationOpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
Expand Down Expand Up @@ -62,10 +64,16 @@ public class FunctionContext {
public FunctionContext(
@Nullable RuntimeContext context,
@Nullable ClassLoader userClassLoader,
@Nullable Configuration jobParameters) {
@Nullable OpenContext openContext) {
this.context = context;
this.userClassLoader = userClassLoader;
this.jobParameters = jobParameters != null ? jobParameters.toMap() : null;
if (openContext instanceof WithConfigurationOpenContext) {
Configuration configuration =
((WithConfigurationOpenContext) openContext).getConfiguration();
this.jobParameters = configuration.toMap();
} else {
this.jobParameters = null;
}
}

public FunctionContext(RuntimeContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ object CollectorCodeGenerator {
}

@Override
public void open(${className[OpenContext]} context) throws Exception {
public void open(${className[OpenContext]} openContext) throws Exception {
${ctx.reuseOpenCode()}
}

Expand Down Expand Up @@ -143,7 +143,7 @@ object CollectorCodeGenerator {
}

@Override
public void open(${className[OpenContext]} context) throws Exception {
public void open(${className[OpenContext]} openContext) throws Exception {
${ctx.reuseOpenCode()}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.flink.table.planner.codegen

import org.apache.flink.api.common.functions.{DefaultOpenContext, MapFunction, OpenContext, RichMapFunction}
import org.apache.flink.api.common.functions.{DefaultOpenContext, MapFunction, OpenContext, RichMapFunction, WithConfigurationOpenContext}
import org.apache.flink.configuration.{Configuration, PipelineOptions, ReadableConfig}
import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.data.{DecimalData, GenericRowData, TimestampData}
Expand Down Expand Up @@ -102,9 +102,12 @@ class ExpressionReducer(
throw new TableException("RichMapFunction[GenericRowData, GenericRowData] required here")
}

val parameters = toScala(tableConfig.getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS))
.map(Configuration.fromMap)
.getOrElse(new Configuration)
val reduced =
try {
richMapFunction.open(DefaultOpenContext.INSTANCE)
richMapFunction.open(new WithConfigurationOpenContext(parameters))
// execute
richMapFunction.map(EMPTY_ROW)
} catch {
Expand Down Expand Up @@ -312,7 +315,7 @@ class ConstantCodeGeneratorContext(tableConfig: ReadableConfig, classLoader: Cla
super.addReusableFunction(
function,
classOf[FunctionContext],
Seq("null", "this.getClass().getClassLoader()", "context"))
Seq("null", "this.getClass().getClassLoader()", "openContext"))
}

override def addReusableConverter(dataType: DataType, classLoaderTerm: String = null): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ object FunctionCodeGenerator {
${ctx.reuseConstructorCode(funcName)}

@Override
public void open(${classOf[OpenContext].getCanonicalName} context) throws Exception {
public void open(${classOf[OpenContext].getCanonicalName} openContext) throws Exception {
${ctx.reuseOpenCode()}
}

Expand Down Expand Up @@ -230,7 +230,7 @@ object FunctionCodeGenerator {
${ctx.reuseConstructorCode(funcName)}

@Override
public void open(${className[OpenContext]} context) throws Exception {
public void open(${className[OpenContext]} openContext) throws Exception {
${ctx.reuseOpenCode()}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ object LookupJoinCodeGenerator {
}

@Override
public void open(${className[OpenContext]} context) throws Exception {
public void open(${className[OpenContext]} openContext) throws Exception {
${ctx.reuseOpenCode()}
}

Expand Down Expand Up @@ -492,7 +492,7 @@ object LookupJoinCodeGenerator {
}

@Override
public void open(${className[OpenContext]} context) throws Exception {
public void open(${className[OpenContext]} openContext) throws Exception {
${ctx.reuseOpenCode()}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ object WatermarkGeneratorCodeGenerator {
}

@Override
public void open(${classOf[OpenContext].getCanonicalName} context) throws Exception {
public void open(${classOf[OpenContext].getCanonicalName} openContext) throws Exception {
${ctx.reuseOpenCode()}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ object BridgingFunctionGenUtil {
| ${ctx.reuseInitCode()}
| }
|
| public void open(${className[OpenContext]} context) throws Exception {
| public void open(${className[OpenContext]} openContext) throws Exception {
| ${ctx.reuseOpenCode()}
| }
|
Expand Down

0 comments on commit 4971601

Please sign in to comment.