Skip to content

Commit

Permalink
separate TaskExecutor and TaskScheduler usage
Browse files Browse the repository at this point in the history
  • Loading branch information
goekay committed Feb 2, 2025
1 parent 5434ac7 commit 535d87c
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 16 deletions.
23 changes: 18 additions & 5 deletions src/main/java/de/rwth/idsg/steve/config/BeanConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;
import org.springframework.web.accept.ContentNegotiationManager;
Expand All @@ -59,7 +60,6 @@

import javax.sql.DataSource;
import java.util.List;
import java.util.concurrent.Executor;

import static de.rwth.idsg.steve.SteveConfiguration.CONFIG;

Expand Down Expand Up @@ -137,15 +137,28 @@ public DSLContext dslContext(DataSource dataSource) {
return DSL.using(conf);
}

@Bean(name = {"asyncTaskScheduler", "asyncTaskExecutor"})
public ThreadPoolTaskScheduler asyncTaskScheduler() {
@Bean(destroyMethod = "close")
public DelegatingTaskScheduler asyncTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(5);
scheduler.setThreadNamePrefix("SteVe-Executor-");
scheduler.setThreadNamePrefix("SteVe-TaskScheduler-");
scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.setAwaitTerminationSeconds(30);
scheduler.initialize();
return scheduler;

return new DelegatingTaskScheduler(scheduler);
}

@Bean(destroyMethod = "close")
public DelegatingTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setThreadNamePrefix("SteVe-TaskExecutor-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();

return new DelegatingTaskExecutor(executor);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* SteVe - SteckdosenVerwaltung - https://github.com/steve-community/steve
* Copyright (C) 2013-2025 SteVe Community Team
* All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package de.rwth.idsg.steve.config;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.io.Closeable;
import java.io.IOException;

/**
* @author Sevket Goekay <[email protected]>
* @since 02.02.2025
*/
@Slf4j
@RequiredArgsConstructor
public class DelegatingTaskExecutor implements Closeable {

private final ThreadPoolTaskExecutor delegate;

@Override
public void close() throws IOException {
log.info("Shutting down");
delegate.shutdown();
}

public void execute(Runnable task) {
delegate.execute(task);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SteVe - SteckdosenVerwaltung - https://github.com/steve-community/steve
* Copyright (C) 2013-2025 SteVe Community Team
* All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package de.rwth.idsg.steve.config;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ScheduledFuture;

/**
* @author Sevket Goekay <[email protected]>
* @since 02.02.2025
*/
@Slf4j
@RequiredArgsConstructor
public class DelegatingTaskScheduler implements Closeable {

private final ThreadPoolTaskScheduler delegate;

@Override
public void close() throws IOException {
log.info("Shutting down");
delegate.shutdown();
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
return delegate.scheduleAtFixedRate(task, startTime, period);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package de.rwth.idsg.steve.ocpp.soap;

import de.rwth.idsg.steve.config.DelegatingTaskExecutor;
import de.rwth.idsg.steve.ocpp.OcppProtocol;
import de.rwth.idsg.steve.repository.OcppServerRepository;
import de.rwth.idsg.steve.repository.impl.ChargePointRepositoryImpl;
Expand All @@ -41,7 +42,6 @@

import javax.xml.namespace.QName;
import java.util.Optional;
import java.util.concurrent.Executor;

import static org.apache.cxf.ws.addressing.JAXWSAConstants.ADDRESSING_PROPERTIES_INBOUND;

Expand All @@ -62,7 +62,7 @@ public class MessageHeaderInterceptor extends AbstractPhaseInterceptor<Message>

@Autowired private OcppServerRepository ocppServerRepository;
@Autowired private ChargePointHelperService chargePointHelperService;
@Autowired private Executor asyncTaskExecutor;
@Autowired private DelegatingTaskExecutor asyncTaskExecutor;

private static final String BOOT_OPERATION_NAME = "BootNotification";
private static final String CHARGEBOX_ID_HEADER = "ChargeBoxIdentity";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Strings;
import de.rwth.idsg.steve.config.WebSocketConfiguration;
import de.rwth.idsg.steve.config.DelegatingTaskScheduler;
import de.rwth.idsg.steve.ocpp.OcppTransport;
import de.rwth.idsg.steve.ocpp.OcppVersion;
import de.rwth.idsg.steve.ocpp.ws.data.CommunicationContext;
Expand All @@ -31,7 +32,6 @@
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PongMessage;
Expand All @@ -55,7 +55,7 @@
*/
public abstract class AbstractWebSocketEndpoint extends ConcurrentWebSocketHandler implements SubProtocolCapable {

@Autowired private ThreadPoolTaskScheduler asyncTaskScheduler;
@Autowired private DelegatingTaskScheduler asyncTaskScheduler;
@Autowired private OcppServerRepository ocppServerRepository;
@Autowired private FutureResponseContextStore futureResponseContextStore;
@Autowired private ApplicationEventPublisher applicationEventPublisher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
*/
package de.rwth.idsg.steve.service;

import de.rwth.idsg.steve.config.DelegatingTaskExecutor;
import de.rwth.idsg.steve.repository.dto.ChargePointSelect;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/**
Expand All @@ -32,9 +32,10 @@
*/
@RequiredArgsConstructor
public class BackgroundService {
private final Executor asyncTaskExecutor;

public static BackgroundService with(Executor asyncTaskExecutor) {
private final DelegatingTaskExecutor asyncTaskExecutor;

public static BackgroundService with(DelegatingTaskExecutor asyncTaskExecutor) {
return new BackgroundService(asyncTaskExecutor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package de.rwth.idsg.steve.service;

import de.rwth.idsg.steve.SteveException;
import de.rwth.idsg.steve.config.DelegatingTaskExecutor;
import de.rwth.idsg.steve.ocpp.ChargePointServiceInvokerImpl;
import de.rwth.idsg.steve.ocpp.OcppCallback;
import de.rwth.idsg.steve.ocpp.task.CancelReservationTask;
Expand Down Expand Up @@ -74,7 +75,6 @@
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.Executor;

/**
* @author Sevket Goekay <[email protected]>
Expand All @@ -89,7 +89,7 @@ public class ChargePointServiceClient {
private final ReservationRepository reservationRepository;
private final OcppTagService ocppTagService;

private final Executor asyncTaskExecutor;
private final DelegatingTaskExecutor asyncTaskExecutor;
private final TaskStore taskStore;
private final ChargePointServiceInvokerImpl invoker;

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/de/rwth/idsg/steve/service/MailService.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Strings;
import de.rwth.idsg.steve.SteveException;
import de.rwth.idsg.steve.config.DelegatingTaskExecutor;
import de.rwth.idsg.steve.repository.SettingsRepository;
import de.rwth.idsg.steve.repository.dto.MailSettings;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -36,7 +37,6 @@
import jakarta.mail.internet.MimeMessage;

import java.util.Properties;
import java.util.concurrent.Executor;

/**
* @author Sevket Goekay <[email protected]>
Expand All @@ -47,7 +47,7 @@
public class MailService {

@Autowired private SettingsRepository settingsRepository;
@Autowired private Executor asyncTaskExecutor;
@Autowired private DelegatingTaskExecutor asyncTaskExecutor;

public MailSettings getSettings() {
return settingsRepository.getMailSettings();
Expand Down

0 comments on commit 535d87c

Please sign in to comment.