From f2681cc3461bd71f0de56be76da5374bf461e1d4 Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Wed, 17 Jan 2024 20:04:53 +0000 Subject: [PATCH 1/7] Fix director panic when cache registers --- director/redirect.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/director/redirect.go b/director/redirect.go index 638ff5f73..da7a45a14 100644 --- a/director/redirect.go +++ b/director/redirect.go @@ -45,6 +45,7 @@ type PromDiscoveryItem struct { var ( minClientVersion, _ = version.NewVersion("7.0.0") minOriginVersion, _ = version.NewVersion("7.0.0") + minCacheVersion, _ = version.NewVersion("7.3.0") healthTestCancelFuncs = make(map[ServerAd]context.CancelFunc) healthTestCancelFuncsMutex = sync.RWMutex{} ) @@ -152,6 +153,10 @@ func versionCompatCheck(ginCtx *gin.Context) error { minCompatVer = minClientVersion case "origin": minCompatVer = minOriginVersion + case "cache": + minCompatVer = minCacheVersion + default: + return errors.Errorf("Invalid version format. The director does not support your %s version (%s).", service, reqVer.String()) } if reqVer.LessThan(minCompatVer) { From f641e5f680930c94940128774a0ec0dc54739722 Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Wed, 17 Jan 2024 20:55:40 +0000 Subject: [PATCH 2/7] Fix cache not exit with fatal error --- cmd/cache_serve.go | 9 ++++----- cmd/root.go | 10 +++------- metrics/xrootd_metrics.go | 2 +- server_utils/server_utils.go | 2 +- 4 files changed, 9 insertions(+), 14 deletions(-) diff --git a/cmd/cache_serve.go b/cmd/cache_serve.go index 8e5b3bacf..0996f9750 100644 --- a/cmd/cache_serve.go +++ b/cmd/cache_serve.go @@ -85,9 +85,9 @@ func serveCache(cmd *cobra.Command, _ []string) error { return nil } -func serveCacheInternal(ctx context.Context) error { +func serveCacheInternal(cmdCtx context.Context) error { // Use this context for any goroutines that needs to react to server shutdown - ctx, shutdownCancel := context.WithCancel(ctx) + ctx, shutdownCancel := context.WithCancel(cmdCtx) err := config.InitServer(ctx, config.CacheType) cobra.CheckErr(err) @@ -103,9 +103,9 @@ func serveCacheInternal(ctx context.Context) error { signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) select { case sig := <-sigs: - log.Debugf("Received signal %v; will shutdown process", sig) + log.Infof("Received signal %v; will shutdown process", sig) shutdownCancel() - return errors.New("Federation process has been cancelled") + return nil case <-ctx.Done(): return nil } @@ -183,6 +183,5 @@ func serveCacheInternal(ctx context.Context) error { return err } - log.Info("Clean shutdown of the cache") return nil } diff --git a/cmd/root.go b/cmd/root.go index 36ad11e66..d7ebb23c5 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -78,18 +78,14 @@ func (i *uint16Value) Type() string { func (i *uint16Value) String() string { return strconv.FormatUint(uint64(*i), 10) } func Execute() error { - egrp := errgroup.Group{} + egrp, egrpCtx := errgroup.WithContext(context.Background()) defer func() { err := egrp.Wait() if err != nil { - if err.Error() == "Federation process has been cancelled" { - log.Info("Process was shutdown") - return - } - log.Errorln("Error occurred when shutting down process:", err) + log.Errorln("Fatal error occured that leads to shutdown of the process:", err) } }() - ctx := context.WithValue(context.Background(), config.EgrpKey, &egrp) + ctx := context.WithValue(egrpCtx, config.EgrpKey, egrp) return rootCmd.ExecuteContext(ctx) } diff --git a/metrics/xrootd_metrics.go b/metrics/xrootd_metrics.go index 87ccd647a..8bf08b4b9 100644 --- a/metrics/xrootd_metrics.go +++ b/metrics/xrootd_metrics.go @@ -326,7 +326,7 @@ func ConfigureMonitoring(ctx context.Context, egrp *errgroup.Group) (int, error) sessions.Stop() userids.Stop() transfers.Stop() - log.Infoln("Gracefully stopping metrics cache auto eviction...") + log.Infoln("Metrics cache auto eviction has been stopped") return nil }) diff --git a/server_utils/server_utils.go b/server_utils/server_utils.go index b5f3c06c1..a6d416ce7 100644 --- a/server_utils/server_utils.go +++ b/server_utils/server_utils.go @@ -145,7 +145,7 @@ func LaunchWatcherMaintenance(ctx context.Context, dirPaths []string, descriptio log.Warningf("Failure during %s routine: %v", description, err) } } else if chosen == 1 { - log.Infof("%s routine has been cancelled. Shutting down", description) + log.Infof("%s routine has been cancelled. Shutting down", description) return nil } else if chosen == 2 { // watcher.Events if !ok { From 5b99fbb655afad8d300810119a59d166a7883cab Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Wed, 17 Jan 2024 21:01:44 +0000 Subject: [PATCH 3/7] Refactor federation errgroup to return correct value --- director/director_api.go | 3 ++- launchers/launcher.go | 3 +-- metrics/xrootd_metrics.go | 2 +- origin_ui/origin_api.go | 2 +- server_ui/advertise.go | 1 + web_ui/ui.go | 2 +- 6 files changed, 7 insertions(+), 6 deletions(-) diff --git a/director/director_api.go b/director/director_api.go index 1f43f7669..ccd7d4345 100644 --- a/director/director_api.go +++ b/director/director_api.go @@ -204,11 +204,12 @@ func ConfigTTLCache(ctx context.Context, egrp *errgroup.Group) { // Put stop logic in a separate goroutine so that parent function is not blocking egrp.Go(func() error { <-ctx.Done() - log.Info("Gracefully stopping TTL cache eviction...") + log.Info("Gracefully stopping director TTL cache eviction...") serverAds.DeleteAll() serverAds.Stop() namespaceKeys.DeleteAll() namespaceKeys.Stop() + log.Info("Director TTL cache eviction has been stopped") return nil }) } diff --git a/launchers/launcher.go b/launchers/launcher.go index ce4d8be95..b99f53a65 100644 --- a/launchers/launcher.go +++ b/launchers/launcher.go @@ -53,7 +53,7 @@ func LaunchModules(ctx context.Context, modules config.ServerType) (context.Canc case sig := <-sigs: log.Debugf("Received signal %v; will shutdown process", sig) shutdownCancel() - return errors.New("Federation process has been cancelled") + return nil case <-ctx.Done(): return nil } @@ -139,7 +139,6 @@ func LaunchModules(ctx context.Context, modules config.ServerType) (context.Canc egrp.Go(func() error { if err := web_ui.RunEngine(ctx, engine, egrp); err != nil { log.Errorln("Failure when running the web engine:", err) - shutdownCancel() return err } log.Info("Web engine has shutdown") diff --git a/metrics/xrootd_metrics.go b/metrics/xrootd_metrics.go index 8bf08b4b9..b5d7034eb 100644 --- a/metrics/xrootd_metrics.go +++ b/metrics/xrootd_metrics.go @@ -326,7 +326,7 @@ func ConfigureMonitoring(ctx context.Context, egrp *errgroup.Group) (int, error) sessions.Stop() userids.Stop() transfers.Stop() - log.Infoln("Metrics cache auto eviction has been stopped") + log.Infoln("Xrootd metrics cache eviction has been stopped") return nil }) diff --git a/origin_ui/origin_api.go b/origin_ui/origin_api.go index 54df1396f..b77e632f6 100644 --- a/origin_ui/origin_api.go +++ b/origin_ui/origin_api.go @@ -113,7 +113,7 @@ func LaunchPeriodicDirectorTimeout(ctx context.Context, egrp *errgroup.Group) { log.Debugln("Got notification from director") directorTimeoutTicker.Reset(directorTimeoutDuration) case <-ctx.Done(): - log.Infoln("Gracefully terminating the director-health test timeout loop...") + log.Infoln("Director health test timeout loop has been terminated") return nil } } diff --git a/server_ui/advertise.go b/server_ui/advertise.go index c8ee517b2..38c4b5afc 100644 --- a/server_ui/advertise.go +++ b/server_ui/advertise.go @@ -66,6 +66,7 @@ func LaunchPeriodicAdvertise(ctx context.Context, egrp *errgroup.Group, servers metrics.SetComponentHealthStatus(metrics.OriginCache_Federation, metrics.StatusOK, "") } case <-ctx.Done(): + log.Infoln("Periodic advertisement loop has been terminated") return nil } } diff --git a/web_ui/ui.go b/web_ui/ui.go index f5a10913e..334786e65 100644 --- a/web_ui/ui.go +++ b/web_ui/ui.go @@ -388,7 +388,7 @@ func runEngineWithListener(ctx context.Context, ln net.Listener, engine *gin.Eng if err != nil { log.Panicln("Failed to shutdown server:", err) } - return nil + return err }) if err := server.ServeTLS(ln, "", ""); err != nil && !errors.Is(err, http.ErrServerClosed) { From d9461815e8a9505923b2fa00c3d783e28d6264b7 Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Wed, 17 Jan 2024 21:28:28 +0000 Subject: [PATCH 4/7] Refactor a bunch of exit related logic --- cmd/cache_serve.go | 51 +++++++++++++++---------------------------- cmd/root.go | 3 +++ launchers/launcher.go | 2 +- web_ui/prometheus.go | 2 +- 4 files changed, 22 insertions(+), 36 deletions(-) diff --git a/cmd/cache_serve.go b/cmd/cache_serve.go index 0996f9750..1f56fbb1a 100644 --- a/cmd/cache_serve.go +++ b/cmd/cache_serve.go @@ -24,9 +24,6 @@ import ( "context" "encoding/json" "net/url" - "os" - "os/signal" - "syscall" "time" "github.com/pelicanplatform/pelican/cache_ui" @@ -87,31 +84,15 @@ func serveCache(cmd *cobra.Command, _ []string) error { func serveCacheInternal(cmdCtx context.Context) error { // Use this context for any goroutines that needs to react to server shutdown - ctx, shutdownCancel := context.WithCancel(cmdCtx) - - err := config.InitServer(ctx, config.CacheType) + err := config.InitServer(cmdCtx, config.CacheType) cobra.CheckErr(err) - egrp, ok := ctx.Value(config.EgrpKey).(*errgroup.Group) + egrp, ok := cmdCtx.Value(config.EgrpKey).(*errgroup.Group) if !ok { egrp = &errgroup.Group{} } - egrp.Go(func() error { - log.Debug("Will shutdown process on signal") - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - select { - case sig := <-sigs: - log.Infof("Received signal %v; will shutdown process", sig) - shutdownCancel() - return nil - case <-ctx.Done(): - return nil - } - }) - - err = xrootd.SetUpMonitoring(ctx, egrp) + err = xrootd.SetUpMonitoring(cmdCtx, egrp) if err != nil { return err } @@ -132,11 +113,11 @@ func serveCacheInternal(cmdCtx context.Context) error { viper.Set("Origin.NamespacePrefix", cachePrefix) - if err = server_ui.RegisterNamespaceWithRetry(ctx, egrp); err != nil { + if err = server_ui.RegisterNamespaceWithRetry(cmdCtx, egrp); err != nil { return err } - if err = server_ui.LaunchPeriodicAdvertise(ctx, egrp, []server_utils.XRootDServer{cacheServer}); err != nil { + if err = server_ui.LaunchPeriodicAdvertise(cmdCtx, egrp, []server_utils.XRootDServer{cacheServer}); err != nil { return err } @@ -146,32 +127,34 @@ func serveCacheInternal(cmdCtx context.Context) error { } // Set up necessary APIs to support Web UI, including auth and metrics - if err := web_ui.ConfigureServerWebAPI(ctx, engine, egrp); err != nil { + if err := web_ui.ConfigureServerWebAPI(cmdCtx, engine, egrp); err != nil { return err } - go func() { - if err := web_ui.RunEngine(ctx, engine, egrp); err != nil { + egrp.Go(func() error { + if err := web_ui.RunEngine(cmdCtx, engine, egrp); err != nil { log.Panicln("Failure when running the web engine:", err) + return err + } else { + return err } - shutdownCancel() - }() + }) if param.Server_EnableUI.GetBool() { - if err = web_ui.ConfigureEmbeddedPrometheus(ctx, engine); err != nil { + if err = web_ui.ConfigureEmbeddedPrometheus(cmdCtx, engine); err != nil { return errors.Wrap(err, "Failed to configure embedded prometheus instance") } - if err = web_ui.InitServerWebLogin(ctx); err != nil { + if err = web_ui.InitServerWebLogin(cmdCtx); err != nil { return err } } - configPath, err := xrootd.ConfigXrootd(ctx, false) + configPath, err := xrootd.ConfigXrootd(cmdCtx, false) if err != nil { return err } - xrootd.LaunchXrootdMaintenance(ctx, cacheServer, 2*time.Minute) + xrootd.LaunchXrootdMaintenance(cmdCtx, cacheServer, 2*time.Minute) log.Info("Launching cache") launchers, err := xrootd.ConfigureLaunchers(false, configPath, false) @@ -179,7 +162,7 @@ func serveCacheInternal(cmdCtx context.Context) error { return err } - if err = daemon.LaunchDaemons(ctx, launchers, egrp); err != nil { + if err = daemon.LaunchDaemons(cmdCtx, launchers, egrp); err != nil { return err } diff --git a/cmd/root.go b/cmd/root.go index d7ebb23c5..fe78ee160 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -83,6 +83,9 @@ func Execute() error { err := egrp.Wait() if err != nil { log.Errorln("Fatal error occured that leads to shutdown of the process:", err) + } else { + // Use Error instead of Info because our default log level is Error + log.Error("Pelican is safely exited") } }() ctx := context.WithValue(egrpCtx, config.EgrpKey, egrp) diff --git a/launchers/launcher.go b/launchers/launcher.go index b99f53a65..c3707a95e 100644 --- a/launchers/launcher.go +++ b/launchers/launcher.go @@ -51,7 +51,7 @@ func LaunchModules(ctx context.Context, modules config.ServerType) (context.Canc signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) select { case sig := <-sigs: - log.Debugf("Received signal %v; will shutdown process", sig) + log.Warningf("Received signal %v; will shutdown process", sig) shutdownCancel() return nil case <-ctx.Done(): diff --git a/web_ui/prometheus.go b/web_ui/prometheus.go index 9baf1de31..af36b2959 100644 --- a/web_ui/prometheus.go +++ b/web_ui/prometheus.go @@ -595,7 +595,7 @@ func ConfigureEmbeddedPrometheus(ctx context.Context, engine *gin.Engine) error // Don't forget to release the reloadReady channel so that waiting blocks can exit normally. select { case <-ctx.Done(): - err := level.Warn(logger).Log("msg", "Received shutdown, exiting gracefully...") + err := level.Info(logger).Log("msg", "Received shutdown, exiting gracefully...") _ = err reloadReady.Close() case <-cancel: From e336cbd06206146d03fc33f9a99b87519fd01ba7 Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Wed, 17 Jan 2024 22:25:20 +0000 Subject: [PATCH 5/7] Don't panic don't panic --- cmd/cache_serve.go | 2 +- web_ui/ui.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/cache_serve.go b/cmd/cache_serve.go index 1f56fbb1a..a029f3582 100644 --- a/cmd/cache_serve.go +++ b/cmd/cache_serve.go @@ -133,7 +133,7 @@ func serveCacheInternal(cmdCtx context.Context) error { egrp.Go(func() error { if err := web_ui.RunEngine(cmdCtx, engine, egrp); err != nil { - log.Panicln("Failure when running the web engine:", err) + log.Errorln("Failure when running the web engine:", err) return err } else { return err diff --git a/web_ui/ui.go b/web_ui/ui.go index 334786e65..37ae882ca 100644 --- a/web_ui/ui.go +++ b/web_ui/ui.go @@ -386,7 +386,7 @@ func runEngineWithListener(ctx context.Context, ln net.Listener, engine *gin.Eng defer cancel() err = server.Shutdown(ctx) if err != nil { - log.Panicln("Failed to shutdown server:", err) + log.Errorln("Failed to shutdown server:", err) } return err }) From dcf10300bc6147d3cb69503208501d355d76f91a Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Thu, 18 Jan 2024 16:11:03 +0000 Subject: [PATCH 6/7] Go generate trim extra new line --- param/parameters_struct.go | 1 - 1 file changed, 1 deletion(-) diff --git a/param/parameters_struct.go b/param/parameters_struct.go index a5fdd0052..37c9bfdd7 100644 --- a/param/parameters_struct.go +++ b/param/parameters_struct.go @@ -332,4 +332,3 @@ type configWithType struct { SummaryMonitoringHost struct { Type string; Value string } } } - From ef234e3d8257b040d76e2532c827ed5d2424aafb Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Thu, 18 Jan 2024 16:16:19 +0000 Subject: [PATCH 7/7] Address code review feedback --- cmd/cache_serve.go | 81 ++++++++++++++++++++++++++++------------------ cmd/root.go | 2 +- 2 files changed, 51 insertions(+), 32 deletions(-) diff --git a/cmd/cache_serve.go b/cmd/cache_serve.go index a029f3582..0bed38f6c 100644 --- a/cmd/cache_serve.go +++ b/cmd/cache_serve.go @@ -24,6 +24,9 @@ import ( "context" "encoding/json" "net/url" + "os" + "os/signal" + "syscall" "time" "github.com/pelicanplatform/pelican/cache_ui" @@ -74,97 +77,113 @@ func getNSAdsFromDirector() ([]director.NamespaceAd, error) { } func serveCache(cmd *cobra.Command, _ []string) error { - err := serveCacheInternal(cmd.Context()) + cancel, err := serveCacheInternal(cmd.Context()) if err != nil { + cancel() return err } return nil } -func serveCacheInternal(cmdCtx context.Context) error { +func serveCacheInternal(cmdCtx context.Context) (context.CancelFunc, error) { // Use this context for any goroutines that needs to react to server shutdown - err := config.InitServer(cmdCtx, config.CacheType) + ctx, shutdownCancel := context.WithCancel(cmdCtx) + + err := config.InitServer(ctx, config.CacheType) cobra.CheckErr(err) - egrp, ok := cmdCtx.Value(config.EgrpKey).(*errgroup.Group) + egrp, ok := ctx.Value(config.EgrpKey).(*errgroup.Group) if !ok { egrp = &errgroup.Group{} } - err = xrootd.SetUpMonitoring(cmdCtx, egrp) + // Added the same logic from launcher.go as we currently launch cache separately from other services + egrp.Go(func() error { + log.Debug("Will shutdown process on signal") + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + select { + case sig := <-sigs: + log.Warningf("Received signal %v; will shutdown process", sig) + shutdownCancel() + return nil + case <-ctx.Done(): + return nil + } + }) + + err = xrootd.SetUpMonitoring(ctx, egrp) if err != nil { - return err + return shutdownCancel, err } nsAds, err := getNSAdsFromDirector() if err != nil { - return err + return shutdownCancel, err } cacheServer := &cache_ui.CacheServer{} cacheServer.SetNamespaceAds(nsAds) err = server_ui.CheckDefaults(cacheServer) if err != nil { - return err + return shutdownCancel, err } cachePrefix := "/caches/" + param.Xrootd_Sitename.GetString() viper.Set("Origin.NamespacePrefix", cachePrefix) - if err = server_ui.RegisterNamespaceWithRetry(cmdCtx, egrp); err != nil { - return err + if err = server_ui.RegisterNamespaceWithRetry(ctx, egrp); err != nil { + return shutdownCancel, err } - if err = server_ui.LaunchPeriodicAdvertise(cmdCtx, egrp, []server_utils.XRootDServer{cacheServer}); err != nil { - return err + if err = server_ui.LaunchPeriodicAdvertise(ctx, egrp, []server_utils.XRootDServer{cacheServer}); err != nil { + return shutdownCancel, err } engine, err := web_ui.GetEngine() if err != nil { - return err + return shutdownCancel, err } // Set up necessary APIs to support Web UI, including auth and metrics - if err := web_ui.ConfigureServerWebAPI(cmdCtx, engine, egrp); err != nil { - return err + if err := web_ui.ConfigureServerWebAPI(ctx, engine, egrp); err != nil { + return shutdownCancel, err } - egrp.Go(func() error { - if err := web_ui.RunEngine(cmdCtx, engine, egrp); err != nil { + egrp.Go(func() (err error) { + if err = web_ui.RunEngine(ctx, engine, egrp); err != nil { log.Errorln("Failure when running the web engine:", err) - return err - } else { - return err } + return }) if param.Server_EnableUI.GetBool() { - if err = web_ui.ConfigureEmbeddedPrometheus(cmdCtx, engine); err != nil { - return errors.Wrap(err, "Failed to configure embedded prometheus instance") + if err = web_ui.ConfigureEmbeddedPrometheus(ctx, engine); err != nil { + return shutdownCancel, errors.Wrap(err, "Failed to configure embedded prometheus instance") } - if err = web_ui.InitServerWebLogin(cmdCtx); err != nil { - return err + if err = web_ui.InitServerWebLogin(ctx); err != nil { + return shutdownCancel, err } } - configPath, err := xrootd.ConfigXrootd(cmdCtx, false) + configPath, err := xrootd.ConfigXrootd(ctx, false) if err != nil { - return err + return shutdownCancel, err } - xrootd.LaunchXrootdMaintenance(cmdCtx, cacheServer, 2*time.Minute) + xrootd.LaunchXrootdMaintenance(ctx, cacheServer, 2*time.Minute) log.Info("Launching cache") launchers, err := xrootd.ConfigureLaunchers(false, configPath, false) if err != nil { - return err + return shutdownCancel, err } - if err = daemon.LaunchDaemons(cmdCtx, launchers, egrp); err != nil { - return err + if err = daemon.LaunchDaemons(ctx, launchers, egrp); err != nil { + return shutdownCancel, err } - return nil + return shutdownCancel, nil } diff --git a/cmd/root.go b/cmd/root.go index fe78ee160..51e418dbb 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -82,7 +82,7 @@ func Execute() error { defer func() { err := egrp.Wait() if err != nil { - log.Errorln("Fatal error occured that leads to shutdown of the process:", err) + log.Errorln("Fatal error occurred that lead to the shutdown of the process:", err) } else { // Use Error instead of Info because our default log level is Error log.Error("Pelican is safely exited")