Skip to content

Commit b32df93

Browse files
[Feature][Connectors-v2-Paimon] Adaptation Paimon 0.6 Version (#6061)
1 parent b3dc0bb commit b32df93

File tree

4 files changed

+16
-11
lines changed

4 files changed

+16
-11
lines changed

seatunnel-connectors-v2/connector-paimon/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
<name>SeaTunnel : Connectors V2 : Paimon</name>
3131

3232
<properties>
33-
<paimon.version>0.4.0-incubating</paimon.version>
33+
<paimon.version>0.6.0-incubating</paimon.version>
3434
</properties>
3535

3636
<dependencies>

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2727
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
2828
import org.apache.seatunnel.api.sink.SinkWriter;
29+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2930
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3031
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
3132
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -123,6 +124,11 @@ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
123124
this.seaTunnelRowType = seaTunnelRowType;
124125
}
125126

127+
@Override
128+
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
129+
return this.seaTunnelRowType;
130+
}
131+
126132
@Override
127133
public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState> createWriter(
128134
SinkWriter.Context context) throws IOException {

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,11 @@
2727
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
2828

2929
import org.apache.paimon.data.InternalRow;
30-
import org.apache.paimon.operation.Lock;
3130
import org.apache.paimon.table.Table;
3231
import org.apache.paimon.table.sink.BatchTableCommit;
3332
import org.apache.paimon.table.sink.BatchTableWrite;
33+
import org.apache.paimon.table.sink.BatchWriteBuilder;
3434
import org.apache.paimon.table.sink.CommitMessage;
35-
import org.apache.paimon.table.sink.InnerTableCommit;
3635

3736
import lombok.extern.slf4j.Slf4j;
3837

@@ -51,6 +50,8 @@ public class PaimonSinkWriter
5150

5251
private String commitUser = UUID.randomUUID().toString();
5352

53+
private final BatchWriteBuilder tableWriteBuilder;
54+
5455
private final BatchTableWrite tableWrite;
5556

5657
private long checkpointId = 0;
@@ -65,7 +66,8 @@ public class PaimonSinkWriter
6566

6667
public PaimonSinkWriter(Context context, Table table, SeaTunnelRowType seaTunnelRowType) {
6768
this.table = table;
68-
this.tableWrite = this.table.newBatchWriteBuilder().newWrite();
69+
this.tableWriteBuilder = this.table.newBatchWriteBuilder().withOverwrite();
70+
this.tableWrite = tableWriteBuilder.newWrite();
6971
this.seaTunnelRowType = seaTunnelRowType;
7072
this.context = context;
7173
}
@@ -76,17 +78,16 @@ public PaimonSinkWriter(
7678
SeaTunnelRowType seaTunnelRowType,
7779
List<PaimonSinkState> states) {
7880
this.table = table;
79-
this.tableWrite = this.table.newBatchWriteBuilder().newWrite();
81+
this.tableWriteBuilder = this.table.newBatchWriteBuilder().withOverwrite();
82+
this.tableWrite = tableWriteBuilder.newWrite();
8083
this.seaTunnelRowType = seaTunnelRowType;
8184
this.context = context;
8285
if (Objects.isNull(states) || states.isEmpty()) {
8386
return;
8487
}
8588
this.commitUser = states.get(0).getCommitUser();
8689
this.checkpointId = states.get(0).getCheckpointId();
87-
try (BatchTableCommit tableCommit =
88-
((InnerTableCommit) table.newBatchWriteBuilder().newCommit())
89-
.withLock(Lock.emptyFactory().create())) {
90+
try (BatchTableCommit tableCommit = tableWriteBuilder.newCommit()) {
9091
List<CommitMessage> commitables =
9192
states.stream()
9293
.map(PaimonSinkState::getCommittables)

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.paimon.table.Table;
2626
import org.apache.paimon.table.sink.BatchTableCommit;
2727
import org.apache.paimon.table.sink.CommitMessage;
28-
import org.apache.paimon.table.sink.InnerTableCommit;
2928

3029
import lombok.extern.slf4j.Slf4j;
3130

@@ -54,8 +53,7 @@ public PaimonAggregatedCommitter(Table table) {
5453
public List<PaimonAggregatedCommitInfo> commit(
5554
List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) throws IOException {
5655
try (BatchTableCommit tableCommit =
57-
((InnerTableCommit) table.newBatchWriteBuilder().newCommit())
58-
.withLock(localFactory.create())) {
56+
table.newBatchWriteBuilder().withOverwrite().newCommit()) {
5957
List<CommitMessage> fileCommittables =
6058
aggregatedCommitInfo.stream()
6159
.map(PaimonAggregatedCommitInfo::getCommittables)

0 commit comments

Comments
 (0)