Skip to content

Commit

Permalink
informant: service orchestration (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sam Kleinman authored Mar 13, 2023
1 parent b2f5506 commit 774288b
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions cmd/vm-informant/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"net/http"
Expand All @@ -13,6 +12,7 @@ import (
"time"

"github.com/containerd/cgroups/v3/cgroup2"
"github.com/tychoish/fun/srv"

klog "k8s.io/klog/v2"

Expand All @@ -25,6 +25,17 @@ const minSubProcessRestartInterval = 5 * time.Second
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM)
defer cancel()
ctx = srv.SetShutdown(ctx) // allows workers to cause a shutdown
ctx = srv.WithOrchestrator(ctx) // creates and starts an orchestrator
ctx = srv.SetBaseContext(ctx) // sets a context for starting async work in request scopes

orca := srv.GetOrchestrator(ctx)

defer func() {
if err := orca.Service().Wait(); err != nil {
klog.Fatal("failed to shut down service", err)
}
}()

buildInfo := util.GetBuildInfo()
klog.Infof("buildInfo.GitInfo: %s", buildInfo.GitInfo)
Expand Down Expand Up @@ -69,6 +80,10 @@ func main() {
}

runRestartOnFailure(ctx, args, cleanupHooks)
// this cancels the context
srv.GetShutdown(ctx)()
// this drops to the defer that waits for all services to shutdown
// will run now.
return
}

Expand Down Expand Up @@ -108,14 +123,17 @@ func main() {
util.AddHandler("", mux, "/upscale", http.MethodPut, "RawResources", state.NotifyUpscale)
util.AddHandler("", mux, "/unregister", http.MethodDelete, "AgentDesc", state.UnregisterAgent)

server := http.Server{Addr: "0.0.0.0:10301", Handler: mux}
klog.Infof("Starting server at %s", server.Addr)
err = server.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) {
klog.Infof("Server ended.")
} else {
klog.Fatalf("Server failed: %s", err)
addr := "0.0.0.0:10301"
klog.Infof("Starting server at %s", addr)

// we create an http service and add it to the orchestrator,
// which will start it and manage its lifecycle.
if err := orca.Add(srv.HTTP("vm-informant-api", 5*time.Second, &http.Server{Addr: addr, Handler: mux})); err != nil {
klog.Fatalf("failed to add informant api server: %s", err)
}

// we drop to the defers now, which will block until the signal
// handler is called.
}

// runRestartOnFailure repeatedly calls this binary with the same flags, but with 'auto-restart'
Expand All @@ -131,6 +149,9 @@ func runRestartOnFailure(ctx context.Context, args []string, cleanupHooks []func

for {
startTime := time.Now()
sig := make(chan struct{})
var exitMode string

func() {
pctx, pcancel := context.WithCancel(context.Background())
defer pcancel()
Expand All @@ -143,6 +164,8 @@ func runRestartOnFailure(ctx context.Context, args []string, cleanupHooks []func
err := cmd.Start()
if err == nil {
go func() {
defer close(sig)

select {
case <-pctx.Done():
return
Expand Down Expand Up @@ -181,18 +204,18 @@ func runRestartOnFailure(ctx context.Context, args []string, cleanupHooks []func

select {
case <-ctx.Done():
klog.Infof("vm-informant restart loop: received termination signal")
klog.Infof("vm-informant: received signal")
return
default:
case <-sig:
dur := time.Since(startTime)
if dur < minSubProcessRestartInterval {
// drain the timer before resetting it, required by Timer.Reset:
// drain the timer before resetting it, required by Timer.Reset::
if !timer.Stop() {
<-timer.C
}
timer.Reset(minSubProcessRestartInterval - dur)

klog.Infof("vm-informant exited. respecting minimum wait of %s", minSubProcessRestartInterval)
klog.Infof("vm-informant %s. respecting minimum wait of %s", exitMode, minSubProcessRestartInterval)
select {
case <-ctx.Done():
klog.Infof("vm-informant restart loop: received termination signal")
Expand Down

0 comments on commit 774288b

Please sign in to comment.