diff --git a/docs/en/seatunnel-engine/user-command.md b/docs/en/seatunnel-engine/user-command.md index 2bf3fb2c8c9..2504198b2b1 100644 --- a/docs/en/seatunnel-engine/user-command.md +++ b/docs/en/seatunnel-engine/user-command.md @@ -114,9 +114,11 @@ Both failed jobs and jobs paused by seatunnel.sh -s <jobId> can be resumed ## Canceling Jobs ```shell -sh bin/seatunnel.sh -can +sh bin/seatunnel.sh -can [ ...] ``` This command will cancel the specified job. After canceling the job, the job will be stopped and its status will become `CANCELED`. +Supports batch cancellation of jobs, and can cancel multiple jobs at one time. + All breakpoint information of the canceled job will be deleted and cannot be resumed by seatunnel.sh -r <jobId>. diff --git a/docs/zh/seatunnel-engine/user-command.md b/docs/zh/seatunnel-engine/user-command.md index 5066e5615e7..9311ad912f6 100644 --- a/docs/zh/seatunnel-engine/user-command.md +++ b/docs/zh/seatunnel-engine/user-command.md @@ -129,10 +129,12 @@ bin/seatunnel.sh --config $SEATUNNEL_HOME/config/v2.batch.config.template ## 取消作业 ```shell -./bin/seatunnel.sh -can +./bin/seatunnel.sh -can [ ...] ``` 该命令会取消指定作业,取消作业后,作业会被停止,作业的状态会变为`CANCELED`。 +支持批量取消作业,可以一次取消多个作业。 + 被cancel的作业的所有断点信息都将被删除,无法通过seatunnel.sh -r <jobId>恢复。 diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java index 35e68c81e81..300555a86fa 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java @@ -23,6 +23,9 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.ParameterException; +import java.util.Arrays; +import java.util.List; + import static org.apache.seatunnel.core.starter.constants.SeaTunnelStarterConstants.USAGE_EXIT_CODE; public class CommandLineUtils { @@ -38,6 +41,12 @@ public static T parse(String[] args, T obj) { public static T parse( String[] args, T obj, String programName, boolean acceptUnknownOptions) { + List list = Arrays.asList(args); + if (list.contains("-can") || list.contains("--cancel-job")) { + // When acceptUnknown Options is true, the List parameter cannot be parsed. + // For details, please refer to the official code JCommander.class#DefaultVariableArity + acceptUnknownOptions = false; + } JCommander jCommander = JCommander.newBuilder() .programName(programName) diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java index 67293ba2a32..2189ba7e627 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java @@ -70,8 +70,9 @@ public class ClientCommandArgs extends AbstractCommandArgs { @Parameter( names = {"-can", "--cancel-job"}, + variableArity = true, description = "Cancel job by JobId") - private String cancelJobId; + private List cancelJobId; @Parameter( names = {"--metrics"}, diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index fe22ca221b5..251dc6a1a7e 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -51,6 +51,7 @@ import java.time.Duration; import java.time.LocalDateTime; import java.util.Collections; +import java.util.List; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -113,9 +114,10 @@ public void execute() throws CommandExecuteException { .getJobDetailStatus(Long.parseLong(clientCommandArgs.getJobId())); System.out.println(jobState); } else if (null != clientCommandArgs.getCancelJobId()) { - engineClient - .getJobClient() - .cancelJob(Long.parseLong(clientCommandArgs.getCancelJobId())); + List cancelJobIds = clientCommandArgs.getCancelJobId(); + for (String cancelJobId : cancelJobIds) { + engineClient.getJobClient().cancelJob(Long.parseLong(cancelJobId)); + } } else if (null != clientCommandArgs.getMetricsJobId()) { String jobMetrics = engineClient diff --git a/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorBatchCancelTest.java b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorBatchCancelTest.java new file mode 100644 index 00000000000..cb8969c853d --- /dev/null +++ b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorBatchCancelTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.core.starter.seatunnel; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode; + +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.testcontainers.containers.Container; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Only support for seatunnel") +@DisabledOnOs(OS.WINDOWS) +@Slf4j +public class SeaTunnelConnectorBatchCancelTest extends TestSuiteBase implements TestResource { + + @Override + public void startUp() throws Exception {} + + @Override + public void tearDown() throws Exception {} + + @TestTemplate + public void task(TestContainer container) throws IOException, InterruptedException { + // Start test task + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob("/batch_cancel_task_1.conf"); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return null; + }); + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob("/batch_cancel_task_2.conf"); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return null; + }); + + // Wait for the task to start + Thread.sleep(15000); + + // Get the task id + Container.ExecResult execResult = container.executeBaseCommand(new String[] {"-l"}); + String regex = "(\\d+)\\s+"; + Pattern pattern = Pattern.compile(regex); + List runningJobId = + Arrays.stream(execResult.getStdout().toString().split("\n")) + .filter(s -> s.contains("batch_cancel_task")) + .map( + s -> { + Matcher matcher = pattern.matcher(s); + return matcher.find() ? matcher.group(1) : null; + }) + .filter(jobId -> jobId != null) + .collect(Collectors.toList()); + Assertions.assertEquals(2, runningJobId.size()); + + // Verify that the status is Running + for (String jobId : runningJobId) { + Container.ExecResult execResult1 = + container.executeBaseCommand(new String[] {"-j", jobId}); + String stdout = execResult1.getStdout(); + ObjectNode jsonNodes = JsonUtils.parseObject(stdout); + Assertions.assertEquals(jsonNodes.get("jobStatus").asText(), "RUNNING"); + } + + // Execute batch cancellation tasks + String[] batchCancelCommand = + Stream.concat(Arrays.stream(new String[] {"-can"}), runningJobId.stream()) + .toArray(String[]::new); + Assertions.assertEquals(0, container.executeBaseCommand(batchCancelCommand).getExitCode()); + + // Verify whether the cancellation is successful + for (String jobId : runningJobId) { + Container.ExecResult execResult1 = + container.executeBaseCommand(new String[] {"-j", jobId}); + String stdout = execResult1.getStdout(); + ObjectNode jsonNodes = JsonUtils.parseObject(stdout); + Assertions.assertEquals(jsonNodes.get("jobStatus").asText(), "CANCELED"); + } + } +} diff --git a/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/resources/batch_cancel_task_1.conf b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/resources/batch_cancel_task_1.conf new file mode 100644 index 00000000000..dde5420b0af --- /dev/null +++ b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/resources/batch_cancel_task_1.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 3000 +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + row.num = 10000 + split.num = 5 + split.read-interval = 3000 + schema = { + fields { + id = "int" + name = "string" + age = "int" + } + } + } +} + +transform { +} + +sink { + Console { + source_table_name = "fake" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/resources/batch_cancel_task_2.conf b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/resources/batch_cancel_task_2.conf new file mode 100644 index 00000000000..dde5420b0af --- /dev/null +++ b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/resources/batch_cancel_task_2.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 3000 +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + row.num = 10000 + split.num = 5 + split.read-interval = 3000 + schema = { + fields { + id = "int" + name = "string" + age = "int" + } + } + } +} + +transform { +} + +sink { + Console { + source_table_name = "fake" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java index 07fef2c295a..e83a3635e80 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java @@ -44,6 +44,11 @@ default Container.ExecResult executeConnectorCheck(String[] args) throw new UnsupportedOperationException("Not implemented"); }; + default Container.ExecResult executeBaseCommand(String[] args) + throws IOException, InterruptedException { + throw new UnsupportedOperationException("Not implemented"); + }; + default Container.ExecResult savepointJob(String jobId) throws IOException, InterruptedException { throw new UnsupportedOperationException("Not implemented"); diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java index 7c2d6dda70e..4a30756f8f7 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java @@ -265,6 +265,15 @@ public Container.ExecResult executeConnectorCheck(String[] args) return executeCommand(server, command); } + public Container.ExecResult executeBaseCommand(String[] args) + throws IOException, InterruptedException { + final List command = new ArrayList<>(); + String binPath = Paths.get(SEATUNNEL_HOME, "bin", getStartShellName()).toString(); + command.add(adaptPathForWin(binPath)); + Arrays.stream(args).forEach(arg -> command.add(arg)); + return executeCommand(server, command); + } + @Override public Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException {