Skip to content

Commit 9c86d74

Browse files
author
Kubernetes Submit Queue
authored
Merge pull request kubernetes#48583 from smarterclayton/record_errors
Automatic merge from submit-queue

Record 429 and timeout errors to prometheus

Allows gathering of load being shed.

Fixes kubernetes#48559

@deads2k please review, there was a logic error in apiserver RequestInfo (minor, fortunately)

```release-note
Requests with the query parameter `?watch=` are treated by the API server as a request to watch, but authorization and metrics were not correctly identifying those as watch requests, instead grouping them as list calls.
```
2 parents c2ddbd2 + 2e33a2f commit 9c86d74

File tree

7 files changed

+86
-27
lines changed

7 files changed

+86
-27
lines changed

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/proxy.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,12 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
6060
var httpCode int
6161
reqStart := time.Now()
6262
defer func() {
63-
metrics.Monitor(&verb, &apiResource, &subresource,
63+
metrics.Monitor(
64+
verb, apiResource, subresource,
6465
net.GetHTTPClient(req),
6566
w.Header().Get("Content-Type"),
66-
httpCode, reqStart)
67+
httpCode, reqStart,
68+
)
6769
}()
6870

6971
ctx, ok := r.Mapper.Get(req)

staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,29 @@ func Register() {
6969
prometheus.MustRegister(requestLatenciesSummary)
7070
}
7171

72-
func Monitor(verb, resource, subresource *string, client, contentType string, httpCode int, reqStart time.Time) {
72+
// Monitor records a request to the apiserver endpoints that follow the Kubernetes API conventions. verb must be
73+
// uppercase to be backwards compatible with existing monitoring tooling.
74+
func Monitor(verb, resource, subresource, client, contentType string, httpCode int, reqStart time.Time) {
7375
elapsed := float64((time.Since(reqStart)) / time.Microsecond)
74-
requestCounter.WithLabelValues(*verb, *resource, *subresource, client, contentType, codeToString(httpCode)).Inc()
75-
requestLatencies.WithLabelValues(*verb, *resource, *subresource).Observe(elapsed)
76-
requestLatenciesSummary.WithLabelValues(*verb, *resource, *subresource).Observe(elapsed)
76+
requestCounter.WithLabelValues(verb, resource, subresource, client, contentType, codeToString(httpCode)).Inc()
77+
requestLatencies.WithLabelValues(verb, resource, subresource).Observe(elapsed)
78+
requestLatenciesSummary.WithLabelValues(verb, resource, subresource).Observe(elapsed)
79+
}
80+
81+
// MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record
82+
// a request. verb must be uppercase to be backwards compatible with existing monitoring tooling.
83+
func MonitorRequest(request *http.Request, verb, resource, subresource, contentType string, httpCode int, reqStart time.Time) {
84+
reportedVerb := verb
85+
if verb == "LIST" {
86+
// see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool
87+
if values := request.URL.Query()["watch"]; len(values) > 0 {
88+
if value := strings.ToLower(values[0]); value != "0" && value != "false" {
89+
reportedVerb = "WATCH"
90+
}
91+
}
92+
}
93+
client := cleanUserAgent(utilnet.GetHTTPClient(request))
94+
Monitor(reportedVerb, resource, subresource, client, contentType, httpCode, reqStart)
7795
}
7896

7997
func Reset() {
@@ -103,11 +121,7 @@ func InstrumentRouteFunc(verb, resource, subresource string, routeFunc restful.R
103121

104122
routeFunc(request, response)
105123

106-
reportedVerb := verb
107-
if verb == "LIST" && strings.ToLower(request.QueryParameter("watch")) == "true" {
108-
reportedVerb = "WATCH"
109-
}
110-
Monitor(&reportedVerb, &resource, &subresource, cleanUserAgent(utilnet.GetHTTPClient(request.Request)), rw.Header().Get("Content-Type"), delegate.status, now)
124+
MonitorRequest(request.Request, verb, resource, subresource, rw.Header().Get("Content-Type"), delegate.status, now)
111125
})
112126
}
113127

