Skip to content

Commit 59e7d6c

Browse files
authored
Extend bucket index to add parquet fields to block (#6721)
1 parent 9f70dd9 commit 59e7d6c

File tree

6 files changed

+292
-7
lines changed

6 files changed

+292
-7
lines changed

pkg/storage/parquet/converter_marker.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func ReadConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Instr
3636

3737
return &ConverterMark{}, err
3838
}
39-
defer runutil.CloseWithLogOnErr(logger, reader, "close bucket index reader")
39+
defer runutil.CloseWithLogOnErr(logger, reader, "close parquet converter marker file reader")
4040

4141
metaContent, err := io.ReadAll(reader)
4242
if err != nil {
@@ -59,3 +59,8 @@ func WriteConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Buck
5959
}
6060
return userBkt.Upload(ctx, markerPath, bytes.NewReader(b))
6161
}
62+
63+
// ConverterMarkMeta is used in Bucket Index. It might not be the same as ConverterMark.
64+
type ConverterMarkMeta struct {
65+
Version int `json:"version"`
66+
}

pkg/storage/tsdb/bucketindex/index.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package bucketindex
33
import (
44
"fmt"
55
"path/filepath"
6+
"slices"
67
"strings"
78
"time"
89

@@ -11,6 +12,7 @@ import (
1112
"github.com/thanos-io/thanos/pkg/block"
1213
"github.com/thanos-io/thanos/pkg/block/metadata"
1314

15+
"github.com/cortexproject/cortex/pkg/storage/parquet"
1416
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
1517
"github.com/cortexproject/cortex/pkg/util"
1618
)
@@ -51,14 +53,14 @@ func (idx *Index) GetUpdatedAt() time.Time {
5153
func (idx *Index) RemoveBlock(id ulid.ULID) {
5254
for i := 0; i < len(idx.Blocks); i++ {
5355
if idx.Blocks[i].ID == id {
54-
idx.Blocks = append(idx.Blocks[:i], idx.Blocks[i+1:]...)
56+
idx.Blocks = slices.Delete(idx.Blocks, i, i+1)
5557
break
5658
}
5759
}
5860

5961
for i := 0; i < len(idx.BlockDeletionMarks); i++ {
6062
if idx.BlockDeletionMarks[i].ID == id {
61-
idx.BlockDeletionMarks = append(idx.BlockDeletionMarks[:i], idx.BlockDeletionMarks[i+1:]...)
63+
idx.BlockDeletionMarks = slices.Delete(idx.BlockDeletionMarks, i, i+1)
6264
break
6365
}
6466
}
@@ -91,6 +93,9 @@ type Block struct {
9193
// UploadedAt is a unix timestamp (seconds precision) of when the block has been completed to be uploaded
9294
// to the storage.
9395
UploadedAt int64 `json:"uploaded_at"`
96+
97+
// Parquet metadata if exists. If doesn't exist it will be nil.
98+
Parquet *parquet.ConverterMarkMeta `json:"parquet,omitempty"`
9499
}
95100

96101
// Within returns whether the block contains samples within the provided range.

pkg/storage/tsdb/bucketindex/markers.go

+18
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,24 @@ func IsBlockNoCompactMarkFilename(name string) (ulid.ULID, bool) {
8282
return id, err == nil
8383
}
8484

85+
// IsBlockParquetConverterMarkFilename returns whether the input filename matches the expected pattern
86+
// of block parquet converter markers stored in the markers location.
87+
func IsBlockParquetConverterMarkFilename(name string) (ulid.ULID, bool) {
88+
parts := strings.SplitN(name, "-", 2)
89+
if len(parts) != 2 {
90+
return ulid.ULID{}, false
91+
}
92+
93+
// Ensure the 2nd part matches the parquet converter mark filename.
94+
if parts[1] != parquet.ConverterMarkerFileName {
95+
return ulid.ULID{}, false
96+
}
97+
98+
// Ensure the 1st part is a valid block ID.
99+
id, err := ulid.Parse(filepath.Base(parts[0]))
100+
return id, err == nil
101+
}
102+
85103
// MigrateBlockDeletionMarksToGlobalLocation list all tenant's blocks and, for each of them, look for
86104
// a deletion mark in the block location. Found deletion marks are copied to the global markers location.
87105
// The migration continues on error and returns once all blocks have been checked.

pkg/storage/tsdb/bucketindex/updater.go

+63-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/thanos-io/thanos/pkg/block"
1616
"github.com/thanos-io/thanos/pkg/block/metadata"
1717

18+
"github.com/cortexproject/cortex/pkg/storage/parquet"
1819
"github.com/cortexproject/cortex/pkg/storage/tsdb"
1920

2021
"github.com/cortexproject/cortex/pkg/storage/bucket"
@@ -33,8 +34,9 @@ var (
3334

3435
// Updater is responsible to generate an update in-memory bucket index.
3536
type Updater struct {
36-
bkt objstore.InstrumentedBucket
37-
logger log.Logger
37+
bkt objstore.InstrumentedBucket
38+
logger log.Logger
39+
parquetEnabled bool
3840
}
3941

4042
func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *Updater {
@@ -44,11 +46,18 @@ func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantCon
4446
}
4547
}
4648

49+
// EnableParquet makes the updater also discover parquet converter markers and
// attach their metadata to block entries when building the bucket index.
// It returns the receiver so it can be chained with NewUpdater.
func (w *Updater) EnableParquet() *Updater {
	w.parquetEnabled = true
	return w
}
53+
4754
// UpdateIndex generates the bucket index and returns it, without storing it to the storage.
4855
// If the old index is not passed in input, then the bucket index will be generated from scratch.
4956
func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid.ULID]error, int64, error) {
50-
var oldBlocks []*Block
51-
var oldBlockDeletionMarks []*BlockDeletionMark
57+
var (
58+
oldBlocks []*Block
59+
oldBlockDeletionMarks []*BlockDeletionMark
60+
)
5261

5362
// Read the old index, if provided.
5463
if old != nil {
@@ -65,6 +74,11 @@ func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid
6574
if err != nil {
6675
return nil, nil, 0, err
6776
}
77+
if w.parquetEnabled {
78+
if err := w.updateParquetBlocks(ctx, blocks); err != nil {
79+
return nil, nil, 0, err
80+
}
81+
}
6882

6983
return &Index{
7084
Version: IndexVersion1,
@@ -180,6 +194,23 @@ func (w *Updater) updateBlockIndexEntry(ctx context.Context, id ulid.ULID) (*Blo
180194
return block, nil
181195
}
182196

197+
func (w *Updater) updateParquetBlockIndexEntry(ctx context.Context, id ulid.ULID, block *Block) error {
198+
marker, err := parquet.ReadConverterMark(ctx, id, w.bkt, w.logger)
199+
if err != nil {
200+
return errors.Wrapf(err, "read parquet converter marker file: %v", path.Join(id.String(), parquet.ConverterMarkerFileName))
201+
}
202+
// Could be not found or access denied.
203+
// Just treat it as no parquet block available.
204+
if marker == nil || marker.Version == 0 {
205+
return nil
206+
}
207+
208+
block.Parquet = &parquet.ConverterMarkMeta{
209+
Version: marker.Version,
210+
}
211+
return nil
212+
}
213+
183214
func (w *Updater) updateBlockMarks(ctx context.Context, old []*BlockDeletionMark) ([]*BlockDeletionMark, map[ulid.ULID]struct{}, int64, error) {
184215
out := make([]*BlockDeletionMark, 0, len(old))
185216
deletedBlocks := map[ulid.ULID]struct{}{}
@@ -249,3 +280,31 @@ func (w *Updater) updateBlockDeletionMarkIndexEntry(ctx context.Context, id ulid
249280

250281
return BlockDeletionMarkFromThanosMarker(&m), nil
251282
}
283+
284+
func (w *Updater) updateParquetBlocks(ctx context.Context, blocks []*Block) error {
285+
discoveredParquetBlocks := map[ulid.ULID]struct{}{}
286+
287+
// Find all parquet markers in the storage.
288+
if err := w.bkt.Iter(ctx, parquet.ConverterMarkerPrefix+"/", func(name string) error {
289+
if blockID, ok := IsBlockParquetConverterMarkFilename(path.Base(name)); ok {
290+
discoveredParquetBlocks[blockID] = struct{}{}
291+
}
292+
293+
return nil
294+
}); err != nil {
295+
return errors.Wrap(err, "list block parquet converter marks")
296+
}
297+
298+
// Check if parquet mark has been uploaded or deleted for the block.
299+
for _, m := range blocks {
300+
if _, ok := discoveredParquetBlocks[m.ID]; ok {
301+
if err := w.updateParquetBlockIndexEntry(ctx, m.ID, m); err != nil {
302+
return err
303+
}
304+
} else if m.Parquet != nil {
305+
// Converter marker removed. Reset parquet field.
306+
m.Parquet = nil
307+
}
308+
}
309+
return nil
310+
}

pkg/storage/tsdb/bucketindex/updater_test.go

+179
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package bucketindex
33
import (
44
"bytes"
55
"context"
6+
"encoding/json"
67
"path"
78
"strings"
89
"testing"
@@ -21,6 +22,7 @@ import (
2122
"github.com/thanos-io/thanos/pkg/block/metadata"
2223

2324
"github.com/cortexproject/cortex/pkg/storage/bucket"
25+
"github.com/cortexproject/cortex/pkg/storage/parquet"
2426
"github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
2527
)
2628

@@ -301,6 +303,150 @@ func TestUpdater_UpdateIndex_NoTenantInTheBucket(t *testing.T) {
301303
}
302304
}
303305

306+
// TestUpdater_UpdateIndex_WithParquet exercises the full UpdateIndex flow with
// parquet support enabled: markers are picked up on initial build, preserved on
// incremental updates, dropped when a block is hard-deleted, and added when a
// marker appears for a pre-existing block.
func TestUpdater_UpdateIndex_WithParquet(t *testing.T) {
	const userID = "user-1"

	bkt, _ := testutil.PrepareFilesystemBucket(t)

	ctx := context.Background()
	logger := log.NewNopLogger()

	// Generate the initial index.
	bkt = BucketWithGlobalMarkers(bkt)
	block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20)
	block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30)
	block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2)
	// Add parquet marker to block 1.
	block1ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block1)

	// Build the index from scratch (no old index passed): block1 must carry
	// its parquet metadata.
	w := NewUpdater(bkt, userID, nil, logger).EnableParquet()
	returnedIdx, _, _, err := w.UpdateIndex(ctx, nil)
	require.NoError(t, err)
	assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
		[]tsdb.BlockMeta{block1, block2},
		[]*metadata.DeletionMark{block2Mark}, map[string]*parquet.ConverterMarkMeta{
			block1.ULID.String(): {Version: block1ParquetMark.Version},
		})

	// Create new blocks, and update the index. The existing parquet metadata
	// must be preserved across the incremental update.
	block3 := testutil.MockStorageBlock(t, bkt, userID, 30, 40)
	block4 := testutil.MockStorageBlock(t, bkt, userID, 40, 50)
	block4Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block4)

	returnedIdx, _, _, err = w.UpdateIndex(ctx, returnedIdx)
	require.NoError(t, err)
	assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
		[]tsdb.BlockMeta{block1, block2, block3, block4},
		[]*metadata.DeletionMark{block2Mark, block4Mark},
		map[string]*parquet.ConverterMarkMeta{
			block1.ULID.String(): {Version: block1ParquetMark.Version},
		})

	// Hard delete a block and update the index: the deleted block disappears
	// from the index while block1 keeps its parquet metadata.
	require.NoError(t, block.Delete(ctx, log.NewNopLogger(), bucket.NewUserBucketClient(userID, bkt, nil), block2.ULID))

	returnedIdx, _, _, err = w.UpdateIndex(ctx, returnedIdx)
	require.NoError(t, err)
	assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
		[]tsdb.BlockMeta{block1, block3, block4},
		[]*metadata.DeletionMark{block4Mark}, map[string]*parquet.ConverterMarkMeta{
			block1.ULID.String(): {Version: block1ParquetMark.Version},
		})

	// Upload parquet marker to an old block and update index: the marker is
	// picked up for a block that was already present in the old index.
	block3ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block3)
	returnedIdx, _, _, err = w.UpdateIndex(ctx, returnedIdx)
	require.NoError(t, err)
	assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
		[]tsdb.BlockMeta{block1, block3, block4},
		[]*metadata.DeletionMark{block4Mark}, map[string]*parquet.ConverterMarkMeta{
			block1.ULID.String(): {Version: block1ParquetMark.Version},
			block3.ULID.String(): {Version: block3ParquetMark.Version},
		})
}
367+
368+
// TestUpdater_UpdateParquetBlockIndexEntry unit-tests updateParquetBlockIndexEntry
// in isolation: a present marker populates Block.Parquet, while a missing marker
// or an access-denied error leaves it nil without failing.
func TestUpdater_UpdateParquetBlockIndexEntry(t *testing.T) {
	const userID = "user-1"
	ctx := context.Background()
	logger := log.NewNopLogger()

	tests := []struct {
		name string
		// setupBucket prepares (or wraps) the bucket for the scenario under test.
		setupBucket func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket
		// expectedError is the error updateParquetBlockIndexEntry should return
		// (nil means success is expected).
		expectedError error
		// expectParquet tells whether Block.Parquet should be populated.
		expectParquet bool
		// expectParquetMeta is the expected parquet metadata when expectParquet is true.
		expectParquetMeta *parquet.ConverterMarkMeta
	}{
		{
			name: "should successfully read parquet marker",
			setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket {
				parquetMark := parquet.ConverterMarkMeta{
					Version: 1,
				}
				data, err := json.Marshal(parquetMark)
				require.NoError(t, err)
				require.NoError(t, bkt.Upload(ctx, path.Join(userID, blockID.String(), parquet.ConverterMarkerFileName), bytes.NewReader(data)))
				return bkt
			},
			expectedError:     nil,
			expectParquet:     true,
			expectParquetMeta: &parquet.ConverterMarkMeta{Version: 1},
		},
		{
			name: "should handle missing parquet marker",
			setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket {
				// Don't upload any parquet marker
				return bkt
			},
			expectedError: nil,
			expectParquet: false,
		},
		{
			name: "should handle access denied",
			setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket {
				// Wrap the bucket so reads of the marker object fail with a
				// key-access-denied error; the updater must treat this as
				// "no parquet block" rather than an error.
				return &testutil.MockBucketFailure{
					Bucket: bkt,
					GetFailures: map[string]error{
						path.Join(userID, blockID.String(), parquet.ConverterMarkerFileName): testutil.ErrKeyAccessDeniedError,
					},
				}
			},
			expectedError: nil,
			expectParquet: false,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			bkt, _ := testutil.PrepareFilesystemBucket(t)
			blockID := ulid.MustNew(1, nil)
			block := &Block{ID: blockID}

			// Setup the bucket with test data
			bkt = tc.setupBucket(t, bkt, blockID)

			// Create an instrumented bucket wrapper
			registry := prometheus.NewRegistry()
			ibkt := objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", registry), "test-bucket")
			w := NewUpdater(ibkt, userID, nil, logger)

			err := w.updateParquetBlockIndexEntry(ctx, blockID, block)
			if tc.expectedError != nil {
				assert.True(t, errors.Is(err, tc.expectedError))
			} else {
				assert.NoError(t, err)
			}

			if tc.expectParquet {
				assert.NotNil(t, block.Parquet)
				assert.Equal(t, tc.expectParquetMeta, block.Parquet)
			} else {
				assert.Nil(t, block.Parquet)
			}
		})
	}
}
449+
304450
func getBlockUploadedAt(t testing.TB, bkt objstore.Bucket, userID string, blockID ulid.ULID) int64 {
305451
metaFile := path.Join(userID, blockID.String(), block.MetaFilename)
306452

@@ -338,3 +484,36 @@ func assertBucketIndexEqual(t testing.TB, idx *Index, bkt objstore.Bucket, userI
338484

339485
assert.ElementsMatch(t, expectedMarkEntries, idx.BlockDeletionMarks)
340486
}
487+
488+
func assertBucketIndexEqualWithParquet(t testing.TB, idx *Index, bkt objstore.Bucket, userID string, expectedBlocks []tsdb.BlockMeta, expectedDeletionMarks []*metadata.DeletionMark, parquetBlocks map[string]*parquet.ConverterMarkMeta) {
489+
assert.Equal(t, IndexVersion1, idx.Version)
490+
assert.InDelta(t, time.Now().Unix(), idx.UpdatedAt, 2)
491+
492+
// Build the list of expected block index entries.
493+
var expectedBlockEntries []*Block
494+
for _, b := range expectedBlocks {
495+
block := &Block{
496+
ID: b.ULID,
497+
MinTime: b.MinTime,
498+
MaxTime: b.MaxTime,
499+
UploadedAt: getBlockUploadedAt(t, bkt, userID, b.ULID),
500+
}
501+
if meta, ok := parquetBlocks[b.ULID.String()]; ok {
502+
block.Parquet = meta
503+
}
504+
expectedBlockEntries = append(expectedBlockEntries, block)
505+
}
506+
507+
assert.ElementsMatch(t, expectedBlockEntries, idx.Blocks)
508+
509+
// Build the list of expected block deletion mark index entries.
510+
var expectedMarkEntries []*BlockDeletionMark
511+
for _, m := range expectedDeletionMarks {
512+
expectedMarkEntries = append(expectedMarkEntries, &BlockDeletionMark{
513+
ID: m.ID,
514+
DeletionTime: m.DeletionTime,
515+
})
516+
}
517+
518+
assert.ElementsMatch(t, expectedMarkEntries, idx.BlockDeletionMarks)
519+
}

0 commit comments

Comments
 (0)