From 678ead0bc68e11000ec5ddd5a8dd1763961af716 Mon Sep 17 00:00:00 2001 From: yangzehan <76773988+yangzehan@users.noreply.github.com> Date: Sat, 23 Dec 2023 00:37:39 +0800 Subject: [PATCH] Add a normal task to debug and return multiple query results, and a debugging task to save the script state (#2714) * Debugging a normal job returns multiple query result sets * Debugging a normal job returns multiple query result sets * Debugging a normal job returns multiple query result sets * Debugging a normal job returns multiple query result sets * Debugging a normal job returns multiple query result sets * Debugging a normal job returns multiple query result sets * Added test return success, front-end multi-table display * Fix the debugging of a common job, and return the debugging success even if it fails to be executed * Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception * Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception * Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception * Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception * Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception * Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception * Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception * run mvn spotless:apply * Fix the logic that runs SQL statements * The fixed fix is for unlimited query of Flink job results --------- Co-authored-by: Zzm0809 <934230207@qq.com> --- .../org/dinky/service/DataBaseService.java | 2 + .../service/impl/DataBaseServiceImpl.java | 52 ++++++++++++++++ .../dinky/service/impl/TaskServiceImpl.java | 20 ++++++- .../java/org/dinky/service/task/BaseTask.java | 4 ++ .../org/dinky/service/task/CommonSqlTask.java | 9 +++ .../main/java/org/dinky/job/JobResult.java | 5 ++ .../metadata/driver/AbstractJdbcDriver.java | 59 +++++++++++++++++-- .../org/dinky/metadata/driver/Driver.java | 3 + .../metadata/driver/AbstractDriverTest.java | 6 ++ .../BottomContainer/Result/index.tsx | 37 ++++++++++-- .../DataStudio/HeaderContainer/index.tsx | 10 ++++ dinky-web/src/pages/DataStudio/model.ts | 1 + 12 files changed, 198 insertions(+), 10 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/service/DataBaseService.java b/dinky-admin/src/main/java/org/dinky/service/DataBaseService.java index c8245f6311..959d42f83f 100644 --- a/dinky-admin/src/main/java/org/dinky/service/DataBaseService.java +++ b/dinky-admin/src/main/java/org/dinky/service/DataBaseService.java @@ -196,4 +196,6 @@ public interface DataBaseService extends ISuperService { JobResult executeCommonSql(SqlDTO sqlDTO); List selectListByKeyWord(String keyword); + + JobResult StreamExecuteCommonSql(SqlDTO sqlDTO); } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/DataBaseServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/DataBaseServiceImpl.java index f65d70b713..34afbf7a3e 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/DataBaseServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/DataBaseServiceImpl.java @@ -34,6 +34,7 @@ import org.dinky.data.model.SqlGeneration; import org.dinky.data.model.Table; import org.dinky.data.result.SqlExplainResult; +import org.dinky.job.Job; import org.dinky.job.JobResult; import org.dinky.mapper.DataBaseMapper; import org.dinky.metadata.driver.Driver; @@ -48,6 +49,7 @@ import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.stream.Stream; import org.springframework.stereotype.Service; @@ -311,4 +313,54 @@ public List selectListByKeyWord(String keyword) { .or() .like(DataBase::getNote, keyword)); } + + @ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE_COMMON_SQL) + @Override + public JobResult StreamExecuteCommonSql(SqlDTO sqlDTO) { + JobResult result = new JobResult(); + result.setStatement(sqlDTO.getStatement()); + result.setStartTime(LocalDateTime.now()); + + if (Asserts.isNull(sqlDTO.getDatabaseId())) { + result.setSuccess(false); + result.setError("please assign data source"); + result.setEndTime(LocalDateTime.now()); + return result; + } + DataBase dataBase = getById(sqlDTO.getDatabaseId()); + if (Asserts.isNull(dataBase)) { + result.setSuccess(false); + result.setError("data source not exist."); + result.setEndTime(LocalDateTime.now()); + return result; + } + List jdbcSelectResults = new ArrayList<>(); + try (Driver driver = Driver.build(dataBase.getDriverConfig())) { + Stream jdbcSelectResultStream = + driver.StreamExecuteSql(sqlDTO.getStatement(), sqlDTO.getMaxRowNum()); + jdbcSelectResultStream.forEach(res -> { + jdbcSelectResults.add(res); + if (!res.isSuccess()) { + throw new RuntimeException(); + } + }); + result.setStatus(Job.JobStatus.SUCCESS); + result.setResults(jdbcSelectResults); + result.setSuccess(true); + result.setEndTime(LocalDateTime.now()); + return result; + } catch (RuntimeException e) { + if (!jdbcSelectResults.isEmpty()) { + result.setError( + jdbcSelectResults.get(jdbcSelectResults.size() - 1).getError()); + } else { + result.setError(e.getMessage()); + } + result.setStatus(Job.JobStatus.FAILED); + result.setSuccess(true); + result.setEndTime(LocalDateTime.now()); + result.setResults(jdbcSelectResults); + return result; + } + } } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 6e6afedf4c..7b6157afb0 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -200,6 +200,18 @@ public JobResult executeJob(TaskDTO task) throws Exception { return jobResult; } + @ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE) + public JobResult executeJob(TaskDTO task, Boolean stream) throws Exception { + JobResult jobResult; + if (stream) { + jobResult = BaseTask.getTask(task).StreamExecute(); + } else { + jobResult = BaseTask.getTask(task).execute(); + } + log.info("execute job finished,status is {}", jobResult.getStatus()); + return jobResult; + } + // Submit and export task @ProcessStep(type = ProcessStepType.SUBMIT_BUILD_CONFIG) public JobConfig buildJobSubmitConfig(TaskDTO task) { @@ -329,7 +341,13 @@ public JobResult debugTask(TaskDTO task) throws Exception { task.setStatementSet(false); // 注解自调用会失效,这里通过获取对象方法绕过此限制 TaskServiceImpl taskServiceBean = applicationContext.getBean(TaskServiceImpl.class); - JobResult jobResult = taskServiceBean.executeJob(task); + JobResult jobResult; + if (Dialect.isCommonSql(task.getDialect())) { + jobResult = taskServiceBean.executeJob(task, true); + } else { + jobResult = taskServiceBean.executeJob(task); + } + if (Job.JobStatus.SUCCESS == jobResult.getStatus()) { log.info("Job debug success"); Task newTask = new Task(task.getId(), jobResult.getJobInstanceId()); diff --git a/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java b/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java index 270bc6617f..3f8df088d0 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java @@ -77,4 +77,8 @@ public static BaseTask getTask(TaskDTO taskDTO) { } throw new RuntimeException("Not support dialect: " + taskDTO.getDialect()); } + + public JobResult StreamExecute() { + return null; + } } diff --git a/dinky-admin/src/main/java/org/dinky/service/task/CommonSqlTask.java b/dinky-admin/src/main/java/org/dinky/service/task/CommonSqlTask.java index 79bb8786a4..760e46dae7 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/CommonSqlTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/CommonSqlTask.java @@ -67,6 +67,15 @@ public JobResult execute() { return jobResult; } + @Override + public JobResult StreamExecute() { + log.info("Preparing to execute common sql..."); + SqlDTO sqlDTO = SqlDTO.build(task.getStatement(), task.getDatabaseId(), null); + DataBaseService dataBaseService = SpringUtil.getBean(DataBaseService.class); + JobResult jobResult = dataBaseService.StreamExecuteCommonSql(sqlDTO); + return jobResult; + } + @Override public boolean stop() { return false; diff --git a/dinky-core/src/main/java/org/dinky/job/JobResult.java b/dinky-core/src/main/java/org/dinky/job/JobResult.java index 992bc6ea1c..6f127a6c5d 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobResult.java +++ b/dinky-core/src/main/java/org/dinky/job/JobResult.java @@ -20,8 +20,10 @@ package org.dinky.job; import org.dinky.data.result.IResult; +import org.dinky.metadata.result.JdbcSelectResult; import java.time.LocalDateTime; +import java.util.List; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -95,6 +97,9 @@ public class JobResult { @ApiModelProperty(value = "Result data of the job", dataType = "IResult", notes = "Result data of the job") private IResult result; + @ApiModelProperty(value = "Result data of the job", dataType = "IResult", notes = "Result data of the job") + private List results; + @ApiModelProperty( value = "Start time of job execution", dataType = "LocalDateTime", diff --git a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java index 0a0bb7f301..19ff05ad1f 100644 --- a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java +++ b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java @@ -59,6 +59,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; +import java.util.stream.Stream; import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidPooledConnection; @@ -595,10 +596,9 @@ public JdbcSelectResult query(String sql, Integer limit) { break; } } - result.setSuccess(true); + result.success(); } catch (Exception e) { - result.setError(LogUtil.getError(e)); - result.setSuccess(false); + result.error(LogUtil.getError(e)); log.error("Query failed", e); } close(preparedStatement, results); @@ -627,7 +627,7 @@ public JdbcSelectResult executeSql(String sql, Integer limit) { || type.toUpperCase().contains("DESC") || type.toUpperCase().contains("SQLEXPLAINSTATEMENT")) { log.info("Execute query."); - result = query(item.toString(), limit); + return query(item.toString(), limit); } else if (type.toUpperCase().contains("INSERT") || type.toUpperCase().contains("UPDATE") || type.toUpperCase().contains("DELETE")) { @@ -661,6 +661,57 @@ public JdbcSelectResult executeSql(String sql, Integer limit) { return result; } + @Override + public Stream StreamExecuteSql(String sql, Integer limit) { + // TODO 改为ProcessStep注释 + log.info("Start parse sql..."); + List stmtList = + SQLUtils.parseStatements(sql, config.getType().toLowerCase()); + log.info(CharSequenceUtil.format("A total of {} statement have been Parsed.", stmtList.size())); + log.info("Start execute sql..."); + return stmtList.stream().map(item -> { + List resList = new ArrayList<>(); + JdbcSelectResult result = JdbcSelectResult.buildResult(); + String type = item.getClass().getSimpleName(); + if (type.toUpperCase().contains("SELECT") + || type.toUpperCase().contains("SHOW") + || type.toUpperCase().contains("DESC") + || type.toUpperCase().contains("SQLEXPLAINSTATEMENT")) { + log.info("Execute query."); + return query(item.toString(), limit); + } else if (type.toUpperCase().contains("INSERT") + || type.toUpperCase().contains("UPDATE") + || type.toUpperCase().contains("DELETE")) { + try { + log.info("Execute update."); + resList.add(executeUpdate(item.toString())); + result.setStatusList(resList); + } catch (Exception e) { + resList.add(0); + result.setStatusList(resList); + result.error(LogUtil.getError(e)); + log.error(e.getMessage()); + return result; + } + } else { + try { + log.info("Execute DDL."); + execute(item.toString()); + resList.add(1); + result.setStatusList(resList); + } catch (Exception e) { + resList.add(0); + result.setStatusList(resList); + result.error(LogUtil.getError(e)); + log.error(e.getMessage()); + return result; + } + } + result.success(); + return result; + }); + } + @Override public List explain(String sql) { List sqlExplainResults = new ArrayList<>(); diff --git a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/Driver.java b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/Driver.java index 2e34f21938..504829d2f8 100644 --- a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/Driver.java +++ b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/Driver.java @@ -37,6 +37,7 @@ import java.util.Optional; import java.util.ServiceLoader; import java.util.Set; +import java.util.stream.Stream; import cn.hutool.core.text.StrFormatter; @@ -235,4 +236,6 @@ default Set getSplitTables(List tableRegList, Map } List> getSplitSchemaList(); + + Stream StreamExecuteSql(String statement, Integer maxRowNum); } diff --git a/dinky-metadata/dinky-metadata-base/src/test/java/org/dinky/metadata/driver/AbstractDriverTest.java b/dinky-metadata/dinky-metadata-base/src/test/java/org/dinky/metadata/driver/AbstractDriverTest.java index d7cadc73f0..09c2691485 100644 --- a/dinky-metadata/dinky-metadata-base/src/test/java/org/dinky/metadata/driver/AbstractDriverTest.java +++ b/dinky-metadata/dinky-metadata-base/src/test/java/org/dinky/metadata/driver/AbstractDriverTest.java @@ -36,6 +36,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -251,5 +252,10 @@ public List explain(String sql) { public Map getFlinkColumnTypeConversion() { return null; } + + @Override + public Stream StreamExecuteSql(String statement, Integer maxRowNum) { + return null; + } } } diff --git a/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx b/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx index 6d1c5dc679..53a4edb1a5 100644 --- a/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx +++ b/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx @@ -32,7 +32,7 @@ import { transformTableDataToCsv } from '@/utils/function'; import { l } from '@/utils/intl'; import { SearchOutlined } from '@ant-design/icons'; import { Highlight } from '@ant-design/pro-layout/es/components/Help/Search'; -import { Button, Empty, Input, InputRef, Space, Table } from 'antd'; +import {Button, Empty, Input, InputRef, Space, Table, Tabs} from 'antd'; import { ColumnsType, ColumnType } from 'antd/es/table'; import { FilterConfirmProps } from 'antd/es/table/interface'; import { DataIndex } from 'rc-table/es/interface'; @@ -44,11 +44,13 @@ type Data = { columns?: string[]; rowData?: object[]; }; +type DataList=Data[]; const Result = (props: any) => { const { tabs: { panes, activeKey } } = props; const [data, setData] = useState({}); + const [dataList, setDataList] = useState([]); const [loading, setLoading] = useState(true); const currentTabs = getCurrentTab(panes, activeKey); const current = getCurrentData(panes, activeKey) ?? {}; @@ -126,10 +128,15 @@ const Result = (props: any) => { return; } + const params = currentTabs.params; const consoleData = currentTabs.console; if (consoleData.result && !isRefresh) { setData(consoleData.result); - } else { + } + else if(consoleData.results && !isRefresh){ + setDataList(consoleData.results) + } + else { if (current.dialect && current.dialect.toLowerCase() == DIALECT.FLINK_SQL) { // flink sql // to do: get job data by history id list, not flink jid @@ -164,8 +171,9 @@ const Result = (props: any) => { useEffect(() => { setData({}); + setDataList([]); loadData(); - }, [currentTabs?.console?.result]); + }, [currentTabs?.console?.result,currentTabs?.console?.results]); const getColumns = (columns: string[]) => { return columns?.map((item) => { @@ -236,8 +244,27 @@ const Result = (props: any) => { loading={loading} /> ) : ( - - )} + dataList.length>0?( + + + {dataList.map((data, index) => { + return ( + +
{ + return { ...item, key: index }; + })} + loading={loading} + /> + + ); + })} + ): + + )} + ); }; diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx index 09ccfe7844..78efa65add 100644 --- a/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx +++ b/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx @@ -190,6 +190,8 @@ const HeaderContainer = (props: connect) => { const handlerDebug = async () => { if (!currentData) return; + const saved = currentData.step == JOB_LIFE_CYCLE.PUBLISH ? true : await handleSave(); + if (!saved) return; // @ts-ignore const editor = currentTab.monacoInstance.editor .getEditors() @@ -218,6 +220,14 @@ const HeaderContainer = (props: connect) => { await SuccessMessageAsync(l('pages.datastudio.editor.debug.success')); currentData.status = JOB_STATUS.RUNNING; // Common sql task is synchronized, so it needs to automatically update the status to finished. + if (isSql(currentData.dialect)) { + currentData.status = JOB_STATUS.FINISHED; + if (currentTab) currentTab.console.results = res.data.results; + } + else { + if (currentTab) currentTab.console.result = res.data.result; + } + // Common sql task is synchronized, so it needs to automatically update the status to finished. if (isSql(currentData.dialect)) { currentData.status = JOB_STATUS.FINISHED; } diff --git a/dinky-web/src/pages/DataStudio/model.ts b/dinky-web/src/pages/DataStudio/model.ts index 7b8467d753..c03c24e315 100644 --- a/dinky-web/src/pages/DataStudio/model.ts +++ b/dinky-web/src/pages/DataStudio/model.ts @@ -110,6 +110,7 @@ export type TaskType = { }; export type ConsoleType = { + results:{}[]; // eslint-disable-next-line @typescript-eslint/ban-types result: {}; // eslint-disable-next-line @typescript-eslint/ban-types