diff --git a/autoscaler-agent/config_map.yaml b/autoscaler-agent/config_map.yaml index 184f7e0e3..94b3460d7 100644 --- a/autoscaler-agent/config_map.yaml +++ b/autoscaler-agent/config_map.yaml @@ -29,6 +29,12 @@ data: "accumulateEverySeconds": 24, "clients": {} }, + "scalingEvents": { + "cuMultiplier": 0.25, + "rereportThreshold": 0.25, + "regionName": "replaceme", + "clients": {} + }, "monitor": { "serverPort": 10301, "responseTimeoutSeconds": 5, diff --git a/pkg/agent/billing/clients.go b/pkg/agent/billing/clients.go index 1a243e637..951de3a2d 100644 --- a/pkg/agent/billing/clients.go +++ b/pkg/agent/billing/clients.go @@ -103,6 +103,9 @@ func jsonMarshalEvents(events []*billing.IncrementalEvent) ([]byte, reporting.Si // Returns a function to generate keys for the placement of billing events data into blob storage. // // Example: prefixInContainer/year=2021/month=01/day=26/hh:mm:ssZ_{uuid}.ndjson.gz +// +// NOTE: This key format is different from the one we use for scaling events, but similar to the one +// proxy/storage use. 
func newBlobStorageKeyGenerator(prefix string) func() string { return func() string { now := time.Now() diff --git a/pkg/agent/config.go b/pkg/agent/config.go index 254e7f6ae..c6f39939e 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -8,6 +8,7 @@ import ( "github.com/tychoish/fun/erc" "github.com/neondatabase/autoscaling/pkg/agent/billing" + "github.com/neondatabase/autoscaling/pkg/agent/scalingevents" "github.com/neondatabase/autoscaling/pkg/api" "github.com/neondatabase/autoscaling/pkg/reporting" ) @@ -15,12 +16,14 @@ import ( type Config struct { RefreshStateIntervalSeconds uint `json:"refereshStateIntervalSeconds"` + Billing billing.Config `json:"billing"` + ScalingEvents scalingevents.Config `json:"scalingEvents"` + Scaling ScalingConfig `json:"scaling"` Metrics MetricsConfig `json:"metrics"` Scheduler SchedulerConfig `json:"scheduler"` Monitor MonitorConfig `json:"monitor"` NeonVM NeonVMConfig `json:"neonvm"` - Billing billing.Config `json:"billing"` DumpState *DumpStateConfig `json:"dumpState"` } @@ -173,6 +176,10 @@ func (c *Config) validate() error { erc.Whenf(ec, cfg.PushRequestTimeoutSeconds == 0, zeroTmpl, fmt.Sprintf("%s.pushRequestTimeoutSeconds", key)) erc.Whenf(ec, cfg.MaxBatchSize == 0, zeroTmpl, fmt.Sprintf("%s.maxBatchSize", key)) } + validateS3ReportingConfig := func(cfg *reporting.S3ClientConfig, key string) { + erc.Whenf(ec, cfg.Bucket == "", emptyTmpl, fmt.Sprintf(".%s.bucket", key)) + erc.Whenf(ec, cfg.Region == "", emptyTmpl, fmt.Sprintf(".%s.region", key)) + } erc.Whenf(ec, c.Billing.ActiveTimeMetricName == "", emptyTmpl, ".billing.activeTimeMetricName") erc.Whenf(ec, c.Billing.CPUMetricName == "", emptyTmpl, ".billing.cpuMetricName") @@ -189,10 +196,19 @@ func (c *Config) validate() error { } if c.Billing.Clients.S3 != nil { validateBaseReportingConfig(&c.Billing.Clients.S3.BaseClientConfig, "billing.clients.s3") - erc.Whenf(ec, c.Billing.Clients.S3.Bucket == "", emptyTmpl, ".billing.clients.s3.bucket") - erc.Whenf(ec, 
c.Billing.Clients.S3.Region == "", emptyTmpl, ".billing.clients.s3.region") + validateS3ReportingConfig(&c.Billing.Clients.S3.S3ClientConfig, "billing.clients.s3") erc.Whenf(ec, c.Billing.Clients.S3.PrefixInBucket == "", emptyTmpl, ".billing.clients.s3.prefixInBucket") } + + erc.Whenf(ec, c.ScalingEvents.CUMultiplier == 0, zeroTmpl, ".scalingEvents.cuMultiplier") + erc.Whenf(ec, c.ScalingEvents.RereportThreshold == 0, zeroTmpl, ".scalingEvents.rereportThreshold") + erc.Whenf(ec, c.ScalingEvents.RegionName == "", emptyTmpl, ".scalingEvents.regionName") + if c.ScalingEvents.Clients.S3 != nil { + validateBaseReportingConfig(&c.ScalingEvents.Clients.S3.BaseClientConfig, "scalingEvents.clients.s3") + validateS3ReportingConfig(&c.ScalingEvents.Clients.S3.S3ClientConfig, "scalingEvents.clients.s3") + erc.Whenf(ec, c.ScalingEvents.Clients.S3.PrefixInBucket == "", emptyTmpl, ".scalingEvents.clients.s3.prefixInBucket") + } + erc.Whenf(ec, c.DumpState != nil && c.DumpState.Port == 0, zeroTmpl, ".dumpState.port") erc.Whenf(ec, c.DumpState != nil && c.DumpState.TimeoutSeconds == 0, zeroTmpl, ".dumpState.timeoutSeconds") diff --git a/pkg/agent/core/goalcu.go b/pkg/agent/core/goalcu.go index 7d93daed8..9498206ea 100644 --- a/pkg/agent/core/goalcu.go +++ b/pkg/agent/core/goalcu.go @@ -13,9 +13,23 @@ import ( "github.com/neondatabase/autoscaling/pkg/api" ) -type scalingGoal struct { - hasAllMetrics bool - goalCU uint32 +type ScalingGoal struct { + HasAllMetrics bool + Parts ScalingGoalParts +} + +type ScalingGoalParts struct { + CPU *float64 + Mem *float64 + LFC *float64 +} + +func (g *ScalingGoal) GoalCU() uint32 { + return uint32(math.Ceil(max( + math.Round(lo.FromPtr(g.Parts.CPU)), // for historical compatibility, use round() instead of ceil() + lo.FromPtr(g.Parts.Mem), + lo.FromPtr(g.Parts.LFC), + ))) } func calculateGoalCU( @@ -24,38 +38,41 @@ func calculateGoalCU(
{ +) (ScalingGoal, []zap.Field) { hasAllMetrics := systemMetrics != nil && (!*cfg.EnableLFCMetrics || lfcMetrics != nil) if !hasAllMetrics { warn("Making scaling decision without all required metrics available") } - var lfcGoalCU, cpuGoalCU, memGoalCU, memTotalGoalCU uint32 var logFields []zap.Field + var parts ScalingGoalParts var wss *api.Bytes // estimated working set size if lfcMetrics != nil { var lfcLogFunc func(zapcore.ObjectEncoder) error + var lfcGoalCU float64 lfcGoalCU, wss, lfcLogFunc = calculateLFCGoalCU(warn, cfg, computeUnit, *lfcMetrics) + parts.LFC = lo.ToPtr(lfcGoalCU) if lfcLogFunc != nil { logFields = append(logFields, zap.Object("lfc", zapcore.ObjectMarshalerFunc(lfcLogFunc))) } } if systemMetrics != nil { - cpuGoalCU = calculateCPUGoalCU(cfg, computeUnit, *systemMetrics) + cpuGoalCU := calculateCPUGoalCU(cfg, computeUnit, *systemMetrics) + parts.CPU = lo.ToPtr(cpuGoalCU) - memGoalCU = calculateMemGoalCU(cfg, computeUnit, *systemMetrics) + memGoalCU := calculateMemGoalCU(cfg, computeUnit, *systemMetrics) + parts.Mem = lo.ToPtr(memGoalCU) } if systemMetrics != nil && wss != nil { - memTotalGoalCU = calculateMemTotalGoalCU(cfg, computeUnit, *systemMetrics, *wss) + memTotalGoalCU := calculateMemTotalGoalCU(cfg, computeUnit, *systemMetrics, *wss) + parts.Mem = lo.ToPtr(max(*parts.Mem, memTotalGoalCU)) } - goalCU := max(cpuGoalCU, memGoalCU, memTotalGoalCU, lfcGoalCU) - - return scalingGoal{hasAllMetrics: hasAllMetrics, goalCU: goalCU}, logFields + return ScalingGoal{HasAllMetrics: hasAllMetrics, Parts: parts}, logFields } // For CPU: @@ -65,7 +82,7 @@ func calculateCPUGoalCU( cfg api.ScalingConfig, computeUnit api.Resources, systemMetrics SystemMetrics, -) uint32 { +) float64 { stableThreshold := *cfg.CPUStableZoneRatio * systemMetrics.LoadAverage5Min mixedThreshold := stableThreshold + *cfg.CPUMixedZoneRatio*systemMetrics.LoadAverage5Min @@ -77,7 +94,7 @@ func calculateCPUGoalCU( blendedLoadAverage := load1Weight*systemMetrics.LoadAverage1Min + 
(1-load1Weight)*systemMetrics.LoadAverage5Min goalCPUs := blendedLoadAverage / *cfg.LoadAverageFractionTarget - cpuGoalCU := uint32(math.Round(goalCPUs / computeUnit.VCPU.AsFloat64())) + cpuGoalCU := goalCPUs / computeUnit.VCPU.AsFloat64() return cpuGoalCU } @@ -100,13 +117,11 @@ func calculateMemGoalCU( cfg api.ScalingConfig, computeUnit api.Resources, systemMetrics SystemMetrics, -) uint32 { +) float64 { // goal memory size, just looking at allocated memory (not including page cache...) - memGoalBytes := api.Bytes(math.Round(systemMetrics.MemoryUsageBytes / *cfg.MemoryUsageFractionTarget)) + memGoalBytes := math.Round(systemMetrics.MemoryUsageBytes / *cfg.MemoryUsageFractionTarget) - // note: this is equal to ceil(memGoalBytes / computeUnit.Mem), because ceil(X/M) == floor((X+M-1)/M) - memGoalCU := uint32((memGoalBytes + computeUnit.Mem - 1) / computeUnit.Mem) - return memGoalCU + return memGoalBytes / float64(computeUnit.Mem) } // goal memory size, looking at allocated memory and min(page cache usage, LFC working set size) @@ -115,12 +130,11 @@ func calculateMemTotalGoalCU( computeUnit api.Resources, systemMetrics SystemMetrics, wss api.Bytes, -) uint32 { +) float64 { lfcCached := min(float64(wss), systemMetrics.MemoryCachedBytes) - totalGoalBytes := api.Bytes((lfcCached + systemMetrics.MemoryUsageBytes) / *cfg.MemoryTotalFractionTarget) + totalGoalBytes := (lfcCached + systemMetrics.MemoryUsageBytes) / *cfg.MemoryTotalFractionTarget - memTotalGoalCU := uint32((totalGoalBytes + computeUnit.Mem - 1) / computeUnit.Mem) - return memTotalGoalCU + return totalGoalBytes / float64(computeUnit.Mem) } func calculateLFCGoalCU( @@ -128,7 +142,7 @@ func calculateLFCGoalCU( cfg api.ScalingConfig, computeUnit api.Resources, lfcMetrics LFCMetrics, -) (uint32, *api.Bytes, func(zapcore.ObjectEncoder) error) { +) (float64, *api.Bytes, func(zapcore.ObjectEncoder) error) { wssValues := lfcMetrics.ApproximateworkingSetSizeBuckets // At this point, we can assume that the values are 
equally spaced at 1 minute apart, // starting at 1 minute. @@ -162,7 +176,6 @@ func calculateLFCGoalCU( requiredMem := estimateWssMem / *cfg.LFCToMemoryRatio // ... and then convert that into the actual CU required to fit the working set: requiredCU := requiredMem / computeUnit.Mem.AsFloat64() - lfcGoalCU := uint32(math.Ceil(requiredCU)) lfcLogFields := func(obj zapcore.ObjectEncoder) error { obj.AddFloat64("estimateWssPages", estimateWss) @@ -171,6 +184,6 @@ func calculateLFCGoalCU( return nil } - return lfcGoalCU, lo.ToPtr(api.Bytes(estimateWssMem)), lfcLogFields + return requiredCU, lo.ToPtr(api.Bytes(estimateWssMem)), lfcLogFields } } diff --git a/pkg/agent/core/goalcu_test.go b/pkg/agent/core/goalcu_test.go index 021276245..17609cca1 100644 --- a/pkg/agent/core/goalcu_test.go +++ b/pkg/agent/core/goalcu_test.go @@ -33,16 +33,20 @@ func Test_calculateGoalCU(t *testing.T) { cfgUpdater func(*api.ScalingConfig) sys *SystemMetrics lfc *LFCMetrics - want scalingGoal + want ScalingGoal }{ { name: "basic", cfgUpdater: nil, sys: nil, lfc: nil, - want: scalingGoal{ - goalCU: 0, - hasAllMetrics: false, + want: ScalingGoal{ + HasAllMetrics: false, + Parts: ScalingGoalParts{ + CPU: nil, + Mem: nil, + LFC: nil, + }, }, }, { @@ -53,9 +57,13 @@ func Test_calculateGoalCU(t *testing.T) { LoadAverage1Min: 0.2, }, lfc: nil, - want: scalingGoal{ - goalCU: 1, - hasAllMetrics: false, + want: ScalingGoal{ + HasAllMetrics: false, + Parts: ScalingGoalParts{ + CPU: lo.ToPtr(0.8), + Mem: lo.ToPtr(0.0), + LFC: nil, + }, }, }, { @@ -66,9 +74,13 @@ func Test_calculateGoalCU(t *testing.T) { LoadAverage1Min: 1, }, lfc: nil, - want: scalingGoal{ - goalCU: 4, - hasAllMetrics: false, + want: ScalingGoal{ + HasAllMetrics: false, + Parts: ScalingGoalParts{ + CPU: lo.ToPtr(4.0), + Mem: lo.ToPtr(0.0), + LFC: nil, + }, }, }, { @@ -82,9 +94,13 @@ func Test_calculateGoalCU(t *testing.T) { LoadAverage5Min: 0.0, }, lfc: nil, - want: scalingGoal{ - goalCU: 3, - hasAllMetrics: false, + want: ScalingGoal{ + 
HasAllMetrics: false, + Parts: ScalingGoalParts{ + CPU: lo.ToPtr(2.8), + Mem: lo.ToPtr(0.0), + LFC: nil, + }, }, }, { @@ -99,9 +115,13 @@ func Test_calculateGoalCU(t *testing.T) { MemoryCachedBytes: 0, }, lfc: nil, - want: scalingGoal{ - goalCU: 3, - hasAllMetrics: false, + want: ScalingGoal{ + HasAllMetrics: false, + Parts: ScalingGoalParts{ + CPU: lo.ToPtr(2.8), + Mem: lo.ToPtr(0.0), + LFC: nil, + }, }, }, { @@ -117,9 +137,13 @@ func Test_calculateGoalCU(t *testing.T) { MemoryCachedBytes: 0, }, lfc: nil, - want: scalingGoal{ - goalCU: 5, // Weighted average between 7 and 4 CUs - hasAllMetrics: false, + want: ScalingGoal{ + HasAllMetrics: false, + Parts: ScalingGoalParts{ + CPU: lo.ToPtr(5.499997000005999), + Mem: lo.ToPtr(0.0), + LFC: nil, + }, }, }, } diff --git a/pkg/agent/core/state.go b/pkg/agent/core/state.go index b7db040dd..068cba27a 100644 --- a/pkg/agent/core/state.go +++ b/pkg/agent/core/state.go @@ -38,8 +38,16 @@ type ObservabilityCallbacks struct { PluginLatency revsource.ObserveCallback MonitorLatency revsource.ObserveCallback NeonVMLatency revsource.ObserveCallback + + ActualScaling ReportActualScalingEventCallback + HypotheticalScaling ReportHypotheticalScalingEventCallback } +type ( + ReportActualScalingEventCallback func(timestamp time.Time, current uint32, target uint32) + ReportHypotheticalScalingEventCallback func(timestamp time.Time, current uint32, target uint32, parts ScalingGoalParts) +) + type RevisionSource interface { Next(ts time.Time, flags vmv1.Flag) vmv1.Revision Observe(moment time.Time, rev vmv1.Revision) error @@ -727,6 +735,17 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) ( // 2. Cap the goal CU by min/max, etc // 3. that's it! + reportGoals := func(goalCU uint32, parts ScalingGoalParts) { + currentCU, ok := s.VM.Using().DivResources(s.Config.ComputeUnit) + if !ok { + return // skip reporting if the current CU is not right. 
+ } + + if report := s.Config.ObservabilityCallbacks.HypotheticalScaling; report != nil { + report(now, uint32(currentCU), goalCU, parts) + } + } + sg, goalCULogFields := calculateGoalCU( s.warn, s.scalingConfig(), @@ -734,10 +753,14 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) ( s.Metrics, s.LFCMetrics, ) - goalCU := sg.goalCU + goalCU := sg.GoalCU() // If we don't have all the metrics we need, we'll later prevent downscaling to avoid flushing // the VM's cache on autoscaler-agent restart if we have SystemMetrics but not LFCMetrics. - hasAllMetrics := sg.hasAllMetrics + hasAllMetrics := sg.HasAllMetrics + + if hasAllMetrics { + reportGoals(goalCU, sg.Parts) + } // Copy the initial value of the goal CU so that we can accurately track whether either // requested upscaling or denied downscaling affected the outcome. @@ -1220,6 +1243,15 @@ func (s *State) NeonVM() NeonVMHandle { } func (h NeonVMHandle) StartingRequest(now time.Time, resources api.Resources) { + if report := h.s.Config.ObservabilityCallbacks.ActualScaling; report != nil { + currentCU, currentOk := h.s.VM.Using().DivResources(h.s.Config.ComputeUnit) + targetCU, targetOk := resources.DivResources(h.s.Config.ComputeUnit) + + if currentOk && targetOk { + report(now, uint32(currentCU), uint32(targetCU)) + } + } + // FIXME: add time to ongoing request info (or maybe only in RequestFailed?) 
h.s.NeonVM.OngoingRequested = &resources } diff --git a/pkg/agent/core/state_test.go b/pkg/agent/core/state_test.go index 5241ce628..67d9600ec 100644 --- a/pkg/agent/core/state_test.go +++ b/pkg/agent/core/state_test.go @@ -263,9 +263,11 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { }, RevisionSource: revsource.NewRevisionSource(0, nil), ObservabilityCallbacks: core.ObservabilityCallbacks{ - PluginLatency: nil, - MonitorLatency: nil, - NeonVMLatency: nil, + PluginLatency: nil, + MonitorLatency: nil, + NeonVMLatency: nil, + ActualScaling: nil, + HypotheticalScaling: nil, }, } } @@ -351,9 +353,11 @@ var DefaultInitialStateConfig = helpers.InitialStateConfig{ }, RevisionSource: &helpers.NilRevisionSource{}, ObservabilityCallbacks: core.ObservabilityCallbacks{ - PluginLatency: nil, - MonitorLatency: nil, - NeonVMLatency: nil, + PluginLatency: nil, + MonitorLatency: nil, + NeonVMLatency: nil, + ActualScaling: nil, + HypotheticalScaling: nil, }, }, } diff --git a/pkg/agent/entrypoint.go b/pkg/agent/entrypoint.go index 38e07cec2..ae315161f 100644 --- a/pkg/agent/entrypoint.go +++ b/pkg/agent/entrypoint.go @@ -11,6 +11,7 @@ import ( vmclient "github.com/neondatabase/autoscaling/neonvm/client/clientset/versioned" "github.com/neondatabase/autoscaling/pkg/agent/billing" + "github.com/neondatabase/autoscaling/pkg/agent/scalingevents" "github.com/neondatabase/autoscaling/pkg/agent/schedwatch" "github.com/neondatabase/autoscaling/pkg/util" "github.com/neondatabase/autoscaling/pkg/util/taskgroup" @@ -51,7 +52,13 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error { } defer schedTracker.Stop() - globalState, globalPromReg := r.newAgentState(logger, r.EnvArgs.K8sPodIP, schedTracker) + scalingEventsMetrics := scalingevents.NewPromMetrics() + scalingReporter, err := scalingevents.NewReporter(ctx, logger, &r.Config.ScalingEvents, scalingEventsMetrics) + if err != nil { + return fmt.Errorf("Error creating scaling events reporter: 
%w", err) + } + + globalState, globalPromReg := r.newAgentState(logger, r.EnvArgs.K8sPodIP, schedTracker, scalingReporter) watchMetrics.MustRegister(globalPromReg) logger.Info("Starting billing metrics collector") diff --git a/pkg/agent/globalstate.go b/pkg/agent/globalstate.go index f15c1227b..748c0f871 100644 --- a/pkg/agent/globalstate.go +++ b/pkg/agent/globalstate.go @@ -17,6 +17,7 @@ import ( vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" vmclient "github.com/neondatabase/autoscaling/neonvm/client/clientset/versioned" + "github.com/neondatabase/autoscaling/pkg/agent/scalingevents" "github.com/neondatabase/autoscaling/pkg/agent/schedwatch" "github.com/neondatabase/autoscaling/pkg/api" "github.com/neondatabase/autoscaling/pkg/util" @@ -40,12 +41,15 @@ type agentState struct { vmClient *vmclient.Clientset schedTracker *schedwatch.SchedulerTracker metrics GlobalMetrics + + scalingReporter *scalingevents.Reporter } func (r MainRunner) newAgentState( baseLogger *zap.Logger, podIP string, schedTracker *schedwatch.SchedulerTracker, + scalingReporter *scalingevents.Reporter, ) (*agentState, *prometheus.Registry) { metrics, promReg := makeGlobalMetrics() @@ -59,6 +63,8 @@ func (r MainRunner) newAgentState( podIP: podIP, schedTracker: schedTracker, metrics: metrics, + + scalingReporter: scalingReporter, } return state, promReg diff --git a/pkg/agent/runner.go b/pkg/agent/runner.go index b198d17c0..861b786f6 100644 --- a/pkg/agent/runner.go +++ b/pkg/agent/runner.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "math" "net/http" "runtime/debug" "strconv" @@ -36,6 +37,7 @@ import ( "github.com/neondatabase/autoscaling/pkg/agent/core" "github.com/neondatabase/autoscaling/pkg/agent/core/revsource" "github.com/neondatabase/autoscaling/pkg/agent/executor" + "github.com/neondatabase/autoscaling/pkg/agent/scalingevents" "github.com/neondatabase/autoscaling/pkg/agent/schedwatch" "github.com/neondatabase/autoscaling/pkg/api" 
"github.com/neondatabase/autoscaling/pkg/util" @@ -195,6 +197,8 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util if vmInfo.CurrentRevision != nil { initialRevision = vmInfo.CurrentRevision.Value } + // "dsrl" stands for "desired scaling report limiter" -- helper to avoid spamming events. + dsrl := &desiredScalingReportLimiter{lastEvent: nil} revisionSource := revsource.NewRevisionSource(initialRevision, WrapHistogramVec(&r.global.metrics.scalingLatency)) executorCore := executor.NewExecutorCore(coreExecLogger, vmInfo, executor.Config{ OnNextActions: r.global.metrics.runnerNextActions.Inc, @@ -217,6 +221,14 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util PluginLatency: WrapHistogramVec(&r.global.metrics.pluginLatency), MonitorLatency: WrapHistogramVec(&r.global.metrics.monitorLatency), NeonVMLatency: WrapHistogramVec(&r.global.metrics.neonvmLatency), + ActualScaling: r.reportScalingEvent, + HypotheticalScaling: func(ts time.Time, current, target uint32, parts core.ScalingGoalParts) { + r.reportDesiredScaling(dsrl, ts, current, target, scalingevents.GoalCUComponents{ + CPU: parts.CPU, + Mem: parts.Mem, + LFC: parts.LFC, + }) + }, }, }, }) @@ -322,6 +334,78 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util } } +func (r *Runner) reportScalingEvent(timestamp time.Time, currentCU, targetCU uint32) { + endpointID := func() string { + return r.status.endpointID + }() + + reporter := r.global.scalingReporter + reporter.Submit(reporter.NewActualEvent( + timestamp, + endpointID, + currentCU, + targetCU, + )) +} + +func (r *Runner) reportDesiredScaling( + rl *desiredScalingReportLimiter, + timestamp time.Time, + currentCU uint32, + targetCU uint32, + parts scalingevents.GoalCUComponents, +) { + endpointID := func() string { + return r.status.endpointID + }() + + // TODO: Use this opportunity to report the desired scaling in the per-VM + // metrics. 
+ + rl.report(r.global.scalingReporter, r.global.scalingReporter.NewHypotheticalEvent( + timestamp, + endpointID, + currentCU, + targetCU, + parts, + )) +} + +type desiredScalingReportLimiter struct { + lastEvent *scalingevents.ScalingEvent +} + +func (rl *desiredScalingReportLimiter) report( + reporter *scalingevents.Reporter, + event scalingevents.ScalingEvent, +) { + closeEnough := func(x *float64, y *float64) bool { + if (x != nil) != (y != nil) { + return false + } + if x == nil /* && y == nil */ { + return true + } + // true iff x and y are within the threshold of each other + return math.Abs(*x-*y) < 0.25 // NOTE(review): hardcodes the default of Config.RereportThreshold instead of reading it; also, these values are post-CUMultiplier, so confirm which units the threshold is meant to be in and plumb the config value through. + } + + // Check if we should skip this time. + if rl.lastEvent != nil { + skip := rl.lastEvent.TargetMilliCU == event.TargetMilliCU && + closeEnough(rl.lastEvent.GoalComponents.CPU, event.GoalComponents.CPU) && + closeEnough(rl.lastEvent.GoalComponents.Mem, event.GoalComponents.Mem) && + closeEnough(rl.lastEvent.GoalComponents.LFC, event.GoalComponents.LFC) + if skip { + return + } + } + + // Not skipping.
+ rl.lastEvent = &event + reporter.Submit(event) +} + ////////////////////// // Background tasks // ////////////////////// diff --git a/pkg/agent/scalingevents/clients.go b/pkg/agent/scalingevents/clients.go new file mode 100644 index 000000000..357967d4d --- /dev/null +++ b/pkg/agent/scalingevents/clients.go @@ -0,0 +1,66 @@ +package scalingevents + +import ( + "context" + "fmt" + "time" + + "github.com/lithammer/shortuuid" + "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/reporting" +) + +type ClientsConfig struct { + S3 *S3ClientConfig `json:"s3"` +} + +type S3ClientConfig struct { + reporting.BaseClientConfig + reporting.S3ClientConfig + PrefixInBucket string `json:"prefixInBucket"` +} + +type eventsClient = reporting.Client[ScalingEvent] + +func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) ([]eventsClient, error) { + var clients []eventsClient + + if c := cfg.S3; c != nil { + generateKey := newBlobStorageKeyGenerator(c.PrefixInBucket) + client, err := reporting.NewS3Client(ctx, c.S3ClientConfig, generateKey) + if err != nil { + return nil, fmt.Errorf("error creating S3 client: %w", err) + } + logger.Info("Created S3 client for scaling events", zap.Any("config", c)) + + clients = append(clients, eventsClient{ + Name: "s3", + Base: client, + BaseConfig: c.BaseClientConfig, + SerializeBatch: reporting.WrapSerialize[ScalingEvent](reporting.GZIPCompress, reporting.JSONLinesMarshalBatch), + }) + } + + return clients, nil +} + +// Returns a function to generate keys for the placement of scaling events data into blob storage. +// +// Example: prefix/2024/10/31/23/events_{uuid}.ndjson.gz (11pm on halloween, UTC) +// +// NOTE: This key format is different from the one we use for billing, but similar to the one proxy +// uses for its reporting. 
+func newBlobStorageKeyGenerator(prefix string) func() string { + return func() string { + now := time.Now().UTC() + id := shortuuid.New() + + return fmt.Sprintf( + "%s/%d/%02d/%02d/%02d/events_%s.ndjson.gz", + prefix, + now.Year(), now.Month(), now.Day(), now.Hour(), + id, + ) + } +} diff --git a/pkg/agent/scalingevents/prommetrics.go b/pkg/agent/scalingevents/prommetrics.go new file mode 100644 index 000000000..705cafa97 --- /dev/null +++ b/pkg/agent/scalingevents/prommetrics.go @@ -0,0 +1,43 @@ +package scalingevents + +// Prometheus metrics for the agent's scaling event reporting subsystem + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/neondatabase/autoscaling/pkg/reporting" +) + +type PromMetrics struct { + reporting *reporting.EventSinkMetrics + totalCount *prometheus.GaugeVec +} + +func NewPromMetrics() PromMetrics { + return PromMetrics{ + reporting: reporting.NewEventSinkMetrics("autoscaling_agent_events"), + totalCount: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "autoscaling_agent_scaling_events_total", + Help: "Total number of scaling events generated", + }, + []string{"kind"}, + ), + } +} + +func (m PromMetrics) MustRegister(reg *prometheus.Registry) { + m.reporting.MustRegister(reg) + reg.MustRegister(m.totalCount) +} + +func (m PromMetrics) recordSubmitted(event ScalingEvent) { + var eventKind string + switch event.Kind { + case scalingEventActual, scalingEventHypothetical: + eventKind = string(event.Kind) + default: + eventKind = "unknown" + } + m.totalCount.WithLabelValues(eventKind).Inc() +} diff --git a/pkg/agent/scalingevents/reporter.go b/pkg/agent/scalingevents/reporter.go new file mode 100644 index 000000000..cd9165f01 --- /dev/null +++ b/pkg/agent/scalingevents/reporter.go @@ -0,0 +1,141 @@ +package scalingevents + +import ( + "context" + "math" + "time" + + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/reporting" +) + +type Config struct { + // 
CUMultiplier sets the ratio between our internal compute unit and the one that should be + // reported. + // + // This exists because Neon allows fractional compute units, while the autoscaler-agent acts on + // integer multiples of a smaller compute unit. + CUMultiplier float64 `json:"cuMultiplier"` + + // RereportThreshold sets the minimum amount of change in desired compute units required for us to + // re-report the desired scaling. + RereportThreshold float64 `json:"rereportThreshold"` + + // RegionName is the name of the region that the reporting autoscaler-agent is in. + RegionName string `json:"regionName"` + + Clients ClientsConfig `json:"clients"` +} + +type Reporter struct { + conf *Config + sink *reporting.EventSink[ScalingEvent] + metrics PromMetrics +} + +type ScalingEvent struct { + Timestamp time.Time `json:"timestamp"` + Region string `json:"region"` + EndpointID string `json:"endpoint_id"` + Kind scalingEventKind `json:"kind"` + CurrentMilliCU uint32 `json:"current_cu"` + TargetMilliCU uint32 `json:"target_cu"` + GoalComponents *GoalCUComponents `json:"goalComponents,omitempty"` +} + +type GoalCUComponents struct { + CPU *float64 `json:"cpu,omitempty"` + Mem *float64 `json:"mem,omitempty"` + LFC *float64 `json:"lfc,omitempty"` +} + +type scalingEventKind string + +const ( + scalingEventActual = "actual" + scalingEventHypothetical = "hypothetical" +) + +func NewReporter( + ctx context.Context, + parentLogger *zap.Logger, + conf *Config, + metrics PromMetrics, +) (*Reporter, error) { + logger := parentLogger.Named("scalingevents") + + clients, err := createClients(ctx, logger, conf.Clients) + if err != nil { + return nil, err + } + + sink := reporting.NewEventSink(logger, metrics.reporting, clients...) + + return &Reporter{ + conf: conf, + sink: sink, + metrics: metrics, + }, nil +} + +// Submit adds the ScalingEvent to the sender queue(s), returning without waiting for it to be sent. 
+func (r *Reporter) Submit(event ScalingEvent) { + r.metrics.recordSubmitted(event) + r.sink.Enqueue(event) +} + +func convertToMilliCU(cu uint32, multiplier float64) uint32 { + return uint32(math.Round(1000 * float64(cu) * multiplier)) +} + +// NewActualEvent is a helper function to create a ScalingEvent for actual scaling that has +// occurred. +// +// This method also handles compute unit translation. +func (r *Reporter) NewActualEvent( + timestamp time.Time, + endpointID string, + currentCU uint32, + targetCU uint32, +) ScalingEvent { + return ScalingEvent{ + Timestamp: timestamp, + Region: r.conf.RegionName, + EndpointID: endpointID, + Kind: scalingEventActual, + CurrentMilliCU: convertToMilliCU(currentCU, r.conf.CUMultiplier), + TargetMilliCU: convertToMilliCU(targetCU, r.conf.CUMultiplier), + GoalComponents: nil, + } +} + +func (r *Reporter) NewHypotheticalEvent( + timestamp time.Time, + endpointID string, + currentCU uint32, + targetCU uint32, + goalCUs GoalCUComponents, +) ScalingEvent { + convertFloat := func(cu *float64) *float64 { + if cu != nil { + return lo.ToPtr(*cu * r.conf.CUMultiplier) + } + return nil + } + + return ScalingEvent{ + Timestamp: timestamp, + Region: r.conf.RegionName, + EndpointID: endpointID, + Kind: scalingEventHypothetical, + CurrentMilliCU: convertToMilliCU(currentCU, r.conf.CUMultiplier), + TargetMilliCU: convertToMilliCU(targetCU, r.conf.CUMultiplier), + GoalComponents: &GoalCUComponents{ + CPU: convertFloat(goalCUs.CPU), + Mem: convertFloat(goalCUs.Mem), + LFC: convertFloat(goalCUs.LFC), + }, + } +} diff --git a/pkg/api/types.go b/pkg/api/types.go index 61b53a8c7..e1842010a 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -381,6 +381,23 @@ func (r Resources) Mul(factor uint16) Resources { } } +// DivResources divides the resources by the smaller amount, returning the uint16 value such that +// other.Mul(factor) is equal to the original resources. 
+// +// If r is not an integer multiple of other, then (0, false) will be returned. +func (r Resources) DivResources(other Resources) (uint16, bool) { + cpuFactor := uint16(r.VCPU / other.VCPU) + cpuOk := r.VCPU%other.VCPU == 0 + memFactor := uint16(r.Mem / other.Mem) + memOk := r.Mem%other.Mem == 0 + + if !cpuOk || !memOk || cpuFactor != memFactor { + return 0, false + } + + return cpuFactor, true // already known equal to memFactor +} + // AbsDiff returns a new Resources with each field F as the absolute value of the difference between // r.F and cmp.F func (r Resources) AbsDiff(cmp Resources) Resources {