Skip to content

Commit

Permalink
bugfix: fix eds updates when discovery enabled (#10391)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuval-k authored Nov 22, 2024
1 parent e2ca028 commit b98e704
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 14 deletions.
5 changes: 5 additions & 0 deletions changelog/v1.18.0-rc2/eds-discovery.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
changelog:
- type: NON_USER_FACING
description: >-
Fix an issue introduced earlier in this version, where changes in EDS would not be detected
properly if upstream discovery was turn on.
25 changes: 21 additions & 4 deletions projects/gateway2/krtcollections/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package krtcollections

import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"hash/fnv"
Expand Down Expand Up @@ -128,14 +129,16 @@ type EndpointsForUpstream struct {
Hostname string

LbEpsEqualityHash uint64
upstreamHash uint64
epsEqualityHash uint64
}

func NewEndpointsForUpstream(us UpstreamWrapper, logger *zap.Logger) *EndpointsForUpstream {
// start with a hash of the cluster name. technically we dont need it for krt, as we can compare the upstream name. but it helps later
// to compute the hash we present envoy with.
h := fnv.New64()
h.Write([]byte(us.Inner.GetMetadata().Ref().String()))
lbEpsEqualityHash := h.Sum64()
upstreamHash := h.Sum64()

// add the upstream hash to the clustername, so that if it changes the envoy cluster will become warm again.
clusterName := GetEndpointClusterName(us.Inner)
Expand All @@ -148,12 +151,13 @@ func NewEndpointsForUpstream(us UpstreamWrapper, logger *zap.Logger) *EndpointsF
},
Port: ggv2utils.GetPortForUpstream(us.Inner),
Hostname: ggv2utils.GetHostnameForUpstream(us.Inner),
LbEpsEqualityHash: lbEpsEqualityHash,
LbEpsEqualityHash: upstreamHash,
upstreamHash: upstreamHash,
}
}

