Skip to content

Commit

Permalink
fix(messaging): check and clear closed substreams before use (#917)
Browse files Browse the repository at this point in the history
Description
---
fix(messaging): check and clear closed substreams before use

Motivation and Context
---
Observed error in logs, when outbound message is being sent that
messaging channel, was closed. This PR ensures that we check this before
returning the channel back to users.

How Has This Been Tested?
---
Messaging behaviour needs more testing, Manually

What process can a PR reviewer use to test or verify this change?
---
VN can send messages

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
sdbondi authored Jan 26, 2024
1 parent bb0b311 commit 0062fcc
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
30 changes: 25 additions & 5 deletions networking/libp2p-messaging/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ where TCodec: Codec + Send + Clone + 'static
pub fn obtain_message_channel(&mut self, peer_id: PeerId) -> MessageSink<TCodec::Message> {
let stream_id = self.next_outbound_stream_id;

self.clear_closed_connections();
match self.get_connections(&peer_id) {
Some(connections) => {
// Return a currently active stream
Expand Down Expand Up @@ -142,6 +143,18 @@ where TCodec: Codec + Send + Clone + 'static
}
}

fn clear_closed_connections(&mut self) {
for connections in self.connected.values_mut() {
connections.clear_closed_connections();
}
self.connected.retain(|_, connections| !connections.is_empty());

// Shrink the capacity of empty queues if they exceed the threshold.
if self.connected.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
self.connected.shrink_to_fit();
}
}

fn next_outbound_stream_id(&mut self) -> StreamId {
let stream_id = self.next_outbound_stream_id;
self.next_outbound_stream_id = self.next_outbound_stream_id.wrapping_add(1);
Expand Down Expand Up @@ -361,22 +374,22 @@ struct Connections<TMsg> {
}

impl<TMsg> Connections<TMsg> {
fn new() -> Self {
pub(self) fn new() -> Self {
Self {
last_selected_index: 0,
connections: SmallVec::new(),
}
}

pub fn push(&mut self, connection: Connection<TMsg>) {
pub(self) fn push(&mut self, connection: Connection<TMsg>) {
self.connections.push(connection);
}

pub fn is_empty(&self) -> bool {
pub(self) fn is_empty(&self) -> bool {
self.connections.is_empty()
}

pub fn next_active_sink(&mut self) -> Option<&MessageSink<TMsg>> {
pub(self) fn next_active_sink(&mut self) -> Option<&MessageSink<TMsg>> {
let initial_last_selected = cmp::min(self.last_selected_index, self.connections.len() - 1);
let (last_index, sink) = cycle_once(self.connections.len(), initial_last_selected, |i| {
let conn = &self.connections[i];
Expand All @@ -387,7 +400,7 @@ impl<TMsg> Connections<TMsg> {
Some(sink)
}

pub fn next_pending_sink(&mut self) -> Option<&MessageSink<TMsg>> {
pub(self) fn next_pending_sink(&mut self) -> Option<&MessageSink<TMsg>> {
let initial_last_selected = cmp::min(self.last_selected_index, self.connections.len() - 1);
let (last_index, sink) = cycle_once(self.connections.len(), initial_last_selected, |i| {
let conn = &self.connections[i];
Expand All @@ -397,6 +410,13 @@ impl<TMsg> Connections<TMsg> {
self.last_selected_index = last_index;
Some(sink)
}

pub(self) fn clear_closed_connections(&mut self) {
self.connections.retain(|c| {
c.message_sink.as_ref().map_or(true, |s| !s.is_closed()) &&
c.pending_sink.as_ref().map_or(true, |s| !s.is_closed())
});
}
}

impl<TMsg> Default for Connections<TMsg> {
Expand Down
4 changes: 4 additions & 0 deletions networking/libp2p-messaging/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ impl<TMsg> MessageSink<TMsg> {
self.sender.unbounded_send(msg).map_err(|_| crate::Error::ChannelClosed)
}

pub fn is_closed(&self) -> bool {
self.sender.is_closed()
}

pub async fn send_all<TStream>(&mut self, stream: &mut TStream) -> Result<(), crate::Error>
where TStream: Stream<Item = Result<TMsg, mpsc::SendError>> + Unpin + ?Sized {
self.sender
Expand Down

0 comments on commit 0062fcc

Please sign in to comment.