Skip to content

Commit dc1612d

Browse files
committed
Don't close channel when receiving a STOP_SENDING frame
Motivation: We need to ensure we correctly handle the situation when receiving a STOP_SENDING frame. Due a bug we fired an exception trough the pipeline and closed the channel. Modifications: - When receiving a STOP_SENDING frame just faill all writes without closing the channel Result: Fixes bug reported in netty/netty-incubator-codec-http3#262
1 parent 0b97e81 commit dc1612d

File tree

3 files changed

+92
-1
lines changed

3 files changed

+92
-1
lines changed

codec-classes-quic/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -992,7 +992,10 @@ private boolean handleWritableStreams() {
992992
QuicheQuicStreamChannel streamChannel = streams.get(streamId);
993993
if (streamChannel != null) {
994994
int capacity = Quiche.quiche_conn_stream_capacity(connAddr, streamId);
995-
if (capacity < 0) {
995+
if (capacity < 0 &&
996+
// If we received a STOP_SENDING frame we still want to call writable(...)
997+
// to force all data to be written and so failed.
998+
capacity != Quiche.QUICHE_ERR_STREAM_STOPPED) {
996999
if (!Quiche.quiche_conn_stream_finished(connAddr, streamId)) {
9971000
// Only fire an exception if the error was not caused because the stream is
9981001
// considered finished.

codec-classes-quic/src/main/java/io/netty/incubator/codec/quic/QuicheQuicStreamChannel.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,12 @@ boolean writeQueued() {
642642
return written;
643643
}
644644
} catch (Exception e) {
645+
if (e instanceof QuicException && (
646+
(QuicException) e).error() == QuicError.STREAM_STOPPED) {
647+
// Once its signaled that the stream is stopped we can just fail everything.
648+
queue.removeAndFailAll(e);
649+
break;
650+
}
645651
queue.remove().setFailure(e);
646652
continue;
647653
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2023 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.netty.incubator.codec.quic;
17+
18+
import io.netty.buffer.ByteBuf;
19+
import io.netty.buffer.Unpooled;
20+
import io.netty.channel.Channel;
21+
import io.netty.channel.ChannelFuture;
22+
import io.netty.channel.ChannelFutureListener;
23+
import io.netty.channel.ChannelHandlerContext;
24+
import io.netty.channel.ChannelInboundHandlerAdapter;
25+
import org.junit.jupiter.params.ParameterizedTest;
26+
import org.junit.jupiter.params.provider.MethodSource;
27+
28+
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.Executor;
30+
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
33+
public class QuicStreamShutdownTest extends AbstractQuicTest {
34+
35+
@ParameterizedTest
36+
@MethodSource("newSslTaskExecutors")
37+
public void testShutdownInputClosureCausesStreamStopped(Executor executor) throws Throwable {
38+
Channel server = null;
39+
Channel channel = null;
40+
CountDownLatch latch = new CountDownLatch(2);
41+
try {
42+
server = QuicTestUtils.newServer(executor, new ChannelInboundHandlerAdapter(), new ChannelInboundHandlerAdapter() {
43+
@Override
44+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
45+
ChannelFutureListener futureListener = new ChannelFutureListener() {
46+
@Override
47+
public void operationComplete(ChannelFuture channelFuture) {
48+
Throwable cause = channelFuture.cause();
49+
if (cause instanceof QuicException &&
50+
((QuicException) cause).error() == QuicError.STREAM_STOPPED) {
51+
latch.countDown();
52+
}
53+
}
54+
};
55+
ByteBuf buffer = (ByteBuf) msg;
56+
ctx.write(buffer.retainedDuplicate()).addListener(futureListener);
57+
ctx.writeAndFlush(buffer).addListener(futureListener);
58+
}
59+
});
60+
channel = QuicTestUtils.newClient(executor);
61+
QuicChannel quicChannel = QuicTestUtils.newQuicChannelBootstrap(channel)
62+
.handler(new ChannelInboundHandlerAdapter())
63+
.streamHandler(new ChannelInboundHandlerAdapter())
64+
.remoteAddress(server.localAddress())
65+
.connect()
66+
.get();
67+
68+
QuicStreamChannel streamChannel = quicChannel.createStream(QuicStreamType.BIDIRECTIONAL,
69+
new ChannelInboundHandlerAdapter()).sync().getNow();
70+
streamChannel.shutdownInput().sync();
71+
assertTrue(streamChannel.isInputShutdown());
72+
streamChannel.writeAndFlush(Unpooled.buffer().writeLong(8)).sync();
73+
74+
latch.await();
75+
} finally {
76+
QuicTestUtils.closeIfNotNull(channel);
77+
QuicTestUtils.closeIfNotNull(server);
78+
79+
shutdown(executor);
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)