agent: Send last billing/scaling events before shutdown
Follow-up to #1107, ref #1220.

In short, this change:

1. Moves reporting.EventSink's event sender thread management into a new
   Run() method.
2. Calls (EventSink).Run() from (billing.MetricsCollector).Run(),
   appropriately canceling when done generating events, and waiting for
   it to finish before returning (this cancel-then-wait ordering is
   sketched below).
3. Adds (scalingevents.Reporter).Run() that just calls the underlying
   (EventSink).Run(), and calls *that* in the entrypoint's taskgroup.

Or in other words, exactly what was suggested in this comment:
#1221 (comment)
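
For context, the lifecycle in point 2 boils down to Go's usual cancel-then-wait shutdown ordering. Here is a minimal standalone sketch of that ordering (a hypothetical runWithBackgroundSender helper, not code from this commit), using a plain context and WaitGroup in place of the taskgroup helper:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Sketch only: illustrates the cancel-then-wait shutdown ordering that
// billing.go adopts below. Deferred calls run last-in-first-out, so Wait()
// is registered *before* cancel(): on return, the sender is cancelled
// first, then we block until it has finished its final flush.
func runWithBackgroundSender(run func(ctx context.Context)) {
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		run(ctx)
	}()

	defer wg.Wait() // runs second: wait for the final flush to complete
	defer cancel()  // runs first: signal the sender to stop

	time.Sleep(100 * time.Millisecond) // stand-in for the event-generating work
}

func main() {
	runWithBackgroundSender(func(ctx context.Context) {
		<-ctx.Done()
		fmt.Println("flushing remaining events") // stand-in for the final push
	})
}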
sharnoff committed Jan 28, 2025
1 parent 46ad90f commit b4aed57
Showing 5 changed files with 86 additions and 12 deletions.
16 changes: 14 additions & 2 deletions pkg/agent/billing/billing.go
@@ -14,6 +14,7 @@ import (
	"github.com/neondatabase/autoscaling/pkg/api"
	"github.com/neondatabase/autoscaling/pkg/billing"
	"github.com/neondatabase/autoscaling/pkg/reporting"
	"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

type Config struct {
@@ -96,22 +97,33 @@ func (mc *MetricsCollector) Run(
	logger *zap.Logger,
	store VMStoreForNode,
) error {
	logger = logger.Named("collect")

	collectTicker := time.NewTicker(time.Second * time.Duration(mc.conf.CollectEverySeconds))
	defer collectTicker.Stop()
	// Offset by half a second, so it's a bit more deterministic.
	time.Sleep(500 * time.Millisecond)
	accumulateTicker := time.NewTicker(time.Second * time.Duration(mc.conf.AccumulateEverySeconds))
	defer accumulateTicker.Stop()

	// Create a new taskgroup to manage mc.sink.Run() -- we want to run the event senders in the
	// background and cancel them when we're done collecting, but wait for them to finish before
	// returning.
	sinkTg := taskgroup.NewGroup(logger)
	sinkCtx, cancelSink := context.WithCancel(context.Background())
	defer sinkTg.Wait() //nolint:errcheck // cannot fail, sink-run returns nil.
	defer cancelSink()
	sinkTg.Go("sink-run", func(logger *zap.Logger) error {
		mc.sink.Run(sinkCtx)
		return nil
	})

	state := metricsState{
		historical: make(map[metricsKey]vmMetricsHistory),
		present: make(map[metricsKey]vmMetricsInstant),
		lastCollectTime: nil,
		pushWindowStart: time.Now(),
	}

	logger = logger.Named("collect")
	state.collect(logger, store, mc.metrics)

	for {
4 changes: 4 additions & 0 deletions pkg/agent/entrypoint.go
@@ -88,6 +88,10 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
	}

	tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))
	tg.Go("scalingevents-run", func(logger *zap.Logger) error {
		scalingReporter.Run(tg.Ctx())
		return nil
	})
	tg.Go("billing", func(logger *zap.Logger) error {
		return mc.Run(tg.Ctx(), logger, storeForNode)
	})
8 changes: 8 additions & 0 deletions pkg/agent/scalingevents/reporter.go
@@ -80,6 +80,14 @@ func NewReporter(
	}, nil
}

// Run calls the underlying reporting.EventSink's Run() method, periodically pushing events to the
// clients specified in Config until the context expires.
//
// Refer there for more information.
func (r *Reporter) Run(ctx context.Context) {
	r.sink.Run(ctx)
}

// Submit adds the ScalingEvent to the sender queue(s), returning without waiting for it to be sent.
func (r *Reporter) Submit(event ScalingEvent) {
	r.metrics.recordSubmitted(event)
5 changes: 2 additions & 3 deletions pkg/reporting/send.go
@@ -12,7 +12,6 @@ type eventSender[E any] struct {

	metrics *EventSinkMetrics
	queue eventQueuePuller[E]
	done <-chan struct{}

	// lastSendDuration tracks the "real" last full duration of (eventSender).sendAllCurrentEvents().
	//
@@ -37,15 +36,15 @@ type eventSender[E any] struct {
	lastSendDuration time.Duration
}

func (s eventSender[E]) senderLoop(logger *zap.Logger) {
func (s eventSender[E]) senderLoop(ctx context.Context, logger *zap.Logger) {
	ticker := time.NewTicker(time.Second * time.Duration(s.client.BaseConfig.PushEverySeconds))
	defer ticker.Stop()

	for {
		final := false

		select {
		case <-s.done:
		case <-ctx.Done():
			logger.Info("Received notification that events submission is done")
			final = true
		case <-ticker.C:
65 changes: 58 additions & 7 deletions pkg/reporting/sink.go
@@ -3,43 +3,94 @@ package reporting
// public API for event reporting

import (
	"context"
	"fmt"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"

	"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

type EventSink[E any] struct {
	queueWriters []eventQueuePusher[E]
	done func()

	runSenders func(context.Context)
}

// NewEventSink creates a new EventSink with the given clients to dispatch events into.
//
// You MUST call (*EventSink[E]).Run() if you wish for any enqueued events to actually be sent via
// the clients.
func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients ...Client[E]) *EventSink[E] {
	var queueWriters []eventQueuePusher[E]
	signalDone := make(chan struct{})

	var senders []struct {
		name string
		sender eventSender[E]
	}

	for _, c := range clients {
		qw, qr := newEventQueue[E](metrics.queueSizeCurrent.WithLabelValues(c.Name))
		queueWriters = append(queueWriters, qw)

		// Start the sender
		// Create the sender -- we'll save starting it for the call to Run()
		sender := eventSender[E]{
			client: c,
			metrics: metrics,
			queue: qr,
			done: signalDone,
			lastSendDuration: 0,
		}
		go sender.senderLoop(logger.Named(fmt.Sprintf("send-%s", c.Name)))
		senders = append(senders, struct {
			name string
			sender eventSender[E]
		}{
			name: c.Name,
			sender: sender,
		})
	}

	var runSenders func(context.Context)
	if len(senders) > 0 {
		runSenders = func(ctx context.Context) {
			tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))

			for _, s := range senders {
				taskName := fmt.Sprintf("send-%s", s.name)
				tg.Go(taskName, func(logger *zap.Logger) error {
					s.sender.senderLoop(tg.Ctx(), logger)
					return nil
				})
			}

			_ = tg.Wait() // no need to check error, they all return nil.
		}
	} else {
		// Special case when there's no clients -- we want our run function to just wait until the
		// context is complete, matching what the behavior *would* be if there were actually sender
		// threads we were waiting on.
		runSenders = func(ctx context.Context) {
			<-ctx.Done()
		}
	}

	return &EventSink[E]{
		queueWriters: queueWriters,
		done: sync.OnceFunc(func() { close(signalDone) }),
		runSenders: runSenders,
	}
}

// Run executes the client threads responsible for actually pushing enqueued events to the
// appropriate places.
//
// The clients will periodically push events until the context expires, at which point they will
// push any remaining events. Run() only completes after these final events have been pushed.
//
// Calling Run() more than once is unsound.
func (s *EventSink[E]) Run(ctx context.Context) {
	s.runSenders(ctx)
}

// Enqueue submits the event to the internal client sending queues, returning without blocking.
func (s *EventSink[E]) Enqueue(event E) {
	for _, q := range s.queueWriters {
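
For reference, a hypothetical caller of the new EventSink API (not part of this commit) would wire it up roughly as below; the Client[string] value and EventSinkMetrics are assumed to be constructed elsewhere:

package example

import (
	"context"

	"go.uber.org/zap"

	"github.com/neondatabase/autoscaling/pkg/reporting"
	"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

// runSink is a hypothetical wiring example, mirroring how the agent's
// entrypoint and billing collector use the new Run() method.
func runSink(
	ctx context.Context,
	logger *zap.Logger,
	metrics *reporting.EventSinkMetrics,
	client reporting.Client[string],
) error {
	sink := reporting.NewEventSink[string](logger, metrics, client)

	tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))
	tg.Go("sink-run", func(logger *zap.Logger) error {
		// Pushes periodically until ctx expires, then pushes whatever is left.
		sink.Run(tg.Ctx())
		return nil
	})

	// Events can be enqueued from anywhere while the sink is running.
	sink.Enqueue("example event")

	return tg.Wait()
}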
