Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 1 addition & 69 deletions cmd/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/mqtt"
"github.com/absmach/supermq/mqtt/events"
mqtttracing "github.com/absmach/supermq/mqtt/tracing"
domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/grpcclient"
Expand All @@ -36,7 +35,6 @@ import (
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/messaging/handler"
mqttpub "github.com/absmach/supermq/pkg/messaging/mqtt"
"github.com/absmach/supermq/pkg/server"
"github.com/absmach/supermq/pkg/uuid"
"github.com/caarlos0/env/v11"
Expand Down Expand Up @@ -135,31 +133,6 @@ func main() {
}()
tracer := tp.Tracer(svcName)

bsub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
defer bsub.Close()
bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub)

mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTTargetUsername, cfg.MQTTTargetPassword, cfg.MQTTQoS, cfg.MQTTForwarderTimeout)
if err != nil {
logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
exitCode = 1
return
}
defer mpub.Close()

fwd := mqtt.NewForwarder(brokers.SubjectAllMessages, logger)
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllMessages)
if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil {
logger.Error(fmt.Sprintf("failed to forward message broker messages: %s", err))
exitCode = 1
return
}

np, err := brokers.NewPublisher(ctx, cfg.BrokerURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
Expand Down Expand Up @@ -251,9 +224,7 @@ func main() {
go chc.CallHome(ctx)
}

beforeHandler := beforeHandler{
resolver: messaging.NewTopicResolver(channelsClient, domainsClient),
}
beforeHandler := mqtt.NewBeforeHandler(messaging.NewTopicResolver(channelsClient, domainsClient), parser, logger)

afterHandler := afterHandler{
username: cfg.MQTTTargetUsername,
Expand Down Expand Up @@ -385,42 +356,3 @@ func (ah afterHandler) Intercept(ctx context.Context, pkt packets.ControlPacket,

return pkt, nil
}

type beforeHandler struct {
resolver messaging.TopicResolver
}

// This interceptor is used to replace domain and channel routes with relevant domain and channel IDs in the message topic.
func (bh beforeHandler) Intercept(ctx context.Context, pkt packets.ControlPacket, dir session.Direction) (packets.ControlPacket, error) {
switch pt := pkt.(type) {
case *packets.SubscribePacket:
for i, topic := range pt.Topics {
ft, err := bh.resolver.ResolveTopic(ctx, topic)
if err != nil {
return nil, err
}
pt.Topics[i] = ft
}

return pt, nil
case *packets.UnsubscribePacket:
for i, topic := range pt.Topics {
ft, err := bh.resolver.ResolveTopic(ctx, topic)
if err != nil {
return nil, err
}
pt.Topics[i] = ft
}
return pt, nil
case *packets.PublishPacket:
ft, err := bh.resolver.ResolveTopic(ctx, pt.TopicName)
if err != nil {
return nil, err
}
pt.TopicName = ft

return pt, nil
}

return pkt, nil
}
34 changes: 20 additions & 14 deletions docker/.env
Original file line number Diff line number Diff line change
Expand Up @@ -14,45 +14,51 @@ SMQ_NGINX_MQTTS_PORT=8883
SMQ_NGINX_SERVER_NAME=

## Nats
SMQ_NATS_HOST=nats
SMQ_NATS_PORT=4222
SMQ_NATS_HTTP_PORT=8222
SMQ_NATS_JETSTREAM_KEY=u7wFoAPgXpDueXOFldBnXDh4xjnSOyEJ2Cb8Z5SZvGLzIZ3U4exWhhoIBZHzuNvh
SMQ_NATS_URL=nats://nats:${SMQ_NATS_PORT}
# Configs for nats as MQTT broker
SMQ_NATS_HEALTH_CHECK=http://nats:${SMQ_NATS_HTTP_PORT}/healthz
SMQ_NATS_WS_TARGET_PATH=
SMQ_NATS_MQTT_PORT=1883
SMQ_NATS_MQTT_QOS=0
SMQ_NATS_MQTT_WS_PORT=8080
SMQ_NATS_MQTT_WS_PATH=/mqtt

## RabbitMQ
SMQ_RABBITMQ_HOST=rabbitmq
SMQ_RABBITMQ_PORT=5672
SMQ_RABBITMQ_HTTP_PORT=15672
SMQ_RABBITMQ_WS_PORT=15675
SMQ_RABBITMQ_USER=supermq
SMQ_RABBITMQ_PASS=supermq
SMQ_RABBITMQ_COOKIE=supermq
SMQ_RABBITMQ_VHOST=/
SMQ_RABBITMQ_URL=amqp://${SMQ_RABBITMQ_USER}:${SMQ_RABBITMQ_PASS}@rabbitmq:${SMQ_RABBITMQ_PORT}${SMQ_RABBITMQ_VHOST}
SMQ_RABBITMQ_HEALTH_CHECK=
SMQ_RABBITMQ_MQTT_PORT=1883
SMQ_RABBITMQ_MQTT_QOS=0
SMQ_RABBITMQ_WS_TARGET_PATH=/ws
SMQ_RABBITMQ_MQTT_WS_PORT=15675
SMQ_RABBITMQ_MQTT_WS_PATH=/ws

## Message Broker
SMQ_MESSAGE_BROKER_TYPE=msg_nats
SMQ_MESSAGE_BROKER_URL=${SMQ_NATS_URL}

## MQTT Broker
SMQ_MQTT_BROKER_TYPE=rabbitmq
SMQ_MQTT_BROKER_HEALTH_CHECK=
SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_RABBITMQ_MQTT_QOS}
SMQ_MQTT_BROKER_TYPE=nats
SMQ_MQTT_BROKER_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_NATS_MQTT_QOS}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PROTOCOL=mqtt
SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883
SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME=${SMQ_RABBITMQ_USER}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD=${SMQ_RABBITMQ_PASS}
SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_MQTT_BROKER_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_NATS_HOST}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=${SMQ_NATS_MQTT_PORT}
SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME=
SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD=
SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_WS_TARGET_PROTOCOL=http
SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
SMQ_MQTT_ADAPTER_WS_TARGET_PORT=${SMQ_RABBITMQ_WS_PORT}
SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_RABBITMQ_WS_TARGET_PATH}
SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_NATS_HOST}
SMQ_MQTT_ADAPTER_WS_TARGET_PORT=${SMQ_NATS_MQTT_WS_PORT}
SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_NATS_MQTT_WS_PATH}

