99 "context"
1010
1111 "github.com/cockroachdb/cockroach/pkg/jobs"
12+ "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1213 "github.com/cockroachdb/cockroach/pkg/kv"
1314 "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1415 "github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -25,6 +26,7 @@ import (
2526 "github.com/cockroachdb/cockroach/pkg/util/hlc"
2627 "github.com/cockroachdb/cockroach/pkg/util/syncutil"
2728 "github.com/cockroachdb/errors"
29+ gogotypes "github.com/gogo/protobuf/types"
2830)
2931
3032// IndexBackfillPlanner holds dependencies for an index backfiller
@@ -33,6 +35,39 @@ type IndexBackfillPlanner struct {
3335 execCfg * ExecutorConfig
3436}
3537
38+ type sstManifestBuffer struct {
39+ syncutil.Mutex
40+ manifests []jobspb.IndexBackfillSSTManifest
41+ }
42+
43+ func newSSTManifestBuffer (initial []jobspb.IndexBackfillSSTManifest ) * sstManifestBuffer {
44+ buf := & sstManifestBuffer {}
45+ buf .manifests = append (buf .manifests , initial ... )
46+ return buf
47+ }
48+
49+ func (b * sstManifestBuffer ) snapshotLocked () []jobspb.IndexBackfillSSTManifest {
50+ return append ([]jobspb.IndexBackfillSSTManifest (nil ), b .manifests ... )
51+ }
52+
53+ func (b * sstManifestBuffer ) Snapshot () []jobspb.IndexBackfillSSTManifest {
54+ b .Lock ()
55+ defer b .Unlock ()
56+ return b .snapshotLocked ()
57+ }
58+
59+ func (b * sstManifestBuffer ) Append (
60+ newManifests []jobspb.IndexBackfillSSTManifest ,
61+ ) []jobspb.IndexBackfillSSTManifest {
62+ if len (newManifests ) == 0 {
63+ return b .Snapshot ()
64+ }
65+ b .Lock ()
66+ defer b .Unlock ()
67+ b .manifests = append (b .manifests , newManifests ... )
68+ return b .snapshotLocked ()
69+ }
70+
3671// NewIndexBackfiller creates a new IndexBackfillPlanner.
3772func NewIndexBackfiller (execCfg * ExecutorConfig ) * IndexBackfillPlanner {
3873 return & IndexBackfillPlanner {execCfg : execCfg }
@@ -81,13 +116,25 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
81116 }
82117 // Add spans that were already completed before the job resumed.
83118 addCompleted (progress .CompletedSpans ... )
119+ sstManifestBuf := newSSTManifestBuffer (progress .SSTManifests )
120+ progress .SSTManifests = sstManifestBuf .Snapshot ()
121+ updateSSTManifests := func (newManifests []jobspb.IndexBackfillSSTManifest ) {
122+ progress .SSTManifests = sstManifestBuf .Append (newManifests )
123+ }
84124 updateFunc := func (
85125 ctx context.Context , meta * execinfrapb.ProducerMetadata ,
86126 ) error {
87127 if meta .BulkProcessorProgress == nil {
88128 return nil
89129 }
90130 progress .CompletedSpans = addCompleted (meta .BulkProcessorProgress .CompletedSpans ... )
131+ var mapProgress execinfrapb.IndexBackfillMapProgress
132+ if gogotypes .Is (& meta .BulkProcessorProgress .ProgressDetails , & mapProgress ) {
133+ if err := gogotypes .UnmarshalAny (& meta .BulkProcessorProgress .ProgressDetails , & mapProgress ); err != nil {
134+ return err
135+ }
136+ updateSSTManifests (mapProgress .SSTManifests )
137+ }
91138 // Make sure the progress update does not contain overlapping spans.
92139 // This is a sanity check that only runs in test configurations, since it
93140 // is an expensive n^2 check.
@@ -211,7 +258,7 @@ func (ib *IndexBackfillPlanner) plan(
211258 * td .TableDesc (), writeAsOf , writeAtRequestTimestamp , chunkSize ,
212259 indexesToBackfill , sourceIndexID ,
213260 )
214- if err := maybeEnableDistributedMergeIndexBackfill (ctx , ib .execCfg .Settings , & spec ); err != nil {
261+ if err := maybeEnableDistributedMergeIndexBackfill (ctx , ib .execCfg .Settings , ib . execCfg . NodeInfo . NodeID . SQLInstanceID (), & spec ); err != nil {
215262 return err
216263 }
217264 var err error
0 commit comments