@@ -105,26 +105,21 @@ type RaftNode interface {
// past the group membership state, so the leader returned here may not be
// known as a current group member.
LeaderLocked() roachpb.ReplicaID
+ // LogMarkLocked returns the current log mark of the raft log. It is not
+ // guaranteed to be in stable storage, unless this method is called right
+ // after RawNode is initialized. Processor calls this only on initialization.
+ LogMarkLocked() rac2.LogMark

- // StableIndexLocked is the (inclusive) highest index that is known to be
- // successfully persisted in local storage.
- StableIndexLocked() uint64
// NextUnstableIndexLocked returns the index of the next entry that will
// be sent to local storage. All entries < this index are either stored,
// or have been sent to storage.
//
// NB: NextUnstableIndex can regress when the node accepts appends or
// snapshots from a newer leader.
NextUnstableIndexLocked() uint64
- // GetAdmittedLocked returns the current value of the admitted array.
- GetAdmittedLocked() [raftpb.NumPriorities]uint64

// Mutating methods.

- // SetAdmittedLocked sets a new value for the admitted array. It is the
- // caller's responsibility to ensure that it is not regressing admitted,
- // and it is not advancing admitted beyond the stable index.
- SetAdmittedLocked([raftpb.NumPriorities]uint64) raftpb.Message
// StepMsgAppRespForAdmittedLocked steps a MsgAppResp on the leader, which
// may advance its knowledge of a follower's admitted state.
StepMsgAppRespForAdmittedLocked(raftpb.Message) error
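
For illustration, a minimal standalone sketch of the log-mark idea that LogMarkLocked exposes (not part of this diff; the real rac2.LogMark lives in the rac2 package, and only the Term/Index fields are taken from the usages above — everything else is an assumption):

```go
// A (Term, Index) mark is safer than the bare stable index it replaces,
// because a bare index cannot tell apart log positions across term changes.
package main

import "fmt"

// LogMark identifies a position in the raft log under a specific leader term.
type LogMark struct {
	Term  uint64 // leader term under which entries up to Index were appended
	Index uint64 // inclusive last log index covered by this mark
}

// After reports whether m is strictly ahead of other: a newer term dominates,
// otherwise the higher index within the same term wins.
func (m LogMark) After(other LogMark) bool {
	return m.Term > other.Term || (m.Term == other.Term && m.Index > other.Index)
}

func main() {
	stable := LogMark{Term: 5, Index: 100}
	// A new leader at term 6 overwrites the log suffix and appends up to 95.
	rewrite := LogMark{Term: 6, Index: 95}
	fmt.Println(rewrite.After(stable)) // true: the term dominates the bare index
}
```
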
@@ -398,11 +393,18 @@ type Processor interface {
info SideChannelInfoUsingRaftMessageRequest,
)

+ // SyncedLogStorage is called when the log storage is synced, after writing a
+ // snapshot or a batch of log entries. It can be called synchronously from
+ // the OnLogSync or OnSnapSync handlers if the write batch is blocking, or
+ // asynchronously from OnLogSync.
+ SyncedLogStorage(ctx context.Context, mark rac2.LogMark, snap bool)
// AdmittedLogEntry is called when an entry is admitted. It can be called
// synchronously from within ACWorkQueue.Admit if admission is immediate.
AdmittedLogEntry(
ctx context.Context, state EntryForAdmissionCallbackState,
)
+ // AdmittedState returns the vector of admitted log indices.
+ AdmittedState() rac2.AdmittedVector

// AdmitForEval is called to admit work that wants to evaluate at the
// leaseholder.
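
A hedged sketch of how a storage-sync callback could hand the durable mark to the Processor. Only the SyncedLogStorage signature comes from this diff; LogMark, the onLogSync/onSnapSync wiring, and printProcessor are illustrative stand-ins:

```go
package main

import (
	"context"
	"fmt"
)

type LogMark struct{ Term, Index uint64 }

// Processor is trimmed to the one method this sketch needs.
type Processor interface {
	SyncedLogStorage(ctx context.Context, mark LogMark, snap bool)
}

type printProcessor struct{}

func (printProcessor) SyncedLogStorage(_ context.Context, mark LogMark, snap bool) {
	fmt.Printf("synced mark=%+v snap=%v\n", mark, snap)
}

// onLogSync models an OnLogSync handler: once a batch of entries is durably
// synced, report the highest synced mark with snap=false.
func onLogSync(ctx context.Context, p Processor, synced LogMark) {
	p.SyncedLogStorage(ctx, synced, false /* snap */)
}

// onSnapSync models the snapshot variant: the applied snapshot defines the mark.
func onSnapSync(ctx context.Context, p Processor, snapMark LogMark) {
	p.SyncedLogStorage(ctx, snapMark, true /* snap */)
}

func main() {
	ctx := context.Background()
	p := printProcessor{}
	onLogSync(ctx, p, LogMark{Term: 7, Index: 120})
	onSnapSync(ctx, p, LogMark{Term: 7, Index: 150})
}
```
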
@@ -417,6 +419,9 @@ type Processor interface {
type processorImpl struct {
opts ProcessorOptions

+ // State for tracking and advancing the log's admitted vector.
+ logTracker logTracker
+
// The fields below are accessed while holding the mutex. Lock ordering:
// Replica.raftMu < this.mu < Replica.mu.
mu struct {
@@ -432,10 +437,6 @@ type processorImpl struct {
leaderNodeID roachpb.NodeID
leaderStoreID roachpb.StoreID
leaseholderID roachpb.ReplicaID
- // State for advancing admitted.
- lastObservedStableIndex uint64
- scheduledAdmittedProcessing bool
- waitingForAdmissionState waitingForAdmissionState
// State at a follower.
follower struct {
isLeaderUsingV2Protocol bool
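
To illustrate the bookkeeping that moves out of the mu-protected fields and into the logTracker: per raft priority, the admitted index is capped both by the stable (synced) index and by the lowest registered entry that has not been admitted yet. A minimal sketch, assuming four priorities and illustrative names (the real logTracker API differs):

```go
package main

import "fmt"

const numPriorities = 4

type tracker struct {
	stable  uint64                  // highest durably synced log index
	waiting [numPriorities][]uint64 // registered, not yet admitted indices, sorted
}

// admitted computes the vector: for each priority, everything up to the stable
// index counts as admitted unless an earlier registered entry is still waiting.
func (t *tracker) admitted() [numPriorities]uint64 {
	var out [numPriorities]uint64
	for pri, waiting := range t.waiting {
		out[pri] = t.stable
		if len(waiting) > 0 && waiting[0] <= t.stable {
			out[pri] = waiting[0] - 1
		}
	}
	return out
}

func main() {
	t := tracker{stable: 100, waiting: [numPriorities][]uint64{1: {95}, 3: {101}}}
	fmt.Println(t.admitted()) // [100 94 100 100]: pri 1 is held back, pri 3 is beyond stable
}
```
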
@@ -510,7 +511,7 @@ func (p *processorImpl) InitRaftLocked(ctx context.Context, rn RaftNode) {
log.Fatalf(ctx, "initializing RaftNode after replica is initialized")
}
p.raftMu.raftNode = rn
- // TODO(pav-kv): initialize the LogTracker from RaftNode state.
+ p.logTracker.init(p.raftMu.raftNode.LogMarkLocked())
}

// OnDestroyRaftMuLocked implements Processor.
@@ -523,7 +524,6 @@ func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) {
p.closeLeaderStateRaftMuLockedProcLocked(ctx)

// Release some memory.
- p.mu.waitingForAdmissionState = waitingForAdmissionState{}
p.mu.follower.lowPriOverrideState = lowPriOverrideState{}
}

@@ -740,65 +740,70 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
// leader may switch to v2.

// Grab the state we need in one shot after acquiring Replica mu.
- var nextUnstableIndex, stableIndex uint64
+ var nextUnstableIndex uint64
var leaderID, leaseholderID roachpb.ReplicaID
- var admitted [raftpb.NumPriorities]uint64
var myLeaderTerm uint64
func() {
p.opts.Replica.MuLock()
defer p.opts.Replica.MuUnlock()
nextUnstableIndex = p.raftMu.raftNode.NextUnstableIndexLocked()
- stableIndex = p.raftMu.raftNode.StableIndexLocked()
leaderID = p.raftMu.raftNode.LeaderLocked()
leaseholderID = p.opts.Replica.LeaseholderMuLocked()
- admitted = p.raftMu.raftNode.GetAdmittedLocked()
if leaderID == p.opts.ReplicaID {
myLeaderTerm = p.raftMu.raftNode.TermLocked()
}
}()
if len(e.Entries) > 0 {
nextUnstableIndex = e.Entries[0].Index
}
- p.mu.lastObservedStableIndex = stableIndex
- p.mu.scheduledAdmittedProcessing = false
p.makeStateConsistentRaftMuLockedProcLocked(
ctx, nextUnstableIndex, leaderID, leaseholderID, myLeaderTerm)

if !p.isLeaderUsingV2ProcLocked() {
return
}
- // If there was a recent MsgStoreAppendResp that triggered this Ready
- // processing, it has already been stepped, so the stable index would have
- // advanced. So this is an opportune place to do Admitted processing.
- nextAdmitted := p.mu.waitingForAdmissionState.computeAdmitted(stableIndex)
- if admittedIncreased(admitted, nextAdmitted) {
- p.opts.Replica.MuLock()
- msgResp := p.raftMu.raftNode.SetAdmittedLocked(nextAdmitted)
- p.opts.Replica.MuUnlock()
- if p.mu.leader.rc == nil && p.mu.leaderNodeID != 0 {
- // Follower, and know leaderNodeID, leaderStoreID.
- // TODO(pav-kv): populate the message correctly.
+
+ // Piggyback admitted index advancement, if any, to the message stream going
+ // to the leader node, if we are not the leader. At the leader node, the
+ // admitted vector is read directly from the log tracker.
+ if p.mu.leader.rc == nil && p.mu.leaderNodeID != 0 {
+ // TODO(pav-kv): must make sure the leader term is the same.
+ if admitted, dirty := p.logTracker.admitted(true /* sched */); dirty {
p.opts.AdmittedPiggybacker.Add(p.mu.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: p.opts.RangeID,
ToStoreID: p.mu.leaderStoreID,
FromReplicaID: p.opts.ReplicaID,
- ToReplicaID: roachpb.ReplicaID(msgResp.To),
- Admitted: kvflowcontrolpb.AdmittedState{},
- })
+ ToReplicaID: p.mu.leaderID,
+ Admitted: kvflowcontrolpb.AdmittedState{
+ Term: admitted.Term,
+ Admitted: admitted.Admitted[:],
+ }})
}
- // Else if the local replica is the leader, we have already told it
- // about the update by calling SetAdmittedLocked. If the leader is not
- // known, we simply drop the message.
}
+
if p.mu.leader.rc != nil {
if err := p.mu.leader.rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
log.Errorf(ctx, "error handling raft event: %v", err)
}
}
}

+ func (p *processorImpl) registerLogAppend(ctx context.Context, e rac2.RaftEvent) {
+ if len(e.Entries) == 0 {
+ return
+ }
+ after := e.Entries[0].Index - 1
+ to := rac2.LogMark{Term: e.Term, Index: e.Entries[len(e.Entries)-1].Index}
+ p.logTracker.append(ctx, after, to)
+ }
+
// AdmitRaftEntriesRaftMuLocked implements Processor.
func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2.RaftEvent) bool {
+ // Register all log appends without exception. If the replica is being
+ // destroyed, this should be a no-op, but there is no harm in registering the
+ // write just in case.
+ p.registerLogAppend(ctx, e)
+
// Return false only if we're not destroyed and not using V2.
if destroyed, usingV2 := func() (bool, bool) {
p.mu.Lock()
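
The piggybacking above only sends the admitted vector when it has actually changed. A hedged sketch of that "send only when dirty" pattern; the admitted(sched bool) (vector, dirty) shape is taken from this diff, while the tracker internals and the piggybacking caller are illustrative:

```go
package main

import "fmt"

type admittedVector struct {
	Term     uint64
	Admitted [4]uint64
}

type tracker struct {
	vec   admittedVector
	dirty bool // set when the vector advances, cleared when handed out for sending
}

// admitted returns the current vector. With sched=true the caller intends to
// send it to the leader, so the dirty bit is consumed.
func (t *tracker) admitted(sched bool) (admittedVector, bool) {
	dirty := t.dirty
	if sched {
		t.dirty = false
	}
	return t.vec, dirty
}

func main() {
	t := &tracker{vec: admittedVector{Term: 7, Admitted: [4]uint64{10, 10, 10, 10}}, dirty: true}
	if vec, dirty := t.admitted(true /* sched */); dirty {
		fmt.Println("piggyback to leader:", vec) // sent once
	}
	if _, dirty := t.admitted(true /* sched */); !dirty {
		fmt.Println("nothing new to send") // later Ready cycles skip the send
	}
}
```
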
@@ -807,6 +812,8 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2
}(); destroyed || !usingV2 {
return destroyed
}
+ // NB: destroyed and "using v2" status does not change below since we are
+ // holding raftMu.

for _, entry := range e.Entries {
typ, priBits, err := raftlog.EncodingOf(entry)
@@ -833,7 +840,6 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2
p.mu.Lock()
defer p.mu.Unlock()
raftPri = p.mu.follower.lowPriOverrideState.getEffectivePriority(entry.Index, raftPri)
- p.mu.waitingForAdmissionState.add(mark.Term, mark.Index, raftPri)
}()
} else {
raftPri = raftpb.LowPri
@@ -843,19 +849,16 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2
"do not use RACv1 for pri %s, which is regular work",
admissionpb.WorkPriority(meta.AdmissionPriority))
}
- func() {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.mu.waitingForAdmissionState.add(mark.Term, mark.Index, raftPri)
- }()
}
- admissionPri := rac2.RaftToAdmissionPriority(raftPri)
- // NB: cannot hold mu when calling Admit since the callback may
- // execute from inside Admit, when the entry is immediately admitted.
+ // Register all entries subject to AC with the log tracker.
+ p.logTracker.register(ctx, mark, raftPri)
+
+ // NB: cannot hold mu when calling Admit since the callback may execute from
+ // inside Admit, when the entry is immediately admitted.
submitted := p.opts.ACWorkQueue.Admit(ctx, EntryForAdmission{
StoreID: p.opts.StoreID,
TenantID: p.raftMu.tenantID,
- Priority: admissionPri,
+ Priority: rac2.RaftToAdmissionPriority(raftPri),
CreateTime: meta.AdmissionCreateTime,
RequestedCount: int64(len(entry.Data)),
Ingested: typ.IsSideloaded(),
@@ -866,13 +869,14 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2
Priority: raftPri,
},
})
+ // Failure is very rare, e.g. when the store is not found.
+ // TODO(pav-kv): audit failure scenarios and minimize/eliminate them.
if !submitted {
- // Very rare. e.g. store was not found.
- func() {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.mu.waitingForAdmissionState.remove(mark.Term, mark.Index, raftPri)
- }()
+ // NB: this also admits all previously registered entries.
+ // TODO(pav-kv): consider not registering this entry in the first place,
+ // instead of falsely admitting a prefix of the log. We don't want false
+ // admissions to reach the leader.
+ p.logTracker.logAdmitted(ctx, mark, raftPri)
}
}
return true
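
A sketch of the per-entry lifecycle that AdmitRaftEntriesRaftMuLocked drives: an entry is registered with the tracker when appended, and admitted later (possibly immediately) via the admission callback; admitting a mark releases the whole registered prefix, which is why the failure fallback above "also admits all previously registered entries". The register/logAdmitted names mirror this diff; the single-priority tracker body is illustrative:

```go
package main

import "fmt"

type mark struct{ Term, Index uint64 }

type tracker struct {
	waiting []mark // registered entries awaiting admission, in log order
}

func (t *tracker) register(m mark) { t.waiting = append(t.waiting, m) }

// logAdmitted releases the admitted entry and everything registered before it.
func (t *tracker) logAdmitted(m mark) {
	for len(t.waiting) > 0 && t.waiting[0].Index <= m.Index {
		t.waiting = t.waiting[1:]
	}
}

func main() {
	t := &tracker{}
	t.register(mark{Term: 7, Index: 10})
	t.register(mark{Term: 7, Index: 11})
	t.logAdmitted(mark{Term: 7, Index: 11}) // admits 10 and 11, the whole prefix
	fmt.Println(len(t.waiting))             // 0
}
```
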
@@ -941,30 +945,34 @@ func (p *processorImpl) SideChannelForPriorityOverrideAtFollowerRaftMuLocked(
}
}

+ // SyncedLogStorage implements Processor.
+ func (p *processorImpl) SyncedLogStorage(ctx context.Context, mark rac2.LogMark, snap bool) {
+ if snap {
+ p.logTracker.snapSynced(ctx, mark)
+ } else {
+ p.logTracker.logSynced(ctx, mark)
+ }
+ // NB: storage syncs will trigger raft Ready processing, so we don't need to
+ // explicitly schedule it here like in AdmittedLogEntry.
+ }
+
// AdmittedLogEntry implements Processor.
func (p *processorImpl) AdmittedLogEntry(
ctx context.Context, state EntryForAdmissionCallbackState,
) {
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.mu.destroyed {
- return
- }
- admittedMayAdvance :=
- p.mu.waitingForAdmissionState.remove(state.Mark.Term, state.Mark.Index, state.Priority)
- if !admittedMayAdvance || state.Mark.Index > p.mu.lastObservedStableIndex ||
- !p.isLeaderUsingV2ProcLocked() {
- return
- }
- // The lastObservedStableIndex has moved at or ahead of state.Index. This
- // will happen when admission is not immediate. In this case we need to
- // schedule processing.
- if !p.mu.scheduledAdmittedProcessing {
- p.mu.scheduledAdmittedProcessing = true
+ if p.logTracker.logAdmitted(ctx, state.Mark, state.Priority) {
+ // Schedule a raft Ready cycle that will send the updated admitted vector to
+ // the leader via AdmittedPiggybacker if it hasn't been sent by then.
p.opts.RaftScheduler.EnqueueRaftReady(p.opts.RangeID)
}
}

+ // AdmittedState implements Processor.
+ func (p *processorImpl) AdmittedState() rac2.AdmittedVector {
+ admitted, _ := p.logTracker.admitted(false /* sched */)
+ return admitted
+ }
+
// AdmitForEval implements Processor.
func (p *processorImpl) AdmitForEval(
ctx context.Context, pri admissionpb.WorkPriority, ct time.Time,
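
A sketch of why AdmittedLogEntry enqueues a Ready cycle only when logAdmitted reports that the admitted state may have advanced: an admission that cannot move the vector yet (e.g. the entry is not synced) does not need to wake the scheduler. Only the boolean-return shape is from this diff; the single-index tracker and scheduler below are stand-ins:

```go
package main

import "fmt"

type scheduler struct{ enqueued int }

func (s *scheduler) EnqueueRaftReady() { s.enqueued++ }

type tracker struct{ stable, admitted, candidate uint64 }

// logAdmitted records an admission and reports whether the admitted index can
// advance now; advancement is capped by the stable (synced) index.
func (t *tracker) logAdmitted(index uint64) bool {
	if index > t.candidate {
		t.candidate = index
	}
	next := t.candidate
	if next > t.stable {
		next = t.stable // admission cannot outrun the synced log
	}
	if next <= t.admitted {
		return false
	}
	t.admitted = next
	return true
}

func main() {
	s := &scheduler{}
	t := &tracker{stable: 10, admitted: 5}
	if t.logAdmitted(12) { // advances, but only up to the synced index
		s.EnqueueRaftReady()
	}
	if t.logAdmitted(12) { // nothing new: no extra scheduling
		s.EnqueueRaftReady()
	}
	fmt.Println(s.enqueued, t.admitted) // 1 10
}
```
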
@@ -991,15 +999,6 @@ func (p *processorImpl) AdmitForEval(
return p.mu.leader.rc.WaitForEval(ctx, pri)
}

- func admittedIncreased(prev, next [raftpb.NumPriorities]uint64) bool {
- for i := range prev {
- if prev[i] < next[i] {
- return true
- }
- }
- return false
- }
-
// GetAdmitted implements rac2.AdmittedTracker.
func (p *processorImpl) GetAdmitted(replicaID roachpb.ReplicaID) rac2.AdmittedVector {
// TODO(pav-kv): implement