Skip to content

Commit 5434ac7

Browse files
committed
refactor task/job executors
* migrate from java's executor impl to spring's abstraction: let spring handle the bean lifecycle, graceful shutdown etc. * separate on interface-level between scheduled and async tasks: we only have 1 use case for scheduled tasks (websocket ping-pongs), whereas all other usages of ScheduledExecutorService were just async job submissions. separation might be useful in future, if we want to have distinct thread pools as well.
1 parent 0b0745b commit 5434ac7

File tree

7 files changed

+52
-81
lines changed

7 files changed

+52
-81
lines changed

src/main/java/de/rwth/idsg/steve/config/BeanConfiguration.java

Lines changed: 11 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.fasterxml.jackson.databind.DeserializationFeature;
2222
import com.fasterxml.jackson.databind.ObjectMapper;
2323
import com.fasterxml.jackson.databind.SerializationFeature;
24-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2524
import com.mysql.cj.conf.PropertyKey;
2625
import com.zaxxer.hikari.HikariConfig;
2726
import com.zaxxer.hikari.HikariDataSource;
@@ -45,6 +44,7 @@
4544
import org.springframework.http.converter.HttpMessageConverter;
4645
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
4746
import org.springframework.scheduling.annotation.EnableScheduling;
47+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
4848
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;
4949
import org.springframework.web.accept.ContentNegotiationManager;
5050
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
@@ -55,16 +55,11 @@
5555
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter;
5656
import org.springframework.web.servlet.view.InternalResourceViewResolver;
5757

58-
import jakarta.annotation.PreDestroy;
5958
import jakarta.validation.Validator;
6059

6160
import javax.sql.DataSource;
6261
import java.util.List;
63-
import java.util.concurrent.ExecutorService;
64-
import java.util.concurrent.ScheduledExecutorService;
65-
import java.util.concurrent.ScheduledThreadPoolExecutor;
66-
import java.util.concurrent.ThreadFactory;
67-
import java.util.concurrent.TimeUnit;
62+
import java.util.concurrent.Executor;
6863

6964
import static de.rwth.idsg.steve.SteveConfiguration.CONFIG;
7065

