@@ -52,6 +52,7 @@ type StorageCallVShardError struct {
52
52
MasterUUID string `msgpack:"master" mapstructure:"master"`
53
53
ReplicasetUUID string `msgpack:"replicaset" mapstructure:"replicaset"`
54
54
ReplicaUUID string `msgpack:"replica" mapstructure:"replica"`
55
+ Destination string `msgpack:"destination" mapstructure:"destination"`
55
56
}
56
57
57
58
func (s StorageCallVShardError ) Error () string {
@@ -169,12 +170,44 @@ func (r *Router) RouterCallImpl(ctx context.Context,
169
170
170
171
switch vshardError .Name {
171
172
case VShardErrNameWrongBucket , VShardErrNameBucketIsLocked :
173
+ // We reproduce here behavior in https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L663
172
174
r .BucketReset (bucketID )
173
175
174
- // TODO we should inspect here err.destination like lua vshard router does,
175
- // but we don't support vshard error fully yet:
176
- // https://github.com/KaymeKaydex/go-vshard-router/issues/94
177
- // So we just retry here as a temporary solution.
176
+ if vshardError .Destination != "" {
177
+ destinationUUID , err := uuid .Parse (vshardError .Destination )
178
+ if err != nil {
179
+ return nil , nil , fmt .Errorf ("protocol violation %s: malformed destination %w: %w" ,
180
+ vshardStorageClientCall , vshardError , err )
181
+ }
182
+
183
+ var loggedOnce bool
184
+ for {
185
+ idToReplicasetRef := r .getIDToReplicaset ()
186
+ if _ , ok := idToReplicasetRef [destinationUUID ]; ok {
187
+ _ , err := r .BucketSet (bucketID , destinationUUID )
188
+ if err == nil {
189
+ break // breaks loop
190
+ }
191
+ r .log ().Warnf (ctx , "Failed set bucket %d to %v (possible race): %v" , bucketID , destinationUUID , err )
192
+ }
193
+
194
+ if ! loggedOnce {
195
+ r .log ().Warnf (ctx , "Replicaset '%v' was not found, but received from storage as destination - please " +
196
+ "update configuration" , destinationUUID )
197
+ loggedOnce = true
198
+ }
199
+
200
+ const defaultPoolingPause = 50 * time .Millisecond
201
+ time .Sleep (defaultPoolingPause )
202
+
203
+ if time .Since (timeStart ) > timeout {
204
+ return nil , nil , & vshardError
205
+ }
206
+ }
207
+ }
208
+
209
+ // retry for VShardErrNameWrongBucket, VShardErrNameBucketIsLocked
210
+
178
211
r .metrics ().RetryOnCall ("bucket_migrate" )
179
212
180
213
r .log ().Debugf (ctx , "Retrying fnc '%s' cause got vshard error: %v" , fnc , & vshardError )
0 commit comments