Skip to content

Commit

Permalink
✨ feat: added zookeeper exhange to view the exchange while producing …
Browse files Browse the repository at this point in the history
…a message #4
  • Loading branch information
pnguyen215 committed Jan 6, 2024
1 parent 589c058 commit 8535ae6
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 4 deletions.
6 changes: 6 additions & 0 deletions example/rmqconn_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ func TestServiceProduceMessage(t *testing.T) {
logger.Errorf("Producing message got an error", err)
return
}
err = svc.Produce(*message, "Hello 2 RabbitMQ!")
if err != nil {
logger.Errorf("Producing message got an error", err)
return
}
svc.ZookeeperExchangeNoop()
logger.Infof("Produced message successfully")
}

Expand Down
96 changes: 92 additions & 4 deletions rmqconn_cluster_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,26 @@ package rmqconn

import (
"context"
"encoding/binary"
"fmt"
"time"

amqp "github.com/rabbitmq/amqp091-go"
"github.com/sivaosorg/govm/logger"
"github.com/sivaosorg/govm/rabbitmqx"
"github.com/sivaosorg/govm/utils"
)

var (
zookeeperExchanges map[string]struct {
L int
T int64
} = make(map[string]struct {
L int
T int64
})
)

var callbackDefault = func(next amqp.Delivery) {
logger.Debugf(fmt.Sprintf("Received exchange: %v, message (content-type: %s): %s", next.Exchange, next.ContentType, string(next.Body)))
}
Expand All @@ -27,6 +39,14 @@ type RmqClusterService interface {
ConsumeByMap(clusters map[string]rabbitmqx.RabbitMqMessageConfig, key string, callback func(next amqp.Delivery)) error
ProduceBySlice(clusters []rabbitmqx.MultiTenantRabbitMqConfig, key string, usableMessageDefault bool, data interface{}) error
ConsumeBySlice(clusters []rabbitmqx.MultiTenantRabbitMqConfig, key string, usableMessageDefault bool, callback func(next amqp.Delivery)) error
ZookeeperExchangeGenKey(message rabbitmqx.RabbitMqMessageConfig) string
ZookeeperExchangeKeyExists(message rabbitmqx.RabbitMqMessageConfig) bool
ZookeeperExchangePushKey(message rabbitmqx.RabbitMqMessageConfig)
ZookeeperExchangePushKeyIfNeeded(message rabbitmqx.RabbitMqMessageConfig)
ZookeeperExchangeRemoveKey(message rabbitmqx.RabbitMqMessageConfig) bool
ZookeeperExchangeSize() int
ZookeeperExchangeDestroy()
ZookeeperExchangeNoop()
}

type rmqClusterServiceImpl struct {
Expand Down Expand Up @@ -100,16 +120,19 @@ func (s *rmqClusterServiceImpl) Produce(message rabbitmqx.RabbitMqMessageConfig,
if !message.IsEnabled {
return fmt.Errorf("Message (exchange: %s, queue: %s) unavailable", message.Exchange.Name, message.Queue.Name)
}
err := s.DeclareExchange(message)
if err != nil {
return err
if !s.ZookeeperExchangeKeyExists(message) {
err := s.DeclareExchange(message)
if err != nil {
return err
}
s.ZookeeperExchangePushKey(message)
}
if s.c.Config.DebugMode {
_logger.Info(fmt.Sprintf("Producer is running for messages (exchange: %s, queue: %s) outgoing data: %v", message.Exchange.Name, message.Queue.Name, utils.ToJson(data)))
} else {
_logger.Info(fmt.Sprintf("Producer is running for messages (exchange: %s, queue: %s)", message.Exchange.Name, message.Queue.Name))
}
err = s.c.channel.PublishWithContext(
err := s.c.channel.PublishWithContext(
context.Background(),
message.Exchange.Name,
"",
Expand Down Expand Up @@ -232,3 +255,68 @@ func (s *rmqClusterServiceImpl) ConsumeBySlice(clusters []rabbitmqx.MultiTenantR
}
return s.ConsumeByMap(v.Config.Clusters, key, callback)
}

func (s *rmqClusterServiceImpl) ZookeeperExchangeGenKey(message rabbitmqx.RabbitMqMessageConfig) string {
// the form of key
// ex:n:ABC:k:fanout
// which exchange has name and kind
form := fmt.Sprintf("ex:n:%s:k:%s", message.Exchange.Name, message.Exchange.Kind)
return form
}

func (s *rmqClusterServiceImpl) ZookeeperExchangeKeyExists(message rabbitmqx.RabbitMqMessageConfig) bool {
if len(zookeeperExchanges) == 0 || zookeeperExchanges == nil {
return false
}
if !message.IsEnabled {
return false
}
key := s.ZookeeperExchangeGenKey(message)
_, ok := zookeeperExchanges[key]
return ok
}

func (s *rmqClusterServiceImpl) ZookeeperExchangePushKey(message rabbitmqx.RabbitMqMessageConfig) {
if len(zookeeperExchanges) == 0 || zookeeperExchanges == nil {
zookeeperExchanges = make(map[string]struct {
L int
T int64
})
}
key := s.ZookeeperExchangeGenKey(message)
zookeeperExchanges[key] = struct {
L int
T int64
}{
L: binary.Size([]byte(key)),
T: time.Now().UnixMilli(),
}
}

func (s *rmqClusterServiceImpl) ZookeeperExchangePushKeyIfNeeded(message rabbitmqx.RabbitMqMessageConfig) {
if s.ZookeeperExchangeKeyExists(message) {
return
}
s.ZookeeperExchangePushKey(message)
}

func (s *rmqClusterServiceImpl) ZookeeperExchangeRemoveKey(message rabbitmqx.RabbitMqMessageConfig) bool {
if !s.ZookeeperExchangeKeyExists(message) {
return false
}
key := s.ZookeeperExchangeGenKey(message)
delete(zookeeperExchanges, key)
return true
}

func (s *rmqClusterServiceImpl) ZookeeperExchangeSize() int {
return len(zookeeperExchanges)
}

func (s *rmqClusterServiceImpl) ZookeeperExchangeDestroy() {
zookeeperExchanges = nil
}

func (s *rmqClusterServiceImpl) ZookeeperExchangeNoop() {
logger.Debugf("Zookeeper Exchange(s): %v", zookeeperExchanges)
}

0 comments on commit 8535ae6

Please sign in to comment.