@@ -81,8 +76,6 @@
8176
@ComponentScan("de.rwth.idsg.steve")
8277
public class BeanConfiguration implements WebMvcConfigurer {
8378

84-
private ScheduledThreadPoolExecutor executor;
85-
8679
/**
8780
* https://github.com/brettwooldridge/HikariCP/wiki/MySQL-Configuration
8881
*/
@@ -144,13 +137,15 @@ public DSLContext dslContext(DataSource dataSource) {
144137
return DSL.using(conf);
145138
}
146139

147-
@Bean
148-
public ScheduledExecutorService scheduledExecutorService() {
149-
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("SteVe-Executor-%d")
150-
.build();
151-
152-
executor = new ScheduledThreadPoolExecutor(5, threadFactory);
153-
return executor;
140+
@Bean(name = {"asyncTaskScheduler", "asyncTaskExecutor"})
141+
public ThreadPoolTaskScheduler asyncTaskScheduler() {
142+
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
143+
scheduler.setPoolSize(5);
144+
scheduler.setThreadNamePrefix("SteVe-Executor-");
145+
scheduler.setWaitForTasksToCompleteOnShutdown(true);
146+
scheduler.setAwaitTerminationSeconds(30);
147+
scheduler.initialize();
148+
return scheduler;
154149
}
155150

156151
@Bean
@@ -173,29 +168,6 @@ public ReleaseCheckService releaseCheckService() {
173168
}
174169
}
175170

176-
@PreDestroy
177-
public void shutDown() {
178-
if (executor != null) {
179-
gracefulShutDown(executor);
180-
}
181-
}
182-
183-
private void gracefulShutDown(ExecutorService executor) {
184-
try {
185-
executor.shutdown();
186-
executor.awaitTermination(30, TimeUnit.SECONDS);
187-
188-
} catch (InterruptedException e) {
189-
log.error("Termination interrupted", e);
190-
191-
} finally {
192-
if (!executor.isTerminated()) {
193-
log.warn("Killing non-finished tasks");
194-
}
195-
executor.shutdownNow();
196-
}
197-
}
198-
199171
// -------------------------------------------------------------------------
200172
// Web config
201173
// -------------------------------------------------------------------------

src/main/java/de/rwth/idsg/steve/config/WebSocketConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
5151
@Autowired private Ocpp16WebSocketEndpoint ocpp16WebSocketEndpoint;
5252

5353
public static final String PATH_INFIX = "/websocket/CentralSystemService/";
54-
public static final long PING_INTERVAL = TimeUnit.MINUTES.toMinutes(15);
54+
public static final Duration PING_INTERVAL = Duration.ofMinutes(15);
5555
public static final Duration IDLE_TIMEOUT = Duration.ofHours(2);
5656
public static final int MAX_MSG_SIZE = 8_388_608; // 8 MB for max message size
5757

src/main/java/de/rwth/idsg/steve/ocpp/soap/MessageHeaderInterceptor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141

4242
import javax.xml.namespace.QName;
4343
import java.util.Optional;
44-
import java.util.concurrent.ScheduledExecutorService;
44+
import java.util.concurrent.Executor;
4545

4646
import static org.apache.cxf.ws.addressing.JAXWSAConstants.ADDRESSING_PROPERTIES_INBOUND;
4747

@@ -62,7 +62,7 @@ public class MessageHeaderInterceptor extends AbstractPhaseInterceptor<Message>
6262

6363
@Autowired private OcppServerRepository ocppServerRepository;
6464
@Autowired private ChargePointHelperService chargePointHelperService;
65-
@Autowired private ScheduledExecutorService executorService;
65+
@Autowired private Executor asyncTaskExecutor;
6666

6767
private static final String BOOT_OPERATION_NAME = "BootNotification";
6868
private static final String CHARGEBOX_ID_HEADER = "ChargeBoxIdentity";
@@ -93,7 +93,7 @@ public void handleMessage(Message message) throws Fault {
9393
// 2. update endpoint
9494
// -------------------------------------------------------------------------
9595

96-
executorService.execute(() -> {
96+
asyncTaskExecutor.execute(() -> {
9797
try {
9898
String endpointAddress = getEndpointAddress(message);
9999
if (endpointAddress != null) {

src/main/java/de/rwth/idsg/steve/ocpp/ws/AbstractWebSocketEndpoint.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.joda.time.DateTime;
3232
import org.springframework.beans.factory.annotation.Autowired;
3333
import org.springframework.context.ApplicationEventPublisher;
34+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
3435
import org.springframework.web.socket.BinaryMessage;
3536
import org.springframework.web.socket.CloseStatus;
3637
import org.springframework.web.socket.PongMessage;
@@ -39,14 +40,13 @@
3940
import org.springframework.web.socket.WebSocketMessage;
4041
import org.springframework.web.socket.WebSocketSession;
4142

43+
import java.time.Instant;
4244
import java.util.ArrayList;
4345
import java.util.Collections;
4446
import java.util.Deque;
4547
import java.util.List;
4648
import java.util.Map;
47-
import java.util.concurrent.ScheduledExecutorService;
4849
import java.util.concurrent.ScheduledFuture;
49-
import java.util.concurrent.TimeUnit;
5050
import java.util.function.Consumer;
5151

5252
/**
@@ -55,7 +55,7 @@
5555
*/
5656
public abstract class AbstractWebSocketEndpoint extends ConcurrentWebSocketHandler implements SubProtocolCapable {
5757

58-
@Autowired private ScheduledExecutorService service;
58+
@Autowired private ThreadPoolTaskScheduler asyncTaskScheduler;
5959
@Autowired private OcppServerRepository ocppServerRepository;
6060
@Autowired private FutureResponseContextStore futureResponseContextStore;
6161
@Autowired private ApplicationEventPublisher applicationEventPublisher;
@@ -131,11 +131,11 @@ public void onOpen(WebSocketSession session) throws Exception {
131131

132132
// Just to keep the connection alive, such that the servers do not close
133133
// the connection because of a idle timeout, we ping-pong at fixed intervals.
134-
ScheduledFuture pingSchedule = service.scheduleAtFixedRate(
134+
ScheduledFuture pingSchedule = asyncTaskScheduler.scheduleAtFixedRate(
135135
new PingTask(chargeBoxId, session),
136-
WebSocketConfiguration.PING_INTERVAL,
137-
WebSocketConfiguration.PING_INTERVAL,
138-
TimeUnit.MINUTES);
136+
Instant.now().plus(WebSocketConfiguration.PING_INTERVAL),
137+
WebSocketConfiguration.PING_INTERVAL
138+
);
139139

140140
futureResponseContextStore.addSession(session);
141141

src/main/java/de/rwth/idsg/steve/service/BackgroundService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import lombok.RequiredArgsConstructor;
2424

2525
import java.util.List;
26-
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executor;
2727
import java.util.function.Consumer;
2828

2929
/**
@@ -32,10 +32,10 @@
3232
*/
3333
@RequiredArgsConstructor
3434
public class BackgroundService {
35-
private final ExecutorService executorService;
35+
private final Executor asyncTaskExecutor;
3636

37-
public static BackgroundService with(ExecutorService executorService) {
38-
return new BackgroundService(executorService);
37+
public static BackgroundService with(Executor asyncTaskExecutor) {
38+
return new BackgroundService(asyncTaskExecutor);
3939
}
4040

4141
public Runner forFirst(List<ChargePointSelect> list) {
@@ -56,7 +56,7 @@ private class BackgroundSingleRunner implements Runner {
5656

5757
@Override
5858
public void execute(Consumer<ChargePointSelect> consumer) {
59-
executorService.execute(() -> consumer.accept(cps));
59+
asyncTaskExecutor.execute(() -> consumer.accept(cps));
6060
}
6161
}
6262

@@ -66,7 +66,7 @@ private class BackgroundListRunner implements Runner {
6666

6767
@Override
6868
public void execute(Consumer<ChargePointSelect> consumer) {
69-
executorService.execute(() -> list.forEach(consumer));
69+
asyncTaskExecutor.execute(() -> list.forEach(consumer));
7070
}
7171
}
7272
}

0 commit comments

Comments
 (0)