Skip to content

Add sync pool enable options #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,15 @@ Service with go-vshard-router on top of the tarantool example from the original
## Benchmarks
### Go Bench

Вот обновленная таблица с добавленным benchmark:

| Benchmark | Runs | Time (ns/op) | Memory (B/op) | Allocations (allocs/op) |
|---------------------------------------|--------|---------------|----------------|-------------------------|
| BenchmarkCallSimpleInsert_GO-12 | 14216 | 81118 | 1419 | 29 |
| BenchmarkCallSimpleInsert_Lua-12 | 9580 | 123307 | 1131 | 19 |
| BenchmarkCallSimpleSelect_GO-12 | 18832 | 65190 | 1879 | 38 |
| BenchmarkCallSimpleSelect_Lua-12 | 9963 | 104781 | 1617 | 28 |

| BenchmarkRouter_Call_Select_SyncPool-12 | 19956 | 60924 | 1848 | 36 |

### [K6](https://github.com/grafana/k6)
Topology:
Expand Down
4 changes: 2 additions & 2 deletions README_ru.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,12 @@ func main() {
### Go Bench

| Бенчмарк | Число запусков | Время (ns/op) | Память (B/op) | Аллокации (allocs/op) |
|----------------------------------|----------------|---------------|---------------|-----------------------|
|---------------------------------------|--------|---------------|----------------|-------------------------|
| BenchmarkCallSimpleInsert_GO-12 | 14216 | 81118 | 1419 | 29 |
| BenchmarkCallSimpleInsert_Lua-12 | 9580 | 123307 | 1131 | 19 |
| BenchmarkCallSimpleSelect_GO-12 | 18832 | 65190 | 1879 | 38 |
| BenchmarkCallSimpleSelect_Lua-12 | 9963 | 104781 | 1617 | 28 |

| BenchmarkRouter_Call_Select_SyncPool-12 | 19956 | 60924 | 1848 | 36 |

### [K6](https://github.com/grafana/k6)

Expand Down
48 changes: 45 additions & 3 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"sync"
"time"

"github.com/google/uuid"
Expand All @@ -14,6 +15,14 @@ import (
"github.com/vmihailenco/msgpack/v5/msgpcode"
)

// Create a global pool for bytes.Buffer
var responseBufferPool = sync.Pool{
New: func() interface{} {
// The New function creates a new bytes.Buffer instance if the pool is empty
return &bytes.Buffer{}
},
}

// --------------------------------------------------------------------------------
// -- API
// --------------------------------------------------------------------------------
Expand All @@ -36,6 +45,9 @@ type vshardStorageCallResponseProto struct {
AssertError *assertError // not nil if there is assert error
VshardError *StorageCallVShardError // not nil if there is vshard response
CallResp VshardRouterCallResp

EnableResponseSyncPool bool
EnableDecodersSyncPool bool
}

func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
Expand Down Expand Up @@ -119,7 +131,13 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error
}

// isVShardRespOk is true
buf := bytes.NewBuffer(nil)
var buf *bytes.Buffer

if r.EnableResponseSyncPool {
buf = responseBufferPool.Get().(*bytes.Buffer)
} else {
buf = bytes.NewBuffer(nil)
}

buf.WriteByte(msgpcode.FixedArrayLow | byte(respArrayLen-1))

Expand All @@ -130,6 +148,9 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error

r.CallResp.buf = buf

r.CallResp.enableDecodersSyncPool = r.EnableDecodersSyncPool
r.CallResp.enableResponseSyncPool = r.EnableResponseSyncPool

return nil
}

