Skip to content

Commit 589abba

Browse files
committed
Merge branch 'sbruens/udp-split-serving' into sbruens/websocket
2 parents ed980ad + 64c48ce commit 589abba

File tree

9 files changed

+368
-468
lines changed

9 files changed

+368
-468
lines changed

caddy/shadowsocks_handler.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"log/slog"
2121
"net"
22+
"time"
2223

2324
"github.com/Jigsaw-Code/outline-sdk/transport"
2425
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
@@ -28,10 +29,11 @@ import (
2829
outline "github.com/Jigsaw-Code/outline-ss-server/service"
2930
)
3031

31-
const serverUDPBufferSize = 64 * 1024
32-
3332
const ssModuleName = "layer4.handlers.shadowsocks"
3433

34+
// A UDP NAT timeout of at least 5 minutes is recommended in RFC 4787 Section 4.3.
35+
const defaultNatTimeout time.Duration = 5 * time.Minute
36+
3537
func init() {
3638
caddy.RegisterModule(ModuleRegistration{
3739
ID: ssModuleName,
@@ -48,8 +50,10 @@ type KeyConfig struct {
4850
type ShadowsocksHandler struct {
4951
Keys []KeyConfig `json:"keys,omitempty"`
5052

51-
service outline.Service
52-
logger *slog.Logger
53+
streamHandler outline.StreamHandler
54+
associationHandler outline.AssociationHandler
55+
metrics outline.ServiceMetrics
56+
logger *slog.Logger
5357
}
5458

5559
var (
@@ -73,6 +77,7 @@ func (h *ShadowsocksHandler) Provision(ctx caddy.Context) error {
7377
if !ok {
7478
return fmt.Errorf("module `%s` is of type `%T`, expected `OutlineApp`", outlineModuleName, app)
7579
}
80+
h.metrics = app.Metrics
7681

7782
if len(h.Keys) == 0 {
7883
h.logger.Warn("no keys configured")
@@ -100,30 +105,22 @@ func (h *ShadowsocksHandler) Provision(ctx caddy.Context) error {
100105
ciphers := outline.NewCipherList()
101106
ciphers.Update(cipherList)
102107

103-
service, err := outline.NewShadowsocksService(
108+
h.streamHandler, h.associationHandler = outline.NewShadowsocksHandlers(
104109
outline.WithLogger(h.logger),
105110
outline.WithCiphers(ciphers),
106-
outline.WithMetrics(app.Metrics),
111+
outline.WithMetrics(h.metrics),
107112
outline.WithReplayCache(&app.ReplayCache),
108113
)
109-
if err != nil {
110-
return err
111-
}
112-
h.service = service
113114
return nil
114115
}
115116

116117
// Handle implements layer4.NextHandler.
117118
func (h *ShadowsocksHandler) Handle(cx *layer4.Connection, _ layer4.Handler) error {
118119
switch conn := cx.Conn.(type) {
119120
case transport.StreamConn:
120-
h.service.HandleStream(cx.Context, conn)
121+
h.streamHandler.HandleStream(cx.Context, conn, h.metrics.AddOpenTCPConnection(conn))
121122
case net.Conn:
122-
assoc, err := h.service.NewConnAssociation(conn)
123-
if err != nil {
124-
return fmt.Errorf("Failed to handle association: %v", err)
125-
}
126-
assoc.Handle()
123+
h.associationHandler.HandleAssociation(cx.Context, conn, h.metrics.AddOpenUDPAssociation(conn))
127124
default:
128125
return fmt.Errorf("failed to handle unknown connection type: %t", conn)
129126
}

cmd/outline-ss-server/main.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
266266
ciphers := service.NewCipherList()
267267
ciphers.Update(cipherList)
268268

269-
ssService, err := service.NewShadowsocksService(
269+
streamHandler, associationHandler := service.NewShadowsocksHandlers(
270270
service.WithCiphers(ciphers),
271271
service.WithMetrics(s.serviceMetrics),
272272
service.WithReplayCache(&s.replayCache),
@@ -278,14 +278,18 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
278278
return err
279279
}
280280
slog.Info("TCP service started.", "address", ln.Addr().String())
281-
go service.StreamServe(ln.AcceptStream, ssService.HandleStream)
281+
go service.StreamServe(ln.AcceptStream, func(ctx context.Context, conn transport.StreamConn) {
282+
streamHandler.HandleStream(ctx, conn, s.serviceMetrics.AddOpenTCPConnection(conn))
283+
})
282284

283285
pc, err := lnSet.ListenPacket(addr)
284286
if err != nil {
285287
return err
286288
}
287289
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
288-
go service.PacketServe(pc, ssService.NewPacketAssociation, s.serverMetrics)
290+
go service.PacketServe(pc, func(ctx context.Context, conn net.Conn) {
291+
associationHandler.HandleAssociation(ctx, conn, s.serviceMetrics.AddOpenUDPAssociation(conn))
292+
}, s.serverMetrics)
289293
}
290294

291295
// Start services with listeners.
@@ -294,7 +298,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
294298
if err != nil {
295299
return fmt.Errorf("failed to create cipher list from config: %v", err)
296300
}
297-
ssService, err := service.NewShadowsocksService(
301+
streamHandler, associationHandler := service.NewShadowsocksHandlers(
298302
service.WithCiphers(ciphers),
299303
service.WithMetrics(s.serviceMetrics),
300304
service.WithReplayCache(&s.replayCache),
@@ -318,7 +322,9 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
318322
}
319323
return serviceConfig.Dialer.Fwmark
320324
}())
321-
go service.StreamServe(ln.AcceptStream, ssService.HandleStream)
325+
go service.StreamServe(ln.AcceptStream, func(ctx context.Context, conn transport.StreamConn) {
326+
streamHandler.HandleStream(ctx, conn, s.serviceMetrics.AddOpenTCPConnection(conn))
327+
})
322328
case listenerTypeUDP:
323329
pc, err := lnSet.ListenPacket(lnConfig.Address)
324330
if err != nil {
@@ -330,7 +336,9 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
330336
}
331337
return serviceConfig.Dialer.Fwmark
332338
}())
333-
go service.PacketServe(pc, ssService.NewPacketAssociation, s.serverMetrics)
339+
go service.PacketServe(pc, func(ctx context.Context, conn net.Conn) {
340+
associationHandler.HandleAssociation(ctx, conn, s.serviceMetrics.AddOpenUDPAssociation(conn))
341+
}, s.serverMetrics)
334342
case listenerTypeWebsocketStream:
335343
if _, exists := webServers[lnConfig.WebServer]; !exists {
336344
return fmt.Errorf("listener type `%s` references unknown web server `%s`", lnConfig.Type, lnConfig.WebServer)
@@ -348,7 +356,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
348356
return
349357
}
350358
conn := &streamConn{&wrappedConn{Conn: wsConn, raddr: raddr}}
351-
ssService.HandleStream(ctx, conn)
359+
streamHandler.HandleStream(ctx, conn, s.serviceMetrics.AddOpenTCPConnection(conn))
352360
}
353361
websocket.Handler(handler).ServeHTTP(w, r)
354362
})
@@ -362,20 +370,16 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
362370
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
363371
handler := func(wsConn *websocket.Conn) {
364372
defer wsConn.Close()
373+
ctx, contextCancel := context.WithCancel(context.Background())
374+
defer contextCancel()
365375
raddr, err := net.ResolveUDPAddr("udp", r.RemoteAddr)
366376
if err != nil {
367377
slog.Error("failed to upgrade", "err", err)
368378
w.WriteHeader(http.StatusBadGateway)
369379
return
370380
}
371381
conn := &wrappedConn{Conn: wsConn, raddr: raddr}
372-
assoc, err := ssService.NewConnAssociation(conn)
373-
if err != nil {
374-
slog.Error("failed to upgrade", "err", err)
375-
w.WriteHeader(http.StatusBadGateway)
376-
return
377-
}
378-
assoc.Handle()
382+
associationHandler.HandleAssociation(ctx, conn, s.serviceMetrics.AddOpenUDPAssociation(conn))
379383
}
380384
websocket.Handler(handler).ServeHTTP(w, r)
381385
})

internal/integration_test/integration_test.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func TestTCPEcho(t *testing.T) {
142142
go func() {
143143
service.StreamServe(
144144
func() (transport.StreamConn, error) { return proxyListener.AcceptTCP() },
145-
func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, testMetrics) },
145+
func(ctx context.Context, conn transport.StreamConn) { handler.HandleStream(ctx, conn, testMetrics) },
146146
)
147147
done <- struct{}{}
148148
}()
@@ -221,7 +221,7 @@ func TestRestrictedAddresses(t *testing.T) {
221221
go func() {
222222
service.StreamServe(
223223
service.WrapStreamAcceptFunc(proxyListener.AcceptTCP),
224-
func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, testMetrics) },
224+
func(ctx context.Context, conn transport.StreamConn) { handler.HandleStream(ctx, conn, testMetrics) },
225225
)
226226
done <- struct{}{}
227227
}()
@@ -288,6 +288,8 @@ type fakeUDPAssociationMetrics struct {
288288
var _ service.UDPAssociationMetrics = (*fakeUDPAssociationMetrics)(nil)
289289

290290
func (m *fakeUDPAssociationMetrics) AddAuthentication(key string) {
291+
m.mu.Lock()
292+
defer m.mu.Unlock()
291293
m.accessKey = key
292294
}
293295

@@ -317,13 +319,13 @@ func TestUDPEcho(t *testing.T) {
317319
if err != nil {
318320
t.Fatal(err)
319321
}
320-
proxy := service.NewPacketHandler(cipherList, &fakeShadowsocksMetrics{})
322+
proxy := service.NewAssociationHandler(cipherList, &fakeShadowsocksMetrics{})
321323

322324
proxy.SetTargetIPValidator(allowAll)
323325
natMetrics := &natTestMetrics{}
324326
associationMetrics := &fakeUDPAssociationMetrics{}
325-
go service.PacketServe(proxyConn, func(conn net.Conn) (service.PacketAssociation, error) {
326-
return proxy.NewPacketAssociation(conn, associationMetrics)
327+
go service.PacketServe(proxyConn, func(ctx context.Context, conn net.Conn) {
328+
proxy.HandleAssociation(ctx, conn, associationMetrics)
327329
}, natMetrics)
328330

329331
cryptoKey, err := shadowsocks.NewEncryptionKey(shadowsocks.CHACHA20IETFPOLY1305, secrets[0])
@@ -408,7 +410,7 @@ func BenchmarkTCPThroughput(b *testing.B) {
408410
go func() {
409411
service.StreamServe(
410412
service.WrapStreamAcceptFunc(proxyListener.AcceptTCP),
411-
func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, testMetrics) },
413+
func(ctx context.Context, conn transport.StreamConn) { handler.HandleStream(ctx, conn, testMetrics) },
412414
)
413415
done <- struct{}{}
414416
}()
@@ -475,7 +477,7 @@ func BenchmarkTCPMultiplexing(b *testing.B) {
475477
go func() {
476478
service.StreamServe(
477479
service.WrapStreamAcceptFunc(proxyListener.AcceptTCP),
478-
func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, testMetrics) },
480+
func(ctx context.Context, conn transport.StreamConn) { handler.HandleStream(ctx, conn, testMetrics) },
479481
)
480482
done <- struct{}{}
481483
}()
@@ -545,12 +547,12 @@ func BenchmarkUDPEcho(b *testing.B) {
545547
if err != nil {
546548
b.Fatal(err)
547549
}
548-
proxy := service.NewPacketHandler(cipherList, &fakeShadowsocksMetrics{})
550+
proxy := service.NewAssociationHandler(cipherList, &fakeShadowsocksMetrics{})
549551
proxy.SetTargetIPValidator(allowAll)
550552
done := make(chan struct{})
551553
go func() {
552-
service.PacketServe(server, func(conn net.Conn) (service.PacketAssociation, error) {
553-
return proxy.NewPacketAssociation(conn, nil)
554+
service.PacketServe(server, func(ctx context.Context, conn net.Conn) {
555+
proxy.HandleAssociation(ctx, conn, &fakeUDPAssociationMetrics{})
554556
}, &natTestMetrics{})
555557
done <- struct{}{}
556558
}()
@@ -591,12 +593,12 @@ func BenchmarkUDPManyKeys(b *testing.B) {
591593
if err != nil {
592594
b.Fatal(err)
593595
}
594-
proxy := service.NewPacketHandler(cipherList, &fakeShadowsocksMetrics{})
596+
proxy := service.NewAssociationHandler(cipherList, &fakeShadowsocksMetrics{})
595597
proxy.SetTargetIPValidator(allowAll)
596598
done := make(chan struct{})
597599
go func() {
598-
service.PacketServe(proxyConn, func(conn net.Conn) (service.PacketAssociation, error) {
599-
return proxy.NewPacketAssociation(conn, nil)
600+
service.PacketServe(proxyConn, func(ctx context.Context, conn net.Conn) {
601+
proxy.HandleAssociation(ctx, conn, &fakeUDPAssociationMetrics{})
600602
}, &natTestMetrics{})
601603
done <- struct{}{}
602604
}()

prometheus/metrics.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,8 @@ func (m *serviceMetrics) getIPInfoFromAddr(addr net.Addr) ipinfo.IPInfo {
509509
return ipInfo
510510
}
511511

512+
// TODO: Split TCP and UDP metrics.
513+
512514
func (m *serviceMetrics) AddOpenTCPConnection(clientConn net.Conn) service.TCPConnMetrics {
513515
clientAddr := clientConn.RemoteAddr()
514516
clientInfo := m.getIPInfoFromAddr(clientAddr)
@@ -521,12 +523,12 @@ func (m *serviceMetrics) AddOpenUDPAssociation(clientConn net.Conn) service.UDPA
521523
return newUDPAssociationMetrics(m.udpServiceMetrics, m.tunnelTimeMetrics, clientAddr, clientInfo)
522524
}
523525

524-
func (m *serviceMetrics) AddCipherSearch(proto string, accessKeyFound bool, timeToCipher time.Duration) {
525-
if proto == "tcp" {
526-
m.tcpServiceMetrics.AddCipherSearch(accessKeyFound, timeToCipher)
527-
} else if proto == "udp" {
528-
m.udpServiceMetrics.AddCipherSearch(accessKeyFound, timeToCipher)
529-
}
526+
func (m *serviceMetrics) AddTCPCipherSearch(accessKeyFound bool, timeToCipher time.Duration) {
527+
m.tcpServiceMetrics.AddCipherSearch(accessKeyFound, timeToCipher)
528+
}
529+
530+
func (m *serviceMetrics) AddUDPCipherSearch(accessKeyFound bool, timeToCipher time.Duration) {
531+
m.udpServiceMetrics.AddCipherSearch(accessKeyFound, timeToCipher)
530532
}
531533

532534
// addIfNonZero helps avoid the creation of series that are always zero.

0 commit comments

Comments
 (0)