Skip to content

Commit

Permalink
Added support for Lark persistent websocket connection (#185)
Browse files Browse the repository at this point in the history
  • Loading branch information
nt0xa authored Feb 12, 2025
1 parent 12911d1 commit b544fc2
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 36 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/gorilla/schema v1.4.0
github.com/invopop/jsonschema v0.8.0
github.com/jmoiron/sqlx v1.4.0
github.com/larksuite/oapi-sdk-go/v3 v3.2.7
github.com/larksuite/oapi-sdk-go/v3 v3.4.9
github.com/lib/pq v1.10.9
github.com/miekg/dns v1.1.61
github.com/mitchellh/mapstructure v1.5.0
Expand All @@ -45,6 +45,7 @@ require (
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ github.com/labbsr0x/bindman-dns-webhook v1.0.2/go.mod h1:p6b+VCXIR8NYKpDr8/dg1HK
github.com/labbsr0x/goh v1.0.1/go.mod h1:8K2UhVoaWXcCU7Lxoa2omWnC8gyW8px7/lmO61c027w=
github.com/larksuite/oapi-sdk-go/v3 v3.2.7 h1:1dAf8dkJJsHA0Qvan3d8LPMxFSppU4I9IAi5BabeIDE=
github.com/larksuite/oapi-sdk-go/v3 v3.2.7/go.mod h1:ZEplY+kwuIrj/nqw5uSCINNATcH3KdxSN7y+UxYY5fI=
github.com/larksuite/oapi-sdk-go/v3 v3.4.9 h1:ZzsPtdF2tsLbocQdKLNFwFCzWfwmKtD4kbyx7to++M0=
github.com/larksuite/oapi-sdk-go/v3 v3.4.9/go.mod h1:ZEplY+kwuIrj/nqw5uSCINNATcH3KdxSN7y+UxYY5fI=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/linode/linodego v0.10.0/go.mod h1:cziNP7pbvE3mXIPneHj0oRY8L1WtGEIKlZ8LANE4eXA=
Expand Down
11 changes: 9 additions & 2 deletions internal/modules/lark/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,25 @@ type Config struct {
AppID string `mapstructure:"app_id"`
AppSecret string `mapstructure:"app_secret"`
VerificationToken string `mapstructure:"verification_token"`
Mode string `mapstructure:"mode"`
EncryptKey string `mapstructure:"encrypt_key"`
TLSEnabled bool `mapstructure:"tls_enabled"`
ProxyURL string `mapstructure:"proxy_url"`
ProxyInsecure bool `mapstructure:"proxy_insecure"`
}

const (
ModeWebhook = "webhook"
ModeWebsocket = "websocket"
)

func (c Config) Validate() error {
return validation.ValidateStruct(&c,
validation.Field(&c.Admin, validation.Required),
validation.Field(&c.AppID, validation.Required),
validation.Field(&c.AppSecret, validation.Required),
validation.Field(&c.VerificationToken, validation.Required),
validation.Field(&c.EncryptKey),
validation.Field(&c.Mode, validation.In(ModeWebhook, ModeWebsocket)),
validation.Field(&c.VerificationToken, validation.When(c.Mode == ModeWebhook, validation.Required)),
validation.Field(&c.EncryptKey, validation.When(c.Mode == ModeWebhook, validation.Required)),
)
}
93 changes: 60 additions & 33 deletions internal/modules/lark/lark.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/larksuite/oapi-sdk-go/v3/core/httpserverext"
larkevent "github.com/larksuite/oapi-sdk-go/v3/event"
larkim "github.com/larksuite/oapi-sdk-go/v3/service/im/v1"
larkws "github.com/larksuite/oapi-sdk-go/v3/ws"

"github.com/larksuite/oapi-sdk-go/v3/event/dispatcher"

Expand Down Expand Up @@ -66,8 +67,7 @@ func New(cfg *Config, db *database.DB, tlsConfig *tls.Config, acts actions.Actio
var client = lark.NewClient(
cfg.AppID,
cfg.AppSecret,
lark.WithLogReqAtDebug(true),
lark.WithLogLevel(larkcore.LogLevelDebug),
lark.WithLogLevel(larkcore.LogLevelInfo),
lark.WithHttpClient(httpClient))

// Check that AppID and AppSecret are valid
Expand Down Expand Up @@ -113,38 +113,52 @@ func New(cfg *Config, db *database.DB, tlsConfig *tls.Config, acts actions.Actio
}

func (lrk *Lark) Start() error {
var dispatcher *dispatcher.EventDispatcher

// Webhooks by default
if lrk.cfg.Mode == ModeWebhook || lrk.cfg.Mode == "" {
dispatcher = lrk.makeDispatcher(lrk.cfg.VerificationToken, lrk.cfg.EncryptKey, true)
return lrk.startWebhook(dispatcher)
} else {
dispatcher = lrk.makeDispatcher("", "", false)
return lrk.startWebsocket(dispatcher)
}
}

func (lrk *Lark) makeDispatcher(verificationToken, eventEncryptKey string, dedupEvents bool) *dispatcher.EventDispatcher {
// Sometimes the same event is sent several times, so keep recent event ids
// to prevent handling the same event more than once.
recentEvents := map[string]time.Time{}
recentEventsMutex := sync.Mutex{}

ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-ticker.C:
toRemove := make([]string, 0)

for eventID, handledAt := range recentEvents {

// Cleanup events after 10m
// TODO: config
if time.Since(handledAt) > time.Minute*5 {
toRemove = append(toRemove, eventID)
if dedupEvents {
go func() {
ticker := time.NewTicker(10 * time.Second)
for {
select {
case <-ticker.C:
toRemove := make([]string, 0)

for eventID, handledAt := range recentEvents {

// Cleanup events after 10m
// TODO: config
if time.Since(handledAt) > time.Minute*5 {
toRemove = append(toRemove, eventID)
}
}
}

recentEventsMutex.Lock()
for _, eventID := range toRemove {
delete(recentEvents, eventID)
recentEventsMutex.Lock()
for _, eventID := range toRemove {
delete(recentEvents, eventID)
}
recentEventsMutex.Unlock()
}
recentEventsMutex.Unlock()
}
}
}()
}()
}

handler := dispatcher.NewEventDispatcher(lrk.cfg.VerificationToken, lrk.cfg.EncryptKey).
return dispatcher.NewEventDispatcher(verificationToken, eventEncryptKey).
OnP2MessageReceiveV1(
func(ctx context.Context, event *larkim.P2MessageReceiveV1) error {

Expand All @@ -159,17 +173,19 @@ func (lrk *Lark) Start() error {
return nil
}

eventID := event.EventV2Base.Header.EventID
if dedupEvents {
eventID := event.EventV2Base.Header.EventID

recentEventsMutex.Lock()
if _, ok := recentEvents[eventID]; ok {
recentEventsMutex.Unlock()
recentEventsMutex.Lock()
if _, ok := recentEvents[eventID]; ok {
recentEventsMutex.Unlock()

// Event was already handled
return nil
// Event was already handled
return nil
}
recentEvents[eventID] = time.Now()
recentEventsMutex.Unlock()
}
recentEvents[eventID] = time.Now()
recentEventsMutex.Unlock()

userID := event.Event.Sender.SenderId.UserId
msgID := event.Event.Message.MessageId
Expand Down Expand Up @@ -234,12 +250,23 @@ func (lrk *Lark) Start() error {
return nil
},
)
}

func (lrk *Lark) startWebsocket(eventHandler *dispatcher.EventDispatcher) error {
cli := larkws.NewClient(lrk.cfg.AppID, lrk.cfg.AppSecret,
larkws.WithEventHandler(eventHandler),
larkws.WithLogLevel(larkcore.LogLevelInfo),
)

return cli.Start(context.TODO())
}

func (lrk *Lark) startWebhook(eventHandler *dispatcher.EventDispatcher) error {
mux := http.NewServeMux()

// TODO: take path from config
mux.HandleFunc("/webhook/event", httpserverext.NewEventHandlerFunc(handler,
larkevent.WithLogLevel(larkcore.LogLevelDebug)))
mux.HandleFunc("/webhook/event", httpserverext.NewEventHandlerFunc(eventHandler,
larkevent.WithLogLevel(larkcore.LogLevelInfo)))

// TODO: take port from config
srv := http.Server{
Expand Down

0 comments on commit b544fc2

Please sign in to comment.