Skip to content

Commit cdb62ab

Browse files
committed
Make sure on a miss from a starting sequence that if no other msgs exists we avoid loading and blocks.
Signed-off-by: Derek Collison <[email protected]>
1 parent 72e0637 commit cdb62ab

File tree

2 files changed

+101
-10
lines changed

2 files changed

+101
-10
lines changed

server/filestore.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2596,28 +2596,38 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
25962596

25972597
// This is used to see if we can selectively jump start blocks based on filter subject and a floor block index.
25982598
// Will return -1 if no matches at all.
2599-
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) int {
2600-
start := uint32(math.MaxUint32)
2599+
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) {
2600+
start, stop := uint32(math.MaxUint32), uint32(0)
26012601
if wc {
26022602
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
26032603
if psi.fblk < start {
26042604
start = psi.fblk
26052605
}
2606+
if psi.lblk > stop {
2607+
stop = psi.lblk
2608+
}
26062609
})
26072610
} else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
2608-
start = psi.fblk
2611+
start, stop = psi.fblk, psi.lblk
26092612
}
26102613
// Nothing found.
26112614
if start == uint32(math.MaxUint32) {
2612-
return -1
2615+
return -1, -1
26132616
}
2614-
// Here we need to translate this to index into fs.blks.
2617+
// Here we need to translate this to index into fs.blks properly.
26152618
mb := fs.bim[start]
26162619
if mb == nil {
2617-
return -1
2620+
return -1, -1
26182621
}
2619-
bi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
2620-
return bi
2622+
fi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
2623+
2624+
mb = fs.bim[stop]
2625+
if mb == nil {
2626+
return -1, -1
2627+
}
2628+
li, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
2629+
2630+
return fi, li
26212631
}
26222632

26232633
// Optimized way for getting all num pending matching a filter subject.
@@ -6475,9 +6485,9 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
64756485
// Similar to above if start <= first seq.
64766486
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
64776487
if i == bi {
6478-
nbi := fs.checkSkipFirstBlock(filter, wc)
6488+
nbi, lbi := fs.checkSkipFirstBlock(filter, wc)
64796489
// Nothing available.
6480-
if nbi < 0 {
6490+
if nbi < 0 || lbi <= bi {
64816491
return nil, fs.state.LastSeq, ErrStoreEOF
64826492
}
64836493
// See if we can jump ahead here.

server/filestore_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7263,6 +7263,58 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) {
72637263
require_Equal(t, psi.lblk, 4)
72647264
}
72657265

7266+
func TestFileStoreLargeSparseMsgsDoNotLoadAfterLast(t *testing.T) {
7267+
sd := t.TempDir()
7268+
fs, err := newFileStore(
7269+
FileStoreConfig{StoreDir: sd, BlockSize: 128},
7270+
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
7271+
require_NoError(t, err)
7272+
defer fs.Stop()
7273+
7274+
msg := []byte("hello")
7275+
// Create 2 blocks with each, each block holds 2 msgs
7276+
for i := 0; i < 2; i++ {
7277+
fs.StoreMsg("foo.22.bar", nil, msg)
7278+
fs.StoreMsg("foo.22.baz", nil, msg)
7279+
}
7280+
// Now create 8 more blocks with just baz. So no matches for these 8 blocks
7281+
// for "foo.22.bar".
7282+
for i := 0; i < 8; i++ {
7283+
fs.StoreMsg("foo.22.baz", nil, msg)
7284+
fs.StoreMsg("foo.22.baz", nil, msg)
7285+
}
7286+
require_Equal(t, fs.numMsgBlocks(), 10)
7287+
7288+
// Remove all blk cache and fss.
7289+
fs.mu.RLock()
7290+
for _, mb := range fs.blks {
7291+
mb.mu.Lock()
7292+
mb.fss, mb.cache = nil, nil
7293+
mb.mu.Unlock()
7294+
}
7295+
fs.mu.RUnlock()
7296+
7297+
// "foo.22.bar" is at sequence 1 and 3.
7298+
// Make sure if we do a LoadNextMsg() starting at 4 that we do not load
7299+
// all the tail blocks.
7300+
_, _, err = fs.LoadNextMsg("foo.*.bar", true, 4, nil)
7301+
require_Error(t, err, ErrStoreEOF)
7302+
7303+
// Now make sure we did not load fss and cache.
7304+
var loaded int
7305+
fs.mu.RLock()
7306+
for _, mb := range fs.blks {
7307+
mb.mu.RLock()
7308+
if mb.cache != nil || mb.fss != nil {
7309+
loaded++
7310+
}
7311+
mb.mu.RUnlock()
7312+
}
7313+
fs.mu.RUnlock()
7314+
// We will load first block for starting seq 4, but no others should have loaded.
7315+
require_Equal(t, loaded, 1)
7316+
}
7317+
72667318
///////////////////////////////////////////////////////////////////////////
72677319
// Benchmarks
72687320
///////////////////////////////////////////////////////////////////////////
@@ -7520,3 +7572,32 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testin
75207572
require_NoError(b, err)
75217573
}
75227574
}
7575+
7576+
func Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail(b *testing.B) {
7577+
fs, err := newFileStore(
7578+
FileStoreConfig{StoreDir: b.TempDir()},
7579+
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
7580+
require_NoError(b, err)
7581+
defer fs.Stop()
7582+
7583+
// Small om purpose.
7584+
msg := []byte("ok")
7585+
7586+
// Make first msg one that would match as well.
7587+
fs.StoreMsg("foo.1.baz", nil, msg)
7588+
// Add in a bunch of msgs.
7589+
// We need to make sure we have a range of subjects that could kick in a linear scan.
7590+
for i := 0; i < 1_000_000; i++ {
7591+
subj := fmt.Sprintf("foo.%d.bar", rand.Intn(100_000)+2)
7592+
fs.StoreMsg(subj, nil, msg)
7593+
}
7594+
7595+
b.ResetTimer()
7596+
7597+
var smv StoreMsg
7598+
for i := 0; i < b.N; i++ {
7599+
// Make sure not first seq.
7600+
_, _, err := fs.LoadNextMsg("foo.*.baz", true, 2, &smv)
7601+
require_Error(b, err, ErrStoreEOF)
7602+
}
7603+
}

0 commit comments

Comments
 (0)