5555import org .apache .calcite .sql .SqlInsert ;
5656import org .apache .calcite .sql .SqlNode ;
5757import org .apache .commons .io .Charsets ;
58+ import org .apache .commons .lang3 .SerializationUtils ;
5859import org .apache .commons .lang3 .StringUtils ;
5960import org .apache .flink .api .common .typeinfo .TypeInformation ;
6061import org .apache .flink .api .java .typeutils .RowTypeInfo ;
7980import java .time .ZoneId ;
8081import java .util .ArrayList ;
8182import java .util .Arrays ;
83+ import java .util .HashMap ;
8284import java .util .List ;
8385import java .util .Map ;
8486import java .util .Objects ;
@@ -244,7 +246,11 @@ private static void sqlTranslation(String localSqlPluginPath,
244246 scope ++;
245247 }
246248
249+ final Map <String , AbstractSideTableInfo > tmpTableMap = new HashMap <>();
247250 for (InsertSqlParser .SqlParseResult result : sqlTree .getExecSqlList ()) {
251+ // prevent current sql use last sql's sideTableInfo
252+ sideTableMap .forEach ((s , abstractSideTableInfo ) -> tmpTableMap .put (s , SerializationUtils .clone (abstractSideTableInfo )));
253+
248254 if (LOG .isInfoEnabled ()) {
249255 LOG .info ("exe-sql:\n " + result .getExecSql ());
250256 }
@@ -257,17 +263,17 @@ private static void sqlTranslation(String localSqlPluginPath,
257263 SqlNode sqlNode = flinkPlanner .getParser ().parse (realSql );
258264 String tmpSql = ((SqlInsert ) sqlNode ).getSource ().toString ();
259265 tmp .setExecSql (tmpSql );
260- sideSqlExec .exec (tmp .getExecSql (), sideTableMap , tableEnv , registerTableCache , tmp , scope + "" );
266+ sideSqlExec .exec (tmp .getExecSql (), tmpTableMap , tableEnv , registerTableCache , tmp , scope + "" );
261267 } else {
262268 for (String sourceTable : result .getSourceTableList ()) {
263- if (sideTableMap .containsKey (sourceTable )) {
269+ if (tmpTableMap .containsKey (sourceTable )) {
264270 isSide = true ;
265271 break ;
266272 }
267273 }
268274 if (isSide ) {
269275 //sql-dimensional table contains the dimension table of execution
270- sideSqlExec .exec (result .getExecSql (), sideTableMap , tableEnv , registerTableCache , null , String .valueOf (scope ));
276+ sideSqlExec .exec (result .getExecSql (), tmpTableMap , tableEnv , registerTableCache , null , String .valueOf (scope ));
271277 } else {
272278 LOG .info ("----------exec sql without dimension join-----------" );
273279 LOG .info ("----------real sql exec is--------------------------\n {}" , result .getExecSql ());
@@ -280,26 +286,17 @@ private static void sqlTranslation(String localSqlPluginPath,
280286
281287 scope ++;
282288 }
289+ tmpTableMap .clear ();
283290 }
284291 }
285292
286293 public static void registerUserDefinedFunction (SqlTree sqlTree , List <URL > jarUrlList , TableEnvironment tableEnv , boolean getPlan )
287294 throws IllegalAccessException , InvocationTargetException {
288295 // udf和tableEnv须由同一个类加载器加载
289- ClassLoader levelClassLoader = tableEnv .getClass ().getClassLoader ();
290296 ClassLoader currentClassLoader = Thread .currentThread ().getContextClassLoader ();
291- URLClassLoader classLoader = null ;
297+ URLClassLoader classLoader = ClassLoaderManager . loadExtraJar ( jarUrlList , ( URLClassLoader ) currentClassLoader ) ;
292298 List <CreateFuncParser .SqlParserResult > funcList = sqlTree .getFunctionList ();
293299 for (CreateFuncParser .SqlParserResult funcInfo : funcList ) {
294- // 构建plan的情况下,udf和tableEnv不需要是同一个类加载器
295- if (getPlan ) {
296- classLoader = ClassLoaderManager .loadExtraJar (jarUrlList , (URLClassLoader ) currentClassLoader );
297- }
298-
299- //classloader
300- if (classLoader == null ) {
301- classLoader = ClassLoaderManager .loadExtraJar (jarUrlList , (URLClassLoader ) levelClassLoader );
302- }
303300 FunctionManager .registerUDF (funcInfo .getType (), funcInfo .getClassName (), funcInfo .getName (), tableEnv , classLoader );
304301 }
305302 }
0 commit comments