From 90cd46f50ac2f01e11ac5b7002688ea6c657cc82 Mon Sep 17 00:00:00 2001 From: Eric Date: Thu, 5 Sep 2024 18:25:38 +0800 Subject: [PATCH] [Improve][EventService] improve event code and extract event code to EventService (#7153) --- .../seatunnel/engine/server/EventService.java | 100 ++++++++++++++++++ .../engine/server/SeaTunnelServer.java | 10 +- .../engine/server/TaskExecutionService.java | 55 +--------- 3 files changed, 114 insertions(+), 51 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java new file mode 100644 index 00000000000..0c7b654b216 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server; + +import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.seatunnel.api.event.Event; +import org.apache.seatunnel.common.utils.RetryUtils; +import org.apache.seatunnel.engine.server.event.JobEventReportOperation; +import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; + +import com.hazelcast.spi.impl.NodeEngineImpl; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +public class EventService { + private final BlockingQueue eventBuffer; + + private ExecutorService eventForwardService; + + private final NodeEngineImpl nodeEngine; + + public EventService(NodeEngineImpl nodeEngine) { + eventBuffer = new ArrayBlockingQueue<>(2048); + initEventForwardService(); + this.nodeEngine = nodeEngine; + } + + private void initEventForwardService() { + eventForwardService = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build()); + eventForwardService.submit( + () -> { + List events = new ArrayList<>(); + RetryUtils.RetryMaterial retryMaterial = + new RetryUtils.RetryMaterial(2, true, e -> true); + while (!Thread.currentThread().isInterrupted()) { + try { + events.clear(); + + Event first = eventBuffer.take(); + events.add(first); + + eventBuffer.drainTo(events, 500); + JobEventReportOperation operation = new JobEventReportOperation(events); + + RetryUtils.retryWithException( + () -> + NodeEngineUtil.sendOperationToMasterNode( + nodeEngine, operation) + .join(), + retryMaterial); + + log.debug("Event forward success, events " + events.size()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("Event forward thread interrupted"); + } catch (Throwable t) { + log.warn("Event forward failed, discard events " + events.size(), t); + } + } + }); + } + + public void reportEvent(Event e) { + while (!eventBuffer.offer(e)) { + eventBuffer.poll(); + log.warn("Event buffer is full, discard the oldest event"); + } + } + + public void shutdownNow() { + if (eventForwardService != null) { + eventForwardService.shutdownNow(); + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java index b76af4c19a0..99cd27d5642 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java @@ -76,6 +76,8 @@ public class SeaTunnelServer private volatile boolean isRunning = true; + @Getter private EventService eventService; + public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) { this.liveOperationRegistry = new LiveOperationRegistry(); this.seaTunnelConfig = seaTunnelConfig; @@ -116,6 +118,8 @@ public void init(NodeEngine engine, Properties hzProperties) { new DefaultClassLoaderService( seaTunnelConfig.getEngineConfig().isClassloaderCacheMode()); + eventService = new EventService(nodeEngine); + if (EngineConfig.ClusterRole.MASTER_AND_WORKER.ordinal() == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) { startWorker(); @@ -149,7 +153,7 @@ private void startMaster() { private void startWorker() { taskExecutionService = new TaskExecutionService( - classLoaderService, nodeEngine, nodeEngine.getProperties()); + classLoaderService, nodeEngine, nodeEngine.getProperties(), eventService); nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService); taskExecutionService.start(); getSlotService(); @@ -176,6 +180,10 @@ public void shutdown(boolean terminate) { if (coordinatorService != null) { coordinatorService.shutdown(); } + + if (eventService != null) { + eventService.shutdownNow(); + } } @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index b45544def1d..b32dd7c6a97 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -22,7 +22,6 @@ import org.apache.seatunnel.api.tracing.MDCExecutorService; import org.apache.seatunnel.api.tracing.MDCTracer; import org.apache.seatunnel.common.utils.ExceptionUtils; -import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.common.utils.StringFormatUtils; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.ConfigProvider; @@ -32,7 +31,6 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.classloader.ClassLoaderService; import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier; -import org.apache.seatunnel.engine.server.event.JobEventReportOperation; import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException; import org.apache.seatunnel.engine.server.execution.ExecutionState; import org.apache.seatunnel.engine.server.execution.ProgressState; @@ -51,12 +49,10 @@ import org.apache.seatunnel.engine.server.task.SeaTunnelTask; import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation; import org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation; -import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; import org.apache.commons.collections4.CollectionUtils; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.hazelcast.core.OperationTimeoutException; import com.hazelcast.instance.impl.NodeState; import com.hazelcast.internal.metrics.DynamicMetricsProvider; @@ -76,7 +72,6 @@ import java.io.IOException; import java.net.URL; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -84,7 +79,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; @@ -150,13 +144,13 @@ public class TaskExecutionService implements DynamicMetricsProvider { private final ServerConnectorPackageClient serverConnectorPackageClient; - private final BlockingQueue eventBuffer; - private final ExecutorService eventForwardService; + private final EventService eventService; public TaskExecutionService( ClassLoaderService classLoaderService, NodeEngineImpl nodeEngine, - HazelcastProperties properties) { + HazelcastProperties properties, + EventService eventService) { seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); this.hzInstanceName = nodeEngine.getHazelcastInstance().getName(); this.nodeEngine = nodeEngine; @@ -179,42 +173,7 @@ public TaskExecutionService( serverConnectorPackageClient = new ServerConnectorPackageClient(nodeEngine, seaTunnelConfig); - eventBuffer = new ArrayBlockingQueue<>(2048); - eventForwardService = - Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build()); - eventForwardService.submit( - () -> { - List events = new ArrayList<>(); - RetryUtils.RetryMaterial retryMaterial = - new RetryUtils.RetryMaterial(2, true, e -> true); - while (!Thread.currentThread().isInterrupted()) { - try { - events.clear(); - - Event first = eventBuffer.take(); - events.add(first); - - eventBuffer.drainTo(events, 500); - JobEventReportOperation operation = new JobEventReportOperation(events); - - RetryUtils.retryWithException( - () -> - NodeEngineUtil.sendOperationToMasterNode( - nodeEngine, operation) - .join(), - retryMaterial); - - logger.fine("Event forward success, events " + events.size()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.info("Event forward thread interrupted"); - } catch (Throwable t) { - logger.warning( - "Event forward failed, discard events " + events.size(), t); - } - } - }); + this.eventService = eventService; } public void start() { @@ -225,7 +184,6 @@ public void shutdown() { isRunning = false; executorService.shutdownNow(); scheduledExecutorService.shutdown(); - eventForwardService.shutdownNow(); } public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation) { @@ -691,10 +649,7 @@ public void printTaskExecutionRuntimeInfo() { } public void reportEvent(Event e) { - while (!eventBuffer.offer(e)) { - eventBuffer.poll(); - logger.warning("Event buffer is full, discard the oldest event"); - } + eventService.reportEvent(e); } private final class BlockingWorker implements Runnable {