From e482da214c449aa5d27d6182f2558286fd96280d Mon Sep 17 00:00:00 2001
From: Maurice van Veen
Date: Thu, 28 Nov 2024 18:46:18 +0100
Subject: [PATCH] [FIXED] (2.11) Propose message delete for clustered interest stream (#6140)

For an Interest or WorkQueue stream, messages will be removed once all
consumers that need to receive a message have acked it. For a clustered
stream, each consumer would ack and remove a message by itself. This can be
problematic since it introduces different removal ordering between servers,
for example when using DiscardOld with MaxMsgs, which could result in stream
desync. Proposing the message removal instead ensures consistent ordering
between servers.

Signed-off-by: Maurice van Veen

---------

Signed-off-by: Maurice van Veen
Signed-off-by: Neil Twigg
Co-authored-by: Neil Twigg
---
 server/consumer.go                 |  19 ++++-
 server/jetstream_cluster_4_test.go | 122 +++++++++++++++++++++++++++--
 server/norace_test.go              |   2 +-
 server/stream.go                   |  16 ++--
 4 files changed, 142 insertions(+), 17 deletions(-)

diff --git a/server/consumer.go b/server/consumer.go
index 9511c592bc..dc5f7e5828 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -5544,6 +5544,7 @@ func (o *consumer) isMonitorRunning() bool {
 
 // If we detect that our ackfloor is higher than the stream's last sequence, return this error.
 var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence")
+var errAckFloorInvalid = errors.New("consumer ack floor is invalid")
 
 // If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent.
 func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
@@ -5573,7 +5574,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
     asflr := state.AckFloor.Stream
     // Protect ourselves against rolling backwards.
     if asflr&(1<<63) != 0 {
-        return nil
+        return errAckFloorInvalid
     }
 
     // Check if the underlying stream's last sequence is less than our floor.
@@ -5592,6 +5593,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
         fseq = chkfloor
     }
 
+    var retryAsflr uint64
     for seq = fseq; asflr > 0 && seq <= asflr; seq++ {
         if filters != nil {
             _, nseq, err = store.LoadNextMsgMulti(filters, seq, &smv)
@@ -5604,15 +5606,24 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
         }
         // Only ack though if no error and seq <= ack floor.
         if err == nil && seq <= asflr {
-            mset.ackMsg(o, seq)
+            didRemove := mset.ackMsg(o, seq)
+            // Removing the message could fail. For example if we're behind on stream applies.
+            // Overwrite retry floor (only the first time) to allow us to check next time if the removal was successful.
+            if didRemove && retryAsflr == 0 {
+                retryAsflr = seq
+            }
         }
     }
+    // If retry floor was not overwritten, set to ack floor+1, we don't need to account for any retries below it.
+    if retryAsflr == 0 {
+        retryAsflr = asflr + 1
+    }
 
     o.mu.Lock()
     // Update our check floor.
     // Check floor must never be greater than ack floor+1, otherwise subsequent calls to this function would skip work.
-    if asflr+1 > o.chkflr {
-        o.chkflr = asflr + 1
+    if retryAsflr > o.chkflr {
+        o.chkflr = retryAsflr
     }
     // See if we need to process this update if our parent stream is not a limits policy stream.
     state, _ = o.store.State()
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index 4ee7dd5cb2..bfab9f1941 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -744,7 +744,7 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) {
             var expectedErr error
             var msgId string
             var smv StoreMsg
-            for i, mset := range msets {
+            for _, mset := range msets {
                 mset.mu.RLock()
                 sm, err := mset.store.LoadMsg(seq, &smv)
                 mset.mu.RUnlock()
@@ -754,17 +754,14 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) {
                     // by all msets for that seq to prove consistency across replicas.
                     // If one of the msets either returns no error or doesn't return
                     // the same error, then that replica has drifted.
-                    if msgId != _EMPTY_ {
-                        t.Fatalf("Expected MsgId %q for seq %d, but got error: %v", msgId, seq, err)
-                    } else if expectedErr == nil {
+                    if expectedErr == nil {
                         expectedErr = err
                     } else {
                         require_Error(t, err, expectedErr)
                     }
                     continue
                 }
-                // Only set expected msg ID if it's for the very first time.
-                if msgId == _EMPTY_ && i == 0 {
+                if msgId == _EMPTY_ {
                     msgId = string(sm.hdr)
                 } else if msgId != string(sm.hdr) {
                     t.Fatalf("MsgIds do not match for seq %d: %q vs %q", seq, msgId, sm.hdr)
@@ -4673,3 +4670,116 @@ func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) {
     ljs.mu.Lock()
     t.Logf("Took %s to clear %d items", time.Since(start), count)
 }
+
+func TestJetStreamClusterStreamAckMsgR1SignalsRemovedMsg(t *testing.T) {
+    c := createJetStreamClusterExplicit(t, "R3S", 3)
+    defer c.shutdown()
+
+    nc, js := jsClientConnect(t, c.randomServer())
+    defer nc.Close()
+
+    _, err := js.AddStream(&nats.StreamConfig{
+        Name:      "TEST",
+        Subjects:  []string{"foo"},
+        Retention: nats.WorkQueuePolicy,
+        Replicas:  1,
+    })
+    require_NoError(t, err)
+
+    _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+        Durable:   "CONSUMER",
+        Replicas:  1,
+        AckPolicy: nats.AckExplicitPolicy,
+    })
+    require_NoError(t, err)
+
+    _, err = js.Publish("foo", nil)
+    require_NoError(t, err)
+
+    s := c.streamLeader(globalAccountName, "TEST")
+    acc, err := s.lookupAccount(globalAccountName)
+    require_NoError(t, err)
+    mset, err := acc.lookupStream("TEST")
+    require_NoError(t, err)
+    o := mset.lookupConsumer("CONSUMER")
+    require_NotNil(t, o)
+
+    // Too high sequence, should register pre-ack and return true allowing for retries.
+    require_True(t, mset.ackMsg(o, 100))
+
+    var smv StoreMsg
+    sm, err := mset.store.LoadMsg(1, &smv)
+    require_NoError(t, err)
+    require_Equal(t, sm.subj, "foo")
+
+    // Now do a proper ack, should immediately remove the message since it's R1.
+    require_True(t, mset.ackMsg(o, 1))
+    _, err = mset.store.LoadMsg(1, &smv)
+    require_Error(t, err, ErrStoreMsgNotFound)
+}
+
+func TestJetStreamClusterStreamAckMsgR3SignalsRemovedMsg(t *testing.T) {
+    c := createJetStreamClusterExplicit(t, "R3S", 3)
+    defer c.shutdown()
+
+    nc, js := jsClientConnect(t, c.randomServer())
+    defer nc.Close()
+
+    _, err := js.AddStream(&nats.StreamConfig{
+        Name:      "TEST",
+        Subjects:  []string{"foo"},
+        Retention: nats.WorkQueuePolicy,
+        Replicas:  3,
+    })
+    require_NoError(t, err)
+
+    _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+        Durable:   "CONSUMER",
+        Replicas:  3,
+        AckPolicy: nats.AckExplicitPolicy,
+    })
+    require_NoError(t, err)
+
+    _, err = js.Publish("foo", nil)
+    require_NoError(t, err)
+
+    getStreamAndConsumer := func(s *Server) (*stream, *consumer, error) {
+        t.Helper()
+        acc, err := s.lookupAccount(globalAccountName)
+        if err != nil {
+            return nil, nil, err
+        }
+        mset, err := acc.lookupStream("TEST")
+        if err != nil {
+            return nil, nil, err
+        }
+        o := mset.lookupConsumer("CONSUMER")
+        if err != nil {
+            return nil, nil, err
+        }
+        return mset, o, nil
+    }
+
+    sl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER")
+    sf := c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER")
+
+    msetL, ol, err := getStreamAndConsumer(sl)
+    require_NoError(t, err)
+    msetF, of, err := getStreamAndConsumer(sf)
+    require_NoError(t, err)
+
+    // Too high sequence, should register pre-ack and return true allowing for retries.
+    require_True(t, msetL.ackMsg(ol, 100))
+    require_True(t, msetF.ackMsg(of, 100))
+
+    // Let all servers ack the message.
+    var smv StoreMsg
+    for _, s := range c.servers {
+        mset, _, err := getStreamAndConsumer(s)
+        require_NoError(t, err)
+        require_True(t, mset.ackMsg(of, 1))
+
+        _, err = mset.store.LoadMsg(1, &smv)
+        require_Error(t, err, ErrStoreMsgNotFound)
+    }
+}
diff --git a/server/norace_test.go b/server/norace_test.go
index 6825ac452d..8c9785f505 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -11225,7 +11225,7 @@ func TestNoRaceJetStreamClusterCheckInterestStatePerformanceInterest(t *testing.
     }
 
     require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1)
-    require_Equal(t, checkFloor(mset.lookupConsumer("B")), 100_001)
+    require_Equal(t, checkFloor(mset.lookupConsumer("B")), 90_001)
     require_Equal(t, checkFloor(mset.lookupConsumer("C")), 100_001)
 
     // This checks the chkflr state. For this test this should be much faster,
diff --git a/server/stream.go b/server/stream.go
index a27ed9d495..e7d7512e42 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -5683,16 +5683,17 @@ func (mset *stream) clearPreAck(o *consumer, seq uint64) {
 }
 
 // ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy.
-func (mset *stream) ackMsg(o *consumer, seq uint64) {
+// Returns whether the message at seq was removed as a result of the ACK.
+func (mset *stream) ackMsg(o *consumer, seq uint64) bool {
     if seq == 0 {
-        return
+        return false
     }
 
     // Don't make this RLock(). We need to have only 1 running at a time to gauge interest across all consumers.
     mset.mu.Lock()
     if mset.closed.Load() || mset.cfg.Retention == LimitsPolicy {
         mset.mu.Unlock()
-        return
+        return false
     }
 
     store := mset.store
@@ -5703,7 +5704,9 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
     if seq > state.LastSeq {
         mset.registerPreAck(o, seq)
         mset.mu.Unlock()
-        return
+        // We have not removed the message, but should still signal so we could retry later
+        // since we potentially need to remove it then.
+        return true
     }
 
     // Always clear pre-ack if here.
@@ -5712,7 +5715,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
     // Make sure this sequence is not below our first sequence.
     if seq < state.FirstSeq {
         mset.mu.Unlock()
-        return
+        return false
     }
 
     var shouldRemove bool
@@ -5728,7 +5731,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
 
     // If nothing else to do.
     if !shouldRemove {
-        return
+        return false
     }
 
     // If we are here we should attempt to remove.
@@ -5736,6 +5739,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
         // This should not happen, but being pedantic.
         mset.registerPreAckLock(o, seq)
     }
+    return true
 }
 
 // Snapshot creates a snapshot for the stream and possibly consumers.
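
The consumer.go change above keeps a "retry floor" so that acks whose removal was only proposed (or pre-acked) get re-checked on the next pass. The following is a minimal, self-contained sketch of that bookkeeping, not the server's implementation: advanceCheckFloor and ackAndRemove are hypothetical names, and ackAndRemove stands in for mset.ackMsg, returning true when an ack triggered work whose outcome should be verified later.

package main

import "fmt"

// advanceCheckFloor acks each sequence from chkflr through asflr and returns the
// new check floor. The floor only moves past sequences that need no further
// attention; it stays at the first sequence whose removal must still be verified.
func advanceCheckFloor(chkflr, asflr uint64, ackAndRemove func(seq uint64) bool) uint64 {
    var retryFloor uint64
    for seq := chkflr; asflr > 0 && seq <= asflr; seq++ {
        if ackAndRemove(seq) && retryFloor == 0 {
            // Remember only the first sequence whose removal still needs checking.
            retryFloor = seq
        }
    }
    if retryFloor == 0 {
        // Nothing pending below the ack floor, so the floor can move past it.
        retryFloor = asflr + 1
    }
    if retryFloor > chkflr {
        chkflr = retryFloor
    }
    return chkflr
}

func main() {
    // Pretend only sequence 3 triggered a removal that must be verified later.
    ack := func(seq uint64) bool { return seq == 3 }
    fmt.Println(advanceCheckFloor(1, 5, ack)) // 3: the next pass re-checks from seq 3
}

Keeping the floor at the first unverified sequence is what lets checkStateForInterestStream retry removals that were proposed while the server was still behind on stream applies, without redoing work below the floor.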