Skip to content

Commit 302a691

Browse files
authored
[FLINK-35354] Support host mapping in Flink tikv cdc (#3336)
* [FLINK-35337][cdc] Keep up with the latest version of tikv client * [FLINK-35354] Support host mapping in Flink tikv cdc * [FLINK-35354] Add doc for host mapping feature * [FLINK-35354] fixed annotation import
1 parent ca1470d commit 302a691

File tree

8 files changed

+189
-3
lines changed

8 files changed

+189
-3
lines changed

docs/content.zh/docs/connectors/flink-sources/tidb-cdc.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,13 @@ Connector Options
124124
<td style="word-wrap: break-word;">(none)</td>
125125
<td>String</td>
126126
<td>TiKV cluster's PD address.</td>
127+
</tr>
128+
<tr>
129+
<td>host-mapping</td>
130+
<td>optional</td>
131+
<td style="word-wrap: break-word;">(none)</td>
132+
<td>String</td>
133+
<td>Mapping from the TiKV cluster's intranet IPs to public IPs. When the TiKV cluster runs on an intranet, use this option to map each intranet IP to a public IP so that a Flink cluster outside the intranet can reach it. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9.</td>
127134
</tr>
128135
<tr>
129136
<td>tikv.grpc.timeout_in_ms</td>

docs/content/docs/connectors/flink-sources/tidb-cdc.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,13 @@ Connector Options
125125
<td>String</td>
126126
<td>TiKV cluster's PD address.</td>
127127
</tr>
128+
<tr>
129+
<td>host-mapping</td>
130+
<td>optional</td>
131+
<td style="word-wrap: break-word;">(none)</td>
132+
<td>String</td>
133+
<td>Mapping from the TiKV cluster's intranet IPs to public IPs. When the TiKV cluster runs on an intranet, use this option to map each intranet IP to a public IP so that a Flink cluster outside the intranet can reach it. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9.</td>
134+
</tr>
128135
<tr>
129136
<td>tikv.grpc.timeout_in_ms</td>
130137
<td>optional</td>

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.cdc.connectors.tidb;
1919

20+
import org.apache.flink.cdc.connectors.tidb.table.utils.UriHostMapping;
2021
import org.apache.flink.configuration.ConfigOption;
2122
import org.apache.flink.configuration.ConfigOptions;
2223
import org.apache.flink.configuration.Configuration;
@@ -25,6 +26,7 @@
2526
import org.tikv.common.TiConfiguration;
2627

2728
import java.util.Map;
29+
import java.util.Optional;
2830

2931
/** Configurations for {@link TiDBSource}. */
3032
public class TDBSourceOptions {
@@ -57,6 +59,12 @@ private TDBSourceOptions() {}
5759
.noDefaultValue()
5860
.withDescription("TiKV cluster's PD address");
5961

62+
public static final ConfigOption<String> HOST_MAPPING =
63+
ConfigOptions.key("host-mapping")
64+
.stringType()
65+
.noDefaultValue()
66+
.withDescription(
67+
"TiKV cluster's host-mapping used to configure public IP and intranet IP mapping. When the TiKV cluster is running on the intranet, you can map a set of intranet IPs to public IPs for an outside Flink cluster to access. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9.");
6068
public static final ConfigOption<Long> TIKV_GRPC_TIMEOUT =
6169
ConfigOptions.key(ConfigUtils.TIKV_GRPC_TIMEOUT)
6270
.longType()
@@ -82,10 +90,11 @@ private TDBSourceOptions() {}
8290
.withDescription("TiKV GRPC batch scan concurrency");
8391

8492
public static TiConfiguration getTiConfiguration(
85-
final String pdAddrsStr, final Map<String, String> options) {
93+
final String pdAddrsStr, final String hostMapping, final Map<String, String> options) {
8694
final Configuration configuration = Configuration.fromMap(options);
8795

8896
final TiConfiguration tiConf = TiConfiguration.createDefault(pdAddrsStr);
97+
Optional.of(new UriHostMapping(hostMapping)).ifPresent(tiConf::setHostMapping);
8998
configuration.getOptional(TIKV_GRPC_TIMEOUT).ifPresent(tiConf::setTimeout);
9099
configuration.getOptional(TIKV_GRPC_SCAN_TIMEOUT).ifPresent(tiConf::setScanTimeout);
91100
configuration

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSource.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333

3434
import org.tikv.common.TiConfiguration;
3535

36+
import javax.annotation.Nullable;
37+
3638
import java.util.Collections;
3739
import java.util.List;
3840
import java.util.Map;
@@ -52,6 +54,7 @@ public class TiDBTableSource implements ScanTableSource, SupportsReadingMetadata
5254
private final String database;
5355
private final String tableName;
5456
private final String pdAddresses;
57+
@Nullable private final String hostMapping;
5558
private final StartupOptions startupOptions;
5659
private final Map<String, String> options;
5760

@@ -70,12 +73,14 @@ public TiDBTableSource(
7073
String database,
7174
String tableName,
7275
String pdAddresses,
76+
String hostMapping,
7377
StartupOptions startupOptions,
7478
Map<String, String> options) {
7579
this.physicalSchema = physicalSchema;
7680
this.database = checkNotNull(database);
7781
this.tableName = checkNotNull(tableName);
7882
this.pdAddresses = checkNotNull(pdAddresses);
83+
this.hostMapping = hostMapping;
7984
this.startupOptions = startupOptions;
8085
this.producedDataType = physicalSchema.toPhysicalRowDataType();
8186
this.options = options;
@@ -93,7 +98,8 @@ public ChangelogMode getChangelogMode() {
9398

9499
@Override
95100
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
96-
final TiConfiguration tiConf = TDBSourceOptions.getTiConfiguration(pdAddresses, options);
101+
final TiConfiguration tiConf =
102+
TDBSourceOptions.getTiConfiguration(pdAddresses, hostMapping, options);
97103
RowType physicalDataType =
98104
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
99105
TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
@@ -132,7 +138,13 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
132138
public DynamicTableSource copy() {
133139
TiDBTableSource source =
134140
new TiDBTableSource(
135-
physicalSchema, database, tableName, pdAddresses, startupOptions, options);
141+
physicalSchema,
142+
database,
143+
tableName,
144+
pdAddresses,
145+
hostMapping,
146+
startupOptions,
147+
options);
136148
source.producedDataType = producedDataType;
137149
source.metadataKeys = metadataKeys;
138150
return source;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSourceFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Set;
3434

3535
import static org.apache.flink.cdc.connectors.tidb.TDBSourceOptions.DATABASE_NAME;
36+
import static org.apache.flink.cdc.connectors.tidb.TDBSourceOptions.HOST_MAPPING;
3637
import static org.apache.flink.cdc.connectors.tidb.TDBSourceOptions.PD_ADDRESSES;
3738
import static org.apache.flink.cdc.connectors.tidb.TDBSourceOptions.SCAN_STARTUP_MODE;
3839
import static org.apache.flink.cdc.connectors.tidb.TDBSourceOptions.TABLE_NAME;
@@ -56,6 +57,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
5657
String databaseName = config.get(DATABASE_NAME);
5758
String tableName = config.get(TABLE_NAME);
5859
String pdAddresses = config.get(PD_ADDRESSES);
60+
String hostMapping = config.get(HOST_MAPPING);
5961
StartupOptions startupOptions = getStartupOptions(config);
6062
ResolvedSchema physicalSchema =
6163
getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
@@ -67,6 +69,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
6769
databaseName,
6870
tableName,
6971
pdAddresses,
72+
hostMapping,
7073
startupOptions,
7174
TiKVOptions.getTiKVOptions(context.getCatalogTable().getOptions()));
7275
}
@@ -89,6 +92,7 @@ public Set<ConfigOption<?>> requiredOptions() {
8992
public Set<ConfigOption<?>> optionalOptions() {
9093
Set<ConfigOption<?>> options = new HashSet<>();
9194
options.add(SCAN_STARTUP_MODE);
95+
options.add(HOST_MAPPING);
9296
options.add(TIKV_GRPC_TIMEOUT);
9397
options.add(TIKV_GRPC_SCAN_TIMEOUT);
9498
options.add(TIKV_BATCH_GET_CONCURRENCY);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.tidb.table.utils;
19+
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
import org.tikv.common.HostMapping;
23+
24+
import java.net.URI;
25+
import java.net.URISyntaxException;
26+
import java.util.Arrays;
27+
import java.util.concurrent.ConcurrentHashMap;
28+
import java.util.concurrent.ConcurrentMap;
29+
import java.util.stream.Collectors;
30+
31+
/** Get TiKV Host Mapping Function. {@link HostMapping}.* */
32+
public class UriHostMapping implements HostMapping {
33+
private static final Logger LOG = LoggerFactory.getLogger(UriHostMapping.class);
34+
private final ConcurrentMap<String, String> hostMapping;
35+
36+
public UriHostMapping(String hostMappingString) {
37+
if (hostMappingString == null || hostMappingString.isEmpty()) {
38+
hostMapping = null;
39+
return;
40+
}
41+
try {
42+
this.hostMapping =
43+
Arrays.stream(hostMappingString.split(";"))
44+
.map(
45+
s -> {
46+
String[] hostAndPort = s.split(":");
47+
return new ConcurrentHashMap.SimpleEntry<>(
48+
hostAndPort[0], hostAndPort[1]);
49+
})
50+
.collect(
51+
Collectors.toConcurrentMap(
52+
ConcurrentHashMap.SimpleEntry::getKey,
53+
ConcurrentHashMap.SimpleEntry::getValue));
54+
} catch (Exception e) {
55+
LOG.error("Invalid host mapping string: {}", hostMappingString, e);
56+
throw new IllegalArgumentException("Invalid host mapping string: " + hostMappingString);
57+
}
58+
}
59+
60+
public ConcurrentMap<String, String> getHostMapping() {
61+
return hostMapping;
62+
}
63+
64+
@Override
65+
public URI getMappedURI(URI uri) {
66+
if (hostMapping != null && hostMapping.containsKey(uri.getHost())) {
67+
try {
68+
return new URI(
69+
uri.getScheme(),
70+
uri.getUserInfo(),
71+
hostMapping.get(uri.getHost()),
72+
uri.getPort(),
73+
uri.getPath(),
74+
uri.getQuery(),
75+
uri.getFragment());
76+
} catch (URISyntaxException ex) {
77+
LOG.error("Failed to get mapped URI", ex);
78+
throw new IllegalArgumentException(ex);
79+
}
80+
}
81+
return uri;
82+
}
83+
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSourceFactoryTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public class TiDBTableSourceFactoryTest {
7171
private static final String MY_DATABASE = "inventory";
7272
private static final String MY_TABLE = "products";
7373
private static final String PD_ADDRESS = "pd0:2379";
74+
private static final String HOST_MAPPING = "host1:1;host2:2;host3:3";
7475
private static final Map<String, String> OPTIONS = new HashMap<>();
7576

7677
@Test
@@ -85,6 +86,7 @@ public void testCommonProperties() {
8586
MY_DATABASE,
8687
MY_TABLE,
8788
PD_ADDRESS,
89+
HOST_MAPPING,
8890
StartupOptions.latest(),
8991
OPTIONS);
9092
assertEquals(expectedSource, actualSource);
@@ -93,6 +95,7 @@ public void testCommonProperties() {
9395
@Test
9496
public void testOptionalProperties() {
9597
Map<String, String> properties = getAllOptions();
98+
properties.put("host-mapping", "host1:1;host2:2;host3:3");
9699
properties.put("tikv.grpc.timeout_in_ms", "20000");
97100
properties.put("tikv.grpc.scan_timeout_in_ms", "20000");
98101
properties.put("tikv.batch_get_concurrency", "4");
@@ -115,6 +118,7 @@ public void testOptionalProperties() {
115118
MY_DATABASE,
116119
MY_TABLE,
117120
PD_ADDRESS,
121+
HOST_MAPPING,
118122
StartupOptions.latest(),
119123
options);
120124
assertEquals(expectedSource, actualSource);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.tidb.table.utils;
19+
20+
import org.apache.flink.cdc.connectors.tidb.TDBSourceOptions;
21+
22+
import org.junit.Test;
23+
import org.tikv.common.TiConfiguration;
24+
25+
import java.util.HashMap;
26+
27+
import static org.junit.Assert.assertEquals;
28+
29+
/** Unit test for {@link UriHostMapping}. * */
30+
public class UriHostMappingTest {
31+
32+
@Test
33+
public void uriHostMappingTest() {
34+
final TiConfiguration tiConf =
35+
TDBSourceOptions.getTiConfiguration(
36+
"http://0.0.0.0:2347", "host1:1;host2:2;host3:3", new HashMap<>());
37+
UriHostMapping uriHostMapping = (UriHostMapping) tiConf.getHostMapping();
38+
assertEquals(uriHostMapping.getHostMapping().size(), 3);
39+
assertEquals(uriHostMapping.getHostMapping().get("host1"), "1");
40+
}
41+
42+
@Test
43+
public void uriHostMappingEmpty() {
44+
final TiConfiguration tiConf =
45+
TDBSourceOptions.getTiConfiguration("http://0.0.0.0:2347", "", new HashMap<>());
46+
UriHostMapping uriHostMapping = (UriHostMapping) tiConf.getHostMapping();
47+
assertEquals(uriHostMapping.getHostMapping(), null);
48+
}
49+
50+
@Test
51+
public void uriHostMappingError() {
52+
try {
53+
final TiConfiguration tiConf =
54+
TDBSourceOptions.getTiConfiguration(
55+
"http://0.0.0.0:2347", "host1=1;host2=2;host3=3", new HashMap<>());
56+
} catch (IllegalArgumentException e) {
57+
assertEquals(e.getMessage(), "Invalid host mapping string: host1=1;host2=2;host3=3");
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)