diff --git a/clients/bothttp/client.go b/clients/bothttp/client.go index 7adfd893..55be9af6 100644 --- a/clients/bothttp/client.go +++ b/clients/bothttp/client.go @@ -5,19 +5,34 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" + "strings" "github.com/forta-network/forta-core-go/utils/httpclient" "github.com/hashicorp/go-multierror" + log "github.com/sirupsen/logrus" +) + +var ( + // responseSizeLimit is the maximum number of bytes to read from the response body. + responseSizeLimit = int64(2 << 20) // 2MB ) type HealthResponse struct { - Errors []string `json:"errors"` + Errors []string `json:"errors"` + Metrics []Metrics `json:"metrics"` +} + +type Metrics struct { + // ChainID is the id of the chain the metrics are for + ChainID int64 `json:"chainId"` + DataPoints map[string][]float64 `json:"dataPoints"` } // Client is the bot HTTP client interface. type Client interface { - Health(ctx context.Context) error + Health(ctx context.Context) ([]Metrics, error) } type botClient struct { @@ -25,7 +40,7 @@ type botClient struct { httpClient *http.Client } -// NewClient creates anew client. +// NewClient creates a new client. func NewClient(host string, port int) Client { return &botClient{ baseUrl: fmt.Sprintf("http://%s:%d", host, port), @@ -34,31 +49,42 @@ func NewClient(host string, port int) Client { } // Health does a health check on the bot. -func (bc *botClient) Health(ctx context.Context) error { +func (bc *botClient) Health(ctx context.Context) ([]Metrics, error) { healthUrl := fmt.Sprintf("%s/health", bc.baseUrl) req, err := http.NewRequestWithContext(ctx, "GET", healthUrl, nil) if err != nil { - return err + return nil, err } + resp, err := bc.httpClient.Do(req) if err != nil { - return err + return nil, err } defer resp.Body.Close() if resp.StatusCode > 200 { - return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) } var healthResp HealthResponse - if err := json.NewDecoder(resp.Body).Decode(&healthResp); err != nil { - return nil // ignore decoding errors + + // Limit the response size to a certain number of bytes + limitedReader := io.LimitReader(resp.Body, responseSizeLimit) + if err := json.NewDecoder(limitedReader).Decode(&healthResp); err != nil { + if strings.Contains(err.Error(), "EOF") { + log.WithError(err).Warn("response size limit for health check is reached") + } + + return nil, nil // ignore decoding errors } + if len(healthResp.Errors) == 0 { - return nil + return healthResp.Metrics, nil } + for _, errMsg := range healthResp.Errors { err = multierror.Append(err, errors.New(errMsg)) } - return err + + return healthResp.Metrics, err } diff --git a/clients/bothttp/client_test.go b/clients/bothttp/client_test.go index d2f6fe81..cdec5181 100644 --- a/clients/bothttp/client_test.go +++ b/clients/bothttp/client_test.go @@ -6,6 +6,7 @@ import ( "net/http" "testing" + "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/require" ) @@ -28,12 +29,42 @@ func TestHealth(t *testing.T) { go server.ListenAndServe() client := NewClient("localhost", 8183) - err := client.Health(context.Background()) + _, err := client.Health(context.Background()) r.NoError(err) respData.Errors = append(respData.Errors, "some error msg") - err = client.Health(context.Background()) + _, err = client.Health(context.Background()) r.Error(err) + respData = HealthResponse{ + Metrics: []Metrics{ + { + ChainID: 1, + DataPoints: map[string][]float64{ + "tx.success": {1, 2, 3}, + }, + }, + { + ChainID: 2, + DataPoints: map[string][]float64{ + "tx.success": {3}, + }, + }, + }, + } + + hook := test.NewGlobal() + + metrics, err := client.Health(context.Background()) + r.NoError(err) + r.EqualValues(respData.Metrics, metrics) + + responseSizeLimit = 1 + + _, err = client.Health(context.Background()) + r.NoError(err) + r.Equal(1, len(hook.Entries)) + r.Equal("response size limit for health check is reached", hook.LastEntry().Message) + server.Close() } diff --git a/go.mod b/go.mod index a3b9755d..58de2f5a 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ replace github.com/docker/docker => github.com/moby/moby v20.10.25+incompatible require ( github.com/docker/docker v1.6.2 github.com/docker/go-connections v0.4.0 - github.com/forta-network/forta-core-go v0.0.0-20240129095537-dad5459b7283 + github.com/forta-network/forta-core-go v0.0.0-20240129180226-af53540338f3 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.39.0 diff --git a/go.sum b/go.sum index ddb97fd1..d4c5c1a3 100644 --- a/go.sum +++ b/go.sum @@ -331,6 +331,8 @@ github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwU github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/forta-network/forta-core-go v0.0.0-20240129095537-dad5459b7283 h1:MmvZ3so59eNLtsJgEnRS1cwy/uqI/PazAS0x9Xkl3+E= github.com/forta-network/forta-core-go v0.0.0-20240129095537-dad5459b7283/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo= +github.com/forta-network/forta-core-go v0.0.0-20240129180226-af53540338f3 h1:tfuCghhFdyolM3CiapTxtdLVHcy7ssRUjo5JxwwJnGc= +github.com/forta-network/forta-core-go v0.0.0-20240129180226-af53540338f3/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo= github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0= github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= diff --git a/services/components/botio/bot_client.go b/services/components/botio/bot_client.go index 855697bc..adb70ee0 100644 --- a/services/components/botio/bot_client.go +++ b/services/components/botio/bot_client.go @@ -545,7 +545,7 @@ func (bot *botClient) processTransaction(ctx context.Context, lg *log.Entry, req // truncate findings if len(resp.Findings) > MaxFindings { dropped := len(resp.Findings) - MaxFindings - droppedMetric := metrics.CreateAgentMetric(botConfig, metrics.MetricFindingsDropped, float64(dropped)) + droppedMetric := metrics.CreateAgentMetricV1(botConfig, domain.MetricFindingsDropped, float64(dropped)) bot.msgClient.PublishProto( messaging.SubjectMetricAgent, &protocol.AgentMetricList{Metrics: []*protocol.AgentMetric{droppedMetric}}, @@ -613,8 +613,8 @@ func (bot *botClient) processBlock(ctx context.Context, lg *log.Entry, request * // truncate findings if len(resp.Findings) > MaxFindings { dropped := len(resp.Findings) - MaxFindings - droppedMetric := metrics.CreateAgentMetric( - botConfig, metrics.MetricFindingsDropped, float64(dropped), + droppedMetric := metrics.CreateAgentMetricV1( + botConfig, domain.MetricFindingsDropped, float64(dropped), ) bot.msgClient.PublishProto( messaging.SubjectMetricAgent, @@ -706,7 +706,7 @@ func (bot *botClient) processCombinationAlert(ctx context.Context, lg *log.Entry // truncate findings if len(resp.Findings) > MaxFindings { dropped := len(resp.Findings) - MaxFindings - droppedMetric := metrics.CreateAgentMetric(botConfig, metrics.MetricFindingsDropped, float64(dropped)) + droppedMetric := metrics.CreateAgentMetricV1(botConfig, domain.MetricFindingsDropped, float64(dropped)) bot.msgClient.PublishProto( messaging.SubjectMetricAgent, &protocol.AgentMetricList{Metrics: []*protocol.AgentMetric{droppedMetric}}, ) @@ -754,7 +754,24 @@ func (bot *botClient) doHealthCheck(ctx context.Context, lg *log.Entry) bool { var err error if botConfig.ProtocolVersion >= 2 { - err = bot.clientV2.Health(ctx) + var botMetrics []bothttp.Metrics + botMetrics, err = bot.clientV2.Health(ctx) + + if len(botMetrics) != 0 { + agentMetrics := make([]*protocol.AgentMetric, 0, len(botMetrics)) + for _, botMetric := range botMetrics { + for metricName, metricValues := range botMetric.DataPoints { + for _, metricValue := range metricValues { + agentMetrics = append(agentMetrics, metrics.CreateAgentMetricV2(botConfig, metricName, metricValue, botMetric.ChainID)) + } + } + } + + bot.msgClient.PublishProto( + messaging.SubjectMetricAgent, + &protocol.AgentMetricList{Metrics: agentMetrics}, + ) + } } else { err = botClient.DoHealthCheck(ctx) } diff --git a/services/components/botio/bot_client_test.go b/services/components/botio/bot_client_test.go index e651a5db..02df30d6 100644 --- a/services/components/botio/bot_client_test.go +++ b/services/components/botio/bot_client_test.go @@ -96,15 +96,20 @@ func (s *BotClientSuite) TestStartProcessStop() { s.lifecycleMetrics.EXPECT().ActionSubscribe(combinerSubscriptions) // test health checks + healthCheckChan := make(chan interface{}) HealthCheckInterval = time.Second - s.botGrpc.EXPECT().DoHealthCheck(gomock.Any()).MinTimes(1) + s.botGrpc.EXPECT().DoHealthCheck(gomock.Any()).Return(nil).MinTimes(1) s.lifecycleMetrics.EXPECT().HealthCheckAttempt(s.botClient.configUnsafe).MinTimes(1) - s.lifecycleMetrics.EXPECT().HealthCheckSuccess(s.botClient.configUnsafe).MinTimes(1) + s.lifecycleMetrics.EXPECT().HealthCheckSuccess(s.botClient.configUnsafe).Do(func(_ interface{}) { + close(healthCheckChan) + }).MinTimes(1) s.msgClient.EXPECT().Publish(messaging.SubjectAgentsAlertSubscribe, combinerSubscriptions) s.botClient.StartProcessing() s.botClient.Initialize() + <-healthCheckChan + <-s.botClient.Initialized() txReq := &protocol.EvaluateTxRequest{ @@ -193,8 +198,6 @@ func (s *BotClientSuite) TestStartProcessStop() { s.lifecycleMetrics.EXPECT().ActionUnsubscribe(combinerSubscriptions) s.r.NoError(s.botClient.Close()) - // Using small sleep to allow goroutines to be executed (e.g. health check) - time.Sleep(30 * time.Millisecond) } func (s *BotClientSuite) TestCombinerBotSubscriptions() { diff --git a/services/components/botio/sender.go b/services/components/botio/sender.go index 337214f6..f4ad1e2f 100644 --- a/services/components/botio/sender.go +++ b/services/components/botio/sender.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/forta-network/forta-core-go/clients/health" + "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-core-go/protocol" "github.com/forta-network/forta-node/clients" "github.com/forta-network/forta-node/clients/messaging" @@ -115,7 +116,7 @@ func (rs *requestSender) SendEvaluateTxRequest(req *protocol.EvaluateTxRequest) }: default: // do not try to send if the buffer is full lg.WithField("bot", botConfig.ID).Debug("agent tx request buffer is full - skipping") - metricsList = append(metricsList, metrics.CreateAgentMetric(botConfig, metrics.MetricTxDrop, 1)) + metricsList = append(metricsList, metrics.CreateAgentMetricV1(botConfig, domain.MetricTxDrop, 1)) } lg.WithFields(log.Fields{ "bot": botConfig.ID, @@ -164,7 +165,7 @@ func (rs *requestSender) SendEvaluateBlockRequest(req *protocol.EvaluateBlockReq }: default: // do not try to send if the buffer is full lg.WithField("bot", botConfig.ID).Warn("agent block request buffer is full - skipping") - metricsList = append(metricsList, metrics.CreateAgentMetric(botConfig, metrics.MetricBlockDrop, 1)) + metricsList = append(metricsList, metrics.CreateAgentMetricV1(botConfig, domain.MetricBlockDrop, 1)) } lg.WithFields( log.Fields{ @@ -249,7 +250,7 @@ func (rs *requestSender) SendEvaluateAlertRequest(req *protocol.EvaluateAlertReq }: default: // do not try to send if the buffer is full lg.WithField("bot", botConfig.ID).Warn("agent alert request buffer is full - skipping") - metricsList = append(metricsList, metrics.CreateAgentMetric(botConfig, metrics.MetricCombinerDrop, 1)) + metricsList = append(metricsList, metrics.CreateAgentMetricV1(botConfig, domain.MetricCombinerDrop, 1)) } lg.WithFields( diff --git a/services/components/lifecycle/bot_manager.go b/services/components/lifecycle/bot_manager.go index 59e54f0e..ecc268ba 100644 --- a/services/components/lifecycle/bot_manager.go +++ b/services/components/lifecycle/bot_manager.go @@ -208,9 +208,7 @@ func (blm *botLifecycleManager) ExitInactiveBots(ctx context.Context) error { botConfig, found := blm.findBotConfigByID(inactiveBotID) logger := log.WithField("bot", inactiveBotID) if !found { - logger.Warn("could not find the config for inactive bot - skipping stop") - // send this metric by ID, because it doesn't have a shard ID (since it's not found) - blm.lifecycleMetrics.StatusInactive(config.AgentConfig{ID: inactiveBotID}) + logger.Warn("could not find the config for inactive bot - skipping stop", inactiveBotID) continue } diff --git a/services/components/lifecycle/bot_monitor.go b/services/components/lifecycle/bot_monitor.go index 0b860912..a620f27c 100644 --- a/services/components/lifecycle/bot_monitor.go +++ b/services/components/lifecycle/bot_monitor.go @@ -3,6 +3,7 @@ package lifecycle import ( "sync" + "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-core-go/protocol" "github.com/forta-network/forta-node/services/components/metrics" ) @@ -49,7 +50,7 @@ func (bm *botMonitor) UpdateWithMetrics(botMetrics *protocol.AgentMetricList) er } for _, botMetric := range botMetrics.Metrics { - if botMetric.Name == metrics.MetricStatusActive { + if botMetric.Name == domain.MetricStatusActive { bm.saveBotActivity(botMetric.AgentId) } } diff --git a/services/components/lifecycle/bot_monitor_test.go b/services/components/lifecycle/bot_monitor_test.go index 41538232..f82d74e5 100644 --- a/services/components/lifecycle/bot_monitor_test.go +++ b/services/components/lifecycle/bot_monitor_test.go @@ -4,8 +4,8 @@ import ( "testing" "time" + "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-core-go/protocol" - "github.com/forta-network/forta-node/services/components/metrics" mock_metrics "github.com/forta-network/forta-node/services/components/metrics/mocks" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -51,7 +51,7 @@ func TestBotMonitor(t *testing.T) { r.NoError(botMonitor.UpdateWithMetrics(&protocol.AgentMetricList{ Metrics: []*protocol.AgentMetric{ { - Name: metrics.MetricStatusActive, + Name: domain.MetricStatusActive, AgentId: testTrackerBotID2, }, }, diff --git a/services/components/metrics/activity.go b/services/components/metrics/activity.go index 1b05f7dc..99cd4376 100644 --- a/services/components/metrics/activity.go +++ b/services/components/metrics/activity.go @@ -1,6 +1,7 @@ package metrics import ( + "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-core-go/protocol" "github.com/forta-network/forta-node/config" ) @@ -10,9 +11,9 @@ func FindActiveBotsFromMetrics(allBotMetrics []*protocol.AgentMetrics) (found [] for _, botMetrics := range allBotMetrics { botID := botMetrics.AgentId for _, botMetric := range botMetrics.Metrics { - if botMetric.Name == MetricHealthCheckSuccess { + if botMetric.Name == domain.MetricHealthCheckSuccess { // copy over shardID value so metric will indicate shard - cfg := &config.AgentConfig{ID: botID} + cfg := &config.AgentConfig{ID: botID, ChainID: int(botMetric.ChainId)} if botMetric.ShardId >= 0 { cfg.ShardConfig = &config.ShardConfig{ShardID: uint(botMetric.ShardId)} } diff --git a/services/components/metrics/lifecycle.go b/services/components/metrics/lifecycle.go index a21255c8..db41e4b4 100644 --- a/services/components/metrics/lifecycle.go +++ b/services/components/metrics/lifecycle.go @@ -11,37 +11,6 @@ import ( "github.com/forta-network/forta-node/config" ) -// Bot lifecycle metrics -const ( - MetricClientDial = "agent.client.dial" - MetricClientClose = "agent.client.close" - - MetricStatusRunning = "agent.status.running" - MetricStatusAttached = "agent.status.attached" - MetricStatusInitialized = "agent.status.initialized" - MetricStatusStopping = "agent.status.stopping" - MetricStatusActive = "agent.status.active" - MetricStatusInactive = "agent.status.inactive" - - MetricActionUpdate = "agent.action.update" - MetricActionRestart = "agent.action.restart" - MetricActionSubscribe = "agent.action.subscribe" - MetricActionUnsubscribe = "agent.action.unsubscribe" - - MetricFailurePull = "agent.failure.pull" - MetricFailureLaunch = "agent.failure.launch" - MetricFailureStop = "agent.failure.stop" - MetricFailureDial = "agent.failure.dial" - MetricFailureInitialize = "agent.failure.initialize" - MetricFailureInitializeResponse = "agent.failure.initialize.response" - MetricFailureInitializeValidate = "agent.failure.initialize.validate" - MetricFailureTooManyErrs = "agent.failure.too-many-errs" - - MetricHealthCheckAttempt = "agent.health.attempt" - MetricHealthCheckSuccess = "agent.health.success" - MetricHealthCheckError = "agent.health.error" -) - // Lifecycle creates lifecycle metrics. It is useful in // understanding what is going on during lifecycle management. type Lifecycle interface { @@ -91,83 +60,83 @@ func NewLifecycleClient(msgClient clients.MessageClient) Lifecycle { } func (lc *lifecycle) ClientDial(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricClientDial, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricClientDial, "", botConfigs)) } func (lc *lifecycle) ClientClose(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricClientClose, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricClientClose, "", botConfigs)) } func (lc *lifecycle) StatusRunning(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricStatusRunning, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricStatusRunning, "", botConfigs)) } func (lc *lifecycle) StatusAttached(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricStatusAttached, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricStatusAttached, "", botConfigs)) } func (lc *lifecycle) StatusInitialized(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricStatusInitialized, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricStatusInitialized, "", botConfigs)) } func (lc *lifecycle) StatusStopping(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricStatusStopping, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricStatusStopping, "", botConfigs)) } func (lc *lifecycle) StatusActive(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricStatusActive, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricStatusActive, "", botConfigs)) } func (lc *lifecycle) StatusInactive(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricStatusInactive, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricStatusInactive, "", botConfigs)) } func (lc *lifecycle) ActionUpdate(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricActionUpdate, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricActionUpdate, "", botConfigs)) } func (lc *lifecycle) ActionRestart(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricActionRestart, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricActionRestart, "", botConfigs)) } func (lc *lifecycle) ActionSubscribe(subscriptions []domain.CombinerBotSubscription) { - SendAgentMetrics(lc.msgClient, fromBotSubscriptions(MetricActionSubscribe, subscriptions)) + SendAgentMetrics(lc.msgClient, fromBotSubscriptions(domain.MetricActionSubscribe, subscriptions)) } func (lc *lifecycle) ActionUnsubscribe(subscriptions []domain.CombinerBotSubscription) { - SendAgentMetrics(lc.msgClient, fromBotSubscriptions(MetricActionUnsubscribe, subscriptions)) + SendAgentMetrics(lc.msgClient, fromBotSubscriptions(domain.MetricActionUnsubscribe, subscriptions)) } func (lc *lifecycle) FailurePull(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailurePull, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailurePull, err.Error(), botConfigs)) } func (lc *lifecycle) FailureLaunch(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureLaunch, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureLaunch, err.Error(), botConfigs)) } func (lc *lifecycle) FailureStop(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureStop, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureStop, err.Error(), botConfigs)) } func (lc *lifecycle) FailureDial(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureDial, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureDial, err.Error(), botConfigs)) } func (lc *lifecycle) FailureInitialize(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureInitialize, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureInitialize, err.Error(), botConfigs)) } func (lc *lifecycle) FailureInitializeResponse(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureInitializeResponse, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureInitializeResponse, err.Error(), botConfigs)) } func (lc *lifecycle) FailureInitializeValidate(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureInitializeValidate, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureInitializeValidate, err.Error(), botConfigs)) } func (lc *lifecycle) FailureTooManyErrs(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureTooManyErrs, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureTooManyErrs, err.Error(), botConfigs)) } func (lc *lifecycle) BotError(metricName string, err error, botConfigs ...config.AgentConfig) { @@ -183,20 +152,20 @@ func (lc *lifecycle) SystemStatus(metricName string, details string) { } func (lc *lifecycle) HealthCheckAttempt(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricHealthCheckAttempt, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricHealthCheckAttempt, "", botConfigs)) } func (lc *lifecycle) HealthCheckSuccess(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricHealthCheckSuccess, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricHealthCheckSuccess, "", botConfigs)) } func (lc *lifecycle) HealthCheckError(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricHealthCheckError, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricHealthCheckError, err.Error(), botConfigs)) } func fromBotSubscriptions(action string, subscriptions []domain.CombinerBotSubscription) (metrics []*protocol.AgentMetric) { for _, botSub := range subscriptions { - metrics = append(metrics, CreateAgentMetric(config.AgentConfig{ID: botSub.Subscriber.BotID}, action, 1)) + metrics = append(metrics, CreateAgentMetricV1(config.AgentConfig{ID: botSub.Subscriber.BotID}, action, 1)) } return } @@ -211,6 +180,7 @@ func fromBotConfigs(metricName string, details string, botConfigs []config.Agent Details: details, Value: 1, ShardId: botConfig.ShardID(), + ChainId: int64(botConfig.ChainID), } metrics = append(metrics, metric) } diff --git a/services/components/metrics/metrics.go b/services/components/metrics/metrics.go index 6b46102f..fa0e540f 100644 --- a/services/components/metrics/metrics.go +++ b/services/components/metrics/metrics.go @@ -12,39 +12,6 @@ import ( "github.com/forta-network/forta-node/config" ) -const ( - MetricFinding = "finding" - MetricTxRequest = "tx.request" - MetricTxLatency = "tx.latency" - MetricTxError = "tx.error" - MetricTxSuccess = "tx.success" - MetricTxDrop = "tx.drop" - MetricTxBlockAge = "tx.block.age" - MetricTxEventAge = "tx.event.age" - MetricBlockBlockAge = "block.block.age" - MetricBlockEventAge = "block.event.age" - MetricBlockRequest = "block.request" - MetricBlockLatency = "block.latency" - MetricBlockError = "block.error" - MetricBlockSuccess = "block.success" - MetricBlockDrop = "block.drop" - - MetricJSONRPCLatency = "jsonrpc.latency" - MetricJSONRPCRequest = "jsonrpc.request" - MetricJSONRPCSuccess = "jsonrpc.success" - MetricJSONRPCThrottled = "jsonrpc.throttled" - MetricPublicAPIProxyLatency = "publicapi.latency" - MetricPublicAPIProxyRequest = "publicapi.request" - MetricPublicAPIProxySuccess = "publicapi.success" - MetricPublicAPIProxyThrottled = "publicapi.throttled" - MetricFindingsDropped = "findings.dropped" - MetricCombinerRequest = "combiner.request" - MetricCombinerLatency = "combiner.latency" - MetricCombinerError = "combiner.error" - MetricCombinerSuccess = "combiner.success" - MetricCombinerDrop = "combiner.drop" -) - func SendAgentMetrics(client clients.MessageClient, ms []*protocol.AgentMetric) { if len(ms) > 0 { client.PublishProto(messaging.SubjectMetricAgent, &protocol.AgentMetricList{ @@ -53,13 +20,25 @@ func SendAgentMetrics(client clients.MessageClient, ms []*protocol.AgentMetric) } } -func CreateAgentMetric(agt config.AgentConfig, metric string, value float64) *protocol.AgentMetric { +func CreateAgentMetricV1(agt config.AgentConfig, metric string, value float64) *protocol.AgentMetric { + return &protocol.AgentMetric{ + AgentId: agt.ID, + Timestamp: time.Now().Format(time.RFC3339), + Name: metric, + Value: value, + ShardId: agt.ShardID(), + ChainId: int64(agt.ChainID), + } +} + +func CreateAgentMetricV2(agt config.AgentConfig, metric string, value float64, chainID int64) *protocol.AgentMetric { return &protocol.AgentMetric{ AgentId: agt.ID, Timestamp: time.Now().Format(time.RFC3339), Name: metric, Value: value, ShardId: agt.ShardID(), + ChainId: chainID, } } @@ -93,6 +72,7 @@ func createMetrics(agt config.AgentConfig, timestamp string, metricMap map[strin Name: name, Value: value, ShardId: agt.ShardID(), + ChainId: int64(agt.ChainID), }) } return res @@ -105,16 +85,16 @@ func durationMs(from time.Time, to time.Time) float64 { func GetBlockMetrics(agt config.AgentConfig, resp *protocol.EvaluateBlockResponse, times *domain.TrackingTimestamps) []*protocol.AgentMetric { metrics := make(map[string]float64) - metrics[MetricBlockRequest] = 1 - metrics[MetricFinding] = float64(len(resp.Findings)) - metrics[MetricBlockLatency] = float64(resp.LatencyMs) - metrics[MetricBlockBlockAge] = durationMs(times.Block, times.BotRequest) - metrics[MetricBlockEventAge] = durationMs(times.Feed, times.BotRequest) + metrics[domain.MetricBlockRequest] = 1 + metrics[domain.MetricFinding] = float64(len(resp.Findings)) + metrics[domain.MetricBlockLatency] = float64(resp.LatencyMs) + metrics[domain.MetricBlockBlockAge] = durationMs(times.Block, times.BotRequest) + metrics[domain.MetricBlockEventAge] = durationMs(times.Feed, times.BotRequest) if resp.Status == protocol.ResponseStatus_ERROR { - metrics[MetricBlockError] = 1 + metrics[domain.MetricBlockError] = 1 } else if resp.Status == protocol.ResponseStatus_SUCCESS { - metrics[MetricBlockSuccess] = 1 + metrics[domain.MetricBlockSuccess] = 1 } return createMetrics(agt, resp.Timestamp, metrics) @@ -123,16 +103,16 @@ func GetBlockMetrics(agt config.AgentConfig, resp *protocol.EvaluateBlockRespons func GetTxMetrics(agt config.AgentConfig, resp *protocol.EvaluateTxResponse, times *domain.TrackingTimestamps) []*protocol.AgentMetric { metrics := make(map[string]float64) - metrics[MetricTxRequest] = 1 - metrics[MetricFinding] = float64(len(resp.Findings)) - metrics[MetricTxLatency] = float64(resp.LatencyMs) - metrics[MetricTxBlockAge] = durationMs(times.Block, times.BotRequest) - metrics[MetricTxEventAge] = durationMs(times.Feed, times.BotRequest) + metrics[domain.MetricTxRequest] = 1 + metrics[domain.MetricFinding] = float64(len(resp.Findings)) + metrics[domain.MetricTxLatency] = float64(resp.LatencyMs) + metrics[domain.MetricTxBlockAge] = durationMs(times.Block, times.BotRequest) + metrics[domain.MetricTxEventAge] = durationMs(times.Feed, times.BotRequest) if resp.Status == protocol.ResponseStatus_ERROR { - metrics[MetricTxError] = 1 + metrics[domain.MetricTxError] = 1 } else if resp.Status == protocol.ResponseStatus_SUCCESS { - metrics[MetricTxSuccess] = 1 + metrics[domain.MetricTxSuccess] = 1 } return createMetrics(agt, resp.Timestamp, metrics) @@ -141,14 +121,14 @@ func GetTxMetrics(agt config.AgentConfig, resp *protocol.EvaluateTxResponse, tim func GetCombinerMetrics(agt config.AgentConfig, resp *protocol.EvaluateAlertResponse, times *domain.TrackingTimestamps) []*protocol.AgentMetric { metrics := make(map[string]float64) - metrics[MetricCombinerRequest] = 1 - metrics[MetricFinding] = float64(len(resp.Findings)) - metrics[MetricCombinerLatency] = float64(resp.LatencyMs) + metrics[domain.MetricCombinerRequest] = 1 + metrics[domain.MetricFinding] = float64(len(resp.Findings)) + metrics[domain.MetricCombinerLatency] = float64(resp.LatencyMs) if resp.Status == protocol.ResponseStatus_ERROR { - metrics[MetricCombinerError] = 1 + metrics[domain.MetricCombinerError] = 1 } else if resp.Status == protocol.ResponseStatus_SUCCESS { - metrics[MetricCombinerSuccess] = 1 + metrics[domain.MetricCombinerSuccess] = 1 } return createMetrics(agt, resp.Timestamp, metrics) @@ -157,15 +137,15 @@ func GetCombinerMetrics(agt config.AgentConfig, resp *protocol.EvaluateAlertResp func GetJSONRPCMetrics(agt config.AgentConfig, at time.Time, success, throttled int, latencyMs time.Duration, method string) []*protocol.AgentMetric { values := make(map[string]float64) if latencyMs > 0 { - values[MetricJSONRPCLatency] = float64(latencyMs.Milliseconds()) + values[domain.MetricJSONRPCLatency] = float64(latencyMs.Milliseconds()) } if success > 0 { - values[MetricJSONRPCSuccess] = float64(success) - values[MetricJSONRPCRequest] += float64(success) + values[domain.MetricJSONRPCSuccess] = float64(success) + values[domain.MetricJSONRPCRequest] += float64(success) } if throttled > 0 { - values[MetricJSONRPCThrottled] = float64(throttled) - values[MetricJSONRPCRequest] += float64(throttled) + values[domain.MetricJSONRPCThrottled] = float64(throttled) + values[domain.MetricJSONRPCRequest] += float64(throttled) } return createJsonRpcMetrics(agt, at.Format(time.RFC3339), values, method) } @@ -180,23 +160,24 @@ func createJsonRpcMetrics(agt config.AgentConfig, timestamp string, metricMap ma Name: fmt.Sprintf("%s.%s", name, method), Value: value, ShardId: agt.ShardID(), + ChainId: int64(agt.ChainID), }) } return res } -func GetPublicAPIMetrics(botID string, at time.Time, success, throttled int, latencyMs time.Duration) []*protocol.AgentMetric { +func GetPublicAPIMetrics(botID string, at time.Time, success, throttled int, latency time.Duration) []*protocol.AgentMetric { values := make(map[string]float64) - if latencyMs > 0 { - values[MetricPublicAPIProxyLatency] = float64(latencyMs.Milliseconds()) + if latency > 0 { + values[domain.MetricPublicAPIProxyLatency] = float64(latency.Milliseconds()) } if success > 0 { - values[MetricPublicAPIProxySuccess] = float64(success) - values[MetricPublicAPIProxyRequest] += float64(success) + values[domain.MetricPublicAPIProxySuccess] = float64(success) + values[domain.MetricPublicAPIProxyRequest] += float64(success) } if throttled > 0 { - values[MetricPublicAPIProxyThrottled] = float64(throttled) - values[MetricPublicAPIProxyRequest] += float64(throttled) + values[domain.MetricPublicAPIProxyThrottled] = float64(throttled) + values[domain.MetricPublicAPIProxyRequest] += float64(throttled) } //TODO: get the shardID into this eventually return createMetrics(config.AgentConfig{ID: botID}, at.Format(time.RFC3339), values) diff --git a/services/publisher/metrics.go b/services/publisher/metrics.go index cd24219e..797b15a7 100644 --- a/services/publisher/metrics.go +++ b/services/publisher/metrics.go @@ -12,19 +12,25 @@ import ( // AgentMetricsAggregator aggregates agents' metrics and produces a list of summary of them when flushed. type AgentMetricsAggregator struct { - buckets []*metricsBucket - bucketInterval time.Duration - lastFlush time.Time - mu sync.RWMutex + // int64 is a chain id + bucketsByChainID map[int64][]*metricsBucket + chainID int64 + bucketInterval time.Duration + lastFlush time.Time + mu sync.RWMutex } type metricsBucket struct { - Time time.Time - MetricCounters map[string][]uint32 - MetricDetails map[string]string + Time time.Time + MetricsData map[string]metricsData protocol.AgentMetrics } +type metricsData struct { + Counters []uint32 + Details string +} + func (mb *metricsBucket) CreateAndGetSummary(name string) *protocol.MetricSummary { for _, summary := range mb.Metrics { if summary.Name == name { @@ -37,17 +43,24 @@ func (mb *metricsBucket) CreateAndGetSummary(name string) *protocol.MetricSummar } // NewAgentMetricsAggregator creates a new agent metrics aggregator. -func NewMetricsAggregator(bucketInterval time.Duration) *AgentMetricsAggregator { +func NewMetricsAggregator(bucketInterval time.Duration, chainID int64) *AgentMetricsAggregator { return &AgentMetricsAggregator{ - mu: sync.RWMutex{}, - bucketInterval: bucketInterval, - lastFlush: time.Now(), // avoid flushing immediately + mu: sync.RWMutex{}, + bucketInterval: bucketInterval, + lastFlush: time.Now(), // avoid flushing immediately + bucketsByChainID: make(map[int64][]*metricsBucket), + chainID: chainID, } } -func (ama *AgentMetricsAggregator) findBucket(agentID string, t time.Time) *metricsBucket { +func (ama *AgentMetricsAggregator) findBucket(agentID string, chainID int64, t time.Time) *metricsBucket { bucketTime := ama.FindClosestBucketTime(t) - for _, bucket := range ama.buckets { + buckets, ok := ama.bucketsByChainID[chainID] + if !ok { + ama.bucketsByChainID[chainID] = make([]*metricsBucket, 0) + } + + for _, bucket := range buckets { if bucket.AgentId != agentID { continue } @@ -56,14 +69,15 @@ func (ama *AgentMetricsAggregator) findBucket(agentID string, t time.Time) *metr } return bucket } + bucket := &metricsBucket{ - Time: bucketTime, - MetricCounters: make(map[string][]uint32), - MetricDetails: make(map[string]string), + Time: bucketTime, + MetricsData: make(map[string]metricsData), } + bucket.AgentId = agentID bucket.Timestamp = utils.FormatTime(bucketTime) - ama.buckets = append(ama.buckets, bucket) + ama.bucketsByChainID[chainID] = append(ama.bucketsByChainID[chainID], bucket) return bucket } @@ -75,20 +89,26 @@ func (ama *AgentMetricsAggregator) FindClosestBucketTime(t time.Time) time.Time return time.Unix(0, ts-rem) } -type agentResponse protocol.EvaluateTxResponse - func (ama *AgentMetricsAggregator) AddAgentMetrics(ms *protocol.AgentMetricList) error { ama.mu.Lock() defer ama.mu.Unlock() for _, m := range ms.Metrics { t, _ := time.Parse(time.RFC3339, m.Timestamp) - bucket := ama.findBucket(m.AgentId, t) - bucket.MetricCounters[m.Name] = append(bucket.MetricCounters[m.Name], uint32(m.Value)) - if m.Details != "" { - bucket.MetricDetails[m.Name] = m.Details + + // add chain id if it's not set, e.g. EventMetric + chainID := ama.chainID + if m.ChainId != 0 { + chainID = m.ChainId + } + + bucket := ama.findBucket(m.AgentId, chainID, t) + bucket.MetricsData[m.Name] = metricsData{ + Counters: append(bucket.MetricsData[m.Name].Counters, uint32(m.Value)), + Details: m.Details, } } + return nil } @@ -100,14 +120,16 @@ func (ama *AgentMetricsAggregator) ForceFlush() []*protocol.AgentMetrics { now := time.Now() ama.lastFlush = now - buckets := ama.buckets - ama.buckets = nil + buckets := ama.bucketsByChainID + ama.bucketsByChainID = make(map[int64][]*metricsBucket) (allAgentMetrics)(buckets).Fix() var allMetrics []*protocol.AgentMetrics - for _, bucket := range buckets { - allMetrics = append(allMetrics, &bucket.AgentMetrics) + for _, metricsBuckets := range buckets { + for _, bucket := range metricsBuckets { + allMetrics = append(allMetrics, &bucket.AgentMetrics) + } } return allMetrics @@ -124,14 +146,16 @@ func (ama *AgentMetricsAggregator) TryFlush() ([]*protocol.AgentMetrics, bool) { } ama.lastFlush = now - buckets := ama.buckets - ama.buckets = nil + buckets := ama.bucketsByChainID + ama.bucketsByChainID = make(map[int64][]*metricsBucket) (allAgentMetrics)(buckets).Fix() var allMetrics []*protocol.AgentMetrics - for _, bucket := range buckets { - allMetrics = append(allMetrics, &bucket.AgentMetrics) + for _, metricsBuckets := range buckets { + for _, bucket := range metricsBuckets { + allMetrics = append(allMetrics, &bucket.AgentMetrics) + } } return allMetrics, true @@ -139,27 +163,30 @@ func (ama *AgentMetricsAggregator) TryFlush() ([]*protocol.AgentMetrics, bool) { // allAgentMetrics is an alias type for post-processing aggregated in-memory metrics // before we publish them. -type allAgentMetrics []*metricsBucket +type allAgentMetrics map[int64][]*metricsBucket func (allMetrics allAgentMetrics) Fix() { - sort.Slice(allMetrics, func(i, j int) bool { - return allMetrics[i].Time.Before(allMetrics[j].Time) - }) + for _, metricsBuckets := range allMetrics { + sort.Slice(metricsBuckets, func(i, j int) bool { + return metricsBuckets[i].Time.Before(metricsBuckets[j].Time) + }) + } allMetrics.PrepareMetrics() } func (allMetrics allAgentMetrics) PrepareMetrics() { - for _, agentMetrics := range allMetrics { - for metricName, list := range agentMetrics.MetricCounters { - if len(list) > 0 { - summary := agentMetrics.CreateAndGetSummary(metricName) - summary.Count = uint32(len(list)) - summary.Average = avgMetricArray(list) - summary.Max = maxDataPoint(list) - summary.P95 = calcP95(list) - summary.Sum = sumNums(list) - if details, ok := agentMetrics.MetricDetails[metricName]; ok { - summary.Details = details + for chainID, metricsBuckets := range allMetrics { + for _, agentMetrics := range metricsBuckets { + for metricName, data := range agentMetrics.MetricsData { + if len(data.Counters) > 0 { + summary := agentMetrics.CreateAndGetSummary(metricName) + summary.Count = uint32(len(data.Counters)) + summary.Average = avgMetricArray(data.Counters) + summary.Max = maxDataPoint(data.Counters) + summary.P95 = calcP95(data.Counters) + summary.Sum = sumNums(data.Counters) + summary.Details = data.Details + summary.ChainId = chainID } } } diff --git a/services/publisher/metrics_test.go b/services/publisher/metrics_test.go index 40643815..3b8b8284 100644 --- a/services/publisher/metrics_test.go +++ b/services/publisher/metrics_test.go @@ -23,7 +23,6 @@ type MetricsMathTest struct { } func TestAgentMetricsAggregator_math(t *testing.T) { - tests := []*MetricsMathTest{ { metrics: []float64{1, 2, 3, 4, 5}, @@ -34,6 +33,7 @@ func TestAgentMetricsAggregator_math(t *testing.T) { Average: 3, Sum: 15, P95: 4, + ChainId: 100, }, }, { @@ -45,6 +45,7 @@ func TestAgentMetricsAggregator_math(t *testing.T) { Average: 15, Sum: 45, P95: 10, + ChainId: 100, }, }, { @@ -56,6 +57,7 @@ func TestAgentMetricsAggregator_math(t *testing.T) { Average: 45, Sum: 45, P95: 45, + ChainId: 100, }, }, { @@ -69,6 +71,7 @@ func TestAgentMetricsAggregator_math(t *testing.T) { Average: 45, Sum: 45, P95: 45, + ChainId: 100, }, }, } @@ -87,7 +90,7 @@ func TestAgentMetricsAggregator_math(t *testing.T) { }) } - aggregator := publisher.NewMetricsAggregator(testBucketInterval) + aggregator := publisher.NewMetricsAggregator(testBucketInterval, 100) err := aggregator.AddAgentMetrics(&protocol.AgentMetricList{Metrics: metrics}) assert.NoError(t, err) time.Sleep(testBucketInterval * 2) diff --git a/services/publisher/publisher.go b/services/publisher/publisher.go index 91cbb076..a3fe463c 100644 --- a/services/publisher/publisher.go +++ b/services/publisher/publisher.go @@ -930,7 +930,7 @@ func initPublisher( cfg: cfg, ipfs: ipfsClient, storage: storageClient, - metricsAggregator: NewMetricsAggregator(time.Duration(*cfg.PublisherConfig.Batch.MetricsBucketIntervalSeconds) * time.Second), + metricsAggregator: NewMetricsAggregator(time.Duration(*cfg.PublisherConfig.Batch.MetricsBucketIntervalSeconds)*time.Second, int64(cfg.ChainID)), messageClient: mc, alertClient: alertClient, localAlertClient: localAlertClient, diff --git a/tests/e2e/.ipfs/config b/tests/e2e/.ipfs/config index 679d27ff..c6743a60 100644 --- a/tests/e2e/.ipfs/config +++ b/tests/e2e/.ipfs/config @@ -150,4 +150,4 @@ "RemoteServices": {} }, "Internal": {} -} \ No newline at end of file +} diff --git a/tests/e2e/e2e_forta_local_mode_test.go b/tests/e2e/e2e_forta_local_mode_test.go index 3df5a6c9..e2ffd535 100644 --- a/tests/e2e/e2e_forta_local_mode_test.go +++ b/tests/e2e/e2e_forta_local_mode_test.go @@ -10,8 +10,8 @@ import ( "time" "github.com/forta-network/forta-core-go/clients/webhook/client/models" + "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-core-go/security" - "github.com/forta-network/forta-node/services/components/metrics" "github.com/forta-network/forta-node/tests/e2e" "github.com/forta-network/forta-node/tests/e2e/agents/combinerbot/combinerbotalertid" "github.com/forta-network/forta-node/tests/e2e/agents/txdetectoragent/testbotalertid" @@ -279,7 +279,7 @@ func (s *Suite) runLocalModeAlertHandler(webhookURL, logFileName string, readAle if strings.Contains(summary.Name, "agent.health") { s.T().Logf("contains metric: %s", summary.Name) } - if summary.Name == metrics.MetricHealthCheckSuccess { + if summary.Name == domain.MetricHealthCheckSuccess { healthCheckMetric = summary break }