Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit f9013b2

Browse files
committed
Merge branch 'master' into v0.2
2 parents dede5ab + a118a85 commit f9013b2

File tree

9 files changed

+193
-215
lines changed

9 files changed

+193
-215
lines changed

Diff for: Cargo.lock

+135-160
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: build.rs

+1
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,5 @@ const ERROR_MSG: &'static str = "Failed to generate metadata files";
2222

2323
fn main() {
2424
vergen(OutputFns::all()).expect(ERROR_MSG);
25+
println!("cargo:rerun-if-changed=.git/HEAD");
2526
}

Diff for: polkadot/service/res/krummelanke.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"bootNodes": [
4444
"/ip4/104.211.54.233/tcp/30333/p2p/QmRMGcQh69t8a8YwzHkofVo9SFr7ffggUwhAYjVSTChmrd",
4545
"/ip4/104.211.48.51/tcp/30333/p2p/QmWCnXrhM1in1qPqVT3rDXQEJHedAzbPDMimdjqy2P9fGn",
46-
"/ip4/104.211.48.247/tcp/30333/p2p/QmYPx99i3H8EKXrvYHTBwqz3jjFC1kBfkvmSKd2h9zwQFr",
46+
"/ip4/104.211.48.247/tcp/30333/p2p/QmY33GW69TnTsdQWjAkxJR1GrWTdeV1PmzzcSmUay4HvAB",
4747
"/ip4/40.114.120.164/tcp/30333/p2p/QmWzYU5X1NpFrprD1YZF5Lcj9aE5WF4QEg5FpvQx5XGWG7",
4848
"/ip4/40.117.153.33/tcp/30333/p2p/QmSz8qCADMmi92QB8dTqMPu56JYQQKZBAHz7y8KXjvqcvW"
4949
],

