
Commit 82969e9

sql: add SST writer to distributed index backfill merge pipeline
This change continues the integration of the distributed merge pipeline into
the index backfill flow. When enabled, the backfiller writes its KV output
batches to SSTs (backed by ExternalStorage) via a new SST sink.

- Processor: Updated the rowexec backfiller to route KV batches through the
  SST sink.
- Job progress: Persist SST manifest metadata in job progress via
  IndexBackfillMapProgress.

Informs: #158378

Epic: CRDB-48845

Release note: none
1 parent: a95d1dd
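For orientation, here is a minimal sketch of the flow the commit message describes: KV batches are buffered, flushed as SSTs to ExternalStorage, and each flushed file yields a manifest that is reported through job progress. All names in this sketch (runMapStage, sstSink, kvBatch, reportProgress) are hypothetical illustrations, not code from this commit.

	// Hypothetical outline of the map stage: buffer incoming KV batches,
	// flush them as SSTs, and report one manifest per flushed file.
	func runMapStage(ctx context.Context, sink *sstSink, batches <-chan kvBatch) error {
		for batch := range batches {
			if err := sink.add(ctx, batch); err != nil { // buffer KVs in memory
				return err
			}
		}
		manifests, err := sink.flush(ctx) // write SSTs to ExternalStorage
		if err != nil {
			return err
		}
		// Manifests are persisted in job progress via IndexBackfillMapProgress.
		return reportProgress(ctx, manifests)
	}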

File tree

18 files changed, +573 −27 lines


pkg/jobs/jobspb/jobs.proto

Lines changed: 13 additions & 0 deletions
@@ -843,6 +843,19 @@ message BackfillProgress {
   // backfilled into the destination indexes. Note that this will never contain
   // tenant prefixes even if the data corresponds to a secondary tenant.
   repeated roachpb.Span completed_spans = 5 [(gogoproto.nullable) = false];
+
+  // SSTManifests contains metadata about SST files produced while backfilling
+  // via the distributed merge pipeline. Each entry corresponds to a flushed SST.
+  repeated IndexBackfillSSTManifest sst_manifests = 6 [(gogoproto.nullable) = false, (gogoproto.customname) = "SSTManifests"];
+}
+
+// IndexBackfillSSTManifest describes an SST produced by the backfill map stage.
+message IndexBackfillSSTManifest {
+  string uri = 1 [(gogoproto.customname) = "URI"];
+  roachpb.Span span = 2;
+  uint64 file_size = 3;
+  bytes row_sample = 4 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
+  util.hlc.Timestamp write_timestamp = 5;
 }
 
 // MergeProgress is used to track index merge progress in the declarative
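The manifest carries everything a downstream merge stage needs to plan work without opening the file. A minimal sketch of populating one (the helper is hypothetical; field shapes follow the proto above, where span and write_timestamp are declared as nullable messages and therefore map to pointers in Go):

	import (
		"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
		"github.com/cockroachdb/cockroach/pkg/roachpb"
		"github.com/cockroachdb/cockroach/pkg/util/hlc"
	)

	// makeManifest is a hypothetical helper that packages one flushed SST's
	// metadata into the IndexBackfillSSTManifest proto defined above.
	func makeManifest(
		uri string, span roachpb.Span, size uint64, sample roachpb.Key, ts hlc.Timestamp,
	) jobspb.IndexBackfillSSTManifest {
		return jobspb.IndexBackfillSSTManifest{
			URI:            uri,    // ExternalStorage location of the SST
			Span:           &span,  // key span covered by the file
			FileSize:       size,   // bytes, useful for sizing merge work
			RowSample:      sample, // sampled key, e.g. for partitioning the merge
			WriteTimestamp: &ts,    // MVCC timestamp of the written entries
		}
	}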

pkg/sql/backfill.go

Lines changed: 1 addition & 1 deletion
@@ -1044,7 +1044,7 @@ func (sc *SchemaChanger) distIndexBackfill(
 	indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
 	chunkSize := sc.getChunkSize(indexBatchSize)
 	spec := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize, addedIndexes, 0)
-	if err := maybeEnableDistributedMergeIndexBackfill(ctx, sc.execCfg.Settings, &spec); err != nil {
+	if err := maybeEnableDistributedMergeIndexBackfill(ctx, sc.execCfg.Settings, sc.execCfg.NodeInfo.NodeID.SQLInstanceID(), &spec); err != nil {
 		return err
 	}
 	p, err = sc.distSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, todoSpans)

pkg/sql/bulksst/sst_file_allocator.go

Lines changed: 5 additions & 4 deletions
@@ -53,10 +53,11 @@ func (f *fileAllocatorBase) addFile(
 	uri string, span roachpb.Span, rowSample roachpb.Key, fileSize uint64,
 ) {
 	f.fileInfo.SST = append(f.fileInfo.SST, &SSTFileInfo{
-		URI:      uri,
-		StartKey: span.Key,
-		EndKey:   span.EndKey,
-		FileSize: fileSize,
+		URI:       uri,
+		StartKey:  span.Key,
+		EndKey:    span.EndKey,
+		FileSize:  fileSize,
+		RowSample: rowSample,
 	})
 	f.fileInfo.TotalSize += fileSize
 	f.recordRowSample(rowSample)

pkg/sql/bulksst/sst_info.proto

Lines changed: 1 addition & 0 deletions
@@ -14,6 +14,7 @@ message SSTFileInfo {
   required bytes start_key = 2 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
   required bytes end_key = 3 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
   required uint64 file_size = 4 [(gogoproto.nullable) = false];
+  optional bytes row_sample = 5 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
 }
 
 message SSTFiles {

pkg/sql/bulksst/sst_writer.go

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ var _ kvserverbase.BulkAdder = &Writer{}
 
 var BatchSize = settings.RegisterByteSizeSetting(settings.ApplicationLevel,
 	"bulkio.sst_writer.batch_size",
-	"Writer in memory batch size",
+	"bytes to buffer in-memory per SST before the bulk writer flushes",
 	128*1024*1024)
 
 // NewUnsortedSSTBatcher creates a new SST batcher, a file allocator must be
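The new description makes the setting's role explicit: it is a byte-size threshold on the writer's in-memory batch. A sketch of the flush policy it implies (the batchBytes counter and the surrounding wiring are assumptions, not this file's actual internals):

	import "github.com/cockroachdb/cockroach/pkg/settings"

	// shouldFlush returns true once the in-memory batch has reached
	// bulkio.sst_writer.batch_size bytes (default 128 MiB).
	func shouldFlush(batchBytes int64, sv *settings.Values) bool {
		return batchBytes >= BatchSize.Get(sv)
	}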

pkg/sql/distsql_plan_backfill.go

Lines changed: 7 additions & 1 deletion
@@ -7,9 +7,11 @@ package sql
 
 import (
 	"context"
+	"fmt"
 	"time"
 	"unsafe"
 
+	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
@@ -92,7 +94,10 @@ var distributedMergeIndexBackfillEnabled = settings.RegisterBoolSetting(
 )
 
 func maybeEnableDistributedMergeIndexBackfill(
-	ctx context.Context, st *cluster.Settings, spec *execinfrapb.BackfillerSpec,
+	ctx context.Context,
+	st *cluster.Settings,
+	nodeID base.SQLInstanceID,
+	spec *execinfrapb.BackfillerSpec,
 ) error {
 	if !distributedMergeIndexBackfillEnabled.Get(&st.SV) {
 		return nil
@@ -101,6 +106,7 @@ func maybeEnableDistributedMergeIndexBackfill(
 		return pgerror.New(pgcode.FeatureNotSupported, "distributed merge requires cluster version 26.1")
 	}
 	spec.UseDistributedMergeSink = true
+	spec.DistributedMergeFilePrefix = fmt.Sprintf("nodelocal://%d/index-backfill", nodeID)
 	return nil
 }
 
pkg/sql/execinfrapb/processors_bulk_io.proto

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,11 @@ message BackfillerSpec {
8787

8888
optional bool use_distributed_merge_sink = 17 [(gogoproto.nullable) = false];
8989

90-
// NEXTID: 18.
90+
// distributed_merge_file_prefix is the ExternalStorage URI prefix to use when
91+
// writing map outputs for the distributed merge pipeline.
92+
optional string distributed_merge_file_prefix = 18 [(gogoproto.nullable) = false];
93+
94+
// NEXTID: 19.
9195
}
9296

9397
// JobProgress identifies the job to report progress on. This reporting
@@ -609,3 +613,9 @@ message BulkMergeSpec {
609613

610614
// NEXT ID: 4.
611615
}
616+
617+
// IndexBackfillMapProgress is emitted by map-stage processors to describe newly
618+
// produced SST manifests.
619+
message IndexBackfillMapProgress {
620+
repeated jobs.jobspb.IndexBackfillSSTManifest sst_manifests = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "SSTManifests"];
621+
}
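The coordinator-side decoding of this message appears in the index_backfiller.go hunk below; the producer side would be its mirror image. A sketch of that emit path (the helper name is hypothetical; the metadata types are the existing execinfrapb ones):

	import (
		"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
		"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
		gogotypes "github.com/gogo/protobuf/types"
	)

	// emitMapProgress wraps freshly flushed SST manifests in the bulk
	// progress metadata that flows from processors to the coordinator.
	func emitMapProgress(
		manifests []jobspb.IndexBackfillSSTManifest,
	) (*execinfrapb.ProducerMetadata, error) {
		details, err := gogotypes.MarshalAny(&execinfrapb.IndexBackfillMapProgress{
			SSTManifests: manifests,
		})
		if err != nil {
			return nil, err
		}
		return &execinfrapb.ProducerMetadata{
			BulkProcessorProgress: &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{
				ProgressDetails: *details,
			},
		}, nil
	}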

pkg/sql/index_backfiller.go

Lines changed: 48 additions & 1 deletion
@@ -9,6 +9,7 @@ import (
 	"context"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -25,6 +26,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/errors"
+	gogotypes "github.com/gogo/protobuf/types"
 )
 
 // IndexBackfillPlanner holds dependencies for an index backfiller
@@ -33,6 +35,39 @@ type IndexBackfillPlanner struct {
 	execCfg *ExecutorConfig
 }
 
+type sstManifestBuffer struct {
+	syncutil.Mutex
+	manifests []jobspb.IndexBackfillSSTManifest
+}
+
+func newSSTManifestBuffer(initial []jobspb.IndexBackfillSSTManifest) *sstManifestBuffer {
+	buf := &sstManifestBuffer{}
+	buf.manifests = append(buf.manifests, initial...)
+	return buf
+}
+
+func (b *sstManifestBuffer) snapshotLocked() []jobspb.IndexBackfillSSTManifest {
+	return append([]jobspb.IndexBackfillSSTManifest(nil), b.manifests...)
+}
+
+func (b *sstManifestBuffer) Snapshot() []jobspb.IndexBackfillSSTManifest {
+	b.Lock()
+	defer b.Unlock()
+	return b.snapshotLocked()
+}
+
+func (b *sstManifestBuffer) Append(
+	newManifests []jobspb.IndexBackfillSSTManifest,
+) []jobspb.IndexBackfillSSTManifest {
+	if len(newManifests) == 0 {
+		return b.Snapshot()
+	}
+	b.Lock()
+	defer b.Unlock()
+	b.manifests = append(b.manifests, newManifests...)
+	return b.snapshotLocked()
+}
+
 // NewIndexBackfiller creates a new IndexBackfillPlanner.
 func NewIndexBackfiller(execCfg *ExecutorConfig) *IndexBackfillPlanner {
 	return &IndexBackfillPlanner{execCfg: execCfg}
@@ -81,13 +116,25 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
 	}
 	// Add spans that were already completed before the job resumed.
 	addCompleted(progress.CompletedSpans...)
+	sstManifestBuf := newSSTManifestBuffer(progress.SSTManifests)
+	progress.SSTManifests = sstManifestBuf.Snapshot()
+	updateSSTManifests := func(newManifests []jobspb.IndexBackfillSSTManifest) {
+		progress.SSTManifests = sstManifestBuf.Append(newManifests)
+	}
 	updateFunc := func(
 		ctx context.Context, meta *execinfrapb.ProducerMetadata,
 	) error {
 		if meta.BulkProcessorProgress == nil {
 			return nil
 		}
 		progress.CompletedSpans = addCompleted(meta.BulkProcessorProgress.CompletedSpans...)
+		var mapProgress execinfrapb.IndexBackfillMapProgress
+		if gogotypes.Is(&meta.BulkProcessorProgress.ProgressDetails, &mapProgress) {
+			if err := gogotypes.UnmarshalAny(&meta.BulkProcessorProgress.ProgressDetails, &mapProgress); err != nil {
+				return err
+			}
+			updateSSTManifests(mapProgress.SSTManifests)
+		}
 		// Make sure the progress update does not contain overlapping spans.
 		// This is a sanity check that only runs in test configurations, since it
 		// is an expensive n^2 check.
@@ -211,7 +258,7 @@ func (ib *IndexBackfillPlanner) plan(
 		*td.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize,
 		indexesToBackfill, sourceIndexID,
 	)
-	if err := maybeEnableDistributedMergeIndexBackfill(ctx, ib.execCfg.Settings, &spec); err != nil {
+	if err := maybeEnableDistributedMergeIndexBackfill(ctx, ib.execCfg.Settings, ib.execCfg.NodeInfo.NodeID.SQLInstanceID(), &spec); err != nil {
 		return err
 	}
 	var err error
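One design note on sstManifestBuffer above: Snapshot and Append always hand back a fresh copy, so the slice stored into progress.SSTManifests never aliases memory that a later Append mutates. A small in-package illustration (the URIs are hypothetical values):

	buf := newSSTManifestBuffer(nil)
	snap1 := buf.Append([]jobspb.IndexBackfillSSTManifest{{URI: "nodelocal://1/index-backfill/a.sst"}})
	snap2 := buf.Append([]jobspb.IndexBackfillSSTManifest{{URI: "nodelocal://1/index-backfill/b.sst"}})
	// len(snap1) == 1 and len(snap2) == 2: each call returns an independent
	// copy, so a concurrent job-progress write of snap1 is safe.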

pkg/sql/logictest/testdata/logic_test/create_index

Lines changed: 8 additions & 1 deletion
@@ -714,11 +714,18 @@ subtest distributed_merge_index_backfill_placeholder
 statement ok
 CREATE TABLE dist_merge_idx (a INT PRIMARY KEY, b INT)
 
+statement ok
+INSERT INTO dist_merge_idx VALUES (1,1), (2,2), (3,3)
+
 statement ok
 SET CLUSTER SETTING bulkio.index_backfill.distributed_merge.enabled = true
 
+# TODO(158378): The end-to-end flow for create index using distributed
+# merge isn't implemented yet, so we get to the validation step and notice
+# we didn't ingest any rows. This is expected until the entire flow is
+# implemented.
 skipif config local-mixed-25.3 local-mixed-25.4
-statement error pq: .*index backfill distributed merge sink is not implemented yet.*
+statement error pgcode XX000 pq: .*validation of non-unique index dist_merge_idx_idx failed: expected 3 rows, found 0.*
 CREATE INDEX dist_merge_idx_idx ON dist_merge_idx (b)
 
 onlyif config local-mixed-25.3 local-mixed-25.4
pkg/sql/rowexec/BUILD.bazel

Lines changed: 12 additions & 1 deletion
@@ -13,6 +13,7 @@ go_library(
         "hashgroupjoiner.go",
         "hashjoiner.go",
         "indexbackfiller.go",
+        "indexbackfiller_sst_sink.go",
         "inverted_expr_evaluator.go",
         "inverted_filterer.go",
         "inverted_joiner.go",
@@ -42,6 +43,7 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/sql/rowexec",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/cloud",
         "//pkg/jobs",
         "//pkg/jobs/jobspb",
         "//pkg/keys",
@@ -51,9 +53,11 @@ go_library(
         "//pkg/kv/kvpb",
         "//pkg/kv/kvserver/kvserverbase",
         "//pkg/roachpb",
+        "//pkg/security/username",
         "//pkg/server/telemetry",
         "//pkg/settings",
         "//pkg/sql/backfill",
+        "//pkg/sql/bulksst",
         "//pkg/sql/catalog",
         "//pkg/sql/catalog/catenumpb",
         "//pkg/sql/catalog/colinfo",
@@ -102,7 +106,6 @@ go_library(
         "//pkg/util/collatedstring",
         "//pkg/util/ctxgroup",
         "//pkg/util/encoding",
-        "//pkg/util/errorutil/unimplemented",
         "//pkg/util/hlc",
         "//pkg/util/intsets",
         "//pkg/util/log",
@@ -123,6 +126,7 @@ go_library(
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_logtags//:logtags",
         "@com_github_cockroachdb_redact//:redact",
+        "@com_github_gogo_protobuf//types",
     ],
 )
 
@@ -135,6 +139,7 @@ go_test(
         "distinct_test.go",
         "filterer_test.go",
         "hashjoiner_test.go",
+        "indexbackfiller_sst_sink_test.go",
         "indexbackfiller_test.go",
         "inverted_expr_evaluator_test.go",
         "inverted_filterer_test.go",
@@ -166,6 +171,9 @@ go_test(
     }),
     deps = [
         "//pkg/base",
+        "//pkg/cloud",
+        "//pkg/cloud/cloudpb",
+        "//pkg/cloud/nodelocal",
         "//pkg/jobs",
         "//pkg/jobs/jobspb",
         "//pkg/keys",
@@ -184,6 +192,7 @@ go_test(
         "//pkg/settings/cluster",
         "//pkg/sql",
         "//pkg/sql/backfill",
+        "//pkg/sql/bulksst",
         "//pkg/sql/catalog",
         "//pkg/sql/catalog/catalogkeys",
         "//pkg/sql/catalog/catenumpb",
@@ -237,6 +246,8 @@ go_test(
         "//pkg/util/tracing/tracingpb",
         "@com_github_axiomhq_hyperloglog//:hyperloglog",
        "@com_github_cockroachdb_errors//:errors",
+        "@com_github_cockroachdb_pebble//objstorage",
+        "@com_github_cockroachdb_pebble//objstorage/objstorageprovider",
         "@com_github_gogo_protobuf//types",
         "@com_github_jackc_pgx_v5//:pgx",
         "@com_github_stretchr_testify//require",
