Skip to content

Commit 3371766

Browse files
author
Kubernetes Submit Queue
authored
Merge pull request kubernetes#38996 from dcbw/proxy-sync-fewer-services
Automatic merge from submit-queue proxy/iptables: don't sync proxy rules if services map didn't change Build the service map in a separate testable function. Return that map instead of changing proxier.serviceMap directly. Use reflect.DeepEqual() to skip syncing proxy rules if nothing actually changed. @thockin @kubernetes/rh-networking @kubernetes/sig-network-misc @timothysc @wojtek-t @jeremyeder @caseydavenport
2 parents 671ba93 + 5907639 commit 3371766

File tree

3 files changed

+392
-101
lines changed

3 files changed

+392
-101
lines changed

pkg/proxy/iptables/BUILD

+2
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@ go_test(
4040
tags = ["automanaged"],
4141
deps = [
4242
"//pkg/api:go_default_library",
43+
"//pkg/api/service:go_default_library",
4344
"//pkg/proxy:go_default_library",
4445
"//pkg/util/exec:go_default_library",
46+
"//pkg/util/intstr:go_default_library",
4547
"//pkg/util/iptables:go_default_library",
4648
"//pkg/util/iptables/testing:go_default_library",
4749
"//vendor:k8s.io/apimachinery/pkg/types",

pkg/proxy/iptables/proxier.go

+102-101
Original file line numberDiff line numberDiff line change
@@ -153,18 +153,44 @@ type endpointsInfo struct {
153153
}
154154

155155
// returns a new serviceInfo struct
156-
func newServiceInfo(service proxy.ServicePortName) *serviceInfo {
157-
return &serviceInfo{
158-
sessionAffinityType: api.ServiceAffinityNone, // default
159-
stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API.
156+
func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
157+
onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort)
158+
info := &serviceInfo{
159+
clusterIP: net.ParseIP(service.Spec.ClusterIP),
160+
port: int(port.Port),
161+
protocol: port.Protocol,
162+
nodePort: int(port.NodePort),
163+
// Deep-copy in case the service instance changes
164+
loadBalancerStatus: *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer),
165+
sessionAffinityType: service.Spec.SessionAffinity,
166+
stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API.
167+
externalIPs: make([]string, len(service.Spec.ExternalIPs)),
168+
loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)),
169+
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
170+
}
171+
copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
172+
copy(info.externalIPs, service.Spec.ExternalIPs)
173+
174+
if info.onlyNodeLocalEndpoints {
175+
p := apiservice.GetServiceHealthCheckNodePort(service)
176+
if p == 0 {
177+
glog.Errorf("Service does not contain necessary annotation %v",
178+
apiservice.BetaAnnotationHealthCheckNodePort)
179+
} else {
180+
info.healthCheckNodePort = int(p)
181+
}
160182
}
183+
184+
return info
161185
}
162186

187+
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
188+
163189
// Proxier is an iptables based proxy for connections between a localhost:lport
164190
// and services that provide the actual backends.
165191
type Proxier struct {
166192
mu sync.Mutex // protects the following fields
167-
serviceMap map[proxy.ServicePortName]*serviceInfo
193+
serviceMap proxyServiceMap
168194
endpointsMap map[proxy.ServicePortName][]*endpointsInfo
169195
portsMap map[localPort]closeable
170196
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
@@ -278,7 +304,7 @@ func NewProxier(ipt utiliptables.Interface,
278304
}
279305

280306
return &Proxier{
281-
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
307+
serviceMap: make(proxyServiceMap),
282308
endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo),
283309
portsMap: make(map[localPort]closeable),
284310
syncPeriod: syncPeriod,
@@ -382,44 +408,6 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
382408
return encounteredError
383409
}
384410

