[FLINK-35652][table] Shuffle input stream of lookup join based on LOOKUP hint
reswqa committed Dec 4, 2024
1 parent 82bb81b commit f82bde7
Showing 26 changed files with 2,084 additions and 418 deletions.
@@ -100,6 +100,12 @@ public class LookupJoinHintOptions {
.noDefaultValue()
.withDescription("Max attempt number of the 'fixed-delay' retry strategy.");

public static final ConfigOption<Boolean> SHUFFLE =
key("shuffle")
.booleanType()
.defaultValue(false)
.withDescription("Enable shuffle before lookup join.");

public static final String LOOKUP_MISS_PREDICATE = "lookup_miss";

private static final Set<ConfigOption<?>> requiredKeys = new HashSet<>();
@@ -117,6 +123,7 @@ public class LookupJoinHintOptions {
supportedKeys.add(RETRY_STRATEGY);
supportedKeys.add(FIXED_DELAY);
supportedKeys.add(MAX_ATTEMPTS);
supportedKeys.add(SHUFFLE);
}

public static ImmutableSet<ConfigOption> getRequiredOptions() {
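The new 'shuffle' key registered above is passed through the LOOKUP hint like the other supported options. A minimal sketch of a query that could use it, assuming a hypothetical Orders probe table and Customers dimension table (table names, columns, and the omitted DDL are invented for illustration):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LookupShuffleHintExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // DDL for the hypothetical Orders (probe side) and Customers (lookup side)
        // tables is omitted here.
        tEnv.executeSql(
                "SELECT /*+ LOOKUP('table'='Customers', 'shuffle'='true') */ o.order_id, c.name "
                        + "FROM Orders AS o "
                        + "JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c "
                        + "ON o.customer_id = c.id");
    }
}

With 'shuffle'='true' the planner is asked to shuffle the probe-side input on the lookup keys before the join; the option defaults to false, and the createJoinTransformation change further down still skips the shuffle when upsert materialization is required.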
@@ -76,7 +76,8 @@ public BatchExecLookupJoin(
@Nullable LookupJoinUtil.AsyncLookupOptions asyncLookupOptions,
InputProperty inputProperty,
RowType outputType,
String description) {
String description,
boolean preferCustomShuffle) {
super(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(BatchExecLookupJoin.class),
@@ -94,7 +95,8 @@ public BatchExecLookupJoin(
ChangelogMode.insertOnly(),
Collections.singletonList(inputProperty),
outputType,
description);
description,
preferCustomShuffle);
}

@JsonCreator
@@ -117,7 +119,8 @@ public BatchExecLookupJoin(
LookupJoinUtil.AsyncLookupOptions asyncLookupOptions,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
@JsonProperty(FIELD_NAME_DESCRIPTION) String description,
@JsonProperty(FIELD_NAME_PREFER_CUSTOM_SHUFFLE) boolean preferCustomShuffle) {
super(
id,
context,
@@ -135,7 +138,8 @@ public BatchExecLookupJoin(
ChangelogMode.insertOnly(),
inputProperties,
outputType,
description);
description,
preferCustomShuffle);
}

@Override
@@ -157,6 +157,8 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {

public static final String FIELD_NAME_ASYNC_OPTIONS = "asyncOptions";
public static final String FIELD_NAME_RETRY_OPTIONS = "retryOptions";
public static final String FIELD_NAME_PREFER_CUSTOM_SHUFFLE = "preferCustomShuffle";
public static final String CUSTOM_SHUFFLE_TRANSFORMATION = "custom-shuffle";

@JsonProperty(FIELD_NAME_JOIN_TYPE)
private final FlinkJoinType joinType;
@@ -197,6 +199,9 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
@JsonInclude(JsonInclude.Include.NON_NULL)
private final @Nullable LookupJoinUtil.RetryLookupOptions retryOptions;

@JsonProperty(FIELD_NAME_PREFER_CUSTOM_SHUFFLE)
private final boolean preferCustomShuffle;

protected CommonExecLookupJoin(
int id,
ExecNodeContext context,
@@ -214,7 +219,8 @@ protected CommonExecLookupJoin(
ChangelogMode inputChangelogMode,
List<InputProperty> inputProperties,
RowType outputType,
String description) {
String description,
boolean preferCustomShuffle) {
super(id, context, persistedConfig, inputProperties, outputType, description);
checkArgument(inputProperties.size() == 1);
this.joinType = checkNotNull(joinType);
@@ -227,6 +233,7 @@ protected CommonExecLookupJoin(
this.inputChangelogMode = inputChangelogMode;
this.asyncLookupOptions = asyncLookupOptions;
this.retryOptions = retryOptions;
this.preferCustomShuffle = preferCustomShuffle;
}

public TemporalTableSourceSpec getTemporalTableSourceSpec() {
@@ -252,23 +259,36 @@ protected Transformation<RowData> createJoinTransformation(
ResultRetryStrategy retryStrategy =
retryOptions != null ? retryOptions.toRetryStrategy() : null;

boolean tryApplyCustomShuffle = preferCustomShuffle && !upsertMaterialize;
UserDefinedFunction lookupFunction =
LookupJoinUtil.getLookupFunction(
temporalTable,
lookupKeys.keySet(),
planner.getFlinkContext().getClassLoader(),
isAsyncEnabled,
retryStrategy);
retryStrategy,
tryApplyCustomShuffle);
Transformation<RowData> inputTransformation =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
if (tryApplyCustomShuffle) {
inputTransformation =
LookupJoinUtil.tryApplyCustomShufflePartitioner(
planner,
temporalTable,
inputRowType,
lookupKeys,
inputTransformation,
inputChangelogMode,
createTransformationMeta(CUSTOM_SHUFFLE_TRANSFORMATION, config));
}

UserDefinedFunctionHelper.prepareInstance(config, lookupFunction);

boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
if (isAsyncEnabled) {
assert lookupFunction instanceof AsyncTableFunction;
}

Transformation<RowData> inputTransformation =
(Transformation<RowData>) inputEdge.translateToPlan(planner);

if (upsertMaterialize) {
// upsertMaterialize only works on sync lookup mode, async lookup is unsupported.
assert !isAsyncEnabled && !inputChangelogMode.containsOnly(RowKind.INSERT);
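Conceptually, the block above keys the probe-side stream on the lookup keys before the lookup is performed, so that records with the same key are routed to the same lookup subtask. A rough DataStream-level analogy of that routing, not the planner's actual mechanism (which goes through LookupJoinUtil.tryApplyCustomShufflePartitioner and a generated key selector); the Tuple2 records and the choice of key field are made up:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProbeSideShuffleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical probe-side records: (orderId, customerId).
        DataStream<Tuple2<Long, String>> orders =
                env.fromElements(Tuple2.of(1L, "c1"), Tuple2.of(2L, "c2"), Tuple2.of(3L, "c1"));

        // Hash-partition by the lookup key (customerId) so that all records with the
        // same key reach the same parallel instance of the downstream lookup operator.
        KeyedStream<Tuple2<Long, String>, String> shuffled =
                orders.keyBy(
                        new KeySelector<Tuple2<Long, String>, String>() {
                            @Override
                            public String getKey(Tuple2<Long, String> order) {
                                return order.f1;
                            }
                        });

        shuffled.print();
        env.execute("probe-side shuffle sketch");
    }
}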
@@ -120,7 +120,8 @@ public StreamExecLookupJoin(
@Nullable int[] inputUpsertKey,
InputProperty inputProperty,
RowType outputType,
String description) {
String description,
boolean preferCustomShuffle) {
this(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(StreamExecLookupJoin.class),
@@ -144,7 +145,8 @@ public StreamExecLookupJoin(
: null,
Collections.singletonList(inputProperty),
outputType,
description);
description,
preferCustomShuffle);
}

@JsonCreator
@@ -176,7 +178,8 @@ public StreamExecLookupJoin(
@JsonProperty(FIELD_NAME_STATE) @Nullable List<StateMetadata> stateMetadataList,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
@JsonProperty(FIELD_NAME_DESCRIPTION) String description,
@JsonProperty(FIELD_NAME_PREFER_CUSTOM_SHUFFLE) boolean preferCustomShuffle) {
super(
id,
context,
@@ -193,7 +196,8 @@ public StreamExecLookupJoin(
inputChangelogMode,
inputProperties,
outputType,
description);
description,
preferCustomShuffle);
this.lookupKeyContainsPrimaryKey = lookupKeyContainsPrimaryKey;
this.upsertMaterialize = upsertMaterialize;
this.inputUpsertKey = inputUpsertKey;
@@ -28,7 +28,9 @@
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
@@ -390,4 +392,15 @@ public static void makeLegacySourceTransformationsBounded(Transformation<?> tran
}
transformation.getInputs().forEach(ExecNodeUtil::makeLegacySourceTransformationsBounded);
}

    /** Create a {@link PartitionTransformation} with the given {@link StreamPartitioner} and fill it with the provided transformation metadata. */
public static <I> Transformation<I> createPartitionTransformation(
Transformation<I> input,
TransformationMetadata transformationMeta,
StreamPartitioner<I> streamPartitioner) {
PartitionTransformation<I> transformation =
new PartitionTransformation<>(input, streamPartitioner);
transformationMeta.fill(transformation);
return transformation;
}
}
@@ -35,6 +35,9 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import java.util.Arrays;
import java.util.Map;

/** Utility for KeySelector. */
public class KeySelectorUtil {

@@ -91,4 +94,60 @@ public static RowDataKeySelector getRowDataSelector(
return EmptyRowDataKeySelector.INSTANCE;
}
}

/**
     * Create a {@link RowDataKeySelector} which selects the lookup key columns from a row of the
     * left table in a lookup join.
*
* @param classLoader the user classloader
* @param lookupKeysOfRightTable the lookup keys
     * @param leftTableRowType the row type of the left table
* @return the RowDataKeySelector
*/
public static RowDataKeySelector getLookupKeysSelectorFromLeftTable(
ClassLoader classLoader,
Map<Integer, LookupJoinUtil.LookupKey> lookupKeysOfRightTable,
InternalTypeInfo<RowData> leftTableRowType) {
LogicalType[] inputFieldTypes = leftTableRowType.toRowFieldTypes();
int[] lookupKeyIndicesInOrder =
LookupJoinUtil.getOrderedLookupKeys(lookupKeysOfRightTable.keySet());
// 1. Generate the map from left table to lookup keys.
int[] inputMapping = new int[lookupKeysOfRightTable.size()];
Arrays.fill(inputMapping, ProjectionCodeGenerator.EMPTY_INPUT_MAPPING_VALUE());
// 2. Generate all lookup keys in order.
LookupJoinUtil.LookupKey[] orderedLookupKeys =
new LookupJoinUtil.LookupKey[lookupKeyIndicesInOrder.length];
// 3. Generate the logical types of all lookup keys.
LogicalType[] orderedLookupKeyLogicalTypes = new LogicalType[lookupKeysOfRightTable.size()];
int cnt = 0;
for (int idx : lookupKeyIndicesInOrder) {
LookupJoinUtil.LookupKey key = lookupKeysOfRightTable.get(idx);
if (key instanceof LookupJoinUtil.ConstantLookupKey) {
LogicalType keyType = ((LookupJoinUtil.ConstantLookupKey) key).sourceType;
orderedLookupKeyLogicalTypes[cnt] = keyType;
} else if (key instanceof LookupJoinUtil.FieldRefLookupKey) {
int leftIdx = ((LookupJoinUtil.FieldRefLookupKey) key).index;
inputMapping[cnt] = leftIdx;
orderedLookupKeyLogicalTypes[cnt] = inputFieldTypes[leftIdx];
} else {
throw new UnsupportedOperationException("The lookup key " + key + " is invalid.");
}
orderedLookupKeys[cnt] = key;
cnt++;
}
RowType orderedLookupKeyRowType = RowType.of(orderedLookupKeyLogicalTypes);
GeneratedProjection generatedProjection =
ProjectionCodeGenerator.generateProjectionForLookupKeysFromLeftTable(
orderedLookupKeys,
new CodeGeneratorContext(new Configuration(), classLoader),
"LookupKeyProjection",
leftTableRowType.toRowType(),
orderedLookupKeyRowType,
inputMapping,
GenericRowData.class);
return new GenericRowDataKeySelector(
InternalTypeInfo.of(orderedLookupKeyRowType),
InternalSerializers.create(orderedLookupKeyRowType),
generatedProjection);
}
}
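A side note on the mapping built above: field-reference keys are read from the left-table row, while constant keys receive the EMPTY_INPUT_MAPPING_VALUE marker and are produced by the generated projection itself. A stripped-down, planner-free sketch of that index bookkeeping (the concrete indices are invented):

import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

public class LookupKeyMappingSketch {
    // Stand-in for ProjectionCodeGenerator.EMPTY_INPUT_MAPPING_VALUE.
    private static final int EMPTY = -1;

    public static void main(String[] args) {
        // Right-table key index -> left-table field index (EMPTY marks a constant key).
        // Example: right key 0 is taken from left field 1, right key 2 is a constant.
        Map<Integer, Integer> lookupKeys = new TreeMap<>();
        lookupKeys.put(0, 1);
        lookupKeys.put(2, EMPTY);

        // Iterating the TreeMap mirrors getOrderedLookupKeys(...): keys in ascending order.
        int[] inputMapping = lookupKeys.values().stream().mapToInt(Integer::intValue).toArray();
        System.out.println(Arrays.toString(inputMapping)); // prints [1, -1]
    }
}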