Commit 2d5f107

draft: commit able

1 parent 6750f5e, commit 2d5f107

7 files changed: +250 -40 lines

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java (-14)

@@ -1,9 +1,7 @@
 package org.apache.flink.cdc.connectors.iceberg.sink;
 
-import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.function.HashFunctionProvider;
 import org.apache.flink.cdc.common.sink.DataSink;
 import org.apache.flink.cdc.common.sink.EventSinkProvider;
 import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
@@ -63,16 +61,4 @@ public EventSinkProvider getEventSinkProvider() {
     public MetadataApplier getMetadataApplier() {
         return new IcebergMetadataApplier(catalogOptions, tableOptions, partitionMaps);
     }
-
-    @Override
-    public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
-        // TODO getDataChangeEventHashFunctionProvider if use
-        return DataSink.super.getDataChangeEventHashFunctionProvider();
-    }
-
-    @Override
-    public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider(
-            int parallelism) {
-        return DataSink.super.getDataChangeEventHashFunctionProvider(parallelism);
-    }
 }
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommittable.java (new file, +96)

@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.cdc.connectors.iceberg.sink.v2;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.MoreObjects;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * The aggregated result of a single checkpoint which should be committed. It contains the
+ * serialized {@link org.apache.iceberg.flink.sink.DeltaManifests} file with the commit data,
+ * plus the jobId, operatorId, checkpointId triplet which helps identify the specific commit.
+ *
+ * <p>{@link IcebergCommittableSerializer} is used for serializing the objects between the Writer
+ * and the Aggregator operator and between the Aggregator and the Committer as well.
+ */
+public class IcebergCommittable implements Serializable {
+    private final byte[] manifest;
+    private final String jobId;
+    private final String operatorId;
+    private final long checkpointId;
+
+    public IcebergCommittable(byte[] manifest, String jobId, String operatorId, long checkpointId) {
+        this.manifest = manifest;
+        this.jobId = jobId;
+        this.operatorId = operatorId;
+        this.checkpointId = checkpointId;
+    }
+
+    byte[] manifest() {
+        return manifest;
+    }
+
+    String jobId() {
+        return jobId;
+    }
+
+    String operatorId() {
+        return operatorId;
+    }
+
+    Long checkpointId() {
+        return checkpointId;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("jobId", jobId)
+                .add("checkpointId", checkpointId)
+                .add("operatorId", operatorId)
+                .toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        IcebergCommittable that = (IcebergCommittable) o;
+        return checkpointId == that.checkpointId
+                && Arrays.equals(manifest, that.manifest)
+                && Objects.equals(jobId, that.jobId)
+                && Objects.equals(operatorId, that.operatorId);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(jobId, operatorId, checkpointId);
+        result = 31 * result + Arrays.hashCode(manifest);
+        return result;
+    }
+}
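A side note on the class above (not part of the commit): equals() and hashCode() go through Arrays.equals/Arrays.hashCode on the manifest, so two committables holding distinct but identical byte arrays compare equal. A minimal sketch with made-up values:

import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergCommittable;

public class CommittableEqualityDemo {
    public static void main(String[] args) {
        byte[] m1 = {1, 2, 3};
        byte[] m2 = {1, 2, 3}; // distinct array instance, same content
        IcebergCommittable a = new IcebergCommittable(m1, "job-1", "op-1", 1L);
        IcebergCommittable b = new IcebergCommittable(m2, "job-1", "op-1", 1L);
        // Content equality, not array identity: prints "true true".
        System.out.println(a.equals(b) + " " + (a.hashCode() == b.hashCode()));
    }
}

This matters when Flink compares committables after a restore, where the manifest bytes have been deserialized into a fresh array.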
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommittableSerializer.java (new file, +69)

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.cdc.connectors.iceberg.sink.v2;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * This serializer is used for serializing the {@link IcebergCommittable} objects between the Writer
+ * and the Aggregator operator and between the Aggregator and the Committer as well.
+ *
+ * <p>In both cases only the respective part is serialized.
+ */
+public class IcebergCommittableSerializer implements SimpleVersionedSerializer<IcebergCommittable> {
+    private static final int VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return VERSION;
+    }
+
+    @Override
+    public byte[] serialize(IcebergCommittable committable) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+        view.writeUTF(committable.jobId());
+        view.writeUTF(committable.operatorId());
+        view.writeLong(committable.checkpointId());
+        view.writeInt(committable.manifest().length);
+        view.write(committable.manifest());
+        return out.toByteArray();
+    }
+
+    @Override
+    public IcebergCommittable deserialize(int version, byte[] serialized) throws IOException {
+        if (version == 1) {
+            DataInputDeserializer view = new DataInputDeserializer(serialized);
+            String jobId = view.readUTF();
+            String operatorId = view.readUTF();
+            long checkpointId = view.readLong();
+            int manifestLen = view.readInt();
+            byte[] manifestBuf;
+            manifestBuf = new byte[manifestLen];
+            view.read(manifestBuf);
+            return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId);
+        }
+        throw new IOException("Unrecognized version or corrupt state: " + version);
+    }
+}
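For context, a round trip through this serializer should reproduce an equal committable; a minimal sketch with made-up test values (not part of this commit):

import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergCommittable;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergCommittableSerializer;

import java.nio.charset.StandardCharsets;

public class CommittableSerializerRoundTrip {
    public static void main(String[] args) throws Exception {
        IcebergCommittableSerializer serializer = new IcebergCommittableSerializer();

        // Made-up manifest bytes and ids; real values come from the Flink runtime.
        IcebergCommittable original =
                new IcebergCommittable(
                        "fake-delta-manifests".getBytes(StandardCharsets.UTF_8),
                        "job-1",
                        "op-1",
                        42L);

        byte[] wire = serializer.serialize(original);
        IcebergCommittable restored = serializer.deserialize(serializer.getVersion(), wire);

        // equals() compares checkpointId, manifest bytes, jobId and operatorId.
        System.out.println(original.equals(restored)); // true
    }
}

One detail worth flagging: deserialize() fills the manifest via view.read(manifestBuf) and ignores the returned byte count; DataInputDeserializer also offers readFully(byte[]), which fails loudly on truncated input.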
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java (new file, +72)

@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.cdc.connectors.iceberg.sink.v2;
+
+import org.apache.flink.api.connector.sink2.Committer;
+
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class implements the Flink SinkV2 {@link Committer} interface to perform the Iceberg
+ * commits. The implementation builds on the following assumptions:
+ *
+ * <ul>
+ *   <li>There is a single {@link IcebergCommittable} for every
+ *       checkpoint
+ *   <li>There is no late checkpoint - if checkpoint 'x' has been received in one call, then after a
+ *       successful run only checkpoints &gt; x will arrive
+ *   <li>There is no other writer which would generate another commit to the same branch with the
+ *       same jobId-operatorId-checkpointId triplet
+ * </ul>
+ */
+public class IcebergCommitter implements Committer<IcebergCommittable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergCommitter.class);
+
+    public IcebergCommitter(Map<String, String> catalogOptions, String commitUser) {}
+
+    @Override
+    public void commit(Collection<CommitRequest<IcebergCommittable>> commitRequests)
+            throws IOException, InterruptedException {
+        if (commitRequests.isEmpty()) {
+            return;
+        }
+        NavigableMap<Long, CommitRequest<IcebergCommittable>> commitRequestMap = Maps.newTreeMap();
+        for (CommitRequest<IcebergCommittable> request : commitRequests) {
+            commitRequestMap.put(request.getCommittable().checkpointId(), request);
+        }
+        // TODO commit
+        List<IcebergCommittable> committables =
+                commitRequests.stream()
+                        .map(CommitRequest::getCommittable)
+                        .collect(Collectors.toList());
+    }
+
+    @Override
+    public void close() throws Exception {}
+}
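The commit() body above stops at a TODO. One possible shape for the missing step, as a sketch under stated assumptions: the Iceberg Table is resolved from catalogOptions elsewhere, decodeDataFiles stands in for the DeltaManifests decoding this draft has not written yet, and the snapshot-summary keys are hypothetical. Only public Iceberg APIs (newAppend, SnapshotUpdate.set, Snapshot.summary) are used:

    // Hypothetical continuation of IcebergCommitter; not part of this commit.
    private void commitUpTo(
            org.apache.iceberg.Table table,
            NavigableMap<Long, CommitRequest<IcebergCommittable>> commitRequestMap) {
        // Skip checkpoints at or below the last one recorded in the table, so that
        // committables replayed after a failover are not committed twice.
        long maxCommitted = maxCommittedCheckpointId(table);
        for (Map.Entry<Long, CommitRequest<IcebergCommittable>> entry :
                commitRequestMap.tailMap(maxCommitted, false).entrySet()) {
            IcebergCommittable committable = entry.getValue().getCommittable();
            org.apache.iceberg.AppendFiles append = table.newAppend();
            for (org.apache.iceberg.DataFile dataFile : decodeDataFiles(committable.manifest())) {
                append.appendFile(dataFile);
            }
            // Stamp the snapshot with the jobId-operatorId-checkpointId triplet the
            // class javadoc relies on for idempotence.
            append.set("flink.job-id", committable.jobId());
            append.set("flink.operator-id", committable.operatorId());
            append.set("flink.max-committed-checkpoint-id", String.valueOf(entry.getKey()));
            append.commit();
        }
    }

    private long maxCommittedCheckpointId(org.apache.iceberg.Table table) {
        // Simplified: only inspects the latest snapshot; a real implementation would
        // walk the snapshot history and match jobId/operatorId as well.
        org.apache.iceberg.Snapshot current = table.currentSnapshot();
        String value =
                current == null ? null : current.summary().get("flink.max-committed-checkpoint-id");
        return value == null ? -1L : Long.parseLong(value);
    }

    // Placeholder: recovering DataFiles from the serialized manifest bytes is left
    // open by this draft (Iceberg's DeltaManifests machinery would fill this in).
    private List<org.apache.iceberg.DataFile> decodeDataFiles(byte[] manifest) {
        throw new UnsupportedOperationException("not implemented in this draft");
    }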

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergEventSinkV2.java (+2 -1)

@@ -7,7 +7,8 @@
 import java.time.ZoneId;
 import java.util.Map;
 
-public class IcebergEventSinkV2 extends IcebergSinkV2 implements SupportsPreWriteTopology<Event> {
+public class IcebergEventSinkV2 extends IcebergSinkV2<Event>
+        implements SupportsPreWriteTopology<Event> {
 
     public final String schemaOperatorUid;
 
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkV2.java (+11 -24)

@@ -1,25 +1,17 @@
 package org.apache.flink.cdc.connectors.iceberg.sink.v2;
 
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.CommitterInitContext;
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.SupportsCommitter;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
 
-import org.apache.iceberg.catalog.ImmutableTableCommit;
-
 import java.io.IOException;
 import java.util.Map;
 
-public class IcebergSinkV2
-        implements Sink<Event>,
-                SupportsPreWriteTopology<Event>,
-                SupportsCommitter<ImmutableTableCommit> {
+public class IcebergSinkV2<InputT> implements WithPreCommitTopology<InputT, IcebergCommittable> {
 
     // provided a default commit user.
     public static final String DEFAULT_COMMIT_USER = "admin";
@@ -47,28 +39,23 @@ public IcebergSinkV2(
     }
 
     @Override
-    public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
-        return null;
+    public Committer<IcebergCommittable> createCommitter() {
+        return new IcebergCommitter(catalogOptions, commitUser);
     }
 
     @Override
-    public Committer<ImmutableTableCommit> createCommitter(
-            CommitterInitContext committerInitContext) throws IOException {
-        return null;
-    }
-
-    @Override
-    public SimpleVersionedSerializer<ImmutableTableCommit> getCommittableSerializer() {
-        return null;
+    public SimpleVersionedSerializer<IcebergCommittable> getCommittableSerializer() {
+        return new IcebergCommittableSerializer();
     }
 
     @Override
-    public SinkWriter<Event> createWriter(InitContext initContext) throws IOException {
+    public SinkWriter<InputT> createWriter(InitContext initContext) throws IOException {
         return null;
     }
 
     @Override
-    public SinkWriter<Event> createWriter(WriterInitContext context) throws IOException {
-        return Sink.super.createWriter(context);
+    public DataStream<CommittableMessage<IcebergCommittable>> addPreCommitTopology(
+            DataStream<CommittableMessage<IcebergCommittable>> dataStream) {
+        return dataStream;
     }
 }
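createWriter() still returns null in this draft, even though the commit also touches IcebergWriter (last file below). A guess at the finished method; the writer's actual constructor is not visible in this diff, so the arguments here are hypothetical:

    @Override
    public SinkWriter<InputT> createWriter(InitContext initContext) throws IOException {
        // Hypothetical wiring: IcebergWriter's real constructor arguments are not
        // shown in this diff; catalogOptions and commitUser are fields of this sink.
        return new IcebergWriter<>(catalogOptions, commitUser, initContext);
    }

The pass-through addPreCommitTopology() also leaves room for a later aggregation step; Iceberg's own Flink sink merges per-subtask results into a single committable per checkpoint at that point.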

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java (-1)

@@ -85,7 +85,6 @@ public void write(InputT inputT, Context context) throws IOException, InterruptedException
         // Write the data to Iceberg
         if (icebergEvent.getGenericRow() != null) {
             TaskWriter<RowData> writer = writes.computeIfAbsent(tableId, id -> getTaskWriter());
-
             try {
                 writer.write(icebergEvent.getGenericRow());
             } catch (Exception e) {
