Skip to content

Commit 7ce6757

Browse files
committed
feat(status): implement ingress v1 status syncing
This is merely a copy-paste of existing v1beta1 functionality; resolution of TODOs is out of scope of this commit.
1 parent 6ce395b commit 7ce6757

File tree

2 files changed

+235
-38
lines changed

2 files changed

+235
-38
lines changed

internal/ingress/status/status.go

Lines changed: 75 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -312,37 +312,41 @@ func sliceToStatus(endpoints []string) []apiv1.LoadBalancerIngress {
312312

313313
// updateStatus changes the status information of Ingress rules
314314
func (s *statusSync) updateStatus(ctx context.Context, newIngressPoint []apiv1.LoadBalancerIngress) {
315-
ings := s.IngressLister.ListIngressesV1beta1()
316-
tcpIngresses, err := s.IngressLister.ListTCPIngresses()
317-
if err != nil {
318-
s.Logger.Errorf("failed to list TPCIngresses: %v", err)
319-
}
320-
knativeIngresses, err := s.IngressLister.ListKnativeIngresses()
321-
if err != nil {
322-
s.Logger.Errorf("failed to list Knative Ingresses: %v", err)
323-
}
324-
325315
p := pool.NewLimited(10)
326316
defer p.Close()
327317

328318
batch := p.Batch()
329319

330-
for _, ing := range ings {
320+
for _, ing := range s.IngressLister.ListIngressesV1beta1() {
331321
batch.Queue(s.runUpdateIngressV1beta1(ctx, ing, newIngressPoint, s.CoreClient))
332322
}
333-
for _, ing := range tcpIngresses {
334-
batch.Queue(s.runUpdateTCPIngress(ctx, ing, newIngressPoint, s.KongConfigClient))
323+
324+
for _, ing := range s.IngressLister.ListIngressesV1() {
325+
batch.Queue(s.runUpdateIngressV1(ctx, ing, newIngressPoint, s.CoreClient))
326+
}
327+
328+
if tcpIngresses, err := s.IngressLister.ListTCPIngresses(); err != nil {
329+
s.Logger.Errorf("failed to list TPCIngresses: %v", err)
330+
} else {
331+
for _, ing := range tcpIngresses {
332+
batch.Queue(s.runUpdateTCPIngress(ctx, ing, newIngressPoint, s.KongConfigClient))
333+
}
335334
}
336-
for _, ing := range knativeIngresses {
337-
batch.Queue(s.runUpdateKnativeIngress(ctx, ing, newIngressPoint, s.KnativeClient))
335+
336+
if knativeIngresses, err := s.IngressLister.ListKnativeIngresses(); err != nil {
337+
s.Logger.Errorf("failed to list Knative Ingresses: %v", err)
338+
} else {
339+
for _, ing := range knativeIngresses {
340+
batch.Queue(s.runUpdateKnativeIngress(ctx, ing, newIngressPoint, s.KnativeClient))
341+
}
338342
}
339343

340344
batch.QueueComplete()
341345
batch.WaitAll()
342346
}
343347

344-
func (s *statusSync) runUpdateIngressV1beta1(ctx context.Context, ing *networkingv1beta1.Ingress, status []apiv1.LoadBalancerIngress,
345-
client clientset.Interface) pool.WorkFunc {
348+
func (s *statusSync) runUpdateIngressV1beta1(ctx context.Context, ing *networkingv1beta1.Ingress,
349+
status []apiv1.LoadBalancerIngress, client clientset.Interface) pool.WorkFunc {
346350
return func(wu pool.WorkUnit) (interface{}, error) {
347351
if wu.IsCancelled() {
348352
return nil, nil
@@ -352,7 +356,7 @@ func (s *statusSync) runUpdateIngressV1beta1(ctx context.Context, ing *networkin
352356
"ingress_namespace": ing.Namespace,
353357
"ingress_name": ing.Name,
354358
})
355-
sort.SliceStable(status, lessLoadBalancerIngress(status))
359+
sort.SliceStable(status, lessLoadBalancerIngress(status)) // BUG: data race - see issue #829
356360

357361
curIPs := ing.Status.LoadBalancer.Ingress
358362
sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
@@ -363,6 +367,14 @@ func (s *statusSync) runUpdateIngressV1beta1(ctx context.Context, ing *networkin
363367
}
364368

365369
switch s.IngressAPI {
370+
case utils.NetworkingV1:
371+
// I expect this case to never happen, because if s.IngressAPI == NetworkingV1, then I expect Store to have only
372+
// v1 ingresses (and no v1beta1 ingresses). If Store happens to have a v1beta1 Ingress nonetheless, I'm choosing
373+
// not to drop it, but to log a warning and talk networking.k8s.io/v1beta1 (as opposed to extensions/v1beta1)
374+
// because a v1-supporting Kubernetes API is more likely to support the former than the latter.
375+
logger.Warnf("statusSync got an unexpected v1beta1 Ingress when it expected v1")
376+
fallthrough
377+
366378
case utils.NetworkingV1beta1:
367379
ingClient := client.NetworkingV1beta1().Ingresses(ing.Namespace)
368380

@@ -407,6 +419,49 @@ func (s *statusSync) runUpdateIngressV1beta1(ctx context.Context, ing *networkin
407419
}
408420
}
409421

422+
func (s *statusSync) runUpdateIngressV1(ctx context.Context, ing *networkingv1.Ingress,
423+
status []apiv1.LoadBalancerIngress, client clientset.Interface) pool.WorkFunc {
424+
return func(wu pool.WorkUnit) (interface{}, error) {
425+
if wu.IsCancelled() {
426+
return nil, nil
427+
}
428+
429+
logger := s.Logger.WithFields(logrus.Fields{
430+
"ingress_namespace": ing.Namespace,
431+
"ingress_name": ing.Name,
432+
})
433+
sort.SliceStable(status, lessLoadBalancerIngress(status)) // BUG: data race - see issue #829
434+
435+
curIPs := ing.Status.LoadBalancer.Ingress
436+
sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
437+
438+
if ingressSliceEqual(status, curIPs) {
439+
logger.Debugf("no change in status, update skipped")
440+
return true, nil
441+
}
442+
443+
ingClient := client.NetworkingV1().Ingresses(ing.Namespace)
444+
445+
currIng, err := ingClient.Get(ctx, ing.Name, metav1.GetOptions{})
446+
if err != nil {
447+
return nil, fmt.Errorf("failed to fetch Ingress %v/%v: %w", ing.Namespace, ing.Name, err)
448+
}
449+
450+
logger.WithField("ingress_status", status).Debugf("attempting to update ingress status")
451+
currIng.Status.LoadBalancer.Ingress = status
452+
_, err = ingClient.UpdateStatus(ctx, currIng, metav1.UpdateOptions{})
453+
if err != nil {
454+
// TODO return this error?
455+
logger.Errorf("failed to update ingress status: %v", err)
456+
} else {
457+
logger.WithField("ingress_status", status).Debugf("successfully updated ingress status")
458+
}
459+
460+
return true, nil
461+
462+
}
463+
}
464+
410465
func toCoreLBStatus(knativeLBStatus *knative.LoadBalancerStatus) []apiv1.LoadBalancerIngress {
411466
var res []apiv1.LoadBalancerIngress
412467
if knativeLBStatus == nil {
@@ -447,7 +502,7 @@ func (s *statusSync) runUpdateKnativeIngress(ctx context.Context,
447502
"ingress_namespace": ing.Namespace,
448503
"ingress_name": ing.Name,
449504
})
450-
sort.SliceStable(status, lessLoadBalancerIngress(status))
505+
sort.SliceStable(status, lessLoadBalancerIngress(status)) // BUG: data race - see issue #829
451506
curIPs := toCoreLBStatus(ing.Status.PublicLoadBalancer)
452507
sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
453508

@@ -507,7 +562,7 @@ func (s *statusSync) runUpdateTCPIngress(ctx context.Context,
507562
"ingress_namespace": ing.Namespace,
508563
"ingress_name": ing.Name,
509564
})
510-
sort.SliceStable(status, lessLoadBalancerIngress(status))
565+
sort.SliceStable(status, lessLoadBalancerIngress(status)) // BUG: data race - see issue #829
511566

512567
curIPs := ing.Status.LoadBalancer.Ingress
513568
sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))

internal/ingress/status/status_test.go

Lines changed: 160 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func fakeSynFn(interface{}) error {
183183
return nil
184184
}
185185

186-
func buildExtensionsIngresses() []networkingv1beta1.Ingress {
186+
func buildIngressesV1beta1() []networkingv1beta1.Ingress {
187187
return []networkingv1beta1.Ingress{
188188
{
189189
ObjectMeta: metav1.ObjectMeta{
@@ -234,18 +234,66 @@ func buildExtensionsIngresses() []networkingv1beta1.Ingress {
234234
}
235235
}
236236

237-
type testIngressLister struct {
237+
func buildIngressesV1() []networkingv1.Ingress {
238+
return []networkingv1.Ingress{
239+
{
240+
ObjectMeta: metav1.ObjectMeta{
241+
Name: "foo_ingress_1",
242+
Namespace: apiv1.NamespaceDefault,
243+
},
244+
Status: networkingv1.IngressStatus{
245+
LoadBalancer: apiv1.LoadBalancerStatus{
246+
Ingress: []apiv1.LoadBalancerIngress{
247+
{
248+
IP: "10.0.0.1",
249+
Hostname: "foo1",
250+
},
251+
},
252+
},
253+
},
254+
},
255+
{
256+
ObjectMeta: metav1.ObjectMeta{
257+
Name: "foo_ingress_different_class",
258+
Namespace: metav1.NamespaceDefault,
259+
Annotations: map[string]string{
260+
"kubernetes.io/ingress.class": "no-nginx",
261+
},
262+
},
263+
Status: networkingv1.IngressStatus{
264+
LoadBalancer: apiv1.LoadBalancerStatus{
265+
Ingress: []apiv1.LoadBalancerIngress{
266+
{
267+
IP: "0.0.0.0",
268+
Hostname: "foo.bar.com",
269+
},
270+
},
271+
},
272+
},
273+
},
274+
{
275+
ObjectMeta: metav1.ObjectMeta{
276+
Name: "foo_ingress_2",
277+
Namespace: apiv1.NamespaceDefault,
278+
},
279+
Status: networkingv1.IngressStatus{
280+
LoadBalancer: apiv1.LoadBalancerStatus{
281+
Ingress: []apiv1.LoadBalancerIngress{},
282+
},
283+
},
284+
},
285+
}
238286
}
239287

240-
func (til *testIngressLister) ListIngressesV1beta1() []*networkingv1beta1.Ingress {
241-
var ingresses []*networkingv1beta1.Ingress
242-
ingresses = append(ingresses, &networkingv1beta1.Ingress{
288+
var sampleIngressesV1beta1 = []*networkingv1beta1.Ingress{
289+
{
243290
ObjectMeta: metav1.ObjectMeta{
244291
Name: "foo_ingress_non_01",
245292
Namespace: apiv1.NamespaceDefault,
246-
}})
293+
},
294+
},
247295

248-
ingresses = append(ingresses, &networkingv1beta1.Ingress{
296+
{
249297
ObjectMeta: metav1.ObjectMeta{
250298
Name: "foo_ingress_1",
251299
Namespace: apiv1.NamespaceDefault,
@@ -255,13 +303,41 @@ func (til *testIngressLister) ListIngressesV1beta1() []*networkingv1beta1.Ingres
255303
Ingress: buildLoadBalancerIngressByIP(),
256304
},
257305
},
258-
})
306+
},
307+
}
308+
309+
var sampleIngressesV1 = []*networkingv1.Ingress{
310+
{
311+
ObjectMeta: metav1.ObjectMeta{
312+
Name: "foo_ingress_non_01",
313+
Namespace: apiv1.NamespaceDefault,
314+
},
315+
},
316+
317+
{
318+
ObjectMeta: metav1.ObjectMeta{
319+
Name: "foo_ingress_1",
320+
Namespace: apiv1.NamespaceDefault,
321+
},
322+
Status: networkingv1.IngressStatus{
323+
LoadBalancer: apiv1.LoadBalancerStatus{
324+
Ingress: buildLoadBalancerIngressByIP(),
325+
},
326+
},
327+
},
328+
}
329+
330+
type testIngressLister struct {
331+
ingressesV1beta1 []*networkingv1beta1.Ingress
332+
ingressesV1 []*networkingv1.Ingress
333+
}
259334

260-
return ingresses
335+
func (til *testIngressLister) ListIngressesV1beta1() []*networkingv1beta1.Ingress {
336+
return til.ingressesV1beta1
261337
}
262338

263339
func (til *testIngressLister) ListIngressesV1() []*networkingv1.Ingress {
264-
return nil
340+
return til.ingressesV1
265341
}
266342

267343
func (til *testIngressLister) ListTCPIngresses() ([]*configurationv1beta1.TCPIngress, error) {
@@ -272,10 +348,6 @@ func (til *testIngressLister) ListKnativeIngresses() ([]*knative.Ingress, error)
272348
return nil, nil
273349
}
274350

275-
func buildIngressLister() ingressLister {
276-
return &testIngressLister{}
277-
}
278-
279351
func buildStatusSync() statusSync {
280352
return statusSync{
281353
pod: &utils.PodInfo{
@@ -287,9 +359,9 @@ func buildStatusSync() statusSync {
287359
},
288360
syncQueue: task.NewTaskQueue(fakeSynFn, logrus.New()),
289361
Config: Config{
290-
CoreClient: buildSimpleClientSet(&networkingv1beta1.IngressList{Items: buildExtensionsIngresses()}),
362+
CoreClient: buildSimpleClientSet(&networkingv1beta1.IngressList{Items: buildIngressesV1beta1()}),
291363
PublishService: apiv1.NamespaceDefault + "/" + "foo",
292-
IngressLister: buildIngressLister(),
364+
IngressLister: &testIngressLister{ingressesV1beta1: sampleIngressesV1beta1},
293365
IngressAPI: utils.ExtensionsV1beta1,
294366
},
295367
}
@@ -301,9 +373,9 @@ func TestStatusActionsV1beta1(t *testing.T) {
301373
os.Setenv("POD_NAME", "foo1")
302374
os.Setenv("POD_NAMESPACE", apiv1.NamespaceDefault)
303375
c := Config{
304-
CoreClient: buildSimpleClientSet(&networkingv1beta1.IngressList{Items: buildExtensionsIngresses()}),
376+
CoreClient: buildSimpleClientSet(&networkingv1beta1.IngressList{Items: buildIngressesV1beta1()}),
305377
PublishService: apiv1.NamespaceDefault + "/" + "foo",
306-
IngressLister: buildIngressLister(),
378+
IngressLister: &testIngressLister{ingressesV1beta1: sampleIngressesV1beta1},
307379
UpdateStatusOnShutdown: true,
308380
IngressAPI: utils.NetworkingV1beta1,
309381
Logger: logrus.New(),
@@ -365,6 +437,76 @@ func TestStatusActionsV1beta1(t *testing.T) {
365437
}
366438
}
367439

440+
func TestStatusActionsV1(t *testing.T) {
441+
ctx := context.Background()
442+
// make sure election can be created
443+
os.Setenv("POD_NAME", "foo1")
444+
os.Setenv("POD_NAMESPACE", apiv1.NamespaceDefault)
445+
c := Config{
446+
CoreClient: buildSimpleClientSet(&networkingv1.IngressList{Items: buildIngressesV1()}),
447+
PublishService: apiv1.NamespaceDefault + "/" + "foo",
448+
IngressLister: &testIngressLister{ingressesV1: sampleIngressesV1},
449+
UpdateStatusOnShutdown: true,
450+
IngressAPI: utils.NetworkingV1,
451+
Logger: logrus.New(),
452+
}
453+
// create object
454+
fkSync, err := NewStatusSyncer(ctx, c)
455+
if fkSync == nil {
456+
t.Fatalf("expected a valid Sync")
457+
}
458+
459+
fk := fkSync.(statusSync)
460+
461+
// start it and wait for the election and syn actions
462+
go fk.Run()
463+
// wait for the election
464+
time.Sleep(100 * time.Millisecond)
465+
// execute sync
466+
err = fk.sync("just-test")
467+
if err != nil {
468+
t.Fatalf("unexpected error: %v", err)
469+
}
470+
471+
newIPs := []apiv1.LoadBalancerIngress{{
472+
IP: "11.0.0.2",
473+
}}
474+
fooIngress1, err1 := fk.CoreClient.NetworkingV1().Ingresses(
475+
apiv1.NamespaceDefault).Get(ctx, "foo_ingress_1", metav1.GetOptions{})
476+
if err1 != nil {
477+
t.Fatalf("unexpected error")
478+
}
479+
fooIngress1CurIPs := fooIngress1.Status.LoadBalancer.Ingress
480+
if !ingressSliceEqual(fooIngress1CurIPs, newIPs) {
481+
t.Fatalf("returned %v but expected %v", fooIngress1CurIPs, newIPs)
482+
}
483+
484+
time.Sleep(1 * time.Second)
485+
486+
// execute shutdown
487+
fk.Shutdown(true)
488+
// ingress should be empty
489+
newIPs2 := []apiv1.LoadBalancerIngress{}
490+
fooIngress2, err2 := fk.CoreClient.NetworkingV1().Ingresses(
491+
apiv1.NamespaceDefault).Get(ctx, "foo_ingress_1", metav1.GetOptions{})
492+
if err2 != nil {
493+
t.Fatalf("unexpected error")
494+
}
495+
fooIngress2CurIPs := fooIngress2.Status.LoadBalancer.Ingress
496+
if !ingressSliceEqual(fooIngress2CurIPs, newIPs2) {
497+
t.Fatalf("returned %v but expected %v", fooIngress2CurIPs, newIPs2)
498+
}
499+
500+
oic, err := fk.CoreClient.NetworkingV1().Ingresses(
501+
metav1.NamespaceDefault).Get(ctx, "foo_ingress_different_class", metav1.GetOptions{})
502+
if err != nil {
503+
t.Fatalf("unexpected error")
504+
}
505+
if oic.Status.LoadBalancer.Ingress[0].IP != "0.0.0.0" && oic.Status.LoadBalancer.Ingress[0].Hostname != "foo.bar.com" {
506+
t.Fatalf("invalid ingress status for rule with different class")
507+
}
508+
}
509+
368510
func TestCallback(t *testing.T) {
369511
buildStatusSync()
370512
}

0 commit comments

Comments
 (0)