diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 8fa5e3dbe1..84570f14db 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -568,7 +568,7 @@ public void initTenantByTaskId(Integer id) { Integer tenantId = baseMapper.getTenantByTaskId(id); Asserts.checkNull(tenantId, Status.TASK_NOT_EXIST.getMessage()); TenantContextHolder.set(tenantId); - log.info("Init task tenan finished.."); + log.info("Init task tenant finished.."); } @Override @@ -995,7 +995,7 @@ public Result> queryAllCatalogue() { @Override public LineageResult getTaskLineage(Integer id) { TaskDTO task = getTaskInfoById(id); - if (!Dialect.isCommonSql(task.getDialect())) { + if (Dialect.isCommonSql(task.getDialect())) { if (Asserts.isNull(task.getDatabaseId())) { return null; } diff --git a/dinky-common/src/main/java/org/dinky/data/job/SqlType.java b/dinky-common/src/main/java/org/dinky/data/job/SqlType.java index 7223d097b6..fb5288bf44 100644 --- a/dinky-common/src/main/java/org/dinky/data/job/SqlType.java +++ b/dinky-common/src/main/java/org/dinky/data/job/SqlType.java @@ -88,6 +88,8 @@ public enum SqlType { WITH("WITH", "^WITH.*", SqlCategory.DQL), + CALL("CALL", "^CALL.*", SqlCategory.DQL), + UNKNOWN("UNKNOWN", "^UNKNOWN.*", SqlCategory.UNKNOWN); private String type; @@ -95,11 +97,12 @@ public enum SqlType { private final SqlCategory category; private static final List TRANS_SQL_TYPES = - Lists.newArrayList(INSERT, SELECT, WITH, SHOW, DESCRIBE, DESC, CTAS, RTAS, UPDATE, DELETE); + Lists.newArrayList(INSERT, SELECT, WITH, SHOW, DESCRIBE, DESC, CTAS, RTAS, UPDATE, DELETE, CALL); private static final List CTAS_TYPES = Lists.newArrayList(CTAS, RTAS, PRINT); - private static final List PIPELINE_SQL_TYPES = Lists.newArrayList(INSERT, SELECT, WITH, CTAS, RTAS, PRINT); + private static final List PIPELINE_SQL_TYPES = + Lists.newArrayList(INSERT, SELECT, WITH, CTAS, RTAS, PRINT, CALL); private static final List SINK_MODIFY_SQL_TYPES = Lists.newArrayList(INSERT, CTAS, RTAS, PRINT); diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index 322aed3e87..12d9c06e05 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -247,7 +247,6 @@ public JobResult executeJarSql(String statement) throws Exception { .collect(Collectors.toList()); statement = String.join(";\n", statements); jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement)); - jobStatementPlan.setSubmissionMode(config.isSubmissionMode()); jobStatementPlan.buildFinalStatement(); job = Job.build(runMode, config, executorConfig, executor, statement, useGateway); ready(); @@ -282,7 +281,6 @@ public JobResult executeSql(String statement) throws Exception { ready(); try { jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement)); - jobStatementPlan.setSubmissionMode(config.isSubmissionMode()); jobStatementPlan.buildFinalStatement(); JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this); for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) { diff --git a/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java b/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java index 5838d6dd19..6cd1f6aba5 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java +++ b/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java @@ -35,7 +35,6 @@ public class JobStatementPlan { private List jobStatementList = new ArrayList<>(); - private boolean isSubmissionMode; public JobStatementPlan() {} @@ -43,14 +42,6 @@ public List getJobStatementList() { return jobStatementList; } - public boolean isSubmissionMode() { - return isSubmissionMode; - } - - public void setSubmissionMode(boolean isSubmissionMode) { - this.isSubmissionMode = isSubmissionMode; - } - public void addJobStatement(String statement, JobStatementType statementType, SqlType sqlType) { jobStatementList.add(new JobStatement(jobStatementList.size() + 1, statement, statementType, sqlType)); } @@ -76,8 +67,8 @@ public void buildFinalStatement() { if (executableIndex >= 0) { jobStatementList.get(executableIndex).asFinalExecutableStatement(); } else { - // If there is no INSERT/CTAS/RTAS statement, use the first SELECT/WITH/SHOW/DESC SQL statement as the final - // statement. + // If there is no INSERT/CTAS/RTAS/CALL statement, use the first SELECT/WITH/SHOW/DESC SQL statement as the + // final statement. for (int i = 0; i < jobStatementList.size(); i++) { if (jobStatementList.get(i).getStatementType().equals(JobStatementType.SQL)) { jobStatementList.get(i).asFinalExecutableStatement(); @@ -94,7 +85,6 @@ public void buildFinalStatement() { public void checkStatement() { checkEmptyStatement(); - checkSubmissionStatement(); checkPipelineStatement(); } @@ -102,35 +92,6 @@ private void checkEmptyStatement() { if (jobStatementList.isEmpty()) { throw new DinkyException("None of valid statement is executed. Please check your statements."); } - boolean hasSqlStatement = false; - for (JobStatement jobStatement : jobStatementList) { - if (jobStatement.getStatement().trim().isEmpty()) { - throw new DinkyException("The statement cannot be empty. Please check your statements."); - } - if (jobStatement.getStatementType().equals(JobStatementType.SQL) - || jobStatement.getStatementType().equals(JobStatementType.PIPELINE) - || jobStatement.getStatementType().equals(JobStatementType.EXECUTE_JAR)) { - hasSqlStatement = true; - } - } - if (!hasSqlStatement) { - throw new DinkyException( - "The statements must contain at least one INSERT/CTAS/RTAS/SELECT/WITH/SHOW/DESC statement."); - } - } - - private void checkSubmissionStatement() { - if (isSubmissionMode) { - for (JobStatement jobStatement : jobStatementList) { - if (jobStatement.getStatementType().equals(JobStatementType.SQL)) { - if (!jobStatement.getSqlType().isSinkyModify()) { - throw new DinkyException( - "The submission mode cannot contain one statement which is not a sink operation." - + "\nThe valid statement is: " + jobStatement.getStatement()); - } - } - } - } } private void checkPipelineStatement() { diff --git a/dinky-core/src/test/java/org/dinky/job/JobStatementPlanTest.java b/dinky-core/src/test/java/org/dinky/job/JobStatementPlanTest.java index 58a3b76f16..ddd6cee24e 100644 --- a/dinky-core/src/test/java/org/dinky/job/JobStatementPlanTest.java +++ b/dinky-core/src/test/java/org/dinky/job/JobStatementPlanTest.java @@ -37,28 +37,7 @@ void testEmptyStatements() { @Test void testEmptyStatement() { JobStatementPlan jobStatementPlan = new JobStatementPlan(); - jobStatementPlan.addJobStatement("", JobStatementType.SQL, SqlType.UNKNOWN); - checkInvalidStatement(jobStatementPlan, "The statement cannot be empty. Please check your statements."); - } - - @Test - void testNoSqlStatement() { - JobStatementPlan jobStatementPlan = new JobStatementPlan(); - jobStatementPlan.addJobStatement("set 'parallelism.default' = '2';\n", JobStatementType.DDL, SqlType.SET); - checkInvalidStatement( - jobStatementPlan, - "The statements must contain at least one INSERT/CTAS/RTAS/SELECT/WITH/SHOW/DESC statement."); - } - - @Test - void testSubmissionWithQueryStatement() { - JobStatementPlan jobStatementPlan = new JobStatementPlan(); - jobStatementPlan.setSubmissionMode(true); - jobStatementPlan.addJobStatement("select 'A' as name;\n", JobStatementType.SQL, SqlType.SET); - checkInvalidStatement( - jobStatementPlan, - "The submission mode cannot contain one statement which is not a sink operation." - + "\nThe valid statement is: select 'A' as name;\n"); + checkInvalidStatement(jobStatementPlan, "None of valid statement is executed. Please check your statements."); } @Test