Skip to content

Commit 23a67dc

Browse files
authored
[FLINK-35255][cdc][runtime] DataSinkWriterOperator adds overrides for the snapshotState and processWatermark methods (#3271)
1 parent 75a553e commit 23a67dc

File tree

1 file changed

+23
-0
lines changed

1 file changed

+23
-0
lines changed

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,19 @@
2929
import org.apache.flink.cdc.common.schema.Schema;
3030
import org.apache.flink.runtime.jobgraph.OperatorID;
3131
import org.apache.flink.runtime.state.StateInitializationContext;
32+
import org.apache.flink.runtime.state.StateSnapshotContext;
3233
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
3334
import org.apache.flink.streaming.api.graph.StreamConfig;
3435
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
3536
import org.apache.flink.streaming.api.operators.BoundedOneInput;
3637
import org.apache.flink.streaming.api.operators.ChainingStrategy;
3738
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
3839
import org.apache.flink.streaming.api.operators.Output;
40+
import org.apache.flink.streaming.api.watermark.Watermark;
3941
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
4042
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
4143
import org.apache.flink.streaming.runtime.tasks.StreamTask;
44+
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
4245

4346
import java.lang.reflect.Constructor;
4447
import java.lang.reflect.Field;
@@ -123,6 +126,26 @@ public void initializeState(StateInitializationContext context) throws Exception
123126
.initializeState(context);
124127
}
125128

129+
@Override
130+
public void snapshotState(StateSnapshotContext context) throws Exception {
131+
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
132+
.snapshotState(context);
133+
}
134+
135+
@Override
136+
public void processWatermark(Watermark mark) throws Exception {
137+
super.processWatermark(mark);
138+
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
139+
.processWatermark(mark);
140+
}
141+
142+
@Override
143+
public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
144+
super.processWatermarkStatus(watermarkStatus);
145+
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
146+
.processWatermarkStatus(watermarkStatus);
147+
}
148+
126149
@Override
127150
public void processElement(StreamRecord<Event> element) throws Exception {
128151
Event event = element.getValue();

0 commit comments

Comments
 (0)