Skip to content

Commit 423565f

Browse files
committed
Merge branch 'master' into sbruens/ip-key-metrics
2 parents 0520fd2 + 36bf99d commit 423565f

File tree

4 files changed

+86
-68
lines changed

4 files changed

+86
-68
lines changed

cmd/outline-ss-server/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,9 @@ func (s *SSServer) startPort(portNum int) error {
8686
}
8787
logger.Infof("Shadowsocks UDP service listening on %v", packetConn.LocalAddr().String())
8888
port := &ssPort{tcpListener: listener, packetConn: packetConn, cipherList: service.NewCipherList()}
89+
authFunc := service.NewShadowsocksStreamAuthenticator(port.cipherList, &s.replayCache, s.m)
8990
// TODO: Register initial data metrics at zero.
90-
tcpHandler := service.NewTCPHandler(portNum, port.cipherList, &s.replayCache, s.m, tcpReadTimeout)
91+
tcpHandler := service.NewTCPHandler(portNum, authFunc, s.m, tcpReadTimeout)
9192
packetHandler := service.NewPacketHandler(s.natTimeout, port.cipherList, s.m)
9293
s.ports[portNum] = port
9394
accept := func() (transport.StreamConn, error) {

internal/integration_test/integration_test.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ func TestTCPEcho(t *testing.T) {
130130
}
131131
replayCache := service.NewReplayCache(5)
132132
const testTimeout = 200 * time.Millisecond
133-
handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, cipherList, &replayCache, &service.NoOpTCPMetrics{}, testTimeout)
133+
testMetrics := &service.NoOpTCPMetrics{}
134+
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics)
135+
handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
134136
handler.SetTargetDialer(&transport.TCPDialer{})
135137
done := make(chan struct{})
136138
go func() {
@@ -198,7 +200,8 @@ func TestRestrictedAddresses(t *testing.T) {
198200
require.NoError(t, err)
199201
const testTimeout = 200 * time.Millisecond
200202
testMetrics := &statusMetrics{}
201-
handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, cipherList, nil, testMetrics, testTimeout)
203+
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
204+
handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
202205
done := make(chan struct{})
203206
go func() {
204207
service.StreamServe(service.WrapStreamListener(proxyListener.AcceptTCP), handler.Handle)
@@ -378,7 +381,9 @@ func BenchmarkTCPThroughput(b *testing.B) {
378381
b.Fatal(err)
379382
}
380383
const testTimeout = 200 * time.Millisecond
381-
handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, cipherList, nil, &service.NoOpTCPMetrics{}, testTimeout)
384+
testMetrics := &service.NoOpTCPMetrics{}
385+
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
386+
handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
382387
handler.SetTargetDialer(&transport.TCPDialer{})
383388
done := make(chan struct{})
384389
go func() {
@@ -440,7 +445,9 @@ func BenchmarkTCPMultiplexing(b *testing.B) {
440445
}
441446
replayCache := service.NewReplayCache(service.MaxCapacity)
442447
const testTimeout = 200 * time.Millisecond
443-
handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, cipherList, &replayCache, &service.NoOpTCPMetrics{}, testTimeout)
448+
testMetrics := &service.NoOpTCPMetrics{}
449+
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics)
450+
handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
444451
handler.SetTargetDialer(&transport.TCPDialer{})
445452
done := make(chan struct{})
446453
go func() {

service/tcp.go

Lines changed: 57 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,7 @@ type TCPMetrics interface {
4343
AddOpenTCPConnection(ip net.Addr)
4444
AddAuthenticatedTCPConnection(ip net.Addr, accessKey string)
4545
AddClosedTCPConnection(ip net.Addr, accessKey string, status string, data metrics.ProxyMetrics, duration time.Duration)
46-
47-
// Shadowsocks TCP metrics
4846
AddTCPProbe(status, drainResult string, port int, clientProxyBytes int64)
49-
AddTCPCipherSearch(accessKeyFound bool, timeToCipher time.Duration)
5047
}
5148

5249
func remoteIP(conn net.Conn) net.IP {
@@ -119,26 +116,67 @@ func findEntry(firstBytes []byte, ciphers []*list.Element) (*CipherEntry, *list.
119116
return nil, nil
120117
}
121118

119+
type StreamAuthenticateFunc func(clientConn transport.StreamConn) (string, transport.StreamConn, *onet.ConnectionError)
120+
121+
// ShadowsocksTCPMetrics is used to report Shadowsocks metrics on TCP connections.
122+
type ShadowsocksTCPMetrics interface {
123+
// Shadowsocks TCP metrics
124+
AddTCPCipherSearch(accessKeyFound bool, timeToCipher time.Duration)
125+
}
126+
127+
// NewShadowsocksStreamAuthenticator creates a stream authenticator that uses Shadowsocks.
128+
// TODO(fortuna): Offer alternative transports.
129+
func NewShadowsocksStreamAuthenticator(ciphers CipherList, replayCache *ReplayCache, metrics ShadowsocksTCPMetrics) StreamAuthenticateFunc {
130+
return func(clientConn transport.StreamConn) (string, transport.StreamConn, *onet.ConnectionError) {
131+
// Find the cipher and acess key id.
132+
cipherEntry, clientReader, clientSalt, timeToCipher, keyErr := findAccessKey(clientConn, remoteIP(clientConn), ciphers)
133+
metrics.AddTCPCipherSearch(keyErr == nil, timeToCipher)
134+
if keyErr != nil {
135+
const status = "ERR_CIPHER"
136+
return "", nil, onet.NewConnectionError(status, "Failed to find a valid cipher", keyErr)
137+
}
138+
var id string
139+
if cipherEntry != nil {
140+
id = cipherEntry.ID
141+
}
142+
143+
// Check if the connection is a replay.
144+
isServerSalt := cipherEntry.SaltGenerator.IsServerSalt(clientSalt)
145+
// Only check the cache if findAccessKey succeeded and the salt is unrecognized.
146+
if isServerSalt || !replayCache.Add(cipherEntry.ID, clientSalt) {
147+
var status string
148+
if isServerSalt {
149+
status = "ERR_REPLAY_SERVER"
150+
} else {
151+
status = "ERR_REPLAY_CLIENT"
152+
}
153+
return id, nil, onet.NewConnectionError(status, "Replay detected", nil)
154+
}
155+
156+
metrics.AddAuthenticatedTCPConnection(clientConn.RemoteAddr(), id)
157+
ssr := shadowsocks.NewReader(clientReader, cipherEntry.CryptoKey)
158+
ssw := shadowsocks.NewWriter(clientConn, cipherEntry.CryptoKey)
159+
ssw.SetSaltGenerator(cipherEntry.SaltGenerator)
160+
return id, transport.WrapConn(clientConn, ssr, ssw), nil
161+
}
162+
}
163+
122164
type tcpHandler struct {
123-
port int
124-
ciphers CipherList
125-
m TCPMetrics
126-
readTimeout time.Duration
127-
// `replayCache` is a pointer to SSServer.replayCache, to share the cache among all ports.
128-
replayCache *ReplayCache
129-
dialer transport.StreamDialer
165+
port int
166+
m TCPMetrics
167+
readTimeout time.Duration
168+
authenticate StreamAuthenticateFunc
169+
dialer transport.StreamDialer
130170
}
131171

132172
// NewTCPService creates a TCPService
133-
// `replayCache` is a pointer to SSServer.replayCache, to share the cache among all ports.
134-
func NewTCPHandler(port int, ciphers CipherList, replayCache *ReplayCache, m TCPMetrics, timeout time.Duration) TCPHandler {
173+
func NewTCPHandler(port int, authenticate StreamAuthenticateFunc, m TCPMetrics, timeout time.Duration) TCPHandler {
135174
return &tcpHandler{
136-
port: port,
137-
ciphers: ciphers,
138-
m: m,
139-
readTimeout: timeout,
140-
replayCache: replayCache,
141-
dialer: defaultDialer,
175+
port: port,
176+
m: m,
177+
readTimeout: timeout,
178+
authenticate: authenticate,
179+
dialer: defaultDialer,
142180
}
143181
}
144182

@@ -235,42 +273,6 @@ func (h *tcpHandler) Handle(ctx context.Context, clientConn transport.StreamConn
235273
logger.Debugf("Done with status %v, duration %v", status, connDuration)
236274
}
237275

238-
func (h *tcpHandler) authenticate(clientConn transport.StreamConn, proxyMetrics *metrics.ProxyMetrics) (string, transport.StreamConn, *onet.ConnectionError) {
239-
// TODO(fortuna): Offer alternative transports.
240-
// Find the cipher and acess key id.
241-
cipherEntry, clientReader, clientSalt, timeToCipher, keyErr := findAccessKey(clientConn, remoteIP(clientConn), h.ciphers)
242-
h.m.AddTCPCipherSearch(keyErr == nil, timeToCipher)
243-
if keyErr != nil {
244-
logger.Debugf("Failed to find a valid cipher after reading %v bytes: %v", proxyMetrics.ClientProxy, keyErr)
245-
const status = "ERR_CIPHER"
246-
return "", nil, onet.NewConnectionError(status, "Failed to find a valid cipher", keyErr)
247-
}
248-
var id string
249-
if cipherEntry != nil {
250-
id = cipherEntry.ID
251-
}
252-
253-
// Check if the connection is a replay.
254-
isServerSalt := cipherEntry.SaltGenerator.IsServerSalt(clientSalt)
255-
// Only check the cache if findAccessKey succeeded and the salt is unrecognized.
256-
if isServerSalt || !h.replayCache.Add(cipherEntry.ID, clientSalt) {
257-
var status string
258-
if isServerSalt {
259-
status = "ERR_REPLAY_SERVER"
260-
} else {
261-
status = "ERR_REPLAY_CLIENT"
262-
}
263-
logger.Debugf(status+": %v sent %d bytes", clientConn.RemoteAddr(), proxyMetrics.ClientProxy)
264-
return id, nil, onet.NewConnectionError(status, "Replay detected", nil)
265-
}
266-
267-
h.m.AddAuthenticatedTCPConnection(clientConn.RemoteAddr(), id)
268-
ssr := shadowsocks.NewReader(clientReader, cipherEntry.CryptoKey)
269-
ssw := shadowsocks.NewWriter(clientConn, cipherEntry.CryptoKey)
270-
ssw.SetSaltGenerator(cipherEntry.SaltGenerator)
271-
return id, transport.WrapConn(clientConn, ssr, ssw), nil
272-
}
273-
274276
func getProxyRequest(clientConn transport.StreamConn) (string, error) {
275277
// TODO(fortuna): Use Shadowsocks proxy, HTTP CONNECT or SOCKS5 based on first byte:
276278
// case 1, 3 or 4: Shadowsocks (address type)
@@ -332,7 +334,7 @@ func (h *tcpHandler) handleConnection(ctx context.Context, listenerPort int, out
332334
}
333335
outerConn.SetReadDeadline(readDeadline)
334336

335-
id, innerConn, authErr := h.authenticate(outerConn, proxyMetrics)
337+
id, innerConn, authErr := h.authenticate(outerConn)
336338
if authErr != nil {
337339
// Drain to protect against probing attacks.
338340
h.absorbProbe(listenerPort, outerConn, authErr.Status, proxyMetrics)

service/tcp_test.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,8 @@ func TestProbeRandom(t *testing.T) {
278278
cipherList, err := MakeTestCiphers(makeTestSecrets(1))
279279
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
280280
testMetrics := &probeTestMetrics{}
281-
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, cipherList, nil, testMetrics, 200*time.Millisecond)
281+
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
282+
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond)
282283
done := make(chan struct{})
283284
go func() {
284285
StreamServe(WrapStreamListener(listener.AcceptTCP), handler.Handle)
@@ -354,7 +355,8 @@ func TestProbeClientBytesBasicTruncated(t *testing.T) {
354355
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
355356
cipher := firstCipher(cipherList)
356357
testMetrics := &probeTestMetrics{}
357-
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, cipherList, nil, testMetrics, 200*time.Millisecond)
358+
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
359+
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond)
358360
handler.SetTargetDialer(makeValidatingTCPStreamDialer(allowAll))
359361
done := make(chan struct{})
360362
go func() {
@@ -389,7 +391,8 @@ func TestProbeClientBytesBasicModified(t *testing.T) {
389391
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
390392
cipher := firstCipher(cipherList)
391393
testMetrics := &probeTestMetrics{}
392-
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, cipherList, nil, testMetrics, 200*time.Millisecond)
394+
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
395+
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond)
393396
handler.SetTargetDialer(makeValidatingTCPStreamDialer(allowAll))
394397
done := make(chan struct{})
395398
go func() {
@@ -425,7 +428,8 @@ func TestProbeClientBytesCoalescedModified(t *testing.T) {
425428
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
426429
cipher := firstCipher(cipherList)
427430
testMetrics := &probeTestMetrics{}
428-
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, cipherList, nil, testMetrics, 200*time.Millisecond)
431+
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
432+
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond)
429433
handler.SetTargetDialer(makeValidatingTCPStreamDialer(allowAll))
430434
done := make(chan struct{})
431435
go func() {
@@ -468,7 +472,8 @@ func TestProbeServerBytesModified(t *testing.T) {
468472
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
469473
cipher := firstCipher(cipherList)
470474
testMetrics := &probeTestMetrics{}
471-
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, cipherList, nil, testMetrics, 200*time.Millisecond)
475+
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
476+
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond)
472477
done := make(chan struct{})
473478
go func() {
474479
StreamServe(WrapStreamListener(listener.AcceptTCP), handler.Handle)
@@ -498,7 +503,8 @@ func TestReplayDefense(t *testing.T) {
498503
replayCache := NewReplayCache(5)
499504
testMetrics := &probeTestMetrics{}
500505
const testTimeout = 200 * time.Millisecond
501-
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, cipherList, &replayCache, testMetrics, testTimeout)
506+
authFunc := NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics)
507+
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
502508
snapshot := cipherList.SnapshotForClientIP(nil)
503509
cipherEntry := snapshot[0].Value.(*CipherEntry)
504510
cipher := cipherEntry.CryptoKey
@@ -576,7 +582,8 @@ func TestReverseReplayDefense(t *testing.T) {
576582
replayCache := NewReplayCache(5)
577583
testMetrics := &probeTestMetrics{}
578584
const testTimeout = 200 * time.Millisecond
579-
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, cipherList, &replayCache, testMetrics, testTimeout)
585+
authFunc := NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics)
586+
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
580587
snapshot := cipherList.SnapshotForClientIP(nil)
581588
cipherEntry := snapshot[0].Value.(*CipherEntry)
582589
cipher := cipherEntry.CryptoKey
@@ -646,7 +653,8 @@ func probeExpectTimeout(t *testing.T, payloadSize int) {
646653
cipherList, err := MakeTestCiphers(makeTestSecrets(5))
647654
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
648655
testMetrics := &probeTestMetrics{}
649-
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, cipherList, nil, testMetrics, testTimeout)
656+
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
657+
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
650658

651659
done := make(chan struct{})
652660
go func() {

0 commit comments

Comments
 (0)