Skip to content

Commit 4d8f26a

Browse files
committed
merge conflicts.
2 parents d798482 + a7d36b5 commit 4d8f26a

File tree

2 files changed

+15
-8
lines changed

2 files changed

+15
-8
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.calcite.sql.SqlInsert;
5656
import org.apache.calcite.sql.SqlNode;
5757
import org.apache.commons.io.Charsets;
58+
import org.apache.commons.lang3.SerializationUtils;
5859
import org.apache.commons.lang3.StringUtils;
5960
import org.apache.flink.api.common.typeinfo.TypeInformation;
6061
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -79,6 +80,7 @@
7980
import java.time.ZoneId;
8081
import java.util.ArrayList;
8182
import java.util.Arrays;
83+
import java.util.HashMap;
8284
import java.util.List;
8385
import java.util.Map;
8486
import java.util.Objects;
@@ -244,7 +246,11 @@ private static void sqlTranslation(String localSqlPluginPath,
244246
scope++;
245247
}
246248

249+
final Map<String, AbstractSideTableInfo> tmpTableMap = new HashMap<>();
247250
for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
251+
// prevent current sql use last sql's sideTableInfo
252+
sideTableMap.forEach((s, abstractSideTableInfo) -> tmpTableMap.put(s, SerializationUtils.clone(abstractSideTableInfo)));
253+
248254
if (LOG.isInfoEnabled()) {
249255
LOG.info("exe-sql:\n" + result.getExecSql());
250256
}
@@ -257,17 +263,17 @@ private static void sqlTranslation(String localSqlPluginPath,
257263
SqlNode sqlNode = flinkPlanner.getParser().parse(realSql);
258264
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
259265
tmp.setExecSql(tmpSql);
260-
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, tmp, scope + "");
266+
sideSqlExec.exec(tmp.getExecSql(), tmpTableMap, tableEnv, registerTableCache, tmp, scope + "");
261267
} else {
262268
for (String sourceTable : result.getSourceTableList()) {
263-
if (sideTableMap.containsKey(sourceTable)) {
269+
if (tmpTableMap.containsKey(sourceTable)) {
264270
isSide = true;
265271
break;
266272
}
267273
}
268274
if (isSide) {
269275
//sql-dimensional table contains the dimension table of execution
270-
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, null, String.valueOf(scope));
276+
sideSqlExec.exec(result.getExecSql(), tmpTableMap, tableEnv, registerTableCache, null, String.valueOf(scope));
271277
} else {
272278
LOG.info("----------exec sql without dimension join-----------");
273279
LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
@@ -280,6 +286,7 @@ private static void sqlTranslation(String localSqlPluginPath,
280286

281287
scope++;
282288
}
289+
tmpTableMap.clear();
283290
}
284291
}
285292

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
8484

8585
public final static String PREFERRED_TEST_QUERY_SQL = "SELECT 1 FROM DUAL";
8686

87-
private static SQLClient rdbSqlClient;
87+
private transient SQLClient rdbSqlClient;
8888

8989
private AtomicBoolean connectionStatus = new AtomicBoolean(true);
9090

@@ -126,7 +126,7 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, Re
126126
Thread.sleep(100);
127127
}
128128
Map<String, Object> params = formatInputParam(inputParams);
129-
executor.execute(() -> connectWithRetry(params, input, resultFuture, RdbAsyncReqRow.rdbSqlClient));
129+
executor.execute(() -> connectWithRetry(params, input, resultFuture, rdbSqlClient));
130130
}
131131

132132
protected void asyncQueryData(Map<String, Object> inputParams,
@@ -280,8 +280,8 @@ public BaseRow fillData(BaseRow input, Object line) {
280280
@Override
281281
public void close() throws Exception {
282282
super.close();
283-
if (RdbAsyncReqRow.rdbSqlClient != null) {
284-
RdbAsyncReqRow.rdbSqlClient.close();
283+
if (rdbSqlClient != null) {
284+
rdbSqlClient.close();
285285
}
286286

287287
if (executor != null) {
@@ -291,7 +291,7 @@ public void close() throws Exception {
291291
}
292292

293293
public void setRdbSqlClient(SQLClient rdbSqlClient) {
294-
RdbAsyncReqRow.rdbSqlClient = rdbSqlClient;
294+
this.rdbSqlClient = rdbSqlClient;
295295
}
296296

297297
private void handleQuery(SQLConnection connection, Map<String, Object> inputParams, BaseRow input, ResultFuture<BaseRow> resultFuture) {

0 commit comments

Comments
 (0)