diff --git a/cmd/aggregated-apiserver/app/options/options.go b/cmd/aggregated-apiserver/app/options/options.go index 9af56845add1..3d33acc0688d 100644 --- a/cmd/aggregated-apiserver/app/options/options.go +++ b/cmd/aggregated-apiserver/app/options/options.go @@ -19,7 +19,6 @@ import ( genericfilters "k8s.io/apiserver/pkg/server/filters" genericoptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" netutils "k8s.io/utils/net" @@ -91,10 +90,9 @@ func (o *Options) Run(ctx context.Context) error { restConfig := config.GenericConfig.ClientConfig restConfig.QPS, restConfig.Burst = o.KubeAPIQPS, o.KubeAPIBurst - kubeClientSet := kubernetes.NewForConfigOrDie(restConfig) secretLister := config.GenericConfig.SharedInformerFactory.Core().V1().Secrets().Lister() - server, err := config.Complete().New(kubeClientSet, secretLister) + server, err := config.Complete().New(restConfig, secretLister) if err != nil { return err } diff --git a/pkg/aggregatedapiserver/apiserver.go b/pkg/aggregatedapiserver/apiserver.go index 9772d0150e02..9d93d0ed86f9 100644 --- a/pkg/aggregatedapiserver/apiserver.go +++ b/pkg/aggregatedapiserver/apiserver.go @@ -4,8 +4,8 @@ import ( "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" - "k8s.io/client-go/kubernetes" listcorev1 "k8s.io/client-go/listers/core/v1" + restclient "k8s.io/client-go/rest" "k8s.io/klog/v2" clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" @@ -54,7 +54,7 @@ func (cfg *Config) Complete() CompletedConfig { return CompletedConfig{&c} } -func (c completedConfig) New(kubeClient kubernetes.Interface, secretLister listcorev1.SecretLister) (*APIServer, error) { +func (c completedConfig) New(restConfig *restclient.Config, secretLister listcorev1.SecretLister) (*APIServer, error) { genericServer, err := c.GenericConfig.New("aggregated-apiserver", genericapiserver.NewEmptyDelegate()) if err != nil { return nil, err @@ -66,7 +66,7 @@ func (c completedConfig) New(kubeClient kubernetes.Interface, secretLister listc apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(clusterapis.GroupName, clusterscheme.Scheme, clusterscheme.ParameterCodec, clusterscheme.Codecs) - clusterStorage, err := clusterstorage.NewStorage(clusterscheme.Scheme, kubeClient, secretLister, c.GenericConfig.RESTOptionsGetter) + clusterStorage, err := clusterstorage.NewStorage(clusterscheme.Scheme, restConfig, secretLister, c.GenericConfig.RESTOptionsGetter) if err != nil { klog.Errorf("Unable to create REST storage for a resource due to %v, will die", err) return nil, err diff --git a/pkg/registry/cluster/storage/aggregate.go b/pkg/registry/cluster/storage/aggregate.go new file mode 100644 index 000000000000..da278349e301 --- /dev/null +++ b/pkg/registry/cluster/storage/aggregate.go @@ -0,0 +1,371 @@ +package storage + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "path" + "strconv" + "strings" + + authenticationv1 "k8s.io/api/authentication/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/apiserver/pkg/endpoints/request" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + restclient "k8s.io/client-go/rest" + "k8s.io/klog/v2" + + clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" + "github.com/karmada-io/karmada/pkg/util/lifted" + "github.com/karmada-io/karmada/pkg/util/proxy" +) + +func karmadaResourceLocation(restConfig *restclient.Config) (*url.URL, http.RoundTripper, error) { + location, err := url.Parse(restConfig.Host) + if err != nil { + return nil, nil, err + } + + transport, err := restclient.TransportFor(restConfig) + if err != nil { + return nil, nil, err + } + + return location, transport, nil +} + +type requestContext struct { + clusterName string + location *url.URL + transport http.RoundTripper + impersonateToken string + responseBody []byte + responseHeader http.Header +} + +// connectAllClusters returns a handler to proxy all clusters. +// Aggregates resources returned by all cluster. If resource names conflict, +// a conflict error is returned during handler processing. +func (r *ProxyREST) connectAllClusters( + ctx context.Context, + proxyPath string, + secretGetter func(context.Context, string, string) (*corev1.Secret, error), + responder rest.Responder, +) (http.Handler, error) { + klog.V(4).Infof("Request resources with the proxyPath(%s)", proxyPath) + proxyRequestInfo := lifted.NewRequestInfo(&http.Request{ + URL: &url.URL{Path: proxyPath}, + }) + + // 1. for no resource request, proxy the request to karmada apiserver. + if !proxyRequestInfo.IsResourceRequest { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + location := *r.karmadaLocation + location.Path = path.Join(location.Path, proxyPath) + location.RawQuery = req.URL.RawQuery + + handler := proxy.NewThrottledUpgradeAwareProxyHandler(&location, r.karmadaTransPort, true, false, responder) + handler.ServeHTTP(rw, req) + }), nil + } + + clusterList, err := r.clusterLister(ctx) + if err != nil { + return nil, err + } + + // 2. for resource request. + if len(proxyRequestInfo.Name) != 0 { + return requestWithResourceNameHandlerFunc(ctx, proxyPath, secretGetter, responder, clusterList), nil + } + return requestWithoutResourceNameHandlerFunc(ctx, proxyPath, secretGetter, clusterList), nil +} + +func requestWithResourceNameHandlerFunc( + ctx context.Context, + proxyPath string, + secretGetter func(context.Context, string, string) (*corev1.Secret, error), + responder rest.Responder, + clusterList *clusterapis.ClusterList, +) http.Handler { + // For specific resource, get first to determine which cluster belong to, + // and then proxy to the target member cluster. + // Note: for resource creation requests, the resource name is not specified, + // so the request to create a resource is not considered. + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + proxyRequestInfo := lifted.NewRequestInfo(&http.Request{ + URL: &url.URL{Path: proxyPath, RawQuery: req.URL.RawQuery}, + Method: req.Method, + }) + + requester, exist := request.UserFrom(req.Context()) + if !exist { + responsewriters.InternalError(rw, req, errors.New("no user found for request")) + return + } + + requestContexts := make([]requestContext, 0) + for i := range clusterList.Items { + cluster := clusterList.Items[i] + // TODO: store it in the memory + tlsConfig, err := proxy.GetTlsConfigForCluster(ctx, &cluster, secretGetter) + if err != nil { + klog.Error(err) + continue + } + location, transport, err := proxy.Location(&cluster, tlsConfig) + if err != nil { + klog.Error(err) + continue + } + + // TODO: store it in the memory + impersonateToken, err := clusterImpersonateToken(ctx, &cluster, secretGetter) + if err != nil { + klog.Errorf("failed to get impersonateToken for cluster %s: %v", cluster.Name, err) + continue + } + + statusCode, err := doClusterRequest(req.Method, requestURLStr(location.String(), proxyRequestInfo), transport, requester, impersonateToken) + if err != nil { + klog.Errorf("failed to do request for cluster %s: %v", cluster.Name, err) + continue + } + if statusCode == http.StatusOK { + requestContexts = append(requestContexts, requestContext{ + clusterName: cluster.Name, + location: location, + transport: transport, + impersonateToken: impersonateToken, + }) + } + } + + switch len(requestContexts) { + case 0: + http.Error(rw, "resource not found or don't have permission to get it", http.StatusNotFound) + case 1: + reqCtx := requestContexts[0] + setRequestHeader(req, requester, reqCtx.impersonateToken) + reqCtx.location.Path = path.Join(reqCtx.location.Path, proxyPath) + reqCtx.location.RawQuery = req.URL.RawQuery + + handler := proxy.NewThrottledUpgradeAwareProxyHandler(reqCtx.location, reqCtx.transport, + true, false, responder) + handler.ServeHTTP(rw, req) + default: + clusterNames := make([]string, len(requestContexts)) + for i, reqCtx := range requestContexts { + clusterNames[i] = reqCtx.clusterName + } + http.Error(rw, fmt.Sprintf("conflict resource, exist in more than one cluster: %s", + strings.Join(clusterNames, ",")), http.StatusConflict) + } + }) +} + +// nolint:gocyclo +func requestWithoutResourceNameHandlerFunc( + ctx context.Context, + proxyPath string, + secretGetter func(context.Context, string, string) (*corev1.Secret, error), + clusterList *clusterapis.ClusterList, +) http.Handler { + // For uncertain resource names in processing, we need to make further judgments + // based on the requested verb to determine whether to merge the request results + // or proxy the request to the target cluster. + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + proxyRequestInfo := lifted.NewRequestInfo(&http.Request{ + URL: &url.URL{Path: proxyPath, RawQuery: req.URL.RawQuery}, + Method: req.Method, + }) + + requester, exist := request.UserFrom(req.Context()) + if !exist { + responsewriters.InternalError(rw, req, errors.New("no user found for request")) + return + } + + if proxyRequestInfo.Verb == "list" { + targetClusters := make([]requestContext, 0) + for i := range clusterList.Items { + cluster := clusterList.Items[i] + // TODO: store it in the memory + tlsConfig, err := proxy.GetTlsConfigForCluster(ctx, &cluster, secretGetter) + if err != nil { + klog.Error(err) + continue + } + location, transport, err := proxy.Location(&cluster, tlsConfig) + if err != nil { + klog.Error(err) + continue + } + + // TODO: store it in the memory + impersonateToken, err := clusterImpersonateToken(ctx, &cluster, secretGetter) + if err != nil { + klog.Errorf("failed to get impersonateToken for cluster %s: %v", cluster.Name, err) + continue + } + + location.Path = path.Join(location.Path, proxyPath) + location.RawQuery = req.URL.RawQuery + + simpleRequest, err := http.NewRequest(req.Method, location.String(), nil) + if err != nil { + klog.Errorf("failed to create request for cluster %s: %v", cluster.Name, err) + continue + } + // simpleRequest.Header = req.Header.Clone() + setRequestHeader(simpleRequest, requester, impersonateToken) + + httpClient := &http.Client{Transport: transport} + resp, err := httpClient.Do(simpleRequest) + if err != nil { + klog.Errorf("failed to do request for cluster %s: %v", cluster.Name, err) + continue + } + if resp.StatusCode != http.StatusOK { + klog.Warningf("get resource not ok with cluster %s: %d", cluster.Name, resp.StatusCode) + continue + } + body, err := io.ReadAll(resp.Body) + if err != nil { + klog.Errorf("unable to read content with cluster %s response: %v", cluster.Name, err) + continue + } + + targetClusters = append(targetClusters, requestContext{ + clusterName: cluster.Name, + responseBody: body, + responseHeader: resp.Header, + }) + _ = resp.Body.Close() + } + + if len(targetClusters) == 0 { + http.Error(rw, "not found", http.StatusNotFound) + return + } + + var resObjList *unstructured.UnstructuredList + for _, reqCtx := range targetClusters { + objList := &unstructured.UnstructuredList{} + err := objList.UnmarshalJSON(reqCtx.responseBody) + if err != nil { + klog.Errorf("Failed to unmarshal object list, error is: %v", err) + continue + } + + if resObjList == nil { + resObjList = objList + continue + } + resObjList.Items = append(resObjList.Items, objList.Items...) + } + resByte, err := resObjList.MarshalJSON() + if err != nil { + klog.Errorf("Failed to marshal object list, error is: %v", err) + return + } + rw.Header().Set("Content-Type", "application/json") + rw.Header().Set("Content-Length", string(strconv.Itoa(len(resByte)))) + for k, vs := range targetClusters[0].responseHeader { + if rw.Header().Get(k) != "" { + continue + } + for _, v := range vs { + rw.Header().Set(k, v) + } + } + rw.WriteHeader(http.StatusOK) + _, _ = rw.Write(resByte) + + return + } + + http.Error(rw, fmt.Sprintf("Request verb %s is not support", proxyRequestInfo.Verb), http.StatusMethodNotAllowed) + }) +} + +func clusterImpersonateToken( + ctx context.Context, + cluster *clusterapis.Cluster, + secretGetter func(context.Context, string, string) (*corev1.Secret, error), +) (string, error) { + if cluster.Spec.ImpersonatorSecretRef == nil { + return "", fmt.Errorf("the impersonatorSecretRef of cluster is nil") + } + secret, err := secretGetter(ctx, cluster.Spec.ImpersonatorSecretRef.Namespace, cluster.Spec.ImpersonatorSecretRef.Name) + if err != nil { + return "", err + } + impersonateToken, err := proxy.ImpersonateToken(cluster.Name, secret) + if err != nil { + return "", err + } + return impersonateToken, nil +} + +func doClusterRequest( + method string, + url string, + transport http.RoundTripper, + userInfo user.Info, + impersonateToken string, +) (statusCode int, err error) { + simpleRequest, err := http.NewRequest(method, url, nil) + if err != nil { + return 0, err + } + setRequestHeader(simpleRequest, userInfo, impersonateToken) + + httpClient := &http.Client{Transport: transport} + resp, err := httpClient.Do(simpleRequest) + if err != nil { + return 0, err + } + _ = resp.Body.Close() + return resp.StatusCode, nil +} + +// requestURLStr returns the request resource url string. +func requestURLStr(urlStr string, requestInfo *apirequest.RequestInfo) string { + parts := []string{requestInfo.APIPrefix} + if requestInfo.APIGroup != "" { + parts = append(parts, requestInfo.APIGroup) + } + parts = append(parts, requestInfo.APIVersion) + if requestInfo.Namespace != "" { + parts = append(parts, "namespaces", requestInfo.Namespace) + } + if requestInfo.Resource != "" { + parts = append(parts, requestInfo.Resource) + } + if requestInfo.Name != "" { + parts = append(parts, requestInfo.Name) + } + if requestInfo.Subresource != "" && + requestInfo.Subresource != "exec" && requestInfo.Subresource != "log" { + parts = append(parts, requestInfo.Subresource) + } + return fmt.Sprintf("%s/%s", urlStr, strings.Join(parts, "/")) +} + +func setRequestHeader(req *http.Request, userInfo user.Info, impersonateToken string) { + req.Header.Set(authenticationv1.ImpersonateUserHeader, userInfo.GetName()) + for _, group := range userInfo.GetGroups() { + if !proxy.SkipGroup(group) { + req.Header.Add(authenticationv1.ImpersonateGroupHeader, group) + } + } + req.Header.Set("Authorization", fmt.Sprintf("bearer %s", impersonateToken)) +} diff --git a/pkg/registry/cluster/storage/proxy.go b/pkg/registry/cluster/storage/proxy.go index 939b97f012ec..54a476a29a5d 100644 --- a/pkg/registry/cluster/storage/proxy.go +++ b/pkg/registry/cluster/storage/proxy.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "net/url" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -11,16 +12,24 @@ import ( "k8s.io/apiserver/pkg/registry/rest" "k8s.io/client-go/kubernetes" listcorev1 "k8s.io/client-go/listers/core/v1" + restclient "k8s.io/client-go/rest" clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" "github.com/karmada-io/karmada/pkg/util/proxy" ) +const matchAllClusters = "*" + // ProxyREST implements the proxy subresource for a Cluster. type ProxyREST struct { + restConfig *restclient.Config kubeClient kubernetes.Interface secretLister listcorev1.SecretLister clusterGetter func(ctx context.Context, name string) (*clusterapis.Cluster, error) + clusterLister func(ctx context.Context) (*clusterapis.ClusterList, error) + + karmadaLocation *url.URL + karmadaTransPort http.RoundTripper } // Implement Connecter @@ -50,19 +59,19 @@ func (r *ProxyREST) Connect(ctx context.Context, id string, options runtime.Obje return nil, fmt.Errorf("invalid options object: %#v", options) } + secretGetter := func(ctx context.Context, namespace string, name string) (*corev1.Secret, error) { + return r.kubeClient.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{}) + } + + if id == matchAllClusters { + return r.connectAllClusters(ctx, proxyOpts.Path, secretGetter, responder) + } + cluster, err := r.clusterGetter(ctx, id) if err != nil { return nil, err } - secretGetter := func(ctx context.Context, namespace string, name string) (*corev1.Secret, error) { - secret, err := r.secretLister.Secrets(namespace).Get(name) - if err != nil { - // fall back to call api server in case the cache has not been synchronized yet - return r.kubeClient.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{}) - } - return secret, nil - } return proxy.ConnectCluster(ctx, cluster, proxyOpts.Path, secretGetter, responder) } diff --git a/pkg/registry/cluster/storage/storage.go b/pkg/registry/cluster/storage/storage.go index 59e0dd9720a0..99c5403edf68 100644 --- a/pkg/registry/cluster/storage/storage.go +++ b/pkg/registry/cluster/storage/storage.go @@ -14,6 +14,7 @@ import ( "k8s.io/apiserver/pkg/registry/rest" "k8s.io/client-go/kubernetes" listcorev1 "k8s.io/client-go/listers/core/v1" + restclient "k8s.io/client-go/rest" "sigs.k8s.io/structured-merge-diff/v4/fieldpath" clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" @@ -32,7 +33,7 @@ type ClusterStorage struct { } // NewStorage returns a ClusterStorage object that will work against clusters. -func NewStorage(scheme *runtime.Scheme, kubeClient kubernetes.Interface, secretLister listcorev1.SecretLister, optsGetter generic.RESTOptionsGetter) (*ClusterStorage, error) { +func NewStorage(scheme *runtime.Scheme, restConfig *restclient.Config, secretLister listcorev1.SecretLister, optsGetter generic.RESTOptionsGetter) (*ClusterStorage, error) { strategy := clusterregistry.NewStrategy(scheme) store := &genericregistry.Store{ @@ -59,14 +60,22 @@ func NewStorage(scheme *runtime.Scheme, kubeClient kubernetes.Interface, secretL statusStore.UpdateStrategy = statusStrategy statusStore.ResetFieldsStrategy = statusStrategy + kubeClientSet := kubernetes.NewForConfigOrDie(restConfig) + karmadaLocation, karmadaTransport, err := karmadaResourceLocation(restConfig) + if err != nil { + return nil, err + } clusterRest := &REST{secretLister, store} return &ClusterStorage{ Cluster: clusterRest, Status: &StatusREST{&statusStore}, Proxy: &ProxyREST{ - kubeClient: kubeClient, - secretLister: secretLister, - clusterGetter: clusterRest.getCluster, + restConfig: restConfig, + kubeClient: kubeClientSet, + clusterGetter: clusterRest.getCluster, + clusterLister: clusterRest.listClusters, + karmadaLocation: karmadaLocation, + karmadaTransPort: karmadaTransport, }, }, nil } @@ -109,6 +118,17 @@ func (r *REST) getCluster(ctx context.Context, name string) (*clusterapis.Cluste } return cluster, nil } +func (r *REST) listClusters(ctx context.Context) (*clusterapis.ClusterList, error) { + obj, err := r.List(ctx, nil) + if err != nil { + return nil, err + } + clusterList := obj.(*clusterapis.ClusterList) + if clusterList == nil { + return nil, fmt.Errorf("unexpected object type: %#v", obj) + } + return clusterList, nil +} // ResourceGetter is an interface for retrieving resources by ResourceLocation. type ResourceGetter interface { diff --git a/pkg/util/proxy/proxy.go b/pkg/util/proxy/proxy.go index 73b8991a23fc..789fcddffea4 100644 --- a/pkg/util/proxy/proxy.go +++ b/pkg/util/proxy/proxy.go @@ -53,7 +53,7 @@ func ConnectCluster(ctx context.Context, cluster *clusterapis.Cluster, proxyPath if err != nil { return nil, err } - impersonateToken, err := getImpersonateToken(cluster.Name, impersonateTokenSecret) + impersonateToken, err := ImpersonateToken(cluster.Name, impersonateTokenSecret) if err != nil { return nil, fmt.Errorf("failed to get impresonateToken for cluster %s: %v", cluster.Name, err) } @@ -72,7 +72,7 @@ func newProxyHandler(location *url.URL, proxyTransport http.RoundTripper, cluste req.Header.Set(authenticationv1.ImpersonateUserHeader, requester.GetName()) for _, group := range requester.GetGroups() { - if !skipGroup(group) { + if !SkipGroup(group) { req.Header.Add(authenticationv1.ImpersonateGroupHeader, group) } } @@ -197,7 +197,7 @@ func ParseProxyHeaders(proxyHeaders map[string]string) http.Header { return header } -func getImpersonateToken(clusterName string, secret *corev1.Secret) (string, error) { +func ImpersonateToken(clusterName string, secret *corev1.Secret) (string, error) { token, found := secret.Data[clusterapis.SecretTokenKey] if !found { return "", fmt.Errorf("the impresonate token of cluster %s is empty", clusterName) @@ -213,7 +213,8 @@ func getClusterCABundle(clusterName string, secret *corev1.Secret) (string, erro return string(caBundle), nil } -func skipGroup(group string) bool { +// SkipGroup tells whether the input group can be skipped during impersonate. +func SkipGroup(group string) bool { switch group { case user.AllAuthenticated, user.AllUnauthenticated: return true