Skip to content

Commit

Permalink
Move redundant methods into a common utility package
Browse files Browse the repository at this point in the history
Signed-off-by: Shiva Krishna, Merla <[email protected]>
  • Loading branch information
shivamerla committed Feb 4, 2025
1 parent c67e045 commit 6f1dd25
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 645 deletions.
73 changes: 72 additions & 1 deletion internal/conditions/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ const (
)

// Updater is the condition updater

type Updater interface {
SetConditionsReady(ctx context.Context, cr client.Object, reason, message string) error
SetConditionsNotReady(ctx context.Context, cr client.Object, reason, message string) error
Expand All @@ -88,6 +87,8 @@ func (u *updater) SetConditionsReady(ctx context.Context, obj client.Object, rea
return u.SetConditionsReadyNemoEntitystore(ctx, cr, reason, message)
case *appsv1alpha1.NemoCustomizer:
return u.SetConditionsReadyNemoCustomizer(ctx, cr, reason, message)
case *appsv1alpha1.NemoDatastore:
return u.SetConditionsReadyNemoDatastore(ctx, cr, reason, message)
default:
return fmt.Errorf("unknown CRD type for %v", obj)
}
Expand Down Expand Up @@ -144,6 +145,23 @@ func (u *updater) SetConditionsReadyNemoEntitystore(ctx context.Context, cr *app
return u.updateNemoEntitystoreStatus(ctx, cr)
}

func (u *updater) SetConditionsReadyNemoDatastore(ctx context.Context, cr *appsv1alpha1.NemoDatastore, reason, message string) error {
meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: Ready,
Status: metav1.ConditionTrue,
Reason: reason,
Message: message,
})

meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: Failed,
Status: metav1.ConditionFalse,
Reason: Ready,
})
cr.Status.State = v1alpha1.NemoDatastoreStatusReady
return u.updateNemoDatastoreStatus(ctx, cr)
}

func (u *updater) SetConditionsReadyNemoCustomizer(ctx context.Context, cr *appsv1alpha1.NemoCustomizer, reason, message string) error {
meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: Ready,
Expand All @@ -169,6 +187,8 @@ func (u *updater) SetConditionsNotReady(ctx context.Context, obj client.Object,
return u.SetConditionsNotReadyNemoGuardrail(ctx, cr, reason, message)
case *appsv1alpha1.NemoEntitystore:
return u.SetConditionsNotReadyNemoEntitystore(ctx, cr, reason, message)
case *appsv1alpha1.NemoDatastore:
return u.SetConditionsNotReadyNemoDatastore(ctx, cr, reason, message)
case *appsv1alpha1.NemoCustomizer:
return u.SetConditionsNotReadyNemoCustomizer(ctx, cr, reason, message)
default:
Expand Down Expand Up @@ -230,6 +250,24 @@ func (u *updater) SetConditionsNotReadyNemoEntitystore(ctx context.Context, cr *
return u.updateNemoEntitystoreStatus(ctx, cr)
}

func (u *updater) SetConditionsNotReadyNemoDatastore(ctx context.Context, cr *appsv1alpha1.NemoDatastore, reason, message string) error {
meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: Ready,
Status: metav1.ConditionFalse,
Reason: reason,
Message: message,
})

meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: Failed,
Status: metav1.ConditionFalse,
Reason: Ready,
Message: message,
})
cr.Status.State = v1alpha1.NemoDatastoreStatusNotReady
return u.updateNemoDatastoreStatus(ctx, cr)
}

func (u *updater) SetConditionsNotReadyNemoCustomizer(ctx context.Context, cr *appsv1alpha1.NemoCustomizer, reason, message string) error {
meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: Ready,
Expand All @@ -256,6 +294,8 @@ func (u *updater) SetConditionsFailed(ctx context.Context, obj client.Object, re
return u.SetConditionsFailedNemoGuardrail(ctx, cr, reason, message)
case *appsv1alpha1.NemoEntitystore:
return u.SetConditionsFailedNemoEntitystore(ctx, cr, reason, message)
case *appsv1alpha1.NemoDatastore:
return u.SetConditionsFailedNemoDatastore(ctx, cr, reason, message)
case *appsv1alpha1.NemoCustomizer:
return u.SetConditionsFailedNemoCustomizer(ctx, cr, reason, message)
default:
Expand Down Expand Up @@ -314,6 +354,23 @@ func (u *updater) SetConditionsFailedNemoEntitystore(ctx context.Context, cr *ap
return u.updateNemoEntitystoreStatus(ctx, cr)
}

func (u *updater) SetConditionsFailedNemoDatastore(ctx context.Context, cr *appsv1alpha1.NemoDatastore, reason, message string) error {
meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: Ready,
Status: metav1.ConditionFalse,
Reason: Failed,
})

meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: Failed,
Status: metav1.ConditionTrue,
Reason: reason,
Message: message,
})
cr.Status.State = v1alpha1.NemoDatastoreStatusFailed
return u.updateNemoDatastoreStatus(ctx, cr)
}

