diff --git a/.golangci.yaml b/.golangci.yaml index 261bff33..803473c3 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -4,6 +4,7 @@ run: - ".*.gen.go" - pkg/utils/http/reverse_proxy.go - pkg/proxy/apiserver/options.go + - pkg/proxy/apiserver/common/options.go skip-dirs: - vendor/ diff --git a/pkg/apis/ctrlmesh/constants/constants.go b/pkg/apis/ctrlmesh/constants/constants.go index 1aff96b3..5ba3390f 100644 --- a/pkg/apis/ctrlmesh/constants/constants.go +++ b/pkg/apis/ctrlmesh/constants/constants.go @@ -19,13 +19,14 @@ package constants const ( ProxyUserID = 1359 - ProxyMetricsHealthPort = 5441 - ProxyApiserverPort = 5443 - ProxyWebhookPort = 5445 - ProxyGRPCPort = 5447 - ProxyMetricsPort = 5449 - ProxyManagerHealthPort = 5451 - ProxyGRPCServerPort = 5453 + ProxyMetricsHealthPort = 5441 + ProxyApiServerPort = 5443 + ProxyWebhookPort = 5445 + ProxyGRPCPort = 5447 + ProxyMetricsPort = 5449 + ProxyManagerHealthPort = 5451 + ProxyGRPCServerPort = 5453 + ProxyRemoteApiServerPort = 5455 ProxyIptablesPort = 15002 PprofListenPort = 5050 @@ -34,7 +35,7 @@ const ( ProxyIptablesPortFlag = "proxy-metrics" ProxyMetricsHealthPortFlag = "metrics-health-port" - ProxyApiserverPortFlag = "proxy-apiserver-port" + ProxyApiServerPortFlag = "proxy-apiserver-port" ProxyWebhookPortFlag = "proxy-webhook-port" ProxyGRPCPortFlag = "grpc-port" ProxyIptablesFlag = "tport" diff --git a/pkg/apis/ctrlmesh/http/http.go b/pkg/apis/ctrlmesh/http/http.go index c29faa0b..87d70316 100644 --- a/pkg/apis/ctrlmesh/http/http.go +++ b/pkg/apis/ctrlmesh/http/http.go @@ -20,4 +20,5 @@ const ( HeaderMeshRealEndpoint = "Mesh-Real-Endpoint" HeaderHttpApiServerPreUrl = "Mesh-Pre-Url-Added" HeaderEscapeMesh = "Mesh-Escape" + HeaderRemoteApiServerHost = "Remote-Api-Server-Host" ) diff --git a/pkg/apis/ctrlmesh/rest/types.go b/pkg/apis/ctrlmesh/rest/types.go new file mode 100644 index 00000000..d6803d57 --- /dev/null +++ b/pkg/apis/ctrlmesh/rest/types.go @@ -0,0 +1,32 @@ +/* +Copyright 2024 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +const RemoteRegisterPath = "/remote-register" + +type ConfigRequest struct { + Action Action `json:"action"` + Kubeconfigs [][]byte `json:"kubeconfig"` +} + +type Action string + +const ( + Add Action = "add" + Delete Action = "delete" + Update Action = "update" +) diff --git a/pkg/cmd/proxy/main.go b/pkg/cmd/proxy/main.go index 77daaa94..b0d794d6 100644 --- a/pkg/cmd/proxy/main.go +++ b/pkg/cmd/proxy/main.go @@ -36,6 +36,7 @@ import ( "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/constants" "github.com/KusionStack/controller-mesh/pkg/client" proxyapiserver "github.com/KusionStack/controller-mesh/pkg/proxy/apiserver" + "github.com/KusionStack/controller-mesh/pkg/proxy/apiserver/common" proxycache "github.com/KusionStack/controller-mesh/pkg/proxy/cache" "github.com/KusionStack/controller-mesh/pkg/proxy/circuitbreaker" "github.com/KusionStack/controller-mesh/pkg/proxy/faultinjection" @@ -47,7 +48,7 @@ import ( var ( metricsHealthPort = flag.Int(constants.ProxyMetricsHealthPortFlag, constants.ProxyMetricsHealthPort, "Port to bind 0.0.0.0 and serve metric endpoint/healthz/pprof.") - proxyApiserverPort = flag.Int(constants.ProxyApiserverPortFlag, constants.ProxyApiserverPort, "Port to bind localhost and proxy the requests to apiserver.") + proxyApiServerPort = flag.Int(constants.ProxyApiServerPortFlag, constants.ProxyApiServerPort, "Port to bind localhost and proxy the requests to apiserver.") proxyWebhookPort = flag.Int(constants.ProxyWebhookPortFlag, constants.ProxyWebhookPort, "Port to bind 0.0.0.0 and proxy the requests to webhook.") leaderElectionName = flag.String(constants.ProxyLeaderElectionNameFlag, "", "The name of leader election.") @@ -103,19 +104,19 @@ func main() { klog.Fatalf("Failed to start proxy client: %v", err) } - var stoppedApiserver, stoppedWebhook <-chan struct{} + var stoppedApiserver, stoppedRemoteApiserver <-chan struct{} // TODO: webhook proxy // ApiServer proxy { - opts := proxyapiserver.NewOptions() + opts := common.NewOptions() opts.Config = rest.CopyConfig(cfg) // Certs generated by proxy-init.sh opts.SecureServingOptions.ServerCert.CertKey.KeyFile = "/var/run/secrets/kubernetes.io/serviceaccount/ctrlmesh/tls.key" opts.SecureServingOptions.ServerCert.CertKey.CertFile = "/var/run/secrets/kubernetes.io/serviceaccount/ctrlmesh/tls.crt" opts.SecureServingOptions.BindAddress = net.ParseIP("127.0.0.1") - opts.SecureServingOptions.BindPort = *proxyApiserverPort + opts.SecureServingOptions.BindPort = *proxyApiServerPort opts.LeaderElectionName = *leaderElectionName opts.SpecManager = proxyClient.GetSpecManager() opts.BreakerWrapperFunc = breakerMgr.HandlerWrapper() @@ -136,16 +137,27 @@ func main() { } } + // Remote ApiServer Proxy + { + opts := common.NewOptions() + opts.SpecManager = proxyClient.GetSpecManager() + proxy, err := proxyapiserver.NewRemoteProxy(opts) + if err != nil { + klog.Fatalf("Failed to new remote apiserver proxy: %v", err) + } + stoppedRemoteApiserver, err = proxy.Start() + if err != nil { + klog.Fatalf("Failed to start remote apiserver proxy: %v", err) + } + } + { go tproxy.NewTProxy(*proxyIptablePort, faultInjectionMgr, breakerMgr).Start() } serveHTTP(ctx, readyHandler) - if stoppedWebhook != nil { - <-stoppedWebhook - klog.Infof("Webhook proxy stopped") - } <-stoppedApiserver + <-stoppedRemoteApiserver klog.Infof("Apiserver proxy stopped") } diff --git a/pkg/proxy/apiserver/injector.go b/pkg/proxy/apiserver/common/injector.go similarity index 97% rename from pkg/proxy/apiserver/injector.go rename to pkg/proxy/apiserver/common/injector.go index 7d371ceb..f428800d 100644 --- a/pkg/proxy/apiserver/injector.go +++ b/pkg/proxy/apiserver/common/injector.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apiserver +package common import ( "fmt" @@ -43,7 +43,7 @@ type injector struct { specManager *protomanager.SpecManager } -func New(m *protomanager.SpecManager) Injector { +func NewInjector(m *protomanager.SpecManager) Injector { return &injector{specManager: m} } diff --git a/pkg/proxy/apiserver/options.go b/pkg/proxy/apiserver/common/options.go similarity index 99% rename from pkg/proxy/apiserver/options.go rename to pkg/proxy/apiserver/common/options.go index de6f8a45..5e2e19d0 100644 --- a/pkg/proxy/apiserver/options.go +++ b/pkg/proxy/apiserver/common/options.go @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apiserver +package common import ( "fmt" diff --git a/pkg/proxy/apiserver/request.go b/pkg/proxy/apiserver/common/request.go similarity index 99% rename from pkg/proxy/apiserver/request.go rename to pkg/proxy/apiserver/common/request.go index 9f52415f..7083bff4 100644 --- a/pkg/proxy/apiserver/request.go +++ b/pkg/proxy/apiserver/common/request.go @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apiserver +package common import ( "fmt" diff --git a/pkg/proxy/apiserver/handler.go b/pkg/proxy/apiserver/in_cluster_handler.go similarity index 94% rename from pkg/proxy/apiserver/handler.go rename to pkg/proxy/apiserver/in_cluster_handler.go index bc2109cd..e51521d9 100644 --- a/pkg/proxy/apiserver/handler.go +++ b/pkg/proxy/apiserver/in_cluster_handler.go @@ -41,6 +41,7 @@ import ( "k8s.io/klog/v2" "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/constants" + "github.com/KusionStack/controller-mesh/pkg/proxy/apiserver/common" proxyfilters "github.com/KusionStack/controller-mesh/pkg/proxy/filters" "github.com/KusionStack/controller-mesh/pkg/proxy/leaderelection" "github.com/KusionStack/controller-mesh/pkg/utils" @@ -49,21 +50,20 @@ import ( ) var ( - upgradeSubresources = sets.NewString("exec", "attach") - enableIpTable = os.Getenv(constants.EnvIPTable) == "true" - + upgradeSubresources = sets.NewString("exec", "attach") + enableIpTable = os.Getenv(constants.EnvIPTable) == "true" disableCircuitBreaker = os.Getenv(constants.EnvDisableCircuitBreaker) == "true" enableFaultInjection = os.Getenv(constants.EnvEnableFaultInjection) == "true" ) type Proxy struct { - opts *Options + opts *common.Options inSecureServer *http.Server servingInfo *server.SecureServingInfo handler http.Handler } -func NewProxy(opts *Options) (*Proxy, error) { +func NewProxy(opts *common.Options) (*Proxy, error) { var servingInfo *server.SecureServingInfo if enableIpTable { if err := opts.ApplyTo(&servingInfo); err != nil { @@ -79,7 +79,7 @@ func NewProxy(opts *Options) (*Proxy, error) { inHandler := &handler{ cfg: opts.Config, transport: tp, - injector: New(opts.SpecManager), + injector: common.NewInjector(opts.SpecManager), } if opts.LeaderElectionName != "" { inHandler.electionHandler = leaderelection.New(opts.SpecManager, opts.LeaderElectionName) @@ -95,7 +95,7 @@ func NewProxy(opts *Options) (*Proxy, error) { handler = opts.FaultInjectionWrapperFunc(handler) } handler = genericfilters.WithWaitGroup(handler, opts.LongRunningFunc, opts.HandlerChainWaitGroup) - handler = WithRequestInfo(handler, opts.RequestInfoResolver) + handler = common.WithRequestInfo(handler, opts.RequestInfoResolver) handler = proxyfilters.WithPanicRecovery(handler, opts.RequestInfoResolver) inSecureServer := &http.Server{ @@ -136,7 +136,7 @@ func (p *Proxy) Start(ctx context.Context) (<-chan struct{}, error) { type handler struct { cfg *rest.Config transport http.RoundTripper - injector Injector + injector common.Injector electionHandler leaderelection.Handler } diff --git a/pkg/proxy/apiserver/remote.go b/pkg/proxy/apiserver/remote.go new file mode 100644 index 00000000..aeab79ea --- /dev/null +++ b/pkg/proxy/apiserver/remote.go @@ -0,0 +1,115 @@ +/* +Copyright 2024 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "bytes" + "crypto/sha256" + "errors" + "fmt" + "net/http" + "sync" + + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +func NewClusterStore() *ClusterStore { + return &ClusterStore{ + remoteClusters: make(map[string]*Cluster), + } +} + +type ClusterStore struct { + sync.RWMutex + // keyed by api server host + remoteClusters map[string]*Cluster +} + +// Cluster defines cluster struct +type Cluster struct { + // Client for accessing the cluster. + sync.RWMutex + Config *rest.Config + Transport http.RoundTripper + KubeConfigSha [sha256.Size]byte +} + +func (c *ClusterStore) Get(host string) *Cluster { + c.RLock() + defer c.RUnlock() + return c.remoteClusters[host] +} + +func (c *ClusterStore) StoreListOf(kubeConfigs ...[]byte) (err error) { + for _, kubeConfig := range kubeConfigs { + if localErr := c.Store(kubeConfig); localErr != nil { + err = errors.Join(err, localErr) + } + } + return err +} + +func (c *ClusterStore) Store(kubeConfig []byte) error { + sha := sha256.Sum256(kubeConfig) + cfg, err := DefaultBuildRestConfig(kubeConfig) + if err != nil { + return err + } + c.Lock() + defer c.Unlock() + cluster, ok := c.remoteClusters[cfg.Host] + if ok && bytes.Equal(sha[:], cluster.KubeConfigSha[:]) { + return nil + } + tp, err := rest.TransportFor(cfg) + if err != nil { + return err + } + c.remoteClusters[cfg.Host] = &Cluster{ + Config: cfg, + KubeConfigSha: sha, + Transport: tp, + } + return nil +} + +func (c *ClusterStore) Delete(kubeConfig []byte) error { + cfg, err := DefaultBuildRestConfig(kubeConfig) + if err != nil { + return err + } + c.Lock() + defer c.Unlock() + delete(c.remoteClusters, cfg.Host) + return nil +} + +func DefaultBuildRestConfig(kubeConfig []byte) (*rest.Config, error) { + if len(kubeConfig) == 0 { + return nil, fmt.Errorf("kubeconfig is empty") + } + rawConfig, err := clientcmd.Load(kubeConfig) + if err != nil { + return nil, fmt.Errorf("kubeconfig cannot be loaded: %v", err) + } + if err = clientcmd.Validate(*rawConfig); err != nil { + return nil, fmt.Errorf("kubeconfig is not valid: %v", err) + } + clientConfig := clientcmd.NewDefaultClientConfig(*rawConfig, &clientcmd.ConfigOverrides{}) + return clientConfig.ClientConfig() +} diff --git a/pkg/proxy/apiserver/remote_handler.go b/pkg/proxy/apiserver/remote_handler.go new file mode 100644 index 00000000..b9c6695c --- /dev/null +++ b/pkg/proxy/apiserver/remote_handler.go @@ -0,0 +1,222 @@ +/* +Copyright 2024 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "encoding/json" + "fmt" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "k8s.io/apimachinery/pkg/util/httpstream/spdy" + "k8s.io/apimachinery/pkg/util/proxy" + "k8s.io/apiserver/pkg/authentication/user" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + genericfilters "k8s.io/apiserver/pkg/server/filters" + "k8s.io/client-go/rest" + "k8s.io/client-go/transport" + "k8s.io/klog/v2" + + "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/constants" + meshhttp "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/http" + ctrlmeshrest "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/rest" + "github.com/KusionStack/controller-mesh/pkg/proxy/apiserver/common" + proxyfilters "github.com/KusionStack/controller-mesh/pkg/proxy/filters" + utilshttp "github.com/KusionStack/controller-mesh/pkg/utils/http" + "github.com/KusionStack/controller-mesh/pkg/utils/pool" +) + +type RemoteProxy struct { + opts *common.Options + inSecureServer *http.Server +} + +func NewRemoteProxy(opts *common.Options) (*RemoteProxy, error) { + clusterStore := NewClusterStore() + inHandler := &remoteHandler{ + injector: common.NewInjector(opts.SpecManager), + store: clusterStore, + } + + var handler http.Handler = inHandler + handler = genericfilters.WithWaitGroup(handler, opts.LongRunningFunc, opts.HandlerChainWaitGroup) + handler = common.WithRequestInfo(handler, opts.RequestInfoResolver) + handler = proxyfilters.WithPanicRecovery(handler, opts.RequestInfoResolver) + handler = WithRemoteRegister(handler, clusterStore) + inSecureServer := &http.Server{ + Addr: net.JoinHostPort("127.0.0.1", strconv.Itoa(constants.ProxyRemoteApiServerPort)), + Handler: handler, + MaxHeaderBytes: 1 << 20, + } + + return &RemoteProxy{ + opts: opts, + inSecureServer: inSecureServer, + }, nil +} + +type remoteHandler struct { + store *ClusterStore + injector common.Injector +} + +func (p *RemoteProxy) Start() (<-chan struct{}, error) { + stoppedCh := make(chan struct{}) + go func() { + defer close(stoppedCh) + klog.Infof("start listen and serve %s", p.inSecureServer.Addr) + err := p.inSecureServer.ListenAndServe() + if err != nil { + klog.Errorf("fail listen and serve %v", err) + } + }() + return stoppedCh, nil +} + +func (rh *remoteHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { + startTime := time.Now() + requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) + if !ok { + klog.Errorf("%s %s %s, no request info in context", r.Method, r.Header.Get("Content-Type"), r.URL) + http.Error(rw, "no request info in context", http.StatusBadRequest) + return + } + + u, err := rh.getURL(r) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + p := utilshttp.NewSingleHostReverseProxy(u) + cluster := rh.store.Get(u.Host) + if cluster == nil { + http.Error(rw, fmt.Sprintf("cluster not found in store. Host: %s", u.Host), http.StatusInternalServerError) + return + } + p.Transport = cluster.Transport + p.FlushInterval = 500 * time.Millisecond + p.BufferPool = pool.BytesPool + + if requestInfo.IsResourceRequest && upgradeSubresources.Has(requestInfo.Subresource) { + rh.upgradeProxyHandler(rw, r, cluster.Config) + return + } + + if err = rh.injector.Inject(r, requestInfo); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + var statusCode int + modifyResponse := func(resp *http.Response) error { + statusCode = resp.StatusCode + return nil + } + + // no need to log leader election requests + defer func() { + if klog.V(4).Enabled() { + klog.InfoS("PROXY", + "verb", r.Method, + "URI", r.RequestURI, + "latency", time.Since(startTime), + "userAgent", r.UserAgent(), + "resp", statusCode, + ) + } + }() + + p.ModifyResponse = modifyResponse + p.ServeHTTP(rw, r) +} + +func (rh *remoteHandler) upgradeProxyHandler(rw http.ResponseWriter, r *http.Request, cfg *rest.Config) { + tlsConfig, err := rest.TLSConfigFor(cfg) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig) + wrappedRT, err := rest.HTTPWrappersForConfig(cfg, upgradeRoundTripper) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + proxyRoundTripper := transport.NewAuthProxyRoundTripper(user.APIServerUser, []string{user.SystemPrivilegedGroup}, nil, wrappedRT) + u, err := rh.getURL(r) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + p := proxy.NewUpgradeAwareHandler(u, proxyRoundTripper, true, true, &responder{w: rw}) + p.ServeHTTP(rw, r) +} + +func (rh *remoteHandler) getURL(r *http.Request) (*url.URL, error) { + remoteHost := r.Header.Get(meshhttp.HeaderRemoteApiServerHost) + if len(remoteHost) == 0 { + return nil, fmt.Errorf("not found remote api server host") + } + return url.Parse(fmt.Sprintf("https://%s", remoteHost)) +} + +func (rh *remoteHandler) newProxy(r *http.Request) (*utilshttp.ReverseProxy, error) { + u, err := rh.getURL(r) + if err != nil { + return nil, err + } + p := utilshttp.NewSingleHostReverseProxy(u) + cluster := rh.store.Get(u.Host) + if cluster == nil { + return nil, fmt.Errorf("cluster not found in store. Host: %s", u.Host) + } + p.Transport = cluster.Transport + p.FlushInterval = 500 * time.Millisecond + p.BufferPool = pool.BytesPool + return p, nil +} + +func WithRemoteRegister(handler http.Handler, store *ClusterStore) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if !strings.HasPrefix(req.URL.Path, "/remote-register") { + handler.ServeHTTP(w, req) + return + } + cfgReq := &ctrlmeshrest.ConfigRequest{} + if err := json.NewDecoder(req.Body).Decode(cfgReq); err != nil { + http.Error(w, fmt.Sprintf("failed to decode ConfigRequest, %v", err), http.StatusBadRequest) + return + } + var err error + defer req.Body.Close() + switch cfgReq.Action { + case ctrlmeshrest.Add, ctrlmeshrest.Update: + err = store.StoreListOf(cfgReq.Kubeconfigs...) + case ctrlmeshrest.Delete: + err = store.StoreListOf(cfgReq.Kubeconfigs...) + default: + } + if err != nil { + http.Error(w, fmt.Sprintf("failed to %s kubeconfig, %v", cfgReq.Action, err), http.StatusBadRequest) + } + }) +} diff --git a/pkg/utils/client/proxy_client.go b/pkg/utils/client/proxy_client.go new file mode 100644 index 00000000..1abea459 --- /dev/null +++ b/pkg/utils/client/proxy_client.go @@ -0,0 +1,103 @@ +/* +Copyright 2024 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "strings" + "time" + + "k8s.io/klog/v2" + + "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/constants" + ctrlmeshrest "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/rest" +) + +const ( + DialInterval = 2 * time.Second + DialTimeOut = 5 * time.Second +) + +func NewClient() *Client { + return &Client{ + httpClient: &http.Client{}, + } +} + +type Client struct { + httpClient *http.Client +} + +func (c *Client) WaitForServerReady(ctx context.Context) error { + address := fmt.Sprintf("http://127.0.0.1:%d", constants.ProxyMetricsHealthPort) + for { + conn, err := net.DialTimeout("tcp", address, DialTimeOut) + if err == nil { + conn.Close() + return nil + } + klog.Infof("waiting for server to become ready... DialErr: %s", err.Error()) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(DialInterval): + continue + } + } +} + +func (c *Client) Sync(ctx context.Context, action ctrlmeshrest.Action, kubeconfigs ...[]byte) error { + req := &ctrlmeshrest.ConfigRequest{ + Action: action, + Kubeconfigs: kubeconfigs, + } + byt, _ := json.Marshal(req) + httpReq, err := http.NewRequestWithContext(ctx, "POST", + fmt.Sprintf("http://127.0.0.1:%d%s", constants.ProxyRemoteApiServerPort, ctrlmeshrest.RemoteRegisterPath), + strings.NewReader(string(byt))) + if err != nil { + return err + } + resp, err := c.httpClient.Do(httpReq) + if err != nil { + klog.Errorf("failed to do http request, %v", err) + return err + } + defer func() { + if resp.Body != nil { + _ = resp.Body.Close() + } + }() + + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + klog.Infof("sync kubeconfigs success, Action: %s, StatusCode: %s", action, resp.StatusCode) + return nil + } + + var errBody []byte + errBody, err = io.ReadAll(resp.Body) + if err != nil { + klog.Errorf("failed to read response body, %v", err) + return err + } + return fmt.Errorf("failed by response status code: %d, body: %s", resp.StatusCode, string(errBody)) +}