|
53 | 53 | import org.apache.calcite.sql.SqlInsert;
|
54 | 54 | import org.apache.calcite.sql.SqlNode;
|
55 | 55 | import org.apache.commons.io.Charsets;
|
| 56 | +import org.apache.commons.lang3.SerializationUtils; |
56 | 57 | import org.apache.commons.lang3.StringUtils;
|
57 | 58 | import org.apache.flink.api.common.typeinfo.TypeInformation;
|
58 | 59 | import org.apache.flink.api.java.typeutils.RowTypeInfo;
|
|
75 | 76 | import java.net.URLClassLoader;
|
76 | 77 | import java.net.URLDecoder;
|
77 | 78 | import java.time.ZoneId;
|
78 |
| -import java.util.ArrayList; |
79 |
| -import java.util.Arrays; |
80 |
| -import java.util.List; |
81 |
| -import java.util.Map; |
82 |
| -import java.util.Properties; |
83 |
| -import java.util.Set; |
84 |
| -import java.util.TimeZone; |
| 79 | +import java.util.*; |
85 | 80 | import java.util.stream.Stream;
|
86 | 81 |
|
87 | 82 | /**
|
@@ -215,7 +210,11 @@ private static void sqlTranslation(String localSqlPluginPath,
|
215 | 210 | scope++;
|
216 | 211 | }
|
217 | 212 |
|
| 213 | + final Map<String, AbstractSideTableInfo> tmpTableMap = new HashMap<>(); |
218 | 214 | for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
|
| 215 | + // prevent current sql use last sql's sideTableInfo |
| 216 | + sideTableMap.forEach((s, abstractSideTableInfo) -> tmpTableMap.put(s, SerializationUtils.clone(abstractSideTableInfo))); |
| 217 | + |
219 | 218 | if (LOG.isInfoEnabled()) {
|
220 | 219 | LOG.info("exe-sql:\n" + result.getExecSql());
|
221 | 220 | }
|
@@ -251,6 +250,7 @@ private static void sqlTranslation(String localSqlPluginPath,
|
251 | 250 |
|
252 | 251 | scope++;
|
253 | 252 | }
|
| 253 | + tmpTableMap.clear(); |
254 | 254 | }
|
255 | 255 | }
|
256 | 256 |
|
|
0 commit comments