Skip to content

Commit

Permalink
* BUGFIX
Browse files Browse the repository at this point in the history
 Mesages push to MQTT
 Stopped sending of MQTT messages on system start
  • Loading branch information
mario-almeida committed Nov 17, 2017
1 parent 295eec1 commit 9edca6f
Showing 1 changed file with 91 additions and 57 deletions.
148 changes: 91 additions & 57 deletions RFM_MQTT_GW/RFM_MQTT_GW.ino
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,46 @@
//
// The MQTT topic is /home/rfm_gw/direction/nodeid/devid

#define VERSION "V1.1"

/****************************************************************************************************************
**** Include Required Libraries ****
****************************************************************************************************************/
#include <RFM69.h> //get it here: https://github.com/lowpowerlab/rfm69
#include <RFM69_ATC.h> //get it here: https://github.com/lowpowerlab/RFM69
#include <SPI.h> //included with Arduino IDE (www.arduino.cc)
#include <Ethernet.h>
#include <PubSubClient.h>

/****************************************************************************************************************
**** Network Settings ****
****************************************************************************************************************/
#define VERSION "V1.5"

#define DEBUG // uncomment for MQTT, RADIO and NETWORK debugging
#define DEBUGRADIO // uncomment for radio debugging
//#define DEBUGNETWORK // uncomment for network debugging
//#define DEBUGMQTT // uncomment for mqtt debugging

// Ethernet settings
byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED }; // MAC address for ethernet
IPAddress mqtt_server(1, 1, 1, 1); // MQTT broker address (Mosquitto)
IPAddress ip(1, 1, 1, 1); // Gateway address (if DHCP fails)

// Wireless settings
//****************************************************************************************************************
//**** IMPORTANT RADIO SETTINGS - YOU MUST CHANGE/CONFIGURE TO MATCH YOUR HARDWARE TRANSCEIVER CONFIGURATION! ****
//****************************************************************************************************************
// Wireless settings
#define NODEID 1 //the ID of this node
#define NETWORKID 250 //the network ID of all nodes this node listens/talks to
#define NODEID 1 //the ID of this node
#define NETWORKID 250 //the network ID of all nodes this node listens/talks to
#define FREQUENCY RF69_868MHZ //Match this with the version of your Moteino! (others: RF69_433MHZ, RF69_868MHZ)
#define ENCRYPTKEY "sampleEncryptKey" //identical 16 characters/bytes on all nodes, not more not less!
#define IS_RFM69HW //uncomment only for RFM69HW! Leave out if you have RFM69W!
#define ACK_TIME 30 // # of ms to wait for an ack packet
#define RFM_SS 8 // Slave Select RFM69 is connected to pin 8
//****************************************************************************************************************
#define ENABLE_ATC //comment out this line to disable AUTO TRANSMISSION CONTROL
#define ATC_RSSI -75 //target RSSI for RFM69_ATC (recommended > -80)
#define IS_RFM69HW //uncomment only for RFM69HW! Leave out if you have RFM69W!
#define ACK_TIME 30 // # of ms to wait for an ack packet
#define RFM_SS 8 // Slave Select RFM69 is connected to pin 8
//*****************************************************************************************************************************
#define ENABLE_ATC //comment out this line to disable AUTO TRANSMISSION CONTROL
#define ATC_RSSI -75 //target RSSI for RFM69_ATC (recommended > -80)
//*****************************************************************************************************************************
// Serial baud rate must match your Pi/host computer serial port baud rate!
#define SERIAL_BAUD 115200
//*****************************************************************************************************************************

//****************************************************************************************************************
#define SERIAL_BAUD 115200 // Serial baud rate
#define DEBUG // uncomment for MQTT, RADIO and NETWORK debugging
//#define DEBUGRADIO // uncomment for radio debugging
//#define DEBUGNETWORK // uncomment for network debugging
#define DEBUGMQTT // uncomment for mqtt debugging
//****************************************************************************************************************
// PIN settings
#define MQTT_CON_LED 7 // MQTT Connection indicator
#define RADIO_CONN_LED 9 // Radio activity indicator
#define MQCON 7 // MQTT Connection indicator
#define R_LED 9 // Radio activity indicator
//****************************************************************************************************************

int dest; // destination node for radio packet
Expand All @@ -69,6 +66,8 @@ char *subTopic = "home/rfm_gw/sb/#"; // MQTT subscription topic ; directi
char *clientName = "RFM_gateway"; // MQTT system name of gateway
char buff_topic[30]; // MQTT publish topic string
char buff_mess[32]; // MQTT publish message string
int startSBmessageAfter = 10; // Start publishing sb messages only after n number of secs after system is up and running
long timeSinceStart = 0; // Get current start time since system start

