@@ -24,12 +24,12 @@ import (
24
24
"github.com/scylladb/gemini/store"
25
25
"go.uber.org/zap"
26
26
"golang.org/x/exp/rand"
27
- "gopkg.in/tomb.v2 "
27
+ "golang.org/x/sync/errgroup "
28
28
)
29
29
30
30
// MutationJob continuously applies mutations against the database
31
31
// for as long as the pump is active.
32
- func MutationJob (ctx context.Context , pump <- chan heartBeat , schema * gemini.Schema , schemaCfg gemini.SchemaConfig , table * gemini.Table , s store.Store , r * rand.Rand , p gemini.PartitionRangeConfig , g * gemini.Generator , c chan Status , mode string , warmup time.Duration , logger * zap.Logger ) {
32
+ func MutationJob (ctx context.Context , pump <- chan heartBeat , schema * gemini.Schema , schemaCfg gemini.SchemaConfig , table * gemini.Table , s store.Store , r * rand.Rand , p gemini.PartitionRangeConfig , g * gemini.Generator , c chan Status , mode string , warmup time.Duration , logger * zap.Logger ) error {
33
33
schemaConfig := & schemaCfg
34
34
logger = logger .Named ("mutation_job" )
35
35
testStatus := Status {}
@@ -38,29 +38,39 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sche
38
38
c <- testStatus
39
39
}()
40
40
var i int
41
- for hb := range pump {
42
- hb .await ()
43
- ind := r .Intn (1000000 )
44
- if ind % 100000 == 0 {
45
- ddl (ctx , schema , schemaConfig , table , s , r , p , & testStatus , logger )
46
- } else {
47
- mutation (ctx , schema , schemaConfig , table , s , r , p , g , & testStatus , true , logger )
48
- }
49
- if i % 1000 == 0 {
50
- c <- testStatus
51
- testStatus = Status {}
52
- }
53
- if failFast && (testStatus .ReadErrors > 0 || testStatus .WriteErrors > 0 ) {
54
- c <- testStatus
55
- return
41
+ for {
42
+ select {
43
+ case <- ctx .Done ():
44
+ logger .Debug ("mutation job terminated" )
45
+ return ctx .Err ()
46
+ case hb := <- pump :
47
+ hb .await ()
48
+ ind := r .Intn (1000000 )
49
+ if ind % 100000 == 0 {
50
+ if err := ddl (ctx , schema , schemaConfig , table , s , r , p , & testStatus , logger ); err != nil {
51
+ return err
52
+ }
53
+ } else {
54
+ if err := mutation (ctx , schema , schemaConfig , table , s , r , p , g , & testStatus , true , logger ); err != nil {
55
+ return err
56
+ }
57
+ }
58
+ if i % 1000 == 0 {
59
+ c <- testStatus
60
+ testStatus = Status {}
61
+ }
62
+ if failFast && (testStatus .ReadErrors > 0 || testStatus .WriteErrors > 0 ) {
63
+ c <- testStatus
64
+ return nil
65
+ }
56
66
}
57
67
i ++
58
68
}
59
69
}
60
70
61
71
// ValidationJob continuously applies validations against the database
62
72
// for as long as the pump is active.
63
- func ValidationJob (ctx context.Context , pump <- chan heartBeat , schema * gemini.Schema , schemaCfg gemini.SchemaConfig , table * gemini.Table , s store.Store , r * rand.Rand , p gemini.PartitionRangeConfig , g * gemini.Generator , c chan Status , mode string , warmup time.Duration , logger * zap.Logger ) {
73
+ func ValidationJob (ctx context.Context , pump <- chan heartBeat , schema * gemini.Schema , schemaCfg gemini.SchemaConfig , table * gemini.Table , s store.Store , r * rand.Rand , p gemini.PartitionRangeConfig , g * gemini.Generator , c chan Status , mode string , warmup time.Duration , logger * zap.Logger ) error {
64
74
schemaConfig := & schemaCfg
65
75
logger = logger .Named ("validation_job" )
66
76
@@ -69,41 +79,55 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sc
69
79
c <- testStatus
70
80
}()
71
81
var i int
72
- for hb := range pump {
73
- hb .await ()
74
- validation (ctx , schema , schemaConfig , table , s , r , p , g , & testStatus , logger )
75
- if i % 1000 == 0 {
76
- c <- testStatus
77
- testStatus = Status {}
78
- }
79
- if failFast && (testStatus .ReadErrors > 0 || testStatus .WriteErrors > 0 ) {
80
- return
82
+ for {
83
+ select {
84
+ case <- ctx .Done ():
85
+ return ctx .Err ()
86
+ case hb := <- pump :
87
+ hb .await ()
88
+ if cql , err := validation (ctx , schema , schemaConfig , table , s , r , p , g , & testStatus , logger ); err != nil {
89
+ e := JobError {
90
+ Timestamp : time .Now (),
91
+ Message : "Validation failed: " + err .Error (),
92
+ Query : cql ,
93
+ }
94
+ testStatus .Errors = append (testStatus .Errors , e )
95
+ testStatus .ReadErrors ++
96
+ } else {
97
+ testStatus .ReadOps ++
98
+ }
99
+
100
+ if i % 1000 == 0 {
101
+ c <- testStatus
102
+ testStatus = Status {}
103
+ }
104
+ if failFast && (testStatus .ReadErrors > 0 || testStatus .WriteErrors > 0 ) {
105
+ return nil
106
+ }
81
107
}
82
108
i ++
83
109
}
84
110
}
85
111
86
112
// WarmupJob continuously applies mutations against the database
87
113
// for as long as the pump is active or the supplied duration expires.
88
- func WarmupJob (ctx context.Context , pump <- chan heartBeat , schema * gemini.Schema , schemaCfg gemini.SchemaConfig , table * gemini.Table , s store.Store , r * rand.Rand , p gemini.PartitionRangeConfig , g * gemini.Generator , c chan Status , mode string , warmup time.Duration , logger * zap.Logger ) {
114
+ func WarmupJob (ctx context.Context , pump <- chan heartBeat , schema * gemini.Schema , schemaCfg gemini.SchemaConfig , table * gemini.Table , s store.Store , r * rand.Rand , p gemini.PartitionRangeConfig , g * gemini.Generator , c chan Status , mode string , warmup time.Duration , logger * zap.Logger ) error {
89
115
schemaConfig := & schemaCfg
90
116
testStatus := Status {}
91
117
var i int
92
118
warmupTimer := time .NewTimer (warmup )
93
119
for {
94
120
select {
95
- case _ , ok := <- pump :
96
- if ! ok {
97
- logger .Debug ("warmup job terminated" )
98
- c <- testStatus
99
- return
100
- }
101
- }
102
- select {
121
+ case <- ctx .Done ():
122
+ logger .Debug ("warmup job terminated" )
123
+ c <- testStatus
124
+ return ctx .Err ()
103
125
case <- warmupTimer .C :
126
+ logger .Debug ("warmup job finished" )
104
127
c <- testStatus
105
- return
128
+ return nil
106
129
default :
130
+ // Do we care about errors during warmup?
107
131
mutation (ctx , schema , schemaConfig , table , s , r , p , g , & testStatus , false , logger )
108
132
if i % 1000 == 0 {
109
133
c <- testStatus
@@ -113,8 +137,8 @@ func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema
113
137
}
114
138
}
115
139
116
- func job (t * tomb. Tomb , f testJob , actors uint64 , schema * gemini.Schema , schemaConfig gemini.SchemaConfig , s store.Store , pump * Pump , generators []* gemini.Generator , result chan Status , logger * zap.Logger ) {
117
- workerCtx , _ := context . WithCancel ( context . Background () )
140
+ func job (ctx context. Context , f testJob , actors uint64 , schema * gemini.Schema , schemaConfig gemini.SchemaConfig , s store.Store , pump * Pump , generators []* gemini.Generator , result chan Status , logger * zap.Logger ) error {
141
+ g , gCtx := errgroup . WithContext ( ctx )
118
142
partitionRangeConfig := gemini.PartitionRangeConfig {
119
143
MaxBlobLength : schemaConfig .MaxBlobLength ,
120
144
MinBlobLength : schemaConfig .MinBlobLength ,
@@ -123,39 +147,39 @@ func job(t *tomb.Tomb, f testJob, actors uint64, schema *gemini.Schema, schemaCo
123
147
}
124
148
125
149
for j , table := range schema .Tables {
126
- g := generators [j ]
150
+ gen := generators [j ]
127
151
for i := 0 ; i < int (actors ); i ++ {
128
152
r := rand .New (rand .NewSource (seed ))
129
- t .Go (func () error {
130
- f (workerCtx , pump .ch , schema , schemaConfig , table , s , r , partitionRangeConfig , g , result , mode , warmup , logger )
131
- return nil
153
+ g .Go (func () error {
154
+ return f (gCtx , pump .ch , schema , schemaConfig , table , s , r , partitionRangeConfig , gen , result , mode , warmup , logger )
132
155
})
133
156
}
134
157
}
158
+ return g .Wait ()
135
159
}
136
160
137
- func ddl (ctx context.Context , schema * gemini.Schema , sc * gemini.SchemaConfig , table * gemini.Table , s store.Store , r * rand.Rand , p gemini.PartitionRangeConfig , testStatus * Status , logger * zap.Logger ) {
161
+ func ddl (ctx context.Context , schema * gemini.Schema , sc * gemini.SchemaConfig , table * gemini.Table , s store.Store , r * rand.Rand , p gemini.PartitionRangeConfig , testStatus * Status , logger * zap.Logger ) error {
138
162
if sc .CQLFeature != gemini .CQL_FEATURE_ALL {
139
163
logger .Debug ("ddl statements disabled" )
140
- return
164
+ return nil
141
165
}
142
166
table .Lock ()
143
167
if len (table .MaterializedViews ) > 0 {
144
168
// Scylla does not allow changing the DDL of a table with materialized views.
145
- return
169
+ return nil
146
170
}
147
171
defer table .Unlock ()
148
172
ddlStmts , postStmtHook , err := schema .GenDDLStmt (table , r , p , sc )
149
173
if err != nil {
150
174
logger .Error ("Failed! Mutation statement generation failed" , zap .Error (err ))
151
175
testStatus .WriteErrors ++
152
- return
176
+ return err
153
177
}
154
178
if ddlStmts == nil {
155
179
if w := logger .Check (zap .DebugLevel , "no statement generated" ); w != nil {
156
180
w .Write (zap .String ("job" , "ddl" ))
157
181
}
158
- return
182
+ return nil
159
183
}
160
184
defer postStmtHook ()
161
185
defer func () {
@@ -181,20 +205,21 @@ func ddl(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, ta
181
205
testStatus .WriteOps ++
182
206
}
183
207
}
208
+ return nil
184
209
}
185
210
186
- func mutation (ctx context.Context , schema * gemini.Schema , _ * gemini.SchemaConfig , table * gemini.Table , s store.Store , r * rand.Rand , p gemini.PartitionRangeConfig , g * gemini.Generator , testStatus * Status , deletes bool , logger * zap.Logger ) {
211
+ func mutation (ctx context.Context , schema * gemini.Schema , _ * gemini.SchemaConfig , table * gemini.Table , s store.Store , r * rand.Rand , p gemini.PartitionRangeConfig , g * gemini.Generator , testStatus * Status , deletes bool , logger * zap.Logger ) error {
187
212
mutateStmt , err := schema .GenMutateStmt (table , g , r , p , deletes )
188
213
if err != nil {
189
214
logger .Error ("Failed! Mutation statement generation failed" , zap .Error (err ))
190
215
testStatus .WriteErrors ++
191
- return
216
+ return err
192
217
}
193
218
if mutateStmt == nil {
194
219
if w := logger .Check (zap .DebugLevel , "no statement generated" ); w != nil {
195
220
w .Write (zap .String ("job" , "mutation" ))
196
221
}
197
- return
222
+ return err
198
223
}
199
224
mutateQuery := mutateStmt .Query
200
225
token , mutateValues := mutateStmt .Values ()
@@ -217,15 +242,16 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig
217
242
} else {
218
243
testStatus .WriteOps ++
219
244
}
245
+ return nil
220
246
}
221
247
222
- func validation (ctx context.Context , schema * gemini.Schema , sc * gemini.SchemaConfig , table * gemini.Table , s store.Store , r * rand.Rand , p gemini.PartitionRangeConfig , g * gemini.Generator , testStatus * Status , logger * zap.Logger ) {
248
+ func validation (ctx context.Context , schema * gemini.Schema , sc * gemini.SchemaConfig , table * gemini.Table , s store.Store , r * rand.Rand , p gemini.PartitionRangeConfig , g * gemini.Generator , testStatus * Status , logger * zap.Logger ) ( string , error ) {
223
249
checkStmt := schema .GenCheckStmt (table , g , r , p )
224
250
if checkStmt == nil {
225
251
if w := logger .Check (zap .DebugLevel , "no statement generated" ); w != nil {
226
252
w .Write (zap .String ("job" , "validation" ))
227
253
}
228
- return
254
+ return "" , nil
229
255
}
230
256
checkQuery := checkStmt .Query
231
257
token , checkValues := checkStmt .Values ()
@@ -243,23 +269,19 @@ func validation(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaCon
243
269
for attempts := 0 ; attempts < maxAttempts ; attempts ++ {
244
270
logger .Info ("validation failed for possible async operation" ,
245
271
zap .Duration ("trying_again_in" , delay ))
246
- time .Sleep (delay )
272
+ select {
273
+ case <- time .After (delay ):
274
+ case <- ctx .Done ():
275
+ return checkStmt .PrettyCQL (), err
276
+ }
247
277
// Should we sample all the errors?
248
278
if err = s .Check (ctx , table , checkQuery , checkValues ... ); err == nil {
249
279
// Result sets stabilized
250
- return
280
+ return "" , nil
251
281
}
252
282
}
253
283
}
254
- // De-duplication needed?
255
- e := JobError {
256
- Timestamp : time .Now (),
257
- Message : "Validation failed: " + err .Error (),
258
- Query : checkStmt .PrettyCQL (),
259
- }
260
- testStatus .Errors = append (testStatus .Errors , e )
261
- testStatus .ReadErrors ++
262
- } else {
263
- testStatus .ReadOps ++
284
+ return checkStmt .PrettyCQL (), err
264
285
}
286
+ return "" , nil
265
287
}
0 commit comments