Diff for: substrate/network-libp2p/Cargo.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ bytes = "0.4"
1111
error-chain = { version = "0.12", default-features = false }
1212
fnv = "1.0"
1313
futures = "0.1"
14-
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "fad12c89ea2b6f1f6420557db6e9305fb03f9f67", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
14+
libp2p = { git = "https://github.com/tomaka/libp2p-rs", branch = "polkadot-2", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
1515
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
1616
ethkey = { git = "https://github.com/paritytech/parity.git" }
1717
ethereum-types = "0.3"
@@ -20,10 +20,10 @@ parking_lot = "0.5"
2020
libc = "0.2"
2121
log = "0.3"
2222
rand = "0.5.0"
23-
tokio-core = "0.1"
23+
tokio = "0.1"
2424
tokio-io = "0.1"
2525
tokio-timer = "0.2"
26-
varint = { git = "https://github.com/libp2p/rust-libp2p" }
26+
varint = { git = "https://github.com/tomaka/libp2p-rs", branch = "polkadot-2" }
2727

2828
[dev-dependencies]
2929
assert_matches = "1.2"

Diff for: substrate/network-libp2p/src/lib.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@
1919

2020
extern crate parking_lot;
2121
extern crate fnv;
22-
#[macro_use]
2322
extern crate futures;
24-
extern crate tokio_core;
23+
extern crate tokio;
2524
extern crate tokio_io;
2625
extern crate tokio_timer;
2726
extern crate ethkey;

Diff for: substrate/network-libp2p/src/network_state.rs

+2-6
Original file line numberDiff line numberDiff line change
@@ -658,10 +658,6 @@ impl NetworkState {
658658
peer_info.id,
659659
peer_info.kad_connec.is_alive(),
660660
peer_info.protocols.iter().filter(|c| c.1.is_alive()).count());
661-
// TODO: we manually clear the connections as a work-around for
662-
// networking bugs ; normally it should automatically drop
663-
for c in peer_info.protocols.iter() { c.1.clear(); }
664-
peer_info.kad_connec.clear();
665661
let old = connections.peer_by_nodeid.remove(&peer_info.id);
666662
debug_assert_eq!(old, Some(who));
667663
}
@@ -852,11 +848,11 @@ fn parse_and_add_to_node_store(
852848
NodeStore::Memory(ref node_store) =>
853849
node_store
854850
.peer_or_create(&who)
855-
.add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
851+
.set_addr_ttl(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
856852
NodeStore::Json(ref node_store) =>
857853
node_store
858854
.peer_or_create(&who)
859-
.add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
855+
.set_addr_ttl(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
860856
}
861857

862858
Ok(who)

Diff for: substrate/network-libp2p/src/service.rs

+28-22
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use std::thread;
4141
use std::time::{Duration, Instant};
4242
use futures::{future, Future, Stream, IntoFuture};
4343
use futures::sync::{mpsc, oneshot};
44-
use tokio_core::reactor::{Core, Handle};
44+
use tokio::runtime::current_thread;
4545
use tokio_io::{AsyncRead, AsyncWrite};
4646
use tokio_timer::{Interval, Deadline};
4747

@@ -118,7 +118,7 @@ impl NetworkService {
118118
local_peer_id: local_peer_id.clone(),
119119
kbuckets_timeout: Duration::from_secs(600),
120120
request_timeout: Duration::from_secs(10),
121-
known_initial_peers: network_state.known_peers().collect(),
121+
known_initial_peers: network_state.known_peers(),
122122
});
123123

124124
let shared = Arc::new(Shared {
@@ -191,16 +191,16 @@ impl NetworkService {
191191

192192
let shared = self.shared.clone();
193193
let join_handle = thread::spawn(move || {
194-
// Tokio core that is going to run everything in this thread.
195-
let mut core = match Core::new() {
194+
// Tokio runtime that is going to run everything in this thread.
195+
let mut runtime = match current_thread::Runtime::new() {
196196
Ok(c) => c,
197197
Err(err) => {
198198
let _ = init_tx.send(Err(err.into()));
199199
return
200200
}
201201
};
202202

203-
let fut = match init_thread(core.handle(), shared,
203+
let fut = match init_thread(shared,
204204
timeouts_register_rx, close_rx) {
205205
Ok(future) => {
206206
debug!(target: "sub-libp2p", "Successfully started networking service");
@@ -213,7 +213,7 @@ impl NetworkService {
213213
}
214214
};
215215

216-
match core.run(fut) {
216+
match runtime.block_on(fut) {
217217
Ok(()) => debug!(target: "sub-libp2p", "libp2p future finished"),
218218
Err(err) => error!(target: "sub-libp2p", "error while running libp2p: {:?}", err),
219219
}
@@ -395,7 +395,6 @@ impl NetworkContext for NetworkContextImpl {
395395
/// - `timeouts_register_rx` should receive newly-registered timeouts.
396396
/// - `close_rx` should be triggered when we want to close the network.
397397
fn init_thread(
398-
core: Handle,
399398
shared: Arc<Shared>,
400399
timeouts_register_rx: mpsc::UnboundedReceiver<
401400
(Duration, (Arc<NetworkProtocolHandler + Send + Sync + 'static>, ProtocolId, TimerToken))
@@ -405,7 +404,6 @@ fn init_thread(
405404
// Build the transport layer.
406405
let transport = {
407406
let base = transport::build_transport(
408-
core.clone(),
409407
transport::UnencryptedAllowed::Denied,
410408
shared.network_state.local_private_key().clone()
411409
);
@@ -535,7 +533,7 @@ fn init_thread(
535533

536534
// Build the timeouts system for the `register_timeout` function.
537535
// (note: this has nothing to do with socket timeouts)
538-
let timeouts = timeouts::build_timeouts_stream(core.clone(), timeouts_register_rx)
536+
let timeouts = timeouts::build_timeouts_stream(timeouts_register_rx)
539537
.for_each({
540538
let shared = shared.clone();
541539
move |(handler, protocol_id, timer_token)| {
@@ -630,7 +628,7 @@ fn listener_handle<'a, C>(
630628
match shared.network_state.ping_connection(node_id.clone()) {
631629
Ok((_, ping_connec)) => {
632630
trace!(target: "sub-libp2p", "Successfully opened ping substream with {:?}", node_id);
633-
let fut = ping_connec.set_until(pinger, future);
631+
let fut = ping_connec.tie_or_passthrough(pinger, future);
634632
Box::new(fut) as Box<_>
635633
},
636634
Err(err) => Box::new(future::err(err)) as Box<_>
@@ -687,7 +685,7 @@ fn handle_kademlia_connection(
687685
val
688686
});
689687

690-
Ok(kad_connec.set_until(controller, future))
688+
Ok(kad_connec.tie_or_passthrough(controller, future))
691689
}
692690

693691
/// When a remote performs a `FIND_NODE` Kademlia request for `searched`,
@@ -823,7 +821,7 @@ fn handle_custom_connection(
823821
});
824822

825823
let val = (custom_proto_out.outgoing, custom_proto_out.protocol_version);
826-
let final_fut = unique_connec.set_until(val, fut)
824+
let final_fut = unique_connec.tie_or_stop(val, fut)
827825
.then(move |val| {
828826
// Makes sure that `dc_guard` is kept alive until here.
829827
drop(dc_guard);
@@ -950,7 +948,7 @@ fn perform_kademlia_query<T, To, St, C>(
950948
let random_peer_id = random_key.into_peer_id();
951949
trace!(target: "sub-libp2p", "Start kademlia discovery for {:?}", random_peer_id);
952950

953-
shared.clone()
951+
let future = shared.clone()
954952
.kad_system
955953
.find_node(random_peer_id, {
956954
let shared = shared.clone();
@@ -974,7 +972,10 @@ fn perform_kademlia_query<T, To, St, C>(
974972
)
975973
.into_future()
976974
.map_err(|(err, _)| err)
977-
.map(|_| ())
975+
.map(|_| ());
976+
977+
// Note that we use a `Box` in order to speed up compilation.
978+
Box::new(future) as Box<Future<Item = _, Error = _>>
978979
}
979980

980981
/// Connects to additional nodes, if necessary.
@@ -1163,8 +1164,7 @@ fn open_peer_custom_proto<T, To, St, C>(
11631164
);
11641165
}
11651166

1166-
// TODO: this future should be used
1167-
let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err);
1167+
unique_connec.dial(&swarm_controller, &addr, with_err);
11681168
},
11691169
Err(err) => {
11701170
trace!(target: "sub-libp2p",
@@ -1200,11 +1200,14 @@ fn obtain_kad_connection<T, To, St, C>(shared: Arc<Shared>,
12001200
})
12011201
});
12021202

1203-
shared.network_state
1203+
let future = shared.network_state
12041204
.kad_connection(who.clone())
12051205
.into_future()
1206-
.map(move |(_, k)| k.get_or_dial(&swarm_controller, &addr, transport))
1207-
.flatten()
1206+
.map(move |(_, k)| k.dial(&swarm_controller, &addr, transport))
1207+
.flatten();
1208+
1209+
// Note that we use a Box in order to speed up compilation.
1210+
Box::new(future) as Box<Future<Item = _, Error = _>>
12081211
}
12091212

12101213
/// Processes the information about a node.
@@ -1305,7 +1308,7 @@ fn ping_all<T, St, C>(
13051308

13061309
let addr = Multiaddr::from(AddrComponent::P2P(who.clone().into_bytes()));
13071310
let fut = pinger
1308-
.get_or_dial(&swarm_controller, &addr, transport.clone())
1311+
.dial(&swarm_controller, &addr, transport.clone())
13091312
.and_then(move |mut p| {
13101313
trace!(target: "sub-libp2p", "Pinging peer #{} aka. {:?}", peer, who);
13111314
p.ping()
@@ -1334,7 +1337,7 @@ fn ping_all<T, St, C>(
13341337
ping_futures.push(fut);
13351338
}
13361339

1337-
future::loop_fn(ping_futures, |ping_futures| {
1340+
let future = future::loop_fn(ping_futures, |ping_futures| {
13381341
if ping_futures.is_empty() {
13391342
let fut = future::ok(future::Loop::Break(()));
13401343
return future::Either::A(fut)
@@ -1344,7 +1347,10 @@ fn ping_all<T, St, C>(
13441347
.map(|((), _, rest)| future::Loop::Continue(rest))
13451348
.map_err(|(err, _, _)| err);
13461349
future::Either::B(fut)
1347-
})
1350+
});
1351+
1352+
// Note that we use a Box in order to speed up compilation.
1353+
Box::new(future) as Box<Future<Item = _, Error = _>>
13481354
}
13491355

13501356
/// Expects a multiaddr of the format `/p2p/<node_id>` and returns the node ID.

Diff for: substrate/network-libp2p/src/timeouts.rs

+21-18
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?
1616

1717
use futures::{Async, future, Future, Poll, stream, Stream, sync::mpsc};
18-
use std::io::Error as IoError;
18+
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
1919
use std::marker::PhantomData;
2020
use std::time::{Duration, Instant};
21-
use tokio_core::reactor::{Handle, Timeout};
21+
use tokio_timer::{self, Delay};
2222

2323
/// Builds the timeouts system.
2424
///
@@ -27,21 +27,18 @@ use tokio_core::reactor::{Handle, Timeout};
2727
/// `T` can be anything you want, as it is transparently passed from the input
2828
/// to the output. Timeouts continue to fire forever, as there is no way to
2929
/// unregister them.
30-
pub fn build_timeouts_stream<T>(
31-
core: Handle,
30+
pub fn build_timeouts_stream<'a, T>(
3231
timeouts_rx: mpsc::UnboundedReceiver<(Duration, T)>
33-
) -> impl Stream<Item = T, Error = IoError>
34-
where T: Clone {
32+
) -> Box<Stream<Item = T, Error = IoError> + 'a>
33+
where T: Clone + 'a {
3534
let next_timeout = next_in_timeouts_stream(timeouts_rx);
3635

3736
// The `unfold` function is essentially a loop turned into a stream. The
3837
// first parameter is the initial state, and the closure returns the new
3938
// state and an item.
40-
stream::unfold(vec![future::Either::A(next_timeout)], move |timeouts| {
39+
let stream = stream::unfold(vec![future::Either::A(next_timeout)], move |timeouts| {
4140
// `timeouts` is a `Vec` of futures that produce an `Out`.
4241

43-
let core = core.clone();
44-
4542
// `select_ok` panics if `timeouts` is empty anyway.
4643
if timeouts.is_empty() {
4744
return None
@@ -53,8 +50,7 @@ pub fn build_timeouts_stream<T>(
5350
Out::NewTimeout((Some((duration, item)), next_timeouts)) => {
5451
// Received a new timeout request on the channel.
5552
let next_timeout = next_in_timeouts_stream(next_timeouts);
56-
let at = Instant::now() + duration;
57-
let timeout = Timeout::new_at(at, &core)?;
53+
let timeout = Delay::new(Instant::now() + duration);
5854
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
5955
timeouts.push(future::Either::B(timeout));
6056
timeouts.push(future::Either::A(next_timeout));
@@ -66,16 +62,18 @@ pub fn build_timeouts_stream<T>(
6662
Out::Timeout(duration, item) => {
6763
// A timeout has happened.
6864
let returned = item.clone();
69-
let at = Instant::now() + duration;
70-
let timeout = Timeout::new_at(at, &core)?;
65+
let timeout = Delay::new(Instant::now() + duration);
7166
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
7267
timeouts.push(future::Either::B(timeout));
7368
Ok((Some(returned), timeouts))
7469
},
7570
}
7671
)
7772
)
78-
}).filter_map(|item| item)
73+
}).filter_map(|item| item);
74+
75+
// Note that we use a `Box` in order to speed up compilation time.
76+
Box::new(stream) as Box<Stream<Item = _, Error = _>>
7977
}
8078

8179
/// Local enum representing the output of the selection.
@@ -97,15 +95,20 @@ fn next_in_timeouts_stream<T, B>(
9795
.map_err(|_| unreachable!("an UnboundedReceiver can never error"))
9896
}
9997

100-
/// Does the equivalent to `future.map(move |()| (duration, item))`.
98+
/// Does the equivalent to `future.map(move |()| (duration, item)).map_err(|err| to_io_err(err))`.
10199
struct TimeoutWrapper<A, F, T>(F, Duration, Option<T>, PhantomData<A>);
102100
impl<A, F, T> Future for TimeoutWrapper<A, F, T>
103-
where F: Future<Item = ()> {
101+
where F: Future<Item = (), Error = tokio_timer::Error> {
104102
type Item = Out<A, T>;
105-
type Error = F::Error;
103+
type Error = IoError;
106104

107105
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
108-
let _ready: () = try_ready!(self.0.poll());
106+
match self.0.poll() {
107+
Ok(Async::Ready(())) => (),
108+
Ok(Async::NotReady) => return Ok(Async::NotReady),
109+
Err(err) => return Err(IoError::new(IoErrorKind::Other, err.to_string())),
110+
}
111+
109112
let out = Out::Timeout(self.1, self.2.take().expect("poll() called again after success"));
110113
Ok(Async::Ready(out))
111114
}

Diff for: substrate/network-libp2p/src/transport.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,14 @@ use libp2p::{self, Transport, mplex, secio, yamux};
1818
use libp2p::core::{MuxedTransport, either, upgrade};
1919
use libp2p::transport_timeout::TransportTimeout;
2020
use std::time::Duration;
21-
use tokio_core::reactor::Handle;
2221
use tokio_io::{AsyncRead, AsyncWrite};
2322

2423
/// Builds the transport that serves as a common ground for all connections.
2524
pub fn build_transport(
26-
core: Handle,
2725
unencrypted_allowed: UnencryptedAllowed,
2826
local_private_key: secio::SecioKeyPair
2927
) -> impl MuxedTransport<Output = impl AsyncRead + AsyncWrite> + Clone {
30-
let base = libp2p::CommonTransport::new(core)
28+
let base = libp2p::CommonTransport::new()
3129
.with_upgrade({
3230
let secio = secio::SecioConfig {
3331
key: local_private_key,

0 commit comments

Comments
 (0)