diff --git a/pkg/experiment/block/compaction.go b/pkg/experiment/block/compaction.go
index d5d8d5d069..dd33a75ba5 100644
--- a/pkg/experiment/block/compaction.go
+++ b/pkg/experiment/block/compaction.go
@@ -51,11 +51,39 @@ func WithCompactionDestination(storage objstore.Bucket) CompactionOption {
 	}
 }
 
+func WithSampleObserver(observer SampleObserver) CompactionOption {
+	return func(p *compactionConfig) {
+		p.sampleObserver = observer
+	}
+}
+
 type compactionConfig struct {
-	objectOptions []ObjectOption
-	tempdir       string
-	source        objstore.BucketReader
-	destination   objstore.Bucket
+	objectOptions  []ObjectOption
+	tempdir        string
+	source         objstore.BucketReader
+	destination    objstore.Bucket
+	sampleObserver SampleObserver
+}
+
+type SampleObserver interface {
+	// Observe is called before the compactor appends the entry
+	// to the output block. This method must not modify the entry.
+	Observe(ProfileEntry)
+
+	// Flush is called before the compactor flushes the output dataset.
+	// This call invalidates all references (such as symbols) to the source
+	// and output blocks. Any error returned by the call terminates the
+	// compaction job: it's the caller's responsibility to suppress errors.
+	Flush() error
+}
+
+type NoOpObserver struct{}
+
+func (o *NoOpObserver) Observe(row ProfileEntry) {
+}
+
+func (o *NoOpObserver) Flush() error {
+	return nil
 }
 
 func Compact(
@@ -88,7 +116,7 @@ func Compact(
 
 	compacted := make([]*metastorev1.BlockMeta, 0, len(plan))
 	for _, p := range plan {
-		md, compactionErr := p.Compact(ctx, c.destination, c.tempdir)
+		md, compactionErr := p.Compact(ctx, c.destination, c.tempdir, c.sampleObserver)
 		if compactionErr != nil {
 			return nil, compactionErr
 		}
@@ -179,18 +207,27 @@ func newBlockCompaction(
 	return p
 }
 
-func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdir string) (m *metastorev1.BlockMeta, err error) {
+func (b *CompactionPlan) Compact(
+	ctx context.Context,
+	dst objstore.Bucket,
+	tmpdir string,
+	observer SampleObserver,
+) (m *metastorev1.BlockMeta, err error) {
 	w := NewBlockWriter(dst, b.path, tmpdir)
 	defer func() {
 		err = multierror.New(err, w.Close()).Err()
 	}()
 	// Datasets are compacted in a strict order.
 	for _, s := range b.datasets {
+		s.registerSampleObserver(observer)
 		if err = s.compact(ctx, w); err != nil {
 			return nil, fmt.Errorf("compacting block: %w", err)
 		}
 		b.meta.Datasets = append(b.meta.Datasets, s.meta)
 	}
+	if err = observer.Flush(); err != nil {
+		return nil, fmt.Errorf("flushing sample observer: %w", err)
+	}
 	if err = w.Flush(ctx); err != nil {
 		return nil, fmt.Errorf("flushing block writer: %w", err)
 	}
@@ -236,6 +273,8 @@ type datasetCompaction struct {
 	profiles uint64
 
 	flushOnce sync.Once
+
+	observer SampleObserver
 }
 
 func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompaction {
@@ -284,6 +323,10 @@ func (m *datasetCompaction) compact(ctx context.Context, w *Writer) (err error) {
 	return nil
 }
 
+func (m *datasetCompaction) registerSampleObserver(observer SampleObserver) {
+	m.observer = observer
+}
+
 func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
 	m.path = path
 	defer func() {
@@ -366,6 +409,7 @@ func (m *datasetCompaction) writeRow(r ProfileEntry) (err error) {
 	if err = m.symbolsRewriter.rewriteRow(r); err != nil {
 		return err
 	}
+	m.observer.Observe(r)
	return m.profilesWriter.writeRow(r)
 }
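Note: the SampleObserver contract is easiest to see with a concrete implementation. Below is a minimal sketch, not part of this diff (RowCountObserver is a hypothetical name): Observe runs once per row on the hot write path and must not mutate or retain the entry, and CompactionPlan.Compact calls Flush once per plan, after all datasets are compacted and before the block writer flushes, so a non-nil error aborts the whole job.

    package block

    import "log"

    // RowCountObserver is a hypothetical observer that counts the rows
    // written to the output block.
    type RowCountObserver struct {
        rows uint64
    }

    // Observe only reads the entry; per the interface contract it must
    // not modify it.
    func (o *RowCountObserver) Observe(e ProfileEntry) {
        o.rows++
    }

    // Flush reports the count; returning an error here would terminate
    // the compaction job.
    func (o *RowCountObserver) Flush() error {
        log.Printf("compaction observed %d rows", o.rows)
        o.rows = 0
        return nil
    }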
diff --git a/pkg/experiment/block/compaction_test.go b/pkg/experiment/block/compaction_test.go
index 0176077e40..959982cbd3 100644
--- a/pkg/experiment/block/compaction_test.go
+++ b/pkg/experiment/block/compaction_test.go
@@ -32,6 +32,7 @@ func Test_CompactBlocks(t *testing.T) {
 		WithCompactionObjectOptions(
 			WithObjectDownload(filepath.Join(tempdir, "source")),
 			WithObjectMaxSizeLoadInMemory(0)), // Force download.
+		WithSampleObserver(&NoOpObserver{}),
 	)
 
 	require.NoError(t, err)
diff --git a/pkg/experiment/compactor/compaction_worker.go b/pkg/experiment/compactor/compaction_worker.go
index 05a4398740..aa47fdac8a 100644
--- a/pkg/experiment/compactor/compaction_worker.go
+++ b/pkg/experiment/compactor/compaction_worker.go
@@ -25,6 +25,7 @@ import (
 	metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
 	"github.com/grafana/pyroscope/pkg/experiment/block"
 	"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
+	metricsexport "github.com/grafana/pyroscope/pkg/experiment/metrics"
 	"github.com/grafana/pyroscope/pkg/objstore"
 	"github.com/grafana/pyroscope/pkg/util"
 )
@@ -49,11 +50,12 @@ type Worker struct {
 }
 
 type Config struct {
-	JobConcurrency  int           `yaml:"job_capacity"`
-	JobPollInterval time.Duration `yaml:"job_poll_interval"`
-	SmallObjectSize int           `yaml:"small_object_size_bytes"`
-	TempDir         string        `yaml:"temp_dir"`
-	RequestTimeout  time.Duration `yaml:"request_timeout"`
+	JobConcurrency         int           `yaml:"job_capacity"`
+	JobPollInterval        time.Duration `yaml:"job_poll_interval"`
+	SmallObjectSize        int           `yaml:"small_object_size_bytes"`
+	TempDir                string        `yaml:"temp_dir"`
+	RequestTimeout         time.Duration `yaml:"request_timeout"`
+	MetricsExporterEnabled bool          `yaml:"metrics_exporter_enabled"`
 }
 
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -63,6 +65,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.RequestTimeout, prefix+"request-timeout", 5*time.Second, "Job request timeout.")
 	f.IntVar(&cfg.SmallObjectSize, prefix+"small-object-size-bytes", 8<<20, "Size of the object that can be loaded in memory.")
 	f.StringVar(&cfg.TempDir, prefix+"temp-dir", os.TempDir(), "Temporary directory for compaction jobs.")
+	f.BoolVar(&cfg.MetricsExporterEnabled, prefix+"metrics-exporter.enabled", false, "Enables the metrics exporter.")
 }
 
 type compactionJob struct {
@@ -399,7 +402,9 @@ func (w *Worker) runCompaction(job *compactionJob) {
 		block.WithCompactionObjectOptions(
 			block.WithObjectMaxSizeLoadInMemory(w.config.SmallObjectSize),
 			block.WithObjectDownload(sourcedir),
-		))
+		),
+		block.WithSampleObserver(newSampleObserver(w.config.MetricsExporterEnabled, job)),
+	)
 	defer func() {
 		if err = os.RemoveAll(tempdir); err != nil {
 			level.Warn(logger).Log("msg", "failed to remove compaction directory", "path", tempdir, "err", err)
@@ -458,6 +463,13 @@ func (w *Worker) runCompaction(job *compactionJob) {
 	_ = deleteGroup.Wait()
 }
 
+func newSampleObserver(metricsExporterEnabled bool, job *compactionJob) block.SampleObserver {
+	if metricsExporterEnabled && job.CompactionLevel == 0 {
+		return metricsexport.NewMetricsExporterSampleObserver(job.Tenant, job.blocks[0])
+	}
+	return &block.NoOpObserver{}
+}
+
 func (w *Worker) getBlockMetadata(logger log.Logger, job *compactionJob) error {
 	ctx, cancel := context.WithTimeout(job.ctx, w.config.RequestTimeout)
 	defer cancel()
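Note: newSampleObserver attaches the metrics observer only to level-0 jobs, so each profile row is observed once, when freshly ingested blocks are first compacted, rather than re-counted at every subsequent compaction level. A minimal sketch of exercising the new setting, assuming the worker's flag prefix is "compaction-worker." (the prefix constant is defined elsewhere in this file and is an assumption here):

    package compactor

    import (
        "flag"
        "fmt"
    )

    // exampleParseFlags is a hypothetical snippet showing the new flag;
    // with the default of false, newSampleObserver returns NoOpObserver.
    func exampleParseFlags() {
        fs := flag.NewFlagSet("worker", flag.ContinueOnError)
        var cfg Config
        cfg.RegisterFlags(fs)
        _ = fs.Parse([]string{"-compaction-worker.metrics-exporter.enabled=true"})
        fmt.Println(cfg.MetricsExporterEnabled) // true, given the assumed prefix
    }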
diff --git a/pkg/experiment/metrics/exporter.go b/pkg/experiment/metrics/exporter.go
new file mode 100644
index 0000000000..e2a1cb79b4
--- /dev/null
+++ b/pkg/experiment/metrics/exporter.go
@@ -0,0 +1,109 @@
+package metrics
+
+import (
+	"context"
+	"net/url"
+	"time"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/klauspost/compress/snappy"
+	"github.com/prometheus/common/config"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/prompb"
+	"github.com/prometheus/prometheus/storage/remote"
+)
+
+type Exporter struct {
+	config Config
+	client remote.WriteClient
+	data   map[AggregatedFingerprint]*TimeSeries
+}
+
+type Config struct {
+	url      string
+	username string
+	password config.Secret
+}
+
+func NewExporter(tenant string, recordings []*Recording) *Exporter {
+	cfg := configFromTenant(tenant)
+	exporter := &Exporter{
+		config: cfg,
+		data:   map[AggregatedFingerprint]*TimeSeries{},
+	}
+	for _, r := range recordings {
+		for fp, ts := range r.data {
+			exporter.data[fp] = ts
+		}
+	}
+	return exporter
+}
+
+func (e *Exporter) Send() error {
+	if len(e.data) == 0 {
+		return nil
+	}
+	if e.client == nil {
+		e.client = newClient(e.config)
+	}
+
+	p := &prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, 0, len(e.data))}
+	for _, ts := range e.data {
+		pts := prompb.TimeSeries{
+			Labels: make([]prompb.Label, 0, len(ts.Labels)),
+		}
+		for _, l := range ts.Labels {
+			pts.Labels = append(pts.Labels, prompb.Label{
+				Name:  l.Name,
+				Value: l.Value,
+			})
+		}
+		for _, s := range ts.Samples {
+			pts.Samples = append(pts.Samples, prompb.Sample{
+				Value:     s.Value,
+				Timestamp: s.Timestamp,
+			})
+		}
+		p.Timeseries = append(p.Timeseries, pts)
+	}
+	buf := proto.NewBuffer(nil)
+	if err := buf.Marshal(p); err != nil {
+		return err
+	}
+	return e.client.Store(context.Background(), snappy.Encode(nil, buf.Bytes()), 0)
+}
+
+func newClient(cfg Config) remote.WriteClient {
+	wURL, err := url.Parse(cfg.url)
+	if err != nil {
+		panic(err)
+	}
+
+	c, err := remote.NewWriteClient("pyroscope-metrics-exporter", &remote.ClientConfig{
+		URL:     &config.URL{URL: wURL},
+		Timeout: model.Duration(time.Second * 10),
+		HTTPClientConfig: config.HTTPClientConfig{
+			BasicAuth: &config.BasicAuth{
+				Username: cfg.username,
+				Password: cfg.password,
+			},
+		},
+		SigV4Config:      nil,
+		AzureADConfig:    nil,
+		Headers:          nil,
+		RetryOnRateLimit: false,
+	})
+	if err != nil {
+		panic(err)
+	}
+	return c
+}
+
+func configFromTenant(tenant string) Config {
+	// TODO
+	return Config{
+		url:      "omitted",
+		username: "omitted",
+		password: "omitted",
+	}
+}
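Note: Send marshals the prompb.WriteRequest with gogo proto and snappy-compresses it, which is the payload shape remote.WriteClient.Store receives. A test-style sketch of that round trip, assuming it lives in this package (fakeClient and the test name are hypothetical, not part of this diff):

    package metrics

    import (
        "context"
        "testing"

        "github.com/gogo/protobuf/proto"
        "github.com/klauspost/compress/snappy"
        "github.com/prometheus/prometheus/prompb"
    )

    // fakeClient captures and decodes the payload instead of sending it.
    type fakeClient struct{ got prompb.WriteRequest }

    func (f *fakeClient) Store(_ context.Context, req []byte, _ int) error {
        raw, err := snappy.Decode(nil, req) // Send snappy-encodes the marshaled request
        if err != nil {
            return err
        }
        return proto.Unmarshal(raw, &f.got)
    }

    func (f *fakeClient) Name() string     { return "fake" }
    func (f *fakeClient) Endpoint() string { return "" }

    func TestSendEncodesRemoteWritePayload(t *testing.T) {
        client := &fakeClient{}
        e := &Exporter{
            client: client,
            data: map[AggregatedFingerprint]*TimeSeries{
                1: {Samples: []Sample{{Value: 42, Timestamp: 1700000000000}}},
            },
        }
        if err := e.Send(); err != nil {
            t.Fatal(err)
        }
        if len(client.got.Timeseries) != 1 {
            t.Fatalf("expected 1 series, got %d", len(client.got.Timeseries))
        }
    }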
diff --git a/pkg/experiment/metrics/observer.go b/pkg/experiment/metrics/observer.go
new file mode 100644
index 0000000000..ecfafc7e09
--- /dev/null
+++ b/pkg/experiment/metrics/observer.go
@@ -0,0 +1,44 @@
+package metrics
+
+import (
+	"fmt"
+
+	"github.com/cespare/xxhash/v2"
+	"github.com/oklog/ulid"
+
+	metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
+	"github.com/grafana/pyroscope/pkg/experiment/block"
+)
+
+type MetricsExporterSampleObserver struct {
+	tenant   string
+	recorder *Recorder
+}
+
+func NewMetricsExporterSampleObserver(tenant string, meta *metastorev1.BlockMeta) *MetricsExporterSampleObserver {
+	recordingTime := int64(ulid.MustParse(meta.Id).Time())
+	rules := recordingRulesFromTenant(tenant)
+	pyroscopeInstance := pyroscopeInstanceHash(meta.Shard, meta.CreatedBy)
+	return &MetricsExporterSampleObserver{
+		tenant:   tenant,
+		recorder: NewRecorder(rules, recordingTime, pyroscopeInstance),
+	}
+}
+
+func pyroscopeInstanceHash(shard uint32, createdBy int32) string {
+	buf := make([]byte, 0, 8)
+	buf = append(buf, byte(shard>>24), byte(shard>>16), byte(shard>>8), byte(shard))
+	buf = append(buf, byte(createdBy>>24), byte(createdBy>>16), byte(createdBy>>8), byte(createdBy))
+	return fmt.Sprintf("%x", xxhash.Sum64(buf))
+}
+
+func (o *MetricsExporterSampleObserver) Observe(row block.ProfileEntry) {
+	o.recorder.RecordRow(row.Fingerprint, row.Labels, row.Row.TotalValue())
+}
+
+func (o *MetricsExporterSampleObserver) Flush() error {
+	go func() {
+		NewExporter(o.tenant, o.recorder.Recordings).Send() // TODO log error
+	}()
+	return nil
+}
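Note: pyroscopeInstanceHash packs shard (uint32) and createdBy (int32) big-endian into 8 bytes and hashes them with xxhash. An equivalent formulation of the same byte layout using encoding/binary, shown only as a sketch (pyroscopeInstanceHashAlt is a hypothetical name):

    package metrics

    import (
        "encoding/binary"
        "fmt"

        "github.com/cespare/xxhash/v2"
    )

    // pyroscopeInstanceHashAlt produces the same value as the hand-rolled
    // shifts above: 4 big-endian bytes of shard, then 4 of createdBy.
    func pyroscopeInstanceHashAlt(shard uint32, createdBy int32) string {
        buf := make([]byte, 8)
        binary.BigEndian.PutUint32(buf[0:4], shard)
        binary.BigEndian.PutUint32(buf[4:8], uint32(createdBy))
        return fmt.Sprintf("%x", xxhash.Sum64(buf))
    }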
diff --git a/pkg/experiment/metrics/recorder.go b/pkg/experiment/metrics/recorder.go
new file mode 100644
index 0000000000..d2e4b41549
--- /dev/null
+++ b/pkg/experiment/metrics/recorder.go
@@ -0,0 +1,147 @@
+package metrics
+
+import (
+	"sort"
+
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/labels"
+
+	phlaremodel "github.com/grafana/pyroscope/pkg/model"
+)
+
+type Recorder struct {
+	Recordings        []*Recording
+	recordingTime     int64
+	pyroscopeInstance string
+}
+
+type Recording struct {
+	rule  RecordingRule
+	data  map[AggregatedFingerprint]*TimeSeries
+	state *recordingState
+}
+
+type recordingState struct {
+	fp         *model.Fingerprint
+	matches    bool
+	timeSeries *TimeSeries
+}
+
+func (r *Recording) InitState(fp model.Fingerprint, lbls phlaremodel.Labels, pyroscopeInstance string, recordingTime int64) {
+	r.state.fp = &fp
+	labelsMap := map[string]string{}
+	for _, label := range lbls {
+		labelsMap[label.Name] = label.Value
+	}
+	r.state.matches = r.matches(labelsMap)
+	if !r.state.matches {
+		return
+	}
+
+	exportedLabels := generateExportedLabels(labelsMap, r, pyroscopeInstance)
+	sort.Sort(exportedLabels)
+	aggregatedFp := AggregatedFingerprint(exportedLabels.Hash())
+	timeSeries, ok := r.data[aggregatedFp]
+	if !ok {
+		timeSeries = newTimeSeries(exportedLabels, recordingTime)
+		r.data[aggregatedFp] = timeSeries
+	}
+	r.state.timeSeries = timeSeries
+}
+
+type AggregatedFingerprint model.Fingerprint
+
+type TimeSeries struct {
+	Labels  labels.Labels
+	Samples []Sample
+}
+
+type Sample struct {
+	Value     float64
+	Timestamp int64
+}
+
+func NewRecorder(recordingRules []*RecordingRule, recordingTime int64, pyroscopeInstance string) *Recorder {
+	recordings := make([]*Recording, len(recordingRules))
+	for i, rule := range recordingRules {
+		recordings[i] = &Recording{
+			rule: *rule,
+			data: make(map[AggregatedFingerprint]*TimeSeries),
+			state: &recordingState{
+				fp: nil,
+			},
+		}
+	}
+	return &Recorder{
+		Recordings:        recordings,
+		recordingTime:     recordingTime,
+		pyroscopeInstance: pyroscopeInstance,
+	}
+}
+
+func (r *Recorder) RecordRow(fp model.Fingerprint, lbls phlaremodel.Labels, totalValue int64) {
+	for _, recording := range r.Recordings {
+		if recording.state.fp == nil || *recording.state.fp != fp {
+			recording.InitState(fp, lbls, r.pyroscopeInstance, r.recordingTime)
+		}
+		if !recording.state.matches {
+			continue
+		}
+		recording.state.timeSeries.Samples[0].Value += float64(totalValue)
+	}
+}
+
+func newTimeSeries(exportedLabels labels.Labels, time int64) *TimeSeries {
+	return &TimeSeries{
+		Labels: exportedLabels,
+		Samples: []Sample{
+			{
+				Value:     float64(0),
+				Timestamp: time,
+			},
+		},
+	}
+}
+
+func generateExportedLabels(labelsMap map[string]string, rec *Recording, pyroscopeInstance string) labels.Labels {
+	exportedLabels := labels.Labels{
+		labels.Label{
+			Name:  "__name__",
+			Value: rec.rule.metricName,
+		},
+		labels.Label{
+			Name:  "pyroscope_instance",
+			Value: pyroscopeInstance,
+		},
+	}
+	// Add filters as exported labels
+	for _, matcher := range rec.rule.matchers {
+		exportedLabels = append(exportedLabels, labels.Label{
+			Name:  matcher.Name,
+			Value: matcher.Value,
+		})
+	}
+	// Keep the expected labels
+	for _, label := range rec.rule.keepLabels {
+		labelValue, ok := labelsMap[label]
+		if ok {
+			exportedLabels = append(exportedLabels, labels.Label{
+				Name:  label,
+				Value: labelValue,
+			})
+		}
+	}
+	return exportedLabels
+}
+
+func (r *Recording) matches(labelsMap map[string]string) bool {
+	if r.rule.profileType != labelsMap["__profile_type__"] {
+		return false
+	}
+	for _, matcher := range r.rule.matchers {
+		if !matcher.Matches(labelsMap[matcher.Name]) {
+			return false
+		}
+	}
+	return true
+}
diff --git a/pkg/experiment/metrics/rules.go b/pkg/experiment/metrics/rules.go
new file mode 100644
index 0000000000..7445c77e95
--- /dev/null
+++ b/pkg/experiment/metrics/rules.go
@@ -0,0 +1,52 @@
+package metrics
+
+import (
+	"github.com/prometheus/prometheus/model/labels"
+)
+
+type RecordingRule struct {
+	profileType string
+	metricName  string
+	matchers    []*labels.Matcher
+	keepLabels  []string
+}
+
+func recordingRulesFromTenant(tenant string) []*RecordingRule {
+	// TODO
+	return []*RecordingRule{
+		{
+			profileType: "process_cpu:samples:count:cpu:nanoseconds",
+			metricName:  "ride_sharing_app_car_cpu_nanoseconds",
+			matchers: []*labels.Matcher{
+				{
+					Type:  labels.MatchEqual,
+					Name:  "service_name",
+					Value: "ride-sharing-app",
+				},
+				{
+					Type:  labels.MatchEqual,
+					Name:  "vehicle",
+					Value: "car",
+				},
+			},
+			keepLabels: []string{"region"},
+		},
+		{
+			profileType: "process_cpu:samples:count:cpu:nanoseconds",
+			metricName:  "ride_sharing_app_car_all_regions_cpu_nanoseconds",
+			matchers: []*labels.Matcher{
+				{
+					Type:  labels.MatchEqual,
+					Name:  "service_name",
+					Value: "ride-sharing-app",
+				},
+				{
+					Type:  labels.MatchEqual,
+					Name:  "vehicle",
+					Value: "car",
+				},
+			},
+			keepLabels: []string{},
+		},
+	}
+}
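Note: a sketch of how the recorder aggregates rows under the hard-coded rules above, assuming phlaremodel.LabelsFromStrings is available as in other Pyroscope tests (the function below and its values are illustrative only): two rows with the same fingerprint run InitState once, and both totals land in the same sample of each matching recording.

    package metrics

    import (
        "fmt"

        "github.com/prometheus/common/model"

        phlaremodel "github.com/grafana/pyroscope/pkg/model"
    )

    func recorderSketch() {
        rec := NewRecorder(recordingRulesFromTenant("tenant-a"), 1700000000000, "instance-hash")
        lbls := phlaremodel.LabelsFromStrings(
            "__profile_type__", "process_cpu:samples:count:cpu:nanoseconds",
            "service_name", "ride-sharing-app",
            "vehicle", "car",
            "region", "eu-north",
        )
        // Same fingerprint twice: the cached recording state is reused.
        rec.RecordRow(model.Fingerprint(123), lbls, 100)
        rec.RecordRow(model.Fingerprint(123), lbls, 50)
        for _, ts := range rec.Recordings[0].data {
            fmt.Println(ts.Labels, ts.Samples[0].Value) // one series, value 150
        }
    }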
diff --git a/pkg/phlaredb/schemas/v1/profiles.go b/pkg/phlaredb/schemas/v1/profiles.go
index d03f78f6fc..e831e20da8 100644
--- a/pkg/phlaredb/schemas/v1/profiles.go
+++ b/pkg/phlaredb/schemas/v1/profiles.go
@@ -74,6 +74,7 @@ var (
 	valueColIndex               int
 	timeNanoColIndex            int
 	stacktracePartitionColIndex int
+	totalValueColIndex          int
 
 	downsampledValueColIndex int
 
@@ -110,6 +111,11 @@ func init() {
 		panic(fmt.Errorf("StacktracePartition column not found"))
 	}
 	stacktracePartitionColIndex = stacktracePartitionCol.ColumnIndex
+	totalValueCol, ok := ProfilesSchema.Lookup(TotalValueColumnName)
+	if !ok {
+		panic(fmt.Errorf("TotalValue column not found"))
+	}
+	totalValueColIndex = totalValueCol.ColumnIndex
 
 	downsampledValueCol, ok := DownsampledProfilesSchema.Lookup(SampleValueColumnPath...)
 	if !ok {
@@ -669,6 +675,8 @@ func (p ProfileRow) StacktracePartitionID() uint64 {
 	return p[stacktracePartitionColIndex].Uint64()
 }
 
+func (p ProfileRow) TotalValue() int64 { return p[totalValueColIndex].Int64() }
+
 func (p ProfileRow) TimeNanos() int64 {
 	var ts int64
 	for i := len(p) - 1; i >= 0; i-- {
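Note: a small usage sketch for the new accessor, mirroring the existing ProfileRow helpers (sumDataset is hypothetical): given rows that follow ProfilesSchema, TotalValue reads the value the compactor's sample observer feeds into the metrics recorder, via the column index resolved in init.

    package main

    import (
        "fmt"

        v1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
    )

    // sumDataset adds up the per-profile totals of a slice of rows.
    func sumDataset(rows []v1.ProfileRow) int64 {
        var total int64
        for _, r := range rows {
            total += r.TotalValue()
        }
        return total
    }

    func main() {
        fmt.Println(sumDataset(nil)) // 0 for an empty dataset
    }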