Commit d098f06
[Fix-4163][flink] Fix set statement is not effective in application mode (#4188)
aiwenmo authored Feb 6, 2025
1 parent fa5287a commit d098f06
Showing 14 changed files with 246 additions and 166 deletions.
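For context, the scenario this commit fixes is an application-mode task whose script sets Flink configuration inline before the pipeline statement. A minimal hypothetical sketch (the config key and table names are illustrative, not from the commit):

    // Before this fix, the SET below was silently dropped when the task was
    // submitted in application mode, so the configuration never took effect.
    String sql = "SET 'pipeline.name' = 'demo-job';\n"
            + "INSERT INTO sink_table SELECT * FROM source_table;";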
@@ -20,23 +20,25 @@
package org.dinky.app.flinksql;

import org.dinky.app.db.DBUtil;
-import org.dinky.app.model.StatementParam;
import org.dinky.app.model.SysConfig;
import org.dinky.app.util.FlinkAppUtil;
import org.dinky.assertion.Asserts;
import org.dinky.classloader.DinkyClassLoader;
import org.dinky.config.Dialect;
import org.dinky.constant.CustomerConfigureOptions;
-import org.dinky.constant.FlinkSQLConstant;
import org.dinky.data.app.AppParamConfig;
import org.dinky.data.app.AppTask;
import org.dinky.data.constant.DirConstant;
import org.dinky.data.enums.GatewayType;
+import org.dinky.data.job.JobStatement;
import org.dinky.data.job.SqlType;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.executor.Executor;
import org.dinky.executor.ExecutorConfig;
import org.dinky.executor.ExecutorFactory;
+import org.dinky.explainer.Explainer;
+import org.dinky.job.JobRunnerFactory;
+import org.dinky.job.JobStatementPlan;
import org.dinky.resource.BaseResourceManager;
import org.dinky.trans.Operations;
import org.dinky.trans.dml.ExecuteJarOperation;
@@ -60,7 +62,6 @@
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.table.api.TableResult;

import java.io.File;
import java.io.IOException;
@@ -72,7 +73,6 @@
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.time.LocalDateTime;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -141,7 +141,7 @@ public static void submit(AppParamConfig config) throws SQLException {
if (Dialect.FLINK_JAR == appTask.getDialect()) {
jobClient = executeJarJob(appTask.getType(), executor, statements);
} else {
-                jobClient = executeJob(executor, statements);
+                jobClient = executeJob(executor, sql);
}
} finally {
log.info("Start Monitor Job");
@@ -305,85 +305,31 @@ public static Optional<JobClient> executeJarJob(String type, Executor executor,
return jobClient;
}

-    public static Optional<JobClient> executeJob(Executor executor, String[] statements) {
+    public static Optional<JobClient> executeJob(Executor executor, String statements) {
         Optional<JobClient> jobClient = Optional.empty();

-        ExecutorConfig executorConfig = executor.getExecutorConfig();
-        List<StatementParam> ddl = new ArrayList<>();
-        List<StatementParam> trans = new ArrayList<>();
-        List<StatementParam> execute = new ArrayList<>();
-
-        for (String item : statements) {
-            if (item.isEmpty()) {
-                continue;
-            }
-
-            SqlType operationType = Operations.getOperationType(item);
-            if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
-                trans.add(new StatementParam(item, operationType));
-                if (!executorConfig.isUseStatementSet()) {
-                    break;
-                }
-            } else if (operationType.equals(SqlType.EXECUTE)) {
-                execute.add(new StatementParam(item, operationType));
-                if (!executorConfig.isUseStatementSet()) {
-                    break;
-                }
-            } else {
-                ddl.add(new StatementParam(item, operationType));
-            }
-        }
-
-        for (StatementParam item : ddl) {
-            log.info("Executing FlinkSQL: {}", item.getValue());
-            executor.executeSql(item.getValue());
-            log.info("Execution succeeded.");
-        }
-
-        if (!trans.isEmpty()) {
-            if (executorConfig.isUseStatementSet()) {
-                List<String> inserts = new ArrayList<>();
-                for (StatementParam item : trans) {
-                    if (item.getType().equals(SqlType.INSERT)) {
-                        inserts.add(item.getValue());
-                    }
-                }
-                log.info("Executing FlinkSQL statement set: {}", String.join(FlinkSQLConstant.SEPARATOR, inserts));
-                TableResult tableResult = executor.executeStatementSet(inserts);
-                jobClient = tableResult.getJobClient();
-                log.info("Execution succeeded.");
-            } else {
-                // UseStatementSet defaults to true, where the logic is never executed
-                StatementParam item = trans.get(0);
-                log.info("Executing FlinkSQL: {}", item.getValue());
-                TableResult tableResult = executor.executeSql(item.getValue());
-                jobClient = tableResult.getJobClient();
-            }
-        }
-
-        if (!execute.isEmpty()) {
-            List<String> executes = new ArrayList<>();
-            for (StatementParam item : execute) {
-                executes.add(item.getValue());
-                executor.executeSql(item.getValue());
-                if (!executorConfig.isUseStatementSet()) {
-                    break;
-                }
-            }
-
-            log.info(
-                    "The FlinkSQL statement set is being executed: {}",
-                    String.join(FlinkSQLConstant.SEPARATOR, executes));
-            try {
-                JobClient client = executor.executeAsync(executorConfig.getJobName());
-                jobClient = Optional.of(client);
-                log.info("The execution was successful");
-            } catch (Exception e) {
-                log.error("Execution failed, {}", e.getMessage(), e);
-            }
-        }
-        log.info("{} The task is successfully submitted", LocalDateTime.now());
+        JobStatementPlan jobStatementPlan =
+                Explainer.build(executor).parseStatementsForApplicationMode(SqlUtil.getStatements(statements));
+        jobStatementPlan.buildFinalExecutableStatement();
+        JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(executor);
+        String currentSql = "";
+        try {
+            for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
+                currentSql = jobStatement.getStatement();
+                log.info("Executing FlinkSQL: {}", currentSql);
+                Optional<JobClient> optionalJobClient = jobRunnerFactory
+                        .getJobRunner(jobStatement.getStatementType())
+                        .execute(jobStatement);
+                log.info("Execution succeeded.");
+                if (optionalJobClient.isPresent()) {
+                    jobClient = optionalJobClient;
+                    break;
+                }
+            }
+        } catch (Exception e) {
+            log.error("Execution failed. Current statement: {} \n Error: {}", currentSql, e.getMessage(), e);
+        }
+        log.info("The task is successfully submitted.");
return jobClient;
}
}
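For reference, a minimal sketch of the new application-mode path above, using the same classes as the diff; executor and sql are assumed to come from the caller (e.g. the script sketched under the commit summary). This is an illustration, not code from the commit:

    public static Optional<JobClient> executeJobSketch(Executor executor, String sql) throws Exception {
        // Parse the raw script into a plan; each SET/DDL/INSERT becomes a JobStatement.
        JobStatementPlan plan = Explainer.build(executor)
                .parseStatementsForApplicationMode(SqlUtil.getStatements(sql));
        // Mark the last pipeline statement (or the first plain SQL one) as final.
        plan.buildFinalExecutableStatement();
        JobRunnerFactory runners = JobRunnerFactory.create(executor);
        Optional<JobClient> jobClient = Optional.empty();
        for (JobStatement statement : plan.getJobStatementList()) {
            // SET statements are dispatched to their runner instead of being skipped,
            // so they take effect before the final INSERT is submitted.
            Optional<JobClient> result = runners
                    .getJobRunner(statement.getStatementType())
                    .execute(statement);
            if (result.isPresent()) {
                jobClient = result; // only the final executable statement yields a client
                break;
            }
        }
        return jobClient;
    }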
22 changes: 14 additions & 8 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
@@ -62,21 +62,23 @@
public class Explainer {

    private Executor executor;
-   private boolean useStatementSet;
    private JobManager jobManager;

-   public Explainer(Executor executor, boolean useStatementSet, JobManager jobManager) {
-       this.executor = executor;
-       this.useStatementSet = useStatementSet;
-       this.jobManager = jobManager;
-   }
+   public Explainer(Executor executor) {
+       this.executor = executor;
+   }
+
+   public Explainer(Executor executor, JobManager jobManager) {
+       this.executor = executor;
+       this.jobManager = jobManager;
+   }

-   public static Explainer build(JobManager jobManager) {
-       return new Explainer(jobManager.getExecutor(), true, jobManager);
-   }
+   public static Explainer build(Executor executor) {
+       return new Explainer(executor);
+   }

-   public static Explainer build(Executor executor, boolean useStatementSet, JobManager jobManager) {
-       return new Explainer(executor, useStatementSet, jobManager);
-   }
+   public static Explainer build(JobManager jobManager) {
+       return new Explainer(jobManager.getExecutor(), jobManager);
+   }

public JobStatementPlan parseStatements(String[] statements) {
@@ -93,6 +95,10 @@ public JobStatementPlan parseStatements(String[] statements) {
return jobStatementPlanWithMock;
}

+   public JobStatementPlan parseStatementsForApplicationMode(String[] statements) {
+       return executor.parseStatementIntoJobStatementPlan(statements);
+   }

private void generateUDFStatement(JobStatementPlan jobStatementPlan) {
List<String> udfStatements = new ArrayList<>();
Optional.ofNullable(jobManager.getConfig().getUdfRefer())
@@ -185,7 +191,7 @@ public List<LineageRel> getLineage(String statement) {
.type(GatewayType.LOCAL.getLongValue())
.useRemote(false)
.fragment(true)
-                .statementSet(useStatementSet)
+                .statementSet(false)
.parallelism(1)
.udfRefer(jobManager.getConfig().getUdfRefer())
.configJson(executor.getTableConfig().getConfiguration().toMap())
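The net effect of the constructor changes above: an Explainer can now be built from a bare Executor in application mode, where no JobManager exists. A sketch of the two entry points (executor and jobManager assumed from the caller):

    Explainer appModeExplainer = Explainer.build(executor);  // application mode: Executor only
    Explainer serverExplainer = Explainer.build(jobManager); // server side: delegates to jobManager.getExecutor()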
@@ -40,21 +40,19 @@
public class LineageBuilder {

    public static LineageResult getColumnLineageByLogicalPlan(String statement, JobConfig jobConfig) {
-       JobManager jobManager = JobManager.buildPlanMode(jobConfig);
-       Explainer explainer = new Explainer(jobManager.getExecutor(), false, jobManager);
-       return getColumnLineageByLogicalPlan(statement, explainer);
+       Explainer explainer = Explainer.build(JobManager.buildPlanMode(jobConfig));
+       return getColumnLineageByLogicalPlan(explainer.getLineage(statement));
    }

    public static LineageResult getColumnLineageByLogicalPlan(String statement, ExecutorConfig executorConfig) {
        JobManager jobManager = JobManager.buildPlanMode(JobConfig.buildPlanConfig());
        Executor executor = ExecutorFactory.buildExecutor(executorConfig, jobManager.getDinkyClassLoader());
        jobManager.setExecutor(executor);
-       Explainer explainer = new Explainer(executor, false, jobManager);
-       return getColumnLineageByLogicalPlan(statement, explainer);
+       Explainer explainer = Explainer.build(jobManager);
+       return getColumnLineageByLogicalPlan(explainer.getLineage(statement));
    }

-   public static LineageResult getColumnLineageByLogicalPlan(String statement, Explainer explainer) {
-       List<LineageRel> lineageRelList = explainer.getLineage(statement);
+   public static LineageResult getColumnLineageByLogicalPlan(List<LineageRel> lineageRelList) {
List<LineageRelation> relations = new ArrayList<>();
Map<String, LineageTable> tableMap = new HashMap<>();
int tableIndex = 1;
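With this refactor, lineage extraction becomes a two-step call: compute the LineageRel list first, then fold it into a LineageResult. A sketch of the new call shape (the statement text is illustrative):

    Explainer explainer = Explainer.build(JobManager.buildPlanMode(jobConfig));
    List<LineageRel> relations = explainer.getLineage("INSERT INTO sink_table SELECT * FROM source_table");
    LineageResult result = LineageBuilder.getColumnLineageByLogicalPlan(relations);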
7 changes: 0 additions & 7 deletions dinky-core/src/main/java/org/dinky/job/JobConfig.java
@@ -190,13 +190,6 @@ public class JobConfig {
notes = "Flag indicating whether to mock sink function")
private boolean mockSinkFunction;

-   @ApiModelProperty(
-           value = "Flag indicating whether to be submission mode",
-           dataType = "boolean",
-           example = "true",
-           notes = "Flag indicating whether to be submission mode")
-   private boolean isSubmissionMode;

@ApiModelProperty(value = "Gateway configuration", dataType = "GatewayConfig", notes = "Gateway configuration")
private GatewayConfig gatewayConfig;

13 changes: 5 additions & 8 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
@@ -79,7 +79,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.node.ObjectNode;

@@ -242,17 +241,14 @@ public boolean close() {

@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
public JobResult executeJarSql(String statement) throws Exception {
-       List<String> statements = Arrays.stream(SqlUtil.getStatements(statement))
-               .map(t -> executor.pretreatStatement(t))
-               .collect(Collectors.toList());
-       statement = String.join(";\n", statements);
-       jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
-       jobStatementPlan.buildFinalStatement();
        job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
        ready();
-       JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
        try {
+           jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
+           jobStatementPlan.buildFinalStatement();
+           JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
            for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
+               setCurrentSql(jobStatement.getStatement());
                jobRunnerFactory.getJobRunner(jobStatement.getStatementType()).run(jobStatement);
            }
if (job.isFailed()) {
Expand Down Expand Up @@ -284,6 +280,7 @@ public JobResult executeSql(String statement) throws Exception {
jobStatementPlan.buildFinalStatement();
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
+           setCurrentSql(jobStatement.getStatement());
jobRunnerFactory.getJobRunner(jobStatement.getStatementType()).run(jobStatement);
}
job.setEndTime(LocalDateTime.now());
5 changes: 5 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobRunner.java
@@ -22,11 +22,16 @@
import org.dinky.data.job.JobStatement;
import org.dinky.data.result.SqlExplainResult;

+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.graph.StreamGraph;

+import java.util.Optional;

public interface JobRunner {

+   Optional<JobClient> execute(JobStatement jobStatement) throws Exception;

void run(JobStatement jobStatement) throws Exception;

SqlExplainResult explain(JobStatement jobStatement);
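The new execute method lets application mode recover the JobClient of the statement that actually launched a Flink job; runners for non-pipeline statements can simply return Optional.empty(). A caller-side sketch (jobRunnerFactory, jobStatement, and log are assumed from context; JobClient#getJobID is standard Flink API):

    Optional<JobClient> maybeClient = jobRunnerFactory
            .getJobRunner(jobStatement.getStatementType())
            .execute(jobStatement);
    // DDL-like runners yield an empty Optional; only a launched job carries a client.
    maybeClient.ifPresent(client -> log.info("Launched Flink job {}", client.getJobID()));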
13 changes: 13 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java
@@ -20,6 +20,7 @@
package org.dinky.job;

import org.dinky.data.job.JobStatementType;
+import org.dinky.executor.Executor;
import org.dinky.job.runner.JobDDLRunner;
import org.dinky.job.runner.JobJarRunner;
import org.dinky.job.runner.JobPipelineRunner;
@@ -42,6 +43,14 @@ public JobRunnerFactory(JobManager jobManager) {
this.jobJarRunner = new JobJarRunner(jobManager);
}

+   public JobRunnerFactory(Executor executor) {
+       this.jobSetRunner = new JobSetRunner(executor);
+       this.jobSqlRunner = new JobSqlRunner(executor);
+       this.jobPipelineRunner = new JobPipelineRunner(executor);
+       this.jobDDLRunner = new JobDDLRunner(executor);
+       this.jobJarRunner = new JobJarRunner(executor);
+   }

public JobRunner getJobRunner(JobStatementType jobStatementType) {
switch (jobStatementType) {
case SET:
@@ -58,6 +67,10 @@ public JobRunner getJobRunner(JobStatementType jobStatementType) {
}
}

+   public static JobRunnerFactory create(Executor executor) {
+       return new JobRunnerFactory(executor);
+   }

public static JobRunnerFactory create(JobManager jobManager) {
return new JobRunnerFactory(jobManager);
}
39 changes: 39 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java
@@ -84,6 +84,30 @@ public void buildFinalStatement() {
}
}

+   public void buildFinalExecutableStatement() {
+       checkStatement();
+
+       int executableIndex = -1;
+       for (int i = 0; i < jobStatementList.size(); i++) {
+           if (jobStatementList.get(i).getSqlType().isPipeline()) {
+               executableIndex = i;
+           }
+       }
+       if (executableIndex >= 0) {
+           jobStatementList.get(executableIndex).asFinalExecutableStatement();
+       } else {
+           // If there is no INSERT/CTAS/RTAS/CALL statement, use the first
+           // SELECT/WITH/SHOW/DESC SQL statement as the final statement.
+           for (int i = 0; i < jobStatementList.size(); i++) {
+               if (jobStatementList.get(i).getStatementType().equals(JobStatementType.SQL)) {
+                   jobStatementList.get(i).asFinalExecutableStatement();
+                   break;
+               }
+           }
+       }
+   }

public void checkStatement() {
checkEmptyStatement();
checkPipelineStatement();
@@ -93,6 +117,21 @@ private void checkEmptyStatement() {
if (jobStatementList.isEmpty()) {
throw new DinkyException("None of valid statement is executed. Please check your statements.");
}
+       boolean hasSqlStatement = false;
+       for (JobStatement jobStatement : jobStatementList) {
+           if (jobStatement.getStatement().trim().isEmpty()) {
+               throw new DinkyException("The statement cannot be empty. Please check your statements.");
+           }
+           if (jobStatement.getStatementType().equals(JobStatementType.SQL)
+                   || jobStatement.getStatementType().equals(JobStatementType.PIPELINE)
+                   || jobStatement.getStatementType().equals(JobStatementType.EXECUTE_JAR)) {
+               hasSqlStatement = true;
+           }
+       }
+       if (!hasSqlStatement) {
+           throw new DinkyException(
+                   "The statements must contain at least one INSERT/CTAS/RTAS/SELECT/WITH/SHOW/DESC statement.");
+       }
}

private void checkPipelineStatement() {
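A worked example of the selection rule in buildFinalExecutableStatement, for a hypothetical four-statement plan:

    // Plan: [SET ...], [CREATE TABLE ...], [SELECT ...], [INSERT ...]
    // 1. The INSERT is the last pipeline statement, so it is marked as the final
    //    executable statement and will carry the JobClient.
    // 2. If no INSERT/CTAS/RTAS/CALL were present, the first plain SQL statement
    //    (here the SELECT) would be chosen instead.
    // 3. checkEmptyStatement now also rejects plans with no SQL, PIPELINE, or
    //    EXECUTE_JAR statement at all, e.g. a script made up only of SET statements.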