-
Notifications
You must be signed in to change notification settings - Fork 26
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
reporting: Continuously send full batches of events
Part of #1220. In cases where (a) we expect batches to be very large, and (b) we expect that some autoscaler-agent instances may end up with many batches of events between reporting periods, we might see excessive memory usage from those events. So instead of pushing events exactly every push period, we should push *at least* every push period, and exactly when the next batch is full if it's full before the push period is reached.
- Loading branch information
Showing
5 changed files
with
279 additions
and
119 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
package reporting | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
) | ||
|
||
type eventBatcher[E any] struct { | ||
mu sync.Mutex | ||
|
||
targetBatchSize int | ||
|
||
ongoing []E | ||
|
||
completed []batch[E] | ||
onComplete func() | ||
completedSize int | ||
|
||
sizeGauge prometheus.Gauge | ||
} | ||
|
||
type batch[E any] struct { | ||
events []E | ||
} | ||
|
||
func newEventBatcher[E any]( | ||
targetBatchSize int, | ||
notifyCompletedBatch func(), | ||
sizeGauge prometheus.Gauge, | ||
) *eventBatcher[E] { | ||
return &eventBatcher[E]{ | ||
mu: sync.Mutex{}, | ||
|
||
targetBatchSize: targetBatchSize, | ||
|
||
ongoing: []E{}, | ||
|
||
completed: []batch[E]{}, | ||
onComplete: notifyCompletedBatch, | ||
completedSize: 0, | ||
|
||
sizeGauge: sizeGauge, | ||
} | ||
} | ||
|
||
// enqueue adds an event to the current in-progress batch. | ||
// | ||
// If the target batch size is reached, the batch will be packaged up for consumption by | ||
// (*eventBatcher[E]).peekCompleted() and b.onComplete() will be called. | ||
func (b *eventBatcher[E]) enqueue(event E) { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
|
||
b.ongoing = append(b.ongoing, event) | ||
b.updateGauge() | ||
|
||
if len(b.ongoing) >= b.targetBatchSize { | ||
b.finishCurrentBatch() | ||
} | ||
} | ||
|
||
// finishOngoing collects any events that have not yet been packaged up into a batch, adding them to | ||
// a batch visible in (*eventBatcher[E]).peekCompleted(). | ||
// | ||
// If there are outstanding events when this method is called, b.onComplete() will be called. | ||
// Otherwise, it will not be called. | ||
func (b *eventBatcher[E]) finishOngoing() { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
|
||
if len(b.ongoing) == 0 { | ||
return | ||
} | ||
|
||
b.finishCurrentBatch() | ||
} | ||
|
||
// peekCompleted returns the batches that have been completed but not yet dropped, without modifying | ||
// the batcher. | ||
// | ||
// Once done with these batches, you should call (*eventBatcher[E]).dropCompleted() to remove them | ||
// from future consideration. | ||
func (b *eventBatcher[E]) peekCompleted() []batch[E] { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
|
||
tmp := make([]batch[E], len(b.completed)) | ||
copy(tmp, b.completed) | ||
return tmp | ||
} | ||
|
||
// dropCompleted drops the given number of batches from internal storage, removing them from view in | ||
// (*eventBatcher[E]).peekCompleted(). | ||
func (b *eventBatcher[e]) dropCompleted(batchCount int) { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
|
||
dropped := b.completed[:batchCount] | ||
b.completed = b.completed[batchCount:] | ||
|
||
for _, batch := range dropped { | ||
b.completedSize -= len(batch.events) | ||
} | ||
|
||
b.updateGauge() | ||
} | ||
|
||
// NB: must hold mu | ||
func (b *eventBatcher[E]) updateGauge() { | ||
b.sizeGauge.Set(float64(len(b.ongoing) + b.completedSize)) | ||
} | ||
|
||
// NB: must hold mu | ||
func (b *eventBatcher[E]) finishCurrentBatch() { | ||
b.completed = append(b.completed, batch[E]{ | ||
events: b.ongoing, | ||
}) | ||
|
||
b.completedSize += len(b.ongoing) | ||
b.ongoing = []E{} | ||
|
||
b.onComplete() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package reporting | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestEventBatching(t *testing.T) { | ||
targetBatchSize := 3 | ||
|
||
var notified bool | ||
notify := func() { | ||
notified = true | ||
} | ||
gauge := prometheus.NewGauge(prometheus.GaugeOpts{}) | ||
|
||
batcher := newEventBatcher[string](targetBatchSize, notify, gauge) | ||
|
||
// First batch: | ||
// Add a small number of items to the batch, and then explicitly request early completion. | ||
batcher.enqueue("b1-1") | ||
batcher.enqueue("b1-2") | ||
// check that there is not anything completed: | ||
assert.Equal(t, false, notified) | ||
assert.Equal(t, []batch[string]{}, batcher.peekCompleted()) | ||
// Request early completion: | ||
batcher.finishOngoing() | ||
// check that this batch was completed: | ||
assert.Equal(t, true, notified) | ||
assert.Equal(t, []batch[string]{{events: []string{"b1-1", "b1-2"}}}, batcher.peekCompleted()) | ||
// clear the current batch: | ||
notified = false | ||
batcher.dropCompleted(1) | ||
|
||
// Second, third, and fourth batches: | ||
// Add enough events that three batches are automatically created of appropriate sizes, and | ||
// check that | ||
batcher.enqueue("b2-1") | ||
batcher.enqueue("b2-2") | ||
assert.Equal(t, false, notified) | ||
batcher.enqueue("b2-3") | ||
assert.Equal(t, true, notified) | ||
notified = false // reset the notification | ||
batcher.enqueue("b3-1") | ||
batcher.enqueue("b3-2") | ||
assert.Equal(t, false, notified) | ||
batcher.enqueue("b3-3") | ||
assert.Equal(t, true, notified) | ||
notified = false // reset the notification | ||
// check that the batches so far match what we expect: | ||
assert.Equal(t, []batch[string]{ | ||
{events: []string{"b2-1", "b2-2", "b2-3"}}, | ||
{events: []string{"b3-1", "b3-2", "b3-3"}}, | ||
}, batcher.peekCompleted()) | ||
// add the last batch: | ||
batcher.enqueue("b4-1") | ||
batcher.enqueue("b4-2") | ||
assert.Equal(t, false, notified) | ||
batcher.enqueue("b4-3") | ||
assert.Equal(t, true, notified) | ||
// Check that the final batches are what we expect | ||
assert.Equal(t, []batch[string]{ | ||
{events: []string{"b2-1", "b2-2", "b2-3"}}, | ||
{events: []string{"b3-1", "b3-2", "b3-3"}}, | ||
{events: []string{"b4-1", "b4-2", "b4-3"}}, | ||
}, batcher.peekCompleted()) | ||
// Consume one batch: | ||
batcher.dropCompleted(1) | ||
// and now, it should just be b3 and b4: | ||
assert.Equal(t, []batch[string]{ | ||
{events: []string{"b3-1", "b3-2", "b3-3"}}, | ||
{events: []string{"b4-1", "b4-2", "b4-3"}}, | ||
}, batcher.peekCompleted()) | ||
// consume the final two: | ||
batcher.dropCompleted(2) | ||
// so, there should be nothing left: | ||
assert.Equal(t, []batch[string]{}, batcher.peekCompleted()) | ||
} |
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.