Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor mqtt probe #83

Merged
merged 1 commit into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions prober/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ func Handler(w http.ResponseWriter, r *http.Request, probes []config.Probe, logg
registry.MustRegister(probeDurationGauge)

start := time.Now()
if ProbeMQTT(probe, logger) {
mp := newMQTTProbe(probe, logger)
if mp != nil && mp.Probe(probe, logger) {
probeSuccessGauge.Set(1)
} else {
probeSuccessGauge.Set(0)
}
probeDurationGauge.Set(time.Since(start).Seconds())

h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
h.ServeHTTP(w, r)
probeDurationGauge.Set(time.Since(start).Seconds())
promhttp.HandlerFor(registry, promhttp.HandlerOpts{}).ServeHTTP(w, r)
}
87 changes: 25 additions & 62 deletions prober/mqtt.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package prober

import (
"context"
"emqx-exporter/config"
"fmt"
"sync"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
Expand All @@ -17,43 +14,12 @@ type MQTTProbe struct {
MsgChan <-chan mqtt.Message
}

type mqttProbeManager struct {
probes map[string]*MQTTProbe
sync.RWMutex
}

var manager mqttProbeManager

func init() {
manager = mqttProbeManager{
probes: make(map[string]*MQTTProbe),
}
go func() {
for {
manager.Lock()
defer manager.Unlock()
for target, probe := range manager.probes {
if probe == nil {
delete(manager.probes, target)
continue
}
}
manager.Unlock()

select {
case <-context.Background().Done():
return
case <-time.After(60 * time.Second):
}
}
}()
}

func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
func newMQTTProbe(probe config.Probe, logger log.Logger) *MQTTProbe {
var isReady = make(chan struct{})
var msgChan = make(chan mqtt.Message)

opt := mqtt.NewClientOptions().AddBroker(probe.Scheme + "://" + probe.Target)
opt.SetCleanSession(true)
opt.SetClientID(probe.ClientID)
opt.SetUsername(probe.Username)
opt.SetPassword(probe.Password)
Expand All @@ -63,7 +29,7 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
}
opt.SetOnConnectHandler(func(c mqtt.Client) {
optReader := c.OptionsReader()
level.Info(logger).Log("msg", "Connected to MQTT broker", "target", probe.Target, "client_id", optReader.ClientID())
level.Debug(logger).Log("msg", "Connected to MQTT broker", "target", probe.Target, "client_id", optReader.ClientID())
token := c.Subscribe(probe.Topic, probe.QoS, func(c mqtt.Client, m mqtt.Message) {
msgChan <- m
})
Expand All @@ -73,59 +39,56 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
return
}
isReady <- struct{}{}
level.Info(logger).Log("msg", "Subscribed to MQTT topic", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)
level.Debug(logger).Log("msg", "Subscribed to MQTT topic", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)
})
opt.SetConnectionLostHandler(func(c mqtt.Client, err error) {
level.Error(logger).Log("msg", "Lost connection to MQTT broker", "target", probe.Target, "err", err)
})
c := mqtt.NewClient(opt)
if token := c.Connect(); token.Wait() && token.Error() != nil {
level.Error(logger).Log("msg", "Failed to connect to MQTT broker", "target", probe.Target, "err", token.Error())
return nil, token.Error()
return nil
}

select {
case <-isReady:
case <-time.After(60 * time.Second):
return nil, fmt.Errorf("MQTT probe connect timeout")
case <-time.After(time.Duration(probe.KeepAlive) * time.Second):
level.Error(logger).Log("msg", "MQTT probe connect timeout", "target", probe.Target)
return nil
}

return &MQTTProbe{
Client: c,
MsgChan: msgChan,
}, nil
}
}

func ProbeMQTT(probe config.Probe, logger log.Logger) bool {
mqttProbe, ok := manager.probes[probe.Target]
if !ok {
var err error
if mqttProbe, err = initMQTTProbe(probe, logger); err != nil {
return false
}
manager.Lock()
defer manager.Unlock()
manager.probes[probe.Target] = mqttProbe
}
func (mp *MQTTProbe) Probe(probe config.Probe, logger log.Logger) bool {
defer mp.Client.Disconnect(0)

if !mqttProbe.Client.IsConnected() {
if !mp.Client.IsConnected() {
level.Error(logger).Log("msg", "MQTT client is not connected", "target", probe.Target)
return false
}

level.Info(logger).Log("msg", "Publishing MQTT message", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)
if token := mqttProbe.Client.Publish(probe.Topic, probe.QoS, false, "hello world"); token.Wait() && token.Error() != nil {
level.Debug(logger).Log("msg", "Publishing MQTT message", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)

message := "from emqx-exporter MQTT probe"
if token := mp.Client.Publish(probe.Topic, probe.QoS, false, message); token.Wait() && token.Error() != nil {
level.Error(logger).Log("msg", "Failed to publish MQTT message", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS, "err", token.Error())
return false
}

select {
case msg := <-mqttProbe.MsgChan:
if msg == nil {
return false
case msg := <-mp.MsgChan:
if msg != nil && string(msg.Payload()) == message {
level.Debug(logger).Log("msg", "MQTT probe receive message success", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)
return true
}
level.Error(logger).Log("msg", "MQTT probe receive message failed", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)
return false
case <-time.After(time.Duration(probe.KeepAlive) * time.Second):
level.Info(logger).Log("msg", "MQTT probe receive message timeout", "target", probe.Target)
level.Error(logger).Log("msg", "MQTT probe receive message timeout", "target", probe.Target)
return false
}

return true
}