1
- #!/usr/bin/env python3
1
+ #!/usr/bin/python3
2
2
3
3
import serial
4
4
import time
5
5
import sys
6
6
import os
7
7
import json
8
8
import signal
9
+ import threading
9
10
from datetime import datetime
10
11
import modbus_tk
11
12
import modbus_tk .defines as cst
12
13
from modbus_tk import modbus_rtu
14
+ import paho .mqtt .publish as publish
15
+ import paho .mqtt .client as mqtt
13
16
17
+ def msg (text ):
18
+ print ("%s : %s" % (datetime .now ().strftime ("%Y-%m-%d %H:%M:%S.%f" ),text ))
14
19
15
20
def getenv ():
16
21
conf = {
17
22
'serial' : {
18
- 'device ' : os .environ .get ('UART_DEVICE ' , '/dev/ttyUSB0' ),
23
+ 'port ' : os .environ .get ('UART_PORT ' , '/dev/ttyUSB0' ),
19
24
'baudrate' : os .environ .get ('UART_BAUD' , 19200 ),
20
25
"bytesize" : os .environ .get ('UART_BYTESIZE' , 8 ),
21
26
"parity" : os .environ .get ('UART_PARITY' , "N" ),
@@ -25,61 +30,199 @@ def getenv():
25
30
'addr' : os .environ .get ('EC133_ADDR' , 1 ),
26
31
'timeout' : os .environ .get ('EC133_TIMEOUT' , 0.2 ),
27
32
'command_topics' : {
28
- 'ch0 ' : os .environ .get ('CH0_COMMAND' , '' ),
29
- 'ch1 ' : os .environ .get ('CH1_COMMAND' , '' ),
30
- 'ch2 ' : os .environ .get ('CH2_COMMAND' , '' )
33
+ '0 ' : os .environ .get ('CH0_COMMAND' , '' ),
34
+ '1 ' : os .environ .get ('CH1_COMMAND' , '' ),
35
+ '2 ' : os .environ .get ('CH2_COMMAND' , '' )
31
36
},
32
37
'state_topics' : {
33
- 'ch0 ' : os .environ .get ('CH0_STATE' , '' ),
34
- 'ch1 ' : os .environ .get ('CH1_STATE' , '' ),
35
- 'ch2 ' : os .environ .get ('CH2_STATE' , '' )
38
+ '0 ' : os .environ .get ('CH0_STATE' , '' ),
39
+ '1 ' : os .environ .get ('CH1_STATE' , '' ),
40
+ '2 ' : os .environ .get ('CH2_STATE' , '' )
36
41
}
37
42
},
38
43
'mqtt' : {
39
44
'address' : os .environ .get ('MQTT_ADDR' , '127.0.0.1' ),
40
45
'port' : os .environ .get ('MQTT_PORT' , 1883 ),
41
46
'username' : os .environ .get ('MQTT_USER' , None ),
42
- 'password' : os .environ .get ('MQTT_PASS' , None )
47
+ 'password' : os .environ .get ('MQTT_PASS' , None ),
48
+ 'qos' : os .environ .get ('MQTT_QOS' , int (0 ))
43
49
}
44
50
}
45
51
46
52
return conf
47
53
48
54
49
- class SignalHandler :
50
- signum = False
51
-
52
- def __init__ (self ):
53
- signal .signal (signal .SIGINT , self .exit_gracefully )
54
- signal .signal (signal .SIGTERM , self .exit_gracefully )
55
-
56
- def exit_gracefully (self , signum , frame ):
57
- self .signum = signum
58
-
59
-
60
55
class Ec133 :
61
56
62
- def __init__ (self ):
57
+ def __init__ (self , serconf , ecconf , callback = False ):
58
+ self .serconf = serconf
59
+ self .ecconf = ecconf
60
+ self .ser = False
61
+ self .rtu = False
62
+ self .reinit_count = 3
63
+ self .callback = callback
64
+ self .brightness = [255 , 255 , 255 ]
65
+ self .register = [255 , 255 , 255 ]
66
+ self .lock = threading .Lock ()
67
+
68
+ def __del__ (self ):
69
+ msg ("Closing serial device" )
70
+ if bool (self .rtu ):
71
+ del self .rtu
72
+ if bool (self .ser ):
73
+ del self .ser
74
+
75
+ def set_callback (self ,callback ):
76
+ self .callback = callback
77
+
78
+ def connect (self ):
79
+ """
80
+ Method that establishes connection with encmods on the other end of a
81
+ serial line. By default it has 3 retries.
82
+ """
83
+ i = self .reinit_count
84
+ while i > 0 :
85
+ try :
86
+ self .ser = serial .Serial (** self .serconf )
87
+ except Exception as e :
88
+ msg ("Serial init attempt #%s failed" % i )
89
+ time .sleep (0.2 )
90
+ i -= 1
91
+ if i == 0 :
92
+ raise e
93
+ else :
94
+ msg ("Serial line opened" )
95
+ i = 0
96
+
97
+ try :
98
+ self .rtu = modbus_rtu .RtuMaster (self .ser )
99
+ self .rtu .set_timeout (float (self .ecconf ['timeout' ]))
100
+ except Exception as e :
101
+ msg ("Unable to initialize RTU master" )
102
+ raise e
103
+
104
+ def set_channel (self , client , userdata , message ):
105
+ ch = int (userdata ['channel' ])
106
+
107
+ payload_str = str (message .payload .decode ("utf-8" ))
108
+
109
+ try :
110
+ payload = json .loads (payload_str )
111
+ except Exception as e :
112
+ msg ("Channel%s : Malformed json message : %s" % (ch ,e ))
113
+ return
114
+
115
+ if type (payload ) is not dict :
116
+ msg ("Channel%s : mqtt_json format expected , got %s!" % (ch ,type (payload )))
117
+ return
118
+
119
+ self .lock .acquire (blocking = True , timeout = - 1 )
120
+
121
+ msg ("Channel%s: %s" % (ch ,payload ))
122
+
123
+ if payload .get ("brightness" ,False ):
124
+ self .brightness [ch ] = int (payload ['brightness' ])
125
+
126
+ if payload .get ('state' ,'ON' ) == 'ON' :
127
+ self .register [ch ] = int (self .brightness [ch ])
128
+ else :
129
+ self .register [ch ] = int (0 )
130
+
131
+ try :
132
+ self .rtu .execute (self .ecconf ['addr' ],
133
+ cst .WRITE_MULTIPLE_REGISTERS ,
134
+ ch ,
135
+ output_value = [self .register [ch ]]
136
+ )
137
+ except Exception as e :
138
+ msg (str (e ))
139
+ time .sleep (0.2 )
140
+ self .lock .release ()
141
+ self .set_channel (client , userdata , message )
142
+ #raise e
143
+ else :
144
+ time .sleep (0.02 )
145
+ if bool (self .callback ):
146
+ self .callback (ch ,payload_str )
147
+ self .lock .release ()
148
+
149
+
150
+ class Mqtt :
151
+
152
+ def __init__ (self , mqconf , command_topics , state_topics , callback ):
153
+ self .mqconf = mqconf
154
+ self .ctopics = command_topics
155
+ self .stopics = state_topics
156
+ self .callback = callback
157
+ self .connhandlers = []
158
+
159
+ def __del__ (self ):
160
+ msg ("Stopping all mq connections" )
161
+ for h in self .connhandlers :
162
+ h .loop_stop ()
163
+ h .disconnect ()
164
+
165
+ def _consume_topic (self , channel ):
166
+ c = mqtt .Client ()
167
+ c .on_message = self .callback
168
+ c .user_data_set ({'channel' : channel })
169
+ if self .mqconf ['username' ] != None :
170
+ c .username_pw_set (self .mqconf ['username' ], password = self .mqconf ['password' ])
171
+ c .connect (self .mqconf ['address' ], port = self .mqconf ['port' ], keepalive = 15 )
172
+ c .subscribe (self .ctopics [channel ], qos = self .mqconf ['qos' ])
173
+ c .loop_start ()
174
+ self .connhandlers .append (c )
175
+
176
+ def consume_all (self ):
177
+ for ch , topic in self .ctopics .items ():
178
+ self ._consume_topic (ch )
179
+
180
+ def postback (self ,ch ,payload ):
181
+ auth = None
182
+ if self .mqconf ['username' ] != None :
183
+ auth = { 'username' : self .mqconf ['username' ],
184
+ 'password' : self .mqconf ['password' ]
185
+ }
186
+
187
+ try :
188
+ publish .single (self .stopics [str (ch )],
189
+ hostname = self .mqconf ['address' ],
190
+ port = self .mqconf ['port' ],
191
+ auth = auth ,
192
+ payload = payload ,
193
+ qos = self .mqconf ['qos' ],
194
+ keepalive = 15 ,
195
+ retain = True )
196
+ except Exception as e :
197
+ msg ("Unable to send channel%s state update : %s" % (ch ,e ))
198
+ else :
199
+ msg ("Channel%s state update sent" % ch )
200
+
63
201
64
202
def main ():
65
203
"""
66
204
Main routine
67
205
"""
68
-
206
+
207
+ msg ("Start ..." )
69
208
conf = getenv ()
70
209
71
- sig_handler = SignalHandler ()
210
+ msg ("Connect ec133" )
211
+ ec = Ec133 (conf ['serial' ], conf ['ec133' ])
212
+ ec .connect ()
72
213
73
- while True :
74
- if ( sig_handler . signum == signal . Signals [ 'SIGINT' ]. value
75
- or sig_handler . signum == signal . Signals [ 'SIGTERM' ]. value ):
76
- break
214
+ msg ( "Consume mqtt topics" )
215
+ mq = Mqtt ( conf [ 'mqtt' ], conf [ 'ec133' ][ 'command_topics' ], conf [ 'ec133' ][ 'state_topics' ], ec . set_channel )
216
+ mq . consume_all ()
217
+ ec . set_callback ( mq . postback )
77
218
78
- time .sleep (0.01 )
79
-
80
- print ("Stopping now on %s ..." % (signal .Signals (sig_handler .signum )).name )
219
+ signal .pause ()
81
220
221
+ msg ("Stopping now on signal ..." )
222
+ del mq
223
+ del ec
82
224
83
225
84
226
if __name__ == "__main__" :
85
227
main ()
228
+
0 commit comments