Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Fixes for filestore FSS state #5616

Merged
merged 7 commits into from
Jul 7, 2024
33 changes: 27 additions & 6 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2318,11 +2318,15 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor

if filter == _EMPTY_ {
filter = fwcs
wc = true
}

// If we only have 1 subject currently and it matches our filter we can also set isAll.
if !isAll && mb.fss.Size() == 1 {
_, isAll = mb.fss.Find(stringToBytes(filter))
// Since mb.fss.Find won't work if filter is a wildcard, need to use Match instead.
mb.fss.Match(stringToBytes(filter), func(subject []byte, _ *SimpleState) {
isAll = true
})
}
// Make sure to start at mb.first.seq if fseq < mb.first.seq
if seq := atomic.LoadUint64(&mb.first.seq); seq > fseq {
Expand Down Expand Up @@ -2461,6 +2465,7 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (

if filter == _EMPTY_ {
filter = fwcs
wc = true
}

update := func(ss *SimpleState) {
Expand Down Expand Up @@ -2494,6 +2499,10 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (

var havePartial bool
mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) {
if havePartial {
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss)
}
Expand Down Expand Up @@ -2752,7 +2761,12 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
if !ok {
return nil
}
start, stop = fs.bim[info.fblk], fs.bim[info.lblk]
if f := fs.bim[info.fblk]; f != nil {
start = f
}
if l := fs.bim[info.lblk]; l != nil {
stop = l
}
}

// Aggregate fss.
Expand Down Expand Up @@ -2945,9 +2959,10 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)

// See if filter was provided but its the only subject.
if !isAll && !wc && fs.psim.Size() == 1 {
if _, ok := fs.psim.Find(stringToBytes(filter)); ok {
// Since fs.psim.Find won't work if filter is a wildcard, need to use Match instead.
fs.psim.Match(stringToBytes(filter), func(subject []byte, _ *psi) {
isAll = true
}
})
}
if isAll && filter == _EMPTY_ {
filter = fwcs
Expand Down Expand Up @@ -3073,6 +3088,10 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)

var havePartial bool
mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) {
if havePartial {
// If we already found a partial then don't do anything else.
return
}
subj := bytesToString(bsubj)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
Expand Down Expand Up @@ -3824,11 +3843,12 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {

// collect all that are not correct.
needAttention := make(map[string]*psi)
fs.psim.Match([]byte(fwcs), func(subj []byte, psi *psi) {
fs.psim.Iter(func(subj []byte, psi *psi) bool {
numMsgs += psi.total
if psi.total > maxMsgsPer {
needAttention[string(subj)] = psi
}
return true
})

// We had an issue with a use case where psim (and hence fss) were correct but idx was not and was not properly being caught.
Expand All @@ -3848,10 +3868,11 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
fs.rebuildStateLocked(nil)
// Need to redo blocks that need attention.
needAttention = make(map[string]*psi)
fs.psim.Match([]byte(fwcs), func(subj []byte, psi *psi) {
fs.psim.Iter(func(subj []byte, psi *psi) bool {
if psi.total > maxMsgsPer {
needAttention[string(subj)] = psi
}
return true
})
}

Expand Down
5 changes: 4 additions & 1 deletion server/stree/stree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ func TestSubjectTreeBasics(t *testing.T) {
require_True(t, old == nil)
require_False(t, updated)
require_Equal(t, st.Size(), 1)
// Find with single leaf.
// Find shouldn't work with a wildcard.
_, found := st.Find(b("foo.bar.*"))
require_False(t, found)
// But it should with a literal. Find with single leaf.
v, found := st.Find(b("foo.bar.baz"))
require_True(t, found)
require_Equal(t, *v, 22)
Expand Down