
feat: record metrics from rules and export to remote #3861

Open · alsoba13 wants to merge 12 commits into main from alsoba13/metrics-from-profiles-record-and-export
Conversation

alsoba13 (Contributor)

In this PR, we introduce a first version of the metrics recorder and metrics exporter.

Every level 1 compaction job will record metrics from profiles in the form of time series. The recording will follow some recording rules given by config or an external service (for now, this is hardcoded to a single recording rule). The recorded metrics are exported to a remote after the compaction process.

Generated metrics are aggregations of total values for a given dimension (profile type). The aggregation process is explained below:

  • Given a recording rule with a profile type T, a filter F (a set of key-value pairs) and a set of labels E to export:
  • Every profile seen during the compaction that matches T and F is considered for the aggregation.
  • To aggregate, profiles are grouped by E, resulting in multiple time series.
  • Every time series will have a single sample with time = blockTime and value equal to the sum of all totalValues that match (T, F, E).
  • Hence, as we add up all totalValues that fulfill the conditions, we are conceptually aggregating over time (we discard the original profile timestamp and use the block time), resulting in a single sample per series per compaction job.

Example:

Let's consider the following profiles present in some blocks being compacted

| profile | profile type | labels | totalValue | stacktraces (ignored) | timestamp (ignored) |
|---|---|---|---|---|---|
| 1 | memory alloc_space | {service_name="worker", job="batch_compress", region="eu"} | 10 | ... | ... |
| 2 | cpu samples | {service_name="worker", job="batch_compress", region="eu"} | 20 | ... | ... |
| 3 | cpu samples | {service_name="API", region="eu"} | 1 | ... | ... |
| 4 | cpu samples | {service_name="worker", job="batch_compress", region="ap"} | 30 | ... | ... |
| 5 | cpu samples | {service_name="worker", job="batch_compress", region="us"} | 40 | ... | ... |
| 6 | cpu samples | {service_name="worker", job="batch_compress", region="eu"} | 100 | ... | ... |

And the following recording rule:
Name = "cpu_usage_compress_workers"
T = cpu samples
F = {service_name="worker", job="batch_compress"}
E = "region"

This will result in the following exported series and samples.
{__name__="cpu_usage_compress_workers", service_name="worker", job="batch_compress", region="eu"} = (t, 120)
{__name__="cpu_usage_compress_workers", service_name="worker", job="batch_compress", region="ap"} = (t, 30)
{__name__="cpu_usage_compress_workers", service_name="worker", job="batch_compress", region="us"} = (t, 40)

Note that Profile 1 was discarded by profile type, Profiles 2 and 6 were aggregated into the "eu" series, and Profile 3 was discarded by filter. For all three exported samples, t = blockTime.
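To make the rule semantics concrete, here is a minimal, self-contained sketch of the aggregation described above. All type and function names (Profile, RecordingRule, aggregate, ...) are illustrative and do not mirror the PR's actual structs:

package main

import (
    "fmt"
    "sort"
    "strings"
)

// Illustrative types only; they approximate, not reproduce, the PR's code.
type Profile struct {
    Type       string
    Labels     map[string]string
    TotalValue int64
}

type Sample struct {
    Timestamp int64
    Value     int64
}

type RecordingRule struct {
    Name        string
    ProfileType string            // T
    Filter      map[string]string // F
    GroupBy     []string          // E
}

func matches(labels, filter map[string]string) bool {
    for k, v := range filter {
        if labels[k] != v {
            return false
        }
    }
    return true
}

func groupKey(labels map[string]string, groupBy []string) string {
    parts := make([]string, 0, len(groupBy))
    for _, name := range groupBy {
        parts = append(parts, name+"="+labels[name])
    }
    sort.Strings(parts)
    return strings.Join(parts, ",")
}

// aggregate produces one sample per group-by label set, timestamped with the block time.
func aggregate(rule RecordingRule, profiles []Profile, blockTime int64) map[string]Sample {
    out := make(map[string]Sample)
    for _, p := range profiles {
        if p.Type != rule.ProfileType || !matches(p.Labels, rule.Filter) {
            continue // discarded by profile type (T) or by filter (F)
        }
        key := groupKey(p.Labels, rule.GroupBy)
        s := out[key]
        s.Timestamp = blockTime // original profile timestamps are ignored
        s.Value += p.TotalValue // sum all totalValues in the group
        out[key] = s
    }
    return out
}

func main() {
    rule := RecordingRule{
        Name:        "cpu_usage_compress_workers",
        ProfileType: "cpu samples",
        Filter:      map[string]string{"service_name": "worker", "job": "batch_compress"},
        GroupBy:     []string{"region"},
    }
    profiles := []Profile{
        {Type: "memory alloc_space", Labels: map[string]string{"service_name": "worker", "job": "batch_compress", "region": "eu"}, TotalValue: 10},
        {Type: "cpu samples", Labels: map[string]string{"service_name": "worker", "job": "batch_compress", "region": "eu"}, TotalValue: 20},
        {Type: "cpu samples", Labels: map[string]string{"service_name": "API", "region": "eu"}, TotalValue: 1},
        {Type: "cpu samples", Labels: map[string]string{"service_name": "worker", "job": "batch_compress", "region": "ap"}, TotalValue: 30},
        {Type: "cpu samples", Labels: map[string]string{"service_name": "worker", "job": "batch_compress", "region": "us"}, TotalValue: 40},
        {Type: "cpu samples", Labels: map[string]string{"service_name": "worker", "job": "batch_compress", "region": "eu"}, TotalValue: 100},
    }
    // Prints region=eu -> 120, region=ap -> 30, region=us -> 40, all at blockTime.
    for key, sample := range aggregate(rule, profiles, 1737470000000) {
        fmt.Println(key, sample)
    }
}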

Given the distributed architecture and concurrent nature of compactors, and the chosen timestamp for samples, time collisions may happen. For that reason, an extra __pyroscope_instance__ label has been added, so that two compaction jobs may write to Prometheus without causing overwrites. This instance id is computed from a worker id and a shard id.

Next steps:

  • Get the export config programmatically so every metric is exported to the expected datasource (tenant-wise)
  • Read rules from external service (tenant-settings?) and config.
  • Error handling: the lack of error handling is evident. There's a lot of room for improvement here, but we should strive not to interfere with compaction and weigh retries against metric loss.

Out of scope right now:

  • functions/stacktraces processing

@alsoba13 alsoba13 requested a review from a team as a code owner January 21, 2025 14:52
@alsoba13 alsoba13 force-pushed the alsoba13/metrics-from-profiles-record-and-export branch from be2d95a to c90f289 Compare January 21, 2025 14:57
@alsoba13 alsoba13 marked this pull request as draft January 21, 2025 22:02
@kolesnikovae (Collaborator) left a comment:

Good work, Alberto! 🚀

I'd like to discuss the queries we aim to answer. Have you analyzed how the exported metrics will be used? Just some example use cases would help.

@alsoba13 alsoba13 force-pushed the alsoba13/metrics-from-profiles-record-and-export branch 3 times, most recently from 7d1e59b to 1700a83 Compare January 22, 2025 09:42
@alsoba13 alsoba13 marked this pull request as ready for review January 22, 2025 13:58
w := NewBlockWriter(dst, b.path, tmpdir)
defer func() {
    err = multierror.New(err, w.Close()).Err()
}()
// Datasets are compacted in a strict order.
for _, s := range b.datasets {
    s.registerSampleObserver(observer)
alsoba13 (Contributor, author):

At this point I think it's better to register the observer in the dataset instead of passing it through the long call chain compact > mergeAndClose > merge > writeRow.

@alsoba13 alsoba13 force-pushed the alsoba13/metrics-from-profiles-record-and-export branch from e1221a0 to ff45b50 Compare January 27, 2025 09:45
@alsoba13 alsoba13 force-pushed the alsoba13/metrics-from-profiles-record-and-export branch from ff45b50 to 56bc260 Compare January 27, 2025 10:43
@alsoba13 alsoba13 force-pushed the alsoba13/metrics-from-profiles-record-and-export branch from 2bb97ad to d8db16c Compare February 7, 2025 09:06
Comment on lines +103 to +108
// TODO
return Config{
    url:      "omitted",
    username: "omitted",
    password: "omitted",
}
Contributor:

When this is hardcoded, how can this be used?

Eventually you could read from environment variables until this is figured out.
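For reference, a minimal sketch of the env-var approach; the variable names are made up and the Config fields are the ones from the snippet above:

import (
    "errors"
    "os"
)

// Hypothetical environment variables; names are illustrative only.
func configFromEnv() (Config, error) {
    cfg := Config{
        url:      os.Getenv("PYROSCOPE_METRICS_EXPORTER_URL"),
        username: os.Getenv("PYROSCOPE_METRICS_EXPORTER_USERNAME"),
        password: os.Getenv("PYROSCOPE_METRICS_EXPORTER_PASSWORD"),
    }
    if cfg.url == "" {
        return Config{}, errors.New("metrics exporter remote write URL is not set")
    }
    return cfg, nil
}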

alsoba13 (Contributor, author):

I will inject it by config/env vars in a following PR.

alsoba13 (Contributor, author):

Talked in Slack; I'll inject them via env vars in this PR.

}

func recordingRulesFromTenant(tenant string) []*RecordingRule {
// TODO
Contributor:

Even though you want to hardcode and test it, I would prefer those coming from e.g. an environment variable or a file (parsed as YAML), so we can change them more quickly than by recompiling a new version.
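A hedged sketch of what file-based rules could look like; the YAML schema and field names here are assumptions, not the PR's final rule model:

import (
    "os"

    "gopkg.in/yaml.v3"
)

// Hypothetical schema; only illustrates the "file parsed as YAML" suggestion.
type RecordingRule struct {
    Name         string            `yaml:"name"`
    ProfileType  string            `yaml:"profile_type"`
    Filter       map[string]string `yaml:"filter"`
    ExportLabels []string          `yaml:"export_labels"`
}

func recordingRulesFromFile(path string) ([]*RecordingRule, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    var rules []*RecordingRule
    if err := yaml.Unmarshal(data, &rules); err != nil {
        return nil, err
    }
    return rules, nil
}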

alsoba13 (Contributor, author):

This part of the code will be deleted at #3874 (although it hasn't been removed from the draft yet). On the other hand, supporting static recording rules was discarded. We may implement it later.

alsoba13 (Contributor, author):

Talked in Slack; I'll merge #3874 first.

Comment on lines 229 to +236
    if err = s.compact(ctx, w); err != nil {
        return nil, fmt.Errorf("compacting block: %w", err)
    }
    b.meta.Datasets = append(b.meta.Datasets, s.meta)
}
if err = observer.Flush(); err != nil {
    return nil, fmt.Errorf("flushing sample observer: %w", err)
}
Collaborator:

  1. I see no reason why we want to fail a compaction job because of metric export errors.
  2. We may want to skip the entire dataset if it cannot possibly match any rule.
  3. I'm not sure why we're flushing after all the datasets have been processed. My idea was to flush the observer when the dataset is closed in compact: as we release resources referenced by ProfileEntry, the observer must not access them after the Flush call.

alsoba13 (Contributor, author), Feb 11, 2025:

  1. Yes, I forgot to remove this. Since for this implementation the error is always nil (async), I'll just ignore it.
  2. It will be hard to implement from the block package with such a simple API (observe and flush); what do you really mean? Do you mean implementing it behind the scenes in metrics.SampleObserver?
  3. My idea was to minimize network traffic here: record them all and send just once per rule/tenant. I see your point about not handling data after finishing the dataset; I'll have a closer look.

Collaborator:

> We may want to skip the entire dataset if it cannot possibly match any rule.

> It will be hard to implement from the block package with such a simple API (observe and flush); what do you really mean? Do you mean implementing it behind the scenes in metrics.SampleObserver?

It's more of a TODO that came to mind while I was reading this. Let's revisit it later if we encounter performance issues here – SampleObserver has access to the dataset the entry comes from and its TSDB index, which should be sufficient.

The thing is that we perform R * S evaluations, where S is the number of series, which might be quite large (10K+ per job), and R is the number of rules, which is not limited (another concern) and might be a matter of dozens or hundreds for a large tenant. We can transform it into a more predictable R * S', where S' is the ~constant number of input segments, but the evaluation (query) might be more expensive.

> My idea was to minimize network traffic here: record them all and send just once per rule/tenant. I see your point about not handling data after finishing the dataset; I'll have a closer look.

SampleObserver.Flush does not have to result in a network call. It's the Exporter's responsibility to do that.

Comment on lines 49 to 64
func (o *MetricsExporterSampleObserver) Flush() error {
    rec := o.recorder
    o.recorder = nil
    go func(tenant string, recorder *Recorder) {
        exporter, err := NewExporter(tenant, recorder.Recordings)
        if err != nil {
            level.Error(o.logger).Log("msg", "error creating metrics exporter", "err", err)
            return
        }

        if err = exporter.Send(); err != nil {
            level.Error(o.logger).Log("msg", "error sending recording metrics", "err", err)
        }
    }(o.tenant, rec)
    return nil
}
Collaborator:

I think the Exporter should really be a dependency of the MetricsExporterSampleObserver. Also, please consider alternative names. For example, metrics.SampleObserver would say everything we need to know. Moreover, please consider merging MetricsExporterSampleObserver and Recorder – as far as I can see, there's no reason to separate their responsibilities.

I would suggest implementing a simple channel/queue that is handled in the background (e.g., per tenant, or tenant/rule). Keep in mind that we want to flush all samples on shutdown.
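A rough sketch of that background queue idea, with a blocking drain on shutdown; all names here (queuedExporter, batch, sendBatch) are hypothetical and not part of the PR, and prompb is the import already used in the exporter:

// Hypothetical background queue; errors are handled here, not surfaced to compaction.
type batch struct {
    tenant string
    series []prompb.TimeSeries
}

type queuedExporter struct {
    queue chan batch
    done  chan struct{}
}

func newQueuedExporter(size int) *queuedExporter {
    e := &queuedExporter{queue: make(chan batch, size), done: make(chan struct{})}
    go e.run()
    return e
}

func (e *queuedExporter) run() {
    defer close(e.done)
    for b := range e.queue {
        if err := e.sendBatch(b); err != nil {
            // log and/or retry here; never fail the compaction job
        }
    }
}

// Enqueue is what the observer's Flush could call instead of spawning goroutines itself.
func (e *queuedExporter) Enqueue(tenant string, series []prompb.TimeSeries) {
    e.queue <- batch{tenant: tenant, series: series}
}

// Shutdown drains the queue and blocks until every staged sample has been sent.
func (e *queuedExporter) Shutdown() {
    close(e.queue)
    <-e.done
}

func (e *queuedExporter) sendBatch(b batch) error {
    // Marshal to a remote-write request and send, as the Exporter does below.
    return nil
}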

alsoba13 (Contributor, author):

I did it all in b0f8c17.

Please note:

  • Exporter being a dependency of the observer means creating it at observer creation time. Keep in mind that a single Observer will export to multiple tenants' datasources, so the client can't be created there. As a result, Exporter becomes an empty struct (I preserved it so we can discuss further and for future extensions). Data is now injected at Send, as you requested in another comment.

I'll create an issue for the channel/queue part; it makes complete sense.

Collaborator:

> Exporter being a dependency of the observer means creating it at observer creation time.

Not quite. See #3861 (comment)

Comment on lines 12 to 30
type Recorder struct {
    Recordings        []*Recording
    recordingTime     int64
    pyroscopeInstance string
}

type Recording struct {
    rule  RecordingRule
    data  map[AggregatedFingerprint]*TimeSeries
    state *recordingState
}

type recordingState struct {
    fp         *model.Fingerprint
    matches    bool
    timeSeries *TimeSeries
}

func (r *Recording) InitState(fp model.Fingerprint, lbls phlaremodel.Labels, pyroscopeInstance string, recordingTime int64) {
Collaborator:

For me it's not clear why this is separate from MetricsExporterSampleObserver, given that the latter is practically a wrapper for Recorder – you can simply reset its state on MetricsExporterSampleObserver.Init

alsoba13 (Contributor, author), Feb 11, 2025:

I agree on the part about Recorder being merged into the Observer. I already changed that in b0f8c17.

But I'm not sure I understand your last sentence. Resetting the state here is important because the Recording object has its own state that helps aggregate series. We don't call InitState just once, as with SampleObserver.Init. When we find a sample that hits the filters, we store a state so we can quickly aggregate rows of the same series. InitState here prepares the state to fast-append the following rows.
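To illustrate the fast-append idea, a hedged sketch built on the Recorder/Recording/recordingState types quoted above; the PR's actual bookkeeping may differ:

// Sketch only: reuses the types shown in the snippet above.
func (r *Recorder) RecordRow(fp model.Fingerprint, lbls phlaremodel.Labels, totalValue int64) {
    for _, rec := range r.Recordings {
        // A new fingerprint means a new input series: re-evaluate the filter and
        // point the state at the right aggregated time series.
        if rec.state.fp == nil || *rec.state.fp != fp {
            rec.InitState(fp, lbls, r.pyroscopeInstance, r.recordingTime)
        }
        if !rec.state.matches {
            continue // this series does not match the rule's filter
        }
        // Fast path: consecutive rows of the same series only need this addition.
        rec.state.timeSeries.Value += float64(totalValue)
    }
}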

Comment on lines 23 to 31
func NewMetricsExporterSampleObserver(meta *metastorev1.BlockMeta, logger log.Logger) *MetricsExporterSampleObserver {
    recordingTime := int64(ulid.MustParse(meta.Id).Time())
    pyroscopeInstance := pyroscopeInstanceHash(meta.Shard, meta.CreatedBy)
    return &MetricsExporterSampleObserver{
        recordingTime:     recordingTime,
        pyroscopeInstance: pyroscopeInstance,
        logger:            logger,
    }
}
Collaborator:

The entire thing with pyroscopeInstance label can be offloaded from this component. From its perspective, it is just an extra label it will be adding.

alsoba13 (Contributor, author), Feb 11, 2025:

> from this component

Clarification request: do you mean removing the pyroscopeInstance addition from the compaction-worker? Do you mean creating the pyroscopeInstance label on the write path for every profile and including it in the export step?

Collaborator:

From the observer. See #3861 (comment)

Comment on lines 40 to 47
func (o *MetricsExporterSampleObserver) Init(tenant string) {
    o.tenant = tenant
    o.recorder = NewRecorder(recordingRulesFromTenant(tenant), o.recordingTime, o.pyroscopeInstance)
}

func (o *MetricsExporterSampleObserver) Observe(row block.ProfileEntry) {
    o.recorder.RecordRow(row.Fingerprint, row.Labels, row.Row.TotalValue())
}
Collaborator:

Just a note: explicit Init is not required, tbh: block.ProfileEntry has a reference to the dataset the row comes from, including its metadata.

alsoba13 (Contributor, author), Feb 11, 2025:

Yes, I considered that. But it means harsh trade-offs:

  • First, block.Dataset.Meta() won't have access to the tenant name, only the tenant string index. Not sure if I'm missing something, but I couldn't extract it from there. I could also extend block.Dataset; I guess it's not a big deal.
  • On the other hand, in the current implementation, not providing a tenant would give Observe more responsibilities than I would find convenient, i.e. fetching recording rules for that specific tenant. Note that there's a single Observer for all the tenants. Am I missing something? Were you imagining pre-fetching all the rules for all the tenants present in the source blocks at SampleObserver creation time? In fact, I'm not sure that's possible: does L0 block meta have some hint about the tenants in that block (before reading from object storage)?

@kolesnikovae (Collaborator):

I think before merging the PR we should:

  1. Define the model of our rules. See the comment.
  2. Improve composition of components and clarify their lifecycle.
    • Usually, components that have side effects (e.g., external calls) are injected as dependencies and are not created in-place (like exporter).
    • We should not abandon goroutines (like in exporter)
  3. Implement component that manages recording rules.
    • We probably don't want to fetch them for each compaction job.
    • We don't want to fail compaction if the rules cannot be retrieved.
  4. Add basic unit tests.

@alsoba13 alsoba13 force-pushed the alsoba13/metrics-from-profiles-record-and-export branch from 41de226 to 0d9ffa6 Compare February 11, 2025 20:02
Comment on lines +53 to +56
buf := make([]byte, 0, 8)
buf = append(buf, byte(shard>>24), byte(shard>>16), byte(shard>>8), byte(shard))
buf = append(buf, byte(createdBy>>24), byte(createdBy>>16), byte(createdBy>>8), byte(createdBy))
return fmt.Sprintf("%x", xxhash.Sum64(buf))
kolesnikovae (Collaborator), Feb 12, 2025:

You probably want to use binary.AppendUint32

buf := make([]byte, 0, 8)
buf = binary.BigEndian.AppendUint32(buf, shard)
buf = binary.BigEndian.AppendUint32(buf, createdBy)

Or PutUint32:

buf := make([]byte, 8)
binary.BigEndian.PutUint32(buf[0:4], shard)
binary.BigEndian.PutUint32(buf[4:8], createdBy)

The order (including endianness) does not matter much.

Comment on lines +41 to +44
func NewSampleObserver(meta *metastorev1.BlockMeta, logger log.Logger) *SampleObserver {
recordingTime := int64(ulid.MustParse(meta.Id).Time())
pyroscopeInstance := pyroscopeInstanceHash(meta.Shard, meta.CreatedBy)
return &SampleObserver{
Collaborator:

This is how I would approach this.


We pass metadata to handle it in the constructor. I suggest that we alter the signature: observer should not know about the blocks and their metadata (it's not its concern) – that's much easier to use and test.

type Ruler interface {
    RecordingRules(tenant string) []*<package>.RecordingRule
}

type Exporter interface {
    // I don't think we can handle these errors. Therefore, I'd remove it altogether
    // to avoid confusion: it is always on the Exporter to handle them.
    // The same goes to `Flush`: compactor does not want to know about those either.
    Send(tenant string, samples []prompb.TimeSeries) error
}

func NewSampleObserver(unixMillis int64, exporter Exporter, ruler Ruler, labels ...labels.Label) *SampleObserver

I don't see how we can use logger here, so I removed it. It's on the Exporter to handle errors – the observer, as well as the compactor, cannot handle them.

For me it's not clear whether we restrict a tenant to a single exporter (read: time series database), and where the remote write setup comes from. In the snippet above, I assume that the "remote write" targets are not part of the recording rules and a tenant can have only one exporter.

We may extend this later. I'd suggest that each rule explicitly specifies where to export the data (named exporters) – we don't have to include all the details like URL, creds, etc. in the export rule, just a name. Then, the Exporter component would also take the name parameter to identify the remote target.


We add ruler and exporter to the compaction worker, and initialize them once the compaction worker is started (if required). As these are shared by all threads, we'd need to ensure thread-safety.

// TODO for me: make the compaction worker thread first-class citizen.

func New(
    logger log.Logger,
    config Config,
    client MetastoreClient,
    storage objstore.Bucket,
    reg prometheus.Registerer,
    ruler Ruler,
    exporter Exporter,
) (*Worker, error) 

On the caller side, where we initialize the compaction worker (in modules):

ruler := NewStaticRulerFromEnvVars(...)
exporter := NewStaticExporterFromEnvVars(...)
compactionworker.New(..., ruler, exporter)

Then, in the compaction worker, when a compaction job is created, we construct a new sample observer.

func (w *Worker) buildSampleObserver(md *metastorev1.BlockMeta) SampleObserver {
    if md.CompactionLevel > 0 {
        return nil
    }
    timestamp := int64(0)   // Get from the block metadata.
    var lbls []labels.Label // Create from the block metadata.
    return metrics.NewSampleObserver(timestamp, w.exporter, w.ruler, lbls...)
}

compacted, err := block.Compact(..., WithSampleObserver(w.buildSampleObserver(md)))

Options should be optional – as of now, if the option is not specified, compaction will panic: nil observer should not be called in writeRow, and NoOpObserver is not needed.


Finally, on shutdown, when we return from the running:

func (w *Worker) running(ctx context.Context) error {
   // ...
   <-pollingDone
   // Force exporter to send all staged samples (depends on the implementation)
   // Must be a blocking call.
   w.exporter.Flush()
   return nil
}

Alternatively, you can add service manager and manage ruler and exporter as sub-services, but that's way less obvious.

Comment on lines +87 to +118
func (o *SampleObserver) Flush() error {
    recs := o.Recordings
    o.Recordings = nil
    go func(tenant string, recordings []*Recording) {
        timeSeries := make([]prompb.TimeSeries, 0)
        for _, recording := range recordings {
            for _, sample := range recording.data {
                ts := prompb.TimeSeries{
                    Labels: make([]prompb.Label, 0, len(sample.Labels)),
                    Samples: []prompb.Sample{
                        {
                            Value:     sample.Value,
                            Timestamp: sample.Timestamp,
                        },
                    },
                }
                for _, l := range sample.Labels {
                    ts.Labels = append(ts.Labels, prompb.Label{
                        Name:  l.Name,
                        Value: l.Value,
                    })
                }
                timeSeries = append(timeSeries, ts)
            }
        }

        if err := o.exporter.Send(tenant, timeSeries); err != nil {
            level.Error(o.logger).Log("msg", "error sending recording metrics", "err", err)
        }
    }(o.tenant, recs)
    return nil
}
Collaborator:

I believe we should control concurrency in the Exporter implementation. Otherwise, it's implied that the SampleObserver knows implementation details of the Exporter. Moreover, a method called Flush is typically meant to be blocking.

Comment on lines +24 to +36
func (e *Exporter) Send(tenant string, data []prompb.TimeSeries) error {
    p := &prompb.WriteRequest{Timeseries: data}
    buf := proto.NewBuffer(nil)
    if err := buf.Marshal(p); err != nil {
        return err
    }
    cfg := configFromTenant(tenant)
    client, err := newClient(cfg)
    if err != nil {
        return err
    }
    return client.Store(context.Background(), snappy.Encode(nil, buf.Bytes()), 0)
}
Collaborator:

As we aim to promote this from its prototype state, I think we should consider refactoring this bit.

  1. We should avoid creating a new client on every Send call.

    • Exporter should maintain a pool of tenant-specific clients (see my other comment about the remote write targets; we probably should consider making the clients named, if a single tenant might have multiple write targets).
    • Exporter should poll for changes (or get notifications from outside) and reconcile the pool with the configuration.
    • Clients should be initialized lazily, on access.
  2. Context: it is fine to use the background context here, however, we should consider tracing of the Send method and the outgoing call.

  3. We may want to add metrics to keep track of the latencies and errors.

  4. Limiting the amount of data per client.Store call. This probably should be implemented as part of the background queue.

  5. Here and elsewhere: if we can avoid allocating an empty buffer, we shouldn't miss the opportunity. I believe it's entirely possible to reuse a buffer that has already been allocated.

kolesnikovae (Collaborator), Feb 12, 2025:

As discussed, we assume that a single remote write target is used (for all tenants, for all rules). This allows us to simplify point 1 (tenant-specific clients), as we only have to create a single client with a static configuration. The routing of the requests can be implemented on the other side.

Collaborator:

> 2. Context: it is fine to use the background context here, however, we should consider tracing of the Send method and the outgoing call.

We can add it later, if needed. Metrics are more important.

> 3. We may want to add metrics to keep track of the latencies and errors.

The client we use probably already has it. We just need to make sure these are enabled/configured (ideally, we need a pyroscope_ prefix).

> 4. Limiting the amount of data per client.Store call. This probably should be implemented as part of the background queue.

Not now. Let's use a simple wait group (a goroutine per call in Exporter) and wait for completion on shutdown of the exporter.
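Pulling these points together, a hedged sketch of an Exporter with a single lazily-created remote-write client, a goroutine plus wait group per Send, and a blocking flush on shutdown. The Marshal/Encode/Store calls mirror the ones already in the PR's Send; the storer interface, newClient, staticConfig, and the field names are assumptions:

// storer mirrors the Store call already used in the PR's exporter; it is a
// hypothetical local interface, not a library type.
type storer interface {
    Store(ctx context.Context, req []byte, attempt int) error
}

type Exporter struct {
    mu     sync.Mutex
    client storer         // created lazily on first use
    wg     sync.WaitGroup // tracks in-flight sends
    logger log.Logger
}

func (e *Exporter) getClient() (storer, error) {
    e.mu.Lock()
    defer e.mu.Unlock()
    if e.client == nil {
        c, err := newClient(staticConfig()) // single static remote-write target (hypothetical helpers)
        if err != nil {
            return nil, err
        }
        e.client = c
    }
    return e.client, nil
}

// Send serializes the series and ships them in the background; export errors are
// logged here and never fail the compaction job.
func (e *Exporter) Send(tenant string, data []prompb.TimeSeries) error {
    buf := proto.NewBuffer(nil)
    if err := buf.Marshal(&prompb.WriteRequest{Timeseries: data}); err != nil {
        return err
    }
    payload := snappy.Encode(nil, buf.Bytes())
    e.wg.Add(1)
    go func() {
        defer e.wg.Done()
        client, err := e.getClient()
        if err == nil {
            err = client.Store(context.Background(), payload, 0)
        }
        if err != nil {
            level.Error(e.logger).Log("msg", "error exporting recorded metrics", "tenant", tenant, "err", err)
        }
    }()
    return nil
}

// Flush blocks until all in-flight sends have completed; called on worker shutdown.
func (e *Exporter) Flush() {
    e.wg.Wait()
}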
