Skip to content
This repository was archived by the owner on Dec 26, 2024. It is now read-only.

Commit 05ccf54

Browse files
authored
fix(network): if all peers are blocked, act as if there are no peers (#2222)
1 parent 6403530 commit 05ccf54

File tree

6 files changed

+134
-26
lines changed

6 files changed

+134
-26
lines changed

Diff for: Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: crates/papyrus_network/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ path = "src/bin/streamed_bytes_benchmark.rs"
1616
[dependencies]
1717
async-stream.workspace = true
1818
bytes.workspace = true
19-
chrono.workspace = true
2019
defaultmap.workspace = true
2120
derive_more.workspace = true
2221
futures.workspace = true

Diff for: crates/papyrus_network/src/peer_manager/behaviour_impl.rs

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::task::Poll;
1+
use std::task::{ready, Poll};
22

33
use libp2p::swarm::behaviour::ConnectionEstablished;
44
use libp2p::swarm::{
@@ -175,9 +175,19 @@ where
175175

176176
fn poll(
177177
&mut self,
178-
_cx: &mut std::task::Context<'_>,
178+
cx: &mut std::task::Context<'_>,
179179
) -> std::task::Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>>
180180
{
181+
if let Some(event) = self.pending_events.pop() {
182+
return Poll::Ready(event);
183+
}
184+
if let Some(sleep_future) = &mut self.sleep_waiting_for_unblocked_peer {
185+
ready!(sleep_future.as_mut().poll(cx));
186+
for outbound_session_id in std::mem::take(&mut self.sessions_received_when_no_peers) {
187+
self.assign_peer_to_session(outbound_session_id);
188+
}
189+
}
190+
self.sleep_waiting_for_unblocked_peer = None;
181191
self.pending_events.pop().map(Poll::Ready).unwrap_or(Poll::Pending)
182192
}
183193
}

Diff for: crates/papyrus_network/src/peer_manager/mod.rs

+32-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::collections::HashMap;
2+
use std::time::Duration;
23

3-
use chrono::Duration;
4+
use futures::future::BoxFuture;
5+
use futures::FutureExt;
46
use libp2p::swarm::dial_opts::DialOpts;
57
use libp2p::swarm::ToSwarm;
68
use libp2p::PeerId;
@@ -30,9 +32,11 @@ pub struct PeerManager<P: PeerTrait + 'static> {
3032
session_to_peer_map: HashMap<OutboundSessionId, PeerId>,
3133
config: PeerManagerConfig,
3234
last_peer_index: usize,
35+
// TODO(shahak): Change to VecDeque and awake when item is added.
3336
pending_events: Vec<ToSwarm<ToOtherBehaviourEvent, libp2p::swarm::THandlerInEvent<Self>>>,
3437
peers_pending_dial_with_sessions: HashMap<PeerId, Vec<OutboundSessionId>>,
3538
sessions_received_when_no_peers: Vec<OutboundSessionId>,
39+
sleep_waiting_for_unblocked_peer: Option<BoxFuture<'static, ()>>,
3640
}
3741

3842
#[derive(Clone)]
@@ -53,7 +57,11 @@ pub(crate) enum PeerManagerError {
5357

5458
impl Default for PeerManagerConfig {
5559
fn default() -> Self {
56-
Self { target_num_for_peers: 100, blacklist_timeout: Duration::max_value() }
60+
Self {
61+
target_num_for_peers: 100,
62+
// 1 year.
63+
blacklist_timeout: Duration::from_secs(3600 * 24 * 365),
64+
}
5765
}
5866
}
5967

@@ -72,13 +80,16 @@ where
7280
pending_events: Vec::new(),
7381
peers_pending_dial_with_sessions: HashMap::new(),
7482
sessions_received_when_no_peers: Vec::new(),
83+
sleep_waiting_for_unblocked_peer: None,
7584
}
7685
}
7786

