agent: Send last billing/scaling events before shutdown #1221

Merged
merged 6 commits into from Feb 10, 2025
29 changes: 28 additions & 1 deletion pkg/agent/billing/billing.go
@@ -3,6 +3,7 @@ package billing
import (
"context"
"errors"
"fmt"
"math"
"time"

@@ -14,6 +15,7 @@ import (
"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/billing"
"github.com/neondatabase/autoscaling/pkg/reporting"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

type Config struct {
@@ -96,8 +98,33 @@ func (mc *MetricsCollector) Run(
logger *zap.Logger,
store VMStoreForNode,
) error {
logger = logger.Named("collect")
tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))

// note: sink has its own context, so that it is canceled only after runCollector finishes.
sinkCtx, cancelSink := context.WithCancel(context.Background())
defer cancelSink() // make sure resources are cleaned up

tg.Go("collect", func(logger *zap.Logger) error {
defer cancelSink() // cancel event sending *only when we're done collecting*
return mc.runCollector(tg.Ctx(), logger, store)
})

tg.Go("sink-run", func(logger *zap.Logger) error {
err := mc.sink.Run(sinkCtx) // note: NOT tg.Ctx(); see more above.
if err != nil {
return fmt.Errorf("billing events sink failed: %w", err)
}
return nil
})

return tg.Wait()
}

func (mc *MetricsCollector) runCollector(
ctx context.Context,
logger *zap.Logger,
store VMStoreForNode,
) error {
collectTicker := time.NewTicker(time.Second * time.Duration(mc.conf.CollectEverySeconds))
defer collectTicker.Stop()
// Offset by half a second, so it's a bit more deterministic.
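The important detail in this hunk is the shutdown ordering: the collector runs under the caller's context, while the sink runs under a separate context derived from context.Background() that is canceled only once the collector has returned, so events produced right up to shutdown can still be flushed. A minimal sketch of that wiring, using golang.org/x/sync/errgroup in place of the repo's internal taskgroup helper (function names here are illustrative, not from the PR):

package sketch

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// runWithOrderedShutdown runs a producer and a sink together. produce stands in
// for runCollector (emits events until its context is canceled, including any
// last events generated during shutdown); drain stands in for EventSink.Run
// (sends queued events until *its* context is canceled, then does a final flush).
func runWithOrderedShutdown(
	ctx context.Context,
	produce func(context.Context) error,
	drain func(context.Context) error,
) error {
	// The sink gets its own context, derived from Background rather than ctx,
	// so canceling ctx does not stop it directly.
	sinkCtx, cancelSink := context.WithCancel(context.Background())
	defer cancelSink()

	var g errgroup.Group
	g.Go(func() error {
		defer cancelSink() // stop the sink only after the producer has finished
		return produce(ctx)
	})
	g.Go(func() error {
		return drain(sinkCtx) // note: NOT ctx; see above
	})
	return g.Wait()
}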
3 changes: 3 additions & 0 deletions pkg/agent/entrypoint.go
@@ -94,6 +94,9 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
}

tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))
tg.Go("scalingevents-run", func(logger *zap.Logger) error {
return scalingReporter.Run(tg.Ctx())
})
tg.Go("billing", func(logger *zap.Logger) error {
return mc.Run(tg.Ctx(), logger, storeForNode)
})
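The entrypoint change registers the scaling-events reporter's Run loop as one more task in the agent's existing task group, next to the billing collector. Based only on the taskgroup calls visible in this diff (NewGroup, WithParentContext, Go, Ctx, Wait), the wiring pattern looks roughly like the hypothetical helper below; the map of runners is illustrative, not part of the PR:

package sketch

import (
	"context"

	"go.uber.org/zap"

	"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

// startBackgroundTasks mirrors the wiring in entrypoint.go: each long-running
// component is registered as a named task on the agent's task group.
func startBackgroundTasks(
	ctx context.Context,
	logger *zap.Logger,
	runners map[string]func(context.Context) error, // e.g. "scalingevents-run", "billing"
) error {
	tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))
	for name, run := range runners {
		name, run := name, run // per-iteration copies (needed before Go 1.22)
		tg.Go(name, func(logger *zap.Logger) error {
			return run(tg.Ctx()) // every task observes the group's context
		})
	}
	return tg.Wait() // returns once all registered tasks have finished
}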
12 changes: 12 additions & 0 deletions pkg/agent/scalingevents/reporter.go
@@ -2,6 +2,7 @@ package scalingevents

import (
"context"
"fmt"
"math"
"time"

@@ -80,6 +81,17 @@ func NewReporter(
}, nil
}

// Run calls the underlying reporting.EventSink's Run() method, periodically pushing events to the
// clients specified in Config until the context expires.
//
// Refer there for more information.
func (r *Reporter) Run(ctx context.Context) error {
if err := r.sink.Run(ctx); err != nil {
return fmt.Errorf("scaling events sink failed: %w", err)
}
return nil
}

// Submit adds the ScalingEvent to the sender queue(s), returning without waiting for it to be sent.
func (r *Reporter) Submit(event ScalingEvent) {
r.metrics.recordSubmitted(event)
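Reporter.Run is a thin wrapper that annotates sink failures with fmt.Errorf and the %w verb, which keeps the underlying error inspectable by callers. A small, self-contained illustration of why %w matters here (the sentinel error and function names are hypothetical, not part of the package):

package sketch

import (
	"context"
	"errors"
	"fmt"
)

// ErrSinkClosed is a hypothetical sentinel an event sink might return.
var ErrSinkClosed = errors.New("sink closed")

func runSink(ctx context.Context) error {
	<-ctx.Done()
	return ErrSinkClosed
}

func run(ctx context.Context) error {
	if err := runSink(ctx); err != nil {
		// %w wraps the error so callers can still inspect the cause.
		return fmt.Errorf("scaling events sink failed: %w", err)
	}
	return nil
}

func isSinkClosed(err error) bool {
	// errors.Is sees through the fmt.Errorf wrapper because of %w.
	return errors.Is(err, ErrSinkClosed)
}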
5 changes: 2 additions & 3 deletions pkg/reporting/send.go
@@ -12,7 +12,6 @@ type eventSender[E any] struct {

metrics *EventSinkMetrics
queue eventQueuePuller[E]
done <-chan struct{}

// lastSendDuration tracks the "real" last full duration of (eventSender).sendAllCurrentEvents().
//
@@ -37,15 +36,15 @@ type eventSender[E any] struct {
lastSendDuration time.Duration
}

func (s eventSender[E]) senderLoop(logger *zap.Logger) {
func (s eventSender[E]) senderLoop(ctx context.Context, logger *zap.Logger) {
ticker := time.NewTicker(time.Second * time.Duration(s.client.BaseConfig.PushEverySeconds))
defer ticker.Stop()

for {
final := false

select {
case <-s.done:
case <-ctx.Done():
logger.Info("Received notification that events submission is done")
final = true
case <-ticker.C:
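The sender loop now keys shutdown off the passed-in context instead of a dedicated done channel: when the context is canceled it marks the iteration as final, pushes whatever is still queued, and exits. A simplified sketch of that loop shape, with the queue and push function replaced by placeholders:

package sketch

import (
	"context"
	"time"
)

// senderLoop drains a queue every pushEvery, and does one last push when ctx
// is canceled so that no queued events are dropped at shutdown.
func senderLoop(ctx context.Context, pushEvery time.Duration, drain func() []string, push func([]string)) {
	ticker := time.NewTicker(pushEvery)
	defer ticker.Stop()

	for {
		final := false
		select {
		case <-ctx.Done():
			final = true // flush once more, then stop
		case <-ticker.C:
		}

		if events := drain(); len(events) > 0 {
			push(events)
		}
		if final {
			return
		}
	}
}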
57 changes: 49 additions & 8 deletions pkg/reporting/sink.go
@@ -3,45 +3,86 @@ package reporting
// public API for event reporting

import (
"context"
"fmt"
"sync"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/neondatabase/autoscaling/pkg/util"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

type EventSink[E any] struct {
queueWriters []eventQueuePusher[E]
done func()

runSenders func(context.Context) error
}

// NewEventSink creates a new EventSink with the given clients to dispatch events into.
//
// You MUST call (*EventSink[E]).Run() if you wish for any enqueued events to actually be sent via
// the clients.
func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients ...Client[E]) *EventSink[E] {
var queueWriters []eventQueuePusher[E]
signalDone := make(chan struct{})

var senders []eventSender[E]

for _, c := range clients {
qw, qr := newEventQueue[E](metrics.queueSizeCurrent.WithLabelValues(c.Name))
queueWriters = append(queueWriters, qw)

// Start the sender
sender := eventSender[E]{
// Create the sender -- we'll save starting it for the call to Run()
senders = append(senders, eventSender[E]{
client: c,
metrics: metrics,
queue: qr,
done: signalDone,
lastSendDuration: 0,
})
}

var runSenders func(context.Context) error
if len(senders) > 0 {
runSenders = func(ctx context.Context) error {
tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))

for _, sender := range senders {
taskName := fmt.Sprintf("send-%s", sender.client.Name)
tg.Go(taskName, func(logger *zap.Logger) error {
sender.senderLoop(tg.Ctx(), logger)
return nil
})
}

return tg.Wait()
}
} else {
// Special case when there are no clients -- we want our run function to just wait until the
// context is complete, matching what the behavior *would* be if there were actually sender
// threads we were waiting on.
runSenders = func(ctx context.Context) error {
<-ctx.Done()
return nil
}
go sender.senderLoop(logger.Named(fmt.Sprintf("send-%s", c.Name)))
}

return &EventSink[E]{
queueWriters: queueWriters,
done: sync.OnceFunc(func() { close(signalDone) }),
runSenders: runSenders,
}
}

// Run executes the client threads responsible for actually pushing enqueued events to the
// appropriate places.
//
// The clients will periodically push events until the context expires, at which point they will
// push any remaining events. Run() only completes after these final events have been pushed.
//
// Calling Run() more than once is unsound.
func (s *EventSink[E]) Run(ctx context.Context) error {
return s.runSenders(ctx)
}

// Enqueue submits the event to the internal client sending queues, returning without blocking.
func (s *EventSink[E]) Enqueue(event E) {
for _, q := range s.queueWriters {
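With this refactor, NewEventSink only constructs the per-client senders; they start when Run is called, and Run returns only after each sender has completed its final push. A hedged usage sketch showing construction, enqueueing, and Run (the channel-forwarding helper is a placeholder, not part of the package):

package sketch

import (
	"context"

	"go.uber.org/zap"

	"github.com/neondatabase/autoscaling/pkg/reporting"
)

// runEventSink constructs an EventSink, forwards events into it, and drives it
// with Run until ctx expires.
func runEventSink[E any](
	ctx context.Context,
	logger *zap.Logger,
	metrics *reporting.EventSinkMetrics,
	clients []reporting.Client[E],
	events <-chan E,
) error {
	sink := reporting.NewEventSink(logger, metrics, clients...)

	// Enqueue is non-blocking, so producers can call it from anywhere; here a
	// single goroutine forwards events from a channel until it is closed.
	go func() {
		for e := range events {
			sink.Enqueue(e)
		}
	}()

	// Run must be called exactly once. It pushes queued events until ctx
	// expires, then pushes any remainder before returning.
	return sink.Run(ctx)
}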