Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bench: Add benchmark test functionality for single node Tarantool topology #656

Merged
merged 1 commit into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- Tarantool benchmark tool (early alpha, API can be changed in the near future).
- Ability to reverse search in ``cartridge enter`` and ``cartridge connect`` commands.
- Added support for functionality from Golang 1.17.

Expand Down
138 changes: 138 additions & 0 deletions cli/bench/bench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package bench

import (
bctx "context"
"fmt"
"math/rand"
"sync"
"time"

"github.com/FZambia/tarantool"
"github.com/tarantool/cartridge-cli/cli/common"
"github.com/tarantool/cartridge-cli/cli/context"
)

// printResults outputs benchmark foramatted results.
func printResults(results Results) {
Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved
fmt.Printf("\nResults:\n")
fmt.Printf("\tSuccess operations: %d\n", results.successResultCount)
fmt.Printf("\tFailed operations: %d\n", results.failedResultCount)
fmt.Printf("\tRequest count: %d\n", results.handledRequestsCount)
fmt.Printf("\tTime (seconds): %f\n", results.duration)
fmt.Printf("\tRequests per second: %d\n\n", results.requestsPerSecond)
Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved
}

// spacePreset prepares space for a benchmark.
func spacePreset(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) error {
dropBenchmarkSpace(tarantoolConnection)
return createBenchmarkSpace(tarantoolConnection)
}

// incrementRequest increases the counter of successful/failed requests depending on the presence of an error.
func incrementRequest(err error, results *Results) {
Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
results.successResultCount++
} else {
results.failedResultCount++
}
results.handledRequestsCount++
}
Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved

// requestsLoop continuously executes the insert query until the benchmark time runs out.
func requestsLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) {
for {
select {
case <-backgroundCtx.Done():
return
default:
_, err := tarantoolConnection.Exec(
tarantool.Insert(
benchSpaceName,
[]interface{}{common.RandomString(ctx.KeySize), common.RandomString(ctx.DataSize)}))
incrementRequest(err, results)
}
}
}

// connectionLoop runs "ctx.SimultaneousRequests" requests execution threads through the same connection.
func connectionLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) {
var connectionWait sync.WaitGroup
for i := 0; i < ctx.SimultaneousRequests; i++ {
connectionWait.Add(1)
go func() {
defer connectionWait.Done()
requestsLoop(ctx, tarantoolConnection, results, backgroundCtx)
}()
}

connectionWait.Wait()
}

// Main benchmark function.
func Run(ctx context.BenchCtx) error {
rand.Seed(time.Now().UnixNano())

// Connect to tarantool and preset space for benchmark.
tarantoolConnection, err := tarantool.Connect(ctx.URL, tarantool.Opts{
User: ctx.User,
Password: ctx.Password,
})
if err != nil {
return fmt.Errorf(
"Couldn't connect to Tarantool %s.",
ctx.URL)
}
defer tarantoolConnection.Close()

printConfig(ctx, tarantoolConnection)

if err := spacePreset(ctx, tarantoolConnection); err != nil {
return err
}

/// Сreate a "connectionPool" before starting the benchmark to exclude the connection establishment time from measurements.
connectionPool := make([]*tarantool.Connection, ctx.Connections)
for i := 0; i < ctx.Connections; i++ {
connectionPool[i], err = tarantool.Connect(ctx.URL, tarantool.Opts{
User: ctx.User,
Password: ctx.Password,
})
if err != nil {
return err
}
defer connectionPool[i].Close()
}

fmt.Println("Benchmark start")
fmt.Println("...")

// The "context" will be used to stop all "connectionLoop" when the time is out.
backgroundCtx, cancel := bctx.WithCancel(bctx.Background())
Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved
var waitGroup sync.WaitGroup
Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved
results := Results{}
Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved

startTime := time.Now()
timer := time.NewTimer(time.Duration(ctx.Duration * int(time.Second)))

// Start detached connections.
for i := 0; i < ctx.Connections; i++ {
waitGroup.Add(1)
go func(connection *tarantool.Connection) {
defer waitGroup.Done()
connectionLoop(ctx, connection, &results, backgroundCtx)
}(connectionPool[i])
}
// Sends "signal" to all "connectionLoop" and waits for them to complete.
<-timer.C
cancel()
Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved
waitGroup.Wait()

results.duration = time.Since(startTime).Seconds()
results.requestsPerSecond = int(float64(results.handledRequestsCount) / results.duration)

dropBenchmarkSpace(tarantoolConnection)
fmt.Println("Benchmark stop")

printResults(results)
return nil
}
34 changes: 34 additions & 0 deletions cli/bench/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package bench

