Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ make docker-image/pyroscope/build # Build Docker image
- **parquet-go**: Parquet file format implementation
- **go-kit/log**: Structured logging
- **prometheus/client_golang**: Metrics instrumentation
- **opentracing-go**: Distributed tracing
- **opentelemetry**: Distributed tracing

## When Working on Features

Expand Down
4 changes: 2 additions & 2 deletions examples/tracing/golang-push/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ less than the sample interval (10ms).
### Instrumentation

- `rideshare` demo application instrumented with OpenTelemetry: [OTel integration]. Please refer to our [documentation] for more details.
- `pyroscope` itself is instrumented with `opentracing-go` SDK and [`spanprofiler`] for profiling integration.
- `pyroscope` itself is instrumented with OpenTelemetry and [`otel-profiling-go`] for profiling integration.

[OTel integration]:https://github.com/grafana/otel-profiling-go
[`spanprofiler`]:https://github.com/grafana/dskit/tree/main/spanprofiler
[`otel-profiling-go`]:https://github.com/grafana/otel-profiling-go
[documentation]:https://grafana.com/docs/pyroscope/latest/configure-client/trace-span-profiles/go-span-profiles/


Expand Down
2 changes: 1 addition & 1 deletion examples/tracing/tempo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ less than the sample interval (10ms).
- `rideshare` demo application instrumented with OpenTelemetry:
- Go [OTel integration](https://github.com/grafana/otel-profiling-go)
- Java [OTel integration](https://github.com/grafana/otel-profiling-java)
- `pyroscope` itself is instrumented with `opentracing-go` SDK and [`spanprofiler`](https://github.com/grafana/dskit/tree/main/spanprofiler) for profiling integration.
- `pyroscope` itself is instrumented with OpenTelemetry and [`otel-profiling-go`](https://github.com/grafana/otel-profiling-go) for profiling integration.

### Grafana Tempo configuration

Expand Down
18 changes: 9 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo/v2 v2.26.0
github.com/onsi/gomega v1.38.2
github.com/opentracing-contrib/go-grpc v0.1.2
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b
github.com/parquet-go/parquet-go v0.24.0
github.com/pkg/errors v0.9.1
github.com/planetscale/vtprotobuf v0.6.1-0.20250313105119-ba97887b0a25
Expand All @@ -75,7 +73,6 @@ require (
github.com/spf13/afero v1.15.0
github.com/stretchr/testify v1.11.1
github.com/thanos-io/objstore v0.0.0-20250813080715-4e5fd4289b50
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/valyala/bytebufferpool v1.0.0
github.com/xlab/treeprint v1.2.0
go.etcd.io/bbolt v1.4.3
Expand Down Expand Up @@ -106,6 +103,9 @@ require (

require (
github.com/Masterminds/semver/v3 v3.4.0 // indirect
github.com/opentracing-contrib/go-grpc v0.1.2 // indirect
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
)

Expand Down Expand Up @@ -204,7 +204,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.14.2 // indirect
github.com/grafana/jfr-parser v0.13.0
github.com/grafana/otel-profiling-go v0.5.1 // indirect
github.com/grafana/otel-profiling-go v0.5.1
github.com/hashicorp/consul/api v1.32.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down Expand Up @@ -295,10 +295,10 @@ require (
go.opentelemetry.io/contrib/bridges/otelzap v0.11.0 // indirect
go.opentelemetry.io/contrib/bridges/prometheus v0.61.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.38.0 // indirect
go.opentelemetry.io/contrib/exporters/autoexport v0.61.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
go.opentelemetry.io/contrib/exporters/autoexport v0.61.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.61.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0
go.opentelemetry.io/contrib/propagators/jaeger v1.35.0 // indirect
go.opentelemetry.io/contrib/samplers/jaegerremote v0.30.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
Expand All @@ -315,10 +315,10 @@ require (
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.36.0 // indirect
go.opentelemetry.io/otel/log v0.12.2 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
go.opentelemetry.io/otel/sdk v1.38.0
go.opentelemetry.io/otel/sdk/log v0.12.2 // indirect
go.opentelemetry.io/otel/sdk/metric v1.38.0 // indirect
go.opentelemetry.io/otel/trace v1.39.0 // indirect
go.opentelemetry.io/otel/trace v1.39.0
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.46.0 // indirect
Expand Down
17 changes: 8 additions & 9 deletions pkg/compactionworker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tracing"
"github.com/oklog/ulid/v2"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -371,14 +371,13 @@ func (w *Worker) runCompaction(job *compactionJob) {
}()

w.metrics.jobsInProgress.WithLabelValues(metricLabels...).Inc()
sp, ctx := opentracing.StartSpanFromContext(job.ctx, "runCompaction",
opentracing.Tag{Key: "Job", Value: job.String()},
opentracing.Tag{Key: "Tenant", Value: job.Tenant},
opentracing.Tag{Key: "Shard", Value: job.Shard},
opentracing.Tag{Key: "CompactionLevel", Value: job.CompactionLevel},
opentracing.Tag{Key: "SourceBlocks", Value: len(job.SourceBlocks)},
opentracing.Tag{Key: "Tombstones", Value: len(job.Tombstones)},
)
sp, ctx := tracing.StartSpanFromContext(job.ctx, "runCompaction")
sp.SetTag("Job", job.String())
sp.SetTag("Tenant", job.Tenant)
sp.SetTag("Shard", job.Shard)
sp.SetTag("CompactionLevel", job.CompactionLevel)
sp.SetTag("SourceBlocks", len(job.SourceBlocks))
sp.SetTag("Tombstones", len(job.Tombstones))
defer sp.Finish()

logger := log.With(w.logger, "job", job.Name)
Expand Down
80 changes: 39 additions & 41 deletions pkg/compactor/bucket_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/runutil"
"github.com/grafana/dskit/tracing"
"github.com/oklog/ulid/v2"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"

"github.com/grafana/pyroscope/pkg/objstore"
Expand Down Expand Up @@ -102,7 +103,7 @@ func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bu

// SyncMetas synchronizes local state of block metas with what we have in the bucket.
func (s *Syncer) SyncMetas(ctx context.Context) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SyncMetas")
sp, ctx := tracing.StartSpanFromContext(ctx, "SyncMetas")
defer sp.Finish()
s.mtx.Lock()
defer s.mtx.Unlock()
Expand All @@ -129,7 +130,7 @@ func (s *Syncer) Metas() map[ulid.ULID]*block.Meta {
// block with a higher compaction level.
// Call to SyncMetas function is required to populate duplicateIDs in duplicateBlocksFilter.
func (s *Syncer) GarbageCollect(ctx context.Context) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "GarbageCollect")
sp, ctx := tracing.StartSpanFromContext(ctx, "GarbageCollect")
defer sp.Finish()
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down Expand Up @@ -352,7 +353,8 @@ func (c *BlockCompactor) CompactWithSplitting(ctx context.Context, dest string,
}()

err = func() error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "OpenBlocks", opentracing.Tag{Key: "concurrency", Value: c.blockOpenConcurrency})
sp, ctx := tracing.StartSpanFromContext(ctx, "OpenBlocks")
sp.SetTag("concurrency", c.blockOpenConcurrency)
defer sp.Finish()
// Open all blocks
return concurrency.ForEachJob(ctx, len(readers), c.blockOpenConcurrency, func(ctx context.Context, idx int) error {
Expand Down Expand Up @@ -380,9 +382,7 @@ func (c *BlockCompactor) CompactWithSplitting(ctx context.Context, dest string,
}
}
currentLevel++
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.SetTag("compaction_level", currentLevel)
}
trace.SpanFromContext(ctx).SetAttributes(attribute.Int("compaction_level", currentLevel))
start := time.Now()
defer func() {
c.metrics.Duration.WithLabelValues(fmt.Sprintf("%d", currentLevel)).Observe(time.Since(start).Seconds())
Expand Down Expand Up @@ -463,26 +463,25 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul

level.Info(jobLogger).Log("msg", "compaction available and planned; downloading blocks", "blocks", len(toCompact), "plan", fmt.Sprintf("%v", toCompact))

sp, ctx := opentracing.StartSpanFromContext(ctx, "CompactJob",
opentracing.Tag{Key: "GroupKey", Value: job.Key()},
opentracing.Tag{Key: "Job", Value: job.String()},
opentracing.Tag{Key: "Labels", Value: job.Labels().String()},
opentracing.Tag{Key: "MinCompactionLevel", Value: job.MinCompactionLevel()},
opentracing.Tag{Key: "Resolution", Value: job.Resolution()},
opentracing.Tag{Key: "ShardKey", Value: job.ShardingKey()},
opentracing.Tag{Key: "SplitStageSize", Value: job.SplitStageSize()},
opentracing.Tag{Key: "UseSplitting", Value: job.UseSplitting()},
opentracing.Tag{Key: "SplittingShards", Value: job.SplittingShards()},
opentracing.Tag{Key: "BlockCount", Value: len(toCompact)},
)
sp, ctx := tracing.StartSpanFromContext(ctx, "CompactJob")
sp.SetTag("GroupKey", job.Key())
sp.SetTag("Job", job.String())
sp.SetTag("Labels", job.Labels().String())
sp.SetTag("MinCompactionLevel", job.MinCompactionLevel())
sp.SetTag("Resolution", job.Resolution())
sp.SetTag("ShardKey", job.ShardingKey())
sp.SetTag("SplitStageSize", job.SplitStageSize())
sp.SetTag("UseSplitting", job.UseSplitting())
sp.SetTag("SplittingShards", job.SplittingShards())
sp.SetTag("BlockCount", len(toCompact))
defer sp.Finish()

blocksToCompactDirs := make([]string, len(toCompact))
// Once we have a plan we need to download the actual data.
downloadBegin := time.Now()

err = func() error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "DownloadBlocks")
sp, ctx := tracing.StartSpanFromContext(ctx, "DownloadBlocks")
defer func() {
elapsed := time.Since(downloadBegin)
level.Info(jobLogger).Log("msg", "downloaded and verified blocks; compacting blocks", "blocks", len(blocksToCompactDirs), "plan", fmt.Sprintf("%v", blocksToCompactDirs), "duration", elapsed, "duration_ms", elapsed.Milliseconds())
Expand All @@ -508,12 +507,12 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
return nil
}()
if err != nil {
ext.LogError(sp, err)
sp.LogError(err)
return false, nil, err
}

err = func() error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "CompactBlocks")
sp, ctx := tracing.StartSpanFromContext(ctx, "CompactBlocks")
compactionBegin := time.Now()
defer func() {
sp.Finish()
Expand All @@ -534,7 +533,7 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
return err
}()
if err != nil {
ext.LogError(sp, err)
sp.LogError(err)
return false, nil, errors.Wrapf(err, "compact blocks %v", blocksToCompactDirs)
}

Expand All @@ -545,12 +544,12 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
}

// Spawn a new context so we always finish uploading and marking a block for deletion in full on shutdown.
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute)
ctx = opentracing.ContextWithSpan(ctx, sp)
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 20*time.Minute)
defer cancel()

err = func() error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "Uploading blocks", opentracing.Tag{Key: "count", Value: len(compIDs)})
sp, ctx := tracing.StartSpanFromContext(ctx, "Uploading blocks")
sp.SetTag("count", len(compIDs))
uploadBegin := time.Now()
uploadedBlocks := atomic.NewInt64(0)
defer func() {
Expand Down Expand Up @@ -587,11 +586,12 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
}()

if err != nil {
ext.LogError(sp, err)
sp.LogError(err)
return false, nil, err
}

sp, ctx = opentracing.StartSpanFromContext(ctx, "Deleting blocks", opentracing.Tag{Key: "count", Value: len(compIDs)})
sp, ctx = tracing.StartSpanFromContext(ctx, "Deleting blocks")
sp.SetTag("count", len(compIDs))
defer sp.Finish()
// Mark for deletion the blocks we just compacted from the job and bucket so they do not get included
// into the next planning cycle.
Expand Down Expand Up @@ -773,10 +773,8 @@ func NewBucketCompactor(
// Compact runs compaction over bucket.
// If maxCompactionTime is positive then after this time no more new compactions are started.
func (c *BucketCompactor) Compact(ctx context.Context, maxCompactionTime time.Duration) (rerr error) {
sp := opentracing.SpanFromContext(ctx)
if sp == nil {
sp, ctx = opentracing.StartSpanFromContext(ctx, "Compact")
}
sp, ctx := tracing.StartSpanFromContext(ctx, "Compact")
defer sp.Finish()
sp.SetTag("max_compaction_time", maxCompactionTime)
sp.SetTag("concurrency", c.concurrency)
defer func() {
Expand Down Expand Up @@ -854,32 +852,32 @@ func (c *BucketCompactor) Compact(ctx context.Context, maxCompactionTime time.Du

level.Info(c.logger).Log("msg", "start sync of metas")
if err := c.sy.SyncMetas(ctx); err != nil {
ext.LogError(sp, err)
sp.LogError(err)
return errors.Wrap(err, "sync")
}

level.Info(c.logger).Log("msg", "start of GC")
// Blocks that were compacted are garbage collected after each Compaction.
// However if compactor crashes we need to resolve those on startup.
if err := c.sy.GarbageCollect(ctx); err != nil {
ext.LogError(sp, err)
sp.LogError(err)
return errors.Wrap(err, "blocks garbage collect")
}

jobs, err := c.grouper.Groups(c.sy.Metas())
if err != nil {
ext.LogError(sp, err)
sp.LogError(err)
return errors.Wrap(err, "build compaction jobs")
}
sp.LogKV("discovered_jobs", len(jobs))
sp.SetTag("discovered_jobs", len(jobs))

// There is another check just before we start processing the job, but we can avoid sending it
// to the goroutine in the first place.
jobs, err = c.filterOwnJobs(jobs)
if err != nil {
return err
}
sp.LogKV("own_jobs", len(jobs))
sp.SetTag("own_jobs", len(jobs))

// Record the difference between now and the max time for a block being compacted. This
// is used to detect compactors not being able to keep up with the rate of blocks being
Expand All @@ -891,7 +889,7 @@ func (c *BucketCompactor) Compact(ctx context.Context, maxCompactionTime time.Du

// Skip jobs for which the wait period hasn't been honored yet.
jobs = c.filterJobsByWaitPeriod(ctx, jobs)
sp.LogKV("filtered_jobs", len(jobs))
sp.SetTag("filtered_jobs", len(jobs))

// Sort jobs based on the configured ordering algorithm.
jobs = c.sortJobs(jobs)
Expand All @@ -916,14 +914,14 @@ func (c *BucketCompactor) Compact(ctx context.Context, maxCompactionTime time.Du
for _, g := range jobs {
select {
case jobErr := <-errChan:
ext.LogError(sp, jobErr)
sp.LogError(jobErr)
jobErrs.Add(jobErr)
break jobLoop
case jobChan <- g:
case <-maxCompactionTimeChan:
maxCompactionTimeReached = true
level.Info(c.logger).Log("msg", "max compaction time reached, no more compactions will be started")
sp.LogKV("msg", "max compaction time reached, no more compactions will be started")
sp.SetTag("msg", "max compaction time reached, no more compactions will be started")
break jobLoop
}
}
Expand Down
Loading