Skip to content

Commit 5907639

Browse files
committed
proxy/iptables: clean up service map creation
Instead of copying the map, like OnServicesUpdate() used to do and which was copied into buildServiceMap() to preserve semantics while creating testcases, start with a new empty map and do deletion checking later.
1 parent 6aa784e commit 5907639

File tree

2 files changed

+137
-85
lines changed

2 files changed

+137
-85
lines changed

pkg/proxy/iptables/proxier.go

+45-85
Original file line numberDiff line numberDiff line change
@@ -153,11 +153,35 @@ type endpointsInfo struct {
153153
}
154154

155155
// returns a new serviceInfo struct
156-
func newServiceInfo(service proxy.ServicePortName) *serviceInfo {
157-
return &serviceInfo{
158-
sessionAffinityType: api.ServiceAffinityNone, // default
159-
stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API.
156+
func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
157+
onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort)
158+
info := &serviceInfo{
159+
clusterIP: net.ParseIP(service.Spec.ClusterIP),
160+
port: int(port.Port),
161+
protocol: port.Protocol,
162+
nodePort: int(port.NodePort),
163+
// Deep-copy in case the service instance changes
164+
loadBalancerStatus: *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer),
165+
sessionAffinityType: service.Spec.SessionAffinity,
166+
stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API.
167+
externalIPs: make([]string, len(service.Spec.ExternalIPs)),
168+
loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)),
169+
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
170+
}
171+
copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
172+
copy(info.externalIPs, service.Spec.ExternalIPs)
173+
174+
if info.onlyNodeLocalEndpoints {
175+
p := apiservice.GetServiceHealthCheckNodePort(service)
176+
if p == 0 {
177+
glog.Errorf("Service does not contain necessary annotation %v",
178+
apiservice.BetaAnnotationHealthCheckNodePort)
179+
} else {
180+
info.healthCheckNodePort = int(p)
181+
}
160182
}
183+
184+
return info
161185
}
162186

163187
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
@@ -384,44 +408,6 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
384408
return encounteredError
385409
}
386410

387-
func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
388-
if info.protocol != port.Protocol || info.port != int(port.Port) || info.nodePort != int(port.NodePort) {
389-
return false
390-
}
391-
if !info.clusterIP.Equal(net.ParseIP(service.Spec.ClusterIP)) {
392-
return false
393-
}
394-
if !ipsEqual(info.externalIPs, service.Spec.ExternalIPs) {
395-
return false
396-
}
397-
if !api.LoadBalancerStatusEqual(&info.loadBalancerStatus, &service.Status.LoadBalancer) {
398-
return false
399-
}
400-
if info.sessionAffinityType != service.Spec.SessionAffinity {
401-
return false
402-
}
403-
onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly()
404-
if info.onlyNodeLocalEndpoints != onlyNodeLocalEndpoints {
405-
return false
406-
}
407-
if !reflect.DeepEqual(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) {
408-
return false
409-
}
410-
return true
411-
}
412-
413-
func ipsEqual(lhs, rhs []string) bool {
414-
if len(lhs) != len(rhs) {
415-
return false
416-
}
417-
for i := range lhs {
418-
if lhs[i] != rhs[i] {
419-
return false
420-
}
421-
}
422-
return true
423-
}
424-
425411
// Sync is called to immediately synchronize the proxier state to iptables
426412
func (proxier *Proxier) Sync() {
427413
proxier.mu.Lock()
@@ -449,16 +435,10 @@ type healthCheckPort struct {
449435
// service map, a list of healthcheck ports to add to or remove from the health
450436
// checking listener service, and a set of stale UDP services.
451437
func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) {
438+
newServiceMap := make(proxyServiceMap)
452439
healthCheckAdd := make([]healthCheckPort, 0)
453440
healthCheckDel := make([]healthCheckPort, 0)
454441

455-
newServiceMap := make(proxyServiceMap)
456-
for key, value := range oldServiceMap {
457-
newServiceMap[key] = value
458-
}
459-
460-
activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
461-
462442
for i := range allServices {
463443
service := &allServices[i]
464444
svcName := types.NamespacedName{
@@ -484,57 +464,37 @@ func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (
484464
NamespacedName: svcName,
485465
Port: servicePort.Name,
486466
}
487-
activeServices[serviceName] = true
488-
info, exists := newServiceMap[serviceName]
489-
if exists {
490-
if sameConfig(info, service, servicePort) {
491-
// Nothing changed.
492-
continue
493-
}
494-
// Something changed.
495-
glog.V(3).Infof("Something changed for service %q: removing it", serviceName)
496-
delete(newServiceMap, serviceName)
467+
468+
info := newServiceInfo(serviceName, servicePort, service)
469+
oldInfo, exists := oldServiceMap[serviceName]
470+
equal := reflect.DeepEqual(info, oldInfo)
471+
if !exists {
472+
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
473+
} else if !equal {
474+
glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
497475
}
498-
serviceIP := net.ParseIP(service.Spec.ClusterIP)
499-
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
500-
info = newServiceInfo(serviceName)
501-
info.clusterIP = serviceIP
502-
info.port = int(servicePort.Port)
503-
info.protocol = servicePort.Protocol
504-
info.nodePort = int(servicePort.NodePort)
505-
info.externalIPs = service.Spec.ExternalIPs
506-
// Deep-copy in case the service instance changes
507-
info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
508-
info.sessionAffinityType = service.Spec.SessionAffinity
509-
info.loadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges
510-
info.onlyNodeLocalEndpoints = apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort)
511-
if info.onlyNodeLocalEndpoints {
512-
p := apiservice.GetServiceHealthCheckNodePort(service)
513-
if p == 0 {
514-
glog.Errorf("Service does not contain necessary annotation %v",
515-
apiservice.BetaAnnotationHealthCheckNodePort)
516-
} else {
517-
info.healthCheckNodePort = int(p)
476+
477+
if !exists || !equal {
478+
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
518479
healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort})
480+
} else {
481+
healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0})
519482
}
520-
} else {
521-
healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0})
522483
}
523-
newServiceMap[serviceName] = info
524484