import (
"fmt"
"os"
"text/tabwriter"

"github.com/FZambia/tarantool"
"github.com/tarantool/cartridge-cli/cli/context"
)

var (
benchSpaceName = "__benchmark_space__"
benchSpacePrimaryIndexName = "__bench_primary_key__"
)

// printConfig output formatted config parameters.
func printConfig(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) {
Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved
fmt.Printf("%s\n", tarantoolConnection.Greeting().Version)
fmt.Printf("Parameters:\n")
fmt.Printf("\tURL: %s\n", ctx.URL)
fmt.Printf("\tuser: %s\n", ctx.User)
fmt.Printf("\tconnections: %d\n", ctx.Connections)
fmt.Printf("\tsimultaneous requests: %d\n", ctx.SimultaneousRequests)
fmt.Printf("\tduration: %d seconds\n", ctx.Duration)
fmt.Printf("\tkey size: %d bytes\n", ctx.KeySize)
fmt.Printf("\tdata size: %d bytes\n", ctx.DataSize)
filonenko-mikhail marked this conversation as resolved.
Show resolved Hide resolved
fmt.Printf("Data schema\n")
filonenko-mikhail marked this conversation as resolved.
Show resolved Hide resolved
w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0)
fmt.Fprintf(w, "|\tkey\t|\tvalue\n")
fmt.Fprintf(w, "|\t------------------------------\t|\t------------------------------\n")
fmt.Fprintf(w, "|\trandom(%d)\t|\trandom(%d)\n", ctx.KeySize, ctx.DataSize)
w.Flush()
}
56 changes: 56 additions & 0 deletions cli/bench/space.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package bench

import (
"fmt"
"reflect"

"github.com/FZambia/tarantool"
)

// createBenchmarkSpace creates benchmark space with formatting and primary index.
func createBenchmarkSpace(tarantoolConnection *tarantool.Connection) error {
// Creating space.
createCommand := "return box.schema.space.create(...).name"
_, err := tarantoolConnection.Exec(tarantool.Eval(createCommand, []interface{}{benchSpaceName, map[string]bool{"if_not_exists": true}}))
if err != nil {
return err
}

// Formatting space.
formatCommand := fmt.Sprintf("box.space.%s:format", benchSpaceName)
_, err = tarantoolConnection.Exec(tarantool.Call(formatCommand, [][]map[string]string{
{
{"name": "key", "type": "string"},
{"name": "value", "type": "string"},
},
}))
if err != nil {
return err
}

// Creating primary index.
createIndexCommand := fmt.Sprintf("box.space.%s:create_index", benchSpaceName)
_, err = tarantoolConnection.Exec(tarantool.Call(createIndexCommand, []interface{}{
mRrvz marked this conversation as resolved.
Show resolved Hide resolved
benchSpacePrimaryIndexName,
map[string]interface{}{
"parts": []string{"key"},
"if_not_exists": true,
},
}))
return err
}

// dropBenchmarkSpace deletes benchmark space.
func dropBenchmarkSpace(tarantoolConnection *tarantool.Connection) error {
checkCommand := fmt.Sprintf("return box.space.%s.index[0].name", benchSpaceName)
indexName, err := tarantoolConnection.Exec(tarantool.Eval(checkCommand, []interface{}{}))
if err != nil {
return err
}
if reflect.ValueOf(indexName.Data).Index(0).Elem().String() == benchSpacePrimaryIndexName {
dropCommand := fmt.Sprintf("box.space.%s:drop", benchSpaceName)
_, err := tarantoolConnection.Exec(tarantool.Call(dropCommand, []interface{}{}))
return err
}
return nil
LeonidVas marked this conversation as resolved.
Show resolved Hide resolved
}
10 changes: 10 additions & 0 deletions cli/bench/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package bench

// Results describes set of benchmark results.
type Results struct {
handledRequestsCount int // Count of all executed requests.
successResultCount int // Count of successful request in all connections.
failedResultCount int // Count of failed request in all connections.
duration float64 // Benchmark duration.
requestsPerSecond int // Cumber of requests per second - the main measured value.
}
Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved
33 changes: 33 additions & 0 deletions cli/commands/bench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package commands

import (
"github.com/apex/log"
"github.com/spf13/cobra"
"github.com/tarantool/cartridge-cli/cli/bench"
)

