@@ -137,7 +137,7 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
137
137
* @param pluginLoadMode
138
138
* @return
139
139
*/
140
- public static boolean checkRemoteSqlPluginPath (String remoteSqlPluginPath , String deployMode , String pluginLoadMode ) {
140
+ private static boolean checkRemoteSqlPluginPath (String remoteSqlPluginPath , String deployMode , String pluginLoadMode ) {
141
141
if (StringUtils .isEmpty (remoteSqlPluginPath )) {
142
142
return StringUtils .equalsIgnoreCase (pluginLoadMode , EPluginLoadMode .SHIPFILE .name ())
143
143
|| StringUtils .equalsIgnoreCase (deployMode , ClusterMode .local .name ());
@@ -174,7 +174,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
174
174
}
175
175
176
176
177
- public static List <URL > getExternalJarUrls (String addJarListStr ) throws java .io .IOException {
177
+ private static List <URL > getExternalJarUrls (String addJarListStr ) throws java .io .IOException {
178
178
List <URL > jarUrlList = Lists .newArrayList ();
179
179
if (Strings .isNullOrEmpty (addJarListStr )) {
180
180
return jarUrlList ;
@@ -187,7 +187,7 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
187
187
}
188
188
return jarUrlList ;
189
189
}
190
-
190
+
191
191
private static void sqlTranslation (String localSqlPluginPath ,
192
192
StreamTableEnvironment tableEnv ,
193
193
SqlTree sqlTree ,Map <String , SideTableInfo > sideTableMap ,
@@ -238,7 +238,7 @@ private static void sqlTranslation(String localSqlPluginPath,
238
238
}
239
239
}
240
240
241
- public static void registerUserDefinedFunction (SqlTree sqlTree , List <URL > jarUrlList , TableEnvironment tableEnv )
241
+ private static void registerUserDefinedFunction (SqlTree sqlTree , List <URL > jarUrlList , TableEnvironment tableEnv )
242
242
throws IllegalAccessException , InvocationTargetException {
243
243
// udf和tableEnv须由同一个类加载器加载
244
244
ClassLoader levelClassLoader = tableEnv .getClass ().getClassLoader ();
@@ -266,9 +266,9 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrl
266
266
* @return
267
267
* @throws Exception
268
268
*/
269
- public static Set <URL > registerTable (SqlTree sqlTree , StreamExecutionEnvironment env , StreamTableEnvironment tableEnv , String localSqlPluginPath ,
269
+ private static Set <URL > registerTable (SqlTree sqlTree , StreamExecutionEnvironment env , StreamTableEnvironment tableEnv , String localSqlPluginPath ,
270
270
String remoteSqlPluginPath , String pluginLoadMode , Map <String , SideTableInfo > sideTableMap , Map <String , Table > registerTableCache ) throws Exception {
271
- Set <URL > pluginClassPatshSets = Sets .newHashSet ();
271
+ Set <URL > pluginClassPathSets = Sets .newHashSet ();
272
272
WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner ();
273
273
for (TableInfo tableInfo : sqlTree .getTableInfoMap ().values ()) {
274
274
@@ -306,34 +306,34 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
306
306
registerTableCache .put (tableInfo .getName (), regTable );
307
307
308
308
URL sourceTablePathUrl = PluginUtil .buildSourceAndSinkPathByLoadMode (tableInfo .getType (), SourceTableInfo .SOURCE_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode );
309
- pluginClassPatshSets .add (sourceTablePathUrl );
309
+ pluginClassPathSets .add (sourceTablePathUrl );
310
310
} else if (tableInfo instanceof TargetTableInfo ) {
311
311
312
312
TableSink tableSink = StreamSinkFactory .getTableSink ((TargetTableInfo ) tableInfo , localSqlPluginPath );
313
313
TypeInformation [] flinkTypes = FunctionManager .transformTypes (tableInfo .getFieldClasses ());
314
314
tableEnv .registerTableSink (tableInfo .getName (), tableInfo .getFields (), flinkTypes , tableSink );
315
315
316
316
URL sinkTablePathUrl = PluginUtil .buildSourceAndSinkPathByLoadMode (tableInfo .getType (), TargetTableInfo .TARGET_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode );
317
- pluginClassPatshSets .add (sinkTablePathUrl );
317
+ pluginClassPathSets .add (sinkTablePathUrl );
318
318
} else if (tableInfo instanceof SideTableInfo ) {
319
319
String sideOperator = ECacheType .ALL .name ().equals (((SideTableInfo ) tableInfo ).getCacheType ()) ? "all" : "async" ;
320
320
sideTableMap .put (tableInfo .getName (), (SideTableInfo ) tableInfo );
321
321
322
322
URL sideTablePathUrl = PluginUtil .buildSidePathByLoadMode (tableInfo .getType (), sideOperator , SideTableInfo .TARGET_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode );
323
- pluginClassPatshSets .add (sideTablePathUrl );
323
+ pluginClassPathSets .add (sideTablePathUrl );
324
324
} else {
325
325
throw new RuntimeException ("not support table type:" + tableInfo .getType ());
326
326
}
327
327
}
328
- return pluginClassPatshSets ;
328
+ return pluginClassPathSets ;
329
329
}
330
330
331
331
/**
332
332
* perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph
333
333
* @param env
334
334
* @param classPathSet
335
335
*/
336
- public static void registerPluginUrlToCachedFile (StreamExecutionEnvironment env , Set <URL > classPathSet ) {
336
+ private static void registerPluginUrlToCachedFile (StreamExecutionEnvironment env , Set <URL > classPathSet ) {
337
337
int i = 0 ;
338
338
for (URL url : classPathSet ) {
339
339
String classFileName = String .format (CLASS_FILE_NAME_FMT , i );
@@ -342,7 +342,7 @@ public static void registerPluginUrlToCachedFile(StreamExecutionEnvironment env,
342
342
}
343
343
}
344
344
345
- public static StreamExecutionEnvironment getStreamExeEnv (Properties confProperties , String deployMode ) throws Exception {
345
+ private static StreamExecutionEnvironment getStreamExeEnv (Properties confProperties , String deployMode ) throws Exception {
346
346
StreamExecutionEnvironment env = !ClusterMode .local .name ().equals (deployMode ) ?
347
347
StreamExecutionEnvironment .getExecutionEnvironment () :
348
348
new MyLocalStreamEnvironment ();
0 commit comments