@@ -35,6 +35,7 @@ import (
35
35
"k8s.io/apiserver/pkg/audit"
36
36
"k8s.io/apiserver/pkg/util/webhook"
37
37
"k8s.io/client-go/rest"
38
+ "k8s.io/client-go/util/flowcontrol"
38
39
)
39
40
40
41
const (
@@ -63,6 +64,9 @@ const (
63
64
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
64
65
defaultBatchMaxWait = 30 * time .Second // Send events at least twice a minute.
65
66
defaultInitialBackoff = 10 * time .Second // Wait at least 10 seconds before retrying.
67
+
68
+ defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
69
+ defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
66
70
)
67
71
68
72
// The plugin name reported in error metrics.
@@ -154,6 +158,7 @@ func newBatchWebhook(configFile string, groupVersion schema.GroupVersion) (*batc
154
158
maxBatchSize : defaultBatchMaxSize ,
155
159
maxBatchWait : defaultBatchMaxWait ,
156
160
shutdownCh : make (chan struct {}),
161
+ throttle : flowcontrol .NewTokenBucketRateLimiter (defaultBatchThrottleQPS , defaultBatchThrottleBurst ),
157
162
}, nil
158
163
}
159
164
@@ -181,6 +186,9 @@ type batchBackend struct {
181
186
// all requests have been completed and no new will be spawned, since the
182
187
// sending routine is not running anymore.
183
188
reqMutex sync.RWMutex
189
+
190
+ // Limits the number of requests sent to the backend per second.
191
+ throttle flowcontrol.RateLimiter
184
192
}
185
193
186
194
func (b * batchBackend ) Run (stopCh <- chan struct {}) error {
@@ -306,6 +314,10 @@ func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) {
306
314
307
315
list := auditinternal.EventList {Items : events }
308
316
317
+ if b .throttle != nil {
318
+ b .throttle .Accept ()
319
+ }
320
+
309
321
// Locking reqMutex for read will guarantee that the shutdown process will
310
322
// block until the goroutine started below is finished. At the same time, it
311
323
// will not prevent other batches from being proceed further this point.
0 commit comments