Skip to content

Commit 6ae2802

Browse files
bench: Add benchmark functionality for single node Tarantool topology
1 parent d9fca95 commit 6ae2802

File tree

9 files changed

+335
-0
lines changed

9 files changed

+335
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
2424

2525
### Added
2626

27+
- Tarantool benchmark tool.
2728
- Ability to reverse search in ``cartridge enter`` and ``cartridge connect`` commands.
2829
- Added support for functionality from Golang 1.17.
2930

cli/bench/bench.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package bench
2+
3+
import (
4+
bctx "context"
5+
"fmt"
6+
"math/rand"
7+
"sync"
8+
"time"
9+
10+
"github.com/FZambia/tarantool"
11+
"github.com/tarantool/cartridge-cli/cli/common"
12+
"github.com/tarantool/cartridge-cli/cli/context"
13+
)
14+
15+
// Output benchmark foramatted results.
16+
func printResults(results Results) {
17+
fmt.Printf("\nResults:\n")
18+
fmt.Printf("\tSuccess operations: %d\n", results.successResultCount)
19+
fmt.Printf("\tFailed operations: %d\n", results.failedResultCount)
20+
fmt.Printf("\tRequest count: %d\n", results.handledRequestsCount)
21+
fmt.Printf("\tTime (seconds): %f\n", results.duration)
22+
fmt.Printf("\tRequests per second: %d\n\n", results.requestsPerSecond)
23+
}
24+
25+
// Delete and create specified space for benchmark.
26+
func spacePreset(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) error {
27+
dropSpace(ctx, tarantoolConnection)
28+
return createSpace(tarantoolConnection)
29+
}
30+
31+
// Increases the counter of successful/failed requests depending on the presence of an error.
32+
func incrementRequest(err error, results *Results) {
33+
if err == nil {
34+
results.successResultCount++
35+
} else {
36+
results.failedResultCount++
37+
}
38+
results.handledRequestsCount++
39+
}
40+
41+
// Сontinuously executes the specified query until the benchmark time runs out.
42+
func requestsLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) {
43+
for {
44+
select {
45+
case <-backgroundCtx.Done():
46+
return
47+
default:
48+
_, err := tarantoolConnection.Exec(
49+
tarantool.Insert(
50+
benchSpaceName,
51+
[]interface{}{common.RandomString(ctx.KeySize), common.RandomString(ctx.DataSize)}))
52+
incrementRequest(err, results)
53+
}
54+
}
55+
}
56+
57+
// Represents a separate connection with a certain number of parallel requests.
58+
func connectionLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) {
59+
var connectionWait sync.WaitGroup
60+
for i := 0; i < ctx.SimultaneousRequests; i++ {
61+
connectionWait.Add(1)
62+
go func() {
63+
defer connectionWait.Done()
64+
requestsLoop(ctx, tarantoolConnection, results, backgroundCtx)
65+
}()
66+
}
67+
68+
connectionWait.Wait()
69+
}
70+
71+
// Main benchmark function.
72+
func Run(ctx context.BenchCtx) error {
73+
rand.Seed(time.Now().UnixNano())
74+
75+
// Connect to tarantool and preset space for benchmark.
76+
tarantoolConnection, err := tarantool.Connect(ctx.URL, tarantool.Opts{
77+
User: ctx.User,
78+
Password: ctx.Password,
79+
})
80+
if err != nil {
81+
return fmt.Errorf(
82+
"Couldn't connect to Tarantool %s. Did you forget to enter tarantool -e 'box.cfg ({listen = 3301})'?",
83+
ctx.URL)
84+
}
85+
defer tarantoolConnection.Close()
86+
87+
printConfig(ctx, tarantoolConnection)
88+
89+
if err := spacePreset(ctx, tarantoolConnection); err != nil {
90+
return err
91+
}
92+
93+
// Create connections before detaching.
94+
connectionPool := make([]*tarantool.Connection, ctx.Connections)
95+
for i := 0; i < ctx.Connections; i++ {
96+
connectionPool[i], err = tarantool.Connect(ctx.URL, tarantool.Opts{
97+
User: ctx.User,
98+
Password: ctx.Password,
99+
})
100+
if err != nil {
101+
return err
102+
}
103+
defer connectionPool[i].Close()
104+
}
105+
106+
fmt.Println("Benchmark start")
107+
fmt.Println("...")
108+
109+
backgroundCtx, cancel := bctx.WithCancel(bctx.Background())
110+
var waitGroup sync.WaitGroup
111+
results := Results{}
112+
connectionId := 0
113+
114+
startTime := time.Now()
115+
timer := time.NewTimer(time.Duration(ctx.Duration * int(time.Second)))
116+
117+
// Start detached connections.
118+
for i := 0; i < ctx.Connections; i++ {
119+
waitGroup.Add(1)
120+
connection := connectionPool[connectionId]
121+
go func() {
122+
defer waitGroup.Done()
123+
connectionLoop(ctx, connection, &results, backgroundCtx)
124+
}()
125+
connectionId++
126+
}
127+
// Waiting for the time runs out.
128+
<-timer.C
129+
cancel()
130+
waitGroup.Wait()
131+
132+
results.duration = time.Since(startTime).Seconds()
133+
results.requestsPerSecond = int(float64(results.handledRequestsCount) / results.duration)
134+
135+
dropSpace(ctx, tarantoolConnection)
136+
fmt.Println("Benchmark stop")
137+
138+
printResults(results)
139+
return nil
140+
}

