Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/protocol/request_response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub(crate) struct RequestResponseProtocol {
/// Pending outbound responses.
///
/// The future listens to a `oneshot::Sender` which is given to `RequestResponseHandle`.
/// If the request is accepted by the local node, the response is sent over the channel to the
/// If the request is accepted by the local node, the response is sent over the channel to
/// the future which sends it to remote peer and closes the substream.
///
/// If the substream is rejected by the local node, the `oneshot::Sender` is dropped which
Expand Down Expand Up @@ -457,7 +457,7 @@ impl RequestResponseProtocol {
Ok(())
}

/// Handle pending inbound response.
/// Handle pending inbound request.
async fn on_inbound_request(
&mut self,
peer: PeerId,
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/transport_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl Stream for KeepAliveTracker {
}
}

/// Provides an interfaces for [`Litep2p`](crate::Litep2p) protocols to interact
/// Provides an interface for [`Litep2p`](crate::Litep2p) protocols to interact
/// with the underlying transport protocols.
#[derive(Debug)]
pub struct TransportService {
Expand All @@ -279,7 +279,7 @@ pub struct TransportService {
/// Transport handle.
transport_handle: TransportManagerHandle,

/// RX channel for receiving events from tranports and connections.
/// RX channel for receiving events from transports and connections.
rx: Receiver<InnerTransportEvent>,

/// Next substream ID.
Expand Down
179 changes: 56 additions & 123 deletions src/substream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,8 +833,8 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let inner = Pin::into_inner(self);

for (key, mut substream) in inner.substreams.iter_mut() {
match Pin::new(&mut substream).poll_next(cx) {
for (key, substream) in inner.substreams.iter_mut() {
match Pin::new(substream).poll_next(cx) {
Poll::Pending => continue,
Poll::Ready(Some(data)) => return Poll::Ready(Some((*key, data))),
Poll::Ready(None) =>
Expand All @@ -849,58 +849,36 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::{mock::substream::MockSubstream, PeerId};
use futures::{SinkExt, StreamExt};
use crate::{mock::substream::MockSubstream, utils::futures_stream::FuturesStream};
use futures::StreamExt;

async fn get_data_from_substream(
mut substream: MockSubstream,
) -> (Result<BytesMut, SubstreamError>, MockSubstream) {
let request = match substream.next().await {
Some(Ok(request)) => Ok(request),
Some(Err(error)) => Err(error),
None => Err(SubstreamError::ConnectionClosed),
};

(request, substream)
}

#[test]
fn add_substream() {
let mut set = SubstreamSet::<PeerId, MockSubstream>::new();
let mut set = FuturesStream::new();

let peer = PeerId::random();
let substream = MockSubstream::new();
set.insert(peer, substream);
set.push(get_data_from_substream(substream));

let peer = PeerId::random();
let substream = MockSubstream::new();
set.insert(peer, substream);
}

#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn add_same_peer_twice() {
let mut set = SubstreamSet::<PeerId, MockSubstream>::new();

let peer = PeerId::random();
let substream1 = MockSubstream::new();
let substream2 = MockSubstream::new();

set.insert(peer, substream1);
set.insert(peer, substream2);
}

#[test]
fn remove_substream() {
let mut set = SubstreamSet::<PeerId, MockSubstream>::new();

let peer1 = PeerId::random();
let substream1 = MockSubstream::new();
set.insert(peer1, substream1);

let peer2 = PeerId::random();
let substream2 = MockSubstream::new();
set.insert(peer2, substream2);

assert!(set.remove(&peer1).is_some());
assert!(set.remove(&peer2).is_some());
assert!(set.remove(&PeerId::random()).is_none());
set.push(get_data_from_substream(substream));
}

#[tokio::test]
async fn poll_data_from_substream() {
let mut set = SubstreamSet::<PeerId, MockSubstream>::new();
let mut set = FuturesStream::new();

let peer = PeerId::random();
let mut substream = MockSubstream::new();
substream
.expect_poll_next()
Expand All @@ -911,90 +889,45 @@ mod tests {
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..])))));
substream.expect_poll_next().returning(|_| Poll::Pending);
set.insert(peer, substream);

let value = set.next().await.unwrap();
assert_eq!(value.0, peer);
assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..]));
set.push(get_data_from_substream(substream));
let (value, substream) = set.next().await.unwrap();
assert_eq!(value.unwrap(), BytesMut::from(&b"hello"[..]));

let value = set.next().await.unwrap();
assert_eq!(value.0, peer);
assert_eq!(value.1.unwrap(), BytesMut::from(&b"world"[..]));
set.push(get_data_from_substream(substream));
let (value, _substream) = set.next().await.unwrap();
assert_eq!(value.unwrap(), BytesMut::from(&b"world"[..]));

assert!(futures::poll!(set.next()).is_pending());
}

#[tokio::test]
async fn substream_closed() {
let mut set = SubstreamSet::<PeerId, MockSubstream>::new();
let mut set = FuturesStream::new();

let peer = PeerId::random();
let mut substream = MockSubstream::new();
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..])))));
substream.expect_poll_next().times(1).return_once(|_| Poll::Ready(None));
substream.expect_poll_next().returning(|_| Poll::Pending);
set.insert(peer, substream);