7887
fn add_peer(&mut self, mut peer: P) {
7988
info!("Peer Manager found new peer {:?}", peer.peer_id());
8089
peer.set_timeout_duration(self.config.blacklist_timeout);
8190
self.peers.insert(peer.peer_id(), peer);
91+
// The new peer is unblocked so we don't need to wait for unblocked peer.
92+
self.sleep_waiting_for_unblocked_peer = None;
8293
for outbound_session_id in std::mem::take(&mut self.sessions_received_when_no_peers) {
8394
self.assign_peer_to_session(outbound_session_id);
8495
}
@@ -90,10 +101,12 @@ where
90101
}
91102

92103
// TODO(shahak): Remove return value and use events in tests.
104+
// TODO(shahak): Split this function for readability.
93105
fn assign_peer_to_session(&mut self, outbound_session_id: OutboundSessionId) -> Option<PeerId> {
94106
// TODO: consider moving this logic to be async (on a different tokio task)
95107
// until then we can return the assignment even if we use events for the notification.
96108
if self.peers.is_empty() {
109+
info!("No peers. Waiting for a new peer to be connected for {outbound_session_id:?}");
97110
self.sessions_received_when_no_peers.push(outbound_session_id);
98111
return None;
99112
}
@@ -106,6 +119,23 @@ where
106119
self.peers.iter().take(self.last_peer_index).find(|(_, peer)| !peer.is_blocked())
107120
});
108121
self.last_peer_index = (self.last_peer_index + 1) % self.peers.len();
122+
if peer.is_none() {
123+
info!(
124+
"No unblocked peers. Waiting for a new peer to be connected or for a peer to \
125+
become unblocked for {outbound_session_id:?}"
126+
);
127+
self.sessions_received_when_no_peers.push(outbound_session_id);
128+
// Find the peer closest to becoming unblocked.
129+
let sleep_deadline = self
130+
.peers
131+
.values()
132+
.map(|peer| peer.blocked_until())
133+
.min()
134+
.expect("min should not return None on a non-empty iterator");
135+
self.sleep_waiting_for_unblocked_peer =
136+
Some(tokio::time::sleep_until(sleep_deadline.into()).boxed());
137+
return None;
138+
}
109139
peer.map(|(peer_id, peer)| {
110140
// TODO: consider not allowing reassignment of the same session
111141
self.session_to_peer_map.insert(outbound_session_id, *peer_id);

Diff for: crates/papyrus_network/src/peer_manager/peer.rs

+14-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
// using chrono time and not std since std does not have the ability for std::time::Instance to
2-
// represent the maximum time of the system.
3-
use chrono::{DateTime, Duration, Utc};
1+
use std::time::{Duration, Instant};
2+
43
use libp2p::swarm::ConnectionId;
54
use libp2p::{Multiaddr, PeerId};
65
#[cfg(test)]
@@ -23,6 +22,9 @@ pub trait PeerTrait {
2322

2423
fn is_blocked(&self) -> bool;
2524

25+
/// Returns Instant::now if not blocked.
26+
fn blocked_until(&self) -> Instant;
27+
2628
fn connection_ids(&self) -> &Vec<ConnectionId>;
2729

2830
fn add_connection_id(&mut self, connection_id: ConnectionId);
@@ -34,7 +36,7 @@ pub trait PeerTrait {
3436
pub struct Peer {
3537
peer_id: PeerId,
3638
multiaddr: Multiaddr,
37-
timed_out_until: Option<DateTime<Utc>>,
39+
timed_out_until: Option<Instant>,
3840
timeout_duration: Option<Duration>,
3941
connection_ids: Vec<ConnectionId>,
4042
}
@@ -52,11 +54,10 @@ impl PeerTrait for Peer {
5254

5355
fn update_reputation(&mut self, _reason: ReputationModifier) {
5456
if let Some(timeout_duration) = self.timeout_duration {
55-
self.timed_out_until =
56-
Utc::now().checked_add_signed(timeout_duration).or(Some(DateTime::<Utc>::MAX_UTC));
57-
return;
57+
self.timed_out_until = Some(Instant::now() + timeout_duration);
58+
} else {
59+
debug!("Timeout duration not set for peer: {:?}", self.peer_id);
5860
}
59-
debug!("Timeout duration not set for peer: {:?}", self.peer_id);
6061
}
6162

6263
fn peer_id(&self) -> PeerId {
@@ -73,12 +74,16 @@ impl PeerTrait for Peer {
7374

7475
fn is_blocked(&self) -> bool {
7576
if let Some(timed_out_until) = self.timed_out_until {
76-
timed_out_until > Utc::now()
77+
timed_out_until > Instant::now()
7778
} else {
7879
false
7980
}
8081
}
8182

83+
fn blocked_until(&self) -> Instant {
84+
self.timed_out_until.unwrap_or_else(Instant::now)
85+
}
86+
8287
fn connection_ids(&self) -> &Vec<ConnectionId> {
8388
&self.connection_ids
8489
}

Diff for: crates/papyrus_network/src/peer_manager/test.rs

+76-11
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
// TODO(shahak): Add tests for multiple connection ids
22

33
use core::{panic, time};
4+
use std::pin::Pin;
5+
use std::task::{Context, Poll};
6+
use std::time::{Duration, Instant};
47

58
use assert_matches::assert_matches;
6-
use chrono::Duration;
79
use futures::future::poll_fn;
10+
use futures::{FutureExt, Stream, StreamExt};
811
use libp2p::swarm::behaviour::ConnectionEstablished;
912
use libp2p::swarm::{ConnectionId, NetworkBehaviour, ToSwarm};
1013
use libp2p::{Multiaddr, PeerId};
1114
use mockall::predicate::eq;
1215
use tokio::time::sleep;
16+
use void::Void;
1317

1418
use super::behaviour_impl::ToOtherBehaviourEvent;
1519
use crate::discovery::identify_impl::IdentifyToOtherBehaviourEvent;
@@ -19,6 +23,19 @@ use crate::peer_manager::peer::{MockPeerTrait, Peer, PeerTrait};
1923
use crate::peer_manager::{PeerManager, PeerManagerConfig, ReputationModifier};
2024
use crate::sqmr::OutboundSessionId;
2125

26+
impl<P: PeerTrait> Unpin for PeerManager<P> {}
27+
28+
impl<P: PeerTrait> Stream for PeerManager<P> {
29+
type Item = ToSwarm<ToOtherBehaviourEvent, Void>;
30+
31+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32+
match Pin::into_inner(self).poll(cx) {
33+
Poll::Pending => Poll::Pending,
34+
Poll::Ready(event) => Poll::Ready(Some(event)),
35+
}
36+
}
37+
}
38+
2239
#[test]
2340
fn peer_assignment_round_robin() {
2441
// Create a new peer manager
@@ -102,8 +119,8 @@ fn peer_assignment_round_robin() {
102119
}
103120
}
104121

105-
#[test]
106-
fn peer_assignment_no_peers() {
122+
#[tokio::test]
123+
async fn peer_assignment_no_peers() {
107124
// Create a new peer manager
108125
let config = PeerManagerConfig::default();
109126
let mut peer_manager: PeerManager<MockPeerTrait> = PeerManager::new(config.clone());
@@ -113,25 +130,72 @@ fn peer_assignment_no_peers() {
113130

114131
// Assign a peer to the session
115132
assert_matches!(peer_manager.assign_peer_to_session(outbound_session_id), None);
133+
assert!(peer_manager.next().now_or_never().is_none());
116134

117135
// Now the peer manager finds a new peer and can assign the session.
118136
let connection_id = ConnectionId::new_unchecked(0);
119137
let (mut peer, peer_id) =
120138
create_mock_peer(config.blacklist_timeout, false, Some(connection_id));
121139
peer.expect_is_blocked().times(1).return_const(false);
122140
peer_manager.add_peer(peer);
123-
assert_eq!(peer_manager.pending_events.len(), 1);
124141
assert_matches!(
125-
peer_manager.pending_events.first().unwrap(),
142+
peer_manager.next().await.unwrap(),
126143
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::SessionAssigned {
127144
outbound_session_id: event_outbound_session_id,
128145
peer_id: event_peer_id,
129146
connection_id: event_connection_id,
130147
}
131-
) if outbound_session_id == *event_outbound_session_id &&
132-
peer_id == *event_peer_id &&
133-
connection_id == *event_connection_id
148+
) if outbound_session_id == event_outbound_session_id &&
149+
peer_id == event_peer_id &&
150+
connection_id == event_connection_id
134151
);
152+
assert!(peer_manager.next().now_or_never().is_none());
153+
}
154+
155+
#[tokio::test]
156+
async fn peer_assignment_no_unblocked_peers() {
157+
const BLOCKED_UNTIL: Duration = Duration::from_secs(5);
158+
const TIMEOUT: Duration = Duration::from_secs(1);
159+
// Create a new peer manager
160+
let config = PeerManagerConfig::default();
161+
let mut peer_manager: PeerManager<MockPeerTrait> = PeerManager::new(config.clone());
162+
163+
// Create a session
164+
let outbound_session_id = OutboundSessionId { value: 1 };
165+
166+
// Create a peer
167+
let connection_id = ConnectionId::new_unchecked(0);
168+
let (mut peer, peer_id) = create_mock_peer(config.blacklist_timeout, true, Some(connection_id));
169+
peer.expect_is_blocked().times(1).return_const(true);
170+
peer.expect_is_blocked().times(1).return_const(false);
171+
peer.expect_blocked_until().times(1).returning(|| Instant::now() + BLOCKED_UNTIL);
172+
173+
peer_manager.add_peer(peer);
174+
peer_manager.report_peer(peer_id, ReputationModifier::Bad {}).unwrap();
175+
176+
// Try to assign a peer to the session, and check there wasn't any assignment.
177+
assert_matches!(peer_manager.assign_peer_to_session(outbound_session_id), None);
178+
assert!(peer_manager.next().now_or_never().is_none());
179+
180+
// Simulate that BLOCKED_UNTIL has passed.
181+
tokio::time::pause();
182+
tokio::time::advance(BLOCKED_UNTIL).await;
183+
tokio::time::resume();
184+
185+
// After BLOCKED_UNTIL has passed, the peer manager can assign the session.
186+
let event = tokio::time::timeout(TIMEOUT, peer_manager.next()).await.unwrap().unwrap();
187+
assert_matches!(
188+
event,
189+
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::SessionAssigned {
190+
outbound_session_id: event_outbound_session_id,
191+
peer_id: event_peer_id,
192+
connection_id: event_connection_id,
193+
}
194+
) if outbound_session_id == event_outbound_session_id &&
195+
peer_id == event_peer_id &&
196+
connection_id == event_connection_id
197+
);
198+
assert!(peer_manager.next().now_or_never().is_none());
135199
}
136200

137201
#[test]
@@ -155,7 +219,7 @@ fn report_peer_calls_update_reputation() {
155219
async fn peer_block_realeased_after_timeout() {
156220
const DURATION_IN_MILLIS: u64 = 50;
157221
let mut peer = Peer::new(PeerId::random(), Multiaddr::empty());
158-
peer.set_timeout_duration(Duration::milliseconds(DURATION_IN_MILLIS as i64));
222+
peer.set_timeout_duration(Duration::from_millis(DURATION_IN_MILLIS));
159223
peer.update_reputation(ReputationModifier::Bad {});
160224
assert!(peer.is_blocked());
161225
sleep(time::Duration::from_millis(DURATION_IN_MILLIS)).await;
@@ -236,15 +300,16 @@ fn more_peers_needed() {
236300
assert!(!peer_manager.more_peers_needed());
237301
}
238302

239-
#[test]
240-
fn timed_out_peer_not_assignable_to_queries() {
303+
#[tokio::test]
304+
async fn timed_out_peer_not_assignable_to_queries() {
241305
// Create a new peer manager
242306
let config = PeerManagerConfig::default();
243307
let mut peer_manager: PeerManager<MockPeerTrait> = PeerManager::new(config.clone());
244308

245309
// Create a mock peer
246310
let (mut peer, peer_id) = create_mock_peer(config.blacklist_timeout, true, None);
247311
peer.expect_is_blocked().times(1).return_const(true);
312+
peer.expect_blocked_until().times(1).returning(|| Instant::now() + Duration::from_secs(1));
248313

249314
// Add the mock peer to the peer manager
250315
peer_manager.add_peer(peer);

0 commit comments

Comments
 (0)