Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support push ds modify process #2668

Merged
merged 2 commits into from
Dec 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.dinky.scheduler.enums.ReleaseState;
import org.dinky.scheduler.exception.SchedulerException;
import org.dinky.scheduler.model.DagData;
import org.dinky.scheduler.model.DagNodeLocation;
import org.dinky.scheduler.model.DinkyTaskParams;
import org.dinky.scheduler.model.DinkyTaskRequest;
import org.dinky.scheduler.model.ProcessDefinition;
Expand All @@ -39,6 +40,8 @@
import org.dinky.service.CatalogueService;
import org.dinky.service.SchedulerService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.springframework.stereotype.Service;
Expand All @@ -47,6 +50,8 @@
import com.google.common.base.Strings;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
Expand Down Expand Up @@ -94,18 +99,25 @@ public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) {
dinkyTaskRequest.setName(taskName);

TaskRequest taskRequest = new TaskRequest();
JSONArray array = new JSONArray();
Long taskCode = taskClient.genTaskCode(projectCode);

if (process == null) {
Long taskCode = taskClient.genTaskCode(projectCode);
dinkyTaskRequest.setCode(taskCode);
BeanUtil.copyProperties(dinkyTaskRequest, taskRequest);
taskRequest.setTimeoutFlag(dinkyTaskRequest.getTimeoutFlag());
taskRequest.setFlag(dinkyTaskRequest.getFlag());
JSONObject jsonObject = JSONUtil.parseObj(taskRequest);
JSONArray array = new JSONArray();
array.set(jsonObject);
processClient.createProcessDefinition(projectCode, processName, taskCode, array.toString());
log.info(Status.DS_ADD_WORK_FLOW_DEFINITION_SUCCESS.getMessage());
// 随机出一个 x y 坐标
DagNodeLocation dagNodeLocation = new DagNodeLocation();
dagNodeLocation.setTaskCode(taskCode);
dagNodeLocation.setX(RandomUtil.randomLong(200, 500));
dagNodeLocation.setY(RandomUtil.randomLong(100, 400));
log.info("DagNodeLocation Info: {}", dagNodeLocation);
processClient.createOrUpdateProcessDefinition(
projectCode, null, processName, taskCode, array.toString(), Arrays.asList(dagNodeLocation), false);
}

if (process != null && process.getReleaseState() == ReleaseState.ONLINE) {
Expand All @@ -120,7 +132,6 @@ public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) {
projectCode, taskMainInfo.getProcessDefinitionCode(), taskMainInfo.getTaskCode(), dinkyTaskRequest);
}

Long taskCode = taskClient.genTaskCode(projectCode);
dinkyTaskRequest.setCode(taskCode);
BeanUtil.copyProperties(dinkyTaskRequest, taskRequest);
taskRequest.setTimeoutFlag(dinkyTaskRequest.getTimeoutFlag());
Expand All @@ -129,12 +140,70 @@ public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) {
if (process != null) {
taskClient.createTaskDefinition(
projectCode, process.getCode(), dinkyTaskRequest.getUpstreamCodes(), taskDefinitionJsonObj);
// 更新 process 的 location 信息
updateProcessDefinition(process, taskCode, taskRequest, array, projectCode);

log.info(Status.DS_ADD_TASK_DEFINITION_SUCCESS.getMessage());
return true;
}
return false;
}

private void updateProcessDefinition(
ProcessDefinition process, Long taskCode, TaskRequest taskRequest, JSONArray array, long projectCode) {
JSONObject jsonObject = JSONUtil.parseObj(taskRequest);
array.set(jsonObject);

List<DagNodeLocation> locations = new ArrayList<>();

if (CollUtil.isNotEmpty(process.getLocations())) {
boolean matched = process.getLocations().stream().anyMatch(location -> location.getTaskCode() == taskCode);
// if not matched, add a new location
if (!matched) {
// 获取最大的 x y 坐标
long xMax = process.getLocations().stream()
.mapToLong(DagNodeLocation::getX)
.max()
.getAsLong();
long xMin = process.getLocations().stream()
.mapToLong(DagNodeLocation::getX)
.min()
.getAsLong();
long yMax = process.getLocations().stream()
.mapToLong(DagNodeLocation::getY)
.max()
.getAsLong();
long yMin = process.getLocations().stream()
.mapToLong(DagNodeLocation::getY)
.max()
.getAsLong();
// 随机出一个 x y 坐标
DagNodeLocation dagNodeLocation = new DagNodeLocation();
dagNodeLocation.setTaskCode(taskCode);
dagNodeLocation.setX(RandomUtil.randomLong(xMax, xMin));
dagNodeLocation.setY(RandomUtil.randomLong(yMax, yMin));
locations = process.getLocations();
locations.add(dagNodeLocation);
}
} else {
// 随机出一个 x y 坐标
DagNodeLocation dagNodeLocation = new DagNodeLocation();
dagNodeLocation.setTaskCode(taskCode);
dagNodeLocation.setX(RandomUtil.randomLong(200, 500));
dagNodeLocation.setY(RandomUtil.randomLong(100, 400));
locations.add(dagNodeLocation);
}

processClient.createOrUpdateProcessDefinition(
projectCode, process.getCode(), process.getName(), taskCode, array.toString(), locations, true);
log.info(
Status.DS_PROCESS_DEFINITION_UPDATE.getMessage(),
process.getName(),
taskCode,
array.toString(),
locations);
}

