Skip to content

Commit 82bb81b

Browse files
committed
[FLINK-35652][table] Introduce SupportsLookupCustomShuffle interface
1 parent 4ed6e0e commit 82bb81b

File tree

3 files changed

+172
-0
lines changed

3 files changed

+172
-0
lines changed

flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.table.connector.source;
2020

2121
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.table.connector.source.abilities.SupportsLookupCustomShuffle;
2223
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
2324
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
2425
import org.apache.flink.types.RowKind;
@@ -91,6 +92,20 @@ interface LookupContext extends DynamicTableSource.Context {
9192
* @return array of key index paths
9293
*/
9394
int[][] getKeys();
95+
96+
/**
97+
* Whether the distribution of the input stream data matches the partitioner provided by the
98+
* {@link LookupTableSource}. If the interface {@link SupportsLookupCustomShuffle} is not
99+
* implemented, false is guaranteed to be returned.
100+
*
101+
* <p>The method {@link LookupTableSource#getLookupRuntimeProvider} will be called first,
102+
* then the framework will set up the custom shuffle based on the result returned by {@link
103+
* SupportsLookupCustomShuffle#getPartitioner}.
104+
*
105+
* @return true if planner is ready to apply the custom partitioner provided by the source,
106+
* otherwise returns false
107+
*/
108+
boolean preferCustomShuffle();
94109
}
95110

96111
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.connector.source.abilities;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.api.common.functions.Partitioner;
23+
import org.apache.flink.table.connector.source.LookupTableSource;
24+
import org.apache.flink.table.data.RowData;
25+
26+
import java.io.Serializable;
27+
import java.util.Optional;
28+
29+
/**
30+
* This interface is designed to allow connectors to provide a custom partitioning strategy for the
31+
* data that is fed into the {@link LookupTableSource}. This enables the Flink Planner to optimize
32+
* the distribution of input stream across different subtasks of lookup-join node to match the
33+
* distribution of data in the external data source.
34+
*/
35+
@PublicEvolving
36+
public interface SupportsLookupCustomShuffle {
37+
/**
38+
* This method is used to retrieve a custom partitioner that will be applied to the input stream
39+
* of lookup-join node.
40+
*
41+
* @return An {@link InputDataPartitioner} that defines how records should be distributed across
42+
* the different subtasks. If the connector expects the input data to remain in its original
43+
* distribution, an {@link Optional#empty()} should be returned.
44+
*/
45+
Optional<InputDataPartitioner> getPartitioner();
46+
47+
/**
48+
* This interface is responsible for providing custom partitioning logic for the RowData
49+
* records. We didn't use {@link Partitioner} directly because the input data is always RowData
50+
* type, and we need to extract all join keys from the input data before send it to partitioner.
51+
*/
52+
@PublicEvolving
53+
interface InputDataPartitioner extends Serializable {
54+
/**
55+
* Determining the partition id for each input data.
56+
*
57+
* <p>This data is projected to only including all join keys before emit to this
58+
* partitioner.
59+
*
60+
* @param joinKeys The extracted join key for each input record.
61+
* @param numPartitions The total number of partition.
62+
* @return An integer representing the partition id to which the record should be sent.
63+
*/
64+
int partition(RowData joinKeys, int numPartitions);
65+
66+
/**
67+
* Returns information about the determinism of this partitioner.
68+
*
69+
* <p>It returns true if and only if a call to the {@link #partition(RowData, int)} method
70+
* is guaranteed to always return the same result given the same joinKeyRow. If the
71+
* partitioning logic depends on not purely functional like <code>
72+
* random(), date(), now(), ...</code> this method must return false.
73+
*
74+
* <p>If this method return false, planner may not apply this partitioner in upsert mode to
75+
* avoid out-of-order of the changelog events.
76+
*/
77+
default boolean isDeterministic() {
78+
return true;
79+
}
80+
}
81+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.partitioner;
20+
21+
import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
22+
import org.apache.flink.runtime.plugable.SerializationDelegate;
23+
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
24+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
25+
import org.apache.flink.table.connector.source.abilities.SupportsLookupCustomShuffle.InputDataPartitioner;
26+
import org.apache.flink.table.data.RowData;
27+
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
28+
import org.apache.flink.util.Preconditions;
29+
30+
/** The partitioner used to partition row data by the connector's custom logic. */
31+
public class RowDataCustomStreamPartitioner extends StreamPartitioner<RowData> {
32+
33+
private final InputDataPartitioner partitioner;
34+
35+
private final RowDataKeySelector keySelector;
36+
37+
public RowDataCustomStreamPartitioner(
38+
InputDataPartitioner partitioner, RowDataKeySelector keySelector) {
39+
this.partitioner = partitioner;
40+
this.keySelector = keySelector;
41+
}
42+
43+
@Override
44+
public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
45+
RowData key;
46+
try {
47+
key = keySelector.getKey(record.getInstance().getValue());
48+
} catch (Exception e) {
49+
throw new RuntimeException(
50+
"Could not extract key from " + record.getInstance().getValue(), e);
51+
}
52+
int partition = partitioner.partition(key, numberOfChannels);
53+
Preconditions.checkState(partition < numberOfChannels);
54+
return partition;
55+
}
56+
57+
@Override
58+
public StreamPartitioner<RowData> copy() {
59+
return this;
60+
}
61+
62+
@Override
63+
public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
64+
return SubtaskStateMapper.ARBITRARY;
65+
}
66+
67+
@Override
68+
public boolean isPointwise() {
69+
return false;
70+
}
71+
72+
@Override
73+
public String toString() {
74+
return "CUSTOM";
75+
}
76+
}

0 commit comments

Comments
 (0)