Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Klues <[email protected]>
  • Loading branch information
klueska committed Jan 11, 2025
1 parent 72cffb1 commit 55b81c0
Showing 1 changed file with 79 additions and 11 deletions.
90 changes: 79 additions & 11 deletions cmd/nvidia-dra-controller/mnenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ import (
)

const (
resourceClaimFinalizer = "gpu.nvidia.com/finalizer.multiNodeEnvironment"
imexDeviceClass = "imex.nvidia.com"
multiNodeEnvironmentFinalizer = "gpu.nvidia.com/finalizer.multiNodeEnvironment"
imexDeviceClass = "imex.nvidia.com"

MultiNodeEnvironmentAddEvent = "onMultiNodeEnvironmentAddEvent"
MultiNodeEnvironmentDeleteEvent = "onMultiNodeEnvironmentDeleteEvent"
ResourceClaimAddEvent = "ResourceClaimAddEvent"
DeviceClassAddEvent = "DeviceClassAddEvent"
)

type WorkItem struct {
Expand All @@ -58,24 +59,30 @@ type MultiNodeEnvironmentManager struct {

multiNodeEnvironmentLister nvlisters.MultiNodeEnvironmentLister
resourceClaimLister resourcelisters.ResourceClaimLister
deviceClassLister resourcelisters.DeviceClassLister
}

// StartManager starts a MultiNodeEnvironmentManager.
func StartMultiNodeEnvironmentManager(ctx context.Context, config *Config) (*MultiNodeEnvironmentManager, error) {
queue := workqueue.New(workqueue.DefaultControllerRateLimiter())

mneInformerFactory := nvinformers.NewSharedInformerFactory(config.clientsets.Nvidia, 30*time.Second)
mneInformer := mneInformerFactory.Gpu().V1alpha1().MultiNodeEnvironments().Informer()
nvInformerFactory := nvinformers.NewSharedInformerFactory(config.clientsets.Nvidia, 30*time.Second)
coreInformerFactory := informers.NewSharedInformerFactory(config.clientsets.Core, 30*time.Second)

mneInformer := nvInformerFactory.Gpu().V1alpha1().MultiNodeEnvironments().Informer()
mneLister := nvlisters.NewMultiNodeEnvironmentLister(mneInformer.GetIndexer())

rcInformerFactory := informers.NewSharedInformerFactory(config.clientsets.Core, 30*time.Second)
rcInformer := rcInformerFactory.Resource().V1beta1().ResourceClaims().Informer()
rcInformer := coreInformerFactory.Resource().V1beta1().ResourceClaims().Informer()
rcLister := resourcelisters.NewResourceClaimLister(rcInformer.GetIndexer())

dcInformer := coreInformerFactory.Resource().V1beta1().DeviceClasses().Informer()
dcLister := resourcelisters.NewDeviceClassLister(dcInformer.GetIndexer())

m := &MultiNodeEnvironmentManager{
clientsets: config.clientsets,
multiNodeEnvironmentLister: mneLister,
resourceClaimLister: rcLister,
deviceClassLister: dcLister,
}

mneInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{

Check failure on line 88 in cmd/nvidia-dra-controller/mnenv.go

View workflow job for this annotation

GitHub Actions / check

Error return value of `mneInformer.AddEventHandler` is not checked (errcheck)
Expand All @@ -87,21 +94,25 @@ func StartMultiNodeEnvironmentManager(ctx context.Context, config *Config) (*Mul
AddFunc: func(obj any) { queue.Enqueue(obj, m.onResourceClaimAdd) },
})

dcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{

Check failure on line 97 in cmd/nvidia-dra-controller/mnenv.go

View workflow job for this annotation

GitHub Actions / check

Error return value of `dcInformer.AddEventHandler` is not checked (errcheck)
AddFunc: func(obj any) { queue.Enqueue(obj, m.onDeviceClassAdd) },
})

m.waitGroup.Add(3)
go func() {
defer m.waitGroup.Done()
rcInformerFactory.Start(ctx.Done())
nvInformerFactory.Start(ctx.Done())
}()
go func() {
defer m.waitGroup.Done()
mneInformerFactory.Start(ctx.Done())
coreInformerFactory.Start(ctx.Done())
}()
go func() {
defer m.waitGroup.Done()
queue.Run(ctx.Done())
}()

if !cache.WaitForCacheSync(ctx.Done(), mneInformer.HasSynced, rcInformer.HasSynced) {
if !cache.WaitForCacheSync(ctx.Done(), mneInformer.HasSynced, rcInformer.HasSynced, dcInformer.HasSynced) {
klog.Warning("Cache sync failed; retrying in 5 seconds")
time.Sleep(5 * time.Second)
if !cache.WaitForCacheSync(ctx.Done(), mneInformer.HasSynced, rcInformer.HasSynced) {
Expand Down Expand Up @@ -157,7 +168,7 @@ func (m *MultiNodeEnvironmentManager) onMultiNodeEnvironmentAdd(obj any) error {
Name: mne.Spec.ResourceClaimName,
Namespace: mne.Namespace,
OwnerReferences: []metav1.OwnerReference{ownerReference},
Finalizers: []string{resourceClaimFinalizer},
Finalizers: []string{multiNodeEnvironmentFinalizer},
},
Spec: resourceapi.ResourceClaimSpec{
Devices: resourceapi.DeviceClaim{
Expand Down Expand Up @@ -222,6 +233,37 @@ func (m *MultiNodeEnvironmentManager) onResourceClaimAdd(obj any) error {
return nil
}

func (m *MultiNodeEnvironmentManager) onDeviceClassAdd(obj any) error {
dc, ok := obj.(*resourceapi.DeviceClass)
if !ok {
return fmt.Errorf("failed to cast to DeviceClass")
}

klog.Infof("Processing added DeviceClass: %s/%s", dc.Namespace, dc.Name)

if len(dc.OwnerReferences) != 1 {
return nil
}

if dc.OwnerReferences[0].Kind != nvapi.MultiNodeEnvironmentKind {
return nil
}

_, err := m.multiNodeEnvironmentLister.MultiNodeEnvironments(dc.Namespace).Get(dc.OwnerReferences[0].Name)
if err == nil {
return nil
}
if !errors.IsNotFound(err) {
return fmt.Errorf("error retrieving DeviceClass's OwnerReference '%s': %w", dc.OwnerReferences[0].Name, err)
}

if err := m.removeDeviceClassFinalizer(dc.Name); err != nil {
return fmt.Errorf("error removing finalizer on DeviceClass '%s': %w", dc.Name, err)
}

return nil
}

func (m *MultiNodeEnvironmentManager) removeResourceClaimFinalizer(namespace, name string) error {
rc, err := m.resourceClaimLister.ResourceClaims(namespace).Get(name)
if err != nil && errors.IsNotFound(err) {
Expand All @@ -235,7 +277,7 @@ func (m *MultiNodeEnvironmentManager) removeResourceClaimFinalizer(namespace, na

newRC.Finalizers = []string{}
for _, f := range rc.Finalizers {
if f != resourceClaimFinalizer {
if f != multiNodeEnvironmentFinalizer {
newRC.Finalizers = append(newRC.Finalizers, f)
}
}
Expand All @@ -247,3 +289,29 @@ func (m *MultiNodeEnvironmentManager) removeResourceClaimFinalizer(namespace, na

return nil
}

func (m *MultiNodeEnvironmentManager) removeDeviceClassFinalizer(name string) error {
dc, err := m.deviceClassLister.Get(name)
if err != nil && errors.IsNotFound(err) {
return fmt.Errorf("DeviceClass not found")
}
if err != nil {
return fmt.Errorf("error retrieving DeviceClass: %w", err)
}

newDC := dc.DeepCopy()

newDC.Finalizers = []string{}
for _, f := range dc.Finalizers {
if f != multiNodeEnvironmentFinalizer {
newDC.Finalizers = append(newDC.Finalizers, f)
}
}

_, err = m.clientsets.Core.ResourceV1beta1().DeviceClasses().Update(context.Background(), newDC, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update DeviceClass: %w", err)
}

return nil
}

0 comments on commit 55b81c0

Please sign in to comment.