diff --git a/pkg/experiment/block/compaction.go b/pkg/experiment/block/compaction.go index 1017083fa1..a15b09fffe 100644 --- a/pkg/experiment/block/compaction.go +++ b/pkg/experiment/block/compaction.go @@ -10,9 +10,7 @@ import ( "strings" "sync" - "github.com/cespare/xxhash/v2" "github.com/grafana/dskit/multierror" - "github.com/oklog/ulid" "github.com/parquet-go/parquet-go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage" @@ -20,7 +18,6 @@ import ( metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" "github.com/grafana/pyroscope/pkg/experiment/block/metadata" - "github.com/grafana/pyroscope/pkg/experiment/metrics" phlaremodel "github.com/grafana/pyroscope/pkg/model" "github.com/grafana/pyroscope/pkg/objstore" "github.com/grafana/pyroscope/pkg/phlaredb/block" @@ -54,11 +51,30 @@ func WithCompactionDestination(storage objstore.Bucket) CompactionOption { } } +func WithSampleObserver(observer SampleObserver) CompactionOption { + return func(p *compactionConfig) { + p.sampleObserver = observer + } +} + type compactionConfig struct { - objectOptions []ObjectOption - tempdir string - source objstore.BucketReader - destination objstore.Bucket + objectOptions []ObjectOption + tempdir string + source objstore.BucketReader + destination objstore.Bucket + sampleObserver SampleObserver +} + +type SampleObserver interface { + // Observe is called before the compactor appends the entry + // to the output block. This method must not modify the entry. + Observe(ProfileEntry) + + // Flush is called before the compactor flushes the output dataset. + // This call invalidates all references (such as symbols) to the source + // and output blocks. Any error returned by the call terminates the + // compaction job: it's caller responsibility to suppress errors. + Flush() error } func Compact( @@ -91,19 +107,11 @@ func Compact( compacted := make([]*metastorev1.BlockMeta, 0, len(plan)) for _, p := range plan { - md, compactionErr := p.Compact(ctx, c.destination, c.tempdir) + md, compactionErr := p.Compact(ctx, c.destination, c.tempdir, c.sampleObserver) if compactionErr != nil { return nil, compactionErr } compacted = append(compacted, md) - - if p.metricsExporter != nil { - go func() { - if sendErr := p.SendRecordedMetrics(); sendErr != nil { - println("ERROR", sendErr) // TODO - } - }() - } } return compacted, nil @@ -160,13 +168,12 @@ func PlanCompaction(objects Objects) ([]*CompactionPlan, error) { } type CompactionPlan struct { - tenant string - path string - datasetMap map[int32]*datasetCompaction - datasets []*datasetCompaction - meta *metastorev1.BlockMeta - strings *metadata.StringTable - metricsExporter *metrics.Exporter + tenant string + path string + datasetMap map[int32]*datasetCompaction + datasets []*datasetCompaction + meta *metastorev1.BlockMeta + strings *metadata.StringTable } func newBlockCompaction( @@ -188,27 +195,30 @@ func newBlockCompaction( Shard: shard, CompactionLevel: compactionLevel, } - if compactionLevel == 1 { - p.metricsExporter = metrics.NewExporter(tenant) - } return p } -func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdir string) (m *metastorev1.BlockMeta, err error) { +func (b *CompactionPlan) Compact( + ctx context.Context, + dst objstore.Bucket, + tmpdir string, + observer SampleObserver, +) (m *metastorev1.BlockMeta, err error) { w := NewBlockWriter(dst, b.path, tmpdir) defer func() { err = multierror.New(err, w.Close()).Err() }() // Datasets are compacted in a strict order. for _, s := range b.datasets { + s.registerSampleObserver(observer) if err = s.compact(ctx, w); err != nil { return nil, fmt.Errorf("compacting block: %w", err) } - if b.metricsExporter != nil { - b.metricsExporter.AppendMetrics(s.metricsRecorder.Recordings) - } b.meta.Datasets = append(b.meta.Datasets, s.meta) } + if err = observer.Flush(); err != nil { + return nil, fmt.Errorf("flushing sample observer: %w", err) + } if err = w.Flush(ctx); err != nil { return nil, fmt.Errorf("flushing block writer: %w", err) } @@ -235,10 +245,6 @@ func (b *CompactionPlan) addDataset(md *metastorev1.BlockMeta, s *metastorev1.Da return sm } -func (c *CompactionPlan) SendRecordedMetrics() error { - return c.metricsExporter.Send() -} - type datasetCompaction struct { // Dataset name. name string @@ -259,7 +265,7 @@ type datasetCompaction struct { flushOnce sync.Once - metricsRecorder *metrics.Recorder + observer SampleObserver } func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompaction { @@ -308,6 +314,10 @@ func (m *datasetCompaction) compact(ctx context.Context, w *Writer) (err error) return nil } +func (m *datasetCompaction) registerSampleObserver(observer SampleObserver) { + m.observer = observer +} + func (m *datasetCompaction) open(ctx context.Context, path string) (err error) { m.path = path defer func() { @@ -333,13 +343,6 @@ func (m *datasetCompaction) open(ctx context.Context, path string) (err error) { m.indexRewriter = newIndexRewriter(m.path) m.symbolsRewriter = newSymbolsRewriter(m.path) - if m.parent.meta.CompactionLevel == 1 { - recordingTime := int64(ulid.MustParse(m.parent.meta.Id).Time()) - rules := metrics.RecordingRulesFromTenant(m.parent.tenant) - pyroscopeInstance := pyroscopeInstanceHash(m.parent.meta.Shard, m.parent.meta.CreatedBy) - m.metricsRecorder = metrics.NewRecorder(rules, recordingTime, pyroscopeInstance) - } - g, ctx := errgroup.WithContext(ctx) for _, s := range m.datasets { s := s @@ -361,13 +364,6 @@ func (m *datasetCompaction) open(ctx context.Context, path string) (err error) { return nil } -func pyroscopeInstanceHash(shard uint32, createdBy int32) string { - buf := make([]byte, 0, 8) - buf = append(buf, byte(shard>>24), byte(shard>>16), byte(shard>>8), byte(shard)) - buf = append(buf, byte(createdBy>>24), byte(createdBy>>16), byte(createdBy>>8), byte(createdBy)) - return fmt.Sprintf("%x", xxhash.Sum64(buf)) -} - func (m *datasetCompaction) mergeAndClose(ctx context.Context) (err error) { defer func() { err = multierror.New(err, m.close()).Err() @@ -404,9 +400,7 @@ func (m *datasetCompaction) writeRow(r ProfileEntry) (err error) { if err = m.symbolsRewriter.rewriteRow(r); err != nil { return err } - if m.metricsRecorder != nil { - m.metricsRecorder.RecordRow(r.Fingerprint, r.Labels, r.Row.TotalValue()) - } + m.observer.Observe(r) return m.profilesWriter.writeRow(r) } diff --git a/pkg/experiment/compactor/compaction_worker.go b/pkg/experiment/compactor/compaction_worker.go index 7579246791..de91d00795 100644 --- a/pkg/experiment/compactor/compaction_worker.go +++ b/pkg/experiment/compactor/compaction_worker.go @@ -25,6 +25,7 @@ import ( metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" "github.com/grafana/pyroscope/pkg/experiment/block" "github.com/grafana/pyroscope/pkg/experiment/block/metadata" + metrics2 "github.com/grafana/pyroscope/pkg/experiment/metrics" "github.com/grafana/pyroscope/pkg/objstore" "github.com/grafana/pyroscope/pkg/util" ) @@ -368,7 +369,9 @@ func (w *Worker) runCompaction(job *compactionJob) { block.WithCompactionObjectOptions( block.WithObjectMaxSizeLoadInMemory(w.config.SmallObjectSize), block.WithObjectDownload(sourcedir), - )) + ), + block.WithSampleObserver(newSampleObserver(job)), + ) switch { case err == nil: @@ -417,6 +420,13 @@ func (w *Worker) runCompaction(job *compactionJob) { _ = deleteGroup.Wait() } +func newSampleObserver(job *compactionJob) block.SampleObserver { + if job.CompactionLevel == 0 { + return metrics2.NewMetricsObserver(job.Tenant, job.blocks[0]) + } + return &metrics2.NoOpObserver{} +} + func (w *Worker) getBlockMetadata(logger log.Logger, job *compactionJob) error { ctx, cancel := context.WithTimeout(job.ctx, w.config.RequestTimeout) defer cancel() diff --git a/pkg/experiment/metrics/observer.go b/pkg/experiment/metrics/observer.go new file mode 100644 index 0000000000..2d1a333dd3 --- /dev/null +++ b/pkg/experiment/metrics/observer.go @@ -0,0 +1,52 @@ +package metrics + +import ( + "fmt" + + "github.com/cespare/xxhash/v2" + "github.com/oklog/ulid" + + metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + "github.com/grafana/pyroscope/pkg/experiment/block" +) + +type MetricsObserver struct { + tenant string + recorder *Recorder +} + +func NewMetricsObserver(tenant string, meta *metastorev1.BlockMeta) *MetricsObserver { + recordingTime := int64(ulid.MustParse(meta.Id).Time()) + rules := recordingRulesFromTenant(tenant) + pyroscopeInstance := pyroscopeInstanceHash(meta.Shard, meta.CreatedBy) + return &MetricsObserver{ + tenant: tenant, + recorder: NewRecorder(rules, recordingTime, pyroscopeInstance), + } +} + +func pyroscopeInstanceHash(shard uint32, createdBy int32) string { + buf := make([]byte, 0, 8) + buf = append(buf, byte(shard>>24), byte(shard>>16), byte(shard>>8), byte(shard)) + buf = append(buf, byte(createdBy>>24), byte(createdBy>>16), byte(createdBy>>8), byte(createdBy)) + return fmt.Sprintf("%x", xxhash.Sum64(buf)) +} + +func (o *MetricsObserver) Observe(row block.ProfileEntry) { + o.recorder.RecordRow(row.Fingerprint, row.Labels, row.Row.TotalValue()) +} + +func (o *MetricsObserver) Flush() error { + exporter := NewExporter(o.tenant) + exporter.AppendMetrics(o.recorder.Recordings) + return exporter.Send() +} + +type NoOpObserver struct{} + +func (o *NoOpObserver) Observe(row block.ProfileEntry) { +} + +func (o *NoOpObserver) Flush() error { + return nil +} diff --git a/pkg/experiment/metrics/recorder.go b/pkg/experiment/metrics/recorder.go index 73d37f5c26..d2e4b41549 100644 --- a/pkg/experiment/metrics/recorder.go +++ b/pkg/experiment/metrics/recorder.go @@ -66,7 +66,6 @@ func NewRecorder(recordingRules []*RecordingRule, recordingTime int64, pyroscope for i, rule := range recordingRules { recordings[i] = &Recording{ rule: *rule, - // fps: make(map[model.Fingerprint]*AggregatedFingerprint), data: make(map[AggregatedFingerprint]*TimeSeries), state: &recordingState{ fp: nil, @@ -111,7 +110,7 @@ func generateExportedLabels(labelsMap map[string]string, rec *Recording, pyrosco Value: rec.rule.metricName, }, labels.Label{ - Name: "__pyroscope_instance__", + Name: "pyroscope_instance", Value: pyroscopeInstance, }, } diff --git a/pkg/experiment/metrics/rules.go b/pkg/experiment/metrics/rules.go index c60831b514..7445c77e95 100644 --- a/pkg/experiment/metrics/rules.go +++ b/pkg/experiment/metrics/rules.go @@ -11,7 +11,7 @@ type RecordingRule struct { keepLabels []string } -func RecordingRulesFromTenant(tenant string) []*RecordingRule { +func recordingRulesFromTenant(tenant string) []*RecordingRule { // TODO return []*RecordingRule{ {