agent: Send last billing/scaling events before shutdown
Follow-up to #1107, ref #1220.

This change adds support to 'pkg/reporting' (which the
autoscaler-agent uses for billing and scaling events) for cleanly
sending all remaining events on shutdown.

This has three pieces (see the sketch after this list):

1. The event sender threads now send and exit when their context expires;
2. reporting.NewEventSink() now takes a parent context (or .Finish() can
   be called instead to explicitly trigger a clean exit); and
3. reporting.NewEventSink() now spawns the senders with a taskgroup.Group
   passed in by the caller, so that shutdown of the entire
   autoscaler-agent will wait for sending to complete.
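
Taken together, the caller-side shape looks roughly like this (a minimal
sketch: the Event type, the runAgent wrapper, and tg.Wait() are
illustrative assumptions; the NewGroup and NewEventSink calls mirror the
diffs below):

```go
func runAgent(
	ctx context.Context,
	logger *zap.Logger,
	metrics *reporting.EventSinkMetrics,
	clients ...reporting.Client[Event], // Event is a stand-in type
) error {
	// Piece 3: the sink spawns its sender threads into this group.
	tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))

	// Piece 2: the sink takes a parent context; piece 1: the senders send
	// whatever remains and exit once that context expires.
	sink := reporting.NewEventSink(ctx, logger, tg, metrics, clients...)

	// ... sink.Enqueue(...) while the agent runs ...

	// Alternatively, call Finish() to explicitly trigger the clean exit.
	sink.Finish()

	// Piece 3 again: waiting on the group blocks agent shutdown until the
	// final sends complete. (tg.Wait() is assumed; it isn't in the diff.)
	return tg.Wait()
}
```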
sharnoff committed Jan 27, 2025
1 parent 4e81d97 commit cafbcd4
Showing 5 changed files with 41 additions and 13 deletions.
9 changes: 8 additions & 1 deletion pkg/agent/billing/billing.go
@@ -14,6 +14,7 @@ import (
 	"github.com/neondatabase/autoscaling/pkg/api"
 	"github.com/neondatabase/autoscaling/pkg/billing"
 	"github.com/neondatabase/autoscaling/pkg/reporting"
+	"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
 )
 
 type Config struct {
@@ -72,6 +73,7 @@ type MetricsCollector struct {
 func NewMetricsCollector(
 	ctx context.Context,
 	parentLogger *zap.Logger,
+	tg taskgroup.Group,
 	conf *Config,
 	metrics PromMetrics,
 ) (*MetricsCollector, error) {
@@ -82,7 +84,10 @@ func NewMetricsCollector(
 		return nil, err
 	}
 
-	sink := reporting.NewEventSink(logger, metrics.reporting, clients...)
+	// Note: we pass context.TODO() because we want to manually shut down the event senders only
+	// once we've had a chance to run a final collection of remaining billing events.
+	// So instead, we defer a call to sink.Finish() in (*MetricsCollector).Run()
+	sink := reporting.NewEventSink(context.TODO(), logger, tg, metrics.reporting, clients...)
 
 	return &MetricsCollector{
 		conf: conf,
@@ -105,6 +110,8 @@ func (mc *MetricsCollector) Run(
 	accumulateTicker := time.NewTicker(time.Second * time.Duration(mc.conf.AccumulateEverySeconds))
 	defer accumulateTicker.Stop()
 
+	defer mc.sink.Finish()
+
 	state := metricsState{
 		historical: make(map[metricsKey]vmMetricsHistory),
 		present:    make(map[metricsKey]vmMetricsInstant),
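
The deferred Finish() in Run() leans on Go's LIFO defer ordering: because
it is registered early, it fires after any cleanup registered later, so a
final event-collection pass can still enqueue into the sink first. A
condensed sketch (collectFinal is a hypothetical stand-in for that final
pass; only the deferred Finish() appears in the diff):

```go
func run(ctx context.Context, sink *reporting.EventSink[Event], collectFinal func()) {
	defer sink.Finish()  // registered first, so it runs *last*
	defer collectFinal() // hypothetical: enqueue the remaining billing events

	<-ctx.Done() // main collection loop elided
	// Defers run LIFO: collectFinal() enqueues the last events, and only
	// then does Finish() tell the senders to send them and exit.
}
```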
7 changes: 4 additions & 3 deletions pkg/agent/entrypoint.go
@@ -52,8 +52,10 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
 	}
 	defer schedTracker.Stop()
 
+	tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))
+
 	scalingEventsMetrics := scalingevents.NewPromMetrics()
-	scalingReporter, err := scalingevents.NewReporter(ctx, logger, &r.Config.ScalingEvents, scalingEventsMetrics)
+	scalingReporter, err := scalingevents.NewReporter(ctx, logger, tg, &r.Config.ScalingEvents, scalingEventsMetrics)
 	if err != nil {
 		return fmt.Errorf("Error creating scaling events reporter: %w", err)
 	}
@@ -82,12 +84,11 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
 		}
 	}
 
-	mc, err := billing.NewMetricsCollector(ctx, logger, &r.Config.Billing, metrics)
+	mc, err := billing.NewMetricsCollector(ctx, logger, tg, &r.Config.Billing, metrics)
 	if err != nil {
 		return fmt.Errorf("error creating billing metrics collector: %w", err)
 	}
 
-	tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))
 	tg.Go("billing", func(logger *zap.Logger) error {
 		return mc.Run(tg.Ctx(), logger, storeForNode)
 	})
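
The NewGroup call moves to the top of Run because both reporters now need
the group at construction time; the group is also what ties sender
lifetime to agent shutdown. Roughly (the trailing tg.Wait() is an
assumption based on the commit message, not part of this hunk):

```go
tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))

// ... NewReporter / NewMetricsCollector spawn sender tasks into tg ...

tg.Go("billing", func(logger *zap.Logger) error {
	return mc.Run(tg.Ctx(), logger, storeForNode)
})

// Blocking here is what delays agent shutdown until every sender has
// flushed and exited (assumed tg.Wait()).
return tg.Wait()
```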
4 changes: 3 additions & 1 deletion pkg/agent/scalingevents/reporter.go
@@ -9,6 +9,7 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/neondatabase/autoscaling/pkg/reporting"
+	"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
 )
 
 type Config struct {
@@ -61,6 +62,7 @@ const (
 func NewReporter(
 	ctx context.Context,
 	parentLogger *zap.Logger,
+	tg taskgroup.Group,
 	conf *Config,
 	metrics PromMetrics,
 ) (*Reporter, error) {
@@ -71,7 +73,7 @@ func NewReporter(
 		return nil, err
 	}
 
-	sink := reporting.NewEventSink(logger, metrics.reporting, clients...)
+	sink := reporting.NewEventSink(ctx, logger, tg, metrics.reporting, clients...)
 
 	return &Reporter{
 		conf: conf,
5 changes: 2 additions & 3 deletions pkg/reporting/send.go
@@ -12,7 +12,6 @@ type eventSender[E any] struct {
 
 	metrics *EventSinkMetrics
 	queue   eventQueuePuller[E]
-	done    <-chan struct{}
 
 	// lastSendDuration tracks the "real" last full duration of (eventSender).sendAllCurrentEvents().
 	//
@@ -37,15 +36,15 @@ type eventSender[E any] struct {
 	lastSendDuration time.Duration
 }
 
-func (s eventSender[E]) senderLoop(logger *zap.Logger) {
+func (s eventSender[E]) senderLoop(ctx context.Context, logger *zap.Logger) {
 	ticker := time.NewTicker(time.Second * time.Duration(s.client.BaseConfig.PushEverySeconds))
 	defer ticker.Stop()
 
 	for {
 		final := false
 
 		select {
-		case <-s.done:
+		case <-ctx.Done():
 			logger.Info("Received notification that events submission is done")
 			final = true
 		case <-ticker.C:
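
Standalone, the loop above reduces to a drain-then-exit pattern: wake on
either the ticker or context expiry, always send what's pending, and exit
only after the post-cancellation send. A self-contained sketch
(sendPending stands in for the real sendAllCurrentEvents):

```go
import (
	"context"
	"time"
)

func senderLoop(ctx context.Context, period time.Duration, sendPending func()) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()

	for {
		final := false
		select {
		case <-ctx.Done():
			final = true // context expired: one last send below, then exit
		case <-ticker.C:
		}

		sendPending() // always runs, including once after cancellation

		if final {
			return
		}
	}
}
```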
29 changes: 24 additions & 5 deletions pkg/reporting/sink.go
@@ -3,21 +3,31 @@ package reporting
 // public API for event reporting
 
 import (
+	"context"
 	"fmt"
 	"sync"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
+
+	"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
 )
 
 type EventSink[E any] struct {
 	queueWriters []eventQueuePusher[E]
 	done         func()
 }
 
-func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients ...Client[E]) *EventSink[E] {
+func NewEventSink[E any](
+	ctx context.Context,
+	logger *zap.Logger,
+	tg taskgroup.Group,
+	metrics *EventSinkMetrics,
+	clients ...Client[E],
+) *EventSink[E] {
 	var queueWriters []eventQueuePusher[E]
-	signalDone := make(chan struct{})
+
+	senderCtx, cancelSenders := context.WithCancel(ctx)
 
 	for _, c := range clients {
 		qw, qr := newEventQueue[E](metrics.queueSizeCurrent.WithLabelValues(c.Name))
@@ -28,15 +38,18 @@ func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients
 			client:           c,
 			metrics:          metrics,
 			queue:            qr,
-			done:             signalDone,
 			lastSendDuration: 0,
 		}
-		go sender.senderLoop(logger.Named(fmt.Sprintf("send-%s", c.Name)))
+		taskName := fmt.Sprintf("send-%s", c.Name)
+		tg.Go(taskName, func(_ *zap.Logger) error {
+			sender.senderLoop(senderCtx, logger.Named(taskName))
+			return nil
+		})
 	}
 
 	return &EventSink[E]{
 		queueWriters: queueWriters,
-		done:         sync.OnceFunc(func() { close(signalDone) }),
+		done:         sync.OnceFunc(cancelSenders),
 	}
 }
 
@@ -47,6 +60,12 @@ func (s *EventSink[E]) Enqueue(event E) {
 	}
 }
 
+// Finish signals that the last events have been Enqueue'd, and so they should be sent off before
+// shutting down.
+func (s *EventSink[E]) Finish() {
+	s.done()
+}
+
 type EventSinkMetrics struct {
 	queueSizeCurrent *prometheus.GaugeVec
 	lastSendDuration *prometheus.GaugeVec
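
Both shutdown paths now converge on a single cancellation: expiry of the
parent context cancels senderCtx directly, and Finish() invokes the same
cancel through sync.OnceFunc, so repeated Finish() calls stay safe (just
as the old once-wrapped channel close did). A minimal runnable sketch of
that mechanism:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	parent, cancelParent := context.WithCancel(context.Background())
	defer cancelParent()

	// As in NewEventSink: senders watch senderCtx; done backs Finish().
	senderCtx, cancelSenders := context.WithCancel(parent)
	done := sync.OnceFunc(cancelSenders)

	done() // Finish(): wakes the senders for one final flush...
	done() // ...and a second call is a harmless no-op.

	<-senderCtx.Done()
	fmt.Println("senders told to flush and exit:", senderCtx.Err())
}
```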
