Skip to content

Commit f53b260

Browse files
committed
Shut runnables down gracefully and log so
This commit:

- fixes a problem whereby the runner loop didn't exit (the good old "break out of the select but not out of the for" mistake);
- uses a WaitGroup to make sure all the runner children (i.e., client caches) have exited;
- puts in more logging of things starting up and shutting down.

Signed-off-by: Michael Bridgen <[email protected]>
1 parent af86737 commit f53b260

File tree

4 files changed

+36
-13
lines changed

4 files changed

+36
-13
lines changed

controllers/leveltriggered/caching.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (c *caches) setupWithManager(mgr ctrl.Manager) error {
6363
c.baseLogger = mgr.GetLogger().WithValues("component", "target-cache")
6464
c.reader = mgr.GetClient() // this specifically gets the client that has the indexing installed below; i.e., these are coupled.
6565

66-
c.runner = newRunner()
66+
c.runner = newRunner(mgr.GetLogger().WithValues("component", "cache-runner"))
6767
if err := mgr.Add(c.runner); err != nil {
6868
return err
6969
}
@@ -193,7 +193,7 @@ func (c *caches) watchTargetAndGetReader(ctx context.Context, clusterObject *clu
193193
return nil, false, err
194194
}
195195

196-
cancel := c.runner.run(func(ctx context.Context) {
196+
cancel := c.runner.run("cache-"+cacheKey.String(), func(ctx context.Context) {
197197
if err := ca.Start(ctx); err != nil {
198198
logger.Error(err, "cache exited with error")
199199
}
@@ -373,6 +373,7 @@ func (gc *gc) loop() {
373373
for {
374374
item, shutdown := gc.queue.Get()
375375
if shutdown {
376+
gc.log.Info("exiting cache GC loop")
376377
return
377378
}
378379
key, ok := item.(clusterAndGVK)

controllers/leveltriggered/runner.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package leveltriggered
22

3-
import "context"
3+
import (
4+
"context"
5+
"github.com/go-logr/logr"
6+
"sync"
7+
)
48

59
// This is a dead simple way to run things using a manager's context as a base, so that they will
610
// get shut down when the manager does. It must be constructed with `newRunner`, and added to a manager:
@@ -19,29 +23,33 @@ import "context"
1923
// It'll deadlock if you call `run` before adding it to a manager (or otherwise calling `Start`).
2024

2125
type runWithContext struct {
22-
ctx context.Context
23-
run func(context.Context)
26+
name string
27+
ctx context.Context
28+
do func(context.Context)
2429
}
2530

2631
type runner struct {
32+
log logr.Logger
2733
rootContext context.Context
2834
tostart chan runWithContext
2935
ready chan struct{}
3036
}
3137

32-
func newRunner() *runner {
38+
func newRunner(log logr.Logger) *runner {
3339
return &runner{
40+
log: log,
3441
tostart: make(chan runWithContext),
3542
ready: make(chan struct{}),
3643
}
3744
}
3845

39-
func (r *runner) run(fn func(ctx context.Context)) context.CancelFunc {
46+
func (r *runner) run(name string, fn func(ctx context.Context)) context.CancelFunc {
4047
<-r.ready // wait until there's a root context
4148
ctx, cancel := context.WithCancel(r.rootContext)
4249
r.tostart <- runWithContext{
43-
run: fn,
44-
ctx: ctx,
50+
name: name,
51+
do: fn,
52+
ctx: ctx,
4553
}
4654
return cancel
4755
}
@@ -51,12 +59,24 @@ func (r *runner) run(fn func(ctx context.Context)) context.CancelFunc {
5159
func (r *runner) Start(ctx context.Context) error {
5260
r.rootContext = ctx
5361
close(r.ready) // broadcast that things can be run
62+
var wg sync.WaitGroup
63+
loop:
5464
for {
5565
select {
5666
case randc := <-r.tostart:
57-
go randc.run(randc.ctx)
67+
r.log.Info("starting child", "name", randc.name)
68+
wg.Add(1)
69+
go func(rc runWithContext) {
70+
defer wg.Done()
71+
rc.do(rc.ctx)
72+
r.log.Info("child exited", "name", rc.name)
73+
}(randc)
5874
case <-r.rootContext.Done():
59-
return nil
75+
break loop
6076
}
6177
}
78+
r.log.Info("Stopping and waiting for children")
79+
wg.Wait()
80+
r.log.Info("All children stopped; runner exit")
81+
return nil
6282
}

controllers/leveltriggered/suite_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,11 +176,13 @@ func TestMain(m *testing.M) {
176176

177177
cancel()
178178
wg.Wait()
179+
log.Println("manager exited")
179180

180181
err = testEnv.Stop()
181182
if err != nil {
182-
log.Fatalf("stoping test env failed: %s", err)
183+
log.Fatalf("stopping test env failed: %s", err)
183184
}
185+
log.Println("test env stopped")
184186

185187
os.Exit(retCode)
186188
}

controllers/suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func TestMain(m *testing.M) {
163163

164164
err = testEnv.Stop()
165165
if err != nil {
166-
log.Fatalf("stoping test env failed: %s", err)
166+
log.Fatalf("stopping test env failed: %s", err)
167167
}
168168

169169
os.Exit(retCode)

0 commit comments

Comments
 (0)