Skip to content

Commit f80c714

Browse files
authored
feat: move Identify I/O from NetworkBehaviour to ConnectionHandler (#3208)
Addresses #2885
1 parent c39d25e commit f80c714

File tree

4 files changed

+292
-226
lines changed

4 files changed

+292
-226
lines changed

protocols/identify/CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
# 0.42.0 [unreleased]
22

3+
- Move I/O from `Behaviour` to `Handler`. Handle `Behaviour`'s Identify and Push requests independently by incoming order,
4+
previously Push requests were prioritized. see [PR 3208].
5+
36
- Update to `libp2p-swarm` `v0.42.0`.
47

8+
[PR 3208]: https://github.com/libp2p/rust-libp2p/pull/3208
9+
510
# 0.41.0
611

712
- Change default `cache_size` of `Config` to 100. See [PR 2995].

protocols/identify/src/behaviour.rs

+96-141
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,21 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21-
use crate::handler::{self, Proto, Push};
22-
use crate::protocol::{Info, ReplySubstream, UpgradeError};
23-
use futures::prelude::*;
21+
use crate::handler::{self, InEvent, Proto};
22+
use crate::protocol::{Info, Protocol, UpgradeError};
2423
use libp2p_core::{
25-
connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey,
24+
connection::ConnectionId, multiaddr, ConnectedPoint, Multiaddr, PeerId, PublicKey,
2625
};
2726
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
2827
use libp2p_swarm::{
2928
dial_opts::DialOpts, AddressScore, ConnectionHandler, ConnectionHandlerUpgrErr, DialError,
30-
IntoConnectionHandler, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction,
31-
NotifyHandler, PollParameters,
29+
IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
3230
};
3331
use lru::LruCache;
3432
use std::num::NonZeroUsize;
3533
use std::{
3634
collections::{HashMap, HashSet, VecDeque},
3735
iter::FromIterator,
38-
pin::Pin,
3936
task::Context,
4037
task::Poll,
4138
time::Duration,
@@ -51,30 +48,23 @@ pub struct Behaviour {
5148
config: Config,
5249
/// For each peer we're connected to, the observed address to send back to it.
5350
connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
54-
/// Pending replies to send.
55-
pending_replies: VecDeque<Reply>,
51+
/// Pending requests to be fulfilled, either `Handler` requests for `Behaviour` info
52+
/// to address identification requests, or push requests to peers
53+
/// with current information about the local peer.
54+
requests: Vec<Request>,
5655
/// Pending events to be emitted when polled.
5756
events: VecDeque<NetworkBehaviourAction<Event, Proto>>,
58-
/// Peers to which an active push with current information about
59-
/// the local peer should be sent.
60-
pending_push: HashSet<PeerId>,
6157
/// The addresses of all peers that we have discovered.
6258
discovered_peers: PeerCache,
6359
}
6460

65-
/// A pending reply to an inbound identification request.
66-
enum Reply {
67-
/// The reply is queued for sending.
68-
Queued {
69-
peer: PeerId,
70-
io: ReplySubstream<NegotiatedSubstream>,
71-
observed: Multiaddr,
72-
},
73-
/// The reply is being sent.
74-
Sending {
75-
peer: PeerId,
76-
io: Pin<Box<dyn Future<Output = Result<(), UpgradeError>> + Send>>,
77-
},
61+
/// A `Behaviour` request to be fulfilled, either `Handler` requests for `Behaviour` info
62+
/// to address identification requests, or push requests to peers
63+
/// with current information about the local peer.
64+
#[derive(Debug, PartialEq, Eq)]
65+
struct Request {
66+
peer_id: PeerId,
67+
protocol: Protocol,
7868
}
7969

8070
/// Configuration for the [`identify::Behaviour`](Behaviour).
@@ -184,9 +174,8 @@ impl Behaviour {
184174
Self {
185175
config,
186176
connected: HashMap::new(),
187-
pending_replies: VecDeque::new(),
177+
requests: Vec::new(),
188178
events: VecDeque::new(),
189-
pending_push: HashSet::new(),
190179
discovered_peers,
191180
}
192181
}
@@ -197,7 +186,13 @@ impl Behaviour {
197186
I: IntoIterator<Item = PeerId>,
198187
{
199188
for p in peers {
200-
if self.pending_push.insert(p) && !self.connected.contains_key(&p) {
189+
let request = Request {
190+
peer_id: p,
191+
protocol: Protocol::Push,
192+
};
193+
if !self.requests.contains(&request) {
194+
self.requests.push(request);
195+
201196
let handler = self.new_handler();
202197
self.events.push_back(NetworkBehaviourAction::Dial {
203198
opts: DialOpts::peer_id(p).build(),
@@ -240,13 +235,19 @@ impl NetworkBehaviour for Behaviour {
240235
type OutEvent = Event;
241236

242237
fn new_handler(&mut self) -> Self::ConnectionHandler {
243-
Proto::new(self.config.initial_delay, self.config.interval)
238+
Proto::new(
239+
self.config.initial_delay,
240+
self.config.interval,
241+
self.config.local_public_key.clone(),
242+
self.config.protocol_version.clone(),
243+
self.config.agent_version.clone(),
244+
)
244245
}
245246

246247
fn on_connection_handler_event(
247248
&mut self,
248249
peer_id: PeerId,
249-
connection: ConnectionId,
250+
connection_id: ConnectionId,
250251
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
251252
) {
252253
match event {
@@ -271,26 +272,22 @@ impl NetworkBehaviour for Behaviour {
271272
score: AddressScore::Finite(1),
272273
});
273274
}
275+
handler::Event::Identification(peer) => {
276+
self.events
277+
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Sent {
278+
peer_id: peer,
279+
}));
280+
}
274281
handler::Event::IdentificationPushed => {
275282
self.events
276283
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed {
277284
peer_id,
278285
}));
279286
}
280-
handler::Event::Identify(sender) => {
281-
let observed = self
282-
.connected
283-
.get(&peer_id)
284-
.and_then(|addrs| addrs.get(&connection))
285-
.expect(
286-
"`on_connection_handler_event` is only called \
287-
with an established connection and calling `NetworkBehaviour::on_event` \
288-
with `FromSwarm::ConnectionEstablished ensures there is an entry; qed",
289-
);
290-
self.pending_replies.push_back(Reply::Queued {
291-
peer: peer_id,
292-
io: sender,
293-
observed: observed.clone(),
287+
handler::Event::Identify => {
288+
self.requests.push(Request {
289+
peer_id,
290+
protocol: Protocol::Identify(connection_id),
294291
});
295292
}
296293
handler::Event::IdentificationError(error) => {
@@ -305,99 +302,41 @@ impl NetworkBehaviour for Behaviour {
305302

306303
fn poll(
307304
&mut self,
308-
cx: &mut Context<'_>,
305+
_cx: &mut Context<'_>,
309306
params: &mut impl PollParameters,
310307
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
311308
if let Some(event) = self.events.pop_front() {
312309
return Poll::Ready(event);
313310
}
314311

315-
// Check for a pending active push to perform.
316-
let peer_push = self.pending_push.iter().find_map(|peer| {
317-
self.connected.get(peer).map(|conns| {
318-
let observed_addr = conns
319-
.values()
320-
.next()
321-
.expect("connected peer has a connection")
322-
.clone();
323-
324-
let listen_addrs = listen_addrs(params);
325-
let protocols = supported_protocols(params);
326-
327-
let info = Info {
328-
public_key: self.config.local_public_key.clone(),
329-
protocol_version: self.config.protocol_version.clone(),
330-
agent_version: self.config.agent_version.clone(),
331-
listen_addrs,
332-
protocols,
333-
observed_addr,
334-
};
335-
336-
(*peer, Push(info))
337-
})
338-
});
339-
340-
if let Some((peer_id, push)) = peer_push {
341-
self.pending_push.remove(&peer_id);
342-
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
312+
// Check for pending requests.
313+
match self.requests.pop() {
314+
Some(Request {
315+
peer_id,
316+
protocol: Protocol::Push,
317+
}) => Poll::Ready(NetworkBehaviourAction::NotifyHandler {
343318
peer_id,
344-
event: push,
345319
handler: NotifyHandler::Any,
346-
});
347-
}
348-
349-
// Check for pending replies to send.
350-
if let Some(r) = self.pending_replies.pop_front() {
351-
let mut sending = 0;
352-
let to_send = self.pending_replies.len() + 1;
353-
let mut reply = Some(r);
354-
loop {
355-
match reply {
356-
Some(Reply::Queued { peer, io, observed }) => {
357-
let info = Info {
358-
listen_addrs: listen_addrs(params),
359-
protocols: supported_protocols(params),
360-
public_key: self.config.local_public_key.clone(),
361-
protocol_version: self.config.protocol_version.clone(),
362-
agent_version: self.config.agent_version.clone(),
363-
observed_addr: observed,
364-
};
365-
let io = Box::pin(io.send(info));
366-
reply = Some(Reply::Sending { peer, io });
367-
}
368-
Some(Reply::Sending { peer, mut io }) => {
369-
sending += 1;
370-
match Future::poll(Pin::new(&mut io), cx) {
371-
Poll::Ready(Ok(())) => {
372-
let event = Event::Sent { peer_id: peer };
373-
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
374-
}
375-
Poll::Pending => {
376-
self.pending_replies.push_back(Reply::Sending { peer, io });
377-
if sending == to_send {
378-
// All remaining futures are NotReady
379-
break;
380-
} else {
381-
reply = self.pending_replies.pop_front();
382-
}
383-
}
384-
Poll::Ready(Err(err)) => {
385-
let event = Event::Error {
386-
peer_id: peer,
387-
error: ConnectionHandlerUpgrErr::Upgrade(
388-
libp2p_core::upgrade::UpgradeError::Apply(err),
389-
),
390-
};
391-
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
392-
}
393-
}
394-
}
395-
None => unreachable!(),
396-
}
397-
}
320+
event: InEvent {
321+
listen_addrs: listen_addrs(params),
322+
supported_protocols: supported_protocols(params),
323+
protocol: Protocol::Push,
324+
},
325+
}),
326+
Some(Request {
327+
peer_id,
328+
protocol: Protocol::Identify(connection_id),
329+
}) => Poll::Ready(NetworkBehaviourAction::NotifyHandler {
330+
peer_id,
331+
handler: NotifyHandler::One(connection_id),
332+
event: InEvent {
333+
listen_addrs: listen_addrs(params),
334+
supported_protocols: supported_protocols(params),
335+
protocol: Protocol::Identify(connection_id),
336+
},
337+
}),
338+
None => Poll::Pending,
398339
}
399-
400-
Poll::Pending
401340
}
402341

403342
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
@@ -417,15 +356,27 @@ impl NetworkBehaviour for Behaviour {
417356
}) => {
418357
if remaining_established == 0 {
419358
self.connected.remove(&peer_id);
420-
self.pending_push.remove(&peer_id);
359+
self.requests.retain(|request| {
360+
request
361+
!= &Request {
362+
peer_id,
363+
protocol: Protocol::Push,
364+
}
365+
});
421366
} else if let Some(addrs) = self.connected.get_mut(&peer_id) {
422367
addrs.remove(&connection_id);
423368
}
424369
}
425370
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
426371
if let Some(peer_id) = peer_id {
427372
if !self.connected.contains_key(&peer_id) {
428-
self.pending_push.remove(&peer_id);
373+
self.requests.retain(|request| {
374+
request
375+
!= &Request {
376+
peer_id,
377+
protocol: Protocol::Push,
378+
}
379+
});
429380
}
430381
}
431382

@@ -437,14 +388,17 @@ impl NetworkBehaviour for Behaviour {
437388
}
438389
}
439390
}
440-
FromSwarm::NewListenAddr(_) => {
441-
if self.config.push_listen_addr_updates {
442-
self.pending_push.extend(self.connected.keys());
443-
}
444-
}
445-
FromSwarm::ExpiredListenAddr(_) => {
391+
FromSwarm::NewListenAddr(_) | FromSwarm::ExpiredListenAddr(_) => {
446392
if self.config.push_listen_addr_updates {
447-
self.pending_push.extend(self.connected.keys());
393+
for p in self.connected.keys() {
394+
let request = Request {
395+
peer_id: *p,
396+
protocol: Protocol::Push,
397+
};
398+
if !self.requests.contains(&request) {
399+
self.requests.push(request);
400+
}
401+
}
448402
}
449403
}
450404
FromSwarm::AddressChange(_)
@@ -509,7 +463,7 @@ fn listen_addrs(params: &impl PollParameters) -> Vec<Multiaddr> {
509463
/// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true.
510464
fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
511465
let last_component = addr.iter().last();
512-
if let Some(Protocol::P2p(multi_addr_peer_id)) = last_component {
466+
if let Some(multiaddr::Protocol::P2p(multi_addr_peer_id)) = last_component {
513467
return multi_addr_peer_id == *peer_id.as_ref();
514468
}
515469
true
@@ -557,6 +511,7 @@ impl PeerCache {
557511
mod tests {
558512
use super::*;
559513
use futures::pin_mut;
514+
use futures::prelude::*;
560515
use libp2p_core::{identity, muxing::StreamMuxerBox, transport, upgrade, PeerId, Transport};
561516
use libp2p_mplex::MplexConfig;
562517
use libp2p_noise as noise;
@@ -618,7 +573,7 @@ mod tests {
618573

619574
// nb. Either swarm may receive the `Identified` event first, upon which
620575
// it will permit the connection to be closed, as defined by
621-
// `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if
576+
// `Handler::connection_keep_alive`. Hence the test succeeds if
622577
// either `Identified` event arrives correctly.
623578
async_std::task::block_on(async move {
624579
loop {
@@ -835,8 +790,8 @@ mod tests {
835790
let addr_without_peer_id: Multiaddr = addr.clone();
836791
let mut addr_with_other_peer_id = addr.clone();
837792

838-
addr.push(Protocol::P2p(peer_id.into()));
839-
addr_with_other_peer_id.push(Protocol::P2p(other_peer_id.into()));
793+
addr.push(multiaddr::Protocol::P2p(peer_id.into()));
794+
addr_with_other_peer_id.push(multiaddr::Protocol::P2p(other_peer_id.into()));
840795

841796
assert!(multiaddr_matches_peer_id(&addr, &peer_id));
842797
assert!(!multiaddr_matches_peer_id(

0 commit comments

Comments
 (0)