diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b51f16..0a23253 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,9 @@ REFACTOR: * Use constants for vshard error names and codes. * Reduce SLOC by using CallAsync method. * BucketForceCreate optimization: don't decode tnt response. +* Remove bucketStatError type, use StorageCallVShardError type instead. +* Add custom msgpackv5 decoder for 'vshard.storage.bucket_stat' response (partially #100). +* Add custom msgpackv5 decoder for 'BucketStatInfo', since msgpackv5 library has an issue (see commit content). TESTS: * Rename bootstrap_test.go -> tarantool_test.go and new test in this file. diff --git a/discovery.go b/discovery.go index c7de589..12b1265 100644 --- a/discovery.go +++ b/discovery.go @@ -88,8 +88,8 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl for _, rsFuture := range rsFutures { if _, err := bucketStatWait(rsFuture.future); err != nil { - var bsError bucketStatError - if !errors.As(err, &bsError) { + var vshardError StorageCallVShardError + if !errors.As(err, &vshardError) { r.log().Errorf(ctx, "bucketSearchLegacy: bucketStatWait call error for %v: %v", rsFuture.rsID, err) } // just skip, bucket may not belong to this replicaset diff --git a/error.go b/error.go index e8c4869..7ad5a6b 100644 --- a/error.go +++ b/error.go @@ -92,20 +92,6 @@ const ( VShardErrNameInstanceNameMismatch = "INSTANCE_NAME_MISMATCH" ) -type bucketStatError struct { - BucketID uint64 `msgpack:"bucket_id"` - Reason string `msgpack:"reason"` - Code int `msgpack:"code"` - Type string `msgpack:"type"` - Message string `msgpack:"message"` - Name string `msgpack:"name"` -} - -func (bse bucketStatError) Error() string { - type alias bucketStatError - return fmt.Sprintf("%+v", alias(bse)) -} - func newVShardErrorNoRouteToBucket(bucketID uint64) error { return &StorageCallVShardError{ Name: VShardErrNameNoRouteToBucket, diff --git a/replicaset.go b/replicaset.go index 74692b8..fe60a73 100644 --- a/replicaset.go +++ b/replicaset.go @@ -7,9 +7,10 @@ import ( "time" "github.com/google/uuid" - "github.com/mitchellh/mapstructure" "github.com/tarantool/go-tarantool/v2" "github.com/tarantool/go-tarantool/v2/pool" + "github.com/vmihailenco/msgpack/v5" + "github.com/vmihailenco/msgpack/v5/msgpcode" ) type ReplicasetInfo struct { @@ -51,41 +52,71 @@ func (rs *Replicaset) bucketStatAsync(ctx context.Context, bucketID uint64) *tar return rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.RO}, bucketStatFnc, []interface{}{bucketID}) } -func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) { - var bsInfo BucketStatInfo +type vshardStorageBucketStatResponseProto struct { + ok bool + info BucketStatInfo + err StorageCallVShardError +} - respData, err := future.Get() +func (r *vshardStorageBucketStatResponseProto) DecodeMsgpack(d *msgpack.Decoder) error { + // bucket_stat returns pair: stat, err + // https://github.com/tarantool/vshard/blob/e1c806e1d3d2ce8a4e6b4d498c09051bf34ab92a/vshard/storage/init.lua#L1413 + + respArrayLen, err := d.DecodeArrayLen() if err != nil { - return bsInfo, err + return err } - if len(respData) == 0 { - return bsInfo, fmt.Errorf("protocol violation bucketStatWait: empty response") + if respArrayLen == 0 { + return fmt.Errorf("protocol violation bucketStatWait: empty response") } - if respData[0] == nil { - if len(respData) != 2 { - return bsInfo, fmt.Errorf("protocol violation bucketStatWait: invalid response length %d when respData[0] is nil", len(respData)) + code, err := d.PeekCode() + if err != nil { + return err + } + + if code == msgpcode.Nil { + err = d.DecodeNil() + if err != nil { + return err + } + + if respArrayLen != 2 { + return fmt.Errorf("protocol violation bucketStatWait: length is %d on vshard error case", respArrayLen) } - var bsError bucketStatError - err = mapstructure.Decode(respData[1], &bsError) + err = d.Decode(&r.err) if err != nil { - // We could not decode respData[1] as bsError, so return respData[1] as is, add info why we could not decode. - return bsInfo, fmt.Errorf("bucketStatWait error: %v (can't decode into bsError: %v)", respData[1], err) + return fmt.Errorf("failed to decode storage vshard error: %w", err) } - return bsInfo, bsError + return nil } - // A problem with key-code 1 - // todo: fix after https://github.com/tarantool/go-tarantool/issues/368 - err = mapstructure.Decode(respData[0], &bsInfo) + err = d.Decode(&r.info) if err != nil { - return bsInfo, fmt.Errorf("can't decode bsInfo: %w", err) + return fmt.Errorf("failed to decode bucket stat info: %w", err) + } + + r.ok = true + + return nil +} + +func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) { + var bucketStatResponse vshardStorageBucketStatResponseProto + + err := future.GetTyped(&bucketStatResponse) + if err != nil { + return BucketStatInfo{}, err + } + + if !bucketStatResponse.ok { + return BucketStatInfo{}, bucketStatResponse.err } - return bsInfo, nil + return bucketStatResponse.info, nil } // ReplicaCall perform function on remote storage diff --git a/vshard.go b/vshard.go index db0da6d..86b557e 100644 --- a/vshard.go +++ b/vshard.go @@ -9,6 +9,7 @@ import ( "github.com/google/uuid" "github.com/snksoft/crc" + "github.com/vmihailenco/msgpack/v5" tarantool "github.com/tarantool/go-tarantool/v2" ) @@ -122,8 +123,48 @@ type Config struct { } type BucketStatInfo struct { - BucketID uint64 `mapstructure:"id"` - Status string `mapstructure:"status"` + BucketID uint64 `msgpack:"id"` + Status string `msgpack:"status"` +} + +// tnt vshard storage returns map with 'int' keys for bucketStatInfo, +// example: map[id:48 status:active 1:48 2:active]. +// But msgpackv5 supports only string keys when decoding maps into structs, +// see issue: https://github.com/vmihailenco/msgpack/issues/372 +// To workaround this we decode BucketStatInfo manually. +// When the issue above will be resolved, this code can be (and should be) deleted. +func (bsi *BucketStatInfo) DecodeMsgpack(d *msgpack.Decoder) error { + nKeys, err := d.DecodeMapLen() + if err != nil { + return err + } + + for i := 0; i < nKeys; i++ { + key, err := d.DecodeInterface() + if err != nil { + return err + } + + keyName, _ := key.(string) + switch keyName { + case "id": + if err := d.Decode(&bsi.BucketID); err != nil { + return err + } + case "status": + if err := d.Decode(&bsi.Status); err != nil { + return err + } + default: + // skip unused value + if err := d.Skip(); err != nil { + return err + } + } + + } + + return nil } type InstanceInfo struct {