Skip to content

Commit

Permalink
♻️ refactor: updated codebase #3
Browse files Browse the repository at this point in the history
  • Loading branch information
pnguyen215 committed Jan 15, 2024
1 parent cf7c2fd commit 69bd686
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 72 deletions.
25 changes: 12 additions & 13 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,28 @@ name: Go

on:
push:
branches: [ "master" ]
branches: ["master"]
tags:
- "v*"
pull_request:
branches: [ "master" ]
branches: ["master"]

jobs:

build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.19
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.19

- name: Build
run: go build -v ./...
- name: Build
run: go build -v ./...

- name: Test
run: go test -v ./...
# - name: Test
# run: go test -v ./...

create-release:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -57,4 +56,4 @@ jobs:
draft: false
prerelease: false
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
30 changes: 30 additions & 0 deletions example/wsconn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package example

import (
"net/http"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/sivaosorg/govm/wsconnx"
"github.com/sivaosorg/wsconn"
)

func TestWebsocket(t *testing.T) {
r := gin.Default()
ws := wsconn.NewWebsocket()
s := wsconn.NewWebsocketService(ws)

r.GET("/subscribe", s.SubscribeMessage) // ws://localhost:8081/subscribe
r.POST("/message", func(c *gin.Context) {
var message wsconnx.WsConnMessagePayload
message.SetGenesisTimestamp(time.Now())
if err := c.ShouldBindJSON(&message); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request payload"})
return
}
s.BroadcastMessage(message)
c.JSON(http.StatusOK, gin.H{"message": "Message sent successfully", "data": message})
})
r.Run(":8081")
}
60 changes: 32 additions & 28 deletions wsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,50 @@ var (
)

func NewWebsocket() *Websocket {
ws := &Websocket{}
ws.SetBroadcast(make(map[string]chan wsconnx.WsConnMessagePayload))
ws.SetSubscribers(make(map[*websocket.Conn]wsconnx.WsConnSubscription))
ws.SetConfig(*conf)
ws.SetUpgrader(wsUpgrader)
ws.SetAllowCloseConn(false)
ws.SetRegisteredTopics(make(map[string]bool))
return ws
w := &Websocket{}
w.SetBroadcast(make(map[string]chan wsconnx.WsConnMessagePayload))
w.SetSubscribers(make(map[*websocket.Conn]wsconnx.WsConnSubscription))
w.SetOption(*conf)
w.SetUpgrader(wsUpgrader)
w.SetEnabledClosure(false)
w.SetTopics(make(map[string]bool))
return w
}