/**
* Pushes an update task to the API.
*
Expand Down Expand Up @@ -186,6 +255,10 @@ public boolean pushUpdateTask(
String taskDefinitionJsonObj = JSONUtil.toJsonStr(taskRequest);
Long updatedTaskDefinition = taskClient.updateTaskDefinition(
projectCode, taskCode, dinkyTaskRequest.getUpstreamCodes(), taskDefinitionJsonObj);
JSONObject jsonObject = JSONUtil.parseObj(taskRequest);
JSONArray array = new JSONArray();
array.set(jsonObject);
updateProcessDefinition(process, taskCode, taskRequest, array, projectCode);
if (updatedTaskDefinition != null && updatedTaskDefinition > 0) {
log.info(Status.MODIFY_SUCCESS.getMessage());
return true;
Expand Down Expand Up @@ -239,8 +312,8 @@ public TaskDefinition getTaskDefinitionInfo(long dinkyTaskId) {
TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName, "DINKY");
TaskDefinition taskDefinition = null;
if (taskMainInfo == null) {
log.error(Status.DS_WORK_FLOW_DEFINITION_TASK_NAME_EXIST.getMessage(), processName, taskName);
throw new BusException(Status.DS_WORK_FLOW_DEFINITION_TASK_NAME_EXIST, processName, taskName);
log.error(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST.getMessage(), processName, taskName);
throw new BusException(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST, processName, taskName);
}

taskDefinition = taskClient.getTaskDefinition(projectCode, taskMainInfo.getTaskCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ public enum Status {
DS_TASK_NOT_EXIST(17007, "ds.task.not.exist"),
DS_TASK_TYPE_NOT_SUPPORT(17008, "ds.task.type.not.support"),
DS_WORK_FLOW_DEFINITION_NOT_EXIST(17009, "ds.work.flow.definition.not.exist"),
DS_PROCESS_DEFINITION_UPDATE(17010, "ds.work.flow.definition.process.update"),

/**
* LDAP About *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ test.msg.title=Real Time alarm mertics
user.name.passwd.error=UserName Or Password Not Correct
no.prefix=The token was not submitted according to the specified prefix
query.success=Query Successfully
ds.work.flow.definition.not.exist=Workflow Definition Not Exist
ds.work.flow.definition.not.exist=Workflow Definition Not Exist, You Can Add Workflow Definition
ds.work.flow.definition.process.update=Workflow Definition [{}] Update, TaskCode: [{}], Parameter 1: [{}], Parameter 2: [{}]
tenant.name.exist=Tenant Already Exists
failed=Failed
added.failed=Added Failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ test.msg.title=实时告警监控
user.name.passwd.error=用户名或密码不正确
no.prefix=未按照指定前缀提交 token
query.success=查询成功
ds.work.flow.definition.not.exist=工作流定义不存在
ds.work.flow.definition.not.exist=工作流定义不存在,你可以添加工作流定义
ds.work.flow.definition.process.update=工作流定义 [{}] 进行更新,TaskCode: [{}],参数 1: [{}],参数 2: [{}]
tenant.name.exist=租户已存在
failed=获取失败
added.failed=新增失败
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.data.model.SystemConfiguration;
import org.dinky.scheduler.constant.Constants;
import org.dinky.scheduler.model.DagData;
import org.dinky.scheduler.model.DagNodeLocation;
import org.dinky.scheduler.model.ProcessDefinition;
import org.dinky.scheduler.result.PageInfo;
import org.dinky.scheduler.result.Result;
Expand All @@ -43,9 +44,12 @@
import cn.hutool.core.lang.TypeReference;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.json.JSONObject;

/** 工作流定义 */
/**
* 工作流定义
*/
@Component
public class ProcessClient {

Expand Down Expand Up @@ -133,14 +137,20 @@ public DagData getProcessDefinitionInfo(Long projectCode, Long processCode) {
/**
* Create a new process definition.
*
* @param projectCode The ID of the project to create the process definition for.
* @param processName The name of the process definition to create.
* @param taskCode The ID of the task to associate with the process definition.
* @param projectCode The ID of the project to create the process definition for.
* @param processName The name of the process definition to create.
* @param taskCode The ID of the task to associate with the process definition.
* @param taskDefinitionJson A JSON string representing the task definition to associate with the process definition.
* @return A {@link ProcessDefinition} object representing the newly created process definition.
*/
public ProcessDefinition createProcessDefinition(
Long projectCode, String processName, Long taskCode, String taskDefinitionJson) {
public ProcessDefinition createOrUpdateProcessDefinition(
Long projectCode,
Long processCode,
String processName,
Long taskCode,
String taskDefinitionJson,
List<DagNodeLocation> locations,
boolean isModify) {
String format = StrUtil.format(
SystemConfiguration.getInstances().getDolphinschedulerUrl().getValue()
+ "/projects/{projectCode}/process-definition",
Expand All @@ -150,21 +160,27 @@ public ProcessDefinition createProcessDefinition(
params.put("name", processName);
params.put("description", "系统添加");
params.put("tenantCode", "default");
params.put("locations", locations);
params.put("taskRelationJson", ReadFileUtil.taskRelation(Collections.singletonMap("code", taskCode)));
params.put("taskDefinitionJson", taskDefinitionJson);
params.put("executionType", "PARALLEL");

String content = HttpRequest.post(format)
HttpRequest httpRequest;
if (!isModify) {
httpRequest = HttpRequest.post(format);
} else {
httpRequest = HttpRequest.put(format + "/" + processCode);
}
HttpResponse httpResponse = httpRequest
.header(
Constants.TOKEN,
SystemConfiguration.getInstances()
.getDolphinschedulerToken()
.getValue())
.form(params)
.timeout(5000)
.execute()
.body();

.execute();
String content = httpResponse.body();
return MyJSONUtil.verifyResult(MyJSONUtil.toBean(content, new TypeReference<Result<ProcessDefinition>>() {}));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.scheduler.model;

import java.io.Serializable;

import lombok.Data;

@Data
public class DagNodeLocation implements Serializable {

private long taskCode;
private long x;
private long y;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.dinky.scheduler.enums.ProcessExecutionTypeEnum;
import org.dinky.scheduler.enums.ReleaseState;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -87,7 +88,7 @@ public class ProcessDefinition {
private String projectName;

@ApiModelProperty(value = "位置")
private String locations;
private List<DagNodeLocation> locations = new ArrayList<>();

@ApiModelProperty(value = "计划发布状态 online/offline")
private ReleaseState scheduleReleaseState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ const Explain: React.FC<ExplainProps> = (props: any) => {
};
setResult(<Text>{l('pages.datastudio.explain.validate')}</Text>);
setExplainData([]);
const result = explainSql(l('pages.datastudio.editor.checking', '', { jobName: current?.name }),param);
const result = explainSql(
l('pages.datastudio.editor.checking', '', { jobName: current?.name }),
param
);
result.then((res) => {
const errorExplainData: [] = [];
let errorCount: number = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export const PushDolphin: React.FC<PushDolphinProps> = (props) => {
false
) as DolphinTaskDefinition;
onSubmit(transformPushDolphinParamsValue);
console.log('transformPushDolphinParamsValue', transformPushDolphinParamsValue);
handleCancel();
};

const renderFooter = () => {
Expand Down
3 changes: 2 additions & 1 deletion dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ const HeaderContainer = (props: connect) => {
// 推送海豚, 此处需要将系统设置中的 ds 的配置拿出来做判断 启用才展示
icon: <PushpinIcon loading={pushDolphinState.buttonLoading} className={'blue-icon'} />,
title: l('button.push'),
hotKey: (e: KeyboardEvent) => e.ctrlKey && e.key === 's',
hotKey: (e: KeyboardEvent) => e.ctrlKey && e.key === 'e',
hotKeyDesc: 'Ctrl+E',
isShow: enabledDs && isCanPushDolphin(currentData),
click: () => handlePushDolphinOpen()
},
Expand Down
2 changes: 1 addition & 1 deletion dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

import {handleGetOption, handleOption} from '@/services/BusinessCrud';
import { handleGetOption, handleOption } from '@/services/BusinessCrud';
import { DIALECT } from '@/services/constants';

export async function explainSql(title: string, params: any) {
Expand Down
Loading