Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor task/job executors #1686

Merged
merged 2 commits into from
Feb 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 23 additions & 38 deletions src/main/java/de/rwth/idsg/steve/config/BeanConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mysql.cj.conf.PropertyKey;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
Expand All @@ -45,6 +44,8 @@
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;
import org.springframework.web.accept.ContentNegotiationManager;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
Expand All @@ -55,16 +56,10 @@
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter;
import org.springframework.web.servlet.view.InternalResourceViewResolver;

import jakarta.annotation.PreDestroy;
import jakarta.validation.Validator;

import javax.sql.DataSource;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static de.rwth.idsg.steve.SteveConfiguration.CONFIG;

Expand All @@ -81,8 +76,6 @@
@ComponentScan("de.rwth.idsg.steve")
public class BeanConfiguration implements WebMvcConfigurer {

private ScheduledThreadPoolExecutor executor;

/**
* https://github.com/brettwooldridge/HikariCP/wiki/MySQL-Configuration
*/
Expand Down Expand Up @@ -144,13 +137,28 @@ public DSLContext dslContext(DataSource dataSource) {
return DSL.using(conf);
}

@Bean
public ScheduledExecutorService scheduledExecutorService() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("SteVe-Executor-%d")
.build();
@Bean(destroyMethod = "close")
public DelegatingTaskScheduler asyncTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(5);
scheduler.setThreadNamePrefix("SteVe-TaskScheduler-");
scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.setAwaitTerminationSeconds(30);
scheduler.initialize();

executor = new ScheduledThreadPoolExecutor(5, threadFactory);
return executor;
return new DelegatingTaskScheduler(scheduler);
}

@Bean(destroyMethod = "close")
public DelegatingTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setThreadNamePrefix("SteVe-TaskExecutor-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();

return new DelegatingTaskExecutor(executor);
}

@Bean
Expand All @@ -173,29 +181,6 @@ public ReleaseCheckService releaseCheckService() {
}
}

@PreDestroy
public void shutDown() {
if (executor != null) {
gracefulShutDown(executor);
}
}

private void gracefulShutDown(ExecutorService executor) {
try {
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);

} catch (InterruptedException e) {
log.error("Termination interrupted", e);

} finally {
if (!executor.isTerminated()) {
log.warn("Killing non-finished tasks");
}
executor.shutdownNow();
}
}

// -------------------------------------------------------------------------
// Web config
// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* SteVe - SteckdosenVerwaltung - https://github.com/steve-community/steve
* Copyright (C) 2013-2025 SteVe Community Team
* All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package de.rwth.idsg.steve.config;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.io.Closeable;
import java.io.IOException;

