Commit 5a9eb85

ConradJam authored and lvyanquan committed
[FLINK-36061] Support Iceberg CDC Pipeline SinkV2
1 parent 36da15a commit 5a9eb85

16 files changed: +1106 −0 lines changed
Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>flink-cdc-pipeline-connectors</artifactId>
        <groupId>org.apache.flink</groupId>
        <version>${revision}</version>
    </parent>

    <artifactId>flink-cdc-pipeline-connector-iceberg</artifactId>
    <packaging>jar</packaging>

    <name>flink-cdc-pipeline-connector-iceberg</name>

    <properties>
        <iceberg.version>1.7.1</iceberg.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-core</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-1.19</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-composer</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
@@ -0,0 +1,78 @@
package org.apache.flink.cdc.connectors.iceberg.sink;

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.sink.EventSinkProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergEventSink;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;

import java.io.Serializable;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;

/** A {@link DataSink} implementation for Apache Iceberg. */
public class IcebergDataSink implements DataSink, Serializable {

    // Options for creating the Iceberg catalog.
    private final Map<String, String> catalogOptions;

    // Options for creating Iceberg tables.
    private final Map<String, String> tableOptions;

    private final String commitUser;

    // Partition keys per table.
    private final Map<TableId, List<String>> partitionMaps;

    private final IcebergRecordSerializer<Event> serializer;

    private final ZoneId zoneId;

    public final String schemaOperatorUid;

    public IcebergDataSink(
            Map<String, String> catalogOptions,
            Map<String, String> tableOptions,
            String commitUser,
            Map<TableId, List<String>> partitionMaps,
            IcebergRecordSerializer<Event> serializer,
            ZoneId zoneId,
            String schemaOperatorUid) {
        this.catalogOptions = catalogOptions;
        this.tableOptions = tableOptions;
        this.commitUser = commitUser;
        this.partitionMaps = partitionMaps;
        this.serializer = serializer;
        this.zoneId = zoneId;
        this.schemaOperatorUid = schemaOperatorUid;
    }

    @Override
    public EventSinkProvider getEventSinkProvider() {
        IcebergEventSink icebergEventSink =
                new IcebergEventSink(
                        tableOptions, commitUser, serializer, schemaOperatorUid, zoneId);
        return FlinkSinkProvider.of(icebergEventSink);
    }

    @Override
    public MetadataApplier getMetadataApplier() {
        return new IcebergMetadataApplier(catalogOptions, tableOptions, partitionMaps);
    }

    @Override
    public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
        // TODO: provide a dedicated hash function provider if needed; fall back to the default for now.
        return DataSink.super.getDataChangeEventHashFunctionProvider();
    }

    @Override
    public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider(
            int parallelism) {
        return DataSink.super.getDataChangeEventHashFunctionProvider(parallelism);
    }
}
@@ -0,0 +1,108 @@
package org.apache.flink.cdc.connectors.iceberg.sink;

import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordEventSerializer;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;
import org.apache.flink.table.catalog.Catalog;

import org.apache.commons.collections.map.HashedMap;
import org.apache.iceberg.flink.FlinkCatalogFactory;

import java.time.ZoneId;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_TABLE_PROPERTIES;

/** A {@link DataSinkFactory} that creates {@link IcebergDataSink}. */
public class IcebergDataSinkFactory implements DataSinkFactory {

    public static final String IDENTIFIER = "iceberg";

    @Override
    public DataSink createDataSink(Context context) {
        FactoryHelper.createFactoryHelper(this, context)
                .validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);

        // Split prefixed options into catalog options and table options, stripping the prefixes.
        Map<String, String> allOptions = context.getFactoryConfiguration().toMap();
        Map<String, String> catalogOptions = new HashMap<>();
        Map<String, String> tableOptions = new HashMap<>();
        allOptions.forEach(
                (key, value) -> {
                    if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
                        tableOptions.put(key.substring(PREFIX_TABLE_PROPERTIES.length()), value);
                    } else if (key.startsWith(PREFIX_CATALOG_PROPERTIES)) {
                        catalogOptions.put(
                                key.substring(PREFIX_CATALOG_PROPERTIES.length()), value);
                    }
                });

        // Eagerly create the catalog to validate the catalog options.
        FlinkCatalogFactory factory = new FlinkCatalogFactory();
        try {
            Catalog catalog =
                    factory.createCatalog(
                            catalogOptions.getOrDefault("default-database", "default"),
                            catalogOptions);
            Preconditions.checkNotNull(
                    catalog.listDatabases(), "Catalog options for Iceberg are invalid.");
        } catch (Exception e) {
            throw new RuntimeException("Failed to create or use Iceberg catalog", e);
        }

        // Use the pipeline-level local time zone if configured, otherwise the system default.
        ZoneId zoneId = ZoneId.systemDefault();
        if (!Objects.equals(
                context.getPipelineConfiguration().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
                PipelineOptions.PIPELINE_LOCAL_TIME_ZONE.defaultValue())) {
            zoneId =
                    ZoneId.of(
                            context.getPipelineConfiguration()
                                    .get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
        }

        String commitUser =
                context.getFactoryConfiguration().get(IcebergDataSinkOptions.COMMIT_USER);
        IcebergRecordSerializer<Event> serializer =
                new IcebergRecordEventSerializer(new HashedMap(), zoneId);
        String schemaOperatorUid =
                context.getPipelineConfiguration()
                        .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);

        return new IcebergDataSink(
                catalogOptions,
                tableOptions,
                commitUser,
                new HashMap<>(),
                serializer,
                zoneId,
                schemaOperatorUid);
    }

    @Override
    public String identifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(IcebergDataSinkOptions.METASTORE);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(IcebergDataSinkOptions.WAREHOUSE);
        options.add(IcebergDataSinkOptions.URI);
        options.add(IcebergDataSinkOptions.COMMIT_USER);
        options.add(IcebergDataSinkOptions.PARTITION_KEY);
        return options;
    }
}
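As a quick illustration of the prefix handling in the factory above, here is a self-contained sketch that is not part of this commit: keys under "catalog.properties." feed catalog creation and keys under "table.properties." feed table creation, each with its prefix stripped. The class name and all option values below are made up for illustration; only catalog.properties.metastore is a required option of the factory.

import java.util.HashMap;
import java.util.Map;

public class PrefixSplitSketch {
    public static void main(String[] args) {
        // Options as they might arrive from a pipeline sink definition (values are illustrative).
        Map<String, String> allOptions = new HashMap<>();
        allOptions.put("catalog.properties.metastore", "hive");
        allOptions.put("catalog.properties.uri", "thrift://localhost:9083");
        allOptions.put("catalog.properties.warehouse", "/tmp/iceberg/warehouse");
        allOptions.put("table.properties.write.format.default", "parquet");

        Map<String, String> catalogOptions = new HashMap<>();
        Map<String, String> tableOptions = new HashMap<>();
        allOptions.forEach(
                (key, value) -> {
                    if (key.startsWith("table.properties.")) {
                        tableOptions.put(key.substring("table.properties.".length()), value);
                    } else if (key.startsWith("catalog.properties.")) {
                        catalogOptions.put(key.substring("catalog.properties.".length()), value);
                    }
                });

        // catalogOptions now holds metastore, uri and warehouse without the prefix;
        // tableOptions holds write.format.default=parquet.
        System.out.println(catalogOptions);
        System.out.println(tableOptions);
    }
}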
@@ -0,0 +1,47 @@
package org.apache.flink.cdc.connectors.iceberg.sink;

import org.apache.flink.cdc.common.configuration.ConfigOption;

import static org.apache.flink.cdc.common.configuration.ConfigOptions.key;

/** Config options for the Iceberg pipeline sink. */
public class IcebergDataSinkOptions {

    // Prefix for properties passed through to table creation.
    public static final String PREFIX_TABLE_PROPERTIES = "table.properties.";

    // Prefix for properties passed through to catalog creation.
    public static final String PREFIX_CATALOG_PROPERTIES = "catalog.properties.";

    public static final ConfigOption<String> COMMIT_USER =
            key("commit.user")
                    .stringType()
                    .defaultValue("admin")
                    .withDescription("User name for committing data files.");

    public static final ConfigOption<String> WAREHOUSE =
            key("catalog.properties.warehouse")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The warehouse root path of the catalog.");

    public static final ConfigOption<String> METASTORE =
            key("catalog.properties.metastore")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Metastore of the Iceberg catalog, supports filesystem and hive.");

    public static final ConfigOption<String> URI =
            key("catalog.properties.uri")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("URI of the metastore server.");

    public static final ConfigOption<String> PARTITION_KEY =
            key("partition.key")
                    .stringType()
                    .defaultValue("")
                    .withDescription(
                            "Partition keys for each partitioned table, allowing multiple partition keys per table. "
                                    + "Tables are separated by ';', and partition keys are separated by ','. "
                                    + "For example, partition keys for two tables can be set with 'testdb.table1:id1,id2;testdb.table2:name'.");
}
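The partition.key description above defines a small string format, and IcebergDataSink ultimately expects partition keys as a Map<TableId, List<String>>. Below is a hypothetical, self-contained sketch (not part of this commit) of how that format could be parsed; the class and method names are invented, and the step of converting the table identifier strings into TableId instances is intentionally left out.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionKeySketch {

    // Parses "db.table1:key1,key2;db.table2:key3" into table identifier -> partition keys.
    static Map<String, List<String>> parsePartitionKey(String option) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        if (option == null || option.isEmpty()) {
            return result;
        }
        for (String tableEntry : option.split(";")) {
            // Each entry has the form "<table>:<key1>,<key2>,...".
            String[] tableAndKeys = tableEntry.split(":", 2);
            List<String> keys = new ArrayList<>(Arrays.asList(tableAndKeys[1].split(",")));
            result.put(tableAndKeys[0].trim(), keys);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints {testdb.table1=[id1, id2], testdb.table2=[name]}.
        System.out.println(parsePartitionKey("testdb.table1:id1,id2;testdb.table2:name"));
    }
}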
