From 88ea630ab1ba9d6d1f1b5f60fd88ac01ab81a160 Mon Sep 17 00:00:00 2001 From: Kirill-Churkin Date: Tue, 7 Dec 2021 13:31:49 +0300 Subject: [PATCH] bench: add benchmark test functionality for Tarantool User setup Tarantool single node or cluster and try to understand "How many specific traffic Tarantool can handle on this hardware" The same official things are for redis, postgresql and aerospike. Cartridge bench module makes some load for Tarantool. user@cartridge-cli % ./cartridge bench Tarantool 2.8.2 (Binary) f4897ffe-98dd-40fc-a6f2-21ca8bb52fe7 Parameters: URL: 127.0.0.1:3301 user: guest connections: 10 simultaneous requests: 10 duration: 10 seconds key size: 10 bytes data size: 20 bytes Data schema | key | value ------------------------------------------ | random(10) | random(20) Benchmark start ... Benchmark stop Results: Success operations: 1169481 Failed operations: 0 Request count: 1170485 Time (seconds): 10.000551801 Requests per second: 117042 Part of #645 --- CHANGELOG.md | 1 + cli/bench/bench.go | 138 +++++++++++++++++++++++++++ cli/bench/config.go | 34 +++++++ cli/bench/space.go | 56 +++++++++++ cli/bench/types.go | 10 ++ cli/commands/bench.go | 33 +++++++ cli/context/context.go | 12 +++ test/integration/bench/test_bench.py | 38 ++++++++ test/utils.py | 8 ++ 9 files changed, 330 insertions(+) create mode 100644 cli/bench/bench.go create mode 100644 cli/bench/config.go create mode 100644 cli/bench/space.go create mode 100644 cli/bench/types.go create mode 100644 cli/commands/bench.go create mode 100644 test/integration/bench/test_bench.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 797e5f845..b5e71a16d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Tarantool benchmark tool (early alpha, API can be changed in the near future). - Ability to reverse search in ``cartridge enter`` and ``cartridge connect`` commands. - Added support for functionality from Golang 1.17. diff --git a/cli/bench/bench.go b/cli/bench/bench.go new file mode 100644 index 000000000..5b1e32851 --- /dev/null +++ b/cli/bench/bench.go @@ -0,0 +1,138 @@ +package bench + +import ( + bctx "context" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/FZambia/tarantool" + "github.com/tarantool/cartridge-cli/cli/common" + "github.com/tarantool/cartridge-cli/cli/context" +) + +// printResults outputs benchmark foramatted results. +func printResults(results Results) { + fmt.Printf("\nResults:\n") + fmt.Printf("\tSuccess operations: %d\n", results.successResultCount) + fmt.Printf("\tFailed operations: %d\n", results.failedResultCount) + fmt.Printf("\tRequest count: %d\n", results.handledRequestsCount) + fmt.Printf("\tTime (seconds): %f\n", results.duration) + fmt.Printf("\tRequests per second: %d\n\n", results.requestsPerSecond) +} + +// spacePreset prepares space for a benchmark. +func spacePreset(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) error { + dropBenchmarkSpace(tarantoolConnection) + return createBenchmarkSpace(tarantoolConnection) +} + +// incrementRequest increases the counter of successful/failed requests depending on the presence of an error. +func incrementRequest(err error, results *Results) { + if err == nil { + results.successResultCount++ + } else { + results.failedResultCount++ + } + results.handledRequestsCount++ +} + +// requestsLoop continuously executes the insert query until the benchmark time runs out. +func requestsLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) { + for { + select { + case <-backgroundCtx.Done(): + return + default: + _, err := tarantoolConnection.Exec( + tarantool.Insert( + benchSpaceName, + []interface{}{common.RandomString(ctx.KeySize), common.RandomString(ctx.DataSize)})) + incrementRequest(err, results) + } + } +} + +// connectionLoop runs "ctx.SimultaneousRequests" requests execution threads through the same connection. +func connectionLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) { + var connectionWait sync.WaitGroup + for i := 0; i < ctx.SimultaneousRequests; i++ { + connectionWait.Add(1) + go func() { + defer connectionWait.Done() + requestsLoop(ctx, tarantoolConnection, results, backgroundCtx) + }() + } + + connectionWait.Wait() +} + +// Main benchmark function. +func Run(ctx context.BenchCtx) error { + rand.Seed(time.Now().UnixNano()) + + // Connect to tarantool and preset space for benchmark. + tarantoolConnection, err := tarantool.Connect(ctx.URL, tarantool.Opts{ + User: ctx.User, + Password: ctx.Password, + }) + if err != nil { + return fmt.Errorf( + "Couldn't connect to Tarantool %s.", + ctx.URL) + } + defer tarantoolConnection.Close() + + printConfig(ctx, tarantoolConnection) + + if err := spacePreset(ctx, tarantoolConnection); err != nil { + return err + } + + /// Сreate a "connectionPool" before starting the benchmark to exclude the connection establishment time from measurements. + connectionPool := make([]*tarantool.Connection, ctx.Connections) + for i := 0; i < ctx.Connections; i++ { + connectionPool[i], err = tarantool.Connect(ctx.URL, tarantool.Opts{ + User: ctx.User, + Password: ctx.Password, + }) + if err != nil { + return err + } + defer connectionPool[i].Close() + } + + fmt.Println("Benchmark start") + fmt.Println("...") + + // The "context" will be used to stop all "connectionLoop" when the time is out. + backgroundCtx, cancel := bctx.WithCancel(bctx.Background()) + var waitGroup sync.WaitGroup + results := Results{} + + startTime := time.Now() + timer := time.NewTimer(time.Duration(ctx.Duration * int(time.Second))) + + // Start detached connections. + for i := 0; i < ctx.Connections; i++ { + waitGroup.Add(1) + go func(connection *tarantool.Connection) { + defer waitGroup.Done() + connectionLoop(ctx, connection, &results, backgroundCtx) + }(connectionPool[i]) + } + // Sends "signal" to all "connectionLoop" and waits for them to complete. + <-timer.C + cancel() + waitGroup.Wait() + + results.duration = time.Since(startTime).Seconds() + results.requestsPerSecond = int(float64(results.handledRequestsCount) / results.duration) + + dropBenchmarkSpace(tarantoolConnection) + fmt.Println("Benchmark stop") + + printResults(results) + return nil +} diff --git a/cli/bench/config.go b/cli/bench/config.go new file mode 100644 index 000000000..dae0c8005 --- /dev/null +++ b/cli/bench/config.go @@ -0,0 +1,34 @@ +package bench + +import ( + "fmt" + "os" + "text/tabwriter" + + "github.com/FZambia/tarantool" + "github.com/tarantool/cartridge-cli/cli/context" +) + +var ( + benchSpaceName = "__benchmark_space__" + benchSpacePrimaryIndexName = "__bench_primary_key__" +) + +// printConfig output formatted config parameters. +func printConfig(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) { + fmt.Printf("%s\n", tarantoolConnection.Greeting().Version) + fmt.Printf("Parameters:\n") + fmt.Printf("\tURL: %s\n", ctx.URL) + fmt.Printf("\tuser: %s\n", ctx.User) + fmt.Printf("\tconnections: %d\n", ctx.Connections) + fmt.Printf("\tsimultaneous requests: %d\n", ctx.SimultaneousRequests) + fmt.Printf("\tduration: %d seconds\n", ctx.Duration) + fmt.Printf("\tkey size: %d bytes\n", ctx.KeySize) + fmt.Printf("\tdata size: %d bytes\n", ctx.DataSize) + fmt.Printf("Data schema\n") + w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0) + fmt.Fprintf(w, "|\tkey\t|\tvalue\n") + fmt.Fprintf(w, "|\t------------------------------\t|\t------------------------------\n") + fmt.Fprintf(w, "|\trandom(%d)\t|\trandom(%d)\n", ctx.KeySize, ctx.DataSize) + w.Flush() +} diff --git a/cli/bench/space.go b/cli/bench/space.go new file mode 100644 index 000000000..27211b244 --- /dev/null +++ b/cli/bench/space.go @@ -0,0 +1,56 @@ +package bench + +import ( + "fmt" + "reflect" + + "github.com/FZambia/tarantool" +) + +// createBenchmarkSpace creates benchmark space with formatting and primary index. +func createBenchmarkSpace(tarantoolConnection *tarantool.Connection) error { + // Creating space. + createCommand := "return box.schema.space.create(...).name" + _, err := tarantoolConnection.Exec(tarantool.Eval(createCommand, []interface{}{benchSpaceName, map[string]bool{"if_not_exists": true}})) + if err != nil { + return err + } + + // Formatting space. + formatCommand := fmt.Sprintf("box.space.%s:format", benchSpaceName) + _, err = tarantoolConnection.Exec(tarantool.Call(formatCommand, [][]map[string]string{ + { + {"name": "key", "type": "string"}, + {"name": "value", "type": "string"}, + }, + })) + if err != nil { + return err + } + + // Creating primary index. + createIndexCommand := fmt.Sprintf("box.space.%s:create_index", benchSpaceName) + _, err = tarantoolConnection.Exec(tarantool.Call(createIndexCommand, []interface{}{ + benchSpacePrimaryIndexName, + map[string]interface{}{ + "parts": []string{"key"}, + "if_not_exists": true, + }, + })) + return err +} + +// dropBenchmarkSpace deletes benchmark space. +func dropBenchmarkSpace(tarantoolConnection *tarantool.Connection) error { + checkCommand := fmt.Sprintf("return box.space.%s.index[0].name", benchSpaceName) + indexName, err := tarantoolConnection.Exec(tarantool.Eval(checkCommand, []interface{}{})) + if err != nil { + return err + } + if reflect.ValueOf(indexName.Data).Index(0).Elem().String() == benchSpacePrimaryIndexName { + dropCommand := fmt.Sprintf("box.space.%s:drop", benchSpaceName) + _, err := tarantoolConnection.Exec(tarantool.Call(dropCommand, []interface{}{})) + return err + } + return nil +} diff --git a/cli/bench/types.go b/cli/bench/types.go new file mode 100644 index 000000000..8398b2ab5 --- /dev/null +++ b/cli/bench/types.go @@ -0,0 +1,10 @@ +package bench + +// Results describes set of benchmark results. +type Results struct { + handledRequestsCount int // Count of all executed requests. + successResultCount int // Count of successful request in all connections. + failedResultCount int // Count of failed request in all connections. + duration float64 // Benchmark duration. + requestsPerSecond int // Cumber of requests per second - the main measured value. +} diff --git a/cli/commands/bench.go b/cli/commands/bench.go new file mode 100644 index 000000000..8e408a6e8 --- /dev/null +++ b/cli/commands/bench.go @@ -0,0 +1,33 @@ +package commands + +import ( + "github.com/apex/log" + "github.com/spf13/cobra" + "github.com/tarantool/cartridge-cli/cli/bench" +) + +func init() { + var benchCmd = &cobra.Command{ + Use: "bench", + Short: "Util for running benchmarks for Tarantool", + Long: "Benchmark utility that simulates running commands done by N clients at the same time sending M simultaneous queries", + Run: func(cmd *cobra.Command, args []string) { + if err := bench.Run(ctx.Bench); err != nil { + log.Fatalf(err.Error()) + } + }, + } + rootCmd.AddCommand(benchCmd) + + configureFlags(benchCmd) + + benchCmd.Flags().StringVar(&ctx.Bench.URL, "url", "127.0.0.1:3301", "Tarantool address") + benchCmd.Flags().StringVar(&ctx.Bench.User, "user", "guest", "Tarantool user for connection") + benchCmd.Flags().StringVar(&ctx.Bench.Password, "password", "", "Tarantool password for connection") + + benchCmd.Flags().IntVar(&ctx.Bench.Connections, "connections", 10, "Number of concurrent connections") + benchCmd.Flags().IntVar(&ctx.Bench.SimultaneousRequests, "requests", 10, "Number of simultaneous requests per connection") + benchCmd.Flags().IntVar(&ctx.Bench.Duration, "duration", 10, "Duration of benchmark test (seconds)") + benchCmd.Flags().IntVar(&ctx.Bench.KeySize, "keysize", 10, "Size of key part of benchmark data (bytes)") + benchCmd.Flags().IntVar(&ctx.Bench.DataSize, "datasize", 20, "Size of value part of benchmark data (bytes)") +} diff --git a/cli/context/context.go b/cli/context/context.go index bb1a2eca1..69859a1c3 100644 --- a/cli/context/context.go +++ b/cli/context/context.go @@ -21,6 +21,7 @@ type Ctx struct { Replicasets ReplicasetsCtx Connect ConnectCtx Failover FailoverCtx + Bench BenchCtx } type ProjectCtx struct { @@ -173,3 +174,14 @@ type FailoverCtx struct { ParamsJSON string ProviderParamsJSON string } + +type BenchCtx struct { + URL string // URL - the URL of the tarantool used for testing + User string // User - username to connect to the tarantool. + Password string // Password to connect to the tarantool. + Connections int // Connections describes the number of connection to be used in the test. + SimultaneousRequests int // SimultaneousRequests describes the number of parallel requests from one connection. + Duration int // Duration describes test duration in seconds. + KeySize int // DataSize describes the size of key part of benchmark data (bytes). + DataSize int // DataSize describes the size of value part of benchmark data (bytes). +} diff --git a/test/integration/bench/test_bench.py b/test/integration/bench/test_bench.py new file mode 100644 index 000000000..c0a8cb247 --- /dev/null +++ b/test/integration/bench/test_bench.py @@ -0,0 +1,38 @@ +import os +import signal +import socket +from subprocess import PIPE, STDOUT, Popen +from threading import Thread + +import tenacity +from utils import consume_lines, run_command_and_get_output + + +@tenacity.retry(stop=tenacity.stop_after_delay(15), wait=tenacity.wait_fixed(1)) +def wait_for_connect(): + socket.create_connection(('127.0.0.1', 3301)) + + +def test_bench(cartridge_cmd, request, tmpdir): + base_cmd = [cartridge_cmd, 'bench', '--duration=1'] + tarantool_cmd = [ + "tarantool", + "-e", f"""box.cfg{{listen="127.0.0.1:3301",work_dir=[[{tmpdir}]]}}""", + "-e", """box.schema.user.grant("guest","super",nil,nil,{if_not_exists=true})""" + ] + + env = os.environ.copy() + process = Popen(tarantool_cmd, stdout=PIPE, stderr=STDOUT, env=env) + thread = Thread(target=consume_lines, args=["3301", process.stdout]) + thread.start() + + def kill(): + process.send_signal(signal.SIGKILL) + if thread is not None: + thread.join(5) + request.addfinalizer(kill) + + wait_for_connect() + + rc, output = run_command_and_get_output(base_cmd, cwd=tmpdir) + assert rc == 0 diff --git a/test/utils.py b/test/utils.py index 61ddca47c..a4dfe81e6 100644 --- a/test/utils.py +++ b/test/utils.py @@ -1,6 +1,7 @@ import glob import gzip import json +import logging import os import re import shutil @@ -1438,3 +1439,10 @@ def get_tarantool_installer_cmd(package_manager): return f"curl -L https://tarantool.io/installer.sh | \ VER={short_version} bash -s -- --type {tarantool_type} \ && {package_manager} install -y tarantool" + + +def consume_lines(port, pipe): + logger = logging.getLogger(f'localhost:{port}') + with pipe: + for line in iter(pipe.readline, b''): + logger.warning(line.rstrip().decode('utf-8'))