Skip to content

Unable to Receive Messages on HiveMQ Broker #689

Open
@neilor-mendes

Description

@neilor-mendes

Hi everyone,

I’m encountering an issue with my code that’s supposed to connect to a HiveMQ Broker. While the connection seems to be established successfully (as indicated by the keepAlive and ping logs), I'm not receiving any messages sent to the topic.

Here’s what I’ve done so far:

  • Successfully connected to the broker.
  • Set up the SetDefaultPublishHandler.

I’m unsure what might be missing or incorrectly configured. Any guidance or suggestions would be greatly appreciated!

Thank you in advance for your help.

Version

go 1.23.0

require (
        github.com/eclipse/paho.mqtt.golang v1.5.0
        github.com/golang/glog v1.2.2
)

require (
        github.com/golang/snappy v0.0.4 // indirect
        github.com/gorilla/websocket v1.5.3 // indirect
        github.com/klauspost/compress v1.13.6 // indirect
        github.com/montanaflynn/stats v0.7.1 // indirect
        github.com/pelletier/go-toml v1.9.5 // indirect
        github.com/xdg-go/pbkdf2 v1.0.0 // indirect
        github.com/xdg-go/scram v1.1.2 // indirect
        github.com/xdg-go/stringprep v1.0.4 // indirect
        github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
        go.mongodb.org/mongo-driver v1.16.1 // indirect
        golang.org/x/crypto v0.25.0 // indirect
        golang.org/x/net v0.27.0 // indirect
        golang.org/x/sync v0.7.0 // indirect
        golang.org/x/text v0.16.0 // indirect
)

Code

package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	signal.Notify(sig, syscall.SIGTERM)

	mqtt.ERROR = log.New(os.Stdout, "[ERROR] ", 0)
	mqtt.CRITICAL = log.New(os.Stdout, "[CRIT] ", 0)
	mqtt.WARN = log.New(os.Stdout, "[WARN]  ", 0)
	mqtt.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0)

	var knt int = 0

	// Load CA certificate
	certpool := x509.NewCertPool()
	pemCerts, err := os.ReadFile("mqttutil/cacert.pem")

	if err != nil {
		log.Fatalf("Error loading CA certificate: %v", err)
	}

	if !certpool.AppendCertsFromPEM(pemCerts) {
		log.Fatalf("Failed to append CA certificate")
	}

	tlsConfig := &tls.Config{
		RootCAs:            certpool,
		ClientAuth:         tls.NoClientCert,
		ClientCAs:          nil,
		InsecureSkipVerify: true,
	}

	opts := mqtt.NewClientOptions().
		AddBroker("ssl://broker.hivemq.com:8883").
		SetClientID("mqtt2mongo").
		SetUsername("username").
		SetPassword("password").
		SetTLSConfig(tlsConfig).
		SetAutoReconnect(true).
		SetConnectRetry(true).
		SetOrderMatters(false).
		SetCleanSession(true)

	opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
		fmt.Printf("MSG: %s\n", msg.Payload())
		text := fmt.Sprintf("this is result msg #%d!", knt)
		knt++
		token := client.Publish("nn/result", 0, false, text)
		token.Wait()
	})

	topic := "nn/result"

	opts.OnConnect = func(c mqtt.Client) {
		if token := c.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
			panic(token.Error())
		}
	}

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	} else {
		fmt.Printf("Connected to server\n")
	}

	<-sig
	fmt.Println("signal caught - exiting")
	defer client.Unsubscribe("nn/result")
	defer client.Disconnect(250)
	fmt.Println("shutdown complete")
}

Debug Log

[DEBUG] [client]   Connect()
[DEBUG] [store]    memorystore initialized
[DEBUG] [client]   about to write new connect msg
[DEBUG] [client]   socket connected to broker
[DEBUG] [client]   Using MQTT 3.1.1 protocol
[DEBUG] [net]      connect started
[DEBUG] [net]      received connack
[DEBUG] [client]   startCommsWorkers called
[DEBUG] [client]   client is connected/reconnected
[DEBUG] [net]      incoming started
[DEBUG] [net]      startIncomingComms started
[DEBUG] [net]      outgoing started
[DEBUG] [net]      startComms started
[DEBUG] [client]   startCommsWorkers done
[WARN]  [store]    memorystore wiped
Connected to server
[DEBUG] [client]   enter Subscribe
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [pinger]   keepalive starting
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: inboundFromStore complete
[DEBUG] [client]   exit startClient
[DEBUG] [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 1 topics: [nn/result]
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [client]   sending subscribe message, topic: nn/result
[DEBUG] [client]   exit Subscribe
[DEBUG] [net]      obound priority msg to write, type *packets.SubscribePacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[WARN]  [store]    memorystore del: message 1 not found
[DEBUG] [net]      startIncomingComms: received suback, id: 1
[DEBUG] [net]      startIncomingComms: granted qoss [0]
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [pinger]   ping check 4.9984945
[DEBUG] [pinger]   ping check 9.9990185
[DEBUG] [pinger]   ping check 14.9984701
[DEBUG] [pinger]   ping check 19.9986013
[DEBUG] [pinger]   ping check 24.9985122
[DEBUG] [pinger]   ping check 29.998588
[DEBUG] [pinger]   ping check 34.9987725
[DEBUG] [pinger]   keepalive sending ping
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received pingresp
...

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions