Skip to content

Commit 5c34add

Browse files
committed
[FLINK-36061][iceberg] Add Iceberg Sink.
1 parent 5a9eb85 commit 5c34add

21 files changed

+1095
-390
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml

+174-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,22 @@
1-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one or more
4+
contributor license agreements. See the NOTICE file distributed with
5+
this work for additional information regarding copyright ownership.
6+
The ASF licenses this file to You under the Apache License, Version 2.0
7+
(the "License"); you may not use this file except in compliance with
8+
the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
<project xmlns="http://maven.apache.org/POM/4.0.0"
19+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
220
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
321
<modelVersion>4.0.0</modelVersion>
422
<parent>
@@ -13,7 +31,9 @@
1331
<name>flink-cdc-pipeline-connector-iceberg</name>
1432

1533
<properties>
16-
<iceberg.version>1.7.1</iceberg.version>
34+
<iceberg.version>1.6.1</iceberg.version>
35+
<hadoop.version>2.8.5</hadoop.version>
36+
<hive.version>2.3.9</hive.version>
1737
</properties>
1838

1939
<dependencies>
@@ -39,5 +59,157 @@
3959
<version>${flink.version}</version>
4060
<scope>test</scope>
4161
</dependency>
62+
63+
64+
<!-- hadoop dependency -->
65+
66+
<dependency>
67+
<groupId>org.apache.hadoop</groupId>
68+
<artifactId>hadoop-common</artifactId>
69+
<version>${hadoop.version}</version>
70+
<scope>provided</scope>
71+
<exclusions>
72+
<exclusion>
73+
<groupId>org.apache.avro</groupId>
74+
<artifactId>avro</artifactId>
75+
</exclusion>
76+
<exclusion>
77+
<groupId>log4j</groupId>
78+
<artifactId>log4j</artifactId>
79+
</exclusion>
80+
<exclusion>
81+
<groupId>org.slf4j</groupId>
82+
<artifactId>slf4j-log4j12</artifactId>
83+
</exclusion>
84+
<exclusion>
85+
<groupId>jdk.tools</groupId>
86+
<artifactId>jdk.tools</artifactId>
87+
</exclusion>
88+
<exclusion>
89+
<groupId>com.google.protobuf</groupId>
90+
<artifactId>protobuf-java</artifactId>
91+
</exclusion>
92+
</exclusions>
93+
</dependency>
94+
95+
<dependency>
96+
<groupId>org.apache.hadoop</groupId>
97+
<artifactId>hadoop-hdfs</artifactId>
98+
<version>${hadoop.version}</version>
99+
<scope>provided</scope>
100+
<exclusions>
101+
<exclusion>
102+
<groupId>jdk.tools</groupId>
103+
<artifactId>jdk.tools</artifactId>
104+
</exclusion>
105+
<exclusion>
106+
<groupId>log4j</groupId>
107+
<artifactId>log4j</artifactId>
108+
</exclusion>
109+
<exclusion>
110+
<groupId>org.slf4j</groupId>
111+
<artifactId>slf4j-log4j12</artifactId>
112+
</exclusion>
113+
<exclusion>
114+
<groupId>com.google.protobuf</groupId>
115+
<artifactId>protobuf-java</artifactId>
116+
</exclusion>
117+
</exclusions>
118+
</dependency>
119+
120+
<!-- hive dependency -->
121+
122+
<dependency>
123+
<groupId>org.apache.hive</groupId>
124+
<artifactId>hive-metastore</artifactId>
125+
<version>${hive.version}</version>
126+
<exclusions>
127+
<exclusion>
128+
<groupId>log4j</groupId>
129+
<artifactId>log4j</artifactId>
130+
</exclusion>
131+
<exclusion>
132+
<groupId>org.slf4j</groupId>
133+
<artifactId>slf4j-log4j12</artifactId>
134+
</exclusion>
135+
<exclusion>
136+
<groupId>com.fasterxml.jackson.core</groupId>
137+
<artifactId>jackson-annotations</artifactId>
138+
</exclusion>
139+
<exclusion>
140+
<groupId>com.fasterxml.jackson.core</groupId>
141+
<artifactId>jackson-core</artifactId>
142+
</exclusion>
143+
<exclusion>
144+
<groupId>com.fasterxml.jackson.core</groupId>
145+
<artifactId>jackson-databind</artifactId>
146+
</exclusion>
147+
<exclusion>
148+
<groupId>org.apache.orc</groupId>
149+
<artifactId>orc-core</artifactId>
150+
</exclusion>
151+
<exclusion>
152+
<groupId>jdk.tools</groupId>
153+
<artifactId>jdk.tools</artifactId>
154+
</exclusion>
155+
<exclusion>
156+
<groupId>com.google.protobuf</groupId>
157+
<artifactId>protobuf-java</artifactId>
158+
</exclusion>
159+
</exclusions>
160+
</dependency>
161+
162+
<dependency>
163+
<groupId>org.apache.hive</groupId>
164+
<artifactId>hive-exec</artifactId>
165+
<version>${hive.version}</version>
166+
<scope>provided</scope>
167+
<exclusions>
168+
<exclusion>
169+
<groupId>log4j</groupId>
170+
<artifactId>log4j</artifactId>
171+
</exclusion>
172+
<exclusion>
173+
<groupId>org.slf4j</groupId>
174+
<artifactId>slf4j-log4j12</artifactId>
175+
</exclusion>
176+
<exclusion>
177+
<groupId>com.fasterxml.jackson.core</groupId>
178+
<artifactId>jackson-annotations</artifactId>
179+
</exclusion>
180+
<exclusion>
181+
<groupId>com.fasterxml.jackson.core</groupId>
182+
<artifactId>jackson-core</artifactId>
183+
</exclusion>
184+
<exclusion>
185+
<groupId>com.fasterxml.jackson.core</groupId>
186+
<artifactId>jackson-databind</artifactId>
187+
</exclusion>
188+
<exclusion>
189+
<groupId>org.apache.orc</groupId>
190+
<artifactId>orc-core</artifactId>
191+
</exclusion>
192+
<exclusion>
193+
<groupId>org.pentaho</groupId>
194+
<artifactId>*</artifactId>
195+
</exclusion>
196+
<exclusion>
197+
<groupId>org.apache.calcite</groupId>
198+
<artifactId>calcite-core</artifactId>
199+
</exclusion>
200+
<exclusion>
201+
<groupId>org.apache.calcite</groupId>
202+
<artifactId>calcite-druid</artifactId>
203+
</exclusion>
204+
<exclusion>
205+
<groupId>org.apache.calcite.avatica</groupId>
206+
<artifactId>avatica</artifactId>
207+
</exclusion>
208+
<exclusion>
209+
<groupId>org.apache.calcite</groupId>
210+
<artifactId>calcite-avatica</artifactId>
211+
</exclusion>
212+
</exclusions>
213+
</dependency>
42214
</dependencies>
43215
</project>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java

