From b4aed5781c25204d6b709d4901d8eedc67e43d84 Mon Sep 17 00:00:00 2001 From: Em Sharnoff Date: Tue, 28 Jan 2025 19:56:38 +0000 Subject: [PATCH 1/4] agent: Send last billing/scaling events before shutdown Follow-up to #1107, ref #1220. In short, this change: 1. Moves reporting.EventSink's event sender thread management into a new Run() method 2. Calls (EventSink).Run() from (billing.MetricsCollector).Run(), appropriately canceling when done generating events, and waiting for it to finish before returning. 3. Adds (scalingevents.Reporter).Run() that just calls the underlying (EventSink).Run(), and calls *that* in the entrypoint's taskgroup. Or in other words, exactly what was suggested in this comment: https://github.com/neondatabase/autoscaling/pull/1221#discussion_r1931155991 --- pkg/agent/billing/billing.go | 16 ++++++- pkg/agent/entrypoint.go | 4 ++ pkg/agent/scalingevents/reporter.go | 8 ++++ pkg/reporting/send.go | 5 +-- pkg/reporting/sink.go | 65 +++++++++++++++++++++++++---- 5 files changed, 86 insertions(+), 12 deletions(-) diff --git a/pkg/agent/billing/billing.go b/pkg/agent/billing/billing.go index c2bdfca1e..c83feae38 100644 --- a/pkg/agent/billing/billing.go +++ b/pkg/agent/billing/billing.go @@ -14,6 +14,7 @@ import ( "github.com/neondatabase/autoscaling/pkg/api" "github.com/neondatabase/autoscaling/pkg/billing" "github.com/neondatabase/autoscaling/pkg/reporting" + "github.com/neondatabase/autoscaling/pkg/util/taskgroup" ) type Config struct { @@ -96,8 +97,6 @@ func (mc *MetricsCollector) Run( logger *zap.Logger, store VMStoreForNode, ) error { - logger = logger.Named("collect") - collectTicker := time.NewTicker(time.Second * time.Duration(mc.conf.CollectEverySeconds)) defer collectTicker.Stop() // Offset by half a second, so it's a bit more deterministic. @@ -105,6 +104,18 @@ func (mc *MetricsCollector) Run( accumulateTicker := time.NewTicker(time.Second * time.Duration(mc.conf.AccumulateEverySeconds)) defer accumulateTicker.Stop() + // Create a new taskgroup to manage mc.sink.Run() -- we want to run the event senders in the + // background and cancel them when we're done collecting, but wait for them to finish before + // returning. + sinkTg := taskgroup.NewGroup(logger) + sinkCtx, cancelSink := context.WithCancel(context.Background()) + defer sinkTg.Wait() //nolint:errcheck // cannot fail, sink-run returns nil. + defer cancelSink() + sinkTg.Go("sink-run", func(logger *zap.Logger) error { + mc.sink.Run(sinkCtx) + return nil + }) + state := metricsState{ historical: make(map[metricsKey]vmMetricsHistory), present: make(map[metricsKey]vmMetricsInstant), @@ -112,6 +123,7 @@ func (mc *MetricsCollector) Run( pushWindowStart: time.Now(), } + logger = logger.Named("collect") state.collect(logger, store, mc.metrics) for { diff --git a/pkg/agent/entrypoint.go b/pkg/agent/entrypoint.go index ae315161f..d71712882 100644 --- a/pkg/agent/entrypoint.go +++ b/pkg/agent/entrypoint.go @@ -88,6 +88,10 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error { } tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx)) + tg.Go("scalingevents-run", func(logger *zap.Logger) error { + scalingReporter.Run(tg.Ctx()) + return nil + }) tg.Go("billing", func(logger *zap.Logger) error { return mc.Run(tg.Ctx(), logger, storeForNode) }) diff --git a/pkg/agent/scalingevents/reporter.go b/pkg/agent/scalingevents/reporter.go index cd9165f01..aef9e4a29 100644 --- a/pkg/agent/scalingevents/reporter.go +++ b/pkg/agent/scalingevents/reporter.go @@ -80,6 +80,14 @@ func NewReporter( }, nil } +// Run calls the underlying reporting.EventSink's Run() method, periodically pushing events to the +// clients specified in Config until the context expires. +// +// Refer there for more information. +func (r *Reporter) Run(ctx context.Context) { + r.sink.Run(ctx) +} + // Submit adds the ScalingEvent to the sender queue(s), returning without waiting for it to be sent. func (r *Reporter) Submit(event ScalingEvent) { r.metrics.recordSubmitted(event) diff --git a/pkg/reporting/send.go b/pkg/reporting/send.go index 0a2780d06..d97603241 100644 --- a/pkg/reporting/send.go +++ b/pkg/reporting/send.go @@ -12,7 +12,6 @@ type eventSender[E any] struct { metrics *EventSinkMetrics queue eventQueuePuller[E] - done <-chan struct{} // lastSendDuration tracks the "real" last full duration of (eventSender).sendAllCurrentEvents(). // @@ -37,7 +36,7 @@ type eventSender[E any] struct { lastSendDuration time.Duration } -func (s eventSender[E]) senderLoop(logger *zap.Logger) { +func (s eventSender[E]) senderLoop(ctx context.Context, logger *zap.Logger) { ticker := time.NewTicker(time.Second * time.Duration(s.client.BaseConfig.PushEverySeconds)) defer ticker.Stop() @@ -45,7 +44,7 @@ func (s eventSender[E]) senderLoop(logger *zap.Logger) { final := false select { - case <-s.done: + case <-ctx.Done(): logger.Info("Received notification that events submission is done") final = true case <-ticker.C: diff --git a/pkg/reporting/sink.go b/pkg/reporting/sink.go index 941d156dc..eaef4e862 100644 --- a/pkg/reporting/sink.go +++ b/pkg/reporting/sink.go @@ -3,43 +3,94 @@ package reporting // public API for event reporting import ( + "context" "fmt" - "sync" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/util/taskgroup" ) type EventSink[E any] struct { queueWriters []eventQueuePusher[E] - done func() + + runSenders func(context.Context) } +// NewEventSink creates a new EventSink with the given clients to dispatch events into. +// +// You MUST call (*EventSink[E]).Run() if you wish for any enqueued events to actually be sent via +// the clients. func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients ...Client[E]) *EventSink[E] { var queueWriters []eventQueuePusher[E] - signalDone := make(chan struct{}) + + var senders []struct { + name string + sender eventSender[E] + } for _, c := range clients { qw, qr := newEventQueue[E](metrics.queueSizeCurrent.WithLabelValues(c.Name)) queueWriters = append(queueWriters, qw) - // Start the sender + // Create the sender -- we'll save starting it for the call to Run() sender := eventSender[E]{ client: c, metrics: metrics, queue: qr, - done: signalDone, lastSendDuration: 0, } - go sender.senderLoop(logger.Named(fmt.Sprintf("send-%s", c.Name))) + senders = append(senders, struct { + name string + sender eventSender[E] + }{ + name: c.Name, + sender: sender, + }) + } + + var runSenders func(context.Context) + if len(senders) > 0 { + runSenders = func(ctx context.Context) { + tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx)) + + for _, s := range senders { + taskName := fmt.Sprintf("send-%s", s.name) + tg.Go(taskName, func(logger *zap.Logger) error { + s.sender.senderLoop(tg.Ctx(), logger) + return nil + }) + } + + _ = tg.Wait() // no need to check error, they all return nil. + } + } else { + // Special case when there's no clients -- we want our run function to just wait until the + // context is complete, matching what the behavior *would* be if there were actually sender + // threads we were waiting on. + runSenders = func(ctx context.Context) { + <-ctx.Done() + } } return &EventSink[E]{ queueWriters: queueWriters, - done: sync.OnceFunc(func() { close(signalDone) }), + runSenders: runSenders, } } +// Run executes the client threads responsible for actually pushing enqueued events to the +// appropriate places. +// +// The clients will periodically push events until the context expires, at which point they will +// push any remaining events. Run() only completes after these final events have been pushed. +// +// Calling Run() more than once is unsound. +func (s *EventSink[E]) Run(ctx context.Context) { + s.runSenders(ctx) +} + // Enqueue submits the event to the internal client sending queues, returning without blocking. func (s *EventSink[E]) Enqueue(event E) { for _, q := range s.queueWriters { From 864689e5a98035d3c2fcab8cae52ba04fc04c6ba Mon Sep 17 00:00:00 2001 From: Em Sharnoff Date: Tue, 28 Jan 2025 20:52:17 +0000 Subject: [PATCH 2/4] simplify senders list --- pkg/reporting/sink.go | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/pkg/reporting/sink.go b/pkg/reporting/sink.go index eaef4e862..c8ac0c401 100644 --- a/pkg/reporting/sink.go +++ b/pkg/reporting/sink.go @@ -25,28 +25,18 @@ type EventSink[E any] struct { func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients ...Client[E]) *EventSink[E] { var queueWriters []eventQueuePusher[E] - var senders []struct { - name string - sender eventSender[E] - } + var senders []eventSender[E] for _, c := range clients { qw, qr := newEventQueue[E](metrics.queueSizeCurrent.WithLabelValues(c.Name)) queueWriters = append(queueWriters, qw) // Create the sender -- we'll save starting it for the call to Run() - sender := eventSender[E]{ + senders = append(senders, eventSender[E]{ client: c, metrics: metrics, queue: qr, lastSendDuration: 0, - } - senders = append(senders, struct { - name string - sender eventSender[E] - }{ - name: c.Name, - sender: sender, }) } @@ -55,10 +45,10 @@ func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients runSenders = func(ctx context.Context) { tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx)) - for _, s := range senders { - taskName := fmt.Sprintf("send-%s", s.name) + for _, sender := range senders { + taskName := fmt.Sprintf("send-%s", sender.client.Name) tg.Go(taskName, func(logger *zap.Logger) error { - s.sender.senderLoop(tg.Ctx(), logger) + sender.senderLoop(tg.Ctx(), logger) return nil }) } From ed73d72b9d1405cc6a99a66cd12e3513f3bf44cd Mon Sep 17 00:00:00 2001 From: Em Sharnoff Date: Mon, 10 Feb 2025 13:14:43 +0000 Subject: [PATCH 3/4] pass errors back to propagate panics --- pkg/agent/billing/billing.go | 6 +++++- pkg/agent/entrypoint.go | 3 +-- pkg/agent/scalingevents/reporter.go | 8 ++++++-- pkg/reporting/sink.go | 15 ++++++++------- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/pkg/agent/billing/billing.go b/pkg/agent/billing/billing.go index c83feae38..6a083d448 100644 --- a/pkg/agent/billing/billing.go +++ b/pkg/agent/billing/billing.go @@ -3,6 +3,7 @@ package billing import ( "context" "errors" + "fmt" "math" "time" @@ -112,7 +113,10 @@ func (mc *MetricsCollector) Run( defer sinkTg.Wait() //nolint:errcheck // cannot fail, sink-run returns nil. defer cancelSink() sinkTg.Go("sink-run", func(logger *zap.Logger) error { - mc.sink.Run(sinkCtx) + err := mc.sink.Run(sinkCtx) + if err != nil { + return fmt.Errorf("billing events sink failed: %w", err) + } return nil }) diff --git a/pkg/agent/entrypoint.go b/pkg/agent/entrypoint.go index d08034a58..c0bf20b5d 100644 --- a/pkg/agent/entrypoint.go +++ b/pkg/agent/entrypoint.go @@ -88,8 +88,7 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error { tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx)) tg.Go("scalingevents-run", func(logger *zap.Logger) error { - scalingReporter.Run(tg.Ctx()) - return nil + return scalingReporter.Run(tg.Ctx()) }) tg.Go("billing", func(logger *zap.Logger) error { return mc.Run(tg.Ctx(), logger, storeForNode) diff --git a/pkg/agent/scalingevents/reporter.go b/pkg/agent/scalingevents/reporter.go index aef9e4a29..bacbc8584 100644 --- a/pkg/agent/scalingevents/reporter.go +++ b/pkg/agent/scalingevents/reporter.go @@ -2,6 +2,7 @@ package scalingevents import ( "context" + "fmt" "math" "time" @@ -84,8 +85,11 @@ func NewReporter( // clients specified in Config until the context expires. // // Refer there for more information. -func (r *Reporter) Run(ctx context.Context) { - r.sink.Run(ctx) +func (r *Reporter) Run(ctx context.Context) error { + if err := r.sink.Run(ctx); err != nil { + return fmt.Errorf("scaling events sink failed: %w", err) + } + return nil } // Submit adds the ScalingEvent to the sender queue(s), returning without waiting for it to be sent. diff --git a/pkg/reporting/sink.go b/pkg/reporting/sink.go index 3062a5167..088413112 100644 --- a/pkg/reporting/sink.go +++ b/pkg/reporting/sink.go @@ -16,7 +16,7 @@ import ( type EventSink[E any] struct { queueWriters []eventQueuePusher[E] - runSenders func(context.Context) + runSenders func(context.Context) error } // NewEventSink creates a new EventSink with the given clients to dispatch events into. @@ -41,9 +41,9 @@ func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients }) } - var runSenders func(context.Context) + var runSenders func(context.Context) error if len(senders) > 0 { - runSenders = func(ctx context.Context) { + runSenders = func(ctx context.Context) error { tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx)) for _, sender := range senders { @@ -54,14 +54,15 @@ func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients }) } - _ = tg.Wait() // no need to check error, they all return nil. + return tg.Wait() } } else { // Special case when there's no clients -- we want our run function to just wait until the // context is complete, matching what the behavior *would* be if there were actually sender // threads we were waiting on. - runSenders = func(ctx context.Context) { + runSenders = func(ctx context.Context) error { <-ctx.Done() + return nil } } @@ -78,8 +79,8 @@ func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients // push any remaining events. Run() only completes after these final events have been pushed. // // Calling Run() more than once is unsound. -func (s *EventSink[E]) Run(ctx context.Context) { - s.runSenders(ctx) +func (s *EventSink[E]) Run(ctx context.Context) error { + return s.runSenders(ctx) } // Enqueue submits the event to the internal client sending queues, returning without blocking. From f0ba926ca848f93dffb9f6916ea7de137c9e3959 Mon Sep 17 00:00:00 2001 From: Em Sharnoff Date: Mon, 10 Feb 2025 13:24:04 +0000 Subject: [PATCH 4/4] split (*MetricsCollector).Run() --- pkg/agent/billing/billing.go | 41 +++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/pkg/agent/billing/billing.go b/pkg/agent/billing/billing.go index 6a083d448..b5583c1a7 100644 --- a/pkg/agent/billing/billing.go +++ b/pkg/agent/billing/billing.go @@ -98,28 +98,40 @@ func (mc *MetricsCollector) Run( logger *zap.Logger, store VMStoreForNode, ) error { - collectTicker := time.NewTicker(time.Second * time.Duration(mc.conf.CollectEverySeconds)) - defer collectTicker.Stop() - // Offset by half a second, so it's a bit more deterministic. - time.Sleep(500 * time.Millisecond) - accumulateTicker := time.NewTicker(time.Second * time.Duration(mc.conf.AccumulateEverySeconds)) - defer accumulateTicker.Stop() + tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx)) - // Create a new taskgroup to manage mc.sink.Run() -- we want to run the event senders in the - // background and cancel them when we're done collecting, but wait for them to finish before - // returning. - sinkTg := taskgroup.NewGroup(logger) + // note: sink has its own context, so that it is canceled only after runCollector finishes. sinkCtx, cancelSink := context.WithCancel(context.Background()) - defer sinkTg.Wait() //nolint:errcheck // cannot fail, sink-run returns nil. - defer cancelSink() - sinkTg.Go("sink-run", func(logger *zap.Logger) error { - err := mc.sink.Run(sinkCtx) + defer cancelSink() // make sure resources are cleaned up + + tg.Go("collect", func(logger *zap.Logger) error { + defer cancelSink() // cancel event sending *only when we're done collecting* + return mc.runCollector(tg.Ctx(), logger, store) + }) + + tg.Go("sink-run", func(logger *zap.Logger) error { + err := mc.sink.Run(sinkCtx) // note: NOT tg.Ctx(); see more above. if err != nil { return fmt.Errorf("billing events sink failed: %w", err) } return nil }) + return tg.Wait() +} + +func (mc *MetricsCollector) runCollector( + ctx context.Context, + logger *zap.Logger, + store VMStoreForNode, +) error { + collectTicker := time.NewTicker(time.Second * time.Duration(mc.conf.CollectEverySeconds)) + defer collectTicker.Stop() + // Offset by half a second, so it's a bit more deterministic. + time.Sleep(500 * time.Millisecond) + accumulateTicker := time.NewTicker(time.Second * time.Duration(mc.conf.AccumulateEverySeconds)) + defer accumulateTicker.Stop() + state := metricsState{ historical: make(map[metricsKey]vmMetricsHistory), present: make(map[metricsKey]vmMetricsInstant), @@ -127,7 +139,6 @@ func (mc *MetricsCollector) Run( pushWindowStart: time.Now(), } - logger = logger.Named("collect") state.collect(logger, store, mc.metrics) for {