agent: Send last billing/scaling events before shutdown (#1221)
Follow-up to #1107, ref #1220.

In short, this change:

1. Moves reporting.EventSink's event sender thread management into a new
   Run() method.
2. Calls (EventSink).Run() from (billing.MetricsCollector).Run(), canceling it
   only once event generation is done, and waiting for it to finish before
   returning (see the sketch after this message).
3. Adds (scalingevents.Reporter).Run() that just calls the underlying
   (EventSink).Run(), and calls *that* in the entrypoint's taskgroup.

Or in other words, exactly what was suggested in this comment:
#1221 (comment)
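To make the resulting control flow easy to follow before reading the diff, here is a minimal sketch of the pattern, not the committed code: the repository's taskgroup helper and zap logger are replaced with errgroup and fmt, the queue is a plain channel, and runCollector/runSink are simplified stand-ins for the real methods in billing.go and reporting/sink.go.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// runCollector stands in for the collector side: it produces events until its
// context is canceled.
func runCollector(ctx context.Context, enqueue func(string)) error {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			enqueue("usage sample")
		}
	}
}

// runSink stands in for (reporting.EventSink).Run: it pushes queued events
// periodically, and does one final push after its context is canceled.
func runSink(ctx context.Context, queue <-chan string) error {
	push := func() {
		for {
			select {
			case ev := <-queue:
				fmt.Println("pushed:", ev)
			default:
				return
			}
		}
	}
	for {
		select {
		case <-ctx.Done():
			push() // last events go out before shutdown
			return nil
		case <-time.After(250 * time.Millisecond):
			push()
		}
	}
}

// Run mirrors the new structure of (billing.MetricsCollector).Run: the sink
// runs on its own context, canceled only once the collector has finished, so
// events queued at shutdown are still sent.
func Run(ctx context.Context) error {
	queue := make(chan string, 64)

	sinkCtx, cancelSink := context.WithCancel(context.Background())
	defer cancelSink()

	g, gctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		defer cancelSink() // stop the sink only after collection is done
		return runCollector(gctx, func(ev string) { queue <- ev })
	})
	g.Go(func() error {
		return runSink(sinkCtx, queue)
	})
	return g.Wait()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := Run(ctx); err != nil {
		fmt.Println("error:", err)
	}
}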
sharnoff authored Feb 10, 2025
1 parent f9403d0 commit 55aff0e
Showing 5 changed files with 94 additions and 12 deletions.
29 changes: 28 additions & 1 deletion pkg/agent/billing/billing.go
@@ -3,6 +3,7 @@ package billing
import (
"context"
"errors"
"fmt"
"math"
"time"

@@ -14,6 +15,7 @@ import (
"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/billing"
"github.com/neondatabase/autoscaling/pkg/reporting"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

type Config struct {
@@ -96,8 +98,33 @@ func (mc *MetricsCollector) Run(
logger *zap.Logger,
store VMStoreForNode,
) error {
logger = logger.Named("collect")
tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))

// note: sink has its own context, so that it is canceled only after runCollector finishes.
sinkCtx, cancelSink := context.WithCancel(context.Background())
defer cancelSink() // make sure resources are cleaned up

tg.Go("collect", func(logger *zap.Logger) error {
defer cancelSink() // cancel event sending *only when we're done collecting*
return mc.runCollector(tg.Ctx(), logger, store)
})

tg.Go("sink-run", func(logger *zap.Logger) error {
err := mc.sink.Run(sinkCtx) // note: NOT tg.Ctx(); see more above.
if err != nil {
return fmt.Errorf("billing events sink failed: %w", err)
}
return nil
})

return tg.Wait()
}

func (mc *MetricsCollector) runCollector(
ctx context.Context,
logger *zap.Logger,
store VMStoreForNode,
) error {
collectTicker := time.NewTicker(time.Second * time.Duration(mc.conf.CollectEverySeconds))
defer collectTicker.Stop()
// Offset by half a second, so it's a bit more deterministic.
3 changes: 3 additions & 0 deletions pkg/agent/entrypoint.go
@@ -94,6 +94,9 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
}

tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))
tg.Go("scalingevents-run", func(logger *zap.Logger) error {
return scalingReporter.Run(tg.Ctx())
})
tg.Go("billing", func(logger *zap.Logger) error {
return mc.Run(tg.Ctx(), logger, storeForNode)
})
12 changes: 12 additions & 0 deletions pkg/agent/scalingevents/reporter.go
@@ -2,6 +2,7 @@ package scalingevents

import (
"context"
"fmt"
"math"
"time"

@@ -80,6 +81,17 @@ func NewReporter(
}, nil
}

// Run calls the underlying reporting.EventSink's Run() method, periodically pushing events to the
// clients specified in Config until the context expires.
//
// Refer there for more information.
func (r *Reporter) Run(ctx context.Context) error {
if err := r.sink.Run(ctx); err != nil {
return fmt.Errorf("scaling events sink failed: %w", err)
}
return nil
}

// Submit adds the ScalingEvent to the sender queue(s), returning without waiting for it to be sent.
func (r *Reporter) Submit(event ScalingEvent) {
r.metrics.recordSubmitted(event)
5 changes: 2 additions & 3 deletions pkg/reporting/send.go
@@ -12,7 +12,6 @@ type eventSender[E any] struct {

metrics *EventSinkMetrics
queue eventQueuePuller[E]
done <-chan struct{}

// lastSendDuration tracks the "real" last full duration of (eventSender).sendAllCurrentEvents().
//
@@ -37,15 +36,15 @@ type eventSender[E any] struct {
lastSendDuration time.Duration
}

func (s eventSender[E]) senderLoop(logger *zap.Logger) {
func (s eventSender[E]) senderLoop(ctx context.Context, logger *zap.Logger) {
ticker := time.NewTicker(time.Second * time.Duration(s.client.BaseConfig.PushEverySeconds))
defer ticker.Stop()

for {
final := false

select {
case <-s.done:
case <-ctx.Done():
logger.Info("Received notification that events submission is done")
final = true
case <-ticker.C:
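The hunk above is truncated before the rest of the loop, so here is a small, hedged sketch of its intended shape: cancellation of ctx flips final, one more push happens, and then the loop exits. sendAll is a stand-in for sendAllCurrentEvents, whose body is outside the shown hunk.

package main

import (
	"context"
	"fmt"
	"time"
)

// senderLoopShape mirrors the updated control flow of senderLoop: the done
// channel is gone, and ctx cancellation triggers exactly one final push
// before the loop returns. sendAll is a stand-in for sendAllCurrentEvents.
func senderLoopShape(ctx context.Context, pushEvery time.Duration, sendAll func()) {
	ticker := time.NewTicker(pushEvery)
	defer ticker.Stop()

	for {
		final := false
		select {
		case <-ctx.Done():
			final = true // push whatever is still queued, then exit
		case <-ticker.C:
		}

		sendAll()

		if final {
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	senderLoopShape(ctx, time.Second, func() { fmt.Println("push batch") })
}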
57 changes: 49 additions & 8 deletions pkg/reporting/sink.go
@@ -3,45 +3,86 @@ package reporting
// public API for event reporting

import (
"context"
"fmt"
"sync"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/neondatabase/autoscaling/pkg/util"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

type EventSink[E any] struct {
queueWriters []eventQueuePusher[E]
done func()

runSenders func(context.Context) error
}

// NewEventSink creates a new EventSink with the given clients to dispatch events into.
//
// You MUST call (*EventSink[E]).Run() if you wish for any enqueued events to actually be sent via
// the clients.
func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients ...Client[E]) *EventSink[E] {
var queueWriters []eventQueuePusher[E]
signalDone := make(chan struct{})

var senders []eventSender[E]

for _, c := range clients {
qw, qr := newEventQueue[E](metrics.queueSizeCurrent.WithLabelValues(c.Name))
queueWriters = append(queueWriters, qw)

// Start the sender
sender := eventSender[E]{
// Create the sender -- we'll save starting it for the call to Run()
senders = append(senders, eventSender[E]{
client: c,
metrics: metrics,
queue: qr,
done: signalDone,
lastSendDuration: 0,
})
}

var runSenders func(context.Context) error
if len(senders) > 0 {
runSenders = func(ctx context.Context) error {
tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))

for _, sender := range senders {
taskName := fmt.Sprintf("send-%s", sender.client.Name)
tg.Go(taskName, func(logger *zap.Logger) error {
sender.senderLoop(tg.Ctx(), logger)
return nil
})
}

return tg.Wait()
}
} else {
// Special case when there's no clients -- we want our run function to just wait until the
// context is complete, matching what the behavior *would* be if there were actually sender
// threads we were waiting on.
runSenders = func(ctx context.Context) error {
<-ctx.Done()
return nil
}
go sender.senderLoop(logger.Named(fmt.Sprintf("send-%s", c.Name)))
}

return &EventSink[E]{
queueWriters: queueWriters,
done: sync.OnceFunc(func() { close(signalDone) }),
runSenders: runSenders,
}
}

// Run executes the client threads responsible for actually pushing enqueued events to the
// appropriate places.
//
// The clients will periodically push events until the context expires, at which point they will
// push any remaining events. Run() only completes after these final events have been pushed.
//
// Calling Run() more than once is unsound.
func (s *EventSink[E]) Run(ctx context.Context) error {
return s.runSenders(ctx)
}

// Enqueue submits the event to the internal client sending queues, returning without blocking.
func (s *EventSink[E]) Enqueue(event E) {
for _, q := range s.queueWriters {

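Finally, a hedged usage sketch of the public surface touched by this commit (NewEventSink, Run, Enqueue). The event type and the logger/metrics/clients parameters are placeholders; how they are constructed is outside this change, and the shutdown ordering shown follows the doc comments above.

package example

import (
	"context"
	"log"
	"sync"

	"go.uber.org/zap"

	"github.com/neondatabase/autoscaling/pkg/reporting"
)

// MyEvent is a placeholder event type for this sketch.
type MyEvent struct{ Name string }

// useSink shows the intended lifecycle of the reworked sink. Constructing the
// logger, metrics, and clients is deliberately elided (they are parameters
// here) because those details are outside this commit.
func useSink(logger *zap.Logger, metrics *reporting.EventSinkMetrics, clients []reporting.Client[MyEvent]) {
	// Without a call to Run(), Enqueue()d events are never sent.
	sink := reporting.NewEventSink[MyEvent](logger, metrics, clients...)

	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := sink.Run(ctx); err != nil { // one sender loop per client
			log.Printf("event sink failed: %v", err)
		}
	}()

	sink.Enqueue(MyEvent{Name: "example"}) // non-blocking

	// Shutdown order: stop producing events first, then cancel the sink's
	// context; Run() pushes the remaining queued events before returning.
	cancel()
	wg.Wait()
}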