cli/bench/config.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package bench
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"text/tabwriter"
7+
8+
"github.com/FZambia/tarantool"
9+
"github.com/tarantool/cartridge-cli/cli/context"
10+
)
11+
12+
var (
13+
benchSpaceName = "__benchmark_space__"
14+
benchSpacePrimaryIndexName = "__bench_primary_key__"
15+
)
16+
17+
// Output formatted config parameters.
18+
func printConfig(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) {
19+
fmt.Printf("%s\n", tarantoolConnection.Greeting().Version)
20+
fmt.Printf("Parameters:\n")
21+
fmt.Printf("\tURL: %s\n", ctx.URL)
22+
fmt.Printf("\tuser: %s\n", ctx.User)
23+
fmt.Printf("\tconnections: %d\n", ctx.Connections)
24+
fmt.Printf("\tsimultaneous requests: %d\n", ctx.SimultaneousRequests)
25+
fmt.Printf("\tduration: %d seconds\n", ctx.Duration)
26+
fmt.Printf("\tkey size: %d bytes\n", ctx.KeySize)
27+
fmt.Printf("\tdata size: %d bytes\n", ctx.DataSize)
28+
fmt.Printf("Data schema\n")
29+
w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0)
30+
fmt.Fprintf(w, "|\tkey\t|\tvalue\n")
31+
fmt.Fprintf(w, "|\t------------------------------\t|\t------------------------------\n")
32+
fmt.Fprintf(w, "|\trandom(%d)\t|\trandom(%d)\n", ctx.KeySize, ctx.DataSize)
33+
w.Flush()
34+
}

cli/bench/space.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package bench
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
7+
"github.com/FZambia/tarantool"
8+
"github.com/tarantool/cartridge-cli/cli/context"
9+
)
10+
11+
// Create specified benchmark space with formatting and primary index.
12+
func createSpace(tarantoolConnection *tarantool.Connection) error {
13+
// Creationg space.
14+
createCommand := "return box.schema.space.create(...).name"
15+
_, err := tarantoolConnection.Exec(tarantool.Eval(createCommand, []interface{}{benchSpaceName, map[string]bool{"if_not_exists": true}}))
16+
if err != nil {
17+
return err
18+
}
19+
20+
// Formatting space.
21+
formatCommand := fmt.Sprintf("box.space.%s:format", benchSpaceName)
22+
_, err = tarantoolConnection.Exec(tarantool.Call(formatCommand, [][]map[string]string{
23+
{
24+
{"name": "key", "type": "string"},
25+
{"name": "value", "type": "string"},
26+
},
27+
}))
28+
if err != nil {
29+
return err
30+
}
31+
32+
// Create primary index.
33+
createIndexCommand := fmt.Sprintf("box.space.%s:create_index", benchSpaceName)
34+
_, err = tarantoolConnection.Exec(tarantool.Call(createIndexCommand, []interface{}{
35+
benchSpacePrimaryIndexName,
36+
map[string]interface{}{
37+
"parts": []string{"key"},
38+
"if_not_exists": true,
39+
},
40+
}))
41+
return err
42+
}
43+
44+
// Delete specified benchmark space.
45+
func dropSpace(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) error {
46+
checkCommand := fmt.Sprintf("return box.space.%s.index[0].name", benchSpaceName)
47+
indexName, err := tarantoolConnection.Exec(tarantool.Eval(checkCommand, []interface{}{}))
48+
if err != nil {
49+
return err
50+
}
51+
if reflect.ValueOf(indexName.Data).Index(0).Elem().String() == benchSpacePrimaryIndexName {
52+
dropCommand := fmt.Sprintf("box.space.%s:drop", benchSpaceName)
53+
_, err := tarantoolConnection.Exec(tarantool.Call(dropCommand, []interface{}{}))
54+
return err
55+
}
56+
return nil
57+
}

cli/bench/types.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package bench
2+
3+
// Describes set of benchmark results.
4+
type Results struct {
5+
handledRequestsCount int // count of all executed requests.
6+
successResultCount int // count of successful request in all connections.
7+
failedResultCount int // count of failed request in all connections.
8+
duration float64 // benchmark duration.
9+
requestsPerSecond int // number of requests per second - the main measured value.
10+
}

