Skip to content

Commit

Permalink
Merge pull request #671 from haoming29/exit-with-error
Browse files Browse the repository at this point in the history
Exit main process if error group goroutine returns error
  • Loading branch information
haoming29 authored Jan 24, 2024
2 parents cb64873 + ef234e3 commit 91974fd
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 41 deletions.
49 changes: 25 additions & 24 deletions cmd/cache_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,18 @@ func getNSAdsFromDirector() ([]director.NamespaceAd, error) {
}

func serveCache(cmd *cobra.Command, _ []string) error {
err := serveCacheInternal(cmd.Context())
cancel, err := serveCacheInternal(cmd.Context())
if err != nil {
cancel()
return err
}

return nil
}

func serveCacheInternal(ctx context.Context) error {
func serveCacheInternal(cmdCtx context.Context) (context.CancelFunc, error) {
// Use this context for any goroutines that need to react to server shutdown
ctx, shutdownCancel := context.WithCancel(ctx)
ctx, shutdownCancel := context.WithCancel(cmdCtx)

err := config.InitServer(ctx, config.CacheType)
cobra.CheckErr(err)
Expand All @@ -97,92 +98,92 @@ func serveCacheInternal(ctx context.Context) error {
egrp = &errgroup.Group{}
}

// Added the same logic from launcher.go as we currently launch cache separately from other services
egrp.Go(func() error {
log.Debug("Will shutdown process on signal")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
select {
case sig := <-sigs:
log.Debugf("Received signal %v; will shutdown process", sig)
log.Warningf("Received signal %v; will shutdown process", sig)
shutdownCancel()
return errors.New("Federation process has been cancelled")
return nil
case <-ctx.Done():
return nil
}
})

err = xrootd.SetUpMonitoring(ctx, egrp)
if err != nil {
return err
return shutdownCancel, err
}

nsAds, err := getNSAdsFromDirector()
if err != nil {
return err
return shutdownCancel, err
}

cacheServer := &cache_ui.CacheServer{}
cacheServer.SetNamespaceAds(nsAds)
err = server_ui.CheckDefaults(cacheServer)
if err != nil {
return err
return shutdownCancel, err
}

cachePrefix := "/caches/" + param.Xrootd_Sitename.GetString()

viper.Set("Origin.NamespacePrefix", cachePrefix)

if err = server_ui.RegisterNamespaceWithRetry(ctx, egrp); err != nil {
return err
return shutdownCancel, err
}

if err = server_ui.LaunchPeriodicAdvertise(ctx, egrp, []server_utils.XRootDServer{cacheServer}); err != nil {
return err
return shutdownCancel, err
}

engine, err := web_ui.GetEngine()
if err != nil {
return err
return shutdownCancel, err
}

// Set up necessary APIs to support Web UI, including auth and metrics
if err := web_ui.ConfigureServerWebAPI(ctx, engine, egrp); err != nil {
return err
return shutdownCancel, err
}

go func() {
if err := web_ui.RunEngine(ctx, engine, egrp); err != nil {
log.Panicln("Failure when running the web engine:", err)
egrp.Go(func() (err error) {
if err = web_ui.RunEngine(ctx, engine, egrp); err != nil {
log.Errorln("Failure when running the web engine:", err)
}
shutdownCancel()
}()
return
})
if param.Server_EnableUI.GetBool() {
if err = web_ui.ConfigureEmbeddedPrometheus(ctx, engine); err != nil {
return errors.Wrap(err, "Failed to configure embedded prometheus instance")
return shutdownCancel, errors.Wrap(err, "Failed to configure embedded prometheus instance")
}

if err = web_ui.InitServerWebLogin(ctx); err != nil {
return err
return shutdownCancel, err
}
}

configPath, err := xrootd.ConfigXrootd(ctx, false)
if err != nil {
return err
return shutdownCancel, err
}

xrootd.LaunchXrootdMaintenance(ctx, cacheServer, 2*time.Minute)

log.Info("Launching cache")
launchers, err := xrootd.ConfigureLaunchers(false, configPath, false)
if err != nil {
return err
return shutdownCancel, err
}

if err = daemon.LaunchDaemons(ctx, launchers, egrp); err != nil {
return err
return shutdownCancel, err
}

log.Info("Clean shutdown of the cache")
return nil
return shutdownCancel, nil
}
13 changes: 6 additions & 7 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,17 @@ func (i *uint16Value) Type() string {
func (i *uint16Value) String() string { return strconv.FormatUint(uint64(*i), 10) }

func Execute() error {
egrp := errgroup.Group{}
egrp, egrpCtx := errgroup.WithContext(context.Background())
defer func() {
err := egrp.Wait()
if err != nil {
if err.Error() == "Federation process has been cancelled" {
log.Info("Process was shutdown")
return
}
log.Errorln("Error occurred when shutting down process:", err)
log.Errorln("Fatal error occurred that lead to the shutdown of the process:", err)
} else {
// Use Error instead of Info because our default log level is Error
log.Error("Pelican is safely exited")
}
}()
ctx := context.WithValue(context.Background(), config.EgrpKey, &egrp)
ctx := context.WithValue(egrpCtx, config.EgrpKey, egrp)
return rootCmd.ExecuteContext(ctx)
}

Expand Down
3 changes: 2 additions & 1 deletion director/director_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,12 @@ func ConfigTTLCache(ctx context.Context, egrp *errgroup.Group) {
// Put the stop logic in a separate goroutine so that the parent function does not block
egrp.Go(func() error {
<-ctx.Done()
log.Info("Gracefully stopping TTL cache eviction...")
log.Info("Gracefully stopping director TTL cache eviction...")
serverAds.DeleteAll()
serverAds.Stop()
namespaceKeys.DeleteAll()
namespaceKeys.Stop()
log.Info("Director TTL cache eviction has been stopped")
return nil
})
}
5 changes: 5 additions & 0 deletions director/redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type PromDiscoveryItem struct {
var (
minClientVersion, _ = version.NewVersion("7.0.0")
minOriginVersion, _ = version.NewVersion("7.0.0")
minCacheVersion, _ = version.NewVersion("7.3.0")
healthTestCancelFuncs = make(map[ServerAd]context.CancelFunc)
healthTestCancelFuncsMutex = sync.RWMutex{}
)
Expand Down Expand Up @@ -152,6 +153,10 @@ func versionCompatCheck(ginCtx *gin.Context) error {
minCompatVer = minClientVersion
case "origin":
minCompatVer = minOriginVersion
case "cache":
minCompatVer = minCacheVersion
default:
return errors.Errorf("Invalid version format. The director does not support your %s version (%s).", service, reqVer.String())
}

if reqVer.LessThan(minCompatVer) {
Expand Down
5 changes: 2 additions & 3 deletions launchers/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ func LaunchModules(ctx context.Context, modules config.ServerType) (context.Canc
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
select {
case sig := <-sigs:
log.Debugf("Received signal %v; will shutdown process", sig)
log.Warningf("Received signal %v; will shutdown process", sig)
shutdownCancel()
return errors.New("Federation process has been cancelled")
return nil
case <-ctx.Done():
return nil
}
Expand Down Expand Up @@ -139,7 +139,6 @@ func LaunchModules(ctx context.Context, modules config.ServerType) (context.Canc
egrp.Go(func() error {
if err := web_ui.RunEngine(ctx, engine, egrp); err != nil {
log.Errorln("Failure when running the web engine:", err)
shutdownCancel()
return err
}
log.Info("Web engine has shutdown")
Expand Down
2 changes: 1 addition & 1 deletion metrics/xrootd_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func ConfigureMonitoring(ctx context.Context, egrp *errgroup.Group) (int, error)
sessions.Stop()
userids.Stop()
transfers.Stop()
log.Infoln("Gracefully stopping metrics cache auto eviction...")
log.Infoln("Xrootd metrics cache eviction has been stopped")
return nil
})

Expand Down
2 changes: 1 addition & 1 deletion origin_ui/origin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func LaunchPeriodicDirectorTimeout(ctx context.Context, egrp *errgroup.Group) {
log.Debugln("Got notification from director")
directorTimeoutTicker.Reset(directorTimeoutDuration)
case <-ctx.Done():
log.Infoln("Gracefully terminating the director-health test timeout loop...")
log.Infoln("Director health test timeout loop has been terminated")
return nil
}
}
Expand Down
1 change: 1 addition & 0 deletions server_ui/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func LaunchPeriodicAdvertise(ctx context.Context, egrp *errgroup.Group, servers
metrics.SetComponentHealthStatus(metrics.OriginCache_Federation, metrics.StatusOK, "")
}
case <-ctx.Done():
log.Infoln("Periodic advertisement loop has been terminated")
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion server_utils/server_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func LaunchWatcherMaintenance(ctx context.Context, dirPaths []string, descriptio
log.Warningf("Failure during %s routine: %v", description, err)
}
} else if chosen == 1 {
log.Infof("%s routine has been cancelled. Shutting down", description)
log.Infof("%s routine has been cancelled. Shutting down", description)
return nil
} else if chosen == 2 { // watcher.Events
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion web_ui/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func ConfigureEmbeddedPrometheus(ctx context.Context, engine *gin.Engine) error
// Don't forget to release the reloadReady channel so that waiting blocks can exit normally.
select {
case <-ctx.Done():
err := level.Warn(logger).Log("msg", "Received shutdown, exiting gracefully...")
err := level.Info(logger).Log("msg", "Received shutdown, exiting gracefully...")
_ = err
reloadReady.Close()
case <-cancel:
Expand Down
4 changes: 2 additions & 2 deletions web_ui/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,9 @@ func runEngineWithListener(ctx context.Context, ln net.Listener, engine *gin.Eng
defer cancel()
err = server.Shutdown(ctx)
if err != nil {
log.Panicln("Failed to shutdown server:", err)
log.Errorln("Failed to shutdown server:", err)
}
return nil
return err
})

if err := server.ServeTLS(ln, "", ""); err != nil && !errors.Is(err, http.ErrServerClosed) {
Expand Down

0 comments on commit 91974fd

Please sign in to comment.