385-
func (proxier *Proxier) sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
386-
if info.protocol != port.Protocol || info.port != int(port.Port) || info.nodePort != int(port.NodePort) {
387-
return false
388-
}
389-
if !info.clusterIP.Equal(net.ParseIP(service.Spec.ClusterIP)) {
390-
return false
391-
}
392-
if !ipsEqual(info.externalIPs, service.Spec.ExternalIPs) {
393-
return false
394-
}
395-
if !api.LoadBalancerStatusEqual(&info.loadBalancerStatus, &service.Status.LoadBalancer) {
396-
return false
397-
}
398-
if info.sessionAffinityType != service.Spec.SessionAffinity {
399-
return false
400-
}
401-
onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly()
402-
if info.onlyNodeLocalEndpoints != onlyNodeLocalEndpoints {
403-
return false
404-
}
405-
if !reflect.DeepEqual(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) {
406-
return false
407-
}
408-
return true
409-
}
410-
411-
func ipsEqual(lhs, rhs []string) bool {
412-
if len(lhs) != len(rhs) {
413-
return false
414-
}
415-
for i := range lhs {
416-
if lhs[i] != rhs[i] {
417-
return false
418-
}
419-
}
420-
return true
421-
}
422-
423411
// Sync is called to immediately synchronize the proxier state to iptables
424412
func (proxier *Proxier) Sync() {
425413
proxier.mu.Lock()
@@ -438,18 +426,18 @@ func (proxier *Proxier) SyncLoop() {
438426
}
439427
}
440428

441-
// OnServiceUpdate tracks the active set of service proxies.
442-
// They will be synchronized using syncProxyRules()
443-
func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
444-
start := time.Now()
445-
defer func() {
446-
glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices))
447-
}()
448-
proxier.mu.Lock()
449-
defer proxier.mu.Unlock()
450-
proxier.haveReceivedServiceUpdate = true
429+
type healthCheckPort struct {
430+
namespace types.NamespacedName
431+
nodeport int
432+
}
451433

452-
activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
434+
// Accepts a list of Services and the existing service map. Returns the new
435+
// service map, a list of healthcheck ports to add to or remove from the health
436+
// checking listener service, and a set of stale UDP services.
437+
func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) {
438+
newServiceMap := make(proxyServiceMap)
439+
healthCheckAdd := make([]healthCheckPort, 0)
440+
healthCheckDel := make([]healthCheckPort, 0)
453441

454442
for i := range allServices {
455443
service := &allServices[i]
@@ -463,6 +451,11 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
463451
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
464452
continue
465453
}
454+
// Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
455+
if service.Spec.Type == api.ServiceTypeExternalName {
456+
glog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName)
457+
continue
458+
}
466459

467460
for i := range service.Spec.Ports {
468461
servicePort := &service.Spec.Ports[i]
@@ -471,72 +464,80 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
471464
NamespacedName: svcName,
472465
Port: servicePort.Name,
473466
}
474-
activeServices[serviceName] = true
475-
info, exists := proxier.serviceMap[serviceName]
476-
if exists && proxier.sameConfig(info, service, servicePort) {
477-
// Nothing changed.
478-
continue
479-
}
480-
if exists {
481-
// Something changed.
482-
glog.V(3).Infof("Something changed for service %q: removing it", serviceName)
483-
delete(proxier.serviceMap, serviceName)
467+
468+
info := newServiceInfo(serviceName, servicePort, service)
469+
oldInfo, exists := oldServiceMap[serviceName]
470+
equal := reflect.DeepEqual(info, oldInfo)
471+
if !exists {
472+
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
473+
} else if !equal {
474+
glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
484475
}
485-
serviceIP := net.ParseIP(service.Spec.ClusterIP)
486-
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
487-
info = newServiceInfo(serviceName)
488-
info.clusterIP = serviceIP
489-
info.port = int(servicePort.Port)
490-
info.protocol = servicePort.Protocol
491-
info.nodePort = int(servicePort.NodePort)
492-
info.externalIPs = service.Spec.ExternalIPs
493-
// Deep-copy in case the service instance changes
494-
info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
495-
info.sessionAffinityType = service.Spec.SessionAffinity
496-
info.loadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges
497-
info.onlyNodeLocalEndpoints = apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort)
498-
if info.onlyNodeLocalEndpoints {
499-
p := apiservice.GetServiceHealthCheckNodePort(service)
500-
if p == 0 {
501-
glog.Errorf("Service does not contain necessary annotation %v",
502-
apiservice.BetaAnnotationHealthCheckNodePort)
476+
477+
if !exists || !equal {
478+
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
479+
healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort})
503480
} else {
504-
glog.V(4).Infof("Adding health check for %+v, port %v", serviceName.NamespacedName, p)
505-
info.healthCheckNodePort = int(p)
506-
// Turn on healthcheck responder to listen on the health check nodePort
507-
healthcheck.AddServiceListener(serviceName.NamespacedName, info.healthCheckNodePort)
481+
healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0})
508482
}
509-
} else {
510-
glog.V(4).Infof("Deleting health check for %+v", serviceName.NamespacedName)
511-
// Delete healthcheck responders, if any, previously listening for this service
512-
healthcheck.DeleteServiceListener(serviceName.NamespacedName, 0)
513483
}
514-
proxier.serviceMap[serviceName] = info
515484

