@@ -505,7 +505,7 @@ func checkConsumerCfg(
505505 }
506506
507507 // Check if we have a BackOff defined that MaxDeliver is within range etc.
508- if lbo := len (config .BackOff ); lbo > 0 && config .MaxDeliver != - 1 && config .MaxDeliver <= lbo {
508+ if lbo := len (config .BackOff ); lbo > 0 && config .MaxDeliver != - 1 && lbo > config .MaxDeliver {
509509 return NewJSConsumerMaxDeliverBackoffError ()
510510 }
511511
@@ -1843,7 +1843,7 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
18431843 }
18441844
18451845 // Check if BackOff is defined, MaxDeliver is within range.
1846- if lbo := len (ncfg .BackOff ); lbo > 0 && ncfg .MaxDeliver != - 1 && ncfg .MaxDeliver <= lbo {
1846+ if lbo := len (ncfg .BackOff ); lbo > 0 && ncfg .MaxDeliver != - 1 && lbo > ncfg .MaxDeliver {
18471847 return NewJSConsumerMaxDeliverBackoffError ()
18481848 }
18491849
@@ -2206,9 +2206,7 @@ func (o *consumer) updateDelivered(dseq, sseq, dc uint64, ts int64) {
22062206 n += binary .PutUvarint (b [n :], dc )
22072207 n += binary .PutVarint (b [n :], ts )
22082208 o .propose (b [:n ])
2209- }
2210- if o .store != nil {
2211- // Update local state always.
2209+ } else if o .store != nil {
22122210 o .store .UpdateDelivered (dseq , sseq , dc , ts )
22132211 }
22142212 // Update activity.
@@ -3839,7 +3837,7 @@ func (o *consumer) checkAckFloor() {
38393837 // We will set it explicitly to 1 behind our current lowest in pending, or if
38403838 // pending is empty, to our current delivered -1.
38413839 const minOffThreshold = 50
3842- if o .asflr < ss .FirstSeq - minOffThreshold {
3840+ if ss . FirstSeq >= minOffThreshold && o .asflr < ss .FirstSeq - minOffThreshold {
38433841 var psseq , pdseq uint64
38443842 for seq , p := range o .pending {
38453843 if psseq == 0 || seq < psseq {
@@ -5245,12 +5243,6 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
52455243 if dflag {
52465244 n .Delete ()
52475245 } else {
5248- // Try to install snapshot on clean exit
5249- if o .store != nil && (o .retention != LimitsPolicy || n .NeedSnapshot ()) {
5250- if snap , err := o .store .EncodedState (); err == nil {
5251- n .InstallSnapshot (snap )
5252- }
5253- }
52545246 n .Stop ()
52555247 }
52565248 }
@@ -5595,8 +5587,9 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
55955587
55965588 o .mu .Lock ()
55975589 // Update our check floor.
5598- if seq > o .chkflr {
5599- o .chkflr = seq
5590+ // Check floor must never be greater than ack floor+1, otherwise subsequent calls to this function would skip work.
5591+ if asflr + 1 > o .chkflr {
5592+ o .chkflr = asflr + 1
56005593 }
56015594 // See if we need to process this update if our parent stream is not a limits policy stream.
56025595 state , _ = o .store .State ()
0 commit comments