diff --git a/core/connection/interface.go b/core/connection/interface.go index dbbb8b2245..bcf6a4b24c 100644 --- a/core/connection/interface.go +++ b/core/connection/interface.go @@ -41,7 +41,7 @@ type Connection interface { // ConnectionDiag is a specialised Connection interface for provider check type ConnectionDiag interface { - Diag() bool + Diag() error } // StateChannel is the channel we receive state change events on diff --git a/core/connection/manager-diag.go b/core/connection/manager-diag.go index c1f7a2abef..d8a061a047 100644 --- a/core/connection/manager-diag.go +++ b/core/connection/manager-diag.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/rs/zerolog/log" + "golang.org/x/time/rate" "github.com/mysteriumnetwork/node/config" "github.com/mysteriumnetwork/node/core/connection/connectionstate" @@ -91,6 +92,8 @@ type diagConnectionManager struct { // populated by Connect at runtime. connsMu sync.Mutex conns map[string]*conn + + ratelimiter *rate.Limiter } // NewDiagManager creates connection manager with given dependencies @@ -120,6 +123,8 @@ func NewDiagManager( validator: validator, p2pDialer: p2pDialer, timeGetter: time.Now, + + ratelimiter: rate.NewLimiter(rate.Every(1000*time.Millisecond), 1), } m.eventBus.SubscribeAsync(connectionstate.AppTopicConnectionState, m.reconnectOnHold) @@ -153,6 +158,13 @@ func (m *diagConnectionManager) GetReadyChan(providerID string) chan interface{} func (m *diagConnectionManager) Connect(consumerID identity.Identity, hermesID common.Address, proposalLookup ProposalLookup, params ConnectParams) (err error) { var sessionID session.ID + ctx := context.Background() + err = m.ratelimiter.Wait(ctx) // This is a blocking call. Honors the rate limit + if err != nil { + log.Error().Msgf("ratelimiter.Wait: %s", err) + return err + } + proposal, err := proposalLookup() if err != nil { return fmt.Errorf("failed to lookup proposal: %w", err) @@ -164,8 +176,10 @@ func (m *diagConnectionManager) Connect(consumerID identity.Identity, hermesID c log.Debug().Msgf("Consumer connection trace: %s", traceResult) }() - fmt.Println("Connect>", proposal.ProviderID) + log.Error().Msgf("Connect > %v", proposal.ProviderID) uuid := proposal.ProviderID + + m.connsMu.Lock() con, ok := m.conns[uuid] if !ok { con = new(conn) @@ -173,6 +187,8 @@ func (m *diagConnectionManager) Connect(consumerID identity.Identity, hermesID c con.uuid = uuid m.conns[uuid] = con } + m.connsMu.Unlock() + removeConnection := func() { m.connsMu.Lock() defer m.connsMu.Unlock() @@ -933,7 +949,9 @@ func (m *diagConnectionManager) keepAliveLoop(con *conn, channel p2p.Channel, se if config.GetBool(config.FlagKeepConnectedOnFail) { m.statusOnHold(con) } else { - m.Disconnect() + //m.Disconnect() + log.Error().Msgf("Max p2p keepalive err count reached, disconnecting. SessionID=%s >>>>>>>>>", sessionID) + m.DisconnectSingle(con) } cancel() return diff --git a/core/connection/pinger.go b/core/connection/pinger.go index 6ececc5437..202ccf70bb 100644 --- a/core/connection/pinger.go +++ b/core/connection/pinger.go @@ -25,14 +25,17 @@ import ( // Diag is used to start provider check func Diag(cm *diagConnectionManager, con *conn, providerID string) { c, ok := con.activeConnection.(ConnectionDiag) - res := false + res := error(nil) if ok { log.Debug().Msgf("Check provider> %v", providerID) res = c.Diag() cm.DisconnectSingle(con) } - ev := quality.DiagEvent{ProviderID: providerID, Result: res} + ev := quality.DiagEvent{ProviderID: providerID, Result: res == nil} + if res != nil { + ev.Error = res + } con.resChannel <- ev close(con.resChannel) } diff --git a/core/quality/metrics.go b/core/quality/metrics.go index 75ac4268e0..ae85bc2c47 100644 --- a/core/quality/metrics.go +++ b/core/quality/metrics.go @@ -106,6 +106,7 @@ type PingEvent struct { type DiagEvent struct { ProviderID string Result bool + Error error } const ( diff --git a/go.mod b/go.mod index 58302d2919..e8e3b5378a 100644 --- a/go.mod +++ b/go.mod @@ -151,11 +151,15 @@ require ( github.com/imdario/mergo v0.3.12 // indirect github.com/ipfs/go-cid v0.4.1 // indirect github.com/ipfs/go-log/v2 v2.5.1 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx/v5 v5.5.5 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/jinzhu/gorm v1.9.2 // indirect - github.com/jinzhu/inflection v0.0.0-20180308033659-04140366298a // indirect + github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/native v0.0.0-20200817173448-b6b71def0850 // indirect @@ -181,6 +185,7 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-pointer v0.0.1 // indirect + github.com/mattn/go-sqlite3 v1.14.22 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/mdlayher/genetlink v1.1.0 // indirect github.com/mdlayher/netlink v1.4.2 // indirect @@ -263,12 +268,12 @@ require ( gopkg.in/intercom/intercom-go.v2 v2.0.0-20210504094731-2bd1af0ce4b2 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + gorm.io/driver/postgres v1.5.9 // indirect + gorm.io/driver/sqlite v1.5.6 // indirect + gorm.io/gorm v1.25.11 // indirect honnef.co/go/tools v0.4.2 // indirect lukechampine.com/blake3 v1.2.1 // indirect rsc.io/tmplfunc v0.0.3 // indirect ) -replace ( - golang.zx2c4.com/wireguard => github.com/mysteriumnetwork/wireguard-go v0.0.0-20240416113031-406b13e8996a - //gvisor.dev/gvisor => github.com/mysteriumnetwork/gvisor v0.0.0-20240206094932-ff91e662b9e8 -) +replace golang.zx2c4.com/wireguard => github.com/mysteriumnetwork/wireguard-go v0.0.0-20240416113031-406b13e8996a diff --git a/go.sum b/go.sum index f67f7f4405..d4073b8c8a 100644 --- a/go.sum +++ b/go.sum @@ -957,6 +957,14 @@ github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY= github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= +github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackpal/gateway v1.0.6 h1:/MJORKvJEwNVldtGVJC2p2cwCnsSoLn3hl3zxmZT7tk= github.com/jackpal/gateway v1.0.6/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= @@ -974,6 +982,8 @@ github.com/jinzhu/gorm v1.9.2 h1:lCvgEaqe/HVE+tjAR2mt4HbbHAZsQOv3XAZiEZV37iw= github.com/jinzhu/gorm v1.9.2/go.mod h1:Vla75njaFJ8clLU1W44h34PjIkijhjHIYnZxMqCdxqo= github.com/jinzhu/inflection v0.0.0-20180308033659-04140366298a h1:eeaG9XMUvRBYXJi4pg1ZKM7nxc5AfXfojeLLW7O5J3k= github.com/jinzhu/inflection v0.0.0-20180308033659-04140366298a/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -1107,6 +1117,8 @@ github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnU github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= @@ -2352,6 +2364,14 @@ gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/postgres v1.5.9 h1:DkegyItji119OlcaLjqN11kHoUgZ/j13E0jkJZgD6A8= +gorm.io/driver/postgres v1.5.9/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI= +gorm.io/driver/sqlite v1.5.6 h1:fO/X46qn5NUEEOZtnjJRWRzZMe8nqJiQ9E+0hi+hKQE= +gorm.io/driver/sqlite v1.5.6/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4= +gorm.io/gorm v1.25.7-0.20240204074919-46816ad31dde h1:9DShaph9qhkIYw7QF91I/ynrr4cOO2PZra2PFD7Mfeg= +gorm.io/gorm v1.25.7-0.20240204074919-46816ad31dde/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= +gorm.io/gorm v1.25.11 h1:/Wfyg1B/je1hnDx3sMkX+gAlxrlZpn6X0BXRlwXlvHg= +gorm.io/gorm v1.25.11/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.4.0/go.mod h1:CtbdzLSsqVhDgMtKsx03ird5YTGB3ar27v0u/yKBW5g= grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o= diff --git a/services/wireguard/connection/connection.go b/services/wireguard/connection/connection.go index 690226df04..0a292591b2 100644 --- a/services/wireguard/connection/connection.go +++ b/services/wireguard/connection/connection.go @@ -87,7 +87,7 @@ func (c *Connection) State() <-chan connectionstate.State { } // Diag is used to start provider check -func (c *Connection) Diag() bool { +func (c *Connection) Diag() error { return c.connectionEndpoint.Diag() } diff --git a/services/wireguard/connection/connection_test.go b/services/wireguard/connection/connection_test.go index ad82dbc3a8..07af4e923e 100644 --- a/services/wireguard/connection/connection_test.go +++ b/services/wireguard/connection/connection_test.go @@ -158,8 +158,8 @@ func (mce *mockConnectionEndpoint) ConfigureRoutes(_ net.IP) error { retur func (mce *mockConnectionEndpoint) PeerStats() (wgcfg.Stats, error) { return wgcfg.Stats{LastHandshake: time.Now(), BytesSent: 10, BytesReceived: 11}, nil } -func (mce *mockConnectionEndpoint) Diag() bool { - return true +func (mce *mockConnectionEndpoint) Diag() error { + return nil } type mockHandshakeWaiter struct { diff --git a/services/wireguard/endpoint.go b/services/wireguard/endpoint.go index e6df362a67..ea3e3d9960 100644 --- a/services/wireguard/endpoint.go +++ b/services/wireguard/endpoint.go @@ -34,5 +34,6 @@ type ConnectionEndpoint interface { Config() (ServiceConfig, error) InterfaceName() string Stop() error - Diag() bool + + Diag() error } diff --git a/services/wireguard/endpoint/diagclient/client.go b/services/wireguard/endpoint/diagclient/client.go index 646ff7f14e..6e50d23759 100644 --- a/services/wireguard/endpoint/diagclient/client.go +++ b/services/wireguard/endpoint/diagclient/client.go @@ -19,6 +19,7 @@ package diagclient import ( "bufio" + "errors" "fmt" "io" "net/http" @@ -126,25 +127,29 @@ func (c *client) Close() (err error) { return nil } -func (c *client) Diag() bool { +func (c *client) Diag() error { client := http.Client{ Transport: &http.Transport{ DialContext: c.tnet.DialContext, }, + Timeout: 15 * time.Second, } - resp, err := client.Get("http://1.1.1.1/") + resp, err := client.Get("http://107.173.23.19:8080/test") if err != nil { log.Error().Err(err).Msg("Get failed") - return false + return err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { log.Error().Err(err).Msg("Readall failed") - return false + return err + } + if len(body) < 6 { + log.Error().Msg("Wrong length") + return errors.New("Wrong body length") } - _ = body - return true + return nil } diff --git a/services/wireguard/endpoint/endpoint.go b/services/wireguard/endpoint/endpoint.go index ad1d1ddd28..5dd43a0800 100644 --- a/services/wireguard/endpoint/endpoint.go +++ b/services/wireguard/endpoint/endpoint.go @@ -52,12 +52,12 @@ type connectionEndpoint struct { wgClient WgClient } -func (ce *connectionEndpoint) Diag() bool { +func (ce *connectionEndpoint) Diag() error { c, ok := ce.wgClient.(WgClientDiag) if ok { return c.Diag() } - return false + return nil } // StartConsumerMode starts and configure wireguard network interface running in consumer mode. diff --git a/services/wireguard/endpoint/wg_client.go b/services/wireguard/endpoint/wg_client.go index 74cd0f6111..22dd36781e 100644 --- a/services/wireguard/endpoint/wg_client.go +++ b/services/wireguard/endpoint/wg_client.go @@ -46,7 +46,7 @@ type WgClient interface { // WgClientDiag is a specialised WgClient interface for provider check type WgClientDiag interface { - Diag() bool + Diag() error } // WgClientFactory represents WireGuard client factory. diff --git a/tequilapi/contract/connection.go b/tequilapi/contract/connection.go index 226e9fdb37..835805c254 100644 --- a/tequilapi/contract/connection.go +++ b/tequilapi/contract/connection.go @@ -54,9 +54,9 @@ func NewConnectionInfoDTO(session connectionstate.Status) ConnectionInfoDTO { // ConnectionDiagInfoDTO holds provider check result // swagger:model ConnectionDiagInfoDTO type ConnectionDiagInfoDTO struct { - Status bool `json:"status"` - Error interface{} `json:"error"` - ProviderID string `json:"provider_id"` + ProviderID string `json:"provider_id"` + Error string `json:"error"` + DiagError string `json:"diag_err"` } // ConnectionInfoDTO holds partial consumer connection details. diff --git a/tequilapi/endpoints/connection-diag.go b/tequilapi/endpoints/connection-diag.go index 5c144a2000..e719aeab88 100644 --- a/tequilapi/endpoints/connection-diag.go +++ b/tequilapi/endpoints/connection-diag.go @@ -25,6 +25,9 @@ import ( "github.com/gin-gonic/gin" "github.com/pkg/errors" "github.com/rs/zerolog/log" + + "gorm.io/driver/postgres" + "gorm.io/gorm" "gvisor.dev/gvisor/pkg/sync" "github.com/mysteriumnetwork/go-rest/apierror" @@ -55,6 +58,8 @@ type ConnectionDiagEndpoint struct { identitySelector selector.Handler consumerAddress string + + db *gorm.DB } // NewConnectionDiagEndpoint creates and returns connection endpoint @@ -78,6 +83,17 @@ func NewConnectionDiagEndpoint(manager connection.DiagManager, stateProvider sta log.Error().Msgf("Unlocked identity: %v", consumerID_.Address) ce.consumerAddress = consumerID_.Address + dsn := "host=____ user=mypguser password=___ dbname=myst_nodes port=5432 sslmode=disable" + db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) + if err != nil { + panic(err) + } + + ce.db = db + if err != nil { + panic(err) + } + return ce } @@ -139,7 +155,7 @@ func (ce *ConnectionDiagEndpoint) DiagBatch(c *gin.Context) { ConnectOptions: contract.ConnectOptions{}, } if err := cr.Validate(); err != nil { - result.Error = err + result.Error = err.Error() return } @@ -147,13 +163,13 @@ func (ce *ConnectionDiagEndpoint) DiagBatch(c *gin.Context) { status, err := ce.identityRegistry.GetRegistrationStatus(config.GetInt64(config.FlagChainID), consumerID) if err != nil { log.Error().Err(err).Stack().Msg("Could not check registration status") - result.Error = contract.ErrCodeIDRegistrationCheck + result.Error = (contract.ErrCodeIDRegistrationCheck) return } switch status { case registry.Unregistered, registry.RegistrationError, registry.Unknown: log.Error().Msgf("Identity %q is not registered, aborting...", cr.ConsumerID) - result.Error = contract.ErrCodeIDNotRegistered + result.Error = (contract.ErrCodeIDNotRegistered) return case registry.InProgress: log.Info().Msgf("identity %q registration is in progress, continuing...", cr.ConsumerID) @@ -161,7 +177,7 @@ func (ce *ConnectionDiagEndpoint) DiagBatch(c *gin.Context) { log.Info().Msgf("identity %q is registered, continuing...", cr.ConsumerID) default: log.Error().Msgf("identity %q has unknown status, aborting...", cr.ConsumerID) - result.Error = contract.ErrCodeIDStatusUnknown + result.Error = (contract.ErrCodeIDStatusUnknown) return } @@ -179,7 +195,7 @@ func (ce *ConnectionDiagEndpoint) DiagBatch(c *gin.Context) { proposalLookup := connection.FilteredProposals(f, cr.Filter.SortBy, ce.proposalRepository) if ce.manager.HasConnection(cr.ProviderID) { - result.Error = contract.ErrCodeConnectionAlreadyExists + result.Error = (contract.ErrCodeConnectionAlreadyExists) return } @@ -187,12 +203,12 @@ func (ce *ConnectionDiagEndpoint) DiagBatch(c *gin.Context) { if err != nil { switch err { case connection.ErrAlreadyExists: - result.Error = contract.ErrCodeConnectionAlreadyExists + result.Error = (contract.ErrCodeConnectionAlreadyExists) case connection.ErrConnectionCancelled: - result.Error = contract.ErrCodeConnectionCancelled + result.Error = (contract.ErrCodeConnectionCancelled) default: log.Error().Err(err).Msgf("Failed to connect: %v", prov) - result.Error = contract.ErrCodeConnect + result.Error = (contract.ErrCodeConnect) } return } @@ -200,7 +216,6 @@ func (ce *ConnectionDiagEndpoint) DiagBatch(c *gin.Context) { resChannel := ce.manager.GetReadyChan(cr.ProviderID) res := <-resChannel log.Error().Msgf("Result > %v", res) - result.Status = res.(quality.DiagEvent).Result }(prov) } @@ -301,11 +316,174 @@ func (ce *ConnectionDiagEndpoint) Diag(c *gin.Context) { resp := contract.ConnectionDiagInfoDTO{ ProviderID: prov, - Status: res.(quality.DiagEvent).Result, } utils.WriteAsJSON(resp, c.Writer) } +type proposalDB struct { + ID string + Error string + DiagError string `json:"diag_error"` + Country string +} + +func (proposalDB) TableName() string { + return "node" +} + +// DiagBatch is used to start a given providers check (batch mode) +func (ce *ConnectionDiagEndpoint) DiagBatch2(c *gin.Context) { + + hermes, err := ce.addressProvider.GetActiveHermes(config.GetInt64(config.FlagChainID)) + if err != nil { + c.Error(apierror.Internal("Failed to get active hermes", contract.ErrCodeActiveHermes)) + return + } + + country := c.Query("location") + f := &proposal.Filter{ + ServiceType: "wireguard", + LocationCountry: country, + ExcludeUnsupported: true, + IncludeMonitoringFailed: true, + } + pp, err := ce.proposalRepository.Proposals(f) + if err != nil { + log.Error().Err(err).Stack().Msg("Proposals>") + } + log.Error().Msgf("pp> %v", len(pp)) + + var ( + wg sync.WaitGroup + mu sync.Mutex + ) + resultMap := make(map[string]contract.ConnectionDiagInfoDTO, len(pp)) + wg.Add(len(pp)) + + maxGoroutines := 15 + guard := make(chan struct{}, maxGoroutines) + + for _, pr := range pp { + guard <- struct{}{} + + worker := func(provID string) (result contract.ConnectionDiagInfoDTO) { + result.ProviderID = provID + + cr := &contract.ConnectionCreateRequest{ + ConsumerID: ce.consumerAddress, + ProviderID: provID, + Filter: contract.ConnectionCreateFilter{IncludeMonitoringFailed: true}, + HermesID: hermes.Hex(), + ServiceType: "wireguard", + ConnectOptions: contract.ConnectOptions{}, + } + if err := cr.Validate(); err != nil { + result.Error = err.Error() + return + } + + consumerID := identity.FromAddress(cr.ConsumerID) + status, err := ce.identityRegistry.GetRegistrationStatus(config.GetInt64(config.FlagChainID), consumerID) + if err != nil { + log.Error().Err(err).Stack().Msg("Could not check registration status") + result.Error = (contract.ErrCodeIDRegistrationCheck) + return + } + switch status { + case registry.Unregistered, registry.RegistrationError, registry.Unknown: + log.Error().Msgf("Identity %q is not registered, aborting...", cr.ConsumerID) + result.Error = (contract.ErrCodeIDNotRegistered) + return + case registry.InProgress: + log.Info().Msgf("identity %q registration is in progress, continuing...", cr.ConsumerID) + case registry.Registered: + log.Info().Msgf("identity %q is registered, continuing...", cr.ConsumerID) + default: + log.Error().Msgf("identity %q has unknown status, aborting...", cr.ConsumerID) + result.Error = (contract.ErrCodeIDStatusUnknown) + return + } + + if len(cr.ProviderID) > 0 { + cr.Filter.Providers = append(cr.Filter.Providers, cr.ProviderID) + } + f := &proposal.Filter{ + ServiceType: cr.ServiceType, + LocationCountry: cr.Filter.CountryCode, + ProviderIDs: cr.Filter.Providers, + IPType: cr.Filter.IPType, + IncludeMonitoringFailed: cr.Filter.IncludeMonitoringFailed, + AccessPolicy: "all", + } + proposalLookup := connection.FilteredProposals(f, cr.Filter.SortBy, ce.proposalRepository) + + if ce.manager.HasConnection(cr.ProviderID) { + result.Error = (contract.ErrCodeConnectionAlreadyExists) + return + } + + err = ce.manager.Connect(consumerID, common.HexToAddress(cr.HermesID), proposalLookup, getConnectOptions(cr)) + if err != nil { + switch err { + case connection.ErrAlreadyExists: + result.Error = (contract.ErrCodeConnectionAlreadyExists) + case connection.ErrConnectionCancelled: + result.Error = (contract.ErrCodeConnectionCancelled) + default: + log.Error().Err(err).Msgf("Failed to connect: %v", provID) + result.Error = (contract.ErrCodeConnect) + } + return + } + + resChannel := ce.manager.GetReadyChan(cr.ProviderID) + res := <-resChannel + log.Error().Msgf("Result > %v", res) + + ev := res.(quality.DiagEvent) + // result.Status = ev.Result + if ev.Error != nil { + result.DiagError = ev.Error.Error() + } + + return + } + go func(pr proposal.PricedServiceProposal) { + + result := worker(pr.ProviderID) + + mu.Lock() + resultMap[pr.ProviderID] = result + mu.Unlock() + + // update + provRec := proposalDB{ID: result.ProviderID, Country: pr.Location.Country} + provRec.Error = "" + provRec.DiagError = "" + provRec.Error = result.Error + provRec.DiagError = result.DiagError + if ce.db.Model(&provRec).Select("Error", "DiagError", "Country").Updates(provRec).RowsAffected == 0 { + ce.db.Create(&provRec) + } + + wg.Done() + <-guard + }(pr) + + } + wg.Wait() + + out := make([]contract.ConnectionDiagInfoDTO, 0) + for _, prov := range pp { + res := resultMap[prov.ProviderID] + if res.Error != "" || res.DiagError != "" { + out = append(out, resultMap[prov.ProviderID]) + } + + } + utils.WriteAsJSON(out, c.Writer) +} + // AddRoutesForConnectionDiag adds proder check route to given router func AddRoutesForConnectionDiag( manager connection.DiagManager, @@ -324,6 +502,7 @@ func AddRoutesForConnectionDiag( { connGroup.GET("/prov-checker", ConnectionDiagEndpoint.Diag) connGroup.POST("/prov-checker-batch", ConnectionDiagEndpoint.DiagBatch) + connGroup.GET("/prov-checker-batch2", ConnectionDiagEndpoint.DiagBatch2) } return nil }