Skip to content

Commit 4673aeb

Browse files
add watches for owned resources and CRBs
Signed-off-by: Kevin <[email protected]>
1 parent 077121e commit 4673aeb

File tree

4 files changed

+119
-32
lines changed

4 files changed

+119
-32
lines changed

config/rbac/role.yaml

+14
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ rules:
113113
- create
114114
- delete
115115
- get
116+
- list
116117
- patch
118+
- watch
117119
- apiGroups:
118120
- ""
119121
resources:
@@ -122,8 +124,10 @@ rules:
122124
- create
123125
- delete
124126
- get
127+
- list
125128
- patch
126129
- update
130+
- watch
127131
- apiGroups:
128132
- ""
129133
resources:
@@ -132,8 +136,10 @@ rules:
132136
- create
133137
- delete
134138
- get
139+
- list
135140
- patch
136141
- update
142+
- watch
137143
- apiGroups:
138144
- dscinitialization.opendatahub.io
139145
resources:
@@ -204,8 +210,10 @@ rules:
204210
- create
205211
- delete
206212
- get
213+
- list
207214
- patch
208215
- update
216+
- watch
209217
- apiGroups:
210218
- networking.k8s.io
211219
resources:
@@ -214,8 +222,10 @@ rules:
214222
- create
215223
- delete
216224
- get
225+
- list
217226
- patch
218227
- update
228+
- watch
219229
- apiGroups:
220230
- ray.io
221231
resources:
@@ -262,8 +272,10 @@ rules:
262272
- create
263273
- delete
264274
- get
275+
- list
265276
- patch
266277
- update
278+
- watch
267279
- apiGroups:
268280
- route.openshift.io
269281
resources:
@@ -273,8 +285,10 @@ rules:
273285
- create
274286
- delete
275287
- get
288+
- list
276289
- patch
277290
- update
291+
- watch
278292
- apiGroups:
279293
- scheduling.k8s.io
280294
resources:

main.go

+40-2
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,13 @@ import (
3636
"golang.org/x/exp/slices"
3737

3838
corev1 "k8s.io/api/core/v1"
39+
networkingv1 "k8s.io/api/networking/v1"
40+
rbacv1 "k8s.io/api/rbac/v1"
3941
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
4042
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
4143
apierrors "k8s.io/apimachinery/pkg/api/errors"
4244
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
45+
"k8s.io/apimachinery/pkg/labels"
4346
"k8s.io/apimachinery/pkg/runtime"
4447
"k8s.io/apimachinery/pkg/types"
4548
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -49,12 +52,14 @@ import (
4952
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
5053
_ "k8s.io/client-go/plugin/pkg/client/auth"
5154
"k8s.io/client-go/rest"
52-
"k8s.io/client-go/tools/cache"
55+
clientcache "k8s.io/client-go/tools/cache"
5356
retrywatch "k8s.io/client-go/tools/watch"
5457
configv1alpha1 "k8s.io/component-base/config/v1alpha1"
5558
"k8s.io/klog/v2"
5659
"k8s.io/utils/ptr"
5760
ctrl "sigs.k8s.io/controller-runtime"
61+
"sigs.k8s.io/controller-runtime/pkg/cache"
62+
"sigs.k8s.io/controller-runtime/pkg/client"
5863
"sigs.k8s.io/controller-runtime/pkg/healthz"
5964
"sigs.k8s.io/controller-runtime/pkg/log/zap"
6065
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -166,6 +171,38 @@ func main() {
166171
kubeConfig.QPS = ptr.Deref(cfg.ClientConnection.QPS, rest.DefaultQPS)
167172
setupLog.V(2).Info("REST client", "qps", kubeConfig.QPS, "burst", kubeConfig.Burst)
168173

174+
selector, err := labels.Parse(controllers.RayClusterNameLabel)
175+
exitOnError(err, "unable to parse label selector")
176+
177+
cacheOpts := cache.Options{
178+
ByObject: map[client.Object]cache.ByObject{
179+
&corev1.Secret{}: {
180+
Label: selector,
181+
},
182+
&corev1.Service{}: {
183+
Label: selector,
184+
},
185+
&corev1.ServiceAccount{}: {
186+
Label: selector,
187+
},
188+
&networkingv1.Ingress{}: {
189+
Label: selector,
190+
},
191+
&networkingv1.NetworkPolicy{}: {
192+
Label: selector,
193+
},
194+
&rbacv1.ClusterRoleBinding{}: {
195+
Label: selector,
196+
},
197+
},
198+
}
199+
200+
if isOpenShift(ctx, kubeClient.DiscoveryClient) {
201+
cacheOpts.ByObject[&routev1.Route{}] = cache.ByObject{
202+
Label: selector,
203+
}
204+
}
205+
169206
mgr, err := ctrl.NewManager(kubeConfig, ctrl.Options{
170207
Scheme: scheme,
171208
Metrics: metricsserver.Options{
@@ -179,6 +216,7 @@ func main() {
179216
LeaseDuration: &cfg.LeaderElection.LeaseDuration.Duration,
180217
RetryPeriod: &cfg.LeaderElection.RetryPeriod.Duration,
181218
RenewDeadline: &cfg.LeaderElection.RenewDeadline.Duration,
219+
Cache: cacheOpts,
182220
})
183221
exitOnError(err, "unable to create manager")
184222

@@ -451,7 +489,7 @@ func waitForAPI(ctx context.Context, mgr ctrl.Manager, apiName string, action fu
451489

452490
// Wait for the API to become available then invoke action
453491
setupLog.Info(fmt.Sprintf("API %v not available, setting up retry watcher", apiName))
454-
retryWatcher, err := retrywatch.NewRetryWatcher(crdList.ResourceVersion, &cache.ListWatch{
492+
retryWatcher, err := retrywatch.NewRetryWatcher(crdList.ResourceVersion, &clientcache.ListWatch{
455493
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
456494
return crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
457495
},

pkg/controllers/raycluster_controller.go

+57-24
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
3535

3636
corev1 "k8s.io/api/core/v1"
37+
networkingv1 "k8s.io/api/networking/v1"
3738
rbacv1 "k8s.io/api/rbac/v1"
3839
"k8s.io/apimachinery/pkg/api/errors"
3940
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -48,6 +49,8 @@ import (
4849
ctrl "sigs.k8s.io/controller-runtime"
4950
"sigs.k8s.io/controller-runtime/pkg/client"
5051
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
52+
"sigs.k8s.io/controller-runtime/pkg/handler"
53+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
5154

5255
routev1 "github.com/openshift/api/route/v1"
5356
routev1ac "github.com/openshift/client-go/route/applyconfigurations/route/v1"
@@ -78,6 +81,8 @@ const (
7881

7982
CAPrivateKeyKey = "ca.key"
8083
CACertKey = "ca.crt"
84+
85+
RayClusterNameLabel = "ray.openshift.ai/cluster-name"
8186
)
8287

8388
var (
@@ -88,16 +93,16 @@ var (
8893
// +kubebuilder:rbac:groups=ray.io,resources=rayclusters,verbs=get;list;watch;create;update;patch;delete
8994
// +kubebuilder:rbac:groups=ray.io,resources=rayclusters/status,verbs=get;update;patch
9095
// +kubebuilder:rbac:groups=ray.io,resources=rayclusters/finalizers,verbs=update
91-
// +kubebuilder:rbac:groups=route.openshift.io,resources=routes;routes/custom-host,verbs=get;create;update;patch;delete
92-
// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;create;update;patch;delete
93-
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;create;patch;delete;get
94-
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;create;update;patch;delete
95-
// +kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;create;update;patch;delete
96-
// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings,verbs=get;create;update;patch;delete
96+
// +kubebuilder:rbac:groups=route.openshift.io,resources=routes;routes/custom-host,verbs=get;list;create;update;patch;delete;watch
97+
// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;create;update;patch;delete;watch
98+
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;create;patch;delete;watch
99+
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;create;update;patch;delete;watch
100+
// +kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;create;update;patch;delete;watch
101+
// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings,verbs=get;list;create;update;patch;delete;watch
97102
// +kubebuilder:rbac:groups=authentication.k8s.io,resources=tokenreviews,verbs=create;
98103
// +kubebuilder:rbac:groups=authorization.k8s.io,resources=subjectaccessreviews,verbs=create;
99104
// +kubebuilder:rbac:groups=dscinitialization.opendatahub.io,resources=dscinitializations,verbs=get;list;watch
100-
// +kubebuilder:rbac:groups=networking.k8s.io,resources=networkpolicies,verbs=get;create;update;patch;delete
105+
// +kubebuilder:rbac:groups=networking.k8s.io,resources=networkpolicies,verbs=get;list;create;update;patch;delete;watch
101106

102107
// Reconcile is part of the main kubernetes reconciliation loop which aims to
103108
// move the current state of the cluster closer to the desired state.
@@ -301,7 +306,7 @@ func crbNameFromCluster(cluster *rayv1.RayCluster) string {
301306
func desiredOAuthClusterRoleBinding(cluster *rayv1.RayCluster) *rbacv1ac.ClusterRoleBindingApplyConfiguration {
302307
return rbacv1ac.ClusterRoleBinding(
303308
crbNameFromCluster(cluster)).
304-
WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
309+
WithLabels(map[string]string{RayClusterNameLabel: cluster.Name, "ray.openshift.ai/cluster-namespace": cluster.Namespace}).
305310
WithSubjects(
306311
rbacv1ac.Subject().
307312
WithKind("ServiceAccount").
@@ -322,14 +327,14 @@ func oauthServiceAccountNameFromCluster(cluster *rayv1.RayCluster) string {
322327

323328
func desiredServiceAccount(cluster *rayv1.RayCluster) *corev1ac.ServiceAccountApplyConfiguration {
324329
return corev1ac.ServiceAccount(oauthServiceAccountNameFromCluster(cluster), cluster.Namespace).
325-
WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
330+
WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
326331
WithAnnotations(map[string]string{
327332
"serviceaccounts.openshift.io/oauth-redirectreference.first": "" +
328333
`{"kind":"OAuthRedirectReference","apiVersion":"v1",` +
329334
`"reference":{"kind":"Route","name":"` + dashboardNameFromCluster(cluster) + `"}}`,
330335
}).
331336
WithOwnerReferences(
332-
metav1ac.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion),
337+
metav1ac.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion).WithController(true),
333338
)
334339
}
335340

@@ -343,7 +348,7 @@ func rayClientNameFromCluster(cluster *rayv1.RayCluster) string {
343348

344349
func desiredClusterRoute(cluster *rayv1.RayCluster) *routev1ac.RouteApplyConfiguration {
345350
return routev1ac.Route(dashboardNameFromCluster(cluster), cluster.Namespace).
346-
WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
351+
WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
347352
WithSpec(routev1ac.RouteSpec().
348353
WithTo(routev1ac.RouteTargetReference().WithKind("Service").WithName(oauthServiceNameFromCluster(cluster))).
349354
WithPort(routev1ac.RoutePort().WithTargetPort(intstr.FromString((oAuthServicePortName)))).
@@ -353,7 +358,7 @@ func desiredClusterRoute(cluster *rayv1.RayCluster) *routev1ac.RouteApplyConfigu
353358
),
354359
).
355360
WithOwnerReferences(
356-
metav1ac.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion),
361+
metav1ac.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion).WithController(true),
357362
)
358363
}
359364

@@ -367,7 +372,7 @@ func oauthServiceTLSSecretName(cluster *rayv1.RayCluster) string {
367372

368373
func desiredOAuthService(cluster *rayv1.RayCluster) *corev1ac.ServiceApplyConfiguration {
369374
return corev1ac.Service(oauthServiceNameFromCluster(cluster), cluster.Namespace).
370-
WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
375+
WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
371376
WithAnnotations(map[string]string{"service.beta.openshift.io/serving-cert-secret-name": oauthServiceTLSSecretName(cluster)}).
372377
WithSpec(
373378
corev1ac.ServiceSpec().
@@ -381,7 +386,7 @@ func desiredOAuthService(cluster *rayv1.RayCluster) *corev1ac.ServiceApplyConfig
381386
WithSelector(map[string]string{"ray.io/cluster": cluster.Name, "ray.io/node-type": "head"}),
382387
).
383388
WithOwnerReferences(
384-
metav1ac.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion),
389+
metav1ac.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion).WithController(true),
385390
)
386391
}
387392

@@ -397,10 +402,10 @@ func desiredOAuthSecret(cluster *rayv1.RayCluster, cookieSalt string) *corev1ac.
397402
cookieSecret := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
398403

399404
return corev1ac.Secret(oauthSecretNameFromCluster(cluster), cluster.Namespace).
400-
WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
405+
WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
401406
WithStringData(map[string]string{"cookie_secret": cookieSecret}).
402407
WithOwnerReferences(
403-
metav1ac.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion),
408+
metav1ac.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion).WithController(true),
404409
)
405410
}
406411

@@ -410,7 +415,7 @@ func caSecretNameFromCluster(cluster *rayv1.RayCluster) string {
410415

411416
func desiredCASecret(cluster *rayv1.RayCluster, key, cert []byte) *corev1ac.SecretApplyConfiguration {
412417
return corev1ac.Secret(caSecretNameFromCluster(cluster), cluster.Namespace).
413-
WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
418+
WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
414419
WithData(map[string][]byte{
415420
CAPrivateKeyKey: key,
416421
CACertKey: cert,
@@ -419,7 +424,8 @@ func desiredCASecret(cluster *rayv1.RayCluster, key, cert []byte) *corev1ac.Secr
419424
WithUID(cluster.UID).
420425
WithName(cluster.Name).
421426
WithKind(cluster.Kind).
422-
WithAPIVersion(cluster.APIVersion))
427+
WithAPIVersion(cluster.APIVersion).
428+
WithController(true))
423429
}
424430

425431
func generateCACertificate() ([]byte, []byte, error) {
@@ -466,7 +472,7 @@ func generateCACertificate() ([]byte, []byte, error) {
466472
}
467473
func desiredWorkersNetworkPolicy(cluster *rayv1.RayCluster) *networkingv1ac.NetworkPolicyApplyConfiguration {
468474
return networkingv1ac.NetworkPolicy(cluster.Name+"-workers", cluster.Namespace).
469-
WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
475+
WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
470476
WithSpec(networkingv1ac.NetworkPolicySpec().
471477
WithPodSelector(metav1ac.LabelSelector().WithMatchLabels(map[string]string{"ray.io/cluster": cluster.Name, "ray.io/node-type": "worker"})).
472478
WithIngress(
@@ -477,7 +483,7 @@ func desiredWorkersNetworkPolicy(cluster *rayv1.RayCluster) *networkingv1ac.Netw
477483
),
478484
).
479485
WithOwnerReferences(
480-
metav1ac.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion),
486+
metav1ac.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion).WithController(true),
481487
)
482488
}
483489
func desiredHeadNetworkPolicy(cluster *rayv1.RayCluster, cfg *config.KubeRayConfiguration, kubeRayNamespaces []string) *networkingv1ac.NetworkPolicyApplyConfiguration {
@@ -488,7 +494,7 @@ func desiredHeadNetworkPolicy(cluster *rayv1.RayCluster, cfg *config.KubeRayConf
488494
allSecuredPorts = append(allSecuredPorts, networkingv1ac.NetworkPolicyPort().WithProtocol(corev1.ProtocolTCP).WithPort(intstr.FromInt(10001)))
489495
}
490496
return networkingv1ac.NetworkPolicy(cluster.Name+"-head", cluster.Namespace).
491-
WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
497+
WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
492498
WithSpec(networkingv1ac.NetworkPolicySpec().
493499
WithPodSelector(metav1ac.LabelSelector().WithMatchLabels(map[string]string{"ray.io/cluster": cluster.Name, "ray.io/node-type": "head"})).
494500
WithIngress(
@@ -534,7 +540,7 @@ func desiredHeadNetworkPolicy(cluster *rayv1.RayCluster, cfg *config.KubeRayConf
534540
),
535541
).
536542
WithOwnerReferences(
537-
metav1ac.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion),
543+
metav1ac.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion).WithController(true),
538544
)
539545
}
540546

@@ -548,8 +554,35 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
548554
return err
549555
}
550556
r.CookieSalt = string(b)
551-
return ctrl.NewControllerManagedBy(mgr).
557+
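// despite ownership, we need to check for labels because we can't use owner references on the cluster-scoped ClusterRoleBinding (a namespaced RayCluster cannot own it), so it is mapped back to its RayCluster via its name/namespace labels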
558+
controller := ctrl.NewControllerManagedBy(mgr).
552559
Named(controllerName).
553560
For(&rayv1.RayCluster{}).
554-
Complete(r)
561+
Owns(&corev1.ServiceAccount{}).
562+
Owns(&corev1.Service{}).
563+
Owns(&corev1.Secret{}).
564+
Owns(&networkingv1.Ingress{}).
565+
Owns(&networkingv1.NetworkPolicy{}).
566+
Watches(&rbacv1.ClusterRoleBinding{}, handler.EnqueueRequestsFromMapFunc(
567+
func(c context.Context, o client.Object) []reconcile.Request {
568+
name, ok := o.GetLabels()[RayClusterNameLabel]
569+
if !ok {
570+
return []reconcile.Request{}
571+
}
572+
namespace, ok := o.GetLabels()["ray.openshift.ai/cluster-namespace"]
573+
if !ok {
574+
return []reconcile.Request{}
575+
}
576+
return []reconcile.Request{{
577+
NamespacedName: client.ObjectKey{
578+
Name: name,
579+
Namespace: namespace,
580+
}}}
581+
}),
582+
)
583+
if r.IsOpenShift {
584+
controller.Owns(&routev1.Route{})
585+
}
586+
587+
return controller.Complete(r)
555588
}

0 commit comments

Comments
 (0)