func (u *updater) SetConditionsFailedNemoCustomizer(ctx context.Context, cr *appsv1alpha1.NemoCustomizer, reason, message string) error {
meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: Ready,
Expand Down Expand Up @@ -371,6 +428,19 @@ func (u *updater) updateNemoEntitystoreStatus(ctx context.Context, cr *appsv1alp
return nil
}

func (u *updater) updateNemoDatastoreStatus(ctx context.Context, cr *appsv1alpha1.NemoDatastore) error {
obj := &appsv1alpha1.NemoDatastore{}
errGet := u.client.Get(ctx, types.NamespacedName{Name: cr.Name, Namespace: cr.GetNamespace()}, obj)
if errGet != nil {
return errGet
}
obj.Status = cr.Status
if err := u.client.Status().Update(ctx, obj); err != nil {
return err
}
return nil
}

func (u *updater) updateNemoCustomizerStatus(ctx context.Context, cr *appsv1alpha1.NemoCustomizer) error {
obj := &appsv1alpha1.NemoCustomizer{}
errGet := u.client.Get(ctx, types.NamespacedName{Name: cr.Name, Namespace: cr.GetNamespace()}, obj)
Expand Down Expand Up @@ -408,6 +478,7 @@ func UpdateCondition(conditions *[]metav1.Condition, conditionType string, statu
// condition updated
}

// IfPresentUpdateCondition updates an already existing condition
func IfPresentUpdateCondition(conditions *[]metav1.Condition, conditionType string, status metav1.ConditionStatus, reason, message string) {
for i := range *conditions {
if (*conditions)[i].Type == conditionType {
Expand Down
168 changes: 6 additions & 162 deletions internal/controller/nemo_datastore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/NVIDIA/k8s-nim-operator/internal/k8sutil"
"github.com/NVIDIA/k8s-nim-operator/internal/render"
"github.com/NVIDIA/k8s-nim-operator/internal/shared"
"github.com/NVIDIA/k8s-nim-operator/internal/utils"
"github.com/go-logr/logr"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -35,7 +34,6 @@ import (
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -329,7 +327,7 @@ func (r *NemoDatastoreReconciler) reconcileNemoDatastore(ctx context.Context, ne
return ctrl.Result{}, err
}
} else {
err = r.cleanupResource(ctx, &networkingv1.Ingress{}, namespacedName)
err = k8sutil.CleanupResource(ctx, r.GetClient(), &networkingv1.Ingress{}, namespacedName)
if err != nil && !errors.IsNotFound(err) {
return ctrl.Result{}, err
}
Expand All @@ -345,7 +343,7 @@ func (r *NemoDatastoreReconciler) reconcileNemoDatastore(ctx context.Context, ne
}
} else {
// If autoscaling is disabled, ensure the HPA is deleted
err = r.cleanupResource(ctx, &autoscalingv2.HorizontalPodAutoscaler{}, namespacedName)
err = k8sutil.CleanupResource(ctx, r.GetClient(), &autoscalingv2.HorizontalPodAutoscaler{}, namespacedName)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -393,19 +391,19 @@ func (r *NemoDatastoreReconciler) reconcileNemoDatastore(ctx context.Context, ne
}

// Wait for deployment
msg, ready, err := r.isDeploymentReady(ctx, &namespacedName)
msg, ready, err := k8sutil.IsDeploymentReady(ctx, r.GetClient(), &namespacedName)
if err != nil {
return ctrl.Result{}, err
}

if !ready {
// Update status as NotReady
err = r.SetConditionsNotReady(ctx, nemoDatastore, conditions.NotReady, msg)
err = r.updater.SetConditionsNotReady(ctx, nemoDatastore, conditions.NotReady, msg)
r.GetEventRecorder().Eventf(nemoDatastore, corev1.EventTypeNormal, conditions.NotReady,
"NemoDatastore %s not ready yet, msg: %s", nemoDatastore.Name, msg)
} else {
// Update status as ready
err = r.SetConditionsReady(ctx, nemoDatastore, conditions.Ready, msg)
err = r.updater.SetConditionsReady(ctx, nemoDatastore, conditions.Ready, msg)
r.GetEventRecorder().Eventf(nemoDatastore, corev1.EventTypeNormal, conditions.Ready,
"NemoDatastore %s ready, msg: %s", nemoDatastore.Name, msg)
}
Expand Down Expand Up @@ -494,7 +492,7 @@ func (r *NemoDatastoreReconciler) renderAndSyncResource(ctx context.Context, Nem
return err
}

err = r.syncResource(ctx, obj, resource, namespacedName)
err = k8sutil.SyncResource(ctx, r.GetClient(), obj, resource, namespacedName)
if err != nil {
logger.Error(err, "failed to sync", conditionType, namespacedName)
statusError := r.updater.SetConditionsFailed(ctx, NemoDatastore, reason, err.Error())
Expand All @@ -505,157 +503,3 @@ func (r *NemoDatastoreReconciler) renderAndSyncResource(ctx context.Context, Nem
}
return nil
}

// CheckDeploymentReadiness checks if the Deployment is ready
func (r *NemoDatastoreReconciler) isDeploymentReady(ctx context.Context, namespacedName *types.NamespacedName) (string, bool, error) {
deployment := &appsv1.Deployment{}
err := r.Get(ctx, client.ObjectKey{Name: namespacedName.Name, Namespace: namespacedName.Namespace}, deployment)
if err != nil {
if errors.IsNotFound(err) {
return "", false, nil
}
return "", false, err
}

cond := getDeploymentCondition(deployment.Status, appsv1.DeploymentProgressing)
if cond != nil && cond.Reason == "ProgressDeadlineExceeded" {
return fmt.Sprintf("deployment %q exceeded its progress deadline", deployment.Name), false, nil
}
if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d out of %d new replicas have been updated...\n", deployment.Name, deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas), false, nil
}
if deployment.Status.Replicas > deployment.Status.UpdatedReplicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d old replicas are pending termination...\n", deployment.Name, deployment.Status.Replicas-deployment.Status.UpdatedReplicas), false, nil
}
if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d of %d updated replicas are available...\n", deployment.Name, deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas), false, nil
}
return fmt.Sprintf("deployment %q successfully rolled out\n", deployment.Name), true, nil
}

func (r *NemoDatastoreReconciler) syncResource(ctx context.Context, obj client.Object, desired client.Object, namespacedName types.NamespacedName) error {
logger := log.FromContext(ctx)

err := r.Get(ctx, namespacedName, obj)
if err != nil && !errors.IsNotFound(err) {
return err
}

if !utils.IsSpecChanged(obj, desired) {
logger.V(2).Info("Object spec has not changed, skipping update", "obj", obj)
return nil
}
logger.V(2).Info("Object spec has changed, updating")

if errors.IsNotFound(err) {
err = r.Create(ctx, desired)
if err != nil {
return err
}
} else {
err = r.Update(ctx, desired)
if err != nil {
return err
}
}
return nil
}

// cleanupResource deletes the given Kubernetes resource if it exists.
// If the resource does not exist or an error occurs during deletion, the function returns nil or the error.
//
// Parameters:
// ctx (context.Context): The context for the operation.
// obj (client.Object): The Kubernetes resource to delete.
// namespacedName (types.NamespacedName): The namespaced name of the resource.
//
// Returns:
// error: An error if the resource deletion fails, or nil if the resource is not found or deletion is successful.
func (r *NemoDatastoreReconciler) cleanupResource(ctx context.Context, obj client.Object, namespacedName types.NamespacedName) error {

logger := log.FromContext(ctx)

err := r.Get(ctx, namespacedName, obj)
if err != nil && !errors.IsNotFound(err) {
return err
}

if errors.IsNotFound(err) {
return nil
}

err = r.Delete(ctx, obj)
if err != nil {
return err
}
logger.V(2).Info("NIM Service object changed, deleting ", "obj", obj)
return nil
}

func (r *NemoDatastoreReconciler) SetConditionsReady(ctx context.Context, cr *appsv1alpha1.NemoDatastore, reason, message string) error {
meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: conditions.Ready,
Status: metav1.ConditionTrue,
Reason: reason,
Message: message,
})

meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: conditions.Failed,
Status: metav1.ConditionFalse,
Reason: conditions.Ready,
})

