
Commit 596fa99

jzjsnow authored and 姜卓君 committed

fixup! [FLINK-36701][cdc-runtime] Add steps to get and emit schemaManager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.

1 parent 798ab3b commit 596fa99

File tree

4 files changed: +528 −9 lines changed

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
+9 −9

@@ -108,7 +108,7 @@ public void testEvolveSchema() throws Exception {
 
         Assertions.assertThat(
                 ListUtils.union(
-                        Collections.singletonList(new FlushEvent(tableId)),
+                        Collections.singletonList(new FlushEvent(tableId, true)),
                         createAndInsertDataEvents))
                 .isEqualTo(
                         harness.getOutputRecords().stream()
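Every change in this test file is the same one-line edit, repeated across nine assertions: the expected FlushEvent now takes a second boolean argument. The committed FlushEvent class itself is not part of this excerpt; judging from the accessors that the new DataSinkOperatorAdapter further down relies on (getTableId() and getIsForCreateTableEvent()), it plausibly has roughly the following shape. This is a hypothetical sketch, and the field name and flag semantics are assumptions:

    import org.apache.flink.cdc.common.event.Event;
    import org.apache.flink.cdc.common.event.TableId;

    /** Hypothetical sketch of the two-argument FlushEvent implied by this diff. */
    public class FlushEvent implements Event {

        private final TableId tableId;

        // Assumed field behind getIsForCreateTableEvent(): presumably true when the
        // flush was requested on behalf of a CreateTableEvent.
        private final boolean isForCreateTableEvent;

        public FlushEvent(TableId tableId, boolean isForCreateTableEvent) {
            this.tableId = tableId;
            this.isForCreateTableEvent = isForCreateTableEvent;
        }

        public TableId getTableId() {
            return tableId;
        }

        public boolean getIsForCreateTableEvent() {
            return isForCreateTableEvent;
        }
    }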
@@ -369,7 +369,7 @@ public void testTryEvolveSchema() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -626,7 +626,7 @@ public void testExceptionEvolveSchema() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -735,7 +735,7 @@ public void testIgnoreEvolveSchema() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1021,7 +1021,7 @@ tableId, buildRecord(INT, 2, STRING, "Bob", SMALLINT, (short) 18)),
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1108,7 +1108,7 @@ public void testTryEvolveSchemaWithFailure() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1420,7 +1420,7 @@ public void testFineGrainedSchemaEvolves() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1720,7 +1720,7 @@ public void testLenientSchemaEvolves() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -2100,7 +2100,7 @@ public void testLenientEvolveTweaks() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorAdapter.java (new file)

package org.apache.flink.cdc.runtime.operators.sink;

import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/**
 * The DataSinkOperatorAdapter class acts as an adapter for testing the core schema evolution
 * process in both {@link DataSinkWriterOperator} and {@link DataSinkFunctionOperator}.
 */
public class DataSinkOperatorAdapter extends AbstractStreamOperator<Event>
        implements OneInputStreamOperator<Event, Event>, BoundedOneInput {

    private SchemaEvolutionClient schemaEvolutionClient;

    private final OperatorID schemaOperatorID;

    /** The set of {@link TableId}s whose {@link CreateTableEvent} has already been processed. */
    private final Set<TableId> processedTableIds;

    public DataSinkOperatorAdapter() {
        this.schemaOperatorID = new OperatorID();
        this.processedTableIds = new HashSet<>();
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<Event>> output) {
        super.setup(containingTask, config, output);
        schemaEvolutionClient =
                new SchemaEvolutionClient(
                        containingTask.getEnvironment().getOperatorCoordinatorEventGateway(),
                        schemaOperatorID);
    }

    @Override
    public void open() throws Exception {}

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        schemaEvolutionClient.registerSubtask(getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public void snapshotState(StateSnapshotContext context) {}

    @Override
    public void processWatermark(Watermark mark) {}

    @Override
    public void processWatermarkStatus(WatermarkStatus watermarkStatus) {}

    @Override
    public void processElement(StreamRecord<Event> element) throws Exception {
        Event event = element.getValue();

        // A FlushEvent triggers a flush.
        if (event instanceof FlushEvent) {
            handleFlushEvent((FlushEvent) event);
            return;
        }

        // A CreateTableEvent marks the table as processed directly.
        if (event instanceof CreateTableEvent) {
            processedTableIds.add(((CreateTableEvent) event).tableId());
            // Stands in for FlinkWriterOperator/StreamSink: emit the event for testing.
            output.collect(element);
            return;
        }

        // Before emitting any other event, check whether the table has been processed: the sink
        // must have a view of the full schema before it handles any change event, including
        // schema changes.
        ChangeEvent changeEvent = (ChangeEvent) event;
        if (!processedTableIds.contains(changeEvent.tableId())) {
            emitLatestSchema(changeEvent.tableId());
            processedTableIds.add(changeEvent.tableId());
        }
        output.collect(element);
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) {}

    @Override
    public void close() throws Exception {}

    @Override
    public void endInput() {}

    // ----------------------------- Helper functions -------------------------------

    private void handleFlushEvent(FlushEvent event) throws Exception {
        // Omit the copySinkWriter/userFunction flush from testing.
        if (!processedTableIds.contains(event.getTableId()) && !event.getIsForCreateTableEvent()) {
            LOG.info("Table {} has not been processed", event.getTableId());
            emitLatestSchema(event.getTableId());
            processedTableIds.add(event.getTableId());
        }
        schemaEvolutionClient.notifyFlushSuccess(
                getRuntimeContext().getIndexOfThisSubtask(), event.getTableId());
    }

    private void emitLatestSchema(TableId tableId) throws Exception {
        Optional<Schema> schema = schemaEvolutionClient.getLatestEvolvedSchema(tableId);
        if (schema.isPresent()) {
            // Request and process a CreateTableEvent, because the SinkWriter needs the Schema
            // to deserialize RecordData after the job resumes.
            output.collect(new StreamRecord<>(new CreateTableEvent(tableId, schema.get())));
            processedTableIds.add(tableId);
        } else {
            throw new RuntimeException(
                    "Could not find schema message from SchemaRegistry for " + tableId);
        }
    }
}
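For orientation, below is one way this adapter could be driven in a unit test. This sketch is not part of the commit: it assumes Flink's OneInputStreamOperatorTestHarness, whose mock environment is assumed to supply a no-op operator coordinator gateway for registerSubtask and notifyFlushSuccess, and the table name and schema are made up for illustration:

    import org.apache.flink.cdc.common.event.CreateTableEvent;
    import org.apache.flink.cdc.common.event.Event;
    import org.apache.flink.cdc.common.event.FlushEvent;
    import org.apache.flink.cdc.common.event.TableId;
    import org.apache.flink.cdc.common.schema.Schema;
    import org.apache.flink.cdc.common.types.DataTypes;
    import org.apache.flink.cdc.runtime.operators.sink.DataSinkOperatorAdapter;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

    public class DataSinkOperatorAdapterUsageSketch {

        public static void main(String[] args) throws Exception {
            TableId tableId = TableId.tableId("default_namespace", "default_schema", "t1");
            Schema schema =
                    Schema.newBuilder()
                            .physicalColumn("id", DataTypes.INT())
                            .physicalColumn("name", DataTypes.STRING())
                            .build();

            try (OneInputStreamOperatorTestHarness<Event, Event> harness =
                    new OneInputStreamOperatorTestHarness<>(new DataSinkOperatorAdapter())) {
                harness.open();

                // The CreateTableEvent is forwarded unchanged and marks t1 as processed.
                harness.processElement(new StreamRecord<>(new CreateTableEvent(tableId, schema)));

                // A flush for an already-processed table only triggers the flush-success
                // notification; no extra CreateTableEvent is emitted first.
                harness.processElement(new StreamRecord<>(new FlushEvent(tableId, false)));

                // Expect exactly one record in the output: the original CreateTableEvent.
                harness.getOutput().forEach(System.out::println);
            }
        }
    }

Sending a change event for a table that has never seen a CreateTableEvent would instead exercise emitLatestSchema(), which requires a reachable SchemaRegistry behind the coordinator gateway, so that path is left out of this sketch.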
