@@ -27,6 +27,69 @@ pub mod types;
27
27
28
28
use futures:: StreamExt ;
29
29
30
+ #[ instrument( skip( context, validator_id, network_receiver, cached_messages) , level = "info" ) ]
31
+ #[ allow( missing_docs) ]
32
+ async fn run_height < BlockT : ConsensusBlock > (
33
+ context : Arc < dyn ConsensusContext < Block = BlockT > > ,
34
+ height : BlockNumber ,
35
+ validator_id : ValidatorId ,
36
+ network_receiver : & mut SubscriberReceiver < ConsensusMessage > ,
37
+ cached_messages : & mut Vec < ConsensusMessage > ,
38
+ ) -> Result < BlockT , ConsensusError >
39
+ where
40
+ ProposalWrapper :
41
+ Into < ( ProposalInit , mpsc:: Receiver < BlockT :: ProposalChunk > , oneshot:: Receiver < BlockHash > ) > ,
42
+ {
43
+ let mut shc = SingleHeightConsensus :: new ( height, context, validator_id) . await ;
44
+
45
+ if let Some ( decision) = shc. start ( ) . await ? {
46
+ return Ok ( decision) ;
47
+ }
48
+
49
+ let mut current_height_messages = Vec :: new ( ) ;
50
+ for msg in std:: mem:: take ( cached_messages) {
51
+ match height. 0 . cmp ( & msg. height ( ) ) {
52
+ std:: cmp:: Ordering :: Less => cached_messages. push ( msg) ,
53
+ std:: cmp:: Ordering :: Equal => current_height_messages. push ( msg) ,
54
+ std:: cmp:: Ordering :: Greater => { }
55
+ }
56
+ }
57
+
58
+ loop {
59
+ let message = current_height_messages. pop ( ) . unwrap_or (
60
+ // TODO(matan): Handle parsing failures and utilize ReportCallback.
61
+ network_receiver
62
+ . next ( )
63
+ . await
64
+ . expect ( "Network receiver closed unexpectedly" )
65
+ . 0
66
+ . expect ( "Failed to parse consensus message" ) ,
67
+ ) ;
68
+
69
+ if message. height ( ) != height. 0 {
70
+ debug ! ( "Received a message for a different height. {:?}" , message) ;
71
+ if message. height ( ) > height. 0 {
72
+ cached_messages. push ( message) ;
73
+ }
74
+ continue ;
75
+ }
76
+
77
+ let maybe_block = match message {
78
+ ConsensusMessage :: Proposal ( proposal) => {
79
+ // Special case due to fake streaming.
80
+ let ( proposal_init, content_receiver, fin_receiver) =
81
+ ProposalWrapper ( proposal) . into ( ) ;
82
+ shc. handle_proposal ( proposal_init, content_receiver, fin_receiver) . await ?
83
+ }
84
+ _ => shc. handle_message ( message) . await ?,
85
+ } ;
86
+
87
+ if let Some ( block) = maybe_block {
88
+ return Ok ( block) ;
89
+ }
90
+ }
91
+ }
92
+
30
93
// TODO(dvir): add test for this.
31
94
#[ instrument( skip( context, start_height, network_receiver) , level = "info" ) ]
32
95
#[ allow( missing_docs) ]
@@ -41,30 +104,16 @@ where
41
104
Into < ( ProposalInit , mpsc:: Receiver < BlockT :: ProposalChunk > , oneshot:: Receiver < BlockHash > ) > ,
42
105
{
43
106
let mut current_height = start_height;
107
+ let mut future_messages = Vec :: new ( ) ;
44
108
loop {
45
- debug ! ( "Starting consensus for height {current_height}" ) ;
46
- let mut shc =
47
- SingleHeightConsensus :: new ( current_height, context. clone ( ) , validator_id) . await ;
48
-
49
- let block = if let Some ( block) = shc. start ( ) . await ? {
50
- block
51
- } else {
52
- info ! ( "Validator flow height {current_height}" ) ;
53
- let ConsensusMessage :: Proposal ( proposal) = network_receiver
54
- . next ( )
55
- . await
56
- . expect ( "Failed to receive a message from network" )
57
- . 0
58
- . expect ( "Network receiver closed unexpectedly" )
59
- else {
60
- todo ! ( "Handle votes" ) ;
61
- } ;
62
- let ( proposal_init, content_receiver, fin_receiver) = ProposalWrapper ( proposal) . into ( ) ;
63
-
64
- shc. handle_proposal ( proposal_init, content_receiver, fin_receiver)
65
- . await ?
66
- . expect ( "Failed to handle proposal" )
67
- } ;
109
+ let block = run_height (
110
+ Arc :: clone ( & context) ,
111
+ current_height,
112
+ validator_id,
113
+ & mut network_receiver,
114
+ & mut future_messages,
115
+ )
116
+ . await ?;
68
117
69
118
info ! (
70
119
"Finished consensus for height: {current_height}. Agreed on block with id: {:x}" ,
0 commit comments