cli/commands/bench.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package commands
2+
3+
import (
4+
"github.com/apex/log"
5+
"github.com/spf13/cobra"
6+
"github.com/tarantool/cartridge-cli/cli/bench"
7+
)
8+
9+
func init() {
10+
var benchCmd = &cobra.Command{
11+
Use: "bench",
12+
Short: "Util for running benchmarks for Tarantool",
13+
Long: "Benchmark utility that simulates running commands done by N clients at the same time sending M simultaneous queries",
14+
Run: func(cmd *cobra.Command, args []string) {
15+
if err := bench.Run(ctx.Bench); err != nil {
16+
log.Fatalf(err.Error())
17+
}
18+
},
19+
}
20+
rootCmd.AddCommand(benchCmd)
21+
22+
configureFlags(benchCmd)
23+
24+
benchCmd.Flags().StringVar(&ctx.Bench.URL, "url", "127.0.0.1:3301", "Tarantool address")
25+
benchCmd.Flags().StringVar(&ctx.Bench.User, "user", "guest", "Tarantool user for connection")
26+
benchCmd.Flags().StringVar(&ctx.Bench.Password, "password", "", "Tarantool password for connection")
27+
28+
benchCmd.Flags().IntVar(&ctx.Bench.Connections, "connections", 10, "Number of concurrent connections")
29+
benchCmd.Flags().IntVar(&ctx.Bench.SimultaneousRequests, "requests", 10, "Number of simultaneous requests per connection")
30+
benchCmd.Flags().IntVar(&ctx.Bench.Duration, "duration", 10, "Duration of benchmark test (seconds)")
31+
benchCmd.Flags().IntVar(&ctx.Bench.KeySize, "keysize", 10, "Size of key part of benchmark data (bytes)")
32+
benchCmd.Flags().IntVar(&ctx.Bench.DataSize, "datasize", 20, "Size of value part of benchmark data (bytes)")
33+
}

cli/context/context.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Ctx struct {
2121
Replicasets ReplicasetsCtx
2222
Connect ConnectCtx
2323
Failover FailoverCtx
24+
Bench BenchCtx
2425
}
2526

2627
type ProjectCtx struct {
@@ -173,3 +174,14 @@ type FailoverCtx struct {
173174
ParamsJSON string
174175
ProviderParamsJSON string
175176
}
177+
178+
type BenchCtx struct {
179+
URL string
180+
User string
181+
Password string
182+
Connections int
183+
SimultaneousRequests int
184+
Duration int
185+
KeySize int
186+
DataSize int
187+
}

test/integration/bench/test_bench.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import os
2+
import signal
3+
import time
4+
from socket import AF_INET, SOCK_STREAM, socket
5+
from subprocess import PIPE, STDOUT, Popen
6+
from threading import Thread
7+
8+
from utils import consume_lines, run_command_and_get_output
9+
10+
11+
def test_bench(cartridge_cmd, request, tmpdir):
12+
base_cmd = [cartridge_cmd, 'bench', '--duration=1']
13+
tarantool_cmd = [
14+
"tarantool",
15+
"-e", f"""box.cfg{{listen="127.0.0.1:3301",work_dir=[[{tmpdir}]]}}""",
16+
"-e", """box.schema.user.grant("guest","super",nil,nil,{if_not_exists=true})"""
17+
]
18+
19+
env = os.environ.copy()
20+
process = Popen(tarantool_cmd, stdout=PIPE, stderr=STDOUT, env=env)
21+
thread = Thread(target=consume_lines, args=["3301", process.stdout])
22+
thread.start()
23+
24+
def kill():
25+
process.send_signal(signal.SIGKILL)
26+
if thread is not None:
27+
thread.join(5)
28+
request.addfinalizer(kill)
29+
30+
tnt_socket = socket(AF_INET, SOCK_STREAM)
31+
attempt = 10
32+
while attempt > 0:
33+
try:
34+
time.sleep(1)
35+
tnt_socket.connect(('127.0.0.1', 3301))
36+
except Exception:
37+
attempt -= 1
38+
39+
rc, output = run_command_and_get_output(base_cmd, cwd=tmpdir)
40+
assert rc == 0

test/utils.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import glob
22
import gzip
33
import json
4+
import logging
45
import os
56
import re
67
import shutil
@@ -1394,3 +1395,10 @@ def get_response_data(response):
13941395
assert 'errors' not in response_json, response_json
13951396

13961397
return response_json['data']
1398+
1399+
1400+
def consume_lines(port, pipe):
1401+
logger = logging.getLogger(f'localhost:{port}')
1402+
with pipe:
1403+
for line in iter(pipe.readline, b''):
1404+
logger.warning(line.rstrip().decode('utf-8'))

0 commit comments

Comments
 (0)