Skip to content

Commit a270664

Browse files
Jzjsnowjzjsnow
authored andcommitted
[FLINK-36701][cdc-runtime] Add steps to get and emit schemaManager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.
1 parent b50d172 commit a270664

File tree

4 files changed

+38
-5
lines changed

4 files changed

+38
-5
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,27 @@ public class FlushEvent implements Event {
2828
/** The schema changes from which table. */
2929
private final TableId tableId;
3030

31+
/** Flag indicating whether the FlushEvent is sent before a create table event. */
32+
private final Boolean isForCreateTableEvent;
33+
3134
public FlushEvent(TableId tableId) {
3235
this.tableId = tableId;
36+
this.isForCreateTableEvent = false;
37+
}
38+
39+
public FlushEvent(TableId tableId, boolean isForCreateTableEvent) {
40+
this.tableId = tableId;
41+
this.isForCreateTableEvent = isForCreateTableEvent;
3342
}
3443

3544
public TableId getTableId() {
3645
return tableId;
3746
}
3847

48+
public Boolean getIsForCreateTableEvent() {
49+
return isForCreateTableEvent;
50+
}
51+
3952
@Override
4053
public boolean equals(Object o) {
4154
if (this == o) {
@@ -45,7 +58,8 @@ public boolean equals(Object o) {
4558
return false;
4659
}
4760
FlushEvent that = (FlushEvent) o;
48-
return Objects.equals(tableId, that.tableId);
61+
return Objects.equals(tableId, that.tableId)
62+
&& Objects.equals(isForCreateTableEvent, that.isForCreateTableEvent);
4963
}
5064

5165
@Override

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -445,8 +445,15 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh
445445
// The request will block if another schema change event is being handled
446446
SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
447447
if (response.isAccepted()) {
448-
LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId);
449-
output.collect(new StreamRecord<>(new FlushEvent(tableId)));
448+
TableId sinkTable = response.getSchemaChangeEvents().get(0).tableId();
449+
LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, sinkTable);
450+
output.collect(
451+
new StreamRecord<>(
452+
new FlushEvent(
453+
sinkTable,
454+
schemaChangeEvent.getType()
455+
== SchemaChangeEventType.CREATE_TABLE)));
456+
450457
List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents();
451458
schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
452459

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ public void endInput() throws Exception {
198198

199199
private void handleFlushEvent(FlushEvent event) throws Exception {
200200
copySinkWriter.flush(false);
201+
if (!processedTableIds.contains(event.getTableId()) && !event.getIsForCreateTableEvent()) {
202+
LOG.info("Table {} has not been processed", event.getTableId());
203+
emitLatestSchema(event.getTableId());
204+
processedTableIds.add(event.getTableId());
205+
}
201206
schemaEvolutionClient.notifyFlushSuccess(
202207
getRuntimeContext().getIndexOfThisSubtask(), event.getTableId());
203208
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.cdc.common.event.Event;
2525
import org.apache.flink.cdc.common.event.FlushEvent;
2626
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
27+
import org.apache.flink.cdc.runtime.serializer.BooleanSerializer;
2728
import org.apache.flink.cdc.runtime.serializer.EnumSerializer;
2829
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
2930
import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
@@ -47,6 +48,7 @@ public final class EventSerializer extends TypeSerializerSingleton<Event> {
4748
new EnumSerializer<>(EventClass.class);
4849
private final TypeSerializer<DataChangeEvent> dataChangeEventSerializer =
4950
DataChangeEventSerializer.INSTANCE;
51+
private final BooleanSerializer booleanSerializer = BooleanSerializer.INSTANCE;
5052

5153
@Override
5254
public boolean isImmutableType() {
@@ -61,7 +63,9 @@ public Event createInstance() {
6163
@Override
6264
public Event copy(Event from) {
6365
if (from instanceof FlushEvent) {
64-
return new FlushEvent(tableIdSerializer.copy(((FlushEvent) from).getTableId()));
66+
return new FlushEvent(
67+
tableIdSerializer.copy(((FlushEvent) from).getTableId()),
68+
booleanSerializer.copy(((FlushEvent) from).getIsForCreateTableEvent()));
6569
} else if (from instanceof SchemaChangeEvent) {
6670
return schemaChangeEventSerializer.copy((SchemaChangeEvent) from);
6771
} else if (from instanceof DataChangeEvent) {
@@ -85,6 +89,7 @@ public void serialize(Event record, DataOutputView target) throws IOException {
8589
if (record instanceof FlushEvent) {
8690
enumSerializer.serialize(EventClass.FLUSH_EVENT, target);
8791
tableIdSerializer.serialize(((FlushEvent) record).getTableId(), target);
92+
booleanSerializer.serialize(((FlushEvent) record).getIsForCreateTableEvent(), target);
8893
} else if (record instanceof SchemaChangeEvent) {
8994
enumSerializer.serialize(EventClass.SCHEME_CHANGE_EVENT, target);
9095
schemaChangeEventSerializer.serialize((SchemaChangeEvent) record, target);
@@ -101,7 +106,9 @@ public Event deserialize(DataInputView source) throws IOException {
101106
EventClass eventClass = enumSerializer.deserialize(source);
102107
switch (eventClass) {
103108
case FLUSH_EVENT:
104-
return new FlushEvent(tableIdSerializer.deserialize(source));
109+
return new FlushEvent(
110+
tableIdSerializer.deserialize(source),
111+
booleanSerializer.deserialize(source));
105112
case DATA_CHANGE_EVENT:
106113
return dataChangeEventSerializer.deserialize(source);
107114
case SCHEME_CHANGE_EVENT:

0 commit comments

Comments
 (0)