Cherry-picks for 2.10.23-RC.5 (#6171)
Includes the following:

- #5661
- #5666
- #5671
- #5344
- #5684
- #5689
- #5691
- #5714
- #5717
- #5707
- #5792
- #5912
- #5957
- #5700
- #5975
- #5991
- #5987
- #6027
- #6038
- #6053
- #5848
- #6055
- #6056
- #6060
- #6061
- #6072
- #5832
- #6073
- #6107

Signed-off-by: Neil Twigg <[email protected]>
neilalexander authored Nov 25, 2024
2 parents ba4b34f + 4606175 commit 24acb45
Showing 9 changed files with 1,646 additions and 737 deletions.
20 changes: 13 additions & 7 deletions server/consumer.go
@@ -1563,6 +1563,16 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
 return false
 }
 
+const (
+defaultConsumerNotActiveStartInterval = 30 * time.Second
+defaultConsumerNotActiveMaxInterval = 5 * time.Minute
+)
+
+var (
+consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+consumerNotActiveMaxInterval = defaultConsumerNotActiveMaxInterval
+)
+
 func (o *consumer) deleteNotActive() {
 o.mu.Lock()
 if o.mset == nil {
@@ -1628,12 +1638,8 @@ func (o *consumer) deleteNotActive() {
 // Check to make sure we went away.
 // Don't think this needs to be a monitored go routine.
 go func() {
-const (
-startInterval = 30 * time.Second
-maxInterval = 5 * time.Minute
-)
-jitter := time.Duration(rand.Int63n(int64(startInterval)))
-interval := startInterval + jitter
+jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
+interval := consumerNotActiveStartInterval + jitter
 ticker := time.NewTicker(interval)
 defer ticker.Stop()
 for range ticker.C {
@@ -1648,7 +1654,7 @@ if nca != nil && nca == ca {
 if nca != nil && nca == ca {
 s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
 meta.ForwardProposal(removeEntry)
-if interval < maxInterval {
+if interval < consumerNotActiveMaxInterval {
 interval *= 2
 ticker.Reset(interval)
 }
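
The retry timing above now lives in package-level variables rather than constants local to the goroutine, so tests can shorten the intervals (see the override in server/jetstream_cluster_3_test.go below). A minimal standalone sketch of the backoff shape this produces, assuming only the default values from the diff; everything else here is illustrative, not the server's deleteNotActive logic:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	// Defaults as in the diff; the loop below is a toy illustration.
	start := 30 * time.Second
	max := 5 * time.Minute

	// Jittered first interval, then double on every retry until capped.
	jitter := time.Duration(rand.Int63n(int64(start)))
	interval := start + jitter
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d would fire after %v\n", attempt, interval)
		if interval < max {
			interval *= 2
		}
	}
}
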
16 changes: 12 additions & 4 deletions server/filestore.go
@@ -10023,14 +10023,22 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) {
 // sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is
 // handled automatically by this function, so don't wrap calls to it in dios.
 func (fs *fileStore) writeFileWithOptionalSync(name string, data []byte, perm fs.FileMode) error {
+if fs.fcfg.SyncAlways {
+return writeFileWithSync(name, data, perm)
+}
 <-dios
 defer func() {
 dios <- struct{}{}
 }()
-flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
-if fs.fcfg.SyncAlways {
-flags |= os.O_SYNC
-}
+return os.WriteFile(name, data, perm)
+}
+
+func writeFileWithSync(name string, data []byte, perm fs.FileMode) error {
+<-dios
+defer func() {
+dios <- struct{}{}
+}()
+flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_SYNC
 f, err := os.OpenFile(name, flags, perm)
 if err != nil {
 return err
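
The hunk splits the sync path out of writeFileWithOptionalSync: with SyncAlways set it delegates to the new writeFileWithSync, otherwise it takes the dios token and falls back to os.WriteFile. The diff is truncated after OpenFile, so the following is only a sketch of how such a sync-write helper could finish; the Write/Close handling and the standalone shape are assumptions, not the upstream code:

package main

import "os"

// writeFileSyncSketch is a hypothetical stand-in for a helper like
// writeFileWithSync: open with O_SYNC so the data reaches stable storage
// before a successful return.
func writeFileSyncSketch(name string, data []byte, perm os.FileMode) error {
	flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_SYNC
	f, err := os.OpenFile(name, flags, perm)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	return f.Close()
}

func main() {
	_ = writeFileSyncSketch("example.txt", []byte("hello"), 0o644)
}
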
62 changes: 28 additions & 34 deletions server/jetstream_cluster.go
@@ -1431,10 +1431,6 @@ func (js *jetStream) monitorCluster() {
 aq.recycle(&ces)
 
 case isLeader = <-lch:
-// For meta layer synchronize everyone to our state on becoming leader.
-if isLeader && n.ApplyQ().len() == 0 {
-n.SendSnapshot(js.metaSnapshot())
-}
 // Process the change.
 js.processLeaderChange(isLeader)
 if isLeader {
@@ -2129,8 +2125,32 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
 }
 
 // Check if we already have this assigned.
+retry:
 if node := s.lookupRaftNode(rg.Name); node != nil {
+if node.State() == Closed {
+// We're waiting for this node to finish shutting down before we replace it.
+js.mu.Unlock()
+node.WaitForStop()
+js.mu.Lock()
+goto retry
+}
 s.Debugf("JetStream cluster already has raft group %q assigned", rg.Name)
+// Check and see if the group has the same peers. If not then we
+// will update the known peers, which will send a peerstate if leader.
+groupPeerIDs := append([]string{}, rg.Peers...)
+var samePeers bool
+if nodePeers := node.Peers(); len(rg.Peers) == len(nodePeers) {
+nodePeerIDs := make([]string, 0, len(nodePeers))
+for _, n := range nodePeers {
+nodePeerIDs = append(nodePeerIDs, n.ID)
+}
+slices.Sort(groupPeerIDs)
+slices.Sort(nodePeerIDs)
+samePeers = slices.Equal(groupPeerIDs, nodePeerIDs)
+}
+if !samePeers {
+node.UpdateKnownPeers(groupPeerIDs)
+}
 rg.node = node
 js.mu.Unlock()
 return nil
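
Two behaviours are added here: a Closed node is waited on (WaitForStop) before the lookup is retried, and an existing node's peer set is compared against the assignment so known peers are only updated when they actually differ. A small self-contained sketch of that order-insensitive comparison, using hypothetical names rather than the server's types:

package main

import (
	"fmt"
	"slices"
)

// samePeerSet reports whether two peer ID lists contain the same IDs,
// ignoring order, mirroring the sort-and-compare check in the hunk above.
func samePeerSet(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	a, b = slices.Clone(a), slices.Clone(b)
	slices.Sort(a)
	slices.Sort(b)
	return slices.Equal(a, b)
}

func main() {
	fmt.Println(samePeerSet([]string{"n1", "n2", "n3"}, []string{"n3", "n1", "n2"})) // true
	fmt.Println(samePeerSet([]string{"n1", "n2"}, []string{"n1", "n4"}))             // false
}
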
@@ -8959,17 +8979,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 // mset.store never changes after being set, don't need lock.
 mset.store.FastState(&state)
 
-// Reset notion of first if this request wants sequences before our starting sequence
-// and we would have nothing to send. If we have partial messages still need to send skips for those.
-// We will keep sreq's first sequence to not create sequence mismatches on the follower, but we extend the last to our current state.
-if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq {
-s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d",
-mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq)
-if state.LastSeq > sreq.LastSeq {
-sreq.LastSeq = state.LastSeq
-}
-}
-
 // Setup sequences to walk through.
 seq, last := sreq.FirstSeq, sreq.LastSeq
 mset.setCatchupPeer(sreq.Peer, last-seq)
@@ -9133,25 +9142,10 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 if drOk && dr.First > 0 {
 sendDR()
 }
-// Check for a condition where our state's first is now past the last that we could have sent.
-// If so reset last and continue sending.
-var state StreamState
-mset.mu.RLock()
-mset.store.FastState(&state)
-mset.mu.RUnlock()
-if last < state.FirstSeq {
-last = state.LastSeq
-}
-// Recheck our exit condition.
-if seq == last {
-if drOk && dr.First > 0 {
-sendDR()
-}
-s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
-// EOF
-s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
-return false
-}
+s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
+// EOF
+s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
+return false
 }
 select {
 case <-remoteQuitCh:
20 changes: 16 additions & 4 deletions server/jetstream_cluster_2_test.go
@@ -6647,12 +6647,24 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
 return nil
 })
 
-// Make sure we only sent 1002 sync catchup msgs.
-// This is for the new messages, the delete range, and the EOF.
+// Make sure we only sent 2 sync catchup msgs.
+// This is for the delete range, and the EOF.
 nmsgs, _, _ := sub.Pending()
-if nmsgs != 1002 {
-t.Fatalf("Expected only 1002 sync catchup msgs to be sent signaling eof, but got %d", nmsgs)
+if nmsgs != 2 {
+t.Fatalf("Expected only 2 sync catchup msgs to be sent signaling eof, but got %d", nmsgs)
 }
+
+msg, err := sub.NextMsg(0)
+require_NoError(t, err)
+mbuf := msg.Data[1:]
+dr, err := decodeDeleteRange(mbuf)
+require_NoError(t, err)
+require_Equal(t, dr.First, 1001)
+require_Equal(t, dr.Num, 1000)
+
+msg, err = sub.NextMsg(0)
+require_NoError(t, err)
+require_Equal(t, len(msg.Data), 0)
 }
 
 func TestJetStreamClusterStreamResetWithLargeFirstSeq(t *testing.T) {
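
The updated expectation is that the catchup now consists of just two messages: one delete-range, decoded as First=1001 and Num=1000, and an empty EOF, instead of the 1,002 messages previously expected. A hypothetical illustration of what such a First/Num range covers; the type below is a sketch, not the upstream DeleteRange:

package main

import "fmt"

// deleteRangeSketch stands in for a catchup delete-range payload:
// Num consecutive sequences starting at First are known to be gone.
type deleteRangeSketch struct {
	First uint64
	Num   uint64
}

func main() {
	dr := deleteRangeSketch{First: 1001, Num: 1000}
	last := dr.First + dr.Num - 1
	fmt.Printf("range covers sequences %d..%d\n", dr.First, last) // 1001..2000
}
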
14 changes: 12 additions & 2 deletions server/jetstream_cluster_3_test.go
@@ -1600,6 +1600,11 @@ func TestJetStreamClusterParallelConsumerCreation(t *testing.T) {
 }
 
 func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
+consumerNotActiveStartInterval = time.Second * 5
+defer func() {
+consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+}()
+
 c := createJetStreamClusterExplicit(t, "R3S", 3)
 defer c.shutdown()
 
@@ -1632,6 +1637,7 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
 time.Sleep(2 * time.Second)
 
 // Restart first and wait so that we know it will try cleanup without a metaleader.
+// It will fail as there's no metaleader at that time, it should keep retrying on an interval.
 c.restartServer(rs)
 time.Sleep(time.Second)
 
@@ -1643,8 +1649,9 @@
 defer nc.Close()
 
 subj := fmt.Sprintf(JSApiConsumerListT, "TEST")
-checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
-m, err := nc.Request(subj, nil, time.Second)
+checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+// Request will take at most 4 seconds if some consumers can't be found.
+m, err := nc.Request(subj, nil, 5*time.Second)
 if err != nil {
 return err
 }
@@ -3910,6 +3917,7 @@ func TestJetStreamClusterStreamNodeShutdownBugOnStop(t *testing.T) {
 node.InstallSnapshot(mset.stateSnapshot())
 // Stop the stream
 mset.stop(false, false)
+node.WaitForStop()
 
 if numNodes := s.numRaftNodes(); numNodes != numNodesStart-1 {
 t.Fatalf("RAFT nodes after stream stop incorrect: %d vs %d", numNodesStart, numNodes)
@@ -5801,6 +5809,8 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
 
 // Should only be meta NRG left.
 require_True(t, s.numRaftNodes() == 1)
+s.rnMu.RLock()
+defer s.rnMu.RUnlock()
 require_True(t, s.lookupRaftNode(sgn) == nil)
 require_True(t, s.lookupRaftNode(ogn) == nil)
 }
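
The ghost-ephemerals test relies on the new package-level intervals from server/consumer.go so the cleanup retry happens quickly. A sketch of that override-and-restore pattern under an assumed test name; the real test above does the same inline:

package server

import (
	"testing"
	"time"
)

// TestSomethingNeedingFastRetries is a made-up name; the pattern mirrors the
// override used by TestJetStreamClusterGhostEphemeralsAfterRestart.
func TestSomethingNeedingFastRetries(t *testing.T) {
	consumerNotActiveStartInterval = 2 * time.Second
	defer func() {
		consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
	}()
	// ... exercise behaviour that depends on consumer cleanup retries ...
}
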