diff --git a/src/main/java/de/rwth/idsg/steve/config/BeanConfiguration.java b/src/main/java/de/rwth/idsg/steve/config/BeanConfiguration.java index 4ab2ef3fa..6f0f39a42 100644 --- a/src/main/java/de/rwth/idsg/steve/config/BeanConfiguration.java +++ b/src/main/java/de/rwth/idsg/steve/config/BeanConfiguration.java @@ -44,6 +44,7 @@ import org.springframework.http.converter.HttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean; import org.springframework.web.accept.ContentNegotiationManager; @@ -59,7 +60,6 @@ import javax.sql.DataSource; import java.util.List; -import java.util.concurrent.Executor; import static de.rwth.idsg.steve.SteveConfiguration.CONFIG; @@ -137,15 +137,28 @@ public DSLContext dslContext(DataSource dataSource) { return DSL.using(conf); } - @Bean(name = {"asyncTaskScheduler", "asyncTaskExecutor"}) - public ThreadPoolTaskScheduler asyncTaskScheduler() { + @Bean(destroyMethod = "close") + public DelegatingTaskScheduler asyncTaskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(5); - scheduler.setThreadNamePrefix("SteVe-Executor-"); + scheduler.setThreadNamePrefix("SteVe-TaskScheduler-"); scheduler.setWaitForTasksToCompleteOnShutdown(true); scheduler.setAwaitTerminationSeconds(30); scheduler.initialize(); - return scheduler; + + return new DelegatingTaskScheduler(scheduler); + } + + @Bean(destroyMethod = "close") + public DelegatingTaskExecutor asyncTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setThreadNamePrefix("SteVe-TaskExecutor-"); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(30); + executor.initialize(); + + return new DelegatingTaskExecutor(executor); } @Bean diff --git a/src/main/java/de/rwth/idsg/steve/config/DelegatingTaskExecutor.java b/src/main/java/de/rwth/idsg/steve/config/DelegatingTaskExecutor.java new file mode 100644 index 000000000..0defde9d5 --- /dev/null +++ b/src/main/java/de/rwth/idsg/steve/config/DelegatingTaskExecutor.java @@ -0,0 +1,48 @@ +/* + * SteVe - SteckdosenVerwaltung - https://github.com/steve-community/steve + * Copyright (C) 2013-2025 SteVe Community Team + * All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package de.rwth.idsg.steve.config; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.io.Closeable; +import java.io.IOException; + +/** + * @author Sevket Goekay + * @since 02.02.2025 + */ +@Slf4j +@RequiredArgsConstructor +public class DelegatingTaskExecutor implements Closeable { + + private final ThreadPoolTaskExecutor delegate; + + @Override + public void close() throws IOException { + log.info("Shutting down"); + delegate.shutdown(); + } + + public void execute(Runnable task) { + delegate.execute(task); + } + +} diff --git a/src/main/java/de/rwth/idsg/steve/config/DelegatingTaskScheduler.java b/src/main/java/de/rwth/idsg/steve/config/DelegatingTaskScheduler.java new file mode 100644 index 000000000..5f6110413 --- /dev/null +++ b/src/main/java/de/rwth/idsg/steve/config/DelegatingTaskScheduler.java @@ -0,0 +1,51 @@ +/* + * SteVe - SteckdosenVerwaltung - https://github.com/steve-community/steve + * Copyright (C) 2013-2025 SteVe Community Team + * All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package de.rwth.idsg.steve.config; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +import java.io.Closeable; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.ScheduledFuture; + +/** + * @author Sevket Goekay + * @since 02.02.2025 + */ +@Slf4j +@RequiredArgsConstructor +public class DelegatingTaskScheduler implements Closeable { + + private final ThreadPoolTaskScheduler delegate; + + @Override + public void close() throws IOException { + log.info("Shutting down"); + delegate.shutdown(); + } + + public ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) { + return delegate.scheduleAtFixedRate(task, startTime, period); + } + +} diff --git a/src/main/java/de/rwth/idsg/steve/ocpp/soap/MessageHeaderInterceptor.java b/src/main/java/de/rwth/idsg/steve/ocpp/soap/MessageHeaderInterceptor.java index 498fdb2ac..d0b3040aa 100644 --- a/src/main/java/de/rwth/idsg/steve/ocpp/soap/MessageHeaderInterceptor.java +++ b/src/main/java/de/rwth/idsg/steve/ocpp/soap/MessageHeaderInterceptor.java @@ -18,6 +18,7 @@ */ package de.rwth.idsg.steve.ocpp.soap; +import de.rwth.idsg.steve.config.DelegatingTaskExecutor; import de.rwth.idsg.steve.ocpp.OcppProtocol; import de.rwth.idsg.steve.repository.OcppServerRepository; import de.rwth.idsg.steve.repository.impl.ChargePointRepositoryImpl; @@ -41,7 +42,6 @@ import javax.xml.namespace.QName; import java.util.Optional; -import java.util.concurrent.Executor; import static org.apache.cxf.ws.addressing.JAXWSAConstants.ADDRESSING_PROPERTIES_INBOUND; @@ -62,7 +62,7 @@ public class MessageHeaderInterceptor extends AbstractPhaseInterceptor @Autowired private OcppServerRepository ocppServerRepository; @Autowired private ChargePointHelperService chargePointHelperService; - @Autowired private Executor asyncTaskExecutor; + @Autowired private DelegatingTaskExecutor asyncTaskExecutor; private static final String BOOT_OPERATION_NAME = "BootNotification"; private static final String CHARGEBOX_ID_HEADER = "ChargeBoxIdentity"; diff --git a/src/main/java/de/rwth/idsg/steve/ocpp/ws/AbstractWebSocketEndpoint.java b/src/main/java/de/rwth/idsg/steve/ocpp/ws/AbstractWebSocketEndpoint.java index 0d1929d4b..5f4782578 100644 --- a/src/main/java/de/rwth/idsg/steve/ocpp/ws/AbstractWebSocketEndpoint.java +++ b/src/main/java/de/rwth/idsg/steve/ocpp/ws/AbstractWebSocketEndpoint.java @@ -20,6 +20,7 @@ import com.google.common.base.Strings; import de.rwth.idsg.steve.config.WebSocketConfiguration; +import de.rwth.idsg.steve.config.DelegatingTaskScheduler; import de.rwth.idsg.steve.ocpp.OcppTransport; import de.rwth.idsg.steve.ocpp.OcppVersion; import de.rwth.idsg.steve.ocpp.ws.data.CommunicationContext; @@ -31,7 +32,6 @@ import org.joda.time.DateTime; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.socket.BinaryMessage; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.PongMessage; @@ -55,7 +55,7 @@ */ public abstract class AbstractWebSocketEndpoint extends ConcurrentWebSocketHandler implements SubProtocolCapable { - @Autowired private ThreadPoolTaskScheduler asyncTaskScheduler; + @Autowired private DelegatingTaskScheduler asyncTaskScheduler; @Autowired private OcppServerRepository ocppServerRepository; @Autowired private FutureResponseContextStore futureResponseContextStore; @Autowired private ApplicationEventPublisher applicationEventPublisher; diff --git a/src/main/java/de/rwth/idsg/steve/service/BackgroundService.java b/src/main/java/de/rwth/idsg/steve/service/BackgroundService.java index 4b11b11de..3f60476f6 100644 --- a/src/main/java/de/rwth/idsg/steve/service/BackgroundService.java +++ b/src/main/java/de/rwth/idsg/steve/service/BackgroundService.java @@ -18,12 +18,12 @@ */ package de.rwth.idsg.steve.service; +import de.rwth.idsg.steve.config.DelegatingTaskExecutor; import de.rwth.idsg.steve.repository.dto.ChargePointSelect; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; import java.util.List; -import java.util.concurrent.Executor; import java.util.function.Consumer; /** @@ -32,9 +32,10 @@ */ @RequiredArgsConstructor public class BackgroundService { - private final Executor asyncTaskExecutor; - public static BackgroundService with(Executor asyncTaskExecutor) { + private final DelegatingTaskExecutor asyncTaskExecutor; + + public static BackgroundService with(DelegatingTaskExecutor asyncTaskExecutor) { return new BackgroundService(asyncTaskExecutor); } diff --git a/src/main/java/de/rwth/idsg/steve/service/ChargePointServiceClient.java b/src/main/java/de/rwth/idsg/steve/service/ChargePointServiceClient.java index f85f7420c..ed5088a2d 100644 --- a/src/main/java/de/rwth/idsg/steve/service/ChargePointServiceClient.java +++ b/src/main/java/de/rwth/idsg/steve/service/ChargePointServiceClient.java @@ -19,6 +19,7 @@ package de.rwth.idsg.steve.service; import de.rwth.idsg.steve.SteveException; +import de.rwth.idsg.steve.config.DelegatingTaskExecutor; import de.rwth.idsg.steve.ocpp.ChargePointServiceInvokerImpl; import de.rwth.idsg.steve.ocpp.OcppCallback; import de.rwth.idsg.steve.ocpp.task.CancelReservationTask; @@ -74,7 +75,6 @@ import org.springframework.stereotype.Service; import java.util.List; -import java.util.concurrent.Executor; /** * @author Sevket Goekay @@ -89,7 +89,7 @@ public class ChargePointServiceClient { private final ReservationRepository reservationRepository; private final OcppTagService ocppTagService; - private final Executor asyncTaskExecutor; + private final DelegatingTaskExecutor asyncTaskExecutor; private final TaskStore taskStore; private final ChargePointServiceInvokerImpl invoker; diff --git a/src/main/java/de/rwth/idsg/steve/service/MailService.java b/src/main/java/de/rwth/idsg/steve/service/MailService.java index c8101a261..bdd88a589 100644 --- a/src/main/java/de/rwth/idsg/steve/service/MailService.java +++ b/src/main/java/de/rwth/idsg/steve/service/MailService.java @@ -20,6 +20,7 @@ import com.google.common.base.Strings; import de.rwth.idsg.steve.SteveException; +import de.rwth.idsg.steve.config.DelegatingTaskExecutor; import de.rwth.idsg.steve.repository.SettingsRepository; import de.rwth.idsg.steve.repository.dto.MailSettings; import lombok.extern.slf4j.Slf4j; @@ -36,7 +37,6 @@ import jakarta.mail.internet.MimeMessage; import java.util.Properties; -import java.util.concurrent.Executor; /** * @author Sevket Goekay @@ -47,7 +47,7 @@ public class MailService { @Autowired private SettingsRepository settingsRepository; - @Autowired private Executor asyncTaskExecutor; + @Autowired private DelegatingTaskExecutor asyncTaskExecutor; public MailSettings getSettings() { return settingsRepository.getMailSettings();