func init() {
var benchCmd = &cobra.Command{
Use: "bench",
Short: "Util for running benchmarks for Tarantool",
Long: "Benchmark utility that simulates running commands done by N clients at the same time sending M simultaneous queries",
Run: func(cmd *cobra.Command, args []string) {
if err := bench.Run(ctx.Bench); err != nil {
log.Fatalf(err.Error())
}
},
}
rootCmd.AddCommand(benchCmd)

configureFlags(benchCmd)

benchCmd.Flags().StringVar(&ctx.Bench.URL, "url", "127.0.0.1:3301", "Tarantool address")
filonenko-mikhail marked this conversation as resolved.
Show resolved Hide resolved
benchCmd.Flags().StringVar(&ctx.Bench.User, "user", "guest", "Tarantool user for connection")
benchCmd.Flags().StringVar(&ctx.Bench.Password, "password", "", "Tarantool password for connection")
LeonidVas marked this conversation as resolved.
Show resolved Hide resolved

benchCmd.Flags().IntVar(&ctx.Bench.Connections, "connections", 10, "Number of concurrent connections")
benchCmd.Flags().IntVar(&ctx.Bench.SimultaneousRequests, "requests", 10, "Number of simultaneous requests per connection")
benchCmd.Flags().IntVar(&ctx.Bench.Duration, "duration", 10, "Duration of benchmark test (seconds)")
benchCmd.Flags().IntVar(&ctx.Bench.KeySize, "keysize", 10, "Size of key part of benchmark data (bytes)")
benchCmd.Flags().IntVar(&ctx.Bench.DataSize, "datasize", 20, "Size of value part of benchmark data (bytes)")
}
12 changes: 12 additions & 0 deletions cli/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Ctx struct {
Replicasets ReplicasetsCtx
Connect ConnectCtx
Failover FailoverCtx
Bench BenchCtx
}

type ProjectCtx struct {
Expand Down Expand Up @@ -173,3 +174,14 @@ type FailoverCtx struct {
ParamsJSON string
ProviderParamsJSON string
}

type BenchCtx struct {
URL string // URL - the URL of the tarantool used for testing
User string // User - username to connect to the tarantool.
Password string // Password to connect to the tarantool.
Connections int // Connections describes the number of connection to be used in the test.
SimultaneousRequests int // SimultaneousRequests describes the number of parallel requests from one connection.
Duration int // Duration describes test duration in seconds.
KeySize int // DataSize describes the size of key part of benchmark data (bytes).
DataSize int // DataSize describes the size of value part of benchmark data (bytes).
}
38 changes: 38 additions & 0 deletions test/integration/bench/test_bench.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import os
import signal
import socket
from subprocess import PIPE, STDOUT, Popen
from threading import Thread

import tenacity
from utils import consume_lines, run_command_and_get_output

Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved

@tenacity.retry(stop=tenacity.stop_after_delay(15), wait=tenacity.wait_fixed(1))
def wait_for_connect():
socket.create_connection(('127.0.0.1', 3301))

Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved

def test_bench(cartridge_cmd, request, tmpdir):
base_cmd = [cartridge_cmd, 'bench', '--duration=1']
tarantool_cmd = [
"tarantool",
"-e", f"""box.cfg{{listen="127.0.0.1:3301",work_dir=[[{tmpdir}]]}}""",
"-e", """box.schema.user.grant("guest","super",nil,nil,{if_not_exists=true})"""
]
Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved

env = os.environ.copy()
process = Popen(tarantool_cmd, stdout=PIPE, stderr=STDOUT, env=env)
thread = Thread(target=consume_lines, args=["3301", process.stdout])
thread.start()

def kill():
process.send_signal(signal.SIGKILL)
if thread is not None:
thread.join(5)
request.addfinalizer(kill)

wait_for_connect()

Kirill-Churkin marked this conversation as resolved.
Show resolved Hide resolved
rc, output = run_command_and_get_output(base_cmd, cwd=tmpdir)
assert rc == 0
8 changes: 8 additions & 0 deletions test/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import glob
import gzip
import json
import logging
import os
import re
import shutil
Expand Down Expand Up @@ -1438,3 +1439,10 @@ def get_tarantool_installer_cmd(package_manager):
return f"curl -L https://tarantool.io/installer.sh | \
VER={short_version} bash -s -- --type {tarantool_type} \
&& {package_manager} install -y tarantool"


def consume_lines(port, pipe):
mRrvz marked this conversation as resolved.
Show resolved Hide resolved
logger = logging.getLogger(f'localhost:{port}')
with pipe:
for line in iter(pipe.readline, b''):
logger.warning(line.rstrip().decode('utf-8'))