func hashEndpoints(l PodLocality, emd EndpointWithMd) uint64 {
hasher := fnv.New64()
hasher := fnv.New64a()
hasher.Write([]byte(l.Region))
hasher.Write([]byte(l.Zone))
hasher.Write([]byte(l.Subzone))
Expand All @@ -163,10 +167,23 @@ func hashEndpoints(l PodLocality, emd EndpointWithMd) uint64 {
return hasher.Sum64()
}

func hash(a, b uint64) uint64 {
hasher := fnv.New64a()
var buf [16]byte
binary.NativeEndian.PutUint64(buf[:8], a)
binary.NativeEndian.PutUint64(buf[8:], b)
hasher.Write(buf[:])
return hasher.Sum64()
}

func (e *EndpointsForUpstream) Add(l PodLocality, emd EndpointWithMd) {
// xor it as we dont care about order - if we have the same endpoints in the same locality
// we are good.
e.LbEpsEqualityHash ^= hashEndpoints(l, emd)
e.epsEqualityHash ^= hashEndpoints(l, emd)
// we can't xor the endpoint hash with the upstream hash, because upstreams with
// different names and similar endpoints will cancel out, so endpoint changes
// won't result in different equality hashes.
e.LbEpsEqualityHash = hash(e.epsEqualityHash, e.upstreamHash)
e.LbEps[l] = append(e.LbEps[l], emd)
}

Expand Down
108 changes: 108 additions & 0 deletions projects/gateway2/krtcollections/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,114 @@ func TestEndpointsForUpstreamOrderDoesntMatter(t *testing.T) {

}

func TestEndpointsForUpstreamWithDiscoveredUpstream(t *testing.T) {
g := gomega.NewWithT(t)

us := UpstreamWrapper{
Inner: &gloov1.Upstream{
Metadata: &core.Metadata{Name: "name", Namespace: "ns"},
UpstreamType: &gloov1.Upstream_Kube{
Kube: &kubernetes.UpstreamSpec{
ServiceName: "svc",
ServiceNamespace: "ns",
ServicePort: 8080,
},
},
},
}
usd := UpstreamWrapper{
Inner: &gloov1.Upstream{
Metadata: &core.Metadata{Name: "discovered-name", Namespace: "ns"},
UpstreamType: &gloov1.Upstream_Kube{
Kube: &kubernetes.UpstreamSpec{
ServiceName: "svc",
ServiceNamespace: "ns",
ServicePort: 8080,
},
},
},
}
// input
emd1 := EndpointWithMd{
LbEndpoint: &endpointv3.LbEndpoint{
HostIdentifier: &endpointv3.LbEndpoint_Endpoint{
Endpoint: &endpointv3.Endpoint{
Address: &envoy_config_core_v3.Address{
Address: &envoy_config_core_v3.Address_SocketAddress{
SocketAddress: &envoy_config_core_v3.SocketAddress{
Address: "1.2.3.4",
PortSpecifier: &envoy_config_core_v3.SocketAddress_PortValue{
PortValue: 8080,
},
},
},
},
},
},
},
EndpointMd: EndpointMetadata{
Labels: map[string]string{
corev1.LabelTopologyRegion: "region",
corev1.LabelTopologyZone: "zone",
},
},
}
emd2 := EndpointWithMd{
LbEndpoint: &endpointv3.LbEndpoint{
HostIdentifier: &endpointv3.LbEndpoint_Endpoint{
Endpoint: &endpointv3.Endpoint{
Address: &envoy_config_core_v3.Address{
Address: &envoy_config_core_v3.Address_SocketAddress{
SocketAddress: &envoy_config_core_v3.SocketAddress{
Address: "1.2.3.5",
PortSpecifier: &envoy_config_core_v3.SocketAddress_PortValue{
PortValue: 8080,
},
},
},
},
},
},
},
EndpointMd: EndpointMetadata{
Labels: map[string]string{
corev1.LabelTopologyRegion: "region",
corev1.LabelTopologyZone: "zone",
},
},
}

result1 := NewEndpointsForUpstream(us, nil)
result1.Add(PodLocality{
Region: "region",
Zone: "zone",
}, emd1)

result2 := NewEndpointsForUpstream(usd, nil)
result2.Add(PodLocality{
Region: "region",
Zone: "zone",
}, emd1)

result3 := NewEndpointsForUpstream(us, nil)
result3.Add(PodLocality{
Region: "region",
Zone: "zone",
}, emd2)

result4 := NewEndpointsForUpstream(usd, nil)
result4.Add(PodLocality{
Region: "region",
Zone: "zone",
}, emd2)

h1 := result1.LbEpsEqualityHash ^ result2.LbEpsEqualityHash
h2 := result3.LbEpsEqualityHash ^ result4.LbEpsEqualityHash

g.Expect(h1).NotTo(Equal(h2), "not expected %v, got %v", h1, h2)

}

func TestEndpoints(t *testing.T) {
testCases := []struct {
name string
Expand Down
5 changes: 3 additions & 2 deletions projects/gateway2/krtcollections/uniqueclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type callbacks struct {
collection atomic.Pointer[callbacksCollection]
}

type UniquelyConnectedClientsBulider func(ctx context.Context, augmentedPods krt.Collection[LocalityPod]) krt.Collection[UniqlyConnectedClient]
type UniquelyConnectedClientsBulider func(ctx context.Context, handler *krt.DebugHandler, augmentedPods krt.Collection[LocalityPod]) krt.Collection[UniqlyConnectedClient]

// THIS IS THE SET OF THINGS WE RUN TRANSLATION FOR
// add returned callbacks to the xds server.
Expand All @@ -102,7 +102,7 @@ func NewUniquelyConnectedClients() (xdsserver.Callbacks, UniquelyConnectedClient
}

func buildCollection(callbacks *callbacks) UniquelyConnectedClientsBulider {
return func(ctx context.Context, augmentedPods krt.Collection[LocalityPod]) krt.Collection[UniqlyConnectedClient] {
return func(ctx context.Context, handler *krt.DebugHandler, augmentedPods krt.Collection[LocalityPod]) krt.Collection[UniqlyConnectedClient] {
trigger := krt.NewRecomputeTrigger(true)
col := &callbacksCollection{
logger: contextutils.LoggerFrom(ctx).Desugar(),
Expand All @@ -121,6 +121,7 @@ func buildCollection(callbacks *callbacks) UniquelyConnectedClientsBulider {
return col.getClients()
},
krt.WithName("UniqueConnectedClients"),
krt.WithDebugging(handler),
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion projects/gateway2/krtcollections/uniqueclients_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestUniqueClients(t *testing.T) {
pods.Synced().WaitUntilSynced(context.Background().Done())

cb, uccBuilder := NewUniquelyConnectedClients()
ucc := uccBuilder(context.Background(), pods)
ucc := uccBuilder(context.Background(), nil, pods)
ucc.Synced().WaitUntilSynced(context.Background().Done())

// check fetch as well
Expand Down
2 changes: 1 addition & 1 deletion projects/gateway2/setup/ggv2setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func StartGGv2WithConfig(ctx context.Context,
settingsGVR,
krt.WithName("GlooSettings"))

ucc := uccBuilder(ctx, augmentedPods)
ucc := uccBuilder(ctx, setupOpts.KrtDebugger, augmentedPods)

settingsSingle := krt.NewSingleton(func(ctx krt.HandlerContext) *glookubev1.Settings {
s := krt.FetchOne(ctx, setting,
Expand Down
8 changes: 4 additions & 4 deletions projects/gateway2/setup/ggv2setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ type xdsFetcher struct {

func (x *xdsFetcher) getclusters(t *testing.T, ctx context.Context) []*envoycluster.Cluster {

ctx, cancel := context.WithTimeout(ctx, time.Second)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

cds := envoy_service_cluster_v3.NewClusterDiscoveryServiceClient(x.conn)
Expand Down Expand Up @@ -634,7 +634,7 @@ func getroutesnames(l *envoylistener.Listener) []string {

func (x *xdsFetcher) getlisteners(t *testing.T, ctx context.Context) []*envoylistener.Listener {

ctx, cancel := context.WithTimeout(ctx, time.Second)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

ds := envoy_service_listener_v3.NewListenerDiscoveryServiceClient(x.conn)
Expand Down Expand Up @@ -667,7 +667,7 @@ func (x *xdsFetcher) getlisteners(t *testing.T, ctx context.Context) []*envoylis
func (x *xdsFetcher) getendpoints(t *testing.T, ctx context.Context, clusterServiceNames []string) []*envoyendpoint.ClusterLoadAssignment {

eds := envoy_service_endpoint_v3.NewEndpointDiscoveryServiceClient(x.conn)
ctx, cancel := context.WithTimeout(ctx, time.Second)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

epcli, err := eds.StreamEndpoints(ctx)
Expand Down Expand Up @@ -700,7 +700,7 @@ func (x *xdsFetcher) getendpoints(t *testing.T, ctx context.Context, clusterServ
func (x *xdsFetcher) getroutes(t *testing.T, ctx context.Context, rosourceNames []string) []*envoy_config_route_v3.RouteConfiguration {

eds := envoy_service_route_v3.NewRouteDiscoveryServiceClient(x.conn)
ctx, cancel := context.WithTimeout(ctx, time.Second)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

epcli, err := eds.StreamRoutes(ctx)
Expand Down
4 changes: 2 additions & 2 deletions projects/gateway2/utils/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
)

func HashProto(resource proto.Message) uint64 {
hasher := fnv.New64()
hasher := fnv.New64a()
HashProtoWithHasher(hasher, resource)
return hasher.Sum64()
}

func HashProtoWithHasher(hasher hash.Hash64, resource proto.Message) {
func HashProtoWithHasher(hasher hash.Hash, resource proto.Message) {
var buffer [1024]byte
mo := proto.MarshalOptions{Deterministic: true}
buf := buffer[:0]
Expand Down

0 comments on commit b98e704

Please sign in to comment.