# bridgeFiltering.py
# Forked from SeppPenner/MQTT2MySQLBridge.
import paho.mqtt.client as mqtt
import traceback
import pymysql.cursors
# --- MQTT source broker ------------------------------------------------------
# Host and port of the broker that setup() connects to.
broker_source_port = 1883
broker_source = "127.0.0.1"

# Client for the source broker. The client id, username and password are
# placeholders and have to be replaced with real values.
client_source = mqtt.Client("YourClientId")
client_source.username_pw_set("YourUsername", "YourPassword")

# --- MySQL target ------------------------------------------------------------
# Connection settings for the database; the host, user and password are
# placeholders as well.
DatabasePort = 3306
DatabaseName = 'mqtt'
DatabaseHostName = 'YourHost'
DatabaseUserName = 'YourUser'
DatabasePassword = 'YourPassword'

# A single module-level connection is opened at start-up. It is used by
# insertIntoDatabase() and closed in the script's final "finally" clause.
print("Connecting to database")
connection = pymysql.connect(
    host=DatabaseHostName,
    port=DatabasePort,
    user=DatabaseUserName,
    password=DatabasePassword,
    db=DatabaseName,
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor,
)
def insertIntoDatabase(message):
    """Insert one received MQTT message into the database.

    Calls the stored procedure ``InsertIntoMQTTTable`` with the message's
    topic, payload text and QoS, then commits on the module-level
    ``connection``.

    The payload bytes are decoded as UTF-8 instead of slicing the ``b'...'``
    repr, so non-ASCII characters, quotes and newlines are stored as the
    sender wrote them. Bytes that are not valid UTF-8 are kept as ``\\xNN``
    escapes rather than being dropped.

    Args:
        message: Received message object exposing ``topic``, ``payload``
            and ``qos`` attributes.
    """
    raw = message.payload
    if isinstance(raw, (bytes, bytearray)):
        payload = raw.decode("utf-8", errors="backslashreplace")
    else:
        # Not bytes: use its string form unchanged.
        payload = str(raw)
    topic = str(message.topic)
    qos = int(message.qos)

    with connection.cursor() as cursor:
        print("Inserting data: " + topic + ";" + payload + ";" + str(qos))
        cursor.callproc('InsertIntoMQTTTable', [topic, payload, qos])
        connection.commit()
def on_message(client, userdata, message):
    """Handle a message received on a subscribed topic.

    Prints the message, runs it through ``filterMessage`` and, unless the
    filter rejects it, passes it to ``insertIntoDatabase``.

    The payload is decoded to text once and that text is what the filter
    sees. Passing ``str(message.payload)`` would hand the filter the
    ``b'...'`` repr, so an exact payload comparison could never match.

    Args:
        client: The MQTT client that received the message (unused).
        userdata: User data supplied by the MQTT client (unused).
        message: Received message object exposing ``topic``, ``payload``
            and ``qos`` attributes.
    """
    raw = message.payload
    if isinstance(raw, (bytes, bytearray)):
        # Invalid UTF-8 bytes are kept as \xNN escapes instead of raising.
        payload = raw.decode("utf-8", errors="backslashreplace")
    else:
        payload = str(raw)
    topic = str(message.topic)

    print("Received message '" + payload + "' on topic '"
          + topic + "' with QoS " + str(message.qos))
    if not filterMessage(payload, topic, message.qos):
        insertIntoDatabase(message)
def filterMessage(payload, topic, qos):
    """Decide whether a message should be filtered out.

    Args:
        payload: Message payload to compare against.
        topic: Topic the message was published on.
        qos: Quality-of-service level of the message.

    Returns:
        ``True`` if the message matches a filter and must NOT be forwarded,
        ``False`` if no filter matched and the message may be forwarded.
    """
    # Examples below:
    if payload == "10 %":
        print('Filtered: payload == "10 %"')
        return True
    if topic == "humidity" and qos == 0:
        print('Filtered: topic == "humidity" and qos == 0')
        return True
    if topic == "temperature" or qos == 2:
        print('Filtered: topic == "temperature" or qos == 2')
        return True
    # Add your filters here

    # No filter matched: return an explicit boolean instead of falling off
    # the end of the function with None.
    return False
def setup():
    """Run the setup procedure for the source client.

    Registers the message and connect handlers on ``client_source`` and
    connects to the source broker.

    The subscription to all topics ("#", QoS 1) is issued from the connect
    handler rather than once after ``connect()``. A one-time subscription is
    not renewed when the client reconnects with a clean session, which would
    leave the bridge connected but receiving nothing.
    """
    def _subscribe_on_connect(client, userdata, flags, rc, *_extra):
        # *_extra absorbs the additional "properties" argument that is
        # passed for MQTT v5 connections.
        if rc == 0:
            client.subscribe("#", qos=1)
        else:
            print("Connection to source refused, result code " + str(rc))

    print("Setting up the onMessage handler")
    client_source.on_message = on_message
    client_source.on_connect = _subscribe_on_connect
    print("Connecting to source")
    client_source.connect(broker_source, broker_source_port)
    print("Setup finished, waiting for messages...")
# Script entry point: set up the client, then block in the network loop until
# it stops. Whatever happens, the MQTT client and the database connection are
# released on the way out.
try:
    setup()
    client_source.loop_forever()
except KeyboardInterrupt:
    # Ctrl+C is the normal way to stop the bridge; exit without a traceback.
    print("Interrupted, shutting down")
except Exception:
    traceback.print_exc()
finally:
    try:
        client_source.disconnect()
    finally:
        # Close the database connection even if the disconnect fails.
        connection.close()