agent: Add scaling event reporting (#1107)
This is part 2 of 2; see #1078 for the groundwork and
neondatabase/cloud#15939 for the full context.

In short, this PR:

* Adds a new package: 'pkg/agent/scalingevents'
* Adds new callbacks to `core.State` so it can report changes in desired CU
  as scaling events (a rough sketch of such a callback's shape follows below).
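
For context, a minimal sketch of the shape such a callback could take. The names here (`ScalingEventReporter`, `ReportDesiredCU`) are illustrative assumptions, not the API introduced by this PR:

```go
// Hypothetical sketch only: this is NOT the interface added in this PR,
// just an illustration of a "report changes in desired CU" hook.
package sketch

import "time"

// ScalingEventReporter is notified whenever the desired compute-unit (CU)
// target changes.
type ScalingEventReporter interface {
	// ReportDesiredCU receives the time of the scaling decision and the
	// newly desired CU value.
	ReportDesiredCU(now time.Time, desiredCU uint32)
}

// noopReporter is a do-nothing implementation, usable as a default.
type noopReporter struct{}

func (noopReporter) ReportDesiredCU(time.Time, uint32) {}
```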
sharnoff authored Jan 24, 2025
1 parent 5a64efc commit 4e81d97
Showing 14 changed files with 518 additions and 56 deletions.
6 changes: 6 additions & 0 deletions autoscaler-agent/config_map.yaml
@@ -29,6 +29,12 @@ data:
"accumulateEverySeconds": 24,
"clients": {}
},
"scalingEvents": {
"cuMultiplier": 0.25,
"rereportThreshold": 0.25,
"regionName": "replaceme",
"clients": {}
},
"monitor": {
"serverPort": 10301,
"responseTimeoutSeconds": 5,
3 changes: 3 additions & 0 deletions pkg/agent/billing/clients.go
@@ -103,6 +103,9 @@ func jsonMarshalEvents(events []*billing.IncrementalEvent) ([]byte, reporting.Si
// Returns a function to generate keys for the placement of billing events data into blob storage.
//
// Example: prefixInContainer/year=2021/month=01/day=26/hh:mm:ssZ_{uuid}.ndjson.gz
//
// NOTE: This key format is different from the one we use for scaling events, but similar to the one
// proxy/storage use.
func newBlobStorageKeyGenerator(prefix string) func() string {
return func() string {
now := time.Now()
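
As an aside on the key format documented above, here is a minimal standalone sketch of a generator producing keys of that shape. It assumes `github.com/google/uuid` for the UUID part and is illustrative only, not the body of `newBlobStorageKeyGenerator`:

```go
// Illustrative sketch of the documented key format; not the actual
// implementation in pkg/agent/billing/clients.go.
package sketch

import (
	"fmt"
	"time"

	"github.com/google/uuid"
)

// blobKey builds keys like:
//   prefixInContainer/year=2021/month=01/day=26/hh:mm:ssZ_{uuid}.ndjson.gz
func blobKey(prefix string, now time.Time, id uuid.UUID) string {
	now = now.UTC()
	return fmt.Sprintf(
		"%s/year=%d/month=%02d/day=%02d/%sZ_%s.ndjson.gz",
		prefix, now.Year(), int(now.Month()), now.Day(),
		now.Format("15:04:05"), id.String(),
	)
}
```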
22 changes: 19 additions & 3 deletions pkg/agent/config.go
@@ -8,19 +8,22 @@ import (
"github.com/tychoish/fun/erc"

"github.com/neondatabase/autoscaling/pkg/agent/billing"
"github.com/neondatabase/autoscaling/pkg/agent/scalingevents"
"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/reporting"
)

type Config struct {
RefreshStateIntervalSeconds uint `json:"refereshStateIntervalSeconds"`

Billing billing.Config `json:"billing"`
ScalingEvents scalingevents.Config `json:"scalingEvents"`

Scaling ScalingConfig `json:"scaling"`
Metrics MetricsConfig `json:"metrics"`
Scheduler SchedulerConfig `json:"scheduler"`
Monitor MonitorConfig `json:"monitor"`
NeonVM NeonVMConfig `json:"neonvm"`
Billing billing.Config `json:"billing"`
DumpState *DumpStateConfig `json:"dumpState"`
}

@@ -173,6 +176,10 @@ func (c *Config) validate() error {
erc.Whenf(ec, cfg.PushRequestTimeoutSeconds == 0, zeroTmpl, fmt.Sprintf("%s.pushRequestTimeoutSeconds", key))
erc.Whenf(ec, cfg.MaxBatchSize == 0, zeroTmpl, fmt.Sprintf("%s.maxBatchSize", key))
}
validateS3ReportingConfig := func(cfg *reporting.S3ClientConfig, key string) {
erc.Whenf(ec, cfg.Bucket == "", emptyTmpl, fmt.Sprintf(".%s.bucket", key))
erc.Whenf(ec, cfg.Region == "", emptyTmpl, fmt.Sprintf(".%s.region", key))
}

erc.Whenf(ec, c.Billing.ActiveTimeMetricName == "", emptyTmpl, ".billing.activeTimeMetricName")
erc.Whenf(ec, c.Billing.CPUMetricName == "", emptyTmpl, ".billing.cpuMetricName")
@@ -189,10 +196,19 @@ func (c *Config) validate() error {
}
if c.Billing.Clients.S3 != nil {
validateBaseReportingConfig(&c.Billing.Clients.S3.BaseClientConfig, "billing.clients.s3")
erc.Whenf(ec, c.Billing.Clients.S3.Bucket == "", emptyTmpl, ".billing.clients.s3.bucket")
erc.Whenf(ec, c.Billing.Clients.S3.Region == "", emptyTmpl, ".billing.clients.s3.region")
validateS3ReportingConfig(&c.Billing.Clients.S3.S3ClientConfig, ".billing.clients.s3")
erc.Whenf(ec, c.Billing.Clients.S3.PrefixInBucket == "", emptyTmpl, ".billing.clients.s3.prefixInBucket")
}

erc.Whenf(ec, c.ScalingEvents.CUMultiplier == 0, zeroTmpl, ".scalingEvents.cuMultiplier")
erc.Whenf(ec, c.ScalingEvents.RereportThreshold == 0, zeroTmpl, ".scalingEvents.rereportThreshold")
erc.Whenf(ec, c.ScalingEvents.RegionName == "", emptyTmpl, ".scalingEvents.regionName")
if c.ScalingEvents.Clients.S3 != nil {
validateBaseReportingConfig(&c.ScalingEvents.Clients.S3.BaseClientConfig, "scalingEvents.clients.s3")
validateS3ReportingConfig(&c.ScalingEvents.Clients.S3.S3ClientConfig, ".scalingEvents.clients.s3")
erc.Whenf(ec, c.ScalingEvents.Clients.S3.PrefixInBucket == "", emptyTmpl, ".scalingEvents.clients.s3.prefixInBucket")
}

erc.Whenf(ec, c.DumpState != nil && c.DumpState.Port == 0, zeroTmpl, ".dumpState.port")
erc.Whenf(ec, c.DumpState != nil && c.DumpState.TimeoutSeconds == 0, zeroTmpl, ".dumpState.timeoutSeconds")

63 changes: 38 additions & 25 deletions pkg/agent/core/goalcu.go
@@ -13,9 +13,23 @@ import (
"github.com/neondatabase/autoscaling/pkg/api"
)

type scalingGoal struct {
hasAllMetrics bool
goalCU uint32
type ScalingGoal struct {
HasAllMetrics bool
Parts ScalingGoalParts
}

type ScalingGoalParts struct {
CPU *float64
Mem *float64
LFC *float64
}

func (g *ScalingGoal) GoalCU() uint32 {
return uint32(math.Ceil(max(
math.Round(lo.FromPtr(g.Parts.CPU)), // for historical compatibility, use round() instead of ceil()
lo.FromPtr(g.Parts.Mem),
lo.FromPtr(g.Parts.LFC),
)))
}

func calculateGoalCU(
Expand All @@ -24,38 +38,41 @@ func calculateGoalCU(
computeUnit api.Resources,
systemMetrics *SystemMetrics,
lfcMetrics *LFCMetrics,
) (scalingGoal, []zap.Field) {
) (ScalingGoal, []zap.Field) {
hasAllMetrics := systemMetrics != nil && (!*cfg.EnableLFCMetrics || lfcMetrics != nil)
if !hasAllMetrics {
warn("Making scaling decision without all required metrics available")
}

var lfcGoalCU, cpuGoalCU, memGoalCU, memTotalGoalCU uint32
var logFields []zap.Field
var parts ScalingGoalParts

var wss *api.Bytes // estimated working set size

if lfcMetrics != nil {
var lfcLogFunc func(zapcore.ObjectEncoder) error
var lfcGoalCU float64
lfcGoalCU, wss, lfcLogFunc = calculateLFCGoalCU(warn, cfg, computeUnit, *lfcMetrics)
parts.LFC = lo.ToPtr(lfcGoalCU)
if lfcLogFunc != nil {
logFields = append(logFields, zap.Object("lfc", zapcore.ObjectMarshalerFunc(lfcLogFunc)))
}
}

if systemMetrics != nil {
cpuGoalCU = calculateCPUGoalCU(cfg, computeUnit, *systemMetrics)
cpuGoalCU := calculateCPUGoalCU(cfg, computeUnit, *systemMetrics)
parts.CPU = lo.ToPtr(cpuGoalCU)

memGoalCU = calculateMemGoalCU(cfg, computeUnit, *systemMetrics)
memGoalCU := calculateMemGoalCU(cfg, computeUnit, *systemMetrics)
parts.Mem = lo.ToPtr(memGoalCU)
}

if systemMetrics != nil && wss != nil {
memTotalGoalCU = calculateMemTotalGoalCU(cfg, computeUnit, *systemMetrics, *wss)
memTotalGoalCU := calculateMemTotalGoalCU(cfg, computeUnit, *systemMetrics, *wss)
parts.Mem = lo.ToPtr(max(*parts.Mem, memTotalGoalCU))
}

goalCU := max(cpuGoalCU, memGoalCU, memTotalGoalCU, lfcGoalCU)

return scalingGoal{hasAllMetrics: hasAllMetrics, goalCU: goalCU}, logFields
return ScalingGoal{HasAllMetrics: hasAllMetrics, Parts: parts}, logFields
}

// For CPU:
@@ -65,7 +82,7 @@ func calculateCPUGoalCU(
cfg api.ScalingConfig,
computeUnit api.Resources,
systemMetrics SystemMetrics,
) uint32 {
) float64 {
stableThreshold := *cfg.CPUStableZoneRatio * systemMetrics.LoadAverage5Min
mixedThreshold := stableThreshold + *cfg.CPUMixedZoneRatio*systemMetrics.LoadAverage5Min

@@ -77,7 +94,7 @@
blendedLoadAverage := load1Weight*systemMetrics.LoadAverage1Min + (1-load1Weight)*systemMetrics.LoadAverage5Min

goalCPUs := blendedLoadAverage / *cfg.LoadAverageFractionTarget
cpuGoalCU := uint32(math.Round(goalCPUs / computeUnit.VCPU.AsFloat64()))
cpuGoalCU := goalCPUs / computeUnit.VCPU.AsFloat64()
return cpuGoalCU
}

@@ -100,13 +117,11 @@ func calculateMemGoalCU(
cfg api.ScalingConfig,
computeUnit api.Resources,
systemMetrics SystemMetrics,
) uint32 {
) float64 {
// goal memory size, just looking at allocated memory (not including page cache...)
memGoalBytes := api.Bytes(math.Round(systemMetrics.MemoryUsageBytes / *cfg.MemoryUsageFractionTarget))
memGoalBytes := math.Round(systemMetrics.MemoryUsageBytes / *cfg.MemoryUsageFractionTarget)

// note: this is equal to ceil(memGoalBytes / computeUnit.Mem), because ceil(X/M) == floor((X+M-1)/M)
memGoalCU := uint32((memGoalBytes + computeUnit.Mem - 1) / computeUnit.Mem)
return memGoalCU
return memGoalBytes / float64(computeUnit.Mem)
}

// goal memory size, looking at allocated memory and min(page cache usage, LFC working set size)
@@ -115,20 +130,19 @@ func calculateMemTotalGoalCU(
computeUnit api.Resources,
systemMetrics SystemMetrics,
wss api.Bytes,
) uint32 {
) float64 {
lfcCached := min(float64(wss), systemMetrics.MemoryCachedBytes)
totalGoalBytes := api.Bytes((lfcCached + systemMetrics.MemoryUsageBytes) / *cfg.MemoryTotalFractionTarget)
totalGoalBytes := (lfcCached + systemMetrics.MemoryUsageBytes) / *cfg.MemoryTotalFractionTarget

memTotalGoalCU := uint32((totalGoalBytes + computeUnit.Mem - 1) / computeUnit.Mem)
return memTotalGoalCU
return totalGoalBytes / float64(computeUnit.Mem)
}

func calculateLFCGoalCU(
warn func(string),
cfg api.ScalingConfig,
computeUnit api.Resources,
lfcMetrics LFCMetrics,
) (uint32, *api.Bytes, func(zapcore.ObjectEncoder) error) {
) (float64, *api.Bytes, func(zapcore.ObjectEncoder) error) {
wssValues := lfcMetrics.ApproximateworkingSetSizeBuckets
// At this point, we can assume that the values are equally spaced at 1 minute apart,
// starting at 1 minute.
@@ -162,7 +176,6 @@
requiredMem := estimateWssMem / *cfg.LFCToMemoryRatio
// ... and then convert that into the actual CU required to fit the working set:
requiredCU := requiredMem / computeUnit.Mem.AsFloat64()
lfcGoalCU := uint32(math.Ceil(requiredCU))

lfcLogFields := func(obj zapcore.ObjectEncoder) error {
obj.AddFloat64("estimateWssPages", estimateWss)
@@ -171,6 +184,6 @@
return nil
}

return lfcGoalCU, lo.ToPtr(api.Bytes(estimateWssMem)), lfcLogFields
return requiredCU, lo.ToPtr(api.Bytes(estimateWssMem)), lfcLogFields
}
}
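
To make the rounding behaviour of the new `GoalCU()` concrete, here is a standalone re-implementation of just that combination (illustrative only; the real method lives on `ScalingGoal` above): the CPU part is rounded before the max for historical compatibility, while the memory and LFC parts are only subject to the outer ceiling.

```go
// Standalone illustration of ScalingGoal.GoalCU()'s combination logic
// (reimplemented here so it runs on its own; not part of the commit).
// Requires Go 1.21+ for the built-in max.
package main

import (
	"fmt"
	"math"
)

func goalCU(cpu, mem, lfc float64) uint32 {
	// CPU is rounded before the max; mem/LFC only see the outer ceil.
	return uint32(math.Ceil(max(math.Round(cpu), mem, lfc)))
}

func main() {
	fmt.Println(goalCU(2.4, 0, 0)) // 2: a CPU part of 2.4 rounds down
	fmt.Println(goalCU(0, 2.4, 0)) // 3: a memory part of 2.4 is ceiled up
	fmt.Println(goalCU(2.8, 0, 0)) // 3: a CPU part of 2.8 rounds up
}
```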
62 changes: 43 additions & 19 deletions pkg/agent/core/goalcu_test.go
@@ -33,16 +33,20 @@ func Test_calculateGoalCU(t *testing.T) {
cfgUpdater func(*api.ScalingConfig)
sys *SystemMetrics
lfc *LFCMetrics
want scalingGoal
want ScalingGoal
}{
{
name: "basic",
cfgUpdater: nil,
sys: nil,
lfc: nil,
want: scalingGoal{
goalCU: 0,
hasAllMetrics: false,
want: ScalingGoal{
HasAllMetrics: false,
Parts: ScalingGoalParts{
CPU: nil,
Mem: nil,
LFC: nil,
},
},
},
{
@@ -53,9 +57,13 @@ func Test_calculateGoalCU(t *testing.T) {
LoadAverage1Min: 0.2,
},
lfc: nil,
want: scalingGoal{
goalCU: 1,
hasAllMetrics: false,
want: ScalingGoal{
HasAllMetrics: false,
Parts: ScalingGoalParts{
CPU: lo.ToPtr(0.8),
Mem: lo.ToPtr(0.0),
LFC: nil,
},
},
},
{
@@ -66,9 +74,13 @@ func Test_calculateGoalCU(t *testing.T) {
LoadAverage1Min: 1,
},
lfc: nil,
want: scalingGoal{
goalCU: 4,
hasAllMetrics: false,
want: ScalingGoal{
HasAllMetrics: false,
Parts: ScalingGoalParts{
CPU: lo.ToPtr(4.0),
Mem: lo.ToPtr(0.0),
LFC: nil,
},
},
},
{
@@ -82,9 +94,13 @@ func Test_calculateGoalCU(t *testing.T) {
LoadAverage5Min: 0.0,
},
lfc: nil,
want: scalingGoal{
goalCU: 3,
hasAllMetrics: false,
want: ScalingGoal{
HasAllMetrics: false,
Parts: ScalingGoalParts{
CPU: lo.ToPtr(2.8),
Mem: lo.ToPtr(0.0),
LFC: nil,
},
},
},
{
@@ -99,9 +115,13 @@ func Test_calculateGoalCU(t *testing.T) {
MemoryCachedBytes: 0,
},
lfc: nil,
want: scalingGoal{
goalCU: 3,
hasAllMetrics: false,
want: ScalingGoal{
HasAllMetrics: false,
Parts: ScalingGoalParts{
CPU: lo.ToPtr(2.8),
Mem: lo.ToPtr(0.0),
LFC: nil,
},
},
},
{
@@ -117,9 +137,13 @@ func Test_calculateGoalCU(t *testing.T) {
MemoryCachedBytes: 0,
},
lfc: nil,
want: scalingGoal{
goalCU: 5, // Weighted average between 7 and 4 CUs
hasAllMetrics: false,
want: ScalingGoal{
HasAllMetrics: false,
Parts: ScalingGoalParts{
CPU: lo.ToPtr(5.499997000005999),
Mem: lo.ToPtr(0.0),
LFC: nil,
},
},
},
}