diff --git a/apis/serving/v1alpha1/servingruntime_webhook.go b/apis/serving/v1alpha1/servingruntime_webhook.go index 1b92ce0b..57f7e4d8 100644 --- a/apis/serving/v1alpha1/servingruntime_webhook.go +++ b/apis/serving/v1alpha1/servingruntime_webhook.go @@ -32,7 +32,7 @@ import ( // +kubebuilder:webhook:path=/validate-serving-modelmesh-io-v1alpha1-servingruntime,mutating=false,failurePolicy=fail,sideEffects=None,groups=serving.kserve.io,resources=servingruntimes;clusterservingruntimes,verbs=create;update,versions=v1alpha1,name=servingruntime.modelmesh-webhook-server.default,admissionReviewVersions=v1 type ServingRuntimeWebhook struct { Client client.Client - decoder *admission.Decoder + Decoder *admission.Decoder } func (s *ServingRuntimeWebhook) Handle(ctx context.Context, req admission.Request) admission.Response { @@ -42,7 +42,7 @@ func (s *ServingRuntimeWebhook) Handle(ctx context.Context, req admission.Reques if req.Kind.Kind == "ServingRuntime" { servingRuntime := &kservev1alpha.ServingRuntime{} - err := s.decoder.Decode(req, servingRuntime) + err := s.Decoder.Decode(req, servingRuntime) if err != nil { return admission.Errored(http.StatusBadRequest, err) } @@ -58,7 +58,7 @@ func (s *ServingRuntimeWebhook) Handle(ctx context.Context, req admission.Reques } else { clusterServingRuntime := &kservev1alpha.ClusterServingRuntime{} - err := s.decoder.Decode(req, clusterServingRuntime) + err := s.Decoder.Decode(req, clusterServingRuntime) if err != nil { return admission.Errored(http.StatusBadRequest, err) } @@ -92,12 +92,6 @@ func (s *ServingRuntimeWebhook) Handle(ctx context.Context, req admission.Reques return admission.Allowed("Passed all validation checks for ServingRuntime") } -// InjectDecoder injects the decoder. -func (s *ServingRuntimeWebhook) InjectDecoder(d *admission.Decoder) error { - s.decoder = d - return nil -} - // Validation of servingruntime autoscaler class func validateServingRuntimeAutoscaler(annotations map[string]string) error { value, ok := annotations[constants.AutoscalerClass] diff --git a/controllers/predictor_controller.go b/controllers/predictor_controller.go index 5faa392b..605c2fd5 100644 --- a/controllers/predictor_controller.go +++ b/controllers/predictor_controller.go @@ -592,20 +592,20 @@ func (pr *PredictorReconciler) SetupWithManager(mgr ctrl.Manager, eventStream *m watchInferenceServices bool, sourcePluginEvents <-chan event.GenericEvent) error { builder := ctrl.NewControllerManagedBy(mgr). For(&api.Predictor{}). - Watches(&src.Channel{Source: eventStream.MMEvents}, &handler.EnqueueRequestForObject{}) + WatchesRawSource(&src.Channel{Source: eventStream.MMEvents}, &handler.EnqueueRequestForObject{}) if sourcePluginEvents != nil { - builder.Watches(&src.Channel{Source: sourcePluginEvents}, &handler.EnqueueRequestForObject{}) + builder.WatchesRawSource(&src.Channel{Source: sourcePluginEvents}, &handler.EnqueueRequestForObject{}) } if watchInferenceServices { - builder = builder.Watches(&src.Kind{Type: &v1beta1.InferenceService{}}, prefixName(InferenceServiceCRSourceId)) + builder = builder.Watches(&v1beta1.InferenceService{}, prefixName(InferenceServiceCRSourceId)) } return builder.Complete(pr) } func prefixName(prefix string) handler.EventHandler { - return handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { + return handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request { // Prepend prefix return []reconcile.Request{{ NamespacedName: types.NamespacedName{ diff --git a/controllers/service_controller.go b/controllers/service_controller.go index 4514b19d..fd394a11 100644 --- a/controllers/service_controller.go +++ b/controllers/service_controller.go @@ -46,7 +46,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sigs.k8s.io/controller-runtime/pkg/source" ) const ( @@ -447,7 +446,7 @@ func (r *ServiceReconciler) setupForNamespaceScope(builder *bld.Builder) { UpdateFunc: func(event event.UpdateEvent) bool { return filter(event.ObjectNew) }, DeleteFunc: func(event event.DeleteEvent) bool { return false }, })). - Watches(&source.Kind{Type: &corev1.ConfigMap{}}, + Watches(&corev1.ConfigMap{}, config.ConfigWatchHandler(r.ConfigMapName, func() []reconcile.Request { if _, _, changed := r.getMMService(r.ControllerDeployment.Namespace, r.ConfigProvider, true); changed { r.Log.Info("Triggering service reconciliation after config change") @@ -458,8 +457,8 @@ func (r *ServiceReconciler) setupForNamespaceScope(builder *bld.Builder) { // Enable ServiceMonitor watch if ServiceMonitorCRDExists if r.ServiceMonitorCRDExists { - builder.Watches(&source.Kind{Type: &monitoringv1.ServiceMonitor{}}, - handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { + builder.Watches(&monitoringv1.ServiceMonitor{}, + handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request { if o.GetName() == serviceMonitorName && o.GetNamespace() == r.ControllerDeployment.Namespace { return []reconcile.Request{{NamespacedName: r.ControllerDeployment}} } @@ -470,7 +469,7 @@ func (r *ServiceReconciler) setupForNamespaceScope(builder *bld.Builder) { func (r *ServiceReconciler) setupForClusterScope(builder *bld.Builder) { builder.For(&corev1.Namespace{}). - Watches(&source.Kind{Type: &corev1.ConfigMap{}}, + Watches(&corev1.ConfigMap{}, config.ConfigWatchHandler(r.ConfigMapName, func() []reconcile.Request { list := &corev1.NamespaceList{} if err := r.Client.List(context.TODO(), list); err != nil { @@ -492,8 +491,8 @@ func (r *ServiceReconciler) setupForClusterScope(builder *bld.Builder) { // Enable ServiceMonitor watch if ServiceMonitorCRDExists if r.ServiceMonitorCRDExists { - builder.Watches(&source.Kind{Type: &monitoringv1.ServiceMonitor{}}, - handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { + builder.Watches(&monitoringv1.ServiceMonitor{}, + handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request { if o.GetName() == serviceMonitorName { return []reconcile.Request{{ NamespacedName: types.NamespacedName{Name: o.GetNamespace()}, diff --git a/controllers/servingruntime_controller.go b/controllers/servingruntime_controller.go index adbfd46a..28144afd 100644 --- a/controllers/servingruntime_controller.go +++ b/controllers/servingruntime_controller.go @@ -606,7 +606,7 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager, For(&kserveapi.ServingRuntime{}). Owns(&appsv1.Deployment{}). // watch the user configmap and reconcile all runtimes when it changes - Watches(&source.Kind{Type: &corev1.ConfigMap{}}, + Watches(&corev1.ConfigMap{}, config.ConfigWatchHandler(r.ConfigMapName, func() []reconcile.Request { return r.requestsForRuntimes("", func(namespace string) bool { mme, err := modelMeshEnabled2(context.TODO(), namespace, @@ -615,22 +615,22 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager, }) }, r.ConfigProvider, &r.Client)). // watch predictors and reconcile the corresponding runtime(s) it could be assigned to - Watches(&source.Kind{Type: &api.Predictor{}}, - handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { + Watches(&api.Predictor{}, + handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request { return r.runtimeRequestsForPredictor(o.(*api.Predictor), "Predictor") })) if r.ClusterScope { // watch namespaces to check the modelmesh-enabled flag - builder = builder.Watches(&source.Kind{Type: &corev1.Namespace{}}, handler.EnqueueRequestsFromMapFunc( - func(o client.Object) []reconcile.Request { + builder = builder.Watches(&corev1.Namespace{}, handler.EnqueueRequestsFromMapFunc( + func(_ context.Context, o client.Object) []reconcile.Request { return r.requestsForRuntimes(o.GetName(), nil) })) } if watchInferenceServices { - builder = builder.Watches(&source.Kind{Type: &v1beta1.InferenceService{}}, - handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { + builder = builder.Watches(&v1beta1.InferenceService{}, + handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request { if p, _ := predictor_source.BuildBasePredictorFromInferenceService(o.(*v1beta1.InferenceService)); p != nil { return r.runtimeRequestsForPredictor(p, "InferenceService") } @@ -639,26 +639,26 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager, } if r.EnableCSRWatch { - builder = builder.Watches(&source.Kind{Type: &kserveapi.ClusterServingRuntime{}}, - handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { + builder = builder.Watches(&kserveapi.ClusterServingRuntime{}, + handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request { return r.clusterServingRuntimeRequests(o.(*kserveapi.ClusterServingRuntime)) })) } if r.EnableSecretWatch { - builder = builder.Watches(&source.Kind{Type: &corev1.Secret{}}, - handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { + builder = builder.Watches(&corev1.Secret{}, + handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request { return r.storageSecretRequests(o.(*corev1.Secret)) })) } if sourcePluginEvents != nil { - builder.Watches(&source.Channel{Source: sourcePluginEvents}, - handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { + builder.WatchesRawSource(&source.Channel{Source: sourcePluginEvents}, + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request { nn, src := predictor_source.ResolveSource(types.NamespacedName{ Name: o.GetName(), Namespace: o.GetNamespace()}, PredictorCRSourceId) if registry, ok := r.RegistryMap[src]; ok { - if p, _ := registry.Get(context.TODO(), nn); p != nil { + if p, _ := registry.Get(ctx, nn); p != nil { return r.runtimeRequestsForPredictor(p, registry.GetSourceName()) } } diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 39075a83..a02c9299 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -41,6 +41,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/yaml" kserveapi "github.com/kserve/kserve/pkg/apis/serving/v1alpha1" @@ -101,8 +102,8 @@ var _ = BeforeSuite(func() { // +kubebuilder:scaffold:scheme k8sManager, err = ctrl.NewManager(cfg, ctrl.Options{ - Scheme: scheme.Scheme, - MetricsBindAddress: "0", //This disables the metrics server + Scheme: scheme.Scheme, + Metrics: server.Options{BindAddress: "0"}, //This disables the metrics server }) Expect(err).ToNot(HaveOccurred()) diff --git a/main.go b/main.go index fc3c071d..d38faa42 100644 --- a/main.go +++ b/main.go @@ -43,10 +43,13 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" _ "k8s.io/client-go/plugin/pkg/client/auth/oidc" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" @@ -219,14 +222,18 @@ func main() { mgrOpts := ctrl.Options{ Scheme: scheme, - MetricsBindAddress: metricsAddr, - Port: 9443, + Metrics: server.Options{BindAddress: metricsAddr}, + WebhookServer: webhook.NewServer(webhook.Options{Port: 9443}), HealthProbeBindAddress: probeAddr, } if !clusterScopeMode { // Set manager to operate scoped to our namespace - mgrOpts.Namespace = ControllerNamespace + mgrOpts.Cache = cache.Options{ + DefaultNamespaces: map[string]cache.Config{ + ControllerNamespace: {}, + }, + } } if enableLeaderElection { @@ -261,10 +268,12 @@ func main() { } // Setup servingruntime validating webhook + // TODO: Rework webhook setup using builder.WebhookManagedBy, so that there is no need to worry about the Decoder hookServer := mgr.GetWebhookServer() servingRuntimeWebhook := &webhook.Admission{ Handler: &servingv1alpha1.ServingRuntimeWebhook{ - Client: mgr.GetClient(), + Client: mgr.GetClient(), + Decoder: admission.NewDecoder(mgr.GetScheme()), }, } hookServer.Register("/validate-serving-modelmesh-io-v1alpha1-servingruntime", servingRuntimeWebhook) diff --git a/pkg/config/config.go b/pkg/config/config.go index 3f0fd708..ca2db8a0 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -232,10 +232,10 @@ func (cp *ConfigProvider) ReloadConfigMap(ctx context.Context, c client.Client, // Handler used by controllers which depend on the user configuration func ConfigWatchHandler(configMapName types.NamespacedName, f func() []reconcile.Request, cp *ConfigProvider, kclient *client.Client) handler.EventHandler { - return handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { + return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request { // Ignore ConfigMaps we don't care about if o.GetName() == configMapName.Name && o.GetNamespace() == configMapName.Namespace { - err := cp.ReloadConfigMap(context.TODO(), *kclient, configMapName) + err := cp.ReloadConfigMap(ctx, *kclient, configMapName) if err != nil { configLog.Error(err, "Unable to reload user configuration") }