[FLINK-36701][cdc-runtime] Add steps to get and emit schemaManager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent #3802
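In short: FlushEvent now carries the routed sink table IDs and the type of schema change that triggered it, so the data sink writer operator can fetch and emit the schema manager's latest evolved schema for any flushed table it has not processed yet before acknowledging the flush. A minimal, illustrative sketch of the enriched event, using only the constructor and accessors introduced in the diff below (the concrete table and change type are made up):

import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;

import java.util.Collections;

public class FlushEventShapeExample {
    public static void main(String[] args) {
        // Before this change, new FlushEvent(subTaskId) only carried the originating subtask.
        // Now the event also names the routed sink tables and the triggering change type.
        FlushEvent flushEvent =
                new FlushEvent(
                        0, // sourceSubTaskId
                        Collections.singletonList(TableId.tableId("app_db", "orders")),
                        SchemaChangeEventType.ADD_COLUMN);
        System.out.println(flushEvent.getTableIds());               // [app_db.orders]
        System.out.println(flushEvent.getSchemaChangeEventType());  // ADD_COLUMN
    }
}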

@@ -17,25 +17,44 @@

package org.apache.flink.cdc.common.event;

import java.util.List;
import java.util.Objects;

/**
* An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it
* should start flushing.
*/
public class FlushEvent implements Event {
/** The sink table(s) that need to be flushed. */
private final List<TableId> tableIds;

/** Which subTask ID this FlushEvent was initiated from. */
private final int sourceSubTaskId;

public FlushEvent(int sourceSubTaskId) {
/** Which type of schema change event caused this FlushEvent. */
private final SchemaChangeEventType schemaChangeEventType;

public FlushEvent(
int sourceSubTaskId,
List<TableId> tableIds,
SchemaChangeEventType schemaChangeEventType) {
this.tableIds = tableIds;
this.sourceSubTaskId = sourceSubTaskId;
this.schemaChangeEventType = schemaChangeEventType;
}

public List<TableId> getTableIds() {
return tableIds;
}

public int getSourceSubTaskId() {
return sourceSubTaskId;
}

public SchemaChangeEventType getSchemaChangeEventType() {
return schemaChangeEventType;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -45,7 +64,9 @@ public boolean equals(Object o) {
return false;
}
FlushEvent that = (FlushEvent) o;
return sourceSubTaskId == that.sourceSubTaskId;
return sourceSubTaskId == that.sourceSubTaskId
&& Objects.equals(tableIds, that.tableIds)
&& Objects.equals(schemaChangeEventType, that.schemaChangeEventType);
}

@Override
@@ -55,6 +76,13 @@ public int hashCode() {

@Override
public String toString() {
return "FlushEvent{" + "sourceSubTaskId=" + sourceSubTaskId + '}';
return "FlushEvent{"
+ "sourceSubTaskId="
+ sourceSubTaskId
+ ", tableIds="
+ tableIds
+ ", schemaChangeEventType="
+ schemaChangeEventType
+ '}';
}
}
@@ -124,7 +124,10 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
output.collect(
new StreamRecord<>(
new BucketWrapperFlushEvent(
currentTaskNumber, ((FlushEvent) event).getSourceSubTaskId())));
currentTaskNumber,
((FlushEvent) event).getSourceSubTaskId(),
((FlushEvent) event).getTableIds(),
((FlushEvent) event).getSchemaChangeEventType())));
return;
}

@@ -21,7 +21,10 @@
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.runtime.serializer.EnumSerializer;
import org.apache.flink.cdc.runtime.serializer.ListSerializer;
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
@@ -40,8 +43,10 @@ public class BucketWrapperEventSerializer extends TypeSerializerSingleton<Event>

private final EventSerializer eventSerializer = EventSerializer.INSTANCE;

private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;

private final ListSerializer<TableId> tableIdListSerializer =
new ListSerializer<>(TableIdSerializer.INSTANCE);
private final EnumSerializer<SchemaChangeEventType> schemaChangeEventTypeEnumSerializer =
new EnumSerializer<>(SchemaChangeEventType.class);
/** Sharable instance of the BucketWrapperEventSerializer. */
public static final BucketWrapperEventSerializer INSTANCE = new BucketWrapperEventSerializer();

@@ -82,14 +87,21 @@ public void serialize(Event event, DataOutputView dataOutputView) throws IOExcep
BucketWrapperFlushEvent bucketWrapperFlushEvent = (BucketWrapperFlushEvent) event;
dataOutputView.writeInt(bucketWrapperFlushEvent.getBucket());
dataOutputView.writeInt(bucketWrapperFlushEvent.getSourceSubTaskId());
tableIdListSerializer.serialize(bucketWrapperFlushEvent.getTableIds(), dataOutputView);
schemaChangeEventTypeEnumSerializer.serialize(
bucketWrapperFlushEvent.getSchemaChangeEventType(), dataOutputView);
}
}

@Override
public Event deserialize(DataInputView source) throws IOException {
EventClass eventClass = enumSerializer.deserialize(source);
if (eventClass.equals(EventClass.BUCKET_WRAPPER_FLUSH_EVENT)) {
return new BucketWrapperFlushEvent(source.readInt(), source.readInt());
return new BucketWrapperFlushEvent(
source.readInt(),
source.readInt(),
tableIdListSerializer.deserialize(source),
schemaChangeEventTypeEnumSerializer.deserialize(source));
} else {
return new BucketWrapperChangeEvent(
source.readInt(), (ChangeEvent) eventSerializer.deserialize(source));
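A round-trip sketch (illustrative, not part of the diff) showing that the extra fields survive the wire format. BucketWrapperEventSerializer and BucketWrapperFlushEvent are the classes changed above (their paimon bucket package is assumed), DataOutputSerializer and DataInputDeserializer are Flink's standard in-memory views, and the concrete values are made up:

import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventSerializer;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperFlushEvent;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.util.Collections;

public class BucketWrapperFlushEventRoundTrip {
    public static void main(String[] args) throws Exception {
        BucketWrapperEventSerializer serializer = BucketWrapperEventSerializer.INSTANCE;
        Event original =
                new BucketWrapperFlushEvent(
                        3, // bucket
                        0, // subTaskId
                        Collections.singletonList(TableId.tableId("app_db", "orders")),
                        SchemaChangeEventType.ADD_COLUMN);

        DataOutputSerializer out = new DataOutputSerializer(256);
        serializer.serialize(original, out);
        Event restored =
                serializer.deserialize(new DataInputDeserializer(out.getCopyOfBuffer()));

        // Bucket, subtask ID, table list and change type all come back intact.
        System.out.println(original.equals(restored)); // expected: true
    }
}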
@@ -18,16 +18,23 @@
package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;

import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;

import java.util.List;
import java.util.Objects;

/** A wrapper class for {@link FlushEvent} to attach bucket id. */
public class BucketWrapperFlushEvent extends FlushEvent implements BucketWrapper {

private final int bucket;

public BucketWrapperFlushEvent(int bucket, int subTaskId) {
super(subTaskId);
public BucketWrapperFlushEvent(
int bucket,
int subTaskId,
List<TableId> tableIds,
SchemaChangeEventType schemaChangeEventType) {
super(subTaskId, tableIds, schemaChangeEventType);
this.bucket = bucket;
}

@@ -180,6 +180,6 @@ public void testOceanBaseCDC() throws Exception {
expectResult,
"ob_products_sink",
new String[] {"id", "name", "description", "weight", "enum_c", "json_c"},
60000L);
300000L);
}
}
@@ -150,6 +150,7 @@ public void processElement(StreamRecord<PartitioningEvent> streamRecord) throws

// Then, notify this information to the coordinator
requestSchemaChange(
tableId,
new SchemaChangeRequest(sourcePartition, subTaskId, schemaChangeEvent));
schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(1);
} else if (event instanceof DataChangeEvent) {
@@ -188,9 +189,15 @@ public void processElement(StreamRecord<PartitioningEvent> streamRecord) throws
}
}

private void requestSchemaChange(SchemaChangeRequest schemaChangeRequest) {
private void requestSchemaChange(
TableId sourceTableId, SchemaChangeRequest schemaChangeRequest) {
LOG.info("{}> Sent FlushEvent to downstream...", subTaskId);
output.collect(new StreamRecord<>(new FlushEvent(subTaskId)));
output.collect(
new StreamRecord<>(
new FlushEvent(
subTaskId,
tableIdRouter.route(sourceTableId),
schemaChangeRequest.getSchemaChangeEvent().getType())));

LOG.info("{}> Sending evolve request...", subTaskId);
SchemaChangeResponse response = sendRequestToCoordinator(schemaChangeRequest);
@@ -164,8 +164,10 @@ private void handleSchemaChangeEvent(SchemaChangeEvent originalEvent) throws Exc
schemaOperatorMetrics.increaseSchemaChangeEvents(1);

// First, send FlushEvent or it might be blocked later
List<TableId> sinkTables = router.route(tableId);
LOG.info("{}> Sending the FlushEvent.", subTaskId);
output.collect(new StreamRecord<>(new FlushEvent(subTaskId)));
output.collect(
new StreamRecord<>(new FlushEvent(subTaskId, sinkTables, originalEvent.getType())));

// Then, queue to request schema change to SchemaCoordinator.
SchemaChangeResponse response = requestSchemaChange(tableId, originalEvent);
@@ -22,6 +22,7 @@
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -112,6 +113,20 @@ public void processElement(StreamRecord<Event> element) throws Exception {
// ----------------------------- Helper functions -------------------------------
private void handleFlushEvent(FlushEvent event) throws Exception {
userFunction.finish();
if (event.getSchemaChangeEventType() != SchemaChangeEventType.CREATE_TABLE) {
event.getTableIds().stream()
.filter(tableId -> !processedTableIds.contains(tableId))
.forEach(
tableId -> {
LOG.info("Table {} has not been processed", tableId);
try {
emitLatestSchema(tableId);
} catch (Exception e) {
throw new RuntimeException(e);
}
processedTableIds.add(tableId);
});
}
schemaEvolutionClient.notifyFlushSuccess(
getRuntimeContext().getIndexOfThisSubtask(), event.getSourceSubTaskId());
}
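The new branch calls an emitLatestSchema(tableId) helper whose body lies outside this excerpt. A hedged sketch of what such a helper plausibly does, assuming the writer can look up the latest evolved schema through SchemaEvolutionClient#getLatestEvolvedSchema and replay it downstream as a CreateTableEvent; both the lookup call and the error handling here are assumptions, not the PR's exact code:

    // Hypothetical sketch, not the PR's code: fetch the latest evolved schema tracked for
    // this sink table and replay it as a CreateTableEvent so the sink writer can
    // deserialize subsequent RecordData for that table after the flush.
    private void emitLatestSchema(TableId tableId) throws Exception {
        Optional<Schema> schema = schemaEvolutionClient.getLatestEvolvedSchema(tableId);
        if (schema.isPresent()) {
            output.collect(new StreamRecord<>(new CreateTableEvent(tableId, schema.get())));
            processedTableIds.add(tableId);
        } else {
            throw new RuntimeException("Could not find schema of table " + tableId);
        }
    }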
@@ -25,6 +25,7 @@
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -198,6 +199,20 @@ public void endInput() throws Exception {

private void handleFlushEvent(FlushEvent event) throws Exception {
copySinkWriter.flush(false);
if (event.getSchemaChangeEventType() != SchemaChangeEventType.CREATE_TABLE) {
event.getTableIds().stream()
.filter(tableId -> !processedTableIds.contains(tableId))
.forEach(
tableId -> {
LOG.info("Table {} has not been processed", tableId);
try {
emitLatestSchema(tableId);
} catch (Exception e) {
throw new RuntimeException(e);
}
processedTableIds.add(tableId);
});
}
schemaEvolutionClient.notifyFlushSuccess(
getRuntimeContext().getIndexOfThisSubtask(), event.getSourceSubTaskId());
}
@@ -24,7 +24,10 @@
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.runtime.serializer.EnumSerializer;
import org.apache.flink.cdc.runtime.serializer.ListSerializer;
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
@@ -42,11 +45,14 @@ public final class EventSerializer extends TypeSerializerSingleton<Event> {

private final SchemaChangeEventSerializer schemaChangeEventSerializer =
SchemaChangeEventSerializer.INSTANCE;
private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
private final ListSerializer<TableId> listSerializer =
new ListSerializer<>(TableIdSerializer.INSTANCE);
private final EnumSerializer<EventClass> enumSerializer =
new EnumSerializer<>(EventClass.class);
private final TypeSerializer<DataChangeEvent> dataChangeEventSerializer =
DataChangeEventSerializer.INSTANCE;
private final EnumSerializer<SchemaChangeEventType> schemaChangeEventTypeEnumSerializer =
new EnumSerializer<>(SchemaChangeEventType.class);

@Override
public boolean isImmutableType() {
@@ -62,7 +68,11 @@ public Event copy(Event from) {
public Event copy(Event from) {
if (from instanceof FlushEvent) {
FlushEvent flushEvent = (FlushEvent) from;
return new FlushEvent(((FlushEvent) from).getSourceSubTaskId());
return new FlushEvent(
flushEvent.getSourceSubTaskId(),
listSerializer.copy(((FlushEvent) from).getTableIds()),
schemaChangeEventTypeEnumSerializer.copy(
flushEvent.getSchemaChangeEventType()));
} else if (from instanceof SchemaChangeEvent) {
return schemaChangeEventSerializer.copy((SchemaChangeEvent) from);
} else if (from instanceof DataChangeEvent) {
@@ -86,6 +96,9 @@ public void serialize(Event record, DataOutputView target) throws IOException {
if (record instanceof FlushEvent) {
enumSerializer.serialize(EventClass.FLUSH_EVENT, target);
target.writeInt(((FlushEvent) record).getSourceSubTaskId());
listSerializer.serialize(((FlushEvent) record).getTableIds(), target);
schemaChangeEventTypeEnumSerializer.serialize(
((FlushEvent) record).getSchemaChangeEventType(), target);
} else if (record instanceof SchemaChangeEvent) {
enumSerializer.serialize(EventClass.SCHEME_CHANGE_EVENT, target);
schemaChangeEventSerializer.serialize((SchemaChangeEvent) record, target);
@@ -102,7 +115,10 @@ public Event deserialize(DataInputView source) throws IOException {
EventClass eventClass = enumSerializer.deserialize(source);
switch (eventClass) {
case FLUSH_EVENT:
return new FlushEvent(source.readInt());
return new FlushEvent(
source.readInt(),
listSerializer.deserialize(source),
schemaChangeEventTypeEnumSerializer.deserialize(source));
case DATA_CHANGE_EVENT:
return dataChangeEventSerializer.deserialize(source);
case SCHEME_CHANGE_EVENT:
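For completeness, an illustrative JUnit-style check (not from the diff; JUnit 5 and java.util imports elided) that EventSerializer#copy carries the new fields across and that the widened equals() treats the copy as equal:

    @Test
    void flushEventCopyCarriesNewFields() {
        FlushEvent original =
                new FlushEvent(
                        1, // sourceSubTaskId
                        Collections.singletonList(TableId.tableId("app_db", "orders")),
                        SchemaChangeEventType.ADD_COLUMN);
        Event copied = EventSerializer.INSTANCE.copy(original);
        // equals() now also compares tableIds and schemaChangeEventType (see FlushEvent above).
        Assertions.assertEquals(original, copied);
        Assertions.assertNotSame(original, copied);
    }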
@@ -181,16 +181,24 @@ void testLenientSchemaEvolution() throws Exception {
}))
.map(StreamRecord::getValue)
.containsExactly(
new FlushEvent(0),
new FlushEvent(
0, Collections.singletonList(TABLE_ID), createTableEvent.getType()),
createTableEvent,
genInsert(TABLE_ID, "ISFS", 1, "Alice", 17.1828f, "Hello"),
new FlushEvent(0),
new FlushEvent(
0, Collections.singletonList(TABLE_ID), addColumnEvent.getType()),
addColumnEventAtLast,
genInsert(TABLE_ID, "ISFSB", 2, "Bob", 31.415926f, "Bye-bye", false),
new FlushEvent(0),
new FlushEvent(
0,
Collections.singletonList(TABLE_ID),
renameColumnEvent.getType()),
appendRenamedColumnAtLast,
genInsert(TABLE_ID, "ISFSBS", 3, "Cicada", 123.456f, null, true, "Ok"),
new FlushEvent(0),
new FlushEvent(
0,
Collections.singletonList(TABLE_ID),
alterColumnTypeEvent.getType()),
alterColumnTypeEventWithBackfill,
genInsert(
TABLE_ID,
Expand All @@ -201,11 +209,16 @@ void testLenientSchemaEvolution() throws Exception {
null,
false,
"Nah"),
new FlushEvent(0),
new FlushEvent(
0, Collections.singletonList(TABLE_ID), dropColumnEvent.getType()),
genInsert(TABLE_ID, "ISDSBS", 5, "Eve", 1.414, null, true, null),
new FlushEvent(0),
new FlushEvent(
0,
Collections.singletonList(TABLE_ID),
truncateTableEvent.getType()),
genInsert(TABLE_ID, "ISDSBS", 6, "Ferris", 0.001, null, false, null),
new FlushEvent(0));
new FlushEvent(
0, Collections.singletonList(TABLE_ID), dropTableEvent.getType()));
}

@Test
@@ -308,7 +321,8 @@ void testIgnoreSchemaEvolution() throws Exception {
}))
.map(StreamRecord::getValue)
.containsExactly(
new FlushEvent(0),
new FlushEvent(
0, Collections.singletonList(TABLE_ID), createTableEvent.getType()),
createTableEvent,
genInsert(TABLE_ID, "ISFS", 1, "Alice", 17.1828f, "Hello"),
genInsert(TABLE_ID, "ISFS", 2, "Bob", 31.415926f, "Bye-bye"),