diff --git a/service/src/lib.rs b/service/src/lib.rs index 6e5d4833ea53..37db8cf423f1 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -18,7 +18,6 @@ pub mod chain_spec; -use futures01::sync::mpsc; use futures::{FutureExt, TryFutureExt, task::{Spawn, SpawnError, FutureObj}}; use client::LongestChain; use std::sync::Arc; @@ -280,8 +279,12 @@ pub fn new_full(config: Configuration) Dispatch: NativeExecutionDispatch + 'static, Extrinsic: RuntimeExtrinsic, { - use sc_network::DhtEvent; - use futures::{compat::Stream01CompatExt, stream::StreamExt}; + use sc_network::Event; + use futures01::Stream; + use futures::{ + compat::Stream01CompatExt, + stream::StreamExt, + }; let is_collator = config.custom.collating_for.is_some(); let is_authority = config.roles.is_authority() && !is_collator; @@ -305,19 +308,11 @@ pub fn new_full(config: Configuration) let (builder, mut import_setup, inherent_data_providers) = new_full_start!(config, Runtime, Dispatch); - // Dht event channel from the network to the authority discovery module. Use - // bounded channel to ensure back-pressure. Authority discovery is triggering one - // event per authority within the current authority set. This estimates the - // authority set size to be somewhere below 10 000 thereby setting the channel - // buffer size to 10 000. - let (dht_event_tx, dht_event_rx) = mpsc::channel::(10000); - let service = builder .with_network_protocol(|config| Ok(PolkadotProtocol::new(config.custom.collating_for.clone())))? .with_finality_proof_provider(|client, backend| Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, client)) as _) )? - .with_dht_event_tx(dht_event_tx)? .build()?; let (block_import, link_half, babe_link) = import_setup.take() @@ -441,15 +436,20 @@ pub fn new_full(config: Configuration) service.spawn_essential_task(babe); if authority_discovery_enabled { - let future03_dht_event_rx = dht_event_rx.compat() + let network = service.network(); + let dht_event_stream = network.event_stream().filter_map(|e| match e { + Event::Dht(e) => Some(e), + _ => None, + }); + let future03_dht_event_stream = dht_event_stream.compat() .map(|x| x.expect(" never returns an error; qed")) .boxed(); let authority_discovery = authority_discovery::AuthorityDiscovery::new( service.client(), - service.network(), + network, sentry_nodes, service.keystore(), - future03_dht_event_rx, + future03_dht_event_stream, ); let future01_authority_discovery = authority_discovery.map(|x| Ok(x)).compat();