Skip to content

Commit

Permalink
[FIXED] Max streams reached with parallel stream creation (#6502)
Browse files Browse the repository at this point in the history
Maximum streams reached errors would be returned when creating (the
same) streams in parallel:
- if max streams = 1, the second create request would fail as the
previous create request being inflight made it exceed the limit, even if
it was about the same stream
- if max streams > 1, if the streams already existed would count the
streams twice, once for the streams existing already, and another time
due to being inflight

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
derekcollison authored Feb 12, 2025
2 parents aee8cbc + 7201a1c commit 3efc8de
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 2 deletions.
11 changes: 9 additions & 2 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6078,7 +6078,14 @@ func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfi
numStreams, reservations := tieredStreamAndReservationCount(asa, tier, cfg)
// Check for inflight proposals...
if cc := js.cluster; cc != nil && cc.inflight != nil {
numStreams += len(cc.inflight[acc.Name])
streams := cc.inflight[acc.Name]
numStreams += len(streams)
// If inflight contains the same stream, don't count toward exceeding maximum.
if cfg != nil {
if _, ok := streams[cfg.Name]; ok {
numStreams--
}
}
}
if selectedLimits.MaxStreams > 0 && numStreams >= selectedLimits.MaxStreams {
return NewJSMaximumStreamsLimitError()
Expand Down Expand Up @@ -6186,7 +6193,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
// On success, add this as an inflight proposal so we can apply limits
// on concurrent create requests while this stream assignment has
// possibly not been processed yet.
if streams, ok := cc.inflight[acc.Name]; ok {
if streams, ok := cc.inflight[acc.Name]; ok && self == nil {
streams[cfg.Name] = &inflightInfo{rg, syncSubject}
}
}
Expand Down
71 changes: 71 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3198,6 +3198,77 @@ func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) {
}
}

func TestJetStreamClusterMaxStreamsReached(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomNonLeader())
defer nc.Close()

// Adjust our limits.
c.updateLimits("$G", map[string]JetStreamAccountLimits{
_EMPTY_: {
MaxMemory: 1024,
MaxStore: 1024,
MaxStreams: 1,
},
})

// Many stream creations in parallel for the same stream should not result in
// maximum number of streams reached error. All should have a successful response.
var wg sync.WaitGroup
for i := 0; i < 15; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 1,
})
require_NoError(t, err)
}()
}
wg.Wait()
require_NoError(t, js.DeleteStream("TEST"))

// Adjust our limits.
c.updateLimits("$G", map[string]JetStreamAccountLimits{
_EMPTY_: {
MaxMemory: 1024,
MaxStore: 1024,
MaxStreams: 2,
},
})

// Setup streams beforehand.
for d := 0; d < 2; d++ {
_, err := js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("TEST-%d", d),
Subjects: []string{fmt.Sprintf("foo.%d", d)},
Replicas: 1,
})
require_NoError(t, err)
}

// Many stream creations in parallel for streams that already exist should not result in
// maximum number of streams reached error. All should have a successful response.
for i := 0; i < 15; i++ {
wg.Add(1)
d := i % 2
go func() {
defer wg.Done()
_, err := js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("TEST-%d", d),
Subjects: []string{fmt.Sprintf("foo.%d", d)},
Replicas: 1,
})
require_NoError(t, err)
}()
}
wg.Wait()
}

func TestJetStreamClusterStreamLimits(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down

0 comments on commit 3efc8de

Please sign in to comment.