reporting: Serialize batches as they're constructed
Part of #1220.

For scaling events, the size of a gzip-compressed batch is *much* less
than the uncompressed version, so it's actually worth it to compress the
JSON-encoded contents of the batch as we go, instead of buffering the
events as a []ScalingEvent in memory.
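
As a rough illustration of why (a standalone toy sketch, not code from this repo; the event shape here is made up):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
)

// fakeEvent stands in for a scaling event; the real type lives in
// pkg/agent/scalingevents and looks different.
type fakeEvent struct {
	VMName string `json:"vm_name"`
	CPU    int    `json:"cpu"`
}

func main() {
	var raw bytes.Buffer
	for i := 0; i < 10000; i++ {
		line, _ := json.Marshal(fakeEvent{VMName: "vm-1234", CPU: i % 8})
		raw.Write(line)
		raw.WriteByte('\n')
	}

	var compressed bytes.Buffer
	gz := gzip.NewWriter(&compressed)
	_, _ = gz.Write(raw.Bytes())
	_ = gz.Close()

	// Highly repetitive JSON typically compresses by an order of magnitude
	// or more, so holding only the compressed form saves a lot of memory.
	fmt.Printf("raw: %d bytes, gzipped: %d bytes\n", raw.Len(), compressed.Len())
}
```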

**Implementation**

This change primarily adds the reporting.BatchBuilder[E] type (and some
implementations), which abstracts over the process of gradually building
the serialized bytes from a batch of events.

Two implementations of BatchBuilder are provided in pkg/reporting - one
for large JSON arrays, and one for the JSON lines format.
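
Both implement the same two-method interface (quoted from the batcher.go diff below):

```go
// BatchBuilder is an interface for gradually converting []E to []byte, allowing us to construct
// batches of events without buffering them uncompressed, in memory.
type BatchBuilder[E any] interface {
	// Add appends an event to the in-progress batch.
	Add(event E)
	// Finish completes the in-progress batch, returning the events serialized as bytes.
	Finish() []byte
}
```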

To allow continuous gzip compression, both of these JSON batch builders
are constructed with a new 'IOBuffer' type, which is just an io.Writer
with a method to collect the bytes at the end. There are implementations
of IOBuffer for plain in-memory bytes and for a gzip-compressed buffer.
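
The IOBuffer definitions themselves aren't part of this diff, so the following is only a sketch of their likely shape, inferred from how NewByteBuffer, NewGZIPBuffer, Write, and Collect are used below; the real code may differ in detail:

```go
package reporting

import (
	"bytes"
	"compress/gzip"
	"io"
)

// IOBuffer is an io.Writer that can hand back everything written to it.
type IOBuffer interface {
	io.Writer
	Collect() []byte
}

// ByteBuffer is the plaintext variant: writes go straight into memory.
type ByteBuffer struct{ buf bytes.Buffer }

func NewByteBuffer() *ByteBuffer { return &ByteBuffer{} }

func (b *ByteBuffer) Write(p []byte) (int, error) { return b.buf.Write(p) }
func (b *ByteBuffer) Collect() []byte             { return b.buf.Bytes() }

// GZIPBuffer compresses writes as they happen, so a batch is never held
// uncompressed in memory.
type GZIPBuffer struct {
	buf bytes.Buffer
	gz  *gzip.Writer
}

func NewGZIPBuffer() *GZIPBuffer {
	b := &GZIPBuffer{}
	b.gz = gzip.NewWriter(&b.buf)
	return b
}

func (b *GZIPBuffer) Write(p []byte) (int, error) { return b.gz.Write(p) }

func (b *GZIPBuffer) Collect() []byte {
	_ = b.gz.Close() // flush any buffered compressed data
	return b.buf.Bytes()
}
```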

All that together means that batches of billing and scaling events
should now be gzip-compressed (where possible) as events are added to
the batches, dramatically reducing the memory footprint.
sharnoff committed Jan 29, 2025
1 parent f4786db commit a4afcb3
Showing 10 changed files with 278 additions and 162 deletions.
34 changes: 16 additions & 18 deletions pkg/agent/billing/clients.go
@@ -51,10 +51,10 @@ func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) (
logger.Info("Created HTTP client for billing events", zap.Any("config", c))

clients = append(clients, billingClient{
-Name: "http",
-Base: client,
-BaseConfig: c.BaseClientConfig,
-SerializeBatch: jsonMarshalEvents, // note: NOT gzipped.
+Name: "http",
+Base: client,
+BaseConfig: c.BaseClientConfig,
+NewBatchBuilder: jsonArrayBatch(reporting.NewByteBuffer), // note: NOT gzipped.
})

}
@@ -67,10 +67,10 @@ func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) (
logger.Info("Created Azure Blob Storage client for billing events", zap.Any("config", c))

clients = append(clients, billingClient{
-Name: "azureblob",
-Base: client,
-BaseConfig: c.BaseClientConfig,
-SerializeBatch: reporting.WrapSerialize(reporting.GZIPCompress, jsonMarshalEvents),
+Name: "azureblob",
+Base: client,
+BaseConfig: c.BaseClientConfig,
+NewBatchBuilder: jsonArrayBatch(reporting.NewGZIPBuffer),
})
}
if c := cfg.S3; c != nil {
@@ -82,22 +82,20 @@ func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) (
logger.Info("Created S3 client for billing events", zap.Any("config", c))

clients = append(clients, billingClient{
-Name: "s3",
-Base: client,
-BaseConfig: c.BaseClientConfig,
-SerializeBatch: reporting.WrapSerialize(reporting.GZIPCompress, jsonMarshalEvents),
+Name: "s3",
+Base: client,
+BaseConfig: c.BaseClientConfig,
+NewBatchBuilder: jsonArrayBatch(reporting.NewGZIPBuffer),
})
}

return clients, nil
}

-func jsonMarshalEvents(events []*billing.IncrementalEvent) ([]byte, reporting.SimplifiableError) {
-obj := struct {
-Events []*billing.IncrementalEvent `json:"events"`
-}{Events: events}
-
-return reporting.JSONMarshalBatch(&obj)
+func jsonArrayBatch[B reporting.IOBuffer](buf func() B) func() reporting.BatchBuilder[*billing.IncrementalEvent] {
+return func() reporting.BatchBuilder[*billing.IncrementalEvent] {
+return reporting.NewJSONArrayBuilder[*billing.IncrementalEvent](buf())
+}
}

// Returns a function to generate keys for the placement of billing events data into blob storage.
14 changes: 10 additions & 4 deletions pkg/agent/scalingevents/clients.go
@@ -35,16 +35,22 @@ func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) (
logger.Info("Created S3 client for scaling events", zap.Any("config", c))

clients = append(clients, eventsClient{
-Name: "s3",
-Base: client,
-BaseConfig: c.BaseClientConfig,
-SerializeBatch: reporting.WrapSerialize[ScalingEvent](reporting.GZIPCompress, reporting.JSONLinesMarshalBatch),
+Name: "s3",
+Base: client,
+BaseConfig: c.BaseClientConfig,
+NewBatchBuilder: jsonLinesBatch(reporting.NewGZIPBuffer),
})
}

return clients, nil
}

+func jsonLinesBatch[B reporting.IOBuffer](buf func() B) func() reporting.BatchBuilder[ScalingEvent] {
+return func() reporting.BatchBuilder[ScalingEvent] {
+return reporting.NewJSONLinesBuilder[ScalingEvent](buf())
+}
+}

// Returns a function to generate keys for the placement of scaling events data into blob storage.
//
// Example: prefix/2024/10/31/23/events_{uuid}.ndjson.gz (11pm on halloween, UTC)
58 changes: 58 additions & 0 deletions pkg/reporting/batch_jsonarray.go
@@ -0,0 +1,58 @@
package reporting

import (
"encoding/json"
"fmt"
)

var _ BatchBuilder[int] = (*JSONArrayBuilder[int])(nil)

// JSONArrayBuilder is a BatchBuilder where all the events in a batch are serialized as a single
// large JSON array.
type JSONArrayBuilder[E any] struct {
buf IOBuffer
started bool
}

// NewJSONArrayBuilder creates a new JSONArrayBuilder using the underlying IOBuffer to potentially
// process the JSON encoding -- either with ByteBuffer for plaintext or GZIPBuffer for gzip
// compression.
func NewJSONArrayBuilder[E any](buf IOBuffer) *JSONArrayBuilder[E] {
// open the array:
if _, err := buf.Write([]byte{'['}); err != nil {
panic(fmt.Sprintf("failed to write: %s", err))
}

return &JSONArrayBuilder[E]{
buf: buf,
started: false,
}
}

func (b *JSONArrayBuilder[E]) Add(event E) {
if b.started {
if _, err := b.buf.Write([]byte("\n\t,")); err != nil {
panic(fmt.Sprintf("failed to write: %s", err))
}
}

// note: we use a discrete json.Marshal here instead of json.Encoder because the encoder adds a
// newline at the end, and that'd make the formatting weird for us.
tmpJSON, err := json.Marshal(event)
if err != nil {
panic(fmt.Sprintf("failed to JSON encode: %s", err))
}

if _, err := b.buf.Write(tmpJSON); err != nil {
panic(fmt.Sprintf("failed to write: %s", err))
}
b.started = true
}

func (b *JSONArrayBuilder[E]) Finish() []byte {
if _, err := b.buf.Write([]byte("\n]")); err != nil {
panic(fmt.Sprintf("failed to write: %s", err))
}

return b.buf.Collect()
}
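
For reference, a hypothetical use of this builder over a plaintext buffer (ByteBuffer as sketched in the commit message above), showing the `"\n\t,"` separators that Add writes between elements:

```go
b := reporting.NewJSONArrayBuilder[int](reporting.NewByteBuffer())
b.Add(1)
b.Add(2)
fmt.Printf("%s\n", b.Finish())
// Output (still valid JSON):
// [1
// 	,2
// ]
```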
38 changes: 38 additions & 0 deletions pkg/reporting/batch_jsonlines.go
@@ -0,0 +1,38 @@
package reporting

import (
"encoding/json"
"fmt"
)

var _ BatchBuilder[int] = (*JSONLinesBuilder[int])(nil)

// JSONLinesBuilder is a BatchBuilder where each event in the batch is serialized as a separate JSON
// object on its own line, adhering to the "JSON lines"/"jsonl" format.
type JSONLinesBuilder[E any] struct {
buf IOBuffer
}

func NewJSONLinesBuilder[E any](buf IOBuffer) *JSONLinesBuilder[E] {
return &JSONLinesBuilder[E]{
buf: buf,
}
}

func (b *JSONLinesBuilder[E]) Add(event E) {
tmpJSON, err := json.Marshal(event)
if err != nil {
panic(fmt.Sprintf("failed to JSON encode: %s", err))
}

if _, err := b.buf.Write(tmpJSON); err != nil {
panic(fmt.Sprintf("failed to write: %s", err))
}
if _, err := b.buf.Write([]byte{'\n'}); err != nil {
panic(fmt.Sprintf("failed to write: %s", err))
}
}

func (b *JSONLinesBuilder[E]) Finish() []byte {
return b.buf.Collect()
}
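
And a hypothetical round-trip for this builder over a gzip-backed buffer (NewGZIPBuffer as sketched above); decompressing Finish's output recovers one JSON object per line:

```go
b := reporting.NewJSONLinesBuilder[map[string]string](reporting.NewGZIPBuffer())
b.Add(map[string]string{"kind": "scale-up"})
b.Add(map[string]string{"kind": "scale-down"})

r, err := gzip.NewReader(bytes.NewReader(b.Finish()))
if err != nil {
	panic(err)
}
decompressed, _ := io.ReadAll(r)
fmt.Print(string(decompressed))
// {"kind":"scale-up"}
// {"kind":"scale-down"}
```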
42 changes: 31 additions & 11 deletions pkg/reporting/batcher.go
@@ -6,12 +6,25 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

+// BatchBuilder is an interface for gradually converting []E to []byte, allowing us to construct
+// batches of events without buffering them uncompressed, in memory.
+//
+// Implementations of BatchBuilder are defined in various 'batch_*.go' files.
+type BatchBuilder[E any] interface {
+// Add appends an event to the in-progress batch.
+Add(event E)
+// Finish completes the in-progress batch, returning the events serialized as bytes.
+Finish() []byte
+}
+
type eventBatcher[E any] struct {
mu sync.Mutex

targetBatchSize int

-ongoing []E
+newBatch func() BatchBuilder[E]
+ongoing BatchBuilder[E]
+ongoingSize int

completed []batch[E]
onComplete func()
@@ -21,11 +34,13 @@ type eventBatcher[E any] struct {
}

type batch[E any] struct {
-events []E
+serialized []byte
+count int
}

func newEventBatcher[E any](
targetBatchSize int,
+newBatch func() BatchBuilder[E],
notifyCompletedBatch func(),
sizeGauge prometheus.Gauge,
) *eventBatcher[E] {
@@ -34,7 +49,9 @@ func newEventBatcher[E any](

targetBatchSize: targetBatchSize,

-ongoing: []E{},
+newBatch: newBatch,
+ongoing: newBatch(),
+ongoingSize: 0,

completed: []batch[E]{},
onComplete: notifyCompletedBatch,
@@ -52,10 +69,11 @@ func (b *eventBatcher[E]) enqueue(event E) {
b.mu.Lock()
defer b.mu.Unlock()

-b.ongoing = append(b.ongoing, event)
+b.ongoing.Add(event)
+b.ongoingSize += 1
b.updateGauge()

-if len(b.ongoing) >= b.targetBatchSize {
+if b.ongoingSize >= b.targetBatchSize {
b.finishCurrentBatch()
}
}
@@ -69,7 +87,7 @@ func (b *eventBatcher[E]) finishOngoing() {
b.mu.Lock()
defer b.mu.Unlock()

-if len(b.ongoing) == 0 {
+if b.ongoingSize == 0 {
return
}

@@ -100,25 +118,27 @@ func (b *eventBatcher[e]) dropCompleted(batchCount int) {
b.completed = b.completed[batchCount:]

for _, batch := range dropped {
-b.completedSize -= len(batch.events)
+b.completedSize -= batch.count
}

b.updateGauge()
}

// NB: must hold mu
func (b *eventBatcher[E]) updateGauge() {
-b.sizeGauge.Set(float64(len(b.ongoing) + b.completedSize))
+b.sizeGauge.Set(float64(b.ongoingSize + b.completedSize))
}

// NB: must hold mu
func (b *eventBatcher[E]) finishCurrentBatch() {
b.completed = append(b.completed, batch[E]{
-events: b.ongoing,
+serialized: b.ongoing.Finish(),
+count: b.ongoingSize,
})

-b.completedSize += len(b.ongoing)
-b.ongoing = []E{}
+b.completedSize += b.ongoingSize
+b.ongoingSize = 0
+b.ongoing = b.newBatch()

b.onComplete()
}
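
Taken together, the batcher now owns a BatchBuilder instead of a []E. An in-package sketch of the wiring (newEventBatcher is unexported; the gauge and callback below are stand-ins):

```go
batcher := newEventBatcher[string](
	100, // targetBatchSize: complete a batch every 100 events
	func() BatchBuilder[string] {
		// each batch gets a fresh builder, compressing as events arrive
		return NewJSONLinesBuilder[string](NewGZIPBuffer())
	},
	func() { /* notify the sender that a completed batch is ready */ },
	prometheus.NewGauge(prometheus.GaugeOpts{Name: "queued_events"}),
)

batcher.enqueue("event-1") // serialized into the ongoing builder immediately
batcher.finishOngoing()    // flush a partial batch, e.g. on shutdown
```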