reporting: Continuously send full batches of events #1226
base: main
Conversation
No changes to the coverage.
Part of #1220. In cases where (a) we expect batches to be very large, and (b) we expect that some autoscaler-agent instances may end up with many batches of events between reporting periods, we might see excessive memory usage from those events. So instead of pushing events exactly every push period, we should push *at least* every push period, and exactly when the next batch is full if it's full before the push period is reached.
	batcher := newEventBatcher[string](targetBatchSize, notify, gauge)

	// First batch:
suggestion: Do those batches rely on each other? If not, I suggest wrapping them in t.Run calls; it would make the unit test run report a bit nicer to read.
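For illustration, a rough sketch of the suggested structure, assuming the batches really are independent. The test function name and subtest names are hypothetical, and `targetBatchSize`, `notify`, and `gauge` stand in for whatever the existing test already sets up:

```go
func TestEventBatcher(t *testing.T) {
	t.Run("first batch", func(t *testing.T) {
		batcher := newEventBatcher[string](targetBatchSize, notify, gauge)
		_ = batcher // fill the batch here and assert on its completed contents
	})

	t.Run("second batch", func(t *testing.T) {
		batcher := newEventBatcher[string](targetBatchSize, notify, gauge)
		_ = batcher // exercise the next scenario against a fresh batcher
	})
}
```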
@@ -37,8 +41,10 @@ type eventSender[E any] struct {
}

func (s eventSender[E]) senderLoop(ctx context.Context, logger *zap.Logger) {
	ticker := time.NewTicker(time.Second * time.Duration(s.client.BaseConfig.PushEverySeconds))
	defer ticker.Stop()
	heartbeat := time.Second * time.Duration(s.client.BaseConfig.PushEverySeconds)
nitpick (out of context): BaseConfig could have those fields defined as time.Duration, which supports JSON marshaling.
IIUC `time.Duration` doesn't have fancy JSON marshaling (it's just an int64 of nanoseconds), unless I'm missing something?
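For context, a quick illustration of the stdlib behavior being discussed: encoding/json has no special handling for time.Duration, so it marshals as the underlying int64 nanosecond count (the field name below is made up):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

func main() {
	type config struct {
		PushEvery time.Duration `json:"pushEvery"`
	}

	out, _ := json.Marshal(config{PushEvery: 5 * time.Second})
	fmt.Println(string(out)) // {"pushEvery":5000000000}

	// Round-tripping a human-readable value like "5s" would need a custom
	// wrapper type implementing json.Marshaler/Unmarshaler, rather than
	// plain time.Duration.
}
```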
	// finish up any in-progress batch, so that we can send it before we exit.
	s.queue.finishOngoing()
case <-s.batchComplete.Wait():
	s.batchComplete.Awake() // consume this notification
question: Why is that needed?
sidenote, nitpick: This comment is a good example of a "what we do" statement where a "why we do it" statement would be more useful.
	defer ticker.Stop()
	heartbeat := time.Second * time.Duration(s.client.BaseConfig.PushEverySeconds)

	timer := time.NewTimer(heartbeat)
question, suggestion: Since we effectively have a loop here, should it be a ticker instead of a timer?
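Not an answer from the PR itself, but one possible reason to prefer a timer (an assumption, for illustration only): re-arming the timer after every send measures the next heartbeat from the most recent push, including pushes triggered early by a full batch, whereas a ticker keeps firing on its fixed schedule regardless. A minimal sketch of that pattern, reusing the names from the diff above:

```go
// Sketch only, not the PR's actual senderLoop.
timer := time.NewTimer(heartbeat)
defer timer.Stop()
for {
	select {
	case <-ctx.Done():
		return
	case <-timer.C:
		// Heartbeat reached: push whatever has been batched so far.
	case <-batchComplete:
		// A batch filled up early: push it immediately.
	}
	// Re-arm so the next heartbeat is measured from this send.
	// (With Go 1.23+ timer semantics, Reset here is safe without draining timer.C.)
	timer.Reset(heartbeat)
}
```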
	defer b.mu.Unlock()

	tmp := make([]batch[E], len(b.completed))
	copy(tmp, b.completed)
question: Why do we copy the slice every time we peek into it?
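Not stated in the thread, but presumably (an assumption) the copy is there so the returned slice doesn't alias the batcher's internal completed slice: without it, the caller would hold a view into the same backing array that the batcher keeps appending to or truncating under b.mu after the lock is released, which is a data race. Roughly:

```go
// Sketch of the presumed rationale; the method signature is a guess based on
// the call site and the fields shown in the diff.
func (b *eventBatcher[E]) peekCompleted() []batch[E] {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Copy under the lock so the caller gets a stable snapshot rather than a
	// view into b.completed that later mutations could race with.
	tmp := make([]batch[E], len(b.completed))
	copy(tmp, b.completed)
	return tmp
}
```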
	if size := s.queue.size(); size != 0 {
		logger.Info("Current queue size is non-zero", zap.Int("queueSize", size))
	}
	batches := s.queue.peekCompleted()
question: It looks like we never use `batches` as a collection of batches, only ever working with the first completed batch. Should the queue API be changed to something like "peekLatestCompleted" and "peekCompletedCount"?
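For concreteness, a rough sketch of the API shape being suggested. The method names come from the comment above; the signatures and exact semantics are guesses, reusing the b.mu / b.completed fields shown in the diff:

```go
// Hypothetical, not part of the PR.

// peekLatestCompleted returns a single completed batch ready to be sent,
// instead of a copy of all completed batches. (FIFO order here; the exact
// semantics and naming would be up to the author.)
func (b *eventBatcher[E]) peekLatestCompleted() (batch[E], bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.completed) == 0 {
		var zero batch[E]
		return zero, false
	}
	return b.completed[0], true
}

// peekCompletedCount reports how many completed batches are waiting to be
// sent, for the places that only need the count.
func (b *eventBatcher[E]) peekCompletedCount() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.completed)
}
```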
Looks legit; left a few questions, none of which I consider a blocker for merging.
Implementation
The new implementation replaces the event queue between `reporting.EventSink` and the event sender threads with a queue-like construct that collects events into discrete batches. This new construct is `eventBatcher`, in the new `batcher.go` file.

Then, in all of the places where the event sender previously dealt with collecting chunks of events, it instead operates on a single "batch" of events at a time.
The gauge metric showing the number of events in the queue still works the same way it did before, counting both the events in batches that haven't been finalized and the events in finalized batches that haven't been sent yet.
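To make the description above more concrete, here is a rough sketch of the shape being described, not the actual code from batcher.go. The field names, the enqueue method, and the prometheus.Gauge type are assumptions; eventBatcher, batch, and the ongoing/completed split follow the identifiers visible in the diff:

```go
package reporting

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

// Sketch only; the real implementation lives in batcher.go in this PR.
type batch[E any] struct {
	events []E
}

type eventBatcher[E any] struct {
	mu sync.Mutex

	targetSize int        // a batch is considered complete once it has this many events
	ongoing    []E        // the batch currently being filled
	completed  []batch[E] // complete batches waiting to be sent

	notifyComplete func()           // wakes the sender loop when a batch completes
	sizeGauge      prometheus.Gauge // events in the ongoing batch plus events in unsent completed batches
}

func (b *eventBatcher[E]) enqueue(event E) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.ongoing = append(b.ongoing, event)
	b.sizeGauge.Inc()

	if len(b.ongoing) >= b.targetSize {
		b.completed = append(b.completed, batch[E]{events: b.ongoing})
		b.ongoing = nil
		b.notifyComplete()
	}
}
```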
Note: This PR builds on #1221, and must not be merged before it.