Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extract interfaces #1691

Merged
merged 2 commits into from
Feb 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public abstract class AbstractWebSocketEndpoint extends ConcurrentWebSocketHandl

public static final String CHARGEBOX_ID_KEY = "CHARGEBOX_ID_KEY";

private final SessionContextStore sessionContextStore = new SessionContextStore();
private final SessionContextStore sessionContextStore = new SessionContextStoreImpl();
private final List<Consumer<String>> connectedCallbackList = new ArrayList<>();
private final List<Consumer<String>> disconnectedCallbackList = new ArrayList<>();
private final Object sessionContextLock = new Object();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,73 +19,20 @@
package de.rwth.idsg.steve.ocpp.ws;

import de.rwth.idsg.steve.ocpp.ws.data.FutureResponseContext;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.WebSocketSession;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/**
* Presumption: The responses must be sent using the same connection as the requests!
*
* @author Sevket Goekay <[email protected]>
* @since 21.03.2015
* @since 08.02.2025
*/
@Slf4j
@Service
public class FutureResponseContextStore {

// We store for each chargeBox connection, multiple pairs of (messageId, context)
// (session, (messageId, context))
private final Map<WebSocketSession, Map<String, FutureResponseContext>> lookupTable = new ConcurrentHashMap<>();

public void addSession(WebSocketSession session) {
addIfAbsent(session);
}

public void removeSession(WebSocketSession session) {
log.debug("Deleting the store for sessionId '{}'", session.getId());
lookupTable.remove(session);
}

public void add(WebSocketSession session, String messageId, FutureResponseContext context) {
Map<String, FutureResponseContext> map = addIfAbsent(session);
map.put(messageId, context);
log.debug("Store size for sessionId '{}': {}", session.getId(), map.size());
}

@Nullable
public FutureResponseContext get(WebSocketSession session, String messageId) {
RemoveFunction removeFunction = new RemoveFunction(messageId);
lookupTable.computeIfPresent(session, removeFunction);
return removeFunction.removedContext;
}
public interface FutureResponseContextStore {

private Map<String, FutureResponseContext> addIfAbsent(WebSocketSession session) {
return lookupTable.computeIfAbsent(session, innerSession -> {
log.debug("Creating new store for sessionId '{}'", innerSession.getId());
return new ConcurrentHashMap<>();
});
}
void addSession(WebSocketSession session);

@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
private static class RemoveFunction implements
BiFunction<WebSocketSession, Map<String, FutureResponseContext>, Map<String, FutureResponseContext>> {
void removeSession(WebSocketSession session);

private final String messageId;
@Nullable private FutureResponseContext removedContext;
void add(WebSocketSession session, String messageId, FutureResponseContext context);

@Override
public Map<String, FutureResponseContext> apply(WebSocketSession session,
Map<String, FutureResponseContext> map) {
removedContext = map.remove(messageId);
log.debug("Store size for sessionId '{}': {}", session.getId(), map.size());
return map;
}
}
@Nullable FutureResponseContext get(WebSocketSession session, String messageId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* SteVe - SteckdosenVerwaltung - https://github.com/steve-community/steve
* Copyright (C) 2013-2025 SteVe Community Team
* All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package de.rwth.idsg.steve.ocpp.ws;

import de.rwth.idsg.steve.ocpp.ws.data.FutureResponseContext;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.WebSocketSession;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/**
* Presumption: The responses must be sent using the same connection as the requests!
*
* @author Sevket Goekay <[email protected]>
* @since 21.03.2015
*/
@Slf4j
@Service
public class FutureResponseContextStoreImpl implements FutureResponseContextStore {

// We store for each chargeBox connection, multiple pairs of (messageId, context)
// (session, (messageId, context))
private final Map<WebSocketSession, Map<String, FutureResponseContext>> lookupTable = new ConcurrentHashMap<>();

@Override
public void addSession(WebSocketSession session) {
addIfAbsent(session);
}

@Override
public void removeSession(WebSocketSession session) {
log.debug("Deleting the store for sessionId '{}'", session.getId());
lookupTable.remove(session);
}

@Override
public void add(WebSocketSession session, String messageId, FutureResponseContext context) {
Map<String, FutureResponseContext> map = addIfAbsent(session);
map.put(messageId, context);
log.debug("Store size for sessionId '{}': {}", session.getId(), map.size());
}

@Nullable
@Override
public FutureResponseContext get(WebSocketSession session, String messageId) {
RemoveFunction removeFunction = new RemoveFunction(messageId);
lookupTable.computeIfPresent(session, removeFunction);
return removeFunction.removedContext;
}

private Map<String, FutureResponseContext> addIfAbsent(WebSocketSession session) {
return lookupTable.computeIfAbsent(session, innerSession -> {
log.debug("Creating new store for sessionId '{}'", innerSession.getId());
return new ConcurrentHashMap<>();
});
}

@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
private static class RemoveFunction implements
BiFunction<WebSocketSession, Map<String, FutureResponseContext>, Map<String, FutureResponseContext>> {

private final String messageId;
@Nullable private FutureResponseContext removedContext;

@Override
public Map<String, FutureResponseContext> apply(WebSocketSession session,
Map<String, FutureResponseContext> map) {
removedContext = map.remove(messageId);
log.debug("Store size for sessionId '{}': {}", session.getId(), map.size());
return map;
}
}
}
122 changes: 9 additions & 113 deletions src/main/java/de/rwth/idsg/steve/ocpp/ws/SessionContextStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,135 +18,31 @@
*/
package de.rwth.idsg.steve.ocpp.ws;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Striped;
import de.rwth.idsg.steve.SteveException;
import de.rwth.idsg.steve.ocpp.ws.custom.WsSessionSelectStrategy;
import de.rwth.idsg.steve.ocpp.ws.data.SessionContext;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.web.socket.WebSocketSession;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;

import static de.rwth.idsg.steve.SteveConfiguration.CONFIG;

/**
* @author Sevket Goekay <[email protected]>
* @since 17.03.2015
* @since 08.02.2025
*/
@Slf4j
public class SessionContextStore {

/**
* Key (String) = chargeBoxId
* Value (Deque<SessionContext>) = WebSocket session contexts
*/
private final ConcurrentHashMap<String, Deque<SessionContext>> lookupTable = new ConcurrentHashMap<>();

private final Striped<Lock> locks = Striped.lock(16);

private final WsSessionSelectStrategy wsSessionSelectStrategy = CONFIG.getOcpp().getWsSessionSelectStrategy();

public void add(String chargeBoxId, WebSocketSession session, ScheduledFuture pingSchedule) {
Lock l = locks.get(chargeBoxId);
l.lock();
try {
SessionContext context = new SessionContext(session, pingSchedule, DateTime.now());

Deque<SessionContext> endpointDeque = lookupTable.computeIfAbsent(chargeBoxId, str -> new ArrayDeque<>());
endpointDeque.addLast(context); // Adding at the end

log.debug("A new SessionContext is stored for chargeBoxId '{}'. Store size: {}",
chargeBoxId, endpointDeque.size());
} finally {
l.unlock();
}
}

public void remove(String chargeBoxId, WebSocketSession session) {
Lock l = locks.get(chargeBoxId);
l.lock();
try {
Deque<SessionContext> endpointDeque = lookupTable.get(chargeBoxId);
if (endpointDeque == null) {
log.debug("No session context to remove for chargeBoxId '{}'", chargeBoxId);
return;
}
public interface SessionContextStore {

// Prevent "java.util.ConcurrentModificationException: null"
// Reason: Cannot modify the set (remove the item) we are iterating
// Solution: Iterate the set, find the item, remove the item after the for-loop
//
SessionContext toRemove = null;
for (SessionContext context : endpointDeque) {
if (context.getSession().getId().equals(session.getId())) {
toRemove = context;
break;
}
}
void add(String chargeBoxId, WebSocketSession session, ScheduledFuture pingSchedule);

if (toRemove != null) {
// 1. Cancel the ping task
toRemove.getPingSchedule().cancel(true);
// 2. Delete from collection
if (endpointDeque.remove(toRemove)) {
log.debug("A SessionContext is removed for chargeBoxId '{}'. Store size: {}",
chargeBoxId, endpointDeque.size());
}
// 3. Delete empty collection from lookup table in order to correctly calculate
// the number of connected chargeboxes with getNumberOfChargeBoxes()
if (endpointDeque.size() == 0) {
lookupTable.remove(chargeBoxId);
}
}
} finally {
l.unlock();
}
}
void remove(String chargeBoxId, WebSocketSession session);

public WebSocketSession getSession(String chargeBoxId) {
Lock l = locks.get(chargeBoxId);
l.lock();
try {
Deque<SessionContext> endpointDeque = lookupTable.get(chargeBoxId);
if (endpointDeque == null) {
throw new NoSuchElementException();
}
return wsSessionSelectStrategy.getSession(endpointDeque);
} catch (NoSuchElementException e) {
throw new SteveException("No session context for chargeBoxId '%s'", chargeBoxId, e);
} finally {
l.unlock();
}
}
WebSocketSession getSession(String chargeBoxId);

public int getSize(String chargeBoxId) {
Deque<SessionContext> endpointDeque = lookupTable.get(chargeBoxId);
if (endpointDeque == null) {
return 0;
} else {
return endpointDeque.size();
}
}
int getSize(String chargeBoxId);

public int getNumberOfChargeBoxes() {
return lookupTable.size();
}
int getNumberOfChargeBoxes();

public List<String> getChargeBoxIdList() {
return Collections.list(lookupTable.keys());
}
List<String> getChargeBoxIdList();

public Map<String, Deque<SessionContext>> getACopy() {
return ImmutableMap.copyOf(lookupTable);
}
Map<String, Deque<SessionContext>> getACopy();
}
Loading