staging/src/k8s.io/apiserver/pkg/endpoints/request/requestinfo.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,12 +195,17 @@ func (r *RequestInfoFactory) NewRequestInfo(req *http.Request) (*RequestInfo, er
195195
// if there's no name on the request and we thought it was a get before, then the actual verb is a list or a watch
196196
if len(requestInfo.Name) == 0 && requestInfo.Verb == "get" {
197197
// Assumes v1.ListOptions
198-
// Duplicates logic of Convert_Slice_string_To_bool
199-
switch strings.ToLower(req.URL.Query().Get("watch")) {
200-
case "false", "0", "":
198+
// Any query value that is not 0 or false is considered true
199+
// see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool
200+
if values := req.URL.Query()["watch"]; len(values) > 0 {
201+
switch strings.ToLower(values[0]) {
202+
case "false", "0":
203+
requestInfo.Verb = "list"
204+
default:
205+
requestInfo.Verb = "watch"
206+
}
207+
} else {
201208
requestInfo.Verb = "list"
202-
default:
203-
requestInfo.Verb = "watch"
204209
}
205210
}
206211
// if there's no name on the request and we thought it was a delete before, then the actual verb is deletecollection

staging/src/k8s.io/apiserver/pkg/server/filters/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ go_library(
4848
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
4949
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
5050
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
51+
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
5152
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
5253
"//vendor/k8s.io/apiserver/pkg/server/httplog:go_default_library",
5354
],

staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@ package filters
1919
import (
2020
"fmt"
2121
"net/http"
22+
"strings"
23+
"time"
2224

2325
"k8s.io/apimachinery/pkg/api/errors"
2426
"k8s.io/apimachinery/pkg/util/sets"
27+
"k8s.io/apiserver/pkg/endpoints/metrics"
2528
apirequest "k8s.io/apiserver/pkg/endpoints/request"
2629
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
2730
"k8s.io/apiserver/pkg/server/httplog"
@@ -94,6 +97,7 @@ func WithMaxInFlightLimit(
9497
defer func() { <-c }()
9598
handler.ServeHTTP(w, r)
9699
default:
100+
metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", errors.StatusTooManyRequests, time.Now())
97101
tooManyRequests(r, w)
98102
}
99103
}

staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ import (
2222
"fmt"
2323
"net"
2424
"net/http"
25+
"strings"
2526
"sync"
2627
"time"
2728

2829
apierrors "k8s.io/apimachinery/pkg/api/errors"
2930
"k8s.io/apimachinery/pkg/runtime/schema"
31+
"k8s.io/apiserver/pkg/endpoints/metrics"
3032
apirequest "k8s.io/apiserver/pkg/endpoints/request"
3133
)
3234

@@ -39,24 +41,28 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa
3941
if longRunning == nil {
4042
return handler
4143
}
42-
timeoutFunc := func(req *http.Request) (<-chan time.Time, *apierrors.StatusError) {
44+
timeoutFunc := func(req *http.Request) (<-chan time.Time, func(), *apierrors.StatusError) {
4345
// TODO unify this with apiserver.MaxInFlightLimit
4446
ctx, ok := requestContextMapper.Get(req)
4547
if !ok {
4648
// if this happens, the handler chain isn't setup correctly because there is no context mapper
47-
return time.After(globalTimeout), apierrors.NewInternalError(fmt.Errorf("no context found for request during timeout"))
49+
return time.After(globalTimeout), func() {}, apierrors.NewInternalError(fmt.Errorf("no context found for request during timeout"))
4850
}
4951

5052
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
5153
if !ok {
5254
// if this happens, the handler chain isn't setup correctly because there is no request info
53-
return time.After(globalTimeout), apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout"))
55+
return time.After(globalTimeout), func() {}, apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout"))
5456
}
5557

5658
if longRunning(req, requestInfo) {
57-
return nil, nil
59+
return nil, nil, nil
5860
}
59-
return time.After(globalTimeout), apierrors.NewServerTimeout(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb, 0)
61+
now := time.Now()
62+
metricFn := func() {
63+
metrics.MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", http.StatusInternalServerError, now)
64+
}
65+
return time.After(globalTimeout), metricFn, apierrors.NewServerTimeout(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb, 0)
6066
}
6167
return WithTimeout(handler, timeoutFunc)
6268
}
@@ -68,18 +74,19 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa
6874
// provided. (If msg is empty, a suitable default message will be sent.) After
6975
// the handler times out, writes by h to its http.ResponseWriter will return
7076
// http.ErrHandlerTimeout. If timeoutFunc returns a nil timeout channel, no
71-
// timeout will be enforced.
72-
func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, err *apierrors.StatusError)) http.Handler {
77+
// timeout will be enforced. recordFn is a function that will be invoked whenever
78+
// a timeout happens.
79+
func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, recordFn func(), err *apierrors.StatusError)) http.Handler {
7380
return &timeoutHandler{h, timeoutFunc}
7481
}
7582

