Skip to content

Commit 51897bf

Browse files
rajuGTrajuGT
andauthored
small refactor (#63)
Co-authored-by: rajuGT <[email protected]>
1 parent 9f841cc commit 51897bf

File tree

1 file changed

+2
-2
lines changed

1 file changed

+2
-2
lines changed

dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,11 @@ public JobBuilder registerSourceWithPreProcessors() {
105105
DataStream<Row> dataStream = stream.registerSource(executionEnvironment, watermarkStrategyDefinition.getWatermarkStrategy(watermarkDelay));
106106
StreamWatermarkAssigner streamWatermarkAssigner = new StreamWatermarkAssigner(new LastColumnWatermark());
107107

108-
DataStream<Row> rowSingleOutputStreamOperator = streamWatermarkAssigner
108+
DataStream<Row> dataStreamWithWaterMark = streamWatermarkAssigner
109109
.assignTimeStampAndWatermark(dataStream, watermarkDelay, enablePerPartitionWatermark);
110110

111111
TableSchema tableSchema = TableSchema.fromTypeInfo(dataStream.getType());
112-
StreamInfo streamInfo = new StreamInfo(rowSingleOutputStreamOperator, tableSchema.getFieldNames());
112+
StreamInfo streamInfo = new StreamInfo(dataStreamWithWaterMark, tableSchema.getFieldNames());
113113
streamInfo = addPreProcessor(streamInfo, tableName);
114114

115115
Table table = tableEnvironment.fromDataStream(streamInfo.getDataStream(), getApiExpressions(streamInfo));

0 commit comments

Comments
 (0)