From 0062fcc18aebedc4b6e1ff44dcb36ffdc2f76e28 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Fri, 26 Jan 2024 14:33:19 +0400 Subject: [PATCH] fix(messaging): check and clear closed substreams before use (#917) Description --- fix(messaging): check and clear closed substreams before use Motivation and Context --- Observed error in logs, when outbound message is being sent that messaging channel, was closed. This PR ensures that we check this before returning the channel back to users. How Has This Been Tested? --- Messaging behaviour needs more testing, Manually What process can a PR reviewer use to test or verify this change? --- VN can send messages Breaking Changes --- - [x] None - [ ] Requires data directory to be deleted - [ ] Other - Please specify --- networking/libp2p-messaging/src/behaviour.rs | 30 ++++++++++++++++---- networking/libp2p-messaging/src/stream.rs | 4 +++ 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/networking/libp2p-messaging/src/behaviour.rs b/networking/libp2p-messaging/src/behaviour.rs index 29f62a6ae..04f9f0d0b 100644 --- a/networking/libp2p-messaging/src/behaviour.rs +++ b/networking/libp2p-messaging/src/behaviour.rs @@ -85,6 +85,7 @@ where TCodec: Codec + Send + Clone + 'static pub fn obtain_message_channel(&mut self, peer_id: PeerId) -> MessageSink { let stream_id = self.next_outbound_stream_id; + self.clear_closed_connections(); match self.get_connections(&peer_id) { Some(connections) => { // Return a currently active stream @@ -142,6 +143,18 @@ where TCodec: Codec + Send + Clone + 'static } } + fn clear_closed_connections(&mut self) { + for connections in self.connected.values_mut() { + connections.clear_closed_connections(); + } + self.connected.retain(|_, connections| !connections.is_empty()); + + // Shrink the capacity of empty queues if they exceed the threshold. + if self.connected.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { + self.connected.shrink_to_fit(); + } + } + fn next_outbound_stream_id(&mut self) -> StreamId { let stream_id = self.next_outbound_stream_id; self.next_outbound_stream_id = self.next_outbound_stream_id.wrapping_add(1); @@ -361,22 +374,22 @@ struct Connections { } impl Connections { - fn new() -> Self { + pub(self) fn new() -> Self { Self { last_selected_index: 0, connections: SmallVec::new(), } } - pub fn push(&mut self, connection: Connection) { + pub(self) fn push(&mut self, connection: Connection) { self.connections.push(connection); } - pub fn is_empty(&self) -> bool { + pub(self) fn is_empty(&self) -> bool { self.connections.is_empty() } - pub fn next_active_sink(&mut self) -> Option<&MessageSink> { + pub(self) fn next_active_sink(&mut self) -> Option<&MessageSink> { let initial_last_selected = cmp::min(self.last_selected_index, self.connections.len() - 1); let (last_index, sink) = cycle_once(self.connections.len(), initial_last_selected, |i| { let conn = &self.connections[i]; @@ -387,7 +400,7 @@ impl Connections { Some(sink) } - pub fn next_pending_sink(&mut self) -> Option<&MessageSink> { + pub(self) fn next_pending_sink(&mut self) -> Option<&MessageSink> { let initial_last_selected = cmp::min(self.last_selected_index, self.connections.len() - 1); let (last_index, sink) = cycle_once(self.connections.len(), initial_last_selected, |i| { let conn = &self.connections[i]; @@ -397,6 +410,13 @@ impl Connections { self.last_selected_index = last_index; Some(sink) } + + pub(self) fn clear_closed_connections(&mut self) { + self.connections.retain(|c| { + c.message_sink.as_ref().map_or(true, |s| !s.is_closed()) && + c.pending_sink.as_ref().map_or(true, |s| !s.is_closed()) + }); + } } impl Default for Connections { diff --git a/networking/libp2p-messaging/src/stream.rs b/networking/libp2p-messaging/src/stream.rs index f4c0665f5..b76335f70 100644 --- a/networking/libp2p-messaging/src/stream.rs +++ b/networking/libp2p-messaging/src/stream.rs @@ -71,6 +71,10 @@ impl MessageSink { self.sender.unbounded_send(msg).map_err(|_| crate::Error::ChannelClosed) } + pub fn is_closed(&self) -> bool { + self.sender.is_closed() + } + pub async fn send_all(&mut self, stream: &mut TStream) -> Result<(), crate::Error> where TStream: Stream> + Unpin + ?Sized { self.sender