Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add live graph backend (merge to feature branch) #2645

Open
wants to merge 11 commits into
base: alloy-live-graph
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ require (
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect
github.com/go-git/go-billy/v5 v5.6.0 // indirect
github.com/go-jose/go-jose/v3 v3.0.3 // indirect
github.com/go-kit/kit v0.13.0 // indirect
github.com/go-kit/kit v0.13.0
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
Expand Down Expand Up @@ -883,9 +883,6 @@ require (
github.com/containers/common v0.61.0 // indirect
github.com/deneonet/benc v1.1.2 // indirect
github.com/itchyny/timefmt-go v0.1.6 // indirect
github.com/onsi/ginkgo/v2 v2.21.0 // indirect
github.com/onsi/gomega v1.35.1 // indirect
go.etcd.io/bbolt v1.3.11 // indirect
)

// NOTE: replace directives below must always be *temporary*.
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1852,17 +1852,11 @@ github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoA
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gosnmp/gosnmp v1.37.0 h1:/Tf8D3b9wrnNuf/SfbvO+44mPrjVphBhRtcGg22V07Y=
github.com/gosnmp/gosnmp v1.37.0/go.mod h1:GDH9vNqpsD7f2HvZhKs5dlqSEcAS6s6Qp099oZRCR+M=
github.com/gosnmp/gosnmp v1.38.0 h1:I5ZOMR8kb0DXAFg/88ACurnuwGwYkXWq3eLpJPHMEYc=
github.com/gosnmp/gosnmp v1.38.0/go.mod h1:FE+PEZvKrFz9afP9ii1W3cprXuVZ17ypCcyyfYuu5LY=
github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY=
github.com/grafana/alloy-remote-config v0.0.9 h1:gy34SxZ8Iq/HrDTIFZi80+8BlT+FnJhKiP9mryHNEUE=
github.com/grafana/alloy-remote-config v0.0.9/go.mod h1:kHE1usYo2WAVCikQkIXuoG1Clz8BSdiz3kF+DZSCQ4k=
github.com/grafana/beyla v0.0.0-20250108110233-3f1b9b55c6dc h1:oY8yQB8IG0dBo1UrLlLC2CspxbiVtSWWExMxXOnfWgk=
github.com/grafana/beyla v0.0.0-20250108110233-3f1b9b55c6dc/go.mod h1:hpk185gTeIQXjxV/so9vAxhZtSEgm8ODanWXZNVnH2M=
github.com/grafana/beyla v1.9.1-0.20250122195759-1117708def46 h1:/aw+Ze9lUluE1hNZ0fAtwhmf2CKP0VbsLFumpN8xztY=
github.com/grafana/beyla v1.9.1-0.20250122195759-1117708def46/go.mod h1:CRWu15fkScScSYBlYUtdJu2Ak8ojGvnuwHToGGkaOXY=
github.com/grafana/beyla v1.10.0-alloy h1:kGyZtBSS/Br2qdhbvzu8sVYZHuE9a3OzWpbp6gN55EY=
github.com/grafana/beyla v1.10.0-alloy/go.mod h1:CRWu15fkScScSYBlYUtdJu2Ak8ojGvnuwHToGGkaOXY=
github.com/grafana/cadvisor v0.0.0-20240729082359-1f04a91701e2 h1:ju6EcY2aEobeBg185ETtFCKj5WzaQ48qfkbsSRRQrF4=
Expand Down Expand Up @@ -2126,8 +2120,6 @@ github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465 h1:KwWnWVWCNtNq/ewIX7HIKnELmEx2nDP42yskD/pi7QE=
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/ianlancetaylor/demangle v0.0.0-20240912202439-0a2b6291aafd h1:EVX1s+XNss9jkRW9K6XGJn2jL2lB1h5H804oKPsxOec=
github.com/ianlancetaylor/demangle v0.0.0-20240912202439-0a2b6291aafd/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973 h1:hk4LPqXIY/c9XzRbe7dA6qQxaT6Axcbny0L/G5a4owQ=
Expand Down
1 change: 1 addition & 0 deletions internal/alloycli/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func (fr *alloyRun) Run(cmd *cobra.Command, configPath string) error {
uiService := uiservice.New(uiservice.Options{
UIPrefix: fr.uiPrefix,
CallbackManager: liveDebuggingService.Data().(livedebugging.CallbackManager),
Logger: log.With(l, "service", "ui"),
})

otelService := otel_service.New(l)
Expand Down
5 changes: 2 additions & 3 deletions internal/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ type DebugComponent interface {
DebugInfo() interface{}
}

// LiveDebugging is an interface used by the components that support the live debugging feature.
// LiveDebugging is a marker interface to check if a component supports live debugging.
type LiveDebugging interface {
// LiveDebugging is invoked when the number of consumers changes.
LiveDebugging(consumers int)
LiveDebugging() // This function is never called.
}
11 changes: 7 additions & 4 deletions internal/component/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,12 @@ func (c *Component) runDiscovery(ctx context.Context, d DiscovererWithMetrics) {
send := func() {
allTargets := toAlloyTargets(cache)
componentID := livedebugging.ComponentID(c.opts.ID)
if c.debugDataPublisher.IsActive(componentID) {
c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s", allTargets))
}
c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
componentID,
livedebugging.Target,
uint64(len(allTargets)),
func() string { return fmt.Sprintf("%s", allTargets) },
))
c.opts.OnStateChange(Exports{Targets: allTargets})
}

Expand Down Expand Up @@ -285,4 +288,4 @@ func toAlloyTargets(cache map[string]*targetgroup.Group) []Target {
return allTargets
}

func (c *Component) LiveDebugging(_ int) {}
func (c *Component) LiveDebugging() {}
11 changes: 7 additions & 4 deletions internal/component/discovery/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@ func (c *Component) Run(ctx context.Context) error {
c.changed()

componentID := livedebugging.ComponentID(c.opts.ID)
if c.debugDataPublisher.IsActive(componentID) {
c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s", c.processes))
}
c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
componentID,
livedebugging.Target,
uint64(len(c.processes)),
func() string { return fmt.Sprintf("%s", c.processes) },
))