cr.Status.State = appsv1alpha1.NemoDatastoreStatusReady
return r.updateNemoDatastoreStatus(ctx, cr)
}

func (r *NemoDatastoreReconciler) SetConditionsNotReady(ctx context.Context, cr *appsv1alpha1.NemoDatastore, reason, message string) error {
meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: conditions.Ready,
Status: metav1.ConditionFalse,
Reason: reason,
Message: message,
})

meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: conditions.Failed,
Status: metav1.ConditionFalse,
Reason: conditions.Ready,
Message: message,
})

cr.Status.State = appsv1alpha1.NemoDatastoreStatusNotReady
return r.updateNemoDatastoreStatus(ctx, cr)
}

func (r *NemoDatastoreReconciler) SetConditionsFailed(ctx context.Context, cr *appsv1alpha1.NemoDatastore, reason, message string) error {
meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: conditions.Ready,
Status: metav1.ConditionFalse,
Reason: conditions.Failed,
})

meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: conditions.Failed,
Status: metav1.ConditionTrue,
Reason: reason,
Message: message,
})
cr.Status.State = appsv1alpha1.NemoDatastoreStatusFailed
return r.updateNemoDatastoreStatus(ctx, cr)
}

func (r *NemoDatastoreReconciler) updateNemoDatastoreStatus(ctx context.Context, cr *appsv1alpha1.NemoDatastore) error {

obj := &appsv1alpha1.NemoDatastore{}
errGet := r.Get(ctx, types.NamespacedName{Name: cr.Name, Namespace: cr.GetNamespace()}, obj)
if errGet != nil {
return errGet
}
obj.Status = cr.Status
if err := r.Status().Update(ctx, obj); err != nil {
return err
}
return nil
}
Loading

0 comments on commit 6f1dd25

Please sign in to comment.