Skip to content

Commit 7d08882

Browse files
authored
Clear internal stream map on connection/stream close. (#12212)
1 parent c8b6bb3 commit 7d08882

File tree

3 files changed

+83
-3
lines changed

3 files changed

+83
-3
lines changed

src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ void setTickRate(long tickRateInMilliseconds) {
109109
this.tickRate = tickRateInMilliseconds;
110110
}
111111

112+
// For testing purposes
113+
Map<String,Observable> getStreamMap() {
114+
return this.streamMap;
115+
}
116+
112117
TransportEnum getTransportEnum() {
113118
return this.transportEnum;
114119
}
@@ -517,6 +522,7 @@ private void stopConnection(String errorMessage) {
517522
connectionId = null;
518523
transportEnum = TransportEnum.ALL;
519524
this.localHeaders.clear();
525+
this.streamMap.clear();
520526
} finally {
521527
hubConnectionStateLock.unlock();
522528
}
@@ -575,8 +581,14 @@ void launchStreams(List<String> streamIds) {
575581
Observable observable = this.streamMap.get(streamId);
576582
observable.subscribe(
577583
(item) -> sendHubMessage(new StreamItem(streamId, item)),
578-
(error) -> sendHubMessage(new CompletionMessage(streamId, null, error.toString())),
579-
() -> sendHubMessage(new CompletionMessage(streamId, null, null)));
584+
(error) -> {
585+
sendHubMessage(new CompletionMessage(streamId, null, error.toString()));
586+
this.streamMap.remove(streamId);
587+
},
588+
() -> {
589+
sendHubMessage(new CompletionMessage(streamId, null, null));
590+
this.streamMap.remove(streamId);
591+
});
580592
}
581593
}
582594

@@ -599,7 +611,6 @@ Object[] checkUploadStream(Object[] args, List<String> streamIds) {
599611
return params.toArray();
600612
}
601613

602-
603614
/**
604615
* Invokes a hub method on the server using the specified method name and arguments.
605616
*

src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,74 @@ public void checkStreamUploadThroughSendWithArgs() {
515515
assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[3]);
516516
}
517517

518+
@Test
519+
public void streamMapIsClearedOnClose() {
520+
MockTransport mockTransport = new MockTransport();
521+
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
522+
523+
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
524+
525+
ReplaySubject<String> stream = ReplaySubject.create();
526+
hubConnection.send("UploadStream", stream, 12);
527+
528+
stream.onNext("FirstItem");
529+
String[] messages = mockTransport.getSentMessages();
530+
assertEquals("{\"type\":1,\"target\":\"UploadStream\",\"arguments\":[12],\"streamIds\":[\"1\"]}\u001E", messages[1]);
531+
assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", messages[2]);
532+
533+
stream.onComplete();
534+
messages = mockTransport.getSentMessages();
535+
assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[3]);
536+
537+
hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait();
538+
539+
assertTrue(hubConnection.getStreamMap().isEmpty());
540+
}
541+
542+
@Test
543+
public void streamMapEntriesRemovedOnStreamClose() {
544+
MockTransport mockTransport = new MockTransport();
545+
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
546+
547+
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
548+
549+
ReplaySubject<String> stream = ReplaySubject.create();
550+
hubConnection.send("UploadStream", stream, 12);
551+
552+
ReplaySubject<String> secondStream = ReplaySubject.create();
553+
hubConnection.send("SecondUploadStream", secondStream, 13);
554+
555+
556+
stream.onNext("FirstItem");
557+
secondStream.onNext("SecondItem");
558+
String[] messages = mockTransport.getSentMessages();
559+
assertEquals("{\"type\":1,\"target\":\"UploadStream\",\"arguments\":[12],\"streamIds\":[\"1\"]}\u001E", messages[1]);
560+
assertEquals("{\"type\":1,\"target\":\"SecondUploadStream\",\"arguments\":[13],\"streamIds\":[\"2\"]}\u001E", messages[2]);
561+
assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", messages[3]);
562+
assertEquals("{\"type\":2,\"invocationId\":\"2\",\"item\":\"SecondItem\"}\u001E", messages[4]);
563+
564+
565+
assertEquals(2, hubConnection.getStreamMap().size());
566+
assertTrue(hubConnection.getStreamMap().keySet().contains("1"));
567+
assertTrue(hubConnection.getStreamMap().keySet().contains("2"));
568+
569+
// Verify that we clear the entry from the stream map after we clear the first stream.
570+
stream.onComplete();
571+
assertEquals(1, hubConnection.getStreamMap().size());
572+
assertTrue(hubConnection.getStreamMap().keySet().contains("2"));
573+
574+
secondStream.onError(new Exception("Exception"));
575+
assertEquals(0, hubConnection.getStreamMap().size());
576+
assertTrue(hubConnection.getStreamMap().isEmpty());
577+
578+
messages = mockTransport.getSentMessages();
579+
assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[5]);
580+
assertEquals("{\"type\":3,\"invocationId\":\"2\",\"error\":\"java.lang.Exception: Exception\"}\u001E", messages[6]);
581+
582+
hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait();
583+
assertTrue(hubConnection.getStreamMap().isEmpty());
584+
}
585+
518586
@Test
519587
public void useSameSubjectMultipleTimes() {
520588
MockTransport mockTransport = new MockTransport();

src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportUrlFormatTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.junit.jupiter.params.provider.Arguments;
1313
import org.junit.jupiter.params.provider.MethodSource;
1414

15+
1516
class WebSocketTransportUrlFormatTest {
1617
private static Stream<Arguments> protocols() {
1718
return Stream.of(

0 commit comments

Comments
 (0)