diff --git a/cmd/compute-domain-kubelet-plugin/device_state.go b/cmd/compute-domain-kubelet-plugin/device_state.go index a1466e3f..daf7fec0 100644 --- a/cmd/compute-domain-kubelet-plugin/device_state.go +++ b/cmd/compute-domain-kubelet-plugin/device_state.go @@ -382,7 +382,7 @@ func (s *DeviceState) applyComputeDomainChannelConfig(ctx context.Context, confi for _, r := range results { channel := s.allocatable[r.Device].Channel if err := s.computeDomainManager.AssertComputeDomainNamespace(ctx, claim.Namespace, config.DomainID); err != nil { - return nil, fmt.Errorf("error asserting ComputeDomain's namespace: %w", err) + return nil, permanentError{fmt.Errorf("error asserting ComputeDomain's namespace: %w", err)} } if err := s.computeDomainManager.AddNodeLabel(ctx, config.DomainID); err != nil { return nil, fmt.Errorf("error adding Node label for ComputeDomain: %w", err) diff --git a/cmd/compute-domain-kubelet-plugin/driver.go b/cmd/compute-domain-kubelet-plugin/driver.go index eda5318f..4cdf4787 100644 --- a/cmd/compute-domain-kubelet-plugin/driver.go +++ b/cmd/compute-domain-kubelet-plugin/driver.go @@ -18,6 +18,7 @@ package main import ( "context" + "errors" "fmt" "sync" "time" @@ -31,6 +32,15 @@ import ( "github.com/NVIDIA/k8s-dra-driver-gpu/pkg/workqueue" ) +// ErrorRetryMaxTimeout is the max amount of time we retry when errors are +// returned before giving up. +const ErrorRetryMaxTimeout = 45 * time.Second + +// permanentError defines an error indicating that it is permanent. +// By default, every error will be retried up to ErrorRetryMaxTimeout. +// Errors marked as permanent will not be retried. 
+type permanentError struct{ error } + var _ drapbv1.DRAPluginServer = &driver{} type driver struct { @@ -98,19 +108,19 @@ func (d *driver) NodePrepareResources(ctx context.Context, req *drapbv1.NodePrep preparedResources := &drapbv1.NodePrepareResourcesResponse{Claims: map[string]*drapbv1.NodePrepareResourceResponse{}} var wg sync.WaitGroup - ctx, cancel := context.WithTimeout(ctx, 45*time.Second) + ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout) workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter()) for _, claim := range req.Claims { wg.Add(1) workQueue.EnqueueRaw(claim, func(ctx context.Context, obj any) error { - prepared := d.nodePrepareResource(ctx, claim) - if prepared.Error != "" { - return fmt.Errorf("%s", prepared.Error) + done, prepared := d.nodePrepareResource(ctx, claim) + if done { + preparedResources.Claims[claim.UID] = prepared + wg.Done() + return nil } - preparedResources.Claims[claim.UID] = prepared - wg.Done() - return nil + return fmt.Errorf("%s", prepared.Error) }) } @@ -129,19 +139,19 @@ func (d *driver) NodeUnprepareResources(ctx context.Context, req *drapbv1.NodeUn unpreparedResources := &drapbv1.NodeUnprepareResourcesResponse{Claims: map[string]*drapbv1.NodeUnprepareResourceResponse{}} var wg sync.WaitGroup - ctx, cancel := context.WithTimeout(ctx, 45*time.Second) + ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout) workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter()) for _, claim := range req.Claims { wg.Add(1) workQueue.EnqueueRaw(claim, func(ctx context.Context, obj any) error { - unprepared := d.nodeUnprepareResource(ctx, claim) - if unprepared.Error != "" { - return fmt.Errorf("%s", unprepared.Error) + done, unprepared := d.nodeUnprepareResource(ctx, claim) + if done { + unpreparedResources.Claims[claim.UID] = unprepared + wg.Done() + return nil } - unpreparedResources.Claims[claim.UID] = unprepared - wg.Done() - return nil + return fmt.Errorf("%s", unprepared.Error) }) } @@ 
-155,7 +165,7 @@ func (d *driver) NodeUnprepareResources(ctx context.Context, req *drapbv1.NodeUn return unpreparedResources, nil } -func (d *driver) nodePrepareResource(ctx context.Context, claim *drapbv1.Claim) *drapbv1.NodePrepareResourceResponse { +func (d *driver) nodePrepareResource(ctx context.Context, claim *drapbv1.Claim) (bool, *drapbv1.NodePrepareResourceResponse) { d.Lock() defer d.Unlock() @@ -164,33 +174,43 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *drapbv1.Claim) claim.Name, metav1.GetOptions{}) if err != nil { - return &drapbv1.NodePrepareResourceResponse{ + ret := &drapbv1.NodePrepareResourceResponse{ Error: fmt.Sprintf("failed to fetch ResourceClaim %s in namespace %s", claim.Name, claim.Namespace), } + return isPermanentError(err), ret } prepared, err := d.state.Prepare(ctx, resourceClaim) if err != nil { - return &drapbv1.NodePrepareResourceResponse{ + ret := &drapbv1.NodePrepareResourceResponse{ Error: fmt.Sprintf("error preparing devices for claim %v: %v", claim.UID, err), } + return isPermanentError(err), ret } klog.Infof("Returning newly prepared devices for claim '%v': %v", claim.UID, prepared) - return &drapbv1.NodePrepareResourceResponse{Devices: prepared} + return true, &drapbv1.NodePrepareResourceResponse{Devices: prepared} } -func (d *driver) nodeUnprepareResource(ctx context.Context, claim *drapbv1.Claim) *drapbv1.NodeUnprepareResourceResponse { +func (d *driver) nodeUnprepareResource(ctx context.Context, claim *drapbv1.Claim) (bool, *drapbv1.NodeUnprepareResourceResponse) { d.Lock() defer d.Unlock() if err := d.state.Unprepare(ctx, claim.UID); err != nil { - return &drapbv1.NodeUnprepareResourceResponse{ + ret := &drapbv1.NodeUnprepareResourceResponse{ Error: fmt.Sprintf("error unpreparing devices for claim %v: %v", claim.UID, err), } + return isPermanentError(err), ret } - return &drapbv1.NodeUnprepareResourceResponse{} + return true, &drapbv1.NodeUnprepareResourceResponse{} +} + +func isPermanentError(err 
error) bool { + if errors.As(err, &permanentError{}) { + return true + } + return false } // TODO: implement loop to remove CDI files from the CDI path for claimUIDs