Skip to content

Commit

Permalink
[Feature][Core] shell batch cancel task (#7612)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangshenghang authored Sep 10, 2024
1 parent bc1a6b4 commit 6c7bb04
Show file tree
Hide file tree
Showing 10 changed files with 267 additions and 6 deletions.
4 changes: 3 additions & 1 deletion docs/en/seatunnel-engine/user-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,11 @@ Both failed jobs and jobs paused by seatunnel.sh -s <jobId> can be resumed
## Canceling Jobs

```shell
sh bin/seatunnel.sh -can <jobId>
sh bin/seatunnel.sh -can <jobId1> [<jobId2> <jobId3> ...]
```

This command will cancel the specified job. After canceling the job, the job will be stopped and its status will become `CANCELED`.

Batch cancellation is supported: pass several job IDs to cancel multiple jobs with a single command.

All breakpoint information of a canceled job will be deleted, so the job cannot be resumed by seatunnel.sh -r &lt;jobId&gt;.
4 changes: 3 additions & 1 deletion docs/zh/seatunnel-engine/user-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,12 @@ bin/seatunnel.sh --config $SEATUNNEL_HOME/config/v2.batch.config.template
## 取消作业

```shell
./bin/seatunnel.sh -can <jobId>
./bin/seatunnel.sh -can <jobId1> [<jobId2> <jobId3> ...]
```

该命令会取消指定作业,取消作业后,作业会被停止,作业的状态会变为`CANCELED`

支持批量取消作业,可以一次取消多个作业。

被cancel的作业的所有断点信息都将被删除,无法通过seatunnel.sh -r &lt;jobId&gt;恢复。

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;

import java.util.Arrays;
import java.util.List;

import static org.apache.seatunnel.core.starter.constants.SeaTunnelStarterConstants.USAGE_EXIT_CODE;

public class CommandLineUtils {
Expand All @@ -38,6 +41,12 @@ public static <T extends CommandArgs> T parse(String[] args, T obj) {

public static <T extends CommandArgs> T parse(
String[] args, T obj, String programName, boolean acceptUnknownOptions) {
List<String> list = Arrays.asList(args);
if (list.contains("-can") || list.contains("--cancel-job")) {
// When acceptUnknownOptions is true, the List parameter cannot be parsed.
// For details, please refer to the official code JCommander.class#DefaultVariableArity
acceptUnknownOptions = false;
}
JCommander jCommander =
JCommander.newBuilder()
.programName(programName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ public class ClientCommandArgs extends AbstractCommandArgs {

@Parameter(
names = {"-can", "--cancel-job"},
variableArity = true,
description = "Cancel job by JobId")
private String cancelJobId;
private List<String> cancelJobId;

@Parameter(
names = {"--metrics"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -113,9 +114,10 @@ public void execute() throws CommandExecuteException {
.getJobDetailStatus(Long.parseLong(clientCommandArgs.getJobId()));
System.out.println(jobState);
} else if (null != clientCommandArgs.getCancelJobId()) {
engineClient
.getJobClient()
.cancelJob(Long.parseLong(clientCommandArgs.getCancelJobId()));
List<String> cancelJobIds = clientCommandArgs.getCancelJobId();
for (String cancelJobId : cancelJobIds) {
engineClient.getJobClient().cancelJob(Long.parseLong(cancelJobId));
}
} else if (null != clientCommandArgs.getMetricsJobId()) {
String jobMetrics =
engineClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.core.starter.seatunnel;

import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.testcontainers.containers.Container;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Only support for seatunnel")
@DisabledOnOs(OS.WINDOWS)
@Slf4j
public class SeaTunnelConnectorBatchCancelTest extends TestSuiteBase implements TestResource {

@Override
public void startUp() throws Exception {}

@Override
public void tearDown() throws Exception {}

@TestTemplate
public void task(TestContainer container) throws IOException, InterruptedException {
// Start test task
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob("/batch_cancel_task_1.conf");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return null;
});
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob("/batch_cancel_task_2.conf");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return null;
});

// Wait for the task to start
Thread.sleep(15000);

// Get the task id
Container.ExecResult execResult = container.executeBaseCommand(new String[] {"-l"});
String regex = "(\\d+)\\s+";
Pattern pattern = Pattern.compile(regex);
List<String> runningJobId =
Arrays.stream(execResult.getStdout().toString().split("\n"))
.filter(s -> s.contains("batch_cancel_task"))
.map(
s -> {
Matcher matcher = pattern.matcher(s);
return matcher.find() ? matcher.group(1) : null;
})
.filter(jobId -> jobId != null)
.collect(Collectors.toList());
Assertions.assertEquals(2, runningJobId.size());

// Verify that the status is Running
for (String jobId : runningJobId) {
Container.ExecResult execResult1 =
container.executeBaseCommand(new String[] {"-j", jobId});
String stdout = execResult1.getStdout();
ObjectNode jsonNodes = JsonUtils.parseObject(stdout);
Assertions.assertEquals(jsonNodes.get("jobStatus").asText(), "RUNNING");
}

// Execute batch cancellation tasks
String[] batchCancelCommand =
Stream.concat(Arrays.stream(new String[] {"-can"}), runningJobId.stream())
.toArray(String[]::new);
Assertions.assertEquals(0, container.executeBaseCommand(batchCancelCommand).getExitCode());

// Verify whether the cancellation is successful
for (String jobId : runningJobId) {
Container.ExecResult execResult1 =
container.executeBaseCommand(new String[] {"-j", jobId});
String stdout = execResult1.getStdout();
ObjectNode jsonNodes = JsonUtils.parseObject(stdout);
Assertions.assertEquals(jsonNodes.get("jobStatus").asText(), "CANCELED");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 3000
}

source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
FakeSource {
result_table_name = "fake"
row.num = 10000
split.num = 5
split.read-interval = 3000
schema = {
fields {
id = "int"
name = "string"
age = "int"
}
}
}
}

transform {
}

sink {
Console {
source_table_name = "fake"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 3000
}

source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
FakeSource {
result_table_name = "fake"
row.num = 10000
split.num = 5
split.read-interval = 3000
schema = {
fields {
id = "int"
name = "string"
age = "int"
}
}
}
}

transform {
}

sink {
Console {
source_table_name = "fake"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ default Container.ExecResult executeConnectorCheck(String[] args)
throw new UnsupportedOperationException("Not implemented");
};

/**
 * Executes a command made of the given arguments and returns its result.
 *
 * <p>This default implementation runs nothing and always throws {@link
 * UnsupportedOperationException}. NOTE(review): presumably containers that can launch the engine
 * start script override this method - confirm against the implementations.
 *
 * @param args the command-line arguments to pass
 * @return the result of the executed command
 * @throws UnsupportedOperationException always, in this default implementation
 * @throws IOException declared so that overriding implementations may throw it
 * @throws InterruptedException declared so that overriding implementations may throw it
 */
default Container.ExecResult executeBaseCommand(String[] args)
throws IOException, InterruptedException {
throw new UnsupportedOperationException("Not implemented");
};

default Container.ExecResult savepointJob(String jobId)
throws IOException, InterruptedException {
throw new UnsupportedOperationException("Not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,15 @@ public Container.ExecResult executeConnectorCheck(String[] args)
return executeCommand(server, command);
}

/**
 * Runs the start script under {@code SEATUNNEL_HOME/bin} with the given arguments.
 *
 * <p>The command line is the (Windows-adapted) script path followed by {@code args} in their
 * original order; it is executed through {@code executeCommand} against {@code server}.
 *
 * @param args arguments appended verbatim after the script path, e.g. {@code -l} or {@code -can
 *     <jobId>}
 * @return the result returned by {@code executeCommand}
 * @throws IOException if {@code executeCommand} throws it
 * @throws InterruptedException if {@code executeCommand} throws it
 */
@Override
public Container.ExecResult executeBaseCommand(String[] args)
        throws IOException, InterruptedException {
    final List<String> command = new ArrayList<>(args.length + 1);
    String binPath = Paths.get(SEATUNNEL_HOME, "bin", getStartShellName()).toString();
    command.add(adaptPathForWin(binPath));
    // Bulk-append instead of mutating the list from inside a stream forEach.
    command.addAll(Arrays.asList(args));
    return executeCommand(server, command);
}

@Override
public Container.ExecResult executeJob(String confFile)
throws IOException, InterruptedException {
Expand Down

0 comments on commit 6c7bb04

Please sign in to comment.