diff --git a/CHANGELOG.md b/CHANGELOG.md index 797e5f845..b5e71a16d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Tarantool benchmark tool (early alpha, API can be changed in the near future). - Ability to reverse search in ``cartridge enter`` and ``cartridge connect`` commands. - Added support for functionality from Golang 1.17. diff --git a/cli/bench/bench.go b/cli/bench/bench.go new file mode 100644 index 000000000..5b1e32851 --- /dev/null +++ b/cli/bench/bench.go @@ -0,0 +1,138 @@ +package bench + +import ( + bctx "context" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/FZambia/tarantool" + "github.com/tarantool/cartridge-cli/cli/common" + "github.com/tarantool/cartridge-cli/cli/context" +) + +// printResults outputs benchmark foramatted results. +func printResults(results Results) { + fmt.Printf("\nResults:\n") + fmt.Printf("\tSuccess operations: %d\n", results.successResultCount) + fmt.Printf("\tFailed operations: %d\n", results.failedResultCount) + fmt.Printf("\tRequest count: %d\n", results.handledRequestsCount) + fmt.Printf("\tTime (seconds): %f\n", results.duration) + fmt.Printf("\tRequests per second: %d\n\n", results.requestsPerSecond) +} + +// spacePreset prepares space for a benchmark. +func spacePreset(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) error { + dropBenchmarkSpace(tarantoolConnection) + return createBenchmarkSpace(tarantoolConnection) +} + +// incrementRequest increases the counter of successful/failed requests depending on the presence of an error. +func incrementRequest(err error, results *Results) { + if err == nil { + results.successResultCount++ + } else { + results.failedResultCount++ + } + results.handledRequestsCount++ +} + +// requestsLoop continuously executes the insert query until the benchmark time runs out. +func requestsLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) { + for { + select { + case <-backgroundCtx.Done(): + return + default: + _, err := tarantoolConnection.Exec( + tarantool.Insert( + benchSpaceName, + []interface{}{common.RandomString(ctx.KeySize), common.RandomString(ctx.DataSize)})) + incrementRequest(err, results) + } + } +} + +// connectionLoop runs "ctx.SimultaneousRequests" requests execution threads through the same connection. +func connectionLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) { + var connectionWait sync.WaitGroup + for i := 0; i < ctx.SimultaneousRequests; i++ { + connectionWait.Add(1) + go func() { + defer connectionWait.Done() + requestsLoop(ctx, tarantoolConnection, results, backgroundCtx) + }() + } + + connectionWait.Wait() +} + +// Main benchmark function. +func Run(ctx context.BenchCtx) error { + rand.Seed(time.Now().UnixNano()) + + // Connect to tarantool and preset space for benchmark. + tarantoolConnection, err := tarantool.Connect(ctx.URL, tarantool.Opts{ + User: ctx.User, + Password: ctx.Password, + }) + if err != nil { + return fmt.Errorf( + "Couldn't connect to Tarantool %s.", + ctx.URL) + } + defer tarantoolConnection.Close() + + printConfig(ctx, tarantoolConnection) + + if err := spacePreset(ctx, tarantoolConnection); err != nil { + return err + } + + /// Сreate a "connectionPool" before starting the benchmark to exclude the connection establishment time from measurements. + connectionPool := make([]*tarantool.Connection, ctx.Connections) + for i := 0; i < ctx.Connections; i++ { + connectionPool[i], err = tarantool.Connect(ctx.URL, tarantool.Opts{ + User: ctx.User, + Password: ctx.Password, + }) + if err != nil { + return err + } + defer connectionPool[i].Close() + } + + fmt.Println("Benchmark start") + fmt.Println("...") + + // The "context" will be used to stop all "connectionLoop" when the time is out. + backgroundCtx, cancel := bctx.WithCancel(bctx.Background()) + var waitGroup sync.WaitGroup + results := Results{} + + startTime := time.Now() + timer := time.NewTimer(time.Duration(ctx.Duration * int(time.Second))) + + // Start detached connections. + for i := 0; i < ctx.Connections; i++ { + waitGroup.Add(1) + go func(connection *tarantool.Connection) { + defer waitGroup.Done() + connectionLoop(ctx, connection, &results, backgroundCtx) + }(connectionPool[i]) + } + // Sends "signal" to all "connectionLoop" and waits for them to complete. + <-timer.C + cancel() + waitGroup.Wait() + + results.duration = time.Since(startTime).Seconds() + results.requestsPerSecond = int(float64(results.handledRequestsCount) / results.duration) + + dropBenchmarkSpace(tarantoolConnection) + fmt.Println("Benchmark stop") + + printResults(results) + return nil +} diff --git a/cli/bench/config.go b/cli/bench/config.go new file mode 100644 index 000000000..dae0c8005 --- /dev/null +++ b/cli/bench/config.go @@ -0,0 +1,34 @@ +package bench + +import ( + "fmt" + "os" + "text/tabwriter" + + "github.com/FZambia/tarantool" + "github.com/tarantool/cartridge-cli/cli/context" +) + +var ( + benchSpaceName = "__benchmark_space__" + benchSpacePrimaryIndexName = "__bench_primary_key__" +) + +// printConfig output formatted config parameters. +func printConfig(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) { + fmt.Printf("%s\n", tarantoolConnection.Greeting().Version) + fmt.Printf("Parameters:\n") + fmt.Printf("\tURL: %s\n", ctx.URL) + fmt.Printf("\tuser: %s\n", ctx.User) + fmt.Printf("\tconnections: %d\n", ctx.Connections) + fmt.Printf("\tsimultaneous requests: %d\n", ctx.SimultaneousRequests) + fmt.Printf("\tduration: %d seconds\n", ctx.Duration) + fmt.Printf("\tkey size: %d bytes\n", ctx.KeySize) + fmt.Printf("\tdata size: %d bytes\n", ctx.DataSize) + fmt.Printf("Data schema\n") + w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0) + fmt.Fprintf(w, "|\tkey\t|\tvalue\n") + fmt.Fprintf(w, "|\t------------------------------\t|\t------------------------------\n") + fmt.Fprintf(w, "|\trandom(%d)\t|\trandom(%d)\n", ctx.KeySize, ctx.DataSize) + w.Flush() +} diff --git a/cli/bench/space.go b/cli/bench/space.go new file mode 100644 index 000000000..27211b244 --- /dev/null +++ b/cli/bench/space.go @@ -0,0 +1,56 @@ +package bench + +import ( + "fmt" + "reflect" + + "github.com/FZambia/tarantool" +) + +// createBenchmarkSpace creates benchmark space with formatting and primary index. +func createBenchmarkSpace(tarantoolConnection *tarantool.Connection) error { + // Creating space. + createCommand := "return box.schema.space.create(...).name" + _, err := tarantoolConnection.Exec(tarantool.Eval(createCommand, []interface{}{benchSpaceName, map[string]bool{"if_not_exists": true}})) + if err != nil { + return err + } + + // Formatting space. + formatCommand := fmt.Sprintf("box.space.%s:format", benchSpaceName) + _, err = tarantoolConnection.Exec(tarantool.Call(formatCommand, [][]map[string]string{ + { + {"name": "key", "type": "string"}, + {"name": "value", "type": "string"}, + }, + })) + if err != nil { + return err + } + + // Creating primary index. + createIndexCommand := fmt.Sprintf("box.space.%s:create_index", benchSpaceName) + _, err = tarantoolConnection.Exec(tarantool.Call(createIndexCommand, []interface{}{ + benchSpacePrimaryIndexName, + map[string]interface{}{ + "parts": []string{"key"}, + "if_not_exists": true, + }, + })) + return err +} + +// dropBenchmarkSpace deletes benchmark space. +func dropBenchmarkSpace(tarantoolConnection *tarantool.Connection) error { + checkCommand := fmt.Sprintf("return box.space.%s.index[0].name", benchSpaceName) + indexName, err := tarantoolConnection.Exec(tarantool.Eval(checkCommand, []interface{}{})) + if err != nil { + return err + } + if reflect.ValueOf(indexName.Data).Index(0).Elem().String() == benchSpacePrimaryIndexName { + dropCommand := fmt.Sprintf("box.space.%s:drop", benchSpaceName) + _, err := tarantoolConnection.Exec(tarantool.Call(dropCommand, []interface{}{})) + return err + } + return nil +} diff --git a/cli/bench/types.go b/cli/bench/types.go new file mode 100644 index 000000000..8398b2ab5 --- /dev/null +++ b/cli/bench/types.go @@ -0,0 +1,10 @@ +package bench + +// Results describes set of benchmark results. +type Results struct { + handledRequestsCount int // Count of all executed requests. + successResultCount int // Count of successful request in all connections. + failedResultCount int // Count of failed request in all connections. + duration float64 // Benchmark duration. + requestsPerSecond int // Cumber of requests per second - the main measured value. +} diff --git a/cli/commands/bench.go b/cli/commands/bench.go new file mode 100644 index 000000000..8e408a6e8 --- /dev/null +++ b/cli/commands/bench.go @@ -0,0 +1,33 @@ +package commands + +import ( + "github.com/apex/log" + "github.com/spf13/cobra" + "github.com/tarantool/cartridge-cli/cli/bench" +) + +func init() { + var benchCmd = &cobra.Command{ + Use: "bench", + Short: "Util for running benchmarks for Tarantool", + Long: "Benchmark utility that simulates running commands done by N clients at the same time sending M simultaneous queries", + Run: func(cmd *cobra.Command, args []string) { + if err := bench.Run(ctx.Bench); err != nil { + log.Fatalf(err.Error()) + } + }, + } + rootCmd.AddCommand(benchCmd) + + configureFlags(benchCmd) + + benchCmd.Flags().StringVar(&ctx.Bench.URL, "url", "127.0.0.1:3301", "Tarantool address") + benchCmd.Flags().StringVar(&ctx.Bench.User, "user", "guest", "Tarantool user for connection") + benchCmd.Flags().StringVar(&ctx.Bench.Password, "password", "", "Tarantool password for connection") + + benchCmd.Flags().IntVar(&ctx.Bench.Connections, "connections", 10, "Number of concurrent connections") + benchCmd.Flags().IntVar(&ctx.Bench.SimultaneousRequests, "requests", 10, "Number of simultaneous requests per connection") + benchCmd.Flags().IntVar(&ctx.Bench.Duration, "duration", 10, "Duration of benchmark test (seconds)") + benchCmd.Flags().IntVar(&ctx.Bench.KeySize, "keysize", 10, "Size of key part of benchmark data (bytes)") + benchCmd.Flags().IntVar(&ctx.Bench.DataSize, "datasize", 20, "Size of value part of benchmark data (bytes)") +} diff --git a/cli/context/context.go b/cli/context/context.go index bb1a2eca1..69859a1c3 100644 --- a/cli/context/context.go +++ b/cli/context/context.go @@ -21,6 +21,7 @@ type Ctx struct { Replicasets ReplicasetsCtx Connect ConnectCtx Failover FailoverCtx + Bench BenchCtx } type ProjectCtx struct { @@ -173,3 +174,14 @@ type FailoverCtx struct { ParamsJSON string ProviderParamsJSON string } + +type BenchCtx struct { + URL string // URL - the URL of the tarantool used for testing + User string // User - username to connect to the tarantool. + Password string // Password to connect to the tarantool. + Connections int // Connections describes the number of connection to be used in the test. + SimultaneousRequests int // SimultaneousRequests describes the number of parallel requests from one connection. + Duration int // Duration describes test duration in seconds. + KeySize int // DataSize describes the size of key part of benchmark data (bytes). + DataSize int // DataSize describes the size of value part of benchmark data (bytes). +} diff --git a/test/integration/bench/test_bench.py b/test/integration/bench/test_bench.py new file mode 100644 index 000000000..c0a8cb247 --- /dev/null +++ b/test/integration/bench/test_bench.py @@ -0,0 +1,38 @@ +import os +import signal +import socket +from subprocess import PIPE, STDOUT, Popen +from threading import Thread + +import tenacity +from utils import consume_lines, run_command_and_get_output + + +@tenacity.retry(stop=tenacity.stop_after_delay(15), wait=tenacity.wait_fixed(1)) +def wait_for_connect(): + socket.create_connection(('127.0.0.1', 3301)) + + +def test_bench(cartridge_cmd, request, tmpdir): + base_cmd = [cartridge_cmd, 'bench', '--duration=1'] + tarantool_cmd = [ + "tarantool", + "-e", f"""box.cfg{{listen="127.0.0.1:3301",work_dir=[[{tmpdir}]]}}""", + "-e", """box.schema.user.grant("guest","super",nil,nil,{if_not_exists=true})""" + ] + + env = os.environ.copy() + process = Popen(tarantool_cmd, stdout=PIPE, stderr=STDOUT, env=env) + thread = Thread(target=consume_lines, args=["3301", process.stdout]) + thread.start() + + def kill(): + process.send_signal(signal.SIGKILL) + if thread is not None: + thread.join(5) + request.addfinalizer(kill) + + wait_for_connect() + + rc, output = run_command_and_get_output(base_cmd, cwd=tmpdir) + assert rc == 0 diff --git a/test/utils.py b/test/utils.py index 61ddca47c..a4dfe81e6 100644 --- a/test/utils.py +++ b/test/utils.py @@ -1,6 +1,7 @@ import glob import gzip import json +import logging import os import re import shutil @@ -1438,3 +1439,10 @@ def get_tarantool_installer_cmd(package_manager): return f"curl -L https://tarantool.io/installer.sh | \ VER={short_version} bash -s -- --type {tarantool_type} \ && {package_manager} install -y tarantool" + + +def consume_lines(port, pipe): + logger = logging.getLogger(f'localhost:{port}') + with pipe: + for line in iter(pipe.readline, b''): + logger.warning(line.rstrip().decode('utf-8'))