diff --git a/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission.go b/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission.go index d64967387ae..b725068557a 100644 --- a/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission.go +++ b/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission.go @@ -20,19 +20,24 @@ import ( "context" "fmt" "io" + "strings" "github.com/kcp-dev/logicalcluster/v3" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" - "k8s.io/apimachinery/pkg/labels" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/kubernetes/pkg/controlplane/apiserver" + "k8s.io/client-go/util/retry" + "k8s.io/utils/ptr" "github.com/kcp-dev/kcp/pkg/reconciler/apis/apibinding" - apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions" - apisv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apis/v1alpha1" + corev1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/core/v1alpha1" ) const ( @@ -43,7 +48,9 @@ func Register(plugins *admission.Plugins) { plugins.Register(PluginName, func(_ io.Reader) (admission.Interface, error) { return &crdNoOverlappingGVRAdmission{ - Handler: admission.NewHandler(admission.Create), + Handler: admission.NewHandler(admission.Create), + lockResourcesPartially: apibinding.LockResourcesPartially, + now: metav1.Now, }, nil }) } @@ -51,7 +58,12 @@ func Register(plugins *admission.Plugins) { type crdNoOverlappingGVRAdmission struct { *admission.Handler - apiBindingClusterLister apisv1alpha1listers.APIBindingClusterLister + lockResourcesPartially func(ctx context.Context, kcpClusterCLient kcpclientset.ClusterInterface, crds []*apiextensionsv1.CustomResourceDefinition, lc *corev1alpha1.LogicalCluster, grs []schema.GroupResource, binding apibinding.ExpirableLock) ([]schema.GroupResource, map[schema.GroupResource]apibinding.Lock, error) + + logicalclusterLister corev1alpha1listers.LogicalClusterClusterLister + kcpClusterClient kcpclientset.ClusterInterface + + now func() metav1.Time } // Ensure that the required admission interfaces are implemented. @@ -60,12 +72,19 @@ var _ = admission.InitializationValidator(&crdNoOverlappingGVRAdmission{}) func (p *crdNoOverlappingGVRAdmission) SetKcpInformers(local, global kcpinformers.SharedInformerFactory) { p.SetReadyFunc(local.Apis().V1alpha1().APIBindings().Informer().HasSynced) - p.apiBindingClusterLister = local.Apis().V1alpha1().APIBindings().Lister() + p.logicalclusterLister = local.Core().V1alpha1().LogicalClusters().Lister() +} + +func (p *crdNoOverlappingGVRAdmission) SetKcpClusterClient(c kcpclientset.ClusterInterface) { + p.kcpClusterClient = c } func (p *crdNoOverlappingGVRAdmission) ValidateInitialization() error { - if p.apiBindingClusterLister == nil { - return fmt.Errorf(PluginName + " plugin needs an APIBindings lister") + if p.logicalclusterLister == nil { + return fmt.Errorf(PluginName + " plugin needs an LogicalCluster lister") + } + if p.kcpClusterClient == nil { + return fmt.Errorf(PluginName + " plugin needs a KCP cluster client") } return nil } @@ -78,13 +97,17 @@ func (p *crdNoOverlappingGVRAdmission) Validate(ctx context.Context, a admission if a.GetKind().GroupKind() != apiextensions.Kind("CustomResourceDefinition") { return nil } - cluster, err := request.ClusterNameFrom(ctx) + if a.GetOperation() != admission.Create { + return nil + } + + clusterName, err := request.ClusterNameFrom(ctx) if err != nil { return fmt.Errorf("failed to retrieve cluster from context: %w", err) } - clusterName := logicalcluster.Name(cluster.String()) // TODO(sttts): remove this cast once ClusterNameFrom returns a tenancy.Name - // ignore CRDs targeting system and non-root workspaces - if clusterName == apibinding.SystemBoundCRDsClusterName || clusterName == apiserver.LocalAdminCluster { + + // ignore CRDs targeting system logical clusters. + if strings.HasPrefix(string(clusterName), "system:") { return nil } @@ -92,21 +115,33 @@ func (p *crdNoOverlappingGVRAdmission) Validate(ctx context.Context, a admission if !ok { return fmt.Errorf("unexpected type %T", a.GetObject()) } - apiBindingsForCurrentClusterName, err := p.listAPIBindingsFor(clusterName) + + lc, err := p.logicalclusterLister.Cluster(clusterName).Get(corev1alpha1.LogicalClusterName) if err != nil { + return fmt.Errorf("failed to get LogicalCluster in logical cluster %q: %w", clusterName, err) + } + + // (optimistically) lock group resource for LogicalCluster. If this request + // eventually fails, the logicalclustercleanup controller will clean them + // up eventually. + gr := schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural} + var locked []schema.GroupResource + var skipped map[schema.GroupResource]apibinding.Lock + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + var err error + locked, skipped, err = p.lockResourcesPartially(ctx, p.kcpClusterClient, nil, lc, []schema.GroupResource{gr}, apibinding.ExpirableLock{ + Lock: apibinding.Lock{CRD: true}, + CRDExpiry: ptr.To(p.now()), + }) return err + }) + if err != nil { + return fmt.Errorf("failed to lock resources %s in logical cluster %q: %w", gr, logicalcluster.From(lc), err) } - for _, apiBindingForCurrentClusterName := range apiBindingsForCurrentClusterName { - for _, boundResource := range apiBindingForCurrentClusterName.Status.BoundResources { - if boundResource.Group == crd.Spec.Group && boundResource.Resource == crd.Spec.Names.Plural { - return admission.NewForbidden(a, fmt.Errorf("cannot create %q CustomResourceDefinition with %q group and %q resource because it overlaps with a bound CustomResourceDefinition for %q APIBinding in %q logical cluster", - crd.Name, crd.Spec.Group, crd.Spec.Names.Plural, apiBindingForCurrentClusterName.Name, clusterName)) - } - } + if len(locked) == 0 { + return admission.NewForbidden(a, fmt.Errorf("cannot create CustomResourceDefinition %q because it overlaps with a bound CustomResourceDefinition for %v APIBinding in %q logical cluster", + crd.Name, skipped[gr], clusterName)) } - return nil -} -func (p *crdNoOverlappingGVRAdmission) listAPIBindingsFor(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error) { - return p.apiBindingClusterLister.Cluster(clusterName).List(labels.Everything()) + return nil } diff --git a/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission_test.go b/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission_test.go index 8a4567340a9..613eee7d1db 100644 --- a/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission_test.go +++ b/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission_test.go @@ -18,31 +18,46 @@ package crdnooverlappinggvr import ( "context" + "encoding/json" "testing" + "time" kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" "github.com/kcp-dev/logicalcluster/v3" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/client-go/tools/cache" + "k8s.io/utils/ptr" + "github.com/kcp-dev/kcp/pkg/reconciler/apis/apibinding" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" - apisv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apis/v1alpha1" + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" + corev1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/core/v1alpha1" ) +var mockNow = func() metav1.Time { + ts, _ := time.Parse(time.RFC3339, "2022-01-01T00:00:00Z") + return metav1.Time{Time: ts} +} + func TestValidate(t *testing.T) { scenarios := []struct { - name string - attr admission.Attributes - clusterName logicalcluster.Name - initialObjects []runtime.Object - wantErr bool + name string + attr admission.Attributes + clusterName logicalcluster.Name + initialObjects []runtime.Object + logicalClusterUpdateConflict bool + wantErr bool + wantAnnotation string }{ { name: "creating a conflicting CRD is forbidden", @@ -54,8 +69,33 @@ func TestValidate(t *testing.T) { }, }), clusterName: "root:acme", - initialObjects: []runtime.Object{createBinding("foo1", "root:acme", []apisv1alpha1.BoundAPIResource{{Group: "acme.dev", Resource: "foo"}})}, + initialObjects: []runtime.Object{withBinding(createLogicalCluster("root:acme"), "foo1", []apisv1alpha1.BoundAPIResource{{Group: "acme.dev", Resource: "foo"}})}, wantErr: true, + wantAnnotation: `{"foo.acme.dev":{"n":"foo1"}}`, + }, + { + name: "updating a CRD is always allowed", + attr: updateAttr(&apiextensions.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "test"}, + Spec: apiextensions.CustomResourceDefinitionSpec{ + Group: "acme.dev", + Names: apiextensions.CustomResourceDefinitionNames{Plural: "foo"}, + }, + }), + clusterName: "root:acme", + initialObjects: []runtime.Object{withBinding(createLogicalCluster("root:acme"), "foo1", []apisv1alpha1.BoundAPIResource{{Group: "acme.dev", Resource: "foo"}})}, + }, + { + name: "deleting a CRD is always allowed", + attr: deleteAttr(&apiextensions.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "test"}, + Spec: apiextensions.CustomResourceDefinitionSpec{ + Group: "acme.dev", + Names: apiextensions.CustomResourceDefinitionNames{Plural: "foo"}, + }, + }), + clusterName: "root:acme", + initialObjects: []runtime.Object{withBinding(createLogicalCluster("root:acme"), "foo1", []apisv1alpha1.BoundAPIResource{{Group: "acme.dev", Resource: "foo"}})}, }, { @@ -68,7 +108,7 @@ func TestValidate(t *testing.T) { }, }), clusterName: "system:bound-crds", - initialObjects: []runtime.Object{createBinding("foo1", "root:acme", []apisv1alpha1.BoundAPIResource{{Group: "acme.dev", Resource: "foo"}})}, + initialObjects: []runtime.Object{withBinding(createLogicalCluster("root:acme"), "foo1", []apisv1alpha1.BoundAPIResource{{Group: "acme.dev", Resource: "foo"}})}, }, { @@ -81,7 +121,65 @@ func TestValidate(t *testing.T) { }, }), clusterName: "root:acme", - initialObjects: []runtime.Object{createBinding("foo1", "root:acme", []apisv1alpha1.BoundAPIResource{{Group: "acme.dev", Resource: "bar"}})}, + initialObjects: []runtime.Object{withBinding(createLogicalCluster("root:acme"), "foo1", []apisv1alpha1.BoundAPIResource{{Group: "acme.dev", Resource: "bar"}})}, + wantAnnotation: `{"bar.acme.dev":{"n":"foo1"},"foo.acme.dev":{"c":true,"e":"2022-01-01T00:00:00Z"}}`, + }, + + { + name: "logical cluster update conflict fails the request", + attr: createAttr(&apiextensions.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "test"}, + Spec: apiextensions.CustomResourceDefinitionSpec{ + Group: "acme.dev", + Names: apiextensions.CustomResourceDefinitionNames{Plural: "foo"}, + }, + }), + clusterName: "root:acme", + initialObjects: []runtime.Object{withBinding(createLogicalCluster("root:acme"), "foo1", []apisv1alpha1.BoundAPIResource{{Group: "acme.dev", Resource: "bar"}})}, + logicalClusterUpdateConflict: true, + wantErr: true, + }, + + { + name: "fails without resource binding annotation on LogicalCluster", + attr: createAttr(&apiextensions.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "test"}, + Spec: apiextensions.CustomResourceDefinitionSpec{ + Group: "acme.dev", + Names: apiextensions.CustomResourceDefinitionNames{Plural: "foo"}, + }, + }), + clusterName: "root:acme", + initialObjects: []runtime.Object{createLogicalCluster("root:acme")}, + wantErr: true, + }, + + { + name: "succeeds with other CRD", + attr: createAttr(&apiextensions.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "test"}, + Spec: apiextensions.CustomResourceDefinitionSpec{ + Group: "acme.dev", + Names: apiextensions.CustomResourceDefinitionNames{Plural: "foo"}, + }, + }), + clusterName: "root:acme", + initialObjects: []runtime.Object{withCRD(createLogicalCluster("root:acme"), schema.GroupResource{Group: "acme.dev", Resource: "bar"}, ptr.To(metav1.Time{Time: mockNow().Add(-time.Minute)}))}, + wantAnnotation: `{"bar.acme.dev":{"c":true,"e":"2021-12-31T23:59:00Z"},"foo.acme.dev":{"c":true,"e":"2022-01-01T00:00:00Z"}}`, + }, + + { + name: "falls through with same CRD", + attr: createAttr(&apiextensions.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "test"}, + Spec: apiextensions.CustomResourceDefinitionSpec{ + Group: "acme.dev", + Names: apiextensions.CustomResourceDefinitionNames{Plural: "foo"}, + }, + }), + clusterName: "root:acme", + initialObjects: []runtime.Object{withCRD(createLogicalCluster("root:acme"), schema.GroupResource{Group: "acme.dev", Resource: "foo"}, ptr.To(metav1.Time{Time: mockNow().Add(-time.Minute)}))}, + wantAnnotation: `{"foo.acme.dev":{"c":true,"e":"2022-01-01T00:00:00Z"}}`, }, } for _, scenario := range scenarios { @@ -93,11 +191,37 @@ func TestValidate(t *testing.T) { } } - a := &crdNoOverlappingGVRAdmission{Handler: admission.NewHandler(admission.Create, admission.Update), apiBindingClusterLister: apisv1alpha1listers.NewAPIBindingClusterLister(indexer)} + var updatedLogicalCluster *corev1alpha1.LogicalCluster + a := &crdNoOverlappingGVRAdmission{ + Handler: admission.NewHandler(admission.Create, admission.Update), + logicalclusterLister: corev1alpha1listers.NewLogicalClusterClusterLister(indexer), + lockResourcesPartially: func(_ context.Context, _ kcpclientset.ClusterInterface, crds []*apiextensionsv1.CustomResourceDefinition, lc *corev1alpha1.LogicalCluster, grs []schema.GroupResource, binding apibinding.ExpirableLock) ([]schema.GroupResource, map[schema.GroupResource]apibinding.Lock, error) { + lc, locked, skipped, err := apibinding.WithLockedResources(crds, lc, grs, binding) + updatedLogicalCluster = lc + return locked, skipped, err + }, + now: mockNow, + } + + if scenario.logicalClusterUpdateConflict { + a.lockResourcesPartially = func(_ context.Context, _ kcpclientset.ClusterInterface, _ []*apiextensionsv1.CustomResourceDefinition, lc *corev1alpha1.LogicalCluster, grs []schema.GroupResource, binding apibinding.ExpirableLock) ([]schema.GroupResource, map[schema.GroupResource]apibinding.Lock, error) { + return nil, nil, kerrors.NewConflict(schema.GroupResource{}, "conflict", nil) + } + } + ctx := request.WithCluster(context.Background(), request.Cluster{Name: scenario.clusterName}) if err := a.Validate(ctx, scenario.attr, nil); (err != nil) != scenario.wantErr { t.Fatalf("Validate() error = %v, wantErr %v", err, scenario.wantErr) } + + if scenario.wantAnnotation != "" { + if updatedLogicalCluster == nil { + t.Fatal("expected LogicalCluster to be updated, got nil") + } + if got := updatedLogicalCluster.Annotations[apibinding.ResourceBindingsAnnotationKey]; got != scenario.wantAnnotation { + t.Errorf("expected LogicalCluster annotation %q, got %q", scenario.wantAnnotation, got) + } + } }) } } @@ -118,14 +242,82 @@ func createAttr(obj *apiextensions.CustomResourceDefinition) admission.Attribute ) } -func createBinding(name string, clusterName string, boundResources []apisv1alpha1.BoundAPIResource) *apisv1alpha1.APIBinding { - return &apisv1alpha1.APIBinding{ +func updateAttr(obj *apiextensions.CustomResourceDefinition) admission.Attributes { + return admission.NewAttributesRecord( + obj, + obj, + apiextensionsv1.Kind("CustomResourceDefinition").WithVersion("v1"), + "", + "test", + apiextensionsv1.Resource("customresourcedefinitions").WithVersion("v1"), + "", + admission.Update, + &metav1.CreateOptions{}, + false, + &user.DefaultInfo{}, + ) +} + +func deleteAttr(obj *apiextensions.CustomResourceDefinition) admission.Attributes { + return admission.NewAttributesRecord( + obj, + nil, + apiextensionsv1.Kind("CustomResourceDefinition").WithVersion("v1"), + "", + "test", + apiextensionsv1.Resource("customresourcedefinitions").WithVersion("v1"), + "", + admission.Delete, + &metav1.CreateOptions{}, + false, + &user.DefaultInfo{}, + ) +} + +func createLogicalCluster(clusterName string) *corev1alpha1.LogicalCluster { + return &corev1alpha1.LogicalCluster{ ObjectMeta: metav1.ObjectMeta{ + Name: corev1alpha1.LogicalClusterName, Annotations: map[string]string{ logicalcluster.AnnotationKey: clusterName, }, - Name: name, }, - Status: apisv1alpha1.APIBindingStatus{BoundResources: boundResources}, } } + +func withCRD(lc *corev1alpha1.LogicalCluster, gr schema.GroupResource, expiry *metav1.Time) *corev1alpha1.LogicalCluster { + rbs := make(apibinding.ResourceBindingsAnnotation) + if v := lc.Annotations[apibinding.ResourceBindingsAnnotationKey]; v != "" { + if err := json.Unmarshal([]byte(v), &rbs); err != nil { + panic(err) + } + } + + rbs[gr.String()] = apibinding.ExpirableLock{ + Lock: apibinding.Lock{CRD: true}, + CRDExpiry: expiry, + } + + bs, _ := json.Marshal(rbs) + lc.Annotations[apibinding.ResourceBindingsAnnotationKey] = string(bs) + return lc +} + +func withBinding(lc *corev1alpha1.LogicalCluster, binding string, boundResources []apisv1alpha1.BoundAPIResource) *corev1alpha1.LogicalCluster { + rbs := make(apibinding.ResourceBindingsAnnotation) + if v := lc.Annotations[apibinding.ResourceBindingsAnnotationKey]; v != "" { + if err := json.Unmarshal([]byte(v), &rbs); err != nil { + panic(err) + } + } + + for _, br := range boundResources { + rbs[schema.GroupResource{Group: br.Group, Resource: br.Resource}.String()] = apibinding.ExpirableLock{ + Lock: apibinding.Lock{Name: binding}, + } + } + + bs, _ := json.Marshal(rbs) + lc.Annotations[apibinding.ResourceBindingsAnnotationKey] = string(bs) + return lc +} diff --git a/pkg/reconciler/apis/apibinding/apibinding_controller.go b/pkg/reconciler/apis/apibinding/apibinding_controller.go index 37aa3ae7e49..08d0093351c 100644 --- a/pkg/reconciler/apis/apibinding/apibinding_controller.go +++ b/pkg/reconciler/apis/apibinding/apibinding_controller.go @@ -48,9 +48,11 @@ import ( "github.com/kcp-dev/kcp/pkg/reconciler/events" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" "github.com/kcp-dev/kcp/sdk/apis/core" + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" apisv1alpha1client "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/typed/apis/v1alpha1" apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1" + corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" apisv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apis/v1alpha1" ) @@ -70,6 +72,7 @@ func NewController( apiExportInformer apisv1alpha1informers.APIExportClusterInformer, apiResourceSchemaInformer apisv1alpha1informers.APIResourceSchemaClusterInformer, apiConversionInformer apisv1alpha1informers.APIConversionClusterInformer, + logicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer, globalAPIExportInformer apisv1alpha1informers.APIExportClusterInformer, globalAPIResourceSchemaInformer apisv1alpha1informers.APIResourceSchemaClusterInformer, globalAPIConversionInformer apisv1alpha1informers.APIConversionClusterInformer, @@ -146,6 +149,9 @@ func NewController( listCRDs: func(clusterName logicalcluster.Name) ([]*apiextensionsv1.CustomResourceDefinition, error) { return crdInformer.Lister().Cluster(clusterName).List(labels.Everything()) }, + getLogicalCluster: func(name logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) { + return logicalClusterInformer.Lister().Cluster(name).Get(corev1alpha1.LogicalClusterName) + }, deletedCRDTracker: newLockedStringSet(), commit: committer.NewCommitter[*APIBinding, Patcher, *APIBindingSpec, *APIBindingStatus](kcpClusterClient.ApisV1alpha1().APIBindings()), } @@ -223,6 +229,20 @@ func NewController( }, })) + // LogicalCluster handlers + _, _ = logicalClusterInformer.Informer().AddEventHandler(events.WithoutSyncs(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.enqueueLogicalCluster(objOrTombstone[*corev1alpha1.LogicalCluster](obj), logger, "") + }, + UpdateFunc: func(old, obj interface{}) { + was := old.(*corev1alpha1.LogicalCluster).Annotations[ResourceBindingsAnnotationKey] + is := obj.(*corev1alpha1.LogicalCluster).Annotations[ResourceBindingsAnnotationKey] + if was != is { + c.enqueueLogicalCluster(objOrTombstone[*corev1alpha1.LogicalCluster](obj), logger, "") + } + }, + })) + // APIConversion handlers _, _ = apiConversionInformer.Informer().AddEventHandler(events.WithoutSyncs(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -285,6 +305,8 @@ type controller struct { getCRD func(clusterName logicalcluster.Name, name string) (*apiextensionsv1.CustomResourceDefinition, error) listCRDs func(clusterName logicalcluster.Name) ([]*apiextensionsv1.CustomResourceDefinition, error) + getLogicalCluster func(logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) + deletedCRDTracker *lockedStringSet commit CommitFunc } @@ -353,6 +375,18 @@ func (c *controller) enqueueAPIResourceSchema(schema *apisv1alpha1.APIResourceSc } } +// enqueueLogicalCluster maps LogicalClusters to APIBindings for enqueuing. +func (c *controller) enqueueLogicalCluster(lc *corev1alpha1.LogicalCluster, logger logr.Logger, logSuffix string) { + bindings, err := c.listAPIBindings(logicalcluster.From(lc)) + if err != nil { + utilruntime.HandleError(err) + return + } + for _, binding := range bindings { + c.enqueueAPIBinding(binding, logging.WithObject(logger, binding), fmt.Sprintf(" because of LogicalCluster%s", logSuffix)) + } +} + func (c *controller) enqueueAPIConversion(apiConversion *apisv1alpha1.APIConversion, logger logr.Logger) { logger = logging.WithObject(logger, apiConversion) diff --git a/pkg/reconciler/apis/apibinding/apibinding_reconcile.go b/pkg/reconciler/apis/apibinding/apibinding_reconcile.go index aebee2cc81c..2331a0c35dd 100644 --- a/pkg/reconciler/apis/apibinding/apibinding_reconcile.go +++ b/pkg/reconciler/apis/apibinding/apibinding_reconcile.go @@ -31,6 +31,7 @@ import ( "k8s.io/apiextensions-apiserver/pkg/apiserver" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" utilserrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" @@ -140,10 +141,10 @@ type bindingReconciler struct { func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alpha1.APIBinding) (reconcileStatus, error) { logger := klog.FromContext(ctx) - // Check for valid reference + // Check for valid APIExport reference workspaceRef := apiBinding.Spec.Reference.Export if workspaceRef == nil { - // this should not happen because of OpenAPI + // this should not happen because of validation. conditions.MarkFalse( apiBinding, apisv1alpha1.APIExportValid, @@ -153,8 +154,6 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp ) return reconcileStatusContinue, nil } - - // Get APIExport apiExportPath := logicalcluster.NewPath(apiBinding.Spec.Reference.Export.Path) if apiExportPath.Empty() { apiExportPath = logicalcluster.From(apiBinding).Path() @@ -185,7 +184,6 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp ) return reconcileStatusContinue, err } - logger = logging.WithObject(logger, apiExport) // Record the export's permission claims @@ -209,19 +207,11 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp // The full path is unreliable for this purpose. apiBinding.Status.APIExportClusterName = logicalcluster.From(apiExport).String() - var needToWaitForRequeueWhenEstablished []string - - checker, err := newConflictChecker(logicalcluster.From(apiBinding), r.listAPIBindings, r.getAPIResourceSchema, r.getCRD, r.listCRDs) - if err != nil { - return reconcileStatusContinue, err - } - - // Process all APIResourceSchemas + // Collect the schemas. + schemas := make(map[string]*apisv1alpha1.APIResourceSchema) + grs := sets.New[schema.GroupResource]() for _, schemaName := range apiExport.Spec.LatestResourceSchemas { - bindingClusterName := logicalcluster.From(apiBinding) - - // Get the schema - schema, err := r.getAPIResourceSchema(logicalcluster.From(apiExport), schemaName) + sch, err := r.getAPIResourceSchema(logicalcluster.From(apiExport), schemaName) if err != nil { logger.Error(err, "error binding") @@ -230,7 +220,7 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp apisv1alpha1.APIExportValid, apisv1alpha1.InternalErrorReason, conditionsv1alpha1.ConditionSeverityError, - "Invalid APIExport. Please contact the APIExport owner to resolve", + fmt.Sprintf("Invalid APIExport. Please contact the APIExport owner to resolve: APIResourceSchema %q not found", schemaName), ) if apierrors.IsNotFound(err) { @@ -239,9 +229,73 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp return reconcileStatusContinue, err } - logger := logging.WithObject(logger, schema) + schemas[schemaName] = sch + grs = grs.Insert(schema.GroupResource{Group: sch.Spec.Group, Resource: sch.Spec.Names.Plural}) + } - if err := checker.Check(apiBinding, schema); err != nil { + // Look up LogicalCluster which acts as our lock to avoid having multiple bindings + // and/or CRDs owning the same resource. + cl, err := r.getLogicalCluster(logicalcluster.From(apiBinding)) + if err != nil { + conditions.MarkFalse( + apiBinding, + apisv1alpha1.APIBindingValid, + apisv1alpha1.APIBindingLogicalClusterNotFoundReason, + conditionsv1alpha1.ConditionSeverityError, + fmt.Sprintf("LogicalCluster for %q not found", logicalcluster.From(apiBinding)), + ) + + // wait until it shows up. Bindings don't work without. + return reconcileStatusContinue, err + } + + crds, err := r.listCRDs(logicalcluster.From(apiBinding)) + if err != nil { + return reconcileStatusContinue, err + } + + // Lock resources before going any further. Not being able to lock all resources is NOT an error. + // It will be reflected in the BindingUpToDate and InitialBindingCompleted conditions. + _, skipped, err := LockResourcesPartially(ctx, r.kcpClusterClient, crds, cl, grs.UnsortedList(), ExpirableLock{ + Lock: Lock{Name: apiBinding.Name}, + }) + if err != nil { + logger.Error(err, "error locking resources: %v", skipped) + + conditions.MarkFalse( + apiBinding, + apisv1alpha1.APIBindingValid, + apisv1alpha1.InternalErrorReason, + conditionsv1alpha1.ConditionSeverityError, + fmt.Sprintf("Error locking resources: %v", err), + ) + + return reconcileStatusContinue, err + } + if len(skipped) > 0 { + logger.V(4).Info("skipped resources", "resources", skipped) + } + + // When we get here, the binding itself is in a good state. + conditions.MarkTrue(apiBinding, apisv1alpha1.APIBindingValid) + + checker, err := newConflictChecker(logicalcluster.From(apiBinding), r.listAPIBindings, r.getAPIResourceSchema, r.getCRD, r.listCRDs) + if err != nil { + return reconcileStatusContinue, err + } + var needToWaitForRequeueWhenEstablished []string + for _, schemaName := range apiExport.Spec.LatestResourceSchemas { + sch := schemas[schemaName] + logger := logging.WithObject(logger, sch) + + if _, ok := skipped[schema.GroupResource{Group: sch.Spec.Group, Resource: sch.Spec.Names.Plural}]; ok { + // This resource was skipped because it's already locked by another binding. + continue + } + + // A resource will be served if the group resource is locked by this binding AND there are no + // naming conflicts with other bindings or CRDs. The former is critical, the latter is advisory. + if err := checker.Check(apiBinding, sch); err != nil { conditions.MarkFalse( apiBinding, apisv1alpha1.BindingUpToDate, @@ -251,7 +305,7 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp err, ) - // Only change InitialBindingCompleted if it's false + // Only change InitialBindingCompleted if it's false. if conditions.IsFalse(apiBinding, apisv1alpha1.InitialBindingCompleted) { conditions.MarkFalse( apiBinding, @@ -266,7 +320,7 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp } // If there are multiple versions, the conversion strategy must be defined in the APIResourceSchema - if len(schema.Spec.Versions) > 1 && schema.Spec.Conversion == nil { + if len(sch.Spec.Versions) > 1 && sch.Spec.Conversion == nil { conditions.MarkFalse( apiBinding, apisv1alpha1.APIExportValid, @@ -278,7 +332,7 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp return reconcileStatusContinue, fmt.Errorf( "conversion strategy not specified %s|%s for APIBinding %s|%s, APIExport %s|%s, APIResourceSchema %s|%s: %w", apiExportPath, schemaName, - bindingClusterName, apiBinding.Name, + logicalcluster.From(apiBinding), apiBinding.Name, apiExportPath, apiExport.Name, apiExportPath, schemaName, err, @@ -286,7 +340,7 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp } // Try to get the bound CRD - existingCRD, err := r.getCRD(SystemBoundCRDsClusterName, boundCRDName(schema)) + existingCRD, err := r.getCRD(SystemBoundCRDsClusterName, boundCRDName(sch)) if err != nil && !apierrors.IsNotFound(err) { conditions.MarkFalse( apiBinding, @@ -298,8 +352,8 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp return reconcileStatusContinue, fmt.Errorf( "error getting CRD %s|%s for APIBinding %s|%s, APIExport %s|%s, APIResourceSchema %s|%s: %w", - SystemBoundCRDsClusterName, boundCRDName(schema), - bindingClusterName, apiBinding.Name, + SystemBoundCRDsClusterName, boundCRDName(sch), + logicalcluster.From(apiBinding), apiBinding.Name, apiExportPath, apiExport.Name, apiExportPath, schemaName, err, @@ -319,7 +373,7 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp } } else { // Need to create bound CRD - crd, err := generateCRD(schema) + crd, err := generateCRD(sch) if err != nil { logger.Error(err, "error generating CRD") @@ -347,7 +401,7 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp // Create bound CRD logger.V(2).Info("creating CRD") if _, err := r.createCRD(ctx, SystemBoundCRDsClusterName.Path(), crd); err != nil { - schemaClusterName := logicalcluster.From(schema) + schemaClusterName := logicalcluster.From(sch) if apierrors.IsInvalid(err) { status := apierrors.APIStatus(nil) // The error is guaranteed to implement APIStatus here @@ -411,7 +465,7 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp } for _, b := range apiBinding.Status.BoundResources { - if b.Group == schema.Spec.Group && b.Resource == schema.Spec.Names.Plural { + if b.Group == sch.Spec.Group && b.Resource == sch.Spec.Names.Plural { storageVersions.Insert(b.StorageVersions...) break } @@ -422,11 +476,11 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp // Upsert the BoundAPIResource for this APIResourceSchema newBoundResource := apisv1alpha1.BoundAPIResource{ - Group: schema.Spec.Group, - Resource: schema.Spec.Names.Plural, + Group: sch.Spec.Group, + Resource: sch.Spec.Names.Plural, Schema: apisv1alpha1.BoundAPIResourceSchema{ - Name: schema.Name, - UID: string(schema.UID), + Name: sch.Name, + UID: string(sch.UID), IdentityHash: apiExport.Status.IdentityHash, }, StorageVersions: sortedStorageVersions, @@ -434,7 +488,7 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp found := false for i, r := range apiBinding.Status.BoundResources { - if r.Group == schema.Spec.Group && r.Resource == schema.Spec.Names.Plural { + if r.Group == sch.Spec.Group && r.Resource == sch.Spec.Names.Plural { apiBinding.Status.BoundResources[i] = newBoundResource found = true break @@ -470,6 +524,25 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp strings.Join(needToWaitForRequeueWhenEstablished, ", "), ) } + } else if len(skipped) > 0 { + conditions.MarkFalse( + apiBinding, + apisv1alpha1.BindingUpToDate, + apisv1alpha1.GroupResourceConflictReason, + conditionsv1alpha1.ConditionSeverityWarning, + "Unable to bind APIs because they are bound by other APIBindings or CRDs: %v", skipped, + ) + + // Only change InitialBindingCompleted if it's false + if conditions.IsFalse(apiBinding, apisv1alpha1.InitialBindingCompleted) { + conditions.MarkFalse( + apiBinding, + apisv1alpha1.InitialBindingCompleted, + apisv1alpha1.GroupResourceConflictReason, + conditionsv1alpha1.ConditionSeverityWarning, + "Unable to bind APIs because they are bound by other APIBindings or CRDs: %v", skipped, + ) + } } else { conditions.MarkTrue(apiBinding, apisv1alpha1.InitialBindingCompleted) conditions.MarkTrue(apiBinding, apisv1alpha1.BindingUpToDate) diff --git a/pkg/reconciler/apis/apibinding/logical_cluster_lock.go b/pkg/reconciler/apis/apibinding/logical_cluster_lock.go new file mode 100644 index 00000000000..87c0b280ae6 --- /dev/null +++ b/pkg/reconciler/apis/apibinding/logical_cluster_lock.go @@ -0,0 +1,180 @@ +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apibinding + +import ( + "context" + "fmt" + + "github.com/kcp-dev/logicalcluster/v3" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/json" + + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" +) + +const ( + // ResourceBindingsAnnotationKey is the key for the annotation on the LogicalCluster + // to hold ResourceBindings. + ResourceBindingsAnnotationKey = "apis.kcp.io/resource-bindings" +) + +// Lock is a lock for a resource, part of the apis.kcp.io/resource-bindings annotation. +type Lock struct { + // Name is the name of the APIBinding, or empty. + Name string `json:"n,omitempty"` + // CRD is true if the binding is for a CRD. + CRD bool `json:"c,omitempty"` +} + +// ExpirableLock is a lock that can expire CRD entries. +type ExpirableLock struct { + Lock `json:",inline"` + + // Expiry is an optional timestamp. After that time, the CRD entry is not + // considered valid anymore IF the object cannot be found. + CRDExpiry *metav1.Time `json:"e,omitempty"` +} + +// ResourceBindingsAnnotation is a map of "." to bindings. It +// is stored as a JSON string in the LogicalCluster annotation +// apis.kcp.io/resource-bindings. It serves as a lock for resources +// to prevent races of multiple bindings or CRDs owning the same resource. +type ResourceBindingsAnnotation map[string]ExpirableLock + +// LockResourcesPartially tries to lock the resources for the given binding. It +// returns those resources that got successfully locked. If a resource is already +// locked by another binding, it is skipped and returned in the second return +// value. +// It will update the LogicalCluster with the new binding information IFF the +// set of newly locked resources is not empty. +func LockResourcesPartially(ctx context.Context, kcpClusterCLient kcpclientset.ClusterInterface, crds []*apiextensionsv1.CustomResourceDefinition, lc *corev1alpha1.LogicalCluster, grs []schema.GroupResource, binding ExpirableLock) ([]schema.GroupResource, map[schema.GroupResource]Lock, error) { + lc, locked, skipped, err := WithLockedResources(crds, lc, grs, binding) + if err != nil { + return nil, nil, err + } + + _, err = kcpClusterCLient.CoreV1alpha1().LogicalClusters().Cluster(logicalcluster.From(lc).Path()).Update(ctx, lc, metav1.UpdateOptions{}) + if err != nil { + return nil, nil, err + } + + return locked, skipped, nil +} + +// WithLockedResources does everything of LockResourcesPartially except the +// actual update of the LogicalCluster. It returns the updated LogicalCluster +// and the resources that would get successfully locked if the LogicalCluster +// was updated with it. +func WithLockedResources(crds []*apiextensionsv1.CustomResourceDefinition, lc *corev1alpha1.LogicalCluster, grs []schema.GroupResource, binding ExpirableLock) (*corev1alpha1.LogicalCluster, []schema.GroupResource, map[schema.GroupResource]Lock, error) { + v, found := lc.Annotations[ResourceBindingsAnnotationKey] + if !found { + return nil, nil, nil, fmt.Errorf("%s annotation not found, migration has to happen first", ResourceBindingsAnnotationKey) + } + + var rbs ResourceBindingsAnnotation + if err := json.Unmarshal([]byte(v), &rbs); err != nil { + return nil, nil, nil, fmt.Errorf("failed to unmarshal ResourceBindings annotation: %w", err) + } + + crdNames := make(map[string]bool, len(crds)) + for _, crd := range crds { + crdNames[crd.Name] = true + } + + // find what resources need to be newly locked + skipped := make(map[schema.GroupResource]Lock) + newlyLocked := make([]schema.GroupResource, 0, len(grs)) + locked := make([]schema.GroupResource, 0, len(grs)) + for _, gr := range grs { + b, found := rbs[gr.String()] + if !found || b.Lock == binding.Lock { + newlyLocked = append(newlyLocked, gr) + locked = append(locked, gr) + continue + } + if b.CRD && !crdNames[gr.String()] && b.CRDExpiry != nil && metav1.Now().After(b.CRDExpiry.Time) { + // CRD binding expired, and CRD is not known + newlyLocked = append(newlyLocked, gr) + locked = append(locked, gr) + continue + } + skipped[gr] = b.Lock + } + + // don't do anything if no resources need to be locked + if len(newlyLocked) == 0 { + return lc, locked, skipped, nil + } + + // update the LogicalCluster with the new binding information + for _, gr := range newlyLocked { + rbs[gr.String()] = binding + } + lc = lc.DeepCopy() + bs, err := json.Marshal(rbs) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to marshal ResourceBindings annotation: %w", err) + } + lc.Annotations[ResourceBindingsAnnotationKey] = string(bs) + + return lc, locked, skipped, nil +} + +// unlockResource unlocks the resource for the given binding. It updates the +// LogicalCluster with the new binding information IFF at least one resource +// was unlocked. +func unlockResource(ctx context.Context, kcpClusterCLient kcpclientset.ClusterInterface, lc *corev1alpha1.LogicalCluster, grs []schema.GroupResource, binding Lock) error { + v, found := lc.Annotations[ResourceBindingsAnnotationKey] + if !found { + return fmt.Errorf("%s annotation not found, migration has to happen first", ResourceBindingsAnnotationKey) + } + + var rbs ResourceBindingsAnnotation + if err := json.Unmarshal([]byte(v), &rbs); err != nil { + return fmt.Errorf("failed to unmarshal ResourceBindings annotation: %w", err) + } + + unlocked := false + for _, gr := range grs { + if bound, found := rbs[gr.String()]; found && bound.Lock == binding { + delete(rbs, gr.String()) + unlocked = true + } + } + if !unlocked { + return nil + } + + lc = lc.DeepCopy() + bs, err := json.Marshal(rbs) + if err != nil { + return fmt.Errorf("failed to marshal ResourceBindings annotation: %w", err) + } + lc.Annotations[ResourceBindingsAnnotationKey] = string(bs) + + _, err = kcpClusterCLient.CoreV1alpha1().LogicalClusters().Cluster(logicalcluster.From(lc).Path()).Update(ctx, lc, metav1.UpdateOptions{}) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/reconciler/apis/crdcleanup/crdcleanup_controller.go b/pkg/reconciler/apis/crdcleanup/crdcleanup_controller.go index 4effc8aed43..d1754a91ff8 100644 --- a/pkg/reconciler/apis/crdcleanup/crdcleanup_controller.go +++ b/pkg/reconciler/apis/crdcleanup/crdcleanup_controller.go @@ -45,8 +45,7 @@ import ( ) const ( - ControllerName = "kcp-crdcleanup" - DefaultIdentitySecretNamespace = "kcp-system" + ControllerName = "kcp-crdcleanup" AgeThreshold time.Duration = time.Minute * 30 ) diff --git a/pkg/reconciler/apis/logicalclustercleanup/logicalclustercleanup_controller.go b/pkg/reconciler/apis/logicalclustercleanup/logicalclustercleanup_controller.go new file mode 100644 index 00000000000..0d0d53211e4 --- /dev/null +++ b/pkg/reconciler/apis/logicalclustercleanup/logicalclustercleanup_controller.go @@ -0,0 +1,334 @@ +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logicalclustercleanup + +import ( + "context" + "fmt" + "time" + + kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" + kcpapiextensionsv1informers "github.com/kcp-dev/client-go/apiextensions/informers/apiextensions/v1" + "github.com/kcp-dev/logicalcluster/v3" + + "k8s.io/apiextensions-apiserver/pkg/apihelpers" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "github.com/kcp-dev/kcp/pkg/logging" + "github.com/kcp-dev/kcp/pkg/reconciler/apis/apibinding" + "github.com/kcp-dev/kcp/pkg/reconciler/events" + apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" + apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1" + corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" +) + +const ( + ControllerName = "kcp-logicalclustercleanup" + + AgeThreshold time.Duration = time.Minute * 30 +) + +// NewController returns a new controller for LogicalCluster resource binding annotation cleanup. +func NewController( + kcpClusterClient kcpclientset.ClusterInterface, + logicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer, + crdInformer kcpapiextensionsv1informers.CustomResourceDefinitionClusterInformer, + apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer, +) (*controller, error) { + c := &controller{ + queue: workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[request](), + workqueue.TypedRateLimitingQueueConfig[request]{ + Name: ControllerName, + }, + ), + kcpClusterClient: kcpClusterClient, + getLogicalCluster: func(clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) { + return logicalClusterInformer.Lister().Cluster(clusterName).Get(corev1alpha1.LogicalClusterName) + }, + listsCRDs: func(clusterName logicalcluster.Name) ([]*apiextensionsv1.CustomResourceDefinition, error) { + return crdInformer.Lister().Cluster(clusterName).List(labels.Everything()) + }, + listAPIBindings: func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error) { + return apiBindingInformer.Lister().Cluster(clusterName).List(labels.Everything()) + }, + getAPIBindings: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIBinding, error) { + return apiBindingInformer.Lister().Cluster(clusterName).Get(name) + }, + } + + _, _ = logicalClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.enqueueLogicalCluster(obj.(*corev1alpha1.LogicalCluster)) + }, + UpdateFunc: func(_, obj interface{}) { + c.enqueueLogicalCluster(obj.(*corev1alpha1.LogicalCluster)) + }, + }) + + _, _ = crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.enqueueCRD(obj.(*apiextensionsv1.CustomResourceDefinition), false) + }, + DeleteFunc: func(obj interface{}) { + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + } + c.enqueueCRD(obj.(*apiextensionsv1.CustomResourceDefinition), true) + }, + }) + + _, _ = apiBindingInformer.Informer().AddEventHandler(events.WithoutSyncs(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.enqueueFromAPIBinding(obj.(*apisv1alpha1.APIBinding)) + }, + DeleteFunc: func(obj interface{}) { + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + } + c.enqueueFromAPIBinding(obj.(*apisv1alpha1.APIBinding)) + }, + })) + + return c, nil +} + +type request struct { + clusterName logicalcluster.Name + + // deletedCRD is either empty, or the name of a deleted CRD. When a CRD + // is deleted, we can remove it from the annotation. Otherwise, we leave + // CRDs in the annotation because we are racing with the crdnooverlappinggvr + // admission plugin. It adds the CRD to the annotation BEFORE the CRD is + // actually created. + deletedCRD string +} + +// controller deletes bound CRDs when they are no longer in use by any APIBindings. +type controller struct { + queue workqueue.TypedRateLimitingInterface[request] + + kcpClusterClient kcpclientset.ClusterInterface + + getLogicalCluster func(clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) + listsCRDs func(clusterName logicalcluster.Name) ([]*apiextensionsv1.CustomResourceDefinition, error) + listAPIBindings func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error) + getAPIBindings func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIBinding, error) +} + +// enqueueLogicalCluster enqueues a LogicalCluster. +func (c *controller) enqueueLogicalCluster(lc *corev1alpha1.LogicalCluster) { + if lc.Name != corev1alpha1.LogicalClusterName { + // should not happen. + return + } + + key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(lc) + if err != nil { + runtime.HandleError(err) + return + } + + logger := logging.WithQueueKey(logging.WithReconciler(klog.Background(), ControllerName), key) + logger.V(4).Info("queueing LogicalCluster") + c.queue.Add(request{clusterName: logicalcluster.From(lc)}) +} + +// enqueueCRD enqueues a CRD. +func (c *controller) enqueueCRD(crd *apiextensionsv1.CustomResourceDefinition, deleted bool) { + key := kcpcache.ToClusterAwareKey(logicalcluster.From(crd).String(), "", corev1alpha1.LogicalClusterName) + logger := logging.WithQueueKey(logging.WithReconciler(klog.Background(), ControllerName), key) + logger.V(4).Info("queueing LogicalCluster because of CRD", "CRD", crd.Name) + + req := request{clusterName: logicalcluster.From(crd)} + if deleted { + req.deletedCRD = crd.Name + } + c.queue.Add(req) +} + +func (c *controller) enqueueFromAPIBinding(binding *apisv1alpha1.APIBinding) { + key := kcpcache.ToClusterAwareKey(logicalcluster.From(binding).String(), "", corev1alpha1.LogicalClusterName) + logger := logging.WithQueueKey(logging.WithReconciler(klog.Background(), ControllerName), key) + logger.V(4).Info("queueing LogicalCluster because of APIBinding", "APIBinding", binding.Name) + c.queue.Add(request{clusterName: logicalcluster.From(binding)}) +} + +// Start starts the controller, which stops when ctx.Done() is closed. +func (c *controller) Start(ctx context.Context, numThreads int) { + defer runtime.HandleCrash() + defer c.queue.ShutDown() + + logger := logging.WithReconciler(klog.FromContext(ctx), ControllerName) + ctx = klog.NewContext(ctx, logger) + logger.Info("Starting controller") + defer logger.Info("Shutting down controller") + + for i := 0; i < numThreads; i++ { + go wait.UntilWithContext(ctx, c.startWorker, time.Second) + } + + <-ctx.Done() +} + +func (c *controller) startWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *controller) processNextWorkItem(ctx context.Context) bool { + // Wait until there is a new item in the working queue + k, quit := c.queue.Get() + if quit { + return false + } + key := k + + logger := logging.WithQueueKey(klog.FromContext(ctx), fmt.Sprintf("%v", key)) + ctx = klog.NewContext(ctx, logger) + logger.V(4).Info("processing key") + + // No matter what, tell the queue we're done with this key, to unblock + // other workers. + defer c.queue.Done(key) + + if err := c.process(ctx, key); err != nil { + runtime.HandleError(fmt.Errorf("%q controller failed to sync %q, err: %w", ControllerName, key, err)) + c.queue.AddRateLimited(key) + return true + } + c.queue.Forget(key) + return true +} + +func (c *controller) process(ctx context.Context, req request) error { + logger := klog.FromContext(ctx) + + clusterName := req.clusterName + + lc, err := c.getLogicalCluster(clusterName) + if err != nil { + if errors.IsNotFound(err) { + return nil // object got deleted before we handled it. + } + return err + } + + // decode existing annotation. + var rbs apibinding.ResourceBindingsAnnotation + annValue, found := lc.Annotations[apibinding.ResourceBindingsAnnotationKey] + if found { + if err := json.Unmarshal([]byte(annValue), &rbs); err != nil { + logger.Error(err, "failed to unmarshal ResourceBindings annotation, resetting") + rbs = apibinding.ResourceBindingsAnnotation{} + annValue = "" + } + } + + // get objects + bindings, err := c.listAPIBindings(clusterName) + if err != nil { + return err + } + crds, err := c.listsCRDs(clusterName) + if err != nil { + return err + } + + // migrate or rebuild APIBinding entries from status.boundResources? This happens only once per logical cluster. + // After the initial migration this is done by the apibinding controller. + if annValue == "" { + for _, b := range bindings { + for _, br := range b.Status.BoundResources { + gr := schema.GroupResource{Group: br.Group, Resource: br.Resource} + if _, found := rbs[gr.String()]; !found { + rbs[gr.String()] = b.Name + } else { + logger.Error(fmt.Errorf("resource %s is already bound to %q", gr.String(), rbs[gr.String()]), "skipping in migration") + } + } + } + } else { + // remove bindings that are gone. + bindingNames := make(map[string]bool) + for _, b := range bindings { + bindingNames[b.Name] = true + } + for gr, b := range rbs { + if _, found := bindingNames[b]; !found { + logger.V(4).Info("removing binding", "binding", b, "resource", gr) + delete(rbs, gr) + } + } + } + + // always add CRDs. + crdNames := make(map[string]bool) + for _, crd := range crds { + if !apihelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) { + continue + } + + gr := schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural} + if _, found := rbs[gr.String()]; !found { + rbs[gr.String()] = apibinding.ResourceBindingsCRDValue + } else { + logger.Error(fmt.Errorf("resource %s is already bound to %q", gr.String(), rbs[gr.String()]), "skipping in migration") + } + + crdNames[crd.Name] = true + } + + // delete only if the CRD was actually deleted. + // Note: here we could be more aggressive and do a real client look-up. + // Note: it could be that we build up a list of CRDs, and they never get + // removed from the annotation. But this requires that we miss delete + // events or there is some other admission plugin that prevents creation. + // All this is highly unlikely. We could remedy this by adding some + // timeout to the annotation entry, and then after the max of a request + // duration remove it. + if req.deletedCRD != "" && crdNames[req.deletedCRD] { + logger.V(4).Info("removing CRD binding", "CRD", req.deletedCRD) + delete(rbs, req.deletedCRD) + } + + // update annotation on LogicalCluster. + bs, err := json.Marshal(rbs) + if err != nil { + return fmt.Errorf("failed to marshal ResourceBindings annotation: %w", err) + } + lc = lc.DeepCopy() + lc.Annotations[apibinding.ResourceBindingsAnnotationKey] = string(bs) + _, err = c.kcpClusterClient.CoreV1alpha1().Cluster(clusterName.Path()).LogicalClusters().Update(ctx, lc, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update LogicalCluster: %w", err) + } + + return nil +} diff --git a/pkg/server/controllers.go b/pkg/server/controllers.go index d1c9bf34492..38a07689907 100644 --- a/pkg/server/controllers.go +++ b/pkg/server/controllers.go @@ -66,6 +66,7 @@ import ( "github.com/kcp-dev/kcp/pkg/reconciler/apis/crdcleanup" "github.com/kcp-dev/kcp/pkg/reconciler/apis/extraannotationsync" "github.com/kcp-dev/kcp/pkg/reconciler/apis/identitycache" + "github.com/kcp-dev/kcp/pkg/reconciler/apis/logicalclustercleanup" "github.com/kcp-dev/kcp/pkg/reconciler/apis/permissionclaimlabel" apisreplicateclusterrole "github.com/kcp-dev/kcp/pkg/reconciler/apis/replicateclusterrole" apisreplicateclusterrolebinding "github.com/kcp-dev/kcp/pkg/reconciler/apis/replicateclusterrolebinding" @@ -748,6 +749,7 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), s.KcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(), s.KcpSharedInformerFactory.Apis().V1alpha1().APIConversions(), + s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(), s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIConversions(), @@ -995,6 +997,40 @@ func (s *Server) installCRDCleanupController(ctx context.Context, config *rest.C }) } +func (s *Server) installLogicalClusterCleanupController(ctx context.Context, config *rest.Config) error { + config = rest.CopyConfig(config) + config = rest.AddUserAgent(config, logicalclustercleanup.ControllerName) + + kcpClusterClient, err := kcpclientset.NewForConfig(config) + if err != nil { + return err + } + + c, err := logicalclustercleanup.NewController( + kcpClusterClient, + s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + s.ApiExtensionsSharedInformerFactory.Apiextensions().V1().CustomResourceDefinitions(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + ) + if err != nil { + return err + } + + return s.registerController(&controllerWrapper{ + Name: logicalclustercleanup.ControllerName, + Wait: func(ctx context.Context, s *Server) error { + return wait.PollUntilContextCancel(ctx, waitPollInterval, true, func(ctx context.Context) (bool, error) { + return s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters().Informer().HasSynced() && + s.ApiExtensionsSharedInformerFactory.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() && + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings().Informer().HasSynced(), nil + }) + }, + Runner: func(ctx context.Context) { + c.Start(ctx, 2) + }, + }) +} + func (s *Server) installAPIExportController(ctx context.Context, config *rest.Config) error { config = rest.CopyConfig(config) config = rest.AddUserAgent(config, apiexport.ControllerName) diff --git a/pkg/server/server.go b/pkg/server/server.go index 2363e03fc4b..177d1e9c8f0 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -252,6 +252,9 @@ func (s *Server) installControllers(ctx context.Context, controllerConfig *rest. if err := s.installCRDCleanupController(ctx, controllerConfig); err != nil { return err } + if err := s.installLogicalClusterCleanupController(ctx, controllerConfig); err != nil { + return err + } if err := s.installExtraAnnotationSyncController(ctx, controllerConfig); err != nil { return err } diff --git a/sdk/apis/apis/v1alpha1/types_apibinding.go b/sdk/apis/apis/v1alpha1/types_apibinding.go index fd7d1e282f3..5743cf61552 100644 --- a/sdk/apis/apis/v1alpha1/types_apibinding.go +++ b/sdk/apis/apis/v1alpha1/types_apibinding.go @@ -181,6 +181,13 @@ type APIBindingStatus struct { // These are valid conditions of APIBinding. const ( + // APIBindingValid is a condition for APIBinding that reflects the validity of the APIBinding. + APIBindingValid conditionsv1alpha1.ConditionType = "APIBindingValid" + + // APIBindingLogicalClusterNotFoundReason is a reason for the APIBindingValid condition that + // the LogicalCluster has not been found. + APIBindingLogicalClusterNotFoundReason = "LogicalClusterNotFound" + // APIExportValid is a condition for APIBinding that reflects the validity of the referenced APIExport. APIExportValid conditionsv1alpha1.ConditionType = "APIExportValid" @@ -203,6 +210,9 @@ const ( // WaitingForEstablishedReason is a reason for the InitialBindingCompleted condition that the bound CRDs are not ready. WaitingForEstablishedReason = "WaitingForEstablished" + // GroupResourceConflictReason is a reason for the InitialBindingCompleted condition that there is a conflict with other bindings or CRDs. + GroupResourceConflictReason = "GroupResourceConflict" + // BindingUpToDate is a condition for APIBinding that indicates that the APIs currently bound are up-to-date with // the binding's desired export. BindingUpToDate conditionsv1alpha1.ConditionType = "BindingUpToDate"