Skip to content

Commit 67e6ed8

Browse files
committed
support remote multil apiserver proxy
1 parent da720e4 commit 67e6ed8

File tree

11 files changed

+518
-28
lines changed

11 files changed

+518
-28
lines changed

pkg/apis/ctrlmesh/constants/constants.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ package constants
1919
const (
2020
ProxyUserID = 1359
2121

22-
ProxyMetricsHealthPort = 5441
23-
ProxyApiserverPort = 5443
24-
ProxyWebhookPort = 5445
25-
ProxyGRPCPort = 5447
26-
ProxyMetricsPort = 5449
27-
ProxyManagerHealthPort = 5451
28-
ProxyGRPCServerPort = 5453
22+
ProxyMetricsHealthPort = 5441
23+
ProxyApiServerPort = 5443
24+
ProxyWebhookPort = 5445
25+
ProxyGRPCPort = 5447
26+
ProxyMetricsPort = 5449
27+
ProxyManagerHealthPort = 5451
28+
ProxyGRPCServerPort = 5453
29+
ProxyRemoteApiServerPort = 5455
2930

3031
ProxyIptablesPort = 15002
3132
PprofListenPort = 5050
@@ -34,7 +35,7 @@ const (
3435

3536
ProxyIptablesPortFlag = "proxy-metrics"
3637
ProxyMetricsHealthPortFlag = "metrics-health-port"
37-
ProxyApiserverPortFlag = "proxy-apiserver-port"
38+
ProxyApiServerPortFlag = "proxy-apiserver-port"
3839
ProxyWebhookPortFlag = "proxy-webhook-port"
3940
ProxyGRPCPortFlag = "grpc-port"
4041
ProxyIptablesFlag = "tport"

pkg/apis/ctrlmesh/http/http.go

+1
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ const (
2020
HeaderMeshRealEndpoint = "Mesh-Real-Endpoint"
2121
HeaderHttpApiServerPreUrl = "Mesh-Pre-Url-Added"
2222
HeaderEscapeMesh = "Mesh-Escape"
23+
HeaderRemoteApiServerHost = "Remote-Api-Server-Host"
2324
)

pkg/apis/ctrlmesh/rest/types.go

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
Copyright 2024 The KusionStack Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package rest
18+
19+
const RemoteRegisterPath = "/remote-register"
20+
21+
type ConfigRequest struct {
22+
Action Action `json:"action"`
23+
Kubeconfigs [][]byte `json:"kubeconfig"`
24+
}
25+
26+
type Action string
27+
28+
const (
29+
Add Action = "add"
30+
Delete Action = "delete"
31+
Update Action = "update"
32+
)

pkg/cmd/proxy/main.go

+20-8
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/constants"
3737
"github.com/KusionStack/controller-mesh/pkg/client"
3838
proxyapiserver "github.com/KusionStack/controller-mesh/pkg/proxy/apiserver"
39+
"github.com/KusionStack/controller-mesh/pkg/proxy/apiserver/common"
3940
proxycache "github.com/KusionStack/controller-mesh/pkg/proxy/cache"
4041
"github.com/KusionStack/controller-mesh/pkg/proxy/circuitbreaker"
4142
"github.com/KusionStack/controller-mesh/pkg/proxy/faultinjection"
@@ -47,7 +48,7 @@ import (
4748

4849
var (
4950
metricsHealthPort = flag.Int(constants.ProxyMetricsHealthPortFlag, constants.ProxyMetricsHealthPort, "Port to bind 0.0.0.0 and serve metric endpoint/healthz/pprof.")
50-
proxyApiserverPort = flag.Int(constants.ProxyApiserverPortFlag, constants.ProxyApiserverPort, "Port to bind localhost and proxy the requests to apiserver.")
51+
proxyApiServerPort = flag.Int(constants.ProxyApiServerPortFlag, constants.ProxyApiServerPort, "Port to bind localhost and proxy the requests to apiserver.")
5152
proxyWebhookPort = flag.Int(constants.ProxyWebhookPortFlag, constants.ProxyWebhookPort, "Port to bind 0.0.0.0 and proxy the requests to webhook.")
5253

5354
leaderElectionName = flag.String(constants.ProxyLeaderElectionNameFlag, "", "The name of leader election.")
@@ -103,19 +104,19 @@ func main() {
103104
klog.Fatalf("Failed to start proxy client: %v", err)
104105
}
105106

106-
var stoppedApiserver, stoppedWebhook <-chan struct{}
107+
var stoppedApiserver, stoppedRemoteApiserver <-chan struct{}
107108

108109
// TODO: webhook proxy
109110

110111
// ApiServer proxy
111112
{
112-
opts := proxyapiserver.NewOptions()
113+
opts := common.NewOptions()
113114
opts.Config = rest.CopyConfig(cfg)
114115
// Certs generated by proxy-init.sh
115116
opts.SecureServingOptions.ServerCert.CertKey.KeyFile = "/var/run/secrets/kubernetes.io/serviceaccount/ctrlmesh/tls.key"
116117
opts.SecureServingOptions.ServerCert.CertKey.CertFile = "/var/run/secrets/kubernetes.io/serviceaccount/ctrlmesh/tls.crt"
117118
opts.SecureServingOptions.BindAddress = net.ParseIP("127.0.0.1")
118-
opts.SecureServingOptions.BindPort = *proxyApiserverPort
119+
opts.SecureServingOptions.BindPort = *proxyApiServerPort
119120
opts.LeaderElectionName = *leaderElectionName
120121
opts.SpecManager = proxyClient.GetSpecManager()
121122
opts.BreakerWrapperFunc = breakerMgr.HandlerWrapper()
@@ -136,16 +137,27 @@ func main() {
136137
}
137138
}
138139

140+
// Remote ApiServer Proxy
141+
{
142+
opts := common.NewOptions()
143+
opts.SpecManager = proxyClient.GetSpecManager()
144+
proxy, err := proxyapiserver.NewRemoteProxy(opts)
145+
if err != nil {
146+
klog.Fatalf("Failed to new remote apiserver proxy: %v", err)
147+
}
148+
stoppedRemoteApiserver, err = proxy.Start()
149+
if err != nil {
150+
klog.Fatalf("Failed to start remote apiserver proxy: %v", err)
151+
}
152+
}
153+
139154
{
140155
go tproxy.NewTProxy(*proxyIptablePort, faultInjectionMgr, breakerMgr).Start()
141156
}
142157

143158
serveHTTP(ctx, readyHandler)
144-
if stoppedWebhook != nil {
145-
<-stoppedWebhook
146-
klog.Infof("Webhook proxy stopped")
147-
}
148159
<-stoppedApiserver
160+
<-stoppedRemoteApiserver
149161
klog.Infof("Apiserver proxy stopped")
150162
}
151163

pkg/proxy/apiserver/injector.go renamed to pkg/proxy/apiserver/common/injector.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package apiserver
17+
package common
1818

1919
import (
2020
"fmt"
@@ -43,7 +43,7 @@ type injector struct {
4343
specManager *protomanager.SpecManager
4444
}
4545

46-
func New(m *protomanager.SpecManager) Injector {
46+
func NewInjector(m *protomanager.SpecManager) Injector {
4747
return &injector{specManager: m}
4848
}
4949

pkg/proxy/apiserver/options.go renamed to pkg/proxy/apiserver/common/options.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
1515
limitations under the License.
1616
*/
1717

18-
package apiserver
18+
package common
1919

2020
import (
2121
"fmt"

pkg/proxy/apiserver/request.go renamed to pkg/proxy/apiserver/common/request.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
1515
limitations under the License.
1616
*/
1717

18-
package apiserver
18+
package common
1919

2020
import (
2121
"fmt"

pkg/proxy/apiserver/handler.go renamed to pkg/proxy/apiserver/in_cluster_handler.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"k8s.io/klog/v2"
4242

4343
"github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/constants"
44+
"github.com/KusionStack/controller-mesh/pkg/proxy/apiserver/common"
4445
proxyfilters "github.com/KusionStack/controller-mesh/pkg/proxy/filters"
4546
"github.com/KusionStack/controller-mesh/pkg/proxy/leaderelection"
4647
"github.com/KusionStack/controller-mesh/pkg/utils"
@@ -49,21 +50,20 @@ import (
4950
)
5051

5152
var (
52-
upgradeSubresources = sets.NewString("exec", "attach")
53-
enableIpTable = os.Getenv(constants.EnvIPTable) == "true"
54-
53+
upgradeSubresources = sets.NewString("exec", "attach")
54+
enableIpTable = os.Getenv(constants.EnvIPTable) == "true"
5555
disableCircuitBreaker = os.Getenv(constants.EnvDisableCircuitBreaker) == "true"
5656
enableFaultInjection = os.Getenv(constants.EnvEnableFaultInjection) == "true"
5757
)
5858

5959
type Proxy struct {
60-
opts *Options
60+
opts *common.Options
6161
inSecureServer *http.Server
6262
servingInfo *server.SecureServingInfo
6363
handler http.Handler
6464
}
6565

66-
func NewProxy(opts *Options) (*Proxy, error) {
66+
func NewProxy(opts *common.Options) (*Proxy, error) {
6767
var servingInfo *server.SecureServingInfo
6868
if enableIpTable {
6969
if err := opts.ApplyTo(&servingInfo); err != nil {
@@ -79,7 +79,7 @@ func NewProxy(opts *Options) (*Proxy, error) {
7979
inHandler := &handler{
8080
cfg: opts.Config,
8181
transport: tp,
82-
injector: New(opts.SpecManager),
82+
injector: common.NewInjector(opts.SpecManager),
8383
}
8484
if opts.LeaderElectionName != "" {
8585
inHandler.electionHandler = leaderelection.New(opts.SpecManager, opts.LeaderElectionName)
@@ -95,7 +95,7 @@ func NewProxy(opts *Options) (*Proxy, error) {
9595
handler = opts.FaultInjectionWrapperFunc(handler)
9696
}
9797
handler = genericfilters.WithWaitGroup(handler, opts.LongRunningFunc, opts.HandlerChainWaitGroup)
98-
handler = WithRequestInfo(handler, opts.RequestInfoResolver)
98+
handler = common.WithRequestInfo(handler, opts.RequestInfoResolver)
9999
handler = proxyfilters.WithPanicRecovery(handler, opts.RequestInfoResolver)
100100

101101
inSecureServer := &http.Server{
@@ -136,7 +136,7 @@ func (p *Proxy) Start(ctx context.Context) (<-chan struct{}, error) {
136136
type handler struct {
137137
cfg *rest.Config
138138
transport http.RoundTripper
139-
injector Injector
139+
injector common.Injector
140140
electionHandler leaderelection.Handler
141141
}
142142

pkg/proxy/apiserver/remote.go

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
Copyright 2024 The KusionStack Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package apiserver
18+
19+
import (
20+
"bytes"
21+
"crypto/sha256"
22+
"errors"
23+
"fmt"
24+
"net/http"
25+
"sync"
26+
27+
"k8s.io/client-go/rest"
28+
"k8s.io/client-go/tools/clientcmd"
29+
)
30+
31+
func NewClusterStore() *ClusterStore {
32+
return &ClusterStore{
33+
remoteClusters: make(map[string]*Cluster),
34+
}
35+
}
36+
37+
type ClusterStore struct {
38+
sync.RWMutex
39+
// keyed by api server host
40+
remoteClusters map[string]*Cluster
41+
}
42+
43+
// Cluster defines cluster struct
44+
type Cluster struct {
45+
// Client for accessing the cluster.
46+
sync.RWMutex
47+
Config *rest.Config
48+
Transport http.RoundTripper
49+
KubeConfigSha [sha256.Size]byte
50+
}
51+
52+
func (c *ClusterStore) Get(host string) *Cluster {
53+
c.RLock()
54+
defer c.RUnlock()
55+
return c.remoteClusters[host]
56+
}
57+
58+
func (c *ClusterStore) StoreListOf(kubeConfigs ...[]byte) (err error) {
59+
for _, kubeConfig := range kubeConfigs {
60+
if localErr := c.Store(kubeConfig); localErr != nil {
61+
err = errors.Join(err, localErr)
62+
}
63+
}
64+
return err
65+
}
66+
67+
func (c *ClusterStore) Store(kubeConfig []byte) error {
68+
sha := sha256.Sum256(kubeConfig)
69+
cfg, err := DefaultBuildRestConfig(kubeConfig)
70+
if err != nil {
71+
return err
72+
}
73+
c.Lock()
74+
defer c.Unlock()
75+
cluster, ok := c.remoteClusters[cfg.Host]
76+
if ok && bytes.Equal(sha[:], cluster.KubeConfigSha[:]) {
77+
return nil
78+
}
79+
tp, err := rest.TransportFor(cfg)
80+
if err != nil {
81+
return err
82+
}
83+
c.remoteClusters[cfg.Host] = &Cluster{
84+
Config: cfg,
85+
KubeConfigSha: sha,
86+
Transport: tp,
87+
}
88+
return nil
89+
}
90+
91+
func (c *ClusterStore) Delete(kubeConfig []byte) error {
92+
cfg, err := DefaultBuildRestConfig(kubeConfig)
93+
if err != nil {
94+
return err
95+
}
96+
c.Lock()
97+
defer c.Unlock()
98+
delete(c.remoteClusters, cfg.Host)
99+
return nil
100+
}
101+
102+
func DefaultBuildRestConfig(kubeConfig []byte) (*rest.Config, error) {
103+
if len(kubeConfig) == 0 {
104+
return nil, fmt.Errorf("kubeconfig is empty")
105+
}
106+
rawConfig, err := clientcmd.Load(kubeConfig)
107+
if err != nil {
108+
return nil, fmt.Errorf("kubeconfig cannot be loaded: %v", err)
109+
}
110+
if err = clientcmd.Validate(*rawConfig); err != nil {
111+
return nil, fmt.Errorf("kubeconfig is not valid: %v", err)
112+
}
113+
clientConfig := clientcmd.NewDefaultClientConfig(*rawConfig, &clientcmd.ConfigOverrides{})
114+
return clientConfig.ClientConfig()
115+
}

0 commit comments

Comments
 (0)