Skip to content

Commit

Permalink
[FIXED] Cleanup dmap on Compact/Truncate (#6515)
Browse files Browse the repository at this point in the history
In `Compact` and `Truncate` the interior delete map would not always be
cleaned up properly. This could result in memory leaks. Also, if the
filestore would restore based on `index.db` it would invalidate it due
to having too many deletes and not being able to reconstruct the state.

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
derekcollison authored Feb 17, 2025
2 parents 6fd6415 + e76e8cc commit 6fca6c6
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 3 deletions.
2 changes: 1 addition & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8002,7 +8002,7 @@ func (fs *fileStore) compact(seq uint64, noMarkers bool) (uint64, error) {
if err == errDeletedMsg {
// Update dmap.
if !smb.dmap.IsEmpty() {
smb.dmap.Delete(seq)
smb.dmap.Delete(mseq)
}
} else if sm != nil {
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
Expand Down
10 changes: 8 additions & 2 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,8 @@ func (ms *memStore) compact(seq uint64, noMarkers bool) (uint64, error) {
ms.removeSeqPerSubject(sm.subj, seq, !noMarkers && ms.cfg.SubjectDeleteMarkerTTL > 0)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, seq)
} else if !ms.dmap.IsEmpty() {
ms.dmap.Delete(seq)
}
}
if purged > ms.state.Msgs {
Expand Down Expand Up @@ -1244,9 +1246,10 @@ func (ms *memStore) compact(seq uint64, noMarkers bool) (uint64, error) {
return true
})
}
// Reset msgs and fss.
// Reset msgs, fss and dmap.
ms.msgs = make(map[uint64]*StoreMsg)
ms.fss = stree.NewSubjectTree[SimpleState]()
ms.dmap.Empty()
}
// Subject delete markers if needed.
sdmcb := ms.subjectDeleteMarkersAfterOperation(JSMarkerReasonPurge)
Expand Down Expand Up @@ -1283,9 +1286,10 @@ func (ms *memStore) reset() error {
// Update msgs and bytes.
ms.state.Msgs = 0
ms.state.Bytes = 0
// Reset msgs and fss.
// Reset msgs, fss and dmap.
ms.msgs = make(map[uint64]*StoreMsg)
ms.fss = stree.NewSubjectTree[SimpleState]()
ms.dmap.Empty()

ms.mu.Unlock()

Expand Down Expand Up @@ -1319,6 +1323,8 @@ func (ms *memStore) Truncate(seq uint64) error {
ms.removeSeqPerSubject(sm.subj, i, false)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, i)
} else if !ms.dmap.IsEmpty() {
ms.dmap.Delete(i)
}
}
// Reset last.
Expand Down
102 changes: 102 additions & 0 deletions server/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,105 @@ func TestStoreMaxMsgsPerUpdateBug(t *testing.T) {
},
)
}

func TestStoreCompactCleansUpDmap(t *testing.T) {
config := func() StreamConfig {
return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0}
}
for cseq := uint64(2); cseq <= 4; cseq++ {
t.Run(fmt.Sprintf("Compact(%d)", cseq), func(t *testing.T) {
testAllStoreAllPermutations(
t, false, config(),
func(t *testing.T, fs StreamStore) {
dmapEntries := func() int {
if fss, ok := fs.(*fileStore); ok {
return fss.dmapEntries()
} else if mss, ok := fs.(*memStore); ok {
mss.mu.RLock()
defer mss.mu.RUnlock()
return mss.dmap.Size()
} else {
return 0
}
}

// Publish messages, should have no interior deletes.
for i := 0; i < 3; i++ {
_, _, err := fs.StoreMsg("foo", nil, nil, 0)
require_NoError(t, err)
}
require_Len(t, dmapEntries(), 0)

// Removing one message in the middle should be an interior delete.
_, err := fs.RemoveMsg(2)
require_NoError(t, err)
require_Len(t, dmapEntries(), 1)

// Compacting must always clean up the interior delete.
_, err = fs.Compact(cseq)
require_NoError(t, err)
require_Len(t, dmapEntries(), 0)

// Validate first/last sequence.
state := fs.State()
fseq := uint64(3)
if fseq < cseq {
fseq = cseq
}
require_Equal(t, state.FirstSeq, fseq)
require_Equal(t, state.LastSeq, 3)
})
})
}
}

func TestStoreTruncateCleansUpDmap(t *testing.T) {
config := func() StreamConfig {
return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0}
}
for tseq := uint64(0); tseq <= 1; tseq++ {
t.Run(fmt.Sprintf("Truncate(%d)", tseq), func(t *testing.T) {
testAllStoreAllPermutations(
t, false, config(),
func(t *testing.T, fs StreamStore) {
dmapEntries := func() int {
if fss, ok := fs.(*fileStore); ok {
return fss.dmapEntries()
} else if mss, ok := fs.(*memStore); ok {
mss.mu.RLock()
defer mss.mu.RUnlock()
return mss.dmap.Size()
} else {
return 0
}
}

// Publish messages, should have no interior deletes.
for i := 0; i < 3; i++ {
_, _, err := fs.StoreMsg("foo", nil, nil, 0)
require_NoError(t, err)
}
require_Len(t, dmapEntries(), 0)

// Removing one message in the middle should be an interior delete.
_, err := fs.RemoveMsg(2)
require_NoError(t, err)
require_Len(t, dmapEntries(), 1)

// Truncating must always clean up the interior delete.
err = fs.Truncate(tseq)
require_NoError(t, err)
require_Len(t, dmapEntries(), 0)

// Validate first/last sequence.
state := fs.State()
fseq := uint64(1)
if fseq > tseq {
fseq = tseq
}
require_Equal(t, state.FirstSeq, fseq)
require_Equal(t, state.LastSeq, tseq)
})
})
}
}

0 comments on commit 6fca6c6

Please sign in to comment.