From aab405ef2691492c5cbb637add8b891fc25bcd0a Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Tue, 4 Feb 2025 19:09:04 +0800 Subject: [PATCH] store metadata in the block --- pkg/experiment/block/compaction.go | 11 ++- pkg/experiment/block/metadata/metadata.go | 43 ++++++++++++ pkg/experiment/block/object.go | 2 +- pkg/experiment/block/writer.go | 83 +++++++++++++---------- pkg/experiment/ingester/segment.go | 4 ++ pkg/experiment/metastore/index/query.go | 1 + 6 files changed, 104 insertions(+), 40 deletions(-) diff --git a/pkg/experiment/block/compaction.go b/pkg/experiment/block/compaction.go index 2b50e48601..aa2c409119 100644 --- a/pkg/experiment/block/compaction.go +++ b/pkg/experiment/block/compaction.go @@ -182,7 +182,10 @@ func newBlockCompaction( } func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdir string) (m *metastorev1.BlockMeta, err error) { - w := NewBlockWriter(dst, b.path, tmpdir) + w, err := NewBlockWriter(dst, b.path, tmpdir) + if err != nil { + return nil, err + } defer func() { err = multierror.New(err, w.Close()).Err() }() @@ -198,7 +201,11 @@ func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdi return nil, fmt.Errorf("writing tenant index: %w", err) } b.meta.StringTable = b.strings.Strings - if err = w.Flush(ctx); err != nil { + b.meta.MetadataOffset = w.Offset() + if err = metadata.Encode(w, b.meta); err != nil { + return nil, fmt.Errorf("writing metadata: %w", err) + } + if err = w.Upload(ctx); err != nil { return nil, fmt.Errorf("flushing block writer: %w", err) } b.meta.Size = w.Offset() diff --git a/pkg/experiment/block/metadata/metadata.go b/pkg/experiment/block/metadata/metadata.go index ba423f8a96..d53d0ad2d0 100644 --- a/pkg/experiment/block/metadata/metadata.go +++ b/pkg/experiment/block/metadata/metadata.go @@ -1,6 +1,11 @@ package metadata import ( + "encoding/binary" + "errors" + "fmt" + "hash/crc32" + "io" "sync" "time" @@ -10,6 +15,8 @@ import ( "github.com/grafana/pyroscope/pkg/iter" ) +var ErrMetadataInvalid = errors.New("metadata: invalid metadata") + func Tenant(md *metastorev1.BlockMeta) string { if md.Tenant <= 0 || int(md.Tenant) >= len(md.StringTable) { return "" @@ -148,3 +155,39 @@ func OpenStringTable(src *metastorev1.BlockMeta) *StringTable { } return t } + +var castagnoli = crc32.MakeTable(crc32.Castagnoli) + +// Encode writes the metadata to the writer in the following format: +// +// raw | protobuf-encoded metadata +// be_uint32 | size of the raw metadata +// be_uint32 | CRC32 of the raw metadata and size +func Encode(w io.Writer, md *metastorev1.BlockMeta) error { + ww := crc32.New(castagnoli) + b, _ := md.MarshalVT() + n, err := w.Write(b) + if err != nil { + return err + } + if err = binary.Write(w, binary.BigEndian, uint32(n)); err != nil { + return err + } + return binary.Write(w, binary.BigEndian, ww.Sum32()) +} + +// Decode metadata encoded with Encode. +func Decode(b []byte, md *metastorev1.BlockMeta) error { + if len(b) <= 8 { + return fmt.Errorf("%w: invalid size", ErrMetadataInvalid) + } + crc := binary.BigEndian.Uint32(b[len(b)-4:]) + size := binary.BigEndian.Uint32(b[len(b)-8 : len(b)-4]) + if size != uint32(len(b)-8) { + return fmt.Errorf("%w: invalid size", ErrMetadataInvalid) + } + if crc32.Checksum(b[:len(b)-4], castagnoli) != crc { + return fmt.Errorf("%w: invalid CRC", ErrMetadataInvalid) + } + return md.UnmarshalVT(b[:len(b)-8]) +} diff --git a/pkg/experiment/block/object.go b/pkg/experiment/block/object.go index c693e21572..ab246ba50a 100644 --- a/pkg/experiment/block/object.go +++ b/pkg/experiment/block/object.go @@ -213,7 +213,7 @@ func (obj *Object) ReadMetadata(ctx context.Context) error { return fmt.Errorf("reading block metadata %s: %w", obj.path, err) } var meta metastorev1.BlockMeta - if err := meta.UnmarshalVT(buf.B); err != nil { + if err := metadata.Decode(buf.B, &meta); err != nil { return fmt.Errorf("decoding block metadata %s: %w", obj.path, err) } obj.meta = &meta diff --git a/pkg/experiment/block/writer.go b/pkg/experiment/block/writer.go index 8792a43d64..03eccdedb2 100644 --- a/pkg/experiment/block/writer.go +++ b/pkg/experiment/block/writer.go @@ -1,47 +1,81 @@ package block import ( + "bufio" "context" "io" "os" "path/filepath" "strconv" + "github.com/grafana/dskit/multierror" + "github.com/grafana/pyroscope/pkg/objstore" "github.com/grafana/pyroscope/pkg/util/bufferpool" ) // TODO(kolesnikovae): -// - Avoid staging files where possible. -// - If stage files are required, at least avoid -// recreating them for each tenant dataset. -// - objstore.Bucket should provide object writer. +// * Get rid of the staging files. +// * Pipe upload reader. type Writer struct { storage objstore.Bucket path string local string off uint64 - w *os.File + w *bufio.Writer + f *os.File tmp string n int cur string + // Used by CopyBuffer when copying + // data from staging files. buf *bufferpool.Buffer } -func NewBlockWriter(storage objstore.Bucket, path string, tmp string) *Writer { - b := &Writer{ +func NewBlockWriter(storage objstore.Bucket, path string, tmp string) (*Writer, error) { + w := &Writer{ storage: storage, path: path, tmp: tmp, local: filepath.Join(tmp, FileNameDataObject), buf: bufferpool.GetBuffer(compactionCopyBufferSize), } - return b + if err := w.open(); err != nil { + return nil, err + } + return w, nil +} + +func (b *Writer) open() (err error) { + if b.f, err = os.Create(b.local); err != nil { + return err + } + b.w = bufio.NewWriter(b.f) + return nil +} + +func (b *Writer) Close() error { + var merr multierror.MultiError + if b.w != nil { + merr.Add(b.w.Flush()) + b.w = nil + } + if b.buf != nil { + bufferpool.Put(b.buf) + b.buf = nil + } + if b.f != nil { + merr.Add(b.f.Close()) + b.f = nil + } + return merr.Err() } +func (b *Writer) Offset() uint64 { return b.off } + // Dir returns path to the new temp directory. func (b *Writer) Dir() string { b.n++ @@ -49,25 +83,10 @@ func (b *Writer) Dir() string { return b.cur } -// ReadFromFiles located in the directory Dir. -func (b *Writer) ReadFromFiles(files ...string) (toc []uint64, err error) { - toc = make([]uint64, len(files)) - for i := range files { - toc[i] = b.off - if err = b.ReadFromFile(files[i]); err != nil { - break - } - } - return toc, err -} +func (b *Writer) Write(p []byte) (n int, err error) { return b.w.Write(p) } // ReadFromFile located in the directory Dir. func (b *Writer) ReadFromFile(file string) (err error) { - if b.w == nil { - if b.w, err = os.Create(b.local); err != nil { - return err - } - } f, err := os.Open(filepath.Join(b.cur, file)) if err != nil { return err @@ -86,10 +105,8 @@ func (b *Writer) ReadFrom(r io.Reader) (n int64, err error) { return n, err } -func (b *Writer) Offset() uint64 { return b.off } - -func (b *Writer) Flush(ctx context.Context) error { - if err := b.w.Close(); err != nil { +func (b *Writer) Upload(ctx context.Context) error { + if err := b.Close(); err != nil { return err } b.w = nil @@ -100,13 +117,5 @@ func (b *Writer) Flush(ctx context.Context) error { defer func() { _ = f.Close() }() - return b.storage.Upload(ctx, b.path, f) -} - -func (b *Writer) Close() error { - bufferpool.Put(b.buf) - if b.w != nil { - return b.w.Close() - } - return nil + return b.storage.Upload(ctx, b.path, bufio.NewReader(f)) } diff --git a/pkg/experiment/ingester/segment.go b/pkg/experiment/ingester/segment.go index c372987489..a8b8deaf1a 100644 --- a/pkg/experiment/ingester/segment.go +++ b/pkg/experiment/ingester/segment.go @@ -281,6 +281,10 @@ func (s *segment) flushBlock(stream flushStream) ([]byte, *metastorev1.BlockMeta } meta.StringTable = stringTable.Strings + meta.MetadataOffset = uint64(w.offset) + if err := metadata.Encode(w, meta); err != nil { + return nil, nil, fmt.Errorf("failed to encode metadata: %w", err) + } meta.Size = uint64(w.offset) s.debuginfo.flushBlockDuration = time.Since(start) return blockFile.Bytes(), meta, nil diff --git a/pkg/experiment/metastore/index/query.go b/pkg/experiment/metastore/index/query.go index 1d553eb23c..685363d19f 100644 --- a/pkg/experiment/metastore/index/query.go +++ b/pkg/experiment/metastore/index/query.go @@ -182,6 +182,7 @@ func cloneBlockMetadataForQuery(b *metastorev1.BlockMeta) *metastorev1.BlockMeta func cloneDatasetMetadataForQuery(ds *metastorev1.Dataset) *metastorev1.Dataset { ls := ds.Labels + // TODO: Preserve __labels__ ds.Labels = nil c := ds.CloneVT() ds.Labels = ls