File tree Expand file tree Collapse file tree 1 file changed +5
-0
lines changed
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink Expand file tree Collapse file tree 1 file changed +5
-0
lines changed Original file line number Diff line number Diff line change @@ -112,6 +112,11 @@ public void processElement(StreamRecord<Event> element) throws Exception {
112
112
// ----------------------------- Helper functions -------------------------------
113
113
private void handleFlushEvent (FlushEvent event ) throws Exception {
114
114
userFunction .finish ();
115
+ if (!processedTableIds .contains (event .getTableId ()) && !event .getIsForCreateTableEvent ()) {
116
+ LOG .info ("Table {} has not been processed" , event .getTableId ());
117
+ emitLatestSchema (event .getTableId ());
118
+ processedTableIds .add (event .getTableId ());
119
+ }
115
120
schemaEvolutionClient .notifyFlushSuccess (
116
121
getRuntimeContext ().getIndexOfThisSubtask (), event .getTableId ());
117
122
}
You can’t perform that action at this time.
0 commit comments