diff --git a/neonvm-controller/cmd/main.go b/neonvm-controller/cmd/main.go index d51658334..bda99f904 100644 --- a/neonvm-controller/cmd/main.go +++ b/neonvm-controller/cmd/main.go @@ -52,6 +52,7 @@ import ( "github.com/neondatabase/autoscaling/pkg/neonvm/controllers" "github.com/neondatabase/autoscaling/pkg/neonvm/ipam" "github.com/neondatabase/autoscaling/pkg/util" + "github.com/neondatabase/autoscaling/pkg/util/taskgroup" ) var ( @@ -153,7 +154,8 @@ func main() { logConfig.Sampling = nil // Disabling sampling; it's enabled by default for zap's production configs. logConfig.Level.SetLevel(zap.InfoLevel) logConfig.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - logger := zapr.NewLogger(zap.Must(logConfig.Build(zap.AddStacktrace(zapcore.PanicLevel)))) + zapLogger := zap.Must(logConfig.Build(zap.AddStacktrace(zapcore.PanicLevel))) + logger := zapr.NewLogger(zapLogger) ctrl.SetLogger(logger) // define klog settings (used in LeaderElector) @@ -271,12 +273,16 @@ func main() { setupLog.Error(err, "unable to set up failing refresher") panic(err) } + tg := taskgroup.NewGroup(zapLogger) + tg.Go("run manager", func(logger *zap.Logger) error { + return run(mgr) + }) + tg.Go("run ipam cleanup", func(logger *zap.Logger) error { + return ipam.RunCleanup(context.Background(), "default") + }) - // NOTE: THE CONTROLLER MUST IMMEDIATELY EXIT AFTER RUNNING THE MANAGER. 
-	if err := run(mgr); err != nil {
-		setupLog.Error(err, "run manager error")
-		panic(err)
-	}
+	err = tg.Wait()
+	panic(err)
 }
 
 func debugServerFunc(reconcilers ...controllers.ReconcilerWithMetrics) manager.RunnableFunc {
diff --git a/pkg/neonvm/ipam/allocate.go b/pkg/neonvm/ipam/allocate.go
index 41693e59d..a36a43ce7 100644
--- a/pkg/neonvm/ipam/allocate.go
+++ b/pkg/neonvm/ipam/allocate.go
@@ -28,6 +28,17 @@ func getReleaseAction(vmName types.NamespacedName) ipamAction {
 	}
 }
 
+// getCleanupAction builds the ipamAction used for cleanup: it forwards the
+// reservations it is given to doCleanup together with the captured vms set.
+func getCleanupAction(vms map[string]bool) ipamAction {
+	return func(
+		ipRange RangeConfiguration,
+		reservation []whereaboutstypes.IPReservation,
+	) (net.IPNet, []whereaboutstypes.IPReservation, error) {
+		return doCleanup(ipRange, reservation, vms)
+	}
+}
+
 func doAcquire(
 	ipRange RangeConfiguration,
 	reservation []whereaboutstypes.IPReservation,
@@ -76,6 +87,24 @@ func doRelease(
 	return net.IPNet{IP: ip, Mask: ipnet.Mask}, newReservation, nil
 }
 
+// doCleanup filters reservations down to the entries whose ContainerID maps to
+// true in vms. The range configuration is ignored, the returned net.IPNet is
+// always the zero value, and the returned error is always nil.
+func doCleanup(
+	_ RangeConfiguration,
+	reservations []whereaboutstypes.IPReservation,
+	vms map[string]bool,
+) (net.IPNet, []whereaboutstypes.IPReservation, error) {
+	var kept []whereaboutstypes.IPReservation // stays nil when nothing is kept
+	for idx := range reservations {
+		if !vms[reservations[idx].ContainerID] {
+			continue
+		}
+		kept = append(kept, reservations[idx])
+	}
+	return net.IPNet{}, kept, nil
+}
+
 func getMatchingIPReservationIndex(reservation []whereaboutstypes.IPReservation, id string) int {
 	foundidx := -1
 	for idx, v := range reservation {
diff --git a/pkg/neonvm/ipam/ipam.go b/pkg/neonvm/ipam/ipam.go
index 06c8f1c18..65b33bce2 100644
--- a/pkg/neonvm/ipam/ipam.go
+++ b/pkg/neonvm/ipam/ipam.go
@@ -41,6 +41,9 @@ const (
 	DefaultLeaderLeaseDurationMs = 3000
 	DefaultLeaderRenewDeadlineMs = 2500
 	DefaultLeaderRetryPeriodMs   = 2000
+
+	// CleanupInterval is how long RunCleanup waits between two cleanup passes.
+	CleanupInterval = 15 * time.Minute
 )
 
 type Temporary interface {
@@ -70,6 +73,37 @@ func (i *IPAM) ReleaseIP(ctx context.Context, vmName types.NamespacedName) (net.
 	return ip, nil
 }
 
+// RunCleanup loops until ctx is done or a step fails. Each pass lists the
+// VirtualMachines in namespace, then hands runIPAM a cleanup action that keeps
+// only reservations whose ContainerID is one of the listed VM names, and
+// finally waits CleanupInterval before the next pass.
+//
+// NOTE(review): the set is keyed by the bare vm.Name; confirm that this is the
+// same format the acquire path stores in ContainerID.
+func (i *IPAM) RunCleanup(ctx context.Context, namespace string) error {
+	for {
+		list, err := i.vmClient.NeonvmV1().VirtualMachines(namespace).List(ctx, metav1.ListOptions{})
+		if err != nil {
+			return fmt.Errorf("error listing virtual machines: %w", err)
+		}
+
+		live := make(map[string]bool, len(list.Items))
+		for idx := range list.Items {
+			live[list.Items[idx].Name] = true
+		}
+
+		if _, err = i.runIPAM(ctx, getCleanupAction(live)); err != nil {
+			return fmt.Errorf("error cleaning up IPAM: %w", err)
+		}
+
+		select {
+		case <-time.After(CleanupInterval):
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
 // New returns a new IPAM object with ipam config and k8s/crd clients
 func New(nadName string, nadNamespace string) (*IPAM, error) {
 	// get Kubernetes client config