7683
type timeoutHandler struct {
7784
handler http.Handler
78-
timeout func(*http.Request) (<-chan time.Time, *apierrors.StatusError)
85+
timeout func(*http.Request) (<-chan time.Time, func(), *apierrors.StatusError)
7986
}
8087

8188
func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
82-
after, err := t.timeout(r)
89+
after, recordFn, err := t.timeout(r)
8390
if after == nil {
8491
t.handler.ServeHTTP(w, r)
8592
return
@@ -95,6 +102,7 @@ func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
95102
case <-done:
96103
return
97104
case <-after:
105+
recordFn()
98106
tw.timeout(err)
99107
}
100108
}

staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"net/http"
2323
"net/http/httptest"
2424
"reflect"
25+
"sync"
2526
"testing"
2627
"time"
2728

@@ -31,21 +32,39 @@ import (
3132
"k8s.io/apimachinery/pkg/util/diff"
3233
)
3334

35+
type recorder struct {
36+
lock sync.Mutex
37+
count int
38+
}
39+
40+
func (r *recorder) Record() {
41+
r.lock.Lock()
42+
defer r.lock.Unlock()
43+
r.count++
44+
}
45+
46+
func (r *recorder) Count() int {
47+
r.lock.Lock()
48+
defer r.lock.Unlock()
49+
return r.count
50+
}
51+
3452
func TestTimeout(t *testing.T) {
3553
sendResponse := make(chan struct{}, 1)
3654
writeErrors := make(chan error, 1)
3755
timeout := make(chan time.Time, 1)
3856
resp := "test response"
3957
timeoutErr := apierrors.NewServerTimeout(schema.GroupResource{Group: "foo", Resource: "bar"}, "get", 0)
58+
record := &recorder{}
4059

4160
ts := httptest.NewServer(WithTimeout(http.HandlerFunc(
4261
func(w http.ResponseWriter, r *http.Request) {
4362
<-sendResponse
4463
_, err := w.Write([]byte(resp))
4564
writeErrors <- err
4665
}),
47-
func(*http.Request) (<-chan time.Time, *apierrors.StatusError) {
48-
return timeout, timeoutErr
66+
func(*http.Request) (<-chan time.Time, func(), *apierrors.StatusError) {
67+
return timeout, record.Record, timeoutErr
4968
}))
5069
defer ts.Close()
5170

@@ -65,6 +84,9 @@ func TestTimeout(t *testing.T) {
6584
if err := <-writeErrors; err != nil {
6685
t.Errorf("got unexpected Write error on first request: %v", err)
6786
}
87+
if record.Count() != 0 {
88+
t.Errorf("invoked record method: %#v", record)
89+
}
6890

6991
// Times out
7092
timeout <- time.Time{}
@@ -83,6 +105,9 @@ func TestTimeout(t *testing.T) {
83105
if !reflect.DeepEqual(status, &timeoutErr.ErrStatus) {
84106
t.Errorf("unexpected object: %s", diff.ObjectReflectDiff(&timeoutErr.ErrStatus, status))
85107
}
108+
if record.Count() != 1 {
109+
t.Errorf("did not invoke record method: %#v", record)
110+
}
86111

87112
// Now try to send a response
88113
sendResponse <- struct{}{}

0 commit comments

Comments
 (0)