+20-14
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.flink.cdc.connectors.iceberg.sink;
219

320
import org.apache.flink.cdc.common.event.DataChangeEvent;
4-
import org.apache.flink.cdc.common.event.Event;
521
import org.apache.flink.cdc.common.event.TableId;
622
import org.apache.flink.cdc.common.function.HashFunctionProvider;
723
import org.apache.flink.cdc.common.sink.DataSink;
824
import org.apache.flink.cdc.common.sink.EventSinkProvider;
925
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
1026
import org.apache.flink.cdc.common.sink.MetadataApplier;
11-
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergEventSink;
12-
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;
27+
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergSink;
1328

1429
import java.io.Serializable;
1530
import java.time.ZoneId;
1631
import java.util.List;
1732
import java.util.Map;
1833

34+
/** A {@link DataSink} for Apache Iceberg. */
1935
public class IcebergDataSink implements DataSink, Serializable {
2036

2137
// options for creating Iceberg catalog.
@@ -24,38 +40,28 @@ public class IcebergDataSink implements DataSink, Serializable {
2440
// options for creating Iceberg table.
2541
private final Map<String, String> tableOptions;
2642

27-
private final String commitUser;
28-
2943
private final Map<TableId, List<String>> partitionMaps;
3044

31-
private final IcebergRecordSerializer<Event> serializer;
32-
3345
private final ZoneId zoneId;
3446

3547
public final String schemaOperatorUid;
3648

3749
public IcebergDataSink(
3850
Map<String, String> catalogOptions,
3951
Map<String, String> tableOptions,
40-
String commitUser,
4152
Map<TableId, List<String>> partitionMaps,
42-
IcebergRecordSerializer<Event> serializer,
4353
ZoneId zoneId,
4454
String schemaOperatorUid) {
4555
this.catalogOptions = catalogOptions;
4656
this.tableOptions = tableOptions;
47-
this.commitUser = commitUser;
4857
this.partitionMaps = partitionMaps;
49-
this.serializer = serializer;
5058
this.zoneId = zoneId;
5159
this.schemaOperatorUid = schemaOperatorUid;
5260
}
5361

5462
@Override
5563
public EventSinkProvider getEventSinkProvider() {
56-
IcebergEventSink icebergEventSink =
57-
new IcebergEventSink(
58-
tableOptions, commitUser, serializer, schemaOperatorUid, zoneId);
64+
IcebergSink icebergEventSink = new IcebergSink(tableOptions, schemaOperatorUid, zoneId);
5965
return FlinkSinkProvider.of(icebergEventSink);
6066
}
6167

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java

+19-30
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.flink.cdc.connectors.iceberg.sink;
219

320
import org.apache.flink.cdc.common.configuration.ConfigOption;
4-
import org.apache.flink.cdc.common.event.Event;
521
import org.apache.flink.cdc.common.factories.DataSinkFactory;
622
import org.apache.flink.cdc.common.factories.FactoryHelper;
723
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
824
import org.apache.flink.cdc.common.sink.DataSink;
9-
import org.apache.flink.cdc.common.utils.Preconditions;
10-
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordEventSerializer;
11-
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;
12-
import org.apache.flink.table.catalog.Catalog;
13-
14-
import org.apache.commons.collections.map.HashedMap;
15-
import org.apache.iceberg.flink.FlinkCatalogFactory;
1625

1726
import java.time.ZoneId;
1827
import java.util.HashMap;
@@ -24,6 +33,7 @@
2433
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
2534
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_TABLE_PROPERTIES;
2635

36+
/** A {@link DataSinkFactory} for Apache Iceberg. */
2737
public class IcebergDataSinkFactory implements DataSinkFactory {
2838

2939
public static final String IDENTIFIER = "iceberg";
@@ -47,17 +57,6 @@ public DataSink createDataSink(Context context) {
4757
value);
4858
}
4959
});
50-
FlinkCatalogFactory factory = new FlinkCatalogFactory();
51-
try {
52-
Catalog catalog =
53-
factory.createCatalog(
54-
catalogOptions.getOrDefault("default-database", "default"),
55-
catalogOptions);
56-
Preconditions.checkNotNull(
57-
catalog.listDatabases(), "catalog option of Paimon is invalid.");
58-
} catch (Exception e) {
59-
throw new RuntimeException("failed to create or use paimon catalog", e);
60-
}
6160
ZoneId zoneId = ZoneId.systemDefault();
6261
if (!Objects.equals(
6362
context.getPipelineConfiguration().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
@@ -67,21 +66,11 @@ public DataSink createDataSink(Context context) {
6766
context.getPipelineConfiguration()
6867
.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
6968
}
70-
String commitUser =
71-
context.getFactoryConfiguration().get(IcebergDataSinkOptions.COMMIT_USER);
72-
IcebergRecordSerializer<Event> serializer =
73-
new IcebergRecordEventSerializer(new HashedMap(), zoneId);
7469
String schemaOperatorUid =
7570
context.getPipelineConfiguration()
7671
.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);
7772
return new IcebergDataSink(
78-
catalogOptions,
79-
tableOptions,
80-
commitUser,
81-
new HashMap<>(),
82-
serializer,
83-
zoneId,
84-
schemaOperatorUid);
73+
catalogOptions, tableOptions, new HashMap<>(), zoneId, schemaOperatorUid);
8574
}
8675

8776
@Override

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java

+18
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.flink.cdc.connectors.iceberg.sink;
219

320
import org.apache.flink.cdc.common.configuration.ConfigOption;
421

522
import static org.apache.flink.cdc.common.configuration.ConfigOptions.key;
623

24+
/** Config options for {@link IcebergDataSink}. */
725
public class IcebergDataSinkOptions {
826

927
// prefix for passing properties for table creation.

0 commit comments

Comments
 (0)