/**
* @author Sevket Goekay <[email protected]>
* @since 02.02.2025
*/
@Slf4j
@RequiredArgsConstructor
public class DelegatingTaskExecutor implements Closeable {

private final ThreadPoolTaskExecutor delegate;

@Override
public void close() throws IOException {
log.info("Shutting down");
delegate.shutdown();
}

public void execute(Runnable task) {
delegate.execute(task);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SteVe - SteckdosenVerwaltung - https://github.com/steve-community/steve
* Copyright (C) 2013-2025 SteVe Community Team
* All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package de.rwth.idsg.steve.config;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ScheduledFuture;

/**
* @author Sevket Goekay <[email protected]>
* @since 02.02.2025
*/
@Slf4j
@RequiredArgsConstructor
public class DelegatingTaskScheduler implements Closeable {

private final ThreadPoolTaskScheduler delegate;

@Override
public void close() throws IOException {
log.info("Shutting down");
delegate.shutdown();
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
return delegate.scheduleAtFixedRate(task, startTime, period);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
@Autowired private Ocpp16WebSocketEndpoint ocpp16WebSocketEndpoint;

public static final String PATH_INFIX = "/websocket/CentralSystemService/";
public static final long PING_INTERVAL = TimeUnit.MINUTES.toMinutes(15);
public static final Duration PING_INTERVAL = Duration.ofMinutes(15);
public static final Duration IDLE_TIMEOUT = Duration.ofHours(2);
public static final int MAX_MSG_SIZE = 8_388_608; // 8 MB for max message size

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package de.rwth.idsg.steve.ocpp.soap;

import de.rwth.idsg.steve.config.DelegatingTaskExecutor;
import de.rwth.idsg.steve.ocpp.OcppProtocol;
import de.rwth.idsg.steve.repository.OcppServerRepository;
import de.rwth.idsg.steve.repository.impl.ChargePointRepositoryImpl;
Expand All @@ -41,7 +42,6 @@

import javax.xml.namespace.QName;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;

import static org.apache.cxf.ws.addressing.JAXWSAConstants.ADDRESSING_PROPERTIES_INBOUND;

Expand All @@ -62,7 +62,7 @@ public class MessageHeaderInterceptor extends AbstractPhaseInterceptor<Message>

@Autowired private OcppServerRepository ocppServerRepository;
@Autowired private ChargePointHelperService chargePointHelperService;
@Autowired private ScheduledExecutorService executorService;
@Autowired private DelegatingTaskExecutor asyncTaskExecutor;

private static final String BOOT_OPERATION_NAME = "BootNotification";
private static final String CHARGEBOX_ID_HEADER = "ChargeBoxIdentity";
Expand Down Expand Up @@ -93,7 +93,7 @@ public void handleMessage(Message message) throws Fault {
// 2. update endpoint
// -------------------------------------------------------------------------

executorService.execute(() -> {
asyncTaskExecutor.execute(() -> {
try {
String endpointAddress = getEndpointAddress(message);
if (endpointAddress != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Strings;
import de.rwth.idsg.steve.config.WebSocketConfiguration;
import de.rwth.idsg.steve.config.DelegatingTaskScheduler;
import de.rwth.idsg.steve.ocpp.OcppTransport;
import de.rwth.idsg.steve.ocpp.OcppVersion;
import de.rwth.idsg.steve.ocpp.ws.data.CommunicationContext;
Expand All @@ -39,14 +40,13 @@
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
Expand All @@ -55,7 +55,7 @@
*/
public abstract class AbstractWebSocketEndpoint extends ConcurrentWebSocketHandler implements SubProtocolCapable {

@Autowired private ScheduledExecutorService service;
@Autowired private DelegatingTaskScheduler asyncTaskScheduler;
@Autowired private OcppServerRepository ocppServerRepository;
@Autowired private FutureResponseContextStore futureResponseContextStore;
@Autowired private ApplicationEventPublisher applicationEventPublisher;
Expand Down Expand Up @@ -131,11 +131,11 @@ public void onOpen(WebSocketSession session) throws Exception {

// Just to keep the connection alive, such that the servers do not close
// the connection because of a idle timeout, we ping-pong at fixed intervals.
ScheduledFuture pingSchedule = service.scheduleAtFixedRate(
ScheduledFuture pingSchedule = asyncTaskScheduler.scheduleAtFixedRate(
new PingTask(chargeBoxId, session),
WebSocketConfiguration.PING_INTERVAL,
WebSocketConfiguration.PING_INTERVAL,
TimeUnit.MINUTES);
Instant.now().plus(WebSocketConfiguration.PING_INTERVAL),
WebSocketConfiguration.PING_INTERVAL
);

futureResponseContextStore.addSession(session);

Expand Down
13 changes: 7 additions & 6 deletions src/main/java/de/rwth/idsg/steve/service/BackgroundService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
*/
package de.rwth.idsg.steve.service;

import de.rwth.idsg.steve.config.DelegatingTaskExecutor;
import de.rwth.idsg.steve.repository.dto.ChargePointSelect;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

/**
Expand All @@ -32,10 +32,11 @@
*/
@RequiredArgsConstructor
public class BackgroundService {
private final ExecutorService executorService;

public static BackgroundService with(ExecutorService executorService) {
return new BackgroundService(executorService);
private final DelegatingTaskExecutor asyncTaskExecutor;

public static BackgroundService with(DelegatingTaskExecutor asyncTaskExecutor) {
return new BackgroundService(asyncTaskExecutor);
}

public Runner forFirst(List<ChargePointSelect> list) {
Expand All @@ -56,7 +57,7 @@ private class BackgroundSingleRunner implements Runner {

@Override
public void execute(Consumer<ChargePointSelect> consumer) {
executorService.execute(() -> consumer.accept(cps));
asyncTaskExecutor.execute(() -> consumer.accept(cps));
}
}

Expand All @@ -66,7 +67,7 @@ private class BackgroundListRunner implements Runner {

@Override
public void execute(Consumer<ChargePointSelect> consumer) {
executorService.execute(() -> list.forEach(consumer));
asyncTaskExecutor.execute(() -> list.forEach(consumer));
}
}
}
Loading