485+
newServiceMap[serviceName] = info
525486
glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info))
526487
}
527488
}
528489

529490
staleUDPServices := sets.NewString()
530491
// Remove serviceports missing from the update.
531-
for name, info := range newServiceMap {
532-
if !activeServices[name] {
492+
for name, info := range oldServiceMap {
493+
if _, exists := newServiceMap[name]; !exists {
533494
glog.V(1).Infof("Removing service %q", name)
534495
if info.protocol == api.ProtocolUDP {
535496
staleUDPServices.Insert(info.clusterIP.String())
536497
}
537-
delete(newServiceMap, name)
538498
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
539499
healthCheckDel = append(healthCheckDel, healthCheckPort{name.NamespacedName, info.healthCheckNodePort})
540500
}

pkg/proxy/iptables/proxier_test.go

+92
Original file line numberDiff line numberDiff line change
@@ -1079,4 +1079,96 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
10791079
}
10801080
}
10811081

1082+
func TestBuildServiceMapServiceUpdate(t *testing.T) {
1083+
first := []api.Service{
1084+
makeTestService("somewhere", "some-service", func(svc *api.Service) {
1085+
svc.Spec.Type = api.ServiceTypeClusterIP
1086+
svc.Spec.ClusterIP = "172.16.55.4"
1087+
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
1088+
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
1089+
}),
1090+
}
1091+
1092+
second := []api.Service{
1093+
makeTestService("somewhere", "some-service", func(svc *api.Service) {
1094+
svc.ObjectMeta.Annotations = map[string]string{
1095+
service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal,
1096+
service.BetaAnnotationHealthCheckNodePort: "345",
1097+
}
1098+
svc.Spec.Type = api.ServiceTypeLoadBalancer
1099+
svc.Spec.ClusterIP = "172.16.55.4"
1100+
svc.Spec.LoadBalancerIP = "5.6.7.8"
1101+
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
1102+
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
1103+
svc.Status.LoadBalancer = api.LoadBalancerStatus{
1104+
Ingress: []api.LoadBalancerIngress{
1105+
{IP: "10.1.2.3"},
1106+
},
1107+
}
1108+
}),
1109+
}
1110+
1111+
serviceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(first, make(proxyServiceMap))
1112+
if len(serviceMap) != 2 {
1113+
t.Errorf("expected service map length 2, got %v", serviceMap)
1114+
}
1115+
if len(hcAdd) != 0 {
1116+
t.Errorf("expected healthcheck add length 0, got %v", hcAdd)
1117+
}
1118+
if len(hcDel) != 2 {
1119+
t.Errorf("expected healthcheck del length 2, got %v", hcDel)
1120+
}
1121+
if len(staleUDPServices) != 0 {
1122+
// Services only added, so nothing stale yet
1123+
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
1124+
}
1125+
1126+
// Change service to load-balancer
1127+
serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(second, serviceMap)
1128+
if len(serviceMap) != 2 {
1129+
t.Errorf("expected service map length 2, got %v", serviceMap)
1130+
}
1131+
if len(hcAdd) != 2 {
1132+
t.Errorf("expected healthcheck add length 2, got %v", hcAdd)
1133+
}
1134+
if len(hcDel) != 0 {
1135+
t.Errorf("expected healthcheck add length 2, got %v", hcDel)
1136+
}
1137+
if len(staleUDPServices) != 0 {
1138+
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List())
1139+
}
1140+
1141+
// No change; make sure the service map stays the same and there are
1142+
// no health-check changes
1143+
serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(second, serviceMap)
1144+
if len(serviceMap) != 2 {
1145+
t.Errorf("expected service map length 2, got %v", serviceMap)
1146+
}
1147+
if len(hcAdd) != 0 {
1148+
t.Errorf("expected healthcheck add length 0, got %v", hcAdd)
1149+
}
1150+
if len(hcDel) != 0 {
1151+
t.Errorf("expected healthcheck add length 2, got %v", hcDel)
1152+
}
1153+
if len(staleUDPServices) != 0 {
1154+
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List())
1155+
}
1156+
1157+
// And back to ClusterIP
1158+
serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(first, serviceMap)
1159+
if len(serviceMap) != 2 {
1160+
t.Errorf("expected service map length 2, got %v", serviceMap)
1161+
}
1162+
if len(hcAdd) != 0 {
1163+
t.Errorf("expected healthcheck add length 0, got %v", hcAdd)
1164+
}
1165+
if len(hcDel) != 2 {
1166+
t.Errorf("expected healthcheck del length 2, got %v", hcDel)
1167+
}
1168+
if len(staleUDPServices) != 0 {
1169+
// Services only added, so nothing stale yet
1170+
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
1171+
}
1172+
}
1173+
10821174
// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.

0 commit comments

Comments
 (0)