diff --git a/server/filestore.go b/server/filestore.go index d6e1bdd581..d48ee088a0 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -8002,7 +8002,7 @@ func (fs *fileStore) compact(seq uint64, noMarkers bool) (uint64, error) { if err == errDeletedMsg { // Update dmap. if !smb.dmap.IsEmpty() { - smb.dmap.Delete(seq) + smb.dmap.Delete(mseq) } } else if sm != nil { sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) diff --git a/server/memstore.go b/server/memstore.go index 9e10ee49de..821489de2d 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1217,6 +1217,8 @@ func (ms *memStore) compact(seq uint64, noMarkers bool) (uint64, error) { ms.removeSeqPerSubject(sm.subj, seq, !noMarkers && ms.cfg.SubjectDeleteMarkerTTL > 0) // Must delete message after updating per-subject info, to be consistent with file store. delete(ms.msgs, seq) + } else if !ms.dmap.IsEmpty() { + ms.dmap.Delete(seq) } } if purged > ms.state.Msgs { @@ -1244,9 +1246,10 @@ func (ms *memStore) compact(seq uint64, noMarkers bool) (uint64, error) { return true }) } - // Reset msgs and fss. + // Reset msgs, fss and dmap. ms.msgs = make(map[uint64]*StoreMsg) ms.fss = stree.NewSubjectTree[SimpleState]() + ms.dmap.Empty() } // Subject delete markers if needed. sdmcb := ms.subjectDeleteMarkersAfterOperation(JSMarkerReasonPurge) @@ -1283,9 +1286,10 @@ func (ms *memStore) reset() error { // Update msgs and bytes. ms.state.Msgs = 0 ms.state.Bytes = 0 - // Reset msgs and fss. + // Reset msgs, fss and dmap. ms.msgs = make(map[uint64]*StoreMsg) ms.fss = stree.NewSubjectTree[SimpleState]() + ms.dmap.Empty() ms.mu.Unlock() @@ -1319,6 +1323,8 @@ func (ms *memStore) Truncate(seq uint64) error { ms.removeSeqPerSubject(sm.subj, i, false) // Must delete message after updating per-subject info, to be consistent with file store. delete(ms.msgs, i) + } else if !ms.dmap.IsEmpty() { + ms.dmap.Delete(i) } } // Reset last. diff --git a/server/store_test.go b/server/store_test.go index bb94537f53..c62988ae02 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -301,3 +301,105 @@ func TestStoreMaxMsgsPerUpdateBug(t *testing.T) { }, ) } + +func TestStoreCompactCleansUpDmap(t *testing.T) { + config := func() StreamConfig { + return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0} + } + for cseq := uint64(2); cseq <= 4; cseq++ { + t.Run(fmt.Sprintf("Compact(%d)", cseq), func(t *testing.T) { + testAllStoreAllPermutations( + t, false, config(), + func(t *testing.T, fs StreamStore) { + dmapEntries := func() int { + if fss, ok := fs.(*fileStore); ok { + return fss.dmapEntries() + } else if mss, ok := fs.(*memStore); ok { + mss.mu.RLock() + defer mss.mu.RUnlock() + return mss.dmap.Size() + } else { + return 0 + } + } + + // Publish messages, should have no interior deletes. + for i := 0; i < 3; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + require_Len(t, dmapEntries(), 0) + + // Removing one message in the middle should be an interior delete. + _, err := fs.RemoveMsg(2) + require_NoError(t, err) + require_Len(t, dmapEntries(), 1) + + // Compacting must always clean up the interior delete. + _, err = fs.Compact(cseq) + require_NoError(t, err) + require_Len(t, dmapEntries(), 0) + + // Validate first/last sequence. + state := fs.State() + fseq := uint64(3) + if fseq < cseq { + fseq = cseq + } + require_Equal(t, state.FirstSeq, fseq) + require_Equal(t, state.LastSeq, 3) + }) + }) + } +} + +func TestStoreTruncateCleansUpDmap(t *testing.T) { + config := func() StreamConfig { + return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0} + } + for tseq := uint64(0); tseq <= 1; tseq++ { + t.Run(fmt.Sprintf("Truncate(%d)", tseq), func(t *testing.T) { + testAllStoreAllPermutations( + t, false, config(), + func(t *testing.T, fs StreamStore) { + dmapEntries := func() int { + if fss, ok := fs.(*fileStore); ok { + return fss.dmapEntries() + } else if mss, ok := fs.(*memStore); ok { + mss.mu.RLock() + defer mss.mu.RUnlock() + return mss.dmap.Size() + } else { + return 0 + } + } + + // Publish messages, should have no interior deletes. + for i := 0; i < 3; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + require_Len(t, dmapEntries(), 0) + + // Removing one message in the middle should be an interior delete. + _, err := fs.RemoveMsg(2) + require_NoError(t, err) + require_Len(t, dmapEntries(), 1) + + // Truncating must always clean up the interior delete. + err = fs.Truncate(tseq) + require_NoError(t, err) + require_Len(t, dmapEntries(), 0) + + // Validate first/last sequence. + state := fs.State() + fseq := uint64(1) + if fseq > tseq { + fseq = tseq + } + require_Equal(t, state.FirstSeq, fseq) + require_Equal(t, state.LastSeq, tseq) + }) + }) + } +}