func (ws *Websocket) SetBroadcast(value map[string]chan wsconnx.WsConnMessagePayload) *Websocket {
ws.Broadcast = value
return ws
func (w *Websocket) SetBroadcast(value map[string]chan wsconnx.WsConnMessagePayload) *Websocket {
w.broadcast = value
return w
}

func (ws *Websocket) SetSubscribers(value map[*websocket.Conn]wsconnx.WsConnSubscription) *Websocket {
ws.Subscribers = value
return ws
func (w *Websocket) SetSubscribers(value map[*websocket.Conn]wsconnx.WsConnSubscription) *Websocket {
w.subscribers = value
return w
}

func (ws *Websocket) SetConfig(value wsconnx.WsConnOptionConfig) *Websocket {
ws.Config = value
return ws
func (w *Websocket) SetOption(value wsconnx.WsConnOptionConfig) *Websocket {
w.Option = value
return w
}

func (ws *Websocket) SetUpgrader(value websocket.Upgrader) *Websocket {
ws.Upgrader = value
return ws
func (w *Websocket) SetUpgrader(value websocket.Upgrader) *Websocket {
w.upgrader = value
return w
}

func (ws *Websocket) SetAllowCloseConn(value bool) *Websocket {
ws.AllowCloseConn = value
return ws
func (w *Websocket) Upgrader() websocket.Upgrader {
return w.upgrader
}

func (ws *Websocket) SetRegisteredTopics(value map[string]bool) *Websocket {
ws.RegisteredTopics = value
return ws
func (w *Websocket) SetEnabledClosure(value bool) *Websocket {
w.IsEnabledClosure = value
return w
}

func (ws *Websocket) Json() string {
return utils.ToJson(ws)
func (w *Websocket) SetTopics(value map[string]bool) *Websocket {
w.Topics = value
return w
}

func (w *Websocket) Json() string {
return utils.ToJson(w)
}
14 changes: 7 additions & 7 deletions wsconn_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
)

type Websocket struct {
Config wsconnx.WsConnOptionConfig `json:"conf"`
AllowCloseConn bool `json:"allow_close_conn"`
RegisteredTopics map[string]bool `json:"registered_topics"`
Upgrader websocket.Upgrader `json:"-"`
Broadcast map[string]chan wsconnx.WsConnMessagePayload `json:"-"`
Subscribers map[*websocket.Conn]wsconnx.WsConnSubscription `json:"-"`
Mutex sync.Mutex `json:"-"`
mutex sync.Mutex
upgrader websocket.Upgrader
broadcast map[string]chan wsconnx.WsConnMessagePayload
subscribers map[*websocket.Conn]wsconnx.WsConnSubscription
Option wsconnx.WsConnOptionConfig `json:"option"`
IsEnabledClosure bool `json:"enabled_closure"`
Topics map[string]bool `json:"topics"`
}
48 changes: 24 additions & 24 deletions wsconn_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ func NewWebsocketService(wsConf *Websocket) WebsocketService {
}

func (ws *websocketServiceImpl) Run(topic string) {
channel, ok := ws.wsConf.Broadcast[topic]
channel, ok := ws.wsConf.broadcast[topic]
if !ok {
_logger.Warn("Topic not found: %v", topic)
return
}
for {
message := <-channel
ws.wsConf.Mutex.Lock()
for subscriber, subscription := range ws.wsConf.Subscribers {
ws.wsConf.mutex.Lock()
for subscriber, subscription := range ws.wsConf.subscribers {
if subscription.Topic == topic {
err := ws.WriteMessage(subscriber, message)
if err != nil {
Expand All @@ -50,28 +50,28 @@ func (ws *websocketServiceImpl) Run(topic string) {
}
}
}
ws.wsConf.Mutex.Unlock()
ws.wsConf.mutex.Unlock()
}
}

func (ws *websocketServiceImpl) WriteMessage(conn *websocket.Conn, message wsconnx.WsConnMessagePayload) error {
message.SetGenesisTimestamp(time.Now())
conn.SetWriteDeadline(time.Now().Add(ws.wsConf.Config.WriteWait))
conn.SetWriteDeadline(time.Now().Add(ws.wsConf.Option.WriteWait))
return conn.WriteJSON(message)
}

func (ws *websocketServiceImpl) CloseSubscriber(conn *websocket.Conn) {
conn.Close()
delete(ws.wsConf.Subscribers, conn)
delete(ws.wsConf.subscribers, conn)
}

func (ws *websocketServiceImpl) AddSubscriber(conn *websocket.Conn, subscription wsconnx.WsConnSubscription) {
ws.wsConf.Mutex.Lock()
defer ws.wsConf.Mutex.Unlock()
ws.wsConf.mutex.Lock()
defer ws.wsConf.mutex.Unlock()
if conn != nil {
ws.wsConf.Subscribers[conn] = subscription
if _, ok := ws.wsConf.Broadcast[subscription.Topic]; !ok {
ws.wsConf.Broadcast[subscription.Topic] = make(chan wsconnx.WsConnMessagePayload)
ws.wsConf.subscribers[conn] = subscription
if _, ok := ws.wsConf.broadcast[subscription.Topic]; !ok {
ws.wsConf.broadcast[subscription.Topic] = make(chan wsconnx.WsConnMessagePayload)
go ws.Run(subscription.Topic)
}
}
Expand All @@ -80,7 +80,7 @@ func (ws *websocketServiceImpl) AddSubscriber(conn *websocket.Conn, subscription
// Parse user ID and desired topic from the WebSocket message
// Read incoming messages, but ignore them as we handle sending only
func (ws *websocketServiceImpl) SubscribeMessage(c *gin.Context) {
conn, err := ws.wsConf.Upgrader.Upgrade(c.Writer, c.Request, nil)
conn, err := ws.wsConf.upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
_logger.Error("An error occurred while upgrading connection", err)
return
Expand All @@ -92,11 +92,11 @@ func (ws *websocketServiceImpl) SubscribeMessage(c *gin.Context) {
return
}
ws.AddSubscriber(conn, subscription)
if ws.wsConf.AllowCloseConn {
conn.SetReadLimit(int64(ws.wsConf.Config.MaxMessageSize))
conn.SetReadDeadline(time.Now().Add(ws.wsConf.Config.PongWait))
if ws.wsConf.IsEnabledClosure {
conn.SetReadLimit(int64(ws.wsConf.Option.MaxMessageSize))
conn.SetReadDeadline(time.Now().Add(ws.wsConf.Option.PongWait))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(ws.wsConf.Config.PongWait))
conn.SetReadDeadline(time.Now().Add(ws.wsConf.Option.PongWait))
return nil
})
}
Expand All @@ -112,30 +112,30 @@ func (ws *websocketServiceImpl) SubscribeMessage(c *gin.Context) {

// Find the channel for the specific topic and send the message to it
func (ws *websocketServiceImpl) BroadcastMessage(message wsconnx.WsConnMessagePayload) {
ws.wsConf.Mutex.Lock()
defer ws.wsConf.Mutex.Unlock()
if channel, ok := ws.wsConf.Broadcast[message.Topic]; ok {
ws.wsConf.mutex.Lock()
defer ws.wsConf.mutex.Unlock()
if channel, ok := ws.wsConf.broadcast[message.Topic]; ok {
channel <- message
}
}

func (ws *websocketServiceImpl) RegisterTopic(c *gin.Context) {
ws.wsConf.Mutex.Lock()
defer ws.wsConf.Mutex.Unlock()
ws.wsConf.mutex.Lock()
defer ws.wsConf.mutex.Unlock()
response := entity.NewResponseEntity()
var subscription wsconnx.WsConnSubscription
if err := c.ShouldBindJSON(&subscription); err != nil {
response.SetStatusCode(http.StatusBadRequest).SetError(err).SetMessage(err.Error())
c.JSON(response.StatusCode, response)
return
}
if _, ok := ws.wsConf.RegisteredTopics[subscription.Topic]; ok {
if _, ok := ws.wsConf.Topics[subscription.Topic]; ok {
response.SetStatusCode(http.StatusOK).SetMessage(fmt.Sprintf("Topic %s already registered", subscription.Topic)).SetData(subscription)
c.JSON(response.StatusCode, response)
return
}
ws.wsConf.RegisteredTopics[subscription.Topic] = true
ws.wsConf.Broadcast[subscription.Topic] = make(chan wsconnx.WsConnMessagePayload)
ws.wsConf.Topics[subscription.Topic] = true
ws.wsConf.broadcast[subscription.Topic] = make(chan wsconnx.WsConnMessagePayload)
go ws.Run(subscription.Topic)
response.SetStatusCode(http.StatusOK).SetMessage(fmt.Sprintf("Topic %s registered successfully", subscription.Topic)).SetData(subscription)
c.JSON(response.StatusCode, response)
Expand Down

0 comments on commit 69bd686

Please sign in to comment.