diff --git a/objdb/etcdLock.go b/objdb/etcdLock.go index e756ca01e..5450db29a 100644 --- a/objdb/etcdLock.go +++ b/objdb/etcdLock.go @@ -1,7 +1,7 @@ package objdb import ( - "strings" + "reflect" "sync" "time" @@ -353,12 +353,21 @@ func (lk *etcdLock) watchLock() { } for { resp, err := watcher.Next(lk.watchCtx) - if err != nil && (err.Error() == client.ErrClusterUnavailable.Error() || - strings.Contains(err.Error(), "context canceled")) { - log.Infof("Stopping watch on key %s", keyName) - return - } else if err != nil { - log.Errorf("Error watching the key %s, Err %v.", keyName, err) + if err != nil { + log.Infof("Watch %s next failed: %v %v", keyName, reflect.TypeOf(err), err) + switch err.(type) { + case *client.ClusterError: + // retry and wait for etcd cluster to recover! + time.Sleep(time.Second * 5) + case client.Error: + if err.(client.Error).Code == client.ErrorCodeEventIndexCleared { + watcher = lk.kapi.Watcher(keyName, nil) + log.Errorf("Watch next failed: reset watcher") + } + default: + // do nothing, just retry + } + continue } else { log.Debugf("Got Watch Resp: %+v", resp) diff --git a/objdb/etcdService.go b/objdb/etcdService.go index bdfa5998b..c6bd2232d 100644 --- a/objdb/etcdService.go +++ b/objdb/etcdService.go @@ -26,6 +26,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/coreos/etcd/client" + "reflect" ) // Service state @@ -194,14 +195,22 @@ func (ep *EtcdClient) WatchService(name string, eventCh chan WatchServiceEvent, for { // Block till next watch event etcdRsp, err := watcher.Next(watchCtx) - if err != nil && err.Error() == client.ErrClusterUnavailable.Error() { - log.Infof("Stopping watch on key %s", keyName) - return - } else if err != nil { - log.Errorf("Error %v during watch. Watch thread exiting", err) - return + if err != nil { + log.Infof("Watch %s next failed: %v %v", keyName, reflect.TypeOf(err), err) + switch err.(type) { + case *client.ClusterError: + // retry and wait for etcd cluster to recover! + time.Sleep(time.Second * 5) + case client.Error: + if err.(client.Error).Code == client.ErrorCodeEventIndexCleared { + watcher = ep.kapi.Watcher(keyName, &client.WatcherOptions{Recursive: true}) + log.Errorf("Watch next failed: reset watcher") + } + default: + // do nothing, just retry + } + continue } - // Send it to watch channel watchCh <- etcdRsp } diff --git a/state/etcdstatedriver.go b/state/etcdstatedriver.go index 8012d046f..f56eb5630 100644 --- a/state/etcdstatedriver.go +++ b/state/etcdstatedriver.go @@ -176,16 +176,26 @@ func (d *EtcdStateDriver) ReadAll(baseKey string) ([][]byte, error) { return [][]byte{}, err } -func (d *EtcdStateDriver) channelEtcdEvents(watcher client.Watcher, rsps chan [2][]byte) { +func (d *EtcdStateDriver) channelEtcdEvents(watcher client.Watcher, rsps chan [2][]byte, baseKey string) { for { // block on change notifications etcdRsp, err := watcher.Next(context.Background()) if err != nil { - log.Errorf("Error %v during watch", err) - time.Sleep(time.Second) + log.Infof("Watch %s next failed: %v %v", baseKey, reflect.TypeOf(err), err) + switch err.(type) { + case *client.ClusterError: + // retry and wait for etcd cluster to recover! + time.Sleep(time.Second * 5) + case client.Error: + if err.(client.Error).Code == client.ErrorCodeEventIndexCleared { + watcher = d.KeysAPI.Watcher(baseKey, &client.WatcherOptions{Recursive: true}) + log.Errorf("Watch next failed: reset watcher") + } + default: + // do nothing, just retry + } continue } - // XXX: The logic below assumes that the node returned is always a node // of interest. Eg: If we set a watch on /a/b/c, then we are mostly // interested in changes in that directory i.e. changes to /a/b/c/d1..d2 @@ -220,7 +230,7 @@ func (d *EtcdStateDriver) WatchAll(baseKey string, rsps chan [2][]byte) error { return errors.New("etcd watch failed") } - go d.channelEtcdEvents(watcher, rsps) + go d.channelEtcdEvents(watcher, rsps, baseKey) return nil }