diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/SchedulerServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/SchedulerServiceImpl.java index bb3b76739b..20156b5c2f 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/SchedulerServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/SchedulerServiceImpl.java @@ -29,6 +29,7 @@ import org.dinky.scheduler.enums.ReleaseState; import org.dinky.scheduler.exception.SchedulerException; import org.dinky.scheduler.model.DagData; +import org.dinky.scheduler.model.DagNodeLocation; import org.dinky.scheduler.model.DinkyTaskParams; import org.dinky.scheduler.model.DinkyTaskRequest; import org.dinky.scheduler.model.ProcessDefinition; @@ -39,6 +40,8 @@ import org.dinky.service.CatalogueService; import org.dinky.service.SchedulerService; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.springframework.stereotype.Service; @@ -47,6 +50,8 @@ import com.google.common.base.Strings; import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.RandomUtil; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; @@ -94,18 +99,25 @@ public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) { dinkyTaskRequest.setName(taskName); TaskRequest taskRequest = new TaskRequest(); + JSONArray array = new JSONArray(); + Long taskCode = taskClient.genTaskCode(projectCode); if (process == null) { - Long taskCode = taskClient.genTaskCode(projectCode); dinkyTaskRequest.setCode(taskCode); BeanUtil.copyProperties(dinkyTaskRequest, taskRequest); taskRequest.setTimeoutFlag(dinkyTaskRequest.getTimeoutFlag()); taskRequest.setFlag(dinkyTaskRequest.getFlag()); JSONObject jsonObject = JSONUtil.parseObj(taskRequest); - JSONArray array = new JSONArray(); array.set(jsonObject); - processClient.createProcessDefinition(projectCode, processName, taskCode, array.toString()); log.info(Status.DS_ADD_WORK_FLOW_DEFINITION_SUCCESS.getMessage()); + // 随机出一个 x y 坐标 + DagNodeLocation dagNodeLocation = new DagNodeLocation(); + dagNodeLocation.setTaskCode(taskCode); + dagNodeLocation.setX(RandomUtil.randomLong(200, 500)); + dagNodeLocation.setY(RandomUtil.randomLong(100, 400)); + log.info("DagNodeLocation Info: {}", dagNodeLocation); + processClient.createOrUpdateProcessDefinition( + projectCode, null, processName, taskCode, array.toString(), Arrays.asList(dagNodeLocation), false); } if (process != null && process.getReleaseState() == ReleaseState.ONLINE) { @@ -120,7 +132,6 @@ public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) { projectCode, taskMainInfo.getProcessDefinitionCode(), taskMainInfo.getTaskCode(), dinkyTaskRequest); } - Long taskCode = taskClient.genTaskCode(projectCode); dinkyTaskRequest.setCode(taskCode); BeanUtil.copyProperties(dinkyTaskRequest, taskRequest); taskRequest.setTimeoutFlag(dinkyTaskRequest.getTimeoutFlag()); @@ -129,12 +140,70 @@ public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) { if (process != null) { taskClient.createTaskDefinition( projectCode, process.getCode(), dinkyTaskRequest.getUpstreamCodes(), taskDefinitionJsonObj); + // 更新 process 的 location 信息 + updateProcessDefinition(process, taskCode, taskRequest, array, projectCode); + log.info(Status.DS_ADD_TASK_DEFINITION_SUCCESS.getMessage()); return true; } return false; } + private void updateProcessDefinition( + ProcessDefinition process, Long taskCode, TaskRequest taskRequest, JSONArray array, long projectCode) { + JSONObject jsonObject = JSONUtil.parseObj(taskRequest); + array.set(jsonObject); + + List locations = new ArrayList<>(); + + if (CollUtil.isNotEmpty(process.getLocations())) { + boolean matched = process.getLocations().stream().anyMatch(location -> location.getTaskCode() == taskCode); + // if not matched, add a new location + if (!matched) { + // 获取最大的 x y 坐标 + long xMax = process.getLocations().stream() + .mapToLong(DagNodeLocation::getX) + .max() + .getAsLong(); + long xMin = process.getLocations().stream() + .mapToLong(DagNodeLocation::getX) + .min() + .getAsLong(); + long yMax = process.getLocations().stream() + .mapToLong(DagNodeLocation::getY) + .max() + .getAsLong(); + long yMin = process.getLocations().stream() + .mapToLong(DagNodeLocation::getY) + .max() + .getAsLong(); + // 随机出一个 x y 坐标 + DagNodeLocation dagNodeLocation = new DagNodeLocation(); + dagNodeLocation.setTaskCode(taskCode); + dagNodeLocation.setX(RandomUtil.randomLong(xMax, xMin)); + dagNodeLocation.setY(RandomUtil.randomLong(yMax, yMin)); + locations = process.getLocations(); + locations.add(dagNodeLocation); + } + } else { + // 随机出一个 x y 坐标 + DagNodeLocation dagNodeLocation = new DagNodeLocation(); + dagNodeLocation.setTaskCode(taskCode); + dagNodeLocation.setX(RandomUtil.randomLong(200, 500)); + dagNodeLocation.setY(RandomUtil.randomLong(100, 400)); + locations.add(dagNodeLocation); + } + + processClient.createOrUpdateProcessDefinition( + projectCode, process.getCode(), process.getName(), taskCode, array.toString(), locations, true); + log.info( + Status.DS_PROCESS_DEFINITION_UPDATE.getMessage(), + process.getName(), + taskCode, + array.toString(), + locations); + } + /** * Pushes an update task to the API. * @@ -186,6 +255,10 @@ public boolean pushUpdateTask( String taskDefinitionJsonObj = JSONUtil.toJsonStr(taskRequest); Long updatedTaskDefinition = taskClient.updateTaskDefinition( projectCode, taskCode, dinkyTaskRequest.getUpstreamCodes(), taskDefinitionJsonObj); + JSONObject jsonObject = JSONUtil.parseObj(taskRequest); + JSONArray array = new JSONArray(); + array.set(jsonObject); + updateProcessDefinition(process, taskCode, taskRequest, array, projectCode); if (updatedTaskDefinition != null && updatedTaskDefinition > 0) { log.info(Status.MODIFY_SUCCESS.getMessage()); return true; @@ -239,8 +312,8 @@ public TaskDefinition getTaskDefinitionInfo(long dinkyTaskId) { TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName, "DINKY"); TaskDefinition taskDefinition = null; if (taskMainInfo == null) { - log.error(Status.DS_WORK_FLOW_DEFINITION_TASK_NAME_EXIST.getMessage(), processName, taskName); - throw new BusException(Status.DS_WORK_FLOW_DEFINITION_TASK_NAME_EXIST, processName, taskName); + log.error(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST.getMessage(), processName, taskName); + throw new BusException(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST, processName, taskName); } taskDefinition = taskClient.getTaskDefinition(projectCode, taskMainInfo.getTaskCode()); diff --git a/dinky-common/src/main/java/org/dinky/data/enums/Status.java b/dinky-common/src/main/java/org/dinky/data/enums/Status.java index 8e62580cda..c8de16f090 100644 --- a/dinky-common/src/main/java/org/dinky/data/enums/Status.java +++ b/dinky-common/src/main/java/org/dinky/data/enums/Status.java @@ -233,6 +233,7 @@ public enum Status { DS_TASK_NOT_EXIST(17007, "ds.task.not.exist"), DS_TASK_TYPE_NOT_SUPPORT(17008, "ds.task.type.not.support"), DS_WORK_FLOW_DEFINITION_NOT_EXIST(17009, "ds.work.flow.definition.not.exist"), + DS_PROCESS_DEFINITION_UPDATE(17010, "ds.work.flow.definition.process.update"), /** * LDAP About * diff --git a/dinky-common/src/main/resources/i18n/messages_en_US.properties b/dinky-common/src/main/resources/i18n/messages_en_US.properties index d58dddabf9..b795c55af5 100644 --- a/dinky-common/src/main/resources/i18n/messages_en_US.properties +++ b/dinky-common/src/main/resources/i18n/messages_en_US.properties @@ -95,7 +95,8 @@ test.msg.title=Real Time alarm mertics user.name.passwd.error=UserName Or Password Not Correct no.prefix=The token was not submitted according to the specified prefix query.success=Query Successfully -ds.work.flow.definition.not.exist=Workflow Definition Not Exist +ds.work.flow.definition.not.exist=Workflow Definition Not Exist, You Can Add Workflow Definition +ds.work.flow.definition.process.update=Workflow Definition [{}] Update, TaskCode: [{}], Parameter 1: [{}], Parameter 2: [{}] tenant.name.exist=Tenant Already Exists failed=Failed added.failed=Added Failed diff --git a/dinky-common/src/main/resources/i18n/messages_zh_CN.properties b/dinky-common/src/main/resources/i18n/messages_zh_CN.properties index 00f0f60653..11e5ae246a 100644 --- a/dinky-common/src/main/resources/i18n/messages_zh_CN.properties +++ b/dinky-common/src/main/resources/i18n/messages_zh_CN.properties @@ -95,7 +95,8 @@ test.msg.title=实时告警监控 user.name.passwd.error=用户名或密码不正确 no.prefix=未按照指定前缀提交 token query.success=查询成功 -ds.work.flow.definition.not.exist=工作流定义不存在 +ds.work.flow.definition.not.exist=工作流定义不存在,你可以添加工作流定义 +ds.work.flow.definition.process.update=工作流定义 [{}] 进行更新,TaskCode: [{}],参数 1: [{}],参数 2: [{}] tenant.name.exist=租户已存在 failed=获取失败 added.failed=新增失败 diff --git a/dinky-scheduler/src/main/java/org/dinky/scheduler/client/ProcessClient.java b/dinky-scheduler/src/main/java/org/dinky/scheduler/client/ProcessClient.java index f41ce7ac51..fe1c492f6c 100644 --- a/dinky-scheduler/src/main/java/org/dinky/scheduler/client/ProcessClient.java +++ b/dinky-scheduler/src/main/java/org/dinky/scheduler/client/ProcessClient.java @@ -22,6 +22,7 @@ import org.dinky.data.model.SystemConfiguration; import org.dinky.scheduler.constant.Constants; import org.dinky.scheduler.model.DagData; +import org.dinky.scheduler.model.DagNodeLocation; import org.dinky.scheduler.model.ProcessDefinition; import org.dinky.scheduler.result.PageInfo; import org.dinky.scheduler.result.Result; @@ -43,9 +44,12 @@ import cn.hutool.core.lang.TypeReference; import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpRequest; +import cn.hutool.http.HttpResponse; import cn.hutool.json.JSONObject; -/** 工作流定义 */ +/** + * 工作流定义 + */ @Component public class ProcessClient { @@ -133,14 +137,20 @@ public DagData getProcessDefinitionInfo(Long projectCode, Long processCode) { /** * Create a new process definition. * - * @param projectCode The ID of the project to create the process definition for. - * @param processName The name of the process definition to create. - * @param taskCode The ID of the task to associate with the process definition. + * @param projectCode The ID of the project to create the process definition for. + * @param processName The name of the process definition to create. + * @param taskCode The ID of the task to associate with the process definition. * @param taskDefinitionJson A JSON string representing the task definition to associate with the process definition. * @return A {@link ProcessDefinition} object representing the newly created process definition. */ - public ProcessDefinition createProcessDefinition( - Long projectCode, String processName, Long taskCode, String taskDefinitionJson) { + public ProcessDefinition createOrUpdateProcessDefinition( + Long projectCode, + Long processCode, + String processName, + Long taskCode, + String taskDefinitionJson, + List locations, + boolean isModify) { String format = StrUtil.format( SystemConfiguration.getInstances().getDolphinschedulerUrl().getValue() + "/projects/{projectCode}/process-definition", @@ -150,11 +160,18 @@ public ProcessDefinition createProcessDefinition( params.put("name", processName); params.put("description", "系统添加"); params.put("tenantCode", "default"); + params.put("locations", locations); params.put("taskRelationJson", ReadFileUtil.taskRelation(Collections.singletonMap("code", taskCode))); params.put("taskDefinitionJson", taskDefinitionJson); params.put("executionType", "PARALLEL"); - String content = HttpRequest.post(format) + HttpRequest httpRequest; + if (!isModify) { + httpRequest = HttpRequest.post(format); + } else { + httpRequest = HttpRequest.put(format + "/" + processCode); + } + HttpResponse httpResponse = httpRequest .header( Constants.TOKEN, SystemConfiguration.getInstances() @@ -162,9 +179,8 @@ public ProcessDefinition createProcessDefinition( .getValue()) .form(params) .timeout(5000) - .execute() - .body(); - + .execute(); + String content = httpResponse.body(); return MyJSONUtil.verifyResult(MyJSONUtil.toBean(content, new TypeReference>() {})); } } diff --git a/dinky-scheduler/src/main/java/org/dinky/scheduler/model/DagNodeLocation.java b/dinky-scheduler/src/main/java/org/dinky/scheduler/model/DagNodeLocation.java new file mode 100644 index 0000000000..da1e473575 --- /dev/null +++ b/dinky-scheduler/src/main/java/org/dinky/scheduler/model/DagNodeLocation.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.scheduler.model; + +import java.io.Serializable; + +import lombok.Data; + +@Data +public class DagNodeLocation implements Serializable { + + private long taskCode; + private long x; + private long y; +} diff --git a/dinky-scheduler/src/main/java/org/dinky/scheduler/model/ProcessDefinition.java b/dinky-scheduler/src/main/java/org/dinky/scheduler/model/ProcessDefinition.java index dea87a0e10..303c72a010 100644 --- a/dinky-scheduler/src/main/java/org/dinky/scheduler/model/ProcessDefinition.java +++ b/dinky-scheduler/src/main/java/org/dinky/scheduler/model/ProcessDefinition.java @@ -23,6 +23,7 @@ import org.dinky.scheduler.enums.ProcessExecutionTypeEnum; import org.dinky.scheduler.enums.ReleaseState; +import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; @@ -87,7 +88,7 @@ public class ProcessDefinition { private String projectName; @ApiModelProperty(value = "位置") - private String locations; + private List locations = new ArrayList<>(); @ApiModelProperty(value = "计划发布状态 online/offline") private ReleaseState scheduleReleaseState; diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/Explain/index.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/Explain/index.tsx index 91d0bc9083..9cb1d5ff0f 100644 --- a/dinky-web/src/pages/DataStudio/HeaderContainer/Explain/index.tsx +++ b/dinky-web/src/pages/DataStudio/HeaderContainer/Explain/index.tsx @@ -71,7 +71,10 @@ const Explain: React.FC = (props: any) => { }; setResult({l('pages.datastudio.explain.validate')}); setExplainData([]); - const result = explainSql(l('pages.datastudio.editor.checking', '', { jobName: current?.name }),param); + const result = explainSql( + l('pages.datastudio.editor.checking', '', { jobName: current?.name }), + param + ); result.then((res) => { const errorExplainData: [] = []; let errorCount: number = 0; diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/PushDolphin/index.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/PushDolphin/index.tsx index 3b9d57456f..5d99bb7de7 100644 --- a/dinky-web/src/pages/DataStudio/HeaderContainer/PushDolphin/index.tsx +++ b/dinky-web/src/pages/DataStudio/HeaderContainer/PushDolphin/index.tsx @@ -110,7 +110,7 @@ export const PushDolphin: React.FC = (props) => { false ) as DolphinTaskDefinition; onSubmit(transformPushDolphinParamsValue); - console.log('transformPushDolphinParamsValue', transformPushDolphinParamsValue); + handleCancel(); }; const renderFooter = () => { diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx index c899f75155..26b2ded479 100644 --- a/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx +++ b/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx @@ -321,7 +321,8 @@ const HeaderContainer = (props: connect) => { // 推送海豚, 此处需要将系统设置中的 ds 的配置拿出来做判断 启用才展示 icon: , title: l('button.push'), - hotKey: (e: KeyboardEvent) => e.ctrlKey && e.key === 's', + hotKey: (e: KeyboardEvent) => e.ctrlKey && e.key === 'e', + hotKeyDesc: 'Ctrl+E', isShow: enabledDs && isCanPushDolphin(currentData), click: () => handlePushDolphinOpen() }, diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx index 4290be5c0a..7af2d60581 100644 --- a/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx +++ b/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx @@ -17,7 +17,7 @@ * */ -import {handleGetOption, handleOption} from '@/services/BusinessCrud'; +import { handleGetOption, handleOption } from '@/services/BusinessCrud'; import { DIALECT } from '@/services/constants'; export async function explainSql(title: string, params: any) {