Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix-4004] Fix unable to execute statements such as create database #4184

Merged
merged 4 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static Executor getDefaultExecutor() {
}

public static Executor buildExecutor(ExecutorConfig executorConfig, DinkyClassLoader classLoader) {
if (executorConfig.isRemote()) {
if (executorConfig.isRemote() && !executorConfig.isPlan()) {
return buildRemoteExecutor(executorConfig, classLoader);
} else {
return buildLocalExecutor(executorConfig, classLoader);
Expand Down
2 changes: 1 addition & 1 deletion dinky-core/src/main/java/org/dinky/job/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class Job {
private Executor executor;
private boolean useGateway;
private List<String> jids;
private boolean isPipeline = true;
private boolean isPipeline = false;

@Getter
public enum JobStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public Optional<JobClient> execute(JobStatement jobStatement) throws Exception {

@Override
public void run(JobStatement jobStatement) throws Exception {
jobManager.getJob().setPipeline(true);
if (!jobManager.isUseGateway()) {
submitNormal(jobStatement);
} else {
Expand Down Expand Up @@ -177,7 +178,12 @@ private GatewayResult submitNormalWithGateway(JobStatement jobStatement) {
private Pipeline getPipeline(JobStatement jobStatement) {
Pipeline pipeline = getJarStreamGraph(jobStatement.getStatement(), jobManager.getDinkyClassLoader());
if (pipeline instanceof StreamGraph) {
if (Asserts.isNotNullString(jobManager.getConfig().getSavePointPath())) {
if (Asserts.isNotNullString(jobManager.getConfig().getSavePointPath())
|| (Asserts.isNotNull(jobManager.getConfig().getConfigJson())
&& Asserts.isNotNullString(jobManager
.getConfig()
.getConfigJson()
.get(SavepointConfigOptions.SAVEPOINT_PATH)))) {
((StreamGraph) pipeline)
.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
jobManager.getConfig().getSavePointPath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public Optional<JobClient> execute(JobStatement jobStatement) throws Exception {

@Override
public void run(JobStatement jobStatement) throws Exception {
jobManager.getJob().setPipeline(true);
if (ExecuteJarParseStrategy.INSTANCE.match(jobStatement.getStatement())) {
JobJarRunner jobJarRunner = new JobJarRunner(jobManager);
jobJarRunner.run(jobStatement);
Expand Down
11 changes: 7 additions & 4 deletions dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public Optional<JobClient> execute(JobStatement jobStatement) throws Exception {
public void run(JobStatement jobStatement) throws Exception {
statements.add(jobStatement);
if (jobStatement.isFinalExecutableStatement()) {
jobManager.getJob().setPipeline(isPipeline());
if (inferStatementSet()) {
handleStatementSet();
} else {
Expand Down Expand Up @@ -225,6 +226,11 @@ public JobPlanInfo getJobPlanInfo(JobStatement jobStatement) {
throw new DinkyException("None jobs in statement.");
}

private boolean isPipeline() {
return statements.stream()
.anyMatch(jobStatement -> jobStatement.getSqlType().isPipeline());
}

private boolean inferStatementSet() {
boolean hasInsert = false;
for (JobStatement item : statements) {
Expand Down Expand Up @@ -266,7 +272,6 @@ private void processWithoutGateway() {

private void processSingleInsertWithGateway() {
List<JobStatement> singleInsert = Collections.singletonList(statements.get(0));
jobManager.getJob().setPipeline(statements.get(0).getSqlType().isPipeline());
GatewayResult gatewayResult = submitByGateway(singleInsert);
setJobResultFromGatewayResult(gatewayResult);
}
Expand All @@ -276,9 +281,7 @@ private void processFirstStatement() throws Exception {
return;
}
// Only process the first statement when not using statement set
JobStatement item = statements.get(0);
jobManager.getJob().setPipeline(item.getSqlType().isPipeline());
processSingleStatement(item);
processSingleStatement(statements.get(0));
}

private void processSingleStatement(JobStatement item) {
Expand Down
Loading