Skip to content

Commit 8d62bc4

Browse files
Harvey Lowndesowainlewis
Harvey Lowndes
authored andcommitted
Support request rate limiting (#201)
* Support request rate limiting
1 parent 60400e6 commit 8d62bc4

12 files changed

+374
-11
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -173,3 +173,4 @@ See [LICENSE](LICENSE) for more details.
173173
[7]: https://github.com/oracle/oci-cloud-controller-manager/tree/master/manifests/cloud-provider-example.yaml
174174
[8]: https://github.com/oracle/oci-cloud-controller-manager/blob/master/docs/tutorial.md
175175
[9]: https://github.com/oracle/oci-cloud-controller-manager/blob/master/docs/tutorial-ssl.md
176+
[10]: https://github.com/oracle/oci-cloud-controller-manager/blob/master/docs/rate-limiter-configuration.md

docs/rate-limiter-configuration.md

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Rate Limiting Configuration
2+
3+
This file defines a list of Rate Limiting configurations properties,
4+
supported by the `oci-cloud-controller-manager`.
5+
6+
These properties exist in the `yaml` configuration under `rateLimiter`
7+
in root. For example:
8+
9+
```yaml
10+
auth:
11+
...
12+
loadBalancer:
13+
...
14+
rateLimiter:
15+
rateLimitQPSRead: ...
16+
rateLimitBucketRead: ...
17+
rateLimitQPSWrite: ...
18+
rateLimitBucketWrite: ...
19+
```
20+
21+
## Request read/write rate limiting properties
22+
23+
| Name | Description | Default |
24+
| ---- | ----------- | ------- |
25+
| `rateLimitQPSRead` | The maximum queries allowed per second for read requests. | 20.0 |
26+
| `rateLimitBucketRead` | The maximum token bucket burst size for read requests. | 5.0 |
27+
| `rateLimitQPSWrite` | The maximum queries allwoed per second for write requests. | 20.0 |
28+
| `rateLimitBucketWrite` | The maximim token bucket burst size for write requests. | 5.0 |

pkg/oci/ccm.go

+51-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
clientset "k8s.io/client-go/kubernetes"
3535
listersv1 "k8s.io/client-go/listers/core/v1"
3636
cache "k8s.io/client-go/tools/cache"
37+
"k8s.io/client-go/util/flowcontrol"
3738
cloudprovider "k8s.io/kubernetes/pkg/cloudprovider"
3839
controller "k8s.io/kubernetes/pkg/controller"
3940

@@ -42,6 +43,11 @@ import (
4243
"github.com/oracle/oci-cloud-controller-manager/pkg/oci/util"
4344
)
4445

46+
const (
47+
rateLimitQPSDefault = 20.0
48+
rateLimitBucketDefault = 5
49+
)
50+
4551
// ProviderName uniquely identifies the Oracle Bare Metal Cloud Services (OCI)
4652
// cloud-provider.
4753
func ProviderName() string {
@@ -78,7 +84,10 @@ func NewCloudProvider(config *Config) (cloudprovider.Interface, error) {
7884
if err != nil {
7985
return nil, err
8086
}
81-
c, err := client.New(logger.Sugar(), cp)
87+
88+
rateLimiter := newRateLimiter(logger.Sugar(), config.RateLimiter)
89+
90+
c, err := client.New(logger.Sugar(), cp, &rateLimiter)
8291
if err != nil {
8392
return nil, err
8493
}
@@ -225,3 +234,44 @@ func buildConfigurationProvider(logger *zap.Logger, config *Config) (common.Conf
225234
)
226235
return cp, nil
227236
}
237+
238+
// newRateLimiter builds and returns a struct containing read and write
239+
// rate limiters. Defaults are used where no (0) value is provided.
240+
func newRateLimiter(logger *zap.SugaredLogger, config *RateLimiterConfig) client.RateLimiter {
241+
if config == nil {
242+
config = &RateLimiterConfig{}
243+
}
244+
245+
// Set to default values if configuration not declared
246+
if config.RateLimitQPSRead == 0 {
247+
config.RateLimitQPSRead = rateLimitQPSDefault
248+
}
249+
if config.RateLimitBucketRead == 0 {
250+
config.RateLimitBucketRead = rateLimitBucketDefault
251+
}
252+
if config.RateLimitQPSWrite == 0 {
253+
config.RateLimitQPSWrite = rateLimitQPSDefault
254+
}
255+
if config.RateLimitBucketWrite == 0 {
256+
config.RateLimitBucketWrite = rateLimitBucketDefault
257+
}
258+
259+
rateLimiter := client.RateLimiter{
260+
Reader: flowcontrol.NewTokenBucketRateLimiter(
261+
config.RateLimitQPSRead,
262+
config.RateLimitBucketRead),
263+
Writer: flowcontrol.NewTokenBucketRateLimiter(
264+
config.RateLimitQPSWrite,
265+
config.RateLimitBucketWrite),
266+
}
267+
268+
logger.Infof("OCI using read rate limit configuration: QPS=%g, bucket=%d",
269+
config.RateLimitQPSRead,
270+
config.RateLimitBucketRead)
271+
272+
logger.Infof("OCI using write rate limit configuration: QPS=%g, bucket=%d",
273+
config.RateLimitQPSWrite,
274+
config.RateLimitBucketWrite)
275+
276+
return rateLimiter
277+
}

pkg/oci/ccm_test.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2017 Oracle and/or its affiliates. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package oci
16+
17+
import (
18+
"testing"
19+
20+
"go.uber.org/zap"
21+
)
22+
23+
func TestBuildRateLimiterWithConfig(t *testing.T) {
24+
qpsRead := float32(6.0)
25+
bucketRead := 20
26+
qpsWrite := float32(8.0)
27+
bucketWrite := 20
28+
29+
rateLimiterConfig := &RateLimiterConfig{
30+
RateLimitQPSRead: qpsRead,
31+
RateLimitBucketRead: bucketRead,
32+
RateLimitQPSWrite: qpsWrite,
33+
RateLimitBucketWrite: bucketWrite,
34+
}
35+
36+
rateLimiter := newRateLimiter(zap.S(), rateLimiterConfig)
37+
38+
if rateLimiter.Reader.QPS() != qpsRead {
39+
t.Errorf("unexpected QPS (read) value: expected %f but found %f", qpsRead, rateLimiter.Reader.QPS())
40+
}
41+
42+
if rateLimiter.Writer.QPS() != qpsWrite {
43+
t.Errorf("unexpected QPS (write) value: expected %f but found %f", qpsWrite, rateLimiter.Writer.QPS())
44+
}
45+
}
46+
47+
func TestBuildRateLimiterWithDefaults(t *testing.T) {
48+
rateLimiterConfig := &RateLimiterConfig{}
49+
50+
rateLimiter := newRateLimiter(zap.S(), rateLimiterConfig)
51+
52+
if rateLimiter.Reader.QPS() != rateLimitQPSDefault {
53+
t.Errorf("unexpected QPS (read) value: expected %f but found %f", rateLimitQPSDefault, rateLimiter.Reader.QPS())
54+
}
55+
56+
if rateLimiter.Writer.QPS() != rateLimitQPSDefault {
57+
t.Errorf("unexpected QPS (write) value: expected %f but found %f", rateLimitQPSDefault, rateLimiter.Writer.QPS())
58+
}
59+
}

pkg/oci/client/client.go

+44-5
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package client
1616

1717
import (
18+
"context"
1819
"crypto/tls"
1920
"crypto/x509"
2021
"io/ioutil"
@@ -26,6 +27,7 @@ import (
2627

2728
"go.uber.org/zap"
2829
"k8s.io/client-go/tools/cache"
30+
"k8s.io/client-go/util/flowcontrol"
2931

3032
"github.com/oracle/oci-go-sdk/common"
3133
"github.com/oracle/oci-go-sdk/core"
@@ -40,18 +42,54 @@ type Interface interface {
4042
Networking() NetworkingInterface
4143
}
4244

45+
// RateLimiter reader and writer.
46+
type RateLimiter struct {
47+
Reader flowcontrol.RateLimiter
48+
Writer flowcontrol.RateLimiter
49+
}
50+
51+
type computeClient interface {
52+
GetInstance(ctx context.Context, request core.GetInstanceRequest) (response core.GetInstanceResponse, err error)
53+
ListInstances(ctx context.Context, request core.ListInstancesRequest) (response core.ListInstancesResponse, err error)
54+
ListVnicAttachments(ctx context.Context, request core.ListVnicAttachmentsRequest) (response core.ListVnicAttachmentsResponse, err error)
55+
}
56+
57+
type virtualNetworkClient interface {
58+
GetVnic(ctx context.Context, request core.GetVnicRequest) (response core.GetVnicResponse, err error)
59+
GetSubnet(ctx context.Context, request core.GetSubnetRequest) (response core.GetSubnetResponse, err error)
60+
GetSecurityList(ctx context.Context, request core.GetSecurityListRequest) (response core.GetSecurityListResponse, err error)
61+
UpdateSecurityList(ctx context.Context, request core.UpdateSecurityListRequest) (response core.UpdateSecurityListResponse, err error)
62+
}
63+
64+
type loadBalancerClient interface {
65+
GetLoadBalancer(ctx context.Context, request loadbalancer.GetLoadBalancerRequest) (response loadbalancer.GetLoadBalancerResponse, err error)
66+
ListLoadBalancers(ctx context.Context, request loadbalancer.ListLoadBalancersRequest) (response loadbalancer.ListLoadBalancersResponse, err error)
67+
CreateLoadBalancer(ctx context.Context, request loadbalancer.CreateLoadBalancerRequest) (response loadbalancer.CreateLoadBalancerResponse, err error)
68+
DeleteLoadBalancer(ctx context.Context, request loadbalancer.DeleteLoadBalancerRequest) (response loadbalancer.DeleteLoadBalancerResponse, err error)
69+
ListCertificates(ctx context.Context, request loadbalancer.ListCertificatesRequest) (response loadbalancer.ListCertificatesResponse, err error)
70+
CreateCertificate(ctx context.Context, request loadbalancer.CreateCertificateRequest) (response loadbalancer.CreateCertificateResponse, err error)
71+
GetWorkRequest(ctx context.Context, request loadbalancer.GetWorkRequestRequest) (response loadbalancer.GetWorkRequestResponse, err error)
72+
CreateBackendSet(ctx context.Context, request loadbalancer.CreateBackendSetRequest) (response loadbalancer.CreateBackendSetResponse, err error)
73+
UpdateBackendSet(ctx context.Context, request loadbalancer.UpdateBackendSetRequest) (response loadbalancer.UpdateBackendSetResponse, err error)
74+
DeleteBackendSet(ctx context.Context, request loadbalancer.DeleteBackendSetRequest) (response loadbalancer.DeleteBackendSetResponse, err error)
75+
CreateListener(ctx context.Context, request loadbalancer.CreateListenerRequest) (response loadbalancer.CreateListenerResponse, err error)
76+
UpdateListener(ctx context.Context, request loadbalancer.UpdateListenerRequest) (response loadbalancer.UpdateListenerResponse, err error)
77+
DeleteListener(ctx context.Context, request loadbalancer.DeleteListenerRequest) (response loadbalancer.DeleteListenerResponse, err error)
78+
}
79+
4380
type client struct {
44-
compute *core.ComputeClient
45-
network *core.VirtualNetworkClient
46-
loadbalancer *loadbalancer.LoadBalancerClient
81+
compute computeClient
82+
network virtualNetworkClient
83+
loadbalancer loadBalancerClient
84+
85+
rateLimiter RateLimiter
4786

4887
subnetCache cache.Store
4988
logger *zap.SugaredLogger
5089
}
5190

5291
// New constructs an OCI API client.
53-
func New(logger *zap.SugaredLogger, cp common.ConfigurationProvider) (Interface, error) {
54-
logger = logger.Named("ociClient")
92+
func New(logger *zap.SugaredLogger, cp common.ConfigurationProvider, opRateLimiter *RateLimiter) (Interface, error) {
5593
compute, err := core.NewComputeClientWithConfigurationProvider(cp)
5694
if err != nil {
5795
return nil, errors.Wrap(err, "NewComputeClientWithConfigurationProvider")
@@ -86,6 +124,7 @@ func New(logger *zap.SugaredLogger, cp common.ConfigurationProvider) (Interface,
86124
compute: &compute,
87125
network: &network,
88126
loadbalancer: &lb,
127+
rateLimiter: *opRateLimiter,
89128

90129
subnetCache: cache.NewTTLStore(subnetCacheKeyFn, time.Duration(24)*time.Hour),
91130
logger: logger,

pkg/oci/client/client_test.go

+89
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
package client
1616

1717
import (
18+
"context"
1819
"testing"
1920

2021
"github.com/oracle/oci-go-sdk/core"
22+
"k8s.io/client-go/util/flowcontrol"
2123
)
2224

2325
func TestInstanceTerminalState(t *testing.T) {
@@ -54,3 +56,90 @@ func TestInstanceTerminalState(t *testing.T) {
5456
})
5557
}
5658
}
59+
60+
type mockComputeClient struct{}
61+
62+
type mockVirtualNetworkClient struct{}
63+
64+
type mockLoadBalancerClient struct{}
65+
66+
func TestRateLimiting(t *testing.T) {
67+
var qpsRead float32 = 5
68+
bucketRead := 5
69+
var qpsWrite float32 = 10
70+
bucketWrite := 5
71+
72+
rateLimiter := RateLimiter{
73+
Reader: flowcontrol.NewTokenBucketRateLimiter(
74+
qpsRead,
75+
bucketRead),
76+
Writer: flowcontrol.NewTokenBucketRateLimiter(
77+
qpsWrite,
78+
bucketWrite),
79+
}
80+
81+
client := newClient(rateLimiter)
82+
83+
// Read requests up to qpsRead should pass and the others fail
84+
for i := 0; i < int(qpsRead)*2; i++ {
85+
_, err := client.Compute().GetInstance(context.Background(), "123345")
86+
p := (err == nil)
87+
88+
if (i < int(qpsRead) && !p) || (i >= int(qpsRead) && p) {
89+
t.Errorf("unexpected result from request %d: %v", i, err)
90+
}
91+
}
92+
93+
// Write requests up to qpsWrite should pass and the others fail
94+
ids := [2]string{"12334"}
95+
for i := 0; i < int(qpsWrite)*2; i++ {
96+
req := core.UpdateSecurityListRequest{
97+
SecurityListId: &ids[0],
98+
}
99+
100+
_, err := client.Networking().UpdateSecurityList(context.Background(), req)
101+
p := (err == nil)
102+
103+
if (i < int(qpsRead) && !p) || (i >= int(qpsRead) && p) {
104+
t.Errorf("unexpected result from request %d: %v", i, err)
105+
}
106+
}
107+
}
108+
109+
func newClient(rateLimiter RateLimiter) Interface {
110+
return &client{
111+
compute: &mockComputeClient{},
112+
network: &mockVirtualNetworkClient{},
113+
rateLimiter: rateLimiter,
114+
}
115+
}
116+
117+
/* Mock ComputeClient interface implementations */
118+
func (c *mockComputeClient) GetInstance(ctx context.Context, request core.GetInstanceRequest) (response core.GetInstanceResponse, err error) {
119+
return core.GetInstanceResponse{}, nil
120+
}
121+
122+
func (c *mockComputeClient) ListInstances(ctx context.Context, request core.ListInstancesRequest) (response core.ListInstancesResponse, err error) {
123+
return core.ListInstancesResponse{}, nil
124+
}
125+
126+
func (c *mockComputeClient) ListVnicAttachments(ctx context.Context, request core.ListVnicAttachmentsRequest) (response core.ListVnicAttachmentsResponse, err error) {
127+
return core.ListVnicAttachmentsResponse{}, nil
128+
}
129+
130+
/* Mock NetworkClient interface implementations */
131+
func (c *mockVirtualNetworkClient) GetVnic(ctx context.Context, request core.GetVnicRequest) (response core.GetVnicResponse, err error) {
132+
return core.GetVnicResponse{}, nil
133+
}
134+
135+
func (c *mockVirtualNetworkClient) GetSubnet(ctx context.Context, request core.GetSubnetRequest) (response core.GetSubnetResponse, err error) {
136+
return core.GetSubnetResponse{}, nil
137+
}
138+
139+
func (c *mockVirtualNetworkClient) GetSecurityList(ctx context.Context, request core.GetSecurityListRequest) (response core.GetSecurityListResponse, err error) {
140+
return core.GetSecurityListResponse{}, nil
141+
}
142+
143+
func (c *mockVirtualNetworkClient) UpdateSecurityList(ctx context.Context, request core.UpdateSecurityListRequest) (response core.UpdateSecurityListResponse, err error) {
144+
return core.UpdateSecurityListResponse{}, nil
145+
}

0 commit comments

Comments
 (0)