@@ -30,8 +30,11 @@ import (
3030
3131 v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3232 "k8s.io/apimachinery/pkg/runtime"
33+ "k8s.io/apimachinery/pkg/runtime/schema"
3334 "k8s.io/apimachinery/pkg/types"
3435 "k8s.io/apimachinery/pkg/util/wait"
36+ "k8s.io/apimachinery/pkg/watch"
37+ "k8s.io/client-go/dynamic"
3538 "k8s.io/client-go/kubernetes"
3639 "k8s.io/client-go/rest"
3740 "k8s.io/klog/v2"
@@ -45,6 +48,11 @@ const (
4548 nodeCreationRetry = time .Minute * 5
4649 // nodeCreationTimeout is the time after which the context for node creation request is cancelled.
4750 nodeCreationTimeout = time .Minute * 3
51+
52+ // watcherRetryCount is the maximum number of consecutive times the watcher is restarted after exiting with an error.
53+ watcherRetryCount = 10
54+ // watcherRetryDelay is the base delay before restarting a failed watcher; it is multiplied by the current retry count.
55+ watcherRetryDelay = time .Second * 5
4856)
4957
5058var (
@@ -84,16 +92,10 @@ type Manager struct {
8492 PodUID string
8593}
8694
87- // Deploy creates CSIAddonsNode custom resource with all required information.
88- // When information to create the CSIAddonsNode is missing, an error will be
89- // returned immediately. If creating the CSIAddonsNode in the Kubernetes
90- // cluster fails (missing CRD, RBAC limitations, ...), an error will be logged,
91- // and creation will be retried.
92- func (mgr * Manager ) Deploy () error {
93- object , err := mgr .getCSIAddonsNode ()
94- if err != nil {
95- return fmt .Errorf ("failed to get csiaddonsNode object: %w" , err )
96- }
95+ // deploy creates CSIAddonsNode custom resource with all required information.
96+ // If creating the CSIAddonsNode in the Kubernetes cluster fails (missing CRD, RBAC limitations, ...)
97+ // an error will be logged and creation will be retried.
98+ func (mgr * Manager ) deploy (object * csiaddonsv1alpha1.CSIAddonsNode ) error {
9799
98100 // loop until the CSIAddonsNode has been created
99101 return wait .PollUntilContextTimeout (context .TODO (), nodeCreationRetry , nodeCreationTimeout , true , func (ctx context.Context ) (bool , error ) {
@@ -133,6 +135,10 @@ func (mgr *Manager) newCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) erro
133135 },
134136 }
135137 _ , err = controllerutil .CreateOrUpdate (ctx , cli , csiaddonNode , func () error {
138+ if ! csiaddonNode .DeletionTimestamp .IsZero () {
139+ return errors .New ("csiaddonnode is being deleted" )
140+ }
141+
136142 // update the resourceVersion
137143 resourceVersion := csiaddonNode .ResourceVersion
138144 if resourceVersion != "" {
@@ -275,3 +281,80 @@ func generateName(nodeID, namespace, ownerKind, ownerName string) (string, error
275281
276282 return base , nil
277283}
284+
285+ // watchCSIAddonsNode starts a watcher for a specific CSIAddonsNode resource identified by its name.
286+ // If a CSIAddonsNode is deleted, it logs a warning and attempts to recreate it using mgr.deploy()
287+ func (mgr * Manager ) watchCSIAddonsNode (node * csiaddonsv1alpha1.CSIAddonsNode ) error {
288+ // Call deploy on start, this takes care of the cases where
289+ // a watcher might exit due to an error while trying to
290+ // recreate CSIAddonsNode in the cluster
291+ err := mgr .deploy (node )
292+ if err != nil {
293+ klog .Fatalf ("Failed to create csiaddonsnode: %v" , err )
294+ }
295+
296+ klog .Infof ("Starting watcher for CSIAddonsNode: %s" , node .Name )
297+
298+ dynamicClient , err := dynamic .NewForConfig (mgr .Config )
299+ if err != nil {
300+ return fmt .Errorf ("failed to create dynamic client: %w" , err )
301+ }
302+
303+ gvr := schema.GroupVersionResource {
304+ Group : csiaddonsv1alpha1 .GroupVersion .Group ,
305+ Version : csiaddonsv1alpha1 .GroupVersion .Version ,
306+ Resource : "csiaddonsnodes" ,
307+ }
308+
309+ watcher , err := dynamicClient .Resource (gvr ).Namespace (node .Namespace ).Watch (context .Background (), v1.ListOptions {
310+ FieldSelector : fmt .Sprintf ("metadata.name=%s" , node .Name ),
311+ })
312+ if err != nil {
313+ return fmt .Errorf ("failed to watch CSIAddonsNode objects: %w" , err )
314+ }
315+ defer watcher .Stop ()
316+
317+ for event := range watcher .ResultChan () {
318+ switch event .Type {
319+ case watch .Deleted :
320+ klog .Infof ("WARNING: An active CSIAddonsNode: %s was deleted, it will be recreated" , node .Name )
321+
322+ err := mgr .deploy (node )
323+ if err != nil {
324+ return fmt .Errorf ("failed to recreate CSIAddonsNode: %w" , err )
325+ }
326+ klog .Infof ("CSIAddonsNode: %s recreated" , node .Name )
327+ }
328+ }
329+
330+ // The channel was closed by the API server without any errors
331+ // Simply log it here and return, the dispatcher is responsible
332+ // for restarting the watcher
333+ klog .Infof ("Watcher for %s exited gracefully, will be restarted soon" , node .Name )
334+
335+ return nil
336+ }
337+
338+ // DispatchWatcher starts a watcher for the CSIAddonsNode and retries
339+ // if the watcher exits due to an error. It will retry up to a maximum number of
340+ // attempts defined by watcherRetryCount before returning an error.
341+ func (mgr * Manager ) DispatchWatcher () error {
342+ retryCount := 0
343+ node , err := mgr .getCSIAddonsNode ()
344+ if err != nil {
345+ return errors .New ("failed to get CSIAddonsNode object" )
346+ }
347+
348+ for retryCount < int (watcherRetryCount ) {
349+ err := mgr .watchCSIAddonsNode (node )
350+ if err != nil {
351+ klog .Errorf ("Watcher for %s exited, retrying (%d/%d), error: %v" , node .Name , retryCount + 1 , watcherRetryCount , err )
352+
353+ retryCount ++
354+ time .Sleep (watcherRetryDelay * time .Duration (retryCount ))
355+ } else {
356+ retryCount = 0
357+ }
358+ }
359+ return fmt .Errorf ("watcher for %s reached max retries, giving up" , node .Name )
360+ }
0 commit comments