Skip to content

Commit f5a70b5

Browse files
committed
feat: k8s client-go workqueue
1 parent ea1c975 commit f5a70b5

File tree

2 files changed

+16
-6
lines changed

2 files changed

+16
-6
lines changed

Diff for: README.md

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
<img src="https://raw.githubusercontent.com/rfyiamcool/golang_logo/master/png/golang_76.png" width="35%">
1010

11+
- [源码分析 kubernetes client-go workqueue 的实现原理](https://github.com/rfyiamcool/notes/blob/main/kubernetes_client_go_workqueue_code.md)
12+
1113
- [源码分析 kubernetes apisix ingress controller 控制器的实现原理 (一)](https://github.com/rfyiamcool/notes/blob/main/kubernetes_apisix_ingress_controller_code.md)
1214

1315
- [源码分析 kubernetes apisix ingress controller 控制器的实现原理 (二)](https://github.com/rfyiamcool/notes/blob/main/kubernetes_apisix_ingress_controller_crd_cache_code.md)

Diff for: kubernetes_client_go_workqueue_code.md

+14-6
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ k8s 的各个 controller 控制器内都有使用 workqueue 做事件队列. 为
2020

2121
deltafifo 虽然名为 fifo 队列, 但他的 fifo 不是全局事件, 而只是针对某资源对象的事件进行内部 fifo 排列. 比如某个 deployment 频繁做变更, 那么 deltafifo 逻辑是把后续收到的相关事件放在一起.
2222

23-
如果对 deltafifo 的实现原理感兴趣, 请阅读下文.
23+
如果对 informer deltafifo 的实现原理感兴趣, 请阅读下文.
2424

25-
[k8s informer 的设计实现](https://github.com/rfyiamcool/notes#kubernetes)
25+
[深入源码分析 k8s informer 的设计实现](https://github.com/rfyiamcool/notes#kubernetes)
2626

2727
## workqueue 中内置了三种队列模型
2828

@@ -547,9 +547,13 @@ type rateLimitingType struct {
547547

548548
在实例化 `rateLimitingType` 时, 需要创建 DelayingInterface 和配置 Ratelimiter.
549549

550-
`AddRateLimited` 方法是通过限频器计算出需要等待的时长, 然后调用 `delayingQueue.AddAfter()` 方法来判定把对象扔到延迟队里还是队列里.
550+
`AddRateLimited` 方法是通过限频器计算出需要等待的时长, 然后调用 `delayingQueue.AddAfter()` 方法来决定把对象扔到延迟队里还是队列里.
551551

552-
调用方在使用限频类的 workqueue 时, 当入队超过一定阈值后会采用异步的方法来添加任务, 对于 k8s controller 来说避免了同步等待.
552+
k8s 控制器在使用限频类的 workqueue 时, 当入队超过一定阈值后会采用异步的方法来添加任务, 这样对于 k8s controller 来说避免了同步等待, 及时去处理后面任务.
553+
554+
拿 deploment controller 来说, informer 关联的 eventHanlder 方法直接用 `Add` 入队. 只有错误的时候才会使用 `AddRateLimited` 进行入队.
555+
556+
另外 `Forget` 方法是在 rateLimiter 里清理掉某对象的相关记录. 值得注意的是, 不是所有的 rateLimiter 真正的实现了该接口. 具体情况还是要分析 ratelimiter 代码.
553557

554558
```go
555559
// 众多实例化方法之一, 传入 ratelimiter 限频器
@@ -639,6 +643,8 @@ func (r *BucketRateLimiter) Forget(item interface{}) {
639643

640644
#### 基于 backoff ratelimiter 的实现原理
641645

646+
使用一个 map 记录了各个元素的计数, 后通过经典 backoff 算法可以求出当前需要等待的时长. 默认为 1, 只要不 Forget 抹掉计数, 那么下次再入队时, 其等待的时长为上次的二次方.
647+
642648
```go
643649
type ItemExponentialFailureRateLimiter struct {
644650
failuresLock sync.Mutex
@@ -664,7 +670,7 @@ func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration
664670
exp := r.failures[item]
665671
r.failures[item] = r.failures[item] + 1
666672

667-
// 通过公式计算 backoff 时长
673+
// 通过公式计算 backoff 时长, 当前时长为上次的二次方.
668674
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
669675
if backoff > math.MaxInt64 {
670676
// 不能超过 maxDelay
@@ -755,6 +761,8 @@ Interface 为基本的队列类型. DelayingInterface 在 Interface 基础上实
755761

756762
![](https://xiaorui-cc.oss-cn-hangzhou.aliyuncs.com/images/202301/202301141656245.png)
757763

758-
从 k8s controller/scheduler/kube-proxy/kubelet ... 等组件中可看到不少经典的 workqueue 的用法. 其简化的流程是先通过 k8s informer 监听资源的变更, 实例化 informer 时需注册 addFunc/updateFunc/deleteFunc 事件方法. 这事件方法对应的操作是把 delta 对象扔到 workqueue 里. 控制器通常会开启多个协程去 workqueue 消费, 拿到的对象后使用控制器的 sync 进行状态同步.
764+
从 k8s controller/scheduler/kube-proxy/kubelet ... 等组件源码中, 可以找到不少经典的 workqueue 的用法.
765+
766+
其简化的流程原理是这样. 先通过 k8s informer 监听资源的变更, 实例化 informer 时需注册 addFunc/updateFunc/deleteFunc 事件方法. 这事件方法对应的操作是把 delta 对象扔到 workqueue 里. 控制器通常会开启多个协程去 workqueue 消费, 拿到的对象后使用控制器的 sync 进行状态同步.
759767

760768
![](https://xiaorui-cc.oss-cn-hangzhou.aliyuncs.com/images/202301/202301142109871.png)

0 commit comments

Comments
 (0)