Skip to content

Commit 62e1d37

Browse files
committed
Remove global state for ICE TCP
This addresses a few points issue of #245: - Take a net.Listener instead of having global state - Expose a net.TCPMux based API Also, the unused closeChannel was removed from tcp_mux.go Closes #253.
1 parent 1f59642 commit 62e1d37

File tree

8 files changed

+111
-172
lines changed

8 files changed

+111
-172
lines changed

agent.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ type Agent struct {
120120
loggerFactory logging.LoggerFactory
121121
log logging.LeveledLogger
122122

123-
net *vnet.Net
124-
tcp *tcpIPMux
123+
net *vnet.Net
124+
tcpMux *TCPMux
125125

126126
interfaceFilter func(string) bool
127127

@@ -306,11 +306,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) {
306306
insecureSkipVerify: config.InsecureSkipVerify,
307307
}
308308

309-
a.tcp = newTCPIPMux(tcpIPMuxParams{
310-
ListenPort: config.TCPListenPort,
311-
Logger: log,
312-
ReadBufferSize: 8,
313-
})
309+
a.tcpMux = config.TCPMux
314310

315311
if a.net == nil {
316312
a.net = vnet.NewNet(nil)
@@ -887,7 +883,11 @@ func (a *Agent) Close() error {
887883

888884
a.gatherCandidateCancel()
889885
a.err.Store(ErrClosed)
890-
a.tcp.RemoveUfrag(a.localUfrag)
886+
887+
if a.tcpMux != nil {
888+
a.tcpMux.RemoveConnByUfrag(a.localUfrag)
889+
}
890+
891891
close(a.done)
892892

893893
<-done

agent_config.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,10 @@ type AgentConfig struct {
139139
// to TURN servers via TLS or DTLS
140140
InsecureSkipVerify bool
141141

142-
// TCPListenPort will be used to start a TCP listener on all allowed interfaces for
143-
// ICE TCP. Currently only passive candidates are supported. This functionality is
144-
// experimental and this API will likely change in the future.
145-
TCPListenPort int
142+
// TCPMux will be used for multiplexing incoming TCP connections for ICE TCP.
143+
// Currently only passive candidates are supported. This functionality is
144+
// experimental and the API might change in the future.
145+
TCPMux *TCPMux
146146
}
147147

148148
// initWithDefaults populates an agent and falls back to defaults if fields are unset

gather.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -161,28 +161,23 @@ func (a *Agent) gatherCandidatesLocal(ctx context.Context, networkTypes []Networ
161161
var tcpType TCPType
162162
switch network {
163163
case tcp:
164-
if a.tcp == nil {
164+
if a.tcpMux == nil {
165165
continue
166166
}
167-
168167
// below is for passive mode
169168
// TODO active mode
170169
// TODO S-O mode
171170

172-
mux, muxErr := a.tcp.Listen(ip)
173-
if muxErr != nil {
174-
a.log.Warnf("could not listen %s %s\n", network, ip)
175-
continue
176-
}
177-
178171
a.log.Debugf("GetConn by ufrag: %s\n", a.localUfrag)
179-
conn, err = mux.GetConn(a.localUfrag)
172+
conn, err = a.tcpMux.GetConnByUfrag(a.localUfrag)
180173
if err != nil {
181174
a.log.Warnf("error getting tcp conn by ufrag: %s %s\n", network, ip, a.localUfrag)
182175
continue
183176
}
184177
port = conn.LocalAddr().(*net.TCPAddr).Port
185178
tcpType = TCPTypePassive
179+
// TODO is there a way to verify that the listen address is even
180+
// accessible from the current interface.
186181
case udp:
187182
conn, err = listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: ip, Port: 0})
188183
if err != nil {

gather_test.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ import (
1414

1515
"github.com/pion/dtls/v2"
1616
"github.com/pion/dtls/v2/pkg/crypto/selfsign"
17+
"github.com/pion/logging"
1718
"github.com/pion/transport/test"
1819
"github.com/pion/turn/v2"
1920
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
2022
)
2123

2224
func TestListenUDP(t *testing.T) {
@@ -116,11 +118,25 @@ func TestSTUNConcurrency(t *testing.T) {
116118
Port: serverPort,
117119
})
118120

121+
listener, err := net.ListenTCP("tcp", &net.TCPAddr{
122+
IP: net.IP{127, 0, 0, 1},
123+
})
124+
require.NoError(t, err)
125+
defer func() {
126+
_ = listener.Close()
127+
}()
128+
119129
a, err := NewAgent(&AgentConfig{
120130
NetworkTypes: supportedNetworkTypes,
121131
Urls: urls,
122132
CandidateTypes: []CandidateType{CandidateTypeHost, CandidateTypeServerReflexive},
123-
TCPListenPort: 9999,
133+
TCPMux: NewTCPMux(
134+
TCPMuxParams{
135+
Listener: listener,
136+
Logger: logging.NewDefaultLoggerFactory().NewLogger("ice"),
137+
ReadBufferSize: 8,
138+
},
139+
),
124140
})
125141
assert.NoError(t, err)
126142

tcp_ip_mux.go

Lines changed: 0 additions & 102 deletions
This file was deleted.

tcp_mux.go

Lines changed: 34 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,31 +11,35 @@ import (
1111
"github.com/pion/stun"
1212
)
1313

14-
type tcpMux struct {
15-
params *tcpMuxParams
14+
// TCPMux muxes TCP net.Conns into net.PacketConns and groups them by Ufrag.
15+
type TCPMux struct {
16+
params *TCPMuxParams
17+
closed bool
1618

1719
// conns is a map of all tcpPacketConns indexed by ufrag
1820
conns map[string]*tcpPacketConn
1921

20-
mu sync.Mutex
21-
wg sync.WaitGroup
22-
closedChan chan struct{}
23-
closeOnce sync.Once
22+
mu sync.Mutex
23+
wg sync.WaitGroup
2424
}
2525

26-
type tcpMuxParams struct {
26+
// TCPMuxParams are parameters for TCPMux.
27+
type TCPMuxParams struct {
2728
Listener net.Listener
2829
Logger logging.LeveledLogger
2930
ReadBufferSize int
3031
}
3132

32-
func newTCPMux(params tcpMuxParams) *tcpMux {
33-
m := &tcpMux{
33+
// NewTCPMux creates a new instance of TCPMux.
34+
func NewTCPMux(params TCPMuxParams) *TCPMux {
35+
if params.Logger == nil {
36+
params.Logger = logging.NewDefaultLoggerFactory().NewLogger("ice")
37+
}
38+
39+
m := &TCPMux{
3440
params: &params,
3541

3642
conns: map[string]*tcpPacketConn{},
37-
38-
closedChan: make(chan struct{}),
3943
}
4044

4145
m.wg.Add(1)
@@ -47,7 +51,7 @@ func newTCPMux(params tcpMuxParams) *tcpMux {
4751
return m
4852
}
4953

50-
func (m *tcpMux) start() {
54+
func (m *TCPMux) start() {
5155
m.params.Logger.Infof("Listening TCP on %s\n", m.params.Listener.Addr())
5256
for {
5357
conn, err := m.params.Listener.Accept()
@@ -66,11 +70,13 @@ func (m *tcpMux) start() {
6670
}
6771
}
6872

69-
func (m *tcpMux) LocalAddr() net.Addr {
73+
// LocalAddr returns the listening address of this TCPMux.
74+
func (m *TCPMux) LocalAddr() net.Addr {
7075
return m.params.Listener.Addr()
7176
}
7277

73-
func (m *tcpMux) GetConn(ufrag string) (net.PacketConn, error) {
78+
// GetConnByUfrag retrieves an existing or creates a new net.PacketConn.
79+
func (m *TCPMux) GetConnByUfrag(ufrag string) (net.PacketConn, error) {
7480
m.mu.Lock()
7581
defer m.mu.Unlock()
7682

@@ -86,7 +92,7 @@ func (m *tcpMux) GetConn(ufrag string) (net.PacketConn, error) {
8692
return conn, nil
8793
}
8894

89-
func (m *tcpMux) createConn(ufrag string, localAddr net.Addr) *tcpPacketConn {
95+
func (m *TCPMux) createConn(ufrag string, localAddr net.Addr) *tcpPacketConn {
9096
conn := newTCPPacketConn(tcpPacketParams{
9197
ReadBuffer: m.params.ReadBufferSize,
9298
LocalAddr: localAddr,
@@ -98,20 +104,20 @@ func (m *tcpMux) createConn(ufrag string, localAddr net.Addr) *tcpPacketConn {
98104
go func() {
99105
defer m.wg.Done()
100106
<-conn.CloseChannel()
101-
m.RemoveConn(ufrag)
107+
m.RemoveConnByUfrag(ufrag)
102108
}()
103109

104110
return conn
105111
}
106112

107-
func (m *tcpMux) closeAndLogError(closer io.Closer) {
113+
func (m *TCPMux) closeAndLogError(closer io.Closer) {
108114
err := closer.Close()
109115
if err != nil {
110116
m.params.Logger.Warnf("Error closing connection: %s", err)
111117
}
112118
}
113119

114-
func (m *tcpMux) handleConn(conn net.Conn) {
120+
func (m *TCPMux) handleConn(conn net.Conn) {
115121
buf := make([]byte, receiveMTU)
116122

117123
n, err := readStreamingPacket(conn, buf)
@@ -169,43 +175,34 @@ func (m *tcpMux) handleConn(conn net.Conn) {
169175
}
170176
}
171177

172-
func (m *tcpMux) Close() error {
178+
// Close closes the listener and waits for all goroutines to exit.
179+
func (m *TCPMux) Close() error {
173180
m.mu.Lock()
181+
m.closed = true
174182

175-
m.closeOnce.Do(func() {
176-
close(m.closedChan)
177-
})
178-
183+
for _, conn := range m.conns {
184+
m.closeAndLogError(conn)
185+
}
179186
m.conns = map[string]*tcpPacketConn{}
180-
m.mu.Unlock()
181187

182188
err := m.params.Listener.Close()
183189

190+
m.mu.Unlock()
191+
184192
m.wg.Wait()
185193

186194
return err
187195
}
188196

189-
func (m *tcpMux) CloseChannel() <-chan struct{} {
190-
return m.closedChan
191-
}
192-
193-
func (m *tcpMux) RemoveConn(ufrag string) {
197+
// RemoveConnByUfrag closes and removes a net.PacketConn by Ufrag.
198+
func (m *TCPMux) RemoveConnByUfrag(ufrag string) {
194199
m.mu.Lock()
195200
defer m.mu.Unlock()
196201

197202
if conn, ok := m.conns[ufrag]; ok {
198203
m.closeAndLogError(conn)
199204
delete(m.conns, ufrag)
200205
}
201-
202-
if len(m.conns) == 0 {
203-
m.closeOnce.Do(func() {
204-
close(m.closedChan)
205-
})
206-
207-
m.closeAndLogError(m.params.Listener)
208-
}
209206
}
210207

211208
const streamingPacketHeaderLen = 2

0 commit comments

Comments
 (0)