File tree Expand file tree Collapse file tree 1 file changed +2
-2
lines changed
dagger-core/src/main/java/com/gotocompany/dagger/core Expand file tree Collapse file tree 1 file changed +2
-2
lines changed Original file line number Diff line number Diff line change @@ -105,11 +105,11 @@ public JobBuilder registerSourceWithPreProcessors() {
105105 DataStream <Row > dataStream = stream .registerSource (executionEnvironment , watermarkStrategyDefinition .getWatermarkStrategy (watermarkDelay ));
106106 StreamWatermarkAssigner streamWatermarkAssigner = new StreamWatermarkAssigner (new LastColumnWatermark ());
107107
108- DataStream <Row > rowSingleOutputStreamOperator = streamWatermarkAssigner
108+ DataStream <Row > dataStreamWithWaterMark = streamWatermarkAssigner
109109 .assignTimeStampAndWatermark (dataStream , watermarkDelay , enablePerPartitionWatermark );
110110
111111 TableSchema tableSchema = TableSchema .fromTypeInfo (dataStream .getType ());
112- StreamInfo streamInfo = new StreamInfo (rowSingleOutputStreamOperator , tableSchema .getFieldNames ());
112+ StreamInfo streamInfo = new StreamInfo (dataStreamWithWaterMark , tableSchema .getFieldNames ());
113113 streamInfo = addPreProcessor (streamInfo , tableName );
114114
115115 Table table = tableEnvironment .fromDataStream (streamInfo .getDataStream (), getApiExpressions (streamInfo ));
You can’t perform that action at this time.
0 commit comments