## Redis
SMQ_REDIS_TCP_PORT=6379
Expand Down
6 changes: 1 addition & 5 deletions docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,14 @@ Events store: This is used by SuperMQ services to store events for distributed p

## Supported Combinations

This is the same as MESSAGE_BROKER. This can either be `NATS` or `RabbitMQ` or `Redis`. If Redis is used as an events store, then RabbitMQ or NATS is used as a message broker.
This is the same as MESSAGE_BROKER. This can either be `NATS` or `RabbitMQ`. If Redis is used as an events store, then RabbitMQ or NATS is used as a message broker.

The current deployment strategy for SuperMQ in `docker/docker-compose.yaml` is to use RabbitMQ as a MQTT_BROKER and NATS as a MESSAGE_BROKER and EVENTS_STORE.

Depending on the desired setup, the following broker configurations are valid:

- `MQTT_BROKER: RabbitMQ`, `MESSAGE_BROKER: NATS`, `EVENTS_STORE: NATS`
- `MQTT_BROKER: RabbitMQ`, `MESSAGE_BROKER: NATS`, `EVENTS_STORE: Redis`
- `MQTT_BROKER: RabbitMQ`, `MESSAGE_BROKER: RabbitMQ`, `EVENTS_STORE: RabbitMQ`
- `MQTT_BROKER: RabbitMQ`, `MESSAGE_BROKER: RabbitMQ`, `EVENTS_STORE: Redis`
- `MQTT_BROKER: NATS`, `MESSAGE_BROKER: RabbitMQ`, `EVENTS_STORE: RabbitMQ`
- `MQTT_BROKER: NATS`, `MESSAGE_BROKER: RabbitMQ`, `EVENTS_STORE: Redis`
- `MQTT_BROKER: NATS`, `MESSAGE_BROKER: NATS`, `EVENTS_STORE: NATS`
- `MQTT_BROKER: NATS`, `MESSAGE_BROKER: NATS`, `EVENTS_STORE: Redis`

