Skip to content

Commit

Permalink
Add concurrency for diag endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Litvinov <[email protected]>
  • Loading branch information
Zensey committed May 24, 2024
1 parent 5d82134 commit 6a2878c
Show file tree
Hide file tree
Showing 9 changed files with 1,053 additions and 106 deletions.
2 changes: 1 addition & 1 deletion cmd/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (di *Dependencies) bootstrapTequilapi(nodeOptions node.Options, listener ne
tequilapi_endpoints.AddRoutesForAuthentication(di.Authenticator, di.JWTAuthenticator, di.SSOMystnodes),
tequilapi_endpoints.AddRoutesForIdentities(di.IdentityManager, di.IdentitySelector, di.IdentityRegistry, di.ConsumerBalanceTracker, di.AddressProvider, di.HermesChannelRepository, di.BCHelper, di.Transactor, di.BeneficiaryProvider, di.IdentityMover, di.BeneficiaryAddressStorage, di.HermesMigrator),
tequilapi_endpoints.AddRoutesForConnection(di.MultiConnectionManager, di.StateKeeper, di.ProposalRepository, di.IdentityRegistry, di.EventBus, di.AddressProvider),
tequilapi_endpoints.AddRoutesForConnectionDiag(di.MultiConnectionManager, di.StateKeeper, di.ProposalRepository, di.IdentityRegistry, di.EventBus, di.EventBus, di.AddressProvider, di.IdentitySelector, nodeOptions),
tequilapi_endpoints.AddRoutesForConnectionDiag(di.MultiConnectionDiagManager, di.StateKeeper, di.ProposalRepository, di.IdentityRegistry, di.EventBus, di.EventBus, di.AddressProvider, di.IdentitySelector, nodeOptions),
tequilapi_endpoints.AddRoutesForSessions(di.SessionStorage),
tequilapi_endpoints.AddRoutesForConnectionLocation(di.IPResolver, di.LocationResolver, di.LocationResolver),
tequilapi_endpoints.AddRoutesForProposals(di.ProposalRepository, di.PricingHelper, di.LocationResolver, di.FilterPresetStorage, di.NATProber),
Expand Down
37 changes: 30 additions & 7 deletions cmd/di.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ type Dependencies struct {

EventBus eventbus.EventBus

MultiConnectionManager connection.MultiManager
ConnectionRegistry *connection.Registry
MultiConnectionManager connection.MultiManager
MultiConnectionDiagManager connection.DiagManager
ConnectionRegistry *connection.Registry

ServicesManager *service.Manager
ServiceRegistry *service.Registry
Expand Down Expand Up @@ -210,8 +211,6 @@ type Dependencies struct {
NodeStatusTracker *monitoring.StatusTracker
NodeStatsTracker *node.StatsTracker
uiVersionConfig versionmanager.NodeUIVersionConfig

provPinger *connection.ProviderChecker
}

// Bootstrap initiates all container dependencies
Expand Down Expand Up @@ -608,10 +607,34 @@ func (di *Dependencies) bootstrapNodeComponents(nodeOptions node.Options, tequil
di.P2PDialer,
di.allowTrustedDomainBypassTunnel,
di.disallowTrustedDomainBypassTunnel,
di.provPinger,
)
})

if nodeOptions.ProvChecker {
di.MultiConnectionDiagManager = connection.NewDiagManager(
pingpong.ExchangeFactoryFunc(
di.Keystore,
di.SignerFactory,
di.ConsumerTotalsStorage,
di.AddressProvider,
di.EventBus,
nodeOptions.Payments.ConsumerDataLeewayMegabytes,
),
di.ConnectionRegistry.CreateConnection,
di.EventBus,
di.IPResolver,
di.LocationResolver,
connection.DefaultConfig(),
config.GetDuration(config.FlagStatsReportInterval),
connection.NewValidator(
di.ConsumerBalanceTracker,
di.IdentityManager,
),
di.P2PDialer,
di.allowTrustedDomainBypassTunnel,
di.disallowTrustedDomainBypassTunnel,
)
}
di.NATProber = natprobe.NewNATProber(di.MultiConnectionManager, di.EventBus)

di.LogCollector = logconfig.NewCollector(&logconfig.CurrentLogOptions)
Expand Down Expand Up @@ -660,7 +683,7 @@ func (di *Dependencies) bootstrapNodeComponents(nodeOptions node.Options, tequil
sleepNotifier := sleep.NewNotifier(di.MultiConnectionManager, di.EventBus)
sleepNotifier.Subscribe()

di.Node = NewNode(di.MultiConnectionManager, tequilapiHTTPServer, di.EventBus, di.UIServer, sleepNotifier)
di.Node = NewNode(di.MultiConnectionManager, di.MultiConnectionDiagManager, tequilapiHTTPServer, di.EventBus, di.UIServer, sleepNotifier)

return nil
}
Expand Down Expand Up @@ -930,7 +953,7 @@ func (di *Dependencies) bootstrapQualityComponents(options node.OptionsQuality,
}

if nodeOptions.ProvChecker {
di.provPinger = connection.NewProviderChecker(di.EventBus)
// di.provPinger = connection.NewProviderChecker(di.EventBus)
}

return nil
Expand Down
26 changes: 15 additions & 11 deletions cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,27 @@ type SleepNotifier interface {
}

// NewNode function creates new Mysterium node by given options
func NewNode(connectionManager connection.MultiManager, tequilapiServer tequilapi.APIServer, publisher Publisher, uiServer UIServer, notifier SleepNotifier) *Node {
func NewNode(connectionManager connection.MultiManager, connectionDiagManager connection.DiagManager, tequilapiServer tequilapi.APIServer, publisher Publisher, uiServer UIServer, notifier SleepNotifier) *Node {
return &Node{
connectionManager: connectionManager,
httpAPIServer: tequilapiServer,
publisher: publisher,
uiServer: uiServer,
sleepNotifier: notifier,
connectionManager: connectionManager,
connectionDiagManager: connectionDiagManager,

httpAPIServer: tequilapiServer,
publisher: publisher,
uiServer: uiServer,
sleepNotifier: notifier,
}
}

// Node represent entrypoint for Mysterium node with top level components
type Node struct {
connectionManager connection.MultiManager
httpAPIServer tequilapi.APIServer
publisher Publisher
uiServer UIServer
sleepNotifier SleepNotifier
connectionManager connection.MultiManager
connectionDiagManager connection.DiagManager

httpAPIServer tequilapi.APIServer
publisher Publisher
uiServer UIServer
sleepNotifier SleepNotifier
}

// Start starts Mysterium node (Tequilapi service, fetches location)
Expand Down
12 changes: 12 additions & 0 deletions core/connection/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,15 @@ type MultiManager interface {
// Reconnect reconnects current session
Reconnect(n int)
}

// DiagManager interface provides methods to manage diagnotic connection
type DiagManager interface {
// Connect creates new connection from given consumer to provider, reports error if connection already exists
Connect(consumerID identity.Identity, hermesID common.Address, proposal ProposalLookup, params ConnectParams) error
// Status queries current status of connection
Status() connectionstate.Status
// GetReadyChan returns a channel for getting a diagnostic result
GetReadyChan(providerID string) chan interface{}
// HasConnection returns true if a diagnostic connection is already established
HasConnection(providerID string) bool
}
Loading

0 comments on commit 6a2878c

Please sign in to comment.