Skip to content

Commit 5b95d1d

Browse files
Gleb Natapovavikivity
authored andcommitted
rpc stream: do not abort stream queue if stream connection was closed without error
queue::abort() drops all queued packets and report an error to a consumer. If stream connection completes normally we want the consumer to get all the data without errors, so abort the queue only in case of an error. Otherwise the queue will wait to be consumed. Since closing the stream involves sending a special EOS packet the consumer should not hang since the queue will not be empty. Fixes: #2612 Message-ID: <[email protected]>
1 parent b0a9f89 commit 5b95d1d

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

src/rpc/rpc.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,8 +1022,10 @@ namespace rpc {
10221022
}
10231023
}
10241024
}
1025+
if (is_stream() && (ep || _error)) {
1026+
_stream_queue.abort(std::make_exception_ptr(stream_closed()));
1027+
}
10251028
_error = true;
1026-
_stream_queue.abort(std::make_exception_ptr(stream_closed()));
10271029
return stop_send_loop(ep).then_wrapped([this] (future<> f) {
10281030
f.ignore_ready_future();
10291031
_outstanding.clear();
@@ -1244,8 +1246,10 @@ future<> server::connection::send_unknown_verb_reply(std::optional<rpc_clock_typ
12441246
format("server{} connection dropped", is_stream() ? " stream" : "").c_str(), ep);
12451247
}
12461248
_fd.shutdown_input();
1249+
if (is_stream() && (ep || _error)) {
1250+
_stream_queue.abort(std::make_exception_ptr(stream_closed()));
1251+
}
12471252
_error = true;
1248-
_stream_queue.abort(std::make_exception_ptr(stream_closed()));
12491253
return stop_send_loop(ep).then_wrapped([this] (future<> f) {
12501254
f.ignore_ready_future();
12511255
get_server()._conns.erase(get_connection_id());

0 commit comments

Comments
 (0)