diff --git a/CHANGELOG.md b/CHANGELOG.md index c4eec17..cfa8c83 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ CHANGES: * Linter: don't capitalize error strings and capitalize log. * Fix misspellings. * Handle vshard error the same way as lua vshard router (resolve issue #77). +* Decode 'vshard.storage.call' response manually into struct vshardStorageCallResponseProto using DecodeMsgpack interface to reduce allocations (partially #61, #100). +* Remove `mapstructure` tag from StorageCallVShardError. +* Update benchmarks in README files. FEATURES: @@ -19,6 +22,9 @@ REFACTOR: * Use constants for vshard error names and codes. +TESTS: +* Rename bootstrap_test.go -> tarantool_test.go and new test in this file. + ## v1.2.0 CHANGES: diff --git a/README.md b/README.md index 244686d..e01f645 100644 --- a/README.md +++ b/README.md @@ -191,12 +191,12 @@ Service with go-vshard-router on top of the tarantool example from the original ## Benchmarks ### Go Bench -| Benchmark | Runs | Time (ns/op) | Memory (B/op) | Allocations (allocs/op) | -|-------------------------------------|--------|---------------|----------------|--------------------------| -| BenchmarkCallSimpleInsert_GO-8 | 9844 | 114596 | 1894 | 41 | -| BenchmarkCallSimpleInsert_Lua-8 | 7587 | 156181 | 1101 | 19 | -| BenchmarkCallSimpleSelect_GO-8 | 16350 | 75770 | 2827 | 60 | -| BenchmarkCallSimpleSelect_Lua-8 | 10060 | 116768 | 1610 | 28 | +| Benchmark | Runs | Time (ns/op) | Memory (B/op) | Allocations (allocs/op) | +|---------------------------------------|--------|---------------|----------------|-------------------------| +| BenchmarkCallSimpleInsert_GO-12 | 14216 | 81118 | 1419 | 29 | +| BenchmarkCallSimpleInsert_Lua-12 | 9580 | 123307 | 1131 | 19 | +| BenchmarkCallSimpleSelect_GO-12 | 18832 | 65190 | 1879 | 38 | +| BenchmarkCallSimpleSelect_Lua-12 | 9963 | 104781 | 1617 | 28 | ### [K6](https://github.com/grafana/k6) diff --git a/README_ru.md b/README_ru.md index bf2e2d0..d4648b8 100644 --- a/README_ru.md +++ b/README_ru.md @@ -191,12 +191,12 @@ func main() { ## Бенчмарки ### Go Bench -| Бенчмарк | Запусков | Время (ns/op) | Память (B/op) | Аллокации (allocs/op) | -|----------------------------------------|----------|----------------|----------------|------------------------| -| BenchmarkCallSimpleInsert_GO-8 | 9844 | 114596 | 1894 | 41 | -| BenchmarkCallSimpleInsert_Lua-8 | 7587 | 156181 | 1101 | 19 | -| BenchmarkCallSimpleSelect_GO-8 | 16350 | 75770 | 2827 | 60 | -| BenchmarkCallSimpleSelect_Lua-8 | 10060 | 116768 | 1610 | 28 | +| Бенчмарк | Число запусков | Время (ns/op) | Память (B/op) | Аллокации (allocs/op) | +|----------------------------------|----------------|---------------|---------------|-----------------------| +| BenchmarkCallSimpleInsert_GO-12 | 14216 | 81118 | 1419 | 29 | +| BenchmarkCallSimpleInsert_Lua-12 | 9580 | 123307 | 1131 | 19 | +| BenchmarkCallSimpleSelect_GO-12 | 18832 | 65190 | 1879 | 38 | +| BenchmarkCallSimpleSelect_Lua-12 | 9963 | 104781 | 1617 | 28 | ### [K6](https://github.com/grafana/k6) diff --git a/api.go b/api.go index ce6eb0e..35f0ce8 100644 --- a/api.go +++ b/api.go @@ -10,6 +10,8 @@ import ( "github.com/tarantool/go-tarantool/v2" "github.com/tarantool/go-tarantool/v2/pool" + "github.com/vmihailenco/msgpack/v5" + "github.com/vmihailenco/msgpack/v5/msgpcode" ) // -------------------------------------------------------------------------------- @@ -27,7 +29,107 @@ func (c VshardMode) String() string { return string(c) } -type storageCallAssertError struct { +type vshardStorageCallResponseProto struct { + AssertError *assertError // not nil if there is assert error + VshardError *StorageCallVShardError // not nil if there is vshard response + Data []interface{} // raw response data +} + +func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error { + /* vshard.storage.call(func) response has the next 4 possbile formats: + See: https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3130 + 1. vshard error has occurred: + array[nil, vshard_error] + 2. User method has finished with some error: + array[false, assert error] + 3. User mehod has finished successfully + a) but has not returned anything + array[true] + b) has returned 1 element + array[true, elem1] + c) has returned 2 element + array[true, elem1, elem2] + d) has returned 3 element + array[true, elem1, elem2, elem3] + */ + + // Ensure it is an array and get array len for protocol violation check + respArrayLen, err := d.DecodeArrayLen() + if err != nil { + return err + } + + if respArrayLen == 0 { + return fmt.Errorf("protocol violation: invalid array length: %d", respArrayLen) + } + + // we need peek code to make our check faster than decode interface + // later we will check if code nil or bool + code, err := d.PeekCode() + if err != nil { + return err + } + + // this is storage error + if code == msgpcode.Nil { + err = d.DecodeNil() + if err != nil { + return err + } + + if respArrayLen != 2 { + return fmt.Errorf("protocol violation: length is %d on vshard error case", respArrayLen) + } + + var vshardError StorageCallVShardError + + err = d.Decode(&vshardError) + if err != nil { + return fmt.Errorf("failed to decode storage vshard error: %w", err) + } + + r.VshardError = &vshardError + + return nil + } + + isVShardRespOk, err := d.DecodeBool() + if err != nil { + return err + } + + if !isVShardRespOk { + // that means we have an assert errors and response is not ok + if respArrayLen != 2 { + return fmt.Errorf("protocol violation: length is %d on assert error case", respArrayLen) + } + + var assertError assertError + err = d.Decode(&assertError) + if err != nil { + return fmt.Errorf("failed to decode storage assert error: %w", err) + } + + r.AssertError = &assertError + + return nil + } + + // isVShardRespOk is true + r.Data = make([]interface{}, 0, respArrayLen-1) + + for i := 1; i < respArrayLen; i++ { + elem, err := d.DecodeInterface() + if err != nil { + return fmt.Errorf("failed to decode into interface element #%d of response array", i+1) + } + r.Data = append(r.Data, elem) + } + + return nil +} + +type assertError struct { Code int `msgpack:"code"` BaseType string `msgpack:"base_type"` Type string `msgpack:"type"` @@ -35,13 +137,16 @@ type storageCallAssertError struct { Trace interface{} `msgpack:"trace"` } -func (s storageCallAssertError) Error() string { - type alias storageCallAssertError +func (s assertError) Error() string { + // Just print struct as is, use hack with alias type to avoid recursion: + // %v attempts to call Error() method for s, which is recursion. + // This alias doesn't have method Error(). + type alias assertError return fmt.Sprintf("%+v", alias(s)) } type StorageCallVShardError struct { - BucketID uint64 `msgpack:"bucket_id" mapstructure:"bucket_id"` + BucketID uint64 `msgpack:"bucket_id"` Reason string `msgpack:"reason"` Code int `msgpack:"code"` Type string `msgpack:"type"` @@ -49,10 +154,10 @@ type StorageCallVShardError struct { Name string `msgpack:"name"` // These 3 fields below are send as string by vshard storage, so we decode them into string, not uuid.UUID type // Example: 00000000-0000-0002-0002-000000000000 - MasterUUID string `msgpack:"master" mapstructure:"master"` - ReplicasetUUID string `msgpack:"replicaset" mapstructure:"replicaset"` - ReplicaUUID string `msgpack:"replica" mapstructure:"replica"` - Destination string `msgpack:"destination" mapstructure:"destination"` + MasterUUID string `msgpack:"master"` + ReplicasetUUID string `msgpack:"replicaset"` + ReplicaUUID string `msgpack:"replica"` + Destination string `msgpack:"destination"` } func (s StorageCallVShardError) Error() string { @@ -108,7 +213,6 @@ func (r *Router) RouterCallImpl(ctx context.Context, }) var err error - var vshardError StorageCallVShardError for { if since := time.Since(timeStart); since > timeout { @@ -141,32 +245,20 @@ func (r *Router) RouterCallImpl(ctx context.Context, future := rs.conn.Do(req, opts.PoolMode) - var respData []interface{} - respData, err = future.Get() + var storageCallResponse vshardStorageCallResponseProto + err = future.GetTyped(&storageCallResponse) if err != nil { return nil, nil, fmt.Errorf("got error on future.Get(): %w", err) } - r.log().Debugf(ctx, "Got call result response data %v", respData) + r.log().Debugf(ctx, "Got call result response data %+v", storageCallResponse) - if len(respData) == 0 { - // vshard.storage.call(func) returns up to two values: - // - true/false/nil - // - func result, omitted if func does not return anything - return nil, nil, fmt.Errorf("protocol violation %s: got empty response", vshardStorageClientCall) + if storageCallResponse.AssertError != nil { + return nil, nil, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError) } - if respData[0] == nil { - if len(respData) != 2 { - return nil, nil, fmt.Errorf("protocol violation %s: length is %d when respData[0] is nil", vshardStorageClientCall, len(respData)) - } - - err = mapstructure.Decode(respData[1], &vshardError) - if err != nil { - // Something unexpected happened: we couldn't decode respData[1] as a vshardError, - // so return reason why and respData[1], that is supposed to be a vshardError. - return nil, nil, fmt.Errorf("cant decode vhsard err by trarantool with err: %v (%v)", err, respData[1]) - } + if storageCallResponse.VshardError != nil { + vshardError := storageCallResponse.VshardError switch vshardError.Name { case VShardErrNameWrongBucket, VShardErrNameBucketIsLocked: @@ -201,7 +293,7 @@ func (r *Router) RouterCallImpl(ctx context.Context, time.Sleep(defaultPoolingPause) if time.Since(timeStart) > timeout { - return nil, nil, &vshardError + return nil, nil, vshardError } } } @@ -210,60 +302,36 @@ func (r *Router) RouterCallImpl(ctx context.Context, r.metrics().RetryOnCall("bucket_migrate") - r.log().Debugf(ctx, "Retrying fnc '%s' cause got vshard error: %v", fnc, &vshardError) + r.log().Debugf(ctx, "Retrying fnc '%s' cause got vshard error: %v", fnc, vshardError) // this vshardError will be returned to a caller in case of timeout - err = &vshardError + err = vshardError continue case VShardErrNameTransferIsInProgress: // Since lua vshard router doesn't retry here, we don't retry too. // There is a comment why lua vshard router doesn't retry: // https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697 r.BucketReset(bucketID) - return nil, nil, &vshardError + return nil, nil, vshardError case VShardErrNameNonMaster: // vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case: // See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704. // Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master, // we just return this error as is. - return nil, nil, &vshardError + return nil, nil, vshardError default: - return nil, nil, &vshardError + return nil, nil, vshardError } } - var isVShardRespOk bool - err = future.GetTyped(&[]interface{}{&isVShardRespOk}) - if err != nil { - return nil, nil, fmt.Errorf("protocol violation %s: can't decode respData[0] as boolean: %v", vshardStorageClientCall, err) - } - - if !isVShardRespOk { - // Since we got respData[0] == false, it means that an error has happened - // while executing user-defined function on vshard storage. - // In this case, vshard storage must return a pair: false, error. - if len(respData) != 2 { - return nil, nil, fmt.Errorf("protocol violation %s: response length is %d when respData[0] is false", vshardStorageClientCall, len(respData)) - } - - var assertError storageCallAssertError - err = mapstructure.Decode(respData[1], &assertError) - if err != nil { - // We could not decode respData[1] as assertError, so return respData[1] as is, add info why we could not decode. - return nil, nil, fmt.Errorf("%s: %s failed %v (decoding to assertError failed %v)", vshardStorageClientCall, fnc, respData[1], err) - } - - return nil, nil, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, assertError) - } - r.metrics().RequestDuration(time.Since(timeStart), true, false) - return respData[1:], func(result interface{}) error { - if len(respData) < 2 { + return storageCallResponse.Data, func(result interface{}) error { + if len(storageCallResponse.Data) == 0 { return nil } - var stub interface{} + var stub bool return future.GetTyped(&[]interface{}{&stub, result}) }, nil @@ -399,7 +467,7 @@ func (r *Router) RouterMapCallRWImpl( return nil, fmt.Errorf("protocol violation: invalid respData length when respData[0] == nil, must be = 2, current: %d", len(respData)) } - var assertError storageCallAssertError + var assertError assertError err = mapstructure.Decode(respData[1], &assertError) if err != nil { // We could not decode respData[1] as assertError, so return respData[1] as is, add info why we could not decode. diff --git a/bootstrap_test.go b/tarantool_test.go similarity index 64% rename from bootstrap_test.go rename to tarantool_test.go index fb8fd89..076ae8d 100644 --- a/bootstrap_test.go +++ b/tarantool_test.go @@ -7,35 +7,16 @@ import ( "testing" "time" - vshard_router "github.com/KaymeKaydex/go-vshard-router" + vshardrouter "github.com/KaymeKaydex/go-vshard-router" "github.com/KaymeKaydex/go-vshard-router/providers/static" chelper "github.com/KaymeKaydex/go-vshard-router/test_helper" "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/tarantool/go-tarantool/v2" + "github.com/tarantool/go-tarantool/v2/pool" "github.com/tarantool/go-tarantool/v2/test_helpers" ) -func TestRouter_ClusterBootstrap(t *testing.T) { - ctx := context.Background() - - router, err := vshard_router.NewRouter(ctx, vshard_router.Config{ - TotalBucketCount: 100, - TopologyProvider: static.NewProvider(topology), - User: "guest", - }) - require.NotNil(t, router) - require.NoError(t, err) - - err = router.ClusterBootstrap(ctx, false) - require.NoError(t, err) - for _, rs := range router.RouterRouteAll() { - count, err := rs.BucketsCount(ctx) - require.NoError(t, err) - require.Equal(t, count, uint64(50)) - } -} - const instancesCount = 4 // init servers from our cluster @@ -48,7 +29,7 @@ var serverNames = map[string]string{ "storage_2_b": "127.0.0.1:3304", } -var topology = map[vshard_router.ReplicasetInfo][]vshard_router.InstanceInfo{ +var topology = map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{ { Name: "storage_1", UUID: uuid.New(), @@ -121,3 +102,58 @@ func TestMain(m *testing.M) { code := runTestMain(m) os.Exit(code) } + +func TestRouter_ClusterBootstrap(t *testing.T) { + ctx := context.Background() + + router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + TotalBucketCount: 100, + TopologyProvider: static.NewProvider(topology), + User: "guest", + }) + require.NotNil(t, router) + require.NoError(t, err) + + err = router.ClusterBootstrap(ctx, false) + require.NoError(t, err) + for _, rs := range router.RouterRouteAll() { + count, err := rs.BucketsCount(ctx) + require.NoError(t, err) + require.NotEqual(t, count, uint64(0)) + } +} + +func TestRouter_RouterCallImpl_Decoding(t *testing.T) { + ctx := context.Background() + + type Product struct { + BucketID uint64 `msgpack:"bucket_id"` + ID string `msgpack:"id"` + Name string `msgpack:"name"` + Count uint64 `msgpack:"count"` + } + + router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + TotalBucketCount: 100, + TopologyProvider: static.NewProvider(topology), + User: "guest", + }) + require.NotNil(t, router) + require.NoError(t, err) + + // bootstrap and discovery again if this is single test testing + require.NoError(t, router.ClusterBootstrap(ctx, true)) + require.NoError(t, router.DiscoveryAllBuckets(ctx)) + + id := uuid.New() + + bucketID := router.RouterBucketIDStrCRC32(id.String()) + _, _, err = router.RouterCallImpl( + ctx, + bucketID, + vshardrouter.CallOpts{VshardMode: vshardrouter.WriteMode, PoolMode: pool.RW, Timeout: 10 * time.Second}, + "echo", + []interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}, "test"}) + + require.NoError(t, err) +} diff --git a/tests/tnt/call_bench_test.go b/tests/tnt/call_bench_test.go index ea1fb56..46c2dbd 100644 --- a/tests/tnt/call_bench_test.go +++ b/tests/tnt/call_bench_test.go @@ -35,6 +35,7 @@ func BenchmarkCallSimpleInsert_GO(b *testing.B) { TotalBucketCount: totalBucketCount, User: defaultTntUser, Password: defaultTntPassword, + RequestTimeout: time.Minute, }) require.NoError(b, err) @@ -46,7 +47,7 @@ func BenchmarkCallSimpleInsert_GO(b *testing.B) { _, _, err := router.RouterCallImpl( ctx, bucketID, - vshardrouter.CallOpts{VshardMode: vshardrouter.WriteMode, PoolMode: pool.RW, Timeout: time.Second}, + vshardrouter.CallOpts{VshardMode: vshardrouter.WriteMode, PoolMode: pool.RW, Timeout: 10 * time.Second}, "product_add", []interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}}) require.NoError(b, err)