Skip to content

Commit 91f2f8d

Browse files
committed
Use the private service's hostname & target port
When we set 'ClusterIP: None' on the private service the activator can't perform the fallback service probing. This is needed while in mesh mode. This change tweaks the fallback probing to use the hostname of the private service instead of using a cluster IP. On caveat is that when using a headless service Istio/K8s doesn't perform any port translation 80 (http)->8012 (queue-proxy http). Thus we perform this lookup ourselves.
1 parent 6760aa6 commit 91f2f8d

File tree

6 files changed

+173
-174
lines changed

6 files changed

+173
-174
lines changed

pkg/activator/net/helpers.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222

2323
corev1 "k8s.io/api/core/v1"
2424
"k8s.io/apimachinery/pkg/util/sets"
25-
2625
"knative.dev/networking/pkg/apis/networking"
2726
)
2827

@@ -92,13 +91,13 @@ func endpointsToDests(endpoints *corev1.Endpoints, portName string) (ready, notR
9291
return ready, notReady
9392
}
9493

95-
// getServicePort takes a service and a protocol and returns the port number of
94+
// getTargetPort takes a service and a protocol and returns the port number of
9695
// the port named for that protocol. If the port is not found then ok is false.
97-
func getServicePort(protocol networking.ProtocolType, svc *corev1.Service) (int, bool) {
96+
func getTargetPort(protocol networking.ProtocolType, svc *corev1.Service) (int, bool) {
9897
wantName := networking.ServicePortName(protocol)
9998
for _, p := range svc.Spec.Ports {
10099
if p.Name == wantName {
101-
return int(p.Port), true
100+
return p.TargetPort.IntValue(), true
102101
}
103102
}
104103

pkg/activator/net/helpers_test.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/google/go-cmp/cmp"
2424
corev1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/util/intstr"
2526
"k8s.io/apimachinery/pkg/util/sets"
2627
"knative.dev/networking/pkg/apis/networking"
2728
)
@@ -153,7 +154,7 @@ func TestEndpointsToDests(t *testing.T) {
153154
}
154155
}
155156

