Skip to content

Commit 84f74b3

Browse files
author
Henrik Johansson
authored
Merge pull request #211 from scylladb/allow_secondary_indexes_to_stabilize
gemini: reapply loads of possibly async resultset
2 parents 13e8655 + 18c5907 commit 84f74b3

File tree

4 files changed

+151
-98
lines changed

4 files changed

+151
-98
lines changed

cmd/gemini/jobs.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig
201201
}
202202
}
203203

204-
func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, testStatus *Status, logger *zap.Logger) {
204+
func validation(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, testStatus *Status, logger *zap.Logger) {
205205
checkStmt := schema.GenCheckStmt(table, g, r, p)
206206
if checkStmt == nil {
207207
if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil {
@@ -219,6 +219,20 @@ func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConf
219219
w.Write(zap.String("pretty_cql", checkStmt.PrettyCQL()))
220220
}
221221
if err := s.Check(ctx, table, checkQuery, checkValues...); err != nil {
222+
if checkStmt.QueryType.PossibleAsyncOperation() {
223+
maxAttempts := sc.AsyncObjectStabilizationAttempts
224+
delay := sc.AsyncObjectStabilizationDelay
225+
for attempts := 0; attempts < maxAttempts; attempts++ {
226+
logger.Info("validation failed for possible async operation",
227+
zap.Duration("trying_again_in", delay))
228+
time.Sleep(delay)
229+
// Should we sample all the errors?
230+
if err = s.Check(ctx, table, checkQuery, checkValues...); err == nil {
231+
// Result sets stabilized
232+
return
233+
}
234+
}
235+
}
222236
// De-duplication needed?
223237
e := JobError{
224238
Timestamp: time.Now(),

cmd/gemini/root.go

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -31,42 +31,44 @@ import (
3131
)
3232

3333
var (
34-
testClusterHost []string
35-
oracleClusterHost []string
36-
schemaFile string
37-
outFileArg string
38-
concurrency uint64
39-
seed uint64
40-
dropSchema bool
41-
verbose bool
42-
mode string
43-
failFast bool
44-
nonInteractive bool
45-
duration time.Duration
46-
bind string
47-
warmup time.Duration
48-
compactionStrategy string
49-
replicationStrategy string
50-
consistency string
51-
maxTables int
52-
maxPartitionKeys int
53-
minPartitionKeys int
54-
maxClusteringKeys int
55-
minClusteringKeys int
56-
maxColumns int
57-
minColumns int
58-
datasetSize string
59-
cqlFeatures string
60-
level string
61-
maxRetriesMutate int
62-
maxRetriesMutateSleep time.Duration
63-
pkBufferReuseSize uint64
64-
partitionCount uint64
65-
partitionKeyDistribution string
66-
normalDistMean float64
67-
normalDistSigma float64
68-
tracingOutFile string
69-
useCounters bool
34+
testClusterHost []string
35+
oracleClusterHost []string
36+
schemaFile string
37+
outFileArg string
38+
concurrency uint64
39+
seed uint64
40+
dropSchema bool
41+
verbose bool
42+
mode string
43+
failFast bool
44+
nonInteractive bool
45+
duration time.Duration
46+
bind string
47+
warmup time.Duration
48+
compactionStrategy string
49+
replicationStrategy string
50+
consistency string
51+
maxTables int
52+
maxPartitionKeys int
53+
minPartitionKeys int
54+
maxClusteringKeys int
55+
minClusteringKeys int
56+
maxColumns int
57+
minColumns int
58+
datasetSize string
59+
cqlFeatures string
60+
level string
61+
maxRetriesMutate int
62+
maxRetriesMutateSleep time.Duration
63+
pkBufferReuseSize uint64
64+
partitionCount uint64
65+
partitionKeyDistribution string
66+
normalDistMean float64
67+
normalDistSigma float64
68+
tracingOutFile string
69+
useCounters bool
70+
asyncObjectStabilizationAttempts int
71+
asyncObjectStabilizationDelay time.Duration
7072
)
7173

7274
const (
@@ -458,6 +460,8 @@ func init() {
458460
rootCmd.Flags().Float64VarP(&normalDistSigma, "normal-dist-sigma", "", oneStdDev, "Sigma of the normal distribution, defaults to one standard deviation ~0.341")
459461
rootCmd.Flags().StringVarP(&tracingOutFile, "tracing-outfile", "", "", "Specify the file to which tracing information gets written. Two magic names are available, 'stdout' and 'stderr'. By default tracing is disabled.")
460462
rootCmd.Flags().BoolVarP(&useCounters, "use-counters", "", false, "Ensure that at least one table is a counter table")
463+
rootCmd.Flags().IntVarP(&asyncObjectStabilizationAttempts, "async-objects-stabilization-attempts", "", 10, "Maximum number of attempts to validate result sets from MV and SI")
464+
rootCmd.Flags().DurationVarP(&asyncObjectStabilizationDelay, "async-objects-stabilization-backoff", "", 10*time.Millisecond, "Duration between attempts to validate result sets from MV and SI for example 10ms or 1s")
461465
}
462466

463467
func printSetup() error {

cmd/gemini/schema.go

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,23 @@ func createSchemaConfig(logger *zap.Logger) gemini.SchemaConfig {
1313
switch strings.ToLower(datasetSize) {
1414
case "small":
1515
return gemini.SchemaConfig{
16-
CompactionStrategy: defaultConfig.CompactionStrategy,
17-
ReplicationStrategy: defaultConfig.ReplicationStrategy,
18-
MaxTables: defaultConfig.MaxTables,
19-
MaxPartitionKeys: defaultConfig.MaxPartitionKeys,
20-
MinPartitionKeys: defaultConfig.MinPartitionKeys,
21-
MaxClusteringKeys: defaultConfig.MaxClusteringKeys,
22-
MinClusteringKeys: defaultConfig.MinClusteringKeys,
23-
MaxColumns: defaultConfig.MaxColumns,
24-
MinColumns: defaultConfig.MinColumns,
25-
MaxUDTParts: 2,
26-
MaxTupleParts: 2,
27-
MaxBlobLength: 20,
28-
MaxStringLength: 20,
29-
UseCounters: defaultConfig.UseCounters,
30-
CQLFeature: defaultConfig.CQLFeature,
16+
CompactionStrategy: defaultConfig.CompactionStrategy,
17+
ReplicationStrategy: defaultConfig.ReplicationStrategy,
18+
MaxTables: defaultConfig.MaxTables,
19+
MaxPartitionKeys: defaultConfig.MaxPartitionKeys,
20+
MinPartitionKeys: defaultConfig.MinPartitionKeys,
21+
MaxClusteringKeys: defaultConfig.MaxClusteringKeys,
22+
MinClusteringKeys: defaultConfig.MinClusteringKeys,
23+
MaxColumns: defaultConfig.MaxColumns,
24+
MinColumns: defaultConfig.MinColumns,
25+
MaxUDTParts: 2,
26+
MaxTupleParts: 2,
27+
MaxBlobLength: 20,
28+
MaxStringLength: 20,
29+
UseCounters: defaultConfig.UseCounters,
30+
CQLFeature: defaultConfig.CQLFeature,
31+
AsyncObjectStabilizationAttempts: defaultConfig.AsyncObjectStabilizationAttempts,
32+
AsyncObjectStabilizationDelay: defaultConfig.AsyncObjectStabilizationDelay,
3133
}
3234
default:
3335
return defaultConfig
@@ -44,22 +46,24 @@ func createDefaultSchemaConfig(logger *zap.Logger) gemini.SchemaConfig {
4446
MaxUDTParts = 20
4547
)
4648
return gemini.SchemaConfig{
47-
CompactionStrategy: getCompactionStrategy(compactionStrategy, logger),
48-
ReplicationStrategy: getReplicationStrategy(replicationStrategy, replication.NewSimpleStrategy(), logger),
49-
MaxTables: maxTables,
50-
MaxPartitionKeys: maxPartitionKeys,
51-
MinPartitionKeys: minPartitionKeys,
52-
MaxClusteringKeys: maxClusteringKeys,
53-
MinClusteringKeys: minClusteringKeys,
54-
MaxColumns: maxColumns,
55-
MinColumns: minColumns,
56-
MaxUDTParts: MaxUDTParts,
57-
MaxTupleParts: MaxTupleParts,
58-
MaxBlobLength: MaxBlobLength,
59-
MinBlobLength: MinBlobLength,
60-
MaxStringLength: MaxStringLength,
61-
MinStringLength: MinStringLength,
62-
UseCounters: useCounters,
63-
CQLFeature: getCQLFeature(cqlFeatures),
49+
CompactionStrategy: getCompactionStrategy(compactionStrategy, logger),
50+
ReplicationStrategy: getReplicationStrategy(replicationStrategy, replication.NewSimpleStrategy(), logger),
51+
MaxTables: maxTables,
52+
MaxPartitionKeys: maxPartitionKeys,
53+
MinPartitionKeys: minPartitionKeys,
54+
MaxClusteringKeys: maxClusteringKeys,
55+
MinClusteringKeys: minClusteringKeys,
56+
MaxColumns: maxColumns,
57+
MinColumns: minColumns,
58+
MaxUDTParts: MaxUDTParts,
59+
MaxTupleParts: MaxTupleParts,
60+
MaxBlobLength: MaxBlobLength,
61+
MinBlobLength: MinBlobLength,
62+
MaxStringLength: MaxStringLength,
63+
MinStringLength: MinStringLength,
64+
UseCounters: useCounters,
65+
CQLFeature: getCQLFeature(cqlFeatures),
66+
AsyncObjectStabilizationAttempts: asyncObjectStabilizationAttempts,
67+
AsyncObjectStabilizationDelay: asyncObjectStabilizationDelay,
6468
}
6569
}

schema.go

Lines changed: 60 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"strings"
77
"sync"
8+
"time"
89

910
"github.com/gocql/gocql"
1011
"github.com/pkg/errors"
@@ -26,23 +27,25 @@ const (
2627
type Value []interface{}
2728

2829
type SchemaConfig struct {
29-
CompactionStrategy *CompactionStrategy
30-
ReplicationStrategy *replication.Replication
31-
MaxTables int
32-
MaxPartitionKeys int
33-
MinPartitionKeys int
34-
MaxClusteringKeys int
35-
MinClusteringKeys int
36-
MaxColumns int
37-
MinColumns int
38-
MaxUDTParts int
39-
MaxTupleParts int
40-
MaxBlobLength int
41-
MaxStringLength int
42-
MinBlobLength int
43-
MinStringLength int
44-
UseCounters bool
45-
CQLFeature CQLFeature
30+
CompactionStrategy *CompactionStrategy
31+
ReplicationStrategy *replication.Replication
32+
MaxTables int
33+
MaxPartitionKeys int
34+
MinPartitionKeys int
35+
MaxClusteringKeys int
36+
MinClusteringKeys int
37+
MaxColumns int
38+
MinColumns int
39+
MaxUDTParts int
40+
MaxTupleParts int
41+
MaxBlobLength int
42+
MaxStringLength int
43+
MinBlobLength int
44+
MinStringLength int
45+
UseCounters bool
46+
CQLFeature CQLFeature
47+
AsyncObjectStabilizationAttempts int
48+
AsyncObjectStabilizationDelay time.Duration
4649
}
4750

4851
var (
@@ -294,6 +297,7 @@ func (t *Table) alterColumn(keyspace string) ([]*Stmt, func(), error) {
294297
Values: func() (uint64, []interface{}) {
295298
return 0, nil
296299
},
300+
QueryType: AlterColumnStatementType,
297301
})
298302
fmt.Println(stmt)
299303
return stmts, func() {
@@ -315,6 +319,7 @@ func (t *Table) dropColumn(keyspace string) ([]*Stmt, func(), error) {
315319
Values: func() (uint64, []interface{}) {
316320
return 0, nil
317321
},
322+
QueryType: DropColumnStatementType,
318323
})
319324
return stmts, func() {
320325
t.Columns = append(t.Columns[:idx], t.Columns[idx+1:]...)
@@ -328,11 +333,29 @@ type MaterializedView struct {
328333
}
329334

330335
type Stmt struct {
331-
Query qb.Builder
332-
Values func() (uint64, []interface{})
333-
Types []Type
336+
Query qb.Builder
337+
Values func() (uint64, []interface{})
338+
Types []Type
339+
QueryType StatementType
334340
}
335341

342+
type StatementType uint8
343+
344+
func (st StatementType) PossibleAsyncOperation() bool {
345+
return st == SelectByIndexStatementType || st == SelectFromMaterializedViewStatementType
346+
}
347+
348+
const (
349+
SelectStatementType StatementType = iota
350+
SelectRangeStatementType
351+
SelectByIndexStatementType
352+
SelectFromMaterializedViewStatementType
353+
DeleteStatementType
354+
InsertStatement
355+
AlterColumnStatementType
356+
DropColumnStatementType
357+
)
358+
336359
func (s *Stmt) PrettyCQL() string {
337360
var replaced int
338361
query, _ := s.Query.ToCql()
@@ -378,7 +401,7 @@ func GenSchema(sc SchemaConfig) *Schema {
378401
builder.Keyspace(keyspace)
379402
numTables := 1 + rand.Intn(sc.GetMaxTables())
380403
for i := 0; i < numTables; i++ {
381-
table := createTable(sc, fmt.Sprintf("table%d", i + 1))
404+
table := createTable(sc, fmt.Sprintf("table%d", i+1))
382405
builder.Table(&table)
383406
}
384407
return builder.Build()
@@ -650,7 +673,8 @@ func (s *Schema) insertStmt(t *Table, g *Generator, r *rand.Rand, p PartitionRan
650673
Values: func() (uint64, []interface{}) {
651674
return valuesWithToken.Token, values
652675
},
653-
Types: typs,
676+
Types: typs,
677+
QueryType: InsertStatement,
654678
}, nil
655679
}
656680

@@ -711,7 +735,8 @@ func (s *Schema) GenInsertJsonStmt(t *Table, g *Generator, r *rand.Rand, p Parti
711735
Values: func() (uint64, []interface{}) {
712736
return vals.Token, []interface{}{string(jsonString)}
713737
},
714-
Types: []Type{TYPE_TEXT},
738+
Types: []Type{TYPE_TEXT},
739+
QueryType: InsertStatement,
715740
}, nil
716741
}
717742

@@ -745,7 +770,8 @@ func (s *Schema) GenDeleteRows(t *Table, g *Generator, r *rand.Rand, p Partition
745770
Values: func() (uint64, []interface{}) {
746771
return vs.Token, values
747772
},
748-
Types: typs,
773+
Types: typs,
774+
QueryType: DeleteStatementType,
749775
}, nil
750776
}
751777

@@ -840,7 +866,8 @@ func (s *Schema) genSinglePartitionQuery(t *Table, g *Generator, r *rand.Rand, p
840866
Values: func() (uint64, []interface{}) {
841867
return values.Token, values.Value
842868
},
843-
Types: typs,
869+
Types: typs,
870+
QueryType: SelectStatementType,
844871
}
845872
}
846873

@@ -883,7 +910,8 @@ func (s *Schema) genMultiplePartitionQuery(t *Table, g *Generator, r *rand.Rand,
883910
Values: func() (uint64, []interface{}) {
884911
return 0, values
885912
},
886-
Types: typs,
913+
Types: typs,
914+
QueryType: SelectStatementType,
887915
}
888916
}
889917

@@ -933,7 +961,8 @@ func (s *Schema) genClusteringRangeQuery(t *Table, g *Generator, r *rand.Rand, p
933961
Values: func() (uint64, []interface{}) {
934962
return 0, values
935963
},
936-
Types: typs,
964+
Types: typs,
965+
QueryType: SelectRangeStatementType,
937966
}
938967
}
939968

@@ -989,7 +1018,8 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, g *Generator
9891018
Values: func() (uint64, []interface{}) {
9901019
return 0, values
9911020
},
992-
Types: typs,
1021+
Types: typs,
1022+
QueryType: SelectRangeStatementType,
9931023
}
9941024
}
9951025

@@ -1023,7 +1053,8 @@ func (s *Schema) genSingleIndexQuery(t *Table, g *Generator, r *rand.Rand, p Par
10231053
Values: func() (uint64, []interface{}) {
10241054
return 0, values
10251055
},
1026-
Types: typs,
1056+
Types: typs,
1057+
QueryType: SelectByIndexStatementType,
10271058
}
10281059
}
10291060

0 commit comments

Comments
 (0)