From 13b0bd798e929d69d229fc3adfec22b9acbf8132 Mon Sep 17 00:00:00 2001 From: dkeysil Date: Thu, 18 Jan 2024 14:48:23 +0100 Subject: [PATCH 1/3] Imported new version of forta-core-go, supported passing chain id in AgentMetrics, supported metrics in the health request Datapoints int64 -> float64 Bump forta-core-go to use int64 for chain id, add chain id to metrics that doesn't have it Add metrics to bothttp/client_test Add check for exceeded response size Remove comment Add chain id from bot config in lifecycle metrics Remove unnecessary metric Pass chain id from alert config, remove todo, fix disco configuration Add TODO Move chain id fallback to metrics aggregator Use forta-core-go metrics names --- clients/bothttp/client.go | 48 +++-- clients/bothttp/client_test.go | 35 +++- go.mod | 2 +- go.sum | 4 +- services/components/botio/bot_client.go | 27 ++- services/components/botio/sender.go | 7 +- services/components/lifecycle/bot_manager.go | 4 +- services/components/lifecycle/bot_monitor.go | 3 +- .../components/lifecycle/bot_monitor_test.go | 4 +- services/components/metrics/activity.go | 5 +- services/components/metrics/lifecycle.go | 80 +++----- services/components/metrics/metrics.go | 111 +++++------ services/publisher/metrics.go | 119 +++++++----- services/publisher/metrics_test.go | 7 +- services/publisher/publisher.go | 2 +- tests/e2e/.ipfs/config | 176 ++++++++---------- tests/e2e/.ipfs/version | 2 +- tests/e2e/disco.config.yml | 4 +- tests/e2e/e2e_forta_local_mode_test.go | 4 +- 19 files changed, 342 insertions(+), 302 deletions(-) diff --git a/clients/bothttp/client.go b/clients/bothttp/client.go index 7adfd893..55be9af6 100644 --- a/clients/bothttp/client.go +++ b/clients/bothttp/client.go @@ -5,19 +5,34 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" + "strings" "github.com/forta-network/forta-core-go/utils/httpclient" "github.com/hashicorp/go-multierror" + log "github.com/sirupsen/logrus" +) + +var ( + // responseSizeLimit is the maximum number of bytes to read from the response body. + responseSizeLimit = int64(2 << 20) // 2MB ) type HealthResponse struct { - Errors []string `json:"errors"` + Errors []string `json:"errors"` + Metrics []Metrics `json:"metrics"` +} + +type Metrics struct { + // ChainID is the id of the chain the metrics are for + ChainID int64 `json:"chainId"` + DataPoints map[string][]float64 `json:"dataPoints"` } // Client is the bot HTTP client interface. type Client interface { - Health(ctx context.Context) error + Health(ctx context.Context) ([]Metrics, error) } type botClient struct { @@ -25,7 +40,7 @@ type botClient struct { httpClient *http.Client } -// NewClient creates anew client. +// NewClient creates a new client. func NewClient(host string, port int) Client { return &botClient{ baseUrl: fmt.Sprintf("http://%s:%d", host, port), @@ -34,31 +49,42 @@ func NewClient(host string, port int) Client { } // Health does a health check on the bot. -func (bc *botClient) Health(ctx context.Context) error { +func (bc *botClient) Health(ctx context.Context) ([]Metrics, error) { healthUrl := fmt.Sprintf("%s/health", bc.baseUrl) req, err := http.NewRequestWithContext(ctx, "GET", healthUrl, nil) if err != nil { - return err + return nil, err } + resp, err := bc.httpClient.Do(req) if err != nil { - return err + return nil, err } defer resp.Body.Close() if resp.StatusCode > 200 { - return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) } var healthResp HealthResponse - if err := json.NewDecoder(resp.Body).Decode(&healthResp); err != nil { - return nil // ignore decoding errors + + // Limit the response size to a certain number of bytes + limitedReader := io.LimitReader(resp.Body, responseSizeLimit) + if err := json.NewDecoder(limitedReader).Decode(&healthResp); err != nil { + if strings.Contains(err.Error(), "EOF") { + log.WithError(err).Warn("response size limit for health check is reached") + } + + return nil, nil // ignore decoding errors } + if len(healthResp.Errors) == 0 { - return nil + return healthResp.Metrics, nil } + for _, errMsg := range healthResp.Errors { err = multierror.Append(err, errors.New(errMsg)) } - return err + + return healthResp.Metrics, err } diff --git a/clients/bothttp/client_test.go b/clients/bothttp/client_test.go index d2f6fe81..cdec5181 100644 --- a/clients/bothttp/client_test.go +++ b/clients/bothttp/client_test.go @@ -6,6 +6,7 @@ import ( "net/http" "testing" + "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/require" ) @@ -28,12 +29,42 @@ func TestHealth(t *testing.T) { go server.ListenAndServe() client := NewClient("localhost", 8183) - err := client.Health(context.Background()) + _, err := client.Health(context.Background()) r.NoError(err) respData.Errors = append(respData.Errors, "some error msg") - err = client.Health(context.Background()) + _, err = client.Health(context.Background()) r.Error(err) + respData = HealthResponse{ + Metrics: []Metrics{ + { + ChainID: 1, + DataPoints: map[string][]float64{ + "tx.success": {1, 2, 3}, + }, + }, + { + ChainID: 2, + DataPoints: map[string][]float64{ + "tx.success": {3}, + }, + }, + }, + } + + hook := test.NewGlobal() + + metrics, err := client.Health(context.Background()) + r.NoError(err) + r.EqualValues(respData.Metrics, metrics) + + responseSizeLimit = 1 + + _, err = client.Health(context.Background()) + r.NoError(err) + r.Equal(1, len(hook.Entries)) + r.Equal("response size limit for health check is reached", hook.LastEntry().Message) + server.Close() } diff --git a/go.mod b/go.mod index da59a55a..19613357 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ replace github.com/docker/docker => github.com/moby/moby v20.10.25+incompatible require ( github.com/docker/docker v1.6.2 github.com/docker/go-connections v0.4.0 - github.com/forta-network/forta-core-go v0.0.0-20240110111218-b4b8774d2bb6 + github.com/forta-network/forta-core-go v0.0.0-20240122094409-b30b8f2dcbea github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.39.0 diff --git a/go.sum b/go.sum index c231de03..3c978e8c 100644 --- a/go.sum +++ b/go.sum @@ -329,8 +329,8 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= -github.com/forta-network/forta-core-go v0.0.0-20240110111218-b4b8774d2bb6 h1:fDydEruBxtBYrWG5ct+YPwXuJiuUrT+QRU6A8Xz3ids= -github.com/forta-network/forta-core-go v0.0.0-20240110111218-b4b8774d2bb6/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo= +github.com/forta-network/forta-core-go v0.0.0-20240122094409-b30b8f2dcbea h1:s16Te1XGVkJPxfqAFkgApQaEhrTmz7JpVXhVE2RW+vE= +github.com/forta-network/forta-core-go v0.0.0-20240122094409-b30b8f2dcbea/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo= github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0= github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= diff --git a/services/components/botio/bot_client.go b/services/components/botio/bot_client.go index 855697bc..adb70ee0 100644 --- a/services/components/botio/bot_client.go +++ b/services/components/botio/bot_client.go @@ -545,7 +545,7 @@ func (bot *botClient) processTransaction(ctx context.Context, lg *log.Entry, req // truncate findings if len(resp.Findings) > MaxFindings { dropped := len(resp.Findings) - MaxFindings - droppedMetric := metrics.CreateAgentMetric(botConfig, metrics.MetricFindingsDropped, float64(dropped)) + droppedMetric := metrics.CreateAgentMetricV1(botConfig, domain.MetricFindingsDropped, float64(dropped)) bot.msgClient.PublishProto( messaging.SubjectMetricAgent, &protocol.AgentMetricList{Metrics: []*protocol.AgentMetric{droppedMetric}}, @@ -613,8 +613,8 @@ func (bot *botClient) processBlock(ctx context.Context, lg *log.Entry, request * // truncate findings if len(resp.Findings) > MaxFindings { dropped := len(resp.Findings) - MaxFindings - droppedMetric := metrics.CreateAgentMetric( - botConfig, metrics.MetricFindingsDropped, float64(dropped), + droppedMetric := metrics.CreateAgentMetricV1( + botConfig, domain.MetricFindingsDropped, float64(dropped), ) bot.msgClient.PublishProto( messaging.SubjectMetricAgent, @@ -706,7 +706,7 @@ func (bot *botClient) processCombinationAlert(ctx context.Context, lg *log.Entry // truncate findings if len(resp.Findings) > MaxFindings { dropped := len(resp.Findings) - MaxFindings - droppedMetric := metrics.CreateAgentMetric(botConfig, metrics.MetricFindingsDropped, float64(dropped)) + droppedMetric := metrics.CreateAgentMetricV1(botConfig, domain.MetricFindingsDropped, float64(dropped)) bot.msgClient.PublishProto( messaging.SubjectMetricAgent, &protocol.AgentMetricList{Metrics: []*protocol.AgentMetric{droppedMetric}}, ) @@ -754,7 +754,24 @@ func (bot *botClient) doHealthCheck(ctx context.Context, lg *log.Entry) bool { var err error if botConfig.ProtocolVersion >= 2 { - err = bot.clientV2.Health(ctx) + var botMetrics []bothttp.Metrics + botMetrics, err = bot.clientV2.Health(ctx) + + if len(botMetrics) != 0 { + agentMetrics := make([]*protocol.AgentMetric, 0, len(botMetrics)) + for _, botMetric := range botMetrics { + for metricName, metricValues := range botMetric.DataPoints { + for _, metricValue := range metricValues { + agentMetrics = append(agentMetrics, metrics.CreateAgentMetricV2(botConfig, metricName, metricValue, botMetric.ChainID)) + } + } + } + + bot.msgClient.PublishProto( + messaging.SubjectMetricAgent, + &protocol.AgentMetricList{Metrics: agentMetrics}, + ) + } } else { err = botClient.DoHealthCheck(ctx) } diff --git a/services/components/botio/sender.go b/services/components/botio/sender.go index 337214f6..f4ad1e2f 100644 --- a/services/components/botio/sender.go +++ b/services/components/botio/sender.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/forta-network/forta-core-go/clients/health" + "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-core-go/protocol" "github.com/forta-network/forta-node/clients" "github.com/forta-network/forta-node/clients/messaging" @@ -115,7 +116,7 @@ func (rs *requestSender) SendEvaluateTxRequest(req *protocol.EvaluateTxRequest) }: default: // do not try to send if the buffer is full lg.WithField("bot", botConfig.ID).Debug("agent tx request buffer is full - skipping") - metricsList = append(metricsList, metrics.CreateAgentMetric(botConfig, metrics.MetricTxDrop, 1)) + metricsList = append(metricsList, metrics.CreateAgentMetricV1(botConfig, domain.MetricTxDrop, 1)) } lg.WithFields(log.Fields{ "bot": botConfig.ID, @@ -164,7 +165,7 @@ func (rs *requestSender) SendEvaluateBlockRequest(req *protocol.EvaluateBlockReq }: default: // do not try to send if the buffer is full lg.WithField("bot", botConfig.ID).Warn("agent block request buffer is full - skipping") - metricsList = append(metricsList, metrics.CreateAgentMetric(botConfig, metrics.MetricBlockDrop, 1)) + metricsList = append(metricsList, metrics.CreateAgentMetricV1(botConfig, domain.MetricBlockDrop, 1)) } lg.WithFields( log.Fields{ @@ -249,7 +250,7 @@ func (rs *requestSender) SendEvaluateAlertRequest(req *protocol.EvaluateAlertReq }: default: // do not try to send if the buffer is full lg.WithField("bot", botConfig.ID).Warn("agent alert request buffer is full - skipping") - metricsList = append(metricsList, metrics.CreateAgentMetric(botConfig, metrics.MetricCombinerDrop, 1)) + metricsList = append(metricsList, metrics.CreateAgentMetricV1(botConfig, domain.MetricCombinerDrop, 1)) } lg.WithFields( diff --git a/services/components/lifecycle/bot_manager.go b/services/components/lifecycle/bot_manager.go index 59e54f0e..ecc268ba 100644 --- a/services/components/lifecycle/bot_manager.go +++ b/services/components/lifecycle/bot_manager.go @@ -208,9 +208,7 @@ func (blm *botLifecycleManager) ExitInactiveBots(ctx context.Context) error { botConfig, found := blm.findBotConfigByID(inactiveBotID) logger := log.WithField("bot", inactiveBotID) if !found { - logger.Warn("could not find the config for inactive bot - skipping stop") - // send this metric by ID, because it doesn't have a shard ID (since it's not found) - blm.lifecycleMetrics.StatusInactive(config.AgentConfig{ID: inactiveBotID}) + logger.Warn("could not find the config for inactive bot - skipping stop", inactiveBotID) continue } diff --git a/services/components/lifecycle/bot_monitor.go b/services/components/lifecycle/bot_monitor.go index 0b860912..a620f27c 100644 --- a/services/components/lifecycle/bot_monitor.go +++ b/services/components/lifecycle/bot_monitor.go @@ -3,6 +3,7 @@ package lifecycle import ( "sync" + "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-core-go/protocol" "github.com/forta-network/forta-node/services/components/metrics" ) @@ -49,7 +50,7 @@ func (bm *botMonitor) UpdateWithMetrics(botMetrics *protocol.AgentMetricList) er } for _, botMetric := range botMetrics.Metrics { - if botMetric.Name == metrics.MetricStatusActive { + if botMetric.Name == domain.MetricStatusActive { bm.saveBotActivity(botMetric.AgentId) } } diff --git a/services/components/lifecycle/bot_monitor_test.go b/services/components/lifecycle/bot_monitor_test.go index 41538232..f82d74e5 100644 --- a/services/components/lifecycle/bot_monitor_test.go +++ b/services/components/lifecycle/bot_monitor_test.go @@ -4,8 +4,8 @@ import ( "testing" "time" + "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-core-go/protocol" - "github.com/forta-network/forta-node/services/components/metrics" mock_metrics "github.com/forta-network/forta-node/services/components/metrics/mocks" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -51,7 +51,7 @@ func TestBotMonitor(t *testing.T) { r.NoError(botMonitor.UpdateWithMetrics(&protocol.AgentMetricList{ Metrics: []*protocol.AgentMetric{ { - Name: metrics.MetricStatusActive, + Name: domain.MetricStatusActive, AgentId: testTrackerBotID2, }, }, diff --git a/services/components/metrics/activity.go b/services/components/metrics/activity.go index 1b05f7dc..99cd4376 100644 --- a/services/components/metrics/activity.go +++ b/services/components/metrics/activity.go @@ -1,6 +1,7 @@ package metrics import ( + "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-core-go/protocol" "github.com/forta-network/forta-node/config" ) @@ -10,9 +11,9 @@ func FindActiveBotsFromMetrics(allBotMetrics []*protocol.AgentMetrics) (found [] for _, botMetrics := range allBotMetrics { botID := botMetrics.AgentId for _, botMetric := range botMetrics.Metrics { - if botMetric.Name == MetricHealthCheckSuccess { + if botMetric.Name == domain.MetricHealthCheckSuccess { // copy over shardID value so metric will indicate shard - cfg := &config.AgentConfig{ID: botID} + cfg := &config.AgentConfig{ID: botID, ChainID: int(botMetric.ChainId)} if botMetric.ShardId >= 0 { cfg.ShardConfig = &config.ShardConfig{ShardID: uint(botMetric.ShardId)} } diff --git a/services/components/metrics/lifecycle.go b/services/components/metrics/lifecycle.go index a21255c8..db41e4b4 100644 --- a/services/components/metrics/lifecycle.go +++ b/services/components/metrics/lifecycle.go @@ -11,37 +11,6 @@ import ( "github.com/forta-network/forta-node/config" ) -// Bot lifecycle metrics -const ( - MetricClientDial = "agent.client.dial" - MetricClientClose = "agent.client.close" - - MetricStatusRunning = "agent.status.running" - MetricStatusAttached = "agent.status.attached" - MetricStatusInitialized = "agent.status.initialized" - MetricStatusStopping = "agent.status.stopping" - MetricStatusActive = "agent.status.active" - MetricStatusInactive = "agent.status.inactive" - - MetricActionUpdate = "agent.action.update" - MetricActionRestart = "agent.action.restart" - MetricActionSubscribe = "agent.action.subscribe" - MetricActionUnsubscribe = "agent.action.unsubscribe" - - MetricFailurePull = "agent.failure.pull" - MetricFailureLaunch = "agent.failure.launch" - MetricFailureStop = "agent.failure.stop" - MetricFailureDial = "agent.failure.dial" - MetricFailureInitialize = "agent.failure.initialize" - MetricFailureInitializeResponse = "agent.failure.initialize.response" - MetricFailureInitializeValidate = "agent.failure.initialize.validate" - MetricFailureTooManyErrs = "agent.failure.too-many-errs" - - MetricHealthCheckAttempt = "agent.health.attempt" - MetricHealthCheckSuccess = "agent.health.success" - MetricHealthCheckError = "agent.health.error" -) - // Lifecycle creates lifecycle metrics. It is useful in // understanding what is going on during lifecycle management. type Lifecycle interface { @@ -91,83 +60,83 @@ func NewLifecycleClient(msgClient clients.MessageClient) Lifecycle { } func (lc *lifecycle) ClientDial(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricClientDial, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricClientDial, "", botConfigs)) } func (lc *lifecycle) ClientClose(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricClientClose, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricClientClose, "", botConfigs)) } func (lc *lifecycle) StatusRunning(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricStatusRunning, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricStatusRunning, "", botConfigs)) } func (lc *lifecycle) StatusAttached(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricStatusAttached, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricStatusAttached, "", botConfigs)) } func (lc *lifecycle) StatusInitialized(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricStatusInitialized, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricStatusInitialized, "", botConfigs)) } func (lc *lifecycle) StatusStopping(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricStatusStopping, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricStatusStopping, "", botConfigs)) } func (lc *lifecycle) StatusActive(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricStatusActive, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricStatusActive, "", botConfigs)) } func (lc *lifecycle) StatusInactive(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricStatusInactive, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricStatusInactive, "", botConfigs)) } func (lc *lifecycle) ActionUpdate(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricActionUpdate, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricActionUpdate, "", botConfigs)) } func (lc *lifecycle) ActionRestart(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricActionRestart, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricActionRestart, "", botConfigs)) } func (lc *lifecycle) ActionSubscribe(subscriptions []domain.CombinerBotSubscription) { - SendAgentMetrics(lc.msgClient, fromBotSubscriptions(MetricActionSubscribe, subscriptions)) + SendAgentMetrics(lc.msgClient, fromBotSubscriptions(domain.MetricActionSubscribe, subscriptions)) } func (lc *lifecycle) ActionUnsubscribe(subscriptions []domain.CombinerBotSubscription) { - SendAgentMetrics(lc.msgClient, fromBotSubscriptions(MetricActionUnsubscribe, subscriptions)) + SendAgentMetrics(lc.msgClient, fromBotSubscriptions(domain.MetricActionUnsubscribe, subscriptions)) } func (lc *lifecycle) FailurePull(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailurePull, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailurePull, err.Error(), botConfigs)) } func (lc *lifecycle) FailureLaunch(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureLaunch, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureLaunch, err.Error(), botConfigs)) } func (lc *lifecycle) FailureStop(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureStop, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureStop, err.Error(), botConfigs)) } func (lc *lifecycle) FailureDial(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureDial, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureDial, err.Error(), botConfigs)) } func (lc *lifecycle) FailureInitialize(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureInitialize, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureInitialize, err.Error(), botConfigs)) } func (lc *lifecycle) FailureInitializeResponse(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureInitializeResponse, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureInitializeResponse, err.Error(), botConfigs)) } func (lc *lifecycle) FailureInitializeValidate(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureInitializeValidate, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureInitializeValidate, err.Error(), botConfigs)) } func (lc *lifecycle) FailureTooManyErrs(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricFailureTooManyErrs, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricFailureTooManyErrs, err.Error(), botConfigs)) } func (lc *lifecycle) BotError(metricName string, err error, botConfigs ...config.AgentConfig) { @@ -183,20 +152,20 @@ func (lc *lifecycle) SystemStatus(metricName string, details string) { } func (lc *lifecycle) HealthCheckAttempt(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricHealthCheckAttempt, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricHealthCheckAttempt, "", botConfigs)) } func (lc *lifecycle) HealthCheckSuccess(botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricHealthCheckSuccess, "", botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricHealthCheckSuccess, "", botConfigs)) } func (lc *lifecycle) HealthCheckError(err error, botConfigs ...config.AgentConfig) { - SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricHealthCheckError, err.Error(), botConfigs)) + SendAgentMetrics(lc.msgClient, fromBotConfigs(domain.MetricHealthCheckError, err.Error(), botConfigs)) } func fromBotSubscriptions(action string, subscriptions []domain.CombinerBotSubscription) (metrics []*protocol.AgentMetric) { for _, botSub := range subscriptions { - metrics = append(metrics, CreateAgentMetric(config.AgentConfig{ID: botSub.Subscriber.BotID}, action, 1)) + metrics = append(metrics, CreateAgentMetricV1(config.AgentConfig{ID: botSub.Subscriber.BotID}, action, 1)) } return } @@ -211,6 +180,7 @@ func fromBotConfigs(metricName string, details string, botConfigs []config.Agent Details: details, Value: 1, ShardId: botConfig.ShardID(), + ChainId: int64(botConfig.ChainID), } metrics = append(metrics, metric) } diff --git a/services/components/metrics/metrics.go b/services/components/metrics/metrics.go index 134e16a0..c001a234 100644 --- a/services/components/metrics/metrics.go +++ b/services/components/metrics/metrics.go @@ -12,39 +12,6 @@ import ( "github.com/forta-network/forta-node/config" ) -const ( - MetricFinding = "finding" - MetricTxRequest = "tx.request" - MetricTxLatency = "tx.latency" - MetricTxError = "tx.error" - MetricTxSuccess = "tx.success" - MetricTxDrop = "tx.drop" - MetricTxBlockAge = "tx.block.age" - MetricTxEventAge = "tx.event.age" - MetricBlockBlockAge = "block.block.age" - MetricBlockEventAge = "block.event.age" - MetricBlockRequest = "block.request" - MetricBlockLatency = "block.latency" - MetricBlockError = "block.error" - MetricBlockSuccess = "block.success" - MetricBlockDrop = "block.drop" - - MetricJSONRPCLatency = "jsonrpc.latency" - MetricJSONRPCRequest = "jsonrpc.request" - MetricJSONRPCSuccess = "jsonrpc.success" - MetricJSONRPCThrottled = "jsonrpc.throttled" - MetricPublicAPIProxyLatency = "publicapi.latency" - MetricPublicAPIProxyRequest = "publicapi.request" - MetricPublicAPIProxySuccess = "publicapi.success" - MetricPublicAPIProxyThrottled = "publicapi.throttled" - MetricFindingsDropped = "findings.dropped" - MetricCombinerRequest = "combiner.request" - MetricCombinerLatency = "combiner.latency" - MetricCombinerError = "combiner.error" - MetricCombinerSuccess = "combiner.success" - MetricCombinerDrop = "combiner.drop" -) - func SendAgentMetrics(client clients.MessageClient, ms []*protocol.AgentMetric) { if len(ms) > 0 { client.PublishProto(messaging.SubjectMetricAgent, &protocol.AgentMetricList{ @@ -53,13 +20,25 @@ func SendAgentMetrics(client clients.MessageClient, ms []*protocol.AgentMetric) } } -func CreateAgentMetric(agt config.AgentConfig, metric string, value float64) *protocol.AgentMetric { +func CreateAgentMetricV1(agt config.AgentConfig, metric string, value float64) *protocol.AgentMetric { + return &protocol.AgentMetric{ + AgentId: agt.ID, + Timestamp: time.Now().Format(time.RFC3339), + Name: metric, + Value: value, + ShardId: agt.ShardID(), + ChainId: int64(agt.ChainID), + } +} + +func CreateAgentMetricV2(agt config.AgentConfig, metric string, value float64, chainID int64) *protocol.AgentMetric { return &protocol.AgentMetric{ AgentId: agt.ID, Timestamp: time.Now().Format(time.RFC3339), Name: metric, Value: value, ShardId: agt.ShardID(), + ChainId: chainID, } } @@ -83,6 +62,7 @@ func createMetrics(agt config.AgentConfig, timestamp string, metricMap map[strin Name: name, Value: value, ShardId: agt.ShardID(), + ChainId: int64(agt.ChainID), }) } return res @@ -95,16 +75,16 @@ func durationMs(from time.Time, to time.Time) float64 { func GetBlockMetrics(agt config.AgentConfig, resp *protocol.EvaluateBlockResponse, times *domain.TrackingTimestamps) []*protocol.AgentMetric { metrics := make(map[string]float64) - metrics[MetricBlockRequest] = 1 - metrics[MetricFinding] = float64(len(resp.Findings)) - metrics[MetricBlockLatency] = float64(resp.LatencyMs) - metrics[MetricBlockBlockAge] = durationMs(times.Block, times.BotRequest) - metrics[MetricBlockEventAge] = durationMs(times.Feed, times.BotRequest) + metrics[domain.MetricBlockRequest] = 1 + metrics[domain.MetricFinding] = float64(len(resp.Findings)) + metrics[domain.MetricBlockLatency] = float64(resp.LatencyMs) + metrics[domain.MetricBlockBlockAge] = durationMs(times.Block, times.BotRequest) + metrics[domain.MetricBlockEventAge] = durationMs(times.Feed, times.BotRequest) if resp.Status == protocol.ResponseStatus_ERROR { - metrics[MetricBlockError] = 1 + metrics[domain.MetricBlockError] = 1 } else if resp.Status == protocol.ResponseStatus_SUCCESS { - metrics[MetricBlockSuccess] = 1 + metrics[domain.MetricBlockSuccess] = 1 } return createMetrics(agt, resp.Timestamp, metrics) @@ -113,16 +93,16 @@ func GetBlockMetrics(agt config.AgentConfig, resp *protocol.EvaluateBlockRespons func GetTxMetrics(agt config.AgentConfig, resp *protocol.EvaluateTxResponse, times *domain.TrackingTimestamps) []*protocol.AgentMetric { metrics := make(map[string]float64) - metrics[MetricTxRequest] = 1 - metrics[MetricFinding] = float64(len(resp.Findings)) - metrics[MetricTxLatency] = float64(resp.LatencyMs) - metrics[MetricTxBlockAge] = durationMs(times.Block, times.BotRequest) - metrics[MetricTxEventAge] = durationMs(times.Feed, times.BotRequest) + metrics[domain.MetricTxRequest] = 1 + metrics[domain.MetricFinding] = float64(len(resp.Findings)) + metrics[domain.MetricTxLatency] = float64(resp.LatencyMs) + metrics[domain.MetricTxBlockAge] = durationMs(times.Block, times.BotRequest) + metrics[domain.MetricTxEventAge] = durationMs(times.Feed, times.BotRequest) if resp.Status == protocol.ResponseStatus_ERROR { - metrics[MetricTxError] = 1 + metrics[domain.MetricTxError] = 1 } else if resp.Status == protocol.ResponseStatus_SUCCESS { - metrics[MetricTxSuccess] = 1 + metrics[domain.MetricTxSuccess] = 1 } return createMetrics(agt, resp.Timestamp, metrics) @@ -131,14 +111,14 @@ func GetTxMetrics(agt config.AgentConfig, resp *protocol.EvaluateTxResponse, tim func GetCombinerMetrics(agt config.AgentConfig, resp *protocol.EvaluateAlertResponse, times *domain.TrackingTimestamps) []*protocol.AgentMetric { metrics := make(map[string]float64) - metrics[MetricCombinerRequest] = 1 - metrics[MetricFinding] = float64(len(resp.Findings)) - metrics[MetricCombinerLatency] = float64(resp.LatencyMs) + metrics[domain.MetricCombinerRequest] = 1 + metrics[domain.MetricFinding] = float64(len(resp.Findings)) + metrics[domain.MetricCombinerLatency] = float64(resp.LatencyMs) if resp.Status == protocol.ResponseStatus_ERROR { - metrics[MetricCombinerError] = 1 + metrics[domain.MetricCombinerError] = 1 } else if resp.Status == protocol.ResponseStatus_SUCCESS { - metrics[MetricCombinerSuccess] = 1 + metrics[domain.MetricCombinerSuccess] = 1 } return createMetrics(agt, resp.Timestamp, metrics) @@ -147,15 +127,15 @@ func GetCombinerMetrics(agt config.AgentConfig, resp *protocol.EvaluateAlertResp func GetJSONRPCMetrics(agt config.AgentConfig, at time.Time, success, throttled int, latencyMs time.Duration, method string) []*protocol.AgentMetric { values := make(map[string]float64) if latencyMs > 0 { - values[MetricJSONRPCLatency] = float64(latencyMs.Milliseconds()) + values[domain.MetricJSONRPCLatency] = float64(latencyMs.Milliseconds()) } if success > 0 { - values[MetricJSONRPCSuccess] = float64(success) - values[MetricJSONRPCRequest] += float64(success) + values[domain.MetricJSONRPCSuccess] = float64(success) + values[domain.MetricJSONRPCRequest] += float64(success) } if throttled > 0 { - values[MetricJSONRPCThrottled] = float64(throttled) - values[MetricJSONRPCRequest] += float64(throttled) + values[domain.MetricJSONRPCThrottled] = float64(throttled) + values[domain.MetricJSONRPCRequest] += float64(throttled) } return createJsonRpcMetrics(agt, at.Format(time.RFC3339), values, method) } @@ -170,23 +150,24 @@ func createJsonRpcMetrics(agt config.AgentConfig, timestamp string, metricMap ma Name: fmt.Sprintf("%s.%s", name, method), Value: value, ShardId: agt.ShardID(), + ChainId: int64(agt.ChainID), }) } return res } -func GetPublicAPIMetrics(botID string, at time.Time, success, throttled int, latencyMs time.Duration) []*protocol.AgentMetric { +func GetPublicAPIMetrics(botID string, at time.Time, success, throttled int, latency time.Duration) []*protocol.AgentMetric { values := make(map[string]float64) - if latencyMs > 0 { - values[MetricPublicAPIProxyLatency] = float64(latencyMs.Milliseconds()) + if latency > 0 { + values[domain.MetricPublicAPIProxyLatency] = float64(latency.Milliseconds()) } if success > 0 { - values[MetricPublicAPIProxySuccess] = float64(success) - values[MetricPublicAPIProxyRequest] += float64(success) + values[domain.MetricPublicAPIProxySuccess] = float64(success) + values[domain.MetricPublicAPIProxyRequest] += float64(success) } if throttled > 0 { - values[MetricPublicAPIProxyThrottled] = float64(throttled) - values[MetricPublicAPIProxyRequest] += float64(throttled) + values[domain.MetricPublicAPIProxyThrottled] = float64(throttled) + values[domain.MetricPublicAPIProxyRequest] += float64(throttled) } //TODO: get the shardID into this eventually return createMetrics(config.AgentConfig{ID: botID}, at.Format(time.RFC3339), values) diff --git a/services/publisher/metrics.go b/services/publisher/metrics.go index cd24219e..797b15a7 100644 --- a/services/publisher/metrics.go +++ b/services/publisher/metrics.go @@ -12,19 +12,25 @@ import ( // AgentMetricsAggregator aggregates agents' metrics and produces a list of summary of them when flushed. type AgentMetricsAggregator struct { - buckets []*metricsBucket - bucketInterval time.Duration - lastFlush time.Time - mu sync.RWMutex + // int64 is a chain id + bucketsByChainID map[int64][]*metricsBucket + chainID int64 + bucketInterval time.Duration + lastFlush time.Time + mu sync.RWMutex } type metricsBucket struct { - Time time.Time - MetricCounters map[string][]uint32 - MetricDetails map[string]string + Time time.Time + MetricsData map[string]metricsData protocol.AgentMetrics } +type metricsData struct { + Counters []uint32 + Details string +} + func (mb *metricsBucket) CreateAndGetSummary(name string) *protocol.MetricSummary { for _, summary := range mb.Metrics { if summary.Name == name { @@ -37,17 +43,24 @@ func (mb *metricsBucket) CreateAndGetSummary(name string) *protocol.MetricSummar } // NewAgentMetricsAggregator creates a new agent metrics aggregator. -func NewMetricsAggregator(bucketInterval time.Duration) *AgentMetricsAggregator { +func NewMetricsAggregator(bucketInterval time.Duration, chainID int64) *AgentMetricsAggregator { return &AgentMetricsAggregator{ - mu: sync.RWMutex{}, - bucketInterval: bucketInterval, - lastFlush: time.Now(), // avoid flushing immediately + mu: sync.RWMutex{}, + bucketInterval: bucketInterval, + lastFlush: time.Now(), // avoid flushing immediately + bucketsByChainID: make(map[int64][]*metricsBucket), + chainID: chainID, } } -func (ama *AgentMetricsAggregator) findBucket(agentID string, t time.Time) *metricsBucket { +func (ama *AgentMetricsAggregator) findBucket(agentID string, chainID int64, t time.Time) *metricsBucket { bucketTime := ama.FindClosestBucketTime(t) - for _, bucket := range ama.buckets { + buckets, ok := ama.bucketsByChainID[chainID] + if !ok { + ama.bucketsByChainID[chainID] = make([]*metricsBucket, 0) + } + + for _, bucket := range buckets { if bucket.AgentId != agentID { continue } @@ -56,14 +69,15 @@ func (ama *AgentMetricsAggregator) findBucket(agentID string, t time.Time) *metr } return bucket } + bucket := &metricsBucket{ - Time: bucketTime, - MetricCounters: make(map[string][]uint32), - MetricDetails: make(map[string]string), + Time: bucketTime, + MetricsData: make(map[string]metricsData), } + bucket.AgentId = agentID bucket.Timestamp = utils.FormatTime(bucketTime) - ama.buckets = append(ama.buckets, bucket) + ama.bucketsByChainID[chainID] = append(ama.bucketsByChainID[chainID], bucket) return bucket } @@ -75,20 +89,26 @@ func (ama *AgentMetricsAggregator) FindClosestBucketTime(t time.Time) time.Time return time.Unix(0, ts-rem) } -type agentResponse protocol.EvaluateTxResponse - func (ama *AgentMetricsAggregator) AddAgentMetrics(ms *protocol.AgentMetricList) error { ama.mu.Lock() defer ama.mu.Unlock() for _, m := range ms.Metrics { t, _ := time.Parse(time.RFC3339, m.Timestamp) - bucket := ama.findBucket(m.AgentId, t) - bucket.MetricCounters[m.Name] = append(bucket.MetricCounters[m.Name], uint32(m.Value)) - if m.Details != "" { - bucket.MetricDetails[m.Name] = m.Details + + // add chain id if it's not set, e.g. EventMetric + chainID := ama.chainID + if m.ChainId != 0 { + chainID = m.ChainId + } + + bucket := ama.findBucket(m.AgentId, chainID, t) + bucket.MetricsData[m.Name] = metricsData{ + Counters: append(bucket.MetricsData[m.Name].Counters, uint32(m.Value)), + Details: m.Details, } } + return nil } @@ -100,14 +120,16 @@ func (ama *AgentMetricsAggregator) ForceFlush() []*protocol.AgentMetrics { now := time.Now() ama.lastFlush = now - buckets := ama.buckets - ama.buckets = nil + buckets := ama.bucketsByChainID + ama.bucketsByChainID = make(map[int64][]*metricsBucket) (allAgentMetrics)(buckets).Fix() var allMetrics []*protocol.AgentMetrics - for _, bucket := range buckets { - allMetrics = append(allMetrics, &bucket.AgentMetrics) + for _, metricsBuckets := range buckets { + for _, bucket := range metricsBuckets { + allMetrics = append(allMetrics, &bucket.AgentMetrics) + } } return allMetrics @@ -124,14 +146,16 @@ func (ama *AgentMetricsAggregator) TryFlush() ([]*protocol.AgentMetrics, bool) { } ama.lastFlush = now - buckets := ama.buckets - ama.buckets = nil + buckets := ama.bucketsByChainID + ama.bucketsByChainID = make(map[int64][]*metricsBucket) (allAgentMetrics)(buckets).Fix() var allMetrics []*protocol.AgentMetrics - for _, bucket := range buckets { - allMetrics = append(allMetrics, &bucket.AgentMetrics) + for _, metricsBuckets := range buckets { + for _, bucket := range metricsBuckets { + allMetrics = append(allMetrics, &bucket.AgentMetrics) + } } return allMetrics, true @@ -139,27 +163,30 @@ func (ama *AgentMetricsAggregator) TryFlush() ([]*protocol.AgentMetrics, bool) { // allAgentMetrics is an alias type for post-processing aggregated in-memory metrics // before we publish them. -type allAgentMetrics []*metricsBucket +type allAgentMetrics map[int64][]*metricsBucket func (allMetrics allAgentMetrics) Fix() { - sort.Slice(allMetrics, func(i, j int) bool { - return allMetrics[i].Time.Before(allMetrics[j].Time) - }) + for _, metricsBuckets := range allMetrics { + sort.Slice(metricsBuckets, func(i, j int) bool { + return metricsBuckets[i].Time.Before(metricsBuckets[j].Time) + }) + } allMetrics.PrepareMetrics() } func (allMetrics allAgentMetrics) PrepareMetrics() { - for _, agentMetrics := range allMetrics { - for metricName, list := range agentMetrics.MetricCounters { - if len(list) > 0 { - summary := agentMetrics.CreateAndGetSummary(metricName) - summary.Count = uint32(len(list)) - summary.Average = avgMetricArray(list) - summary.Max = maxDataPoint(list) - summary.P95 = calcP95(list) - summary.Sum = sumNums(list) - if details, ok := agentMetrics.MetricDetails[metricName]; ok { - summary.Details = details + for chainID, metricsBuckets := range allMetrics { + for _, agentMetrics := range metricsBuckets { + for metricName, data := range agentMetrics.MetricsData { + if len(data.Counters) > 0 { + summary := agentMetrics.CreateAndGetSummary(metricName) + summary.Count = uint32(len(data.Counters)) + summary.Average = avgMetricArray(data.Counters) + summary.Max = maxDataPoint(data.Counters) + summary.P95 = calcP95(data.Counters) + summary.Sum = sumNums(data.Counters) + summary.Details = data.Details + summary.ChainId = chainID } } } diff --git a/services/publisher/metrics_test.go b/services/publisher/metrics_test.go index 40643815..3b8b8284 100644 --- a/services/publisher/metrics_test.go +++ b/services/publisher/metrics_test.go @@ -23,7 +23,6 @@ type MetricsMathTest struct { } func TestAgentMetricsAggregator_math(t *testing.T) { - tests := []*MetricsMathTest{ { metrics: []float64{1, 2, 3, 4, 5}, @@ -34,6 +33,7 @@ func TestAgentMetricsAggregator_math(t *testing.T) { Average: 3, Sum: 15, P95: 4, + ChainId: 100, }, }, { @@ -45,6 +45,7 @@ func TestAgentMetricsAggregator_math(t *testing.T) { Average: 15, Sum: 45, P95: 10, + ChainId: 100, }, }, { @@ -56,6 +57,7 @@ func TestAgentMetricsAggregator_math(t *testing.T) { Average: 45, Sum: 45, P95: 45, + ChainId: 100, }, }, { @@ -69,6 +71,7 @@ func TestAgentMetricsAggregator_math(t *testing.T) { Average: 45, Sum: 45, P95: 45, + ChainId: 100, }, }, } @@ -87,7 +90,7 @@ func TestAgentMetricsAggregator_math(t *testing.T) { }) } - aggregator := publisher.NewMetricsAggregator(testBucketInterval) + aggregator := publisher.NewMetricsAggregator(testBucketInterval, 100) err := aggregator.AddAgentMetrics(&protocol.AgentMetricList{Metrics: metrics}) assert.NoError(t, err) time.Sleep(testBucketInterval * 2) diff --git a/services/publisher/publisher.go b/services/publisher/publisher.go index 91cbb076..a3fe463c 100644 --- a/services/publisher/publisher.go +++ b/services/publisher/publisher.go @@ -930,7 +930,7 @@ func initPublisher( cfg: cfg, ipfs: ipfsClient, storage: storageClient, - metricsAggregator: NewMetricsAggregator(time.Duration(*cfg.PublisherConfig.Batch.MetricsBucketIntervalSeconds) * time.Second), + metricsAggregator: NewMetricsAggregator(time.Duration(*cfg.PublisherConfig.Batch.MetricsBucketIntervalSeconds)*time.Second, int64(cfg.ChainID)), messageClient: mc, alertClient: alertClient, localAlertClient: localAlertClient, diff --git a/tests/e2e/.ipfs/config b/tests/e2e/.ipfs/config index c6743a60..f4fe7102 100644 --- a/tests/e2e/.ipfs/config +++ b/tests/e2e/.ipfs/config @@ -1,12 +1,30 @@ { - "Identity": { - "PeerID": "12D3KooWQnSGodX2FKUriXDoBMsnUPeT8VY4JKGcnBHoyZbFr2Et", - "PrivKey": "CAESQPMUlTm0g5Lr56yX8RkPTJFFSpMZxrCsN4jvju8FO2e13mDIgjdS+DpO6sWjMqqe3vhkmx/u8zXKF4DdvFn5KhE=" + "API": { + "HTTPHeaders": {} + }, + "Addresses": { + "API": "/ip4/0.0.0.0/tcp/5002", + "Announce": null, + "Gateway": "/ip4/0.0.0.0/tcp/8181", + "NoAnnounce": null, + "Swarm": [ + "/ip4/0.0.0.0/tcp/4002", + "/ip6/::/tcp/4002", + "/ip4/0.0.0.0/udp/4002/quic-v1", + "/ip4/0.0.0.0/udp/4002/quic-v1/webtransport", + "/ip6/::/udp/4002/quic-v1", + "/ip6/::/udp/4002/quic-v1/webtransport" + ] + }, + "AutoNAT": {}, + "Bootstrap": [], + "DNS": { + "Resolvers": {} }, "Datastore": { - "StorageMax": "10GB", - "StorageGCWatermark": 90, + "BloomFilterSize": 0, "GCPeriod": "1h", + "HashOnRead": false, "Spec": { "mounts": [ { @@ -33,25 +51,8 @@ ], "type": "mount" }, - "HashOnRead": false, - "BloomFilterSize": 0 - }, - "Addresses": { - "Swarm": [ - "/ip4/0.0.0.0/tcp/4002", - "/ip6/::/tcp/4002", - "/ip4/0.0.0.0/udp/4002/quic", - "/ip6/::/udp/4002/quic" - ], - "Announce": [], - "NoAnnounce": [], - "API": "/ip4/0.0.0.0/tcp/5002", - "Gateway": "/ip4/0.0.0.0/tcp/8181" - }, - "Mounts": { - "IPFS": "/ipfs", - "IPNS": "/ipns", - "FuseAllowOther": false + "StorageGCWatermark": 90, + "StorageMax": "10GB" }, "Discovery": { "MDNS": { @@ -59,95 +60,76 @@ "Interval": 10 } }, - "Routing": { - "Type": "none" - }, - "Ipns": { - "RepublishPeriod": "", - "RecordLifetime": "", - "ResolveCacheSize": 128 + "Experimental": { + "FilestoreEnabled": false, + "GraphsyncEnabled": false, + "Libp2pStreamMounting": false, + "P2pHttpProxy": false, + "ShardingEnabled": false, + "StrategicProviding": false, + "UrlstoreEnabled": false }, - "Bootstrap": [], "Gateway": { - "HTTPHeaders": { - "Access-Control-Allow-Headers": [ - "X-Requested-With", - "Range", - "User-Agent" - ], - "Access-Control-Allow-Methods": [ - "GET" - ], - "Access-Control-Allow-Origin": [ - "*" - ] - }, - "RootRedirect": "", - "Writable": false, - "PathPrefixes": [], "APICommands": [], - "NoFetch": true, + "HTTPHeaders": {}, "NoDNSLink": true, - "PublicGateways": null - }, - "API": { - "HTTPHeaders": {} - }, - "Swarm": { - "AddrFilters": null, - "DisableBandwidthMetrics": false, - "DisableNatPortMap": false, - "EnableRelayHop": false, - "EnableAutoRelay": false, - "Transports": { - "Network": {}, - "Security": {}, - "Multiplexers": {} - }, - "ConnMgr": { - "Type": "basic", - "LowWater": 600, - "HighWater": 900, - "GracePeriod": "20s" - } - }, - "AutoNAT": {}, - "Pubsub": { - "Router": "", - "DisableSigning": false + "NoFetch": true, + "PathPrefixes": [], + "PublicGateways": null, + "RootRedirect": "", + "Writable": false }, - "Peering": { - "Peers": null + "Identity": { + "PeerID": "12D3KooWQnSGodX2FKUriXDoBMsnUPeT8VY4JKGcnBHoyZbFr2Et", + "PrivKey": "CAESQPMUlTm0g5Lr56yX8RkPTJFFSpMZxrCsN4jvju8FO2e13mDIgjdS+DpO6sWjMqqe3vhkmx/u8zXKF4DdvFn5KhE=" }, - "DNS": { - "Resolvers": {} + "Internal": {}, + "Ipns": { + "RecordLifetime": "", + "RepublishPeriod": "", + "ResolveCacheSize": 128 }, "Migration": { "DownloadSources": [], "Keep": "" }, - "Provider": { - "Strategy": "" + "Mounts": { + "FuseAllowOther": false, + "IPFS": "/ipfs", + "IPNS": "/ipns" }, - "Reprovider": { - "Interval": "12h", - "Strategy": "all" + "Peering": { + "Peers": null }, - "Experimental": { - "FilestoreEnabled": false, - "UrlstoreEnabled": false, - "ShardingEnabled": false, - "GraphsyncEnabled": false, - "Libp2pStreamMounting": false, - "P2pHttpProxy": false, - "StrategicProviding": false, - "AcceleratedDHTClient": false + "Pinning": { + "RemoteServices": {} }, "Plugins": { "Plugins": null }, - "Pinning": { - "RemoteServices": {} + "Provider": { + "Strategy": "" }, - "Internal": {} + "Pubsub": { + "DisableSigning": false, + "Router": "" + }, + "Reprovider": {}, + "Routing": { + "AcceleratedDHTClient": false, + "Type": "none" + }, + "Swarm": { + "AddrFilters": null, + "ConnMgr": {}, + "DisableBandwidthMetrics": false, + "DisableNatPortMap": false, + "EnableAutoRelay": false, + "EnableRelayHop": false, + "Transports": { + "Multiplexers": {}, + "Network": {}, + "Security": {} + } + } } diff --git a/tests/e2e/.ipfs/version b/tests/e2e/.ipfs/version index 48082f72..60d3b2f4 100644 --- a/tests/e2e/.ipfs/version +++ b/tests/e2e/.ipfs/version @@ -1 +1 @@ -12 +15 diff --git a/tests/e2e/disco.config.yml b/tests/e2e/disco.config.yml index bdb9f0d8..2454a883 100644 --- a/tests/e2e/disco.config.yml +++ b/tests/e2e/disco.config.yml @@ -6,7 +6,9 @@ log: environment: development storage: ipfs: - url: http://localhost:5002 + router: + nodes: + - url: http://localhost:5002 delete: enabled: true maintenance: diff --git a/tests/e2e/e2e_forta_local_mode_test.go b/tests/e2e/e2e_forta_local_mode_test.go index 3df5a6c9..e2ffd535 100644 --- a/tests/e2e/e2e_forta_local_mode_test.go +++ b/tests/e2e/e2e_forta_local_mode_test.go @@ -10,8 +10,8 @@ import ( "time" "github.com/forta-network/forta-core-go/clients/webhook/client/models" + "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-core-go/security" - "github.com/forta-network/forta-node/services/components/metrics" "github.com/forta-network/forta-node/tests/e2e" "github.com/forta-network/forta-node/tests/e2e/agents/combinerbot/combinerbotalertid" "github.com/forta-network/forta-node/tests/e2e/agents/txdetectoragent/testbotalertid" @@ -279,7 +279,7 @@ func (s *Suite) runLocalModeAlertHandler(webhookURL, logFileName string, readAle if strings.Contains(summary.Name, "agent.health") { s.T().Logf("contains metric: %s", summary.Name) } - if summary.Name == metrics.MetricHealthCheckSuccess { + if summary.Name == domain.MetricHealthCheckSuccess { healthCheckMetric = summary break } From 1fdb43863120926463bf80f9bcd28bd71ecde573 Mon Sep 17 00:00:00 2001 From: dkeysil Date: Mon, 29 Jan 2024 19:06:03 +0100 Subject: [PATCH 2/3] Fix ipfs config --- tests/e2e/.ipfs/config | 154 ++++++++++++++++++++++++++--------------- 1 file changed, 100 insertions(+), 54 deletions(-) diff --git a/tests/e2e/.ipfs/config b/tests/e2e/.ipfs/config index f32ebf45..c6743a60 100644 --- a/tests/e2e/.ipfs/config +++ b/tests/e2e/.ipfs/config @@ -1,30 +1,12 @@ { - "API": { - "HTTPHeaders": {} - }, - "Addresses": { - "API": "/ip4/0.0.0.0/tcp/5002", - "Announce": null, - "Gateway": "/ip4/0.0.0.0/tcp/8181", - "NoAnnounce": null, - "Swarm": [ - "/ip4/0.0.0.0/tcp/4002", - "/ip6/::/tcp/4002", - "/ip4/0.0.0.0/udp/4002/quic-v1", - "/ip4/0.0.0.0/udp/4002/quic-v1/webtransport", - "/ip6/::/udp/4002/quic-v1", - "/ip6/::/udp/4002/quic-v1/webtransport" - ] - }, - "AutoNAT": {}, - "Bootstrap": [], - "DNS": { - "Resolvers": {} + "Identity": { + "PeerID": "12D3KooWQnSGodX2FKUriXDoBMsnUPeT8VY4JKGcnBHoyZbFr2Et", + "PrivKey": "CAESQPMUlTm0g5Lr56yX8RkPTJFFSpMZxrCsN4jvju8FO2e13mDIgjdS+DpO6sWjMqqe3vhkmx/u8zXKF4DdvFn5KhE=" }, "Datastore": { - "BloomFilterSize": 0, + "StorageMax": "10GB", + "StorageGCWatermark": 90, "GCPeriod": "1h", - "HashOnRead": false, "Spec": { "mounts": [ { @@ -51,8 +33,25 @@ ], "type": "mount" }, - "StorageGCWatermark": 90, - "StorageMax": "10GB" + "HashOnRead": false, + "BloomFilterSize": 0 + }, + "Addresses": { + "Swarm": [ + "/ip4/0.0.0.0/tcp/4002", + "/ip6/::/tcp/4002", + "/ip4/0.0.0.0/udp/4002/quic", + "/ip6/::/udp/4002/quic" + ], + "Announce": [], + "NoAnnounce": [], + "API": "/ip4/0.0.0.0/tcp/5002", + "Gateway": "/ip4/0.0.0.0/tcp/8181" + }, + "Mounts": { + "IPFS": "/ipfs", + "IPNS": "/ipns", + "FuseAllowOther": false }, "Discovery": { "MDNS": { @@ -60,48 +59,95 @@ "Interval": 10 } }, - "Experimental": { - "FilestoreEnabled": false, - "GraphsyncEnabled": false, - "Libp2pStreamMounting": false, - "P2pHttpProxy": false, - "ShardingEnabled": false, - "StrategicProviding": false, - "UrlstoreEnabled": false + "Routing": { + "Type": "none" + }, + "Ipns": { + "RepublishPeriod": "", + "RecordLifetime": "", + "ResolveCacheSize": 128 }, + "Bootstrap": [], "Gateway": { + "HTTPHeaders": { + "Access-Control-Allow-Headers": [ + "X-Requested-With", + "Range", + "User-Agent" + ], + "Access-Control-Allow-Methods": [ + "GET" + ], + "Access-Control-Allow-Origin": [ + "*" + ] + }, + "RootRedirect": "", + "Writable": false, + "PathPrefixes": [], "APICommands": [], - "HTTPHeaders": {}, - "NoDNSLink": true, "NoFetch": true, - "PathPrefixes": [], - "PublicGateways": null, - "RootRedirect": "", - "Writable": false + "NoDNSLink": true, + "PublicGateways": null }, - "Identity": { - "PeerID": "12D3KooWQnSGodX2FKUriXDoBMsnUPeT8VY4JKGcnBHoyZbFr2Et", - "PrivKey": "CAESQPMUlTm0g5Lr56yX8RkPTJFFSpMZxrCsN4jvju8FO2e13mDIgjdS+DpO6sWjMqqe3vhkmx/u8zXKF4DdvFn5KhE=" + "API": { + "HTTPHeaders": {} }, - "Internal": {}, - "Ipns": { - "RecordLifetime": "", - "RepublishPeriod": "", - "ResolveCacheSize": 128 + "Swarm": { + "AddrFilters": null, + "DisableBandwidthMetrics": false, + "DisableNatPortMap": false, + "EnableRelayHop": false, + "EnableAutoRelay": false, + "Transports": { + "Network": {}, + "Security": {}, + "Multiplexers": {} + }, + "ConnMgr": { + "Type": "basic", + "LowWater": 600, + "HighWater": 900, + "GracePeriod": "20s" + } + }, + "AutoNAT": {}, + "Pubsub": { + "Router": "", + "DisableSigning": false + }, + "Peering": { + "Peers": null + }, + "DNS": { + "Resolvers": {} }, "Migration": { "DownloadSources": [], "Keep": "" }, - "Mounts": { - "FuseAllowOther": false, - "IPFS": "/ipfs", - "IPNS": "/ipns" + "Provider": { + "Strategy": "" }, - "Peering": { - "Peers": null + "Reprovider": { + "Interval": "12h", + "Strategy": "all" + }, + "Experimental": { + "FilestoreEnabled": false, + "UrlstoreEnabled": false, + "ShardingEnabled": false, + "GraphsyncEnabled": false, + "Libp2pStreamMounting": false, + "P2pHttpProxy": false, + "StrategicProviding": false, + "AcceleratedDHTClient": false + }, + "Plugins": { + "Plugins": null }, "Pinning": { "RemoteServices": {} - } + }, + "Internal": {} } From 07de81d065db7ea702267cfd4556db4e53bd42e3 Mon Sep 17 00:00:00 2001 From: dkeysil Date: Mon, 29 Jan 2024 19:14:11 +0100 Subject: [PATCH 3/3] Use chan to sync test --- services/components/botio/bot_client_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/services/components/botio/bot_client_test.go b/services/components/botio/bot_client_test.go index e651a5db..02df30d6 100644 --- a/services/components/botio/bot_client_test.go +++ b/services/components/botio/bot_client_test.go @@ -96,15 +96,20 @@ func (s *BotClientSuite) TestStartProcessStop() { s.lifecycleMetrics.EXPECT().ActionSubscribe(combinerSubscriptions) // test health checks + healthCheckChan := make(chan interface{}) HealthCheckInterval = time.Second - s.botGrpc.EXPECT().DoHealthCheck(gomock.Any()).MinTimes(1) + s.botGrpc.EXPECT().DoHealthCheck(gomock.Any()).Return(nil).MinTimes(1) s.lifecycleMetrics.EXPECT().HealthCheckAttempt(s.botClient.configUnsafe).MinTimes(1) - s.lifecycleMetrics.EXPECT().HealthCheckSuccess(s.botClient.configUnsafe).MinTimes(1) + s.lifecycleMetrics.EXPECT().HealthCheckSuccess(s.botClient.configUnsafe).Do(func(_ interface{}) { + close(healthCheckChan) + }).MinTimes(1) s.msgClient.EXPECT().Publish(messaging.SubjectAgentsAlertSubscribe, combinerSubscriptions) s.botClient.StartProcessing() s.botClient.Initialize() + <-healthCheckChan + <-s.botClient.Initialized() txReq := &protocol.EvaluateTxRequest{ @@ -193,8 +198,6 @@ func (s *BotClientSuite) TestStartProcessStop() { s.lifecycleMetrics.EXPECT().ActionUnsubscribe(combinerSubscriptions) s.r.NoError(s.botClient.Close()) - // Using small sleep to allow goroutines to be executed (e.g. health check) - time.Sleep(30 * time.Millisecond) } func (s *BotClientSuite) TestCombinerBotSubscriptions() {