Skip to content

Commit

Permalink
#34: Kubernetes service informer automatically restarts
Browse files Browse the repository at this point in the history
  • Loading branch information
glothriel committed Jun 6, 2024
1 parent 7cbbd77 commit ea8a1be
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 27 deletions.
5 changes: 4 additions & 1 deletion docker/go/dev-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# /src should contain files bundled from dockerfile
# /src-tmp should have mounted volume, that will allow syncing file from Tilt


# if /src-tmp is empty, copy all files from /src to /src-tmp
if [ ! "$(ls -A /src-tmp)" ]; then
cp -r /src/* /src-tmp
Expand All @@ -14,6 +15,8 @@ find . -type f -exec bash -c 'for file; do [ ! -e "/src/$file" ] && rm -f "$file

args=$@
cwd=$(pwd)
echo "Starting watchexec on ${cwd}"
echo "Starting watchexec on ${cwd} (first time compilation may take a while)"


set -x
watchexec -n -q -r -e go,mod,sum -- sh -c "while true; do sleep 1 && go run main.go --debug ${args}; done"
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22

require (
github.com/avast/retry-go/v4 v4.5.1
github.com/gin-contrib/pprof v1.5.0
github.com/gin-gonic/gin v1.10.0
github.com/gorilla/mux v1.8.0
github.com/mitchellh/go-ps v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
github.com/gin-contrib/pprof v1.5.0 h1:E/Oy7g+kNw94KfdCy3bZxQFtyDnAX2V7axRS7sNYVrU=
github.com/gin-contrib/pprof v1.5.0/go.mod h1:GqFL6LerKoCQ/RSWnkYczkTJ+tOAUVN/8sbnEtaqOKs=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU=
Expand Down
8 changes: 7 additions & 1 deletion pkg/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package api

import (
"github.com/gin-gonic/gin"

"github.com/gin-contrib/pprof"
)

// Controller contains a set of functionalities for the API
Expand All @@ -11,7 +13,7 @@ type Controller interface {
}

// NewAdminAPI bootstraps the creation of the gin engine
func NewAdminAPI(controllers []Controller) *gin.Engine {
func NewAdminAPI(controllers []Controller, debug bool) *gin.Engine {
r := gin.Default()
for _, controller := range controllers {
controller.registerRoutes(r)
Expand All @@ -22,5 +24,9 @@ func NewAdminAPI(controllers []Controller) *gin.Engine {
" for them. If you find a wormhole, you're a very lucky person because they are extremely rare.",
})
})

if debug {
pprof.Register(r)
}
return r
}
10 changes: 6 additions & 4 deletions pkg/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"github.com/urfave/cli/v2"
)

var debugFlag = &cli.BoolFlag{
Name: "debug",
Usage: "Be more verbose when logging stuff",
}

// Run starts wormgole
func Run() {
app := &cli.App{
Expand All @@ -21,10 +26,7 @@ func Run() {
},
Version: projectVersion,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "debug",
Usage: "Be more verbose when logging stuff",
},
debugFlag,
&cli.BoolFlag{
Name: "trace",
Usage: "Be even more verbose when logging stuff",
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ var serverCommand *cli.Command = &cli.Command{
err := api.NewAdminAPI([]api.Controller{
api.NewAppsController(appSource),
api.NewPeersController(peerStorage, wgConfig, watcher),
}).Run(":8082")
}, c.Bool(debugFlag.Name)).Run(":8082")
if err != nil {
logrus.Fatalf("Failed to start admin API: %v", err)
}
Expand Down
68 changes: 48 additions & 20 deletions pkg/k8s/svcdetector/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func (event watchEvent) isDeleted() bool {

type defaultServiceRepository struct {
client dynamic.Interface
// it tends to hang after a while
serviceInformerRestartInterval time.Duration
}

func (repository defaultServiceRepository) list() ([]serviceWrapper, error) {
Expand Down Expand Up @@ -73,20 +75,21 @@ func (repository defaultServiceRepository) list() ([]serviceWrapper, error) {
}

func (repository defaultServiceRepository) watch() chan watchEvent {
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
repository.client,
time.Second*10,
metav1.NamespaceAll,
nil,
)
informer := informerFactory.ForResource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "services",
})
theChannel := make(chan watchEvent)
go func() {
stopCh := make(chan struct{})
runInformerInBg(func() chan struct{} {
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
repository.client,
time.Second*10,
metav1.NamespaceAll,
nil,
)
logrus.Debug("Setting up new kubernetes service informer")
informer := informerFactory.ForResource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "services",
})
informerStopChan := make(chan struct{})
go func(stopCh <-chan struct{}, s cache.SharedIndexInformer) {
handlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
Expand All @@ -110,12 +113,10 @@ func (repository defaultServiceRepository) watch() chan watchEvent {
return
}
s.Run(stopCh)
}(stopCh, informer.Informer())
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
<-sigCh
close(stopCh)
}()
logrus.Debug("Kubernetes service informer stopped")
}(informerStopChan, informer.Informer())
return informerStopChan
}, repository.serviceInformerRestartInterval)
return theChannel
}

Expand Down Expand Up @@ -145,6 +146,33 @@ func (repository defaultServiceRepository) dispatchEvents(eventType int, informe
// NewDefaultServiceRepository creates ServiceRepository instances
func NewDefaultServiceRepository(client dynamic.Interface) ServiceRepository {
return &defaultServiceRepository{
client: client,
client: client,
serviceInformerRestartInterval: time.Minute * 5,
}
}

func runInformerInBg(f func() chan struct{}, timeout time.Duration) {
go func() {
for {
timeoutChan := make(chan struct{})
go func() {
time.Sleep(timeout)
select {
case timeoutChan <- struct{}{}:
default:
}
}()
stopChan := f()
sigIngChan := make(chan os.Signal, 1)
signal.Notify(sigIngChan, os.Interrupt)
select {
case <-sigIngChan:
close(stopChan)
return
case <-timeoutChan:
logrus.Debug("Timeout reached, restarting kubernetes service informer")
close(stopChan)
}
}
}()
}

0 comments on commit ea8a1be

Please sign in to comment.