feature flag for metrics exporter #3898

56 changes: 50 additions & 6 deletions pkg/experiment/block/compaction.go
@@ -51,11 +51,39 @@ func WithCompactionDestination(storage objstore.Bucket) CompactionOption {
}
}

// WithSampleObserver sets the observer notified of each profile entry
// appended to the output block during compaction.
func WithSampleObserver(observer SampleObserver) CompactionOption {
return func(p *compactionConfig) {
p.sampleObserver = observer
}
}

type compactionConfig struct {
objectOptions []ObjectOption
tempdir string
source objstore.BucketReader
destination objstore.Bucket
sampleObserver SampleObserver
}

// SampleObserver is notified of every profile entry written during
// compaction and may export derived data before the output is flushed.
type SampleObserver interface {
// Observe is called before the compactor appends the entry
// to the output block. This method must not modify the entry.
Observe(ProfileEntry)

// Flush is called before the compactor flushes the output dataset.
// This call invalidates all references (such as symbols) to the source
// and output blocks. Any error returned by the call terminates the
// compaction job: it is the caller's responsibility to suppress errors.
Flush() error
}

// NoOpObserver is a SampleObserver that does nothing.
type NoOpObserver struct{}

func (o *NoOpObserver) Observe(row ProfileEntry) {
}

func (o *NoOpObserver) Flush() error {
return nil
}

func Compact(
@@ -88,7 +116,7 @@ func Compact(

compacted := make([]*metastorev1.BlockMeta, 0, len(plan))
for _, p := range plan {
md, compactionErr := p.Compact(ctx, c.destination, c.tempdir, c.sampleObserver)
if compactionErr != nil {
return nil, compactionErr
}
@@ -179,18 +207,27 @@ func newBlockCompaction(
return p
}

func (b *CompactionPlan) Compact(
ctx context.Context,
dst objstore.Bucket,
tmpdir string,
observer SampleObserver,
) (m *metastorev1.BlockMeta, err error) {
w := NewBlockWriter(dst, b.path, tmpdir)
defer func() {
err = multierror.New(err, w.Close()).Err()
}()
// Datasets are compacted in a strict order.
for _, s := range b.datasets {
s.registerSampleObserver(observer)
if err = s.compact(ctx, w); err != nil {
return nil, fmt.Errorf("compacting block: %w", err)
}
b.meta.Datasets = append(b.meta.Datasets, s.meta)
}
if err = observer.Flush(); err != nil {
return nil, fmt.Errorf("flushing sample observer: %w", err)
}
if err = w.Flush(ctx); err != nil {
return nil, fmt.Errorf("flushing block writer: %w", err)
}
@@ -236,6 +273,8 @@ type datasetCompaction struct {
profiles uint64

flushOnce sync.Once

observer SampleObserver
}

func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompaction {
@@ -284,6 +323,10 @@ func (m *datasetCompaction) compact(ctx context.Context, w *Writer) (err error)
return nil
}

func (m *datasetCompaction) registerSampleObserver(observer SampleObserver) {
m.observer = observer
}

func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
m.path = path
defer func() {
@@ -366,6 +409,7 @@ func (m *datasetCompaction) writeRow(r ProfileEntry) (err error) {
if err = m.symbolsRewriter.rewriteRow(r); err != nil {
return err
}
m.observer.Observe(r)
return m.profilesWriter.writeRow(r)
}

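To illustrate the SampleObserver contract introduced above, here is a minimal sketch of a custom observer; the sampleCounter type is hypothetical and not part of this change:

package example

import (
	"fmt"

	"github.com/grafana/pyroscope/pkg/experiment/block"
)

// sampleCounter is a hypothetical SampleObserver that counts the profile
// rows it sees during a compaction.
type sampleCounter struct {
	rows uint64
}

// Observe reads the entry and must not modify it, per the contract above.
func (c *sampleCounter) Observe(block.ProfileEntry) {
	c.rows++
}

// Flush runs before the output dataset is flushed. A returned error aborts
// the whole compaction job, so non-fatal errors should be suppressed here.
func (c *sampleCounter) Flush() error {
	fmt.Printf("observed %d rows\n", c.rows)
	c.rows = 0
	return nil
}

An observer like this would be wired in through WithSampleObserver, just as the test below passes NoOpObserver.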
1 change: 1 addition & 0 deletions pkg/experiment/block/compaction_test.go
@@ -32,6 +32,7 @@ func Test_CompactBlocks(t *testing.T) {
WithCompactionObjectOptions(
WithObjectDownload(filepath.Join(tempdir, "source")),
WithObjectMaxSizeLoadInMemory(0)), // Force download.
WithSampleObserver(&NoOpObserver{}),
)

require.NoError(t, err)
24 changes: 18 additions & 6 deletions pkg/experiment/compactor/compaction_worker.go
@@ -25,6 +25,7 @@ import (
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/pkg/experiment/block"
"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
metricsexport "github.com/grafana/pyroscope/pkg/experiment/metrics"
"github.com/grafana/pyroscope/pkg/objstore"
"github.com/grafana/pyroscope/pkg/util"
)
@@ -49,11 +50,12 @@ type Worker struct {
}

type Config struct {
JobConcurrency int `yaml:"job_capacity"`
JobPollInterval time.Duration `yaml:"job_poll_interval"`
SmallObjectSize int `yaml:"small_object_size_bytes"`
TempDir string `yaml:"temp_dir"`
RequestTimeout time.Duration `yaml:"request_timeout"`
MetricsExporterEnabled bool `yaml:"metrics_exporter_enabled"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -63,6 +65,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.RequestTimeout, prefix+"request-timeout", 5*time.Second, "Job request timeout.")
f.IntVar(&cfg.SmallObjectSize, prefix+"small-object-size-bytes", 8<<20, "Size of the object that can be loaded in memory.")
f.StringVar(&cfg.TempDir, prefix+"temp-dir", os.TempDir(), "Temporary directory for compaction jobs.")
f.BoolVar(&cfg.MetricsExporterEnabled, prefix+"metrics-exporter.enabled", false, "Whether the metrics exporter is enabled.")
}

type compactionJob struct {
@@ -399,7 +402,9 @@ func (w *Worker) runCompaction(job *compactionJob) {
block.WithCompactionObjectOptions(
block.WithObjectMaxSizeLoadInMemory(w.config.SmallObjectSize),
block.WithObjectDownload(sourcedir),
),
block.WithSampleObserver(newSampleObserver(w.config.MetricsExporterEnabled, job)),
)
defer func() {
if err = os.RemoveAll(tempdir); err != nil {
level.Warn(logger).Log("msg", "failed to remove compaction directory", "path", tempdir, "err", err)
@@ -458,6 +463,13 @@ func (w *Worker) runCompaction(job *compactionJob) {
_ = deleteGroup.Wait()
}

// newSampleObserver returns a metrics-exporting observer for level-0 jobs
// when the exporter is enabled, and a no-op observer otherwise.
func newSampleObserver(metricsExporterEnabled bool, job *compactionJob) block.SampleObserver {
if metricsExporterEnabled && job.CompactionLevel == 0 {
return metricsexport.NewMetricsExporterSampleObserver(job.Tenant, job.blocks[0])
}
return &block.NoOpObserver{}
}

func (w *Worker) getBlockMetadata(logger log.Logger, job *compactionJob) error {
ctx, cancel := context.WithTimeout(job.ctx, w.config.RequestTimeout)
defer cancel()
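For completeness, a sketch of how the new setting might appear in YAML; the metrics_exporter_enabled key comes from the struct tag above, while the enclosing section name is not shown in this diff and is hypothetical:

# The section name below is hypothetical; only the key is taken from
# the Config struct tag.
compaction_worker:
  metrics_exporter_enabled: true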
109 changes: 109 additions & 0 deletions pkg/experiment/metrics/exporter.go
@@ -0,0 +1,109 @@
package metrics

import (
"context"
"net/url"
"time"

"github.com/gogo/protobuf/proto"
"github.com/klauspost/compress/snappy"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
)

// Exporter accumulates aggregated time series and ships them to a
// Prometheus remote-write endpoint.
type Exporter struct {
config Config
client remote.WriteClient
data map[AggregatedFingerprint]*TimeSeries
}

type Config struct {
url string
username string
password config.Secret
}

func NewExporter(tenant string, recordings []*Recording) *Exporter {
cfg := configFromTenant(tenant)
exporter := &Exporter{
config: cfg,
data: map[AggregatedFingerprint]*TimeSeries{},
}
// Merge all recorded series into a single set keyed by fingerprint.
for _, r := range recordings {
for fp, ts := range r.data {
exporter.data[fp] = ts
}
}
return exporter
}

func (e *Exporter) Send() error {
if len(e.data) == 0 {
return nil
}
if e.client == nil {
e.client = newClient(e.config)
}

// Convert the aggregated series into the remote-write protobuf format.
p := &prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, 0, len(e.data))}
for _, ts := range e.data {
pts := prompb.TimeSeries{
Labels: make([]prompb.Label, 0, len(ts.Labels)),
}
for _, l := range ts.Labels {
pts.Labels = append(pts.Labels, prompb.Label{
Name: l.Name,
Value: l.Value,
})
}
for _, s := range ts.Samples {
pts.Samples = append(pts.Samples, prompb.Sample{
Value: s.Value,
Timestamp: s.Timestamp,
})
}
p.Timeseries = append(p.Timeseries, pts)
}
// Remote write expects a snappy-compressed, protobuf-encoded payload.
buf := proto.NewBuffer(nil)
if err := buf.Marshal(p); err != nil {
return err
}
return e.client.Store(context.Background(), snappy.Encode(nil, buf.Bytes()), 0)
}

func newClient(cfg Config) remote.WriteClient {
wURL, err := url.Parse(cfg.url)
if err != nil {
panic(err)
}

c, err := remote.NewWriteClient("pyroscope-metrics-exporter", &remote.ClientConfig{
URL: &config.URL{URL: wURL},
Timeout: model.Duration(time.Second * 10),
HTTPClientConfig: config.HTTPClientConfig{
BasicAuth: &config.BasicAuth{
Username: cfg.username,
Password: cfg.password,
},
},
SigV4Config: nil,
AzureADConfig: nil,
Headers: nil,
RetryOnRateLimit: false,
})
if err != nil {
panic(err)
}
return c
}

func configFromTenant(tenant string) Config {
// TODO
return Config{
url: "omitted",
username: "omitted",
password: "omitted",
}
}
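
As a test-style sketch of the payload format Send produces (snappy-compressed protobuf), a hypothetical decode helper using the same dependencies as above:

// decode reverses the encoding performed by Exporter.Send: snappy
// decompression followed by protobuf unmarshalling.
func decode(payload []byte) (*prompb.WriteRequest, error) {
	raw, err := snappy.Decode(nil, payload)
	if err != nil {
		return nil, err
	}
	var req prompb.WriteRequest
	if err := proto.Unmarshal(raw, &req); err != nil {
		return nil, err
	}
	return &req, nil
}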
44 changes: 44 additions & 0 deletions pkg/experiment/metrics/observer.go
@@ -0,0 +1,44 @@
package metrics

import (
"fmt"

"github.com/cespare/xxhash/v2"
"github.com/oklog/ulid"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/pkg/experiment/block"
)

// MetricsExporterSampleObserver records per-series sample totals during
// compaction and exports them as Prometheus time series on Flush.
type MetricsExporterSampleObserver struct {
tenant string
recorder *Recorder
}

func NewMetricsExporterSampleObserver(tenant string, meta *metastorev1.BlockMeta) *MetricsExporterSampleObserver {
// The block ULID encodes the block's creation time in milliseconds.
recordingTime := int64(ulid.MustParse(meta.Id).Time())
rules := recordingRulesFromTenant(tenant)
pyroscopeInstance := pyroscopeInstanceHash(meta.Shard, meta.CreatedBy)
return &MetricsExporterSampleObserver{
tenant: tenant,
recorder: NewRecorder(rules, recordingTime, pyroscopeInstance),
}
}

// pyroscopeInstanceHash derives a stable instance identifier by packing
// shard and createdBy big-endian into 8 bytes and hashing them with xxhash.
func pyroscopeInstanceHash(shard uint32, createdBy int32) string {
buf := make([]byte, 0, 8)
buf = append(buf, byte(shard>>24), byte(shard>>16), byte(shard>>8), byte(shard))
buf = append(buf, byte(createdBy>>24), byte(createdBy>>16), byte(createdBy>>8), byte(createdBy))
return fmt.Sprintf("%x", xxhash.Sum64(buf))
}

func (o *MetricsExporterSampleObserver) Observe(row block.ProfileEntry) {
o.recorder.RecordRow(row.Fingerprint, row.Labels, row.Row.TotalValue())
}

func (o *MetricsExporterSampleObserver) Flush() error {
go func() {
NewExporter(o.tenant, o.recorder.Recordings).Send() // TODO log error
}()
return nil
}