diff --git a/cmd/mqtt/main.go b/cmd/mqtt/main.go index 61d56316f8..96d24af06b 100644 --- a/cmd/mqtt/main.go +++ b/cmd/mqtt/main.go @@ -26,7 +26,6 @@ import ( smqlog "github.com/absmach/supermq/logger" "github.com/absmach/supermq/mqtt" "github.com/absmach/supermq/mqtt/events" - mqtttracing "github.com/absmach/supermq/mqtt/tracing" domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient" "github.com/absmach/supermq/pkg/errors" "github.com/absmach/supermq/pkg/grpcclient" @@ -36,7 +35,6 @@ import ( brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing" msgevents "github.com/absmach/supermq/pkg/messaging/events" "github.com/absmach/supermq/pkg/messaging/handler" - mqttpub "github.com/absmach/supermq/pkg/messaging/mqtt" "github.com/absmach/supermq/pkg/server" "github.com/absmach/supermq/pkg/uuid" "github.com/caarlos0/env/v11" @@ -135,31 +133,6 @@ func main() { }() tracer := tp.Tracer(svcName) - bsub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger) - if err != nil { - logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err)) - exitCode = 1 - return - } - defer bsub.Close() - bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub) - - mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTTargetUsername, cfg.MQTTTargetPassword, cfg.MQTTQoS, cfg.MQTTForwarderTimeout) - if err != nil { - logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err)) - exitCode = 1 - return - } - defer mpub.Close() - - fwd := mqtt.NewForwarder(brokers.SubjectAllMessages, logger) - fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllMessages) - if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil { - logger.Error(fmt.Sprintf("failed to forward message broker messages: %s", err)) - exitCode = 1 - return - } - np, err := brokers.NewPublisher(ctx, cfg.BrokerURL) if err != nil { logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err)) @@ -251,9 +224,7 @@ func main() { go chc.CallHome(ctx) } - beforeHandler := beforeHandler{ - resolver: messaging.NewTopicResolver(channelsClient, domainsClient), - } + beforeHandler := mqtt.NewBeforeHandler(messaging.NewTopicResolver(channelsClient, domainsClient), parser, logger) afterHandler := afterHandler{ username: cfg.MQTTTargetUsername, @@ -385,42 +356,3 @@ func (ah afterHandler) Intercept(ctx context.Context, pkt packets.ControlPacket, return pkt, nil } - -type beforeHandler struct { - resolver messaging.TopicResolver -} - -// This interceptor is used to replace domain and channel routes with relevant domain and channel IDs in the message topic. -func (bh beforeHandler) Intercept(ctx context.Context, pkt packets.ControlPacket, dir session.Direction) (packets.ControlPacket, error) { - switch pt := pkt.(type) { - case *packets.SubscribePacket: - for i, topic := range pt.Topics { - ft, err := bh.resolver.ResolveTopic(ctx, topic) - if err != nil { - return nil, err - } - pt.Topics[i] = ft - } - - return pt, nil - case *packets.UnsubscribePacket: - for i, topic := range pt.Topics { - ft, err := bh.resolver.ResolveTopic(ctx, topic) - if err != nil { - return nil, err - } - pt.Topics[i] = ft - } - return pt, nil - case *packets.PublishPacket: - ft, err := bh.resolver.ResolveTopic(ctx, pt.TopicName) - if err != nil { - return nil, err - } - pt.TopicName = ft - - return pt, nil - } - - return pkt, nil -} diff --git a/docker/.env b/docker/.env index c879ba545f..b5ccb3c460 100644 --- a/docker/.env +++ b/docker/.env @@ -14,45 +14,51 @@ SMQ_NGINX_MQTTS_PORT=8883 SMQ_NGINX_SERVER_NAME= ## Nats +SMQ_NATS_HOST=nats SMQ_NATS_PORT=4222 SMQ_NATS_HTTP_PORT=8222 SMQ_NATS_JETSTREAM_KEY=u7wFoAPgXpDueXOFldBnXDh4xjnSOyEJ2Cb8Z5SZvGLzIZ3U4exWhhoIBZHzuNvh SMQ_NATS_URL=nats://nats:${SMQ_NATS_PORT} # Configs for nats as MQTT broker SMQ_NATS_HEALTH_CHECK=http://nats:${SMQ_NATS_HTTP_PORT}/healthz -SMQ_NATS_WS_TARGET_PATH= +SMQ_NATS_MQTT_PORT=1883 SMQ_NATS_MQTT_QOS=0 +SMQ_NATS_MQTT_WS_PORT=8080 +SMQ_NATS_MQTT_WS_PATH=/mqtt ## RabbitMQ +SMQ_RABBITMQ_HOST=rabbitmq SMQ_RABBITMQ_PORT=5672 SMQ_RABBITMQ_HTTP_PORT=15672 -SMQ_RABBITMQ_WS_PORT=15675 SMQ_RABBITMQ_USER=supermq SMQ_RABBITMQ_PASS=supermq SMQ_RABBITMQ_COOKIE=supermq SMQ_RABBITMQ_VHOST=/ SMQ_RABBITMQ_URL=amqp://${SMQ_RABBITMQ_USER}:${SMQ_RABBITMQ_PASS}@rabbitmq:${SMQ_RABBITMQ_PORT}${SMQ_RABBITMQ_VHOST} +SMQ_RABBITMQ_HEALTH_CHECK= +SMQ_RABBITMQ_MQTT_PORT=1883 SMQ_RABBITMQ_MQTT_QOS=0 -SMQ_RABBITMQ_WS_TARGET_PATH=/ws +SMQ_RABBITMQ_MQTT_WS_PORT=15675 +SMQ_RABBITMQ_MQTT_WS_PATH=/ws ## Message Broker SMQ_MESSAGE_BROKER_TYPE=msg_nats SMQ_MESSAGE_BROKER_URL=${SMQ_NATS_URL} ## MQTT Broker -SMQ_MQTT_BROKER_TYPE=rabbitmq -SMQ_MQTT_BROKER_HEALTH_CHECK= -SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_RABBITMQ_MQTT_QOS} +SMQ_MQTT_BROKER_TYPE=nats +SMQ_MQTT_BROKER_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK} +SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_NATS_MQTT_QOS} SMQ_MQTT_ADAPTER_MQTT_TARGET_PROTOCOL=mqtt -SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE} -SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883 -SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME=${SMQ_RABBITMQ_USER} -SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD=${SMQ_RABBITMQ_PASS} -SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_MQTT_BROKER_HEALTH_CHECK} +SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_NATS_HOST} +SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=${SMQ_NATS_MQTT_PORT} +SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME= +SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD= +SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK} SMQ_MQTT_ADAPTER_WS_TARGET_PROTOCOL=http -SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE} -SMQ_MQTT_ADAPTER_WS_TARGET_PORT=${SMQ_RABBITMQ_WS_PORT} -SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_RABBITMQ_WS_TARGET_PATH} +SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_NATS_HOST} +SMQ_MQTT_ADAPTER_WS_TARGET_PORT=${SMQ_NATS_MQTT_WS_PORT} +SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_NATS_MQTT_WS_PATH} ## Redis SMQ_REDIS_TCP_PORT=6379 diff --git a/docker/README.md b/docker/README.md index bcf209f2ab..18584a6f66 100644 --- a/docker/README.md +++ b/docker/README.md @@ -35,18 +35,14 @@ Events store: This is used by SuperMQ services to store events for distributed p ## Supported Combinations -This is the same as MESSAGE_BROKER. This can either be `NATS` or `RabbitMQ` or `Redis`. If Redis is used as an events store, then RabbitMQ or NATS is used as a message broker. +This is the same as MESSAGE_BROKER. This can either be `NATS` or `RabbitMQ`. If Redis is used as an events store, then RabbitMQ or NATS is used as a message broker. The current deployment strategy for SuperMQ in `docker/docker-compose.yaml` is to use RabbitMQ as a MQTT_BROKER and NATS as a MESSAGE_BROKER and EVENTS_STORE. Depending on the desired setup, the following broker configurations are valid: -- `MQTT_BROKER: RabbitMQ`, `MESSAGE_BROKER: NATS`, `EVENTS_STORE: NATS` -- `MQTT_BROKER: RabbitMQ`, `MESSAGE_BROKER: NATS`, `EVENTS_STORE: Redis` - `MQTT_BROKER: RabbitMQ`, `MESSAGE_BROKER: RabbitMQ`, `EVENTS_STORE: RabbitMQ` - `MQTT_BROKER: RabbitMQ`, `MESSAGE_BROKER: RabbitMQ`, `EVENTS_STORE: Redis` -- `MQTT_BROKER: NATS`, `MESSAGE_BROKER: RabbitMQ`, `EVENTS_STORE: RabbitMQ` -- `MQTT_BROKER: NATS`, `MESSAGE_BROKER: RabbitMQ`, `EVENTS_STORE: Redis` - `MQTT_BROKER: NATS`, `MESSAGE_BROKER: NATS`, `EVENTS_STORE: NATS` - `MQTT_BROKER: NATS`, `MESSAGE_BROKER: NATS`, `EVENTS_STORE: Redis` diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 1df5bf4446..dfb15df664 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -1207,7 +1207,6 @@ services: container_name: supermq-mqtt depends_on: - clients - - rabbitmq - nats restart: on-failure environment: @@ -1645,27 +1644,6 @@ services: bind: create_host_path: true - rabbitmq: - image: docker.io/rabbitmq:4.1.4-management-alpine - container_name: supermq-rabbitmq - restart: on-failure - environment: - RABBITMQ_ERLANG_COOKIE: ${SMQ_RABBITMQ_COOKIE} - RABBITMQ_DEFAULT_USER: ${SMQ_RABBITMQ_USER} - RABBITMQ_DEFAULT_PASS: ${SMQ_RABBITMQ_PASS} - RABBITMQ_DEFAULT_VHOST: ${SMQ_RABBITMQ_VHOST} - RABBITMQ_CONFIG_FILES: /etc/rabbitmq/conf.d/ - ports: - - ${SMQ_RABBITMQ_PORT}:${SMQ_RABBITMQ_PORT} - - ${SMQ_RABBITMQ_HTTP_PORT}:${SMQ_RABBITMQ_HTTP_PORT} - - ${SMQ_RABBITMQ_WS_PORT}:${SMQ_RABBITMQ_WS_PORT} - volumes: - - ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins - - ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/conf.d/10-defaults.conf - - supermq-mqtt-broker-volume:/var/lib/rabbitmq - networks: - - supermq-base-net - nats: image: docker.io/nats:2.12.0-alpine3.22 container_name: supermq-nats @@ -1675,6 +1653,8 @@ services: - SMQ_NATS_PORT=${SMQ_NATS_PORT} - SMQ_NATS_HTTP_PORT=${SMQ_NATS_HTTP_PORT} - SMQ_NATS_JETSTREAM_KEY=${SMQ_NATS_JETSTREAM_KEY} + - SMQ_NATS_MQTT_PORT=${SMQ_NATS_MQTT_PORT} + - SMQ_NATS_MQTT_WS_PORT=${SMQ_NATS_MQTT_WS_PORT} ports: - ${SMQ_NATS_PORT}:${SMQ_NATS_PORT} - ${SMQ_NATS_HTTP_PORT}:${SMQ_NATS_HTTP_PORT} diff --git a/docker/nats/nats.conf b/docker/nats/nats.conf index a547b6bf05..f6e22e2337 100644 --- a/docker/nats/nats.conf +++ b/docker/nats/nats.conf @@ -16,12 +16,12 @@ jetstream { } mqtt { - port: 1883 + port: $SMQ_NATS_MQTT_PORT max_ack_pending: 1 } websocket { - port: 8080 + port: $SMQ_NATS_MQTT_WS_PORT no_tls: true } diff --git a/mqtt/forwarder.go b/mqtt/forwarder.go deleted file mode 100644 index a790e28532..0000000000 --- a/mqtt/forwarder.go +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package mqtt - -import ( - "context" - "fmt" - "log/slog" - - "github.com/absmach/supermq/pkg/messaging" -) - -// Forwarder specifies MQTT forwarder interface API. -type Forwarder interface { - // Forward subscribes to the Subscriber and - // publishes messages using provided Publisher. - Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error -} - -type forwarder struct { - topic string - logger *slog.Logger -} - -// NewForwarder returns new Forwarder implementation. -func NewForwarder(topic string, logger *slog.Logger) Forwarder { - return forwarder{ - topic: topic, - logger: logger, - } -} - -func (f forwarder) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error { - subCfg := messaging.SubscriberConfig{ - ID: id, - Topic: f.topic, - Handler: handle(ctx, pub, f.logger), - } - - return sub.Subscribe(ctx, subCfg) -} - -func handle(ctx context.Context, pub messaging.Publisher, logger *slog.Logger) handleFunc { - return func(msg *messaging.Message) error { - if msg.GetProtocol() == protocol { - return nil - } - - topic := messaging.EncodeMessageMQTTTopic(msg) - - go func() { - if err := pub.Publish(ctx, topic, msg); err != nil { - logger.Warn(fmt.Sprintf("Failed to forward message: %s", err)) - } - }() - - return nil - } -} - -type handleFunc func(msg *messaging.Message) error - -func (h handleFunc) Handle(msg *messaging.Message) error { - return h(msg) -} - -func (h handleFunc) Cancel() error { - return nil -} diff --git a/mqtt/handler.go b/mqtt/handler.go index 53b5bd4e00..9db43643db 100644 --- a/mqtt/handler.go +++ b/mqtt/handler.go @@ -8,7 +8,6 @@ import ( "fmt" "log/slog" "strings" - "time" "github.com/absmach/mgate/pkg/session" grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1" @@ -158,44 +157,11 @@ func (h *handler) Connect(ctx context.Context) error { // Publish - after client successfully published. func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) error { - s, ok := session.FromContext(ctx) - if !ok { - return errors.Wrap(ErrFailedPublish, ErrClientNotInitialized) - } - h.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, *topic)) - - domainID, chanID, subTopic, topicType, err := h.parser.ParsePublishTopic(ctx, *topic, false) - if err != nil { - return errors.Wrap(ErrFailedPublish, err) - } - - msg := messaging.Message{ - Protocol: protocol, - Domain: domainID, - Channel: chanID, - Subtopic: subTopic, - Publisher: s.Username, - Payload: *payload, - Created: time.Now().UnixNano(), - } - - if topicType == messaging.MessageType { - if err := h.publisher.Publish(ctx, messaging.EncodeMessageTopic(&msg), &msg); err != nil { - return errors.Wrap(ErrFailedPublishToMsgBroker, err) - } - } - return nil } // Subscribe - after client successfully subscribed. func (h *handler) Subscribe(ctx context.Context, topics *[]string) error { - s, ok := session.FromContext(ctx) - if !ok { - return errors.Wrap(ErrFailedSubscribe, ErrClientNotInitialized) - } - h.logger.Info(fmt.Sprintf(LogInfoSubscribed, s.ID, strings.Join(*topics, ","))) - return nil } diff --git a/mqtt/intercepter.go b/mqtt/intercepter.go new file mode 100644 index 0000000000..583a205a51 --- /dev/null +++ b/mqtt/intercepter.go @@ -0,0 +1,101 @@ +package mqtt + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/absmach/mgate/pkg/session" + "github.com/absmach/supermq/pkg/errors" + "github.com/absmach/supermq/pkg/messaging" + "github.com/eclipse/paho.mqtt.golang/packets" + "google.golang.org/protobuf/proto" +) + +type beforeHandler struct { + resolver messaging.TopicResolver + parser messaging.TopicParser + logger *slog.Logger +} + +func NewBeforeHandler(resolver messaging.TopicResolver, parser messaging.TopicParser, logger *slog.Logger) session.Interceptor { + return &beforeHandler{ + resolver: resolver, + parser: parser, + logger: logger, + } +} + +// This interceptor is used to replace domain and channel routes with relevant domain and channel IDs in the message topic. +func (bh beforeHandler) Intercept(ctx context.Context, pkt packets.ControlPacket, dir session.Direction) (packets.ControlPacket, error) { + switch pt := pkt.(type) { + case *packets.SubscribePacket: + for i, topic := range pt.Topics { + ft, err := bh.resolver.ResolveTopic(ctx, topic) + if err != nil { + return nil, err + } + pt.Topics[i] = ft + } + + return pt, nil + case *packets.UnsubscribePacket: + for i, topic := range pt.Topics { + ft, err := bh.resolver.ResolveTopic(ctx, topic) + if err != nil { + return nil, err + } + pt.Topics[i] = ft + } + return pt, nil + case *packets.PublishPacket: + ft, err := bh.resolver.ResolveTopic(ctx, pt.TopicName) + if err != nil { + return nil, err + } + pt.TopicName = ft + + s, ok := session.FromContext(ctx) + if !ok { + return pt, errors.Wrap(ErrFailedPublish, ErrClientNotInitialized) + } + bh.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, ft)) + + switch dir { + case session.Up: + domainID, chanID, subTopic, _, err := bh.parser.ParsePublishTopic(ctx, ft, false) + if err != nil { + return pt, errors.Wrap(ErrFailedPublish, err) + } + + msg := &messaging.Message{ + Protocol: "mqtt", + Domain: domainID, + Channel: chanID, + Subtopic: subTopic, + Publisher: s.Username, + Payload: pt.Payload, + Created: time.Now().UnixNano(), + } + + data, err := proto.Marshal(msg) + if err != nil { + return pt, err + } + pt.Payload = data + + case session.Down: + var msg messaging.Message + + if err := proto.Unmarshal(pt.Payload, &msg); err != nil { + return pt, errors.Wrap(ErrFailedPublish, err) + } + pt.Payload = msg.GetPayload() + } + + return pt, nil + } + + return pkt, nil +} diff --git a/mqtt/tracing/forwarder.go b/mqtt/tracing/forwarder.go deleted file mode 100644 index b904edc50b..0000000000 --- a/mqtt/tracing/forwarder.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package tracing - -import ( - "context" - "fmt" - - "github.com/absmach/supermq/mqtt" - "github.com/absmach/supermq/pkg/messaging" - "github.com/absmach/supermq/pkg/server" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -const forwardOP = "process" - -var _ mqtt.Forwarder = (*forwarderMiddleware)(nil) - -type forwarderMiddleware struct { - topic string - forwarder mqtt.Forwarder - tracer trace.Tracer - host server.Config -} - -// New creates new mqtt forwarder tracing middleware. -func New(config server.Config, tracer trace.Tracer, forwarder mqtt.Forwarder, topic string) mqtt.Forwarder { - return &forwarderMiddleware{ - forwarder: forwarder, - tracer: tracer, - topic: topic, - host: config, - } -} - -// Forward traces mqtt forward operations. -func (fm *forwarderMiddleware) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error { - spanName := fmt.Sprintf("%s %s", fm.topic, forwardOP) - - ctx, span := fm.tracer.Start(ctx, - spanName, - trace.WithAttributes( - attribute.String("messaging.system", "mqtt"), - attribute.Bool("messaging.destination.anonymous", false), - attribute.String("messaging.destination.template", "m/{domainID}/c/{channelID}/*"), - attribute.Bool("messaging.destination.temporary", true), - attribute.String("network.protocol.name", "mqtt"), - attribute.String("network.protocol.version", "3.1.1"), - attribute.String("network.transport", "tcp"), - attribute.String("network.type", "ipv4"), - attribute.String("messaging.operation", forwardOP), - attribute.String("messaging.client_id", id), - attribute.String("server.address", fm.host.Host), - attribute.String("server.socket.port", fm.host.Port), - ), - ) - defer span.End() - - return fm.forwarder.Forward(ctx, id, sub, pub) -}