Skip to content

Commit ee110aa

Browse files
committed
[FLINK-18445][table] Pre-step: add a new FilterCondition to support generating pre-filter condition for lookup join
1 parent 2c50b4e commit ee110aa

File tree

4 files changed

+179
-8
lines changed

4 files changed

+179
-8
lines changed

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala

Lines changed: 75 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction
2323
import org.apache.flink.streaming.api.functions.async.{AsyncFunction, RichAsyncFunction}
2424
import org.apache.flink.table.planner.codegen.CodeGenUtils._
2525
import org.apache.flink.table.planner.codegen.Indenter.toISC
26-
import org.apache.flink.table.runtime.generated.{GeneratedFunction, GeneratedJoinCondition, JoinCondition}
26+
import org.apache.flink.table.runtime.generated.{FilterCondition, GeneratedFilterCondition, GeneratedFunction, GeneratedJoinCondition, JoinCondition}
2727
import org.apache.flink.table.types.logical.LogicalType
2828

2929
/**
@@ -180,33 +180,46 @@ object FunctionCodeGenerator {
180180
}
181181

182182
/**
183-
* Generates a [[JoinCondition]] that can be passed to Java compiler.
183+
* Generates a [[AbstractRichFunction]] class code that can be passed to Java compiler. Including
184+
* [[JoinCondition]] and [[FilterCondition]] rich functions.
184185
*
185186
* @param ctx
186187
* The context of the code generator
187188
* @param name
188189
* Class name of the Function. Not must be unique but has to be a valid Java class identifier.
190+
* @param clazz
191+
* Function to be generated.
189192
* @param bodyCode
190193
* code contents of the SAM (Single Abstract Method).
191194
* @param input1Term
192195
* the first input term
193196
* @param input2Term
194197
* the second input term.
195198
* @return
196-
* instance of GeneratedJoinCondition
199+
* the generated condition function name and code
197200
*/
198-
def generateJoinCondition(
201+
private def generateCondition[F <: Function](
199202
ctx: CodeGeneratorContext,
200203
name: String,
204+
clazz: Class[F],
201205
bodyCode: String,
202206
input1Term: String = CodeGenUtils.DEFAULT_INPUT1_TERM,
203-
input2Term: String = CodeGenUtils.DEFAULT_INPUT2_TERM): GeneratedJoinCondition = {
207+
input2Term: String = CodeGenUtils.DEFAULT_INPUT2_TERM): (String, String) = {
204208
val funcName = newName(name)
205209

210+
val methodHeader = {
211+
if (clazz == classOf[JoinCondition]) {
212+
s"apply($ROW_DATA $input1Term, $ROW_DATA $input2Term)"
213+
} else if (clazz == classOf[FilterCondition]) {
214+
s"apply($ROW_DATA $input1Term)"
215+
} else {
216+
throw new CodeGenException(s"Unsupported Condition Function $clazz.")
217+
}
218+
}
206219
val funcCode =
207220
j"""
208221
public class $funcName extends ${className[AbstractRichFunction]}
209-
implements ${className[JoinCondition]} {
222+
implements ${clazz.getCanonicalName} {
210223

211224
${ctx.reuseMemberCode()}
212225

@@ -222,7 +235,7 @@ object FunctionCodeGenerator {
222235
}
223236

224237
@Override
225-
public boolean apply($ROW_DATA $input1Term, $ROW_DATA $input2Term) throws Exception {
238+
public boolean $methodHeader throws Exception {
226239
${ctx.reusePerRecordCode()}
227240
${ctx.reuseLocalVariableCode()}
228241
${ctx.reuseInputUnboxingCode()}
@@ -237,6 +250,61 @@ object FunctionCodeGenerator {
237250
}
238251
""".stripMargin
239252

253+
(funcName, funcCode)
254+
}
255+
256+
/**
257+
* Generates a [[JoinCondition]] that can be passed to Java compiler.
258+
*
259+
* @param ctx
260+
* The context of the code generator
261+
* @param name
262+
* Class name of the Function. Not must be unique but has to be a valid Java class identifier.
263+
* @param bodyCode
264+
* code contents of the SAM (Single Abstract Method).
265+
* @param input1Term
266+
* the first input term
267+
* @param input2Term
268+
* the second input term.
269+
* @return
270+
* instance of GeneratedJoinCondition
271+
*/
272+
def generateJoinCondition(
273+
ctx: CodeGeneratorContext,
274+
name: String,
275+
bodyCode: String,
276+
input1Term: String = CodeGenUtils.DEFAULT_INPUT1_TERM,
277+
input2Term: String = CodeGenUtils.DEFAULT_INPUT2_TERM): GeneratedJoinCondition = {
278+
279+
val (funcName, funcCode) =
280+
generateCondition(ctx, name, classOf[JoinCondition], bodyCode, input1Term, input2Term)
281+
240282
new GeneratedJoinCondition(funcName, funcCode, ctx.references.toArray, ctx.tableConfig)
241283
}
284+
285+
/**
286+
* Generates a [[FilterCondition]] that can be passed to Java compiler.
287+
*
288+
* @param ctx
289+
* The context of the code generator
290+
* @param name
291+
* Class name of the Function. Not must be unique but has to be a valid Java class identifier.
292+
* @param bodyCode
293+
* code contents of the SAM (Single Abstract Method).
294+
* @param inputTerm
295+
* the input term
296+
* @return
297+
* instance of GeneratedFilterCondition
298+
*/
299+
def generateFilterCondition(
300+
ctx: CodeGeneratorContext,
301+
name: String,
302+
bodyCode: String,
303+
inputTerm: String = CodeGenUtils.DEFAULT_INPUT_TERM): GeneratedFilterCondition = {
304+
305+
val (funcName, funcCode) =
306+
generateCondition(ctx, name, classOf[FilterCondition], bodyCode, inputTerm)
307+
308+
new GeneratedFilterCondition(funcName, funcCode, ctx.references.toArray, ctx.tableConfig)
309+
}
242310
}

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import org.apache.flink.table.planner.plan.utils.RexLiteralUtil
3939
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
4040
import org.apache.flink.table.runtime.collector.{ListenableCollector, TableFunctionResultFuture}
4141
import org.apache.flink.table.runtime.collector.ListenableCollector.CollectListener
42-
import org.apache.flink.table.runtime.generated.{GeneratedCollector, GeneratedFunction, GeneratedResultFuture}
42+
import org.apache.flink.table.runtime.generated.{GeneratedCollector, GeneratedFilterCondition, GeneratedFunction, GeneratedResultFuture}
4343
import org.apache.flink.table.types.DataType
4444
import org.apache.flink.table.types.extraction.ExtractionUtils.extractSimpleGeneric
4545
import org.apache.flink.table.types.inference.{TypeInference, TypeStrategies, TypeTransformations}
@@ -533,4 +533,31 @@ object LookupJoinCodeGenerator {
533533
classLoader
534534
)
535535
}
536+
537+
/**
538+
* Generates pre-filter condition for lookup join which can be applied before access the dimension
539+
* table.
540+
*/
541+
def generatePreFilterCondition(
542+
tableConfig: ReadableConfig,
543+
classLoader: ClassLoader,
544+
preFilterCondition: RexNode,
545+
leftType: LogicalType): GeneratedFilterCondition = {
546+
val ctx = new CodeGeneratorContext(tableConfig, classLoader)
547+
// should consider null fields
548+
val exprGenerator =
549+
new ExprCodeGenerator(ctx, false).bindInput(leftType, CodeGenUtils.DEFAULT_INPUT_TERM)
550+
551+
val bodyCode = if (preFilterCondition == null) {
552+
"return true;"
553+
} else {
554+
val condition = exprGenerator.generateExpression(preFilterCondition)
555+
s"""
556+
|${condition.code}
557+
|return ${condition.resultTerm};
558+
|""".stripMargin
559+
}
560+
561+
FunctionCodeGenerator.generateFilterCondition(ctx, "PreFilterCondition", bodyCode)
562+
}
536563
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.generated;
20+
21+
import org.apache.flink.api.common.functions.RichFunction;
22+
import org.apache.flink.table.data.RowData;
23+
24+
/** Interface for code generated filter condition function on single RowData. */
25+
public interface FilterCondition extends RichFunction {
26+
27+
/** @return true if the filter condition stays true for the input row */
28+
boolean apply(RowData in);
29+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.generated;
20+
21+
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.configuration.ReadableConfig;
24+
25+
/** Describes a generated {@link FilterCondition}. */
26+
public class GeneratedFilterCondition extends GeneratedFunction<FilterCondition> {
27+
28+
private static final long serialVersionUID = 1L;
29+
30+
@VisibleForTesting
31+
public GeneratedFilterCondition(String className, String code, Object[] references) {
32+
super(className, code, references, new Configuration());
33+
}
34+
35+
/**
36+
* Creates a GeneratedFilterCondition.
37+
*
38+
* @param className class name of the generated FilterCondition.
39+
* @param code code of the generated FilterCondition.
40+
* @param references referenced objects of the generated FilterCondition.
41+
* @param conf configuration when generating FilterCondition.
42+
*/
43+
public GeneratedFilterCondition(
44+
String className, String code, Object[] references, ReadableConfig conf) {
45+
super(className, code, references, conf);
46+
}
47+
}

0 commit comments

Comments
 (0)