|
80 | 80 | import java.util.Set;
|
81 | 81 |
|
82 | 82 | /**
|
83 |
| - * 任务执行时的流程方法 |
| 83 | + * 任务执行时的流程方法 |
84 | 84 | * Date: 2020/2/17
|
85 | 85 | * Company: www.dtstack.com
|
| 86 | + * |
86 | 87 | * @author maqi
|
87 | 88 | */
|
88 | 89 | public class ExecuteProcessHelper {
|
@@ -131,7 +132,8 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
|
131 | 132 | }
|
132 | 133 |
|
133 | 134 | /**
|
134 |
| - * 非local模式或者shipfile部署模式,remoteSqlPluginPath必填 |
| 135 | + * 非local模式或者shipfile部署模式,remoteSqlPluginPath必填 |
| 136 | + * |
135 | 137 | * @param remoteSqlPluginPath
|
136 | 138 | * @param deployMode
|
137 | 139 | * @param pluginLoadMode
|
@@ -189,12 +191,12 @@ private static List<URL> getExternalJarUrls(String addJarListStr) throws java.io
|
189 | 191 | }
|
190 | 192 |
|
191 | 193 | private static void sqlTranslation(String localSqlPluginPath,
|
192 |
| - StreamTableEnvironment tableEnv, |
193 |
| - SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap, |
194 |
| - Map<String, Table> registerTableCache, |
195 |
| - StreamQueryConfig queryConfig) throws Exception { |
| 194 | + StreamTableEnvironment tableEnv, |
| 195 | + SqlTree sqlTree, Map<String, SideTableInfo> sideTableMap, |
| 196 | + Map<String, Table> registerTableCache, |
| 197 | + StreamQueryConfig queryConfig) throws Exception { |
196 | 198 |
|
197 |
| - SideSqlExec sideSqlExec = new SideSqlExec(); |
| 199 | + SideSqlExec sideSqlExec = new SideSqlExec(); |
198 | 200 | sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
|
199 | 201 | for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
|
200 | 202 | sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result);
|
@@ -254,13 +256,14 @@ private static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUr
|
254 | 256 | }
|
255 | 257 |
|
256 | 258 | /**
|
257 |
| - * 向Flink注册源表和结果表,返回执行时插件包的全路径 |
| 259 | + * 向Flink注册源表和结果表,返回执行时插件包的全路径 |
| 260 | + * |
258 | 261 | * @param sqlTree
|
259 | 262 | * @param env
|
260 | 263 | * @param tableEnv
|
261 | 264 | * @param localSqlPluginPath
|
262 | 265 | * @param remoteSqlPluginPath
|
263 |
| - * @param pluginLoadMode 插件加载模式 classpath or shipfile |
| 266 | + * @param pluginLoadMode 插件加载模式 classpath or shipfile |
264 | 267 | * @param sideTableMap
|
265 | 268 | * @param registerTableCache
|
266 | 269 | * @return
|
@@ -293,7 +296,23 @@ private static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironmen
|
293 | 296 |
|
294 | 297 | if (waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)) {
|
295 | 298 | adaptStream = waterMarkerAssigner.assignWaterMarker(adaptStream, typeInfo, sourceTableInfo);
|
296 |
| - fields += ",ROWTIME.ROWTIME"; |
| 299 | + String eventTimeField = sourceTableInfo.getEventTimeField(); |
| 300 | + boolean hasEventTimeField = false; |
| 301 | + if (!Strings.isNullOrEmpty(eventTimeField)) { |
| 302 | + String[] fieldArray = fields.split(","); |
| 303 | + for (int i = 0; i < fieldArray.length; i++) { |
| 304 | + if (fieldArray[i].equals(eventTimeField)) { |
| 305 | + fieldArray[i] = eventTimeField + ".ROWTIME"; |
| 306 | + hasEventTimeField = true; |
| 307 | + break; |
| 308 | + } |
| 309 | + } |
| 310 | + if (hasEventTimeField) { |
| 311 | + fields = String.join(",", fieldArray); |
| 312 | + } else { |
| 313 | + fields += ",ROWTIME.ROWTIME"; |
| 314 | + } |
| 315 | + } |
297 | 316 | } else {
|
298 | 317 | fields += ",PROCTIME.PROCTIME";
|
299 | 318 | }
|
@@ -329,7 +348,8 @@ private static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironmen
|
329 | 348 | }
|
330 | 349 |
|
331 | 350 | /**
|
332 |
| - * perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph |
| 351 | + * perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph |
| 352 | + * |
333 | 353 | * @param env
|
334 | 354 | * @param classPathSet
|
335 | 355 | */
|
|
0 commit comments