reporting: Continuously send full batches of events
Part of #1220.

In cases where (a) we expect batches to be very large, and (b) we expect
that some autoscaler-agent instances may end up with many batches of
events between reporting periods, we might see excessive memory usage
from those events.

So instead of pushing events exactly once per push period, we should push
*at least* once per push period, and additionally push as soon as the next
batch fills, if that happens before the push period is reached.
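
For an illustrative example (numbers made up): with a target batch size of
100 events and a 30-second push period, an agent that generates 1,000 events
within one period previously held all 1,000 in memory until the next push;
with this change, each batch of 100 is pushed as soon as it fills, so in the
steady state only the current partial batch (plus any batches that failed to
send) sits in memory.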
sharnoff committed Jan 29, 2025
1 parent 864689e commit f4786db
Showing 4 changed files with 191 additions and 119 deletions.
124 changes: 124 additions & 0 deletions pkg/reporting/batcher.go
@@ -0,0 +1,124 @@
package reporting

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

type eventBatcher[E any] struct {
	mu sync.Mutex

	targetBatchSize int

	ongoing []E

	completed     []batch[E]
	onComplete    func()
	completedSize int

	sizeGauge prometheus.Gauge
}

type batch[E any] struct {
	events []E
}

func newEventBatcher[E any](
	targetBatchSize int,
	notifyCompletedBatch func(),
	sizeGauge prometheus.Gauge,
) *eventBatcher[E] {
	return &eventBatcher[E]{
		mu: sync.Mutex{},

		targetBatchSize: targetBatchSize,

		ongoing: []E{},

		completed:     []batch[E]{},
		onComplete:    notifyCompletedBatch,
		completedSize: 0,

		sizeGauge: sizeGauge,
	}
}

// enqueue adds an event to the current in-progress batch.
//
// If the target batch size is reached, the batch will be packaged up for consumption by
// (*eventBatcher[E]).peekCompleted() and b.onComplete() will be called.
func (b *eventBatcher[E]) enqueue(event E) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.ongoing = append(b.ongoing, event)
	b.updateGauge()

	if len(b.ongoing) >= b.targetBatchSize {
		b.finishCurrentBatch()
	}
}

// finishOngoing collects any events that have not yet been packaged up into a batch, adding them to
// a batch visible in (*eventBatcher[E]).peekCompleted().
//
// If there are outstanding events when this method is called, b.onComplete() will be called.
// Otherwise, it will not be called.
func (b *eventBatcher[E]) finishOngoing() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if len(b.ongoing) == 0 {
		return
	}

	b.finishCurrentBatch()
}

// peekCompleted returns the batches that have been completed but not yet dropped, without modifying
// the batcher.
//
// Once done with these batches, you should call (*eventBatcher[E]).dropCompleted() to remove them
// from future consideration.
func (b *eventBatcher[E]) peekCompleted() []batch[E] {
	b.mu.Lock()
	defer b.mu.Unlock()

	tmp := make([]batch[E], len(b.completed))
	copy(tmp, b.completed)
	return tmp
}

// dropCompleted drops the given number of batches from internal storage, removing them from view in
// (*eventBatcher[E]).peekCompleted().
func (b *eventBatcher[E]) dropCompleted(batchCount int) {
	b.mu.Lock()
	defer b.mu.Unlock()

	dropped := b.completed[:batchCount]
	b.completed = b.completed[batchCount:]

	for _, batch := range dropped {
		b.completedSize -= len(batch.events)
	}

	b.updateGauge()
}

// NB: must hold mu
func (b *eventBatcher[E]) updateGauge() {
	b.sizeGauge.Set(float64(len(b.ongoing) + b.completedSize))
}

// NB: must hold mu
func (b *eventBatcher[E]) finishCurrentBatch() {
	b.completed = append(b.completed, batch[E]{
		events: b.ongoing,
	})

	b.completedSize += len(b.ongoing)
	b.ongoing = []E{}

	b.onComplete()
}
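
As a rough usage sketch (not part of the commit — eventBatcher is unexported, so this would have to live inside package reporting, and all concrete values here are made up): a producer keeps calling enqueue, the batcher wakes the consumer through the onComplete callback whenever a batch fills, and the consumer peeks at completed batches and drops each one after it has been handled.

	// Hypothetical wiring, for illustration only.
	gauge := prometheus.NewGauge(prometheus.GaugeOpts{Name: "example_queued_events"})
	notify := make(chan struct{}, 1)

	b := newEventBatcher[string](
		100, // illustrative target batch size
		func() {
			// Wake the consumer without ever blocking the producer.
			select {
			case notify <- struct{}{}:
			default:
			}
		},
		gauge,
	)

	// Producer side: every 100th enqueue call moves a full batch into `completed`
	// and fires the callback above.
	for i := 0; i < 250; i++ {
		b.enqueue("some event")
	}
	b.finishOngoing() // package up the trailing 50 events as a partial batch

	// Consumer side: handle each completed batch, oldest first, dropping it once done.
	for _, completed := range b.peekCompleted() {
		_ = completed.events // ... hand these to whatever sends them ...
		b.dropCompleted(1)
	}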
76 changes: 0 additions & 76 deletions pkg/reporting/queue.go

This file was deleted.

94 changes: 57 additions & 37 deletions pkg/reporting/send.go
@@ -5,23 +5,27 @@ import (
 	"time"
 
 	"go.uber.org/zap"
+
+	"github.com/neondatabase/autoscaling/pkg/util"
 )
 
 type eventSender[E any] struct {
 	client Client[E]
 
 	metrics *EventSinkMetrics
-	queue   eventQueuePuller[E]
 
-	// lastSendDuration tracks the "real" last full duration of (eventSender).sendAllCurrentEvents().
+	queue         *eventBatcher[E]
+	batchComplete util.BroadcastReceiver
+
+	// lastSendDuration tracks the "real" last full duration of (eventSender).sendAllCompletedBatches().
 	//
 	// It's separate from metrics.lastSendDuration because (a) we'd like to include the duration of
-	// ongoing calls to sendAllCurrentEvents, but (b) we don't want the bias towards lower durations
-	// that comes with that.
+	// ongoing calls to sendAllCompletedBatches, but (b) we don't want the bias towards lower
+	// durations that comes with that.
 	//
 	// Here's some more detail:
 	//
-	// To make sure that long-running sendAllCurrentEvents() loops show up in the metrics while
+	// To make sure that long-running sendAllCompletedBatches() loops show up in the metrics while
 	// they're still running, we want to periodically update metrics.lastSendDuration before the
 	// loop has finished. A side-effect of doing this naively is that the gauge will sometimes
 	// return durations that are much shorter than the *actual* previous send loop duration.
@@ -37,8 +41,10 @@ type eventSender[E any] struct {
 }
 
 func (s eventSender[E]) senderLoop(ctx context.Context, logger *zap.Logger) {
-	ticker := time.NewTicker(time.Second * time.Duration(s.client.BaseConfig.PushEverySeconds))
-	defer ticker.Stop()
+	heartbeat := time.Second * time.Duration(s.client.BaseConfig.PushEverySeconds)
+
+	timer := time.NewTimer(heartbeat)
+	defer timer.Stop()
 
 	for {
 		final := false
@@ -47,10 +53,18 @@ func (s eventSender[E]) senderLoop(ctx context.Context, logger *zap.Logger) {
 		case <-ctx.Done():
 			logger.Info("Received notification that events submission is done")
 			final = true
-		case <-ticker.C:
+			// finish up any in-progress batch, so that we can send it before we exit.
+			s.queue.finishOngoing()
+		case <-s.batchComplete.Wait():
+			s.batchComplete.Awake() // consume this notification
+		case <-timer.C:
 		}
 
-		s.sendAllCurrentEvents(logger)
+		// Make sure that if there are no more events within the next heartbeat duration, that we'll
+		// push the events that have been accumulated so far.
+		timer.Reset(heartbeat)
+
+		s.sendAllCompletedBatches(logger)
 
 		if final {
 			logger.Info("Ending events sender loop")
@@ -59,51 +73,54 @@ func (s eventSender[E]) senderLoop(ctx context.Context, logger *zap.Logger) {
 	}
 }
 
-func (s eventSender[E]) sendAllCurrentEvents(logger *zap.Logger) {
-	logger.Info("Pushing all available events")
+func (s eventSender[E]) sendAllCompletedBatches(logger *zap.Logger) {
+	logger.Info("Pushing all available event batches")
 
-	if s.queue.size() == 0 {
-		logger.Info("No events to push")
+	if len(s.queue.peekCompleted()) == 0 {
+		logger.Info("No event batches to push")
 		s.lastSendDuration = 0
 		s.metrics.lastSendDuration.WithLabelValues(s.client.Name).Set(1e-6) // small value, to indicate that nothing happened
 		return
 	}
 
-	total := 0
+	totalEvents := 0
+	totalBatches := 0
 	startTime := time.Now()
 
-	// while there's still events in the queue, send them
+	// while there's still batches of events in the queue, send them
 	//
-	// If events are being added to the queue faster than we can send them, this loop will not
-	// terminate. For the most part, that's ok: worst-case, we miss the collectorFinished
-	// notification, which isn't the end of the world. Any long-running call to this function will
-	// be reported by s.metrics.lastSendDuration as we go (provided the request timeout isn't too
-	// long).
+	// If batches are being added to the queue faster than we can send them, this loop will not
+	// terminate. For the most part, that's ok: worst-case, we miss that the parent context has
+	// expired, which isn't the end of the world (eventually the autoscaler-agent will just be
+	// force-killed). Any long-running call to this function will be reported by
+	// s.metrics.lastSendDuration as we go (provided the request timeout isn't too long), so we
+	// should get observability for it either way.
 	for {
-		if size := s.queue.size(); size != 0 {
-			logger.Info("Current queue size is non-zero", zap.Int("queueSize", size))
-		}
+		batches := s.queue.peekCompleted()
 
-		chunk := s.queue.get(int(s.client.BaseConfig.MaxBatchSize))
-		count := len(chunk)
-		if count == 0 {
+		if len(batches) != 0 {
+			logger.Info("Current queue size is non-zero", zap.Int("batchCount", len(batches)))
+		} else {
 			totalTime := time.Since(startTime)
 			s.lastSendDuration = totalTime
 			s.metrics.lastSendDuration.WithLabelValues(s.client.Name).Set(totalTime.Seconds())
 
 			logger.Info(
-				"All available events have been sent",
-				zap.Int("total", total),
+				"All available event batches have been sent",
+				zap.Int("totalEvents", totalEvents),
+				zap.Int("totalBatches", totalBatches),
 				zap.Duration("totalTime", totalTime),
 			)
 			return
 		}
 
+		batch := batches[0]
+
 		req := s.client.Base.NewRequest()
 
 		logger.Info(
-			"Pushing events",
-			zap.Int("count", count),
+			"Pushing events batch",
+			zap.Int("count", len(batch.events)),
 			req.LogFields(),
 		)
 
@@ -115,7 +132,7 @@ func (s eventSender[E]) sendAllCurrentEvents(logger *zap.Logger) {
 			)
 			defer cancel()
 
-			payload, err := s.client.SerializeBatch(chunk)
+			payload, err := s.client.SerializeBatch(batch.events)
 			if err != nil {
 				return err
 			}

@@ -129,10 +146,11 @@ func (s eventSender[E]) sendAllCurrentEvents(logger *zap.Logger) {
 			// events.
 			logger.Error(
 				"Failed to push billing events",
-				zap.Int("count", count),
+				zap.Int("count", len(batch.events)),
 				zap.Duration("after", reqDuration),
 				req.LogFields(),
-				zap.Int("total", total),
+				zap.Int("totalEvents", totalEvents),
+				zap.Int("totalBatches", totalBatches),
 				zap.Duration("totalTime", time.Since(startTime)),
 				zap.Error(err),
 			)
@@ -145,16 +163,18 @@ func (s eventSender[E]) sendAllCurrentEvents(logger *zap.Logger) {
 			return
 		}
 
-		s.queue.drop(count) // mark len(chunk) as successfully processed
-		total += len(chunk)
+		s.queue.dropCompleted(1) // mark this batch as complete
+		totalEvents += len(batch.events)
+		totalBatches += 1
 		currentTotalTime := time.Since(startTime)
 
 		logger.Info(
 			"Successfully pushed some events",
-			zap.Int("count", count),
+			zap.Int("count", len(batch.events)),
 			zap.Duration("after", reqDuration),
 			req.LogFields(),
-			zap.Int("total", total),
+			zap.Int("totalEvents", totalEvents),
+			zap.Int("totalBatches", totalBatches),
 			zap.Duration("totalTime", currentTotalTime),
 		)

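Putting the hunks above back together, the sender side now behaves roughly as sketched below. This is not the literal code from the commit: a plain channel stands in for util.BroadcastReceiver, `push` stands in for the real client request, and the wiring that connects the batcher's onComplete callback to that channel lives in code not shown here.

	// Rough sketch of the sender loop after this change (assumptions noted above).
	func senderLoopSketch[E any](
		ctx context.Context,
		batcher *eventBatcher[E],
		batchDone <-chan struct{}, // signaled by the batcher's onComplete callback
		heartbeat time.Duration,
		push func([]E) error,
	) {
		timer := time.NewTimer(heartbeat)
		defer timer.Stop()

		for {
			final := false
			select {
			case <-ctx.Done():
				batcher.finishOngoing() // flush the partial batch so it can be sent before exit
				final = true
			case <-batchDone: // a batch filled early: send it now instead of waiting
			case <-timer.C: // heartbeat: send anything that completed but wasn't sent
			}
			timer.Reset(heartbeat) // the next push is at most one heartbeat away

			// Drain completed batches one at a time, oldest first, until none remain
			// or a request fails (failed batches stay queued and are retried later).
			for {
				batches := batcher.peekCompleted()
				if len(batches) == 0 {
					break
				}
				if err := push(batches[0].events); err != nil {
					break
				}
				batcher.dropCompleted(1)
			}

			if final {
				return
			}
		}
	}

Resetting the timer on every wake-up is what turns the old fixed schedule into an "at most one heartbeat between pushes" guarantee, while full batches still go out as soon as they complete.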