@@ -15,6 +15,7 @@ use crate::types::{
15
15
ConsensusBlock ,
16
16
ConsensusContext ,
17
17
ConsensusError ,
18
+ Decision ,
18
19
ProposalInit ,
19
20
Round ,
20
21
ValidatorId ,
@@ -26,10 +27,7 @@ const ROUND_ZERO: Round = 0;
26
27
/// call to `start`, which is relevant if we are the proposer for this height's first round.
27
28
/// SingleHeightConsensus receives messages directly as parameters to function calls. It can send
28
29
/// out messages "directly" to the network, and returning a decision to the caller.
29
- pub ( crate ) struct SingleHeightConsensus < BlockT >
30
- where
31
- BlockT : ConsensusBlock ,
32
- {
30
+ pub ( crate ) struct SingleHeightConsensus < BlockT : ConsensusBlock > {
33
31
height : BlockNumber ,
34
32
context : Arc < dyn ConsensusContext < Block = BlockT > > ,
35
33
validators : Vec < ValidatorId > ,
40
38
precommits : HashMap < ( Round , ValidatorId ) , Vote > ,
41
39
}
42
40
43
- impl < BlockT > SingleHeightConsensus < BlockT >
44
- where
45
- BlockT : ConsensusBlock ,
46
- {
41
+ impl < BlockT : ConsensusBlock > SingleHeightConsensus < BlockT > {
47
42
pub ( crate ) async fn new (
48
43
height : BlockNumber ,
49
44
context : Arc < dyn ConsensusContext < Block = BlockT > > ,
65
60
}
66
61
67
62
#[ instrument( skip( self ) , fields( height=self . height. 0 ) , level = "debug" ) ]
68
- pub ( crate ) async fn start ( & mut self ) -> Result < Option < BlockT > , ConsensusError > {
63
+ pub ( crate ) async fn start ( & mut self ) -> Result < Option < Decision < BlockT > > , ConsensusError > {
69
64
info ! ( "Starting consensus with validators {:?}" , self . validators) ;
70
65
let events = self . state_machine . start ( ) ;
71
66
self . handle_state_machine_events ( events) . await
83
78
init : ProposalInit ,
84
79
p2p_messages_receiver : mpsc:: Receiver < <BlockT as ConsensusBlock >:: ProposalChunk > ,
85
80
fin_receiver : oneshot:: Receiver < BlockHash > ,
86
- ) -> Result < Option < BlockT > , ConsensusError > {
81
+ ) -> Result < Option < Decision < BlockT > > , ConsensusError > {
87
82
debug ! (
88
83
"Received proposal: proposal_height={}, proposer={:?}" ,
89
84
init. height. 0 , init. proposer
@@ -137,7 +132,7 @@ where
137
132
pub ( crate ) async fn handle_message (
138
133
& mut self ,
139
134
message : ConsensusMessage ,
140
- ) -> Result < Option < BlockT > , ConsensusError > {
135
+ ) -> Result < Option < Decision < BlockT > > , ConsensusError > {
141
136
debug ! ( "Received message: {:?}" , message) ;
142
137
match message {
143
138
ConsensusMessage :: Proposal ( _) => {
@@ -148,7 +143,10 @@ where
148
143
}
149
144
150
145
#[ instrument( skip_all) ]
151
- async fn handle_vote ( & mut self , vote : Vote ) -> Result < Option < BlockT > , ConsensusError > {
146
+ async fn handle_vote (
147
+ & mut self ,
148
+ vote : Vote ,
149
+ ) -> Result < Option < Decision < BlockT > > , ConsensusError > {
152
150
let ( votes, sm_vote) = match vote. vote_type {
153
151
VoteType :: Prevote => {
154
152
( & mut self . prevotes , StateMachineEvent :: Prevote ( vote. block_hash , ROUND_ZERO ) )
@@ -180,7 +178,7 @@ where
180
178
async fn handle_state_machine_events (
181
179
& mut self ,
182
180
mut events : VecDeque < StateMachineEvent > ,
183
- ) -> Result < Option < BlockT > , ConsensusError > {
181
+ ) -> Result < Option < Decision < BlockT > > , ConsensusError > {
184
182
while let Some ( event) = events. pop_front ( ) {
185
183
trace ! ( "Handling event: {:?}" , event) ;
186
184
match event {
@@ -194,16 +192,7 @@ where
194
192
// sent this out when responding to a StartRound.
195
193
}
196
194
StateMachineEvent :: Decision ( block_hash, round) => {
197
- let block = self
198
- . proposals
199
- . remove ( & round)
200
- . expect ( "StateMachine arrived at an unknown decision" ) ;
201
- assert_eq ! (
202
- block. id( ) ,
203
- block_hash,
204
- "StateMachine block hash should match the stored block"
205
- ) ;
206
- return Ok ( Some ( block) ) ;
195
+ return self . handle_state_machine_decision ( block_hash, round) . await ;
207
196
}
208
197
StateMachineEvent :: Prevote ( block_hash, round) => {
209
198
self . handle_state_machine_vote ( block_hash, round, VoteType :: Prevote ) . await ?;
@@ -255,12 +244,13 @@ where
255
244
self . state_machine . handle_event ( StateMachineEvent :: StartRound ( Some ( id) , round) )
256
245
}
257
246
247
+ #[ instrument( skip_all) ]
258
248
async fn handle_state_machine_vote (
259
249
& mut self ,
260
250
block_hash : BlockHash ,
261
251
round : Round ,
262
252
vote_type : VoteType ,
263
- ) -> Result < Option < BlockT > , ConsensusError > {
253
+ ) -> Result < Option < Decision < BlockT > > , ConsensusError > {
264
254
let votes = match vote_type {
265
255
VoteType :: Prevote => & mut self . prevotes ,
266
256
VoteType :: Precommit => & mut self . precommits ,
@@ -273,4 +263,29 @@ where
273
263
self . context . broadcast ( ConsensusMessage :: Vote ( vote) ) . await ?;
274
264
Ok ( None )
275
265
}
266
+
267
+ #[ instrument( skip_all) ]
268
+ async fn handle_state_machine_decision (
269
+ & mut self ,
270
+ block_hash : BlockHash ,
271
+ round : Round ,
272
+ ) -> Result < Option < Decision < BlockT > > , ConsensusError > {
273
+ let block =
274
+ self . proposals . remove ( & round) . expect ( "StateMachine arrived at an unknown decision" ) ;
275
+ assert_eq ! ( block. id( ) , block_hash, "StateMachine block hash should match the stored block" ) ;
276
+ let supporting_precommits: Vec < Vote > = self
277
+ . validators
278
+ . iter ( )
279
+ . filter_map ( |v| {
280
+ let vote = self . precommits . get ( & ( round, * v) ) ?;
281
+ if vote. block_hash != block_hash {
282
+ return None ;
283
+ }
284
+ Some ( vote. clone ( ) )
285
+ } )
286
+ . collect ( ) ;
287
+ // TODO(matan): Check actual weights.
288
+ assert ! ( supporting_precommits. len( ) >= self . state_machine. quorum_size( ) as usize ) ;
289
+ Ok ( Some ( Decision { precommits : supporting_precommits, block } ) )
290
+ }
276
291
}
0 commit comments