Skip to content

Commit

Permalink
[Improve][EventService] improve event code and extract event code to …
Browse files Browse the repository at this point in the history
…EventService (#7153)
  • Loading branch information
EricJoy2048 authored Sep 5, 2024
1 parent f3ca5a4 commit 90cd46f
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server;

import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.server.event.JobEventReportOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;

import com.hazelcast.spi.impl.NodeEngineImpl;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class EventService {
private final BlockingQueue<Event> eventBuffer;

private ExecutorService eventForwardService;

private final NodeEngineImpl nodeEngine;

public EventService(NodeEngineImpl nodeEngine) {
eventBuffer = new ArrayBlockingQueue<>(2048);
initEventForwardService();
this.nodeEngine = nodeEngine;
}

private void initEventForwardService() {
eventForwardService =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
eventForwardService.submit(
() -> {
List<Event> events = new ArrayList<>();
RetryUtils.RetryMaterial retryMaterial =
new RetryUtils.RetryMaterial(2, true, e -> true);
while (!Thread.currentThread().isInterrupted()) {
try {
events.clear();

Event first = eventBuffer.take();
events.add(first);

eventBuffer.drainTo(events, 500);
JobEventReportOperation operation = new JobEventReportOperation(events);

RetryUtils.retryWithException(
() ->
NodeEngineUtil.sendOperationToMasterNode(
nodeEngine, operation)
.join(),
retryMaterial);

log.debug("Event forward success, events " + events.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("Event forward thread interrupted");
} catch (Throwable t) {
log.warn("Event forward failed, discard events " + events.size(), t);
}
}
});
}

public void reportEvent(Event e) {
while (!eventBuffer.offer(e)) {
eventBuffer.poll();
log.warn("Event buffer is full, discard the oldest event");
}
}

public void shutdownNow() {
if (eventForwardService != null) {
eventForwardService.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public class SeaTunnelServer

private volatile boolean isRunning = true;

@Getter private EventService eventService;

public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) {
this.liveOperationRegistry = new LiveOperationRegistry();
this.seaTunnelConfig = seaTunnelConfig;
Expand Down Expand Up @@ -116,6 +118,8 @@ public void init(NodeEngine engine, Properties hzProperties) {
new DefaultClassLoaderService(
seaTunnelConfig.getEngineConfig().isClassloaderCacheMode());

eventService = new EventService(nodeEngine);

if (EngineConfig.ClusterRole.MASTER_AND_WORKER.ordinal()
== seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
startWorker();
Expand Down Expand Up @@ -149,7 +153,7 @@ private void startMaster() {
private void startWorker() {
taskExecutionService =
new TaskExecutionService(
classLoaderService, nodeEngine, nodeEngine.getProperties());
classLoaderService, nodeEngine, nodeEngine.getProperties(), eventService);
nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
taskExecutionService.start();
getSlotService();
Expand All @@ -176,6 +180,10 @@ public void shutdown(boolean terminate) {
if (coordinatorService != null) {
coordinatorService.shutdown();
}

if (eventService != null) {
eventService.shutdownNow();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.seatunnel.api.tracing.MDCExecutorService;
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
Expand All @@ -32,7 +31,6 @@
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.server.event.JobEventReportOperation;
import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.ProgressState;
Expand All @@ -51,12 +49,10 @@
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;

import org.apache.commons.collections4.CollectionUtils;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.instance.impl.NodeState;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
Expand All @@ -76,15 +72,13 @@

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -150,13 +144,13 @@ public class TaskExecutionService implements DynamicMetricsProvider {

private final ServerConnectorPackageClient serverConnectorPackageClient;

private final BlockingQueue<Event> eventBuffer;
private final ExecutorService eventForwardService;
private final EventService eventService;

public TaskExecutionService(
ClassLoaderService classLoaderService,
NodeEngineImpl nodeEngine,
HazelcastProperties properties) {
HazelcastProperties properties,
EventService eventService) {
seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
this.nodeEngine = nodeEngine;
Expand All @@ -179,42 +173,7 @@ public TaskExecutionService(
serverConnectorPackageClient =
new ServerConnectorPackageClient(nodeEngine, seaTunnelConfig);

eventBuffer = new ArrayBlockingQueue<>(2048);
eventForwardService =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
eventForwardService.submit(
() -> {
List<Event> events = new ArrayList<>();
RetryUtils.RetryMaterial retryMaterial =
new RetryUtils.RetryMaterial(2, true, e -> true);
while (!Thread.currentThread().isInterrupted()) {
try {
events.clear();

Event first = eventBuffer.take();
events.add(first);

eventBuffer.drainTo(events, 500);
JobEventReportOperation operation = new JobEventReportOperation(events);

RetryUtils.retryWithException(
() ->
NodeEngineUtil.sendOperationToMasterNode(
nodeEngine, operation)
.join(),
retryMaterial);

logger.fine("Event forward success, events " + events.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info("Event forward thread interrupted");
} catch (Throwable t) {
logger.warning(
"Event forward failed, discard events " + events.size(), t);
}
}
});
this.eventService = eventService;
}

public void start() {
Expand All @@ -225,7 +184,6 @@ public void shutdown() {
isRunning = false;
executorService.shutdownNow();
scheduledExecutorService.shutdown();
eventForwardService.shutdownNow();
}

public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation) {
Expand Down Expand Up @@ -691,10 +649,7 @@ public void printTaskExecutionRuntimeInfo() {
}

public void reportEvent(Event e) {
while (!eventBuffer.offer(e)) {
eventBuffer.poll();
logger.warning("Event buffer is full, discard the oldest event");
}
eventService.reportEvent(e);
}

private final class BlockingWorker implements Runnable {
Expand Down

0 comments on commit 90cd46f

Please sign in to comment.