From ea8a1be9ba3be313680522ee0cb85284258a0f54 Mon Sep 17 00:00:00 2001 From: Konstanty Karagiorgis Date: Thu, 6 Jun 2024 21:59:17 +0200 Subject: [PATCH] #34: Kubernetes service informer automatically restarts --- docker/go/dev-entrypoint.sh | 5 ++- go.mod | 1 + go.sum | 2 + pkg/api/admin.go | 8 +++- pkg/cmd/root.go | 10 +++-- pkg/cmd/server.go | 2 +- pkg/k8s/svcdetector/repository.go | 68 ++++++++++++++++++++++--------- 7 files changed, 69 insertions(+), 27 deletions(-) diff --git a/docker/go/dev-entrypoint.sh b/docker/go/dev-entrypoint.sh index 26f31fa..f76ee87 100755 --- a/docker/go/dev-entrypoint.sh +++ b/docker/go/dev-entrypoint.sh @@ -3,6 +3,7 @@ # /src should contain files bundled from dockerfile # /src-tmp should have mounted volume, that will allow syncing file from Tilt + # if /src-tmp is empty, copy all files from /src to /src-tmp if [ ! "$(ls -A /src-tmp)" ]; then cp -r /src/* /src-tmp @@ -14,6 +15,8 @@ find . -type f -exec bash -c 'for file; do [ ! -e "/src/$file" ] && rm -f "$file args=$@ cwd=$(pwd) -echo "Starting watchexec on ${cwd}" +echo "Starting watchexec on ${cwd} (first time compilation may take a while)" + +set -x watchexec -n -q -r -e go,mod,sum -- sh -c "while true; do sleep 1 && go run main.go --debug ${args}; done" \ No newline at end of file diff --git a/go.mod b/go.mod index dda1716..ed1d9ad 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.22 require ( github.com/avast/retry-go/v4 v4.5.1 + github.com/gin-contrib/pprof v1.5.0 github.com/gin-gonic/gin v1.10.0 github.com/gorilla/mux v1.8.0 github.com/mitchellh/go-ps v1.0.0 diff --git a/go.sum b/go.sum index 747b25f..cbf83e7 100644 --- a/go.sum +++ b/go.sum @@ -78,6 +78,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= +github.com/gin-contrib/pprof v1.5.0 h1:E/Oy7g+kNw94KfdCy3bZxQFtyDnAX2V7axRS7sNYVrU= +github.com/gin-contrib/pprof v1.5.0/go.mod h1:GqFL6LerKoCQ/RSWnkYczkTJ+tOAUVN/8sbnEtaqOKs= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= diff --git a/pkg/api/admin.go b/pkg/api/admin.go index b48af50..561d323 100644 --- a/pkg/api/admin.go +++ b/pkg/api/admin.go @@ -3,6 +3,8 @@ package api import ( "github.com/gin-gonic/gin" + + "github.com/gin-contrib/pprof" ) // Controller contains a set of functionalities for the API @@ -11,7 +13,7 @@ type Controller interface { } // NewAdminAPI bootstraps the creation of the gin engine -func NewAdminAPI(controllers []Controller) *gin.Engine { +func NewAdminAPI(controllers []Controller, debug bool) *gin.Engine { r := gin.Default() for _, controller := range controllers { controller.registerRoutes(r) @@ -22,5 +24,9 @@ func NewAdminAPI(controllers []Controller) *gin.Engine { " for them. If you find a wormhole, you're a very lucky person because they are extremely rare.", }) }) + + if debug { + pprof.Register(r) + } return r } diff --git a/pkg/cmd/root.go b/pkg/cmd/root.go index 59b5bbf..69560ba 100644 --- a/pkg/cmd/root.go +++ b/pkg/cmd/root.go @@ -8,6 +8,11 @@ import ( "github.com/urfave/cli/v2" ) +var debugFlag = &cli.BoolFlag{ + Name: "debug", + Usage: "Be more verbose when logging stuff", +} + // Run starts wormgole func Run() { app := &cli.App{ @@ -21,10 +26,7 @@ func Run() { }, Version: projectVersion, Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "debug", - Usage: "Be more verbose when logging stuff", - }, + debugFlag, &cli.BoolFlag{ Name: "trace", Usage: "Be even more verbose when logging stuff", diff --git a/pkg/cmd/server.go b/pkg/cmd/server.go index bbd40bb..13370cc 100644 --- a/pkg/cmd/server.go +++ b/pkg/cmd/server.go @@ -189,7 +189,7 @@ var serverCommand *cli.Command = &cli.Command{ err := api.NewAdminAPI([]api.Controller{ api.NewAppsController(appSource), api.NewPeersController(peerStorage, wgConfig, watcher), - }).Run(":8082") + }, c.Bool(debugFlag.Name)).Run(":8082") if err != nil { logrus.Fatalf("Failed to start admin API: %v", err) } diff --git a/pkg/k8s/svcdetector/repository.go b/pkg/k8s/svcdetector/repository.go index 0b16983..683b88a 100644 --- a/pkg/k8s/svcdetector/repository.go +++ b/pkg/k8s/svcdetector/repository.go @@ -45,6 +45,8 @@ func (event watchEvent) isDeleted() bool { type defaultServiceRepository struct { client dynamic.Interface + // it tends to hang after a while + serviceInformerRestartInterval time.Duration } func (repository defaultServiceRepository) list() ([]serviceWrapper, error) { @@ -73,20 +75,21 @@ func (repository defaultServiceRepository) list() ([]serviceWrapper, error) { } func (repository defaultServiceRepository) watch() chan watchEvent { - informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory( - repository.client, - time.Second*10, - metav1.NamespaceAll, - nil, - ) - informer := informerFactory.ForResource(schema.GroupVersionResource{ - Group: "", - Version: "v1", - Resource: "services", - }) theChannel := make(chan watchEvent) - go func() { - stopCh := make(chan struct{}) + runInformerInBg(func() chan struct{} { + informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory( + repository.client, + time.Second*10, + metav1.NamespaceAll, + nil, + ) + logrus.Debug("Setting up new kubernetes service informer") + informer := informerFactory.ForResource(schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "services", + }) + informerStopChan := make(chan struct{}) go func(stopCh <-chan struct{}, s cache.SharedIndexInformer) { handlers := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { @@ -110,12 +113,10 @@ func (repository defaultServiceRepository) watch() chan watchEvent { return } s.Run(stopCh) - }(stopCh, informer.Informer()) - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, os.Interrupt) - <-sigCh - close(stopCh) - }() + logrus.Debug("Kubernetes service informer stopped") + }(informerStopChan, informer.Informer()) + return informerStopChan + }, repository.serviceInformerRestartInterval) return theChannel } @@ -145,6 +146,33 @@ func (repository defaultServiceRepository) dispatchEvents(eventType int, informe // NewDefaultServiceRepository creates ServiceRepository instances func NewDefaultServiceRepository(client dynamic.Interface) ServiceRepository { return &defaultServiceRepository{ - client: client, + client: client, + serviceInformerRestartInterval: time.Minute * 5, } } + +func runInformerInBg(f func() chan struct{}, timeout time.Duration) { + go func() { + for { + timeoutChan := make(chan struct{}) + go func() { + time.Sleep(timeout) + select { + case timeoutChan <- struct{}{}: + default: + } + }() + stopChan := f() + sigIngChan := make(chan os.Signal, 1) + signal.Notify(sigIngChan, os.Interrupt) + select { + case <-sigIngChan: + close(stopChan) + return + case <-timeoutChan: + logrus.Debug("Timeout reached, restarting kubernetes service informer") + close(stopChan) + } + } + }() +}