Skip to content

Latest commit

 

History

History
executable file
·
395 lines (304 loc) · 12 KB

File metadata and controls

executable file
·
395 lines (304 loc) · 12 KB

GWS

logo

简单 · 高性能 · 可靠 的 WebSocket 服务器与客户端库

awesome codecov Go Test go-reportcard HelloGithub license go-version

介绍

GWS(Go WebSocket)是一个使用 Go 编写的、简单、高性能且功能完备的 WebSocket 库。 它专为高并发场景设计,非常适合构建 接口服务、长连接网关、代理、IM / 聊天室、游戏、实时流媒体、消息推送 / 订阅 等系统。 GWS 提供了极简的事件驱动 API,你可以用极少的代码构建出稳定可靠的 WebSocket 服务器或客户端。

为什么选择 GWS

  • 简单易用

    • 事件友好:基于 Event 接口的事件驱动模型,OnOpen / OnMessage / OnClose / OnPing / OnPong 一目了然。
    • 编码效率高:隐藏协议细节,最大限度减少业务代码量,上手几乎零成本。
  • 性能出众

    • 高吞吐 / 低延迟:针对 WebSocket 场景深度优化,在 Echo、长连接推送等场景下表现优秀。
    • 低内存占用:内建高效的 buffer 复用和压缩策略,在高并发场景下显著降低内存与 CPU 成本。
  • 稳定可靠

    • 健壮的错误处理:对连接异常、协议错误、压缩异常等都提供清晰的处理路径。
    • 测试完备:通过全部 Autobahn 用例,兼容 RFC 6455 / RFC 7692,单元测试覆盖率接近 100%,覆盖所有条件分支。

基准测试

IOPS (Echo Server)

GOMAXPROCS=4, Connection=1000, CompressEnabled=false

performance

GoBench

go test -benchmem -run=^$ -bench . github.com/lxzan/gws
goos: linux
goarch: amd64
pkg: github.com/lxzan/gws
cpu: AMD Ryzen 5 PRO 4650G with Radeon Graphics
BenchmarkConn_WriteMessage/compress_disabled-12                  5263632               232.3 ns/op            24 B/op          1 allocs/op
BenchmarkConn_WriteMessage/compress_enabled-12                     99663             11265 ns/op             386 B/op          1 allocs/op
BenchmarkConn_ReadMessage/compress_disabled-12                   7809654               152.4 ns/op             8 B/op          0 allocs/op
BenchmarkConn_ReadMessage/compress_enabled-12                     326257              3133 ns/op              81 B/op          1 allocs/op
PASS
ok      github.com/lxzan/gws    17.231s

Index

特性

  • 事件驱动式 API:基于 Event 接口,使用体验类似常见的 WebSocket SDK。
  • 广播能力:提供 Broadcaster,支持高效复用压缩结果进行大规模广播。
  • 代理拨号:支持自定义 Dialer,可与 SOCKS5 / HTTP 代理等一起使用。
  • 上下文接管(permessage-deflate):支持按需配置上下文接管与滑动窗口大小。
  • 大文件分段写入WriteFile 采用分段策略,减少大文件写入时的峰值内存。
  • 并发与异步写入:内置任务队列,支持异步写、多切片写 Writev / WritevAsync
  • 标准兼容性强:通过所有 Autobahn 用例 Server 报告 / Client 报告

注意

  • 所有 gws.Conn 的导出方法返回的错误在多数业务场景下可忽略:库内部已经根据错误类型做了适当处理(例如关闭连接、上报事件等)。
  • 传输超大文件时,底层连接在传输过程中可能会长时间占用带宽与 IO,需结合业务做限流或切分。
  • 如果复用 net/http 服务器(例如 http.HandleFunc 中升级),强烈建议在 新的 Goroutine 中调用 ReadLoop,避免请求上下文无法及时 GC。

安装

go get -v github.com/lxzan/gws@latest

事件

type Event interface {
    OnOpen(socket *Conn)                        // connection is established
    OnClose(socket *Conn, err error)            // received a close frame or input/output error occurs
    OnPing(socket *Conn, payload []byte)        // received a ping frame
    OnPong(socket *Conn, payload []byte)        // received a pong frame
    OnMessage(socket *Conn, message *Message)   // received a text/binary frame
}

快速上手

package main

import "github.com/lxzan/gws"

func main() {
	gws.NewServer(&gws.BuiltinEventHandler{}, nil).Run(":6666")
}

最佳实践

package main

import (
	"net/http"
	"time"

	"github.com/lxzan/gws"
)

const (
	PingInterval = 5 * time.Second
	PingWait     = 10 * time.Second
)

func main() {
	upgrader := gws.NewUpgrader(&Handler{}, &gws.ServerOption{
		ParallelEnabled:  true,                                 // 开启并行消息处理
		Recovery:          gws.Recovery,                         // 开启异常恢复
		PermessageDeflate: gws.PermessageDeflate{Enabled: true}, // 开启压缩
	})
	http.HandleFunc("/connect", func(writer http.ResponseWriter, request *http.Request) {
		socket, err := upgrader.Upgrade(writer, request)
		if err != nil {
			return
		}
		go func() {
			socket.ReadLoop() // 此处阻塞会使请求上下文不能顺利被GC
		}()
	})
	http.ListenAndServe(":6666", nil)
}

type Handler struct{}

