Commit 138801f

[merge] 1.10_release_4.0.x -> 1.10_release_4.1.x; conflicts resolved by tiezhu.

2 parents: 9e931a7 + 3858180

File tree: 5 files changed, +305 −18 lines (three of the changed files are shown below)


core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java (4 additions, 2 deletions)

```diff
@@ -233,7 +233,8 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
     private static void sqlTranslation(String localSqlPluginPath,
                                        String pluginLoadMode,
                                        StreamTableEnvironment tableEnv,
-                                       SqlTree sqlTree, Map<String, AbstractSideTableInfo> sideTableMap,
+                                       SqlTree sqlTree,
+                                       Map<String, AbstractSideTableInfo> sideTableMap,
                                        Map<String, Table> registerTableCache) throws Exception {
 
         SideSqlExec sideSqlExec = new SideSqlExec();
@@ -277,7 +278,8 @@ private static void sqlTranslation(String localSqlPluginPath,
         } else {
             LOG.info("----------exec sql without dimension join-----------");
             LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
-            FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
+
+            FlinkSQLExec.sqlInsert(tableEnv, result.getExecSql(), SideSqlExec.getDimTableNewTable().keySet());
             if (LOG.isInfoEnabled()) {
                 LOG.info("exec sql: " + result.getExecSql());
             }
```
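At the call site, the old one-shot `sqlUpdate` is replaced by the three-argument `sqlInsert`, which also receives the names of tables that `SideSqlExec` re-registered after dimension-table joins. A minimal sketch of that wiring in isolation (the wrapper class and method are invented for illustration; the `FlinkSQLExec` and `SideSqlExec` calls are the ones this diff introduces):

```java
import java.util.Collection;

import org.apache.flink.table.api.java.StreamTableEnvironment;

import com.dtstack.flink.sql.exec.FlinkSQLExec;
import com.dtstack.flink.sql.side.SideSqlExec;

// Illustrative wrapper, not part of the commit.
public class SqlTranslationSketch {

    static void execInsertSql(StreamTableEnvironment tableEnv, String execSql) throws Exception {
        // Names under which dimension-join results were re-registered. The set is
        // empty when the statement involves no dimension join, in which case
        // sqlInsert degenerates to the old sqlUpdate behavior.
        Collection<String> dimDerivedTables = SideSqlExec.getDimTableNewTable().keySet();

        // sqlInsert checks whether execSql groups by one of these tables and, if
        // so, flags the planner to produce update (retract) messages.
        FlinkSQLExec.sqlInsert(tableEnv, execSql, dimDerivedTables);
    }
}
```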

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java (54 additions, 2 deletions)

```diff
@@ -18,7 +18,9 @@
 
 package com.dtstack.flink.sql.exec;
 
+import com.dtstack.flink.sql.util.TableUtils;
 import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
@@ -32,6 +34,7 @@
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.delegation.StreamPlanner;
 import org.apache.flink.table.planner.operations.SqlToOperationConverter;
+import org.apache.flink.table.planner.plan.QueryOperationConverter;
 import org.apache.flink.table.sinks.TableSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +44,7 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Arrays;
+import java.util.Collection;
 
 
 /**
@@ -51,14 +55,62 @@
 public class FlinkSQLExec {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkSQLExec.class);
 
-    public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
+    public static void sqlInsert(StreamTableEnvironment tableEnv, SqlNode sqlNode, Collection<String> newRegisterTableList) throws Exception {
+        boolean isGroupByTimeWindow = TableUtils.checkIsDimTableGroupBy(sqlNode, newRegisterTableList);
+        if (isGroupByTimeWindow) {
+            QueryOperationConverter.setProducesUpdates(true);
+        }
+
+        sqlInsert(tableEnv, (RichSqlInsert) sqlNode);
+        QueryOperationConverter.setProducesUpdates(false);
+    }
+
+    public static void sqlInsert(StreamTableEnvironment tableEnv, String stmt, Collection<String> newRegisterTableList) throws Exception {
         StreamTableEnvironmentImpl tableEnvImpl = ((StreamTableEnvironmentImpl) tableEnv);
         StreamPlanner streamPlanner = (StreamPlanner) tableEnvImpl.getPlanner();
         FlinkPlannerImpl flinkPlanner = streamPlanner.createFlinkPlanner();
 
         RichSqlInsert insert = (RichSqlInsert) flinkPlanner.validate(flinkPlanner.parser().parse(stmt));
-        TableImpl queryResult = extractQueryTableFromInsertCaluse(tableEnvImpl, flinkPlanner, insert);
+        boolean isGroupByTimeWindow = TableUtils.checkIsDimTableGroupBy(insert, newRegisterTableList);
+        if (isGroupByTimeWindow) {
+            QueryOperationConverter.setProducesUpdates(true);
+        }
 
+        sqlInsert(tableEnv, insert);
+        QueryOperationConverter.setProducesUpdates(false);
+    }
+
+    public static Table sqlQuery(StreamTableEnvironment tableEnv, SqlNode sqlNode, Collection<String> newRegisterTableList) {
+        boolean isGroupByTimeWindow = TableUtils.checkIsDimTableGroupBy(sqlNode, newRegisterTableList);
+        if (isGroupByTimeWindow) {
+            QueryOperationConverter.setProducesUpdates(true);
+        }
+
+        Table resultTable = tableEnv.sqlQuery(sqlNode.toString());
+        QueryOperationConverter.setProducesUpdates(false);
+        return resultTable;
+    }
+
+    public static void insertInto(StreamTableEnvironment tableEnv,
+                                  SqlNode sqlNode,
+                                  String targetTableName,
+                                  Table fromTable,
+                                  Collection<String> newRegisterTableList) {
+        boolean isGroupByTimeWindow = TableUtils.checkIsDimTableGroupBy(sqlNode, newRegisterTableList);
+        if (isGroupByTimeWindow) {
+            QueryOperationConverter.setProducesUpdates(true);
+        }
+
+        tableEnv.insertInto(targetTableName, fromTable);
+        QueryOperationConverter.setProducesUpdates(false);
+    }
+
+    public static void sqlInsert(StreamTableEnvironment tableEnv, RichSqlInsert insert) throws Exception {
+        StreamTableEnvironmentImpl tableEnvImpl = ((StreamTableEnvironmentImpl) tableEnv);
+        StreamPlanner streamPlanner = (StreamPlanner) tableEnvImpl.getPlanner();
+        FlinkPlannerImpl flinkPlanner = streamPlanner.createFlinkPlanner();
+
+        TableImpl queryResult = extractQueryTableFromInsertCaluse(tableEnvImpl, flinkPlanner, insert);
         String targetTableName = ((SqlIdentifier) insert.getTargetTable()).names.get(0);
         TableSink tableSink = getTableSinkByPlanner(streamPlanner, targetTableName);
 
```
core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java (38 additions, 11 deletions)

```diff
@@ -53,6 +53,7 @@
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.planner.plan.QueryOperationConverter;
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
 import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.LegacyLocalDateTimeTypeInfo;
@@ -68,6 +69,7 @@
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -101,6 +103,9 @@ public class SideSqlExec {
 
     private Map<String, Table> localTableCache = Maps.newHashMap();
 
+    // cache of the new names dimension tables are registered under after a join
+    private static Map<String, Table> dimTableNewTable = Maps.newHashMap();
+
     public void exec(String sql,
                      Map<String, AbstractSideTableInfo> sideTableMap,
                      StreamTableEnvironment tableEnv,
@@ -128,6 +133,8 @@ public void exec(String sql,
         sideSQLParser.setLocalTableCache(localTableCache);
         Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet(), scope);
         Object pollObj = null;
+        // whether the current CREATE VIEW involves a dimension table
+        boolean includeDimTable = false;
 
         while ((pollObj = exeQueue.poll()) != null) {
 
@@ -136,41 +143,49 @@ public void exec(String sql,
 
 
                 if (pollSqlNode.getKind() == INSERT) {
-                    FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
+                    Collection<String> newRegisterTableList = dimTableNewTable.keySet();
+                    FlinkSQLExec.sqlInsert(tableEnv, pollSqlNode, newRegisterTableList);
                     if (LOG.isInfoEnabled()) {
                         LOG.info("----------real exec sql-----------\n{}", pollSqlNode.toString());
                     }
 
                 } else if (pollSqlNode.getKind() == AS) {
-                    dealAsSourceTable(tableEnv, pollSqlNode, tableCache);
+                    Collection<String> newRegisterTableList = dimTableNewTable.keySet();
+                    dealAsSourceTable(tableEnv, pollSqlNode, tableCache, newRegisterTableList);
 
                 } else if (pollSqlNode.getKind() == WITH_ITEM) {
                     SqlWithItem sqlWithItem = (SqlWithItem) pollSqlNode;
-                    String TableAlias = sqlWithItem.name.toString();
-                    Table table = tableEnv.sqlQuery(sqlWithItem.query.toString());
-                    tableEnv.registerTable(TableAlias, table);
+                    String tableAlias = sqlWithItem.name.toString();
+                    Collection<String> newRegisterTableList = dimTableNewTable.keySet();
+                    Table table = FlinkSQLExec.sqlQuery(tableEnv, sqlWithItem.query, newRegisterTableList);
+                    tableEnv.createTemporaryView(tableAlias, table);
 
                 } else if (pollSqlNode.getKind() == SELECT) {
                     Preconditions.checkState(createView != null, "select sql must included by create view");
-                    Table table = tableEnv.sqlQuery(pollObj.toString());
+                    Collection<String> newRegisterTableList = dimTableNewTable.keySet();
+                    Table table = FlinkSQLExec.sqlQuery(tableEnv, pollSqlNode, newRegisterTableList);
 
                     if (createView.getFieldsInfoStr() == null) {
                         tableEnv.registerTable(createView.getTableName(), table);
                     } else {
                         if (checkFieldsInfo(createView, table)) {
                             table = table.as(tmpFields);
-                            tableEnv.registerTable(createView.getTableName(), table);
+                            tableEnv.createTemporaryView(createView.getTableName(), table);
                         } else {
                             throw new RuntimeException("Fields mismatch");
                         }
                     }
 
                     localTableCache.put(createView.getTableName(), table);
+                    if (includeDimTable) {
+                        dimTableNewTable.put(createView.getTableName(), table);
+                    }
                 }
-
+                includeDimTable = false;
             } else if (pollObj instanceof JoinInfo) {
+                includeDimTable = true;
                 LOG.info("----------exec join info----------\n{}", pollObj.toString());
-                joinFun(pollObj, localTableCache, sideTableMap, tableEnv);
+                joinFun(pollObj, localTableCache, dimTableNewTable, sideTableMap, tableEnv);
             }
         }
 
@@ -414,14 +429,21 @@ public List<String> getConditionFields(SqlNode conditionNode, String specifyTabl
 
     protected void dealAsSourceTable(StreamTableEnvironment tableEnv,
                                      SqlNode pollSqlNode,
-                                     Map<String, Table> tableCache) throws SqlParseException {
+                                     Map<String, Table> tableCache,
+                                     Collection<String> newRegisterTableList) throws SqlParseException {
 
         AliasInfo aliasInfo = parseASNode(pollSqlNode);
         if (localTableCache.containsKey(aliasInfo.getName())) {
             return;
         }
 
+        boolean isGroupByTimeWindow = TableUtils.checkIsDimTableGroupBy(pollSqlNode, newRegisterTableList);
+        if (isGroupByTimeWindow) {
+            QueryOperationConverter.setProducesUpdates(true);
+        }
         Table table = tableEnv.sqlQuery(aliasInfo.getName());
+        QueryOperationConverter.setProducesUpdates(false);
+
         tableEnv.registerTable(aliasInfo.getAlias(), table);
         localTableCache.put(aliasInfo.getAlias(), table);
 
@@ -441,6 +463,7 @@ protected void dealAsSourceTable(StreamTableEnvironment tableEnv,
 
     private void joinFun(Object pollObj,
                          Map<String, Table> localTableCache,
+                         Map<String, Table> dimTableNewTable,
                          Map<String, AbstractSideTableInfo> sideTableMap,
                          StreamTableEnvironment tableEnv) throws Exception {
         JoinInfo joinInfo = (JoinInfo) pollObj;
@@ -525,7 +548,6 @@ private void joinFun(Object pollObj,
         RowTypeInfo typeInfo = new RowTypeInfo(fieldDataTypes, targetTable.getSchema().getFieldNames());
 
         DataStream<BaseRow> adaptStream = tableEnv.toRetractStream(targetTable, typeInfo)
-                .filter(f -> f.f0)
                 .map(f -> RowDataConvert.convertToBaseRow(f));
 
         //join side table before keyby ===> Reducing the size of each dimension table cache of async
@@ -565,6 +587,7 @@ private void joinFun(Object pollObj,
             Table joinTable = tableEnv.fromDataStream(dsOut);
             tableEnv.createTemporaryView(targetTableName, joinTable);
             localTableCache.put(joinInfo.getNewTableName(), joinTable);
+            dimTableNewTable.put(joinInfo.getNewTableName(), joinTable);
         }
     }
 
@@ -593,4 +616,8 @@ private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Tab
         return true;
     }
 
+    public static Map<String, Table> getDimTableNewTable() {
+        return dimTableNewTable;
+    }
+
 }
```
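The removal of `.filter(f -> f.f0)` is what the bookkeeping above enables: `toRetractStream` emits `Tuple2<Boolean, Row>` pairs in which `f0 == false` marks a retraction, and once a GROUP BY over a dim-derived view produces updates, those retractions have to reach the downstream dimension join instead of being filtered out. A minimal standalone sketch of the difference (helper class invented for illustration):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Illustrative helpers, not part of the commit.
public class RetractStreamSketch {

    // Append-only view: keeps accumulate messages only (f0 == true). This is
    // what the removed `.filter(f -> f.f0)` did; on an updating input it drops
    // the retract half of every update, so superseded results are never undone.
    static DataStream<Tuple2<Boolean, Row>> appendOnly(StreamTableEnvironment tableEnv, Table table) {
        return tableEnv.toRetractStream(table, Row.class).filter(f -> f.f0);
    }

    // Change stream: keeps both accumulate and retract messages, which is what
    // the commit now feeds into the BaseRow conversion step.
    static DataStream<Tuple2<Boolean, Row>> changeStream(StreamTableEnvironment tableEnv, Table table) {
        return tableEnv.toRetractStream(table, Row.class);
    }
}
```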
