diff --git a/Makefile b/Makefile index afb51a421e6..8631749f04f 100644 --- a/Makefile +++ b/Makefile @@ -336,6 +336,20 @@ test-e2e-sharded-minimal: build-all $(SUITES_ARGS) \ $(if $(value WAIT),|| { echo "Terminated with $$?"; wait "$$PID"; },) +# This is just easy target to run 2 shard test server locally until manually killed. +# You can targer test to it by running: +# go test ./test/e2e/apibinding/... --kcp-kubeconfig=$(pwd)/.kcp/admin.kubeconfig --shard-kubeconfigs=root=$(pwd)/.kcp-0/admin.kubeconfig -run=^TestAPIBindingEndpointSlicesSharded$ +test-run-sharded-server: WORK_DIR ?= . +test-run-sharded-server: LOG_DIR ?= $(WORK_DIR)/.kcp +test-run-sharded-server: + mkdir -p "$(LOG_DIR)" "$(WORK_DIR)/.kcp" + rm -f "$(WORK_DIR)/.kcp/ready-to-test" + UNSAFE_E2E_HACK_DISABLE_ETCD_FSYNC=true NO_GORUN=1 ./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false --shard-feature-gates=$(TEST_FEATURE_GATES) $(TEST_SERVER_ARGS) --number-of-shards=2 2>&1 & PID=$$!; echo "PID $$PID" && \ + trap 'kill -TERM $$PID' TERM INT EXIT && \ + while [ ! -f "$(WORK_DIR)/.kcp/ready-to-test" ]; do sleep 1; done && \ + echo 'Server started' && \ + wait $$PID + .PHONY: test ifdef USE_GOTESTSUM test: $(GOTESTSUM) diff --git a/config/crds/apis.kcp.io_apiexportendpointslices.yaml b/config/crds/apis.kcp.io_apiexportendpointslices.yaml index bca294c4a10..dbe1b2d2cf1 100644 --- a/config/crds/apis.kcp.io_apiexportendpointslices.yaml +++ b/config/crds/apis.kcp.io_apiexportendpointslices.yaml @@ -148,6 +148,15 @@ spec: - url type: object type: array + x-kubernetes-list-map-keys: + - url + x-kubernetes-list-type: map + shardSelector: + description: |- + shardSelector is the selector used to filter the shards. It is used to filter the shards + when determining partition scope when deriving the endpoints. This is set by owning shard, + and is used by follower shards to determine if its inscope or not. + type: string type: object type: object served: true diff --git a/pkg/authorization/bootstrap/policy.go b/pkg/authorization/bootstrap/policy.go index dadc0dbe013..ee234eee7ad 100644 --- a/pkg/authorization/bootstrap/policy.go +++ b/pkg/authorization/bootstrap/policy.go @@ -24,6 +24,7 @@ import ( rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest" "k8s.io/kubernetes/plugin/pkg/auth/authorizer/rbac/bootstrappolicy" + "github.com/kcp-dev/kcp/sdk/apis/apis" "github.com/kcp-dev/kcp/sdk/apis/core" "github.com/kcp-dev/kcp/sdk/apis/tenancy" ) @@ -101,6 +102,13 @@ func clusterRoles() []rbacv1.ClusterRole { rbacv1helpers.NewRule("access").URLs("/").RuleOrDie(), }, }, + { + ObjectMeta: metav1.ObjectMeta{Name: SystemExternalLogicalClusterAdmin}, + Rules: []rbacv1.PolicyRule{ + rbacv1helpers.NewRule("update", "patch", "get").Groups(apis.GroupName).Resources("apiexportendpointslices/status").RuleOrDie(), + rbacv1helpers.NewRule("get", "list", "watch").Groups(apis.GroupName).Resources("apiexportendpointslices").RuleOrDie(), + }, + }, } } diff --git a/pkg/indexers/apiexport.go b/pkg/indexers/apiexport.go index fb9f1c18a0c..b5212edb731 100644 --- a/pkg/indexers/apiexport.go +++ b/pkg/indexers/apiexport.go @@ -17,6 +17,8 @@ limitations under the License. package indexers import ( + "fmt" + kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" "github.com/kcp-dev/logicalcluster/v3" @@ -33,6 +35,8 @@ const ( // APIExportByClaimedIdentities is the indexer name for retrieving APIExports that have a permission claim for a // particular identity hash. APIExportByClaimedIdentities = "APIExportByClaimedIdentities" + // APIExportEndpointSliceByAPIExport is the indexer name for retrieving APIExportEndpointSlices by their APIExport's Reference Path and Name. + APIExportEndpointSliceByAPIExport = "APIExportEndpointSliceByAPIExport" ) // IndexAPIExportByIdentity is an index function that indexes an APIExport by its identity hash. @@ -72,3 +76,23 @@ func IndexAPIExportByClaimedIdentities(obj interface{}) ([]string, error) { } return sets.List[string](claimedIdentities), nil } + +// IndexAPIExportEndpointSliceByAPIExportFunc indexes the APIExportEndpointSlice by their APIExport's Reference Path and Name. +func IndexAPIExportEndpointSliceByAPIExport(obj interface{}) ([]string, error) { + apiExportEndpointSlice, ok := obj.(*apisv1alpha1.APIExportEndpointSlice) + if !ok { + return []string{}, fmt.Errorf("obj %T is not an APIExportEndpointSlice", obj) + } + + var result []string + pathRemote := logicalcluster.NewPath(apiExportEndpointSlice.Spec.APIExport.Path) + if !pathRemote.Empty() { + result = append(result, pathRemote.Join(apiExportEndpointSlice.Spec.APIExport.Name).String()) + } + pathLocal := logicalcluster.From(apiExportEndpointSlice).Path() + if !pathLocal.Empty() { + result = append(result, pathLocal.Join(apiExportEndpointSlice.Spec.APIExport.Name).String()) + } + + return result, nil +} diff --git a/pkg/indexers/apiexport_test.go b/pkg/indexers/apiexport_test.go new file mode 100644 index 00000000000..3ef5ba29ee6 --- /dev/null +++ b/pkg/indexers/apiexport_test.go @@ -0,0 +1,95 @@ +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package indexers + +import ( + "reflect" + "testing" + + "github.com/kcp-dev/logicalcluster/v3" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" +) + +func TestIndexAPIExportEndpointSliceByAPIExport(t *testing.T) { + tests := map[string]struct { + obj interface{} + want []string + wantErr bool + }{ + "not an APIExportEndpointSlice": { + obj: "not an APIExportEndpointSlice", + want: []string{}, + wantErr: true, + }, + "valid APIExportEndpointSlice": { + obj: &apisv1alpha1.APIExportEndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + logicalcluster.AnnotationKey: "root:local", + }, + Name: "foo", + }, + Spec: apisv1alpha1.APIExportEndpointSliceSpec{ + APIExport: apisv1alpha1.ExportBindingReference{ + Path: "root:default", + Name: "foo", + }, + }, + }, + want: []string{ + logicalcluster.NewPath("root:default:foo").String(), + logicalcluster.NewPath("root:local:foo").String(), + }, + wantErr: false, + }, + "valid APIExportEndpointSlice local to export": { + obj: &apisv1alpha1.APIExportEndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + logicalcluster.AnnotationKey: "root:local", + }, + Name: "foo", + }, + Spec: apisv1alpha1.APIExportEndpointSliceSpec{ + APIExport: apisv1alpha1.ExportBindingReference{ + Name: "foo", + }, + }, + }, + want: []string{ + logicalcluster.NewPath("root:local:foo").String(), + }, + wantErr: false, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + got, err := IndexAPIExportEndpointSliceByAPIExport(tt.obj) + if (err != nil) != tt.wantErr { + t.Errorf("IndexAPIExportEndpointSliceByAPIExport() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("IndexAPIExportEndpointSliceByAPIExport() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/openapi/zz_generated.openapi.go b/pkg/openapi/zz_generated.openapi.go index d8dc9f7be05..5aa662f19c5 100644 --- a/pkg/openapi/zz_generated.openapi.go +++ b/pkg/openapi/zz_generated.openapi.go @@ -776,6 +776,14 @@ func schema_sdk_apis_apis_v1alpha1_APIExportEndpointSliceStatus(ref common.Refer }, }, "endpoints": { + VendorExtensible: spec.VendorExtensible{ + Extensions: spec.Extensions{ + "x-kubernetes-list-map-keys": []interface{}{ + "url", + }, + "x-kubernetes-list-type": "map", + }, + }, SchemaProps: spec.SchemaProps{ Description: "endpoints contains all the URLs of the APIExport service.", Type: []string{"array"}, @@ -789,6 +797,13 @@ func schema_sdk_apis_apis_v1alpha1_APIExportEndpointSliceStatus(ref common.Refer }, }, }, + "shardSelector": { + SchemaProps: spec.SchemaProps{ + Description: "shardSelector is the selector used to filter the shards. It is used to filter the shards when determining partition scope when deriving the endpoints. This is set by owning shard, and is used by follower shards to determine if its inscope or not.", + Type: []string{"string"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller.go b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller.go index 344c26cbb79..6ed2ab42f25 100644 --- a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller.go +++ b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller.go @@ -19,14 +19,15 @@ package apiexportendpointslice import ( "context" "fmt" - "reflect" "time" + "github.com/go-logr/logr" kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" "github.com/kcp-dev/logicalcluster/v3" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -41,12 +42,10 @@ import ( "github.com/kcp-dev/kcp/pkg/reconciler/events" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" "github.com/kcp-dev/kcp/sdk/apis/core" - corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" topologyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/topology/v1alpha1" kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" apisv1alpha1client "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/typed/apis/v1alpha1" - apisinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1" - corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" + apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1" topologyinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/topology/v1alpha1" ) @@ -57,9 +56,8 @@ const ( // NewController returns a new controller for APIExportEndpointSlices. // Shards and APIExports are read from the cache server. func NewController( - apiExportEndpointSliceClusterInformer apisinformers.APIExportEndpointSliceClusterInformer, - globalShardClusterInformer corev1alpha1informers.ShardClusterInformer, - globalAPIExportClusterInformer apisinformers.APIExportClusterInformer, + apiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer, + globalAPIExportClusterInformer apisv1alpha1informers.APIExportClusterInformer, partitionClusterInformer topologyinformers.PartitionClusterInformer, kcpClusterClient kcpclientset.ClusterInterface, ) (*controller, error) { @@ -73,11 +71,8 @@ func NewController( listAPIExportEndpointSlices: func() ([]*apisv1alpha1.APIExportEndpointSlice, error) { return apiExportEndpointSliceClusterInformer.Lister().List(labels.Everything()) }, - listShards: func(selector labels.Selector) ([]*corev1alpha1.Shard, error) { - return globalShardClusterInformer.Lister().List(selector) - }, - getAPIExportEndpointSlice: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExportEndpointSlice, error) { - return apiExportEndpointSliceClusterInformer.Lister().Cluster(clusterName).Get(name) + getAPIExportEndpointSlice: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExportEndpointSlice, error) { + return indexers.ByPathAndName[*apisv1alpha1.APIExportEndpointSlice](apisv1alpha1.Resource("apiexportendpointslices"), apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), path, name) }, getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), globalAPIExportClusterInformer.Informer().GetIndexer(), path, name) @@ -104,50 +99,38 @@ func NewController( commit: committer.NewCommitter[*APIExportEndpointSlice, Patcher, *APIExportEndpointSliceSpec, *APIExportEndpointSliceStatus](kcpClusterClient.ApisV1alpha1().APIExportEndpointSlices()), } + logger := logging.WithReconciler(klog.Background(), ControllerName) + _, _ = apiExportEndpointSliceClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - c.enqueueAPIExportEndpointSlice(obj) + c.enqueueAPIExportEndpointSlice(objOrTombstone[*apisv1alpha1.APIExportEndpointSlice](obj), logger, "") }, UpdateFunc: func(_, newObj interface{}) { - c.enqueueAPIExportEndpointSlice(newObj) + c.enqueueAPIExportEndpointSlice(objOrTombstone[*apisv1alpha1.APIExportEndpointSlice](newObj), logger, "") }, DeleteFunc: func(obj interface{}) { - c.enqueueAPIExportEndpointSlice(obj) + c.enqueueAPIExportEndpointSlice(objOrTombstone[*apisv1alpha1.APIExportEndpointSlice](obj), logger, "") }, }) _, _ = globalAPIExportClusterInformer.Informer().AddEventHandler(events.WithoutSyncs(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - c.enqueueAPIExportEndpointSlicesForAPIExport(obj) + c.enqueueAPIExportEndpointSlicesForAPIExport(objOrTombstone[*apisv1alpha1.APIExport](obj), logger) }, DeleteFunc: func(obj interface{}) { - c.enqueueAPIExportEndpointSlicesForAPIExport(obj) - }, - })) - - _, _ = globalShardClusterInformer.Informer().AddEventHandler(events.WithoutSyncs(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - c.enqueueAllAPIExportEndpointSlices(obj) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - if filterShardEvent(oldObj, newObj) { - c.enqueueAllAPIExportEndpointSlices(newObj) - } - }, - DeleteFunc: func(obj interface{}) { - c.enqueueAllAPIExportEndpointSlices(obj) + c.enqueueAPIExportEndpointSlicesForAPIExport(objOrTombstone[*apisv1alpha1.APIExport](obj), logger) }, })) _, _ = partitionClusterInformer.Informer().AddEventHandler(events.WithoutSyncs(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - c.enqueuePartition(obj) + c.enqueuePartition(objOrTombstone[*topologyv1alpha1.Partition](obj), logger) }, UpdateFunc: func(_, newObj interface{}) { - c.enqueuePartition(newObj) + c.enqueuePartition(objOrTombstone[*topologyv1alpha1.Partition](newObj), logger) }, DeleteFunc: func(obj interface{}) { - c.enqueuePartition(obj) + c.enqueuePartition(objOrTombstone[*topologyv1alpha1.Partition](obj), logger) }, })) @@ -166,45 +149,34 @@ type CommitFunc = func(context.Context, *Resource, *Resource) error type controller struct { queue workqueue.TypedRateLimitingInterface[string] - listShards func(selector labels.Selector) ([]*corev1alpha1.Shard, error) listAPIExportEndpointSlices func() ([]*apisv1alpha1.APIExportEndpointSlice, error) - getAPIExportEndpointSlice func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExportEndpointSlice, error) + getAPIExportEndpointSlice func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExportEndpointSlice, error) getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) getPartition func(clusterName logicalcluster.Name, name string) (*topologyv1alpha1.Partition, error) getAPIExportEndpointSlicesByPartition func(key string) ([]*apisv1alpha1.APIExportEndpointSlice, error) - apiExportEndpointSliceClusterInformer apisinformers.APIExportEndpointSliceClusterInformer + apiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer commit CommitFunc } // enqueueAPIExportEndpointSlice enqueues an APIExportEndpointSlice. -func (c *controller) enqueueAPIExportEndpointSlice(obj interface{}) { - key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj) +func (c *controller) enqueueAPIExportEndpointSlice(slice *apisv1alpha1.APIExportEndpointSlice, logger logr.Logger, logSuffix string) { + key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(slice) if err != nil { utilruntime.HandleError(err) return } - logger := logging.WithQueueKey(logging.WithReconciler(klog.Background(), ControllerName), key) - logger.V(4).Info("queueing APIExportEndpointSlice") + logger.V(4).Info(fmt.Sprintf("queueing APIExportEndpointSlice%s", logSuffix)) c.queue.Add(key) } // enqueueAPIExportEndpointSlicesForAPIExport enqueues APIExportEndpointSlices referencing a specific APIExport. -func (c *controller) enqueueAPIExportEndpointSlicesForAPIExport(obj interface{}) { - if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { - obj = d.Obj - } - export, ok := obj.(*apisv1alpha1.APIExport) - if !ok { - utilruntime.HandleError(fmt.Errorf("obj is supposed to be a APIExport, but is %T", obj)) - return - } - +func (c *controller) enqueueAPIExportEndpointSlicesForAPIExport(export *apisv1alpha1.APIExport, logger logr.Logger) { // binding keys by full path keys := sets.New[string]() if path := logicalcluster.NewPath(export.Annotations[core.LogicalClusterPathAnnotationKey]); !path.Empty() { - pathKeys, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexAPIExportEndpointSliceByAPIExport, path.Join(export.Name).String()) + pathKeys, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexers.APIExportEndpointSliceByAPIExport, path.Join(export.Name).String()) if err != nil { utilruntime.HandleError(err) return @@ -212,7 +184,7 @@ func (c *controller) enqueueAPIExportEndpointSlicesForAPIExport(obj interface{}) keys.Insert(pathKeys...) } - clusterKeys, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexAPIExportEndpointSliceByAPIExport, logicalcluster.From(export).Path().Join(export.Name).String()) + clusterKeys, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexers.APIExportEndpointSliceByAPIExport, logicalcluster.From(export).Path().Join(export.Name).String()) if err != nil { utilruntime.HandleError(err) return @@ -227,35 +199,12 @@ func (c *controller) enqueueAPIExportEndpointSlicesForAPIExport(obj interface{}) } else if !exists { continue } - logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*apisv1alpha1.APIExport)) - logging.WithQueueKey(logger, key).V(4).Info("queuing APIExportEndpointSlices because of referenced APIExport") - c.enqueueAPIExportEndpointSlice(slice) - } -} - -// enqueueAllAPIExportEndpointSlices enqueues all APIExportEndpointSlices. -func (c *controller) enqueueAllAPIExportEndpointSlices(shard interface{}) { - list, err := c.listAPIExportEndpointSlices() - if err != nil { - utilruntime.HandleError(err) - return - } - - logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), shard.(*corev1alpha1.Shard)) - for i := range list { - key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(list[i]) - if err != nil { - utilruntime.HandleError(err) - continue - } - - logging.WithQueueKey(logger, key).V(4).Info("queuing APIExportEndpointSlice because Shard changed") - c.queue.Add(key) + c.enqueueAPIExportEndpointSlice(objOrTombstone[*apisv1alpha1.APIExportEndpointSlice](slice), logger, " because of referenced APIExport") } } // enqueuePartition maps a Partition to APIExportEndpointSlices for enqueuing. -func (c *controller) enqueuePartition(obj interface{}) { +func (c *controller) enqueuePartition(obj *topologyv1alpha1.Partition, logger logr.Logger) { key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj) if err != nil { utilruntime.HandleError(err) @@ -268,10 +217,8 @@ func (c *controller) enqueuePartition(obj interface{}) { return } - logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*topologyv1alpha1.Partition)) - logging.WithQueueKey(logger, key).V(4).Info("queuing APIExportEndpointSlices because Partition changed") for _, slice := range slices { - c.enqueueAPIExportEndpointSlice(slice) + c.enqueueAPIExportEndpointSlice(objOrTombstone[*apisv1alpha1.APIExportEndpointSlice](slice), logger, " because of Partition change") } } @@ -328,7 +275,7 @@ func (c *controller) process(ctx context.Context, key string) error { utilruntime.HandleError(err) return nil } - obj, err := c.getAPIExportEndpointSlice(clusterName, name) + obj, err := c.getAPIExportEndpointSlice(clusterName.Path(), name) if err != nil { if errors.IsNotFound(err) { return nil // object deleted before we handled it @@ -353,6 +300,7 @@ func (c *controller) process(ctx context.Context, key string) error { // If the object being reconciled changed as a result, update it. oldResource := &Resource{ObjectMeta: old.ObjectMeta, Spec: &old.Spec, Status: &old.Status} newResource := &Resource{ObjectMeta: obj.ObjectMeta, Spec: &obj.Spec, Status: &obj.Status} + if err := c.commit(ctx, oldResource, newResource); err != nil { errs = append(errs, err) } @@ -360,36 +308,36 @@ func (c *controller) process(ctx context.Context, key string) error { return utilerrors.NewAggregate(errs) } -// filterShardEvent returns true if the event passes the filter and needs to be processed false otherwise. -func filterShardEvent(oldObj, newObj interface{}) bool { - oldShard, ok := oldObj.(*corev1alpha1.Shard) - if !ok { - return false - } - newShard, ok := newObj.(*corev1alpha1.Shard) - if !ok { - return false - } - if oldShard.Spec.VirtualWorkspaceURL != newShard.Spec.VirtualWorkspaceURL { - return true - } - if !reflect.DeepEqual(oldShard.Labels, newShard.Labels) { - return true - } - return false -} - // InstallIndexers adds the additional indexers that this controller requires to the informers. -func InstallIndexers(globalAPIExportClusterInformer apisinformers.APIExportClusterInformer, apiExportEndpointSliceClusterInformer apisinformers.APIExportEndpointSliceClusterInformer) { +func InstallIndexers( + globalAPIExportClusterInformer apisv1alpha1informers.APIExportClusterInformer, + apiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer, +) { indexers.AddIfNotPresentOrDie(globalAPIExportClusterInformer.Informer().GetIndexer(), cache.Indexers{ indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, }) - indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ - indexAPIExportEndpointSliceByAPIExport: indexAPIExportEndpointSliceByAPIExportFunc, + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) + indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ + indexers.APIExportEndpointSliceByAPIExport: indexers.IndexAPIExportEndpointSliceByAPIExport, }) - indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ indexAPIExportEndpointSlicesByPartition: indexAPIExportEndpointSlicesByPartitionFunc, }) } + +func objOrTombstone[T runtime.Object](obj any) T { + if t, ok := obj.(T); ok { + return t + } + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + if t, ok := tombstone.Obj.(T); ok { + return t + } + + panic(fmt.Errorf("tombstone %T is not a %T", tombstone, new(T))) + } + + panic(fmt.Errorf("%T is not a %T", obj, new(T))) +} diff --git a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller_test.go b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller_test.go index 002e9c5e69e..028c150b9c7 100644 --- a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller_test.go +++ b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller_test.go @@ -18,7 +18,6 @@ package apiexportendpointslice import ( "context" - "errors" "fmt" "testing" @@ -28,10 +27,8 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" - corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" conditionsv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/apis/conditions/v1alpha1" "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions" topologyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/topology/v1alpha1" @@ -46,85 +43,34 @@ func TestReconcile(t *testing.T) { listShardsError error errorReason string - wantError bool - wantVerifyFailure bool - wantAPIExportEndpointSliceURLsError bool - wantAPIExportEndpointSliceURLsReady bool - wantAPIExportEndpointSliceURLsUnknown bool - wantAPIExportValid bool - wantPartitionValid bool - wantAPIExportNotValid bool - wantPartitionNotValid bool + wantError bool + wantVerifyFailure bool + wantAPIExportValid bool + wantPartitionValid bool + wantAPIExportNotValid bool + wantPartitionNotValid bool }{ - "error listing shards": { - listShardsError: errors.New("foo"), - wantError: true, - wantAPIExportEndpointSliceURLsError: true, - }, "APIExportValid set to false when APIExport is missing": { apiExportMissing: true, errorReason: apisv1alpha1.APIExportNotFoundReason, wantAPIExportNotValid: true, }, "APIExportValid set to false if an internal error happens when fetching the APIExport": { - apiExportInternalErr: true, - wantError: true, - errorReason: apisv1alpha1.InternalErrorReason, - wantAPIExportNotValid: true, - wantAPIExportEndpointSliceURLsUnknown: true, + apiExportInternalErr: true, + wantError: true, + errorReason: apisv1alpha1.InternalErrorReason, + wantAPIExportNotValid: true, }, "PartitionValid set to false when the Partition is missing": { - partitionMissing: true, - errorReason: apisv1alpha1.PartitionInvalidReferenceReason, - wantPartitionNotValid: true, - wantAPIExportEndpointSliceURLsError: true, - }, - "APIExportEndpointSliceURLs set when no issue": { - wantAPIExportEndpointSliceURLsReady: true, - wantAPIExportValid: true, - wantPartitionValid: true, + partitionMissing: true, + errorReason: apisv1alpha1.PartitionInvalidReferenceReason, + wantPartitionNotValid: true, }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { c := &controller{ - listShards: func(selector labels.Selector) ([]*corev1alpha1.Shard, error) { - if tc.listShardsError != nil { - return nil, tc.listShardsError - } - - return []*corev1alpha1.Shard{ - { - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - logicalcluster.AnnotationKey: "root:org:ws", - }, - Labels: map[string]string{ - "region": "Europe", - }, - Name: "shard1", - }, - Spec: corev1alpha1.ShardSpec{ - VirtualWorkspaceURL: "https://server-1.kcp.dev/", - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - logicalcluster.AnnotationKey: "root:org:ws", - }, - Labels: map[string]string{ - "region": "Europe", - }, - Name: "shard2", - }, - Spec: corev1alpha1.ShardSpec{ - VirtualWorkspaceURL: "https://server-2.kcp.dev/", - }, - }, - }, nil - }, getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { if tc.apiExportMissing { return nil, apierrors.NewNotFound(apisv1alpha1.Resource("APIExport"), name) @@ -186,35 +132,6 @@ func TestReconcile(t *testing.T) { require.NoError(t, err, "expected no error") } - if tc.wantAPIExportEndpointSliceURLsError { - requireConditionMatches(t, apiExportEndpointSlice, - conditions.FalseCondition( - apisv1alpha1.APIExportEndpointSliceURLsReady, - apisv1alpha1.ErrorGeneratingURLsReason, - conditionsv1alpha1.ConditionSeverityError, - "", - ), - ) - } - - if tc.wantAPIExportEndpointSliceURLsReady { - requireConditionMatches(t, apiExportEndpointSlice, conditions.TrueCondition(apisv1alpha1.APIExportEndpointSliceURLsReady)) - require.Equal(t, []apisv1alpha1.APIExportEndpoint{ - {URL: "https://server-1.kcp.dev/services/apiexport/root:org:ws/my-export"}, - {URL: "https://server-2.kcp.dev/services/apiexport/root:org:ws/my-export"}, - }, apiExportEndpointSlice.Status.APIExportEndpoints) - } - - if tc.wantAPIExportEndpointSliceURLsUnknown { - requireConditionMatches(t, apiExportEndpointSlice, - conditions.UnknownCondition( - apisv1alpha1.APIExportEndpointSliceURLsReady, - apisv1alpha1.ErrorGeneratingURLsReason, - "", - ), - ) - } - if tc.wantAPIExportNotValid { requireConditionMatches(t, apiExportEndpointSlice, conditions.FalseCondition( diff --git a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_indexes.go b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_indexes.go index b55d565e8d1..6b052a7a989 100644 --- a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_indexes.go +++ b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_indexes.go @@ -26,24 +26,9 @@ import ( ) const ( - indexAPIExportEndpointSliceByAPIExport = "indexAPIExportEndpointSliceByAPIExport" indexAPIExportEndpointSlicesByPartition = "indexAPIExportEndpointSlicesByPartition" ) -// indexAPIExportEndpointSliceByAPIExportFunc indexes the APIExportEndpointSlice by their APIExport's Reference Path and Name. -func indexAPIExportEndpointSliceByAPIExportFunc(obj interface{}) ([]string, error) { - apiExportEndpointSlice, ok := obj.(*apisv1alpha1.APIExportEndpointSlice) - if !ok { - return []string{}, fmt.Errorf("obj %T is not an APIExportEndpointSlice", obj) - } - - path := logicalcluster.NewPath(apiExportEndpointSlice.Spec.APIExport.Path) - if path.Empty() { - path = logicalcluster.From(apiExportEndpointSlice).Path() - } - return []string{path.Join(apiExportEndpointSlice.Spec.APIExport.Name).String()}, nil -} - // indexAPIExportEndpointSlicesByPartitionFunc is an index function that maps a Partition to the key for its // spec.partition. func indexAPIExportEndpointSlicesByPartitionFunc(obj interface{}) ([]string, error) { diff --git a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_reconcile.go b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_reconcile.go index 5aaf4a73e38..9461f24413e 100644 --- a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_reconcile.go +++ b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_reconcile.go @@ -18,36 +18,26 @@ package apiexportendpointslice import ( "context" - "net/url" - "path" "github.com/kcp-dev/logicalcluster/v3" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/klog/v2" - virtualworkspacesoptions "github.com/kcp-dev/kcp/cmd/virtual-workspaces/options" - "github.com/kcp-dev/kcp/pkg/logging" - apiexportbuilder "github.com/kcp-dev/kcp/pkg/virtual/apiexport/builder" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" - corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" conditionsv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/apis/conditions/v1alpha1" "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions" topologyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/topology/v1alpha1" ) type endpointsReconciler struct { - listShards func(selector labels.Selector) ([]*corev1alpha1.Shard, error) getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) getPartition func(clusterName logicalcluster.Name, name string) (*topologyv1alpha1.Partition, error) } func (c *controller) reconcile(ctx context.Context, apiExportEndpointSlice *apisv1alpha1.APIExportEndpointSlice) error { r := &endpointsReconciler{ - listShards: c.listShards, getAPIExport: c.getAPIExport, getPartition: c.getPartition, } @@ -55,18 +45,19 @@ func (c *controller) reconcile(ctx context.Context, apiExportEndpointSlice *apis return r.reconcile(ctx, apiExportEndpointSlice) } -func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSlice *apisv1alpha1.APIExportEndpointSlice) error { - // TODO (fgiloux): When the information is available in the cache server - // check if at least one APIBinding is bound in the shard to the APIExport referenced by the APIExportEndpointSlice. - // If so, add the respective endpoint to the status. - // For now the unfiltered list is added. +func (r *endpointsReconciler) reconcile(_ context.Context, apiExportEndpointSlice *apisv1alpha1.APIExportEndpointSlice) error { + // TODO(mjudeikis): Remove this at some point once we confident that we don't use it anymore. + conditions.Delete( + apiExportEndpointSlice, + apisv1alpha1.ErrorGeneratingURLsReason, + ) // Get APIExport apiExportPath := logicalcluster.NewPath(apiExportEndpointSlice.Spec.APIExport.Path) if apiExportPath.Empty() { apiExportPath = logicalcluster.From(apiExportEndpointSlice).Path() } - apiExport, err := r.getAPIExport(apiExportPath, apiExportEndpointSlice.Spec.APIExport.Name) + _, err := r.getAPIExport(apiExportPath, apiExportEndpointSlice.Spec.APIExport.Name) if err != nil { reason := apisv1alpha1.InternalErrorReason if errors.IsNotFound(err) { @@ -81,13 +72,6 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSl apiExportPath, apiExportEndpointSlice.Spec.APIExport.Name, ) - conditions.MarkFalse( - apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, - apisv1alpha1.ErrorGeneratingURLsReason, - conditionsv1alpha1.ConditionSeverityError, - "", - ) // No need to try again return nil } else { @@ -100,12 +84,6 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSl apiExportPath, apiExportEndpointSlice.Spec.APIExport.Name, ) - conditions.MarkUnknown( - apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, - apisv1alpha1.ErrorGeneratingURLsReason, - "", - ) return err } } @@ -128,13 +106,6 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSl "%v", err, ) - conditions.MarkFalse( - apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, - apisv1alpha1.ErrorGeneratingURLsReason, - conditionsv1alpha1.ConditionSeverityError, - "", - ) // No need to try again return nil } else { @@ -146,12 +117,6 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSl "%v", err, ) - conditions.MarkUnknown( - apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, - apisv1alpha1.ErrorGeneratingURLsReason, - "", - ) return err } } @@ -165,90 +130,18 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSl "%v", err, ) - conditions.MarkFalse( - apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, - apisv1alpha1.ErrorGeneratingURLsReason, - conditionsv1alpha1.ConditionSeverityError, - "", - ) return err } } if selector == nil { selector = labels.Everything() } - conditions.MarkTrue(apiExportEndpointSlice, apisv1alpha1.PartitionValid) - - // Get shards - shards, err := r.listShards(selector) - if err != nil { - conditions.MarkFalse( - apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, - apisv1alpha1.ErrorGeneratingURLsReason, - conditionsv1alpha1.ConditionSeverityError, - "error listing shards", - ) - return err - } - - if err = r.updateEndpoints(ctx, apiExportEndpointSlice, apiExport, shards); err != nil { - conditions.MarkFalse( - apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, - apisv1alpha1.ErrorGeneratingURLsReason, - conditionsv1alpha1.ConditionSeverityError, - "%v", - err, - ) - return err - } - conditions.MarkTrue(apiExportEndpointSlice, apisv1alpha1.APIExportEndpointSliceURLsReady) - - return nil -} - -func (r *endpointsReconciler) updateEndpoints(ctx context.Context, - apiExportEndpointSlice *apisv1alpha1.APIExportEndpointSlice, - apiExport *apisv1alpha1.APIExport, - shards []*corev1alpha1.Shard) error { - logger := klog.FromContext(ctx) - desiredURLs := sets.New[string]() - for _, shard := range shards { - if shard.Spec.VirtualWorkspaceURL == "" { - continue - } - u, err := url.Parse(shard.Spec.VirtualWorkspaceURL) - if err != nil { - // Should never happen - logger = logging.WithObject(logger, shard) - logger.Error( - err, "error parsing shard.spec.virtualWorkspaceURL", - "VirtualWorkspaceURL", shard.Spec.VirtualWorkspaceURL, - ) - - continue - } - - u.Path = path.Join( - u.Path, - virtualworkspacesoptions.DefaultRootPathPrefix, - apiexportbuilder.VirtualWorkspaceName, - logicalcluster.From(apiExport).String(), - apiExport.Name, - ) - - desiredURLs.Insert(u.String()) - } + conditions.MarkTrue(apiExportEndpointSlice, apisv1alpha1.PartitionValid) - apiExportEndpointSlice.Status.APIExportEndpoints = nil - for _, u := range sets.List[string](desiredURLs) { - apiExportEndpointSlice.Status.APIExportEndpoints = append(apiExportEndpointSlice.Status.APIExportEndpoints, apisv1alpha1.APIExportEndpoint{ - URL: u, - }) - } + // We presenrve selector in the status for url generation. Else we don't know partition selector + // without propagating partitions over the cache. + apiExportEndpointSlice.Status.ShardSelector = selector.String() return nil } diff --git a/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_controller.go b/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_controller.go new file mode 100644 index 00000000000..ae1c893bff0 --- /dev/null +++ b/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_controller.go @@ -0,0 +1,377 @@ +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiexportendpointsliceurls + +import ( + "context" + "fmt" + "time" + + "github.com/go-logr/logr" + kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" + "github.com/kcp-dev/logicalcluster/v3" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "github.com/kcp-dev/kcp/pkg/indexers" + "github.com/kcp-dev/kcp/pkg/logging" + "github.com/kcp-dev/kcp/pkg/reconciler/events" + apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + "github.com/kcp-dev/kcp/sdk/apis/core" + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + apisv1alpha1apply "github.com/kcp-dev/kcp/sdk/client/applyconfiguration/apis/v1alpha1" + kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" + apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1" + corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" +) + +const ( + ControllerName = "kcp-apiexportendpointslice-urls" +) + +// NewController returns a new controller for APIExportEndpointSlices. +// Shards and APIExports are read from the cache server. +func NewController( + shardName string, + apiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer, + apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer, + globalAPIExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer, + globalShardClusterInformer corev1alpha1informers.ShardClusterInformer, + globalAPIExportClusterInformer apisv1alpha1informers.APIExportClusterInformer, + clusterClient kcpclientset.ClusterInterface, +) (*controller, error) { + c := &controller{ + shardName: shardName, + clusterClient: clusterClient, + queue: workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[string](), + workqueue.TypedRateLimitingQueueConfig[string]{ + Name: ControllerName, + }, + ), + getMyShard: func() (*corev1alpha1.Shard, error) { + return globalShardClusterInformer.Cluster(core.RootCluster).Lister().Get(shardName) + }, + getAPIExportEndpointSlice: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExportEndpointSlice, error) { + obj, err := indexers.ByPathAndNameWithFallback[*apisv1alpha1.APIExportEndpointSlice](apisv1alpha1.Resource("apiexportendpointslices"), apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), globalAPIExportEndpointSliceClusterInformer.Informer().GetIndexer(), path, name) + if err != nil { + return nil, err + } + return obj, err + }, + getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { + return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), globalAPIExportClusterInformer.Informer().GetIndexer(), path, name) + }, + listAPIBindingsByAPIExport: func(export *apisv1alpha1.APIExport) ([]*apisv1alpha1.APIBinding, error) { + // binding keys by full path + keys := sets.New[string]() + if path := logicalcluster.NewPath(export.Annotations[core.LogicalClusterPathAnnotationKey]); !path.Empty() { + pathKeys, err := apiBindingInformer.Informer().GetIndexer().IndexKeys(indexers.APIBindingsByAPIExport, path.Join(export.Name).String()) + if err != nil { + return nil, err + } + keys.Insert(pathKeys...) + } + + clusterKeys, err := apiBindingInformer.Informer().GetIndexer().IndexKeys(indexers.APIBindingsByAPIExport, logicalcluster.From(export).Path().Join(export.Name).String()) + if err != nil { + return nil, err + } + keys.Insert(clusterKeys...) + + bindings := make([]*apisv1alpha1.APIBinding, 0, keys.Len()) + for _, key := range sets.List[string](keys) { + binding, exists, err := apiBindingInformer.Informer().GetIndexer().GetByKey(key) + if err != nil { + utilruntime.HandleError(err) + continue + } else if !exists { + utilruntime.HandleError(fmt.Errorf("APIBinding %q does not exist", key)) + continue + } + bindings = append(bindings, binding.(*apisv1alpha1.APIBinding)) + } + return bindings, nil + }, + patchAPIExportEndpointSlice: func(ctx context.Context, cluster logicalcluster.Path, patch *apisv1alpha1apply.APIExportEndpointSliceApplyConfiguration) error { + _, err := clusterClient.ApisV1alpha1().APIExportEndpointSlices().Cluster(cluster).ApplyStatus(ctx, patch, metav1.ApplyOptions{ + FieldManager: shardName, + }) + return err + }, + apiExportEndpointSliceClusterInformer: apiExportEndpointSliceClusterInformer, + globalApiExportEndpointSliceClusterInformer: globalAPIExportEndpointSliceClusterInformer, + } + + logger := logging.WithReconciler(klog.Background(), ControllerName) + + _, _ = apiExportEndpointSliceClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSlice(objOrTombstone[*apisv1alpha1.APIExportEndpointSlice](obj), logger, "") + }, + UpdateFunc: func(_, newObj interface{}) { + c.enqueueAPIExportEndpointSlice(objOrTombstone[*apisv1alpha1.APIExportEndpointSlice](newObj), logger, "") + }, + DeleteFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSlice(objOrTombstone[*apisv1alpha1.APIExportEndpointSlice](obj), logger, "") + }, + }) + + _, _ = apiBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSliceByAPIBinding(objOrTombstone[*apisv1alpha1.APIBinding](obj), logger) + }, + UpdateFunc: func(_, newObj interface{}) { + c.enqueueAPIExportEndpointSliceByAPIBinding(objOrTombstone[*apisv1alpha1.APIBinding](newObj), logger) + }, + DeleteFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSliceByAPIBinding(objOrTombstone[*apisv1alpha1.APIBinding](obj), logger) + }, + }) + + _, _ = globalAPIExportEndpointSliceClusterInformer.Informer().AddEventHandler(events.WithoutSyncs(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSlice(objOrTombstone[*apisv1alpha1.APIExportEndpointSlice](obj), logger, " from cache") + }, + UpdateFunc: func(_, newObj interface{}) { + c.enqueueAPIExportEndpointSlice(objOrTombstone[*apisv1alpha1.APIExportEndpointSlice](newObj), logger, " from cache") + }, + DeleteFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSlice(objOrTombstone[*apisv1alpha1.APIExportEndpointSlice](obj), logger, " from cache") + }, + })) + + return c, nil +} + +// controller reconciles APIExportEndpointSlices. It ensures that the shard endpoints are populated +// in the status of every APIExportEndpointSlices. +type controller struct { + queue workqueue.TypedRateLimitingInterface[string] + shardName string + clusterClient kcpclientset.ClusterInterface + + getMyShard func() (*corev1alpha1.Shard, error) + getAPIExportEndpointSlice func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExportEndpointSlice, error) + getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) + listAPIBindingsByAPIExport func(apiexport *apisv1alpha1.APIExport) ([]*apisv1alpha1.APIBinding, error) + patchAPIExportEndpointSlice func(ctx context.Context, cluster logicalcluster.Path, patch *apisv1alpha1apply.APIExportEndpointSliceApplyConfiguration) error + + apiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer + globalApiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer +} + +func (c *controller) enqueueAPIExportEndpointSliceByAPIBinding(binding *apisv1alpha1.APIBinding, logger logr.Logger) { + { // local to shard + keys := sets.New[string]() + if path := logicalcluster.NewPath(binding.Spec.Reference.Export.Path); !path.Empty() { // This is remote apibinding. + pathKeys, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexers.APIExportEndpointSliceByAPIExport, path.Join(binding.Spec.Reference.Export.Name).String()) + if err != nil { + utilruntime.HandleError(err) + return + } + keys.Insert(pathKeys...) + } else { + // This is local apibinding to the export. Meaning it has path set to empty string, so apiexport is in the same cluster as the binding. + // While our CLI does not allow this, it is possible to create such a binding via the API. + clusterKeys, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexers.APIExportEndpointSliceByAPIExport, logicalcluster.From(binding).Path().Join(binding.Spec.Reference.Export.Name).String()) + if err != nil { + utilruntime.HandleError(err) + return + } + keys.Insert(clusterKeys...) + } + + for _, key := range sets.List[string](keys) { + slice, exists, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().GetByKey(key) + if err != nil { + utilruntime.HandleError(err) + continue + } else if !exists { + continue + } + c.enqueueAPIExportEndpointSlice(objOrTombstone[*apisv1alpha1.APIExportEndpointSlice](slice), logger, " because of APIBinding") + } + } + { + keys := sets.New[string]() + if path := logicalcluster.NewPath(binding.Spec.Reference.Export.Path); !path.Empty() { + pathKeys, err := c.globalApiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexers.APIExportEndpointSliceByAPIExport, path.Join(binding.Spec.Reference.Export.Name).String()) + if err != nil { + utilruntime.HandleError(err) + return + } + keys.Insert(pathKeys...) + } else { + clusterKeys, err := c.globalApiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexers.APIExportEndpointSliceByAPIExport, logicalcluster.From(binding).Path().Join(binding.Spec.Reference.Export.Name).String()) + if err != nil { + utilruntime.HandleError(err) + return + } + keys.Insert(clusterKeys...) + } + + for _, key := range sets.List[string](keys) { + slice, exists, err := c.globalApiExportEndpointSliceClusterInformer.Informer().GetIndexer().GetByKey(key) + if err != nil { + utilruntime.HandleError(err) + continue + } else if !exists { + continue + } + c.enqueueAPIExportEndpointSlice(objOrTombstone[*apisv1alpha1.APIExportEndpointSlice](slice), logger, "because of APIBinding from cache") + } + } +} + +// enqueueAPIExportEndpointSlice enqueues an APIExportEndpointSlice. +func (c *controller) enqueueAPIExportEndpointSlice(obj *apisv1alpha1.APIExportEndpointSlice, logger logr.Logger, logSuffix string) { + key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj) + if err != nil { + utilruntime.HandleError(err) + return + } + + logger.V(4).Info(fmt.Sprintf("queueing APIExportEndpointSlice%s", logSuffix)) + c.queue.Add(key) +} + +// Start starts the controller, which stops when ctx.Done() is closed. +func (c *controller) Start(ctx context.Context, numThreads int) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + logger := logging.WithReconciler(klog.FromContext(ctx), ControllerName) + ctx = klog.NewContext(ctx, logger) + logger.Info("Starting controller") + defer logger.Info("Shutting down controller") + + for i := 0; i < numThreads; i++ { + go wait.UntilWithContext(ctx, c.startWorker, time.Second) + } + + <-ctx.Done() +} + +func (c *controller) startWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *controller) processNextWorkItem(ctx context.Context) bool { + // Wait until there is a new item in the working queue + k, quit := c.queue.Get() + if quit { + return false + } + key := k + + logger := logging.WithQueueKey(klog.FromContext(ctx), key) + ctx = klog.NewContext(ctx, logger) + logger.V(4).Info("processing key") + + // No matter what, tell the queue we're done with this key, to unblock + // other workers. + defer c.queue.Done(key) + + if requeue, err := c.process(ctx, key); err != nil { + utilruntime.HandleError(fmt.Errorf("%q controller failed to sync %q, err: %w", ControllerName, key, err)) + c.queue.AddRateLimited(key) + return true + } else if requeue { + // only requeue if we didn't error, but we still want to requeue + c.queue.Add(key) + return true + } + c.queue.Forget(key) + return true +} + +func (c *controller) process(ctx context.Context, key string) (bool, error) { + clusterName, _, name, err := kcpcache.SplitMetaClusterNamespaceKey(key) + if err != nil { + utilruntime.HandleError(err) + return false, nil + } + obj, err := c.getAPIExportEndpointSlice(clusterName.Path(), name) + if err != nil { + if errors.IsNotFound(err) { + return false, nil // object deleted before we handled it + } + return false, err + } + + obj = obj.DeepCopy() + + logger := logging.WithObject(klog.FromContext(ctx), obj) + ctx = klog.NewContext(ctx, logger) + + var errs []error + requeue, err := c.reconcile(ctx, obj) + if err != nil { + errs = append(errs, err) + } + + return requeue, utilerrors.NewAggregate(errs) +} + +// InstallIndexers adds the additional indexers that this controller requires to the informers. +func InstallIndexers( + globalAPIExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer, + apiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer, + apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer, +) { + indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) + indexers.AddIfNotPresentOrDie(globalAPIExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) + indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ + indexers.APIExportEndpointSliceByAPIExport: indexers.IndexAPIExportEndpointSliceByAPIExport, + }) + indexers.AddIfNotPresentOrDie(globalAPIExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ + indexers.APIExportEndpointSliceByAPIExport: indexers.IndexAPIExportEndpointSliceByAPIExport, + }) + indexers.AddIfNotPresentOrDie(apiBindingInformer.Informer().GetIndexer(), cache.Indexers{ + indexers.APIBindingsByAPIExport: indexers.IndexAPIBindingByAPIExport, + }) +} + +func objOrTombstone[T runtime.Object](obj any) T { + if t, ok := obj.(T); ok { + return t + } + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + if t, ok := tombstone.Obj.(T); ok { + return t + } + + panic(fmt.Errorf("tombstone %T is not a %T", tombstone, new(T))) + } + + panic(fmt.Errorf("%T is not a %T", obj, new(T))) +} diff --git a/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_controller_test.go b/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_controller_test.go new file mode 100644 index 00000000000..64cee84e798 --- /dev/null +++ b/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_controller_test.go @@ -0,0 +1,351 @@ +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiexportendpointsliceurls + +import ( + "context" + "errors" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/kcp-dev/logicalcluster/v3" + "github.com/stretchr/testify/require" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + + apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + conditionsv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/apis/conditions/v1alpha1" + "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions" + apisv1alpha1apply "github.com/kcp-dev/kcp/sdk/client/applyconfiguration/apis/v1alpha1" +) + +func TestReconcile(t *testing.T) { + tests := map[string]struct { + input *apisv1alpha1.APIExportEndpointSlice + endpointsReconciler *endpointsReconciler + expectedConditions []*conditionsv1alpha1.Condition + expectedError error + }{ + "condition not ready": { + input: &apisv1alpha1.APIExportEndpointSlice{ + Status: apisv1alpha1.APIExportEndpointSliceStatus{ + Conditions: []conditionsv1alpha1.Condition{ + { + Type: apisv1alpha1.APIExportValid, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + endpointsReconciler: &endpointsReconciler{}, + }, + "empty selector": { + input: &apisv1alpha1.APIExportEndpointSlice{ + Spec: apisv1alpha1.APIExportEndpointSliceSpec{ + APIExport: apisv1alpha1.ExportBindingReference{ + Name: "my-export", + Path: "root:org:ws", + }, + }, + Status: apisv1alpha1.APIExportEndpointSliceStatus{ + Conditions: []conditionsv1alpha1.Condition{ + { + Type: apisv1alpha1.APIExportValid, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + endpointsReconciler: &endpointsReconciler{ + getMyShard: func() (*corev1alpha1.Shard, error) { + return &corev1alpha1.Shard{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shard1", + }, + }, nil + }, + getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { + return &apisv1alpha1.APIExport{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-export", + }, + }, nil + }, + }, + }, + "invalid selector": { + input: &apisv1alpha1.APIExportEndpointSlice{ + Status: apisv1alpha1.APIExportEndpointSliceStatus{ + ShardSelector: ",", + Conditions: []conditionsv1alpha1.Condition{ + { + Type: apisv1alpha1.APIExportValid, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + endpointsReconciler: &endpointsReconciler{}, + expectedError: errors.New("invalid selector: ,"), + }, + "error getting apiExport": { + input: &apisv1alpha1.APIExportEndpointSlice{ + Status: apisv1alpha1.APIExportEndpointSliceStatus{ + ShardSelector: "shared=foo", + Conditions: []conditionsv1alpha1.Condition{ + { + Type: apisv1alpha1.APIExportValid, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + endpointsReconciler: &endpointsReconciler{ + getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { + return nil, errors.New("lost in space") + }, + }, + expectedError: errors.New("lost in space"), + }, + "update endpoint - not my shard - no update": { + input: &apisv1alpha1.APIExportEndpointSlice{ + Spec: apisv1alpha1.APIExportEndpointSliceSpec{ + APIExport: apisv1alpha1.ExportBindingReference{ + Path: "root:org:ws", + Name: "my-export", + }, + }, + Status: apisv1alpha1.APIExportEndpointSliceStatus{ + ShardSelector: "shared=foo", + Conditions: []conditionsv1alpha1.Condition{ + { + Type: apisv1alpha1.APIExportValid, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + endpointsReconciler: &endpointsReconciler{ + shardName: "shard2", + getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { + return &apisv1alpha1.APIExport{}, nil + }, + getMyShard: func() (*corev1alpha1.Shard, error) { + return &corev1alpha1.Shard{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shard1", + }, + }, nil + }, + patchAPIExportEndpointSlice: func(ctx context.Context, cluster logicalcluster.Path, patch *apisv1alpha1apply.APIExportEndpointSliceApplyConfiguration) error { + if len(patch.Status.APIExportEndpoints) != 1 && patch.Status.APIExportEndpoints[0].URL != ptr.To("") { + return errors.New("unexpected update") + } + return nil + }, + }, + }, + "my shard, no consumers": { + input: &apisv1alpha1.APIExportEndpointSlice{ + Spec: apisv1alpha1.APIExportEndpointSliceSpec{ + APIExport: apisv1alpha1.ExportBindingReference{ + Path: "root:org:ws", + Name: "my-export", + }, + }, + Status: apisv1alpha1.APIExportEndpointSliceStatus{ + ShardSelector: "shared=foo", + Conditions: []conditionsv1alpha1.Condition{ + { + Type: apisv1alpha1.APIExportValid, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + endpointsReconciler: &endpointsReconciler{ + shardName: "shard1", + getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { + return &apisv1alpha1.APIExport{}, nil + }, + getMyShard: func() (*corev1alpha1.Shard, error) { + return &corev1alpha1.Shard{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shard1", + }, + Spec: corev1alpha1.ShardSpec{ + VirtualWorkspaceURL: "https://server-1.kcp.dev/", + }, + }, nil + }, + listAPIBindingsByAPIExport: func(apiexport *apisv1alpha1.APIExport) ([]*apisv1alpha1.APIBinding, error) { + return nil, nil + }, + patchAPIExportEndpointSlice: func(ctx context.Context, cluster logicalcluster.Path, patch *apisv1alpha1apply.APIExportEndpointSliceApplyConfiguration) error { + if patch.Status.APIExportEndpoints != nil { + return errors.New("unexpected update") + } + return nil + }, + }, + }, + "my shard, consumer went away, remove url": { + input: &apisv1alpha1.APIExportEndpointSlice{ + Spec: apisv1alpha1.APIExportEndpointSliceSpec{ + APIExport: apisv1alpha1.ExportBindingReference{ + Path: "root:org:ws", + Name: "my-export", + }, + }, + Status: apisv1alpha1.APIExportEndpointSliceStatus{ + ShardSelector: "shared=foo", + Conditions: []conditionsv1alpha1.Condition{ + { + Type: apisv1alpha1.APIExportValid, + Status: corev1.ConditionTrue, + }, + }, + APIExportEndpoints: []apisv1alpha1.APIExportEndpoint{ + { + URL: "https://server-1.kcp.dev/who-took-the-cookie-from-the-cookie-jar", + }, + }, + }, + }, + endpointsReconciler: &endpointsReconciler{ + shardName: "shard1", + getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { + return &apisv1alpha1.APIExport{}, nil + }, + getMyShard: func() (*corev1alpha1.Shard, error) { + return &corev1alpha1.Shard{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shard1", + }, + Spec: corev1alpha1.ShardSpec{ + VirtualWorkspaceURL: "https://server-1.kcp.dev/", + }, + }, nil + }, + listAPIBindingsByAPIExport: func(apiexport *apisv1alpha1.APIExport) ([]*apisv1alpha1.APIBinding, error) { + return nil, nil + }, + patchAPIExportEndpointSlice: func(ctx context.Context, cluster logicalcluster.Path, patch *apisv1alpha1apply.APIExportEndpointSliceApplyConfiguration) error { + if patch.Status.APIExportEndpoints != nil { + return errors.New("unexpected update") + } + return nil + }, + }, + }, + "my shard, consumer exists, add url": { + input: &apisv1alpha1.APIExportEndpointSlice{ + Spec: apisv1alpha1.APIExportEndpointSliceSpec{ + APIExport: apisv1alpha1.ExportBindingReference{ + Path: "root:org:ws", + Name: "my-export", + }, + }, + Status: apisv1alpha1.APIExportEndpointSliceStatus{ + ShardSelector: "shared=foo", + Conditions: []conditionsv1alpha1.Condition{ + { + Type: apisv1alpha1.APIExportValid, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + endpointsReconciler: &endpointsReconciler{ + shardName: "shard1", + getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { + return &apisv1alpha1.APIExport{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-export", + }, + }, nil + }, + getMyShard: func() (*corev1alpha1.Shard, error) { + return &corev1alpha1.Shard{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shard1", + }, + Spec: corev1alpha1.ShardSpec{ + VirtualWorkspaceURL: "https://server-1.kcp.dev/", + }, + }, nil + }, + listAPIBindingsByAPIExport: func(apiexport *apisv1alpha1.APIExport) ([]*apisv1alpha1.APIBinding, error) { + return []*apisv1alpha1.APIBinding{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "my-binding", + }, + }, + }, nil + }, + patchAPIExportEndpointSlice: func(ctx context.Context, cluster logicalcluster.Path, patch *apisv1alpha1apply.APIExportEndpointSliceApplyConfiguration) error { + if len(patch.Status.APIExportEndpoints) != 1 { + t.Fatalf("unexpected update: %v", patch) + } + url := ptr.Deref(patch.Status.APIExportEndpoints[0].URL, "") + if url != "https://server-1.kcp.dev/services/apiexport/my-export" { + t.Fatalf("unexpected update: %v", patch) + } + return nil + }, + }, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + c := &controller{ + getMyShard: tc.endpointsReconciler.getMyShard, + getAPIExport: tc.endpointsReconciler.getAPIExport, + listAPIBindingsByAPIExport: tc.endpointsReconciler.listAPIBindingsByAPIExport, + patchAPIExportEndpointSlice: tc.endpointsReconciler.patchAPIExportEndpointSlice, + shardName: tc.endpointsReconciler.shardName, + } + input := tc.input.DeepCopy() + _, err := c.reconcile(context.Background(), input) + if tc.expectedError != nil { + require.Error(t, err, tc.expectedError.Error()) + } else { + require.NoError(t, err, "expected no error") + } + + for _, expectedCondition := range tc.expectedConditions { + requireConditionMatches(t, input, expectedCondition) + } + }) + } +} + +// requireConditionMatches looks for a condition matching c in g. LastTransitionTime and Message +// are not compared. +func requireConditionMatches(t *testing.T, g conditions.Getter, c *conditionsv1alpha1.Condition) { + t.Helper() + actual := conditions.Get(g, c.Type) + require.NotNil(t, actual, "missing condition %q", c.Type) + actual.LastTransitionTime = c.LastTransitionTime + actual.Message = c.Message + require.Empty(t, cmp.Diff(actual, c)) +} diff --git a/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_reconcile.go b/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_reconcile.go new file mode 100644 index 00000000000..7b5607a044c --- /dev/null +++ b/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_reconcile.go @@ -0,0 +1,180 @@ +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiexportendpointsliceurls + +import ( + "context" + "net/url" + "path" + + "github.com/kcp-dev/logicalcluster/v3" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/klog/v2" + + virtualworkspacesoptions "github.com/kcp-dev/kcp/cmd/virtual-workspaces/options" + "github.com/kcp-dev/kcp/pkg/logging" + apiexportbuilder "github.com/kcp-dev/kcp/pkg/virtual/apiexport/builder" + apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions" + apisv1alpha1apply "github.com/kcp-dev/kcp/sdk/client/applyconfiguration/apis/v1alpha1" +) + +type endpointsReconciler struct { + getMyShard func() (*corev1alpha1.Shard, error) + getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) + listAPIBindingsByAPIExport func(apiexport *apisv1alpha1.APIExport) ([]*apisv1alpha1.APIBinding, error) + patchAPIExportEndpointSlice func(ctx context.Context, cluster logicalcluster.Path, patch *apisv1alpha1apply.APIExportEndpointSliceApplyConfiguration) error + shardName string +} + +type result struct { + url string + remove bool +} + +func (c *controller) reconcile(ctx context.Context, apiExportEndpointSlice *apisv1alpha1.APIExportEndpointSlice) (bool, error) { + r := &endpointsReconciler{ + getMyShard: c.getMyShard, + getAPIExport: c.getAPIExport, + listAPIBindingsByAPIExport: c.listAPIBindingsByAPIExport, + shardName: c.shardName, + patchAPIExportEndpointSlice: c.patchAPIExportEndpointSlice, + } + + return r.reconcile(ctx, apiExportEndpointSlice) +} + +func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSlice *apisv1alpha1.APIExportEndpointSlice) (bool, error) { + // we only continue if all conditions are set to true. As this is more of the secondary controller, + // we don't want to do anything if the primary controller is not ready. + for _, condition := range apiExportEndpointSlice.Status.Conditions { + if !conditions.IsTrue(apiExportEndpointSlice, condition.Type) { + return false, nil + } + } + + selector, err := labels.Parse(apiExportEndpointSlice.Status.ShardSelector) + if err != nil { + return false, err + } + + apiExportPath := logicalcluster.NewPath(apiExportEndpointSlice.Spec.APIExport.Path) + if apiExportPath.Empty() { + apiExportPath = logicalcluster.From(apiExportEndpointSlice).Path() + } + apiExport, err := r.getAPIExport(apiExportPath, apiExportEndpointSlice.Spec.APIExport.Name) + if err != nil { + return true, err + } + + shard, err := r.getMyShard() + if err != nil { + return true, err + } + + rs, err := r.updateEndpoints(ctx, apiExportEndpointSlice, apiExport, shard, selector) + if err != nil { + return true, err + } + if rs == nil { // no change, nothing to do. + return false, nil + } + + // Patch the object + patch := apisv1alpha1apply.APIExportEndpointSlice(apiExportEndpointSlice.Name) + if rs.remove { + patch.WithStatus(apisv1alpha1apply.APIExportEndpointSliceStatus()) + } else { + patch.WithStatus(apisv1alpha1apply.APIExportEndpointSliceStatus(). + WithAPIExportEndpoints(apisv1alpha1apply.APIExportEndpoint().WithURL(rs.url))) + } + cluster := logicalcluster.From(apiExportEndpointSlice) + err = r.patchAPIExportEndpointSlice(ctx, cluster.Path(), patch) + if err != nil { + return true, err + } + return false, nil +} + +func (r *endpointsReconciler) updateEndpoints(ctx context.Context, + apiExportEndpointSlice *apisv1alpha1.APIExportEndpointSlice, + apiExport *apisv1alpha1.APIExport, + shard *corev1alpha1.Shard, + selector labels.Selector, +) (*result, error) { + logger := klog.FromContext(ctx) + var rs result + if shard.Spec.VirtualWorkspaceURL == "" { + return nil, nil + } + + // Check if we have local consumers + bindings, err := r.listAPIBindingsByAPIExport(apiExport) + if err != nil { + return nil, err + } + + if selector.Matches(labels.Set(shard.Labels)) { // we are in partition + if len(bindings) == 0 { // we have no consumers + return &result{ + remove: true, + }, nil + } // This falls through to the next block to update the URL. + } else { // we are not in partition + if len(bindings) == 0 { // we have no consumers, we can remove the endpoint + return &result{ + remove: true, + }, nil + } else { + // we not in partition, but we have consumers. + // Do nothing, as we are on the way to be orphaned. + // If we remove url, where is chance we gonna kill controllers on their way out. + return nil, nil + } + } + + u, err := url.Parse(shard.Spec.VirtualWorkspaceURL) + if err != nil { + // Should never happen + logger = logging.WithObject(logger, shard) + logger.Error( + err, "error parsing shard.spec.virtualWorkspaceURL", + "VirtualWorkspaceURL", shard.Spec.VirtualWorkspaceURL, + ) + return nil, nil + } + + u.Path = path.Join( + u.Path, + virtualworkspacesoptions.DefaultRootPathPrefix, + apiexportbuilder.VirtualWorkspaceName, + logicalcluster.From(apiExport).String(), + apiExport.Name, + ) + + rs.url = u.String() + + for _, u := range apiExportEndpointSlice.Status.APIExportEndpoints { + if u.URL == rs.url { + return nil, nil + } + } + + return &rs, nil +} diff --git a/pkg/reconciler/cache/replication/replication_controller.go b/pkg/reconciler/cache/replication/replication_controller.go index c43e9801b67..0e133883f7c 100644 --- a/pkg/reconciler/cache/replication/replication_controller.go +++ b/pkg/reconciler/cache/replication/replication_controller.go @@ -209,7 +209,6 @@ func InstallIndexers( Local: localKcpInformers.Apis().V1alpha1().APIExportEndpointSlices().Informer(), Global: globalKcpInformers.Apis().V1alpha1().APIExportEndpointSlices().Informer(), }, - apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"): { Kind: "APIResourceSchema", Local: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(), diff --git a/pkg/server/controllers.go b/pkg/server/controllers.go index ac6ee296776..4127a655a30 100644 --- a/pkg/server/controllers.go +++ b/pkg/server/controllers.go @@ -63,6 +63,7 @@ import ( "github.com/kcp-dev/kcp/pkg/reconciler/apis/apibindingdeletion" "github.com/kcp-dev/kcp/pkg/reconciler/apis/apiexport" "github.com/kcp-dev/kcp/pkg/reconciler/apis/apiexportendpointslice" + "github.com/kcp-dev/kcp/pkg/reconciler/apis/apiexportendpointsliceurls" "github.com/kcp-dev/kcp/pkg/reconciler/apis/crdcleanup" "github.com/kcp-dev/kcp/pkg/reconciler/apis/extraannotationsync" "github.com/kcp-dev/kcp/pkg/reconciler/apis/identitycache" @@ -1266,7 +1267,7 @@ func (s *Server) installTenancyReplicateClusterRoleBindingControllers(ctx contex }) } -func (s *Server) installAPIExportEndpointSliceController(ctx context.Context, config *rest.Config) error { +func (s *Server) installAPIExportEndpointSliceController(_ context.Context, config *rest.Config) error { config = rest.CopyConfig(config) config = rest.AddUserAgent(config, apiexportendpointslice.ControllerName) @@ -1278,7 +1279,6 @@ func (s *Server) installAPIExportEndpointSliceController(ctx context.Context, co c, err := apiexportendpointslice.NewController( s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices(), // Shards and APIExports get retrieved from cache server - s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), s.KcpSharedInformerFactory.Topology().V1alpha1().Partitions(), kcpClusterClient, @@ -1289,12 +1289,51 @@ func (s *Server) installAPIExportEndpointSliceController(ctx context.Context, co return s.registerController(&controllerWrapper{ Name: apiexportendpointslice.ControllerName, + Wait: func(ctx context.Context, s *Server) error { + return wait.PollUntilContextCancel(ctx, waitPollInterval, true, func(ctx context.Context) (bool, error) { + return s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices().Informer().HasSynced() && + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().HasSynced() && + s.KcpSharedInformerFactory.Topology().V1alpha1().Partitions().Informer().HasSynced(), nil + }) + }, + Runner: func(ctx context.Context) { + c.Start(ctx, 2) + }, + }) +} + +func (s *Server) installAPIExportEndpointSliceURLsController(_ context.Context, _ *rest.Config) error { + config := rest.CopyConfig(s.ExternalLogicalClusterAdminConfig) + config = rest.AddUserAgent(config, apiexportendpointsliceurls.ControllerName) + + kcpClusterClient, err := kcpclientset.NewForConfig(config) + if err != nil { + return err + } + + c, err := apiexportendpointsliceurls.NewController( + s.Options.Extra.ShardName, + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + // Shards and APIExports get retrieved from cache server + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices(), + s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + kcpClusterClient, + ) + if err != nil { + return err + } + + return s.registerController(&controllerWrapper{ + Name: apiexportendpointsliceurls.ControllerName, Wait: func(ctx context.Context, s *Server) error { return wait.PollUntilContextCancel(ctx, waitPollInterval, true, func(ctx context.Context) (bool, error) { return s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards().Informer().HasSynced() && s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices().Informer().HasSynced() && + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices().Informer().HasSynced() && s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().HasSynced() && - s.KcpSharedInformerFactory.Topology().V1alpha1().Partitions().Informer().HasSynced(), nil + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings().Informer().HasSynced(), nil }) }, Runner: func(ctx context.Context) { @@ -1551,6 +1590,11 @@ func (s *Server) addIndexersToInformers(_ context.Context) map[schema.GroupVersi s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices(), ) + apiexportendpointsliceurls.InstallIndexers( + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + ) labelclusterrolebindings.InstallIndexers( s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), ) diff --git a/pkg/server/server.go b/pkg/server/server.go index 2363e03fc4b..6004206ffb8 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -314,6 +314,9 @@ func (s *Server) installControllers(ctx context.Context, controllerConfig *rest. if err := s.installAPIExportEndpointSliceController(ctx, controllerConfig); err != nil { return err } + if err := s.installAPIExportEndpointSliceURLsController(ctx, controllerConfig); err != nil { + return err + } } if s.Options.Controllers.EnableAll || enabled.Has("apibinder") { diff --git a/sdk/apis/apis/v1alpha1/types_apiexportendpointslice.go b/sdk/apis/apis/v1alpha1/types_apiexportendpointslice.go index f8f90571042..5e80af8c72c 100644 --- a/sdk/apis/apis/v1alpha1/types_apiexportendpointslice.go +++ b/sdk/apis/apis/v1alpha1/types_apiexportendpointslice.go @@ -73,10 +73,19 @@ type APIExportEndpointSliceStatus struct { // conditions is a list of conditions that apply to the APIExportEndpointSlice. Conditions conditionsv1alpha1.Conditions `json:"conditions,omitempty"` + // endpoints contains all the URLs of the APIExport service. + // // +optional + // +listType=map + // +listMapKey=url + APIExportEndpoints []APIExportEndpoint `json:"endpoints"` - // endpoints contains all the URLs of the APIExport service. - APIExportEndpoints []APIExportEndpoint `json:"endpoints,omitempty"` + // +optional + + // shardSelector is the selector used to filter the shards. It is used to filter the shards + // when determining partition scope when deriving the endpoints. This is set by owning shard, + // and is used by follower shards to determine if its inscope or not. + ShardSelector string `json:"shardSelector,omitempty"` } // Using a struct provides an extension point @@ -106,6 +115,8 @@ const ( // PartitionValid is a condition for APIExportEndpointSlice that reflects the validity of the referenced Partition. PartitionValid conditionsv1alpha1.ConditionType = "PartitionValid" + // EndpointURLsReady is a condition for APIExportEndpointSlice that reflects the readiness of the URLs. + // DEPRECATED: This condition is deprecated and will be removed in a future release. APIExportEndpointSliceURLsReady conditionsv1alpha1.ConditionType = "EndpointURLsReady" // PartitionInvalidReferenceReason is a reason for the PartitionValid condition of APIExportEndpointSlice that the diff --git a/sdk/client/applyconfiguration/apis/v1alpha1/apiexportendpointslicestatus.go b/sdk/client/applyconfiguration/apis/v1alpha1/apiexportendpointslicestatus.go index a6b7d4f0f34..ad44fc9a310 100644 --- a/sdk/client/applyconfiguration/apis/v1alpha1/apiexportendpointslicestatus.go +++ b/sdk/client/applyconfiguration/apis/v1alpha1/apiexportendpointslicestatus.go @@ -27,6 +27,7 @@ import ( type APIExportEndpointSliceStatusApplyConfiguration struct { Conditions *v1alpha1.Conditions `json:"conditions,omitempty"` APIExportEndpoints []APIExportEndpointApplyConfiguration `json:"endpoints,omitempty"` + ShardSelector *string `json:"shardSelector,omitempty"` } // APIExportEndpointSliceStatusApplyConfiguration constructs a declarative configuration of the APIExportEndpointSliceStatus type for use with @@ -55,3 +56,11 @@ func (b *APIExportEndpointSliceStatusApplyConfiguration) WithAPIExportEndpoints( } return b } + +// WithShardSelector sets the ShardSelector field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ShardSelector field is set to the value of the last call. +func (b *APIExportEndpointSliceStatusApplyConfiguration) WithShardSelector(value string) *APIExportEndpointSliceStatusApplyConfiguration { + b.ShardSelector = &value + return b +} diff --git a/test/e2e/reconciler/apiexportendpointslice/apiexportendpointslice_test.go b/test/e2e/reconciler/apiexportendpointslice/apiexportendpointslice_test.go index 82b27d95d00..4349ff15d20 100644 --- a/test/e2e/reconciler/apiexportendpointslice/apiexportendpointslice_test.go +++ b/test/e2e/reconciler/apiexportendpointslice/apiexportendpointslice_test.go @@ -18,27 +18,36 @@ package apiexportendpointslice import ( "context" + "embed" "fmt" "testing" "time" "github.com/davecgh/go-spew/spew" + kcpdynamic "github.com/kcp-dev/client-go/dynamic" + "github.com/kcp-dev/logicalcluster/v3" "github.com/stretchr/testify/require" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery/cached/memory" + "k8s.io/client-go/restmapper" "k8s.io/client-go/util/retry" + "github.com/kcp-dev/kcp/config/helpers" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" "github.com/kcp-dev/kcp/sdk/apis/core" - corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions" topologyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/topology/v1alpha1" kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" "github.com/kcp-dev/kcp/test/e2e/framework" ) +//go:embed *.yaml +var testFiles embed.FS + func TestAPIExportEndpointSliceWithPartition(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) @@ -116,7 +125,7 @@ func TestAPIExportEndpointSliceWithPartition(t *testing.T) { slice, err = kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) require.NoError(t, err) - if conditions.IsTrue(slice, apisv1alpha1.APIExportValid) && conditions.IsTrue(slice, apisv1alpha1.APIExportEndpointSliceURLsReady) { + if conditions.IsTrue(slice, apisv1alpha1.APIExportValid) { return true, "" } @@ -143,8 +152,6 @@ func TestAPIExportEndpointSliceWithPartition(t *testing.T) { return false, spew.Sdump(slice.Status.Conditions) }, wait.ForeverTestTimeout, 100*time.Millisecond, "expected missing Partition") require.True(t, len(slice.Status.APIExportEndpoints) == 0, "not expecting any endpoint") - require.True(t, conditions.IsFalse(slice, apisv1alpha1.APIExportEndpointSliceURLsReady), "expecting URLs not ready condition") - t.Logf("Creating the missing Partition") partitionClient := kcpClusterClient.TopologyV1alpha1().Partitions() _, err = partitionClient.Cluster(partitionClusterPath).Create(ctx, partition, metav1.CreateOptions{}) @@ -162,208 +169,242 @@ func TestAPIExportEndpointSliceWithPartition(t *testing.T) { t.Logf("Checking that no endpoint has been populated") require.True(t, len(slice.Status.APIExportEndpoints) == 0, "not expecting any endpoint") - require.True(t, conditions.IsTrue(slice, apisv1alpha1.APIExportEndpointSliceURLsReady), "expecting the URLs ready condition") } -func TestAPIExportEndpointSliceWithPartitionPrivate(t *testing.T) { +func TestAPIBindingEndpointSlicesSharded(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - server := framework.PrivateKcpServer(t) - // Create Organization and Workspaces - orgPath, _ := framework.NewOrganizationFixture(t, server) - exportClusterPath, _ := framework.NewWorkspaceFixture(t, server, orgPath) - partitionClusterPath, _ := framework.NewWorkspaceFixture(t, server, orgPath) + framework.Suite(t, "control-plane") - cfg := server.BaseConfig(t) + server := framework.SharedKcpServer(t) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) - var err error - kcpClusterClient, err := kcpclientset.NewForConfig(cfg) - require.NoError(t, err, "failed to construct kcp cluster client for server") + cfg := server.BaseConfig(t) - export := &apisv1alpha1.APIExport{ - ObjectMeta: metav1.ObjectMeta{ - Name: "my-export", - }, - } + t.Logf("Check if we can access shards") + var shards *v1alpha1.ShardList + { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") - slice := &apisv1alpha1.APIExportEndpointSlice{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "my-slice", - }, - Spec: apisv1alpha1.APIExportEndpointSliceSpec{ - APIExport: apisv1alpha1.ExportBindingReference{ - Path: exportClusterPath.String(), - Name: export.Name, - }, - }, - } + shards, err = kcpClusterClient.Cluster(core.RootCluster.Path()).CoreV1alpha1().Shards().List(ctx, metav1.ListOptions{}) + require.NoError(t, err, "failed to list shards") - partition := &topologyv1alpha1.Partition{ - ObjectMeta: metav1.ObjectMeta{ - Name: "my-partition", - }, - Spec: topologyv1alpha1.PartitionSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "region": "apiexportendpointslice-test-region", - }, - }, - }, + if len(shards.Items) < 2 { + t.Skipf("Need at least 2 shards to run this test, got %d", len(shards.Items)) + return + } } - t.Logf("Creating the APIExport") - exportClient := kcpClusterClient.ApisV1alpha1().APIExports() - _, err = exportClient.Cluster(exportClusterPath).Create(ctx, export, metav1.CreateOptions{}) - require.NoError(t, err, "error creating APIExport") + t.Logf("Setup provider workspace") + var orgPath, providerPath logicalcluster.Path + { + orgPath, _ = framework.NewOrganizationFixture(t, server) + providerPath, _ = framework.NewWorkspaceFixture(t, server, orgPath, framework.WithName("service-provider")) - t.Logf("Creating the APIExportEndpointSlice") - sliceClient := kcpClusterClient.ApisV1alpha1().APIExportEndpointSlices() - // allow some time for APIExport to be synced onto the cache server - framework.Eventually(t, func() (bool, string) { - slice, err = sliceClient.Cluster(partitionClusterPath).Create(ctx, slice, metav1.CreateOptions{}) - if err != nil { - return false, err.Error() - } - return true, "" - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expected successful creation of APIExportEndpointSlice") - sliceName := slice.Name + serviceProviderClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") + dynamicClusterClient, err := kcpdynamic.NewForConfig(cfg) + require.NoError(t, err, "failed to construct dynamic cluster client for server") - framework.Eventually(t, func() (bool, string) { - slice, err = kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) + mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(serviceProviderClient.Cluster(providerPath).Discovery())) + err = helpers.CreateResourceFromFS(ctx, dynamicClusterClient.Cluster(providerPath), mapper, nil, "apiresourceschema_cowboys.yaml", testFiles) require.NoError(t, err) - if conditions.IsTrue(slice, apisv1alpha1.APIExportValid) { - return true, "" + + t.Logf("Create an APIExport today-cowboys in %q", providerPath) + cowboysAPIExport := &apisv1alpha1.APIExport{ + ObjectMeta: metav1.ObjectMeta{ + Name: "today-cowboys", + }, + Spec: apisv1alpha1.APIExportSpec{ + LatestResourceSchemas: []string{"today.cowboys.wildwest.dev"}, + }, } - return false, spew.Sdump(slice.Status.Conditions) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expected valid APIExport") - require.True(t, conditions.IsTrue(slice, apisv1alpha1.APIExportEndpointSliceURLsReady), "expecting URLs ready condition") + _, err = serviceProviderClient.Cluster(providerPath).ApisV1alpha1().APIExports().Create(ctx, cowboysAPIExport, metav1.CreateOptions{}) + require.NoError(t, err) + } - t.Logf("Creating the Partition") - partitionClient := kcpClusterClient.TopologyV1alpha1().Partitions() - _, err = partitionClient.Cluster(partitionClusterPath).Create(ctx, partition, metav1.CreateOptions{}) - require.NoError(t, err, "error creating Partition") + t.Logf("Create a consumer workspaces - one per shard") + var bindShardname string + { + for _, shard := range shards.Items { + if bindShardname == "" { // bind to the first shard only + bindShardname = shard.Name + } + if bindShardname != shard.Name { + continue + } + consumerPath, _ := framework.NewWorkspaceFixture(t, server, orgPath, framework.WithName("consumer-bound-against-%s", shard.Name), framework.WithShard(shard.Name)) + + t.Logf("Create an APIBinding in %q that points to the today-cowboys export from %q", consumerPath, providerPath) + apiBinding := &apisv1alpha1.APIBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cowboys", + }, + Spec: apisv1alpha1.APIBindingSpec{ + Reference: apisv1alpha1.BindingReference{ + Export: &apisv1alpha1.ExportBindingReference{ + Path: providerPath.String(), + Name: "today-cowboys", + }, + }, + }, + } - t.Logf("Adding a Partition to the APIExportEndpointSlice") - slice.Spec.Partition = partition.Name - _, err = kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Update(ctx, slice, metav1.UpdateOptions{}) - require.NoError(t, err, "error updating APIExportEndpointSlice") + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") - framework.Eventually(t, func() (bool, string) { - s, err := kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) - require.NoError(t, err) - if conditions.IsTrue(s, apisv1alpha1.PartitionValid) { - return true, "" + framework.Eventually(t, func() (bool, string) { + _, err = kcpClusterClient.Cluster(consumerPath).ApisV1alpha1().APIBindings().Create(ctx, apiBinding, metav1.CreateOptions{}) + return err == nil, fmt.Sprintf("Error creating APIBinding: %v", err) + }, wait.ForeverTestTimeout, time.Millisecond*100) } - return false, spew.Sdump(s.Status.Conditions) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expected valid Partition") - require.True(t, conditions.IsTrue(slice, apisv1alpha1.APIExportEndpointSliceURLsReady), "expecting URLs ready condition") + } - t.Logf("Checking that no endpoint has been populated") - framework.Eventually(t, func() (bool, string) { - s, err := kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) + // TODO(mjudeikis): This will be deprecated when we deperecate APIExport urls. + t.Logf("Check that APIExport has 2 virtual workspaces") + { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") + + apiExport, err := kcpClusterClient.Cluster(providerPath).ApisV1alpha1().APIExports().Get(ctx, "today-cowboys", metav1.GetOptions{}) require.NoError(t, err) - if len(s.Status.APIExportEndpoints) == 0 { - return true, "" - } - return false, fmt.Sprintf("expected 0 endpoints, but got: %#v", s.Status.APIExportEndpoints) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "not expecting any endpoint") - // Endpoint tests require the edition of shards. - // These tests are run on a private cluster to avoid side effects on other e2e tests. - // They require the resources previously created: APIExport, APIExportEndpointSlice, etc. - shard := &corev1alpha1.Shard{ - ObjectMeta: metav1.ObjectMeta{ - Name: "my-shard", - Labels: map[string]string{ - "region": "apiexportendpointslice-test-region", - }, - }, - Spec: corev1alpha1.ShardSpec{ - BaseURL: "https://base.kcp.test.dev", - }, + //nolint:staticcheck // SA1019 VirtualWorkspaces is deprecated but not removed yet + require.Len(t, apiExport.Status.VirtualWorkspaces, 2) } - t.Logf("Creating a shard in the region") - shardClient := kcpClusterClient.CoreV1alpha1().Shards() - shard, err = shardClient.Cluster(core.RootCluster.Path()).Create(ctx, shard, metav1.CreateOptions{}) - require.NoError(t, err, "error creating Shard") + t.Logf("Create a topology PartitionSet for the providers") + var partition *topologyv1alpha1.Partition + { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") - framework.Eventually(t, func() (bool, string) { - slice, err = kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) + _, err = kcpClusterClient.Cluster(providerPath).TopologyV1alpha1().PartitionSets().Create(ctx, &topologyv1alpha1.PartitionSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cowboys", + }, + Spec: topologyv1alpha1.PartitionSetSpec{ + ShardSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "shared": "true", + }, + }, + }, + }, metav1.CreateOptions{}) require.NoError(t, err) - if len(slice.Status.APIExportEndpoints) == 1 { - return true, "" - } - return false, fmt.Sprintf("expected 1 endpoint, but got: %#v", slice.Status.APIExportEndpoints) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expecting a single endpoint") - require.Contains(t, slice.Status.APIExportEndpoints[0].URL, export.Name) - t.Logf("Updating the previously created shard") - shard.Labels["region"] = "doesnotexist" - shard, err = shardClient.Cluster(core.RootCluster.Path()).Update(ctx, shard, metav1.UpdateOptions{}) - require.NoError(t, err, "error updating Shard") + // Partition should be created + framework.Eventually(t, func() (bool, string) { + partitions, err := kcpClusterClient.Cluster(providerPath).TopologyV1alpha1().Partitions().List(ctx, metav1.ListOptions{}) + if err == nil && len(partitions.Items) == 1 { + partition = &partitions.Items[0] + return true, "" + } + return false, fmt.Sprintf("Error listing partitions: %v", err) + }, wait.ForeverTestTimeout, time.Millisecond*500) + } - framework.Eventually(t, func() (bool, string) { - s, err := kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) + t.Logf("Create APIExportEndpointSlice for consumers") + { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") + + _, err = kcpClusterClient.Cluster(providerPath).ApisV1alpha1().APIExportEndpointSlices().Create(ctx, &apisv1alpha1.APIExportEndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shared-cowboys", + }, + Spec: apisv1alpha1.APIExportEndpointSliceSpec{ + Partition: partition.Name, + APIExport: apisv1alpha1.ExportBindingReference{ + Path: providerPath.String(), + Name: "today-cowboys", + }, + }, + }, metav1.CreateOptions{}) require.NoError(t, err) - if len(s.Status.APIExportEndpoints) == 0 { - return true, "" - } - return false, fmt.Sprintf("expected 0 endpoints, but got: %#v", s.Status.APIExportEndpoints) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expecting no endpoint") - t.Logf("Setting back the correct label") - shard.Labels["region"] = "apiexportendpointslice-test-region" - shard, err = shardClient.Cluster(core.RootCluster.Path()).Update(ctx, shard, metav1.UpdateOptions{}) - require.NoError(t, err, "error updating Shard") + // we should have 1 APIExportEndpointSlice with 1 APIExportEndpoint as we bound only once. + framework.Eventually(t, func() (bool, string) { + slice, err := kcpClusterClient.Cluster(providerPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, "shared-cowboys", metav1.GetOptions{}) + if len(slice.Status.APIExportEndpoints) == 1 { + return true, "" + } + return false, fmt.Sprintf("APIExportEndpointSlice has %d endpoints: %v", len(slice.Status.APIExportEndpoints), err) + }, wait.ForeverTestTimeout*50, time.Millisecond*500) + } - framework.Eventually(t, func() (bool, string) { - s, err := kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) - require.NoError(t, err) - if len(s.Status.APIExportEndpoints) == 1 { - return true, "" - } - return false, fmt.Sprintf("expected 1 endpoint, but got: %#v", s.Status.APIExportEndpoints) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expecting a single endpoint") + t.Logf("Create consumer on second shard and observe APIExportEndpointSlice to have second url added") + var consumerPath logicalcluster.Path + { + for _, shard := range shards.Items { + if bindShardname == shard.Name { + continue + } + consumerPath, _ = framework.NewWorkspaceFixture(t, server, orgPath, framework.WithName("consumer-bound-against-%s", shard.Name), framework.WithShard(shard.Name)) + + t.Logf("Create an APIBinding in %q that points to the today-cowboys export from %q", consumerPath, providerPath) + apiBinding := &apisv1alpha1.APIBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cowboys", + }, + Spec: apisv1alpha1.APIBindingSpec{ + Reference: apisv1alpha1.BindingReference{ + Export: &apisv1alpha1.ExportBindingReference{ + Path: providerPath.String(), + Name: "today-cowboys", + }, + }, + }, + } - t.Logf("Deleting the shard") - err = shardClient.Cluster(core.RootCluster.Path()).Delete(ctx, shard.Name, metav1.DeleteOptions{}) - require.NoError(t, err, "error deleting Shard") + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") - framework.Eventually(t, func() (bool, string) { - s, err := kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) - require.NoError(t, err) - if len(s.Status.APIExportEndpoints) == 0 { - return true, "" + framework.Eventually(t, func() (bool, string) { + _, err = kcpClusterClient.Cluster(consumerPath).ApisV1alpha1().APIBindings().Create(ctx, apiBinding, metav1.CreateOptions{}) + return err == nil, fmt.Sprintf("Error creating APIBinding: %v", err) + }, wait.ForeverTestTimeout, time.Millisecond*500) } - return false, fmt.Sprintf("expected 0 endpoints, but got: %#v", s.Status.APIExportEndpoints) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expecting no endpoint") + } - t.Logf("Creating a slice without partition") - sliceWithAll := &apisv1alpha1.APIExportEndpointSlice{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "my-slice-without-partition", - }, - Spec: apisv1alpha1.APIExportEndpointSliceSpec{ - APIExport: apisv1alpha1.ExportBindingReference{ - Path: exportClusterPath.String(), - Name: slice.Spec.APIExport.Name, - }, - }, + t.Logf("Check that APIExportEndpointSlices has 2 virtual workspaces") + { + framework.Eventually(t, func() (bool, string) { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") + + slice, err := kcpClusterClient.Cluster(providerPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, "shared-cowboys", metav1.GetOptions{}) + if len(slice.Status.APIExportEndpoints) == 2 { + return true, "" + } + return false, fmt.Sprintf("APIExportEndpointSlice has %d endpoints: %v", len(slice.Status.APIExportEndpoints), err) + }, wait.ForeverTestTimeout*50, time.Millisecond*500) } - sliceWithAll, err = sliceClient.Cluster(partitionClusterPath).Create(ctx, sliceWithAll, metav1.CreateOptions{}) - require.NoError(t, err, "error creating APIExportEndpointSlice") - sliceWithAllName := sliceWithAll.Name + t.Logf("Delete consumer on second shard and observe APIExportEndpointSlice to have second url removed") + { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") - framework.Eventually(t, func() (bool, string) { - sliceWithAll, err := kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceWithAllName, metav1.GetOptions{}) - require.NoError(t, err) - if len(sliceWithAll.Status.APIExportEndpoints) == 1 { - return true, "" - } - return false, fmt.Sprintf("expected 1 endpoint, but got: %#v", sliceWithAll.Status.APIExportEndpoints) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expecting a single endpoint for the root shard, got %d", len(sliceWithAll.Status.APIExportEndpoints)) + framework.Eventually(t, func() (bool, string) { + err := kcpClusterClient.Cluster(consumerPath).ApisV1alpha1().APIBindings().Delete(ctx, "cowboys", metav1.DeleteOptions{}) + return err == nil, fmt.Sprintf("Error deleting APIBinding: %v", err) + }, wait.ForeverTestTimeout, time.Millisecond*500) + } + + t.Logf("Check that APIExportEndpointSlices has 1 virtual workspaces") + { + framework.Eventually(t, func() (bool, string) { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") + + slice, err := kcpClusterClient.Cluster(providerPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, "shared-cowboys", metav1.GetOptions{}) + if len(slice.Status.APIExportEndpoints) == 1 { + return true, "" + } + return false, fmt.Sprintf("APIExportEndpointSlice has %d endpoints: %v", len(slice.Status.APIExportEndpoints), err) + }, wait.ForeverTestTimeout*50, time.Millisecond*500) + } } diff --git a/test/e2e/reconciler/apiexportendpointslice/apiresourceschema_cowboys.yaml b/test/e2e/reconciler/apiexportendpointslice/apiresourceschema_cowboys.yaml new file mode 100644 index 00000000000..b65c76e0175 --- /dev/null +++ b/test/e2e/reconciler/apiexportendpointslice/apiresourceschema_cowboys.yaml @@ -0,0 +1,46 @@ +apiVersion: apis.kcp.io/v1alpha1 +kind: APIResourceSchema +metadata: + name: today.cowboys.wildwest.dev +spec: + group: wildwest.dev + names: + kind: Cowboy + listKind: CowboyList + plural: cowboys + singular: cowboy + scope: Namespaced + versions: + - name: v1alpha1 + schema: + description: Cowboy is part of the wild west + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: CowboySpec holds the desired state of the Cowboy. + properties: + intent: + type: string + type: object + status: + description: CowboyStatus communicates the observed state of the Cowboy. + properties: + result: + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {} \ No newline at end of file