diff --git a/tests/fixture/tmpnet/monitor_processes.go b/tests/fixture/tmpnet/monitor_processes.go
index c9cf2cb78700..114e3351d8a4 100644
--- a/tests/fixture/tmpnet/monitor_processes.go
+++ b/tests/fixture/tmpnet/monitor_processes.go
@@ -7,7 +7,9 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io"
 	"io/fs"
+	"net/http"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -16,37 +18,63 @@ import (
 	"time"
 
 	"go.uber.org/zap"
+	"k8s.io/apimachinery/pkg/util/wait"
 
 	"github.com/ava-labs/avalanchego/utils/logging"
 	"github.com/ava-labs/avalanchego/utils/perms"
 )
 
-type configGeneratorFunc func(workingDir string, serviceDiscoveryDir string, username string, password string) string
-
 const (
 	collectorTickerInterval = 100 * time.Millisecond
 
+	// TODO(marun) Maybe use dynamic HTTP ports to avoid the possibility of them being already bound?
+
+	// Prometheus configuration
+	prometheusCmd            = "prometheus"
+	defaultPrometheusURL     = "https://prometheus-poc.avax-dev.network"
 	prometheusScrapeInterval = 10 * time.Second
+	prometheusHTTPPort       = 9090
 
-	prometheusCmd = "prometheus"
-	promtailCmd   = "promtail"
+	// Promtail configuration
+	promtailCmd      = "promtail"
+	defaultLokiURL   = "https://loki-poc.avax-dev.network"
+	promtailHTTPPort = 3101
 
 	// Use a delay slightly longer than the scrape interval to ensure a final scrape before shutdown
 	NetworkShutdownDelay = prometheusScrapeInterval + 2*time.Second
 )
 
+var (
+	prometheusListenAddress = fmt.Sprintf("127.0.0.1:%d", prometheusHTTPPort)
+	prometheusReadinessURL  = fmt.Sprintf("http://%s/-/ready", prometheusListenAddress)
+
+	promtailReadinessURL = fmt.Sprintf("http://127.0.0.1:%d/ready", promtailHTTPPort)
+)
+
 // StartCollectors ensures collectors are running to collect logs and metrics from local nodes.
 func StartCollectors(ctx context.Context, log logging.Logger) error {
 	if _, ok := ctx.Deadline(); !ok {
 		return errors.New("unable to start collectors with a context without a deadline")
 	}
 
-	if err := startPrometheus(ctx, log); err != nil {
+	if err := startPromtail(ctx, log); err != nil {
 		return err
 	}
-	if err := startPromtail(ctx, log); err != nil {
+	if err := startPrometheus(ctx, log); err != nil {
 		return err
 	}
 
+	// Wait for readiness. These checks are performed separately from start to
+	// minimize time to readiness.
+	readinessURLs := map[string]string{
+		promtailCmd:   promtailReadinessURL,
+		prometheusCmd: prometheusReadinessURL,
+	}
+	for cmdName, readinessURL := range readinessURLs {
+		if err := waitForReadiness(ctx, log, cmdName, readinessURL); err != nil {
+			return err
+		}
+	}
+
 	log.Info("To stop: tmpnetctl stop-collectors")
 
 	return nil
@@ -57,7 +85,7 @@ func StopCollectors(ctx context.Context, log logging.Logger) error {
 	if _, ok := ctx.Deadline(); !ok {
 		return errors.New("unable to start collectors with a context without a deadline")
 	}
-	for _, cmdName := range []string{prometheusCmd, promtailCmd} {
+	for _, cmdName := range []string{promtailCmd, prometheusCmd} {
 		// Determine if the process is running
 		workingDir, err := getWorkingDir(cmdName)
 		if err != nil {
@@ -87,12 +115,10 @@ func StopCollectors(ctx context.Context, log logging.Logger) error {
 			zap.String("cmdName", cmdName),
 			zap.Int("pid", proc.Pid),
 		)
-		ticker := time.NewTicker(collectorTickerInterval)
-		defer ticker.Stop()
-		for {
+		if err := pollUntilContextCancel(ctx, func(_ context.Context) (bool, error) {
 			p, err := getProcess(proc.Pid)
 			if err != nil {
-				return fmt.Errorf("failed to retrieve process: %w", err)
+				return false, fmt.Errorf("failed to retrieve process: %w", err)
 			}
 			if p == nil {
 				// Process is no longer running
@@ -105,15 +131,10 @@ func StopCollectors(ctx context.Context, log logging.Logger) error {
 						zap.Error(err),
 					)
 				}
-
-				break
-			}
-
-			select {
-			case <-ctx.Done():
-				return fmt.Errorf("failed to see %s stop before timeout: %w", cmdName, ctx.Err())
-			case <-ticker.C:
 			}
+			return p == nil, nil
+		}); err != nil {
+			return err
 		}
 		log.Info("collector stopped",
 			zap.String("cmdName", cmdName),
@@ -125,14 +146,24 @@ func StopCollectors(ctx context.Context, log logging.Logger) error {
 
 // startPrometheus ensures an agent-mode prometheus process is running to collect metrics from local nodes.
 func startPrometheus(ctx context.Context, log logging.Logger) error {
-	return startCollector(
-		ctx,
-		log,
-		prometheusCmd,
-		"--config.file=prometheus.yaml --storage.agent.path=./data --web.listen-address=localhost:0 --enable-feature=agent",
-		"PROMETHEUS",
-		func(_ string, serviceDiscoveryDir string, username string, password string) string {
-			return fmt.Sprintf(`
+	cmdName := prometheusCmd
+
+	args := fmt.Sprintf(
+		"--config.file=prometheus.yaml --web.listen-address=%s --enable-feature=agent --storage.agent.path=./data",
+		prometheusListenAddress,
+	)
+
+	username, password, err := getCollectorCredentials(cmdName)
+	if err != nil {
+		return err
+	}
+
+	serviceDiscoveryDir, err := getServiceDiscoveryDir(cmdName)
+	if err != nil {
+		return err
+	}
+
+	config := fmt.Sprintf(`
 global:
   scrape_interval: %v     # Default is every 1 minute.
   evaluation_interval: 10s # The default is every 1 minute.
@@ -146,34 +177,49 @@ scrape_configs:
           - '%s/*.json'
 
 remote_write:
-  - url: "https://prometheus-poc.avax-dev.network/api/v1/write"
+  - url: "%s/api/v1/write"
    basic_auth:
      username: "%s"
      password: "%s"
-`, prometheusScrapeInterval, serviceDiscoveryDir, username, password)
-		},
-	)
+`, prometheusScrapeInterval, serviceDiscoveryDir, getPrometheusURL(), username, password)
+
+	return startCollector(ctx, log, cmdName, args, config)
 }
 
 // startPromtail ensures a promtail process is running to collect logs from local nodes.
 func startPromtail(ctx context.Context, log logging.Logger) error {
-	return startCollector(
-		ctx,
-		log,
-		promtailCmd,
-		"-config.file=promtail.yaml",
-		"LOKI",
-		func(workingDir string, serviceDiscoveryDir string, username string, password string) string {
-			return fmt.Sprintf(`
+	cmdName := promtailCmd
+
+	args := "-config.file=promtail.yaml"
+
+	username, password, err := getCollectorCredentials(cmdName)
+	if err != nil {
+		return err
+	}
+
+	workingDir, err := getWorkingDir(cmdName)
+	if err != nil {
+		return err
+	}
+
+	serviceDiscoveryDir, err := getServiceDiscoveryDir(cmdName)
+	if err != nil {
+		return err
+	}
+
+	config := fmt.Sprintf(`
 server:
-  http_listen_port: 0
+  http_listen_port: %d
   grpc_listen_port: 0
 
 positions:
  filename: %s/positions.yaml
 
 client:
-  url: "https://loki-poc.avax-dev.network/api/prom/push"
+  url: "%s/api/prom/push"
  basic_auth:
    username: "%s"
    password: "%s"
@@ -183,9 +229,9 @@ scrape_configs:
    file_sd_configs:
      - files:
          - '%s/*.json'
-`, workingDir, username, password, serviceDiscoveryDir)
-		},
-	)
+`, promtailHTTPPort, workingDir, getLokiURL(), username, password, serviceDiscoveryDir)
+
+	return startCollector(ctx, log, cmdName, args, config)
 }
 
 func getWorkingDir(cmdName string) (string, error) {
@@ -214,8 +260,7 @@ func startCollector(
 	log logging.Logger,
 	cmdName string,
 	args string,
-	baseEnvName string,
-	configGenerator configGeneratorFunc,
+	config string,
 ) error {
 	// Determine paths
 	workingDir, err := getWorkingDir(cmdName)
 	if err != nil {
@@ -251,7 +296,12 @@ func startCollector(
 	}
 
 	// Write the collector config file
-	if err := writeConfigFile(log, cmdName, workingDir, baseEnvName, configGenerator); err != nil {
+	confFilename := cmdName + ".yaml"
+	confPath := filepath.Join(workingDir, confFilename)
+	log.Info("writing "+cmdName+" config",
+		zap.String("path", confPath),
+	)
+	if err := os.WriteFile(confPath, []byte(config), perms.ReadWrite); err != nil {
 		return err
 	}
 
@@ -301,36 +351,26 @@ func clearStalePIDFile(log logging.Logger, cmdName string, pidPath string) error
 	return nil
 }
 
-// writeConfigFile writes the configuration file for a collector
-func writeConfigFile(
-	log logging.Logger,
-	cmdName string,
-	workingDir string,
-	baseEnvName string,
-	configGenerator configGeneratorFunc,
-) error {
-	// Retrieve the credentials for the command
-	username, password, err := getCredentials(baseEnvName)
-	if err != nil {
-		return err
-	}
+func getPrometheusURL() string {
+	return GetEnvWithDefault("PROMETHEUS_URL", defaultPrometheusURL)
 }
 
-	// Generate configuration for the command to its working dir
-	confFilename := cmdName + ".yaml"
-	confPath := filepath.Join(workingDir, confFilename)
-	log.Info("writing "+cmdName+" config",
-		zap.String("path", confPath),
-	)
-	serviceDiscoveryDir, err := getServiceDiscoveryDir(cmdName)
-	if err != nil {
-		return err
-	}
-	config := configGenerator(workingDir, serviceDiscoveryDir, username, password)
-	return os.WriteFile(confPath, []byte(config), perms.ReadWrite)
+func getLokiURL() string {
+	return GetEnvWithDefault("LOKI_URL", defaultLokiURL)
 }
 
-// getCredentials retrieves the username and password for the given base env name.
-func getCredentials(baseEnvName string) (string, string, error) {
+// getCollectorCredentials retrieves the username and password for the command.
+func getCollectorCredentials(cmdName string) (string, string, error) {
+	var baseEnvName string
+	switch cmdName {
+	case prometheusCmd:
+		baseEnvName = "PROMETHEUS"
+	case promtailCmd:
+		baseEnvName = "LOKI"
+	default:
+		return "", "", fmt.Errorf("unsupported cmd: %s", cmdName)
+	}
+
 	usernameEnvVar := baseEnvName + "_USERNAME"
 	username := GetEnvWithDefault(usernameEnvVar, "")
 	if len(username) == 0 {
@@ -359,7 +399,8 @@ func startCollectorProcess(
 	workingDir string,
 	pidPath string,
 ) error {
-	fullCmd := "nohup " + cmdName + " " + args + " > " + cmdName + ".log 2>&1 & echo -n \"$!\" > " + pidPath
+	logFilename := cmdName + ".log"
+	fullCmd := "nohup " + cmdName + " " + args + " > " + logFilename + " 2>&1 & echo -n \"$!\" > " + pidPath
 	log.Info("starting "+cmdName,
 		zap.String("workingDir", workingDir),
 		zap.String("fullCmd", fullCmd),
@@ -372,63 +413,89 @@ func startCollectorProcess(
 		return fmt.Errorf("failed to start %s: %w", cmdName, err)
 	}
 
-	// Wait for PID file to be written. It's not enough to check for the PID of cmd
-	// because the PID we want is a child of the process that cmd represents.
-	if pid, err := waitForPIDFile(ctx, cmdName, pidPath); err != nil {
+	// Wait for PID file
+	var pid int
+	if err := pollUntilContextCancel(ctx, func(_ context.Context) (bool, error) {
+		var err error
+		pid, err = getPID(cmdName, pidPath)
+		if err != nil {
+			log.Warn("failed to read PID file",
+				zap.String("cmd", cmdName),
+				zap.String("pidPath", pidPath),
+				zap.Error(err),
+			)
+		}
+		return pid != 0, nil
+	}); err != nil {
 		return err
-	} else {
-		log.Info(cmdName+" started",
-			zap.String("pid", pid),
-		)
 	}
+	log.Info(cmdName+" started",
+		zap.Int("pid", pid),
+	)
 
-	// TODO(marun) Perform a readiness check
-	// TODO(marun) Check that the log is not empty
+	// Wait for non-empty log file. An empty log file should only occur if the command
+	// invocation is not correctly redirecting stderr and stdout to the expected file.
+	logPath := filepath.Join(workingDir, logFilename)
+	if err := pollUntilContextCancel(ctx, func(_ context.Context) (bool, error) {
+		logData, err := os.ReadFile(logPath)
+		if err != nil && !errors.Is(err, fs.ErrNotExist) {
+			return false, fmt.Errorf("failed to read log file %s for %s: %w", logPath, cmdName, err)
+		}
+		return len(logData) != 0, nil
+	}); err != nil {
+		return fmt.Errorf("empty log file %s for %s indicates misconfiguration: %w", logPath, cmdName, err)
+	}
 
 	return nil
 }
 
-// waitForPIDFile waits for the PID file to be written as an indication of process start.
-func waitForPIDFile(ctx context.Context, cmdName string, pidPath string) (string, error) {
-	var (
-		ticker = time.NewTicker(collectorTickerInterval)
-		pid    string
-	)
-	defer ticker.Stop()
-	for {
-		if fileExistsAndNotEmpty(pidPath) {
-			var err error
-			pid, err = readFileContents(pidPath)
-			if err != nil {
-				return "", fmt.Errorf("failed to read pid file: %w", err)
-			}
-			break
-		}
-		select {
-		case <-ctx.Done():
-			return "", fmt.Errorf("failed to wait for %s to start before timeout: %w", cmdName, ctx.Err())
-		case <-ticker.C:
-		}
+// checkReadiness retrieves the provided URL and indicates whether it returned 200
+func checkReadiness(ctx context.Context, url string) (bool, string, error) {
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	if err != nil {
+		return false, "", err
 	}
-	return pid, nil
-}
 
-func fileExistsAndNotEmpty(filename string) bool {
-	fileInfo, err := os.Stat(filename)
+	resp, err := http.DefaultClient.Do(req)
 	if err != nil {
-		if os.IsNotExist(err) {
-			return false
-		}
-		fmt.Printf("Error stating file: %v\n", err)
-		return false
+		return false, "", fmt.Errorf("request failed: %w", err)
 	}
-	return fileInfo.Size() > 0
-}
+	defer resp.Body.Close()
 
-func readFileContents(filename string) (string, error) {
-	content, err := os.ReadFile(filename)
+	body, err := io.ReadAll(resp.Body)
 	if err != nil {
-		return "", err
+		return false, "", fmt.Errorf("failed to read response: %w", err)
 	}
-	return string(content), nil
+
+	return resp.StatusCode == http.StatusOK, string(body), nil
+}
+
+// waitForReadiness waits until the given readiness URL returns 200
+func waitForReadiness(ctx context.Context, log logging.Logger, cmdName string, readinessURL string) error {
+	log.Info("waiting for "+cmdName+" readiness",
+		zap.String("url", readinessURL),
+	)
+	if err := pollUntilContextCancel(ctx, func(_ context.Context) (bool, error) {
+		ready, body, err := checkReadiness(ctx, readinessURL)
+		if err == nil {
+			return ready, nil
+		}
+		log.Warn("failed to check readiness",
+			zap.String("cmd", cmdName),
+			zap.String("url", readinessURL),
+			zap.String("body", body),
+			zap.Error(err),
+		)
+		return false, nil
+	}); err != nil {
+		return err
+	}
+	log.Info(cmdName+" ready",
+		zap.String("url", readinessURL),
+	)
+	return nil
+}
+
+func pollUntilContextCancel(ctx context.Context, condition wait.ConditionWithContextFunc) error {
+	return wait.PollUntilContextCancel(ctx, collectorTickerInterval, true /* immediate */, condition)
 }
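
Usage sketch (not part of the diff): the readiness-gated flow introduced above is driven by the caller. StartCollectors rejects a context without a deadline so the PID-file, log-file, and HTTP readiness polls cannot block forever, and NetworkShutdownDelay exists so prometheus gets one final scrape before StopCollectors tears the processes down. The tmpnet import path and the logging.NoLog{} logger below are assumptions for illustration only.

package main

import (
	"context"
	"log"
	"time"

	"github.com/ava-labs/avalanchego/tests/fixture/tmpnet"
	"github.com/ava-labs/avalanchego/utils/logging"
)

func main() {
	// Assumed no-op logger for brevity; any logging.Logger implementation works.
	logger := logging.NoLog{}

	// Start promtail and prometheus, then wait for both to report ready.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()
	if err := tmpnet.StartCollectors(ctx, logger); err != nil {
		log.Fatal(err)
	}

	// ... start a temporary network and run tests here ...

	// Allow one more scrape interval so the final metrics are collected.
	time.Sleep(tmpnet.NetworkShutdownDelay)

	stopCtx, stopCancel := context.WithTimeout(context.Background(), time.Minute)
	defer stopCancel()
	if err := tmpnet.StopCollectors(stopCtx, logger); err != nil {
		log.Fatal(err)
	}
}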