Skip to content

Commit

Permalink
Make batch mode usable for global scan of nodes
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Litvinov <[email protected]>
  • Loading branch information
Zensey committed Jul 15, 2024
1 parent cc91377 commit adb2bff
Show file tree
Hide file tree
Showing 14 changed files with 268 additions and 36 deletions.
2 changes: 1 addition & 1 deletion core/connection/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Connection interface {

// ConnectionDiag is a specialised Connection interface for provider check
type ConnectionDiag interface {
Diag() bool
Diag() error
}

// StateChannel is the channel we receive state change events on
Expand Down
22 changes: 20 additions & 2 deletions core/connection/manager-diag.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/rs/zerolog/log"
"golang.org/x/time/rate"

"github.com/mysteriumnetwork/node/config"
"github.com/mysteriumnetwork/node/core/connection/connectionstate"
Expand Down Expand Up @@ -91,6 +92,8 @@ type diagConnectionManager struct {
// populated by Connect at runtime.
connsMu sync.Mutex
conns map[string]*conn

ratelimiter *rate.Limiter
}

// NewDiagManager creates connection manager with given dependencies
Expand Down Expand Up @@ -120,6 +123,8 @@ func NewDiagManager(
validator: validator,
p2pDialer: p2pDialer,
timeGetter: time.Now,

ratelimiter: rate.NewLimiter(rate.Every(1000*time.Millisecond), 1),
}

m.eventBus.SubscribeAsync(connectionstate.AppTopicConnectionState, m.reconnectOnHold)
Expand Down Expand Up @@ -153,6 +158,13 @@ func (m *diagConnectionManager) GetReadyChan(providerID string) chan interface{}
func (m *diagConnectionManager) Connect(consumerID identity.Identity, hermesID common.Address, proposalLookup ProposalLookup, params ConnectParams) (err error) {
var sessionID session.ID

ctx := context.Background()
err = m.ratelimiter.Wait(ctx) // This is a blocking call. Honors the rate limit
if err != nil {
log.Error().Msgf("ratelimiter.Wait: %s", err)
return err
}

proposal, err := proposalLookup()
if err != nil {
return fmt.Errorf("failed to lookup proposal: %w", err)
Expand All @@ -164,15 +176,19 @@ func (m *diagConnectionManager) Connect(consumerID identity.Identity, hermesID c
log.Debug().Msgf("Consumer connection trace: %s", traceResult)
}()

fmt.Println("Connect>", proposal.ProviderID)
log.Error().Msgf("Connect > %v", proposal.ProviderID)
uuid := proposal.ProviderID

m.connsMu.Lock()
con, ok := m.conns[uuid]
if !ok {
con = new(conn)
con.status.State = connectionstate.NotConnected
con.uuid = uuid
m.conns[uuid] = con
}
m.connsMu.Unlock()

removeConnection := func() {
m.connsMu.Lock()
defer m.connsMu.Unlock()
Expand Down Expand Up @@ -933,7 +949,9 @@ func (m *diagConnectionManager) keepAliveLoop(con *conn, channel p2p.Channel, se
if config.GetBool(config.FlagKeepConnectedOnFail) {
m.statusOnHold(con)
} else {
m.Disconnect()
//m.Disconnect()
log.Error().Msgf("Max p2p keepalive err count reached, disconnecting. SessionID=%s >>>>>>>>>", sessionID)
m.DisconnectSingle(con)
}
cancel()
return
Expand Down
7 changes: 5 additions & 2 deletions core/connection/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ import (
// Diag is used to start provider check
func Diag(cm *diagConnectionManager, con *conn, providerID string) {
c, ok := con.activeConnection.(ConnectionDiag)
res := false
res := error(nil)
if ok {
log.Debug().Msgf("Check provider> %v", providerID)

res = c.Diag()
cm.DisconnectSingle(con)
}
ev := quality.DiagEvent{ProviderID: providerID, Result: res}
ev := quality.DiagEvent{ProviderID: providerID, Result: res == nil}
if res != nil {
ev.Error = res
}
con.resChannel <- ev
close(con.resChannel)
}
1 change: 1 addition & 0 deletions core/quality/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type PingEvent struct {
type DiagEvent struct {
ProviderID string
Result bool
Error error
}

const (
Expand Down
15 changes: 10 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,15 @@ require (
github.com/imdario/mergo v0.3.12 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.5.5 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jinzhu/gorm v1.9.2 // indirect
github.com/jinzhu/inflection v0.0.0-20180308033659-04140366298a // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/native v0.0.0-20200817173448-b6b71def0850 // indirect
Expand All @@ -181,6 +185,7 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/mdlayher/genetlink v1.1.0 // indirect
github.com/mdlayher/netlink v1.4.2 // indirect
Expand Down Expand Up @@ -263,12 +268,12 @@ require (
gopkg.in/intercom/intercom-go.v2 v2.0.0-20210504094731-2bd1af0ce4b2 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/driver/postgres v1.5.9 // indirect
gorm.io/driver/sqlite v1.5.6 // indirect
gorm.io/gorm v1.25.11 // indirect
honnef.co/go/tools v0.4.2 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)

replace (
golang.zx2c4.com/wireguard => github.com/mysteriumnetwork/wireguard-go v0.0.0-20240416113031-406b13e8996a
//gvisor.dev/gvisor => github.com/mysteriumnetwork/gvisor v0.0.0-20240206094932-ff91e662b9e8
)
replace golang.zx2c4.com/wireguard => github.com/mysteriumnetwork/wireguard-go v0.0.0-20240416113031-406b13e8996a
20 changes: 20 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,14 @@ github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY=
github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jackpal/gateway v1.0.6 h1:/MJORKvJEwNVldtGVJC2p2cwCnsSoLn3hl3zxmZT7tk=
github.com/jackpal/gateway v1.0.6/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
Expand All @@ -974,6 +982,8 @@ github.com/jinzhu/gorm v1.9.2 h1:lCvgEaqe/HVE+tjAR2mt4HbbHAZsQOv3XAZiEZV37iw=
github.com/jinzhu/gorm v1.9.2/go.mod h1:Vla75njaFJ8clLU1W44h34PjIkijhjHIYnZxMqCdxqo=
github.com/jinzhu/inflection v0.0.0-20180308033659-04140366298a h1:eeaG9XMUvRBYXJi4pg1ZKM7nxc5AfXfojeLLW7O5J3k=
github.com/jinzhu/inflection v0.0.0-20180308033659-04140366298a/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
Expand Down Expand Up @@ -1107,6 +1117,8 @@ github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnU
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
Expand Down Expand Up @@ -2352,6 +2364,14 @@ gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/postgres v1.5.9 h1:DkegyItji119OlcaLjqN11kHoUgZ/j13E0jkJZgD6A8=
gorm.io/driver/postgres v1.5.9/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI=
gorm.io/driver/sqlite v1.5.6 h1:fO/X46qn5NUEEOZtnjJRWRzZMe8nqJiQ9E+0hi+hKQE=
gorm.io/driver/sqlite v1.5.6/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4=
gorm.io/gorm v1.25.7-0.20240204074919-46816ad31dde h1:9DShaph9qhkIYw7QF91I/ynrr4cOO2PZra2PFD7Mfeg=
gorm.io/gorm v1.25.7-0.20240204074919-46816ad31dde/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gorm.io/gorm v1.25.11 h1:/Wfyg1B/je1hnDx3sMkX+gAlxrlZpn6X0BXRlwXlvHg=
gorm.io/gorm v1.25.11/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.4.0/go.mod h1:CtbdzLSsqVhDgMtKsx03ird5YTGB3ar27v0u/yKBW5g=
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o=
Expand Down
2 changes: 1 addition & 1 deletion services/wireguard/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (c *Connection) State() <-chan connectionstate.State {
}

// Diag is used to start provider check
func (c *Connection) Diag() bool {
func (c *Connection) Diag() error {
return c.connectionEndpoint.Diag()
}

Expand Down
4 changes: 2 additions & 2 deletions services/wireguard/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ func (mce *mockConnectionEndpoint) ConfigureRoutes(_ net.IP) error { retur
func (mce *mockConnectionEndpoint) PeerStats() (wgcfg.Stats, error) {
return wgcfg.Stats{LastHandshake: time.Now(), BytesSent: 10, BytesReceived: 11}, nil
}
func (mce *mockConnectionEndpoint) Diag() bool {
return true
func (mce *mockConnectionEndpoint) Diag() error {
return nil
}

type mockHandshakeWaiter struct {
Expand Down
3 changes: 2 additions & 1 deletion services/wireguard/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ type ConnectionEndpoint interface {
Config() (ServiceConfig, error)
InterfaceName() string
Stop() error
Diag() bool

Diag() error
}
17 changes: 11 additions & 6 deletions services/wireguard/endpoint/diagclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package diagclient

import (
"bufio"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -126,25 +127,29 @@ func (c *client) Close() (err error) {
return nil
}

func (c *client) Diag() bool {
func (c *client) Diag() error {
client := http.Client{
Transport: &http.Transport{
DialContext: c.tnet.DialContext,
},
Timeout: 15 * time.Second,
}
resp, err := client.Get("http://1.1.1.1/")
resp, err := client.Get("http://107.173.23.19:8080/test")
if err != nil {
log.Error().Err(err).Msg("Get failed")
return false
return err
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
log.Error().Err(err).Msg("Readall failed")
return false
return err
}
if len(body) < 6 {
log.Error().Msg("Wrong length")
return errors.New("Wrong body length")
}
_ = body

return true
return nil
}
4 changes: 2 additions & 2 deletions services/wireguard/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ type connectionEndpoint struct {
wgClient WgClient
}

func (ce *connectionEndpoint) Diag() bool {
func (ce *connectionEndpoint) Diag() error {
c, ok := ce.wgClient.(WgClientDiag)
if ok {
return c.Diag()
}
return false
return nil
}

// StartConsumerMode starts and configure wireguard network interface running in consumer mode.
Expand Down
2 changes: 1 addition & 1 deletion services/wireguard/endpoint/wg_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type WgClient interface {

// WgClientDiag is a specialised WgClient interface for provider check
type WgClientDiag interface {
Diag() bool
Diag() error
}

// WgClientFactory represents WireGuard client factory.
Expand Down
6 changes: 3 additions & 3 deletions tequilapi/contract/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ func NewConnectionInfoDTO(session connectionstate.Status) ConnectionInfoDTO {
// ConnectionDiagInfoDTO holds provider check result
// swagger:model ConnectionDiagInfoDTO
type ConnectionDiagInfoDTO struct {
Status bool `json:"status"`
Error interface{} `json:"error"`
ProviderID string `json:"provider_id"`
ProviderID string `json:"provider_id"`
Error string `json:"error"`
DiagError string `json:"diag_err"`
}

// ConnectionInfoDTO holds partial consumer connection details.
Expand Down
Loading

0 comments on commit adb2bff

Please sign in to comment.