156-
func TestGetServicePort(t *testing.T) {
157+
func TestGetTargetPort(t *testing.T) {
157158
for _, tc := range []struct {
158159
name string
159160
protocol networking.ProtocolType
@@ -164,17 +165,17 @@ func TestGetServicePort(t *testing.T) {
164165
name: "Single port",
165166
protocol: networking.ProtocolHTTP1,
166167
ports: []corev1.ServicePort{{
167-
Name: "http",
168-
Port: 100,
168+
Name: "http",
169+
TargetPort: intstr.FromInt(100),
169170
}},
170171
expect: 100,
171172
expectOK: true,
172173
}, {
173174
name: "Missing port",
174175
protocol: networking.ProtocolHTTP1,
175176
ports: []corev1.ServicePort{{
176-
Name: "invalid",
177-
Port: 100,
177+
Name: "invalid",
178+
TargetPort: intstr.FromInt(100),
178179
}},
179180
expect: 0,
180181
expectOK: false,
@@ -186,7 +187,7 @@ func TestGetServicePort(t *testing.T) {
186187
},
187188
}
188189

189-
port, ok := getServicePort(tc.protocol, &svc)
190+
port, ok := getTargetPort(tc.protocol, &svc)
190191
if ok != tc.expectOK {
191192
t.Errorf("Wanted ok %v, got %v", tc.expectOK, ok)
192193
}

pkg/activator/net/revision_backends.go

+32-31
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import (
4848
"knative.dev/pkg/controller"
4949
"knative.dev/pkg/logging"
5050
"knative.dev/pkg/logging/logkey"
51+
"knative.dev/pkg/network"
5152
"knative.dev/pkg/reconciler"
5253
"knative.dev/serving/pkg/apis/serving"
5354
revisioninformer "knative.dev/serving/pkg/client/injection/informers/serving/v1/revision"
@@ -62,9 +63,9 @@ import (
6263
// ClusterIPDest will be set to non empty string and Dests will be nil. Otherwise Dests will be set
6364
// to a slice of healthy l4 dests for reaching the revision.
6465
type revisionDestsUpdate struct {
65-
Rev types.NamespacedName
66-
ClusterIPDest string
67-
Dests sets.Set[string]
66+
Rev types.NamespacedName
67+
PrivateService string
68+
Dests sets.Set[string]
6869
}
6970

7071
type dests struct {
@@ -91,7 +92,7 @@ const (
9192
defaultProbeFrequency time.Duration = 200 * time.Millisecond
9293
)
9394

94-
// revisionWatcher watches the podIPs and ClusterIP of the service for a revision. It implements the logic
95+
// revisionWatcher watches the podIPs/service of a revision. It implements the logic
9596
// to supply revisionDestsUpdate events on updateCh
9697
type revisionWatcher struct {
9798
stopCh <-chan struct{}
@@ -103,8 +104,9 @@ type revisionWatcher struct {
103104

104105
// Stores the list of pods that have been successfully probed.
105106
healthyPods sets.Set[string]
106-
// Stores whether the service ClusterIP has been seen as healthy.
107-
clusterIPHealthy bool
107+
108+
// Stores whether the private k8s service has been seen as healthy.
109+
privateServiceHealthy bool
108110

109111
transport http.RoundTripper
110112
destsCh chan dests
@@ -200,23 +202,22 @@ func (rw *revisionWatcher) probe(ctx context.Context, dest string) (pass bool, n
200202
return match, notMesh, err
201203
}
202204

203-
func (rw *revisionWatcher) getDest() (string, error) {
204-
svc, err := rw.serviceLister.Services(rw.rev.Namespace).Get(names.PrivateService(rw.rev.Name))
205+
func (rw *revisionWatcher) getPrivateServiceDest() (string, error) {
206+
svcName := names.PrivateService(rw.rev.Name)
207+
svc, err := rw.serviceLister.Services(rw.rev.Namespace).Get(svcName)
205208
if err != nil {
206209
return "", err
207210
}
208-
if svc.Spec.ClusterIP == "" {
209-
return "", fmt.Errorf("private service %s/%s clusterIP is nil, this should never happen", svc.ObjectMeta.Namespace, svc.ObjectMeta.Name)
210-
}
211211

212-
svcPort, ok := getServicePort(rw.protocol, svc)
212+
svcHostname := network.GetServiceHostname(svcName, rw.rev.Namespace)
213+
svcPort, ok := getTargetPort(rw.protocol, svc)
213214
if !ok {
214215
return "", fmt.Errorf("unable to find port in service %s/%s", svc.Namespace, svc.Name)
215216
}
216-
return net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(svcPort)), nil
217+
return net.JoinHostPort(svcHostname, strconv.Itoa(svcPort)), nil
217218
}
218219

219-
func (rw *revisionWatcher) probeClusterIP(dest string) (bool, error) {
220+
func (rw *revisionWatcher) probePrivateService(dest string) (bool, error) {
220221
ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
221222
defer cancel()
222223
match, _, err := rw.probe(ctx, dest)
@@ -296,12 +297,12 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.Set[string]) (succee
296297
return healthy, unchanged, sawNotMesh.Load(), err
297298
}
298299

299-
func (rw *revisionWatcher) sendUpdate(clusterIP string, dests sets.Set[string]) {
300+
func (rw *revisionWatcher) sendUpdate(privateService string, dests sets.Set[string]) {
300301
select {
301302
case <-rw.stopCh:
302303
return
303304
default:
304-
rw.updateCh <- revisionDestsUpdate{Rev: rw.rev, ClusterIPDest: clusterIP, Dests: dests}
305+
rw.updateCh <- revisionDestsUpdate{Rev: rw.rev, PrivateService: privateService, Dests: dests}
305306
}
306307
}
307308

@@ -310,9 +311,9 @@ func (rw *revisionWatcher) sendUpdate(clusterIP string, dests sets.Set[string])
310311
func (rw *revisionWatcher) checkDests(curDests, prevDests dests) {
311312
if len(curDests.ready) == 0 && len(curDests.notReady) == 0 {
312313
// We must have scaled down.
313-
rw.clusterIPHealthy = false
314+
rw.privateServiceHealthy = false
314315
rw.healthyPods = nil
315-
rw.logger.Debug("ClusterIP is no longer healthy.")
316+
rw.logger.Debug("Private service is no longer healthy.")
316317
// Send update that we are now inactive (both params invalid).
317318
rw.sendUpdate("", nil)
318319
return
@@ -351,7 +352,7 @@ func (rw *revisionWatcher) checkDests(curDests, prevDests dests) {
351352
// Note: it's important that this copies (via hs.Union) the healthy pods
352353
// set before sending the update to avoid concurrent modifications
353354
// affecting the throttler, which iterates over the set.
354-
rw.sendUpdate("" /*clusterIP*/, hs.Union(nil))
355+
rw.sendUpdate("", hs.Union(nil))
355356
return
356357
}
357358
// no-op, and we have successfully probed at least one pod.
@@ -380,28 +381,28 @@ func (rw *revisionWatcher) checkDests(curDests, prevDests dests) {
380381
// If we failed to probe even a single pod, check the clusterIP.
381382
// NB: We can't cache the IP address, since user might go rogue
382383
// and delete the K8s service. We'll fix it, but the cluster IP will be different.
383-
dest, err := rw.getDest()
384+
dest, err := rw.getPrivateServiceDest()
384385
if err != nil {
385386
rw.logger.Errorw("Failed to determine service destination", zap.Error(err))
386387
return
387388
}
388389

389-
// If cluster IP is healthy and we haven't scaled down, short circuit.
390-
if rw.clusterIPHealthy {
391-
rw.logger.Debugf("ClusterIP %s already probed (ready backends: %d)", dest, len(curDests.ready))
390+
// If service hostname is healthy and we haven't scaled down, short circuit.
391+
if rw.privateServiceHealthy {
392+
rw.logger.Debugf("service hostname %s already probed (ready backends: %d)", dest, len(curDests.ready))
392393
rw.sendUpdate(dest, curDests.ready)
393394
return
394395
}
395396

396-
// If clusterIP is healthy send this update and we are done.
397-
if ok, err := rw.probeClusterIP(dest); err != nil {
398-
rw.logger.Errorw("Failed to probe clusterIP "+dest, zap.Error(err))
397+
// If service via hostname is healthy send this update and we are done.
398+
if ok, err := rw.probePrivateService(dest); err != nil {
399+
rw.logger.Errorw("Failed to probe private service: "+dest, zap.Error(err))
399400
} else if ok {
400401
// We can reach here only iff pods are not successfully individually probed
401-
// but ClusterIP conversely has been successfully probed.
402+
// but PrivateService conversely has been successfully probed.
402403
rw.podsAddressable = false
403-
rw.logger.Debugf("ClusterIP is successfully probed: %s (ready backends: %d)", dest, len(curDests.ready))
404-
rw.clusterIPHealthy = true
404+
rw.logger.Debugf("Private service is successfully probed: %s (ready backends: %d)", dest, len(curDests.ready))
405+
rw.privateServiceHealthy = true
405406
rw.healthyPods = nil
406407
rw.sendUpdate(dest, curDests.ready)
407408
}
@@ -421,8 +422,8 @@ func (rw *revisionWatcher) run(probeFrequency time.Duration) {
421422
// then we want to probe on timer.
422423
rw.logger.Debugw("Revision state", zap.Object("dests", curDests),
423424
zap.Object("healthy", logging.StringSet(rw.healthyPods)),
424-
zap.Bool("clusterIPHealthy", rw.clusterIPHealthy))
425-
if len(curDests.ready)+len(curDests.notReady) > 0 && !(rw.clusterIPHealthy ||
425+
zap.Bool("clusterHealthy", rw.privateServiceHealthy))
426+
if len(curDests.ready)+len(curDests.notReady) > 0 && !(rw.privateServiceHealthy ||
426427
curDests.ready.Union(curDests.notReady).Equal(rw.healthyPods)) {
427428
rw.logger.Debug("Probing on timer")
428429
tickCh = timer.C

0 commit comments

Comments
 (0)