return nil
}
Expand Down Expand Up @@ -109,4 +112,4 @@ func (c *Component) changed() {
})
}

func (c *Component) LiveDebugging(_ int) {}
func (c *Component) LiveDebugging() {}
11 changes: 7 additions & 4 deletions internal/component/discovery/relabel/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,12 @@ func (c *Component) Update(args component.Arguments) error {
targets = append(targets, promLabelsToComponent(relabelled))
}
componentID := livedebugging.ComponentID(c.opts.ID)
if c.debugDataPublisher.IsActive(componentID) {
c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", lset.String(), relabelled.String()))
}
c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
componentID,
livedebugging.Target,
1,
func() string { return fmt.Sprintf("%s => %s", lset.String(), relabelled.String()) },
))
}

c.opts.OnStateChange(Exports{
Expand All @@ -111,7 +114,7 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

func (c *Component) LiveDebugging(_ int) {}
func (c *Component) LiveDebugging() {}

func componentMapToPromLabels(ls discovery.Target) labels.Labels {
res := make([]labels.Label, 0, len(ls))
Expand Down
24 changes: 17 additions & 7 deletions internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,14 @@ func (c *Component) handleIn(ctx context.Context, wg *sync.WaitGroup) {
return
case entry := <-c.receiver.Chan():
c.mut.RLock()
if c.debugDataPublisher.IsActive(componentID) {
c.debugDataPublisher.Publish(componentID, fmt.Sprintf("[IN]: timestamp: %s, entry: %s, labels: %s", entry.Timestamp.Format(time.RFC3339Nano), entry.Line, entry.Labels.String()))
}
c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
componentID,
livedebugging.LokiLog,
0, // does not count because we count only the data that exists
func() string {
return fmt.Sprintf("[IN]: timestamp: %s, entry: %s, labels: %s", entry.Timestamp.Format(time.RFC3339Nano), entry.Line, entry.Labels.String())
},
))
select {
case <-ctx.Done():
return
Expand Down Expand Up @@ -194,9 +199,14 @@ func (c *Component) handleOut(shutdownCh chan struct{}, wg *sync.WaitGroup) {

// The log entry is the same for every fanout,
// so we can publish it only once.
if c.debugDataPublisher.IsActive(componentID) {
c.debugDataPublisher.Publish(componentID, fmt.Sprintf("[OUT]: timestamp: %s, entry: %s, labels: %s", entry.Timestamp.Format(time.RFC3339Nano), entry.Line, entry.Labels.String()))
}
c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
componentID,
livedebugging.LokiLog,
1,
func() string {
return fmt.Sprintf("[OUT]: timestamp: %s, entry: %s, labels: %s", entry.Timestamp.Format(time.RFC3339Nano), entry.Line, entry.Labels.String())
},
))

for _, f := range fanout {
select {
Expand All @@ -221,4 +231,4 @@ func stagesChanged(prev, next []stages.StageConfig) bool {
return false
}

func (c *Component) LiveDebugging(_ int) {}
func (c *Component) LiveDebugging() {}
2 changes: 1 addition & 1 deletion internal/component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func getServiceDataWithLiveDebugging(log *testlivedebugging.Log) func(string) (i
ld.AddCallback(
"callback1",
"",
func(data string) { log.Append(data) },
func(data *livedebugging.Data) { log.Append(data.DataFunc()) },
)

return func(name string) (interface{}, error) {
Expand Down
15 changes: 12 additions & 3 deletions internal/component/loki/relabel/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,18 @@ func (c *Component) Run(ctx context.Context) error {
c.metrics.entriesProcessed.Inc()
lbls := c.relabel(entry)

if c.debugDataPublisher.IsActive(componentID) {
c.debugDataPublisher.Publish(componentID, fmt.Sprintf("entry: %s, labels: %s => %s", entry.Line, entry.Labels.String(), lbls.String()))
count := uint64(1)
if len(lbls) == 0 {
count = 0 // if no labels are left, the count is not incremented because the log will be filtered out
}
c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
componentID,
livedebugging.LokiLog,
count,
func() string {
return fmt.Sprintf("entry: %s, labels: %s => %s", entry.Line, entry.Labels.String(), lbls.String())
},
))

if len(lbls) == 0 {
level.Debug(c.opts.Logger).Log("msg", "dropping entry after relabeling", "labels", entry.Labels.String())
Expand Down Expand Up @@ -246,4 +255,4 @@ func (c *Component) process(e loki.Entry) model.LabelSet {
return relabeled
}

func (c *Component) LiveDebugging(_ int) {}
func (c *Component) LiveDebugging() {}
14 changes: 10 additions & 4 deletions internal/component/loki/secretfilter/secretfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,15 @@ func (c *Component) Run(ctx context.Context) error {
c.mut.RLock()
// Start processing the log entry to redact secrets
newEntry := c.processEntry(entry)
if c.debugDataPublisher.IsActive(componentID) {
c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", entry.Line, newEntry.Line))
}

c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
componentID,
livedebugging.LokiLog,
1,
func() string {
return fmt.Sprintf("%s => %s", entry.Line, newEntry.Line)
},
))

for _, f := range c.fanout {
select {
Expand Down Expand Up @@ -376,4 +382,4 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

func (c *Component) LiveDebugging(_ int) {}
func (c *Component) LiveDebugging() {}
39 changes: 20 additions & 19 deletions internal/component/otelcol/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"context"
"errors"
"os"
"sync"

"github.com/prometheus/client_golang/prometheus"
otelcomponent "go.opentelemetry.io/collector/component"
otelconnector "go.opentelemetry.io/collector/connector"
otelextension "go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pipeline"
sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
Expand All @@ -20,9 +22,10 @@ import (
"github.com/grafana/alloy/internal/component/otelcol"
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/interceptorconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingpublisher"
"github.com/grafana/alloy/internal/component/otelcol/internal/scheduler"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/internal/util/zapadapter"
Expand Down Expand Up @@ -79,10 +82,11 @@ type Connector struct {
sched *scheduler.Scheduler
collector *lazycollector.Collector

liveDebuggingConsumer *livedebuggingconsumer.Consumer
debugDataPublisher livedebugging.DebugDataPublisher
debugDataPublisher livedebugging.DebugDataPublisher

args Arguments

updateMut sync.Mutex
}

var (
Expand All @@ -105,7 +109,7 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn

ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.NewPaused(ctx)
consumer := lazyconsumer.NewPaused(ctx, opts.ID)

// Create a lazy collector where metrics from the upstream component will be
// forwarded.
Expand All @@ -127,10 +131,9 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn
factory: f,
consumer: consumer,

liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,
}
if err := p.Update(args); err != nil {
return nil, err
Expand Down Expand Up @@ -193,8 +196,6 @@ func (p *Connector) Update(args component.Arguments) error {

next := p.args.NextConsumers()

liveDebuggingActive := p.debugDataPublisher.IsActive(livedebugging.ComponentID(p.opts.ID))

// Create instances of the connector from our factory for each of our
// supported telemetry signals.
var components []otelcomponent.Component
Expand All @@ -210,12 +211,14 @@ func (p *Connector) Update(args component.Arguments) error {
}

if len(next.Metrics) > 0 {
metrics := next.Metrics
if liveDebuggingActive {
metrics = append(metrics, p.liveDebuggingConsumer)
}
nextMetrics := fanoutconsumer.Metrics(metrics)
tracesConnector, err = p.factory.CreateTracesToMetrics(p.ctx, settings, connectorConfig, nextMetrics)
fanout := fanoutconsumer.Metrics(next.Metrics)
metricsInterceptor := interceptorconsumer.Metrics(fanout, false,
func(ctx context.Context, md pmetric.Metrics) error {
livedebuggingpublisher.PublishMetricsIfActive(p.debugDataPublisher, p.opts.ID, md, next.Metrics)
return fanout.ConsumeMetrics(ctx, md)
},
)
tracesConnector, err = p.factory.CreateTracesToMetrics(p.ctx, settings, connectorConfig, metricsInterceptor)
if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) {
return err
} else if tracesConnector != nil {
Expand All @@ -240,6 +243,4 @@ func (p *Connector) CurrentHealth() component.Health {
return p.sched.CurrentHealth()
}

func (p *Connector) LiveDebugging(_ int) {
p.Update(p.args)
}
func (p *Connector) LiveDebugging() {}
Loading
Loading