Skip to content

Commit 1b01c99

Browse files
committed
stream output object at compaction
1 parent 9fbf325 commit 1b01c99

File tree

6 files changed

+79
-185
lines changed

6 files changed

+79
-185
lines changed

pkg/experiment/block/block.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ const (
1919
defaultObjectSizeLoadInMemory = 1 << 20
2020
defaultTenantDatasetSizeLoadInMemory = 1 << 20
2121

22-
maxRowsPerRowGroup = 10 << 10
23-
symbolsPrefetchSize = 32 << 10
24-
compactionCopyBufferSize = 32 << 10
22+
maxRowsPerRowGroup = 10 << 10
23+
symbolsPrefetchSize = 32 << 10
24+
compactionCopyBufferSize = 32 << 10
25+
compactionUploadBufferSize = 32 << 10
2526
)
2627

2728
func estimateReadBufferSize(s int64) int {

pkg/experiment/block/compaction.go

+25-66
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"context"
66
"fmt"
77
"io"
8-
"os"
98
"slices"
109
"sort"
1110
"strings"
@@ -40,12 +39,6 @@ func WithCompactionObjectOptions(options ...ObjectOption) CompactionOption {
4039
}
4140
}
4241

43-
func WithCompactionTempDir(tempdir string) CompactionOption {
44-
return func(p *compactionConfig) {
45-
p.tempdir = tempdir
46-
}
47-
}
48-
4942
func WithCompactionDestination(storage objstore.Bucket) CompactionOption {
5043
return func(p *compactionConfig) {
5144
p.destination = storage
@@ -54,7 +47,6 @@ func WithCompactionDestination(storage objstore.Bucket) CompactionOption {
5447

5548
type compactionConfig struct {
5649
objectOptions []ObjectOption
57-
tempdir string
5850
source objstore.BucketReader
5951
destination objstore.Bucket
6052
}
@@ -66,7 +58,6 @@ func Compact(
6658
options ...CompactionOption,
6759
) (m []*metastorev1.BlockMeta, err error) {
6860
c := &compactionConfig{
69-
tempdir: os.TempDir(),
7061
source: storage,
7162
destination: storage,
7263
}
@@ -89,9 +80,9 @@ func Compact(
8980

9081
compacted := make([]*metastorev1.BlockMeta, 0, len(plan))
9182
for _, p := range plan {
92-
md, compactionErr := p.Compact(ctx, c.destination, c.tempdir)
83+
md, compactionErr := p.Compact(ctx, c.destination)
9384
if compactionErr != nil {
94-
return nil, err
85+
return nil, compactionErr
9586
}
9687
compacted = append(compacted, md)
9788
}
@@ -182,14 +173,12 @@ func newBlockCompaction(
182173
return p
183174
}
184175

185-
func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdir string) (m *metastorev1.BlockMeta, err error) {
186-
w, err := NewBlockWriter(dst, b.path, tmpdir)
187-
if err != nil {
188-
return nil, fmt.Errorf("block writer: %w", err)
189-
}
176+
func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket) (m *metastorev1.BlockMeta, err error) {
177+
w := NewBlockWriter(ctx, dst, b.path)
190178
defer func() {
191-
err = multierror.New(err, w.Close()).Err()
179+
_ = w.Close()
192180
}()
181+
193182
// Datasets are compacted in a strict order.
194183
for i, s := range b.datasets {
195184
b.datasetIndex.resetDatasetIndex(uint32(i))
@@ -207,7 +196,7 @@ func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdi
207196
return nil, fmt.Errorf("writing metadata: %w", err)
208197
}
209198
b.meta.Size = w.Offset()
210-
if err = w.Upload(ctx); err != nil {
199+
if err = w.Close(); err != nil {
211200
return nil, fmt.Errorf("flushing block writer: %w", err)
212201
}
213202
return b.meta, nil
@@ -266,7 +255,6 @@ type datasetCompaction struct {
266255
parent *CompactionPlan
267256
meta *metastorev1.Dataset
268257
labels *metadata.LabelBuilder
269-
path string // Set at open.
270258

271259
datasets []*Dataset
272260

@@ -312,42 +300,38 @@ func (m *datasetCompaction) append(s *Dataset) {
312300
}
313301

314302
func (m *datasetCompaction) compact(ctx context.Context, w *Writer) (err error) {
315-
if err = m.open(ctx, w.Dir()); err != nil {
303+
off := w.Offset()
304+
m.meta.TableOfContents = make([]uint64, 0, 3)
305+
m.meta.TableOfContents = append(m.meta.TableOfContents, w.Offset())
306+
307+
if err = m.open(ctx, w); err != nil {
316308
return fmt.Errorf("failed to open sections for compaction: %w", err)
317309
}
318-
defer func() {
319-
err = multierror.New(err, m.cleanup()).Err()
320-
}()
321310
if err = m.mergeAndClose(ctx); err != nil {
322311
return fmt.Errorf("failed to merge datasets: %w", err)
323312
}
324-
if err = m.writeTo(w); err != nil {
325-
return fmt.Errorf("failed to write sections: %w", err)
313+
314+
m.meta.TableOfContents = append(m.meta.TableOfContents, w.Offset())
315+
if _, err = w.ReadFrom(bytes.NewReader(m.indexRewriter.buf)); err != nil {
316+
return fmt.Errorf("failed to read index: %w", err)
317+
}
318+
m.meta.TableOfContents = append(m.meta.TableOfContents, w.Offset())
319+
if _, err = w.ReadFrom(bytes.NewReader(m.symbolsRewriter.buf.Bytes())); err != nil {
320+
return fmt.Errorf("failed to read symbols: %w", err)
326321
}
322+
323+
m.meta.Size = w.Offset() - off
324+
m.meta.Labels = m.labels.Build()
327325
return nil
328326
}
329327

330-
func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
331-
m.path = path
332-
defer func() {
333-
if err != nil {
334-
err = multierror.New(err, m.cleanup()).Err()
335-
}
336-
}()
337-
338-
if err = os.MkdirAll(m.path, 0o777); err != nil {
339-
return err
340-
}
341-
328+
func (m *datasetCompaction) open(ctx context.Context, w io.Writer) (err error) {
342329
var estimatedProfileTableSize int64
343330
for _, ds := range m.datasets {
344331
estimatedProfileTableSize += ds.sectionSize(SectionProfiles)
345332
}
346333
pageBufferSize := estimatePageBufferSize(estimatedProfileTableSize)
347-
m.profilesWriter, err = newProfileWriter(m.path, pageBufferSize)
348-
if err != nil {
349-
return err
350-
}
334+
m.profilesWriter = newProfileWriter(pageBufferSize, w)
351335

352336
m.indexRewriter = newIndexRewriter()
353337
m.symbolsRewriter = newSymbolsRewriter()
@@ -373,7 +357,6 @@ func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
373357
}
374358
return merr.Err()
375359
}
376-
377360
return nil
378361
}
379362

@@ -433,30 +416,6 @@ func (m *datasetCompaction) close() (err error) {
433416
return err
434417
}
435418

436-
func (m *datasetCompaction) writeTo(w *Writer) error {
437-
off := w.Offset()
438-
m.meta.TableOfContents = make([]uint64, 0, 3)
439-
m.meta.TableOfContents = append(m.meta.TableOfContents, w.Offset())
440-
if err := w.ReadFromFile(FileNameProfilesParquet); err != nil {
441-
return err
442-
}
443-
m.meta.TableOfContents = append(m.meta.TableOfContents, w.Offset())
444-
if _, err := w.ReadFrom(bytes.NewReader(m.indexRewriter.buf)); err != nil {
445-
return err
446-
}
447-
m.meta.TableOfContents = append(m.meta.TableOfContents, w.Offset())
448-
if _, err := w.ReadFrom(bytes.NewReader(m.symbolsRewriter.buf.Bytes())); err != nil {
449-
return err
450-
}
451-
m.meta.Size = w.Offset() - off
452-
m.meta.Labels = m.labels.Build()
453-
return nil
454-
}
455-
456-
func (m *datasetCompaction) cleanup() error {
457-
return os.RemoveAll(m.path)
458-
}
459-
460419
func newIndexRewriter() *indexRewriter {
461420
return &indexRewriter{
462421
symbols: make(map[string]struct{}),

pkg/experiment/block/compaction_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ func Test_CompactBlocks(t *testing.T) {
2828
dst, tempdir := testutil.NewFilesystemBucket(t, ctx, t.TempDir())
2929
compactedBlocks, err := Compact(ctx, resp.Blocks, bucket,
3030
WithCompactionDestination(dst),
31-
WithCompactionTempDir(tempdir),
3231
WithCompactionObjectOptions(
3332
WithObjectDownload(filepath.Join(tempdir, "source")),
3433
WithObjectMaxSizeLoadInMemory(0)), // Force download.

pkg/experiment/block/section_profiles.go

+4-20
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import (
66
"fmt"
77
"io"
88
"math"
9-
"os"
10-
"path/filepath"
119

1210
"github.com/parquet-go/parquet-go"
1311
"github.com/pkg/errors"
@@ -159,28 +157,22 @@ func (f *ParquetFile) Column(ctx context.Context, columnName string, predicate q
159157

160158
type profilesWriter struct {
161159
*parquet.GenericWriter[*schemav1.Profile]
162-
file *os.File
163160
buf []parquet.Row
164161
profiles uint64
165162
}
166163

167-
func newProfileWriter(dst string, pageBufferSize int) (*profilesWriter, error) {
168-
f, err := os.Create(filepath.Join(dst, FileNameProfilesParquet))
169-
if err != nil {
170-
return nil, err
171-
}
164+
func newProfileWriter(pageBufferSize int, w io.Writer) *profilesWriter {
172165
return &profilesWriter{
173-
file: f,
174-
buf: make([]parquet.Row, 1),
175-
GenericWriter: parquet.NewGenericWriter[*schemav1.Profile](f,
166+
buf: make([]parquet.Row, 1),
167+
GenericWriter: parquet.NewGenericWriter[*schemav1.Profile](w,
176168
parquet.CreatedBy("github.com/grafana/pyroscope/", build.Version, build.Revision),
177169
parquet.PageBufferSize(pageBufferSize),
178170
// Note that parquet keeps ALL RG pages in memory (ColumnPageBuffers).
179171
parquet.MaxRowsPerRowGroup(maxRowsPerRowGroup),
180172
schemav1.ProfilesSchema,
181173
// parquet.ColumnPageBuffers(),
182174
),
183-
}, nil
175+
}
184176
}
185177

186178
func (p *profilesWriter) writeRow(e ProfileEntry) error {
@@ -190,14 +182,6 @@ func (p *profilesWriter) writeRow(e ProfileEntry) error {
190182
return err
191183
}
192184

193-
func (p *profilesWriter) Close() error {
194-
err := p.GenericWriter.Close()
195-
if err != nil {
196-
return err
197-
}
198-
return p.file.Close()
199-
}
200-
201185
type readerWithFooter struct {
202186
reader io.ReaderAt
203187
footer []byte

0 commit comments

Comments (0)