Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Max streams reached with parallel stream creation #6502

Merged
merged 1 commit into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6148,7 +6148,14 @@ func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfi
numStreams, reservations := tieredStreamAndReservationCount(asa, tier, cfg)
// Check for inflight proposals...
if cc := js.cluster; cc != nil && cc.inflight != nil {
numStreams += len(cc.inflight[acc.Name])
streams := cc.inflight[acc.Name]
numStreams += len(streams)
// If inflight contains the same stream, don't count toward exceeding maximum.
if cfg != nil {
if _, ok := streams[cfg.Name]; ok {
numStreams--
}
}
}
if selectedLimits.MaxStreams > 0 && numStreams >= selectedLimits.MaxStreams {
return NewJSMaximumStreamsLimitError()
Expand Down Expand Up @@ -6256,7 +6263,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
// On success, add this as an inflight proposal so we can apply limits
// on concurrent create requests while this stream assignment has
// possibly not been processed yet.
if streams, ok := cc.inflight[acc.Name]; ok {
if streams, ok := cc.inflight[acc.Name]; ok && self == nil {
streams[cfg.Name] = &inflightInfo{rg, syncSubject}
}
}
Expand Down
71 changes: 71 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3198,6 +3198,77 @@ func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) {
}
}

func TestJetStreamClusterMaxStreamsReached(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomNonLeader())
defer nc.Close()

// Adjust our limits.
c.updateLimits("$G", map[string]JetStreamAccountLimits{
_EMPTY_: {
MaxMemory: 1024,
MaxStore: 1024,
MaxStreams: 1,
},
})

// Many stream creations in parallel for the same stream should not result in
// maximum number of streams reached error. All should have a successful response.
var wg sync.WaitGroup
for i := 0; i < 15; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 1,
})
require_NoError(t, err)
}()
}
wg.Wait()
require_NoError(t, js.DeleteStream("TEST"))

// Adjust our limits.
c.updateLimits("$G", map[string]JetStreamAccountLimits{
_EMPTY_: {
MaxMemory: 1024,
MaxStore: 1024,
MaxStreams: 2,
},
})

// Setup streams beforehand.
for d := 0; d < 2; d++ {
_, err := js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("TEST-%d", d),
Subjects: []string{fmt.Sprintf("foo.%d", d)},
Replicas: 1,
})
require_NoError(t, err)
}

// Many stream creations in parallel for streams that already exist should not result in
// maximum number of streams reached error. All should have a successful response.
for i := 0; i < 15; i++ {
wg.Add(1)
d := i % 2
go func() {
defer wg.Done()
_, err := js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("TEST-%d", d),
Subjects: []string{fmt.Sprintf("foo.%d", d)},
Replicas: 1,
})
require_NoError(t, err)
}()
}
wg.Wait()
}

func TestJetStreamClusterStreamLimits(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down