diff --git a/.github/workflows/on-pull-request.yml b/.github/workflows/on-pull-request.yml index 23854e74..d89825f7 100644 --- a/.github/workflows/on-pull-request.yml +++ b/.github/workflows/on-pull-request.yml @@ -50,7 +50,7 @@ jobs: skip-cache: true - name: Test - run: go test -v -race -p=1 -count=1 -tags holster_test_mode + run: go test -v -race -p=1 -count=1 go-bench: runs-on: ubuntu-latest timeout-minutes: 30 diff --git a/Makefile b/Makefile index 192ed39c..7c77cca8 100644 --- a/Makefile +++ b/Makefile @@ -1,57 +1,47 @@ -.DEFAULT_GOAL := build +.DEFAULT_GOAL := release VERSION=$(shell cat version) LDFLAGS="-X main.Version=$(VERSION)" GOLANGCI_LINT = $(GOPATH)/bin/golangci-lint GOLANGCI_LINT_VERSION = 1.56.2 -.PHONY: help -help: - @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' - -$(GOLANGCI_LINT): ## Download Go linter +$(GOLANGCI_LINT): curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin $(GOLANGCI_LINT_VERSION) .PHONY: lint -lint: $(GOLANGCI_LINT) ## Run Go linter - $(GOLANGCI_LINT) run -v --fix -c .golangci.yml ./... +lint: $(GOLANGCI_LINT) + $(GOLANGCI_LINT) run .PHONY: test -test: ## Run unit tests and measure code coverage - (go test -v -race -p=1 -count=1 -tags holster_test_mode -coverprofile coverage.out ./...; ret=$$?; \ +test: + (go test -v -race -p=1 -count=1 -coverprofile coverage.out ./...; ret=$$?; \ go tool cover -func coverage.out; \ go tool cover -html coverage.out -o coverage.html; \ exit $$ret) .PHONY: bench -bench: ## Run Go benchmarks +bench: go test ./... -bench . -benchtime 5s -timeout 0 -run=XXX -benchmem .PHONY: docker -docker: ## Build Docker image +docker: docker build --build-arg VERSION=$(VERSION) -t ghcr.io/mailgun/gubernator:$(VERSION) . docker tag ghcr.io/mailgun/gubernator:$(VERSION) ghcr.io/mailgun/gubernator:latest -.PHONY: build -build: proto ## Build binary +.PHONY: release +release: go build -v -ldflags $(LDFLAGS) -o gubernator ./cmd/gubernator/main.go .PHONY: clean -clean: ## Clean binaries +clean: rm -f gubernator gubernator-cli -.PHONY: clean-proto -clean-proto: ## Clean the generated source files from the protobuf sources - @echo "==> Cleaning up the go generated files from proto" - @find . -name "*.pb.go" -type f -delete - @find . -name "*.pb.*.go" -type f -delete - - .PHONY: proto -proto: ## Build protos - ./buf.gen.yaml +proto: + # Install buf: https://buf.build/docs/installation + buf generate .PHONY: certs -certs: ## Generate SSL certificates +certs: rm certs/*.key || rm certs/*.srl || rm certs/*.csr || rm certs/*.pem || rm certs/*.cert || true openssl genrsa -out certs/ca.key 4096 openssl req -new -x509 -key certs/ca.key -sha256 -subj "/C=US/ST=TX/O=Mailgun Technologies, Inc." -days 3650 -out certs/ca.cert diff --git a/algorithms.go b/algorithms.go index f2ed4a82..4f34dae6 100644 --- a/algorithms.go +++ b/algorithms.go @@ -26,16 +26,8 @@ import ( "go.opentelemetry.io/otel/trace" ) -// ### NOTE ### -// The both token and leaky follow the same semantic which allows for requests of more than the limit -// to be rejected, but subsequent requests within the same window that are under the limit to succeed. -// IE: client attempts to send 1000 emails but 100 is their limit. The request is rejected as over the -// limit, but we do not set the remainder to 0 in the cache. The client can retry within the same window -// with 100 emails and the request will succeed. 
You can override this default behavior with `DRAIN_OVER_LIMIT` - // Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) { - tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket")) defer tokenBucketTimer.ObserveDuration() @@ -89,6 +81,12 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp * ResetTime: 0, }, nil } + + // The following semantic allows for requests of more than the limit to be rejected, but subsequent + // requests within the same duration that are under the limit to succeed. IE: client attempts to + // send 1000 emails but 100 is their limit. The request is rejected as over the limit, but since we + // don't store OVER_LIMIT in the cache the client can retry within the same rate limit duration with + // 100 emails and the request will succeed. t, ok := item.Value.(*TokenBucketItem) if !ok { // Client switched algorithms; perhaps due to a migration? @@ -389,24 +387,22 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp * // If requested hits takes the remainder if int64(b.Remaining) == r.Hits { - b.Remaining = 0 - rl.Remaining = int64(b.Remaining) + b.Remaining -= float64(r.Hits) + rl.Remaining = 0 rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate) return rl, nil } // If requested is more than available, then return over the limit - // without updating the bucket, unless `DRAIN_OVER_LIMIT` is set. + // without updating the bucket. if r.Hits > int64(b.Remaining) { metricOverLimitCounter.Add(1) rl.Status = Status_OVER_LIMIT - - // DRAIN_OVER_LIMIT behavior drains the remaining counter. if HasBehavior(r.Behavior, Behavior_DRAIN_OVER_LIMIT) { + // DRAIN_OVER_LIMIT behavior drains the remaining counter. 
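+ // Draining zeroes both the bucket and the response remaining, so subsequent hits stay OVER_LIMIT until the bucket leaks again.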
b.Remaining = 0 rl.Remaining = 0 } - return rl, nil } diff --git a/benchmark_test.go b/benchmark_test.go index 5a383761..56d0fe57 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -33,13 +33,10 @@ func BenchmarkServer(b *testing.B) { require.NoError(b, err, "Error in conf.SetDefaults") b.Run("GetPeerRateLimit() with no batching", func(b *testing.B) { - client, err := guber.NewPeerClient(guber.PeerConfig{ + client := guber.NewPeerClient(guber.PeerConfig{ Info: cluster.GetRandomPeer(cluster.DataCenterNone), Behavior: conf.Behaviors, }) - if err != nil { - b.Errorf("Error building client: %s", err) - } b.ResetTimer() diff --git a/buf.gen.yaml b/buf.gen.yaml old mode 100755 new mode 100644 index 5c62f51f..65a065c5 --- a/buf.gen.yaml +++ b/buf.gen.yaml @@ -1,8 +1,6 @@ -#!/usr/bin/env -S buf generate --debug --template ---- version: v1 plugins: - - plugin: buf.build/protocolbuffers/go:v1.32.0 + - name: go out: ./ opt: paths=source_relative - plugin: buf.build/grpc/go:v1.3.0 @@ -10,12 +8,6 @@ plugins: opt: - paths=source_relative - require_unimplemented_servers=false - - plugin: buf.build/grpc-ecosystem/gateway:v2.18.0 # same version in go.mod - out: ./ - opt: - - paths=source_relative - - logtostderr=true - - generate_unbound_methods=true - plugin: buf.build/grpc/python:v1.57.0 out: ./python/gubernator - plugin: buf.build/protocolbuffers/python diff --git a/buf.lock b/buf.lock index b46be110..3435191d 100644 --- a/buf.lock +++ b/buf.lock @@ -4,5 +4,5 @@ deps: - remote: buf.build owner: googleapis repository: googleapis - commit: 7e6f6e774e29406da95bd61cdcdbc8bc - digest: shake256:fe43dd2265ea0c07d76bd925eeba612667cf4c948d2ce53d6e367e1b4b3cb5fa69a51e6acb1a6a50d32f894f054a35e6c0406f6808a483f2752e10c866ffbf73 + commit: 711e289f6a384c4caeebaff7c6931ade + digest: shake256:e08fb55dad7469f69df00304eed31427d2d1576e9aab31e6bf86642688e04caaf0372f15fe6974cf79432779a635b3ea401ca69c943976dc42749524e4c25d94 diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 1f5a7ba8..900d0ced 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -23,16 +23,12 @@ import ( "github.com/mailgun/gubernator/v2/cluster" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/goleak" ) func TestStartMultipleInstances(t *testing.T) { - t.Cleanup(func() { - goleak.VerifyNone(t) - }) err := cluster.Start(2) require.NoError(t, err) - t.Cleanup(cluster.Stop) + defer cluster.Stop() assert.Equal(t, 2, len(cluster.GetPeers())) assert.Equal(t, 2, len(cluster.GetDaemons())) diff --git a/cmd/gubernator/main.go b/cmd/gubernator/main.go index 875dfc05..d556ba88 100644 --- a/cmd/gubernator/main.go +++ b/cmd/gubernator/main.go @@ -31,7 +31,7 @@ import ( "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.21.0" - "k8s.io/klog/v2" + "k8s.io/klog" ) var log = logrus.WithField("category", "gubernator") diff --git a/functional_test.go b/functional_test.go index 654342b7..5b4057f2 100644 --- a/functional_test.go +++ b/functional_test.go @@ -24,23 +24,36 @@ import ( "math/rand" "net/http" "os" + "sort" "strings" + "sync" + "sync/atomic" "testing" "time" guber "github.com/mailgun/gubernator/v2" "github.com/mailgun/gubernator/v2/cluster" "github.com/mailgun/holster/v4/clock" + "github.com/mailgun/holster/v4/syncutil" "github.com/mailgun/holster/v4/testutil" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + 
"golang.org/x/exp/maps" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" json "google.golang.org/protobuf/encoding/protojson" ) +var algos = []struct { + Name string + Algorithm guber.Algorithm +}{ + {Name: "Token bucket", Algorithm: guber.Algorithm_TOKEN_BUCKET}, + {Name: "Leaky bucket", Algorithm: guber.Algorithm_LEAKY_BUCKET}, +} + // Setup and shutdown the mock gubernator cluster for the entire test suite func TestMain(m *testing.M) { if err := cluster.StartWith([]guber.PeerInfo{ @@ -60,6 +73,13 @@ func TestMain(m *testing.M) { fmt.Println(err) os.Exit(1) } + + // Populate peer clients. Avoids data races when goroutines conflict trying + // to instantiate client singletons. + for _, peer := range cluster.GetDaemons() { + _ = peer.MustClient() + } + code := m.Run() cluster.Stop() @@ -404,8 +424,8 @@ func TestDrainOverLimit(t *testing.T) { }, } - for idx, algoCase := range []guber.Algorithm{guber.Algorithm_TOKEN_BUCKET, guber.Algorithm_LEAKY_BUCKET} { - t.Run(guber.Algorithm_name[int32(algoCase)], func(t *testing.T) { + for idx, algoCase := range algos { + t.Run(algoCase.Name, func(t *testing.T) { for _, test := range tests { ctx := context.Background() t.Run(test.Name, func(t *testing.T) { @@ -414,7 +434,7 @@ func TestDrainOverLimit(t *testing.T) { { Name: "test_drain_over_limit", UniqueKey: fmt.Sprintf("account:1234:%d", idx), - Algorithm: algoCase, + Algorithm: algoCase.Algorithm, Behavior: guber.Behavior_DRAIN_OVER_LIMIT, Duration: guber.Second * 30, Hits: test.Hits, @@ -436,49 +456,6 @@ func TestDrainOverLimit(t *testing.T) { } } -func TestTokenBucketRequestMoreThanAvailable(t *testing.T) { - defer clock.Freeze(clock.Now()).Unfreeze() - - client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) - require.NoError(t, err) - - sendHit := func(status guber.Status, remain int64, hit int64) *guber.RateLimitResp { - ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10) - defer cancel() - resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{ - Requests: []*guber.RateLimitReq{ - { - Name: "test_token_more_than_available", - UniqueKey: "account:123456", - Algorithm: guber.Algorithm_TOKEN_BUCKET, - Duration: guber.Millisecond * 1000, - Hits: hit, - Limit: 2000, - }, - }, - }) - require.NoError(t, err, hit) - assert.Equal(t, "", resp.Responses[0].Error) - assert.Equal(t, status, resp.Responses[0].Status) - assert.Equal(t, remain, resp.Responses[0].Remaining) - assert.Equal(t, int64(2000), resp.Responses[0].Limit) - return resp.Responses[0] - } - - // Use half of the bucket - sendHit(guber.Status_UNDER_LIMIT, 1000, 1000) - - // Ask for more than the bucket has and the remainder is still 1000. 
- // See NOTE in algorithms.go - sendHit(guber.Status_OVER_LIMIT, 1000, 1500) - - // Now other clients can ask for some of the remaining until we hit our limit - sendHit(guber.Status_UNDER_LIMIT, 500, 500) - sendHit(guber.Status_UNDER_LIMIT, 100, 400) - sendHit(guber.Status_UNDER_LIMIT, 0, 100) - sendHit(guber.Status_OVER_LIMIT, 0, 1) -} - func TestLeakyBucket(t *testing.T) { defer clock.Freeze(clock.Now()).Unfreeze() @@ -738,7 +715,7 @@ func TestLeakyBucketGregorian(t *testing.T) { Hits: 1, Remaining: 58, Status: guber.Status_UNDER_LIMIT, - Sleep: clock.Millisecond * 1200, + Sleep: clock.Second, }, { Name: "third hit; leak one hit", @@ -748,12 +725,7 @@ func TestLeakyBucketGregorian(t *testing.T) { }, } - // Truncate to the nearest minute now := clock.Now() - now = now.Truncate(1 * time.Minute) - // So we don't start on the minute boundary - now = now.Add(time.Millisecond * 100) - for _, test := range tests { t.Run(test.Name, func(t *testing.T) { resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{ @@ -854,50 +826,6 @@ func TestLeakyBucketNegativeHits(t *testing.T) { } } -func TestLeakyBucketRequestMoreThanAvailable(t *testing.T) { - // Freeze time so we don't leak during the test - defer clock.Freeze(clock.Now()).Unfreeze() - - client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) - require.NoError(t, err) - - sendHit := func(status guber.Status, remain int64, hits int64) *guber.RateLimitResp { - ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10) - defer cancel() - resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{ - Requests: []*guber.RateLimitReq{ - { - Name: "test_leaky_more_than_available", - UniqueKey: "account:123456", - Algorithm: guber.Algorithm_LEAKY_BUCKET, - Duration: guber.Millisecond * 1000, - Hits: hits, - Limit: 2000, - }, - }, - }) - require.NoError(t, err) - assert.Equal(t, "", resp.Responses[0].Error) - assert.Equal(t, status, resp.Responses[0].Status) - assert.Equal(t, remain, resp.Responses[0].Remaining) - assert.Equal(t, int64(2000), resp.Responses[0].Limit) - return resp.Responses[0] - } - - // Use half of the bucket - sendHit(guber.Status_UNDER_LIMIT, 1000, 1000) - - // Ask for more than the rate limit has and the remainder is still 1000. 
- // See NOTE in algorithms.go - sendHit(guber.Status_OVER_LIMIT, 1000, 1500) - - // Now other clients can ask for some of the remaining until we hit our limit - sendHit(guber.Status_UNDER_LIMIT, 500, 500) - sendHit(guber.Status_UNDER_LIMIT, 100, 400) - sendHit(guber.Status_UNDER_LIMIT, 0, 100) - sendHit(guber.Status_OVER_LIMIT, 0, 1) -} - func TestMissingFields(t *testing.T) { client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) require.Nil(t, errs) @@ -962,16 +890,13 @@ func TestMissingFields(t *testing.T) { } func TestGlobalRateLimits(t *testing.T) { - const ( - name = "test_global" - key = "account:12345" - ) - - peers, err := cluster.ListNonOwningDaemons(name, key) - require.NoError(t, err) + peer := cluster.PeerAt(0).GRPCAddress + client, errs := guber.DialV1Server(peer, nil) + require.NoError(t, errs) + var resetTime int64 - sendHit := func(client guber.V1Client, status guber.Status, hits int64, remain int64) { - ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10) + sendHit := func(status guber.Status, remain int64, i int) string { + ctx, cancel := context.WithTimeout(context.Background(), 10*clock.Second) defer cancel() resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{ Requests: []*guber.RateLimitReq{ @@ -980,65 +905,76 @@ func TestGlobalRateLimits(t *testing.T) { UniqueKey: "account:12345", Algorithm: guber.Algorithm_TOKEN_BUCKET, Behavior: guber.Behavior_GLOBAL, - Duration: guber.Minute * 3, - Hits: hits, + Duration: guber.Second * 3, + Hits: 1, Limit: 5, }, }, }) - require.NoError(t, err) - assert.Equal(t, "", resp.Responses[0].Error) - assert.Equal(t, remain, resp.Responses[0].Remaining) - assert.Equal(t, status, resp.Responses[0].Status) - assert.Equal(t, int64(5), resp.Responses[0].Limit) + require.NoError(t, err, i) + item := resp.Responses[0] + assert.Equal(t, "", item.Error) + assert.Equal(t, remain, item.Remaining) + assert.Equal(t, status, item.Status) + assert.Equal(t, int64(5), item.Limit) + // ResetTime should not change during test. + if resetTime == 0 { + resetTime = item.ResetTime + } + assert.Equal(t, resetTime, item.ResetTime) + + // ensure that we have a canonical host + assert.NotEmpty(t, resp.Responses[0].Metadata["owner"]) + + // name/key should ensure our connected peer is NOT the owner, + // the peer we are connected to should forward requests asynchronously to the owner. 
+ assert.NotEqual(t, peer, resp.Responses[0].Metadata["owner"]) + + return resp.Responses[0].Metadata["owner"] } + // Our first hit should create the request on the peer and queue for async forward - sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1, 4) + sendHit(guber.Status_UNDER_LIMIT, 4, 1) // Our second should be processed as if we own it since the async forward hasn't occurred yet - sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 2, 2) + sendHit(guber.Status_UNDER_LIMIT, 3, 2) testutil.UntilPass(t, 20, clock.Millisecond*200, func(t testutil.TestingT) { - // Inspect peers metrics, ensure the peer sent the global rate limit to the owner - metricsURL := fmt.Sprintf("http://%s/metrics", peers[0].Config().HTTPListenAddress) + // Inspect our metrics, ensure they collected the counts we expected during this test + d := cluster.DaemonAt(0) + metricsURL := fmt.Sprintf("http://%s/metrics", d.Config().HTTPListenAddress) m, err := getMetricRequest(metricsURL, "gubernator_global_send_duration_count") assert.NoError(t, err) assert.Equal(t, 1, int(m.Value)) - }) - owner, err := cluster.FindOwningDaemon(name, key) - require.NoError(t, err) - require.NoError(t, waitForBroadcast(clock.Second*3, owner, 1)) - - // Check different peers, they should have gotten the broadcast from the owner - sendHit(peers[1].MustClient(), guber.Status_UNDER_LIMIT, 0, 2) - sendHit(peers[2].MustClient(), guber.Status_UNDER_LIMIT, 0, 2) - - // Non owning peer should calculate the rate limit remaining before forwarding - // to the owner. - sendHit(peers[3].MustClient(), guber.Status_UNDER_LIMIT, 2, 0) - - require.NoError(t, waitForBroadcast(clock.Second*3, owner, 2)) + // Expect one peer (the owning peer) to indicate a broadcast. + var broadcastCount int + for i := 0; i < cluster.NumOfDaemons(); i++ { + d := cluster.DaemonAt(i) + metricsURL := fmt.Sprintf("http://%s/metrics", d.Config().HTTPListenAddress) + m, err := getMetricRequest(metricsURL, "gubernator_broadcast_duration_count") + assert.NoError(t, err) + broadcastCount += int(m.Value) + } - sendHit(peers[4].MustClient(), guber.Status_OVER_LIMIT, 1, 0) + assert.Equal(t, 1, broadcastCount) + }) } // Ensure global broadcast updates all peers when GetRateLimits is called on // either owner or non-owner peer. func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) { ctx := context.Background() - const name = "test_global" - key := fmt.Sprintf("key:%016x", rand.Int()) + name := t.Name() + key := randomKey() // Determine owner and non-owner peers. - ownerPeerInfo, err := cluster.FindOwningPeer(name, key) + owner, err := cluster.FindOwningDaemon(name, key) require.NoError(t, err) - owner := ownerPeerInfo.GRPCAddress - nonOwner := cluster.PeerAt(0).GRPCAddress - if nonOwner == owner { - nonOwner = cluster.PeerAt(1).GRPCAddress - } - require.NotEqual(t, owner, nonOwner) + // ownerAddr := owner.ownerPeerInfo.GRPCAddress + peers, err := cluster.ListNonOwningDaemons(name, key) + require.NoError(t, err) + nonOwner := peers[0] // Connect to owner and non-owner peers in round robin. dialOpts := []grpc.DialOption{ @@ -1046,22 +982,22 @@ func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) { grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), } - address := fmt.Sprintf("static:///%s,%s", owner, nonOwner) + address := fmt.Sprintf("static:///%s,%s", owner.PeerInfo.GRPCAddress, nonOwner.PeerInfo.GRPCAddress) conn, err := grpc.DialContext(ctx, address, dialOpts...) 
require.NoError(t, err) client := guber.NewV1Client(conn) - sendHit := func(status guber.Status, i int) { - ctx, cancel := context.WithTimeout(ctx, 10*clock.Second) + sendHit := func(client guber.V1Client, status guber.Status, i int) { + ctx, cancel := context.WithTimeout(context.Background(), 10*clock.Second) defer cancel() resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{ Requests: []*guber.RateLimitReq{ { Name: name, UniqueKey: key, - Algorithm: guber.Algorithm_LEAKY_BUCKET, + Algorithm: guber.Algorithm_TOKEN_BUCKET, Behavior: guber.Behavior_GLOBAL, - Duration: guber.Minute * 5, + Duration: 5 * guber.Minute, Hits: 1, Limit: 2, }, @@ -1069,321 +1005,73 @@ func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) { }) require.NoError(t, err, i) item := resp.Responses[0] - assert.Equal(t, "", item.GetError(), fmt.Sprintf("mismatch error, iteration %d", i)) - assert.Equal(t, status, item.GetStatus(), fmt.Sprintf("mismatch status, iteration %d", i)) + assert.Equal(t, "", item.Error, fmt.Sprintf("unexpected error, iteration %d", i)) + assert.Equal(t, status, item.Status, fmt.Sprintf("mismatch status, iteration %d", i)) } + require.NoError(t, waitForIdle(1*clock.Minute, cluster.GetDaemons()...)) + // Send two hits that should be processed by the owner and non-owner and // deplete the limit consistently. - sendHit(guber.Status_UNDER_LIMIT, 1) - sendHit(guber.Status_UNDER_LIMIT, 2) - - // Sleep to ensure the global broadcast occurs (every 100ms). - time.Sleep(150 * time.Millisecond) + sendHit(client, guber.Status_UNDER_LIMIT, 1) + sendHit(client, guber.Status_UNDER_LIMIT, 2) + require.NoError(t, waitForBroadcast(3*clock.Second, owner, 1)) // All successive hits should return OVER_LIMIT. for i := 2; i <= 10; i++ { - sendHit(guber.Status_OVER_LIMIT, i) + sendHit(client, guber.Status_OVER_LIMIT, i) } } func TestGlobalRateLimitsPeerOverLimit(t *testing.T) { - const ( - name = "test_global_token_limit" - key = "account:12345" - ) - - peers, err := cluster.ListNonOwningDaemons(name, key) - require.NoError(t, err) - - sendHit := func(expectedStatus guber.Status, hits int64) { - ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10) - defer cancel() - resp, err := peers[0].MustClient().GetRateLimits(ctx, &guber.GetRateLimitsReq{ - Requests: []*guber.RateLimitReq{ - { - Name: name, - UniqueKey: key, - Algorithm: guber.Algorithm_TOKEN_BUCKET, - Behavior: guber.Behavior_GLOBAL, - Duration: guber.Minute * 5, - Hits: hits, - Limit: 2, - }, - }, - }) - assert.NoError(t, err) - assert.Equal(t, "", resp.Responses[0].GetError()) - assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus()) - } + name := t.Name() + key := randomKey() owner, err := cluster.FindOwningDaemon(name, key) require.NoError(t, err) - - // Send two hits that should be processed by the owner and the broadcast to peer, depleting the remaining - sendHit(guber.Status_UNDER_LIMIT, 1) - sendHit(guber.Status_UNDER_LIMIT, 1) - // Wait for the broadcast from the owner to the peer - require.NoError(t, waitForBroadcast(clock.Second*3, owner, 1)) - // Since the remainder is 0, the peer should set OVER_LIMIT instead of waiting for the owner - // to respond with OVER_LIMIT. 
- sendHit(guber.Status_OVER_LIMIT, 1) - // Wait for the broadcast from the owner to the peer - require.NoError(t, waitForBroadcast(clock.Second*3, owner, 2)) - // The status should still be OVER_LIMIT - sendHit(guber.Status_OVER_LIMIT, 0) -} - -func TestGlobalRateLimitsPeerOverLimitLeaky(t *testing.T) { - const ( - name = "test_global_token_limit_leaky" - key = "account:12345" - ) - - peers, err := cluster.ListNonOwningDaemons(name, key) - require.NoError(t, err) - - sendHit := func(client guber.V1Client, expectedStatus guber.Status, hits int64) { - ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10) - defer cancel() - resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{ - Requests: []*guber.RateLimitReq{ - { - Name: name, - UniqueKey: key, - Algorithm: guber.Algorithm_LEAKY_BUCKET, - Behavior: guber.Behavior_GLOBAL, - Duration: guber.Minute * 5, - Hits: hits, - Limit: 2, - }, - }, - }) - assert.NoError(t, err) - assert.Equal(t, "", resp.Responses[0].GetError()) - assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus()) - } - owner, err := cluster.FindOwningDaemon(name, key) - require.NoError(t, err) - - // Send two hits that should be processed by the owner and the broadcast to peer, depleting the remaining - sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1) - sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1) - // Wait for the broadcast from the owner to the peers - require.NoError(t, waitForBroadcast(clock.Second*3, owner, 1)) - // Ask a different peer if the status is over the limit - sendHit(peers[1].MustClient(), guber.Status_OVER_LIMIT, 1) -} - -func TestGlobalRequestMoreThanAvailable(t *testing.T) { - const ( - name = "test_global_more_than_available" - key = "account:123456" - ) - - peers, err := cluster.ListNonOwningDaemons(name, key) - require.NoError(t, err) - - sendHit := func(client guber.V1Client, expectedStatus guber.Status, hits int64, remaining int64) { - ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10) - defer cancel() - resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{ - Requests: []*guber.RateLimitReq{ - { - Name: name, - UniqueKey: key, - Algorithm: guber.Algorithm_LEAKY_BUCKET, - Behavior: guber.Behavior_GLOBAL, - Duration: guber.Minute * 1_000, - Hits: hits, - Limit: 100, - }, - }, - }) - assert.NoError(t, err) - assert.Equal(t, "", resp.Responses[0].GetError()) - assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus()) - } - owner, err := cluster.FindOwningDaemon(name, key) - require.NoError(t, err) - - prev, err := getBroadcastCount(owner) - require.NoError(t, err) - - // Ensure GRPC has connections to each peer before we start, as we want - // the actual test requests to happen quite fast. - for _, p := range peers { - sendHit(p.MustClient(), guber.Status_UNDER_LIMIT, 0, 100) - } - - // Send a request for 50 hits from each non owning peer in the cluster. These requests - // will be queued and sent to the owner as accumulated hits. As a result of the async nature - // of `Behavior_GLOBAL` rate limit requests spread across peers like this will be allowed to - // over-consume their resource within the rate limit window until the owner is updated and - // a broadcast to all peers is received. - // - // The maximum number of resources that can be over-consumed can be calculated by multiplying - // the remainder by the number of peers in the cluster. 
For example: If you have a remainder of 100 - // and a cluster of 10 instances, then the maximum over-consumed resource is 1,000. If you need - // a more accurate remaining calculation, and wish to avoid over consuming a resource, then do - // not use `Behavior_GLOBAL`. - for _, p := range peers { - sendHit(p.MustClient(), guber.Status_UNDER_LIMIT, 50, 50) - } - - // Wait for the broadcast from the owner to the peers - require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+1)) - - // We should be over the limit - sendHit(peers[0].MustClient(), guber.Status_OVER_LIMIT, 1, 0) -} - -func TestGlobalNegativeHits(t *testing.T) { - const ( - name = "test_global_negative_hits" - key = "account:12345" - ) - peers, err := cluster.ListNonOwningDaemons(name, key) require.NoError(t, err) - sendHit := func(client guber.V1Client, status guber.Status, hits int64, remaining int64) { - ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10) + sendHit := func(expectedStatus guber.Status, hits, expectedRemaining int64) { + ctx, cancel := context.WithTimeout(context.Background(), 10*clock.Second) defer cancel() - resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{ + resp, err := peers[0].MustClient().GetRateLimits(ctx, &guber.GetRateLimitsReq{ Requests: []*guber.RateLimitReq{ { Name: name, UniqueKey: key, Algorithm: guber.Algorithm_TOKEN_BUCKET, Behavior: guber.Behavior_GLOBAL, - Duration: guber.Minute * 100, + Duration: 5 * guber.Minute, Hits: hits, Limit: 2, }, }, }) assert.NoError(t, err) - assert.Equal(t, "", resp.Responses[0].GetError()) - assert.Equal(t, status, resp.Responses[0].GetStatus()) - assert.Equal(t, remaining, resp.Responses[0].Remaining) - } - owner, err := cluster.FindOwningDaemon(name, key) - require.NoError(t, err) - prev, err := getBroadcastCount(owner) - require.NoError(t, err) - - // Send a negative hit on a rate limit with no hits - sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, -1, 3) - - // Wait for the negative remaining to propagate - require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+1)) - - // Send another negative hit to a different peer - sendHit(peers[1].MustClient(), guber.Status_UNDER_LIMIT, -1, 4) - - require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+2)) - - // Should have 4 in the remainder - sendHit(peers[2].MustClient(), guber.Status_UNDER_LIMIT, 4, 0) - - require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+3)) - - sendHit(peers[3].MustClient(), guber.Status_UNDER_LIMIT, 0, 0) -} - -func TestGlobalResetRemaining(t *testing.T) { - const ( - name = "test_global_reset" - key = "account:123456" - ) - - peers, err := cluster.ListNonOwningDaemons(name, key) - require.NoError(t, err) - - sendHit := func(client guber.V1Client, expectedStatus guber.Status, hits int64, remaining int64) { - ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10) - defer cancel() - resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{ - Requests: []*guber.RateLimitReq{ - { - Name: name, - UniqueKey: key, - Algorithm: guber.Algorithm_LEAKY_BUCKET, - Behavior: guber.Behavior_GLOBAL, - Duration: guber.Minute * 1_000, - Hits: hits, - Limit: 100, - }, - }, - }) - assert.NoError(t, err) - assert.Equal(t, "", resp.Responses[0].GetError()) - assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus()) - assert.Equal(t, remaining, resp.Responses[0].Remaining) - } - owner, err := cluster.FindOwningDaemon(name, key) - require.NoError(t, err) - prev, err := getBroadcastCount(owner) - 
require.NoError(t, err) - - for _, p := range peers { - sendHit(p.MustClient(), guber.Status_UNDER_LIMIT, 50, 50) + item := resp.Responses[0] + assert.Equal(t, "", item.Error, "unexpected error") + assert.Equal(t, expectedStatus, item.Status, "mismatch status") + assert.Equal(t, expectedRemaining, item.Remaining, "mismatch remaining") } - // Wait for the broadcast from the owner to the peers - require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+1)) + require.NoError(t, waitForIdle(1*clock.Minute, cluster.GetDaemons()...)) - // We should be over the limit and remaining should be zero - sendHit(peers[0].MustClient(), guber.Status_OVER_LIMIT, 1, 0) + // Send two hits that should be processed by the owner and the broadcast to + // peer, depleting the remaining. + sendHit(guber.Status_UNDER_LIMIT, 1, 1) + sendHit(guber.Status_UNDER_LIMIT, 1, 0) - // Now reset the remaining - ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10) - defer cancel() - resp, err := peers[0].MustClient().GetRateLimits(ctx, &guber.GetRateLimitsReq{ - Requests: []*guber.RateLimitReq{ - { - Name: name, - UniqueKey: key, - Algorithm: guber.Algorithm_LEAKY_BUCKET, - Behavior: guber.Behavior_GLOBAL | guber.Behavior_RESET_REMAINING, - Duration: guber.Minute * 1_000, - Hits: 0, - Limit: 100, - }, - }, - }) - require.NoError(t, err) - assert.NotEqual(t, 100, resp.Responses[0].Remaining) - - // Wait for the reset to propagate. - require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+2)) + // Wait for the broadcast from the owner to the peer + require.NoError(t, waitForBroadcast(3*clock.Second, owner, 1)) - // Check a different peer to ensure remaining has been reset - resp, err = peers[1].MustClient().GetRateLimits(ctx, &guber.GetRateLimitsReq{ - Requests: []*guber.RateLimitReq{ - { - Name: name, - UniqueKey: key, - Algorithm: guber.Algorithm_LEAKY_BUCKET, - Behavior: guber.Behavior_GLOBAL, - Duration: guber.Minute * 1_000, - Hits: 0, - Limit: 100, - }, - }, - }) - require.NoError(t, err) - assert.NotEqual(t, 100, resp.Responses[0].Remaining) + // Since the remainder is 0, the peer should return OVER_LIMIT on next hit. + sendHit(guber.Status_OVER_LIMIT, 1, 0) -} + // Wait for the broadcast from the owner to the peer. + require.NoError(t, waitForBroadcast(3*clock.Second, owner, 2)) -func getMetricRequest(url string, name string) (*model.Sample, error) { - resp, err := http.Get(url) - if err != nil { - return nil, err - } - defer resp.Body.Close() - return getMetric(resp.Body, name) + // The status should still be OVER_LIMIT. 
+ sendHit(guber.Status_OVER_LIMIT, 0, 0) } func TestChangeLimit(t *testing.T) { @@ -1617,20 +1305,9 @@ func TestHealthCheck(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), clock.Second*15) defer cancel() require.NoError(t, cluster.Restart(ctx)) - - // wait for every peer instance to come back online - for _, peer := range cluster.GetPeers() { - peerClient, err := guber.DialV1Server(peer.GRPCAddress, nil) - require.NoError(t, err) - testutil.UntilPass(t, 10, clock.Millisecond*300, func(t testutil.TestingT) { - healthResp, err = peerClient.HealthCheck(context.Background(), &guber.HealthCheckReq{}) - assert.Equal(t, "healthy", healthResp.GetStatus()) - }) - } } func TestLeakyBucketDivBug(t *testing.T) { - // Freeze time so we don't leak during the test defer clock.Freeze(clock.Now()).Unfreeze() client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) @@ -1732,10 +1409,9 @@ func TestGRPCGateway(t *testing.T) { func TestGetPeerRateLimits(t *testing.T) { ctx := context.Background() - peerClient, err := guber.NewPeerClient(guber.PeerConfig{ + peerClient := guber.NewPeerClient(guber.PeerConfig{ Info: cluster.GetRandomPeer(cluster.DataCenterNone), }) - require.NoError(t, err) t.Run("Stable rate check request order", func(t *testing.T) { // Ensure response order matches rate check request order. @@ -1779,6 +1455,463 @@ func TestGetPeerRateLimits(t *testing.T) { // TODO: Add a test for sending no rate limits RateLimitReqList.RateLimits = nil +func TestGlobalBehavior(t *testing.T) { + const limit = 1000 + broadcastTimeout := 400 * time.Millisecond + + makeReq := func(name, key string, hits int64) *guber.RateLimitReq { + return &guber.RateLimitReq{ + Name: name, + UniqueKey: key, + Algorithm: guber.Algorithm_TOKEN_BUCKET, + Behavior: guber.Behavior_GLOBAL, + Duration: guber.Minute * 3, + Hits: hits, + Limit: limit, + } + } + + t.Run("Hits on owner peer", func(t *testing.T) { + testCases := []struct { + Name string + Hits int64 + }{ + {Name: "Single hit", Hits: 1}, + {Name: "Multiple hits", Hits: 10}, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + name := t.Name() + key := fmt.Sprintf("account:%08x", rand.Int()) + peers, err := cluster.ListNonOwningDaemons(name, key) + require.NoError(t, err) + owner, err := cluster.FindOwningDaemon(name, key) + require.NoError(t, err) + t.Logf("Owner peer: %s", owner.InstanceID) + + require.NoError(t, waitForIdle(1*time.Minute, cluster.GetDaemons()...)) + + broadcastCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_broadcast_duration_count") + updateCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_global_send_duration_count") + upgCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/UpdatePeerGlobals\"}") + gprlCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/GetPeerRateLimits\"}") + + // When + for i := int64(0); i < testCase.Hits; i++ { + sendHit(t, owner, makeReq(name, key, 1), guber.Status_UNDER_LIMIT, 999-i) + } + + // Then + // Expect a single global broadcast to all non-owner peers. 
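+ // Only the owner should broadcast; non-owner peers must not emit broadcasts of their own.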
+ t.Log("Waiting for global broadcasts") + var wg sync.WaitGroup + var didOwnerBroadcast, didNonOwnerBroadcast int + wg.Add(len(peers) + 1) + go func() { + expected := broadcastCounters[owner.InstanceID] + 1 + if err := waitForBroadcast(broadcastTimeout, owner, expected); err == nil { + didOwnerBroadcast++ + t.Log("Global broadcast from owner") + } + wg.Done() + }() + for _, peer := range peers { + go func(peer *guber.Daemon) { + expected := broadcastCounters[peer.InstanceID] + 1 + if err := waitForBroadcast(broadcastTimeout, peer, expected); err == nil { + didNonOwnerBroadcast++ + t.Logf("Global broadcast from peer %s", peer.InstanceID) + } + wg.Done() + }(peer) + } + wg.Wait() + assert.Equal(t, 1, didOwnerBroadcast) + assert.Zero(t, didNonOwnerBroadcast) + + // Check for global hits update from non-owner to owner peer. + // Expect no global hits update because the hits were given + // directly to the owner peer. + t.Log("Waiting for global broadcasts") + var didOwnerUpdate, didNonOwnerUpdate int + wg.Add(len(peers) + 1) + go func() { + expected := updateCounters[owner.InstanceID] + 1 + if err := waitForUpdate(broadcastTimeout, owner, expected); err == nil { + didOwnerUpdate++ + t.Log("Global hits update from owner") + } + wg.Done() + }() + for _, peer := range peers { + go func(peer *guber.Daemon) { + expected := updateCounters[peer.InstanceID] + 1 + if err := waitForUpdate(broadcastTimeout, peer, expected); err == nil { + didNonOwnerUpdate++ + t.Logf("Global hits update from peer %s", peer.InstanceID) + } + wg.Done() + + }(peer) + } + wg.Wait() + assert.Zero(t, didOwnerUpdate) + assert.Zero(t, didNonOwnerUpdate) + + // Assert UpdatePeerGlobals endpoint called once on each peer except owner. + // Used by global broadcast. + upgCounters2 := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/UpdatePeerGlobals\"}") + for _, peer := range cluster.GetDaemons() { + expected := upgCounters[peer.InstanceID] + if peer.PeerInfo.DataCenter == cluster.DataCenterNone && peer.InstanceID != owner.InstanceID { + expected++ + } + assert.Equal(t, expected, upgCounters2[peer.InstanceID]) + } + + // Assert PeerGetRateLimits endpoint not called. + // Used by global hits update. + gprlCounters2 := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/GetPeerRateLimits\"}") + for _, peer := range cluster.GetDaemons() { + expected := gprlCounters[peer.InstanceID] + assert.Equal(t, expected, gprlCounters2[peer.InstanceID]) + } + + // Verify all peers report consistent remaining value value. 
+ for _, peer := range cluster.GetDaemons() { + if peer.PeerInfo.DataCenter != cluster.DataCenterNone { + continue + } + sendHit(t, peer, makeReq(name, key, 0), guber.Status_UNDER_LIMIT, limit-testCase.Hits) + } + }) + } + }) + + t.Run("Hits on non-owner peer", func(t *testing.T) { + testCases := []struct { + Name string + Hits int64 + }{ + {Name: "Single hit", Hits: 1}, + {Name: "Multiple htis", Hits: 10}, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + name := t.Name() + key := fmt.Sprintf("account:%08x", rand.Int()) + peers, err := cluster.ListNonOwningDaemons(name, key) + require.NoError(t, err) + owner, err := cluster.FindOwningDaemon(name, key) + require.NoError(t, err) + t.Logf("Owner peer: %s", owner.InstanceID) + + require.NoError(t, waitForIdle(1*clock.Minute, cluster.GetDaemons()...)) + + broadcastCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_broadcast_duration_count") + updateCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_global_send_duration_count") + upgCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/UpdatePeerGlobals\"}") + gprlCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/GetPeerRateLimits\"}") + + // When + for i := int64(0); i < testCase.Hits; i++ { + sendHit(t, peers[0], makeReq(name, key, 1), guber.Status_UNDER_LIMIT, 999-i) + } + + // Then + // Check for global hits update from non-owner to owner peer. + // Expect single global hits update from non-owner peer that received hits. + t.Log("Waiting for global hits updates") + var wg sync.WaitGroup + var didOwnerUpdate int + var didNonOwnerUpdate []string + wg.Add(len(peers) + 1) + go func() { + expected := updateCounters[owner.InstanceID] + 1 + if err := waitForUpdate(broadcastTimeout, owner, expected); err == nil { + didOwnerUpdate++ + t.Log("Global hits update from owner") + } + wg.Done() + }() + for _, peer := range peers { + go func(peer *guber.Daemon) { + expected := updateCounters[peer.InstanceID] + 1 + if err := waitForUpdate(broadcastTimeout, peer, expected); err == nil { + didNonOwnerUpdate = append(didNonOwnerUpdate, peer.InstanceID) + t.Logf("Global hits update from peer %s", peer.InstanceID) + } + wg.Done() + + }(peer) + } + wg.Wait() + assert.Zero(t, didOwnerUpdate) + assert.Len(t, didNonOwnerUpdate, 1) + assert.Equal(t, []string{peers[0].InstanceID}, didNonOwnerUpdate) + + // Expect a single global broadcast to all non-owner peers. + t.Log("Waiting for global broadcasts") + var didOwnerBroadcast, didNonOwnerBroadcast int + wg.Add(len(peers) + 1) + go func() { + expected := broadcastCounters[owner.InstanceID] + 1 + if err := waitForBroadcast(broadcastTimeout, owner, expected); err == nil { + didOwnerBroadcast++ + t.Log("Global broadcast from owner") + } + wg.Done() + }() + for _, peer := range peers { + go func(peer *guber.Daemon) { + expected := broadcastCounters[peer.InstanceID] + 1 + if err := waitForBroadcast(broadcastTimeout, peer, expected); err == nil { + didNonOwnerBroadcast++ + t.Logf("Global broadcast from peer %s", peer.InstanceID) + } + wg.Done() + }(peer) + } + wg.Wait() + assert.Equal(t, 1, didOwnerBroadcast) + assert.Empty(t, didNonOwnerBroadcast) + + // Assert UpdatePeerGlobals endpoint called once on each peer except owner. + // Used by global broadcast. 
+ upgCounters2 := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/UpdatePeerGlobals\"}") + for _, peer := range cluster.GetDaemons() { + expected := upgCounters[peer.InstanceID] + if peer.PeerInfo.DataCenter == cluster.DataCenterNone && peer.InstanceID != owner.InstanceID { + expected++ + } + assert.Equal(t, expected, upgCounters2[peer.InstanceID], "upgCounter %s", peer.InstanceID) + } + + // Assert PeerGetRateLimits endpoint called once on owner. + // Used by global hits update. + gprlCounters2 := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/GetPeerRateLimits\"}") + for _, peer := range cluster.GetDaemons() { + expected := gprlCounters[peer.InstanceID] + if peer.InstanceID == owner.InstanceID { + expected++ + } + assert.Equal(t, expected, gprlCounters2[peer.InstanceID], "gprlCounter %s", peer.InstanceID) + } + + // Verify all peers report consistent remaining value value. + for _, peer := range cluster.GetDaemons() { + if peer.PeerInfo.DataCenter != cluster.DataCenterNone { + continue + } + sendHit(t, peer, makeReq(name, key, 0), guber.Status_UNDER_LIMIT, limit-testCase.Hits) + } + }) + } + }) + + t.Run("Distributed hits", func(t *testing.T) { + testCases := []struct { + Name string + Hits int + }{ + {Name: "2 hits", Hits: 2}, + {Name: "10 hits", Hits: 10}, + {Name: "100 hits", Hits: 100}, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + name := t.Name() + key := fmt.Sprintf("account:%08x", rand.Int()) + peers, err := cluster.ListNonOwningDaemons(name, key) + require.NoError(t, err) + owner, err := cluster.FindOwningDaemon(name, key) + require.NoError(t, err) + var localPeers []*guber.Daemon + for _, peer := range cluster.GetDaemons() { + if peer.PeerInfo.DataCenter == cluster.DataCenterNone && peer.InstanceID != owner.InstanceID { + localPeers = append(localPeers, peer) + } + } + t.Logf("Owner peer: %s", owner.InstanceID) + + require.NoError(t, waitForIdle(1*clock.Minute, cluster.GetDaemons()...)) + + broadcastCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_broadcast_duration_count") + updateCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_global_send_duration_count") + upgCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/UpdatePeerGlobals\"}") + gprlCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/GetPeerRateLimits\"}") + expectUpdate := make(map[string]struct{}) + var wg sync.WaitGroup + var mutex sync.Mutex + + // When + wg.Add(testCase.Hits) + for i := 0; i < testCase.Hits; i++ { + peer := localPeers[i%len(localPeers)] + go func(peer *guber.Daemon) { + sendHit(t, peer, makeReq(name, key, 1), guber.Status_UNDER_LIMIT, -1) + if peer.InstanceID != owner.InstanceID { + mutex.Lock() + expectUpdate[peer.InstanceID] = struct{}{} + mutex.Unlock() + } + wg.Done() + }(peer) + } + wg.Wait() + + // Then + // Check for global hits update from non-owner to owner peer. + // Expect single update from each non-owner peer that received + // hits. 
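+ // Peers that received no hits should not send an update to the owner.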
+ t.Log("Waiting for global hits updates") + var didOwnerUpdate int64 + var didNonOwnerUpdate []string + wg.Add(len(peers) + 1) + go func() { + expected := updateCounters[owner.InstanceID] + 1 + if err := waitForUpdate(broadcastTimeout, owner, expected); err == nil { + atomic.AddInt64(&didOwnerUpdate, 1) + t.Log("Global hits update from owner") + } + wg.Done() + }() + for _, peer := range peers { + go func(peer *guber.Daemon) { + expected := updateCounters[peer.InstanceID] + 1 + if err := waitForUpdate(broadcastTimeout, peer, expected); err == nil { + mutex.Lock() + didNonOwnerUpdate = append(didNonOwnerUpdate, peer.InstanceID) + mutex.Unlock() + t.Logf("Global hits update from peer %s", peer.InstanceID) + } + wg.Done() + + }(peer) + } + wg.Wait() + assert.Zero(t, didOwnerUpdate) + assert.Len(t, didNonOwnerUpdate, len(expectUpdate)) + expectedNonOwnerUpdate := maps.Keys(expectUpdate) + sort.Strings(expectedNonOwnerUpdate) + sort.Strings(didNonOwnerUpdate) + assert.Equal(t, expectedNonOwnerUpdate, didNonOwnerUpdate) + + // Expect a single global broadcast to all non-owner peers. + t.Log("Waiting for global broadcasts") + var didOwnerBroadcast, didNonOwnerBroadcast int64 + wg.Add(len(peers) + 1) + go func() { + expected := broadcastCounters[owner.InstanceID] + 1 + if err := waitForBroadcast(broadcastTimeout, owner, expected); err == nil { + atomic.AddInt64(&didOwnerBroadcast, 1) + t.Log("Global broadcast from owner") + } + wg.Done() + }() + for _, peer := range peers { + go func(peer *guber.Daemon) { + expected := broadcastCounters[peer.InstanceID] + 1 + if err := waitForBroadcast(broadcastTimeout, peer, expected); err == nil { + atomic.AddInt64(&didNonOwnerBroadcast, 1) + t.Logf("Global broadcast from peer %s", peer.InstanceID) + } + wg.Done() + }(peer) + } + wg.Wait() + assert.Equal(t, int64(1), didOwnerBroadcast) + assert.Empty(t, didNonOwnerBroadcast) + + // Assert UpdatePeerGlobals endpoint called at least + // once on each peer except owner. + // Used by global broadcast. + upgCounters2 := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/UpdatePeerGlobals\"}") + for _, peer := range cluster.GetDaemons() { + expected := upgCounters[peer.InstanceID] + if peer.PeerInfo.DataCenter == cluster.DataCenterNone && peer.InstanceID != owner.InstanceID { + expected++ + } + assert.GreaterOrEqual(t, upgCounters2[peer.InstanceID], expected, "upgCounter %s", peer.InstanceID) + } + + // Assert PeerGetRateLimits endpoint called on owner + // for each non-owner that received hits. + // Used by global hits update. + gprlCounters2 := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/GetPeerRateLimits\"}") + for _, peer := range cluster.GetDaemons() { + expected := gprlCounters[peer.InstanceID] + if peer.InstanceID == owner.InstanceID { + expected += len(expectUpdate) + } + assert.Equal(t, expected, gprlCounters2[peer.InstanceID], "gprlCounter %s", peer.InstanceID) + } + + // Verify all peers report consistent remaining value value. + for _, peer := range cluster.GetDaemons() { + if peer.PeerInfo.DataCenter != cluster.DataCenterNone { + continue + } + sendHit(t, peer, makeReq(name, key, 0), guber.Status_UNDER_LIMIT, int64(limit-testCase.Hits)) + } + }) + } + }) +} + +// Request metrics and parse into map. +// Optionally pass names to filter metrics by name. 
+func getMetrics(HTTPAddr string, names ...string) (map[string]*model.Sample, error) { + url := fmt.Sprintf("http://%s/metrics", HTTPAddr) + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + decoder := expfmt.SampleDecoder{ + Dec: expfmt.NewDecoder(resp.Body, expfmt.FmtText), + Opts: &expfmt.DecodeOptions{ + Timestamp: model.Now(), + }, + } + nameSet := make(map[string]struct{}) + for _, name := range names { + nameSet[name] = struct{}{} + } + metrics := make(map[string]*model.Sample) + + for { + var smpls model.Vector + err := decoder.Decode(&smpls) + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + for _, smpl := range smpls { + name := smpl.Metric.String() + if _, ok := nameSet[name]; ok || len(nameSet) == 0 { + metrics[name] = smpl + } + } + } + + return metrics, nil +} + +func getMetricRequest(url string, name string) (*model.Sample, error) { + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + return getMetric(resp.Body, name) +} + func getMetric(in io.Reader, name string) (*model.Sample, error) { dec := expfmt.SampleDecoder{ Dec: expfmt.NewDecoder(in, expfmt.FmtText), @@ -1808,44 +1941,142 @@ func getMetric(in io.Reader, name string) (*model.Sample, error) { return nil, nil } -// getBroadcastCount returns the current broadcast count for use with waitForBroadcast() -// TODO: Replace this with something else, we can call and reset via HTTP/GRPC calls in gubernator v3 -func getBroadcastCount(d *guber.Daemon) (int, error) { - m, err := getMetricRequest(fmt.Sprintf("http://%s/metrics", d.Config().HTTPListenAddress), - "gubernator_broadcast_duration_count") - if err != nil { - return 0, err - } +// waitForBroadcast waits until the broadcast count for the daemon changes to +// at least the expected value and the broadcast queue is empty. +// Returns an error if timeout waiting for conditions to be met. +func waitForBroadcast(timeout clock.Duration, d *guber.Daemon, expect int) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + for { + metrics, err := getMetrics(d.Config().HTTPListenAddress, + "gubernator_broadcast_duration_count", "gubernator_global_queue_length") + if err != nil { + return err + } + gbdc := metrics["gubernator_broadcast_duration_count"] + ggql := metrics["gubernator_global_queue_length"] + + // It's possible a broadcast occurred twice if waiting for multiple + // peers to forward updates to non-owners. + if int(gbdc.Value) >= expect && ggql.Value == 0 { + return nil + } - return int(m.Value), nil + select { + case <-clock.After(100 * clock.Millisecond): + case <-ctx.Done(): + return ctx.Err() + } + } } -// waitForBroadcast waits until the broadcast count for the daemon passed -// changes to the expected value. Returns an error if the expected value is -// not found before the context is cancelled. -func waitForBroadcast(timeout clock.Duration, d *guber.Daemon, expect int) error { +// waitForUpdate waits until the global hits update count for the daemon +// changes to at least the expected value and the global update queue is empty. +// Returns an error if timeout waiting for conditions to be met. 
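+// Polls gubernator_global_send_duration_count and gubernator_global_send_queue_length every 100ms until the deadline.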
+func waitForUpdate(timeout clock.Duration, d *guber.Daemon, expect int) error { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() for { - m, err := getMetricRequest(fmt.Sprintf("http://%s/metrics", d.Config().HTTPListenAddress), - "gubernator_broadcast_duration_count") + metrics, err := getMetrics(d.Config().HTTPListenAddress, + "gubernator_global_send_duration_count", "gubernator_global_send_queue_length") if err != nil { return err } + gsdc := metrics["gubernator_global_send_duration_count"] + gsql := metrics["gubernator_global_send_queue_length"] - // It's possible a broadcast occurred twice if waiting for multiple peer to + // It's possible a hit occurred twice if waiting for multiple peers to // forward updates to the owner. - if int(m.Value) >= expect { - // Give the nodes some time to process the broadcasts - clock.Sleep(clock.Millisecond * 500) + if int(gsdc.Value) >= expect && gsql.Value == 0 { return nil } select { - case <-clock.After(time.Millisecond * 800): + case <-clock.After(100 * clock.Millisecond): case <-ctx.Done(): return ctx.Err() } } } + +// waitForIdle waits until both global broadcast and global hits queues are +// empty. +func waitForIdle(timeout clock.Duration, daemons ...*guber.Daemon) error { + var wg syncutil.WaitGroup + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + for _, d := range daemons { + wg.Run(func(raw any) error { + d := raw.(*guber.Daemon) + for { + metrics, err := getMetrics(d.Config().HTTPListenAddress, + "gubernator_global_queue_length", "gubernator_global_send_queue_length") + if err != nil { + return err + } + ggql := metrics["gubernator_global_queue_length"] + gsql := metrics["gubernator_global_send_queue_length"] + + if ggql.Value == 0 && gsql.Value == 0 { + return nil + } + + select { + case <-clock.After(100 * clock.Millisecond): + case <-ctx.Done(): + return ctx.Err() + } + } + }, d) + } + errs := wg.Wait() + if len(errs) > 0 { + return errs[0] + } + return nil +} + +func getMetricValue(t *testing.T, d *guber.Daemon, name string) float64 { + m, err := getMetricRequest(fmt.Sprintf("http://%s/metrics", d.Config().HTTPListenAddress), + name) + require.NoError(t, err) + if m == nil { + return 0 + } + return float64(m.Value) +} + +// Get metric counter values on each peer. 
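+// Keys are peer InstanceIDs; a metric missing on a peer counts as zero.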
+func getPeerCounters(t *testing.T, peers []*guber.Daemon, name string) map[string]int { + counters := make(map[string]int) + for _, peer := range peers { + counters[peer.InstanceID] = int(getMetricValue(t, peer, name)) + } + return counters +} + +func sendHit(t *testing.T, d *guber.Daemon, req *guber.RateLimitReq, expectStatus guber.Status, expectRemaining int64) { + if req.Hits != 0 { + t.Logf("Sending %d hits to peer %s", req.Hits, d.InstanceID) + } + client := d.MustClient() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{ + Requests: []*guber.RateLimitReq{req}, + }) + require.NoError(t, err) + item := resp.Responses[0] + assert.Equal(t, "", item.Error) + if expectRemaining >= 0 { + assert.Equal(t, expectRemaining, item.Remaining) + } + assert.Equal(t, expectStatus, item.Status) + assert.Equal(t, req.Limit, item.Limit) +} + +func randomKey() string { + return fmt.Sprintf("%016x", rand.Int()) +} diff --git a/global.go b/global.go index bd0c1e7c..5709dea8 100644 --- a/global.go +++ b/global.go @@ -20,37 +20,42 @@ import ( "context" "github.com/mailgun/holster/v4/syncutil" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "google.golang.org/protobuf/proto" ) // globalManager manages async hit queue and updates peers in // the cluster periodically when a global rate limit we own updates. type globalManager struct { - hitsQueue chan *RateLimitReq - broadcastQueue chan *UpdatePeerGlobal - wg syncutil.WaitGroup - conf BehaviorConfig - log FieldLogger - instance *V1Instance // TODO circular import? V1Instance also holds a reference to globalManager - metricGlobalSendDuration prometheus.Summary - metricBroadcastDuration prometheus.Summary - metricBroadcastCounter *prometheus.CounterVec - metricGlobalQueueLength prometheus.Gauge + hitsQueue chan *RateLimitReq + updatesQueue chan *RateLimitReq + wg syncutil.WaitGroup + conf BehaviorConfig + log FieldLogger + instance *V1Instance // todo circular import? V1Instance also holds a reference to globalManager + metricGlobalSendDuration prometheus.Summary + metricGlobalSendQueueLength prometheus.Gauge + metricBroadcastDuration prometheus.Summary + metricBroadcastCounter *prometheus.CounterVec + metricGlobalQueueLength prometheus.Gauge } func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager { gm := globalManager{ - log: instance.log, - hitsQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit), - broadcastQueue: make(chan *UpdatePeerGlobal, conf.GlobalBatchLimit), - instance: instance, - conf: conf, + log: instance.log, + hitsQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit), + updatesQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit), + instance: instance, + conf: conf, metricGlobalSendDuration: prometheus.NewSummary(prometheus.SummaryOpts{ Name: "gubernator_global_send_duration", Help: "The duration of GLOBAL async sends in seconds.", Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001}, }), + metricGlobalSendQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "gubernator_global_send_queue_length", + Help: "The count of requests queued up for global broadcast. 
This is only used for GetRateLimit requests using global behavior.", + }), metricBroadcastDuration: prometheus.NewSummary(prometheus.SummaryOpts{ Name: "gubernator_broadcast_duration", Help: "The duration of GLOBAL broadcasts to peers in seconds.", @@ -71,14 +76,14 @@ func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager } func (gm *globalManager) QueueHit(r *RateLimitReq) { - gm.hitsQueue <- r + if r.Hits != 0 { + gm.hitsQueue <- r + } } -func (gm *globalManager) QueueUpdate(req *RateLimitReq, resp *RateLimitResp) { - gm.broadcastQueue <- &UpdatePeerGlobal{ - Key: req.HashKey(), - Algorithm: req.Algorithm, - Status: resp, +func (gm *globalManager) QueueUpdate(r *RateLimitReq) { + if r.Hits != 0 { + gm.updatesQueue <- r } } @@ -95,15 +100,11 @@ func (gm *globalManager) runAsyncHits() { select { case r := <-gm.hitsQueue: + gm.metricGlobalSendQueueLength.Set(float64(len(hits))) // Aggregate the hits into a single request key := r.HashKey() _, ok := hits[key] if ok { - // If any of our hits includes a request to RESET_REMAINING - // ensure the owning peer gets this behavior - if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) { - SetBehavior(&hits[key].Behavior, Behavior_RESET_REMAINING, true) - } hits[key].Hits += r.Hits } else { hits[key] = r @@ -113,6 +114,7 @@ func (gm *globalManager) runAsyncHits() { if len(hits) == gm.conf.GlobalBatchLimit { gm.sendHits(hits) hits = make(map[string]*RateLimitReq) + gm.metricGlobalSendQueueLength.Set(0) return true } @@ -126,9 +128,9 @@ func (gm *globalManager) runAsyncHits() { if len(hits) != 0 { gm.sendHits(hits) hits = make(map[string]*RateLimitReq) + gm.metricGlobalSendQueueLength.Set(0) } case <-done: - interval.Stop() return false } return true @@ -152,6 +154,7 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { gm.log.WithError(err).Errorf("while getting peer for hash key '%s'", r.HashKey()) continue } + p, ok := peerRequests[peer.Info().GRPCAddress] if ok { p.req.Requests = append(p.req.Requests, r) @@ -188,37 +191,39 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { // and in a periodic frequency determined by GlobalSyncWait. 
func (gm *globalManager) runBroadcasts() { var interval = NewInterval(gm.conf.GlobalSyncWait) - updates := make(map[string]*UpdatePeerGlobal) + updates := make(map[string]*RateLimitReq) gm.wg.Until(func(done chan struct{}) bool { select { - case updateReq := <-gm.broadcastQueue: - updates[updateReq.Key] = updateReq + case r := <-gm.updatesQueue: + updates[r.HashKey()] = r + gm.metricGlobalQueueLength.Set(float64(len(updates))) - // Send the hits if we reached our batch limit + // Send the broadcast if we reached our batch limit if len(updates) >= gm.conf.GlobalBatchLimit { gm.metricBroadcastCounter.WithLabelValues("queue_full").Inc() - gm.broadcastPeers(context.Background(), updates) - updates = make(map[string]*UpdatePeerGlobal) + gm.broadcastPeers(updates) + updates = make(map[string]*RateLimitReq) + gm.metricGlobalQueueLength.Set(0) return true } - // If this is our first queued hit since last send + // If this is our first queued updated since last send // queue the next interval if len(updates) == 1 { interval.Next() } case <-interval.C: - if len(updates) != 0 { - gm.metricBroadcastCounter.WithLabelValues("timer").Inc() - gm.broadcastPeers(context.Background(), updates) - updates = make(map[string]*UpdatePeerGlobal) - } else { - gm.metricGlobalQueueLength.Set(0) + if len(updates) == 0 { + break } + gm.metricBroadcastCounter.WithLabelValues("timer").Inc() + gm.broadcastPeers(updates) + updates = make(map[string]*RateLimitReq) + gm.metricGlobalQueueLength.Set(0) + case <-done: - interval.Stop() return false } return true @@ -226,14 +231,29 @@ func (gm *globalManager) runBroadcasts() { } // broadcastPeers broadcasts global rate limit statuses to all other peers -func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]*UpdatePeerGlobal) { +func (gm *globalManager) broadcastPeers(updates map[string]*RateLimitReq) { defer prometheus.NewTimer(gm.metricBroadcastDuration).ObserveDuration() + ctx := context.Background() var req UpdatePeerGlobalsReq - gm.metricGlobalQueueLength.Set(float64(len(updates))) - for _, r := range updates { - req.Globals = append(req.Globals, r) + // Copy the original since we are removing the GLOBAL behavior + rl := proto.Clone(r).(*RateLimitReq) + // We are only sending the status of the rate limit so, we + // clear the behavior flag, so we don't get queued for update again. + SetBehavior(&rl.Behavior, Behavior_GLOBAL, false) + rl.Hits = 0 + + status, err := gm.instance.getLocalRateLimit(ctx, rl) + if err != nil { + gm.log.WithError(err).Errorf("while getting local rate limit for: '%s'", rl.HashKey()) + continue + } + req.Globals = append(req.Globals, &UpdatePeerGlobal{ + Algorithm: rl.Algorithm, + Key: rl.HashKey(), + Status: status, + }) } fan := syncutil.NewFanOut(gm.conf.GlobalPeerRequestsConcurrency) @@ -250,8 +270,8 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string] cancel() if err != nil { - // Only log if it's an unknown error - if !errors.Is(err, context.Canceled) && errors.Is(err, context.DeadlineExceeded) { + // Skip peers that are not in a ready state + if !IsNotReady(err) { gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress) } } @@ -261,10 +281,6 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string] fan.Wait() } -// Close stops all goroutines and shuts down all the peers. 
func (gm *globalManager) Close() { gm.wg.Stop() - for _, peer := range gm.instance.GetPeerList() { - _ = peer.Shutdown(context.Background()) - } } diff --git a/go.mod b/go.mod index 93080b32..848172cf 100644 --- a/go.mod +++ b/go.mod @@ -22,9 +22,8 @@ require ( go.opentelemetry.io/otel v1.21.0 go.opentelemetry.io/otel/sdk v1.21.0 go.opentelemetry.io/otel/trace v1.21.0 - go.uber.org/goleak v1.3.0 - golang.org/x/net v0.18.0 - golang.org/x/sync v0.3.0 + golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 + golang.org/x/net v0.21.0 golang.org/x/time v0.3.0 google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b google.golang.org/grpc v1.59.0 @@ -32,7 +31,7 @@ require ( k8s.io/api v0.23.3 k8s.io/apimachinery v0.23.3 k8s.io/client-go v0.23.3 - k8s.io/klog/v2 v2.120.1 + k8s.io/klog v0.3.1 ) require ( @@ -43,7 +42,7 @@ require ( github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/logr v1.3.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect @@ -81,18 +80,19 @@ require ( go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect - golang.org/x/mod v0.8.0 // indirect + golang.org/x/mod v0.15.0 // indirect golang.org/x/oauth2 v0.12.0 // indirect - golang.org/x/sys v0.14.0 // indirect - golang.org/x/term v0.14.0 // indirect + golang.org/x/sys v0.17.0 // indirect + golang.org/x/term v0.17.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.6.0 // indirect + golang.org/x/tools v0.18.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20231012201019-e917dd12ba7a // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/klog/v2 v2.30.0 // indirect k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect diff --git a/go.sum b/go.sum index fea9ef4c..0164981f 100644 --- a/go.sum +++ b/go.sum @@ -139,8 +139,8 @@ github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7 github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= @@ -452,7 +452,6 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak 
v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= @@ -478,6 +477,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ= +golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -503,8 +504,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.15.0 h1:SernR4v+D55NyBH2QiEQrlBAnj1ECL6AGrA5+dPaMY8= +golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -550,8 +551,8 @@ golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= -golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -579,8 +580,7 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ 
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -643,13 +643,13 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.14.0 h1:LGK9IlZ8T9jvdy6cTdfKUCltatMFOehAQo9SRC46UQ8= -golang.org/x/term v0.14.0/go.mod h1:TySc+nGkYR6qt8km8wUhuFRTVSMIX3XPR58y2lC8vww= +golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -721,8 +721,8 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.18.0 h1:k8NLag8AGHnn+PHbl7g43CtqZAwG60vZkLqgyZgIHgQ= +golang.org/x/tools v0.18.0/go.mod h1:GL7B4CwcLLeo59yx/9UWWuNOW1n3VZ4f5axWfML7Lcg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -883,11 +883,12 @@ k8s.io/apimachinery v0.23.3/go.mod h1:BEuFMMBaIbcOqVIJqNZJXGFTP4W6AycEpb5+m/97hr k8s.io/client-go v0.23.3 h1:23QYUmCQ/W6hW78xIwm3XqZrrKZM+LWDqW2zfo+szJs= k8s.io/client-go v0.23.3/go.mod h1:47oMd+YvAOqZM7pcQ6neJtBiFH7alOyfunYN48VsmwE= k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= +k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68= +k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= +k8s.io/klog/v2 v2.30.0 h1:bUO6drIvCIsvZ/XFgfxoGFQU/a4Qkh0iAlvUR7vlHJw= k8s.io/klog/v2 v2.30.0/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= -k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= -k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 h1:E3J9oCLlaobFUqsjG9DfKbP2BmgwBL2p7pn0A3dG9W4= k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65/go.mod h1:sX9MT8g7NVZM5lVL/j8QyCCJe8YSMW30QvGZWaCIDIk= k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= diff --git a/gubernator.go b/gubernator.go index 7ec9a96a..a4a2625a 100644 --- a/gubernator.go +++ b/gubernator.go @@ -149,15 +149,17 @@ func (s *V1Instance) Close() (err error) { return nil } + if s.conf.Loader == nil { + return nil + } + s.global.Close() - if s.conf.Loader != nil { - err = s.workerPool.Store(ctx) - if err != nil { - s.log.WithError(err). - Error("Error in workerPool.Store") - return errors.Wrap(err, "Error in workerPool.Store") - } + err = s.workerPool.Store(ctx) + if err != nil { + s.log.WithError(err). + Error("Error in workerPool.Store") + return errors.Wrap(err, "Error in workerPool.Store") } err = s.workerPool.Close() @@ -343,7 +345,7 @@ func (s *V1Instance) asyncRequest(ctx context.Context, req *AsyncReq) { // Make an RPC call to the peer that owns this rate limit r, err := req.Peer.GetPeerRateLimit(ctx, req.Req) if err != nil { - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + if IsNotReady(err) { attempts++ metricBatchSendRetries.WithLabelValues(req.Req.Name).Inc() req.Peer, err = s.GetPeer(ctx, req.Key) @@ -394,9 +396,31 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq) tracing.EndScope(ctx, err) }() + item, ok, err := s.workerPool.GetCacheItem(ctx, req.HashKey()) + if err != nil { + countError(err, "Error in workerPool.GetCacheItem") + return nil, errors.Wrap(err, "during in workerPool.GetCacheItem") + } + if ok { + // Global rate limits are always stored as RateLimitResp regardless of algorithm + rl, ok := item.Value.(*RateLimitResp) + if ok { + rl2 := proto.Clone(rl).(*RateLimitResp) + if req.Hits != 0 { + if req.Hits > rl2.Remaining { + rl2.Status = Status_OVER_LIMIT + } else { + rl2.Status = Status_UNDER_LIMIT + } + } + return rl2, nil + } + // We get here if the owning node hasn't asynchronously forwarded it's updates to us yet and + // our cache still holds the rate limit we created on the first hit. 
+ } + cpy := proto.Clone(req).(*RateLimitReq) - SetBehavior(&cpy.Behavior, Behavior_NO_BATCHING, true) - SetBehavior(&cpy.Behavior, Behavior_GLOBAL, false) + cpy.Behavior = Behavior_NO_BATCHING // Process the rate limit like we own it resp, err = s.getLocalRateLimit(ctx, cpy) @@ -411,29 +435,14 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq) // UpdatePeerGlobals updates the local cache with a list of global rate limits. This method should only // be called by a peer who is the owner of a global rate limit. func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsReq) (*UpdatePeerGlobalsResp, error) { - now := MillisecondNow() + defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.UpdatePeerGlobals")).ObserveDuration() for _, g := range r.Globals { item := &CacheItem{ ExpireAt: g.Status.ResetTime, Algorithm: g.Algorithm, + Value: g.Status, Key: g.Key, } - switch g.Algorithm { - case Algorithm_LEAKY_BUCKET: - item.Value = &LeakyBucketItem{ - Remaining: float64(g.Status.Remaining), - Limit: g.Status.Limit, - Burst: g.Status.Limit, - UpdatedAt: now, - } - case Algorithm_TOKEN_BUCKET: - item.Value = &TokenBucketItem{ - Status: g.Status.Status, - Limit: g.Status.Limit, - Remaining: g.Status.Remaining, - CreatedAt: now, - } - } err := s.workerPool.AddCacheItem(ctx, g.Key, item) if err != nil { return nil, errors.Wrap(err, "Error in workerPool.AddCacheItem") @@ -445,6 +454,7 @@ func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals // GetPeerRateLimits is called by other peers to get the rate limits owned by this peer. func (s *V1Instance) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimitsReq) (resp *GetPeerRateLimitsResp, err error) { + defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.GetPeerRateLimits")).ObserveDuration() if len(r.Requests) > maxBatchSize { err := fmt.Errorf("'PeerRequest.rate_limits' list too large; max size is '%d'", maxBatchSize) metricCheckErrorCounter.WithLabelValues("Request too large").Inc() @@ -485,15 +495,6 @@ func (s *V1Instance) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits // Extract the propagated context from the metadata in the request prop := propagation.TraceContext{} ctx := prop.Extract(ctx, &MetadataCarrier{Map: rin.req.Metadata}) - - // Forwarded global requests must have DRAIN_OVER_LIMIT set so token and leaky algorithms - // drain the remaining in the event a peer asks for more than is remaining. - // This is needed because with GLOBAL behavior peers will accumulate hits, which could - // result in requesting more hits than is remaining. 
- if HasBehavior(rin.req.Behavior, Behavior_GLOBAL) { - SetBehavior(&rin.req.Behavior, Behavior_DRAIN_OVER_LIMIT, true) - } - rl, err := s.getLocalRateLimit(ctx, rin.req) if err != nil { // Return the error for this request @@ -528,7 +529,7 @@ func (s *V1Instance) HealthCheck(ctx context.Context, r *HealthCheckReq) (health localPeers := s.conf.LocalPicker.Peers() for _, peer := range localPeers { for _, errMsg := range peer.GetLastErr() { - err := fmt.Errorf("error returned from local peer.GetLastErr: %s", errMsg) + err := fmt.Errorf("Error returned from local peer.GetLastErr: %s", errMsg) span.RecordError(err) errs = append(errs, err.Error()) } @@ -538,7 +539,7 @@ func (s *V1Instance) HealthCheck(ctx context.Context, r *HealthCheckReq) (health regionPeers := s.conf.RegionPicker.Peers() for _, peer := range regionPeers { for _, errMsg := range peer.GetLastErr() { - err := fmt.Errorf("error returned from region peer.GetLastErr: %s", errMsg) + err := fmt.Errorf("Error returned from region peer.GetLastErr: %s", errMsg) span.RecordError(err) errs = append(errs, err.Error()) } @@ -578,16 +579,17 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (_ } metricGetRateLimitCounter.WithLabelValues("local").Inc() - // If global behavior, then broadcast update to all peers. + + // If global behavior and owning peer, broadcast update to all peers. + // Assuming that this peer does not own the ratelimit. if HasBehavior(r.Behavior, Behavior_GLOBAL) { - s.global.QueueUpdate(r, resp) + s.global.QueueUpdate(r) } return resp, nil } -// SetPeers replaces the peers and shuts down all the previous peers. -// TODO this should return an error if we failed to connect to any of the new peers +// SetPeers is called by the implementor to indicate the pool of peers has changed func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { localPicker := s.conf.LocalPicker.New() regionPicker := s.conf.RegionPicker.New() @@ -598,18 +600,13 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { peer := s.conf.RegionPicker.GetByPeerInfo(info) // If we don't have an existing PeerClient create a new one if peer == nil { - var err error - peer, err = NewPeerClient(PeerConfig{ + peer = NewPeerClient(PeerConfig{ TraceGRPC: s.conf.PeerTraceGRPC, Behavior: s.conf.Behaviors, TLS: s.conf.PeerTLS, Log: s.log, Info: info, }) - if err != nil { - s.log.Errorf("error connecting to peer %s: %s", info.GRPCAddress, err) - return - } } regionPicker.Add(peer) continue @@ -617,18 +614,13 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { // If we don't have an existing PeerClient create a new one peer := s.conf.LocalPicker.GetByPeerInfo(info) if peer == nil { - var err error - peer, err = NewPeerClient(PeerConfig{ + peer = NewPeerClient(PeerConfig{ TraceGRPC: s.conf.PeerTraceGRPC, Behavior: s.conf.Behaviors, TLS: s.conf.PeerTLS, Log: s.log, Info: info, }) - if err != nil { - s.log.Errorf("error connecting to peer %s: %s", info.GRPCAddress, err) - return - } } localPicker.Add(peer) } @@ -727,6 +719,7 @@ func (s *V1Instance) Describe(ch chan<- *prometheus.Desc) { s.global.metricBroadcastDuration.Describe(ch) s.global.metricGlobalQueueLength.Describe(ch) s.global.metricGlobalSendDuration.Describe(ch) + s.global.metricGlobalSendQueueLength.Describe(ch) } // Collect fetches metrics from the server for use by prometheus @@ -745,6 +738,7 @@ func (s *V1Instance) Collect(ch chan<- prometheus.Metric) { s.global.metricBroadcastDuration.Collect(ch) s.global.metricGlobalQueueLength.Collect(ch) 
s.global.metricGlobalSendDuration.Collect(ch) + s.global.metricGlobalSendQueueLength.Collect(ch) } // HasBehavior returns true if the provided behavior is set diff --git a/gubernator.pb.go b/gubernator.pb.go index 808a8814..1ae28afa 100644 --- a/gubernator.pb.go +++ b/gubernator.pb.go @@ -109,7 +109,7 @@ const ( // distributed to each peer and cached locally. A rate limit request received from any peer in the // cluster will first check the local cache for a rate limit answer, if it exists the peer will // immediately return the answer to the client and asynchronously forward the aggregate hits to - // the owner peer. Because of GLOBALS async nature we lose some accuracy in rate limit + // the peer coordinator. Because of GLOBALS async nature we lose some accuracy in rate limit // reporting, which may result in allowing some requests beyond the chosen rate limit. However we // gain massive performance as every request coming into the system does not have to wait for a // single peer to decide if the rate limit has been reached. @@ -478,15 +478,15 @@ type RateLimitResp struct { // The status of the rate limit. Status Status `protobuf:"varint,1,opt,name=status,proto3,enum=pb.gubernator.Status" json:"status,omitempty"` - // The currently configured request limit (Identical to [[RateLimitReq.limit]]). + // The currently configured request limit (Identical to RateLimitRequest.rate_limit_config.limit). Limit int64 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"` - // This is the number of requests remaining before the rate limit is hit but after subtracting the hits from the current request + // This is the number of requests remaining before the limit is hit. Remaining int64 `protobuf:"varint,3,opt,name=remaining,proto3" json:"remaining,omitempty"` // This is the time when the rate limit span will be reset, provided as a unix timestamp in milliseconds. ResetTime int64 `protobuf:"varint,4,opt,name=reset_time,json=resetTime,proto3" json:"reset_time,omitempty"` // Contains the error; If set all other values should be ignored Error string `protobuf:"bytes,5,opt,name=error,proto3" json:"error,omitempty"` - // This is additional metadata that a client might find useful. (IE: Additional headers, coordinator ownership, etc..) + // This is additional metadata that a client might find useful. (IE: Additional headers, coordinator ownership, etc..) Metadata map[string]string `protobuf:"bytes,6,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } diff --git a/gubernator.pb.gw.go b/gubernator.pb.gw.go index bb460598..1c67924b 100644 --- a/gubernator.pb.gw.go +++ b/gubernator.pb.gw.go @@ -145,7 +145,7 @@ func RegisterV1HandlerServer(ctx context.Context, mux *runtime.ServeMux, server // RegisterV1HandlerFromEndpoint is same as RegisterV1Handler but // automatically dials to "endpoint" and closes the connection when "ctx" gets done. func RegisterV1HandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { - conn, err := grpc.DialContext(ctx, endpoint, opts...) + conn, err := grpc.Dial(endpoint, opts...) if err != nil { return err } diff --git a/gubernator.proto b/gubernator.proto index fea99a22..5fd040df 100644 --- a/gubernator.proto +++ b/gubernator.proto @@ -85,7 +85,7 @@ enum Behavior { // distributed to each peer and cached locally.
A rate limit request received from any peer in the // cluster will first check the local cache for a rate limit answer, if it exists the peer will // immediately return the answer to the client and asynchronously forward the aggregate hits to - // the owner peer. Because of GLOBALS async nature we lose some accuracy in rate limit + // the peer coordinator. Because of GLOBALS async nature we lose some accuracy in rate limit // reporting, which may result in allowing some requests beyond the chosen rate limit. However we // gain massive performance as every request coming into the system does not have to wait for a // single peer to decide if the rate limit has been reached. @@ -178,15 +178,15 @@ enum Status { message RateLimitResp { // The status of the rate limit. Status status = 1; - // The currently configured request limit (Identical to [[RateLimitReq.limit]]). + // The currently configured request limit (Identical to RateLimitRequest.rate_limit_config.limit). int64 limit = 2; - // This is the number of requests remaining before the rate limit is hit but after subtracting the hits from the current request + // This is the number of requests remaining before the limit is hit. int64 remaining = 3; // This is the time when the rate limit span will be reset, provided as a unix timestamp in milliseconds. int64 reset_time = 4; // Contains the error; If set all other values should be ignored string error = 5; - // This is additional metadata that a client might find useful. (IE: Additional headers, coordinator ownership, etc..) + // This is additional metadata that a client might find useful. (IE: Additional headers, coordinator ownership, etc..) map<string, string> metadata = 6; } diff --git a/interval_test.go b/interval_test.go index 68c8b40d..89642c3e 100644 --- a/interval_test.go +++ b/interval_test.go @@ -18,8 +18,9 @@ package gubernator_test import ( "testing" + "time" - "github.com/mailgun/gubernator/v2" + gubernator "github.com/mailgun/gubernator/v2" "github.com/mailgun/holster/v4/clock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -27,18 +28,18 @@ import ( func TestInterval(t *testing.T) { t.Run("Happy path", func(t *testing.T) { - interval := gubernator.NewInterval(10 * clock.Millisecond) + interval := gubernator.NewInterval(10 * time.Millisecond) defer interval.Stop() interval.Next() assert.Empty(t, interval.C) - clock.Sleep(10 * clock.Millisecond) + time.Sleep(10 * time.Millisecond) // Wait for tick.
select { case <-interval.C: - case <-clock.After(100 * clock.Millisecond): + case <-time.After(100 * time.Millisecond): require.Fail(t, "timeout") } }) diff --git a/peer_client.go b/peer_client.go index 39c13c14..a39d9f02 100644 --- a/peer_client.go +++ b/peer_client.go @@ -21,7 +21,6 @@ import ( "crypto/tls" "fmt" "sync" - "sync/atomic" "github.com/mailgun/holster/v4/clock" "github.com/mailgun/holster/v4/collections" @@ -34,10 +33,8 @@ import ( "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/status" ) type PeerPicker interface { @@ -48,16 +45,24 @@ type PeerPicker interface { Add(*PeerClient) } +type peerStatus int + +const ( + peerNotConnected peerStatus = iota + peerConnected + peerClosing +) + type PeerClient struct { - client PeersV1Client - conn *grpc.ClientConn - conf PeerConfig - queue chan *request - queueClosed atomic.Bool - lastErrs *collections.LRUCache - - wgMutex sync.RWMutex - wg sync.WaitGroup // Monitor the number of in-flight requests. GUARDED_BY(wgMutex) + client PeersV1Client + conn *grpc.ClientConn + conf PeerConfig + queue chan *request + lastErrs *collections.LRUCache + + mutex sync.RWMutex // This mutex is for verifying the closing state of the client + status peerStatus // Keep the current status of the peer + wg sync.WaitGroup // This wait group is to monitor the number of in-flight requests } type response struct { @@ -79,39 +84,80 @@ type PeerConfig struct { TraceGRPC bool } -// NewPeerClient tries to establish a connection to a peer in a non-blocking fashion. -// If batching is enabled, it also starts a goroutine where batches will be processed. -func NewPeerClient(conf PeerConfig) (*PeerClient, error) { - peerClient := &PeerClient{ +func NewPeerClient(conf PeerConfig) *PeerClient { + return &PeerClient{ queue: make(chan *request, 1000), + status: peerNotConnected, conf: conf, lastErrs: collections.NewLRUCache(100), } - var opts []grpc.DialOption +} - if conf.TraceGRPC { - opts = []grpc.DialOption{ - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), - } - } +// Connect establishes a GRPC connection to a peer +func (c *PeerClient) connect(ctx context.Context) (err error) { + // NOTE: To future self, this mutex is used here because we need to know if the peer is disconnecting and + // handle ErrClosing. Since this mutex MUST be here we take this opportunity to also see if we are connected. + // Doing this here encapsulates managing the connected state to the PeerClient struct. Previously a PeerClient + // was connected when `NewPeerClient()` was called however, when adding support for multi data centers having a + // PeerClient connected to every Peer in every data center continuously is not desirable. - if conf.TLS != nil { - opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(conf.TLS))) - } else { - opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) - } + funcTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("PeerClient.connect")) + defer funcTimer.ObserveDuration() + lockTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("PeerClient.connect_RLock")) - var err error - peerClient.conn, err = grpc.Dial(conf.Info.GRPCAddress, opts...) 
- if err != nil { - return nil, err + c.mutex.RLock() + lockTimer.ObserveDuration() + + if c.status == peerClosing { + c.mutex.RUnlock() + return &PeerErr{err: errors.New("already disconnecting")} } - peerClient.client = NewPeersV1Client(peerClient.conn) - if !conf.Behavior.DisableBatching { - go peerClient.runBatch() + if c.status == peerNotConnected { + // This mutex stuff looks wonky, but it allows us to use RLock() 99% of the time, while the 1% where we + // actually need to connect uses a full Lock(), using RLock() most of which should reduce the over head + // of a full lock on every call + + // Yield the read lock so we can get the RW lock + c.mutex.RUnlock() + c.mutex.Lock() + defer c.mutex.Unlock() + + // Now that we have the RW lock, ensure no else got here ahead of us. + if c.status == peerConnected { + return nil + } + + // Setup OpenTelemetry interceptor to propagate spans. + var opts []grpc.DialOption + + if c.conf.TraceGRPC { + opts = []grpc.DialOption{ + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), + } + } + + if c.conf.TLS != nil { + opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(c.conf.TLS))) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + + var err error + c.conn, err = grpc.Dial(c.conf.Info.GRPCAddress, opts...) + if err != nil { + return c.setLastErr(&PeerErr{err: errors.Wrapf(err, "failed to dial peer %s", c.conf.Info.GRPCAddress)}) + } + c.client = NewPeersV1Client(c.conn) + c.status = peerConnected + + if !c.conf.Behavior.DisableBatching { + go c.runBatch() + } + return nil } - return peerClient, nil + c.mutex.RUnlock() + return nil } // Info returns PeerInfo struct that describes this PeerClient @@ -161,13 +207,21 @@ func (c *PeerClient) GetPeerRateLimit(ctx context.Context, r *RateLimitReq) (res // GetPeerRateLimits requests a list of rate limit statuses from a peer func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimitsReq) (resp *GetPeerRateLimitsResp, err error) { - // NOTE: This must be done within the Lock since calling Wait() in Shutdown() causes + if err := c.connect(ctx); err != nil { + err = errors.Wrap(err, "Error in connect") + metricCheckErrorCounter.WithLabelValues("Connect error").Add(1) + return nil, c.setLastErr(err) + } + + // NOTE: This must be done within the RLock since calling Wait() in Shutdown() causes // a race condition if called within a separate go routine if the internal wg is `0` // when Wait() is called then Add(1) is called concurrently. 
- c.wgMutex.Lock() + c.mutex.RLock() c.wg.Add(1) - c.wgMutex.Unlock() - defer c.wg.Done() + defer func() { + c.mutex.RUnlock() + defer c.wg.Done() + }() resp, err = c.client.GetPeerRateLimits(ctx, r) if err != nil { @@ -187,12 +241,17 @@ func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits // UpdatePeerGlobals sends global rate limit status updates to a peer func (c *PeerClient) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsReq) (resp *UpdatePeerGlobalsResp, err error) { + if err := c.connect(ctx); err != nil { + return nil, c.setLastErr(err) + } // See NOTE above about RLock and wg.Add(1) - c.wgMutex.Lock() + c.mutex.RLock() c.wg.Add(1) - c.wgMutex.Unlock() - defer c.wg.Done() + defer func() { + c.mutex.RUnlock() + defer c.wg.Done() + }() resp, err = c.client.UpdatePeerGlobals(ctx, r) if err != nil { @@ -237,26 +296,29 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq funcTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("PeerClient.getPeerRateLimitsBatch")) defer funcTimer.ObserveDuration() + if err := c.connect(ctx); err != nil { + err = errors.Wrap(err, "Error in connect") + return nil, c.setLastErr(err) + } + + // See NOTE above about RLock and wg.Add(1) + c.mutex.RLock() + if c.status == peerClosing { + err := &PeerErr{err: errors.New("already disconnecting")} + return nil, c.setLastErr(err) + } + + // Wait for a response or context cancel req := request{ resp: make(chan *response, 1), ctx: ctx, request: r, } - c.wgMutex.Lock() - c.wg.Add(1) - c.wgMutex.Unlock() - defer c.wg.Done() - // Enqueue the request to be sent peerAddr := c.Info().GRPCAddress metricBatchQueueLength.WithLabelValues(peerAddr).Set(float64(len(c.queue))) - if c.queueClosed.Load() { - // this check prevents "panic: send on close channel" - return nil, status.Error(codes.Canceled, "grpc: the client connection is closing") - } - select { case c.queue <- &req: // Successfully enqueued request. @@ -264,7 +326,12 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq return nil, errors.Wrap(ctx.Err(), "Context error while enqueuing request") } - // Wait for a response or context cancel + c.wg.Add(1) + defer func() { + c.mutex.RUnlock() + c.wg.Done() + }() + select { case re := <-req.resp: if re.err != nil { @@ -277,7 +344,7 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq } } -// runBatch processes batching requests by waiting for requests to be queued. Send +// run processes batching requests by waiting for requests to be queued. Send // the queue as a batch when either c.batchWait time has elapsed or the queue // reaches c.batchLimit. func (c *PeerClient) runBatch() { @@ -291,8 +358,8 @@ func (c *PeerClient) runBatch() { select { case r, ok := <-c.queue: + // If the queue has shutdown, we need to send the rest of the queue if !ok { - // If the queue has shutdown, we need to send the rest of the queue if len(queue) > 0 { c.sendBatch(ctx, queue) } @@ -359,6 +426,7 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) { prop.Inject(r.ctx, &MetadataCarrier{Map: r.request.Metadata}) req.Requests = append(req.Requests, r.request) tracing.EndScope(r.ctx, nil) + } timeoutCtx, timeoutCancel := context.WithTimeout(ctx, c.conf.Behavior.BatchTimeout) @@ -402,26 +470,31 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) { } } -// Shutdown waits until all outstanding requests have finished or the context is cancelled. 
-// Then it closes the grpc connection. +// Shutdown will gracefully shutdown the client connection, until the context is cancelled func (c *PeerClient) Shutdown(ctx context.Context) error { - // ensure we don't leak goroutines, even if the Shutdown times out - defer c.conn.Close() + // Take the write lock since we're going to modify the closing state + c.mutex.Lock() + if c.status == peerClosing || c.status == peerNotConnected { + c.mutex.Unlock() + return nil + } + defer c.mutex.Unlock() + + c.status = peerClosing + defer func() { + if c.conn != nil { + c.conn.Close() + } + }() + + // This allows us to wait on the waitgroup, or until the context + // has been cancelled. This doesn't leak goroutines, because + // closing the connection will kill any outstanding requests. waitChan := make(chan struct{}) go func() { - // drain in-flight requests - c.wgMutex.Lock() - defer c.wgMutex.Unlock() c.wg.Wait() - - // clear errors - c.lastErrs = collections.NewLRUCache(100) - - // signal that no more items will be sent - c.queueClosed.Store(true) close(c.queue) - close(waitChan) }() @@ -432,3 +505,30 @@ func (c *PeerClient) Shutdown(ctx context.Context) error { return nil } } + +// PeerErr is returned if the peer is not connected or is in a closing state +type PeerErr struct { + err error +} + +func (p *PeerErr) NotReady() bool { + return true +} + +func (p *PeerErr) Error() string { + return p.err.Error() +} + +func (p *PeerErr) Cause() error { + return p.err +} + +type notReadyErr interface { + NotReady() bool +} + +// IsNotReady returns true if the err is because the peer is not connected or in a closing state +func IsNotReady(err error) bool { + te, ok := err.(notReadyErr) + return ok && te.NotReady() +} diff --git a/peer_client_test.go b/peer_client_test.go index d739f40a..99924bed 100644 --- a/peer_client_test.go +++ b/peer_client_test.go @@ -19,15 +19,13 @@ package gubernator_test import ( "context" "runtime" - "strings" + "sync" "testing" gubernator "github.com/mailgun/gubernator/v2" "github.com/mailgun/gubernator/v2/cluster" "github.com/mailgun/holster/v4/clock" - "github.com/pkg/errors" - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" + "github.com/stretchr/testify/assert" ) func TestPeerClientShutdown(t *testing.T) { @@ -58,17 +56,17 @@ func TestPeerClientShutdown(t *testing.T) { c := cases[i] t.Run(c.Name, func(t *testing.T) { - client, err := gubernator.NewPeerClient(gubernator.PeerConfig{ + client := gubernator.NewPeerClient(gubernator.PeerConfig{ Info: cluster.GetRandomPeer(cluster.DataCenterNone), Behavior: config, }) - require.NoError(t, err) - wg := errgroup.Group{} - wg.SetLimit(threads) + wg := sync.WaitGroup{} + wg.Add(threads) // Spawn a whole bunch of concurrent requests to test shutdown in various states for j := 0; j < threads; j++ { - wg.Go(func() error { + go func() { + defer wg.Done() ctx := context.Background() _, err := client.GetPeerRateLimit(ctx, &gubernator.RateLimitReq{ Hits: 1, @@ -76,26 +74,28 @@ func TestPeerClientShutdown(t *testing.T) { Behavior: c.Behavior, }) - if err != nil { - if !strings.Contains(err.Error(), "client connection is closing") { - return errors.Wrap(err, "unexpected error in test") - } + isExpectedErr := false + + switch err.(type) { + case *gubernator.PeerErr: + isExpectedErr = true + case nil: + isExpectedErr = true } - return nil - }) + + assert.True(t, true, isExpectedErr) + + }() } // yield the processor that way we allow other goroutines to start their request runtime.Gosched() - shutDownErr := 
client.Shutdown(context.Background()) + err := client.Shutdown(context.Background()) + assert.NoError(t, err) - err = wg.Wait() - if err != nil { - t.Error(err) - t.Fail() - } - require.NoError(t, shutDownErr) + wg.Wait() }) + } } diff --git a/peers.pb.gw.go b/peers.pb.gw.go index f0929765..41f7d6e5 100644 --- a/peers.pb.gw.go +++ b/peers.pb.gw.go @@ -161,7 +161,7 @@ func RegisterPeersV1HandlerServer(ctx context.Context, mux *runtime.ServeMux, se // RegisterPeersV1HandlerFromEndpoint is same as RegisterPeersV1Handler but // automatically dials to "endpoint" and closes the connection when "ctx" gets done. func RegisterPeersV1HandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { - conn, err := grpc.DialContext(ctx, endpoint, opts...) + conn, err := grpc.Dial(endpoint, opts...) if err != nil { return err } diff --git a/peers.proto b/peers.proto index 1ce2a431..5caefae4 100644 --- a/peers.proto +++ b/peers.proto @@ -26,10 +26,10 @@ import "gubernator.proto"; // NOTE: For use by gubernator peers only service PeersV1 { - // Used by peers to relay batches of requests to an owner peer + // Used by peers to relay batches of requests to an authoritative peer rpc GetPeerRateLimits (GetPeerRateLimitsReq) returns (GetPeerRateLimitsResp) {} - // Used by owner peers to send global rate limit updates to non-owner peers + // Used by peers to send global rate limit updates to other peers rpc UpdatePeerGlobals (UpdatePeerGlobalsReq) returns (UpdatePeerGlobalsResp) {} } diff --git a/peers_grpc.pb.go b/peers_grpc.pb.go index e74a7d16..33db74af 100644 --- a/peers_grpc.pb.go +++ b/peers_grpc.pb.go @@ -42,9 +42,9 @@ const ( // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type PeersV1Client interface { - // Used by peers to relay batches of requests to an owner peer + // Used by peers to relay batches of requests to an authoritative peer GetPeerRateLimits(ctx context.Context, in *GetPeerRateLimitsReq, opts ...grpc.CallOption) (*GetPeerRateLimitsResp, error) - // Used by owner peers to send global rate limit updates to non-owner peers + // Used by peers to send global rate limit updates to other peers UpdatePeerGlobals(ctx context.Context, in *UpdatePeerGlobalsReq, opts ...grpc.CallOption) (*UpdatePeerGlobalsResp, error) } @@ -78,9 +78,9 @@ func (c *peersV1Client) UpdatePeerGlobals(ctx context.Context, in *UpdatePeerGlo // All implementations should embed UnimplementedPeersV1Server // for forward compatibility type PeersV1Server interface { - // Used by peers to relay batches of requests to an owner peer + // Used by peers to relay batches of requests to an authoritative peer GetPeerRateLimits(context.Context, *GetPeerRateLimitsReq) (*GetPeerRateLimitsResp, error) - // Used by owner peers to send global rate limit updates to non-owner peers + // Used by peers to send global rate limit updates to other peers UpdatePeerGlobals(context.Context, *UpdatePeerGlobalsReq) (*UpdatePeerGlobalsResp, error) } diff --git a/python/gubernator/gubernator_pb2.py b/python/gubernator/gubernator_pb2.py index 17351bb6..cefa90ce 100644 --- a/python/gubernator/gubernator_pb2.py +++ b/python/gubernator/gubernator_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT!
# source: gubernator.proto -# Protobuf Python Version: 4.25.3 +# Protobuf Python Version: 4.25.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/python/gubernator/peers_pb2.py b/python/gubernator/peers_pb2.py index b1451c7a..53b40758 100644 --- a/python/gubernator/peers_pb2.py +++ b/python/gubernator/peers_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: peers.proto -# Protobuf Python Version: 4.25.3 +# Protobuf Python Version: 4.25.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/python/gubernator/peers_pb2_grpc.py b/python/gubernator/peers_pb2_grpc.py index 9ebb860d..7b8f4c99 100644 --- a/python/gubernator/peers_pb2_grpc.py +++ b/python/gubernator/peers_pb2_grpc.py @@ -32,14 +32,14 @@ class PeersV1Servicer(object): """ def GetPeerRateLimits(self, request, context): - """Used by peers to relay batches of requests to an owner peer + """Used by peers to relay batches of requests to an authoritative peer """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def UpdatePeerGlobals(self, request, context): - """Used by owner peers to send global rate limit updates to non-owner peers + """Used by peers to send global rate limit updates to other peers """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!')
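
Note on the new peer readiness handling: peer_client.go now returns a PeerErr from connect() and the shutdown paths, and callers such as asyncRequest() and broadcastPeers() branch on the exported IsNotReady(err) helper instead of checking context.Canceled or context.DeadlineExceeded. The sketch below is a minimal, hypothetical illustration of that calling pattern; the package name, the relayToPeer function, and the error wrapping are assumptions for illustration and are not part of this change.

package example

import (
	"context"
	"fmt"

	guber "github.com/mailgun/gubernator/v2"
)

// relayToPeer forwards a batch of rate limit requests to the owning peer and
// classifies the resulting error with the IsNotReady helper added in this change.
func relayToPeer(ctx context.Context, c *guber.PeerClient, req *guber.GetPeerRateLimitsReq) (*guber.GetPeerRateLimitsResp, error) {
	resp, err := c.GetPeerRateLimits(ctx, req)
	if err == nil {
		return resp, nil
	}
	if guber.IsNotReady(err) {
		// The peer is not connected yet or is shutting down (a PeerErr).
		// Callers are expected to treat this as retryable: re-pick the owning
		// peer and try again rather than fail the client request outright.
		return nil, fmt.Errorf("peer not ready, retry on another peer: %w", err)
	}
	// Any other error is a real RPC failure and should be surfaced as-is.
	return nil, err
}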