Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: general optimizations #1

Merged
merged 2 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions candidate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
"github.com/pion/stun/v2"
)

var (

Check failure on line 21 in candidate_base.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gofumpt`-ed (gofumpt)
startTime = time.Now()

Check failure on line 22 in candidate_base.go

View workflow job for this annotation

GitHub Actions / lint / Go

startTime is a global variable (gochecknoglobals)
)

type candidateBase struct {
id string
networkType NetworkType
Expand All @@ -31,9 +35,9 @@

resolvedAddr net.Addr

lastSent atomic.Value
lastReceived atomic.Value
conn net.PacketConn
lastSentAfter atomic.Int64
lastReceivedAfter atomic.Int64
conn net.PacketConn

currAgent *Agent
closeCh chan struct{}
Expand Down Expand Up @@ -207,6 +211,7 @@
c.conn = conn
c.closeCh = make(chan struct{})
c.closedCh = make(chan struct{})
startTime = time.Now()

go c.recvLoop(initializedCh)
}
Expand Down Expand Up @@ -400,34 +405,47 @@
// LastReceived returns a time.Time indicating the last time
// this candidate was received
func (c *candidateBase) LastReceived() time.Time {
if lastReceived, ok := c.lastReceived.Load().(time.Time); ok {
return lastReceived
// if lastReceived, ok := c.lastReceived.Load().(time.Time); ok {
// return lastReceived
// }
// return time.Time{}
diff := c.lastReceivedAfter.Load()
if diff > 0 {
return startTime.Add(time.Duration(diff))
}

return time.Time{}
}

func (c *candidateBase) setLastReceived(t time.Time) {
c.lastReceived.Store(t)
func (c *candidateBase) setLastReceived() {
c.lastReceivedAfter.Store(time.Since(startTime).Nanoseconds())
}

// LastSent returns a time.Time indicating the last time
// this candidate was sent
func (c *candidateBase) LastSent() time.Time {
if lastSent, ok := c.lastSent.Load().(time.Time); ok {
return lastSent
// if lastSent, ok := c.lastSent.Load().(time.Time); ok {
// return lastSent
// }
// return time.Time{}

diff := c.lastSentAfter.Load()
if diff > 0 {
return startTime.Add(time.Duration(diff))
}

return time.Time{}
}

func (c *candidateBase) setLastSent(t time.Time) {
c.lastSent.Store(t)
func (c *candidateBase) setLastSent() {
c.lastSentAfter.Store(time.Since(startTime).Nanoseconds())
}

func (c *candidateBase) seen(outbound bool) {
if outbound {
c.setLastSent(time.Now())
c.setLastSent() //time.Now())

Check failure on line 446 in candidate_base.go

View workflow job for this annotation

GitHub Actions / lint / Go

commentFormatting: put a space between `//` and comment text (gocritic)
} else {
c.setLastReceived(time.Now())
c.setLastReceived() //time.Now())

Check failure on line 448 in candidate_base.go

View workflow job for this annotation

GitHub Actions / lint / Go

commentFormatting: put a space between `//` and comment text (gocritic)
}
}

Expand Down
10 changes: 6 additions & 4 deletions candidate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,18 @@ func TestCandidateLastSent(t *testing.T) {
candidate := candidateBase{}
assert.Equal(t, candidate.LastSent(), time.Time{})
now := time.Now()
candidate.setLastSent(now)
assert.Equal(t, candidate.LastSent(), now)
candidate.setLastSent()
// assert.Equal(t, candidate.LastSent(), now)
assert.WithinDuration(t, candidate.LastSent(), now, time.Millisecond)
}

func TestCandidateLastReceived(t *testing.T) {
candidate := candidateBase{}
assert.Equal(t, candidate.LastReceived(), time.Time{})
now := time.Now()
candidate.setLastReceived(now)
assert.Equal(t, candidate.LastReceived(), now)
candidate.setLastReceived()
// assert.Equal(t, candidate.LastReceived(), now)
assert.WithinDuration(t, candidate.LastReceived(), now, time.Millisecond)
}

func TestCandidateFoundation(t *testing.T) {
Expand Down
25 changes: 21 additions & 4 deletions udp_muxed_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"

"github.com/pion/logging"
"github.com/pion/transport/v3/packetio"
)

const (
iceConnectedTimeout = 25 * time.Second
)

type udpMuxedConnParams struct {
Mux *UDPMuxDefault
AddrPool *sync.Pool
Expand All @@ -33,13 +38,17 @@ type udpMuxedConn struct {
closedChan chan struct{}
closeOnce sync.Once
mu sync.Mutex

startAt time.Time
iceConnected atomic.Bool
}

func newUDPMuxedConn(params *udpMuxedConnParams) *udpMuxedConn {
p := &udpMuxedConn{
params: params,
buf: packetio.NewBuffer(),
closedChan: make(chan struct{}),
startAt: time.Now(),
}

return p
Expand Down Expand Up @@ -80,10 +89,18 @@ func (c *udpMuxedConn) WriteTo(buf []byte, rAddr net.Addr) (n int, err error) {
if c.isClosed() {
return 0, io.ErrClosedPipe
}
// Each time we write to a new address, we'll register it with the mux
addr := rAddr.String()
if !c.containsAddress(addr) {
c.addAddress(addr)

// Only check the address at the ICE connecting stage to reduce the check cost
if !c.iceConnected.Load() {
if time.Since(c.startAt) > iceConnectedTimeout {
c.iceConnected.Store(true)
} else {
// Each time we write to a new address, we'll register it with the mux
addr := rAddr.String()
if !c.containsAddress(addr) {
c.addAddress(addr)
}
}
}

return c.params.Mux.writeTo(buf, rAddr)
Expand Down
Loading