Skip to content

Commit

Permalink
refactor: implement WebSocket handlers with Gorilla
Browse files Browse the repository at this point in the history
  • Loading branch information
sbruens committed Jan 30, 2025
1 parent b7d1c7a commit 2aa225f
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 250 deletions.
101 changes: 40 additions & 61 deletions cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ import (

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
"github.com/Jigsaw-Code/outline-sdk/x/websocket"
"github.com/gorilla/handlers"
gorilla "github.com/gorilla/websocket"
"github.com/lmittmann/tint"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/net/websocket"
"golang.org/x/term"

"github.com/Jigsaw-Code/outline-ss-server/ipinfo"
Expand All @@ -57,6 +59,10 @@ const tcpReadTimeout time.Duration = 59 * time.Second
// A UDP NAT timeout of at least 5 minutes is recommended in RFC 4787 Section 4.3.
const defaultNatTimeout time.Duration = 5 * time.Minute

// defaultUpgrader is a pre-configured instance of the gorilla.Upgrader which provides
// reasonable default values for various upgrade parameters.
var defaultUpgrader = gorilla.Upgrader{}

func init() {
logHandler = tint.NewHandler(
os.Stderr,
Expand Down Expand Up @@ -306,18 +312,22 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
if err != nil {
return err
}
logger := slog.Default().With(
slog.Int("access_keys", len(serviceConfig.Keys)),
slog.Any("fwmark", func() any {
if serviceConfig.Dialer.Fwmark == 0 {
return "disabled"
}
return serviceConfig.Dialer.Fwmark
}()),
)
for _, cfg := range serviceConfig.Listeners {
if cfg.TCP != nil {
ln, err := lnSet.ListenStream(cfg.TCP.Address)
if err != nil {
return err
}
slog.Info("TCP service started.", "address", ln.Addr().String(), "fwmark", func() any {
if serviceConfig.Dialer.Fwmark == 0 {
return "disabled"
}
return serviceConfig.Dialer.Fwmark
}())
logger.Info("TCP service started.", "address", ln.Addr().String())
go service.StreamServe(ln.AcceptStream, func(ctx context.Context, conn transport.StreamConn) {
streamHandler.HandleStream(ctx, conn, s.serviceMetrics.AddOpenTCPConnection(conn))
})
Expand All @@ -326,12 +336,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
if err != nil {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String(), "fwmark", func() any {
if serviceConfig.Dialer.Fwmark == 0 {
return "disabled"
}
return serviceConfig.Dialer.Fwmark
}())
logger.Info("UDP service started.", "address", pc.LocalAddr().String())
go service.PacketServe(pc, func(ctx context.Context, conn net.Conn) {
associationHandler.HandleAssociation(ctx, conn, s.serviceMetrics.AddOpenUDPAssociation(conn))
}, s.serverMetrics)
Expand All @@ -340,48 +345,37 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
return fmt.Errorf("websocket-stream listener references unknown web server `%s`", cfg.WebsocketStream.WebServer)
}
mux := webServers[cfg.WebsocketStream.WebServer]
// TODO: Support a "half-closed" state for WebSockets.
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler := func(wsConn *websocket.Conn) {
defer wsConn.Close()
ctx, contextCancel := context.WithCancel(context.Background())
defer contextCancel()
clientIP, err := onet.GetClientIPFromRequest(r)
if err != nil {
slog.Error("failed to determine client address", "err", err)
w.WriteHeader(http.StatusBadGateway)
return
}
conn := &streamConn{&replaceAddrConn{Conn: wsConn, raddr: &net.TCPAddr{IP: clientIP}}}
streamHandler.HandleStream(ctx, conn, s.serviceMetrics.AddOpenTCPConnection(conn))
wsConn, err := defaultUpgrader.Upgrade(w, r, nil)
if err != nil {
slog.Error("failed to upgrade", "err", err)
}
websocket.Handler(handler).ServeHTTP(w, r)
defer wsConn.Close()
ctx, contextCancel := context.WithCancel(context.Background())
defer contextCancel()
conn := &replaceAddrConn{StreamConn: websocket.WrapConn(wsConn), raddr: &net.TCPAddr{IP: net.ParseIP(r.RemoteAddr)}}
streamHandler.HandleStream(ctx, conn, s.serviceMetrics.AddOpenTCPConnection(conn))
})
mux.Handle(cfg.WebsocketStream.Path, http.StripPrefix(cfg.WebsocketStream.Path, handler))
slog.Info("WebSocket stream service started.", "ID", cfg.WebsocketStream.WebServer, "path", cfg.WebsocketStream.Path)
mux.Handle(cfg.WebsocketStream.Path, http.StripPrefix(cfg.WebsocketStream.Path, handlers.ProxyHeaders(handler)))
logger.Info("WebSocket stream service started.", "ID", cfg.WebsocketStream.WebServer, "path", cfg.WebsocketStream.Path)
} else if cfg.WebsocketPacket != nil {
if _, exists := webServers[cfg.WebsocketPacket.WebServer]; !exists {
return fmt.Errorf("websocket-packet listener references unknown web server `%s`", cfg.WebsocketPacket.WebServer)
}
mux := webServers[cfg.WebsocketPacket.WebServer]
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler := func(wsConn *websocket.Conn) {
defer wsConn.Close()
ctx, contextCancel := context.WithCancel(context.Background())
defer contextCancel()
clientIP, err := onet.GetClientIPFromRequest(r)
if err != nil {
slog.Error("failed to determine client address", "err", err)
w.WriteHeader(http.StatusBadGateway)
return
}
conn := &replaceAddrConn{Conn: wsConn, raddr: &net.UDPAddr{IP: clientIP}}
associationHandler.HandleAssociation(ctx, conn, s.serviceMetrics.AddOpenUDPAssociation(conn))
wsConn, err := defaultUpgrader.Upgrade(w, r, nil)
if err != nil {
slog.Error("failed to upgrade", "err", err)
}
websocket.Handler(handler).ServeHTTP(w, r)
defer wsConn.Close()
ctx, contextCancel := context.WithCancel(context.Background())
defer contextCancel()
conn := &replaceAddrConn{StreamConn: websocket.WrapConn(wsConn), raddr: &net.UDPAddr{IP: net.ParseIP(r.RemoteAddr)}}
associationHandler.HandleAssociation(ctx, conn, s.serviceMetrics.AddOpenUDPAssociation(conn))
})
mux.Handle(cfg.WebsocketPacket.Path, http.StripPrefix(cfg.WebsocketPacket.Path, handler))
slog.Info("WebSocket packet service started.", "ID", cfg.WebsocketPacket.WebServer, "path", cfg.WebsocketPacket.Path)
mux.Handle(cfg.WebsocketPacket.Path, http.StripPrefix(cfg.WebsocketPacket.Path, handlers.ProxyHeaders(handler)))
logger.Info("WebSocket packet service started.", "ID", cfg.WebsocketPacket.WebServer, "path", cfg.WebsocketPacket.Path)
} else {
return fmt.Errorf("unknown listener configuration: %v", cfg)
}
Expand Down Expand Up @@ -452,31 +446,16 @@ func RunOutlineServer(filename string, natTimeout time.Duration, serverMetrics *
}

// TODO: Create a dedicated `ClientConn` struct with `ClientAddr` and `Conn`.
// replaceAddrConn overrides [websocket.Conn]'s remote address handling.
// replaceAddrConn overrides a [transport.StreamConn]'s remote address handling.
type replaceAddrConn struct {
*websocket.Conn
transport.StreamConn
raddr net.Addr
}

func (c replaceAddrConn) RemoteAddr() net.Addr {
return c.raddr
}

type streamConn struct {
net.Conn
}

var _ transport.StreamConn = (*streamConn)(nil)

// TODO: Support a "half-closed" state.
func (c *streamConn) CloseRead() error {
return c.Close()
}

func (c *streamConn) CloseWrite() error {
return c.Close()
}

func main() {
slog.SetDefault(slog.New(logHandler))

Expand Down
33 changes: 17 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module github.com/Jigsaw-Code/outline-ss-server

require (
github.com/Jigsaw-Code/outline-sdk v0.0.14
github.com/Jigsaw-Code/outline-sdk v0.0.18-0.20241106233708-faffebb12629
github.com/go-task/task/v3 v3.34.1
github.com/google/addlicense v1.1.1
github.com/google/go-licenses v1.6.0
Expand All @@ -11,9 +11,9 @@ require (
github.com/oschwald/geoip2-golang v1.8.0
github.com/prometheus/client_golang v1.15.0
github.com/shadowsocks/go-shadowsocks2 v0.1.5
github.com/stretchr/testify v1.8.4
golang.org/x/crypto v0.17.0
golang.org/x/term v0.16.0
github.com/stretchr/testify v1.9.0
golang.org/x/crypto v0.26.0
golang.org/x/term v0.23.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -45,6 +45,7 @@ require (
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v0.8.1 // indirect
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/Jigsaw-Code/outline-sdk/x v0.0.0-20250129215116-5f6d09583605 // indirect
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Masterminds/semver/v3 v3.2.1 // indirect
Expand Down Expand Up @@ -154,7 +155,7 @@ require (
github.com/goreleaser/chglog v0.4.2 // indirect
github.com/goreleaser/fileglob v1.3.0 // indirect
github.com/goreleaser/nfpm/v2 v2.28.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand All @@ -171,7 +172,7 @@ require (
github.com/joho/godotenv v1.5.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/compress v1.16.3 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/klauspost/pgzip v1.2.5 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
Expand Down Expand Up @@ -219,7 +220,7 @@ require (
github.com/sigstore/cosign/v2 v2.0.0 // indirect
github.com/sigstore/rekor v1.1.1 // indirect
github.com/sigstore/sigstore v1.6.3 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/skeema/knownhosts v1.2.1 // indirect
github.com/slack-go/slack v0.12.2 // indirect
github.com/spf13/afero v1.9.3 // indirect
Expand All @@ -245,21 +246,21 @@ require (
go.opencensus.io v0.24.0 // indirect
go.uber.org/automaxprocs v1.5.2 // indirect
gocloud.dev v0.29.0 // indirect
golang.org/x/exp v0.0.0-20240110193028-0dcbfd608b1e // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/oauth2 v0.7.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.16.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.119.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/grpc v1.56.3 // indirect
google.golang.org/protobuf v1.30.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/mail.v2 v2.3.1 // indirect
Expand All @@ -274,4 +275,4 @@ require (
sigs.k8s.io/yaml v1.3.0 // indirect
)

go 1.21
go 1.22
Loading

0 comments on commit 2aa225f

Please sign in to comment.