Skip to content

Commit

Permalink
store metadata in the block
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Feb 4, 2025
1 parent 3ab3fb0 commit aab405e
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 40 deletions.
11 changes: 9 additions & 2 deletions pkg/experiment/block/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ func newBlockCompaction(
}

func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdir string) (m *metastorev1.BlockMeta, err error) {
w := NewBlockWriter(dst, b.path, tmpdir)
w, err := NewBlockWriter(dst, b.path, tmpdir)
if err != nil {
return nil, err
}
defer func() {
err = multierror.New(err, w.Close()).Err()
}()
Expand All @@ -198,7 +201,11 @@ func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdi
return nil, fmt.Errorf("writing tenant index: %w", err)
}
b.meta.StringTable = b.strings.Strings
if err = w.Flush(ctx); err != nil {
b.meta.MetadataOffset = w.Offset()
if err = metadata.Encode(w, b.meta); err != nil {
return nil, fmt.Errorf("writing metadata: %w", err)
}
if err = w.Upload(ctx); err != nil {
return nil, fmt.Errorf("flushing block writer: %w", err)
}
b.meta.Size = w.Offset()
Expand Down
43 changes: 43 additions & 0 deletions pkg/experiment/block/metadata/metadata.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package metadata

import (
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
"sync"
"time"

Expand All @@ -10,6 +15,8 @@ import (
"github.com/grafana/pyroscope/pkg/iter"
)

var ErrMetadataInvalid = errors.New("metadata: invalid metadata")

func Tenant(md *metastorev1.BlockMeta) string {
if md.Tenant <= 0 || int(md.Tenant) >= len(md.StringTable) {
return ""
Expand Down Expand Up @@ -148,3 +155,39 @@ func OpenStringTable(src *metastorev1.BlockMeta) *StringTable {
}
return t
}

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// Encode writes the metadata to the writer in the following format:
//
// raw | protobuf-encoded metadata
// be_uint32 | size of the raw metadata
// be_uint32 | CRC32 of the raw metadata and size
func Encode(w io.Writer, md *metastorev1.BlockMeta) error {
ww := crc32.New(castagnoli)
b, _ := md.MarshalVT()
n, err := w.Write(b)
if err != nil {
return err
}
if err = binary.Write(w, binary.BigEndian, uint32(n)); err != nil {
return err
}
return binary.Write(w, binary.BigEndian, ww.Sum32())
}

// Decode metadata encoded with Encode.
func Decode(b []byte, md *metastorev1.BlockMeta) error {
if len(b) <= 8 {
return fmt.Errorf("%w: invalid size", ErrMetadataInvalid)
}
crc := binary.BigEndian.Uint32(b[len(b)-4:])
size := binary.BigEndian.Uint32(b[len(b)-8 : len(b)-4])
if size != uint32(len(b)-8) {
return fmt.Errorf("%w: invalid size", ErrMetadataInvalid)
}
if crc32.Checksum(b[:len(b)-4], castagnoli) != crc {
return fmt.Errorf("%w: invalid CRC", ErrMetadataInvalid)
}
return md.UnmarshalVT(b[:len(b)-8])
}
2 changes: 1 addition & 1 deletion pkg/experiment/block/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (obj *Object) ReadMetadata(ctx context.Context) error {
return fmt.Errorf("reading block metadata %s: %w", obj.path, err)
}
var meta metastorev1.BlockMeta
if err := meta.UnmarshalVT(buf.B); err != nil {
if err := metadata.Decode(buf.B, &meta); err != nil {
return fmt.Errorf("decoding block metadata %s: %w", obj.path, err)
}
obj.meta = &meta
Expand Down
83 changes: 46 additions & 37 deletions pkg/experiment/block/writer.go
Original file line number Diff line number Diff line change
@@ -1,73 +1,92 @@
package block

import (
"bufio"
"context"
"io"
"os"
"path/filepath"
"strconv"

"github.com/grafana/dskit/multierror"

"github.com/grafana/pyroscope/pkg/objstore"
"github.com/grafana/pyroscope/pkg/util/bufferpool"
)

// TODO(kolesnikovae):
// - Avoid staging files where possible.
// - If stage files are required, at least avoid
// recreating them for each tenant dataset.
// - objstore.Bucket should provide object writer.
// * Get rid of the staging files.
// * Pipe upload reader.

// Writer writes a block data object: datasets are appended to a local
// staging file, which is uploaded to object storage as a whole.
type Writer struct {
	storage objstore.Bucket
	path    string // Destination object path in storage.
	local   string // Path of the local staging file.
	off     uint64 // Current write offset within the data object.
	// NOTE: the diff residue declared w twice (*os.File and
	// *bufio.Writer); only the buffered writer over f is kept.
	w *bufio.Writer
	f *os.File

	tmp string // Root of the temp directory tree.
	n   int    // Counter of temp directories handed out by Dir.
	cur string // Most recent directory returned by Dir.

	// Used by CopyBuffer when copying
	// data from staging files.
	buf *bufferpool.Buffer
}

func NewBlockWriter(storage objstore.Bucket, path string, tmp string) *Writer {
b := &Writer{
// NewBlockWriter creates a block writer that stages the data object in
// a local file under tmp and uploads it to storage at path. The caller
// must Close the writer to release the staging file and copy buffer.
func NewBlockWriter(storage objstore.Bucket, path string, tmp string) (*Writer, error) {
	w := &Writer{
		storage: storage,
		path:    path,
		tmp:     tmp,
		local:   filepath.Join(tmp, FileNameDataObject),
		buf:     bufferpool.GetBuffer(compactionCopyBufferSize),
	}
	// The stray "return b" from the old version is removed: it referenced
	// an undefined identifier and made the open call unreachable.
	if err := w.open(); err != nil {
		return nil, err
	}
	return w, nil
}

// open creates the local staging file and wraps it in a buffered writer.
func (b *Writer) open() error {
	f, err := os.Create(b.local)
	if err != nil {
		return err
	}
	b.f = f
	b.w = bufio.NewWriter(f)
	return nil
}

// Close flushes any buffered data, returns the copy buffer to the pool,
// and closes the staging file. Fields are nilled out first, so calling
// Close more than once is safe.
func (b *Writer) Close() error {
	var errs multierror.MultiError
	if w := b.w; w != nil {
		b.w = nil
		errs.Add(w.Flush())
	}
	if buf := b.buf; buf != nil {
		b.buf = nil
		bufferpool.Put(buf)
	}
	if f := b.f; f != nil {
		b.f = nil
		errs.Add(f.Close())
	}
	return errs.Err()
}

// Offset reports the number of bytes written to the data object so far.
func (b *Writer) Offset() uint64 {
	return b.off
}

// Dir allocates a fresh temp directory path and remembers it as the
// current one, used by subsequent ReadFromFile/ReadFromFiles calls.
func (b *Writer) Dir() string {
	b.n++
	next := filepath.Join(b.tmp, strconv.Itoa(b.n))
	b.cur = next
	return next
}

// ReadFromFiles appends the named files, located in the directory
// returned by Dir, to the data object. It returns a table of contents
// holding the starting offset of each file; on error, entries up to and
// including the failing file are populated.
func (b *Writer) ReadFromFiles(files ...string) ([]uint64, error) {
	toc := make([]uint64, len(files))
	for i, file := range files {
		toc[i] = b.off
		if err := b.ReadFromFile(file); err != nil {
			return toc, err
		}
	}
	return toc, nil
}
// Write appends p to the data object and advances the write offset, so
// Offset stays consistent with the bytes written. Without the offset
// update, metadata written through this writer would not be reflected
// in Offset, and the block size recorded after upload would exclude it.
func (b *Writer) Write(p []byte) (n int, err error) {
	n, err = b.w.Write(p)
	b.off += uint64(n)
	return n, err
}

// ReadFromFile located in the directory Dir.
func (b *Writer) ReadFromFile(file string) (err error) {
if b.w == nil {
if b.w, err = os.Create(b.local); err != nil {
return err
}
}
f, err := os.Open(filepath.Join(b.cur, file))
if err != nil {
return err
Expand All @@ -86,10 +105,8 @@ func (b *Writer) ReadFrom(r io.Reader) (n int64, err error) {
return n, err
}

func (b *Writer) Offset() uint64 { return b.off }

func (b *Writer) Flush(ctx context.Context) error {
if err := b.w.Close(); err != nil {
func (b *Writer) Upload(ctx context.Context) error {
if err := b.Close(); err != nil {
return err
}
b.w = nil
Expand All @@ -100,13 +117,5 @@ func (b *Writer) Flush(ctx context.Context) error {
defer func() {
_ = f.Close()
}()
return b.storage.Upload(ctx, b.path, f)
}

func (b *Writer) Close() error {
bufferpool.Put(b.buf)
if b.w != nil {
return b.w.Close()
}
return nil
return b.storage.Upload(ctx, b.path, bufio.NewReader(f))
}
4 changes: 4 additions & 0 deletions pkg/experiment/ingester/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ func (s *segment) flushBlock(stream flushStream) ([]byte, *metastorev1.BlockMeta
}

meta.StringTable = stringTable.Strings
meta.MetadataOffset = uint64(w.offset)
if err := metadata.Encode(w, meta); err != nil {
return nil, nil, fmt.Errorf("failed to encode metadata: %w", err)
}
meta.Size = uint64(w.offset)
s.debuginfo.flushBlockDuration = time.Since(start)
return blockFile.Bytes(), meta, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/experiment/metastore/index/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func cloneBlockMetadataForQuery(b *metastorev1.BlockMeta) *metastorev1.BlockMeta

func cloneDatasetMetadataForQuery(ds *metastorev1.Dataset) *metastorev1.Dataset {
ls := ds.Labels
// TODO: Preserve __labels__
ds.Labels = nil
c := ds.CloneVT()
ds.Labels = ls
Expand Down

0 comments on commit aab405e

Please sign in to comment.