Skip to content

Commit

Permalink
♻️ refactor: updated new function register topic on endpoint websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
pnguyen215 committed Nov 17, 2023
1 parent dbfa710 commit 1dd71b1
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 7 deletions.
6 changes: 6 additions & 0 deletions wsconn/wsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func NewWebsocket() *Websocket {
ws.SetConfig(*conf)
ws.SetUpgrader(wsUpgrader)
ws.SetAllowCloseConn(false)
ws.SetRegisteredTopics(make(map[string]bool))
return ws
}

Expand Down Expand Up @@ -56,6 +57,11 @@ func (ws *Websocket) SetAllowCloseConn(value bool) *Websocket {
return ws
}

func (ws *Websocket) SetRegisteredTopics(value map[string]bool) *Websocket {
ws.RegisteredTopics = value
return ws
}

func (ws *Websocket) Json() string {
return utils.ToJson(ws)
}
13 changes: 7 additions & 6 deletions wsconn/wsconn_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
)

type Websocket struct {
Config wsconnx.WsConnOptionConfig `json:"conf"`
AllowCloseConn bool `json:"allow_close_conn"`
Upgrader websocket.Upgrader `json:"-"`
Broadcast map[string]chan wsconnx.WsConnMessagePayload `json:"-"`
Subscribers map[*websocket.Conn]wsconnx.WsConnSubscription `json:"-"`
Mutex sync.Mutex `json:"-"`
Config wsconnx.WsConnOptionConfig `json:"conf"`
AllowCloseConn bool `json:"allow_close_conn"`
RegisteredTopics map[string]bool `json:"registered_topics"`
Upgrader websocket.Upgrader `json:"-"`
Broadcast map[string]chan wsconnx.WsConnMessagePayload `json:"-"`
Subscribers map[*websocket.Conn]wsconnx.WsConnSubscription `json:"-"`
Mutex sync.Mutex `json:"-"`
}
29 changes: 28 additions & 1 deletion wsconn/wsconn_service.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package wsconn

import (
"fmt"
"net/http"
"time"

"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/sivaosorg/govm/entity"
"github.com/sivaosorg/govm/wsconnx"
)

Expand All @@ -15,10 +18,11 @@ type WebsocketService interface {
AddSubscriber(conn *websocket.Conn, subscription wsconnx.WsConnSubscription)
SubscribeMessage(c *gin.Context)
BroadcastMessage(message wsconnx.WsConnMessagePayload)
RegisterTopic(c *gin.Context)
}

type websocketServiceImpl struct {
wsConf *Websocket `json:"-"`
wsConf *Websocket
}

func NewWebsocketService(wsConf *Websocket) WebsocketService {
Expand Down Expand Up @@ -114,3 +118,26 @@ func (ws *websocketServiceImpl) BroadcastMessage(message wsconnx.WsConnMessagePa
channel <- message
}
}

func (ws *websocketServiceImpl) RegisterTopic(c *gin.Context) {
ws.wsConf.Mutex.Lock()
defer ws.wsConf.Mutex.Unlock()
response := entity.NewResponseEntity()
var subscription wsconnx.WsConnSubscription
if err := c.ShouldBindJSON(&subscription); err != nil {
response.SetStatusCode(http.StatusBadRequest).SetError(err).SetMessage(err.Error())
c.JSON(response.StatusCode, response)
return
}
if _, ok := ws.wsConf.RegisteredTopics[subscription.Topic]; ok {
response.SetStatusCode(http.StatusOK).SetMessage(fmt.Sprintf("Topic %s already registered", subscription.Topic)).SetData(subscription)
c.JSON(response.StatusCode, response)
return
}
ws.wsConf.RegisteredTopics[subscription.Topic] = true
ws.wsConf.Broadcast[subscription.Topic] = make(chan wsconnx.WsConnMessagePayload)
go ws.Run(subscription.Topic)
response.SetStatusCode(http.StatusOK).SetMessage(fmt.Sprintf("Topic %s registered successfully", subscription.Topic)).SetData(subscription)
c.JSON(response.StatusCode, response)
return
}

0 comments on commit 1dd71b1

Please sign in to comment.