Skip to content

Commit 9ee6dac

Browse files
Merge pull request #23 from RedisGraph/data.import
Enabled --data-import-terms via csv file
2 parents ea6ffd2 + 1a9c1c5 commit 9ee6dac

File tree

8 files changed

+107
-20
lines changed

8 files changed

+107
-20
lines changed
+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
name: Test and coverage
2+
3+
on: [push, pull_request]
4+
5+
jobs:
6+
build:
7+
runs-on: ubuntu-latest
8+
steps:
9+
- uses: actions/checkout@v2
10+
with:
11+
fetch-depth: 2
12+
- uses: actions/setup-go@v2
13+
with:
14+
go-version: '1.17'
15+
- name: Run coverage
16+
run: go test -race -coverprofile=coverage.txt -covermode=atomic
17+
- name: Upload coverage to Codecov
18+
uses: codecov/codecov-action@v3

Makefile

+7
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,10 @@ release:
6666
$(GOGET) github.com/mitchellh/gox
6767
$(GOGET) github.com/tcnksm/ghr
6868
GO111MODULE=on gox -osarch "linux/amd64 darwin/amd64" -output "dist/redisgraph-benchmark-go_{{.OS}}_{{.Arch}}" .
69+
70+
publish: release
71+
@for f in $(shell ls ${DISTDIR}); \
72+
do \
73+
echo "copying ${DISTDIR}/$${f}"; \
74+
aws s3 cp ${DISTDIR}/$${f} s3://benchmarks.redislabs/tools/redisgraph-benchmark-go/unstable/$${f} --acl public-read; \
75+
done

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
[![license](https://img.shields.io/github/license/RedisGraph/redisgraph-benchmark-go.svg)](https://github.com/RedisGraph/redisgraph-benchmark-go)
44
[![GitHub issues](https://img.shields.io/github/release/RedisGraph/redisgraph-benchmark-go.svg)](https://github.com/RedisGraph/redisgraph-benchmark-go/releases/latest)
55
[![Discord](https://img.shields.io/discord/697882427875393627?style=flat-square)](https://discord.gg/gWBRT6P)
6+
[![codecov](https://codecov.io/github/RedisGraph/redisgraph-benchmark-go/branch/main/graph/badge.svg?token=B6ISQSDK3Y)](https://codecov.io/github/RedisGraph/redisgraph-benchmark-go)
67

78
## Overview
89

go.mod

+4-1
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,7 @@ require (
1111
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
1212
)
1313

14-
require github.com/mattn/go-runewidth v0.0.7 // indirect
14+
require (
15+
github.com/google/go-cmp v0.5.9 // indirect
16+
github.com/mattn/go-runewidth v0.0.7 // indirect
17+
)

go.sum

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUz
1818
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
1919
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
2020
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
21-
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
2221
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
22+
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
23+
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
2324
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
2425
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
2526
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=

redisgraph-bechmark-go.go

+46-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"encoding/csv"
45
"flag"
56
"fmt"
67
"github.com/RedisGraph/redisgraph-go"
@@ -26,14 +27,16 @@ func main() {
2627
numberRequests := flag.Uint64("n", 1000000, "Total number of requests")
2728
debug := flag.Int("debug", 0, "Client debug level.")
2829
randomSeed := flag.Int64("random-seed", 12345, "Random seed to use.")
30+
dataImportFile := flag.String("data-import-terms", "", "Read field replacement data from file in csv format. each column should start and end with '__' chars. Example __field1__,__field2__.")
31+
dataImportMode := flag.String("data-import-terms-mode", "seq", "Either 'seq' or 'rand'.")
2932
randomIntMin := flag.Int64("random-int-min", 1, "__rand_int__ lower value limit. __rand_int__ distribution is uniform Random")
3033
randomIntMax := flag.Int64("random-int-max", 1000000, "__rand_int__ upper value limit. __rand_int__ distribution is uniform Random")
3134
graphKey := flag.String("graph-key", "graph", "graph key.")
3235
flag.Var(&benchmarkQueries, "query", "Specify a RedisGraph query to send in quotes. Each command that you specify is run with its ratio. For example: -query=\"CREATE (n)\" -query-ratio=1")
3336
flag.Var(&benchmarkQueriesRO, "query-ro", "Specify a RedisGraph read-only query to send in quotes. You can run multiple commands (both read/write) on the same benchmark. Each command that you specify is run with its ratio. For example: -query=\"CREATE (n)\" -query-ratio=0.5 -query-ro=\"MATCH (n) RETURN n\" -query-ratio=0.5")
3437
flag.Var(&benchmarkQueryRates, "query-ratio", "The query ratio vs other queries used in the same benchmark. Each command that you specify is run with its ratio. For example: -query=\"CREATE (n)\" -query-ratio=0.5 -query=\"MATCH (n) RETURN n\" -query-ratio=0.5")
3538
jsonOutputFile := flag.String("json-out-file", "benchmark-results.json", "Name of json output file to output benchmark results. If not set, will not print to json.")
36-
cliUpdateTick := flag.Duration("reporting-period", time.Second*10, "Period to report stats.")
39+
cliUpdateTick := flag.Duration("reporting-period", time.Second*5, "Period to report stats.")
3740
// data sink
3841
runName := flag.String("exporter-run-name", "perf-run", "Run name.")
3942
rtsHost := flag.String("exporter-rts-host", "127.0.0.1", "RedisTimeSeries hostname.")
@@ -101,9 +104,46 @@ func main() {
101104
} else {
102105
log.Printf("Running in loop until you hit Ctrl+C\n")
103106
}
104-
queries := make([]string, len(benchmarkQueries)+len(benchmarkQueriesRO))
105-
queryIsReadOnly := make([]bool, len(benchmarkQueries)+len(benchmarkQueriesRO))
106-
cmdRates := make([]float64, len(benchmarkQueries)+len(benchmarkQueriesRO))
107+
totalQueries := len(benchmarkQueries) + len(benchmarkQueriesRO)
108+
queries := make([]string, totalQueries)
109+
queryIsReadOnly := make([]bool, totalQueries)
110+
cmdRates := make([]float64, totalQueries)
111+
var replacementArr []map[string]string
112+
dataReplacementEnabled := false
113+
if *dataImportFile != "" {
114+
log.Printf("Reading term data import file from: %s. Using '%s' record read mode.\n", *dataImportFile, *dataImportMode)
115+
dataReplacementEnabled = true
116+
replacementArr = make([]map[string]string, 0)
117+
118+
f, err := os.Open(*dataImportFile)
119+
if err != nil {
120+
log.Fatal("Unable to read input file "+*dataImportFile, err)
121+
}
122+
defer f.Close()
123+
124+
csvReader := csv.NewReader(f)
125+
records, err := csvReader.ReadAll()
126+
headers := records[0]
127+
rlen := len(records) - 1
128+
for i := 0; i < int(*numberRequests); i++ {
129+
// seq mode
130+
recordPos := i % rlen
131+
if strings.Compare(*dataImportMode, "rand") == 0 {
132+
recordPos = rand.Intn(rlen)
133+
}
134+
record := records[recordPos+1]
135+
lineMap := make(map[string]string)
136+
for j := 0; j < len(headers); j++ {
137+
lineMap[headers[j]] = record[j]
138+
}
139+
replacementArr = append(replacementArr, lineMap)
140+
}
141+
if err != nil {
142+
log.Fatal("Unable to parse file as CSV for "+*dataImportFile, err)
143+
}
144+
log.Printf("There are a total of %d disticint lines of terms. Each line has %d columns. Prepared %d groups of records for the benchmark.\n", rlen, len(headers), len(replacementArr))
145+
146+
}
107147
readAndWriteQueries := append(benchmarkQueries, benchmarkQueriesRO...)
108148

109149
for i := 0; i < len(queries); i++ {
@@ -158,7 +198,8 @@ func main() {
158198
if uint64(client_id) == (*clients - uint64(1)) {
159199
clientTotalCmds = samplesPerClientRemainder + samplesPerClient
160200
}
161-
go ingestionRoutine(&rgs[client_id], *continueOnError, queries, queryIsReadOnly, cdf, *randomIntMin, randLimit, clientTotalCmds, *loop, *debug, &wg, useRateLimiter, rateLimiter, graphDatapointsChann)
201+
cmdStartPos := uint64(client_id) * samplesPerClient
202+
go ingestionRoutine(&rgs[client_id], *continueOnError, queries, queryIsReadOnly, cdf, *randomIntMin, randLimit, clientTotalCmds, *loop, *debug, &wg, useRateLimiter, rateLimiter, graphDatapointsChann, dataReplacementEnabled, replacementArr, cmdStartPos)
162203
}
163204

164205
// enter the update loop

workers.go

+16-5
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,28 @@ import (
1111
"time"
1212
)
1313

14-
func ingestionRoutine(rg *redisgraph.Graph, continueOnError bool, cmdS []string, commandIsRO []bool, commandsCDF []float32, randomIntPadding, randomIntMax int64, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, useLimiter bool, rateLimiter *rate.Limiter, statsChannel chan GraphQueryDatapoint) {
14+
func ingestionRoutine(rg *redisgraph.Graph, continueOnError bool, cmdS []string, commandIsRO []bool, commandsCDF []float32, randomIntPadding, randomIntMax int64, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, useLimiter bool, rateLimiter *rate.Limiter, statsChannel chan GraphQueryDatapoint, replacementEnabled bool, replacementArr []map[string]string, commandStartPos uint64) {
1515
defer wg.Done()
16+
var replacementTerms map[string]string
1617
for i := 0; uint64(i) < number_samples || loop; i++ {
1718
cmdPos := sample(commandsCDF)
18-
sendCmdLogic(rg, cmdS[cmdPos], commandIsRO[cmdPos], randomIntPadding, randomIntMax, cmdPos, continueOnError, debug_level, useLimiter, rateLimiter, statsChannel)
19+
termReplacementPos := commandStartPos + uint64(i)
20+
if replacementEnabled {
21+
replacementTerms = replacementArr[termReplacementPos]
22+
}
23+
sendCmdLogic(rg, cmdS[cmdPos], commandIsRO[cmdPos], randomIntPadding, randomIntMax, cmdPos, continueOnError, debug_level, useLimiter, rateLimiter, statsChannel, replacementEnabled, replacementTerms)
1924
}
2025
}
2126

22-
func sendCmdLogic(rg *redisgraph.Graph, query string, readOnly bool, randomIntPadding, randomIntMax int64, cmdPos int, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter, statsChannel chan GraphQueryDatapoint) {
27+
func sendCmdLogic(rg *redisgraph.Graph, query string, readOnly bool, randomIntPadding, randomIntMax int64, cmdPos int, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter, statsChannel chan GraphQueryDatapoint, replacementEnabled bool, replacementTerms map[string]string) {
2328
if useRateLimiter {
2429
r := rateLimiter.ReserveN(time.Now(), int(1))
2530
time.Sleep(r.Delay())
2631
}
2732
var err error
2833
var queryResult *redisgraph.QueryResult
29-
processedQuery := processQuery(query, randomIntPadding, randomIntMax)
34+
35+
processedQuery := processQuery(query, randomIntPadding, randomIntMax, replacementEnabled, replacementTerms)
3036
startT := time.Now()
3137
if readOnly {
3238
queryResult, err = rg.ROQuery(processedQuery)
@@ -77,7 +83,12 @@ func sendCmdLogic(rg *redisgraph.Graph, query string, readOnly bool, randomIntPa
7783
statsChannel <- datapoint
7884
}
7985

80-
func processQuery(query string, randomIntPadding int64, randomIntMax int64) string {
86+
func processQuery(query string, randomIntPadding int64, randomIntMax int64, replacementEnabled bool, replacementTerms map[string]string) string {
87+
if replacementEnabled {
88+
for placeholder, term := range replacementTerms {
89+
query = strings.Replace(query, placeholder, term, -1)
90+
}
91+
}
8192
for strings.Index(query, randIntPlaceholder) != -1 {
8293
randIntString := fmt.Sprintf("%d", rand.Int63n(randomIntMax)+randomIntPadding)
8394
query = strings.Replace(query, randIntPlaceholder, randIntString, 1)

workers_test.go

+13-8
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,29 @@ func Test_processQuery(t *testing.T) {
1010
query string
1111
randomIntPadding int64
1212
randomIntMax int64
13+
termsMapEnabled bool
14+
termsMap map[string]string
1315
}
1416
rand.Seed(12345)
1517
tests := []struct {
1618
name string
1719
args args
1820
want string
1921
}{
20-
{"no-replacing", args{"CREATE(n)", 0, 0}, "CREATE(n)"},
21-
{"no-replacing", args{"ProblemList=[29849199,27107682]", 0, 0}, "ProblemList=[29849199,27107682]"},
22-
{"no-replacing", args{"ProblemList=[29849199,__rand_int__]", 0, 1}, "ProblemList=[29849199,0]"},
23-
{"no-replacing", args{"ProblemList=[__rand_int__,__rand_int__]", 0, 1}, "ProblemList=[0,0]"},
24-
{"no-replacing", args{"ProblemList=[__rand_int__,11]", 0, 1}, "ProblemList=[0,11]"},
25-
{"no-replacing", args{"ProblemList=[__rand_int__,__rand_int__]", 0, 10}, "ProblemList=[3,4]"},
26-
{"no-replacing", args{"ProblemList=[__rand_int__,__rand_int__]", -1, 10}, "ProblemList=[7,-1]"},
22+
{"no-replacing", args{"CREATE(n)", 0, 0, false, nil}, "CREATE(n)"},
23+
{"no-replacing", args{"ProblemList=[29849199,27107682]", 0, 0, false, nil}, "ProblemList=[29849199,27107682]"},
24+
{"no-replacing", args{"ProblemList=[29849199,__rand_int__]", 0, 1, false, nil}, "ProblemList=[29849199,0]"},
25+
{"no-replacing", args{"ProblemList=[__rand_int__,__rand_int__]", 0, 1, false, nil}, "ProblemList=[0,0]"},
26+
{"no-replacing", args{"ProblemList=[__rand_int__,11]", 0, 1, false, nil}, "ProblemList=[0,11]"},
27+
{"no-replacing", args{"ProblemList=[__rand_int__,__rand_int__]", 0, 10, false, nil}, "ProblemList=[3,4]"},
28+
{"no-replacing", args{"ProblemList=[__rand_int__,__rand_int__]", -1, 10, false, nil}, "ProblemList=[7,-1]"},
29+
{"disabled-term-map", args{"CYPHER entityUid='__Entity__' MATCH(entity:Entity{entityUid:$entityUid}) RETURN entity", -1, 10, false, nil}, "CYPHER entityUid='__Entity__' MATCH(entity:Entity{entityUid:$entityUid}) RETURN entity"},
30+
{"replacing-term-map", args{"CYPHER entityUid='__Entity__' MATCH(entity:Entity{entityUid:$entityUid}) RETURN entity", -1, 10, true, map[string]string{"__Entity__": "fbfa03a5-762b-4d32-be97-f19f3f3dda72"}}, "CYPHER entityUid='fbfa03a5-762b-4d32-be97-f19f3f3dda72' MATCH(entity:Entity{entityUid:$entityUid}) RETURN entity"},
31+
{"term-exists-but-disabled", args{"CYPHER entityUid='__Entity__' MATCH(entity:Entity{entityUid:$entityUid}) RETURN entity", -1, 10, false, map[string]string{"__Entity__": "fbfa03a5-762b-4d32-be97-f19f3f3dda72"}}, "CYPHER entityUid='__Entity__' MATCH(entity:Entity{entityUid:$entityUid}) RETURN entity"},
2732
}
2833
for _, tt := range tests {
2934
t.Run(tt.name, func(t *testing.T) {
30-
if got := processQuery(tt.args.query, tt.args.randomIntPadding, tt.args.randomIntMax); got != tt.want {
35+
if got := processQuery(tt.args.query, tt.args.randomIntPadding, tt.args.randomIntMax, tt.args.termsMapEnabled, tt.args.termsMap); got != tt.want {
3136
t.Errorf("processQuery() = %v, want %v", got, tt.want)
3237
}
3338
})

0 commit comments

Comments
 (0)