Skip to content

Commit 32c27d4

Browse files
committed
Remove global state for ICE TCP
This addresses a few points issue of #245: - Take a net.Listener instead of having global state - Expose a net.TCPMux based API Also, the unused closeChannel was removed from tcp_mux.go Closes #253.
1 parent 1f59642 commit 32c27d4

File tree

8 files changed

+109
-168
lines changed

8 files changed

+109
-168
lines changed

agent.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ type Agent struct {
120120
loggerFactory logging.LoggerFactory
121121
log logging.LeveledLogger
122122

123-
net *vnet.Net
124-
tcp *tcpIPMux
123+
net *vnet.Net
124+
tcpMux *TCPMux
125125

126126
interfaceFilter func(string) bool
127127

@@ -306,11 +306,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) {
306306
insecureSkipVerify: config.InsecureSkipVerify,
307307
}
308308

309-
a.tcp = newTCPIPMux(tcpIPMuxParams{
310-
ListenPort: config.TCPListenPort,
311-
Logger: log,
312-
ReadBufferSize: 8,
313-
})
309+
a.tcpMux = config.TCPMux
314310

315311
if a.net == nil {
316312
a.net = vnet.NewNet(nil)
@@ -887,7 +883,11 @@ func (a *Agent) Close() error {
887883

888884
a.gatherCandidateCancel()
889885
a.err.Store(ErrClosed)
890-
a.tcp.RemoveUfrag(a.localUfrag)
886+
887+
if a.tcpMux != nil {
888+
a.tcpMux.RemoveConnByUfrag(a.localUfrag)
889+
}
890+
891891
close(a.done)
892892

893893
<-done

agent_config.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,10 @@ type AgentConfig struct {
139139
// to TURN servers via TLS or DTLS
140140
InsecureSkipVerify bool
141141

142-
// TCPListenPort will be used to start a TCP listener on all allowed interfaces for
143-
// ICE TCP. Currently only passive candidates are supported. This functionality is
144-
// experimental and this API will likely change in the future.
145-
TCPListenPort int
142+
// TCPMux will be used for multiplexing incoming TCP connections for ICE TCP.
143+
// Currently only passive candidates are supported. This functionality is
144+
// experimental and the API might change in the future.
145+
TCPMux *TCPMux
146146
}
147147

148148
// initWithDefaults populates an agent and falls back to defaults if fields are unset

gather.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -161,28 +161,23 @@ func (a *Agent) gatherCandidatesLocal(ctx context.Context, networkTypes []Networ
161161
var tcpType TCPType
162162
switch network {
163163
case tcp:
164-
if a.tcp == nil {
164+
if a.tcpMux == nil {
165165
continue
166166
}
167-
168167
// below is for passive mode
169168
// TODO active mode
170169
// TODO S-O mode
171170

172-
mux, muxErr := a.tcp.Listen(ip)
173-
if muxErr != nil {
174-
a.log.Warnf("could not listen %s %s\n", network, ip)
175-
continue
176-
}
177-
178171
a.log.Debugf("GetConn by ufrag: %s\n", a.localUfrag)
179-
conn, err = mux.GetConn(a.localUfrag)
172+
conn, err = a.tcpMux.GetConnByUfrag(a.localUfrag)
180173
if err != nil {
181174
a.log.Warnf("error getting tcp conn by ufrag: %s %s\n", network, ip, a.localUfrag)
182175
continue
183176
}
184177
port = conn.LocalAddr().(*net.TCPAddr).Port
185178
tcpType = TCPTypePassive
179+
// TODO is there a way to verify that the listen address is even
180+
// accessible from the current interface.
186181
case udp:
187182
conn, err = listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: ip, Port: 0})
188183
if err != nil {

gather_test.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ import (
1414

1515
"github.com/pion/dtls/v2"
1616
"github.com/pion/dtls/v2/pkg/crypto/selfsign"
17+
"github.com/pion/logging"
1718
"github.com/pion/transport/test"
1819
"github.com/pion/turn/v2"
1920
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
2022
)
2123

2224
func TestListenUDP(t *testing.T) {
@@ -116,11 +118,25 @@ func TestSTUNConcurrency(t *testing.T) {
116118
Port: serverPort,
117119
})
118120

121+
listener, err := net.ListenTCP("tcp", &net.TCPAddr{
122+
IP: net.IP{127, 0, 0, 1},
123+
})
124+
require.NoError(t, err)
125+
defer func() {
126+
_ = listener.Close()
127+
}()
128+
119129
a, err := NewAgent(&AgentConfig{
120130
NetworkTypes: supportedNetworkTypes,
121131
Urls: urls,
122132
CandidateTypes: []CandidateType{CandidateTypeHost, CandidateTypeServerReflexive},
123-
TCPListenPort: 9999,
133+
TCPMux: NewTCPMux(
134+
TCPMuxParams{
135+
Listener: listener,
136+
Logger: logging.NewDefaultLoggerFactory().NewLogger("ice"),
137+
ReadBufferSize: 8,
138+
},
139+
),
124140
})
125141
assert.NoError(t, err)
126142

tcp_ip_mux.go

Lines changed: 0 additions & 102 deletions
This file was deleted.

tcp_mux.go

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,31 +11,34 @@ import (
1111
"github.com/pion/stun"
1212
)
1313

14-
type tcpMux struct {
15-
params *tcpMuxParams
14+
// TCPMux muxes TCP net.Conns into net.PacketConns and groups them by Ufrag.
15+
type TCPMux struct {
16+
params *TCPMuxParams
1617

1718
// conns is a map of all tcpPacketConns indexed by ufrag
1819
conns map[string]*tcpPacketConn
1920

20-
mu sync.Mutex
21-
wg sync.WaitGroup
22-
closedChan chan struct{}
23-
closeOnce sync.Once
21+
mu sync.Mutex
22+
wg sync.WaitGroup
2423
}
2524

26-
type tcpMuxParams struct {
25+
// TCPMuxParams are parameters for TCPMux.
26+
type TCPMuxParams struct {
2727
Listener net.Listener
2828
Logger logging.LeveledLogger
2929
ReadBufferSize int
3030
}
3131

32-
func newTCPMux(params tcpMuxParams) *tcpMux {
33-
m := &tcpMux{
32+
// NewTCPMux creates a new instance of TCPMux.
33+
func NewTCPMux(params TCPMuxParams) *TCPMux {
34+
if params.Logger == nil {
35+
params.Logger = logging.NewDefaultLoggerFactory().NewLogger("ice")
36+
}
37+
38+
m := &TCPMux{
3439
params: &params,
3540

3641
conns: map[string]*tcpPacketConn{},
37-
38-
closedChan: make(chan struct{}),
3942
}
4043

4144
m.wg.Add(1)
@@ -47,7 +50,7 @@ func newTCPMux(params tcpMuxParams) *tcpMux {
4750
return m
4851
}
4952

50-
func (m *tcpMux) start() {
53+
func (m *TCPMux) start() {
5154
m.params.Logger.Infof("Listening TCP on %s\n", m.params.Listener.Addr())
5255
for {
5356
conn, err := m.params.Listener.Accept()
@@ -66,11 +69,13 @@ func (m *tcpMux) start() {
6669
}
6770
}
6871

69-
func (m *tcpMux) LocalAddr() net.Addr {
72+
// LocalAddr returns the listening address of this TCPMux.
73+
func (m *TCPMux) LocalAddr() net.Addr {
7074
return m.params.Listener.Addr()
7175
}
7276

73-
func (m *tcpMux) GetConn(ufrag string) (net.PacketConn, error) {
77+
// GetConnByUfrag retrieves an existing or creates a new net.PacketConn.
78+
func (m *TCPMux) GetConnByUfrag(ufrag string) (net.PacketConn, error) {
7479
m.mu.Lock()
7580
defer m.mu.Unlock()
7681

@@ -86,7 +91,7 @@ func (m *tcpMux) GetConn(ufrag string) (net.PacketConn, error) {
8691
return conn, nil
8792
}
8893

89-
func (m *tcpMux) createConn(ufrag string, localAddr net.Addr) *tcpPacketConn {
94+
func (m *TCPMux) createConn(ufrag string, localAddr net.Addr) *tcpPacketConn {
9095
conn := newTCPPacketConn(tcpPacketParams{
9196
ReadBuffer: m.params.ReadBufferSize,
9297
LocalAddr: localAddr,
@@ -98,20 +103,20 @@ func (m *tcpMux) createConn(ufrag string, localAddr net.Addr) *tcpPacketConn {
98103
go func() {
99104
defer m.wg.Done()
100105
<-conn.CloseChannel()
101-
m.RemoveConn(ufrag)
106+
m.RemoveConnByUfrag(ufrag)
102107
}()
103108

104109
return conn
105110
}
106111

107-
func (m *tcpMux) closeAndLogError(closer io.Closer) {
112+
func (m *TCPMux) closeAndLogError(closer io.Closer) {
108113
err := closer.Close()
109114
if err != nil {
110115
m.params.Logger.Warnf("Error closing connection: %s", err)
111116
}
112117
}
113118

114-
func (m *tcpMux) handleConn(conn net.Conn) {
119+
func (m *TCPMux) handleConn(conn net.Conn) {
115120
buf := make([]byte, receiveMTU)
116121

117122
n, err := readStreamingPacket(conn, buf)
@@ -169,28 +174,26 @@ func (m *tcpMux) handleConn(conn net.Conn) {
169174
}
170175
}
171176

172-
func (m *tcpMux) Close() error {
177+
// Close closes the listener and waits for all goroutines to exit.
178+
func (m *TCPMux) Close() error {
173179
m.mu.Lock()
174180

175-
m.closeOnce.Do(func() {
176-
close(m.closedChan)
177-
})
178-
181+
for _, conn := range m.conns {
182+
m.closeAndLogError(conn)
183+
}
179184
m.conns = map[string]*tcpPacketConn{}
180-
m.mu.Unlock()
181185

182186
err := m.params.Listener.Close()
183187

188+
m.mu.Unlock()
189+
184190
m.wg.Wait()
185191

186192
return err
187193
}
188194

189-
func (m *tcpMux) CloseChannel() <-chan struct{} {
190-
return m.closedChan
191-
}
192-
193-
func (m *tcpMux) RemoveConn(ufrag string) {
195+
// RemoveConnByUfrag closes and removes a net.PacketConn by Ufrag.
196+
func (m *TCPMux) RemoveConnByUfrag(ufrag string) {
194197
m.mu.Lock()
195198
defer m.mu.Unlock()
196199

@@ -200,10 +203,6 @@ func (m *tcpMux) RemoveConn(ufrag string) {
200203
}
201204

202205
if len(m.conns) == 0 {
203-
m.closeOnce.Do(func() {
204-
close(m.closedChan)
205-
})
206-
207206
m.closeAndLogError(m.params.Listener)
208207
}
209208
}

0 commit comments

Comments
 (0)