Skip to content

Commit d95b374

Browse files
authored
feat(pyroscope.receive_http): Support pushv1.Push in receive_http (#2431)
* feat(pyroscope.receive_http): Support pushv1.Push in receive_http /push.v1.PusherService/Push which is a connect API used by profilecli and pyroscope.write with pyroscope.ebpf and pyroscope.alloy. This is in addtion to the /ingest API the component already supports. * Update changelog and docs
1 parent d23c3af commit d95b374

File tree

4 files changed

+266
-14
lines changed

4 files changed

+266
-14
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ Main (unreleased)
3636

3737
- Bump snmp_exporter and embedded modules to 0.27.0. Add support for multi-module handling by comma separation and expose argument to increase SNMP polling concurrency for `prometheus.exporter.snmp`. (@v-zhuravlev)
3838

39+
- Add support for pushv1.PusherService Connect API in `pyroscope.receive_http`. (@simonswine)
40+
3941
v1.6.1
4042
-----------------
4143

docs/sources/reference/components/pyroscope/pyroscope.receive_http.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ title: pyroscope.receive_http
1212

1313
`pyroscope.receive_http` receives profiles over HTTP and forwards them to `pyroscope.*` components capable of receiving profiles.
1414

15-
The HTTP API exposed is compatible with the Pyroscope [HTTP ingest API](https://grafana.com/docs/pyroscope/latest/configure-server/about-server-api/).
15+
The HTTP API exposed is compatible with both the Pyroscope [HTTP ingest API](https://grafana.com/docs/pyroscope/latest/configure-server/about-server-api/) and the [pushv1.PusherService](https://github.com/grafana/pyroscope/blob/main/api/push/v1/push.proto) Connect API.
1616
This allows `pyroscope.receive_http` to act as a proxy for Pyroscope profiles, enabling flexible routing and distribution of profile data.
1717

1818
## Usage
@@ -30,6 +30,7 @@ pyroscope.receive_http "LABEL" {
3030
The component will start an HTTP server supporting the following endpoint.
3131

3232
* `POST /ingest` - send profiles to the component, which will be forwarded to the receivers as configured in the `forward_to argument`. The request format must match the format of the Pyroscope ingest API.
33+
* `POST /push.v1.PusherService/Push` - send profiles to the component, which will be forwarded to the receivers as configured in the `forward_to argument`. The request format must match the format of the Pyroscope pushv1.PusherService Connect API.
3334

3435
## Arguments
3536

internal/component/pyroscope/receive_http/receive_http.go

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ import (
99
"reflect"
1010
"sync"
1111

12+
"connectrpc.com/connect"
1213
"github.com/gorilla/mux"
1314
"github.com/prometheus/client_golang/prometheus"
15+
"github.com/prometheus/prometheus/model/labels"
1416
"golang.org/x/sync/errgroup"
1517

1618
"github.com/grafana/alloy/internal/component"
@@ -20,6 +22,9 @@ import (
2022
"github.com/grafana/alloy/internal/featuregate"
2123
"github.com/grafana/alloy/internal/runtime/logging/level"
2224
"github.com/grafana/alloy/internal/util"
25+
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
26+
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
27+
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
2328
)
2429

2530
const (
@@ -137,14 +142,81 @@ func (c *Component) Update(args component.Arguments) error {
137142
c.server = srv
138143

139144
return c.server.MountAndRun(func(router *mux.Router) {
145+
// this mounts the og pyroscope ingest API, mostly used by SDKs
140146
router.HandleFunc("/ingest", c.handleIngest).Methods(http.MethodPost)
147+
148+
// mount connect go pushv1
149+
pathPush, handlePush := pushv1connect.NewPusherServiceHandler(c)
150+
router.PathPrefix(pathPush).Handler(handlePush).Methods(http.MethodPost)
141151
})
142152
}
143153

144-
func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
154+
func setLabelBuilderFromAPI(lb *labels.Builder, api []*typesv1.LabelPair) {
155+
for i := range api {
156+
lb.Set(api[i].Name, api[i].Value)
157+
}
158+
}
159+
160+
func apiToAlloySamples(api []*pushv1.RawSample) []*pyroscope.RawSample {
161+
var (
162+
alloy = make([]*pyroscope.RawSample, len(api))
163+
)
164+
for i := range alloy {
165+
alloy[i] = &pyroscope.RawSample{
166+
RawProfile: api[i].RawProfile,
167+
}
168+
}
169+
return alloy
170+
}
171+
172+
func (c *Component) Push(ctx context.Context, req *connect.Request[pushv1.PushRequest],
173+
) (*connect.Response[pushv1.PushResponse], error) {
174+
appendables := c.getAppendables()
175+
176+
// Create an errgroup with the timeout context
177+
g, ctx := errgroup.WithContext(ctx)
178+
179+
// Start copying the request body to all pipes
180+
for i := range appendables {
181+
appendable := appendables[i].Appender()
182+
g.Go(func() error {
183+
var (
184+
errs error
185+
lb = labels.NewBuilder(nil)
186+
)
187+
188+
for idx := range req.Msg.Series {
189+
lb.Reset(nil)
190+
setLabelBuilderFromAPI(lb, req.Msg.Series[idx].Labels)
191+
err := appendable.Append(ctx, lb.Labels(), apiToAlloySamples(req.Msg.Series[idx].Samples))
192+
if err != nil {
193+
errs = errors.Join(
194+
errs,
195+
fmt.Errorf("unable to append series %s to appendable %d: %w", lb.Labels().String(), i, err),
196+
)
197+
}
198+
}
199+
return errs
200+
})
201+
}
202+
if err := g.Wait(); err != nil {
203+
level.Error(c.opts.Logger).Log("msg", "Failed to forward profiles requests", "err", err)
204+
return nil, connect.NewError(connect.CodeInternal, err)
205+
}
206+
207+
level.Debug(c.opts.Logger).Log("msg", "Profiles successfully forwarded")
208+
return connect.NewResponse(&pushv1.PushResponse{}), nil
209+
}
210+
211+
func (c *Component) getAppendables() []pyroscope.Appendable {
145212
c.mut.Lock()
213+
defer c.mut.Unlock()
146214
appendables := c.appendables
147-
c.mut.Unlock()
215+
return appendables
216+
}
217+
218+
func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
219+
appendables := c.getAppendables()
148220

149221
// Create a pipe for each appendable
150222
pipeWriters := make([]io.Writer, len(appendables))

0 commit comments

Comments
 (0)