Skip to content

Commit 7fd41ca

Browse files
author
James DeFelice
committed
introduce DisconnectionDetector and CodeIndicatingSubscriptionLoss
1 parent 5df42ce commit 7fd41ca

File tree

2 files changed

+95
-34
lines changed

2 files changed

+95
-34
lines changed

api/v1/lib/httpcli/httpsched/httpsched.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package httpsched
22

33
import (
4-
"errors"
54
"log"
65
"net/http"
76
"net/url"
@@ -17,13 +16,13 @@ import (
1716
)
1817

1918
var (
20-
errNotHTTP = errors.New("expected an HTTP object, found something else instead")
21-
errBadLocation = errors.New("failed to build new Mesos service endpoint URL from Location header")
19+
errNotHTTPCli = httpcli.ProtocolError("expected an httpcli.Response object, found something else instead")
20+
errBadLocation = httpcli.ProtocolError("failed to build new Mesos service endpoint URL from Location header")
2221

2322
DefaultRedirectSettings = RedirectSettings{
2423
MaxAttempts: 9,
2524
MaxBackoffPeriod: 13 * time.Second,
26-
MinBackoffPeriod: 100 * time.Millisecond,
25+
MinBackoffPeriod: 500 * time.Millisecond,
2726
}
2827
)
2928

@@ -176,12 +175,13 @@ func (cli *client) redirectHandler() httpcli.Opt {
176175
if err == nil || !isErrNotLeader(err) {
177176
return resp, err
178177
}
178+
// TODO(jdef) for now, we're tightly coupled to the httpcli package's Response type
179179
res, ok := resp.(*httpcli.Response)
180180
if !ok {
181181
if resp != nil {
182182
resp.Close()
183183
}
184-
return nil, errNotHTTP
184+
return nil, errNotHTTPCli
185185
}
186186
log.Println("master changed?")
187187
location, ok := buildNewEndpoint(res.Header.Get("Location"), cli.Endpoint())

api/v1/lib/httpcli/httpsched/state.go

Lines changed: 90 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package httpsched
22

33
import (
4-
"errors"
54
"fmt"
65
"log"
76
"net/http"
@@ -10,6 +9,7 @@ import (
109
"github.com/mesos/mesos-go/api/v1/lib"
1110
"github.com/mesos/mesos-go/api/v1/lib/encoding"
1211
"github.com/mesos/mesos-go/api/v1/lib/httpcli"
12+
"github.com/mesos/mesos-go/api/v1/lib/httpcli/apierrors"
1313
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
1414
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
1515
)
@@ -19,10 +19,13 @@ const (
1919
debug = false
2020
)
2121

22+
type StateError string
23+
24+
func (err StateError) Error() string { return string(err) }
25+
2226
var (
23-
errMissingMesosStreamId = errors.New("missing Mesos-Stream-Id header expected with successful SUBSCRIBE")
24-
errAlreadySubscribed = errors.New("already subscribed, cannot re-issue a SUBSCRIBE call")
25-
errNotSubscribed = errors.New("not yet subscribed, must first issue a SUBSCRIBE call")
27+
errMissingMesosStreamId = httpcli.ProtocolError("missing Mesos-Stream-Id header expected with successful SUBSCRIBE")
28+
errAlreadySubscribed = StateError("already subscribed, cannot re-issue a SUBSCRIBE call")
2629
)
2730

2831
type (
@@ -59,11 +62,54 @@ func maybeLogged(f httpcli.DoFunc) httpcli.DoFunc {
5962
return f
6063
}
6164

65+
// DisconnectionDetector is a programmable response decorator that attempts to detect errors
66+
// that should transition the state from "connected" to "disconnected". Detector implementations
67+
// are expected to invoke the `disconnect` callback in order to initiate the disconnection.
68+
//
69+
// The default implementation will transition to a disconnected state when:
70+
// - an error occurs while decoding an object from the subscription stream
71+
// - mesos reports an ERROR-type scheduler.Event object via the subscription stream
72+
// - an object on the stream does not decode to a *scheduler.Event (sanity check)
73+
//
74+
// Consumers of this package may choose to override default behavior by overwriting the default
75+
// value of this var, but should exercise caution: failure to properly transition to a disconnected
76+
// state may cause subsequent Call operations to fail (without recourse).
77+
var DisconnectionDetector = func(resp mesos.Response, disconnect func()) mesos.Response {
78+
return &mesos.ResponseWrapper{
79+
Response: resp,
80+
DecoderFunc: func() encoding.Decoder {
81+
decoder := resp.Decoder()
82+
return func(u encoding.Unmarshaler) (err error) {
83+
err = decoder(u)
84+
if err != nil {
85+
disconnect()
86+
return
87+
}
88+
switch e := u.(type) {
89+
case (*scheduler.Event):
90+
if e.GetType() == scheduler.Event_ERROR {
91+
// the mesos scheduler API recommends that scheduler implementations
92+
// resubscribe in this case. we initiate the disconnection here because
93+
// it is assumed to be convenient for most framework implementations.
94+
disconnect()
95+
}
96+
default:
97+
// sanity check: this should never happen in practice.
98+
err = httpcli.ProtocolError(
99+
fmt.Sprintf("unexpected object on subscription event stream: %v", e))
100+
disconnect()
101+
}
102+
return
103+
}
104+
},
105+
}
106+
}
107+
62108
func disconnectedFn(state *state) stateFn {
63109
// (a) validate call = SUBSCRIBE
64110
if state.call.GetType() != scheduler.Call_SUBSCRIBE {
65111
state.resp = nil
66-
state.err = errNotSubscribed
112+
state.err = &apierrors.Error{Code: apierrors.CodeUnsubscribed}
67113
return disconnectedFn
68114
}
69115

@@ -116,29 +162,7 @@ func disconnectedFn(state *state) stateFn {
116162

117163
// wrap the response: any errors processing the subscription stream should result in a
118164
// transition to a disconnected state ASAP.
119-
state.resp = &mesos.ResponseWrapper{
120-
Response: stateResp,
121-
DecoderFunc: func() encoding.Decoder {
122-
decoder := stateResp.Decoder()
123-
return func(u encoding.Unmarshaler) (err error) {
124-
err = decoder(u)
125-
if err != nil {
126-
transitionToDisconnected()
127-
return
128-
}
129-
switch e := u.(type) {
130-
case (*scheduler.Event):
131-
if e.GetType() == scheduler.Event_ERROR {
132-
transitionToDisconnected()
133-
}
134-
default:
135-
err = httpcli.ProtocolError(
136-
fmt.Sprintf("unexpected object on subscription event stream", e))
137-
}
138-
return
139-
}
140-
},
141-
}
165+
state.resp = DisconnectionDetector(stateResp, transitionToDisconnected)
142166

143167
// (e) else prepare callerTemporary w/ special header, return connectedFn since we're now subscribed
144168
state.caller = &callerTemporary{
@@ -148,18 +172,55 @@ func disconnectedFn(state *state) stateFn {
148172
return connectedFn
149173
}
150174

175+
// CodesIndicatingSubscriptionLoss is a set of apierrors.Code entries which each indicate that
176+
// the event subscription stream has been severed between the scheduler and mesos. It's represented
177+
// as a public map variable so that clients can program additional error codes (if such are discovered)
178+
// without hacking the code of the mesos-go library directly.
179+
var CodesIndicatingSubscriptionLoss = func(codes ...apierrors.Code) map[apierrors.Code]struct{} {
180+
result := make(map[apierrors.Code]struct{}, len(codes))
181+
for _, code := range codes {
182+
result[code] = struct{}{}
183+
}
184+
return result
185+
}(
186+
// expand this list as we discover other errors that guarantee we've lost our event subscription.
187+
apierrors.CodeUnsubscribed,
188+
)
189+
190+
func errorIndicatesSubscriptionLoss(err error) (result bool) {
191+
if apiError, ok := err.(*apierrors.Error); ok {
192+
_, result = CodesIndicatingSubscriptionLoss[apiError.Code]
193+
}
194+
// TODO(jdef) should other error types be considered here as well?
195+
return
196+
}
197+
151198
func connectedFn(state *state) stateFn {
152199
// (a) validate call != SUBSCRIBE
153200
if state.call.GetType() == scheduler.Call_SUBSCRIBE {
154201
state.resp = nil
202+
203+
// TODO(jdef) not super happy with this error: I don't think that mesos minds if we issue
204+
// redundant subscribe calls. However, the state tracking mechanism in this module can't
205+
// cope with it (e.g. we'll need to track a new stream-id, etc).
206+
// We make a best effort to transition to a disconnected state if we detect protocol errors,
207+
// error events, or mesos-generated "not subscribed" errors. But we don't handle things such
208+
// as, for example, authentication errors. Granted, the initial subscribe call should fail
209+
// if authentication is an issue, so we should never end up here. I'm not convinced that there
210+
// aren't other edge cases, though, with respect to other error codes.
155211
state.err = errAlreadySubscribed
156212
return connectedFn
157213
}
158214

159215
// (b) execute call, save the result in resp, err
160216
state.resp, state.err = state.caller.Call(state.call)
161217

162-
// stay connected, don't attempt to interpret errors here
218+
if errorIndicatesSubscriptionLoss(state.err) {
219+
// properly transition back to a disconnected state if mesos thinks that we're unsubscribed
220+
return disconnectedFn
221+
}
222+
223+
// stay connected, don't attempt to interpret other errors here
163224
return connectedFn
164225
}
165226

0 commit comments

Comments
 (0)