From 2bb97adcf58ed9538a6bbf093535cfb3f709e592 Mon Sep 17 00:00:00 2001 From: Alberto Soto Date: Fri, 7 Feb 2025 10:02:30 +0100 Subject: [PATCH] flush async, rename MetricsObserver to SampleObserver, agent name --- pkg/experiment/compactor/compaction_worker.go | 2 +- pkg/experiment/metrics/exporter.go | 15 ++++++++------- pkg/experiment/metrics/observer.go | 17 +++++++++-------- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/pkg/experiment/compactor/compaction_worker.go b/pkg/experiment/compactor/compaction_worker.go index 17c7ea3898..1b69f029be 100644 --- a/pkg/experiment/compactor/compaction_worker.go +++ b/pkg/experiment/compactor/compaction_worker.go @@ -463,7 +463,7 @@ func (w *Worker) runCompaction(job *compactionJob) { func newSampleObserver(job *compactionJob) block.SampleObserver { if job.CompactionLevel == 0 { - return metricsexport.NewMetricsObserver(job.Tenant, job.blocks[0]) + return metricsexport.NewSampleObserver(job.Tenant, job.blocks[0]) } return &block.NoOpObserver{} } diff --git a/pkg/experiment/metrics/exporter.go b/pkg/experiment/metrics/exporter.go index 87be3a4b8d..e2a1cb79b4 100644 --- a/pkg/experiment/metrics/exporter.go +++ b/pkg/experiment/metrics/exporter.go @@ -25,23 +25,24 @@ type Config struct { password config.Secret } -func NewExporter(tenant string) *Exporter { +func NewExporter(tenant string, recordings []*Recording) *Exporter { cfg := configFromTenant(tenant) - return &Exporter{ + exporter := &Exporter{ config: cfg, data: map[AggregatedFingerprint]*TimeSeries{}, } -} - -func (e *Exporter) AppendMetrics(recordings []*Recording) { for _, r := range recordings { for fp, ts := range r.data { - e.data[fp] = ts + exporter.data[fp] = ts } } + return exporter } func (e *Exporter) Send() error { + if len(e.data) == 0 { + return nil + } if e.client == nil { e.client = newClient(e.config) } @@ -78,7 +79,7 @@ func newClient(cfg Config) remote.WriteClient { panic(err) } - c, err := remote.NewWriteClient("exporter", &remote.ClientConfig{ + c, err := remote.NewWriteClient("pyroscope-metrics-exporter", &remote.ClientConfig{ URL: &config.URL{URL: wURL}, Timeout: model.Duration(time.Second * 10), HTTPClientConfig: config.HTTPClientConfig{ diff --git a/pkg/experiment/metrics/observer.go b/pkg/experiment/metrics/observer.go index 100313413c..d28a4a27b3 100644 --- a/pkg/experiment/metrics/observer.go +++ b/pkg/experiment/metrics/observer.go @@ -10,16 +10,16 @@ import ( "github.com/grafana/pyroscope/pkg/experiment/block" ) -type MetricsObserver struct { +type SampleObserver struct { tenant string recorder *Recorder } -func NewMetricsObserver(tenant string, meta *metastorev1.BlockMeta) *MetricsObserver { +func NewSampleObserver(tenant string, meta *metastorev1.BlockMeta) *SampleObserver { recordingTime := int64(ulid.MustParse(meta.Id).Time()) rules := recordingRulesFromTenant(tenant) pyroscopeInstance := pyroscopeInstanceHash(meta.Shard, meta.CreatedBy) - return &MetricsObserver{ + return &SampleObserver{ tenant: tenant, recorder: NewRecorder(rules, recordingTime, pyroscopeInstance), } @@ -32,12 +32,13 @@ func pyroscopeInstanceHash(shard uint32, createdBy int32) string { return fmt.Sprintf("%x", xxhash.Sum64(buf)) } -func (o *MetricsObserver) Observe(row block.ProfileEntry) { +func (o *SampleObserver) Observe(row block.ProfileEntry) { o.recorder.RecordRow(row.Fingerprint, row.Labels, row.Row.TotalValue()) } -func (o *MetricsObserver) Flush() error { - exporter := NewExporter(o.tenant) - exporter.AppendMetrics(o.recorder.Recordings) - return exporter.Send() +func (o *SampleObserver) Flush() error { + go func() { + NewExporter(o.tenant, o.recorder.Recordings).Send() // TODO log error + }() + return nil }