
Commit 75b8a0c

Authored by lvyanquancdcbot and yuxiqian.yxq

[FLINK-36964][pipeline-connector/paimon] Fix potential exception when SchemaChange in parallel with Paimon Sink

This closes #3818.

Co-authored-by: yuxiqian.yxq <[email protected]>

1 parent 2fd03e6 commit 75b8a0c

File tree

15 files changed (+794 lines, -86 lines)


flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml

Lines changed: 13 additions & 0 deletions

@@ -285,6 +285,19 @@ limitations under the License.
                 </execution>
             </executions>
         </plugin>
+
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-jar-plugin</artifactId>
+            <executions>
+                <execution>
+                    <id>test-jar</id>
+                    <goals>
+                        <goal>test-jar</goal>
+                    </goals>
+                </execution>
+            </executions>
+        </plugin>
     </plugins>
 </build>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java

Lines changed: 0 additions & 9 deletions

@@ -24,12 +24,9 @@
 import org.apache.flink.cdc.common.factories.FactoryHelper;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.common.sink.DataSink;
-import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordEventSerializer;
 import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordSerializer;

-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.options.Options;

 import java.time.ZoneId;

@@ -71,12 +68,6 @@ public DataSink createDataSink(Context context) {
         Options options = Options.fromMap(catalogOptions);
         // Avoid using previous table schema.
         options.setString("cache-enabled", "false");
-        try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options)) {
-            Preconditions.checkNotNull(
-                    catalog.listDatabases(), "catalog option of Paimon is invalid.");
-        } catch (Exception e) {
-            throw new RuntimeException("failed to create or use paimon catalog", e);
-        }
         ZoneId zoneId = ZoneId.systemDefault();
         if (!Objects.equals(
                 context.getPipelineConfiguration().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java

Lines changed: 7 additions & 1 deletion

@@ -21,6 +21,7 @@
 import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
 import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapper;
 import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventTypeInfo;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.FlushEventAlignmentOperator;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;

@@ -63,7 +64,12 @@ public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
                 // All Events after BucketAssignOperator are decorated with BucketWrapper.
                 .partitionCustom(
                         (bucket, numPartitions) -> bucket % numPartitions,
-                        (event) -> ((BucketWrapper) event).getBucket());
+                        (event) -> ((BucketWrapper) event).getBucket())
+                // Avoid disorder of FlushEvent and DataChangeEvent.
+                .transform(
+                        "FlushEventAlignment",
+                        new BucketWrapperEventTypeInfo(),
+                        new FlushEventAlignmentOperator());
     }

     @Override
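
The new FlushEventAlignment step sits directly after the custom bucket partitioning. Because BucketAssignOperator (changed further down in this commit) now emits a copy of each FlushEvent for every downstream writer subtask, a writer can receive the same flush once per upstream bucket-assign subtask, interleaved with data changes. The sketch below only illustrates that alignment idea; it is not the actual FlushEventAlignmentOperator, all names are hypothetical, and it assumes the operator holds data back until the flush has been observed from every upstream assigner.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/**
 * Illustrative sketch only: a flush marker is forwarded downstream only after a copy of it has
 * arrived from every upstream bucket-assign subtask, so records routed through different
 * assigners cannot overtake the flush.
 */
final class FlushAlignmentSketch {
    private final int upstreamParallelism;                      // number of bucket-assign subtasks
    private final Set<Integer> seenAssigners = new HashSet<>(); // assigners that already flushed
    private final List<String> pendingRecords = new ArrayList<>(); // records held back meanwhile
    private final Consumer<String> downstream;

    FlushAlignmentSketch(int upstreamParallelism, Consumer<String> downstream) {
        this.upstreamParallelism = upstreamParallelism;
        this.downstream = downstream;
    }

    /** A data record is forwarded immediately unless a flush is currently being aligned. */
    void onRecord(String record) {
        if (seenAssigners.isEmpty()) {
            downstream.accept(record);
        } else {
            pendingRecords.add(record);
        }
    }

    /** Called once per upstream assigner; emits a single flush when all copies have arrived. */
    void onFlushMarker(int assignerSubtaskId) {
        seenAssigners.add(assignerSubtaskId);
        if (seenAssigners.size() == upstreamParallelism) {
            downstream.accept("FLUSH");
            seenAssigners.clear();
            pendingRecords.forEach(downstream);
            pendingRecords.clear();
        }
    }
}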

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java

Lines changed: 9 additions & 7 deletions

@@ -23,6 +23,7 @@
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperChangeEvent;

@@ -66,13 +67,14 @@ public PaimonEvent serialize(Event event) {
                         new TableSchemaInfo(createTableEvent.getSchema(), zoneId));
             } else {
                 SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
-                schemaMaps.put(
-                        schemaChangeEvent.tableId(),
-                        new TableSchemaInfo(
-                                SchemaUtils.applySchemaChangeEvent(
-                                        schemaMaps.get(schemaChangeEvent.tableId()).getSchema(),
-                                        schemaChangeEvent),
-                                zoneId));
+                Schema schema = schemaMaps.get(schemaChangeEvent.tableId()).getSchema();
+                if (!SchemaUtils.isSchemaChangeEventRedundant(schema, schemaChangeEvent)) {
+                    schemaMaps.put(
+                            schemaChangeEvent.tableId(),
+                            new TableSchemaInfo(
+                                    SchemaUtils.applySchemaChangeEvent(schema, schemaChangeEvent),
+                                    zoneId));
+                }
             }
             return new PaimonEvent(tableId, null, true);
         } else if (event instanceof DataChangeEvent) {
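
With schema changes now broadcast by the bucket assigners, the same logical change can reach a writer's serializer more than once, or arrive when the cached schema already reflects it; the new SchemaUtils.isSchemaChangeEventRedundant guard skips such events instead of re-applying them. Below is a minimal sketch of the redundancy idea under the simplifying assumption that a schema is just a set of column names; the real check operates on CDC Schema and SchemaChangeEvent objects, and the class and method names here are hypothetical.

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Illustrative sketch: treat a schema as an ordered set of column names. */
final class RedundantSchemaChangeSketch {

    /** An "add column" change is redundant if every column it adds is already present. */
    static boolean isAddColumnRedundant(Set<String> currentColumns, List<String> columnsToAdd) {
        return currentColumns.containsAll(columnsToAdd);
    }

    public static void main(String[] args) {
        Set<String> schema = new LinkedHashSet<>(Arrays.asList("id", "name", "age"));

        // First time the change is seen: not redundant, so it would be applied.
        System.out.println(isAddColumnRedundant(schema, List.of("address"))); // false
        schema.add("address");

        // The same broadcast change seen again: already applied, so it is skipped.
        System.out.println(isAddColumnRedundant(schema, List.of("address"))); // true
    }
}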

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java

Lines changed: 4 additions & 1 deletion

@@ -94,7 +94,10 @@ public DataStream<CommittableMessage<MultiTableCommittable>> addPreCommitTopolog

         // add correct checkpointId to MultiTableCommittable and recreate CommittableSummary.
         return partitioned
-                .transform("preCommit", typeInformation, new PreCommitOperator())
+                .transform(
+                        "preCommit",
+                        typeInformation,
+                        new PreCommitOperator(catalogOptions, commitUser))
                 .setParallelism(committables.getParallelism());
     }
 }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java

Lines changed: 72 additions & 34 deletions

@@ -17,16 +17,25 @@

 package org.apache.flink.cdc.connectors.paimon.sink.v2;

+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.sink.Committer;
 import org.apache.paimon.flink.sink.MultiTableCommittable;
+import org.apache.paimon.flink.sink.StoreMultiCommitter;
+import org.apache.paimon.manifest.WrappedManifestCommittable;
+import org.apache.paimon.options.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;

 /** An Operator to add checkpointId to MultiTableCommittable and generate CommittableSummary. */

@@ -35,12 +44,23 @@ public class PreCommitOperator
         implements OneInputStreamOperator<
                 CommittableMessage<MultiTableCommittable>,
                 CommittableMessage<MultiTableCommittable>> {
+    protected static final Logger LOGGER = LoggerFactory.getLogger(PreCommitOperator.class);
+
+    private final String commitUser;
+
+    private final Options catalogOptions;
+
+    private Catalog catalog;
+
+    private StoreMultiCommitter storeMultiCommitter;

     /** store a list of MultiTableCommittable in one checkpoint. */
-    private final List<MultiTableCommittable> results;
+    private final List<MultiTableCommittable> multiTableCommittables;

-    public PreCommitOperator() {
-        results = new ArrayList<>();
+    public PreCommitOperator(Options catalogOptions, String commitUser) {
+        multiTableCommittables = new ArrayList<>();
+        this.catalogOptions = catalogOptions;
+        this.commitUser = commitUser;
     }

     @Override

@@ -50,8 +70,16 @@ public void open() throws Exception {

     @Override
     public void processElement(StreamRecord<CommittableMessage<MultiTableCommittable>> element) {
+        if (catalog == null) {
+            this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+            this.storeMultiCommitter =
+                    new StoreMultiCommitter(
+                            () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions),
+                            Committer.createContext(
+                                    commitUser, getMetricGroup(), true, false, null));
+        }
         if (element.getValue() instanceof CommittableWithLineage) {
-            results.add(
+            multiTableCommittables.add(
                     ((CommittableWithLineage<MultiTableCommittable>) element.getValue())
                             .getCommittable());
         }

@@ -64,34 +92,44 @@ public void finish() {

     @Override
     public void prepareSnapshotPreBarrier(long checkpointId) {
-        // CommittableSummary should be sent before all CommittableWithLineage.
-        CommittableMessage<MultiTableCommittable> summary =
-                new CommittableSummary<>(
-                        getRuntimeContext().getIndexOfThisSubtask(),
-                        getRuntimeContext().getNumberOfParallelSubtasks(),
-                        checkpointId,
-                        results.size(),
-                        results.size(),
-                        0);
-        output.collect(new StreamRecord<>(summary));
-
-        results.forEach(
-                committable -> {
-                    // update the right checkpointId for MultiTableCommittable
-                    MultiTableCommittable committableWithCheckPointId =
-                            new MultiTableCommittable(
-                                    committable.getDatabase(),
-                                    committable.getTable(),
-                                    checkpointId,
-                                    committable.kind(),
-                                    committable.wrappedCommittable());
-                    CommittableMessage<MultiTableCommittable> message =
-                            new CommittableWithLineage<>(
-                                    committableWithCheckPointId,
-                                    checkpointId,
-                                    getRuntimeContext().getIndexOfThisSubtask());
-                    output.collect(new StreamRecord<>(message));
-                });
-        results.clear();
+        for (int i = 0; i < multiTableCommittables.size(); i++) {
+            MultiTableCommittable multiTableCommittable = multiTableCommittables.get(i);
+            multiTableCommittables.set(
+                    i,
+                    new MultiTableCommittable(
+                            multiTableCommittable.getDatabase(),
+                            multiTableCommittable.getTable(),
+                            checkpointId,
+                            multiTableCommittable.kind(),
+                            multiTableCommittable.wrappedCommittable()));
+        }
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        long checkpointId = context.getCheckpointId();
+        if (!multiTableCommittables.isEmpty()) {
+            multiTableCommittables.forEach(
+                    (multiTableCommittable) ->
+                            LOGGER.debug(
+                                    "Try to commit for {}.{} : {} in checkpoint {}",
+                                    multiTableCommittable.getDatabase(),
+                                    multiTableCommittable.getTable(),
+                                    multiTableCommittables,
+                                    checkpointId));
+            WrappedManifestCommittable wrappedManifestCommittable =
+                    storeMultiCommitter.combine(checkpointId, checkpointId, multiTableCommittables);
+            storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable));
+            multiTableCommittables.clear();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (storeMultiCommitter != null) {
+            storeMultiCommitter.close();
+        }
     }
 }
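
The pre-commit operator no longer emits CommittableSummary/CommittableWithLineage messages for a downstream committer. Instead it stamps the checkpoint id onto buffered committables in prepareSnapshotPreBarrier and commits them to Paimon itself in snapshotState, combining everything for the checkpoint into a single WrappedManifestCommittable via StoreMultiCommitter. Since prepareSnapshotPreBarrier runs before the checkpoint barrier is forwarded, every buffered committable already carries the correct checkpoint id by the time snapshotState fires. The following is a framework-free sketch of that stamp-combine-commit sequence, with hypothetical types standing in for Paimon's committable classes.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch only (hypothetical types, not Paimon's StoreMultiCommitter API):
 * committables buffered since the last checkpoint are stamped with the checkpoint id when the
 * barrier is about to be forwarded, then combined and committed once when state is snapshotted.
 */
final class CheckpointCommitSketch {

    static final class Committable {
        final String table;
        final long checkpointId;
        final String payload;

        Committable(String table, long checkpointId, String payload) {
            this.table = table;
            this.checkpointId = checkpointId;
            this.payload = payload;
        }
    }

    private final List<Committable> buffer = new ArrayList<>();

    /** Mirrors processElement: the checkpoint id is not known yet when the element arrives. */
    void onCommittable(String table, String payload) {
        buffer.add(new Committable(table, -1L, payload));
    }

    /** Mirrors prepareSnapshotPreBarrier: rewrite buffered committables with the real id. */
    void stampCheckpointId(long checkpointId) {
        buffer.replaceAll(c -> new Committable(c.table, checkpointId, c.payload));
    }

    /** Mirrors snapshotState: combine everything for this checkpoint and commit it once. */
    Map<String, List<String>> commitOnSnapshot(long checkpointId) {
        Map<String, List<String>> manifest = new LinkedHashMap<>();
        for (Committable c : buffer) {
            if (c.checkpointId == checkpointId) { // only commit what belongs to this checkpoint
                manifest.computeIfAbsent(c.table, t -> new ArrayList<>()).add(c.payload);
            }
        }
        buffer.clear();
        return manifest; // a real implementation would hand this to the table store committer
    }
}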

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java

Lines changed: 25 additions & 22 deletions

@@ -19,7 +19,6 @@

 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.cdc.common.event.ChangeEvent;
-import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;

@@ -53,6 +52,8 @@
 import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
 import org.apache.paimon.utils.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.time.ZoneId;
 import java.util.HashMap;

@@ -63,6 +64,8 @@
 public class BucketAssignOperator extends AbstractStreamOperator<Event>
         implements OneInputStreamOperator<Event, Event> {

+    protected static final Logger LOGGER = LoggerFactory.getLogger(BucketAssignOperator.class);
+
     public final String commitUser;

     private final Options catalogOptions;

@@ -99,8 +102,8 @@ public void open() throws Exception {
         super.open();
         this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
         this.bucketAssignerMap = new HashMap<>();
-        this.totalTasksNumber = getRuntimeContext().getNumberOfParallelSubtasks();
-        this.currentTaskNumber = getRuntimeContext().getIndexOfThisSubtask();
+        this.totalTasksNumber = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+        this.currentTaskNumber = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
         this.schemaMaps = new HashMap<>();
     }

@@ -121,13 +124,16 @@ public void setup(
     public void processElement(StreamRecord<Event> streamRecord) throws Exception {
         Event event = streamRecord.getValue();
         if (event instanceof FlushEvent) {
-            output.collect(
-                    new StreamRecord<>(
-                            new BucketWrapperFlushEvent(
-                                    currentTaskNumber,
-                                    ((FlushEvent) event).getSourceSubTaskId(),
-                                    ((FlushEvent) event).getTableIds(),
-                                    ((FlushEvent) event).getSchemaChangeEventType())));
+            for (int i = 0; i < totalTasksNumber; i++) {
+                output.collect(
+                        new StreamRecord<>(
+                                new BucketWrapperFlushEvent(
+                                        i,
+                                        ((FlushEvent) event).getSourceSubTaskId(),
+                                        currentTaskNumber,
+                                        ((FlushEvent) event).getTableIds(),
+                                        ((FlushEvent) event).getSchemaChangeEventType())));
+            }
             return;
         }

@@ -181,24 +187,21 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
             }
             output.collect(
                     new StreamRecord<>(new BucketWrapperChangeEvent(bucket, (ChangeEvent) event)));
-        } else if (event instanceof CreateTableEvent) {
-            CreateTableEvent createTableEvent = (CreateTableEvent) event;
-            schemaMaps.put(
-                    createTableEvent.tableId(),
-                    new TableSchemaInfo(createTableEvent.getSchema(), zoneId));
-            output.collect(
-                    new StreamRecord<>(
-                            new BucketWrapperChangeEvent(currentTaskNumber, (ChangeEvent) event)));
         } else if (event instanceof SchemaChangeEvent) {
             SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
             Schema schema =
                     SchemaUtils.applySchemaChangeEvent(
-                            schemaMaps.get(schemaChangeEvent.tableId()).getSchema(),
+                            Optional.ofNullable(schemaMaps.get(schemaChangeEvent.tableId()))
+                                    .map(TableSchemaInfo::getSchema)
+                                    .orElse(null),
                             schemaChangeEvent);
             schemaMaps.put(schemaChangeEvent.tableId(), new TableSchemaInfo(schema, zoneId));
-            output.collect(
-                    new StreamRecord<>(
-                            new BucketWrapperChangeEvent(currentTaskNumber, (ChangeEvent) event)));
+            // Broadcast SchemachangeEvent.
+            for (int index = 0; index < totalTasksNumber; index++) {
+                output.collect(
+                        new StreamRecord<>(
+                                new BucketWrapperChangeEvent(index, (ChangeEvent) event)));
+            }
         }
     }
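
Two behavioural changes stand out here: FlushEvent and SchemaChangeEvent are no longer routed only to the assigner's own subtask index but are emitted once per downstream subtask (with BucketWrapperFlushEvent additionally recording which bucket-assign subtask produced the copy), and the dedicated CreateTableEvent branch was removed from this operator. Below is a minimal, framework-free sketch of the broadcast-by-bucket-index pattern; the class name, method names, and string payloads are purely illustrative.

import java.util.function.BiConsumer;
import java.util.stream.IntStream;

/**
 * Illustrative sketch only: events that every downstream writer must see are emitted once per
 * downstream subtask, tagged with the target index so a bucket-based partitioner routes one copy
 * to each writer; regular data events keep their computed bucket.
 */
final class BroadcastByBucketSketch {

    /** Emit (targetBucket, payload) pairs; a partitioner maps targetBucket % parallelism. */
    static void emit(String payload, int computedBucket, boolean broadcast,
                     int downstreamParallelism, BiConsumer<Integer, String> out) {
        if (broadcast) {
            // Schema changes and flush markers: one copy per downstream subtask.
            IntStream.range(0, downstreamParallelism).forEach(i -> out.accept(i, payload));
        } else {
            // Data changes: a single copy routed by its bucket.
            out.accept(computedBucket, payload);
        }
    }

    public static void main(String[] args) {
        int parallelism = 3;
        emit("ALTER TABLE t ADD COLUMN c", 0, true, parallelism,
                (bucket, e) -> System.out.println("-> subtask " + bucket + ": " + e));
        emit("INSERT row#42", 7 % parallelism, false, parallelism,
                (bucket, e) -> System.out.println("-> subtask " + bucket + ": " + e));
    }
}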

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java

Lines changed: 2 additions & 0 deletions

@@ -87,6 +87,7 @@ public void serialize(Event event, DataOutputView dataOutputView) throws IOExcep
         BucketWrapperFlushEvent bucketWrapperFlushEvent = (BucketWrapperFlushEvent) event;
         dataOutputView.writeInt(bucketWrapperFlushEvent.getBucket());
         dataOutputView.writeInt(bucketWrapperFlushEvent.getSourceSubTaskId());
+        dataOutputView.writeInt(bucketWrapperFlushEvent.getBucketAssignTaskId());
         tableIdListSerializer.serialize(bucketWrapperFlushEvent.getTableIds(), dataOutputView);
         schemaChangeEventTypeEnumSerializer.serialize(
                 bucketWrapperFlushEvent.getSchemaChangeEventType(), dataOutputView);

@@ -98,6 +99,7 @@ public Event deserialize(DataInputView source) throws IOException {
         EventClass eventClass = enumSerializer.deserialize(source);
         if (eventClass.equals(EventClass.BUCKET_WRAPPER_FLUSH_EVENT)) {
             return new BucketWrapperFlushEvent(
+                    source.readInt(),
                     source.readInt(),
                     source.readInt(),
                     tableIdListSerializer.deserialize(source),
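
Because BucketWrapperFlushEvent now carries the originating bucket-assign subtask id, the serializer writes one extra int and the deserializer reads one extra int at the same position. Hand-rolled wire formats like this are order-sensitive, so both sides must stay symmetric; here is a small sketch of that symmetry using plain java.io streams with hypothetical field names, not the actual CDC serializer.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Illustrative sketch: write and read must add the new field at the same position. */
final class FlushEventWireFormatSketch {

    static byte[] serialize(int bucket, int sourceSubTaskId, int bucketAssignTaskId)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(bucket);
            out.writeInt(sourceSubTaskId);
            out.writeInt(bucketAssignTaskId); // newly added field
        }
        return bytes.toByteArray();
    }

    static int[] deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            // Fields come back in the exact order they were written.
            return new int[] {in.readInt(), in.readInt(), in.readInt()};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] fields = deserialize(serialize(2, 0, 1));
        System.out.println(fields[0] + " " + fields[1] + " " + fields[2]); // 2 0 1
    }
}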
