1
- use super :: super :: { Address , Event , Index , Instruction , Message , Request , Status } ;
1
+ use super :: super :: { Address , Event , Index , Instruction , Message , Request , RequestID , Status } ;
2
2
use super :: { Follower , Node , NodeID , RawNode , Role , Term , Ticks , HEARTBEAT_INTERVAL } ;
3
- use crate :: error:: Result ;
3
+ use crate :: error:: { Error , Result } ;
4
4
5
5
use :: log:: { debug, info} ;
6
6
use std:: collections:: { HashMap , HashSet } ;
@@ -14,11 +14,30 @@ struct Progress {
14
14
last : Index ,
15
15
}
16
16
17
+ /// A pending client write request.
18
+ #[ derive( Clone , Debug , PartialEq ) ]
19
+ struct Write {
20
+ /// The client or node which submitted the write.
21
+ from : Address ,
22
+ /// The write request ID.
23
+ id : RequestID ,
24
+ }
25
+
17
26
// A leader serves requests and replicates the log to followers.
18
27
#[ derive( Clone , Debug , PartialEq ) ]
19
28
pub struct Leader {
20
29
/// Peer replication progress.
21
30
progress : HashMap < NodeID , Progress > ,
31
+ /// Keeps track of pending write requests, keyed by log index. These are
32
+ /// added when the write is proposed and appended to the leader's log, and
33
+ /// removed when the command is applied to the state machine, sending the
34
+ /// command result to the waiting client.
35
+ ///
36
+ /// If the leader loses leadership, all pending write requests are aborted
37
+ /// by returning Error::Abort.
38
+ ///
39
+ /// TODO: Actually return responses when applied.
40
+ writes : HashMap < Index , Write > ,
22
41
/// Number of ticks since last periodic heartbeat.
23
42
since_heartbeat : Ticks ,
24
43
}
@@ -28,7 +47,7 @@ impl Leader {
28
47
pub fn new ( peers : HashSet < NodeID > , last_index : Index ) -> Self {
29
48
let next = last_index + 1 ;
30
49
let progress = peers. into_iter ( ) . map ( |p| ( p, Progress { next, last : 0 } ) ) . collect ( ) ;
31
- Self { progress, since_heartbeat : 0 }
50
+ Self { progress, writes : HashMap :: new ( ) , since_heartbeat : 0 }
32
51
}
33
52
}
34
53
@@ -53,9 +72,18 @@ impl RawNode<Leader> {
53
72
assert ! ( term > self . term, "Can only become follower in later term" ) ;
54
73
55
74
info ! ( "Discovered new term {}" , term) ;
75
+
76
+ // Cancel in-flight requests.
77
+ self . state_tx . send ( Instruction :: Abort ) ?;
78
+ for write in std:: mem:: take ( & mut self . role . writes ) . into_values ( ) {
79
+ self . send (
80
+ write. from ,
81
+ Event :: ClientResponse { id : write. id , response : Err ( Error :: Abort ) } ,
82
+ ) ?;
83
+ }
84
+
56
85
self . term = term;
57
86
self . log . set_term ( term, None ) ?;
58
- self . state_tx . send ( Instruction :: Abort ) ?;
59
87
Ok ( self . into_role ( Follower :: new ( None , None ) ) )
60
88
}
61
89
@@ -148,8 +176,11 @@ impl RawNode<Leader> {
148
176
self . heartbeat ( ) ?;
149
177
}
150
178
179
+ // A client submitted a write command. Propose it, and track it
180
+ // until it's applied and the response is returned to the client.
151
181
Event :: ClientRequest { id, request : Request :: Mutate ( command) } => {
152
182
let index = self . propose ( Some ( command) ) ?;
183
+ self . role . writes . insert ( index, Write { from : msg. from , id : id. clone ( ) } ) ;
153
184
self . state_tx . send ( Instruction :: Notify { id, address : msg. from , index } ) ?;
154
185
if self . peers . is_empty ( ) {
155
186
self . maybe_commit ( ) ?;
@@ -264,6 +295,8 @@ impl RawNode<Leader> {
264
295
// TODO: Move application elsewhere, but needs access to applied index.
265
296
let mut scan = self . log . scan ( ( prev_commit_index + 1 ) ..=commit_index) ?;
266
297
while let Some ( entry) = scan. next ( ) . transpose ( ) ? {
298
+ // TODO: Send response.
299
+ self . role . writes . remove ( & entry. index ) ;
267
300
self . state_tx . send ( Instruction :: Apply { entry } ) ?;
268
301
}
269
302
}
0 commit comments