diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index f164d15cd..e97189e7c 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -85,6 +85,12 @@ type GrpcProxyAgentOptions struct { // Enables updating the server count by counting the number of valid leases // matching the selector. CountServerLeases bool + // Namespace where lease objects are managed. + LeaseNamespace string + // Labels on which lease objects are managed. + LeaseLabel string + // ServerCountSource describes how server counts should be combined. + ServerCountSource string // Path to kubeconfig (used by kubernetes client for lease listing) KubeconfigPath string // Content type of requests sent to apiserver. @@ -104,6 +110,7 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) WarnOnChannelLimit: o.WarnOnChannelLimit, SyncForever: o.SyncForever, XfrChannelSize: o.XfrChannelSize, + ServerCountSource: o.ServerCountSource, } } @@ -132,6 +139,9 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet { flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.") flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.") flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.") + flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "Namespace where lease objects are managed.") + flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.") + flags.StringVar(&o.ServerCountSource, "server-count-source", o.ServerCountSource, "Defines how the server counts from lease and from server responses are combined. 
Possible values: 'default' to use only one source (server or leases depending on other flags), 'max' to take the larger value.") flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file") flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.") return flags @@ -159,6 +169,10 @@ func (o *GrpcProxyAgentOptions) Print() { klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.AgentIdentifiers)) klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit) klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever) + klog.V(1).Infof("CountServerLeases set to %v.\n", o.CountServerLeases) + klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace) + klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel) + klog.V(1).Infof("ServerCountSource set to %s.\n", o.ServerCountSource) klog.V(1).Infof("ChannelSize set to %d.\n", o.XfrChannelSize) klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType) } @@ -216,6 +230,18 @@ func (o *GrpcProxyAgentOptions) Validate() error { return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err) } } + // Validate labels provided. 
+ if o.CountServerLeases { + _, err := util.ParseLabels(o.LeaseLabel) + if err != nil { + return err + } + } + if o.ServerCountSource != "" { + if o.ServerCountSource != "default" && o.ServerCountSource != "max" { + return fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got %v", o.ServerCountSource) + } + } return nil } @@ -263,6 +289,9 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions { SyncForever: false, XfrChannelSize: 150, CountServerLeases: false, + LeaseNamespace: "kube-system", + LeaseLabel: "k8s-app=konnectivity-server", + ServerCountSource: "default", KubeconfigPath: "", APIContentType: runtime.ContentTypeProtobuf, } diff --git a/cmd/agent/app/options/options_test.go b/cmd/agent/app/options/options_test.go index 7ce38b2da..9432217e7 100644 --- a/cmd/agent/app/options/options_test.go +++ b/cmd/agent/app/options/options_test.go @@ -156,6 +156,10 @@ func TestValidate(t *testing.T) { fieldMap: map[string]interface{}{"XfrChannelSize": -10}, expected: fmt.Errorf("channel size -10 must be greater than 0"), }, + "ServerCountSource": { + fieldMap: map[string]interface{}{"ServerCountSource": "foobar"}, + expected: fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got foobar"), + }, } { t.Run(desc, func(t *testing.T) { testAgentOptions := NewGrpcProxyAgentOptions() diff --git a/cmd/agent/app/server.go b/cmd/agent/app/server.go index c98dd7cae..83ee3afbb 100644 --- a/cmd/agent/app/server.go +++ b/cmd/agent/app/server.go @@ -53,7 +53,6 @@ import ( const ( ReadHeaderTimeout = 60 * time.Second - LeaseNamespace = "kube-system" LeaseInformerResync = time.Second * 10 ) @@ -163,11 +162,11 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st if err != nil { return nil, fmt.Errorf("failed to create kubernetes clientset: %v", err) } - leaseInformer := agent.NewLeaseInformerWithMetrics(k8sClient, LeaseNamespace, LeaseInformerResync) + leaseInformer := agent.NewLeaseInformerWithMetrics(k8sClient, 
o.LeaseNamespace, LeaseInformerResync) go leaseInformer.Run(stopCh) cache.WaitForCacheSync(stopCh, leaseInformer.HasSynced) leaseLister := coordinationv1lister.NewLeaseLister(leaseInformer.GetIndexer()) - serverLeaseSelector, _ := labels.Parse("k8s-app=konnectivity-server") + serverLeaseSelector, _ := labels.Parse(o.LeaseLabel) serverLeaseCounter := agent.NewServerLeaseCounter( clock.RealClock{}, leaseLister, diff --git a/cmd/server/app/options/options.go b/cmd/server/app/options/options.go index de0b89902..f5a26621c 100644 --- a/cmd/server/app/options/options.go +++ b/cmd/server/app/options/options.go @@ -108,6 +108,10 @@ type ProxyRunOptions struct { // Lease controller configuration EnableLeaseController bool + // Lease Namespace + LeaseNamespace string + // Lease Labels + LeaseLabel string } func (o *ProxyRunOptions) Flags() *pflag.FlagSet { @@ -146,6 +150,8 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet { flags.StringSliceVar(&o.CipherSuites, "cipher-suites", o.CipherSuites, "The comma separated list of allowed cipher suites. Has no effect on TLS1.3. Empty means allow default list.") flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", o.XfrChannelSize, "The size of the two KNP server channels used in server for transferring data. One channel is for data coming from the Kubernetes API Server, and the other one is for data coming from the KNP agent.") flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.") + flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.") + flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.") flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. 
This flag will be removed in a future release.") flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.") @@ -184,6 +190,9 @@ func (o *ProxyRunOptions) Print() { klog.V(1).Infof("KubeconfigBurst set to %d.\n", o.KubeconfigBurst) klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType) klog.V(1).Infof("ProxyStrategies set to %q.\n", o.ProxyStrategies) + klog.V(1).Infof("EnableLeaseController set to %v.\n", o.EnableLeaseController) + klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace) + klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel) klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites) klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize) } @@ -321,6 +330,13 @@ func (o *ProxyRunOptions) Validate() error { } } } + // Validate labels provided. + if o.EnableLeaseController { + _, err := util.ParseLabels(o.LeaseLabel) + if err != nil { + return err + } + } return nil } @@ -361,6 +377,8 @@ func NewProxyRunOptions() *ProxyRunOptions { CipherSuites: make([]string, 0), XfrChannelSize: 10, EnableLeaseController: false, + LeaseNamespace: "kube-system", + LeaseLabel: "k8s-app=konnectivity-server", } return &o } diff --git a/cmd/server/app/server.go b/cmd/server/app/server.go index d96cb105b..a9c265f9d 100644 --- a/cmd/server/app/server.go +++ b/cmd/server/app/server.go @@ -57,7 +57,6 @@ const ( LeaseDuration = 30 * time.Second LeaseRenewalInterval = 15 * time.Second LeaseGCInterval = 15 * time.Second - LeaseNamespace = "kube-system" ) func NewProxyCommand(p *Proxy, o *options.ProxyRunOptions) *cobra.Command { @@ -156,6 +155,11 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { } defer p.agentServer.Stop() + labels, err := util.ParseLabels(o.LeaseLabel) + if err != nil { + return err + } + if o.EnableLeaseController { leaseController := leases.NewController( k8sClient, @@ -164,8 +168,8 @@ func (p *Proxy) Run(o 
*options.ProxyRunOptions, stopCh <-chan struct{}) error { LeaseRenewalInterval, LeaseGCInterval, fmt.Sprintf("konnectivity-proxy-server-%v", o.ServerID), - LeaseNamespace, - map[string]string{"k8s-app": "konnectivity-server"}, + o.LeaseNamespace, + labels, ) klog.V(1).Infoln("Starting lease acquisition and garbage collection controller.") leaseController.Run(ctx) diff --git a/pkg/agent/clientset.go b/pkg/agent/clientset.go index 785f14f48..a17948b99 100644 --- a/pkg/agent/clientset.go +++ b/pkg/agent/clientset.go @@ -30,6 +30,12 @@ import ( "sigs.k8s.io/apiserver-network-proxy/pkg/agent/metrics" ) +const ( + fromResponses = "KNP server response headers" + fromLeases = "KNP lease count" + fromFallback = "fallback to 1" +) + // ClientSet consists of clients connected to each instance of an HA proxy server. type ClientSet struct { mu sync.Mutex //protects the clients. @@ -39,7 +45,7 @@ type ClientSet struct { agentID string // ID of this agent address string // proxy server address. Assuming HA proxy server - leaseCounter *ServerLeaseCounter // counts number of proxy server leases + leaseCounter ServerCounter // counts number of proxy server leases lastReceivedServerCount int // last server count received from a proxy server lastServerCount int // last server count value from either lease system or proxy server, former takes priority @@ -68,6 +74,7 @@ type ClientSet struct { xfrChannelSize int syncForever bool // Continue syncing (support dynamic server count). 
+ serverCountSource string } func (cs *ClientSet) ClientsCount() int { @@ -147,7 +154,8 @@ type ClientSetConfig struct { WarnOnChannelLimit bool SyncForever bool XfrChannelSize int - ServerLeaseCounter *ServerLeaseCounter + ServerLeaseCounter ServerCounter + ServerCountSource string } func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet { @@ -167,6 +175,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C xfrChannelSize: cc.XfrChannelSize, stopCh: stopCh, leaseCounter: cc.ServerLeaseCounter, + serverCountSource: cc.ServerCountSource, } } @@ -218,15 +227,41 @@ func (cs *ClientSet) sync() { } func (cs *ClientSet) ServerCount() int { + var serverCount int - if cs.leaseCounter != nil { - serverCount = cs.leaseCounter.Count() - } else { - serverCount = cs.lastReceivedServerCount + var countSourceLabel string + + switch cs.serverCountSource { + case "", "default": + if cs.leaseCounter != nil { + serverCount = cs.leaseCounter.Count() + countSourceLabel = fromLeases + } else { + serverCount = cs.lastReceivedServerCount + countSourceLabel = fromResponses + } + case "max": + countFromLeases := 0 + if cs.leaseCounter != nil { + countFromLeases = cs.leaseCounter.Count() + } + countFromResponses := cs.lastReceivedServerCount + + serverCount = countFromLeases + countSourceLabel = fromLeases + if countFromResponses > serverCount { + serverCount = countFromResponses + countSourceLabel = fromResponses + } + if serverCount == 0 { + serverCount = 1 + countSourceLabel = fromFallback + } + } if serverCount != cs.lastServerCount { - klog.Warningf("change detected in proxy server count (was: %d, now: %d)", cs.lastServerCount, serverCount) + klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSourceLabel) cs.lastServerCount = serverCount } diff --git a/pkg/agent/clientset_test.go b/pkg/agent/clientset_test.go new file mode 100644 index 
000000000..a7b674cf9 --- /dev/null +++ b/pkg/agent/clientset_test.go @@ -0,0 +1,93 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package agent + +import ( + "testing" +) + +type FakeServerCounter struct { + count int +} + +func (f *FakeServerCounter) Count() int { + return f.count +} + +func TestServerCount(t *testing.T) { + testCases := []struct{ + name string + serverCountSource string + leaseCounter ServerCounter + responseCount int + want int + } { + { + name: "higher from response", + serverCountSource: "max", + responseCount: 42, + leaseCounter: &FakeServerCounter{24}, + want: 42, + }, + { + name: "higher from leases", + serverCountSource: "max", + responseCount: 3, + leaseCounter: &FakeServerCounter{6}, + want: 6, + }, + { + name: "both zero", + serverCountSource: "max", + responseCount: 0, + leaseCounter: &FakeServerCounter{0}, + want: 1, + }, + + { + name: "response picked by default when no lease counter", + serverCountSource: "default", + responseCount: 3, + leaseCounter: nil, + want: 3, + }, + { + name: "lease counter always picked when present", + serverCountSource: "default", + responseCount: 6, + leaseCounter: &FakeServerCounter{3}, + want: 3, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + cs := &ClientSet{ + clients: make(map[string]*Client), + leaseCounter: tc.leaseCounter, + serverCountSource: tc.serverCountSource, + + } + cs.lastReceivedServerCount = tc.responseCount + if got 
// ParseLabels parses a comma-separated list of key=value pairs, for example
// "k8s-app=konnectivity-server,env=prod", into a label map.
//
// Whitespace around each key and each value is trimmed. Every pair must
// contain exactly one "=" and a non-empty key; an empty value is accepted.
// If the same key appears more than once, the last occurrence wins.
//
// An error is returned when labelStr is empty or when any pair is malformed.
// For empty input the returned map is empty but non-nil (unchanged from the
// previous behavior); for a malformed pair it is nil. Callers should not use
// the map when the error is non-nil.
func ParseLabels(labelStr string) (map[string]string, error) {
	if len(labelStr) == 0 {
		return map[string]string{}, fmt.Errorf("empty string provided")
	}

	pairs := strings.Split(labelStr, ",")
	labels := make(map[string]string, len(pairs))
	for _, pair := range pairs {
		// Split (rather than SplitN) so that "a=b=c" yields three parts and
		// is rejected instead of being silently truncated.
		keyValue := strings.Split(pair, "=")
		if len(keyValue) != 2 {
			return nil, fmt.Errorf("invalid label format: %s", pair)
		}

		key := strings.TrimSpace(keyValue[0])
		if key == "" {
			// A pair such as "=value" has no key. Kubernetes label keys must
			// be non-empty, so storing it under "" would only hide the
			// mistake from validation.
			return nil, fmt.Errorf("invalid label format: empty key in %q", pair)
		}
		labels[key] = strings.TrimSpace(keyValue[1])
	}
	return labels, nil
}
+ input: "app=myapp", + expectedOutput: map[string]string{ + "app": "myapp", + }, + shouldError: false, + }, + { + input: "", + expectedOutput: map[string]string{}, + shouldError: true, + }, + { + input: " key = value , another = test ", + expectedOutput: map[string]string{ + "key": "value", + "another": "test", + }, + shouldError: false, + }, + } + + for _, tc := range testCases { + output, err := ParseLabels(tc.input) + + // Check for unexpected errors or missing errors + if tc.shouldError && err == nil { + t.Errorf("expected error for input %q but got none", tc.input) + continue + } + if !tc.shouldError && err != nil { + t.Errorf("did not expect error for input %q but got: %v", tc.input, err) + continue + } + + // Compare maps if there was no error + if !tc.shouldError { + if len(output) != len(tc.expectedOutput) { + t.Errorf("for input %q, expected map length %d but got %d", tc.input, len(tc.expectedOutput), len(output)) + } + for key, expectedValue := range tc.expectedOutput { + if output[key] != expectedValue { + t.Errorf("for input %q, expected %q=%q but got %q=%q", tc.input, key, expectedValue, key, output[key]) + } + } + } + } +}