Expand Down Expand Up @@ -197,7 +218,16 @@ const (

// VshardRouterCallResp represents a response from Router.Call[XXX] methods.
type VshardRouterCallResp struct {
buf *bytes.Buffer
buf *bytes.Buffer
enableDecodersSyncPool bool
enableResponseSyncPool bool
}

func (r VshardRouterCallResp) Close() {
if r.enableResponseSyncPool {
r.buf.Reset()
responseBufferPool.Put(r.buf)
}
}

// Get returns a response from user defined function as []interface{}.
Expand All @@ -210,7 +240,15 @@ func (r VshardRouterCallResp) Get() ([]interface{}, error) {

// GetTyped decodes a response from user defined function into custom values.
func (r VshardRouterCallResp) GetTyped(result interface{}) error {
return msgpack.Unmarshal(r.buf.Bytes(), result)
if !r.enableDecodersSyncPool {
return msgpack.Unmarshal(r.buf.Bytes(), result)
}

decoder := msgpack.GetDecoder()
decoder.Reset(r.buf)
defer msgpack.PutDecoder(decoder)

return decoder.Decode(result)
}

// Call calls the function identified by 'fnc' on the shard storing the bucket identified by 'bucket_id'.
Expand Down Expand Up @@ -294,6 +332,10 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
r.log().Infof(ctx, "Try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)

var storageCallResponse vshardStorageCallResponseProto

storageCallResponse.EnableResponseSyncPool = r.cfg.EnableResponseSyncPool
storageCallResponse.EnableDecodersSyncPool = r.cfg.EnableDecodersSyncPool

err = rs.conn.Do(tntReq, poolMode).GetTyped(&storageCallResponse)
if err != nil {
return VshardRouterCallResp{}, fmt.Errorf("got error on future.GetTyped(): %w", err)
Expand Down
69 changes: 69 additions & 0 deletions tarantool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,3 +467,72 @@ func BenchmarkCallSimpleSelect_GO_Call(b *testing.B) {

b.ReportAllocs()
}

func BenchmarkRouter_Call_Select_SyncPool(b *testing.B) {
b.StopTimer()

ctx := context.Background()

router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{
TopologyProvider: static.NewProvider(topology),
DiscoveryTimeout: 5 * time.Second,
DiscoveryMode: vshardrouter.DiscoveryModeOn,
TotalBucketCount: totalBucketCount,
User: username,
EnableResponseSyncPool: true,
EnableDecodersSyncPool: true,
})
require.NoError(b, err)

err = router.ClusterBootstrap(ctx, true)
require.NoError(b, err)

ids := make([]uuid.UUID, b.N)

for i := 0; i < b.N; i++ {
id := uuid.New()
ids[i] = id

bucketID := router.RouterBucketIDStrCRC32(id.String())
resp, err := router.Call(
ctx,
bucketID,
vshardrouter.CallModeRW,
"product_add",
[]interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}},
vshardrouter.CallOpts{},
)
require.NoError(b, err)
resp.Close()
}

type Request struct {
ID string `msgpack:"id"`
}

b.StartTimer()
for i := 0; i < b.N; i++ {
id := ids[i]

bucketID := router.RouterBucketIDStrCRC32(id.String())
resp, err1 := router.Call(
ctx,
bucketID,
vshardrouter.CallModeBRO,
"product_get",
[]interface{}{&Request{ID: id.String()}},
vshardrouter.CallOpts{Timeout: time.Second},
)

var product Product

err2 := resp.GetTyped(&[]interface{}{&product})
resp.Close()
b.StopTimer()
require.NoError(b, err1)
require.NoError(b, err2)
b.StartTimer()
}

b.ReportAllocs()
}
9 changes: 9 additions & 0 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ type Config struct {
// that is, our retry timeout if the buckets, for example, move.
// Currently, it only works for sugar implementations .
RequestTimeout time.Duration

// EnableResponseSyncPool determines whether the sync pool for responses should be enabled.
// When enabled, a pool will be used to reuse response objects, improving memory management and performance.
// If this option is enabled, you must call the Close method on the Response object when done using it,
// otherwise, memory leaks may occur due to unreleased resources.
EnableResponseSyncPool bool
// EnableDecodersSyncPool determines whether the sync pool for decoders should be enabled.
// When enabled, a pool will be used to reuse decoder objects, reducing the overhead of creating new decoders.
EnableDecodersSyncPool bool
}

type BucketStatInfo struct {
Expand Down
Loading