Expand Down
24 changes: 2 additions & 22 deletions docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,6 @@ services:
container_name: supermq-mqtt
depends_on:
- clients
- rabbitmq
- nats
restart: on-failure
environment:
Expand Down Expand Up @@ -1645,27 +1644,6 @@ services:
bind:
create_host_path: true

rabbitmq:
image: docker.io/rabbitmq:4.1.4-management-alpine
container_name: supermq-rabbitmq
restart: on-failure
environment:
RABBITMQ_ERLANG_COOKIE: ${SMQ_RABBITMQ_COOKIE}
RABBITMQ_DEFAULT_USER: ${SMQ_RABBITMQ_USER}
RABBITMQ_DEFAULT_PASS: ${SMQ_RABBITMQ_PASS}
RABBITMQ_DEFAULT_VHOST: ${SMQ_RABBITMQ_VHOST}
RABBITMQ_CONFIG_FILES: /etc/rabbitmq/conf.d/
ports:
- ${SMQ_RABBITMQ_PORT}:${SMQ_RABBITMQ_PORT}
- ${SMQ_RABBITMQ_HTTP_PORT}:${SMQ_RABBITMQ_HTTP_PORT}
- ${SMQ_RABBITMQ_WS_PORT}:${SMQ_RABBITMQ_WS_PORT}
volumes:
- ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
- ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/conf.d/10-defaults.conf
- supermq-mqtt-broker-volume:/var/lib/rabbitmq
networks:
- supermq-base-net

nats:
image: docker.io/nats:2.12.0-alpine3.22
container_name: supermq-nats
Expand All @@ -1675,6 +1653,8 @@ services:
- SMQ_NATS_PORT=${SMQ_NATS_PORT}
- SMQ_NATS_HTTP_PORT=${SMQ_NATS_HTTP_PORT}
- SMQ_NATS_JETSTREAM_KEY=${SMQ_NATS_JETSTREAM_KEY}
- SMQ_NATS_MQTT_PORT=${SMQ_NATS_MQTT_PORT}
- SMQ_NATS_MQTT_WS_PORT=${SMQ_NATS_MQTT_WS_PORT}
ports:
- ${SMQ_NATS_PORT}:${SMQ_NATS_PORT}
- ${SMQ_NATS_HTTP_PORT}:${SMQ_NATS_HTTP_PORT}
Expand Down
4 changes: 2 additions & 2 deletions docker/nats/nats.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ jetstream {
}

mqtt {
port: 1883
port: $SMQ_NATS_MQTT_PORT
max_ack_pending: 1
}

websocket {
port: 8080
port: $SMQ_NATS_MQTT_WS_PORT

no_tls: true
}
70 changes: 0 additions & 70 deletions mqtt/forwarder.go

This file was deleted.

34 changes: 0 additions & 34 deletions mqtt/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"log/slog"
"strings"
"time"

"github.com/absmach/mgate/pkg/session"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
Expand Down Expand Up @@ -158,44 +157,11 @@ func (h *handler) Connect(ctx context.Context) error {

// Publish - after client successfully published.
func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) error {
s, ok := session.FromContext(ctx)
if !ok {
return errors.Wrap(ErrFailedPublish, ErrClientNotInitialized)
}
h.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, *topic))

domainID, chanID, subTopic, topicType, err := h.parser.ParsePublishTopic(ctx, *topic, false)
if err != nil {
return errors.Wrap(ErrFailedPublish, err)
}

msg := messaging.Message{
Protocol: protocol,
Domain: domainID,
Channel: chanID,
Subtopic: subTopic,
Publisher: s.Username,
Payload: *payload,
Created: time.Now().UnixNano(),
}

if topicType == messaging.MessageType {
if err := h.publisher.Publish(ctx, messaging.EncodeMessageTopic(&msg), &msg); err != nil {
return errors.Wrap(ErrFailedPublishToMsgBroker, err)
}
}

return nil
}

// Subscribe - after client successfully subscribed.
func (h *handler) Subscribe(ctx context.Context, topics *[]string) error {
s, ok := session.FromContext(ctx)
if !ok {
return errors.Wrap(ErrFailedSubscribe, ErrClientNotInitialized)
}
h.logger.Info(fmt.Sprintf(LogInfoSubscribed, s.ID, strings.Join(*topics, ",")))

return nil
}

Expand Down
Loading
Loading