diff --git a/api/v1/lib/httpcli/httpsched/state.go b/api/v1/lib/httpcli/httpsched/state.go index bfb882e2..510f4197 100644 --- a/api/v1/lib/httpcli/httpsched/state.go +++ b/api/v1/lib/httpcli/httpsched/state.go @@ -2,6 +2,7 @@ package httpsched import ( "errors" + "fmt" "log" "net/http" "sync" @@ -106,6 +107,13 @@ func disconnectedFn(state *state) stateFn { return disconnectedFn } + transitionToDisconnected := func() { + state.m.Lock() + defer state.m.Unlock() + state.fn = disconnectedFn + _ = stateResp.Close() // swallow any error here + } + // wrap the response: any errors processing the subscription stream should result in a // transition to a disconnected state ASAP. state.resp = &mesos.ResponseWrapper{ @@ -115,10 +123,17 @@ func disconnectedFn(state *state) stateFn { return func(u encoding.Unmarshaler) (err error) { err = decoder(u) if err != nil { - state.m.Lock() - defer state.m.Unlock() - state.fn = disconnectedFn - _ = stateResp.Close() // swallow any error here + transitionToDisconnected() + return + } + switch e := u.(type) { + case (*scheduler.Event): + if e.GetType() == scheduler.Event_ERROR { + transitionToDisconnected() + } + default: + err = httpcli.ProtocolError( + fmt.Sprintf("unexpected object on subscription event stream", e)) } return }