Skip to content

Commit e311ac5

Browse files
author
gituser
committed
Merge branch 'hotfix_3.9.x_23188' into 1.8_release_3.9.x
2 parents 0e612c3 + a965b11 commit e311ac5

File tree

3 files changed

+110
-18
lines changed

3 files changed

+110
-18
lines changed

core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java

+21
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.api.common.typeinfo.TypeInformation;
2424

2525
import java.io.Serializable;
26+
import java.util.Objects;
2627

2728
/**
2829
* Reason:
@@ -64,4 +65,24 @@ public TypeInformation getTypeInformation() {
6465
public void setTypeInformation(TypeInformation typeInformation) {
6566
this.typeInformation = typeInformation;
6667
}
68+
69+
@Override
70+
public boolean equals(Object o) {
71+
if (this == o) {
72+
return true;
73+
}
74+
75+
if (o == null || getClass() != o.getClass()) {
76+
return false;
77+
}
78+
79+
FieldInfo fieldInfo = (FieldInfo) o;
80+
return Objects.equals(table, fieldInfo.table) &&
81+
Objects.equals(fieldName, fieldInfo.fieldName);
82+
}
83+
84+
@Override
85+
public int hashCode() {
86+
return Objects.hash(table, fieldName);
87+
}
6788
}

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,7 @@ public JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet,
190190
tableInfo.setJoinType(joinType);
191191
tableInfo.setCondition(joinNode.getCondition());
192192

193-
if(!needBuildTemp){
194-
return tableInfo;
195-
}
196-
197-
if(tableInfo.getLeftNode().getKind() != AS){
193+
if(tableInfo.getLeftNode().getKind() != AS && needBuildTemp){
198194
extractTemporaryQuery(tableInfo.getLeftNode(), tableInfo.getLeftTableAlias(), (SqlBasicCall) parentWhere,
199195
parentSelectList, queueInfo, joinFieldSet, tableRef);
200196
}else {

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

+88-13
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,10 @@
2020
package com.dtstack.flink.sql.util;
2121

2222
import com.dtstack.flink.sql.side.FieldInfo;
23-
import com.dtstack.flink.sql.side.FieldReplaceInfo;
2423
import com.dtstack.flink.sql.side.JoinInfo;
2524
import com.google.common.base.Preconditions;
2625
import com.google.common.base.Strings;
2726
import com.google.common.collect.Lists;
28-
import com.google.common.collect.Sets;
2927
import org.apache.calcite.sql.SqlAsOperator;
3028
import org.apache.calcite.sql.SqlBasicCall;
3129
import org.apache.calcite.sql.SqlDataTypeSpec;
@@ -40,7 +38,6 @@
4038
import org.apache.calcite.sql.fun.SqlCase;
4139
import org.apache.calcite.sql.parser.SqlParserPos;
4240
import org.apache.commons.lang3.StringUtils;
43-
import org.apache.flink.api.java.tuple.Tuple2;
4441
import org.apache.flink.table.api.Table;
4542

4643
import java.util.List;
@@ -72,41 +69,119 @@ public static List<FieldInfo> parserSelectField(SqlSelect sqlSelect, Map<String,
7269
List<FieldInfo> fieldInfoList = Lists.newArrayList();
7370
String fromNode = sqlSelect.getFrom().toString();
7471

75-
for(SqlNode fieldNode : sqlNodeList.getList()){
76-
SqlIdentifier identifier = (SqlIdentifier)fieldNode;
77-
if(!identifier.isStar()) {
72+
for (SqlNode fieldNode : sqlNodeList.getList()) {
73+
extractSelectFieldToFieldInfo(fieldNode,fromNode,fieldInfoList,localTableCache);
74+
}
75+
76+
return fieldInfoList;
77+
}
78+
79+
/**
80+
* 解析select Node 提取FieldInfo
81+
* @param fieldNode
82+
* @param fromNode
83+
* @param fieldInfoList
84+
* @param localTableCache
85+
*/
86+
public static void extractSelectFieldToFieldInfo(SqlNode fieldNode, String fromNode, List<FieldInfo> fieldInfoList, Map<String, Table> localTableCache) {
87+
if (fieldNode.getKind() == IDENTIFIER) {
88+
SqlIdentifier identifier = (SqlIdentifier) fieldNode;
89+
if (!identifier.isStar()) {
7890
String tableName = identifier.names.size() == 1 ? fromNode : identifier.getComponent(0).getSimple();
7991
String fieldName = identifier.names.size() == 1 ? identifier.getComponent(0).getSimple() : identifier.getComponent(1).getSimple();
8092
FieldInfo fieldInfo = new FieldInfo();
8193
fieldInfo.setTable(tableName);
8294
fieldInfo.setFieldName(fieldName);
83-
fieldInfoList.add(fieldInfo);
95+
96+
if (!fieldInfoList.contains(fieldInfo)) {
97+
fieldInfoList.add(fieldInfo);
98+
}
8499
} else {
85100
//处理
86101
int identifierSize = identifier.names.size();
87-
88-
switch(identifierSize) {
102+
switch (identifierSize) {
89103
case 1:
90104
throw new RuntimeException("not support to parse * without scope of table");
91105
default:
92106
SqlIdentifier tableIdentify = identifier.skipLast(1);
93107
Table registerTable = localTableCache.get(tableIdentify.getSimple());
94-
if(registerTable == null){
108+
if (registerTable == null) {
95109
throw new RuntimeException("can't find table alias " + tableIdentify.getSimple());
96110
}
97111

98112
String[] fieldNames = registerTable.getSchema().getFieldNames();
99-
for(String fieldName : fieldNames){
113+
for (String fieldName : fieldNames) {
100114
FieldInfo fieldInfo = new FieldInfo();
101115
fieldInfo.setTable(tableIdentify.getSimple());
102116
fieldInfo.setFieldName(fieldName);
103117
fieldInfoList.add(fieldInfo);
104118
}
105119
}
106120
}
107-
}
121+
} else if (AGGREGATE.contains(fieldNode.getKind())
122+
|| AVG_AGG_FUNCTIONS.contains(fieldNode.getKind())
123+
|| COMPARISON.contains(fieldNode.getKind())
124+
|| fieldNode.getKind() == OTHER_FUNCTION
125+
|| fieldNode.getKind() == DIVIDE
126+
|| fieldNode.getKind() == CAST
127+
|| fieldNode.getKind() == TRIM
128+
|| fieldNode.getKind() == TIMES
129+
|| fieldNode.getKind() == PLUS
130+
|| fieldNode.getKind() == NOT_IN
131+
|| fieldNode.getKind() == OR
132+
|| fieldNode.getKind() == AND
133+
|| fieldNode.getKind() == MINUS
134+
|| fieldNode.getKind() == TUMBLE
135+
|| fieldNode.getKind() == TUMBLE_START
136+
|| fieldNode.getKind() == TUMBLE_END
137+
|| fieldNode.getKind() == SESSION
138+
|| fieldNode.getKind() == SESSION_START
139+
|| fieldNode.getKind() == SESSION_END
140+
|| fieldNode.getKind() == HOP
141+
|| fieldNode.getKind() == HOP_START
142+
|| fieldNode.getKind() == HOP_END
143+
|| fieldNode.getKind() == BETWEEN
144+
|| fieldNode.getKind() == IS_NULL
145+
|| fieldNode.getKind() == IS_NOT_NULL
146+
|| fieldNode.getKind() == CONTAINS
147+
|| fieldNode.getKind() == TIMESTAMP_ADD
148+
|| fieldNode.getKind() == TIMESTAMP_DIFF
149+
|| fieldNode.getKind() == LIKE
150+
) {
151+
SqlBasicCall sqlBasicCall = (SqlBasicCall) fieldNode;
152+
for (int i = 0; i < sqlBasicCall.getOperands().length; i++) {
153+
SqlNode sqlNode = sqlBasicCall.getOperands()[i];
154+
if (sqlNode instanceof SqlLiteral) {
155+
continue;
156+
}
108157

109-
return fieldInfoList;
158+
if (sqlNode instanceof SqlDataTypeSpec) {
159+
continue;
160+
}
161+
extractSelectFieldToFieldInfo(sqlNode, fromNode, fieldInfoList, localTableCache);
162+
}
163+
} else if (fieldNode.getKind() == AS) {
164+
SqlNode leftNode = ((SqlBasicCall) fieldNode).getOperands()[0];
165+
extractSelectFieldToFieldInfo(leftNode, fromNode,fieldInfoList, localTableCache);
166+
} else if (fieldNode.getKind() == CASE) {
167+
SqlCase sqlCase = (SqlCase) fieldNode;
168+
SqlNodeList whenOperands = sqlCase.getWhenOperands();
169+
SqlNodeList thenOperands = sqlCase.getThenOperands();
170+
SqlNode elseNode = sqlCase.getElseOperand();
171+
172+
for (int i = 0; i < whenOperands.size(); i++) {
173+
SqlNode oneOperand = whenOperands.get(i);
174+
extractSelectFieldToFieldInfo(oneOperand, fromNode, fieldInfoList, localTableCache);
175+
}
176+
177+
for (int i = 0; i < thenOperands.size(); i++) {
178+
SqlNode oneOperand = thenOperands.get(i);
179+
extractSelectFieldToFieldInfo(oneOperand, fromNode, fieldInfoList, localTableCache);
180+
181+
}
182+
183+
extractSelectFieldToFieldInfo(elseNode, fromNode, fieldInfoList, localTableCache);
184+
}
110185
}
111186

112187
public static String buildInternalTableName(String left, char split, String right) {

0 commit comments

Comments
 (0)