From 5910c2900b482af8c78f1eb49e44bfe367e556ac Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 20 Dec 2023 12:39:54 -0800 Subject: [PATCH] Make sure to flush cache during PurgeEx to avoid memory bloat. Signed-off-by: Derek Collison --- server/filestore.go | 20 +++++++++++--------- server/filestore_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 50361a5e219..e4ba028be6b 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -795,7 +795,7 @@ var blkPoolSmall sync.Pool // 2MB // Get a new msg block based on sz estimate. func getMsgBlockBuf(sz int) (buf []byte) { - var pb interface{} + var pb any if sz <= defaultSmallBlockSize { pb = blkPoolSmall.Get() } else if sz <= defaultMediumBlockSize { @@ -6132,21 +6132,23 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint for i := 0; i < len(fs.blks); i++ { mb := fs.blks[i] mb.mu.Lock() - if err := mb.ensurePerSubjectInfoLoaded(); err != nil { - mb.mu.Unlock() - continue + var shouldExpire bool + if mb.cacheNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true } + t, f, l := mb.filteredPendingLocked(subject, wc, atomic.LoadUint64(&mb.first.seq)) if t == 0 { + // Expire if we were responsible for loading. + if shouldExpire { + // Expire this cache before moving on. + mb.tryForceExpireCacheLocked() + } mb.mu.Unlock() continue } - var shouldExpire bool - if mb.cacheNotLoaded() { - mb.loadMsgsWithLock() - shouldExpire = true - } if sequence > 1 && sequence <= l { l = sequence - 1 } diff --git a/server/filestore_test.go b/server/filestore_test.go index 7856562bdfc..427e137cc75 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -6286,6 +6286,38 @@ func TestFileStoreCorruptPSIMOnDisk(t *testing.T) { require_True(t, bytes.Equal(sm.msg, []byte("XYZ"))) } +func TestFileStorePurgeExBufPool(t *testing.T) { + sd := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: sd, BlockSize: 1024}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + msg := bytes.Repeat([]byte("ABC"), 33) // ~100bytes + for i := 0; i < 1000; i++ { + fs.StoreMsg("foo.foo", nil, msg) + fs.StoreMsg("foo.bar", nil, msg) + } + + p, err := fs.PurgeEx("foo.bar", 1, 0) + require_NoError(t, err) + require_Equal(t, p, 1000) + + // Now make sure we do not have all of the msg blocks cache's loaded. + var loaded int + fs.mu.RLock() + for _, mb := range fs.blks { + mb.mu.RLock() + if mb.cacheAlreadyLoaded() { + loaded++ + } + mb.mu.RUnlock() + } + fs.mu.RUnlock() + require_Equal(t, loaded, 1) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks ///////////////////////////////////////////////////////////////////////////