Skip to content

Commit 93290d0

Browse files
committed
feat/queued-notifications: added a queued-notifier
Rationale is explained in the doc comments and doc example for it
1 parent 20f71b3 commit 93290d0

File tree

7 files changed

+357
-192
lines changed

7 files changed

+357
-192
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ rand = "~0.3.16"
2020
rust_sodium = "~0.5.0"
2121
serde = "~1.0.11"
2222
serde_derive = "~1.0.11"
23+
unwrap = "~1.1.0"
2324

2425
[dependencies.socket-collection]
2526
version = "~0.3.0"
@@ -30,7 +31,6 @@ version = "~0.3.0"
3031
env_logger = "~0.5.13"
3132
maidsafe_utilities = "~0.15.0"
3233
serde_json = "~1.0.2"
33-
unwrap = "~1.1.0"
3434

3535
#[features]
3636
#enable-udt = ["socket-collection/enable-udt"]

examples/peer_impl/overlay_connect.rs

Lines changed: 36 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use mio::{Poll, PollOpt, Ready, Token};
55
use mio_extras::timer::Timeout;
66
use p2p::{
77
msg_to_read, msg_to_send, Handle, HolePunchInfo, HolePunchMediator, Interface, NatInfo,
8-
RendezvousInfo, Res,
8+
QueuedNotifier, RendezvousInfo, Res,
99
};
1010
use socket_collection::TcpSock;
1111
use sodium::crypto::box_;
@@ -117,32 +117,29 @@ impl OverlayConnect {
117117

118118
if let PeerState::Discovered = *stored_state {
119119
let weak = self.self_weak.clone();
120-
let handler = move |ifc: &mut Interface, poll: &Poll, nat_info, res| {
120+
let handler = move |ifc: &mut Interface, poll: &Poll, (nat_info, res)| {
121121
if let Some(overlay_connect) = weak.upgrade() {
122122
if let Some(core) = ifc.as_any().downcast_mut::<Core>() {
123-
overlay_connect.borrow_mut().handle_rendezvous_res(
124-
core,
125-
poll,
126-
peer.clone(),
127-
nat_info,
128-
res,
129-
);
123+
overlay_connect
124+
.borrow_mut()
125+
.handle_rendezvous_res(core, poll, peer, nat_info, res);
130126
} else {
131127
warn!("Failed to conver Interface to Core");
132128
}
133129
}
134130
};
135131

136-
let next_state = match HolePunchMediator::start(core, poll, Box::new(handler)) {
137-
Ok(mediator_token) => PeerState::CreatingRendezvousInfo {
138-
mediator_token,
139-
peer_info: None,
140-
},
141-
Err(e) => {
142-
info!("Could not initialise p2p mediator: {:?}", e);
143-
return;
144-
}
145-
};
132+
let next_state =
133+
match HolePunchMediator::start(core, poll, QueuedNotifier::new(handler)) {
134+
Ok(mediator_token) => PeerState::CreatingRendezvousInfo {
135+
mediator_token,
136+
peer_info: None,
137+
},
138+
Err(e) => {
139+
info!("Could not initialise p2p mediator: {:?}", e);
140+
return;
141+
}
142+
};
146143

147144
*stored_state = next_state;
148145
} else {
@@ -360,38 +357,37 @@ impl OverlayConnect {
360357
let handler = move |ifc: &mut Interface, poll: &Poll, res| {
361358
if let Some(overlay_connect) = weak.upgrade() {
362359
if let Some(core) = ifc.as_any().downcast_mut::<Core>() {
363-
overlay_connect.borrow_mut().handle_holepunch_res(
364-
core,
365-
poll,
366-
src_peer.clone(),
367-
res,
368-
);
360+
overlay_connect
361+
.borrow_mut()
362+
.handle_holepunch_res(core, poll, src_peer, res);
369363
} else {
370364
warn!("Failed to conver Interface to Core");
371365
}
372366
}
373367
};
374-
Handle::start_hole_punch(core, mediator_token, src_info, Box::new(handler));
368+
Handle::start_hole_punch(
369+
core,
370+
poll,
371+
mediator_token,
372+
src_info,
373+
QueuedNotifier::new(handler),
374+
);
375375
PeerState::AwaitingHolePunchResult
376376
}
377377
PeerState::Discovered => {
378378
let weak = self.self_weak.clone();
379-
let handler = move |ifc: &mut Interface, poll: &Poll, nat_info, res| {
379+
let handler = move |ifc: &mut Interface, poll: &Poll, (nat_info, res)| {
380380
if let Some(overlay_connect) = weak.upgrade() {
381381
if let Some(core) = ifc.as_any().downcast_mut::<Core>() {
382-
overlay_connect.borrow_mut().handle_rendezvous_res(
383-
core,
384-
poll,
385-
src_peer.clone(),
386-
nat_info,
387-
res,
388-
);
382+
overlay_connect
383+
.borrow_mut()
384+
.handle_rendezvous_res(core, poll, src_peer, nat_info, res);
389385
} else {
390386
warn!("Failed to conver Interface to Core");
391387
}
392388
}
393389
};
394-
match HolePunchMediator::start(core, poll, Box::new(handler)) {
390+
match HolePunchMediator::start(core, poll, QueuedNotifier::new(handler)) {
395391
Ok(mediator_token) => PeerState::CreatingRendezvousInfo {
396392
mediator_token,
397393
peer_info: Some(src_info),
@@ -471,22 +467,20 @@ impl OverlayConnect {
471467
let handler = move |ifc: &mut Interface, poll: &Poll, res| {
472468
if let Some(overlay_connect) = weak.upgrade() {
473469
if let Some(core) = ifc.as_any().downcast_mut::<Core>() {
474-
overlay_connect.borrow_mut().handle_holepunch_res(
475-
core,
476-
poll,
477-
for_peer.clone(),
478-
res,
479-
);
470+
overlay_connect
471+
.borrow_mut()
472+
.handle_holepunch_res(core, poll, for_peer, res);
480473
} else {
481474
warn!("Failed to conver Interface to Core");
482475
}
483476
}
484477
};
485478
Handle::start_hole_punch(
486479
core,
480+
poll,
487481
mediator_token,
488482
peer_info,
489-
Box::new(handler),
483+
QueuedNotifier::new(handler),
490484
);
491485
PeerState::AwaitingHolePunchResult
492486
} else {

src/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ quick_error! {
106106
description("Invalid state - the state may already be active or is an operation is not \
107107
supposed to be permitted for this state")
108108
}
109+
/// Notifier has expired (possibly already used for notification)
110+
NotifierExpired {
111+
description("Notifier has expired (possibly already used for notification)")
112+
}
109113
/// Socket is not available
110114
UnregisteredSocket {
111115
description("Socket is not available")

0 commit comments

Comments
 (0)