Skip to content

Commit

Permalink
Add a normal task to debug and return multiple query results, and a debugging task to save the script state (#2714)
Browse files Browse the repository at this point in the history

* Debugging a normal job returns multiple query result sets

* Debugging a normal job returns multiple query result sets

* Debugging a normal job returns multiple query result sets

* Debugging a normal job returns multiple query result sets

* Debugging a normal job returns multiple query result sets

* Debugging a normal job returns multiple query result sets

* Added test return success, front-end multi-table display

* Fix the debugging of a common job, and return the debugging success even if it fails to be executed

* Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception

* Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception

* Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception

* Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception

* Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception

* Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception

* Fixed the logic that submitting a debugging task returns success, and now the result set and error message will be returned when encountering an exception

* run mvn spotless:apply

* Fix the logic that runs SQL statements

* Fix the unlimited query of Flink job results

---------

Co-authored-by: Zzm0809 <[email protected]>
  • Loading branch information
yangzehan and Zzm0809 authored Dec 22, 2023
1 parent 9c58423 commit 678ead0
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,6 @@ public interface DataBaseService extends ISuperService<DataBase> {
JobResult executeCommonSql(SqlDTO sqlDTO);

List<DataBase> selectListByKeyWord(String keyword);

JobResult StreamExecuteCommonSql(SqlDTO sqlDTO);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.dinky.data.model.SqlGeneration;
import org.dinky.data.model.Table;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.job.Job;
import org.dinky.job.JobResult;
import org.dinky.mapper.DataBaseMapper;
import org.dinky.metadata.driver.Driver;
Expand All @@ -48,6 +49,7 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;

import org.springframework.stereotype.Service;

Expand Down Expand Up @@ -311,4 +313,54 @@ public List<DataBase> selectListByKeyWord(String keyword) {
.or()
.like(DataBase::getNote, keyword));
}

// Streams a common (non-Flink) SQL script against a registered data source and
// aggregates every statement's result set into one JobResult.
@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE_COMMON_SQL)
@Override
public JobResult StreamExecuteCommonSql(SqlDTO sqlDTO) {
    JobResult result = new JobResult();
    result.setStatement(sqlDTO.getStatement());
    result.setStartTime(LocalDateTime.now());

    // A data source must be assigned before anything can run.
    if (Asserts.isNull(sqlDTO.getDatabaseId())) {
        result.setSuccess(false);
        result.setError("please assign data source");
        result.setEndTime(LocalDateTime.now());
        return result;
    }
    DataBase dataBase = getById(sqlDTO.getDatabaseId());
    if (Asserts.isNull(dataBase)) {
        result.setSuccess(false);
        result.setError("data source not exist.");
        result.setEndTime(LocalDateTime.now());
        return result;
    }
    // Collects one JdbcSelectResult per executed statement; also read in the
    // catch block below so partial results are surfaced on failure.
    List<JdbcSelectResult> jdbcSelectResults = new ArrayList<>();
    try (Driver driver = Driver.build(dataBase.getDriverConfig())) {
        Stream<JdbcSelectResult> jdbcSelectResultStream =
                driver.StreamExecuteSql(sqlDTO.getStatement(), sqlDTO.getMaxRowNum());
        // The lazy stream is consumed inside try-with-resources so the driver
        // is still open while statements execute. A bare RuntimeException is
        // used to short-circuit the forEach at the first failed statement; the
        // failed result is appended before throwing so its error message is
        // available in the catch block.
        jdbcSelectResultStream.forEach(res -> {
            jdbcSelectResults.add(res);
            if (!res.isSuccess()) {
                throw new RuntimeException();
            }
        });
        result.setStatus(Job.JobStatus.SUCCESS);
        result.setResults(jdbcSelectResults);
        result.setSuccess(true);
        result.setEndTime(LocalDateTime.now());
        return result;
    } catch (RuntimeException e) {
        // Prefer the error recorded on the last (failed) statement result;
        // fall back to the exception message for failures raised outside the
        // forEach (e.g. while building the driver). The short-circuit
        // exception above carries no message, but in that path the list is
        // never empty.
        if (!jdbcSelectResults.isEmpty()) {
            result.setError(
                    jdbcSelectResults.get(jdbcSelectResults.size() - 1).getError());
        } else {
            result.setError(e.getMessage());
        }
        result.setStatus(Job.JobStatus.FAILED);
        // NOTE(review): success stays true even though status is FAILED —
        // presumably so the caller still renders the partial result sets
        // (matches the commit intent of returning results alongside errors);
        // confirm this is intentional and not a typo for false.
        result.setSuccess(true);
        result.setEndTime(LocalDateTime.now());
        result.setResults(jdbcSelectResults);
        return result;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,18 @@ public JobResult executeJob(TaskDTO task) throws Exception {
return jobResult;
}

/**
 * Executes a task, optionally via the streaming execution path.
 *
 * @param task   the task to execute
 * @param stream when {@code Boolean.TRUE}, use the dialect's streaming
 *               execution ({@code StreamExecute}); any other value (including
 *               {@code null}) falls back to the regular {@code execute} path
 * @return the job result produced by the task
 * @throws Exception if task execution fails
 */
@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
public JobResult executeJob(TaskDTO task, Boolean stream) throws Exception {
    // Resolve the dialect-specific task once instead of once per branch.
    BaseTask baseTask = BaseTask.getTask(task);
    // Boolean.TRUE.equals(...) guards against a null `stream` argument, which
    // would otherwise throw an NPE on auto-unboxing in an if-condition.
    JobResult jobResult = Boolean.TRUE.equals(stream) ? baseTask.StreamExecute() : baseTask.execute();
    log.info("execute job finished,status is {}", jobResult.getStatus());
    return jobResult;
}

// Submit and export task
@ProcessStep(type = ProcessStepType.SUBMIT_BUILD_CONFIG)
public JobConfig buildJobSubmitConfig(TaskDTO task) {
Expand Down Expand Up @@ -329,7 +341,13 @@ public JobResult debugTask(TaskDTO task) throws Exception {
task.setStatementSet(false);
// 注解自调用会失效,这里通过获取对象方法绕过此限制
TaskServiceImpl taskServiceBean = applicationContext.getBean(TaskServiceImpl.class);
JobResult jobResult = taskServiceBean.executeJob(task);
JobResult jobResult;
if (Dialect.isCommonSql(task.getDialect())) {
jobResult = taskServiceBean.executeJob(task, true);
} else {
jobResult = taskServiceBean.executeJob(task);
}

if (Job.JobStatus.SUCCESS == jobResult.getStatus()) {
log.info("Job debug success");
Task newTask = new Task(task.getId(), jobResult.getJobInstanceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,8 @@ public static BaseTask getTask(TaskDTO taskDTO) {
}
throw new RuntimeException("Not support dialect: " + taskDTO.getDialect());
}

// Default streaming execution: base implementation does nothing and returns
// null; dialect tasks that support streaming (e.g. the common-SQL task)
// override this.
// NOTE(review): returning null forces every caller to null-check — confirm all
// call sites only invoke this for dialects that override it.
public JobResult StreamExecute() {
    return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ public JobResult execute() {
return jobResult;
}

@Override
public JobResult StreamExecute() {
    // Delegate streaming execution of the common SQL script to the data-base
    // service, which returns one result set per executed statement.
    log.info("Preparing to execute common sql...");
    DataBaseService dataBaseService = SpringUtil.getBean(DataBaseService.class);
    SqlDTO sqlDTO = SqlDTO.build(task.getStatement(), task.getDatabaseId(), null);
    return dataBaseService.StreamExecuteCommonSql(sqlDTO);
}

@Override
public boolean stop() {
return false;
Expand Down
5 changes: 5 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.dinky.job;

import org.dinky.data.result.IResult;
import org.dinky.metadata.result.JdbcSelectResult;

import java.time.LocalDateTime;
import java.util.List;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
Expand Down Expand Up @@ -95,6 +97,9 @@ public class JobResult {
@ApiModelProperty(value = "Result data of the job", dataType = "IResult", notes = "Result data of the job")
private IResult result;

// Per-statement result sets returned by a streamed common-SQL job; null for
// jobs that populate the single `result` field instead.
@ApiModelProperty(
        value = "Result set list of the job",
        dataType = "List<JdbcSelectResult>",
        notes = "Per-statement result sets returned by a streamed common-SQL job")
private List<JdbcSelectResult> results;

@ApiModelProperty(
value = "Start time of job execution",
dataType = "LocalDateTime",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
Expand Down Expand Up @@ -595,10 +596,9 @@ public JdbcSelectResult query(String sql, Integer limit) {
break;
}
}
result.setSuccess(true);
result.success();
} catch (Exception e) {
result.setError(LogUtil.getError(e));
result.setSuccess(false);
result.error(LogUtil.getError(e));
log.error("Query failed", e);
}
close(preparedStatement, results);
Expand Down Expand Up @@ -627,7 +627,7 @@ public JdbcSelectResult executeSql(String sql, Integer limit) {
|| type.toUpperCase().contains("DESC")
|| type.toUpperCase().contains("SQLEXPLAINSTATEMENT")) {
log.info("Execute query.");
result = query(item.toString(), limit);
return query(item.toString(), limit);
} else if (type.toUpperCase().contains("INSERT")
|| type.toUpperCase().contains("UPDATE")
|| type.toUpperCase().contains("DELETE")) {
Expand Down Expand Up @@ -661,6 +661,57 @@ public JdbcSelectResult executeSql(String sql, Integer limit) {
return result;
}

/**
 * Parses a multi-statement SQL script and executes each statement, yielding one
 * {@link JdbcSelectResult} per statement.
 *
 * <p>The returned stream is lazy: statements only run when the stream is
 * consumed, so the caller must fully consume it before closing this driver.
 *
 * @param sql   the SQL script, possibly containing multiple statements
 * @param limit maximum number of rows fetched for query-type statements
 * @return a lazy stream of per-statement results
 */
@Override
public Stream<JdbcSelectResult> StreamExecuteSql(String sql, Integer limit) {
    // TODO: replace these log calls with a ProcessStep annotation.
    log.info("Start parse sql...");
    List<SQLStatement> stmtList =
            SQLUtils.parseStatements(sql, config.getType().toLowerCase());
    log.info(CharSequenceUtil.format("A total of {} statement have been Parsed.", stmtList.size()));
    log.info("Start execute sql...");
    return stmtList.stream().map(item -> {
        List<Object> resList = new ArrayList<>();
        JdbcSelectResult result = JdbcSelectResult.buildResult();
        // Classify the statement once instead of re-upper-casing per check.
        String type = item.getClass().getSimpleName().toUpperCase();
        if (type.contains("SELECT")
                || type.contains("SHOW")
                || type.contains("DESC")
                || type.contains("SQLEXPLAINSTATEMENT")) {
            log.info("Execute query.");
            // Queries delegate to query(), which builds its own result.
            return query(item.toString(), limit);
        } else if (type.contains("INSERT") || type.contains("UPDATE") || type.contains("DELETE")) {
            try {
                log.info("Execute update.");
                resList.add(executeUpdate(item.toString()));
                result.setStatusList(resList);
            } catch (Exception e) {
                // Record a failed status (0) and the error, and stop treating
                // this statement as successful.
                resList.add(0);
                result.setStatusList(resList);
                result.error(LogUtil.getError(e));
                log.error(e.getMessage());
                return result;
            }
        } else {
            try {
                log.info("Execute DDL.");
                execute(item.toString());
                resList.add(1);
                result.setStatusList(resList);
            } catch (Exception e) {
                resList.add(0);
                result.setStatusList(resList);
                result.error(LogUtil.getError(e));
                log.error(e.getMessage());
                return result;
            }
        }
        result.success();
        return result;
    });
}

@Override
public List<SqlExplainResult> explain(String sql) {
List<SqlExplainResult> sqlExplainResults = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.stream.Stream;

import cn.hutool.core.text.StrFormatter;

Expand Down Expand Up @@ -235,4 +236,6 @@ default Set<Table> getSplitTables(List<String> tableRegList, Map<String, String>
}

List<Map<String, String>> getSplitSchemaList();

Stream<JdbcSelectResult> StreamExecuteSql(String statement, Integer maxRowNum);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -251,5 +252,10 @@ public List<SqlExplainResult> explain(String sql) {
public Map<String, String> getFlinkColumnTypeConversion() {
return null;
}

// Test stub: streaming execution is not exercised by this test driver.
@Override
public Stream<JdbcSelectResult> StreamExecuteSql(String statement, Integer maxRowNum) {
    return null;
}
}
}
37 changes: 32 additions & 5 deletions dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import { transformTableDataToCsv } from '@/utils/function';
import { l } from '@/utils/intl';
import { SearchOutlined } from '@ant-design/icons';
import { Highlight } from '@ant-design/pro-layout/es/components/Help/Search';
import { Button, Empty, Input, InputRef, Space, Table } from 'antd';
import {Button, Empty, Input, InputRef, Space, Table, Tabs} from 'antd';
import { ColumnsType, ColumnType } from 'antd/es/table';
import { FilterConfirmProps } from 'antd/es/table/interface';
import { DataIndex } from 'rc-table/es/interface';
Expand All @@ -44,11 +44,13 @@ type Data = {
columns?: string[];
rowData?: object[];
};
type DataList=Data[];
const Result = (props: any) => {
const {
tabs: { panes, activeKey }
} = props;
const [data, setData] = useState<Data>({});
const [dataList, setDataList] = useState<DataList>([]);
const [loading, setLoading] = useState<boolean>(true);
const currentTabs = getCurrentTab(panes, activeKey);
const current = getCurrentData(panes, activeKey) ?? {};
Expand Down Expand Up @@ -126,10 +128,15 @@ const Result = (props: any) => {
return;
}

const params = currentTabs.params;
const consoleData = currentTabs.console;
if (consoleData.result && !isRefresh) {
setData(consoleData.result);
} else {
}
else if(consoleData.results && !isRefresh){
setDataList(consoleData.results)
}
else {
if (current.dialect && current.dialect.toLowerCase() == DIALECT.FLINK_SQL) {
// flink sql
// to do: get job data by history id list, not flink jid
Expand Down Expand Up @@ -164,8 +171,9 @@ const Result = (props: any) => {

useEffect(() => {
setData({});
setDataList([]);
loadData();
}, [currentTabs?.console?.result]);
}, [currentTabs?.console?.result,currentTabs?.console?.results]);

const getColumns = (columns: string[]) => {
return columns?.map((item) => {
Expand Down Expand Up @@ -236,8 +244,27 @@ const Result = (props: any) => {
loading={loading}
/>
) : (
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
)}
dataList.length>0?(

<Tabs defaultActiveKey="0">
{dataList.map((data, index) => {
return (
<Tabs.TabPane key={index} tab={`Table ${index + 1}`}>
<Table
columns={getColumns(data.columns)}
size='small'
dataSource={data.rowData?.map((item: any, index: number) => {
return { ...item, key: index };
})}
loading={loading}
/>
</Tabs.TabPane>
);
})}
</Tabs>):
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
)}

</div>
);
};
Expand Down
10 changes: 10 additions & 0 deletions dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ const HeaderContainer = (props: connect) => {

const handlerDebug = async () => {
if (!currentData) return;
const saved = currentData.step == JOB_LIFE_CYCLE.PUBLISH ? true : await handleSave();
if (!saved) return;
// @ts-ignore
const editor = currentTab.monacoInstance.editor
.getEditors()
Expand Down Expand Up @@ -218,6 +220,14 @@ const HeaderContainer = (props: connect) => {
await SuccessMessageAsync(l('pages.datastudio.editor.debug.success'));
currentData.status = JOB_STATUS.RUNNING;
// Common sql task is synchronized, so it needs to automatically update the status to finished.
if (isSql(currentData.dialect)) {
currentData.status = JOB_STATUS.FINISHED;
if (currentTab) currentTab.console.results = res.data.results;
}
else {
if (currentTab) currentTab.console.result = res.data.result;
}
// Common sql task is synchronized, so it needs to automatically update the status to finished.
if (isSql(currentData.dialect)) {
currentData.status = JOB_STATUS.FINISHED;
}
Expand Down
1 change: 1 addition & 0 deletions dinky-web/src/pages/DataStudio/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ export type TaskType = {
};

export type ConsoleType = {
results:{}[];
// eslint-disable-next-line @typescript-eslint/ban-types
result: {};
// eslint-disable-next-line @typescript-eslint/ban-types
Expand Down

0 comments on commit 678ead0

Please sign in to comment.