Cherry-picks for 2.10.26-RC.4 (#6520)
Includes the following:

- #6507
- #6497
- #6476
- #6511
- #6513
- #6517
- #6515
- #6519
- #6521

Signed-off-by: Neil Twigg <[email protected]>
neilalexander authored Feb 18, 2025
2 parents 45ee8c4 + e97c614 commit 612b7a9
Showing 20 changed files with 852 additions and 46 deletions.
server/client.go: 6 changes (3 additions, 3 deletions)
@@ -3970,7 +3970,7 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) {
reply = append(reply, '@')
reply = append(reply, c.pa.deliver...)
}
- didDeliver = c.sendMsgToGateways(acc, msg, c.pa.subject, reply, qnames) || didDeliver
+ didDeliver = c.sendMsgToGateways(acc, msg, c.pa.subject, reply, qnames, false) || didDeliver
}

// Check to see if we did not deliver to anyone and the client has a reply subject set
@@ -4017,7 +4017,7 @@ func (c *client) handleGWReplyMap(msg []byte) bool {
reply = append(reply, '@')
reply = append(reply, c.pa.deliver...)
}
- c.sendMsgToGateways(c.acc, msg, c.pa.subject, reply, nil)
+ c.sendMsgToGateways(c.acc, msg, c.pa.subject, reply, nil, false)
}
return true
}
@@ -4366,7 +4366,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
flags |= pmrCollectQueueNames
var queues [][]byte
didDeliver, queues = c.processMsgResults(siAcc, rr, msg, c.pa.deliver, []byte(to), nrr, flags)
- didDeliver = c.sendMsgToGateways(siAcc, msg, []byte(to), nrr, queues) || didDeliver
+ didDeliver = c.sendMsgToGateways(siAcc, msg, []byte(to), nrr, queues, false) || didDeliver
} else {
didDeliver, _ = c.processMsgResults(siAcc, rr, msg, c.pa.deliver, []byte(to), nrr, flags)
}
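A note on the pattern shared by all three call sites above: the gateway send stays on the left-hand side of `||`, so short-circuit evaluation can never skip it once `didDeliver` is already true. A minimal standalone sketch of why the ordering matters (the `deliver` function here is hypothetical, not the server's API):

package main

import "fmt"

// deliver stands in for c.sendMsgToGateways: it must run for its side
// effects even when an earlier path already delivered the message.
func deliver() bool {
	fmt.Println("deliver called")
	return false
}

func main() {
	didDeliver := true
	// Call on the left, as in the diff: deliver() always executes.
	didDeliver = deliver() || didDeliver
	// Call on the right: || short-circuits and deliver() is skipped.
	didDeliver = didDeliver || deliver()
	fmt.Println(didDeliver) // true
}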
server/consumer.go: 19 changes (15 additions, 4 deletions)
@@ -5544,6 +5544,7 @@ func (o *consumer) isMonitorRunning() bool {

// If we detect that our ackfloor is higher than the stream's last sequence, return this error.
var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence")
+ var errAckFloorInvalid = errors.New("consumer ack floor is invalid")

// If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent.
func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
@@ -5573,7 +5574,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
asflr := state.AckFloor.Stream
// Protect ourselves against rolling backwards.
if asflr&(1<<63) != 0 {
- return nil
+ return errAckFloorInvalid
}

// Check if the underlying stream's last sequence is less than our floor.
@@ -5592,6 +5593,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
fseq = chkfloor
}

+ var retryAsflr uint64
for seq = fseq; asflr > 0 && seq <= asflr; seq++ {
if filters != nil {
_, nseq, err = store.LoadNextMsgMulti(filters, seq, &smv)
@@ -5604,15 +5606,24 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
}
// Only ack though if no error and seq <= ack floor.
if err == nil && seq <= asflr {
- mset.ackMsg(o, seq)
+ didRemove := mset.ackMsg(o, seq)
+ // Removing the message could fail. For example if we're behind on stream applies.
+ // Overwrite retry floor (only the first time) to allow us to check next time if the removal was successful.
+ if !didRemove && retryAsflr == 0 {
+ retryAsflr = seq
+ }
}
}
+ // If retry floor was not overwritten, set to ack floor+1, we don't need to account for any retries below it.
+ if retryAsflr == 0 {
+ retryAsflr = asflr + 1
+ }

o.mu.Lock()
// Update our check floor.
+ // Check floor must never be greater than ack floor+1, otherwise subsequent calls to this function would skip work.
- if asflr+1 > o.chkflr {
- o.chkflr = asflr + 1
+ if retryAsflr > o.chkflr {
+ o.chkflr = retryAsflr
}
// See if we need to process this update if our parent stream is not a limits policy stream.
state, _ = o.store.State()
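The retry-floor bookkeeping above can be read in isolation: the check floor only advances past sequences whose removal actually succeeded, while a failed removal (for example, while stream applies are behind) keeps the floor pinned so the next pass retries it. A minimal sketch under that reading, where `ackAndRemove` is a hypothetical stand-in for `mset.ackMsg`:

package main

import "fmt"

// computeCheckFloor mirrors the retryAsflr logic from the diff above.
// ackAndRemove reports whether the message at seq was actually removed.
func computeCheckFloor(fseq, asflr uint64, ackAndRemove func(seq uint64) bool) uint64 {
	var retryAsflr uint64
	for seq := fseq; asflr > 0 && seq <= asflr; seq++ {
		if didRemove := ackAndRemove(seq); !didRemove && retryAsflr == 0 {
			// First failed removal: remember it so the next pass retries from here.
			retryAsflr = seq
		}
	}
	// No failures below the ack floor: nothing below asflr+1 needs another look.
	if retryAsflr == 0 {
		retryAsflr = asflr + 1
	}
	return retryAsflr
}

func main() {
	// Removal fails at sequence 3, e.g. because stream applies are behind.
	fmt.Println(computeCheckFloor(1, 5, func(seq uint64) bool { return seq != 3 })) // 3
	// All removals succeed: the floor advances past the ack floor.
	fmt.Println(computeCheckFloor(1, 5, func(seq uint64) bool { return true })) // 6
}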
server/filestore.go: 15 changes (5 additions, 10 deletions)
@@ -1432,15 +1432,14 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
if seq == 0 || seq&ebit != 0 || seq < fseq {
seq = seq &^ ebit
if seq >= fseq {
- // Only add to dmap if past recorded first seq and non-zero.
- if seq != 0 {
- addToDmap(seq)
- }
atomic.StoreUint64(&mb.last.seq, seq)
mb.last.ts = ts
if mb.msgs == 0 {
atomic.StoreUint64(&mb.first.seq, seq+1)
mb.first.ts = 0
+ } else if seq != 0 {
+ // Only add to dmap if past recorded first seq and non-zero.
+ addToDmap(seq)
}
}
index += rl
@@ -7017,11 +7016,7 @@ func (fs *fileStore) State() StreamState {
}
// Add in deleted.
mb.dmap.Range(func(seq uint64) bool {
- if seq < fseq {
- mb.dmap.Delete(seq)
- } else {
- state.Deleted = append(state.Deleted, seq)
- }
+ state.Deleted = append(state.Deleted, seq)
return true
})
mb.mu.Unlock()
@@ -7573,7 +7568,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
if err == errDeletedMsg {
// Update dmap.
if !smb.dmap.IsEmpty() {
- smb.dmap.Delete(seq)
+ smb.dmap.Delete(mseq)
}
} else if sm != nil {
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
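Two of the filestore changes above concern the per-block delete map (dmap), which records interior deleted sequences. The Compact fix is the subtle one: the loop walks message sequences `mseq` below the compaction point `seq`, so entries must be dropped from the dmap by `mseq`; deleting by `seq` removed the wrong entry and left stale ones behind. A toy model of that invariant, with a plain map standing in for the block's sequence set:

package main

import "fmt"

// purgeBelow drops delete-map entries for all sequences below the
// compaction point. Deleting by the visited sequence (mseq), not the
// compaction target (seq), is the essence of the Compact fix above.
func purgeBelow(dmap map[uint64]struct{}, seq uint64) {
	for mseq := range dmap {
		if mseq < seq {
			delete(dmap, mseq)
		}
	}
}

func main() {
	dmap := map[uint64]struct{}{2: {}, 4: {}, 7: {}}
	purgeBelow(dmap, 5)
	fmt.Println(len(dmap)) // 1: only sequence 7 remains tracked
}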
server/filestore_test.go: 55 changes (55 additions, 0 deletions)
@@ -8273,3 +8273,58 @@ func changeDirectoryPermission(directory string, mode fs.FileMode) error {
})
return err
}

func TestFileStoreLeftoverSkipMsgInDmap(t *testing.T) {
storeDir := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir},
StreamConfig{Name: "zzz", Subjects: []string{"test.*"}, Storage: FileStorage, MaxMsgsPer: 1},
)
require_NoError(t, err)
defer fs.Stop()

getLmbState := func(fs *fileStore) (uint64, uint64, int) {
fs.mu.RLock()
lmb := fs.lmb
fs.mu.RUnlock()
lmb.mu.RLock()
fseq := atomic.LoadUint64(&lmb.first.seq)
lseq := atomic.LoadUint64(&lmb.last.seq)
dmaps := lmb.dmap.Size()
lmb.mu.RUnlock()
return fseq, lseq, dmaps
}

// Only skip a message.
fs.SkipMsg()

// Confirm state.
state := fs.State()
require_Equal(t, state.FirstSeq, 2)
require_Equal(t, state.LastSeq, 1)
require_Equal(t, state.NumDeleted, 0)
fseq, lseq, dmaps := getLmbState(fs)
require_Equal(t, fseq, 2)
require_Equal(t, lseq, 1)
require_Len(t, dmaps, 0)

// Stop without writing index.db so we recover based on just the blk file.
require_NoError(t, fs.stop(false, false))

fs, err = newFileStore(
FileStoreConfig{StoreDir: storeDir},
StreamConfig{Name: "zzz", Subjects: []string{"test.*"}, Storage: FileStorage, MaxMsgsPer: 1},
)
require_NoError(t, err)
defer fs.Stop()

// Confirm the skipped message is not included in the deletes.
state = fs.State()
require_Equal(t, state.FirstSeq, 2)
require_Equal(t, state.LastSeq, 1)
require_Equal(t, state.NumDeleted, 0)
fseq, lseq, dmaps = getLmbState(fs)
require_Equal(t, fseq, 2)
require_Equal(t, lseq, 1)
require_Len(t, dmaps, 0)
}
server/gateway.go: 24 changes (22 additions, 2 deletions)
@@ -2499,8 +2499,13 @@ var subPool = &sync.Pool{
// that the message is not sent to a given gateway if for instance
// it is known that this gateway has no interest in the account or
// subject, etc..
+ // When invoked from a LEAF connection, `checkLeafQF` should be passed as `true`
+ // so that we skip any queue subscription interest that is not part of the
+ // `c.pa.queues` filter (similar to what we do in `processMsgResults`). However,
+ // when processing service imports, this boolean should be passed as `false`,
+ // regardless of whether it is a LEAF connection or not.
// <Invoked from any client connection's readLoop>
- func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) bool {
+ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte, checkLeafQF bool) bool {
// We had some times when we were sending across a GW with no subject, and the other side would break
// due to parser error. These need to be fixed upstream but also double check here.
if len(subject) == 0 {
@@ -2577,6 +2582,21 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
qsubs := qr.qsubs[i]
if len(qsubs) > 0 {
queue := qsubs[0].queue
+ if checkLeafQF {
+ // Skip any queue that is not in the leaf's queue filter.
+ skip := true
+ for _, qn := range c.pa.queues {
+ if bytes.Equal(queue, qn) {
+ skip = false
+ break
+ }
+ }
+ if skip {
+ continue
+ }
+ // Now we still need to check that it was not delivered
+ // locally by checking the given `qgroups`.
+ }
add := true
for _, qn := range qgroups {
if bytes.Equal(queue, qn) {
@@ -2969,7 +2989,7 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) {
// we now need to send the message with the real subject to
// gateways in case they have interest on that reply subject.
if !isServiceReply {
- c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, queues)
+ c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, queues, false)
}
} else if c.kind == GATEWAY {
// Only if we are a gateway connection should we try to route
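The `checkLeafQF` branch added above boils down to a containment check: a gateway-side queue group is only considered when it appears in the leaf's `c.pa.queues` filter. The same check extracted as a standalone sketch (the function name and test data are illustrative):

package main

import (
	"bytes"
	"fmt"
)

// queueAllowed reports whether queue is present in the allowed filter,
// mirroring the skip loop added to sendMsgToGateways. Queue names are
// []byte slices, as the server carries them in c.pa.queues.
func queueAllowed(queue []byte, allowed [][]byte) bool {
	for _, qn := range allowed {
		if bytes.Equal(queue, qn) {
			return true
		}
	}
	return false
}

func main() {
	filter := [][]byte{[]byte("q1"), []byte("q2")}
	fmt.Println(queueAllowed([]byte("q2"), filter)) // true: kept for delivery
	fmt.Println(queueAllowed([]byte("q9"), filter)) // false: skipped
}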
server/jetstream_cluster.go: 7 changes (4 additions, 3 deletions)
@@ -1587,10 +1587,11 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
}
if osa := js.streamAssignment(sa.Client.serviceAccount(), sa.Config.Name); osa != nil {
for _, ca := range osa.consumers {
- if sa.consumers[ca.Name] == nil {
+ // Consumer was either removed, or recreated with a different raft group.
+ if nca := sa.consumers[ca.Name]; nca == nil {
caDel = append(caDel, ca)
+ } else if nca.Group != nil && ca.Group != nil && nca.Group.Name != ca.Group.Name {
+ caDel = append(caDel, ca)
} else {
caAdd = append(caAdd, ca)
}
}
}
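The snapshot comparison above distinguishes a consumer that was removed from one that was deleted and recreated: a recreated consumer keeps its name but is assigned a fresh raft group. A reduced sketch of that decision, with the assignment structs simplified to just the fields the check reads:

package main

import "fmt"

// Simplified stand-ins for the server's raftGroup and consumerAssignment.
type raftGroup struct{ Name string }
type consumerAssignment struct{ Group *raftGroup }

// shouldDelete mirrors the applyMetaSnapshot logic: drop the old assignment
// if the snapshot no longer contains it, or if it reappears under a
// different raft group name.
func shouldDelete(old, snap *consumerAssignment) bool {
	if snap == nil {
		return true // consumer was removed
	}
	return snap.Group != nil && old.Group != nil && snap.Group.Name != old.Group.Name
}

func main() {
	cur := &consumerAssignment{Group: &raftGroup{Name: "C-R3F-abc"}}
	recreated := &consumerAssignment{Group: &raftGroup{Name: "C-R3F-xyz"}}
	fmt.Println(shouldDelete(cur, nil))       // true: removed entirely
	fmt.Println(shouldDelete(cur, recreated)) // true: recreated with a new group
	fmt.Println(shouldDelete(cur, cur))       // false: unchanged, keep it
}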
server/jetstream_cluster_1_test.go: 110 changes (110 additions, 0 deletions)
@@ -7555,6 +7555,116 @@ func TestJetStreamClusterAccountStatsForReplicatedStreams(t *testing.T) {
require_True(t, accStats.Sent.Bytes >= accStats.Received.Bytes*4)
}

func TestJetStreamClusterRecreateConsumerFromMetaSnapshot(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Initial setup.
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
_, err = js.Publish("foo", nil)
require_NoError(t, err)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
require_NoError(t, err)

// Wait for all servers to be fully up-to-date.
checkFor(t, 2*time.Second, 500*time.Millisecond, func() error {
if err = checkState(t, c, globalAccountName, "TEST"); err != nil {
return err
}
for _, s := range c.servers {
if acc, err := s.lookupAccount(globalAccountName); err != nil {
return err
} else if mset, err := acc.lookupStream("TEST"); err != nil {
return err
} else if o := mset.lookupConsumer("CONSUMER"); o == nil {
return errors.New("consumer doesn't exist")
}
}
return nil
})

// Shutdown a random server.
rs := c.randomServer()
rs.Shutdown()
rs.WaitForShutdown()

// Recreate connection, since we could have shutdown the server we were connected to.
nc.Close()
c.waitOnLeader()
nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

// Recreate consumer.
require_NoError(t, js.DeleteConsumer("TEST", "CONSUMER"))
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
require_NoError(t, err)

// Wait for all servers (except for the one that's down) to have recreated the consumer.
var consumerRg string
checkFor(t, 2*time.Second, 500*time.Millisecond, func() error {
consumerRg = _EMPTY_
for _, s := range c.servers {
if s == rs {
continue
}
if acc, err := s.lookupAccount(globalAccountName); err != nil {
return err
} else if mset, err := acc.lookupStream("TEST"); err != nil {
return err
} else if o := mset.lookupConsumer("CONSUMER"); o == nil {
return errors.New("consumer doesn't exist")
} else if ccrg := o.raftNode().Group(); consumerRg == _EMPTY_ {
consumerRg = ccrg
} else if consumerRg != ccrg {
return errors.New("consumer raft groups don't match")
}
}
return nil
})

// Install snapshots on all remaining servers to "hide" the intermediate consumer recreate requests.
for _, s := range c.servers {
if s != rs {
sjs := s.getJetStream()
require_NotNil(t, sjs)
snap, err := sjs.metaSnapshot()
require_NoError(t, err)
sjs.mu.RLock()
meta := sjs.cluster.meta
sjs.mu.RUnlock()
require_NoError(t, meta.InstallSnapshot(snap))
}
}

// Restart the server, it should receive a meta snapshot and recognize the consumer recreation.
rs = c.restartServer(rs)
checkFor(t, 2*time.Second, 500*time.Millisecond, func() error {
consumerRg = _EMPTY_
for _, s := range c.servers {
if acc, err := s.lookupAccount(globalAccountName); err != nil {
return err
} else if mset, err := acc.lookupStream("TEST"); err != nil {
return err
} else if o := mset.lookupConsumer("CONSUMER"); o == nil {
return errors.New("consumer doesn't exist")
} else if ccrg := o.raftNode().Group(); consumerRg == _EMPTY_ {
consumerRg = ccrg
} else if consumerRg != ccrg {
return errors.New("consumer raft groups don't match")
}
}
return nil
})
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.