@@ -73,6 +73,7 @@ use futures::{
73
73
Future , FutureExt , StreamExt ,
74
74
} ;
75
75
use lru:: LruCache ;
76
+ use parking_lot:: RwLock ;
76
77
77
78
use polkadot_primitives:: v1:: { Block , BlockId , BlockNumber , Hash , ParachainHost } ;
78
79
use client:: { BlockImportNotification , BlockchainEvents , FinalityNotification } ;
@@ -159,13 +160,24 @@ impl<Client> HeadSupportsParachains for Arc<Client> where
159
160
}
160
161
161
162
162
- /// A handler used to communicate with the [`Overseer`].
163
+ /// A handle used to communicate with the [`Overseer`].
163
164
///
164
165
/// [`Overseer`]: struct.Overseer.html
165
166
#[ derive( Clone ) ]
166
- pub struct Handle ( pub OverseerHandle ) ;
167
+ pub enum Handle {
168
+ /// Used only at initialization to break the cyclic dependency.
169
+ // TODO: refactor in https://github.com/paritytech/polkadot/issues/3427
170
+ Disconnected ( Arc < RwLock < Option < OverseerHandle > > > ) ,
171
+ /// A handle to the overseer.
172
+ Connected ( OverseerHandle ) ,
173
+ }
167
174
168
175
impl Handle {
176
+ /// Create a new disconnected [`Handle`].
177
+ pub fn new_disconnected ( ) -> Self {
178
+ Self :: Disconnected ( Arc :: new ( RwLock :: new ( None ) ) )
179
+ }
180
+
169
181
/// Inform the `Overseer` that that some block was imported.
170
182
pub async fn block_imported ( & mut self , block : BlockInfo ) {
171
183
self . send_and_log_error ( Event :: BlockImported ( block) ) . await
@@ -207,25 +219,59 @@ impl Handle {
207
219
208
220
/// Most basic operation, to stop a server.
209
221
async fn send_and_log_error ( & mut self , event : Event ) {
210
- if self . 0 . send ( event) . await . is_err ( ) {
211
- tracing:: info!( target: LOG_TARGET , "Failed to send an event to Overseer" ) ;
222
+ self . try_connect ( ) ;
223
+ if let Self :: Connected ( ref mut handle) = self {
224
+ if handle. send ( event) . await . is_err ( ) {
225
+ tracing:: info!( target: LOG_TARGET , "Failed to send an event to Overseer" ) ;
226
+ }
227
+ } else {
228
+ tracing:: warn!( target: LOG_TARGET , "Using a disconnected Handle to send to Overseer" ) ;
212
229
}
213
230
}
214
231
215
- /// Whether the overseer handler is connected to an overseer.
216
- pub fn is_connected ( & self ) -> bool {
217
- true
232
+ /// Whether the handle is disconnected.
233
+ pub fn is_disconnected ( & self ) -> bool {
234
+ match self {
235
+ Self :: Disconnected ( ref x) => x. read ( ) . is_none ( ) ,
236
+ _ => false ,
237
+ }
218
238
}
219
239
220
- /// Whether the handler is disconnected.
221
- pub fn is_disconnected ( & self ) -> bool {
222
- false
240
+ /// Connect this handle and all disconnected clones of it to the overseer.
241
+ pub fn connect_to_overseer ( & mut self , handle : OverseerHandle ) {
242
+ match self {
243
+ Self :: Disconnected ( ref mut x) => {
244
+ let mut maybe_handle = x. write ( ) ;
245
+ if maybe_handle. is_none ( ) {
246
+ tracing:: info!( target: LOG_TARGET , "🖇️ Connecting all Handles to Overseer" ) ;
247
+ * maybe_handle = Some ( handle) ;
248
+ } else {
249
+ tracing:: warn!(
250
+ target: LOG_TARGET ,
251
+ "Attempting to connect a clone of a connected Handle" ,
252
+ ) ;
253
+ }
254
+ } ,
255
+ _ => {
256
+ tracing:: warn!(
257
+ target: LOG_TARGET ,
258
+ "Attempting to connect an already connected Handle" ,
259
+ ) ;
260
+ } ,
261
+ }
223
262
}
224
263
225
- /// Using this handler, connect another handler to the same
226
- /// overseer, if any.
227
- pub fn connect_other ( & self , other : & mut Handle ) {
228
- * other = self . clone ( ) ;
264
+ /// Try upgrading from `Self::Disconnected` to `Self::Connected` state
265
+ /// after calling `connect_to_overseer` on `self` or a clone of `self`.
266
+ fn try_connect ( & mut self ) {
267
+ if let Self :: Disconnected ( ref mut x) = self {
268
+ let guard = x. write ( ) ;
269
+ if let Some ( ref h) = * guard {
270
+ let handle = h. clone ( ) ;
271
+ drop ( guard) ;
272
+ * self = Self :: Connected ( handle) ;
273
+ }
274
+ }
229
275
}
230
276
}
231
277
@@ -301,7 +347,7 @@ pub enum ExternalRequest {
301
347
/// import and finality notifications into the [`OverseerHandle`].
302
348
pub async fn forward_events < P : BlockchainEvents < Block > > (
303
349
client : Arc < P > ,
304
- mut handler : Handle ,
350
+ mut handle : Handle ,
305
351
) {
306
352
let mut finality = client. finality_notification_stream ( ) ;
307
353
let mut imports = client. import_notification_stream ( ) ;
@@ -311,15 +357,15 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(
311
357
f = finality. next( ) => {
312
358
match f {
313
359
Some ( block) => {
314
- handler . block_finalized( block. into( ) ) . await ;
360
+ handle . block_finalized( block. into( ) ) . await ;
315
361
}
316
362
None => break ,
317
363
}
318
364
} ,
319
365
i = imports. next( ) => {
320
366
match i {
321
367
Some ( block) => {
322
- handler . block_imported( block. into( ) ) . await ;
368
+ handle . block_imported( block. into( ) ) . await ;
323
369
}
324
370
None => break ,
325
371
}
@@ -338,7 +384,6 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(
338
384
network=NetworkBridgeEvent <protocol_v1:: ValidationProtocol >,
339
385
) ]
340
386
pub struct Overseer < SupportsParachains > {
341
-
342
387
#[ subsystem( no_dispatch, CandidateValidationMessage ) ]
343
388
candidate_validation : CandidateValidation ,
344
389
@@ -390,16 +435,16 @@ pub struct Overseer<SupportsParachains> {
390
435
#[ subsystem( no_dispatch, GossipSupportMessage ) ]
391
436
gossip_support : GossipSupport ,
392
437
393
- #[ subsystem( no_dispatch, wip , DisputeCoordinatorMessage ) ]
394
- dipute_coordinator : DisputeCoordinator ,
438
+ #[ subsystem( no_dispatch, DisputeCoordinatorMessage ) ]
439
+ dispute_coordinator : DisputeCoordinator ,
395
440
396
- #[ subsystem( no_dispatch, wip , DisputeParticipationMessage ) ]
441
+ #[ subsystem( no_dispatch, DisputeParticipationMessage ) ]
397
442
dispute_participation : DisputeParticipation ,
398
443
399
- #[ subsystem( no_dispatch, wip , DisputeDistributionMessage ) ]
400
- dipute_distribution : DisputeDistribution ,
444
+ #[ subsystem( no_dispatch, DisputeDistributionMessage ) ]
445
+ dispute_distribution : DisputeDistribution ,
401
446
402
- #[ subsystem( no_dispatch, wip , ChainSelectionMessage ) ]
447
+ #[ subsystem( no_dispatch, ChainSelectionMessage ) ]
403
448
chain_selection : ChainSelection ,
404
449
405
450
/// External listeners waiting for a hash to be in the active-leave set.
@@ -436,7 +481,7 @@ where
436
481
/// This returns the overseer along with an [`OverseerHandle`] which can
437
482
/// be used to send messages from external parts of the codebase.
438
483
///
439
- /// The [`OverseerHandler `] returned from this function is connected to
484
+ /// The [`OverseerHandle `] returned from this function is connected to
440
485
/// the returned [`Overseer`].
441
486
///
442
487
/// ```text
@@ -527,7 +572,7 @@ where
527
572
/// let spawner = sp_core::testing::TaskExecutor::new();
528
573
/// let all_subsystems = AllSubsystems::<()>::dummy()
529
574
/// .replace_candidate_validation(ValidationSubsystem);
530
- /// let (overseer, _handler ) = Overseer::new(
575
+ /// let (overseer, _handle ) = Overseer::new(
531
576
/// vec![],
532
577
/// all_subsystems,
533
578
/// None,
@@ -549,13 +594,13 @@ where
549
594
/// # });
550
595
/// # }
551
596
/// ```
552
- pub fn new < CV , CB , SD , AD , AR , BS , BD , P , RA , AS , NB , CA , CG , CP , ApD , ApV , GS > (
597
+ pub fn new < CV , CB , SD , AD , AR , BS , BD , P , RA , AS , NB , CA , CG , CP , ApD , ApV , GS , DC , DP , DD , CS > (
553
598
leaves : impl IntoIterator < Item = BlockInfo > ,
554
- all_subsystems : AllSubsystems < CV , CB , SD , AD , AR , BS , BD , P , RA , AS , NB , CA , CG , CP , ApD , ApV , GS > ,
599
+ all_subsystems : AllSubsystems < CV , CB , SD , AD , AR , BS , BD , P , RA , AS , NB , CA , CG , CP , ApD , ApV , GS , DC , DP , DD , CS > ,
555
600
prometheus_registry : Option < & prometheus:: Registry > ,
556
601
supports_parachains : SupportsParachains ,
557
602
s : S ,
558
- ) -> SubsystemResult < ( Self , Handle ) >
603
+ ) -> SubsystemResult < ( Self , OverseerHandle ) >
559
604
where
560
605
CV : Subsystem < OverseerSubsystemContext < CandidateValidationMessage > , SubsystemError > + Send ,
561
606
CB : Subsystem < OverseerSubsystemContext < CandidateBackingMessage > , SubsystemError > + Send ,
@@ -574,11 +619,15 @@ where
574
619
ApD : Subsystem < OverseerSubsystemContext < ApprovalDistributionMessage > , SubsystemError > + Send ,
575
620
ApV : Subsystem < OverseerSubsystemContext < ApprovalVotingMessage > , SubsystemError > + Send ,
576
621
GS : Subsystem < OverseerSubsystemContext < GossipSupportMessage > , SubsystemError > + Send ,
622
+ DC : Subsystem < OverseerSubsystemContext < DisputeCoordinatorMessage > , SubsystemError > + Send ,
623
+ DP : Subsystem < OverseerSubsystemContext < DisputeParticipationMessage > , SubsystemError > + Send ,
624
+ DD : Subsystem < OverseerSubsystemContext < DisputeDistributionMessage > , SubsystemError > + Send ,
625
+ CS : Subsystem < OverseerSubsystemContext < ChainSelectionMessage > , SubsystemError > + Send ,
577
626
S : SpawnNamed ,
578
627
{
579
628
let metrics: Metrics = <Metrics as MetricsTrait >:: register ( prometheus_registry) ?;
580
629
581
- let ( mut overseer, handler ) = Self :: builder ( )
630
+ let ( mut overseer, handle ) = Self :: builder ( )
582
631
. candidate_validation ( all_subsystems. candidate_validation )
583
632
. candidate_backing ( all_subsystems. candidate_backing )
584
633
. statement_distribution ( all_subsystems. statement_distribution )
@@ -596,6 +645,10 @@ where
596
645
. approval_distribution ( all_subsystems. approval_distribution )
597
646
. approval_voting ( all_subsystems. approval_voting )
598
647
. gossip_support ( all_subsystems. gossip_support )
648
+ . dispute_coordinator ( all_subsystems. dispute_coordinator )
649
+ . dispute_participation ( all_subsystems. dispute_participation )
650
+ . dispute_distribution ( all_subsystems. dispute_distribution )
651
+ . chain_selection ( all_subsystems. chain_selection )
599
652
. leaves ( Vec :: from_iter (
600
653
leaves. into_iter ( ) . map ( |BlockInfo { hash, parent_hash : _, number } | ( hash, number) )
601
654
) )
@@ -647,7 +700,7 @@ where
647
700
overseer. spawner ( ) . spawn ( "metrics_metronome" , Box :: pin ( metronome) ) ;
648
701
}
649
702
650
- Ok ( ( overseer, Handle ( handler ) ) )
703
+ Ok ( ( overseer, handle ) )
651
704
}
652
705
653
706
/// Stop the overseer.
0 commit comments