diff --git a/cmd/app.go b/cmd/app.go
index e46dcc6..b283d91 100644
--- a/cmd/app.go
+++ b/cmd/app.go
@@ -19,6 +19,7 @@ type App struct {
 
     hardwareSvc *hardwareService
    dbSvc       *dbService
+   networkSvc  *networkService
 }
 
 type Opts struct {
@@ -37,6 +38,11 @@ type dbService struct {
    queries map[string]string
 }
 
+type networkService struct {
+   hosts   []string
+   queries map[string]string
+}
+
 // fetchHWMetrics fetches hardware metrics from the Prometheus HTTP API.
 func (app *App) fetchHWMetrics() (map[string]models.HWPromResp, error) {
    hwMetrics := make(map[string]models.HWPromResp)
@@ -138,6 +144,40 @@ func (app *App) fetchDBMetrics() (map[string]models.DBPromResp, error) {
    return dbMetrics, nil
 }
 
+// fetchNetworkMetrics fetches network metrics from the Prometheus HTTP API.
+func (app *App) fetchNetworkMetrics() (map[string]models.NetworkPromResp, error) {
+   networkMetrics := make(map[string]models.NetworkPromResp)
+
+   for _, host := range app.networkSvc.hosts {
+       networkMetricsResp := models.NetworkPromResp{}
+       for metric, query := range app.networkSvc.queries {
+           switch metric {
+           case "packet_errors":
+               value, err := app.metricsMgr.Query(fmt.Sprintf(query, host, host))
+               if err != nil {
+                   app.lo.Error("Failed to query Prometheus",
+                       "host", host,
+                       "metric", metric,
+                       "error", err)
+                   continue
+               }
+               networkMetricsResp.PacketErrors = value
+
+           default:
+               app.lo.Warn("Unknown network metric queried",
+                   "host", host,
+                   "metric", metric)
+           }
+       }
+
+       // Add host metrics to the map.
+       networkMetrics[host] = networkMetricsResp
+       app.lo.Debug("fetched metrics", "host", host, "data", networkMetricsResp)
+   }
+
+   return networkMetrics, nil
+}
+
 // pushHWMetrics pushes hardware metrics to the NSE.
 func (app *App) pushHWMetrics(host string, data models.HWPromResp) error {
    for i := 0; i < app.opts.MaxRetries; i++ {
@@ -185,3 +225,27 @@ func (app *App) pushDBMetrics(host string, data models.DBPromResp) error {
    }
    return nil
 }
+
+// pushNetworkMetrics pushes network metrics to the NSE.
+func (app *App) pushNetworkMetrics(host string, data models.NetworkPromResp) error {
+   for i := 0; i < app.opts.MaxRetries; i++ {
+       if err := app.nseMgr.PushNetworkMetrics(host, data); err != nil {
+           // Handle retry logic.
+           if i < app.opts.MaxRetries-1 {
+               app.lo.Error("Failed to push network metrics to NSE. Retrying...",
Retrying...", + "host", host, + "attempt", i+1, + "error", err) + time.Sleep(app.opts.RetryInterval) + continue + } + app.lo.Error("Failed to push network metrics to NSE after max retries", + "host", host, + "max_retries", app.opts.MaxRetries, + "error", err) + return err + } + break + } + return nil +} diff --git a/cmd/init.go b/cmd/init.go index 7965fab..1804aaa 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -112,10 +112,11 @@ func initMetricsManager(ko *koanf.Koanf) (*metrics.Manager, error) { func inithardwareSvc(ko *koanf.Koanf) (*hardwareService, error) { var ( queries = map[string]string{ - "cpu": ko.MustString("metrics.hardware.cpu"), - "memory": ko.MustString("metrics.hardware.memory"), - "disk": ko.MustString("metrics.hardware.disk"), - "uptime": ko.MustString("metrics.hardware.uptime"), + "cpu": ko.MustString("metrics.hardware.cpu"), + "memory": ko.MustString("metrics.hardware.memory"), + "disk": ko.MustString("metrics.hardware.disk"), + "uptime": ko.MustString("metrics.hardware.uptime"), + "network_packet_erro": ko.MustString("metrics.hardware.uptime"), } hosts = ko.Strings("metrics.hardware.hosts") cfgPath = ko.String("prometheus.config_path") @@ -175,6 +176,38 @@ func initDBSvc(ko *koanf.Koanf) (*dbService, error) { }, nil } +// Load network metrics queries and hosts from the configuration +func initNetworkSvc(ko *koanf.Koanf) (*networkService, error) { + var ( + queries = map[string]string{ + "packet_errors": ko.MustString("metrics.network.packet_errors"), + } + hosts = ko.Strings("metrics.network.hosts") + cfgPath = ko.String("prometheus.config_path") + ) + + // If no hosts are provided, try to load from the prometheus config. + if len(hosts) == 0 && cfgPath != "" { + // Fallback to the default hosts from the config. + // Load the config files from the path provided. + defaultHosts, err := initDefaultHosts(ko, cfgPath) + if err != nil { + return nil, err + } + hosts = defaultHosts + } + + // Validate that hosts are loaded. + if len(hosts) == 0 { + return nil, fmt.Errorf("no hosts found in the config") + } + + return &networkService{ + hosts: hosts, + queries: queries, + }, nil +} + // initNSEManager initialises the NSE manager. func initNSEManager(ko *koanf.Koanf, lo *slog.Logger) (*nse.Manager, error) { nseMgr, err := nse.New(lo, nse.Opts{ diff --git a/cmd/main.go b/cmd/main.go index 4cbc1cb..8a80293 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -45,6 +45,13 @@ func main() { exit() } + // Load queries for network metrics. + networkSvc, err := initNetworkSvc(ko) + if err != nil { + lo.Error("failed to init network service", "error", err) + exit() + } + // Initialise the NSE manager. nseMgr, err := initNSEManager(ko, lo) if err != nil { @@ -60,6 +67,7 @@ func main() { nseMgr: nseMgr, hardwareSvc: hardwareSvc, dbSvc: dbSvc, + networkSvc: networkSvc, } // Create a new context which is cancelled when `SIGINT`/`SIGTERM` is received. @@ -74,6 +82,9 @@ func main() { wg.Add(1) go app.syncDBMetricsWorker(ctx, wg) + wg.Add(1) + go app.syncNetworkMetricsWorker(ctx, wg) + // Listen on the close channel indefinitely until a // `SIGINT` or `SIGTERM` is received. 
    <-ctx.Done()
@@ -155,3 +166,38 @@ func (app *App) syncDBMetricsWorker(ctx context.Context, wg *sync.WaitGroup) {
        }
    }
 }
+
+func (app *App) syncNetworkMetricsWorker(ctx context.Context, wg *sync.WaitGroup) {
+   defer wg.Done()
+
+   ticker := time.NewTicker(app.opts.SyncInterval)
+   defer ticker.Stop()
+
+   app.lo.Info("Starting network metrics worker", "interval", app.opts.SyncInterval)
+   for {
+       select {
+       case <-ticker.C:
+           data, err := app.fetchNetworkMetrics()
+           if err != nil {
+               app.lo.Error("Failed to fetch network metrics", "error", err)
+               continue
+           }
+
+           // Push to upstream LAMA APIs.
+           for host, hostData := range data {
+               if err := app.pushNetworkMetrics(host, hostData); err != nil {
+                   app.lo.Error("Failed to push network metrics to NSE", "host", host, "error", err)
+                   continue
+               }
+
+               // FIXME: Currently the LAMA API does not support multiple hosts.
+               // Once we've pushed the data for the first host, break the loop.
+               // Once the LAMA API supports multiple hosts, remove this.
+               break
+           }
+       case <-ctx.Done():
+           app.lo.Info("Stopping network metrics worker")
+           return
+       }
+   }
+}
diff --git a/config.sample.toml b/config.sample.toml
index 06b9633..0c3c57f 100644
--- a/config.sample.toml
+++ b/config.sample.toml
@@ -1,35 +1,40 @@
 [app]
-log_level = "debug" # To enable debug logging, level should be `debug`.
-sync_interval = "5m" # Interval at which the app should fetch data from metrics store.
+log_level = "debug"   # To enable debug logging, level should be `debug`.
+sync_interval = "5m"  # Interval at which the app should fetch data from metrics store.
 retry_interval = "5s" # Interval at which the app should retry if the previous request failed.
-max_retries = 3 # Maximum number of retries for a failed request.
+max_retries = 3       # Maximum number of retries for a failed request.
 
 [lama.nse]
 url = "https://lama.nse.internal" # Endpoint for NSE LAMA API Gateway
 login_id = "redacted"
 member_id = "redacted"
 password = "redacted"
-timeout = "30s" # Timeout for HTTP requests
-idle_timeout = "5m" # Idle timeout for HTTP requests
-exchange_id = 1 # 1=National Stock Exchange
+timeout = "30s"      # Timeout for HTTP requests
+idle_timeout = "5m"  # Idle timeout for HTTP requests
+exchange_id = 1      # 1=National Stock Exchange
 
 [prometheus]
-endpoint = "http://prometheus:9090" # Endpoint for Prometheus API
-query_path = "/api/v1/query" # Endpoint for Prometheus query API
-username = "redacted" # HTTP Basic Auth username
-password = "redacted" # HTTP Basic Auth password
-timeout = "10s" # Timeout for HTTP requests
-idle_timeout = "5m" # Idle timeout for HTTP requests
+endpoint = "http://prometheus:9090"  # Endpoint for Prometheus API
+query_path = "/api/v1/query"         # Endpoint for Prometheus query API
+username = "redacted"                # HTTP Basic Auth username
+password = "redacted"                # HTTP Basic Auth password
+timeout = "10s"                      # Timeout for HTTP requests
+idle_timeout = "5m"                  # Idle timeout for HTTP requests
 max_idle_conns = 10
 config_path = "/etc/prometheus/prometheus.yml" # Path to Prometheus config file. This is used to load a list of hosts to fetch metrics for.
 
 [metrics.hardware] # Define Prometheus queries for hardware metrics
-hosts = [] # List of hosts to fetch metrics for. Keep this empty to fetch metrics for all hosts defined in `prometheus.config_path` file.
+# List of hosts to fetch metrics for. Keep this empty to fetch metrics for all hosts defined in `prometheus.config_path` file.
+hosts = []
 cpu = '100 * (1 - avg(rate(node_cpu_seconds_total{mode="idle", hostname="%s"}[5m])))'
 memory = '(1 - ((node_memory_MemFree_bytes{hostname="%s"} + node_memory_Buffers_bytes{hostname="%s"} + node_memory_Cached_bytes{hostname="%s"}) / node_memory_MemTotal_bytes{hostname="%s"})) * 100'
 disk = '100 - ((node_filesystem_avail_bytes{hostname="%s",device!~"rootfs"} * 100) / node_filesystem_size_bytes{hostname="%s",device!~"rootfs"})'
 uptime = '(node_time_seconds{hostname="%s"} - node_boot_time_seconds{hostname="%s"}) / 60'
 
 [metrics.database] # Define Prometheus queries for db metrics
-hosts = [] # List of hosts to fetch metrics for. Keep this empty to fetch metrics for all hosts defined in `prometheus.config_path` file.
+hosts = []
 status = 'up{hostname="%s"}'
+
+[metrics.network] # Define Prometheus queries for network metrics
+packet_errors = 'sum(rate(node_network_receive_errs_total{hostname="%s"}[5m])) + sum(rate(node_network_transmit_errs_total{hostname="%s"}[5m]))'
+hosts = []
diff --git a/internal/nse/nse.go b/internal/nse/nse.go
index a689fae..8c536dc 100644
--- a/internal/nse/nse.go
+++ b/internal/nse/nse.go
@@ -48,8 +48,9 @@ type Manager struct {
 
    token string
 
-   dbSeqID int
-   hwSeqID int
+   dbSeqID  int
+   hwSeqID  int
+   netSeqID int
 }
 
 type LoginReq struct {
@@ -115,6 +116,14 @@ type DatabaseReq struct {
    Payload []MetricPayload `json:"payload"`
 }
 
+type NetworkReq struct {
+   MemberID   string          `json:"memberId"`
+   ExchangeID int             `json:"exchangeId"`
+   SequenceID int             `json:"sequenceId"`
+   Timestamp  int64           `json:"timestamp"`
+   Payload    []MetricPayload `json:"payload"`
+}
+
 func New(lo *slog.Logger, opts Opts) (*Manager, error) {
    client := &http.Client{
        Timeout: opts.Timeout,
@@ -141,12 +150,13 @@ func New(lo *slog.Logger, opts Opts) (*Manager, error) {
    lgr.Debug("mii-lama client created")
 
    mgr := &Manager{
-       opts:    opts,
-       lo:      lgr,
-       client:  client,
-       headers: h,
-       hwSeqID: 1,
-       dbSeqID: 1,
+       opts:     opts,
+       lo:       lgr,
+       client:   client,
+       headers:  h,
+       hwSeqID:  1,
+       dbSeqID:  1,
+       netSeqID: 1,
    }
 
    return mgr, nil
@@ -426,6 +436,114 @@ func newMetricData(key string, avg float64, simple bool) MetricData {
    }
 }
 
+// PushNetworkMetrics sends network metrics to NSE LAMA API.
+func (mgr *Manager) PushNetworkMetrics(host string, data models.NetworkPromResp) error {
+   endpoint := fmt.Sprintf("%s%s", mgr.opts.URL, "/api/V1/metrics/network")
+
+   // Acquire read lock to safely read token and sequence ID.
+   mgr.RLock()
+   token := mgr.token
+   seqID := mgr.netSeqID
+   mgr.RUnlock()
+
+   netPayload := createNetworkReq(data, mgr.opts.MemberID, mgr.opts.ExchangeID, seqID, 1)
+
+   payload, err := json.Marshal(netPayload)
+   if err != nil {
+       mgr.lo.Error("Failed to marshal network metrics payload", "error", err)
+       return fmt.Errorf("failed to marshal network metrics payload: %v", err)
+   }
+
+   mgr.lo.Info("Preparing to send network metrics", "host", host, "URL", endpoint, "payload", string(payload), "headers", mgr.headers)
+
+   // Initialize new HTTP request for metrics push.
+   req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(payload))
+   if err != nil {
+       mgr.lo.Error("Failed to create HTTP request", "error", err)
+       return fmt.Errorf("failed to create HTTP request: %v", err)
+   }
+
+   // Set headers for the request.
+   req.Header.Set("Content-Type", "application/json")
+   req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
+   for k, v := range mgr.headers {
+       req.Header.Set(k, strings.Join(v, ","))
+   }
+
+   // Execute HTTP request using the HTTP client.
+   resp, err := mgr.client.Do(req)
+   if err != nil {
+       mgr.lo.Error("Network metrics HTTP request failed", "error", err)
+       return fmt.Errorf("network metrics HTTP request failed: %v", err)
+   }
+   defer resp.Body.Close()
+
+   // Unmarshal the response into MetricsResp object.
+   var r MetricsResp
+   if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+       mgr.lo.Error("Failed to unmarshal network metrics response", "error", err)
+       return fmt.Errorf("failed to unmarshal network metrics response: %v", err)
+   }
+
+   mgr.lo.Info("Received response for network metrics push", "response_code", r.ResponseCode, "response_description", r.ResponseDesc, "http_status", resp.StatusCode)
+
+   if resp.StatusCode != http.StatusOK {
+       mgr.lo.Error("Network metrics push failed", "response_code", r.ResponseCode, "response_desc", r.ResponseDesc, "errors", r.Errors)
+       switch r.ResponseCode {
+       case NSE_RESP_CODE_INVALID_TOKEN, NSE_RESP_CODE_EXPIRED_TOKEN:
+           mgr.lo.Warn("Token is invalid or expired, attempting to log in again")
+           if err := mgr.Login(); err != nil {
+               mgr.lo.Error("Relogin attempt failed", "error", err)
+               return fmt.Errorf("failed to log in again: %v", err)
+           }
+           return fmt.Errorf("new token obtained after relogin, retrying network metrics push")
+
+       case NSE_RESP_CODE_INVALID_SEQ_ID:
+           mgr.lo.Warn("Sequence ID is invalid, attempting to update")
+           expectedSeqID, err := extractExpectedSequenceID(r.ResponseDesc)
+           if err != nil {
+               mgr.lo.Error("Failed to extract expected sequence ID", "error", err)
+               return fmt.Errorf("failed to extract expected sequence ID: %v", err)
+           }
+           mgr.lo.Info("Expected sequence ID identified", "expected_seq_id", expectedSeqID)
+           mgr.Lock()
+           mgr.netSeqID = expectedSeqID
+           mgr.Unlock()
+           return fmt.Errorf("sequence ID has been updated, retrying network metrics push")
+
+       default:
+           mgr.lo.Error("Network metrics push failed with unhandled response code", "response_code", r.ResponseCode)
+           return fmt.Errorf("network metrics push failed with unhandled response code: %d", r.ResponseCode)
+       }
+   }
+
+   // Increase sequence ID if metrics push was successful or partially successful.
+   if r.ResponseCode == NSE_RESP_CODE_SUCCESS || r.ResponseCode == NSE_RESP_CODE_PARTIAL_SUCCESS {
+       mgr.Lock()
+       mgr.netSeqID++
+       mgr.Unlock()
+   }
+
+   return nil
+}
+
+func createNetworkReq(metrics models.NetworkPromResp, memberId string, exchangeId, sequenceId, applicationId int) NetworkReq {
+   return NetworkReq{
+       MemberID:   memberId,
+       ExchangeID: exchangeId,
+       SequenceID: sequenceId,
+       Timestamp:  time.Now().Unix(),
+       Payload: []MetricPayload{
+           {
+               ApplicationID: applicationId,
+               MetricData: []MetricData{
+                   newMetricData("packetCount", float64(metrics.PacketErrors), true),
+               },
+           },
+       },
+   }
+}
+
 func createHardwareReq(metrics models.HWPromResp, memberId string, exchangeId, sequenceId, applicationId int) HardwareReq {
    return HardwareReq{
        MemberID: memberId,
diff --git a/pkg/models/models.go b/pkg/models/models.go
index 766dae7..a982985 100644
--- a/pkg/models/models.go
+++ b/pkg/models/models.go
@@ -12,3 +12,8 @@ type HWPromResp struct {
 type DBPromResp struct {
    Status float64 `json:"status"`
 }
+
+// NetworkPromResp is the response from the Prometheus HTTP API for network metrics.
+type NetworkPromResp struct {
+   PacketErrors float64 `json:"packet_errors"`
+}