Skip to content

Commit ffd48ed

Browse files
authored
Merge pull request #6106 from mysteriumnetwork/add-quic
Fix proper connection handling on multi-consumer setup
2 parents bb5034d + 34871e9 commit ffd48ed

File tree

4 files changed

+47
-20
lines changed

4 files changed

+47
-20
lines changed

cmd/commands/connection/command.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ var (
4949
Usage: "Proxy port",
5050
}
5151

52+
flagServiceType = cli.StringFlag{
53+
Name: "service-type",
54+
Usage: "Service type to connect to.",
55+
Value: serviceWireguard,
56+
}
57+
5258
flagCountry = cli.StringFlag{
5359
Name: "country",
5460
Usage: "Two letter (ISO 3166-1 alpha-2) country code to filter proposals.",
@@ -114,7 +120,7 @@ func NewCommand() *cli.Command {
114120
Name: "up",
115121
ArgsUsage: "[ProviderIdentityAddress]",
116122
Usage: "Create a new connection",
117-
Flags: []cli.Flag{&config.FlagAgreedTermsConditions, &flagCountry, &flagLocationType, &flagSortType, &flagIncludeFailed, &flagProxyPort},
123+
Flags: []cli.Flag{&config.FlagAgreedTermsConditions, &flagCountry, &flagLocationType, &flagSortType, &flagIncludeFailed, &flagProxyPort, &flagServiceType},
118124
Action: func(ctx *cli.Context) error {
119125
cmd.up(ctx)
120126
return nil
@@ -155,7 +161,7 @@ func (c *command) proposals(ctx *cli.Context) {
155161
return
156162
}
157163

158-
proposals, err := c.tequilapi.ProposalsByLocationAndService(serviceWireguard, locationType, locationCountry)
164+
proposals, err := c.tequilapi.ProposalsByLocationAndService(ctx.String(flagServiceType.Name), locationType, locationCountry)
159165
if err != nil {
160166
clio.Warn("Failed to fetch proposal list")
161167
return
@@ -291,7 +297,7 @@ func (c *command) up(ctx *cli.Context) {
291297
IncludeMonitoringFailed: ctx.Bool(flagIncludeFailed.Name),
292298
}
293299

294-
_, err = c.tequilapi.SmartConnectionCreate(id.Address, hermesID, serviceWireguard, filter, connectOptions)
300+
_, err = c.tequilapi.SmartConnectionCreate(id.Address, hermesID, ctx.String(flagServiceType.Name), filter, connectOptions)
295301
if err != nil {
296302
clio.Error("Failed to create a new connection: ", err)
297303
return

p2p/channel_quic.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package p2p
1919

2020
import (
2121
"context"
22+
"errors"
2223
"fmt"
2324
"net"
2425
"sync"
@@ -112,6 +113,16 @@ func (c *channelQuic) Send(ctx context.Context, topic string, msg *Message) (*Me
112113
return nil, err
113114
}
114115

116+
if readMsg.statusCode != statusCodeOK {
117+
if readMsg.statusCode == statusCodePublicErr {
118+
return nil, fmt.Errorf("public peer error: %s", string(readMsg.data))
119+
}
120+
if readMsg.statusCode == statusCodeHandlerNotFoundErr {
121+
return nil, fmt.Errorf("%s: %w", string(readMsg.data), ErrHandlerNotFound)
122+
}
123+
return nil, fmt.Errorf("peer error: %w", errors.New(readMsg.msg))
124+
}
125+
115126
return &Message{
116127
Data: readMsg.data,
117128
}, nil

p2p/dialer.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,6 @@ type dialer struct {
7575
verifierFactory identity.VerifierFactory
7676
ipResolver ip.Resolver
7777
eventBus eventbus.EventBus
78-
79-
quicServer *server.QuicServer
8078
}
8179

8280
// Dial exchanges p2p configuration via broker, performs NAT pinging if needed
@@ -119,15 +117,16 @@ func (m *dialer) Dial(ctx context.Context, consumerID, providerID identity.Ident
119117
}
120118
}
121119

120+
var quicServer *server.QuicServer
122121
if serviceType == "quic_scraping" {
123-
m.quicServer, err = server.NewQuicServer()
122+
quicServer, err = server.NewQuicServer()
124123
if err != nil {
125124
return nil, fmt.Errorf("could not create QUIC server: %w", err)
126125
}
127126

128-
go m.quicServer.Start(ctx)
127+
go quicServer.Start(ctx)
129128

130-
port, err := m.quicServer.WaitForListenPort(ctx)
129+
port, err := quicServer.WaitForListenPort(ctx)
131130
if err != nil {
132131
return nil, fmt.Errorf("could not wait for listen addr: %w", err)
133132
}
@@ -155,7 +154,7 @@ func (m *dialer) Dial(ctx context.Context, consumerID, providerID identity.Ident
155154

156155
dial := m.dialPinger
157156
if len(config.peerURL) > 0 {
158-
dial = m.dialQUIC
157+
dial = m.dialQUIC(quicServer)
159158
} else if len(config.peerPorts) == requiredConnCount {
160159
dial = m.dialDirect
161160
}
@@ -175,7 +174,7 @@ func (m *dialer) Dial(ctx context.Context, consumerID, providerID identity.Ident
175174

176175
var channel communicationChannel
177176
if serviceType == "quic_scraping" {
178-
channel = newChannelQuic(conn1, config.peerID, config.compatibility)
177+
channel = newChannelQuic(conn1, providerID, config.compatibility)
179178
} else {
180179
channel, err = newChannel(conn1, config.privateKey, config.peerPubKey, config.compatibility)
181180
if err != nil {
@@ -358,8 +357,19 @@ func (m *dialer) dialDirect(ctx context.Context, providerID identity.Identity, c
358357
return conn1, conn2, err
359358
}
360359

361-
func (m *dialer) dialQUIC(ctx context.Context, providerID identity.Identity, config *p2pConnectConfig) (ServiceConn, ServiceConn, error) {
362-
return m.quicServer.CommunicationConn(ctx), m.quicServer.TransportConn(ctx), nil
360+
func (m *dialer) dialQUIC(quicServer *server.QuicServer) func(ctx context.Context, providerID identity.Identity, config *p2pConnectConfig) (ServiceConn, ServiceConn, error) {
361+
return func(ctx context.Context, providerID identity.Identity, config *p2pConnectConfig) (ServiceConn, ServiceConn, error) {
362+
conn1, err := quicServer.CommunicationConn(ctx)
363+
if err != nil {
364+
return nil, nil, fmt.Errorf("could not get QUIC communication connection: %w", err)
365+
}
366+
conn2, err := quicServer.TransportConn(ctx)
367+
if err != nil {
368+
return nil, nil, fmt.Errorf("could not get QUIC transport connection: %w", err)
369+
}
370+
371+
return conn1, conn2, nil
372+
}
363373
}
364374

365375
func (m *dialer) dialPinger(ctx context.Context, providerID identity.Identity, config *p2pConnectConfig) (ServiceConn, ServiceConn, error) {

services/quic/connection/server/server.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,34 +75,34 @@ func (s *QuicServer) Start(ctx context.Context) error {
7575
}
7676

7777
// CommunicationConn returns communication connection.
78-
func (s *QuicServer) CommunicationConn(ctx context.Context) *streams.QuicConnection {
78+
func (s *QuicServer) CommunicationConn(ctx context.Context) (*streams.QuicConnection, error) {
7979
for {
8080
select {
8181
case <-ctx.Done():
82-
return nil
82+
return nil, fmt.Errorf("context done: %w", ctx.Err())
8383
default:
8484
if s.communicationConn != nil {
85-
return &streams.QuicConnection{Connection: s.communicationConn}
85+
return &streams.QuicConnection{Connection: s.communicationConn}, nil
8686
}
8787

88-
log.Info().Msg("Waiting for communication connection")
88+
log.Debug().Msg("Waiting for communication connection")
8989
time.Sleep(200 * time.Millisecond)
9090
}
9191
}
9292
}
9393

9494
// TransportConn returns transport connection.
95-
func (s *QuicServer) TransportConn(ctx context.Context) *streams.QuicConnection {
95+
func (s *QuicServer) TransportConn(ctx context.Context) (*streams.QuicConnection, error) {
9696
for {
9797
select {
9898
case <-ctx.Done():
99-
return nil
99+
return nil, fmt.Errorf("context done: %w", ctx.Err())
100100
default:
101101
if s.transportConn != nil {
102-
return &streams.QuicConnection{Connection: s.transportConn}
102+
return &streams.QuicConnection{Connection: s.transportConn}, nil
103103
}
104104

105-
log.Info().Msg("Waiting for transport connection")
105+
log.Debug().Msg("Waiting for transport connection")
106106
time.Sleep(200 * time.Millisecond)
107107
}
108108
}

0 commit comments

Comments
 (0)