Skip to content

Commit

Permalink
[Feature][flink] Support CALL statement (#4145)
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo authored Jan 15, 2025
1 parent 0bdb863 commit 2ca6b0d
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ public void initTenantByTaskId(Integer id) {
Integer tenantId = baseMapper.getTenantByTaskId(id);
Asserts.checkNull(tenantId, Status.TASK_NOT_EXIST.getMessage());
TenantContextHolder.set(tenantId);
log.info("Init task tenan finished..");
log.info("Init task tenant finished..");
}

@Override
Expand Down Expand Up @@ -995,7 +995,7 @@ public Result<Tree<Integer>> queryAllCatalogue() {
@Override
public LineageResult getTaskLineage(Integer id) {
TaskDTO task = getTaskInfoById(id);
if (!Dialect.isCommonSql(task.getDialect())) {
if (Dialect.isCommonSql(task.getDialect())) {
if (Asserts.isNull(task.getDatabaseId())) {
return null;
}
Expand Down
7 changes: 5 additions & 2 deletions dinky-common/src/main/java/org/dinky/data/job/SqlType.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,21 @@ public enum SqlType {

WITH("WITH", "^WITH.*", SqlCategory.DQL),

CALL("CALL", "^CALL.*", SqlCategory.DQL),

UNKNOWN("UNKNOWN", "^UNKNOWN.*", SqlCategory.UNKNOWN);

private String type;
private final Pattern pattern;
private final SqlCategory category;

private static final List<SqlType> TRANS_SQL_TYPES =
Lists.newArrayList(INSERT, SELECT, WITH, SHOW, DESCRIBE, DESC, CTAS, RTAS, UPDATE, DELETE);
Lists.newArrayList(INSERT, SELECT, WITH, SHOW, DESCRIBE, DESC, CTAS, RTAS, UPDATE, DELETE, CALL);

private static final List<SqlType> CTAS_TYPES = Lists.newArrayList(CTAS, RTAS, PRINT);

private static final List<SqlType> PIPELINE_SQL_TYPES = Lists.newArrayList(INSERT, SELECT, WITH, CTAS, RTAS, PRINT);
private static final List<SqlType> PIPELINE_SQL_TYPES =
Lists.newArrayList(INSERT, SELECT, WITH, CTAS, RTAS, PRINT, CALL);

private static final List<SqlType> SINK_MODIFY_SQL_TYPES = Lists.newArrayList(INSERT, CTAS, RTAS, PRINT);

Expand Down
2 changes: 0 additions & 2 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ public JobResult executeJarSql(String statement) throws Exception {
.collect(Collectors.toList());
statement = String.join(";\n", statements);
jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
jobStatementPlan.setSubmissionMode(config.isSubmissionMode());
jobStatementPlan.buildFinalStatement();
job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
ready();
Expand Down Expand Up @@ -282,7 +281,6 @@ public JobResult executeSql(String statement) throws Exception {
ready();
try {
jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
jobStatementPlan.setSubmissionMode(config.isSubmissionMode());
jobStatementPlan.buildFinalStatement();
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
Expand Down
43 changes: 2 additions & 41 deletions dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,13 @@
public class JobStatementPlan {

private List<JobStatement> jobStatementList = new ArrayList<>();
private boolean isSubmissionMode;

public JobStatementPlan() {}

public List<JobStatement> getJobStatementList() {
return jobStatementList;
}

public boolean isSubmissionMode() {
return isSubmissionMode;
}

public void setSubmissionMode(boolean isSubmissionMode) {
this.isSubmissionMode = isSubmissionMode;
}

public void addJobStatement(String statement, JobStatementType statementType, SqlType sqlType) {
jobStatementList.add(new JobStatement(jobStatementList.size() + 1, statement, statementType, sqlType));
}
Expand All @@ -76,8 +67,8 @@ public void buildFinalStatement() {
if (executableIndex >= 0) {
jobStatementList.get(executableIndex).asFinalExecutableStatement();
} else {
// If there is no INSERT/CTAS/RTAS statement, use the first SELECT/WITH/SHOW/DESC SQL statement as the final
// statement.
// If there is no INSERT/CTAS/RTAS/CALL statement, use the first SELECT/WITH/SHOW/DESC SQL statement as the
// final statement.
for (int i = 0; i < jobStatementList.size(); i++) {
if (jobStatementList.get(i).getStatementType().equals(JobStatementType.SQL)) {
jobStatementList.get(i).asFinalExecutableStatement();
Expand All @@ -94,43 +85,13 @@ public void buildFinalStatement() {

public void checkStatement() {
checkEmptyStatement();
checkSubmissionStatement();
checkPipelineStatement();
}

private void checkEmptyStatement() {
if (jobStatementList.isEmpty()) {
throw new DinkyException("None of valid statement is executed. Please check your statements.");
}
boolean hasSqlStatement = false;
for (JobStatement jobStatement : jobStatementList) {
if (jobStatement.getStatement().trim().isEmpty()) {
throw new DinkyException("The statement cannot be empty. Please check your statements.");
}
if (jobStatement.getStatementType().equals(JobStatementType.SQL)
|| jobStatement.getStatementType().equals(JobStatementType.PIPELINE)
|| jobStatement.getStatementType().equals(JobStatementType.EXECUTE_JAR)) {
hasSqlStatement = true;
}
}
if (!hasSqlStatement) {
throw new DinkyException(
"The statements must contain at least one INSERT/CTAS/RTAS/SELECT/WITH/SHOW/DESC statement.");
}
}

private void checkSubmissionStatement() {
if (isSubmissionMode) {
for (JobStatement jobStatement : jobStatementList) {
if (jobStatement.getStatementType().equals(JobStatementType.SQL)) {
if (!jobStatement.getSqlType().isSinkyModify()) {
throw new DinkyException(
"The submission mode cannot contain one statement which is not a sink operation."
+ "\nThe valid statement is: " + jobStatement.getStatement());
}
}
}
}
}

private void checkPipelineStatement() {
Expand Down
23 changes: 1 addition & 22 deletions dinky-core/src/test/java/org/dinky/job/JobStatementPlanTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,7 @@ void testEmptyStatements() {
@Test
void testEmptyStatement() {
JobStatementPlan jobStatementPlan = new JobStatementPlan();
jobStatementPlan.addJobStatement("", JobStatementType.SQL, SqlType.UNKNOWN);
checkInvalidStatement(jobStatementPlan, "The statement cannot be empty. Please check your statements.");
}

@Test
void testNoSqlStatement() {
JobStatementPlan jobStatementPlan = new JobStatementPlan();
jobStatementPlan.addJobStatement("set 'parallelism.default' = '2';\n", JobStatementType.DDL, SqlType.SET);
checkInvalidStatement(
jobStatementPlan,
"The statements must contain at least one INSERT/CTAS/RTAS/SELECT/WITH/SHOW/DESC statement.");
}

@Test
void testSubmissionWithQueryStatement() {
JobStatementPlan jobStatementPlan = new JobStatementPlan();
jobStatementPlan.setSubmissionMode(true);
jobStatementPlan.addJobStatement("select 'A' as name;\n", JobStatementType.SQL, SqlType.SET);
checkInvalidStatement(
jobStatementPlan,
"The submission mode cannot contain one statement which is not a sink operation."
+ "\nThe valid statement is: select 'A' as name;\n");
checkInvalidStatement(jobStatementPlan, "None of valid statement is executed. Please check your statements.");
}

@Test
Expand Down

0 comments on commit 2ca6b0d

Please sign in to comment.