Skip to content

Commit dca94bd

Browse files
stayrascalwuzhiping
and
wuzhiping
authored
[FLINK-36790][cdc-connector][paimon] Set waitCompaction to true in PaimonWriter to avoid CME problem
This closes #3760 Co-authored-by: wuzhiping <[email protected]>
1 parent ddb5f00 commit dca94bd

File tree

2 files changed

+33
-25
lines changed

2 files changed

+33
-25
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public Collection<MultiTableCommittable> prepareCommit() throws IOException {
9696
for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
9797
Identifier key = entry.getKey();
9898
StoreSinkWrite write = entry.getValue();
99-
boolean waitCompaction = false;
99+
boolean waitCompaction = true;
100100
committables.addAll(
101101
// here we set it to lastCheckpointId+1 to
102102
// avoid prepareCommit the same checkpointId with the first round.

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java

+32-24
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
import org.junit.jupiter.api.Assertions;
5959
import org.junit.jupiter.api.io.TempDir;
6060
import org.junit.jupiter.params.ParameterizedTest;
61-
import org.junit.jupiter.params.provider.ValueSource;
61+
import org.junit.jupiter.params.provider.CsvSource;
6262

6363
import java.io.File;
6464
import java.io.IOException;
@@ -140,7 +140,7 @@ private void initialize(String metastore)
140140
.dropDatabase(TEST_DATABASE, true, true);
141141
}
142142

143-
private List<Event> createTestEvents() throws SchemaEvolveException {
143+
private List<Event> createTestEvents(boolean enableDeleteVectors) throws SchemaEvolveException {
144144
List<Event> testEvents = new ArrayList<>();
145145
// create table
146146
Schema schema =
@@ -149,6 +149,7 @@ private List<Event> createTestEvents() throws SchemaEvolveException {
149149
.physicalColumn("col2", DataTypes.STRING())
150150
.primaryKey("col1")
151151
.option("bucket", "1")
152+
.option("deletion-vectors.enabled", String.valueOf(enableDeleteVectors))
152153
.build();
153154
CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
154155
testEvents.add(createTableEvent);
@@ -180,8 +181,8 @@ private List<Event> createTestEvents() throws SchemaEvolveException {
180181
}
181182

182183
@ParameterizedTest
183-
@ValueSource(strings = {"filesystem", "hive"})
184-
public void testSinkWithDataChange(String metastore)
184+
@CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
185+
public void testSinkWithDataChange(String metastore, boolean enableDeleteVector)
185186
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
186187
Catalog.DatabaseNotExistException, SchemaEvolveException {
187188
initialize(metastore);
@@ -192,7 +193,7 @@ public void testSinkWithDataChange(String metastore)
192193
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
193194

194195
// insert
195-
for (Event event : createTestEvents()) {
196+
for (Event event : createTestEvents(enableDeleteVector)) {
196197
writer.write(event, null);
197198
}
198199
writer.flush(false);
@@ -215,7 +216,7 @@ public void testSinkWithDataChange(String metastore)
215216
// delete
216217
Event event =
217218
DataChangeEvent.deleteEvent(
218-
TableId.tableId("test", "table1"),
219+
table1,
219220
generator.generate(
220221
new Object[] {
221222
BinaryStringData.fromString("1"),
@@ -240,7 +241,7 @@ public void testSinkWithDataChange(String metastore)
240241
// update
241242
event =
242243
DataChangeEvent.updateEvent(
243-
TableId.tableId("test", "table1"),
244+
table1,
244245
generator.generate(
245246
new Object[] {
246247
BinaryStringData.fromString("2"),
@@ -273,17 +274,19 @@ public void testSinkWithDataChange(String metastore)
273274
.collect()
274275
.forEachRemaining(result::add);
275276
// Each commit will generate one sequence number(equal to checkpointId).
276-
Assertions.assertEquals(
277-
Arrays.asList(
278-
Row.ofKind(RowKind.INSERT, 1L),
279-
Row.ofKind(RowKind.INSERT, 2L),
280-
Row.ofKind(RowKind.INSERT, 3L)),
281-
result);
277+
List<Row> expected =
278+
enableDeleteVector
279+
? Collections.singletonList(Row.ofKind(RowKind.INSERT, 3L))
280+
: Arrays.asList(
281+
Row.ofKind(RowKind.INSERT, 1L),
282+
Row.ofKind(RowKind.INSERT, 2L),
283+
Row.ofKind(RowKind.INSERT, 3L));
284+
Assertions.assertEquals(expected, result);
282285
}
283286

284287
@ParameterizedTest
285-
@ValueSource(strings = {"filesystem", "hive"})
286-
public void testSinkWithSchemaChange(String metastore)
288+
@CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
289+
public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVector)
287290
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
288291
Catalog.DatabaseNotExistException, SchemaEvolveException {
289292
initialize(metastore);
@@ -294,7 +297,7 @@ public void testSinkWithSchemaChange(String metastore)
294297
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
295298

296299
// 1. receive only DataChangeEvents during one checkpoint
297-
for (Event event : createTestEvents()) {
300+
for (Event event : createTestEvents(enableDeleteVector)) {
298301
writer.write(event, null);
299302
}
300303
writer.flush(false);
@@ -427,8 +430,8 @@ public void testSinkWithSchemaChange(String metastore)
427430
}
428431

429432
@ParameterizedTest
430-
@ValueSource(strings = {"filesystem", "hive"})
431-
public void testSinkWithMultiTables(String metastore)
433+
@CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
434+
public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector)
432435
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
433436
Catalog.DatabaseNotExistException, SchemaEvolveException {
434437
initialize(metastore);
@@ -437,7 +440,7 @@ public void testSinkWithMultiTables(String metastore)
437440
catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
438441
PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext());
439442
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
440-
List<Event> testEvents = createTestEvents();
443+
List<Event> testEvents = createTestEvents(enableDeleteVector);
441444
// create table
442445
TableId table2 = TableId.tableId("test", "table2");
443446
Schema schema =
@@ -492,8 +495,8 @@ public void testSinkWithMultiTables(String metastore)
492495
}
493496

494497
@ParameterizedTest
495-
@ValueSource(strings = {"filesystem", "hive"})
496-
public void testDuplicateCommitAfterRestore(String metastore)
498+
@CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
499+
public void testDuplicateCommitAfterRestore(String metastore, boolean enableDeleteVector)
497500
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
498501
Catalog.DatabaseNotExistException, SchemaEvolveException {
499502
initialize(metastore);
@@ -504,7 +507,7 @@ public void testDuplicateCommitAfterRestore(String metastore)
504507
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
505508

506509
// insert
507-
for (Event event : createTestEvents()) {
510+
for (Event event : createTestEvents(enableDeleteVector)) {
508511
writer.write(event, null);
509512
}
510513
writer.flush(false);
@@ -553,8 +556,13 @@ public void testDuplicateCommitAfterRestore(String metastore)
553556
.execute()
554557
.collect()
555558
.forEachRemaining(result::add);
556-
// 8 APPEND and 1 COMPACT
557-
Assertions.assertEquals(result.size(), 9);
559+
if (enableDeleteVector) {
560+
// Each APPEND will trigger COMPACT once enable deletion-vectors.
561+
Assertions.assertEquals(16, result.size());
562+
} else {
563+
// 8 APPEND and 1 COMPACT
564+
Assertions.assertEquals(9, result.size());
565+
}
558566
result.clear();
559567

560568
tEnv.sqlQuery("select * from paimon_catalog.test.`table1`")

0 commit comments

Comments
 (0)