From 7201a1cbf6634dff34668d9b7682ccba3fa6e2e7 Mon Sep 17 00:00:00 2001
From: Maurice van Veen
Date: Wed, 12 Feb 2025 11:39:46 +0100
Subject: [PATCH] [FIXED] Max streams reached with parallel stream creation

Signed-off-by: Maurice van Veen
---
 server/jetstream_cluster.go        | 11 ++++-
 server/jetstream_cluster_1_test.go | 71 ++++++++++++++++++++++++++++++
 2 files changed, 80 insertions(+), 2 deletions(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 5ae7397c85..427f2066b1 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -6148,7 +6148,14 @@ func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfi
 	numStreams, reservations := tieredStreamAndReservationCount(asa, tier, cfg)
 	// Check for inflight proposals...
 	if cc := js.cluster; cc != nil && cc.inflight != nil {
-		numStreams += len(cc.inflight[acc.Name])
+		streams := cc.inflight[acc.Name]
+		numStreams += len(streams)
+		// If inflight contains the same stream, don't count toward exceeding maximum.
+		if cfg != nil {
+			if _, ok := streams[cfg.Name]; ok {
+				numStreams--
+			}
+		}
 	}
 	if selectedLimits.MaxStreams > 0 && numStreams >= selectedLimits.MaxStreams {
 		return NewJSMaximumStreamsLimitError()
@@ -6256,7 +6263,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
 		// On success, add this as an inflight proposal so we can apply limits
 		// on concurrent create requests while this stream assignment has
 		// possibly not been processed yet.
-		if streams, ok := cc.inflight[acc.Name]; ok {
+		if streams, ok := cc.inflight[acc.Name]; ok && self == nil {
 			streams[cfg.Name] = &inflightInfo{rg, syncSubject}
 		}
 	}
diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go
index c6d42c881a..07672771ff 100644
--- a/server/jetstream_cluster_1_test.go
+++ b/server/jetstream_cluster_1_test.go
@@ -3198,6 +3198,77 @@ func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) {
 	}
 }
 
+func TestJetStreamClusterMaxStreamsReached(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomNonLeader())
+	defer nc.Close()
+
+	// Adjust our limits.
+	c.updateLimits("$G", map[string]JetStreamAccountLimits{
+		_EMPTY_: {
+			MaxMemory:  1024,
+			MaxStore:   1024,
+			MaxStreams: 1,
+		},
+	})
+
+	// Many stream creations in parallel for the same stream should not result in
+	// maximum number of streams reached error. All should have a successful response.
+	var wg sync.WaitGroup
+	for i := 0; i < 15; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			_, err := js.AddStream(&nats.StreamConfig{
+				Name:     "TEST",
+				Subjects: []string{"foo"},
+				Replicas: 1,
+			})
+			require_NoError(t, err)
+		}()
+	}
+	wg.Wait()
+	require_NoError(t, js.DeleteStream("TEST"))
+
+	// Adjust our limits.
+	c.updateLimits("$G", map[string]JetStreamAccountLimits{
+		_EMPTY_: {
+			MaxMemory:  1024,
+			MaxStore:   1024,
+			MaxStreams: 2,
+		},
+	})
+
+	// Setup streams beforehand.
+	for d := 0; d < 2; d++ {
+		_, err := js.AddStream(&nats.StreamConfig{
+			Name:     fmt.Sprintf("TEST-%d", d),
+			Subjects: []string{fmt.Sprintf("foo.%d", d)},
+			Replicas: 1,
+		})
+		require_NoError(t, err)
+	}
+
+	// Many stream creations in parallel for streams that already exist should not result in
+	// maximum number of streams reached error. All should have a successful response.
+	for i := 0; i < 15; i++ {
+		wg.Add(1)
+		d := i % 2
+		go func() {
+			defer wg.Done()
+			_, err := js.AddStream(&nats.StreamConfig{
+				Name:     fmt.Sprintf("TEST-%d", d),
+				Subjects: []string{fmt.Sprintf("foo.%d", d)},
+				Replicas: 1,
+			})
+			require_NoError(t, err)
+		}()
+	}
+	wg.Wait()
+}
+
 func TestJetStreamClusterStreamLimits(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
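
Note, not part of the patch above: the first hunk's accounting change can be read in isolation. The following is a minimal, self-contained Go sketch of that logic under stated assumptions; the names limitsCheck, inflight, and maxStreams are illustrative stand-ins for the server's jsClusteredStreamLimitsCheck, cc.inflight[acc.Name], and selectedLimits.MaxStreams, not its actual API.

	package main

	import (
		"errors"
		"fmt"
	)

	// limitsCheck mirrors the patched accounting: inflight proposals count
	// toward the stream limit, except that a proposal for the same stream
	// name is idempotent and must not push the account over its maximum.
	// (Sketch only; names and signatures are hypothetical.)
	func limitsCheck(existing int, inflight map[string]struct{}, name string, maxStreams int) error {
		numStreams := existing + len(inflight)
		if _, ok := inflight[name]; ok {
			// A parallel create for this same stream is already inflight;
			// creating it "again" does not add a new stream.
			numStreams--
		}
		if maxStreams > 0 && numStreams >= maxStreams {
			return errors.New("maximum number of streams reached")
		}
		return nil
	}

	func main() {
		inflight := map[string]struct{}{"TEST": {}}
		// Second parallel create of "TEST" with MaxStreams=1: allowed.
		fmt.Println(limitsCheck(0, inflight, "TEST", 1)) // <nil>
		// Creating a different stream while "TEST" is inflight: rejected.
		fmt.Println(limitsCheck(0, inflight, "OTHER", 1)) // error
	}

The second hunk complements this: guarding the inflight registration with self == nil appears to skip creates that resolve to an already existing assignment, so an existing stream is not counted once as a stream and again as an inflight proposal, which is the case the test's second scenario exercises.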