
Commit

[FIXED] (2.11) Propose message delete for clustered interest stream (#6140)

For an Interest or WorkQueue stream, messages are removed once all
consumers that need to receive a message have acked it.

For a clustered stream, each server would previously ack and remove a
message on its own. This is problematic because it introduces a different
removal ordering between servers, for example when using DiscardOld with
MaxMsgs, which could result in stream desync. Proposing the message
removal ensures the same ordering on every server.
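
Conceptually, removal stops being a local side effect of each replica's ack and becomes a proposal that every replica applies in the same log order. Below is a minimal, hypothetical Go sketch of that ordering argument; the types and functions are invented for illustration and are not server APIs.

```go
package main

import "fmt"

// removeOp is a stand-in for a replicated "delete this sequence" proposal.
type removeOp struct{ seq uint64 }

// store is a stand-in for one replica's message store, keyed by sequence.
type store map[uint64]string

// proposeRemove models the fix: the ack does not delete anything directly,
// it appends a removal proposal that every replica will apply in log order.
func proposeRemove(log *[]removeOp, seq uint64) {
	*log = append(*log, removeOp{seq: seq})
}

// applyLog applies the agreed-upon removals, in the same order, on a replica.
func applyLog(s store, log []removeOp) {
	for _, op := range log {
		delete(s, op.seq)
	}
}

func main() {
	replicaA := store{1: "m1", 2: "m2", 3: "m3"}
	replicaB := store{1: "m1", 2: "m2", 3: "m3"}

	// Acks can arrive in any order, but the removals they produce are
	// serialized once by the proposal log, so both replicas delete the same
	// messages in the same order and cannot drift apart.
	var log []removeOp
	proposeRemove(&log, 2)
	proposeRemove(&log, 1)

	applyLog(replicaA, log)
	applyLog(replicaB, log)

	fmt.Println(len(replicaA), len(replicaB)) // 1 1: replicas stay in sync
}
```

The design point is that a single agreed order of removals is what prevents drift under limits such as DiscardOld with MaxMsgs, where the result depends on which message is deleted first.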

Signed-off-by: Maurice van Veen <[email protected]>

---------

Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Neil Twigg <[email protected]>
Co-authored-by: Neil Twigg <[email protected]>
MauriceVanVeen and neilalexander committed Feb 18, 2025
1 parent 45ee8c4 commit e482da2
Showing 4 changed files with 142 additions and 17 deletions.
19 changes: 15 additions & 4 deletions server/consumer.go
@@ -5544,6 +5544,7 @@ func (o *consumer) isMonitorRunning() bool {

// If we detect that our ackfloor is higher than the stream's last sequence, return this error.
var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence")
var errAckFloorInvalid = errors.New("consumer ack floor is invalid")

// If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent.
func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
@@ -5573,7 +5574,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
asflr := state.AckFloor.Stream
// Protect ourselves against rolling backwards.
if asflr&(1<<63) != 0 {
return nil
return errAckFloorInvalid
}

// Check if the underlying stream's last sequence is less than our floor.
@@ -5592,6 +5593,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
fseq = chkfloor
}

var retryAsflr uint64
for seq = fseq; asflr > 0 && seq <= asflr; seq++ {
if filters != nil {
_, nseq, err = store.LoadNextMsgMulti(filters, seq, &smv)
@@ -5604,15 +5606,24 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
}
// Only ack though if no error and seq <= ack floor.
if err == nil && seq <= asflr {
mset.ackMsg(o, seq)
didRemove := mset.ackMsg(o, seq)
// Removing the message could fail. For example if we're behind on stream applies.
// Overwrite retry floor (only the first time) to allow us to check next time if the removal was successful.
if didRemove && retryAsflr == 0 {
retryAsflr = seq
}
}
}
// If retry floor was not overwritten, set to ack floor+1, we don't need to account for any retries below it.
if retryAsflr == 0 {
retryAsflr = asflr + 1
}

o.mu.Lock()
// Update our check floor.
// Check floor must never be greater than ack floor+1, otherwise subsequent calls to this function would skip work.
if asflr+1 > o.chkflr {
o.chkflr = asflr + 1
if retryAsflr > o.chkflr {
o.chkflr = retryAsflr
}
// See if we need to process this update if our parent stream is not a limits policy stream.
state, _ = o.store.State()
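
The new retry floor exists because, with proposed removals, an ack can leave the message in place until the proposal is applied. The following is a simplified sketch of the bookkeeping above, using invented stand-ins (ackAndMaybeRemove plays the role of mset.ackMsg) rather than the real server types, and is only one reading of the logic in the diff.

```go
package main

import "fmt"

// advanceCheckFloor is a simplified stand-in for the check-floor bookkeeping.
// ackAndMaybeRemove reports whether a removal was attempted for seq and may
// therefore still need to be verified on a later pass.
func advanceCheckFloor(chkflr, fseq, asflr uint64, ackAndMaybeRemove func(uint64) bool) uint64 {
	var retryFloor uint64
	for seq := fseq; asflr > 0 && seq <= asflr; seq++ {
		// Remember only the first sequence whose removal we attempted, so the
		// next pass starts there and can confirm the removal really happened.
		if ackAndMaybeRemove(seq) && retryFloor == 0 {
			retryFloor = seq
		}
	}
	// Nothing to re-verify below the ack floor: jump straight past it.
	if retryFloor == 0 {
		retryFloor = asflr + 1
	}
	// The check floor only moves forward, and never beyond asflr+1.
	if retryFloor > chkflr {
		chkflr = retryFloor
	}
	return chkflr
}

func main() {
	// Pretend removals for sequences 3 and up were attempted but may still be
	// waiting on stream applies.
	pendingFrom := uint64(3)
	ack := func(seq uint64) bool { return seq >= pendingFrom }
	fmt.Println(advanceCheckFloor(1, 1, 5, ack)) // 3: re-check from the first attempted removal
}
```
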
122 changes: 116 additions & 6 deletions server/jetstream_cluster_4_test.go
@@ -744,7 +744,7 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) {
var expectedErr error
var msgId string
var smv StoreMsg
for i, mset := range msets {
for _, mset := range msets {
mset.mu.RLock()
sm, err := mset.store.LoadMsg(seq, &smv)
mset.mu.RUnlock()
@@ -754,17 +754,14 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) {
// by all msets for that seq to prove consistency across replicas.
// If one of the msets either returns no error or doesn't return
// the same error, then that replica has drifted.
if msgId != _EMPTY_ {
t.Fatalf("Expected MsgId %q for seq %d, but got error: %v", msgId, seq, err)
} else if expectedErr == nil {
if expectedErr == nil {
expectedErr = err
} else {
require_Error(t, err, expectedErr)
}
continue
}
// Only set expected msg ID if it's for the very first time.
if msgId == _EMPTY_ && i == 0 {
if msgId == _EMPTY_ {
msgId = string(sm.hdr)
} else if msgId != string(sm.hdr) {
t.Fatalf("MsgIds do not match for seq %d: %q vs %q", seq, msgId, sm.hdr)
@@ -4673,3 +4670,116 @@ func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) {
ljs.mu.Lock()
t.Logf("Took %s to clear %d items", time.Since(start), count)
}

func TestJetStreamClusterStreamAckMsgR1SignalsRemovedMsg(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Retention: nats.WorkQueuePolicy,
Replicas: 1,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "CONSUMER",
Replicas: 1,
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

_, err = js.Publish("foo", nil)
require_NoError(t, err)

s := c.streamLeader(globalAccountName, "TEST")
acc, err := s.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
o := mset.lookupConsumer("CONSUMER")
require_NotNil(t, o)

// Too high sequence, should register pre-ack and return true allowing for retries.
require_True(t, mset.ackMsg(o, 100))

var smv StoreMsg
sm, err := mset.store.LoadMsg(1, &smv)
require_NoError(t, err)
require_Equal(t, sm.subj, "foo")

// Now do a proper ack, should immediately remove the message since it's R1.
require_True(t, mset.ackMsg(o, 1))
_, err = mset.store.LoadMsg(1, &smv)
require_Error(t, err, ErrStoreMsgNotFound)
}

func TestJetStreamClusterStreamAckMsgR3SignalsRemovedMsg(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Retention: nats.WorkQueuePolicy,
Replicas: 3,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "CONSUMER",
Replicas: 3,
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

_, err = js.Publish("foo", nil)
require_NoError(t, err)

getStreamAndConsumer := func(s *Server) (*stream, *consumer, error) {
t.Helper()
acc, err := s.lookupAccount(globalAccountName)
if err != nil {
return nil, nil, err
}
mset, err := acc.lookupStream("TEST")
if err != nil {
return nil, nil, err
}
o := mset.lookupConsumer("CONSUMER")
if err != nil {
return nil, nil, err
}
return mset, o, nil
}

sl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER")
sf := c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER")

msetL, ol, err := getStreamAndConsumer(sl)
require_NoError(t, err)
msetF, of, err := getStreamAndConsumer(sf)
require_NoError(t, err)

// Too high sequence, should register pre-ack and return true allowing for retries.
require_True(t, msetL.ackMsg(ol, 100))
require_True(t, msetF.ackMsg(of, 100))

// Let all servers ack the message.
var smv StoreMsg
for _, s := range c.servers {
mset, _, err := getStreamAndConsumer(s)
require_NoError(t, err)
require_True(t, mset.ackMsg(of, 1))

_, err = mset.store.LoadMsg(1, &smv)
require_Error(t, err, ErrStoreMsgNotFound)
}
}
2 changes: 1 addition & 1 deletion server/norace_test.go
@@ -11225,7 +11225,7 @@ func TestNoRaceJetStreamClusterCheckInterestStatePerformanceInterest(t *testing.T) {
}

require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1)
require_Equal(t, checkFloor(mset.lookupConsumer("B")), 100_001)
require_Equal(t, checkFloor(mset.lookupConsumer("B")), 90_001)
require_Equal(t, checkFloor(mset.lookupConsumer("C")), 100_001)

// This checks the chkflr state. For this test this should be much faster,
16 changes: 10 additions & 6 deletions server/stream.go
@@ -5683,16 +5683,17 @@ func (mset *stream) clearPreAck(o *consumer, seq uint64) {
}

// ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy.
func (mset *stream) ackMsg(o *consumer, seq uint64) {
// Returns whether the message at seq was removed as a result of the ACK.
func (mset *stream) ackMsg(o *consumer, seq uint64) bool {
if seq == 0 {
return
return false
}

// Don't make this RLock(). We need to have only 1 running at a time to gauge interest across all consumers.
mset.mu.Lock()
if mset.closed.Load() || mset.cfg.Retention == LimitsPolicy {
mset.mu.Unlock()
return
return false
}

store := mset.store
@@ -5703,7 +5704,9 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
if seq > state.LastSeq {
mset.registerPreAck(o, seq)
mset.mu.Unlock()
return
// We have not removed the message, but should still signal so we could retry later
// since we potentially need to remove it then.
return true
}

// Always clear pre-ack if here.
@@ -5712,7 +5715,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
// Make sure this sequence is not below our first sequence.
if seq < state.FirstSeq {
mset.mu.Unlock()
return
return false
}

var shouldRemove bool
@@ -5728,14 +5731,15 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {

// If nothing else to do.
if !shouldRemove {
return
return false
}

// If we are here we should attempt to remove.
if _, err := store.RemoveMsg(seq); err == ErrStoreEOF {
// This should not happen, but being pedantic.
mset.registerPreAckLock(o, seq)
}
return true
}

// Snapshot creates a snapshot for the stream and possibly consumers.
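
Reading the diff above, the boolean returned by ackMsg distinguishes "a removal may still be pending for this sequence" from "nothing more to do". Here is a hypothetical caller, with invented helper names that are not server APIs, showing how that result is meant to be consumed.

```go
package main

// handleAck is a hypothetical caller. markForRecheck and the closures below
// are invented for the example; only the meaning of the boolean is taken from
// the diff above.
func handleAck(ackMsg func(seq uint64) bool, markForRecheck func(seq uint64), seq uint64) {
	if ackMsg(seq) {
		// true: the message was removed, or a pre-ack was registered because
		// this replica has not yet applied the message. Either way, re-check
		// the sequence on the next interest-state pass to confirm it is gone.
		markForRecheck(seq)
	}
	// false: nothing left to do for this sequence. It is already gone, or
	// other consumers still need to ack it and their ack will remove it.
}

func main() {
	recheck := map[uint64]bool{}
	// Simulate an ack whose removal is still pending.
	handleAck(func(seq uint64) bool { return true }, func(seq uint64) { recheck[seq] = true }, 7)
}
```
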
