reporting: Continuously send full batches of events #1226

Open · wants to merge 1 commit into base: main
124 changes: 124 additions & 0 deletions pkg/reporting/batcher.go
@@ -0,0 +1,124 @@
package reporting

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

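// eventBatcher collects events into fixed-size batches. Events are appended to
// ongoing; once it reaches targetBatchSize (or finishOngoing is called), the
// slice is moved into completed and onComplete is invoked. completedSize tracks
// the total number of events across completed batches so the gauge can report
// ongoing + completed without rescanning.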
type eventBatcher[E any] struct {
	mu sync.Mutex

	targetBatchSize int

	ongoing []E

	completed     []batch[E]
	onComplete    func()
	completedSize int

	sizeGauge prometheus.Gauge
}

type batch[E any] struct {
	events []E
}

func newEventBatcher[E any](
	targetBatchSize int,
	notifyCompletedBatch func(),
	sizeGauge prometheus.Gauge,
) *eventBatcher[E] {
	return &eventBatcher[E]{
		mu: sync.Mutex{},

		targetBatchSize: targetBatchSize,

		ongoing: []E{},

		completed:     []batch[E]{},
		onComplete:    notifyCompletedBatch,
		completedSize: 0,

		sizeGauge: sizeGauge,
	}
}

// enqueue adds an event to the current in-progress batch.
//
// If the target batch size is reached, the batch will be packaged up for consumption by
// (*eventBatcher[E]).peekCompleted() and b.onComplete() will be called.
func (b *eventBatcher[E]) enqueue(event E) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.ongoing = append(b.ongoing, event)
	b.updateGauge()

	if len(b.ongoing) >= b.targetBatchSize {
		b.finishCurrentBatch()
	}
}

// finishOngoing collects any events that have not yet been packaged up into a batch, adding them to
// a batch visible in (*eventBatcher[E]).peekCompleted().
//
// If there are outstanding events when this method is called, b.onComplete() will be called.
// Otherwise, it will not be called.
func (b *eventBatcher[E]) finishOngoing() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if len(b.ongoing) == 0 {
		return
	}

	b.finishCurrentBatch()
}

// peekCompleted returns the batches that have been completed but not yet dropped, without modifying
// the batcher.
//
// Once done with these batches, you should call (*eventBatcher[E]).dropCompleted() to remove them
// from future consideration.
func (b *eventBatcher[E]) peekCompleted() []batch[E] {
	b.mu.Lock()
	defer b.mu.Unlock()

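	// Copy so the returned slice is not aliased to b.completed; later appends or
	// drops inside the batcher will not affect what the caller sees.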
	tmp := make([]batch[E], len(b.completed))
	copy(tmp, b.completed)
Review comment (Contributor):
question: why do we copy slice every time we peek into it?

	return tmp
}

// dropCompleted drops the given number of batches from internal storage, removing them from view in
// (*eventBatcher[E]).peekCompleted().
func (b *eventBatcher[E]) dropCompleted(batchCount int) {
	b.mu.Lock()
	defer b.mu.Unlock()

	dropped := b.completed[:batchCount]
	b.completed = b.completed[batchCount:]

	for _, batch := range dropped {
		b.completedSize -= len(batch.events)
	}

	b.updateGauge()
}

// NB: must hold mu
func (b *eventBatcher[E]) updateGauge() {
	b.sizeGauge.Set(float64(len(b.ongoing) + b.completedSize))
}

// NB: must hold mu
func (b *eventBatcher[E]) finishCurrentBatch() {
	b.completed = append(b.completed, batch[E]{
		events: b.ongoing,
	})

	b.completedSize += len(b.ongoing)
	b.ongoing = []E{}

	b.onComplete()
}
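For orientation, a minimal sketch of how a caller might drive this batcher. The runSender function, the notify channel, and sendBatch are illustrative names, not part of this PR, and the sketch assumes the notifyCompletedBatch callback does a non-blocking send on that channel.

func runSender[E any](b *eventBatcher[E], notify <-chan struct{}, sendBatch func([]E) error) {
	for range notify {
		for _, completed := range b.peekCompleted() {
			if err := sendBatch(completed.events); err != nil {
				// Leave unsent batches in place; they stay visible via peekCompleted
				// and can be retried on the next notification.
				break
			}
			// Drop only the batch that was actually sent.
			b.dropCompleted(1)
		}
	}
}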
80 changes: 80 additions & 0 deletions pkg/reporting/batcher_test.go
@@ -0,0 +1,80 @@
package reporting

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/assert"
)

func TestEventBatching(t *testing.T) {
	targetBatchSize := 3

	var notified bool
	notify := func() {
		notified = true
	}
	gauge := prometheus.NewGauge(prometheus.GaugeOpts{})

	batcher := newEventBatcher[string](targetBatchSize, notify, gauge)

	// First batch:
Review comment (Contributor, @mikhail-sakhnov, Feb 20, 2025):
suggestion: do those batches rely on each other? if no, I suggest to wrap them in t.Run calls, it would be a bit nicer to read in the unittest run report

	// Add a small number of items to the batch, and then explicitly request early completion.
	batcher.enqueue("b1-1")
	batcher.enqueue("b1-2")
	// check that nothing has been completed yet:
	assert.Equal(t, false, notified)
	assert.Equal(t, []batch[string]{}, batcher.peekCompleted())
	// Request early completion:
	batcher.finishOngoing()
	// check that this batch was completed:
	assert.Equal(t, true, notified)
	assert.Equal(t, []batch[string]{{events: []string{"b1-1", "b1-2"}}}, batcher.peekCompleted())
	// clear the current batch:
	notified = false
	batcher.dropCompleted(1)

	// Second, third, and fourth batches:
	// Add enough events that three batches are automatically created of appropriate sizes, and
	// check that completion notifications fire exactly as each batch fills.
	batcher.enqueue("b2-1")
	batcher.enqueue("b2-2")
	assert.Equal(t, false, notified)
	batcher.enqueue("b2-3")
	assert.Equal(t, true, notified)
	notified = false // reset the notification
	batcher.enqueue("b3-1")
	batcher.enqueue("b3-2")
	assert.Equal(t, false, notified)
	batcher.enqueue("b3-3")
	assert.Equal(t, true, notified)
	notified = false // reset the notification
	// check that the batches so far match what we expect:
	assert.Equal(t, []batch[string]{
		{events: []string{"b2-1", "b2-2", "b2-3"}},
		{events: []string{"b3-1", "b3-2", "b3-3"}},
	}, batcher.peekCompleted())
	// add the last batch:
	batcher.enqueue("b4-1")
	batcher.enqueue("b4-2")
	assert.Equal(t, false, notified)
	batcher.enqueue("b4-3")
	assert.Equal(t, true, notified)
	// Check that the final batches are what we expect
	assert.Equal(t, []batch[string]{
		{events: []string{"b2-1", "b2-2", "b2-3"}},
		{events: []string{"b3-1", "b3-2", "b3-3"}},
		{events: []string{"b4-1", "b4-2", "b4-3"}},
	}, batcher.peekCompleted())
	// Consume one batch:
	batcher.dropCompleted(1)
	// and now, it should just be b3 and b4:
	assert.Equal(t, []batch[string]{
		{events: []string{"b3-1", "b3-2", "b3-3"}},
		{events: []string{"b4-1", "b4-2", "b4-3"}},
	}, batcher.peekCompleted())
	// consume the final two:
	batcher.dropCompleted(2)
	// so, there should be nothing left:
	assert.Equal(t, []batch[string]{}, batcher.peekCompleted())
}
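On the reviewer's t.Run suggestion above: the phases share the batcher and the notified flag, so the subtests would have to stay sequential (no t.Parallel). A rough sketch of the shape, with illustrative subtest names and only the first two phases shown:

func TestEventBatchingSubtests(t *testing.T) {
	var notified bool
	notify := func() { notified = true }
	gauge := prometheus.NewGauge(prometheus.GaugeOpts{})
	batcher := newEventBatcher[string](3, notify, gauge)

	// Subtests share batcher and notified, so they must run in order.
	t.Run("early completion via finishOngoing", func(t *testing.T) {
		batcher.enqueue("b1-1")
		batcher.enqueue("b1-2")
		assert.Equal(t, false, notified)
		batcher.finishOngoing()
		assert.Equal(t, true, notified)
		assert.Equal(t, []batch[string]{{events: []string{"b1-1", "b1-2"}}}, batcher.peekCompleted())
		batcher.dropCompleted(1)
		notified = false
	})

	t.Run("automatic completion at target batch size", func(t *testing.T) {
		batcher.enqueue("b2-1")
		batcher.enqueue("b2-2")
		assert.Equal(t, false, notified)
		batcher.enqueue("b2-3")
		assert.Equal(t, true, notified)
		assert.Equal(t, []batch[string]{{events: []string{"b2-1", "b2-2", "b2-3"}}}, batcher.peekCompleted())
		batcher.dropCompleted(1)
	})
}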
76 changes: 0 additions & 76 deletions pkg/reporting/queue.go

This file was deleted.
