Skip to content

Commit

Permalink
flush async, rename MetricsObserver to SampleObserver, agent name
Browse files Browse the repository at this point in the history
  • Loading branch information
alsoba13 committed Feb 7, 2025
1 parent 56bc260 commit d8db16c
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pkg/experiment/compactor/compaction_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func (w *Worker) runCompaction(job *compactionJob) {

func newSampleObserver(job *compactionJob) block.SampleObserver {
if job.CompactionLevel == 0 {
return metricsexport.NewMetricsObserver(job.Tenant, job.blocks[0])
return metricsexport.NewMetricsExporterSampleObserver(job.Tenant, job.blocks[0])
}
return &block.NoOpObserver{}
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/experiment/metrics/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,24 @@ type Config struct {
password config.Secret
}

func NewExporter(tenant string) *Exporter {
func NewExporter(tenant string, recordings []*Recording) *Exporter {
cfg := configFromTenant(tenant)
return &Exporter{
exporter := &Exporter{
config: cfg,
data: map[AggregatedFingerprint]*TimeSeries{},
}
}

func (e *Exporter) AppendMetrics(recordings []*Recording) {
for _, r := range recordings {
for fp, ts := range r.data {
e.data[fp] = ts
exporter.data[fp] = ts
}
}
return exporter
}

func (e *Exporter) Send() error {
if len(e.data) == 0 {
return nil
}
if e.client == nil {
e.client = newClient(e.config)
}
Expand Down Expand Up @@ -78,7 +79,7 @@ func newClient(cfg Config) remote.WriteClient {
panic(err)
}

c, err := remote.NewWriteClient("exporter", &remote.ClientConfig{
c, err := remote.NewWriteClient("pyroscope-metrics-exporter", &remote.ClientConfig{
URL: &config.URL{URL: wURL},
Timeout: model.Duration(time.Second * 10),
HTTPClientConfig: config.HTTPClientConfig{
Expand Down
17 changes: 9 additions & 8 deletions pkg/experiment/metrics/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ import (
"github.com/grafana/pyroscope/pkg/experiment/block"
)

type MetricsObserver struct {
type MetricsExporterSampleObserver struct {
tenant string
recorder *Recorder
}

func NewMetricsObserver(tenant string, meta *metastorev1.BlockMeta) *MetricsObserver {
func NewMetricsExporterSampleObserver(tenant string, meta *metastorev1.BlockMeta) *MetricsExporterSampleObserver {
recordingTime := int64(ulid.MustParse(meta.Id).Time())
rules := recordingRulesFromTenant(tenant)
pyroscopeInstance := pyroscopeInstanceHash(meta.Shard, meta.CreatedBy)
return &MetricsObserver{
return &MetricsExporterSampleObserver{
tenant: tenant,
recorder: NewRecorder(rules, recordingTime, pyroscopeInstance),
}
Expand All @@ -32,12 +32,13 @@ func pyroscopeInstanceHash(shard uint32, createdBy int32) string {
return fmt.Sprintf("%x", xxhash.Sum64(buf))
}

func (o *MetricsObserver) Observe(row block.ProfileEntry) {
func (o *MetricsExporterSampleObserver) Observe(row block.ProfileEntry) {
o.recorder.RecordRow(row.Fingerprint, row.Labels, row.Row.TotalValue())
}

func (o *MetricsObserver) Flush() error {
exporter := NewExporter(o.tenant)
exporter.AppendMetrics(o.recorder.Recordings)
return exporter.Send()
func (o *MetricsExporterSampleObserver) Flush() error {
go func() {
NewExporter(o.tenant, o.recorder.Recordings).Send() // TODO log error
}()
return nil
}

0 comments on commit d8db16c

Please sign in to comment.