diff --git a/api/v1/lib/httpcli/httpsched/httpsched.go b/api/v1/lib/httpcli/httpsched/httpsched.go index bef1e337..0f53d905 100644 --- a/api/v1/lib/httpcli/httpsched/httpsched.go +++ b/api/v1/lib/httpcli/httpsched/httpsched.go @@ -1,7 +1,6 @@ package httpsched import ( - "errors" "log" "net/http" "net/url" @@ -17,13 +16,13 @@ import ( ) var ( - errNotHTTP = errors.New("expected an HTTP object, found something else instead") - errBadLocation = errors.New("failed to build new Mesos service endpoint URL from Location header") + errNotHTTPCli = httpcli.ProtocolError("expected an httpcli.Response object, found something else instead") + errBadLocation = httpcli.ProtocolError("failed to build new Mesos service endpoint URL from Location header") DefaultRedirectSettings = RedirectSettings{ MaxAttempts: 9, MaxBackoffPeriod: 13 * time.Second, - MinBackoffPeriod: 100 * time.Millisecond, + MinBackoffPeriod: 500 * time.Millisecond, } ) @@ -176,12 +175,13 @@ func (cli *client) redirectHandler() httpcli.Opt { if err == nil || !isErrNotLeader(err) { return resp, err } + // TODO(jdef) for now, we're tightly coupled to the httpcli package's Response type res, ok := resp.(*httpcli.Response) if !ok { if resp != nil { resp.Close() } - return nil, errNotHTTP + return nil, errNotHTTPCli } log.Println("master changed?") location, ok := buildNewEndpoint(res.Header.Get("Location"), cli.Endpoint()) diff --git a/api/v1/lib/httpcli/httpsched/state.go b/api/v1/lib/httpcli/httpsched/state.go index 510f4197..1339434d 100644 --- a/api/v1/lib/httpcli/httpsched/state.go +++ b/api/v1/lib/httpcli/httpsched/state.go @@ -1,7 +1,6 @@ package httpsched import ( - "errors" "fmt" "log" "net/http" @@ -10,6 +9,7 @@ import ( "github.com/mesos/mesos-go/api/v1/lib" "github.com/mesos/mesos-go/api/v1/lib/encoding" "github.com/mesos/mesos-go/api/v1/lib/httpcli" + "github.com/mesos/mesos-go/api/v1/lib/httpcli/apierrors" "github.com/mesos/mesos-go/api/v1/lib/scheduler" 
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls" ) @@ -19,10 +19,13 @@ const ( debug = false ) +type StateError string + +func (err StateError) Error() string { return string(err) } + var ( - errMissingMesosStreamId = errors.New("missing Mesos-Stream-Id header expected with successful SUBSCRIBE") - errAlreadySubscribed = errors.New("already subscribed, cannot re-issue a SUBSCRIBE call") - errNotSubscribed = errors.New("not yet subscribed, must first issue a SUBSCRIBE call") + errMissingMesosStreamId = httpcli.ProtocolError("missing Mesos-Stream-Id header expected with successful SUBSCRIBE") + errAlreadySubscribed = StateError("already subscribed, cannot re-issue a SUBSCRIBE call") ) type ( @@ -59,11 +62,54 @@ func maybeLogged(f httpcli.DoFunc) httpcli.DoFunc { return f } +// DisconnectionDetector is a programmable response decorator that attempts to detect errors +// that should transition the state from "connected" to "disconnected". Detector implementations +// are expected to invoke the `disconnect` callback in order to initiate the disconnection. +// +// The default implementation will transition to a disconnected state when: +// - an error occurs while decoding an object from the subscription stream +// - mesos reports an ERROR-type scheduler.Event object via the subscription stream +// - an object on the stream does not decode to a *scheduler.Event (sanity check) +// +// Consumers of this package may choose to override default behavior by overwriting the default +// value of this var, but should exercise caution: failure to properly transition to a disconnected +// state may cause subsequent Call operations to fail (without recourse). 
+var DisconnectionDetector = func(resp mesos.Response, disconnect func()) mesos.Response { + return &mesos.ResponseWrapper{ + Response: resp, + DecoderFunc: func() encoding.Decoder { + decoder := resp.Decoder() + return func(u encoding.Unmarshaler) (err error) { + err = decoder(u) + if err != nil { + disconnect() + return + } + switch e := u.(type) { + case (*scheduler.Event): + if e.GetType() == scheduler.Event_ERROR { + // the mesos scheduler API recommends that scheduler implementations + // resubscribe in this case. we initiate the disconnection here because + // it is assumed to be convenient for most framework implementations. + disconnect() + } + default: + // sanity check: this should never happen in practice. + err = httpcli.ProtocolError( + fmt.Sprintf("unexpected object on subscription event stream: %v", e)) + disconnect() + } + return + } + }, + } +} + func disconnectedFn(state *state) stateFn { // (a) validate call = SUBSCRIBE if state.call.GetType() != scheduler.Call_SUBSCRIBE { state.resp = nil - state.err = errNotSubscribed + state.err = &apierrors.Error{Code: apierrors.CodeUnsubscribed} return disconnectedFn } @@ -116,29 +162,7 @@ func disconnectedFn(state *state) stateFn { // wrap the response: any errors processing the subscription stream should result in a // transition to a disconnected state ASAP. 
- state.resp = &mesos.ResponseWrapper{ - Response: stateResp, - DecoderFunc: func() encoding.Decoder { - decoder := stateResp.Decoder() - return func(u encoding.Unmarshaler) (err error) { - err = decoder(u) - if err != nil { - transitionToDisconnected() - return - } - switch e := u.(type) { - case (*scheduler.Event): - if e.GetType() == scheduler.Event_ERROR { - transitionToDisconnected() - } - default: - err = httpcli.ProtocolError( - fmt.Sprintf("unexpected object on subscription event stream", e)) - } - return - } - }, - } + state.resp = DisconnectionDetector(stateResp, transitionToDisconnected) // (e) else prepare callerTemporary w/ special header, return connectedFn since we're now subscribed state.caller = &callerTemporary{ @@ -148,10 +172,42 @@ func disconnectedFn(state *state) stateFn { return connectedFn } +// CodesIndicatingSubscriptionLoss is a set of apierrors.Code entries which each indicate that +// the event subscription stream has been severed between the scheduler and mesos. It's represented +// as a public map variable so that clients can program additional error codes (if such are discovered) +// without hacking the code of the mesos-go library directly. +var CodesIndicatingSubscriptionLoss = func(codes ...apierrors.Code) map[apierrors.Code]struct{} { + result := make(map[apierrors.Code]struct{}, len(codes)) + for _, code := range codes { + result[code] = struct{}{} + } + return result +}( + // expand this list as we discover other errors that guarantee we've lost our event subscription. + apierrors.CodeUnsubscribed, +) + +func errorIndicatesSubscriptionLoss(err error) (result bool) { + if apiError, ok := err.(*apierrors.Error); ok { + _, result = CodesIndicatingSubscriptionLoss[apiError.Code] + } + // TODO(jdef) should other error types be considered here as well? 
+ return +} + func connectedFn(state *state) stateFn { // (a) validate call != SUBSCRIBE if state.call.GetType() == scheduler.Call_SUBSCRIBE { state.resp = nil + + // TODO(jdef) not super happy with this error: I don't think that mesos minds if we issue + // redundant subscribe calls. However, the state tracking mechanism in this module can't + // cope with it (e.g. we'll need to track a new stream-id, etc). + // We make a best effort to transition to a disconnected state if we detect protocol errors, + // error events, or mesos-generated "not subscribed" errors. But we don't handle things such + // as, for example, authentication errors. Granted, the initial subscribe call should fail + // if authentication is an issue, so we should never end up here. I'm not convinced there's + // not other edge cases though with respect to other error codes. state.err = errAlreadySubscribed return connectedFn } @@ -159,7 +215,12 @@ func connectedFn(state *state) stateFn { // (b) execute call, save the result in resp, err state.resp, state.err = state.caller.Call(state.call) - // stay connected, don't attempt to interpret errors here + if errorIndicatesSubscriptionLoss(state.err) { + // properly transition back to a disconnected state if mesos thinks that we're unsubscribed + return disconnectedFn + } + + // stay connected, don't attempt to interpret other errors here return connectedFn }