Skip to content

Commit b9c6bb6

Browse files
craig[bot] and andy-kimball committed
Merge #141837
141837: vecindex: add context params to Index and Txn methods r=drewkimball a=andy-kimball

For performance reasons, it's desirable to be able to reuse state across calls to Index and Txn methods. For example, when searching partitions, we need to repeatedly allocate large slices. Previously, we used a Workspace attached to the Go context.Context object for reusing slices. This PR adds explicit IndexContext and TxnContext parameters to Index and Txn methods that implementations can use for temporary state.

In addition, we plan to add more context information to these contexts, notably "tree keys", which allow a C-SPANN index to be partitioned into multiple trees, separated by KV prefix.

Epic: CRDB-42943

Release note: None

Co-authored-by: Andrew Kimball <[email protected]>
2 parents 881952f + a46e560 commit b9c6bb6

35 files changed

+400
-406
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -648,8 +648,8 @@ ALL_TESTS = [
648648
"//pkg/sql/types:types_test",
649649
"//pkg/sql/vecindex/cspann/memstore:memstore_test",
650650
"//pkg/sql/vecindex/cspann/quantize:quantize_test",
651+
"//pkg/sql/vecindex/cspann/workspace:workspace_test",
651652
"//pkg/sql/vecindex/cspann:cspann_test",
652-
"//pkg/sql/vecindex/veclib:veclib_test",
653653
"//pkg/sql/vecindex/vecstore:vecstore_test",
654654
"//pkg/sql/vecindex:vecindex_test",
655655
"//pkg/sql:sql_disallowed_imports_test",
@@ -2343,10 +2343,10 @@ GO_TARGETS = [
23432343
"//pkg/sql/vecindex/cspann/quantize:quantize_test",
23442344
"//pkg/sql/vecindex/cspann/testutils:testutils",
23452345
"//pkg/sql/vecindex/cspann/utils:utils",
2346+
"//pkg/sql/vecindex/cspann/workspace:workspace",
2347+
"//pkg/sql/vecindex/cspann/workspace:workspace_test",
23462348
"//pkg/sql/vecindex/cspann:cspann",
23472349
"//pkg/sql/vecindex/cspann:cspann_test",
2348-
"//pkg/sql/vecindex/veclib:veclib",
2349-
"//pkg/sql/vecindex/veclib:veclib_test",
23502350
"//pkg/sql/vecindex/vecpb:vecpb",
23512351
"//pkg/sql/vecindex/vecstore:vecstore",
23522352
"//pkg/sql/vecindex/vecstore:vecstore_test",

pkg/cmd/vecbench/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ go_library(
1313
"//pkg/sql/vecindex/cspann",
1414
"//pkg/sql/vecindex/cspann/memstore",
1515
"//pkg/sql/vecindex/cspann/quantize",
16-
"//pkg/sql/vecindex/veclib",
1716
"//pkg/util/stop",
1817
"//pkg/util/syncutil",
1918
"//pkg/util/timeutil",

pkg/cmd/vecbench/main.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/cspann"
2929
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/cspann/memstore"
3030
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/cspann/quantize"
31-
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/veclib"
3231
"github.com/cockroachdb/cockroach/pkg/util/stop"
3332
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3433
"github.com/cockroachdb/cockroach/pkg/util/vector"
@@ -195,12 +194,13 @@ func searchIndex(ctx context.Context, stopper *stop.Stopper, datasetName string)
195194
data := loadDataset(searchFileName)
196195
fmt.Println()
197196

198-
var workspace veclib.Workspace
197+
var idxCtx cspann.Context
199198
doSearch := func(beamSize int) {
200199
start := timeutil.Now()
201200

202-
txn := beginTransaction(ctx, &workspace, index.Store())
201+
txn := beginTransaction(ctx, index.Store())
203202
defer commitTransaction(ctx, index.Store(), txn)
203+
idxCtx.Init(txn)
204204

205205
// Search for test vectors.
206206
var sumMAP, sumVectors, sumLeafVectors, sumFullVectors, sumPartitions float64
@@ -213,7 +213,7 @@ func searchIndex(ctx context.Context, stopper *stop.Stopper, datasetName string)
213213
searchOptions := cspann.SearchOptions{BaseBeamSize: beamSize}
214214

215215
// Calculate prediction set for the vector.
216-
err = index.Search(ctx, txn, queryVector, &searchSet, searchOptions)
216+
err = index.Search(ctx, &idxCtx, queryVector, &searchSet, searchOptions)
217217
if err != nil {
218218
panic(err)
219219
}
@@ -407,16 +407,17 @@ func buildIndex(
407407

408408
// Insert block of vectors within the scope of a transaction.
409409
var insertCount atomic.Uint64
410-
insertBlock := func(w *veclib.Workspace, start, end int) {
411-
txn := beginTransaction(ctx, w, store)
410+
insertBlock := func(idxCtx *cspann.Context, start, end int) {
411+
txn := beginTransaction(ctx, store)
412412
defer commitTransaction(ctx, store, txn)
413+
idxCtx.Init(txn)
413414

414415
for i := start; i < end; i++ {
415416
key := primaryKeys[i*4 : i*4+4]
416417
vec := data.Train.At(i)
417418
store.InsertVector(key, vec)
418419
startMono := crtime.NowMono()
419-
if err := index.Insert(ctx, txn, vec, key); err != nil {
420+
if err := index.Insert(ctx, idxCtx, vec, key); err != nil {
420421
panic(err)
421422
}
422423
estimator.Add(startMono.Elapsed().Seconds())
@@ -443,11 +444,11 @@ func buildIndex(
443444
for i := 0; i < data.Train.Count; i += countPerProc {
444445
end := min(i+countPerProc, data.Train.Count)
445446
go func(start, end int) {
446-
var workspace veclib.Workspace
447+
var indexCtx cspann.Context
447448
// Break vector group into individual transactions that each insert a
448449
// block of vectors. Run any pending fixups after each block.
449450
for j := start; j < end; j += blockSize {
450-
insertBlock(&workspace, j, min(j+blockSize, end))
451+
insertBlock(&indexCtx, j, min(j+blockSize, end))
451452
}
452453
}(i, end)
453454
}
@@ -575,16 +576,16 @@ func loadDataset(fileName string) dataset {
575576
return data
576577
}
577578

578-
func beginTransaction(ctx context.Context, w *veclib.Workspace, store cspann.Store) cspann.Txn {
579-
txn, err := store.Begin(ctx, w)
579+
func beginTransaction(ctx context.Context, store cspann.Store) cspann.Txn {
580+
txn, err := store.BeginTransaction(ctx)
580581
if err != nil {
581582
panic(err)
582583
}
583584
return txn
584585
}
585586

586587
func commitTransaction(ctx context.Context, store cspann.Store, txn cspann.Txn) {
587-
if err := store.Commit(ctx, txn); err != nil {
588+
if err := store.CommitTransaction(ctx, txn); err != nil {
588589
panic(err)
589590
}
590591
}

pkg/sql/vecindex/cspann/BUILD.bazel

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ go_library(
2929
deps = [
3030
"//pkg/sql/vecindex/cspann/quantize",
3131
"//pkg/sql/vecindex/cspann/utils",
32-
"//pkg/sql/vecindex/veclib",
32+
"//pkg/sql/vecindex/cspann/workspace",
3333
"//pkg/util/container/heap",
3434
"//pkg/util/log",
3535
"//pkg/util/num32",
@@ -64,7 +64,7 @@ go_test(
6464
"//pkg/sql/vecindex/cspann/quantize",
6565
"//pkg/sql/vecindex/cspann/testutils",
6666
"//pkg/sql/vecindex/cspann/utils",
67-
"//pkg/sql/vecindex/veclib",
67+
"//pkg/sql/vecindex/cspann/workspace",
6868
"//pkg/util/leaktest",
6969
"//pkg/util/log",
7070
"//pkg/util/num32",

pkg/sql/vecindex/cspann/commontest/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ go_library(
99
"//pkg/sql/vecindex/cspann",
1010
"//pkg/sql/vecindex/cspann/quantize",
1111
"//pkg/sql/vecindex/cspann/testutils",
12-
"//pkg/sql/vecindex/veclib",
12+
"//pkg/sql/vecindex/cspann/workspace",
1313
"//pkg/util/vector",
1414
"@com_github_stretchr_testify//require",
1515
"@org_gonum_v1_gonum//floats/scalar",

pkg/sql/vecindex/cspann/commontest/storetests.go

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/cspann"
1313
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/cspann/quantize"
1414
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/cspann/testutils"
15-
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/veclib"
15+
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/cspann/workspace"
1616
"github.com/cockroachdb/cockroach/pkg/util/vector"
1717
"github.com/stretchr/testify/require"
1818
"gonum.org/v1/gonum/floats/scalar"
@@ -28,7 +28,7 @@ func StoreTests(
2828
testPKs []cspann.KeyBytes,
2929
testVectors []vector.T,
3030
) {
31-
var workspace veclib.Workspace
31+
var workspace workspace.T
3232
childKey2 := cspann.ChildKey{PartitionKey: 2}
3333
valueBytes2 := cspann.ValueBytes{0}
3434
primaryKey100 := cspann.ChildKey{KeyBytes: cspann.KeyBytes{1, 00}}
@@ -45,7 +45,7 @@ func StoreTests(
4545
valueBytes600 := cspann.ValueBytes{11, 12}
4646

4747
t.Run("get full vectors", func(t *testing.T) {
48-
txn := BeginTransaction(ctx, t, &workspace, store)
48+
txn := BeginTransaction(ctx, t, store)
4949
defer CommitTransaction(ctx, t, store, txn)
5050

5151
// Include primary keys that cannot be found.
@@ -84,7 +84,7 @@ func StoreTests(
8484
})
8585

8686
t.Run("search empty root partition", func(t *testing.T) {
87-
txn := BeginTransaction(ctx, t, &workspace, store)
87+
txn := BeginTransaction(ctx, t, store)
8888
defer CommitTransaction(ctx, t, store, txn)
8989

9090
searchSet := cspann.SearchSet{MaxResults: 2}
@@ -103,7 +103,7 @@ func StoreTests(
103103
})
104104

105105
t.Run("add to root partition", func(t *testing.T) {
106-
txn := BeginTransaction(ctx, t, &workspace, store)
106+
txn := BeginTransaction(ctx, t, store)
107107
defer CommitTransaction(ctx, t, store, txn)
108108

109109
// Get partition metadata with forUpdate = true before updates.
@@ -112,18 +112,22 @@ func StoreTests(
112112
CheckPartitionMetadata(t, metadata, cspann.Level(1), vector.T{0, 0}, 0)
113113

114114
// Add to root partition.
115-
metadata, err = txn.AddToPartition(ctx, cspann.RootKey, vector.T{1, 2}, primaryKey100, valueBytes100)
115+
metadata, err = txn.AddToPartition(
116+
ctx, cspann.RootKey, vector.T{1, 2}, primaryKey100, valueBytes100)
116117
require.NoError(t, err)
117118
CheckPartitionMetadata(t, metadata, cspann.LeafLevel, vector.T{0, 0}, 1)
118-
metadata, err = txn.AddToPartition(ctx, cspann.RootKey, vector.T{7, 4}, primaryKey200, valueBytes200)
119+
metadata, err = txn.AddToPartition(
120+
ctx, cspann.RootKey, vector.T{7, 4}, primaryKey200, valueBytes200)
119121
require.NoError(t, err)
120122
CheckPartitionMetadata(t, metadata, cspann.LeafLevel, vector.T{0, 0}, 2)
121-
metadata, err = txn.AddToPartition(ctx, cspann.RootKey, vector.T{4, 3}, primaryKey300, valueBytes300)
123+
metadata, err = txn.AddToPartition(
124+
ctx, cspann.RootKey, vector.T{4, 3}, primaryKey300, valueBytes300)
122125
require.NoError(t, err)
123126
CheckPartitionMetadata(t, metadata, cspann.LeafLevel, vector.T{0, 0}, 3)
124127

125128
// Add duplicate and expect value to be overwritten
126-
metadata, err = txn.AddToPartition(ctx, cspann.RootKey, vector.T{5, 5}, primaryKey300, valueBytes300)
129+
metadata, err = txn.AddToPartition(
130+
ctx, cspann.RootKey, vector.T{5, 5}, primaryKey300, valueBytes300)
127131
require.NoError(t, err)
128132
CheckPartitionMetadata(t, metadata, cspann.LeafLevel, vector.T{0, 0}, 3)
129133

@@ -153,7 +157,7 @@ func StoreTests(
153157

154158
var root *cspann.Partition
155159
t.Run("get root partition", func(t *testing.T) {
156-
txn := BeginTransaction(ctx, t, &workspace, store)
160+
txn := BeginTransaction(ctx, t, store)
157161
defer CommitTransaction(ctx, t, store, txn)
158162

159163
// Get root partition.
@@ -184,7 +188,7 @@ func StoreTests(
184188
})
185189

186190
t.Run("replace root partition", func(t *testing.T) {
187-
txn := BeginTransaction(ctx, t, &workspace, store)
191+
txn := BeginTransaction(ctx, t, store)
188192
defer CommitTransaction(ctx, t, store, txn)
189193

190194
// Replace root partition.
@@ -220,7 +224,7 @@ func StoreTests(
220224

221225
var partitionKey1 cspann.PartitionKey
222226
t.Run("insert another partition and update it", func(t *testing.T) {
223-
txn := BeginTransaction(ctx, t, &workspace, store)
227+
txn := BeginTransaction(ctx, t, store)
224228
defer CommitTransaction(ctx, t, store, txn)
225229

226230
_, err := txn.GetPartition(ctx, cspann.RootKey)
@@ -263,7 +267,7 @@ func StoreTests(
263267
})
264268

265269
t.Run("search multiple partitions at leaf level", func(t *testing.T) {
266-
txn := BeginTransaction(ctx, t, &workspace, store)
270+
txn := BeginTransaction(ctx, t, store)
267271
defer CommitTransaction(ctx, t, store, txn)
268272

269273
_, err := txn.GetPartition(ctx, cspann.RootKey)
@@ -307,23 +311,21 @@ func CheckPartitionMetadata(
307311
}
308312

309313
// BeginTransaction starts a new transaction for the given store and returns it.
310-
func BeginTransaction(
311-
ctx context.Context, t *testing.T, w *veclib.Workspace, store cspann.Store,
312-
) cspann.Txn {
313-
txn, err := store.Begin(ctx, w)
314+
func BeginTransaction(ctx context.Context, t *testing.T, store cspann.Store) cspann.Txn {
315+
txn, err := store.BeginTransaction(ctx)
314316
require.NoError(t, err)
315317
return txn
316318
}
317319

318320
// CommitTransaction commits a transaction that was started by BeginTransaction.
319321
func CommitTransaction(ctx context.Context, t *testing.T, store cspann.Store, txn cspann.Txn) {
320-
err := store.Commit(ctx, txn)
322+
err := store.CommitTransaction(ctx, txn)
321323
require.NoError(t, err)
322324
}
323325

324326
// AbortTransaction aborts a transaction that was started by BeginTransaction.
325327
func AbortTransaction(ctx context.Context, t *testing.T, store cspann.Store, txn cspann.Txn) {
326-
err := store.Abort(ctx, txn)
328+
err := store.AbortTransaction(ctx, txn)
327329
require.NoError(t, err)
328330
}
329331

0 commit comments

Comments
 (0)