
Commit 12f565e

Merge branch 'feat/ingressv1-statussync' into next
This PR makes `statusSync` update the `status` of Ingress v1 resources in Store just like it does for Ingress v1beta1. Includes a test (equivalent to the v1beta1 test).

Notes to reviewers:

- This implementation (and test) is an almost direct copy of the preexisting v1beta1 code, including TODOs and bugs (most notably Kong#829). For review comments regarding v1 functionality, please consider whether the same problem also applies to v1beta1. For such preexisting problems, use your best judgment to decide whether a fix belongs in the scope of this PR (which I define as "introduce v1 syncing") or should instead be reported as an issue affecting both v1beta1 and v1, to be fixed later.
- Consider not squashing when merging.

From Kong#830
2 parents 9df75e6 + 7ce6757 commit 12f565e
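
As a rough illustration of the operation this PR teaches statusSync to perform for networking.k8s.io/v1 resources (writing load-balancer addresses into an Ingress's status through the status subresource), here is a minimal, self-contained sketch against client-go's fake clientset. The namespace, name, and address are invented for the example, it assumes a client-go release contemporary with this change (where v1 Ingress status still uses corev1.LoadBalancerIngress), and it is not the project's code:

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	ctx := context.Background()

	// A v1 Ingress with an empty status, standing in for what statusSync reads from Store.
	ing := &networkingv1.Ingress{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo"},
	}
	client := fake.NewSimpleClientset(ing)

	// The same get-then-UpdateStatus sequence the new runUpdateIngressV1 performs.
	ingClient := client.NetworkingV1().Ingresses(ing.Namespace)
	curr, _ := ingClient.Get(ctx, ing.Name, metav1.GetOptions{})
	curr.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{{IP: "203.0.113.1"}}
	if _, err := ingClient.UpdateStatus(ctx, curr, metav1.UpdateOptions{}); err != nil {
		fmt.Println("status update failed:", err)
		return
	}

	updated, _ := ingClient.Get(ctx, ing.Name, metav1.GetOptions{})
	fmt.Println("status addresses:", updated.Status.LoadBalancer.Ingress)
}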

2 files changed: +243 -44 lines changed


internal/ingress/status/status.go

+76 -21
@@ -312,37 +312,41 @@ func sliceToStatus(endpoints []string) []apiv1.LoadBalancerIngress {
 
 // updateStatus changes the status information of Ingress rules
 func (s *statusSync) updateStatus(ctx context.Context, newIngressPoint []apiv1.LoadBalancerIngress) {
-	ings := s.IngressLister.ListIngressesV1beta1()
-	tcpIngresses, err := s.IngressLister.ListTCPIngresses()
-	if err != nil {
-		s.Logger.Errorf("failed to list TPCIngresses: %v", err)
-	}
-	knativeIngresses, err := s.IngressLister.ListKnativeIngresses()
-	if err != nil {
-		s.Logger.Errorf("failed to list Knative Ingresses: %v", err)
-	}
-
 	p := pool.NewLimited(10)
 	defer p.Close()
 
 	batch := p.Batch()
 
-	for _, ing := range ings {
-		batch.Queue(s.runUpdate(ctx, ing, newIngressPoint, s.CoreClient))
+	for _, ing := range s.IngressLister.ListIngressesV1beta1() {
+		batch.Queue(s.runUpdateIngressV1beta1(ctx, ing, newIngressPoint, s.CoreClient))
 	}
-	for _, ing := range tcpIngresses {
-		batch.Queue(s.runUpdateTCPIngress(ctx, ing, newIngressPoint, s.KongConfigClient))
+
+	for _, ing := range s.IngressLister.ListIngressesV1() {
+		batch.Queue(s.runUpdateIngressV1(ctx, ing, newIngressPoint, s.CoreClient))
+	}
+
+	if tcpIngresses, err := s.IngressLister.ListTCPIngresses(); err != nil {
+		s.Logger.Errorf("failed to list TPCIngresses: %v", err)
+	} else {
+		for _, ing := range tcpIngresses {
+			batch.Queue(s.runUpdateTCPIngress(ctx, ing, newIngressPoint, s.KongConfigClient))
+		}
 	}
-	for _, ing := range knativeIngresses {
-		batch.Queue(s.runUpdateKnativeIngress(ctx, ing, newIngressPoint, s.KnativeClient))
+
+	if knativeIngresses, err := s.IngressLister.ListKnativeIngresses(); err != nil {
+		s.Logger.Errorf("failed to list Knative Ingresses: %v", err)
+	} else {
+		for _, ing := range knativeIngresses {
+			batch.Queue(s.runUpdateKnativeIngress(ctx, ing, newIngressPoint, s.KnativeClient))
+		}
 	}
 
 	batch.QueueComplete()
 	batch.WaitAll()
 }
 
-func (s *statusSync) runUpdate(ctx context.Context, ing *networkingv1beta1.Ingress, status []apiv1.LoadBalancerIngress,
-	client clientset.Interface) pool.WorkFunc {
+func (s *statusSync) runUpdateIngressV1beta1(ctx context.Context, ing *networkingv1beta1.Ingress,
+	status []apiv1.LoadBalancerIngress, client clientset.Interface) pool.WorkFunc {
 	return func(wu pool.WorkUnit) (interface{}, error) {
 		if wu.IsCancelled() {
 			return nil, nil
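
For context on the batching that updateStatus builds on (this PR reuses it unchanged): NewLimited, Batch, Queue, QueueComplete and WaitAll come from gopkg.in/go-playground/pool.v3. A minimal sketch of that pattern follows, with a made-up work function standing in for the runUpdate* helpers:

package main

import (
	"fmt"

	"gopkg.in/go-playground/pool.v3"
)

// makeWork returns a pool.WorkFunc, the same shape the runUpdate* helpers return.
// The body here just prints; it is a placeholder, not the controller's logic.
func makeWork(n int) pool.WorkFunc {
	return func(wu pool.WorkUnit) (interface{}, error) {
		if wu.IsCancelled() {
			return nil, nil
		}
		fmt.Println("updating status of resource", n)
		return true, nil
	}
}

func main() {
	p := pool.NewLimited(10) // at most 10 units run concurrently
	defer p.Close()

	batch := p.Batch()
	for i := 0; i < 5; i++ {
		batch.Queue(makeWork(i))
	}
	batch.QueueComplete() // nothing else will be queued
	batch.WaitAll()       // block until every queued unit has finished
}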
@@ -352,7 +356,7 @@ func (s *statusSync) runUpdate(ctx context.Context, ing *networkingv1beta1.Ingre
 			"ingress_namespace": ing.Namespace,
 			"ingress_name":      ing.Name,
 		})
-		sort.SliceStable(status, lessLoadBalancerIngress(status))
+		sort.SliceStable(status, lessLoadBalancerIngress(status)) // BUG: data race - see issue #829
 
 		curIPs := ing.Status.LoadBalancer.Ingress
 		sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
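
The new "// BUG: data race - see issue #829" annotations flag that each queued work function sorts the status slice it receives, while updateStatus hands the same newIngressPoint slice to all of them, so concurrent sorts can touch the same backing array. A small, hypothetical illustration of that hazard and of a copy-first alternative (none of this is the project's code):

package main

import (
	"sort"
	"sync"
)

func main() {
	// One slice shared by many goroutines, like newIngressPoint shared by the queued work functions.
	shared := []string{"10.0.0.2", "10.0.0.1", "10.0.0.3"}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Racy variant (left commented out so this sketch stays race-free):
			// sorting the shared slice concurrently mutates the same backing
			// array from several goroutines and would be flagged by `go test -race`.
			// sort.SliceStable(shared, func(a, b int) bool { return shared[a] < shared[b] })

			// Copy-first variant: each goroutine sorts its own private copy.
			local := append([]string(nil), shared...)
			sort.SliceStable(local, func(a, b int) bool { return local[a] < local[b] })
		}()
	}
	wg.Wait()
}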
@@ -363,6 +367,14 @@ func (s *statusSync) runUpdate(ctx context.Context, ing *networkingv1beta1.Ingre
 		}
 
 		switch s.IngressAPI {
+		case utils.NetworkingV1:
+			// I expect this case to never happen, because if s.IngressAPI == NetworkingV1, then I expect Store to have only
+			// v1 ingresses (and no v1beta1 ingresses). If Store happens to have a v1beta1 Ingress nonetheless, I'm choosing
+			// not to drop it, but to log a warning and talk networking.k8s.io/v1beta1 (as opposed to extensions/v1beta1)
+			// because a v1-supporting Kubernetes API is more likely to support the former than the latter.
+			logger.Warnf("statusSync got an unexpected v1beta1 Ingress when it expected v1")
+			fallthrough
+
 		case utils.NetworkingV1beta1:
 			ingClient := client.NetworkingV1beta1().Ingresses(ing.Namespace)
 
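
The new NetworkingV1 case above relies on Go's fallthrough statement: it logs a warning and then executes the NetworkingV1beta1 branch unchanged. A tiny standalone sketch of that control flow; the string values are illustrative stand-ins for s.IngressAPI and the utils constants, not the project's types:

package main

import "fmt"

func main() {
	api := "networking.k8s.io/v1" // hypothetical stand-in for s.IngressAPI

	switch api {
	case "networking.k8s.io/v1":
		fmt.Println("warn: got a v1beta1 Ingress while expecting v1")
		fallthrough // continue into the v1beta1 branch below
	case "networking.k8s.io/v1beta1":
		fmt.Println("updating status via the networking.k8s.io/v1beta1 client")
	default:
		fmt.Println("unsupported API group/version")
	}
}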

@@ -407,6 +419,49 @@ func (s *statusSync) runUpdate(ctx context.Context, ing *networkingv1beta1.Ingre
 	}
 }
 
+func (s *statusSync) runUpdateIngressV1(ctx context.Context, ing *networkingv1.Ingress,
+	status []apiv1.LoadBalancerIngress, client clientset.Interface) pool.WorkFunc {
+	return func(wu pool.WorkUnit) (interface{}, error) {
+		if wu.IsCancelled() {
+			return nil, nil
+		}
+
+		logger := s.Logger.WithFields(logrus.Fields{
+			"ingress_namespace": ing.Namespace,
+			"ingress_name":      ing.Name,
+		})
+		sort.SliceStable(status, lessLoadBalancerIngress(status)) // BUG: data race - see issue #829
+
+		curIPs := ing.Status.LoadBalancer.Ingress
+		sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
+
+		if ingressSliceEqual(status, curIPs) {
+			logger.Debugf("no change in status, update skipped")
+			return true, nil
+		}
+
+		ingClient := client.NetworkingV1().Ingresses(ing.Namespace)
+
+		currIng, err := ingClient.Get(ctx, ing.Name, metav1.GetOptions{})
+		if err != nil {
+			return nil, fmt.Errorf("failed to fetch Ingress %v/%v: %w", ing.Namespace, ing.Name, err)
+		}
+
+		logger.WithField("ingress_status", status).Debugf("attempting to update ingress status")
+		currIng.Status.LoadBalancer.Ingress = status
+		_, err = ingClient.UpdateStatus(ctx, currIng, metav1.UpdateOptions{})
+		if err != nil {
+			// TODO return this error?
+			logger.Errorf("failed to update ingress status: %v", err)
+		} else {
+			logger.WithField("ingress_status", status).Debugf("successfully updated ingress status")
+		}
+
+		return true, nil
+
+	}
+}
+
 func toCoreLBStatus(knativeLBStatus *knative.LoadBalancerStatus) []apiv1.LoadBalancerIngress {
 	var res []apiv1.LoadBalancerIngress
 	if knativeLBStatus == nil {
@@ -447,7 +502,7 @@ func (s *statusSync) runUpdateKnativeIngress(ctx context.Context,
 			"ingress_namespace": ing.Namespace,
 			"ingress_name":      ing.Name,
 		})
-		sort.SliceStable(status, lessLoadBalancerIngress(status))
+		sort.SliceStable(status, lessLoadBalancerIngress(status)) // BUG: data race - see issue #829
 		curIPs := toCoreLBStatus(ing.Status.PublicLoadBalancer)
 		sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
 
@@ -507,7 +562,7 @@ func (s *statusSync) runUpdateTCPIngress(ctx context.Context,
 			"ingress_namespace": ing.Namespace,
 			"ingress_name":      ing.Name,
 		})
-		sort.SliceStable(status, lessLoadBalancerIngress(status))
+		sort.SliceStable(status, lessLoadBalancerIngress(status)) // BUG: data race - see issue #829
 
 		curIPs := ing.Status.LoadBalancer.Ingress
 		sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