let value = set.next().await.unwrap();
assert_eq!(value.0, peer);
assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..]));

match set.next().await {
Some((exited_peer, Err(SubstreamError::ConnectionClosed))) => {
assert_eq!(peer, exited_peer);
}
_ => panic!("inavlid event received"),
}
}

#[tokio::test]
async fn get_mut_substream() {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();

let mut set = SubstreamSet::<PeerId, MockSubstream>::new();

let peer = PeerId::random();
let mut substream = MockSubstream::new();
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..])))));
substream.expect_poll_ready().times(1).return_once(|_| Poll::Ready(Ok(())));
substream.expect_start_send().times(1).return_once(|_| Ok(()));
substream.expect_poll_flush().times(1).return_once(|_| Poll::Ready(Ok(())));
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..])))));
substream.expect_poll_next().returning(|_| Poll::Pending);
set.insert(peer, substream);

let value = set.next().await.unwrap();
assert_eq!(value.0, peer);
assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..]));

let substream = set.get_mut(&peer).unwrap();
substream.send(vec![1, 2, 3, 4].into()).await.unwrap();
set.push(get_data_from_substream(substream));

let value = set.next().await.unwrap();
assert_eq!(value.0, peer);
assert_eq!(value.1.unwrap(), BytesMut::from(&b"world"[..]));
let (value, substream) = set.next().await.unwrap();
assert_eq!(value.unwrap(), BytesMut::from(&b"hello"[..]));

// try to get non-existent substream
assert!(set.get_mut(&PeerId::random()).is_none());
set.push(get_data_from_substream(substream));
let (value, _substream) = set.next().await.unwrap();
assert_eq!(value, Err(SubstreamError::ConnectionClosed));
}

#[tokio::test]
async fn poll_data_from_two_substreams() {
let mut set = SubstreamSet::<PeerId, MockSubstream>::new();
let mut set = FuturesStream::new();

// prepare first substream
let peer1 = PeerId::random();
let mut substream1 = MockSubstream::new();
substream1
.expect_poll_next()
Expand All @@ -1005,10 +938,9 @@ mod tests {
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..])))));
substream1.expect_poll_next().returning(|_| Poll::Pending);
set.insert(peer1, substream1);
set.push(get_data_from_substream(substream1));

// prepare second substream
let peer2 = PeerId::random();
let mut substream2 = MockSubstream::new();
substream2
.expect_poll_next()
Expand All @@ -1019,41 +951,42 @@ mod tests {
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"huup"[..])))));
substream2.expect_poll_next().returning(|_| Poll::Pending);
set.insert(peer2, substream2);
set.push(get_data_from_substream(substream2));

let expected: Vec<Vec<(PeerId, BytesMut)>> = vec![
let expected: Vec<Vec<BytesMut>> = vec![
vec![
(peer1, BytesMut::from(&b"hello"[..])),
(peer1, BytesMut::from(&b"world"[..])),
(peer2, BytesMut::from(&b"siip"[..])),
(peer2, BytesMut::from(&b"huup"[..])),
BytesMut::from(&b"hello"[..]),
BytesMut::from(&b"world"[..]),
BytesMut::from(&b"siip"[..]),
BytesMut::from(&b"huup"[..]),
],
vec![
(peer1, BytesMut::from(&b"hello"[..])),
(peer2, BytesMut::from(&b"siip"[..])),
(peer1, BytesMut::from(&b"world"[..])),
(peer2, BytesMut::from(&b"huup"[..])),
BytesMut::from(&b"hello"[..]),
BytesMut::from(&b"siip"[..]),
BytesMut::from(&b"world"[..]),
BytesMut::from(&b"huup"[..]),
],
vec![
(peer2, BytesMut::from(&b"siip"[..])),
(peer2, BytesMut::from(&b"huup"[..])),
(peer1, BytesMut::from(&b"hello"[..])),
(peer1, BytesMut::from(&b"world"[..])),
BytesMut::from(&b"siip"[..]),
BytesMut::from(&b"huup"[..]),
BytesMut::from(&b"hello"[..]),
BytesMut::from(&b"world"[..]),
],
vec![
(peer1, BytesMut::from(&b"hello"[..])),
(peer2, BytesMut::from(&b"siip"[..])),
(peer2, BytesMut::from(&b"huup"[..])),
(peer1, BytesMut::from(&b"world"[..])),
BytesMut::from(&b"hello"[..]),
BytesMut::from(&b"siip"[..]),
BytesMut::from(&b"huup"[..]),
BytesMut::from(&b"world"[..]),
],
];

// poll values
let mut values = Vec::new();

for _ in 0..4 {
let value = set.next().await.unwrap();
values.push((value.0, value.1.unwrap()));
let (value, substream) = set.next().await.unwrap();
values.push(value.unwrap());
set.push(get_data_from_substream(substream));
}

let mut correct_found = false;
Expand Down
Loading