From 3ac5841c59423f1f25838ca8f9fc93a98d49d14b Mon Sep 17 00:00:00 2001
From: Oleg Vasilev
Date: Fri, 7 Jun 2024 22:19:47 +0400
Subject: [PATCH] neonvm-controller: replace failing reconciliation with per-VM failure interval (#949)

The old method produced frequent false positives: there can be many
intermittent failures while the system as a whole still makes progress and
every particular VM eventually gets reconciled. Instead, a VM is now
reported as failing only once it has seen nothing but failures for a full
failure-pending-period, and the related metrics and log lines are refreshed
every failing-refresh-interval.

---
 neonvm/config/controller/deployment.yaml      |   2 +
 neonvm/controllers/config.go                  |  10 ++
 neonvm/controllers/failurelag/tracker.go      | 108 ++++++++++++++++
 neonvm/controllers/failurelag/tracker_test.go | 110 ++++++++++++++++
 neonvm/controllers/metrics.go                 | 120 ++++++++++++------
 neonvm/controllers/vm_controller.go           |   8 +-
 neonvm/controllers/vm_controller_test.go      |   2 +
 neonvm/controllers/vmmigration_controller.go  |   8 +-
 neonvm/main.go                                |  13 ++
 9 files changed, 343 insertions(+), 38 deletions(-)
 create mode 100644 neonvm/controllers/failurelag/tracker.go
 create mode 100644 neonvm/controllers/failurelag/tracker_test.go

diff --git a/neonvm/config/controller/deployment.yaml b/neonvm/config/controller/deployment.yaml
index 60b0ad2d4..d51d7edee 100644
--- a/neonvm/config/controller/deployment.yaml
+++ b/neonvm/config/controller/deployment.yaml
@@ -59,6 +59,8 @@ spec:
           # * cache.direct=on - use O_DIRECT (don't abuse host's page cache!)
           # * cache.no-flush=on - ignores disk flush operations (not needed; our disks are ephemeral)
           - "--qemu-disk-cache-settings=cache.writeback=on,cache.direct=on,cache.no-flush=on"
+          - "--failure-pending-period=1m"
+          - "--failing-refresh-interval=15s"
         env:
         - name: NAD_IPAM_NAME
           value: $(NAD_IPAM_NAME)
diff --git a/neonvm/controllers/config.go b/neonvm/controllers/config.go
index c6f462ac7..5c25c6d63 100644
--- a/neonvm/controllers/config.go
+++ b/neonvm/controllers/config.go
@@ -1,5 +1,7 @@
 package controllers
 
+import "time"
+
 // ReconcilerConfig stores shared configuration for VirtualMachineReconciler and
 // VirtualMachineMigrationReconciler.
 type ReconcilerConfig struct {
@@ -22,6 +24,14 @@ type ReconcilerConfig struct {
 	// This field is passed to neonvm-runner as the `-qemu-disk-cache-settings` arg, and is directly
 	// used in setting up the VM disks via QEMU's `-drive` flag.
 	QEMUDiskCacheSettings string
+
+	// FailurePendingPeriod is the period for the propagation of
+	// reconciliation failures to the observability instruments.
+	FailurePendingPeriod time.Duration
+
+	// FailingRefreshInterval is the interval between consecutive
+	// updates of the metrics and logs related to failing reconciliations.
+	FailingRefreshInterval time.Duration
 }
 
 func (c *ReconcilerConfig) criEndpointSocketPath() string {
diff --git a/neonvm/controllers/failurelag/tracker.go b/neonvm/controllers/failurelag/tracker.go
new file mode 100644
index 000000000..b69afd459
--- /dev/null
+++ b/neonvm/controllers/failurelag/tracker.go
@@ -0,0 +1,108 @@
+package failurelag
+
+import (
+	"sync"
+	"time"
+)
+
+// Tracker accumulates failure events for a given key and determines if
+// the key is degraded. The key becomes degraded if it receives only failures
+// over a configurable pending period. Once a success event is received, the key
+// is no longer considered degraded, and the pending period is reset.
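+//
+// A minimal usage sketch (illustrative only; the key and period are arbitrary):
+//
+//	t := failurelag.NewTracker[string](time.Minute)
+//	t.RecordFailure("vm-1") // starts the pending period for "vm-1"
+//	// ...if no RecordSuccess("vm-1") arrives within a minute, then:
+//	t.DegradedCount()       // 1, and Degraded() returns ["vm-1"]
+//	t.RecordSuccess("vm-1") // clears the key and resets its pending period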
+type Tracker[T comparable] struct {
+	period time.Duration
+
+	// pendingSince records, for each key, when the current uninterrupted
+	// run of failures started.
+	pendingSince map[T]time.Time
+	degraded     map[T]struct{}
+	// degradeAt is a queue of pending degradation events, ordered by ts.
+	degradeAt []degradeAt[T]
+
+	lock sync.Mutex
+	// Now is exported so that tests can substitute a fake clock.
+	Now func() time.Time
+}
+
+type degradeAt[T comparable] struct {
+	ts  time.Time
+	key T
+}
+
+func NewTracker[T comparable](period time.Duration) *Tracker[T] {
+	return &Tracker[T]{
+		period:       period,
+		pendingSince: make(map[T]time.Time),
+		degraded:     make(map[T]struct{}),
+		degradeAt:    []degradeAt[T]{},
+		lock:         sync.Mutex{},
+		Now:          time.Now,
+	}
+}
+
+// forward processes all the degradeAt events that are now in the past.
+func (t *Tracker[T]) forward(now time.Time) {
+	i := 0
+	for ; i < len(t.degradeAt); i++ {
+		event := t.degradeAt[i]
+		if event.ts.After(now) {
+			break
+		}
+		pendingSince, ok := t.pendingSince[event.key]
+		if !ok {
+			// There was a success event in between
+			continue
+		}
+
+		if event.ts.Sub(pendingSince) < t.period {
+			// There was a success, and another failure in between.
+			// We will have another degradeAt event for this key in the future.
+			continue
+		}
+		t.degraded[event.key] = struct{}{}
+	}
+	t.degradeAt = t.degradeAt[i:]
+}
+
+func (t *Tracker[T]) RecordSuccess(key T) {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	delete(t.degraded, key)
+	delete(t.pendingSince, key)
+	t.forward(t.Now())
+}
+
+func (t *Tracker[T]) RecordFailure(key T) {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	now := t.Now()
+
+	if _, ok := t.pendingSince[key]; !ok {
+		t.pendingSince[key] = now
+	}
+
+	t.degradeAt = append(t.degradeAt, degradeAt[T]{
+		ts:  now.Add(t.period),
+		key: key,
+	})
+
+	t.forward(now)
+}
+
+func (t *Tracker[T]) DegradedCount() int {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	t.forward(t.Now())
+	return len(t.degraded)
+}
+
+func (t *Tracker[T]) Degraded() []T {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	t.forward(t.Now())
+	keys := make([]T, 0, len(t.degraded))
+	for k := range t.degraded {
+		keys = append(keys, k)
+	}
+	return keys
+}
diff --git a/neonvm/controllers/failurelag/tracker_test.go b/neonvm/controllers/failurelag/tracker_test.go
new file mode 100644
index 000000000..f191f53f9
--- /dev/null
+++ b/neonvm/controllers/failurelag/tracker_test.go
@@ -0,0 +1,110 @@
+package failurelag_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/neondatabase/autoscaling/neonvm/controllers/failurelag"
+)
+
+type nowMock struct {
+	ts time.Time
+}
+
+func (n *nowMock) Now() time.Time {
+	return n.ts
+}
+
+func (n *nowMock) Add(d time.Duration) {
+	n.ts = n.ts.Add(d)
+}
+
+func newNowMock() *nowMock {
+	ts, _ := time.Parse("2006-01-02", "2024-01-01")
+	return &nowMock{ts: ts}
+}
+
+func TestTracker(t *testing.T) {
+	now := newNowMock()
+	tracker := failurelag.NewTracker[string](10 * time.Minute)
+	tracker.Now = now.Now
+
+	// Alert fires once the 10-minute pending period has passed
+	tracker.RecordFailure("key1")
+	assert.Equal(t, tracker.DegradedCount(), 0)
+	now.Add(15 * time.Minute)
+	assert.Equal(t, tracker.DegradedCount(), 1)
+
+	// Alert no longer fires
+	tracker.RecordSuccess("key1")
+	assert.Equal(t, tracker.DegradedCount(), 0)
+}
+
+func TestFailureSuccess(t *testing.T) {
+	now := newNowMock()
+	tracker := failurelag.NewTracker[string](10 * time.Minute)
+	tracker.Now = now.Now
+
+	// Alert doesn't fire if there was a success in the interval
+	tracker.RecordFailure("key1")
+
+	now.Add(5 * time.Minute)
+	tracker.RecordSuccess("key1")
+
+	now.Add(10 * time.Minute)
+	assert.Equal(t, tracker.DegradedCount(), 0)
+}
+
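+// Timeline for the scenario below (pending period = 10m); a success resets
+// the pending period, so degradation is measured from the most recent
+// uninterrupted run of failures:
+//
+//	t=0m   failure -> pending since t=0m
+//	t=5m   success -> pending cleared
+//	t=6m   failure -> pending since t=6m
+//	t=11m  check   -> only 5m of uninterrupted failures: not degraded
+//	t=18m  check   -> 12m since t=6m exceeds the 10m period: degraded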
+func TestFailureSuccessFailure(t *testing.T) {
+	now := newNowMock()
+	tracker := failurelag.NewTracker[string](10 * time.Minute)
+	tracker.Now = now.Now
+
+	// Alert doesn't fire if there was a success and another failure in the interval
+	tracker.RecordFailure("key1")
+
+	now.Add(5 * time.Minute)
+	tracker.RecordSuccess("key1")
+
+	now.Add(1 * time.Minute)
+	tracker.RecordFailure("key1")
+
+	now.Add(5 * time.Minute)
+	assert.Equal(t, tracker.DegradedCount(), 0)
+
+	// But after 7 more minutes it does
+	now.Add(7 * time.Minute)
+	assert.Equal(t, tracker.DegradedCount(), 1)
+}
+
+func TestMultipleKeys(t *testing.T) {
+	now := newNowMock()
+	tracker := failurelag.NewTracker[string](10 * time.Minute)
+	tracker.Now = now.Now
+
+	// A combination of TestFailureSuccess and TestFailureSuccessFailure
+	tracker.RecordFailure("key1")
+	tracker.RecordFailure("key2")
+
+	now.Add(5 * time.Minute)
+	tracker.RecordSuccess("key1")
+	tracker.RecordSuccess("key2")
+
+	now.Add(1 * time.Minute)
+	tracker.RecordFailure("key1")
+
+	now.Add(5 * time.Minute)
+	assert.Equal(t, tracker.DegradedCount(), 0)
+
+	now.Add(7 * time.Minute)
+	assert.Equal(t, tracker.DegradedCount(), 1)
+	assert.Equal(t, tracker.Degraded(), []string{"key1"})
+
+	tracker.RecordFailure("key2")
+	now.Add(15 * time.Minute)
+	assert.Equal(t, tracker.DegradedCount(), 2)
+	assert.Contains(t, tracker.Degraded(), "key1")
+	assert.Contains(t, tracker.Degraded(), "key2")
+}
diff --git a/neonvm/controllers/metrics.go b/neonvm/controllers/metrics.go
index 3e25e0845..16ea5ce6b 100644
--- a/neonvm/controllers/metrics.go
+++ b/neonvm/controllers/metrics.go
@@ -2,9 +2,10 @@ package controllers
 
 import (
 	"context"
-	"sync"
+	"fmt"
 	"time"
 
+	"github.com/go-logr/logr"
 	"github.com/prometheus/client_golang/prometheus"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -14,6 +15,7 @@ import (
 
 	"k8s.io/apimachinery/pkg/api/errors"
 
+	"github.com/neondatabase/autoscaling/neonvm/controllers/failurelag"
 	"github.com/neondatabase/autoscaling/pkg/util"
 )
 
@@ -96,13 +98,13 @@ func (m ReconcilerMetrics) ObserveReconcileDuration(
 }
 
 type wrappedReconciler struct {
-	ControllerName string
-	Reconciler     reconcile.Reconciler
-	Metrics        ReconcilerMetrics
+	ControllerName         string
+	Reconciler             reconcile.Reconciler
+	Metrics                ReconcilerMetrics
+	refreshFailingInterval time.Duration
 
-	lock        sync.Mutex
-	failing     map[client.ObjectKey]struct{}
-	conflicting map[client.ObjectKey]struct{}
+	failing     *failurelag.Tracker[client.ObjectKey]
+	conflicting *failurelag.Tracker[client.ObjectKey]
 }
 
 // ReconcilerWithMetrics is a Reconciler produced by WithMetrics that can return a snapshot of the
@@ -111,6 +113,7 @@ type ReconcilerWithMetrics interface {
 	reconcile.Reconciler
 
 	Snapshot() ReconcileSnapshot
+	FailingRefresher() FailingRefresher
 }
 
 // ReconcileSnapshot provides a glimpse into the current state of ongoing reconciles
@@ -135,17 +138,73 @@ type ReconcileSnapshot struct {
 //
 // The returned reconciler also provides a way to get a snapshot of the state of ongoing reconciles,
 // to see the data backing the metrics.
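+//
+// failurePendingPeriod and refreshFailingInterval drive the failurelag
+// trackers below: an object only counts as failing once it has seen nothing
+// but failures for failurePendingPeriod, and the failing gauge and
+// per-object log lines are refreshed every refreshFailingInterval (see
+// FailingRefresher).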
-func WithMetrics(reconciler reconcile.Reconciler, rm ReconcilerMetrics, cntrlName string) ReconcilerWithMetrics {
+func WithMetrics(
+	reconciler reconcile.Reconciler,
+	rm ReconcilerMetrics,
+	cntrlName string,
+	failurePendingPeriod time.Duration,
+	refreshFailingInterval time.Duration,
+) ReconcilerWithMetrics {
 	return &wrappedReconciler{
-		Reconciler:     reconciler,
-		Metrics:        rm,
-		ControllerName: cntrlName,
-		lock:           sync.Mutex{},
-		failing:        make(map[client.ObjectKey]struct{}),
-		conflicting:    make(map[client.ObjectKey]struct{}),
+		Reconciler:             reconciler,
+		Metrics:                rm,
+		ControllerName:         cntrlName,
+		failing:                failurelag.NewTracker[client.ObjectKey](failurePendingPeriod),
+		conflicting:            failurelag.NewTracker[client.ObjectKey](failurePendingPeriod),
+		refreshFailingInterval: refreshFailingInterval,
 	}
 }
 
+func (d *wrappedReconciler) refreshFailing(
+	log logr.Logger,
+	outcome ReconcileOutcome,
+	tracker *failurelag.Tracker[client.ObjectKey],
+) {
+	degraded := tracker.Degraded()
+	d.Metrics.failing.WithLabelValues(d.ControllerName, string(outcome)).
+		Set(float64(len(degraded)))
+
+	// Log each object on a separate line (even though we could just put them all on the same line)
+	// so that:
+	// 1. we avoid super long log lines (which can make log storage / querying unhappy), and
+	// 2. we can process them with Grafana Loki, which can't handle arrays
+	for _, obj := range degraded {
+		log.Info(
+			fmt.Sprintf("Currently failing to reconcile %v object", d.ControllerName),
+			"outcome", outcome,
+			"object", obj,
+		)
+	}
+}
+
+func (d *wrappedReconciler) runRefreshFailing(ctx context.Context) {
+	log := log.FromContext(ctx)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(d.refreshFailingInterval):
+			d.refreshFailing(log, FailureOutcome, d.failing)
+			d.refreshFailing(log, ConflictOutcome, d.conflicting)
+		}
+	}
+}
+
+func (d *wrappedReconciler) FailingRefresher() FailingRefresher {
+	return FailingRefresher{r: d}
+}
+
+// FailingRefresher is a wrapper which implements manager.Runnable.
+type FailingRefresher struct {
+	r *wrappedReconciler
+}
+
+func (f FailingRefresher) Start(ctx context.Context) error {
+	go f.r.runRefreshFailing(ctx)
+	return nil
+}
+
 func (d *wrappedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	log := log.FromContext(ctx)
 
@@ -153,21 +212,14 @@ func (d *wrappedReconciler) Reconcile(ct
 	res, err := d.Reconciler.Reconcile(ctx, req)
 	duration := time.Since(now)
 
-	// This part is executed sequentially since we acquire a mutex lock. It
-	// should be quite fast since a mutex lock/unlock + 2 memory writes takes less
-	// than 100ns. I (@shayanh) preferred to go with the simplest implementation
-	// as of now. For a more performant solution, if needed, we can switch to an
-	// async approach.
-	d.lock.Lock()
-	defer d.lock.Unlock()
 	outcome := SuccessOutcome
 	if err != nil {
 		if errors.IsConflict(err) {
 			outcome = ConflictOutcome
-			d.conflicting[req.NamespacedName] = struct{}{}
+			d.conflicting.RecordFailure(req.NamespacedName)
 		} else {
 			outcome = FailureOutcome
-			d.failing[req.NamespacedName] = struct{}{}
+			d.failing.RecordFailure(req.NamespacedName)
 
 			// If the VM is now getting non-conflict errors, it probably
 			// means transient conflicts have been resolved.
@@ -176,39 +228,35 @@ func (d *wrappedReconciler) Reconcile(ct
 			// if a VM is getting conflict errors, it doesn't mean
 			// non-conflict errors are resolved, as they are more
 			// likely to be persistent.
-			delete(d.conflicting, req.NamespacedName)
+			d.conflicting.RecordSuccess(req.NamespacedName)
 		}
 
 		log.Error(err, "Failed to reconcile VirtualMachine", "duration", duration.String(), "outcome", outcome)
 	} else {
-		delete(d.failing, req.NamespacedName)
-		delete(d.conflicting, req.NamespacedName)
+		d.failing.RecordSuccess(req.NamespacedName)
+		d.conflicting.RecordSuccess(req.NamespacedName)
 		log.Info("Successful reconciliation", "duration", duration.String())
 	}
 
 	d.Metrics.ObserveReconcileDuration(outcome, duration)
 	d.Metrics.failing.WithLabelValues(d.ControllerName,
-		string(FailureOutcome)).Set(float64(len(d.failing)))
+		string(FailureOutcome)).Set(float64(d.failing.DegradedCount()))
 	d.Metrics.failing.WithLabelValues(d.ControllerName,
-		string(ConflictOutcome)).Set(float64(len(d.conflicting)))
+		string(ConflictOutcome)).Set(float64(d.conflicting.DegradedCount()))
 
 	return res, err
 }
 
-func keysToSlice(m map[client.ObjectKey]struct{}) []string {
-	keys := make([]string, 0, len(m))
-	for k := range m {
+func toStringSlice(s []client.ObjectKey) []string {
+	keys := make([]string, 0, len(s))
+	for _, k := range s {
 		keys = append(keys, k.String())
 	}
 	return keys
 }
 
 func (r *wrappedReconciler) Snapshot() ReconcileSnapshot {
-	r.lock.Lock()
-	defer r.lock.Unlock()
-
-	failing := keysToSlice(r.failing)
-	conflicting := keysToSlice(r.conflicting)
+	failing := toStringSlice(r.failing.Degraded())
+	conflicting := toStringSlice(r.conflicting.Degraded())
 
 	return ReconcileSnapshot{
 		ControllerName: r.ControllerName,
diff --git a/neonvm/controllers/vm_controller.go b/neonvm/controllers/vm_controller.go
index b0ff86077..e9ebfcf24 100644
--- a/neonvm/controllers/vm_controller.go
+++ b/neonvm/controllers/vm_controller.go
@@ -1711,7 +1711,13 @@ func podSpec(vm *vmv1.VirtualMachine, sshSecret *corev1.Secret, config *Reconcil
 // desirable state on the cluster
 func (r *VMReconciler) SetupWithManager(mgr ctrl.Manager) (ReconcilerWithMetrics, error) {
 	cntrlName := "virtualmachine"
-	reconciler := WithMetrics(withCatchPanic(r), r.Metrics, cntrlName)
+	reconciler := WithMetrics(
+		withCatchPanic(r),
+		r.Metrics,
+		cntrlName,
+		r.Config.FailurePendingPeriod,
+		r.Config.FailingRefreshInterval,
+	)
 	err := ctrl.NewControllerManagedBy(mgr).
 		For(&vmv1.VirtualMachine{}).
 		Owns(&corev1.Pod{}).
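Taken together with the deployment.yaml change above, the two knobs bound how
quickly a degraded VM becomes visible. A minimal worked sketch of that
arithmetic (illustrative only, not part of the patch; `maxVisibilityLag` is a
made-up name, and the durations are the ones the deployment passes):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		pendingPeriod := 1 * time.Minute    // --failure-pending-period
		refreshInterval := 15 * time.Second // --failing-refresh-interval

		// A continuously failing VM is marked degraded only after a full
		// pending period of uninterrupted failures, and the refresher is
		// guaranteed to report it within one more refresh interval:
		maxVisibilityLag := pendingPeriod + refreshInterval
		fmt.Println(maxVisibilityLag) // 1m15s
	}

A single transient failure followed by a success within the pending period
never surfaces at all; that is exactly the false-positive mode the commit
message calls out.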
diff --git a/neonvm/controllers/vm_controller_test.go b/neonvm/controllers/vm_controller_test.go
index 94d58a04d..5d26157e5 100644
--- a/neonvm/controllers/vm_controller_test.go
+++ b/neonvm/controllers/vm_controller_test.go
@@ -102,6 +102,8 @@ var _ = Describe("VirtualMachine controller", func() {
 				UseContainerMgr:         true,
 				MaxConcurrentReconciles: 1,
 				QEMUDiskCacheSettings:   "cache=none",
+				FailurePendingPeriod:    1 * time.Minute,
+				FailingRefreshInterval:  1 * time.Minute,
 			},
 		}
 
diff --git a/neonvm/controllers/vmmigration_controller.go b/neonvm/controllers/vmmigration_controller.go
index becb718f7..6ecb97760 100644
--- a/neonvm/controllers/vmmigration_controller.go
+++ b/neonvm/controllers/vmmigration_controller.go
@@ -670,7 +670,13 @@ func (r *VirtualMachineMigrationReconciler) doFinalizerOperationsForVirtualMachi
 // desirable state on the cluster
 func (r *VirtualMachineMigrationReconciler) SetupWithManager(mgr ctrl.Manager) (ReconcilerWithMetrics, error) {
 	cntrlName := "virtualmachinemigration"
-	reconciler := WithMetrics(withCatchPanic(r), r.Metrics, cntrlName)
+	reconciler := WithMetrics(
+		withCatchPanic(r),
+		r.Metrics,
+		cntrlName,
+		r.Config.FailurePendingPeriod,
+		r.Config.FailingRefreshInterval,
+	)
 	err := ctrl.NewControllerManagedBy(mgr).
 		For(&vmv1.VirtualMachineMigration{}).
 		Owns(&corev1.Pod{}).
diff --git a/neonvm/main.go b/neonvm/main.go
index c0fdd4650..3e11e42f5 100644
--- a/neonvm/main.go
+++ b/neonvm/main.go
@@ -98,6 +98,8 @@ func main() {
 	var concurrencyLimit int
 	var enableContainerMgr bool
 	var qemuDiskCacheSettings string
+	var failurePendingPeriod time.Duration
+	var failingRefreshInterval time.Duration
 	flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
 	flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
 	flag.BoolVar(&enableLeaderElection, "leader-elect", false,
@@ -106,6 +108,10 @@ func main() {
 	flag.IntVar(&concurrencyLimit, "concurrency-limit", 1, "Maximum number of concurrent reconcile operations")
 	flag.BoolVar(&enableContainerMgr, "enable-container-mgr", false, "Enable crictl-based container-mgr alongside each VM")
 	flag.StringVar(&qemuDiskCacheSettings, "qemu-disk-cache-settings", "cache=none", "Set neonvm-runner's QEMU disk cache settings")
+	flag.DurationVar(&failurePendingPeriod, "failure-pending-period", 1*time.Minute,
+		"the period for the propagation of reconciliation failures to the observability instruments")
+	flag.DurationVar(&failingRefreshInterval, "failing-refresh-interval", 1*time.Minute,
+		"the interval between consecutive updates of the metrics and logs related to failing reconciliations")
 	flag.Parse()
 
 	logConfig := zap.NewProductionConfig()
@@ -173,6 +179,8 @@ func main() {
 		UseContainerMgr:         enableContainerMgr,
 		MaxConcurrentReconciles: concurrencyLimit,
 		QEMUDiskCacheSettings:   qemuDiskCacheSettings,
+		FailurePendingPeriod:    failurePendingPeriod,
+		FailingRefreshInterval:  failingRefreshInterval,
 	}
 
 	vmReconciler := &controllers.VMReconciler{
@@ -225,6 +233,11 @@ func main() {
 		os.Exit(1)
 	}
 
+	if err := mgr.Add(vmReconcilerMetrics.FailingRefresher()); err != nil {
+		setupLog.Error(err, "unable to set up failing refresher")
+		os.Exit(1)
+	}
+
 	if err := run(mgr); err != nil {
 		setupLog.Error(err, "run manager error")
 		os.Exit(1)