#ifdef ENABLE_ATC
RFM69_ATC radio;
Expand All @@ -85,14 +84,16 @@ PubSubClient mqttClient(mqtt_server, 1883, mqtt_subs, ethClient );
//
//============== MQTT_SUBS
//
// receive messages from subscribed MQRR topics
// receive messages from subscribed topics
// parse MQTT topic / message and construct radio message
//
// The values in the MQTT topic/message are converted to corresponding values on the Radio network
//

void mqtt_subs(char* topic, byte* payload, unsigned int length) {

int i;

#ifdef DEBUGMQTT
Serial.print(F("Topic received from MQTT Server: "));
Serial.print(topic);
Expand All @@ -109,7 +110,12 @@ void mqtt_subs(char* topic, byte* payload, unsigned int length) {
String strPayload = String((char*)payload); // convert to string
sprintf(buff_mess, "BTN%d:%s", BTN, strPayload.c_str());
respNeeded = true;
sendMsg(dest, buff_mess, 3 + sizeof(BTN) + strPayload.length());

// Start sending message only after system up and running time is reached.
if (timeSinceStart >= startSBmessageAfter) {
sendMsg(dest, buff_mess, 3 + sizeof(BTN) + strPayload.length());
}

} else {
error = 1;
#ifdef DEBUGMQTT
Expand All @@ -129,7 +135,6 @@ void mqtt_subs(char* topic, byte* payload, unsigned int length) {

} // end mqttSubs


//
//============== SETUP
//
Expand All @@ -149,16 +154,16 @@ void setup() {
radio.setHighPower(); // only for RFM69HW!
#endif
radio.encrypt(ENCRYPTKEY); // encrypt with shared key
radio.promiscuous(promiscuousMode); // listen only to nodes in closed network
radio.promiscuous(promiscuousMode); // listen only to nodes in closed network

#ifdef ENABLE_ATC
radio.enableAutoPower(ATC_RSSI);
#endif

pinMode(RADIO_CONN_LED, OUTPUT); // set pin of radio indicator
pinMode(MQTT_CON_LED, OUTPUT); // set pin for MQTT connection indicator
digitalWrite(MQTT_CON_LED, LOW); // switch off MQTT connection indicator
digitalWrite(RADIO_CONN_LED, LOW); // switch off radio indicator
pinMode(R_LED, OUTPUT); // set pin of radio indicator
pinMode(MQCON, OUTPUT); // set pin for MQTT connection indicator
digitalWrite(MQCON, LOW); // switch off MQTT connection indicator
digitalWrite(R_LED, LOW); // switch off radio indicator

#ifdef DEBUGRADIO
Serial.print(F("GW Version "));
Expand All @@ -172,7 +177,7 @@ void setup() {
Serial.println(F("Connecting to Network..."));
#endif

if (Ethernet.begin(mac) == 0) { // start the Ethernet connection
if (Ethernet.begin(mac) == 0) { // start the Ethernet connection
#ifdef DEBUG && DEBUGNETWORK
Serial.println(F("No DHCP"));
#endif
Expand All @@ -199,24 +204,25 @@ void setup() {
#endif
#endif

mqttCon = 0; // reset connection flag
while (mqttCon != 1) { // retry MQTT connection every 2 seconds
mqttCon = mqttClient.connect(clientName); // retry connection to broker
delay(2000); // every 2 seconds
mqttCon = 0; // reset connection flag
while (mqttCon != 1) { // retry MQTT connection every 2 seconds
mqttCon = mqttClient.connect(clientName); // retry connection to broker
delay(2000); // every 2 seconds
}

if (mqttCon) { // Connected !
if (mqttCon) { // Connected !
#ifdef DEBUGMQTT
Serial.println(F("MQTT-link OK"));
#endif
digitalWrite(MQTT_CON_LED, HIGH); // switch on MQTT connection indicator
digitalWrite(MQCON, HIGH); // switch on MQTT connection indicator

sprintf(buff_mess, "%s %s", clientName, "Joined Server.");
mqtt_pubs("home/rfm_gw/nb/", buff_mess);

mqttClient.subscribe(subTopic); // subscribe to all southbound messages
mqttClient.subscribe(subTopic); // subscribe to all southbound messages
} else {
digitalWrite(MQTT_CON_LED, LOW); // switch off MQTT connection indicator
// switch off MQTT connection indicator
digitalWrite(MQCON, LOW);
#ifdef DEBUGMQTT
Serial.println(F("MQTT connection lost"));
#endif
Expand All @@ -240,7 +246,7 @@ void loop() {
if (Rstat) { // turn off radio LED after 100 msec
if (millis() - onMillis > 100) {
Rstat = false;
digitalWrite(RADIO_CONN_LED, LOW);
digitalWrite(R_LED, LOW);
}
}

Expand All @@ -255,10 +261,12 @@ void loop() {

if (!mqttClient.loop()) { // check connection MQTT server and process MQTT subscription input
mqttCon = 0;
digitalWrite(MQTT_CON_LED, LOW);
#ifdef DEBUG && DEBUGMQTT
digitalWrite(MQCON, LOW);

#ifdef DEBUGMQTT
Serial.println(F("MQTT connection lost"));
#endif

while (mqttCon != 1) { // try to reconnect every 2 seconds
mqttCon = mqttClient.connect(clientName);
delay(2000);
Expand All @@ -268,7 +276,7 @@ void loop() {
#ifdef DEBUGMQTT
Serial.println(F("MQTT-link OK"));
#endif
digitalWrite(MQTT_CON_LED, HIGH); // switch on MQTT connection indicator
digitalWrite(MQCON, HIGH); // switch on MQTT connection indicator

sprintf(buff_mess, "%s %s", clientName, "Joined Server.");
mqtt_pubs("home/rfm_gw/nb/", buff_mess);
Expand All @@ -279,6 +287,12 @@ void loop() {
#endif
}
}

// Keep increment time until system is up and running
if (timeSinceStart <= startSBmessageAfter) {
timeSinceStart = millis() / 1000;
}

} // end loop

//
Expand All @@ -289,14 +303,14 @@ void loop() {
void sendMsg(int target, char* radioMessage, int msgSize) {

Rstat = true; // radio indicator on
digitalWrite(RADIO_CONN_LED, HIGH); // turn on radio LED
digitalWrite(R_LED, HIGH); // turn on radio LED
onMillis = millis(); // store timestamp

int i = 5; // number of transmission retries
int i = 1; // number of transmission retries

while (respNeeded && i > 0) { // first try to send packets

if (radio.sendWithRetry(target, radioMessage, msgSize)) {
if (radio.sendWithRetry(target, radioMessage, msgSize, 1)) {
respNeeded = false;
#ifdef DEBUGRADIO
Serial.print(F("Msg to node " )); Serial.print(target);
Expand All @@ -309,6 +323,9 @@ void sendMsg(int target, char* radioMessage, int msgSize) {
}

if (respNeeded && verbose) { // if not succeeded in sending packets after 5 retries
//sprintf(buff_topic, "home/rfm_gw/nb/node%03d/dev90", target); // construct MQTT topic and message
//sprintf(buff_mess, "radio lost node %d", target); // for radio loss (device 90)
//mqtt_pubs(buff_topic, buff_mess); // publish ...
respNeeded = false; // reset response needed flag
#ifdef DEBUGRADIO
Serial.print(F("Node "));
Expand All @@ -328,7 +345,7 @@ void sendMsg(int target, char* radioMessage, int msgSize) {

void processPacket() {
Rstat = true; // set radio indicator flag
digitalWrite(RADIO_CONN_LED, HIGH); // turn on radio LED
digitalWrite(R_LED, HIGH); // turn on radio LED
onMillis = millis(); // store timestamp

#ifdef DEBUGRADIO
Expand Down Expand Up @@ -368,20 +385,37 @@ void processPacket() {

char *p = pload;
char *str;
char *kv;

while ((str = strtok_r(p, " ", &p)) != NULL) {
String key = String(str).substring(0, 3);
String val = String(str).substring(4, String(str).length());

sprintf(buff_topic, "%s/nb/node%03d/%s", rootTopic, radio.SENDERID, key.c_str());
sprintf(buff_mess, "%s", val.c_str());
mqtt_pubs(buff_topic, buff_mess);
// Extract the key name
kv = strtok(str, ":");
if (kv) {

memset(&buff_topic, '\0', sizeof(buff_topic));
sprintf(buff_topic, "%s/nb/node%03d/%s", rootTopic, radio.SENDERID, kv);

// Extract the value
kv = strtok(NULL, ":");
if (kv) {

memset(&buff_mess, '\0', sizeof(buff_mess));
sprintf(buff_mess, "%s", kv);

mqtt_pubs(buff_topic, buff_mess);
}
}
}
memset(pload, '\0', sizeof(pload));
}

memset(&buff_topic, '\0', sizeof(buff_topic));
memset(&buff_mess, '\0', sizeof(buff_mess));

sprintf(buff_mess, "%d", radio.RSSI);
sprintf(buff_topic, "%s/nb/node%03d/RSSI", rootTopic, radio.SENDERID);
mqtt_pubs(buff_topic, buff_mess);

} // end processPacket

/*
Expand Down

0 comments on commit 9edca6f

Please sign in to comment.