@@ -10,6 +10,8 @@ import (
10
10
11
11
"github.com/tarantool/go-tarantool/v2"
12
12
"github.com/tarantool/go-tarantool/v2/pool"
13
+ "github.com/vmihailenco/msgpack/v5"
14
+ "github.com/vmihailenco/msgpack/v5/msgpcode"
13
15
)
14
16
15
17
// --------------------------------------------------------------------------------
@@ -27,32 +29,135 @@ func (c VshardMode) String() string {
27
29
return string (c )
28
30
}
29
31
30
- type storageCallAssertError struct {
32
+ type vshardStorageCallResponseProto struct {
33
+ AssertError * assertError // not nil if there is assert error
34
+ VshardError * StorageCallVShardError // not nil if there is vshard response
35
+ Data []interface {} // raw response data
36
+ }
37
+
38
+ func (r * vshardStorageCallResponseProto ) DecodeMsgpack (d * msgpack.Decoder ) error {
39
+ /* vshard.storage.call(func) response has the next 4 possbile formats:
40
+ See: https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3130
41
+ 1. vshard error has occurred:
42
+ array[nil, vshard_error]
43
+ 2. User method has finished with some error:
44
+ array[false, assert error]
45
+ 3. User mehod has finished successfully
46
+ a) but has not returned anything
47
+ array[true]
48
+ b) has returned 1 element
49
+ array[true, elem1]
50
+ c) has returned 2 element
51
+ array[true, elem1, elem2]
52
+ d) has returned 3 element
53
+ array[true, elem1, elem2, elem3]
54
+ */
55
+
56
+ // Ensure it is an array and get array len for protocol violation check
57
+ respArrayLen , err := d .DecodeArrayLen ()
58
+ if err != nil {
59
+ return err
60
+ }
61
+
62
+ if respArrayLen == 0 {
63
+ return fmt .Errorf ("protocol violation: invalid array length: %d" , respArrayLen )
64
+ }
65
+
66
+ // we need peek code to make our check faster than decode interface
67
+ // later we will check if code nil or bool
68
+ code , err := d .PeekCode ()
69
+ if err != nil {
70
+ return err
71
+ }
72
+
73
+ // this is storage error
74
+ if code == msgpcode .Nil {
75
+ err = d .DecodeNil ()
76
+ if err != nil {
77
+ return err
78
+ }
79
+
80
+ if respArrayLen != 2 {
81
+ return fmt .Errorf ("protocol violation: length is %d on vshard error case" , respArrayLen )
82
+ }
83
+
84
+ var vshardError StorageCallVShardError
85
+
86
+ err = d .Decode (& vshardError )
87
+ if err != nil {
88
+ return fmt .Errorf ("failed to decode storage vshard error: %w" , err )
89
+ }
90
+
91
+ r .VshardError = & vshardError
92
+
93
+ return nil
94
+ }
95
+
96
+ isVShardRespOk , err := d .DecodeBool ()
97
+ if err != nil {
98
+ return err
99
+ }
100
+
101
+ if ! isVShardRespOk {
102
+ // that means we have an assert errors and response is not ok
103
+ if respArrayLen != 2 {
104
+ return fmt .Errorf ("protocol violation: length is %d on assert error case" , respArrayLen )
105
+ }
106
+
107
+ var assertError assertError
108
+ err = d .Decode (& assertError )
109
+ if err != nil {
110
+ return fmt .Errorf ("failed to decode storage assert error: %w" , err )
111
+ }
112
+
113
+ r .AssertError = & assertError
114
+
115
+ return nil
116
+ }
117
+
118
+ // isVShardRespOk is true
119
+ r .Data = make ([]interface {}, 0 , respArrayLen - 1 )
120
+
121
+ for i := 1 ; i < respArrayLen ; i ++ {
122
+ elem , err := d .DecodeInterface ()
123
+ if err != nil {
124
+ return fmt .Errorf ("failed to decode into interface element #%d of response array" , i + 1 )
125
+ }
126
+ r .Data = append (r .Data , elem )
127
+ }
128
+
129
+ return nil
130
+ }
131
+
132
+ type assertError struct {
31
133
Code int `msgpack:"code"`
32
134
BaseType string `msgpack:"base_type"`
33
135
Type string `msgpack:"type"`
34
136
Message string `msgpack:"message"`
35
137
Trace interface {} `msgpack:"trace"`
36
138
}
37
139
38
- func (s storageCallAssertError ) Error () string {
39
- type alias storageCallAssertError
140
+ func (s assertError ) Error () string {
141
+ // Just print struct as is, use hack with alias type to avoid recursion:
142
+ // %v attempts to call Error() method for s, which is recursion.
143
+ // This alias doesn't have method Error().
144
+ type alias assertError
40
145
return fmt .Sprintf ("%+v" , alias (s ))
41
146
}
42
147
43
148
type StorageCallVShardError struct {
44
- BucketID uint64 `msgpack:"bucket_id" mapstructure:"bucket_id" `
149
+ BucketID uint64 `msgpack:"bucket_id"`
45
150
Reason string `msgpack:"reason"`
46
151
Code int `msgpack:"code"`
47
152
Type string `msgpack:"type"`
48
153
Message string `msgpack:"message"`
49
154
Name string `msgpack:"name"`
50
155
// These 3 fields below are send as string by vshard storage, so we decode them into string, not uuid.UUID type
51
156
// Example: 00000000-0000-0002-0002-000000000000
52
- MasterUUID string `msgpack:"master" mapstructure:"master" `
53
- ReplicasetUUID string `msgpack:"replicaset" mapstructure:"replicaset" `
54
- ReplicaUUID string `msgpack:"replica" mapstructure:"replica" `
55
- Destination string `msgpack:"destination" mapstructure:"destination" `
157
+ MasterUUID string `msgpack:"master"`
158
+ ReplicasetUUID string `msgpack:"replicaset"`
159
+ ReplicaUUID string `msgpack:"replica"`
160
+ Destination string `msgpack:"destination"`
56
161
}
57
162
58
163
func (s StorageCallVShardError ) Error () string {
@@ -108,7 +213,6 @@ func (r *Router) RouterCallImpl(ctx context.Context,
108
213
})
109
214
110
215
var err error
111
- var vshardError StorageCallVShardError
112
216
113
217
for {
114
218
if since := time .Since (timeStart ); since > timeout {
@@ -141,32 +245,20 @@ func (r *Router) RouterCallImpl(ctx context.Context,
141
245
142
246
future := rs .conn .Do (req , opts .PoolMode )
143
247
144
- var respData [] interface {}
145
- respData , err = future .Get ( )
248
+ var storageCallResponse vshardStorageCallResponseProto
249
+ err = future .GetTyped ( & storageCallResponse )
146
250
if err != nil {
147
251
return nil , nil , fmt .Errorf ("got error on future.Get(): %w" , err )
148
252
}
149
253
150
- r .log ().Debugf (ctx , "Got call result response data %v" , respData )
254
+ r .log ().Debugf (ctx , "Got call result response data %+ v" , storageCallResponse )
151
255
152
- if len (respData ) == 0 {
153
- // vshard.storage.call(func) returns up to two values:
154
- // - true/false/nil
155
- // - func result, omitted if func does not return anything
156
- return nil , nil , fmt .Errorf ("protocol violation %s: got empty response" , vshardStorageClientCall )
256
+ if storageCallResponse .AssertError != nil {
257
+ return nil , nil , fmt .Errorf ("%s: %s failed: %+v" , vshardStorageClientCall , fnc , storageCallResponse .AssertError )
157
258
}
158
259
159
- if respData [0 ] == nil {
160
- if len (respData ) != 2 {
161
- return nil , nil , fmt .Errorf ("protocol violation %s: length is %d when respData[0] is nil" , vshardStorageClientCall , len (respData ))
162
- }
163
-
164
- err = mapstructure .Decode (respData [1 ], & vshardError )
165
- if err != nil {
166
- // Something unexpected happened: we couldn't decode respData[1] as a vshardError,
167
- // so return reason why and respData[1], that is supposed to be a vshardError.
168
- return nil , nil , fmt .Errorf ("cant decode vhsard err by trarantool with err: %v (%v)" , err , respData [1 ])
169
- }
260
+ if storageCallResponse .VshardError != nil {
261
+ vshardError := storageCallResponse .VshardError
170
262
171
263
switch vshardError .Name {
172
264
case VShardErrNameWrongBucket , VShardErrNameBucketIsLocked :
@@ -201,7 +293,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
201
293
time .Sleep (defaultPoolingPause )
202
294
203
295
if time .Since (timeStart ) > timeout {
204
- return nil , nil , & vshardError
296
+ return nil , nil , vshardError
205
297
}
206
298
}
207
299
}
@@ -210,60 +302,36 @@ func (r *Router) RouterCallImpl(ctx context.Context,
210
302
211
303
r .metrics ().RetryOnCall ("bucket_migrate" )
212
304
213
- r .log ().Debugf (ctx , "Retrying fnc '%s' cause got vshard error: %v" , fnc , & vshardError )
305
+ r .log ().Debugf (ctx , "Retrying fnc '%s' cause got vshard error: %v" , fnc , vshardError )
214
306
215
307
// this vshardError will be returned to a caller in case of timeout
216
- err = & vshardError
308
+ err = vshardError
217
309
continue
218
310
case VShardErrNameTransferIsInProgress :
219
311
// Since lua vshard router doesn't retry here, we don't retry too.
220
312
// There is a comment why lua vshard router doesn't retry:
221
313
// https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697
222
314
r .BucketReset (bucketID )
223
- return nil , nil , & vshardError
315
+ return nil , nil , vshardError
224
316
case VShardErrNameNonMaster :
225
317
// vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case:
226
318
// See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704.
227
319
// Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master,
228
320
// we just return this error as is.
229
- return nil , nil , & vshardError
321
+ return nil , nil , vshardError
230
322
default :
231
- return nil , nil , & vshardError
323
+ return nil , nil , vshardError
232
324
}
233
325
}
234
326
235
- var isVShardRespOk bool
236
- err = future .GetTyped (& []interface {}{& isVShardRespOk })
237
- if err != nil {
238
- return nil , nil , fmt .Errorf ("protocol violation %s: can't decode respData[0] as boolean: %v" , vshardStorageClientCall , err )
239
- }
240
-
241
- if ! isVShardRespOk {
242
- // Since we got respData[0] == false, it means that an error has happened
243
- // while executing user-defined function on vshard storage.
244
- // In this case, vshard storage must return a pair: false, error.
245
- if len (respData ) != 2 {
246
- return nil , nil , fmt .Errorf ("protocol violation %s: response length is %d when respData[0] is false" , vshardStorageClientCall , len (respData ))
247
- }
248
-
249
- var assertError storageCallAssertError
250
- err = mapstructure .Decode (respData [1 ], & assertError )
251
- if err != nil {
252
- // We could not decode respData[1] as assertError, so return respData[1] as is, add info why we could not decode.
253
- return nil , nil , fmt .Errorf ("%s: %s failed %v (decoding to assertError failed %v)" , vshardStorageClientCall , fnc , respData [1 ], err )
254
- }
255
-
256
- return nil , nil , fmt .Errorf ("%s: %s failed: %+v" , vshardStorageClientCall , fnc , assertError )
257
- }
258
-
259
327
r .metrics ().RequestDuration (time .Since (timeStart ), true , false )
260
328
261
- return respData [ 1 :] , func (result interface {}) error {
262
- if len (respData ) < 2 {
329
+ return storageCallResponse . Data , func (result interface {}) error {
330
+ if len (storageCallResponse . Data ) == 0 {
263
331
return nil
264
332
}
265
333
266
- var stub interface {}
334
+ var stub bool
267
335
268
336
return future .GetTyped (& []interface {}{& stub , result })
269
337
}, nil
@@ -399,7 +467,7 @@ func (r *Router) RouterMapCallRWImpl(
399
467
return nil , fmt .Errorf ("protocol violation: invalid respData length when respData[0] == nil, must be = 2, current: %d" , len (respData ))
400
468
}
401
469
402
- var assertError storageCallAssertError
470
+ var assertError assertError
403
471
err = mapstructure .Decode (respData [1 ], & assertError )
404
472
if err != nil {
405
473
// We could not decode respData[1] as assertError, so return respData[1] as is, add info why we could not decode.
0 commit comments