func (c *Handler) OnOpen(socket *gws.Conn) {
	_ = socket.SetDeadline(time.Now().Add(PingInterval + PingWait))
}

func (c *Handler) OnClose(socket *gws.Conn, err error) {}

func (c *Handler) OnPing(socket *gws.Conn, payload []byte) {
	_ = socket.SetDeadline(time.Now().Add(PingInterval + PingWait))
	_ = socket.WritePong(nil)
}

func (c *Handler) OnPong(socket *gws.Conn, payload []byte) {}

func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
	defer message.Close()
	socket.WriteMessage(message.Opcode, message.Bytes())
}

更多用例

KCP

  • server
package main

import (
	"log"
	"github.com/lxzan/gws"
	kcp "github.com/xtaci/kcp-go"
)

func main() {
	listener, err := kcp.Listen(":6666")
	if err != nil {
		log.Println(err.Error())
		return
	}
	app := gws.NewServer(&gws.BuiltinEventHandler{}, nil)
	app.RunListener(listener)
}
  • client
package main

import (
	"github.com/lxzan/gws"
	kcp "github.com/xtaci/kcp-go"
	"log"
)

func main() {
	conn, err := kcp.Dial("127.0.0.1:6666")
	if err != nil {
		log.Println(err.Error())
		return
	}
	app, _, err := gws.NewClientFromConn(&gws.BuiltinEventHandler{}, nil, conn)
	if err != nil {
		log.Println(err.Error())
		return
	}
	app.ReadLoop()
}

代理

通过代理拨号, 使用 socks5 协议.

package main

import (
	"crypto/tls"
	"github.com/lxzan/gws"
	"golang.org/x/net/proxy"
	"log"
)

func main() {
	socket, _, err := gws.NewClient(new(gws.BuiltinEventHandler), &gws.ClientOption{
		Addr:      "wss://example.com/connect",
		TlsConfig: &tls.Config{InsecureSkipVerify: true},
		NewDialer: func() (gws.Dialer, error) {
			return proxy.SOCKS5("tcp", "127.0.0.1:1080", nil, nil)
		},
		PermessageDeflate: gws.PermessageDeflate{
			Enabled:               true,
			ServerContextTakeover: true,
			ClientContextTakeover: true,
		},
	})
	if err != nil {
		log.Println(err.Error())
		return
	}
	socket.ReadLoop()
}

广播

先创建一个 Broadcaster 实例,然后在循环中调用 Broadcast 方法向每个客户端发送消息,最后关闭 广播程序以回收内存。整个过程中消息只会被压缩一次。

func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) {
    var b = gws.NewBroadcaster(opcode, payload)
    defer b.Close()
    for _, item := range conns {
        _ = b.Broadcast(item)
    }
}

写入超时

SetDeadline 可以覆盖大部分使用场景, 想要精细地控制每一次写入的超时时间, 则需要自行封装下 WriteWithTimeout 函数, timer 的创建和销毁会有一定额外开销.

func WriteWithTimeout(socket *gws.Conn, p []byte, timeout time.Duration) error {
	var sig = atomic.Uint32{}
	var timer = time.AfterFunc(timeout, func() {
		if sig.CompareAndSwap(0, 1) {
			socket.WriteClose(1000, []byte("write timeout"))
		}
	})
	var err = socket.WriteMessage(gws.OpcodeText, p)
	if sig.CompareAndSwap(0, 1) {
		timer.Stop()
	}
	return err
}

发布/订阅

使用 event_emitter 包实现发布订阅模式。用结构体包装 gws.Conn,并实现 GetSubscriberID 方法以获取订阅 ID,该 ID 必须是唯一的。订阅 ID 用于识别订阅者,订阅者只能接收其订阅主题的消息。

此示例对于使用 gws 构建聊天室或消息推送非常有用。这意味着用户可以通过 websocket 订阅一个或多个主题,当向该主题发布消息时,所有订阅用户都会收到消息。

package main

import (
    "github.com/lxzan/event_emitter"
    "github.com/lxzan/gws"
)

type Subscriber gws.Conn

func NewSubscriber(conn *gws.Conn) *Subscriber { return (*Subscriber)(conn) }

func (c *Subscriber) GetSubscriberID() int64 {
    userId, _ := c.GetMetadata().Load("userId")
    return userId.(int64)
}

func (c *Subscriber) GetMetadata() event_emitter.Metadata { return c.Conn().Session() }

func (c *Subscriber) Conn() *gws.Conn { return (*gws.Conn)(c) }

func Subscribe(em *event_emitter.EventEmitter[int64, *Subscriber], s *Subscriber, topic string) {
    em.Subscribe(s, topic, func(msg any) {
        _ = msg.(*gws.Broadcaster).Broadcast(s.Conn())
    })
}

func Publish(em *event_emitter.EventEmitter[int64, *Subscriber], topic string, msg []byte) {
    var broadcaster = gws.NewBroadcaster(gws.OpcodeText, msg)
    defer broadcaster.Close()
    em.Publish(topic, broadcaster)
}

Autobahn 测试

cd examples/autobahn
mkdir reports
docker run -it --rm \
    -v ${PWD}/config:/config \
    -v ${PWD}/reports:/reports \
    crossbario/autobahn-testsuite \
    wstest -m fuzzingclient -s /config/fuzzingclient.json

生态

交流

微信需要先添加好友再拉群, 请注明来自 GitHub

WeChat      QQ

赞赏

WeChat

致谢