Skip to content

Commit

Permalink
Make sure to flush cache during PurgeEx to avoid memory bloat.
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed Dec 20, 2023
1 parent 3d8a017 commit 5910c29
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 9 deletions.
20 changes: 11 additions & 9 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ var blkPoolSmall sync.Pool // 2MB

// Get a new msg block based on sz estimate.
func getMsgBlockBuf(sz int) (buf []byte) {
var pb interface{}
var pb any
if sz <= defaultSmallBlockSize {
pb = blkPoolSmall.Get()
} else if sz <= defaultMediumBlockSize {
Expand Down Expand Up @@ -6132,21 +6132,23 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
for i := 0; i < len(fs.blks); i++ {
mb := fs.blks[i]
mb.mu.Lock()
if err := mb.ensurePerSubjectInfoLoaded(); err != nil {
mb.mu.Unlock()
continue
var shouldExpire bool
if mb.cacheNotLoaded() {
mb.loadMsgsWithLock()
shouldExpire = true
}

t, f, l := mb.filteredPendingLocked(subject, wc, atomic.LoadUint64(&mb.first.seq))
if t == 0 {
// Expire if we were responsible for loading.
if shouldExpire {
// Expire this cache before moving on.
mb.tryForceExpireCacheLocked()
}
mb.mu.Unlock()
continue
}

var shouldExpire bool
if mb.cacheNotLoaded() {
mb.loadMsgsWithLock()
shouldExpire = true
}
if sequence > 1 && sequence <= l {
l = sequence - 1
}
Expand Down
32 changes: 32 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6286,6 +6286,38 @@ func TestFileStoreCorruptPSIMOnDisk(t *testing.T) {
require_True(t, bytes.Equal(sm.msg, []byte("XYZ")))
}

func TestFileStorePurgeExBufPool(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 1024},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := bytes.Repeat([]byte("ABC"), 33) // ~100bytes
for i := 0; i < 1000; i++ {
fs.StoreMsg("foo.foo", nil, msg)
fs.StoreMsg("foo.bar", nil, msg)
}

p, err := fs.PurgeEx("foo.bar", 1, 0)
require_NoError(t, err)
require_Equal(t, p, 1000)

// Now make sure we do not have all of the msg blocks cache's loaded.
var loaded int
fs.mu.RLock()
for _, mb := range fs.blks {
mb.mu.RLock()
if mb.cacheAlreadyLoaded() {
loaded++
}
mb.mu.RUnlock()
}
fs.mu.RUnlock()
require_Equal(t, loaded, 1)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit 5910c29

Please sign in to comment.