From 3723a284d15870cb9a4b8a149f1a7751b5891280 Mon Sep 17 00:00:00 2001 From: Peter Bale Date: Fri, 25 May 2018 16:30:45 +0100 Subject: [PATCH] Add ingress loadbalancer status for elb and Merlin (#174) * Adding ingress status updater tools This is to add the underlying framework needed to add updater status updaters which will allow us to set the status of an ingress. Adding the ingress resource to the `IngressEntry` so that it can be used at a later stage. Also including useful util functions which can be reused by multiple status updaters. * Adding elb status updater This is to add a new status updater for use with the elb's. It will discover all of the elb's with the relevant labels (using existing function as part of the elb updater). After this, it will iterate through the ingress resources and set the relevent elb DNS name based on the lb scheme label. * Adding Merlin status updater This is to add a new status updater for use with Merlin. If enabled, the user will need to specify both `internal-hostname` and `external-hostname` as the updater uses these two values to match each ingress with the two hostnames or ip addresses using the lb scheme label. * Updating README and CHANGELOG This is to add the changes that have been made to update the ingress status to the README and create a new version in the CHANGELOG `v1.10.0`. --- CHANGELOG.md | 4 + README.md | 20 ++ cmd/feed-ingress/main.go | 33 ++- controller/controller.go | 7 +- controller/controller_test.go | 36 ++- controller/ingress_entry.go | 8 +- dns/dns_updater.go | 8 +- dns/dns_updater_test.go | 52 ++--- elb/status/status.go | 72 ++++++ examples/feed-ingress-deployment-merlin.yml | 3 + k8s/client.go | 18 ++ k8s/status/status.go | 78 +++++++ k8s/status/status_test.go | 240 ++++++++++++++++++++ merlin/status/status.go | 64 ++++++ util/test/mocks.go | 6 + 15 files changed, 601 insertions(+), 48 deletions(-) create mode 100644 elb/status/status.go create mode 100644 k8s/status/status.go create mode 100644 k8s/status/status_test.go create mode 100644 merlin/status/status.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d35ba04..5d1cf4b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# v1.10.0 +* Added `k8s/status` library for setting ingress status +* ELB and Merlin updaters set relevant ingress status + # v1.9.3 * Attach to https in addition to http for merlin. * Fix merlin deregistration, which was failing due to long lived connections getting killed. diff --git a/README.md b/README.md index 165a0264..916e47b9 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,26 @@ securityContext: See the [example ingress for gorb](examples/feed-ingress-deployment-gorb.yml) +### Ingress status + +When using either the [elb](#elb) or [Merlin](#merlin) updater, the ingress status will be updated with relevant +loadbalancer information. This can then be used with other controllers such a `external-dns` which can set DNS for any +given ingress using the ingress status. + +#### elb + +feed will automatically discover all of your elb's and then use the `sky.uk/frontend-scheme` annotation to match an elb +label to an ingress. The updater will then set the ingress status to the elb's DNS name. + +#### Merlin + +The Merlin updater is currently unable to auto-discover all hosted vips (virtual ip addresses) on a Merlin server; +instead the status updater supports two different loadbalancer types: `internal` and `internet-facing`. These two vips +are set using the `merlin-vip` and `merlin-internet-facing-vip` flags respectively. + +An ingress can select which loadbalancer it wants to be associated with by setting the `sky.uk/frontend-scheme` +annotation to either `internal` or `internet-facing`. + ### Running feed-ingress on privileged ports feed-ingress can be run on privileged ports by defining the `NET_BIND_SERVICE` Linux capability. diff --git a/cmd/feed-ingress/main.go b/cmd/feed-ingress/main.go index d7dbbef2..6e2d6e66 100644 --- a/cmd/feed-ingress/main.go +++ b/cmd/feed-ingress/main.go @@ -16,9 +16,11 @@ import ( "github.com/sky-uk/feed/alb" "github.com/sky-uk/feed/controller" "github.com/sky-uk/feed/elb" + elb_status "github.com/sky-uk/feed/elb/status" "github.com/sky-uk/feed/gorb" "github.com/sky-uk/feed/k8s" "github.com/sky-uk/feed/merlin" + merlin_status "github.com/sky-uk/feed/merlin/status" "github.com/sky-uk/feed/nginx" "github.com/sky-uk/feed/util/cmd" "github.com/sky-uk/feed/util/metrics" @@ -72,6 +74,7 @@ var ( merlinHealthTimeout time.Duration merlinVIP string merlinVIPInterface string + merlinInternetFacingVIP string ) const ( @@ -276,6 +279,7 @@ func init() { flag.StringVar(&merlinVIP, "merlin-vip", "", "VIP to assign to loopback to support direct route and tunnel.") flag.StringVar(&merlinVIPInterface, "merlin-vip-interface", defaultMerlinVIPInterface, "VIP interface to assign the VIP.") + flag.StringVar(&merlinInternetFacingVIP, "merlin-internet-facing-vip", "", "VIP to assign internet facing DNS.") } func main() { @@ -290,7 +294,7 @@ func main() { } controllerConfig.KubernetesClient = client - controllerConfig.Updaters, err = createIngressUpdaters() + controllerConfig.Updaters, err = createIngressUpdaters(client) if err != nil { log.Fatal("Unable to create ingress updaters: ", err) } @@ -314,7 +318,7 @@ func main() { select {} } -func createIngressUpdaters() ([]controller.Updater, error) { +func createIngressUpdaters(kubernetesClient k8s.Client) ([]controller.Updater, error) { nginxConfig.Ports = []nginx.Port{{Name: "http", Port: ingressPort}} if ingressHTTPSPort != unset { nginxConfig.Ports = append(nginxConfig.Ports, nginx.Port{Name: "https", Port: ingressHTTPSPort}) @@ -338,6 +342,17 @@ func createIngressUpdaters() ([]controller.Updater, error) { } updaters = append(updaters, elbUpdater) + statusConfig := elb_status.Config{ + Region: region, + LabelValue: elbLabelValue, + KubernetesClient: kubernetesClient, + } + elbStatusUpdater, err := elb_status.New(statusConfig) + if err != nil { + return updaters, err + } + updaters = append(updaters, elbStatusUpdater) + case "alb": albUpdater, err := alb.New(region, targetGroupNames, targetGroupDeregistrationDelay) if err != nil { @@ -401,8 +416,20 @@ func createIngressUpdaters() ([]controller.Updater, error) { } updaters = append(updaters, merlinUpdater) + statusConfig := merlin_status.Config{ + InternalVIP: merlinVIP, + InternetFacingVIP: merlinInternetFacingVIP, + KubernetesClient: kubernetesClient, + } + merlinStatusUpdater, err := merlin_status.New(statusConfig) + if err != nil { + return updaters, err + } + updaters = append(updaters, merlinStatusUpdater) + default: - return nil, fmt.Errorf("invalid registration frontend type. Must be either gorb, elb, alb but was %s", registrationFrontendType) + return nil, fmt.Errorf("invalid registration frontend type. Must be either gorb, elb, alb, merlin but"+ + "was %s", registrationFrontendType) } return updaters, nil diff --git a/controller/controller.go b/controller/controller.go index af4c6833..12ccce6a 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -176,14 +176,15 @@ func (c *controller) updateIngresses() error { BackendTimeoutSeconds: c.defaultBackendTimeout, BackendMaxConnections: c.defaultBackendMaxConnections, CreationTimestamp: ingress.CreationTimestamp.Time, + Ingress: ingress, } log.Debugf("Found ingress to update: %s", ingress.Name) - if elbScheme, ok := ingress.Annotations[frontendSchemeAnnotation]; ok { - entry.ELbScheme = elbScheme + if lbScheme, ok := ingress.Annotations[frontendSchemeAnnotation]; ok { + entry.LbScheme = lbScheme } else { - entry.ELbScheme = ingress.Annotations[frontendElbSchemeAnnotation] + entry.LbScheme = ingress.Annotations[frontendElbSchemeAnnotation] } if allow, ok := ingress.Annotations[ingressAllowAnnotation]; ok { diff --git a/controller/controller_test.go b/controller/controller_test.go index 05bd4a98..72a9b61e 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -376,7 +376,7 @@ func TestUpdaterIsUpdatedOnK8sUpdates(t *testing.T) { Path: ingressPath, ServiceAddress: serviceIP, ServicePort: ingressSvcPort, - ELbScheme: "internal", + LbScheme: "internal", Allow: []string{}, BackendTimeoutSeconds: backendTimeout, }}, @@ -392,7 +392,7 @@ func TestUpdaterIsUpdatedOnK8sUpdates(t *testing.T) { Path: ingressPath, ServiceAddress: serviceIP, ServicePort: ingressSvcPort, - ELbScheme: "internal", + LbScheme: "internal", Allow: []string{}, StripPaths: true, BackendTimeoutSeconds: backendTimeout, @@ -409,7 +409,7 @@ func TestUpdaterIsUpdatedOnK8sUpdates(t *testing.T) { Path: ingressPath, ServiceAddress: serviceIP, ServicePort: ingressSvcPort, - ELbScheme: "internal", + LbScheme: "internal", Allow: []string{}, StripPaths: false, BackendTimeoutSeconds: backendTimeout, @@ -426,7 +426,7 @@ func TestUpdaterIsUpdatedOnK8sUpdates(t *testing.T) { Path: ingressPath, ServiceAddress: serviceIP, ServicePort: ingressSvcPort, - ELbScheme: "internal", + LbScheme: "internal", Allow: []string{}, StripPaths: false, BackendTimeoutSeconds: 20, @@ -443,7 +443,7 @@ func TestUpdaterIsUpdatedOnK8sUpdates(t *testing.T) { Path: ingressPath, ServiceAddress: serviceIP, ServicePort: ingressSvcPort, - ELbScheme: "internal", + LbScheme: "internal", Allow: []string{}, StripPaths: false, BackendTimeoutSeconds: backendTimeout, @@ -460,7 +460,7 @@ func TestUpdaterIsUpdatedOnK8sUpdates(t *testing.T) { Path: ingressPath, ServiceAddress: serviceIP, ServicePort: ingressSvcPort, - ELbScheme: "internal", + LbScheme: "internal", Allow: []string{}, StripPaths: false, BackendTimeoutSeconds: 20, @@ -478,7 +478,7 @@ func TestUpdaterIsUpdatedOnK8sUpdates(t *testing.T) { Path: ingressPath, ServiceAddress: serviceIP, ServicePort: ingressSvcPort, - ELbScheme: "internal", + LbScheme: "internal", Allow: []string{}, StripPaths: false, BackendTimeoutSeconds: 20, @@ -489,6 +489,10 @@ func TestUpdaterIsUpdatedOnK8sUpdates(t *testing.T) { for _, test := range tests { fmt.Printf("test: %s\n", test.description) + // add ingress pointers to entries + test.entries = addIngresses(test.ingresses, test.entries) + + // setup clients client := new(fake.FakeClient) updater := new(fakeUpdater) controller := newController(updater, client) @@ -519,6 +523,18 @@ func TestUpdaterIsUpdatedOnK8sUpdates(t *testing.T) { } } +func addIngresses(ingresses []*v1beta1.Ingress, entries IngressEntries) IngressEntries { + if len(ingresses) != len(entries) { + return entries + } + appendedEntries := IngressEntries{} + for i, entry := range entries { + entry.Ingress = ingresses[i] + appendedEntries = append(appendedEntries, entry) + } + return appendedEntries +} + func createLbEntriesFixture() IngressEntries { return []IngressEntry{{ Namespace: ingressNamespace, @@ -528,7 +544,7 @@ func createLbEntriesFixture() IngressEntries { ServiceAddress: serviceIP, ServicePort: ingressSvcPort, Allow: strings.Split(ingressAllow, ","), - ELbScheme: elbScheme, + LbScheme: lbScheme, BackendTimeoutSeconds: backendTimeout, }} } @@ -543,7 +559,7 @@ const ( ingressAllow = "10.82.0.0/16,10.44.0.0/16" ingressDefaultAllow = "10.50.0.0/16,10.1.0.0/16" serviceIP = "10.254.0.82" - elbScheme = "internal" + lbScheme = "internal" stripPath = "MISSING" backendTimeout = 10 defaultMaxConnections = 0 @@ -571,7 +587,7 @@ func createIngressesFixture(host string, serviceName string, servicePort int, al annotations := make(map[string]string) if allow != "MISSING" { annotations[ingressAllowAnnotation] = allow - annotations[schemeAnnotationKey] = elbScheme + annotations[schemeAnnotationKey] = lbScheme } if stripPath != "MISSING" { annotations[stripPathAnnotation] = stripPath diff --git a/controller/ingress_entry.go b/controller/ingress_entry.go index 085868af..e1980f0a 100644 --- a/controller/ingress_entry.go +++ b/controller/ingress_entry.go @@ -4,6 +4,8 @@ import ( "errors" "fmt" "time" + + "k8s.io/client-go/pkg/apis/extensions/v1beta1" ) // IngressEntries type @@ -26,8 +28,8 @@ type IngressEntry struct { ServicePort int32 // Allow are the ips or cidrs that are allowed to access the service. Allow []string - // ElbScheme internet-facing or internal will dictate which kind of ELB to attach to - ELbScheme string + // LbScheme internet-facing or internal will dictate which kind of load balancer to attach to. + LbScheme string // StripPaths before forwarding to the backend StripPaths bool // BackendTimeoutSeconds backend timeout @@ -36,6 +38,8 @@ type IngressEntry struct { BackendMaxConnections int // Ingress creation time CreationTimestamp time.Time + // Ingress resource + Ingress *v1beta1.Ingress } // validate returns error if entry has invalid fields. diff --git a/dns/dns_updater.go b/dns/dns_updater.go index a19fc15e..82f0290a 100644 --- a/dns/dns_updater.go +++ b/dns/dns_updater.go @@ -164,8 +164,8 @@ func (u *updater) indexByHost(entries []controller.IngressEntry) (hostToIngress, } if previous, exists := mapping[hostNameWithPeriod]; exists { - if previous.ELbScheme != entry.ELbScheme { - skipped = append(skipped, entry.NamespaceName()+":conflicting-scheme:"+entry.ELbScheme) + if previous.LbScheme != entry.LbScheme { + skipped = append(skipped, entry.NamespaceName()+":conflicting-scheme:"+entry.LbScheme) skippedCount.Inc() } } else { @@ -188,9 +188,9 @@ func (u *updater) createChanges(hostToIngress hostToIngress, var skipped []string for host, entry := range hostToIngress { - dnsDetails, exists := u.schemeToFrontendMap[entry.ELbScheme] + dnsDetails, exists := u.schemeToFrontendMap[entry.LbScheme] if !exists { - skipped = append(skipped, entry.NamespaceName()+":scheme:"+entry.ELbScheme) + skipped = append(skipped, entry.NamespaceName()+":scheme:"+entry.LbScheme) skippedCount.Inc() continue } diff --git a/dns/dns_updater_test.go b/dns/dns_updater_test.go index 5cca9598..5cb3d810 100644 --- a/dns/dns_updater_test.go +++ b/dns/dns_updater_test.go @@ -247,7 +247,7 @@ func TestUpdateRecordSetFail(t *testing.T) { mockR53.mockGetRecords(nil, nil) mockALB.mockDescribeLoadBalancers(albNames, lbDetails, nil) - ingressUpdate := []controller.IngressEntry{{Host: "verification.james.com", ELbScheme: internalScheme}} + ingressUpdate := []controller.IngressEntry{{Host: "verification.james.com", LbScheme: internalScheme}} mockR53.On("UpdateRecordSets", mock.Anything).Return(errors.New("no updates for you")) @@ -278,7 +278,7 @@ func TestRecordSetUpdates(t *testing.T) { Name: "test-entry", Host: "cats.james.com", Path: "/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }}, nil, @@ -301,7 +301,7 @@ func TestRecordSetUpdates(t *testing.T) { Name: "test-entry", Host: "foo.james.com", Path: "/", - ELbScheme: externalScheme, + LbScheme: externalScheme, ServicePort: 80, }}, []*route53.ResourceRecordSet{{ @@ -368,7 +368,7 @@ func TestRecordSetUpdates(t *testing.T) { Name: "test-entry", Host: "foo.james.com", Path: "/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }}, []*route53.ResourceRecordSet{ @@ -424,7 +424,7 @@ func TestRecordSetUpdates(t *testing.T) { Name: "test-entry", Host: "foo.com", Path: "/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }, // scheme doesn't match internal @@ -432,7 +432,7 @@ func TestRecordSetUpdates(t *testing.T) { Name: "test-entry", Host: "foo.james.com", Path: "/", - ELbScheme: "invalidscheme", + LbScheme: "invalidscheme", ServicePort: 80, }, }, @@ -446,21 +446,21 @@ func TestRecordSetUpdates(t *testing.T) { Name: "test-entry", Host: "foo.james.com", Path: "/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }, { Name: "test-entry-blah", Host: "foo.james.com", Path: "/blah/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }, { Name: "test-entry-lala", Host: "foo.james.com", Path: "/lala/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }, }, @@ -485,14 +485,14 @@ func TestRecordSetUpdates(t *testing.T) { Name: "test-entry", Host: "bar.james.com", Path: "/", - ELbScheme: externalScheme, + LbScheme: externalScheme, ServicePort: 80, }, { Name: "test-entry", Host: "bar.james.com", Path: "/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }, }, @@ -516,7 +516,7 @@ func TestRecordSetUpdates(t *testing.T) { Name: "test-entry", Host: "foo.james.com", Path: "/", - ELbScheme: externalScheme, + LbScheme: externalScheme, ServicePort: 80, }}, []*route53.ResourceRecordSet{{ @@ -577,7 +577,7 @@ func TestRecordSetUpdatesWithAddressArguments(t *testing.T) { Name: "test-entry", Host: "cats.james.com", Path: "/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }}, nil, @@ -602,7 +602,7 @@ func TestRecordSetUpdatesWithAddressArguments(t *testing.T) { Name: "test-entry", Host: "foo.james.com", Path: "/", - ELbScheme: externalScheme, + LbScheme: externalScheme, ServicePort: 80, }}, []*route53.ResourceRecordSet{{ @@ -676,7 +676,7 @@ func TestRecordSetUpdatesWithAddressArguments(t *testing.T) { Name: "test-entry", Host: "foo.james.com", Path: "/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }}, []*route53.ResourceRecordSet{ @@ -737,7 +737,7 @@ func TestRecordSetUpdatesWithAddressArguments(t *testing.T) { Name: "test-entry", Host: "foo.com", Path: "/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }, // scheme doesn't match internal @@ -745,7 +745,7 @@ func TestRecordSetUpdatesWithAddressArguments(t *testing.T) { Name: "test-entry", Host: "foo.james.com", Path: "/", - ELbScheme: "invalidscheme", + LbScheme: "invalidscheme", ServicePort: 80, }, }, @@ -760,21 +760,21 @@ func TestRecordSetUpdatesWithAddressArguments(t *testing.T) { Name: "test-entry", Host: "foo.james.com", Path: "/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }, { Name: "test-entry-blah", Host: "foo.james.com", Path: "/blah/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }, { Name: "test-entry-lala", Host: "foo.james.com", Path: "/lala/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }, }, @@ -801,14 +801,14 @@ func TestRecordSetUpdatesWithAddressArguments(t *testing.T) { Name: "test-entry", Host: "bar.james.com", Path: "/", - ELbScheme: externalScheme, + LbScheme: externalScheme, ServicePort: 80, }, { Name: "test-entry", Host: "bar.james.com", Path: "/", - ELbScheme: internalScheme, + LbScheme: internalScheme, ServicePort: 80, }, }, @@ -834,7 +834,7 @@ func TestRecordSetUpdatesWithAddressArguments(t *testing.T) { Name: "test-entry", Host: "foo.james.com", Path: "/", - ELbScheme: externalScheme, + LbScheme: externalScheme, ServicePort: 80, }}, []*route53.ResourceRecordSet{{ @@ -856,7 +856,7 @@ func TestRecordSetUpdatesWithAddressArguments(t *testing.T) { Name: "test-entry", Host: "foo.james.com", Path: "/", - ELbScheme: externalScheme, + LbScheme: externalScheme, ServicePort: 80, }}, []*route53.ResourceRecordSet{{ @@ -890,7 +890,7 @@ func TestRecordSetUpdatesWithAddressArguments(t *testing.T) { Name: "test-entry", Host: "foo.james.com", Path: "/", - ELbScheme: externalScheme, + LbScheme: externalScheme, ServicePort: 80, }}, []*route53.ResourceRecordSet{{ @@ -923,7 +923,7 @@ func TestRecordSetUpdatesWithAddressArguments(t *testing.T) { Name: "test-entry", Host: "foo.james.com", Path: "/", - ELbScheme: externalScheme, + LbScheme: externalScheme, ServicePort: 80, }}, []*route53.ResourceRecordSet{}, diff --git a/elb/status/status.go b/elb/status/status.go new file mode 100644 index 00000000..2ede8310 --- /dev/null +++ b/elb/status/status.go @@ -0,0 +1,72 @@ +/* +Package status provides an updater for an ELB frontend to update ingress statuses. +*/ +package status + +import ( + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/sky-uk/feed/controller" + "github.com/sky-uk/feed/elb" + "github.com/sky-uk/feed/k8s" + "k8s.io/client-go/pkg/api/v1" + + aws_elb "github.com/aws/aws-sdk-go/service/elb" + k8s_status "github.com/sky-uk/feed/k8s/status" +) + +// Config for creating a new ELB status updater. +type Config struct { + Region string + LabelValue string + KubernetesClient k8s.Client +} + +// New creates a new ELB frontend status updater. +func New(conf Config) (controller.Updater, error) { + session, err := session.NewSession(&aws.Config{Region: &conf.Region}) + if err != nil { + return nil, fmt.Errorf("unable to create ELB status updater: %v", err) + } + + return &status{ + awsElb: aws_elb.New(session), + labelValue: conf.LabelValue, + loadBalancers: make(map[string]v1.LoadBalancerStatus), + kubernetesClient: conf.KubernetesClient, + }, nil +} + +type status struct { + awsElb elb.ELB + labelValue string + loadBalancers map[string]v1.LoadBalancerStatus + kubernetesClient k8s.Client +} + +// Start discovers the elbs and generates loadBalancer statuses. +func (s *status) Start() error { + clusterFrontEnds, err := elb.FindFrontEndElbs(s.awsElb, s.labelValue) + if err != nil { + return err + } + + for lbLabel, clusterFrontEnd := range clusterFrontEnds { + s.loadBalancers[lbLabel] = k8s_status.GenerateLoadBalancerStatus([]string{clusterFrontEnd.DNSName}) + } + return nil +} + +func (s *status) Stop() error { + return nil +} + +func (s *status) Health() error { + return nil +} + +func (s *status) Update(ingresses controller.IngressEntries) error { + return k8s_status.Update(ingresses, s.loadBalancers, s.kubernetesClient) +} diff --git a/examples/feed-ingress-deployment-merlin.yml b/examples/feed-ingress-deployment-merlin.yml index ec1e8904..13f6bc1d 100644 --- a/examples/feed-ingress-deployment-merlin.yml +++ b/examples/feed-ingress-deployment-merlin.yml @@ -117,6 +117,9 @@ spec: # Interface to bind virtual IP to. - -merlin-vip-interface=lo + # Internet facing Virtual IP. + - -merlin-internet-facing-vip=1.0.0.0 + # Each worker uses a full cpu, so scale up vertically on a box by increasing this value. - -nginx-workers=1 diff --git a/k8s/client.go b/k8s/client.go index d6aea7d7..d0cb57ef 100644 --- a/k8s/client.go +++ b/k8s/client.go @@ -41,6 +41,9 @@ type Client interface { // WatchServices watches for updates to services and notifies the Watcher. WatchServices() Watcher + + // UpdateIngressStatus updates the ingress status with the loadbalancer hostname or ip address. + UpdateIngressStatus(*v1beta1.Ingress) error } type client struct { @@ -143,6 +146,21 @@ func (c *client) createServiceSource() { go controller.Run(make(chan struct{})) } +func (c *client) UpdateIngressStatus(ingress *v1beta1.Ingress) error { + ingressClient := c.clientset.ExtensionsV1beta1().Ingresses(ingress.Namespace) + + currentIng, err := ingressClient.Get(ingress.Name) + if err != nil { + return err + } + + currentIng.Status.LoadBalancer.Ingress = ingress.Status.LoadBalancer.Ingress + + _, err = ingressClient.UpdateStatus(currentIng) + + return err +} + // Implement cache.ResourceEventHandler type handlerWatcher struct { *bufferedWatcher diff --git a/k8s/status/status.go b/k8s/status/status.go new file mode 100644 index 00000000..353f6257 --- /dev/null +++ b/k8s/status/status.go @@ -0,0 +1,78 @@ +package status + +import ( + "net" + "sort" + + "fmt" + + "github.com/sky-uk/feed/controller" + "github.com/sky-uk/feed/k8s" + "k8s.io/client-go/pkg/api/v1" +) + +// GenerateLoadBalancerStatus to convert a slice of strings to ingress loadbalancer objects. +// Allows hostnames or ip addresses and sets the appropriate field. +func GenerateLoadBalancerStatus(endpoints []string) v1.LoadBalancerStatus { + lbs := v1.LoadBalancerStatus{} + for _, ep := range endpoints { + if net.ParseIP(ep) != nil { + lbs.Ingress = append(lbs.Ingress, v1.LoadBalancerIngress{IP: ep}) + } else { + lbs.Ingress = append(lbs.Ingress, v1.LoadBalancerIngress{Hostname: ep}) + } + } + + return lbs +} + +// Update ingresses with current status where unchanged statuses are ignored. +func Update(ingresses controller.IngressEntries, lbs map[string]v1.LoadBalancerStatus, k8sClient k8s.Client) error { + var failedIngresses []string + for _, ingress := range ingresses { + if lb, ok := lbs[ingress.LbScheme]; ok { + if statusUnchanged(ingress.Ingress.Status.LoadBalancer.Ingress, lb.Ingress) { + return nil + } + + ingress.Ingress.Status.LoadBalancer.Ingress = lb.Ingress + + if err := k8sClient.UpdateIngressStatus(ingress.Ingress); err != nil { + failedIngresses = append(failedIngresses, ingress.Name) + } + } + } + + if len(failedIngresses) > 0 { + return fmt.Errorf("Failed to update ingresses: %s", failedIngresses) + } + return nil +} + +func statusUnchanged(existing, new []v1.LoadBalancerIngress) bool { + if len(existing) != len(new) { + return false + } + + sortLoadBalancerStatus(existing) + sortLoadBalancerStatus(new) + for i, loadbalancer := range existing { + if loadbalancer != new[i] { + return false + } + } + + return true +} + +func sortLoadBalancerStatus(lbi []v1.LoadBalancerIngress) { + sort.SliceStable(lbi, func(i, j int) bool { + if lbi[i].IP < lbi[j].IP { + return true + } + if lbi[i].IP > lbi[j].IP { + return false + } + return lbi[i].Hostname < lbi[j].Hostname + }) +} diff --git a/k8s/status/status_test.go b/k8s/status/status_test.go new file mode 100644 index 00000000..630a5a96 --- /dev/null +++ b/k8s/status/status_test.go @@ -0,0 +1,240 @@ +package status + +import ( + "fmt" + "testing" + + "github.com/sky-uk/feed/controller" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/pkg/api/v1" + + "errors" + + fake "github.com/sky-uk/feed/util/test" + "k8s.io/client-go/pkg/apis/extensions/v1beta1" +) + +const ( + defaultHostname = "test.cosmic.sky" + defaultIPAddress = "127.0.0.1" + defaultLBLabel = "internal" + defaultIngressName = "test" +) + +func createDefaultLoadBalancerStatus() v1.LoadBalancerStatus { + return createLoadBalancerStatus(defaultHostname, defaultIPAddress) +} + +func createLoadBalancerStatus(hostname, ip string) v1.LoadBalancerStatus { + return v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{{ + Hostname: hostname, + IP: ip, + }}, + } +} + +func createDefaultLBs() map[string]v1.LoadBalancerStatus { + return createLBs(defaultLBLabel, createDefaultLoadBalancerStatus()) +} + +func createLBs(lbLabel string, lbStatus v1.LoadBalancerStatus) map[string]v1.LoadBalancerStatus { + return map[string]v1.LoadBalancerStatus{ + lbLabel: lbStatus, + } +} + +func createDefaultIngresses() controller.IngressEntries { + return createIngresses(defaultIngressName, defaultLBLabel, createDefaultLoadBalancerStatus()) +} + +func createIngresses(name, lbScheme string, lbStatus v1.LoadBalancerStatus) controller.IngressEntries { + return controller.IngressEntries{ + { + Name: name, + LbScheme: lbScheme, + Ingress: &v1beta1.Ingress{ + Status: v1beta1.IngressStatus{ + LoadBalancer: lbStatus, + }, + }, + }, + } +} + +func TestGenerateLoadBalancerStatus(t *testing.T) { + assert := assert.New(t) + + var tests = []struct { + description string + endpoints []string + expected v1.LoadBalancerStatus + }{ + { + description: "single hostname", + endpoints: []string{defaultHostname}, + expected: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{{ + Hostname: defaultHostname, + }}, + }, + }, + { + description: "single ip address", + endpoints: []string{defaultIPAddress}, + expected: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{{ + IP: defaultIPAddress, + }}, + }, + }, + { + description: "mixture of a hostname and ip address", + endpoints: []string{defaultHostname, defaultIPAddress}, + expected: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{ + {Hostname: defaultHostname}, + {IP: defaultIPAddress}, + }, + }, + }, + } + for _, test := range tests { + fmt.Printf("test: %s\n", test.description) + assert.Equal(test.expected, GenerateLoadBalancerStatus(test.endpoints)) + } +} + +func TestStatusUnchanged(t *testing.T) { + assert := assert.New(t) + + var tests = []struct { + description string + existingIngressStatus []v1.LoadBalancerIngress + newIngressStatus []v1.LoadBalancerIngress + expected bool + }{ + { + description: "identical ingress statuses", + existingIngressStatus: createDefaultLoadBalancerStatus().Ingress, + newIngressStatus: createDefaultLoadBalancerStatus().Ingress, + expected: true, + }, + { + description: "different ingress hostname status", + existingIngressStatus: createDefaultLoadBalancerStatus().Ingress, + newIngressStatus: createLoadBalancerStatus("changed.cosmic.sky", defaultIPAddress).Ingress, + expected: false, + }, + { + description: "different ingress ip status", + existingIngressStatus: createDefaultLoadBalancerStatus().Ingress, + newIngressStatus: createLoadBalancerStatus(defaultHostname, "0.0.0.0").Ingress, + expected: false, + }, + { + description: "different number of ingress statuses", + existingIngressStatus: []v1.LoadBalancerIngress{}, + newIngressStatus: createDefaultLoadBalancerStatus().Ingress, + expected: false, + }, + } + for _, test := range tests { + fmt.Printf("test: %s\n", test.description) + assert.Equal(test.expected, statusUnchanged(test.existingIngressStatus, test.newIngressStatus)) + } +} + +func TestSortLoadBalancerStatus(t *testing.T) { + assert := assert.New(t) + + var tests = []struct { + description string + lbi []v1.LoadBalancerIngress + expected []v1.LoadBalancerIngress + }{ + { + description: "reorder hostname", + lbi: []v1.LoadBalancerIngress{ + {Hostname: "b-" + defaultHostname}, + {Hostname: "a-" + defaultHostname}, + }, + expected: []v1.LoadBalancerIngress{ + {Hostname: "a-" + defaultHostname}, + {Hostname: "b-" + defaultHostname}, + }, + }, + { + description: "reorder ip addresses", + lbi: []v1.LoadBalancerIngress{ + {IP: "127.0.0.2"}, + {IP: defaultIPAddress}, + }, + expected: []v1.LoadBalancerIngress{ + {IP: defaultIPAddress}, + {IP: "127.0.0.2"}, + }, + }, + { + description: "reorder hostnames and ip addresses", + lbi: []v1.LoadBalancerIngress{ + {IP: "127.0.0.2"}, + {Hostname: "b-" + defaultHostname}, + {IP: defaultIPAddress}, + {Hostname: "a-" + defaultHostname}, + }, + expected: []v1.LoadBalancerIngress{ + {Hostname: "a-" + defaultHostname}, + {Hostname: "b-" + defaultHostname}, + {IP: defaultIPAddress}, + {IP: "127.0.0.2"}, + }, + }, + } + for _, test := range tests { + fmt.Printf("test: %s\n", test.description) + sortLoadBalancerStatus(test.lbi) + assert.Equal(test.expected, test.lbi) + } +} + +func TestUpdate(t *testing.T) { + assert := assert.New(t) + + lbs := createDefaultLBs() + ingresses := createIngresses(defaultIngressName, defaultLBLabel, v1.LoadBalancerStatus{}) + + client := new(fake.FakeClient) + client.On("UpdateIngressStatus").Return(nil) + + err := Update(ingresses, lbs, client) + + assert.NoError(err) +} + +func TestUpdateFails(t *testing.T) { + assert := assert.New(t) + + lbs := createDefaultLBs() + ingresses := createIngresses(defaultIngressName, defaultLBLabel, v1.LoadBalancerStatus{}) + + client := new(fake.FakeClient) + client.On("UpdateIngressStatus").Return(errors.New("failed")) + + err := Update(ingresses, lbs, client) + + assert.Error(err) + assert.EqualError(err, "Failed to update ingresses: [test]") +} + +func TestUpdateDoesNotRunWithNoChange(t *testing.T) { + assert := assert.New(t) + + lbs := createDefaultLBs() + ingresses := createDefaultIngresses() + + client := new(fake.FakeClient) + client.On("UpdateIngressStatus").Return(errors.New("failed")) + + assert.NoError(Update(ingresses, lbs, client)) +} diff --git a/merlin/status/status.go b/merlin/status/status.go new file mode 100644 index 00000000..69141ecc --- /dev/null +++ b/merlin/status/status.go @@ -0,0 +1,64 @@ +/* +Package status provides an updater for a Merlin frontend to update ingress statuses. +*/ +package status + +import ( + "github.com/sky-uk/feed/controller" + "github.com/sky-uk/feed/k8s" + "k8s.io/client-go/pkg/api/v1" + + k8s_status "github.com/sky-uk/feed/k8s/status" +) + +const ( + internalLabelValue = "internal" + internetFacingLabelValue = "internet-facing" +) + +// Config for creating a new Merlin status updater. +type Config struct { + InternalVIP string + InternetFacingVIP string + KubernetesClient k8s.Client +} + +// New creates a new Merlin frontend status updater. +func New(conf Config) (controller.Updater, error) { + return &status{ + vips: map[string]string{ + internalLabelValue: conf.InternalVIP, + internetFacingLabelValue: conf.InternetFacingVIP, + }, + loadBalancers: make(map[string]v1.LoadBalancerStatus), + kubernetesClient: conf.KubernetesClient, + }, nil +} + +type status struct { + vips map[string]string + loadBalancers map[string]v1.LoadBalancerStatus + kubernetesClient k8s.Client +} + +// Start generates loadBalancer statuses from valid vips. +func (s *status) Start() error { + for lbLabel, vip := range s.vips { + if vip != "" { + s.loadBalancers[lbLabel] = k8s_status.GenerateLoadBalancerStatus([]string{vip}) + } + } + return nil +} + +func (s *status) Stop() error { + return nil +} + +func (s *status) Health() error { + return nil +} + +func (s *status) Update(ingresses controller.IngressEntries) error { + return k8s_status.Update(ingresses, s.loadBalancers, s.kubernetesClient) +} diff --git a/util/test/mocks.go b/util/test/mocks.go index 7373fa5f..b651ae09 100644 --- a/util/test/mocks.go +++ b/util/test/mocks.go @@ -36,6 +36,12 @@ func (c *FakeClient) WatchServices() k8s.Watcher { return r.Get(0).(k8s.Watcher) } +// UpdateIngressStatus mocks out calls to UpdateIngressStatus +func (c *FakeClient) UpdateIngressStatus(*v1beta1.Ingress) error { + r := c.Called() + return r.Error(0) +} + func (c *FakeClient) String() string { return "FakeClient" }