From 39d7a409df0be2ed5d1efd061425caa57c483acd Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 9 Jan 2025 20:37:11 +0800 Subject: [PATCH] [Improve][Zeta] Make sure the local mode CompletableFuture behavior same as server mode (#8476) --- .../starter/seatunnel/command/ClientExecuteCommand.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index ced60e64266..bb7fe67a874 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -38,6 +38,7 @@ import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.common.runtime.ExecutionMode; +import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture; import org.apache.seatunnel.engine.core.job.JobResult; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.SeaTunnelNodeContext; @@ -47,6 +48,7 @@ import com.hazelcast.client.config.ClientConfig; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.instance.impl.HazelcastInstanceFactory; +import com.hazelcast.internal.util.ConcurrencyUtil; import lombok.extern.slf4j.Slf4j; import java.nio.file.Path; @@ -55,7 +57,6 @@ import java.util.Collections; import java.util.List; import java.util.Random; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -276,6 +277,10 @@ private HazelcastInstance createServerInLocal( // set local mode seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL); seaTunnelConfig.getHazelcastConfig().getNetworkConfig().setPortAutoIncrement(true); + + // set the default async executor for Hazelcast InvocationFuture + ConcurrencyUtil.setDefaultAsyncExecutor(CompletableFuture.EXECUTOR); + return HazelcastInstanceFactory.newHazelcastInstance( seaTunnelConfig.getHazelcastConfig(), Thread.currentThread().getName(),