Skip to content

Commit bbed518

Browse files
authored
Add method to prune pending requests in client (#82)
Signed-off-by: Ivan Santiago Paunovic <[email protected]>
1 parent b6f04ee commit bbed518

File tree

3 files changed

+97
-8
lines changed

3 files changed

+97
-8
lines changed

rcljava/src/main/java/org/ros2/rcljava/client/Client.java

+14
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,20 @@ <U extends MessageDefinition, V extends MessageDefinition> ResponseFuture<V> asy
3838

3939
<V extends MessageDefinition> boolean removePendingRequest(ResponseFuture<V> future);
4040

41+
/**
42+
* Remove old pending requests that this client has done.
43+
* Future responses to removed requests will be ignored.
44+
*/
45+
long prunePendingRequests();
46+
47+
/**
48+
* Remove old pending requests that where done before the specified time point.
49+
* Future responses to removed requests will be ignored.
50+
*
51+
* @param nanoTime requests done before this timepoint will be removed.
52+
*/
53+
long prunePendingRequestsOlderThan(long nanoTime);
54+
4155
/**
4256
* Check if the service server is available.
4357
*

rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java

+50-8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.lang.Long;
2323
import java.util.AbstractMap;
2424
import java.util.HashMap;
25+
import java.util.Iterator;
2526
import java.util.Map;
2627
import java.util.concurrent.Future;
2728
import java.util.concurrent.TimeUnit;
@@ -52,7 +53,25 @@ public class ClientImpl<T extends ServiceDefinition> implements Client<T> {
5253
private final WeakReference<Node> nodeReference;
5354
private long handle;
5455
private final String serviceName;
55-
private Map<Long, Map.Entry<Consumer, ResponseFuture>> pendingRequests;
56+
57+
private class PendingRequest
58+
{
59+
public Consumer callback;
60+
public ResponseFuture future;
61+
public long requestTimestamp;
62+
63+
public PendingRequest(
64+
final Consumer callback,
65+
final ResponseFuture future,
66+
final long requestTimestamp)
67+
{
68+
this.callback = callback;
69+
this.future = future;
70+
this.requestTimestamp = requestTimestamp;
71+
}
72+
}
73+
74+
private Map<Long, PendingRequest> pendingRequests;
5675

5776
private final ServiceDefinition serviceDefinition;
5877

@@ -66,7 +85,7 @@ public ClientImpl(
6685
this.handle = handle;
6786
this.serviceName = serviceName;
6887
this.serviceDefinition = serviceDefinition;
69-
this.pendingRequests = new HashMap<Long, Map.Entry<Consumer, ResponseFuture>>();
88+
this.pendingRequests = new HashMap<Long, PendingRequest>();
7089
}
7190

7291
public ServiceDefinition getServiceDefinition() {
@@ -88,8 +107,7 @@ public void accept(Future<V> input) {}
88107
request.getDestructorInstance(), request);
89108
ResponseFuture<V> future = new ResponseFuture<V>(sequenceNumber);
90109

91-
Map.Entry<Consumer, ResponseFuture> entry =
92-
new AbstractMap.SimpleEntry<Consumer, ResponseFuture>(callback, future);
110+
PendingRequest entry = new PendingRequest(callback, future, System.nanoTime());
93111
pendingRequests.put(sequenceNumber, entry);
94112
return future;
95113
}
@@ -98,20 +116,44 @@ public void accept(Future<V> input) {}
98116
public final <V extends MessageDefinition> boolean
99117
removePendingRequest(ResponseFuture<V> future) {
100118
synchronized (pendingRequests) {
101-
Map.Entry<Consumer, ResponseFuture> entry = pendingRequests.remove(
119+
PendingRequest entry = pendingRequests.remove(
102120
future.getRequestSequenceNumber());
103121
return entry != null;
104122
}
105123
}
106124

125+
public final long
126+
prunePendingRequests() {
127+
synchronized (pendingRequests) {
128+
long size = pendingRequests.size();
129+
pendingRequests.clear();
130+
return size;
131+
}
132+
}
133+
134+
public final long
135+
prunePendingRequestsOlderThan(long nanoTime) {
136+
synchronized (pendingRequests) {
137+
Iterator<Map.Entry<Long, PendingRequest>> iter = pendingRequests.entrySet().iterator();
138+
long removed = 0;
139+
while(iter.hasNext()) {
140+
if(iter.next().getValue().requestTimestamp < nanoTime) {
141+
iter.remove();
142+
++removed;
143+
}
144+
}
145+
return removed;
146+
}
147+
}
148+
107149
public final <U extends MessageDefinition> void handleResponse(
108150
final RMWRequestId header, final U response) {
109151
synchronized (pendingRequests) {
110152
long sequenceNumber = header.sequenceNumber;
111-
Map.Entry<Consumer, ResponseFuture> entry = pendingRequests.remove(sequenceNumber);
153+
PendingRequest entry = pendingRequests.remove(sequenceNumber);
112154
if (entry != null) {
113-
Consumer<Future> callback = entry.getKey();
114-
ResponseFuture<U> future = entry.getValue();
155+
Consumer<Future> callback = entry.callback;
156+
ResponseFuture<U> future = entry.future;
115157
future.set(response);
116158
callback.accept(future);
117159
return;

rcljava/src/test/java/org/ros2/rcljava/client/ClientTest.java

+33
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
import org.ros2.rcljava.RCLJava;
3939
import org.ros2.rcljava.concurrent.RCLFuture;
40+
import org.ros2.rcljava.consumers.Consumer;
4041
import org.ros2.rcljava.consumers.TriConsumer;
4142
import org.ros2.rcljava.executors.Executor;
4243
import org.ros2.rcljava.node.Node;
@@ -157,4 +158,36 @@ public final void testRemovePendingRequest() throws Exception {
157158
assertTrue(client.removePendingRequest(responseFuture));
158159
assertFalse(client.removePendingRequest(responseFuture));
159160
}
161+
162+
@Test
163+
public final void testPrunePendingRequestsOlderThan() throws Exception {
164+
RCLFuture<rcljava.srv.AddTwoInts_Response> consumerFuture =
165+
new RCLFuture<rcljava.srv.AddTwoInts_Response>();
166+
167+
TestClientConsumer clientConsumer = new TestClientConsumer(consumerFuture);
168+
169+
Service<rcljava.srv.AddTwoInts> service = node.<rcljava.srv.AddTwoInts>createService(
170+
rcljava.srv.AddTwoInts.class, "add_two_ints", clientConsumer);
171+
172+
rcljava.srv.AddTwoInts_Request request = new rcljava.srv.AddTwoInts_Request();
173+
request.setA(2);
174+
request.setB(3);
175+
176+
Client<rcljava.srv.AddTwoInts> client =
177+
node.<rcljava.srv.AddTwoInts>createClient(
178+
rcljava.srv.AddTwoInts.class, "add_two_ints");
179+
180+
assertTrue(client.waitForService(Duration.ofSeconds(10)));
181+
182+
long oldNanoTime = System.nanoTime();
183+
ResponseFuture<rcljava.srv.AddTwoInts_Response> responseFuture = client.asyncSendRequest(
184+
request,
185+
new Consumer<Future<rcljava.srv.AddTwoInts_Response>>() {
186+
public final void accept(final Future<rcljava.srv.AddTwoInts_Response> futureResponse) {}
187+
});
188+
189+
assertEquals(0, client.prunePendingRequestsOlderThan(oldNanoTime));
190+
assertEquals(1, client.prunePendingRequestsOlderThan(System.nanoTime()));
191+
assertEquals(0, client.prunePendingRequestsOlderThan(System.nanoTime()));
192+
}
160193
}

0 commit comments

Comments
 (0)