diff --git a/CHANGELOG.md b/CHANGELOG.md index 83248d8..c4eec17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ CHANGES: * Don't support 'type Error struct' anymore. * Linter: don't capitalize error strings and capitalize log. * Fix misspellings. +* Handle vshard error the same way as lua vshard router (resolve issue #77). FEATURES: diff --git a/api.go b/api.go index 2cc2277..ce6eb0e 100644 --- a/api.go +++ b/api.go @@ -52,6 +52,7 @@ type StorageCallVShardError struct { MasterUUID string `msgpack:"master" mapstructure:"master"` ReplicasetUUID string `msgpack:"replicaset" mapstructure:"replicaset"` ReplicaUUID string `msgpack:"replica" mapstructure:"replica"` + Destination string `msgpack:"destination" mapstructure:"destination"` } func (s StorageCallVShardError) Error() string { @@ -169,12 +170,44 @@ func (r *Router) RouterCallImpl(ctx context.Context, switch vshardError.Name { case VShardErrNameWrongBucket, VShardErrNameBucketIsLocked: + // We reproduce here behavior in https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L663 r.BucketReset(bucketID) - // TODO we should inspect here err.destination like lua vshard router does, - // but we don't support vshard error fully yet: - // https://github.com/KaymeKaydex/go-vshard-router/issues/94 - // So we just retry here as a temporary solution. + if vshardError.Destination != "" { + destinationUUID, err := uuid.Parse(vshardError.Destination) + if err != nil { + return nil, nil, fmt.Errorf("protocol violation %s: malformed destination %w: %w", + vshardStorageClientCall, vshardError, err) + } + + var loggedOnce bool + for { + idToReplicasetRef := r.getIDToReplicaset() + if _, ok := idToReplicasetRef[destinationUUID]; ok { + _, err := r.BucketSet(bucketID, destinationUUID) + if err == nil { + break // breaks loop + } + r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destinationUUID, err) + } + + if !loggedOnce { + r.log().Warnf(ctx, "Replicaset '%v' was not found, but received from storage as destination - please "+ + "update configuration", destinationUUID) + loggedOnce = true + } + + const defaultPoolingPause = 50 * time.Millisecond + time.Sleep(defaultPoolingPause) + + if time.Since(timeStart) > timeout { + return nil, nil, &vshardError + } + } + } + + // retry for VShardErrNameWrongBucket, VShardErrNameBucketIsLocked + r.metrics().RetryOnCall("bucket_migrate") r.log().Debugf(ctx, "Retrying fnc '%s' cause got vshard error: %v", fnc, &vshardError)