Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 0652bcb

Browse files
committedJul 21, 2020
Remove global state for ICE TCP
This addresses a few points issue of #245: - Take a net.Listener instead of having global state - Expose a net.TCPMux based API Also, the unused closeChannel was removed from tcp_mux.go Closes #253.
1 parent 1f59642 commit 0652bcb

9 files changed

+71
-167
lines changed
 

‎agent.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ type Agent struct {
120120
loggerFactory logging.LoggerFactory
121121
log logging.LeveledLogger
122122

123-
net *vnet.Net
124-
tcp *tcpIPMux
123+
net *vnet.Net
124+
tcpMux *TCPMux
125125

126126
interfaceFilter func(string) bool
127127

@@ -306,11 +306,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) {
306306
insecureSkipVerify: config.InsecureSkipVerify,
307307
}
308308

309-
a.tcp = newTCPIPMux(tcpIPMuxParams{
310-
ListenPort: config.TCPListenPort,
311-
Logger: log,
312-
ReadBufferSize: 8,
313-
})
309+
a.tcpMux = config.TCPMux
314310

315311
if a.net == nil {
316312
a.net = vnet.NewNet(nil)
@@ -887,7 +883,11 @@ func (a *Agent) Close() error {
887883

888884
a.gatherCandidateCancel()
889885
a.err.Store(ErrClosed)
890-
a.tcp.RemoveUfrag(a.localUfrag)
886+
887+
if a.tcpMux != nil {
888+
a.tcpMux.RemoveConnByUfrag(a.localUfrag)
889+
}
890+
891891
close(a.done)
892892

893893
<-done

‎agent_config.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,10 @@ type AgentConfig struct {
139139
// to TURN servers via TLS or DTLS
140140
InsecureSkipVerify bool
141141

142-
// TCPListenPort will be used to start a TCP listener on all allowed interfaces for
143-
// ICE TCP. Currently only passive candidates are supported. This functionality is
144-
// experimental and this API will likely change in the future.
145-
TCPListenPort int
142+
// TCPMux will be used for multiplexing incoming TCP connections for ICE TCP.
143+
// Currently only passive candidates are supported. This functionality is
144+
// experimental and the API might change in the future.
145+
TCPMux *TCPMux
146146
}
147147

148148
// initWithDefaults populates an agent and falls back to defaults if fields are unset

‎gather.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -161,28 +161,23 @@ func (a *Agent) gatherCandidatesLocal(ctx context.Context, networkTypes []Networ
161161
var tcpType TCPType
162162
switch network {
163163
case tcp:
164-
if a.tcp == nil {
164+
if a.tcpMux == nil {
165165
continue
166166
}
167-
168167
// below is for passive mode
169168
// TODO active mode
170169
// TODO S-O mode
171170

172-
mux, muxErr := a.tcp.Listen(ip)
173-
if muxErr != nil {
174-
a.log.Warnf("could not listen %s %s\n", network, ip)
175-
continue
176-
}
177-
178171
a.log.Debugf("GetConn by ufrag: %s\n", a.localUfrag)
179-
conn, err = mux.GetConn(a.localUfrag)
172+
conn, err = a.tcpMux.GetConnByUfrag(a.localUfrag)
180173
if err != nil {
181174
a.log.Warnf("error getting tcp conn by ufrag: %s %s\n", network, ip, a.localUfrag)
182175
continue
183176
}
184177
port = conn.LocalAddr().(*net.TCPAddr).Port
185178
tcpType = TCPTypePassive
179+
// TODO is there a way to verify that the listen address is even
180+
// accessible from the current interface.
186181
case udp:
187182
conn, err = listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: ip, Port: 0})
188183
if err != nil {

‎gather_test.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ import (
1414

1515
"github.com/pion/dtls/v2"
1616
"github.com/pion/dtls/v2/pkg/crypto/selfsign"
17+
"github.com/pion/logging"
1718
"github.com/pion/transport/test"
1819
"github.com/pion/turn/v2"
1920
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
2022
)
2123

2224
func TestListenUDP(t *testing.T) {
@@ -116,11 +118,25 @@ func TestSTUNConcurrency(t *testing.T) {
116118
Port: serverPort,
117119
})
118120

121+
listener, err := net.ListenTCP("tcp", &net.TCPAddr{
122+
IP: net.IP{127, 0, 0, 1},
123+
})
124+
require.NoError(t, err)
125+
defer func() {
126+
_ = listener.Close()
127+
}()
128+
119129
a, err := NewAgent(&AgentConfig{
120130
NetworkTypes: supportedNetworkTypes,
121131
Urls: urls,
122132
CandidateTypes: []CandidateType{CandidateTypeHost, CandidateTypeServerReflexive},
123-
TCPListenPort: 9999,
133+
TCPMux: NewTCPMux(
134+
TCPMuxParams{
135+
Listener: listener,
136+
Logger: logging.NewDefaultLoggerFactory().NewLogger("ice"),
137+
ReadBufferSize: 8,
138+
},
139+
),
124140
})
125141
assert.NoError(t, err)
126142

‎go.sum

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
44
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
55
github.com/pion/dtls/v2 v2.0.2 h1:FHCHTiM182Y8e15aFTiORroiATUI16ryHiQh8AIOJ1E=
66
github.com/pion/dtls/v2 v2.0.2/go.mod h1:27PEO3MDdaCfo21heT59/vsdmZc0zMt9wQPcSlLu/1I=
7+
github.com/pion/ice v0.7.17 h1:0dD2RASsDY/28idIKcVZkZgGctORIW5wGJJx93lyD3s=
78
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
89
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
910
github.com/pion/mdns v0.0.4 h1:O4vvVqr4DGX63vzmO6Fw9vpy3lfztVWHGCQfyw0ZLSY=

‎tcp_ip_mux.go

-102
This file was deleted.

‎tcp_mux.go

+20-32
Original file line numberDiff line numberDiff line change
@@ -11,31 +11,31 @@ import (
1111
"github.com/pion/stun"
1212
)
1313

14-
type tcpMux struct {
15-
params *tcpMuxParams
14+
type TCPMux struct {
15+
params *TCPMuxParams
1616

1717
// conns is a map of all tcpPacketConns indexed by ufrag
1818
conns map[string]*tcpPacketConn
1919

20-
mu sync.Mutex
21-
wg sync.WaitGroup
22-
closedChan chan struct{}
23-
closeOnce sync.Once
20+
mu sync.Mutex
21+
wg sync.WaitGroup
2422
}
2523

26-
type tcpMuxParams struct {
24+
type TCPMuxParams struct {
2725
Listener net.Listener
2826
Logger logging.LeveledLogger
2927
ReadBufferSize int
3028
}
3129

32-
func newTCPMux(params tcpMuxParams) *tcpMux {
33-
m := &tcpMux{
30+
func NewTCPMux(params TCPMuxParams) *TCPMux {
31+
if params.Logger == nil {
32+
params.Logger = logging.NewDefaultLoggerFactory().NewLogger("ice")
33+
}
34+
35+
m := &TCPMux{
3436
params: &params,
3537

3638
conns: map[string]*tcpPacketConn{},
37-
38-
closedChan: make(chan struct{}),
3939
}
4040

4141
m.wg.Add(1)
@@ -47,7 +47,7 @@ func newTCPMux(params tcpMuxParams) *tcpMux {
4747
return m
4848
}
4949

50-
func (m *tcpMux) start() {
50+
func (m *TCPMux) start() {
5151
m.params.Logger.Infof("Listening TCP on %s\n", m.params.Listener.Addr())
5252
for {
5353
conn, err := m.params.Listener.Accept()
@@ -66,11 +66,11 @@ func (m *tcpMux) start() {
6666
}
6767
}
6868

69-
func (m *tcpMux) LocalAddr() net.Addr {
69+
func (m *TCPMux) LocalAddr() net.Addr {
7070
return m.params.Listener.Addr()
7171
}
7272

73-
func (m *tcpMux) GetConn(ufrag string) (net.PacketConn, error) {
73+
func (m *TCPMux) GetConnByUfrag(ufrag string) (net.PacketConn, error) {
7474
m.mu.Lock()
7575
defer m.mu.Unlock()
7676

@@ -86,7 +86,7 @@ func (m *tcpMux) GetConn(ufrag string) (net.PacketConn, error) {
8686
return conn, nil
8787
}
8888

89-
func (m *tcpMux) createConn(ufrag string, localAddr net.Addr) *tcpPacketConn {
89+
func (m *TCPMux) createConn(ufrag string, localAddr net.Addr) *tcpPacketConn {
9090
conn := newTCPPacketConn(tcpPacketParams{
9191
ReadBuffer: m.params.ReadBufferSize,
9292
LocalAddr: localAddr,
@@ -98,20 +98,20 @@ func (m *tcpMux) createConn(ufrag string, localAddr net.Addr) *tcpPacketConn {
9898
go func() {
9999
defer m.wg.Done()
100100
<-conn.CloseChannel()
101-
m.RemoveConn(ufrag)
101+
m.RemoveConnByUfrag(ufrag)
102102
}()
103103

104104
return conn
105105
}
106106

107-
func (m *tcpMux) closeAndLogError(closer io.Closer) {
107+
func (m *TCPMux) closeAndLogError(closer io.Closer) {
108108
err := closer.Close()
109109
if err != nil {
110110
m.params.Logger.Warnf("Error closing connection: %s", err)
111111
}
112112
}
113113

114-
func (m *tcpMux) handleConn(conn net.Conn) {
114+
func (m *TCPMux) handleConn(conn net.Conn) {
115115
buf := make([]byte, receiveMTU)
116116

117117
n, err := readStreamingPacket(conn, buf)
@@ -169,13 +169,9 @@ func (m *tcpMux) handleConn(conn net.Conn) {
169169
}
170170
}
171171

172-
func (m *tcpMux) Close() error {
172+
func (m *TCPMux) Close() error {
173173
m.mu.Lock()
174174

175-
m.closeOnce.Do(func() {
176-
close(m.closedChan)
177-
})
178-
179175
m.conns = map[string]*tcpPacketConn{}
180176
m.mu.Unlock()
181177

@@ -186,11 +182,7 @@ func (m *tcpMux) Close() error {
186182
return err
187183
}
188184

189-
func (m *tcpMux) CloseChannel() <-chan struct{} {
190-
return m.closedChan
191-
}
192-
193-
func (m *tcpMux) RemoveConn(ufrag string) {
185+
func (m *TCPMux) RemoveConnByUfrag(ufrag string) {
194186
m.mu.Lock()
195187
defer m.mu.Unlock()
196188

@@ -200,10 +192,6 @@ func (m *tcpMux) RemoveConn(ufrag string) {
200192
}
201193

202194
if len(m.conns) == 0 {
203-
m.closeOnce.Do(func() {
204-
close(m.closedChan)
205-
})
206-
207195
m.closeAndLogError(m.params.Listener)
208196
}
209197
}

‎tcp_ip_mux_test.go ‎tcp_mux_test.go

+13-6
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,27 @@ import (
1111
"github.com/stretchr/testify/require"
1212
)
1313

14-
func TestTCP_Recv(t *testing.T) {
14+
func TestTCPMux_Recv(t *testing.T) {
1515
report := test.CheckRoutines(t)
1616
defer report()
1717

1818
loggerFactory := logging.NewDefaultLoggerFactory()
1919

20-
tim := newTCPIPMux(tcpIPMuxParams{
21-
ListenPort: 8080,
20+
listener, err := net.ListenTCP("tcp", &net.TCPAddr{
21+
IP: net.IP{127, 0, 0, 1},
22+
Port: 0,
23+
})
24+
require.NoError(t, err, "error starting listener")
25+
defer func() {
26+
_ = listener.Close()
27+
}()
28+
29+
tcpMux := NewTCPMux(TCPMuxParams{
30+
Listener: listener,
2231
Logger: loggerFactory.NewLogger("ice"),
2332
ReadBufferSize: 20,
2433
})
2534

26-
tcpMux, err := tim.Listen(net.IP{127, 0, 0, 1})
27-
require.NoError(t, err, "error starting listener")
2835
defer func() {
2936
_ = tcpMux.Close()
3037
}()
@@ -42,7 +49,7 @@ func TestTCP_Recv(t *testing.T) {
4249
n, err := writeStreamingPacket(conn, msg.Raw)
4350
require.NoError(t, err, "error writing tcp stun packet")
4451

45-
pktConn, err := tcpMux.GetConn("myufrag")
52+
pktConn, err := tcpMux.GetConnByUfrag("myufrag")
4653
require.NoError(t, err, "error retrieving muxed connection for ufrag")
4754
defer func() {
4855
_ = pktConn.Close()

‎tcp_packet_conn.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,16 @@ func (t *tcpPacketConn) AddConn(conn net.Conn, firstPacketData []byte) error {
6262
}
6363

6464
if _, ok := t.conns[conn.RemoteAddr().String()]; ok {
65-
return ErrTCPRemoteAddrAlreadyExists
65+
return fmt.Errorf("connection with same remote address already exists: %s", conn.RemoteAddr().String())
6666
}
6767

6868
t.conns[conn.RemoteAddr().String()] = conn
6969

70-
if firstPacketData != nil {
71-
t.recvChan <- streamingPacket{firstPacketData, conn.RemoteAddr(), nil}
72-
}
73-
7470
t.wg.Add(1)
7571
go func() {
72+
if firstPacketData != nil {
73+
t.recvChan <- streamingPacket{firstPacketData, conn.RemoteAddr(), nil}
74+
}
7675
defer t.wg.Done()
7776
t.startReading(conn)
7877
}()

0 commit comments

Comments
 (0)
Please sign in to comment.