Skip to content

Commit

Permalink
[Feature-4145][flink] Support CALL statement
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo committed Feb 4, 2025
1 parent e88f1aa commit 266f51c
Show file tree
Hide file tree
Showing 9 changed files with 1,263 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ public void initTenantByTaskId(Integer id) {
Integer tenantId = baseMapper.getTenantByTaskId(id);
Asserts.checkNull(tenantId, Status.TASK_NOT_EXIST.getMessage());
TenantContextHolder.set(tenantId);
log.info("Init task tenan finished..");
log.info("Init task tenant finished..");
}

@Override
Expand Down Expand Up @@ -997,7 +997,7 @@ public Result<Tree<Integer>> queryAllCatalogue() {
@Override
public LineageResult getTaskLineage(Integer id) {
TaskDTO task = getTaskInfoById(id);
if (!Dialect.isCommonSql(task.getDialect())) {
if (Dialect.isCommonSql(task.getDialect())) {
if (Asserts.isNull(task.getDatabaseId())) {
return null;
}
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions dinky-common/src/main/java/org/dinky/data/job/SqlType.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,21 @@ public enum SqlType {

WITH("WITH", "^WITH.*", SqlCategory.DQL),

CALL("CALL", "^CALL.*", SqlCategory.DQL),

UNKNOWN("UNKNOWN", "^UNKNOWN.*", SqlCategory.UNKNOWN);

private String type;
private final Pattern pattern;
private final SqlCategory category;

private static final List<SqlType> TRANS_SQL_TYPES =
Lists.newArrayList(INSERT, SELECT, WITH, SHOW, DESCRIBE, DESC, CTAS, RTAS, UPDATE, DELETE);
Lists.newArrayList(INSERT, SELECT, WITH, SHOW, DESCRIBE, DESC, CTAS, RTAS, UPDATE, DELETE, CALL);

private static final List<SqlType> CTAS_TYPES = Lists.newArrayList(CTAS, RTAS, PRINT);

private static final List<SqlType> PIPELINE_SQL_TYPES = Lists.newArrayList(INSERT, SELECT, WITH, CTAS, RTAS, PRINT);
private static final List<SqlType> PIPELINE_SQL_TYPES =
Lists.newArrayList(INSERT, SELECT, WITH, CTAS, RTAS, PRINT, CALL);

private static final List<SqlType> SINK_MODIFY_SQL_TYPES = Lists.newArrayList(INSERT, CTAS, RTAS, PRINT);

Expand Down
2 changes: 0 additions & 2 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ public JobResult executeJarSql(String statement) throws Exception {
.collect(Collectors.toList());
statement = String.join(";\n", statements);
jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
jobStatementPlan.setSubmissionMode(config.isSubmissionMode());
jobStatementPlan.buildFinalStatement();
job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
ready();
Expand Down Expand Up @@ -282,7 +281,6 @@ public JobResult executeSql(String statement) throws Exception {
ready();
try {
jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
jobStatementPlan.setSubmissionMode(config.isSubmissionMode());
jobStatementPlan.buildFinalStatement();
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
Expand Down
42 changes: 2 additions & 40 deletions dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,13 @@
public class JobStatementPlan {

private List<JobStatement> jobStatementList = new ArrayList<>();
private boolean isSubmissionMode;

public JobStatementPlan() {}

public List<JobStatement> getJobStatementList() {
return jobStatementList;
}

public boolean isSubmissionMode() {
return isSubmissionMode;
}

public void setSubmissionMode(boolean isSubmissionMode) {
this.isSubmissionMode = isSubmissionMode;
}

public void addJobStatement(String statement, JobStatementType statementType, SqlType sqlType) {
jobStatementList.add(new JobStatement(jobStatementList.size() + 1, statement, statementType, sqlType));
}
Expand All @@ -76,7 +67,8 @@ public void buildFinalStatement() {
if (executableIndex >= 0) {
jobStatementList.get(executableIndex).asFinalExecutableStatement();
} else {
// If there is no INSERT/CTAS/RTAS statement, use the first SELECT/WITH/SHOW/DESC SQL statement as the final
// If there is no INSERT/CTAS/RTAS/CALL statement, use the first SELECT/WITH/SHOW/DESC SQL statement as the
// final
// statement.
for (int i = 0; i < jobStatementList.size(); i++) {
if (jobStatementList.get(i).getStatementType().equals(JobStatementType.SQL)) {
Expand All @@ -94,43 +86,13 @@ public void buildFinalStatement() {

public void checkStatement() {
checkEmptyStatement();
checkSubmissionStatement();
checkPipelineStatement();
}

private void checkEmptyStatement() {
if (jobStatementList.isEmpty()) {
throw new DinkyException("None of valid statement is executed. Please check your statements.");
}
boolean hasSqlStatement = false;
for (JobStatement jobStatement : jobStatementList) {
if (jobStatement.getStatement().trim().isEmpty()) {
throw new DinkyException("The statement cannot be empty. Please check your statements.");
}
if (jobStatement.getStatementType().equals(JobStatementType.SQL)
|| jobStatement.getStatementType().equals(JobStatementType.PIPELINE)
|| jobStatement.getStatementType().equals(JobStatementType.EXECUTE_JAR)) {
hasSqlStatement = true;
}
}
if (!hasSqlStatement) {
throw new DinkyException(
"The statements must contain at least one INSERT/CTAS/RTAS/SELECT/WITH/SHOW/DESC statement.");
}
}

private void checkSubmissionStatement() {
if (isSubmissionMode) {
for (JobStatement jobStatement : jobStatementList) {
if (jobStatement.getStatementType().equals(JobStatementType.SQL)) {
if (!jobStatement.getSqlType().isSinkyModify()) {
throw new DinkyException(
"The submission mode cannot contain one statement which is not a sink operation."
+ "\nThe valid statement is: " + jobStatement.getStatement());
}
}
}
}
}

private void checkPipelineStatement() {
Expand Down
16 changes: 16 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.time.LocalDateTime;
import java.util.ArrayList;
Expand Down Expand Up @@ -297,6 +301,18 @@ private void updateJobWithTableResult(TableResult tableResult, SqlType sqlType)
jobManager
.getJob()
.setJids(Collections.singletonList(jobManager.getJob().getJobId()));
} else if (ResultKind.SUCCESS_WITH_CONTENT.equals(tableResult.getResultKind())) {
TableResultImpl tableResultImpl = (TableResultImpl) tableResult;
CloseableIterator<Row> rowCloseableIterator = tableResultImpl.collect();
if (rowCloseableIterator.hasNext()) {
Row row = rowCloseableIterator.next();
String jobIDStringData = String.valueOf(row.getField(0));
if (Asserts.isNotNullString(jobIDStringData)) {
String jobID = jobIDStringData.replace("JobID=", "");
jobManager.getJob().setJobId(jobID);
jobManager.getJob().setJids(Collections.singletonList(jobID));
}
}
}

if (jobManager.getConfig().isUseResult()) {
Expand Down
23 changes: 1 addition & 22 deletions dinky-core/src/test/java/org/dinky/job/JobStatementPlanTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,7 @@ void testEmptyStatements() {
@Test
void testEmptyStatement() {
JobStatementPlan jobStatementPlan = new JobStatementPlan();
jobStatementPlan.addJobStatement("", JobStatementType.SQL, SqlType.UNKNOWN);
checkInvalidStatement(jobStatementPlan, "The statement cannot be empty. Please check your statements.");
}

@Test
void testNoSqlStatement() {
JobStatementPlan jobStatementPlan = new JobStatementPlan();
jobStatementPlan.addJobStatement("set 'parallelism.default' = '2';\n", JobStatementType.DDL, SqlType.SET);
checkInvalidStatement(
jobStatementPlan,
"The statements must contain at least one INSERT/CTAS/RTAS/SELECT/WITH/SHOW/DESC statement.");
}

@Test
void testSubmissionWithQueryStatement() {
JobStatementPlan jobStatementPlan = new JobStatementPlan();
jobStatementPlan.setSubmissionMode(true);
jobStatementPlan.addJobStatement("select 'A' as name;\n", JobStatementType.SQL, SqlType.SET);
checkInvalidStatement(
jobStatementPlan,
"The submission mode cannot contain one statement which is not a sink operation."
+ "\nThe valid statement is: select 'A' as name;\n");
checkInvalidStatement(jobStatementPlan, "None of valid statement is executed. Please check your statements.");
}

@Test
Expand Down

0 comments on commit 266f51c

Please sign in to comment.