29
29
import org .apache .flink .streaming .api .operators .SimpleOperatorFactory ;
30
30
import org .apache .flink .streaming .api .operators .StreamOperatorFactory ;
31
31
import org .apache .flink .streaming .api .operators .async .AsyncWaitOperatorFactory ;
32
+ import org .apache .flink .streaming .api .transformations .PartitionTransformation ;
32
33
import org .apache .flink .table .api .TableException ;
33
34
import org .apache .flink .table .catalog .DataTypeFactory ;
34
35
import org .apache .flink .table .connector .ChangelogMode ;
40
41
import org .apache .flink .table .functions .TableFunction ;
41
42
import org .apache .flink .table .functions .UserDefinedFunction ;
42
43
import org .apache .flink .table .functions .UserDefinedFunctionHelper ;
44
+ import org .apache .flink .table .partitioner .RowDataCustomPartitioner ;
43
45
import org .apache .flink .table .planner .calcite .FlinkTypeFactory ;
44
46
import org .apache .flink .table .planner .codegen .CodeGeneratorContext ;
45
47
import org .apache .flink .table .planner .codegen .LookupJoinCodeGenerator ;
55
57
import org .apache .flink .table .planner .plan .schema .LegacyTableSourceTable ;
56
58
import org .apache .flink .table .planner .plan .schema .TableSourceTable ;
57
59
import org .apache .flink .table .planner .plan .utils .LookupJoinUtil ;
60
+ import org .apache .flink .table .planner .plan .utils .LookupJoinUtil .LookupFunctionAndPartitioner ;
58
61
import org .apache .flink .table .planner .utils .JavaScalaConversionUtil ;
59
62
import org .apache .flink .table .planner .utils .ShortcutUtils ;
60
63
import org .apache .flink .table .runtime .collector .ListenableCollector ;
69
72
import org .apache .flink .table .runtime .operators .join .lookup .LookupJoinRunner ;
70
73
import org .apache .flink .table .runtime .operators .join .lookup .LookupJoinWithCalcRunner ;
71
74
import org .apache .flink .table .runtime .operators .join .lookup .ResultRetryStrategy ;
75
+ import org .apache .flink .table .runtime .partitioner .RowDataCustomPartitionerWrapper ;
72
76
import org .apache .flink .table .runtime .types .PlannerTypeUtils ;
73
77
import org .apache .flink .table .runtime .types .TypeInfoDataTypeConverter ;
74
78
import org .apache .flink .table .runtime .typeutils .InternalSerializers ;
98
102
import java .util .Optional ;
99
103
100
104
import static org .apache .flink .table .planner .calcite .FlinkTypeFactory .toLogicalType ;
105
+ import static org .apache .flink .table .planner .plan .nodes .exec .common .CommonExecSink .PARTITIONER_TRANSFORMATION ;
101
106
import static org .apache .flink .table .planner .utils .ShortcutUtils .unwrapTypeFactory ;
102
107
import static org .apache .flink .util .Preconditions .checkArgument ;
103
108
import static org .apache .flink .util .Preconditions .checkNotNull ;
@@ -252,23 +257,28 @@ protected Transformation<RowData> createJoinTransformation(
252
257
ResultRetryStrategy retryStrategy =
253
258
retryOptions != null ? retryOptions .toRetryStrategy () : null ;
254
259
255
- UserDefinedFunction lookupFunction =
260
+ Transformation <RowData > inputTransformation =
261
+ (Transformation <RowData >) inputEdge .translateToPlan (planner );
262
+
263
+ LookupFunctionAndPartitioner lookupFunctionAndPartitioner =
256
264
LookupJoinUtil .getLookupFunction (
257
265
temporalTable ,
258
266
lookupKeys .keySet (),
259
267
planner .getFlinkContext ().getClassLoader (),
260
268
isAsyncEnabled ,
261
269
retryStrategy );
270
+ UserDefinedFunction lookupFunction = lookupFunctionAndPartitioner .getUserDefinedFunction ();
271
+ Optional <RowDataCustomPartitioner > partitioner =
272
+ lookupFunctionAndPartitioner .getPartitioner ();
262
273
UserDefinedFunctionHelper .prepareInstance (config , lookupFunction );
263
274
264
275
boolean isLeftOuterJoin = joinType == FlinkJoinType .LEFT ;
265
276
if (isAsyncEnabled ) {
266
277
assert lookupFunction instanceof AsyncTableFunction ;
267
278
}
268
279
269
- Transformation <RowData > inputTransformation =
270
- (Transformation <RowData >) inputEdge .translateToPlan (planner );
271
-
280
+ // upsert materialize mod expect that input stream is partitioned by the look-up key. We
281
+ // must trade off between correctness and performance.
272
282
if (upsertMaterialize ) {
273
283
// upsertMaterialize only works on sync lookup mode, async lookup is unsupported.
274
284
assert !isAsyncEnabled && !inputChangelogMode .containsOnly (RowKind .INSERT );
@@ -286,46 +296,57 @@ protected Transformation<RowData> createJoinTransformation(
286
296
isLeftOuterJoin ,
287
297
planner .getExecEnv ().getConfig ().isObjectReuseEnabled (),
288
298
lookupKeyContainsPrimaryKey );
299
+ }
300
+
301
+ StreamOperatorFactory <RowData > operatorFactory ;
302
+ if (isAsyncEnabled ) {
303
+ operatorFactory =
304
+ createAsyncLookupJoin (
305
+ temporalTable ,
306
+ config ,
307
+ planner .getFlinkContext ().getClassLoader (),
308
+ lookupKeys ,
309
+ (AsyncTableFunction <Object >) lookupFunction ,
310
+ planner .createRelBuilder (),
311
+ inputRowType ,
312
+ tableSourceRowType ,
313
+ resultRowType ,
314
+ isLeftOuterJoin ,
315
+ asyncLookupOptions );
289
316
} else {
290
- StreamOperatorFactory <RowData > operatorFactory ;
291
- if (isAsyncEnabled ) {
292
- operatorFactory =
293
- createAsyncLookupJoin (
294
- temporalTable ,
295
- config ,
296
- planner .getFlinkContext ().getClassLoader (),
297
- lookupKeys ,
298
- (AsyncTableFunction <Object >) lookupFunction ,
299
- planner .createRelBuilder (),
300
- inputRowType ,
301
- tableSourceRowType ,
302
- resultRowType ,
303
- isLeftOuterJoin ,
304
- asyncLookupOptions );
305
- } else {
306
- operatorFactory =
307
- createSyncLookupJoin (
308
- temporalTable ,
309
- config ,
310
- planner .getFlinkContext ().getClassLoader (),
311
- lookupKeys ,
312
- (TableFunction <Object >) lookupFunction ,
313
- planner .createRelBuilder (),
314
- inputRowType ,
315
- tableSourceRowType ,
316
- resultRowType ,
317
- isLeftOuterJoin ,
318
- planner .getExecEnv ().getConfig ().isObjectReuseEnabled ());
319
- }
317
+ operatorFactory =
318
+ createSyncLookupJoin (
319
+ temporalTable ,
320
+ config ,
321
+ planner .getFlinkContext ().getClassLoader (),
322
+ lookupKeys ,
323
+ (TableFunction <Object >) lookupFunction ,
324
+ planner .createRelBuilder (),
325
+ inputRowType ,
326
+ tableSourceRowType ,
327
+ resultRowType ,
328
+ isLeftOuterJoin ,
329
+ planner .getExecEnv ().getConfig ().isObjectReuseEnabled ());
330
+ }
320
331
321
- return ExecNodeUtil .createOneInputTransformation (
322
- inputTransformation ,
323
- createTransformationMeta (LOOKUP_JOIN_TRANSFORMATION , config ),
324
- operatorFactory ,
325
- InternalTypeInfo .of (resultRowType ),
326
- inputTransformation .getParallelism (),
327
- false );
332
+ if (partitioner .isPresent ()) {
333
+ Transformation <RowData > partitionedTransform =
334
+ new PartitionTransformation <>(
335
+ inputTransformation ,
336
+ new RowDataCustomPartitionerWrapper (partitioner .get ()));
337
+ createTransformationMeta (
338
+ PARTITIONER_TRANSFORMATION , "Partitioner" , "Partitioner" , config )
339
+ .fill (partitionedTransform );
340
+ partitionedTransform .setParallelism (inputTransformation .getParallelism (), false );
341
+ inputTransformation = partitionedTransform ;
328
342
}
343
+ return ExecNodeUtil .createOneInputTransformation (
344
+ inputTransformation ,
345
+ createTransformationMeta (LOOKUP_JOIN_TRANSFORMATION , config ),
346
+ operatorFactory ,
347
+ InternalTypeInfo .of (resultRowType ),
348
+ inputTransformation .getParallelism (),
349
+ false );
329
350
}
330
351
331
352
protected abstract Transformation <RowData > createSyncLookupJoinWithState (
0 commit comments