485+
newServiceMap[serviceName] = info
516486
glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info))
517487
}
518488
}
519489

520490
staleUDPServices := sets.NewString()
521491
// Remove serviceports missing from the update.
522-
for name, info := range proxier.serviceMap {
523-
if !activeServices[name] {
492+
for name, info := range oldServiceMap {
493+
if _, exists := newServiceMap[name]; !exists {
524494
glog.V(1).Infof("Removing service %q", name)
525495
if info.protocol == api.ProtocolUDP {
526496
staleUDPServices.Insert(info.clusterIP.String())
527497
}
528-
delete(proxier.serviceMap, name)
529498
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
530-
// Remove ServiceListener health check nodePorts from the health checker
531-
// TODO - Stats
532-
glog.V(4).Infof("Deleting health check for %+v, port %v", name.NamespacedName, info.healthCheckNodePort)
533-
healthcheck.DeleteServiceListener(name.NamespacedName, info.healthCheckNodePort)
499+
healthCheckDel = append(healthCheckDel, healthCheckPort{name.NamespacedName, info.healthCheckNodePort})
534500
}
535501
}
536502
}
537-
proxier.syncProxyRules()
538-
proxier.deleteServiceConnections(staleUDPServices.List())
539503

504+
return newServiceMap, healthCheckAdd, healthCheckDel, staleUDPServices
505+
}
506+
507+
// OnServiceUpdate tracks the active set of service proxies.
508+
// They will be synchronized using syncProxyRules()
509+
func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
510+
start := time.Now()
511+
defer func() {
512+
glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices))
513+
}()
514+
proxier.mu.Lock()
515+
defer proxier.mu.Unlock()
516+
proxier.haveReceivedServiceUpdate = true
517+
518+
newServiceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(allServices, proxier.serviceMap)
519+
for _, hc := range hcAdd {
520+
glog.V(4).Infof("Adding health check for %+v, port %v", hc.namespace, hc.nodeport)
521+
// Turn on healthcheck responder to listen on the health check nodePort
522+
// FIXME: handle failures from adding the service
523+
healthcheck.AddServiceListener(hc.namespace, hc.nodeport)
524+
}
525+
for _, hc := range hcDel {
526+
// Remove ServiceListener health check nodePorts from the health checker
527+
// TODO - Stats
528+
glog.V(4).Infof("Deleting health check for %+v, port %v", hc.namespace, hc.nodeport)
529+
// FIXME: handle failures from deleting the service
530+
healthcheck.DeleteServiceListener(hc.namespace, hc.nodeport)
531+
}
532+
533+
if len(newServiceMap) != len(proxier.serviceMap) || !reflect.DeepEqual(newServiceMap, proxier.serviceMap) {
534+
proxier.serviceMap = newServiceMap
535+
proxier.syncProxyRules()
536+
} else {
537+
glog.V(4).Infof("Skipping proxy iptables rule sync on service update because nothing changed")
538+
}
539+
540+
proxier.deleteServiceConnections(staleUDPServices.List())
540541
}
541542

542543
// Generate a list of ip strings from the list of endpoint infos

0 commit comments

Comments
 (0)