-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMqttActor.py
More file actions
85 lines (71 loc) · 3.29 KB
/
MqttActor.py
File metadata and controls
85 lines (71 loc) · 3.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import logging
import pykka, paho.mqtt.client as mqtt, os, urlparse, socket, json, time, config
from subprocess import call
import TrackQueueActor
import PingActor
class MqttActor(pykka.ThreadingActor):
def __init__(self):
super(MqttActor, self).__init__()
self.uid = config.uproar.get('token')
self.track_queue = None
self.client = None
self.once = True
def on_message(self, client, userdata, msg):
print("MQTT <-" + str(msg))
self.check_q_a()
if msg.topic == ("device_in_" + self.uid):
update = json.loads(str(msg.payload))
update_type = update['update']
if update_type == "add_content":
self.track_queue.tell({'command': 'add_content', 'content': update["data"]})
elif update_type == "skip":
self.track_queue.tell({'command': 'skip', 'orig': update["data"]})
elif update_type == "promote":
self.track_queue.tell({'command': 'promote', 'orig': update["data"]})
elif update_type == "volume":
vol_action = "+" if update["data"] == "1" else "-"
try:
call(["amixer", "-q", "sset", "\'Power Amplifier\'", "5%" + vol_action])
except OSError:
pass
def check_q_a(self):
if self.track_queue is None or not self.track_queue.is_alive():
self.track_queue = TrackQueueActor.TrackQueueActor.start(self.actor_ref)
# The callback for when the client receives a CONNACK response from the server.
def on_connect(self, client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe("device_in_" + self.uid, 2)
if self.once:
self.once = False
client.publish('registry', self.uid)
self.check_q_a()
self.track_queue.tell({'command': 'startup'})
def initMqtt(self):
print('init mqtt')
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
url_str = 'mqtt://%s:%s@m21.cloudmqtt.com:18552' % (self.uid.split("-")[0] + "-" + self.uid.split("-")[1], self.uid)
url = urlparse.urlparse(url_str)
self.client.username_pw_set(url.username, url.password)
self.client.connect(url.hostname, url.port)
self.client.loop_start()
def on_receive(self, message):
if message.get('command') == 'init':
try:
self.client = mqtt.Client()
self.initMqtt()
except Exception as ex:
logging.exception(ex)
time.sleep(1)
self.actor_ref.tell({'command': 'init'})
elif message.get('command') == "update_track_status":
track = message.get('track')
update = {"update":"update_track_status", "token":self.uid, "data":track}
outMsg = str(json.dumps(update))
self.publish(outMsg)
elif message.get('command') == "update":
outMsg = str(json.dumps({"update": message.get("update"), "token": self.uid, "data": message.get("data")}))
self.publish(outMsg)
def publish(self, outMsg):
print("MQTT ->" + str(outMsg))
self.client.publish("device_out", payload=outMsg, qos=2)