diff --git a/BrewPiUtil.py b/BrewPiUtil.py index c357d22..ae51185 100644 --- a/BrewPiUtil.py +++ b/BrewPiUtil.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with BrewPi. If not, see . +from __future__ import print_function import time import sys import os @@ -23,7 +24,7 @@ try: import configobj except ImportError: - print "BrewPi requires ConfigObj to run, please install it with 'sudo apt-get install python-configobj" + print("BrewPi requires ConfigObj to run, please install it with 'sudo apt-get install python-configobj") sys.exit(1) @@ -79,12 +80,14 @@ def configSet(configFile, settingName, value): "To fix this, run 'sudo sh /home/brewpi/fixPermissions.sh'") return readCfgWithDefaults(configFile) # return updated ConfigObj +def printStdErr(*objs): + print("", *objs, file=sys.stderr) def logMessage(message): """ Prints a timestamped message to stderr """ - print >> sys.stderr, time.strftime("%b %d %Y %H:%M:%S ") + message + printStdErr(time.strftime("%b %d %Y %H:%M:%S ") + message) def scriptPath(): @@ -99,9 +102,9 @@ def removeDontRunFile(path='/var/www/do_not_run_brewpi'): if os.path.isfile(path): os.remove(path) if not sys.platform.startswith('win'): # cron not available - print "BrewPi script will restart automatically." + print("BrewPi script will restart automatically.") else: - print "File do_not_run_brewpi does not exist at " + path + print("File do_not_run_brewpi does not exist at " + path) def setupSerial(config, baud_rate=57600, time_out=0.1): diff --git a/backgroundserial.py b/backgroundserial.py new file mode 100644 index 0000000..d96858b --- /dev/null +++ b/backgroundserial.py @@ -0,0 +1,158 @@ +from __future__ import print_function + +import threading +import Queue +import sys +import time +from BrewPiUtil import printStdErr +from BrewPiUtil import logMessage +from serial import SerialException + +class BackGroundSerial(): + def __init__(self, serial_port): + self.buffer = '' + self.ser = serial_port + self.queue = Queue.Queue() + self.thread = None + self.error = False + self.fatal_error = None + self.run = False + + # public interface only has 4 functions: start/stop/read_line/write + def start(self): + self.ser.writeTimeout = 1 # makes sure an exception is raised when serial is lost + self.run = True + if not self.thread: + self.thread = threading.Thread(target=self.__listenThread) + self.thread.setDaemon(True) + self.thread.start() + + def stop(self): + self.run = False + + def read_line(self): + self.exit_on_fatal_error() + try: + return self.queue.get_nowait() + except Queue.Empty: + return None + + def write(self, data): + self.exit_on_fatal_error() + # prevent writing to a port in error state. This will leave unclosed handles to serial on the system + if not self.error: + try: + self.ser.write(data) + except (IOError, OSError, SerialException) as e: + logMessage('Serial Error: {0})'.format(str(e))) + self.error = True + + + def exit_on_fatal_error(self): + if self.fatal_error is not None: + self.thread.join() # wait for background thread to terminate + logMessage(self.fatal_error) + if self.ser is not None: + self.ser.close() + del self.ser # this helps to fully release the port to the OS + sys.exit("Terminating due to fatal serial error") + + def __listenThread(self): + lastReceive = time.time() + while self.run : + in_waiting = None + new_data = None + if not self.error: + try: + in_waiting = self.ser.inWaiting() + if in_waiting > 0: + new_data = self.ser.read(in_waiting) + lastReceive = time.time() + except (IOError, OSError, SerialException) as e: + logMessage('Serial Error: {0})'.format(str(e))) + self.error = True + + if new_data: + self.buffer = self.buffer + new_data + line = self.__get_line_from_buffer() + if line: + self.queue.put(line) + + if self.error: + try: + # try to restore serial by closing and opening again + self.ser.close() + self.ser.open() + self.error = False + except (ValueError, OSError, SerialException) as e: + if self.ser.isOpen(): + self.ser.flushInput() # will help to close open handles + self.ser.flushOutput() # will help to close open handles + self.ser.close() + self.fatal_error = 'Lost serial connection. Error: {0})'.format(str(e)) + self.run = False + + # max 10 ms delay. At baud 57600, max 576 characters are received while waiting + time.sleep(0.01) + + def __get_line_from_buffer(self): + while '\n' in self.buffer: + lines = self.buffer.partition('\n') # returns 3-tuple with line, separator, rest + if(lines[1] == ''): + # '\n' not found, first element is incomplete line + self.buffer = lines[0] + return None + else: + # complete line received, [0] is complete line [1] is separator [2] is the rest + self.buffer = lines[2] + return self.__asciiToUnicode(lines[0]) + + # remove extended ascii characters from string, because they can raise UnicodeDecodeError later + def __asciiToUnicode(self, s): + s = s.replace(chr(0xB0), '°') + return unicode(s, 'ascii', 'ignore') + +if __name__ == '__main__': + # some test code that requests data from serial and processes the response json + import simplejson + import time + import BrewPiUtil as util + + config_file = util.addSlash(sys.path[0]) + 'settings/config.cfg' + config = util.readCfgWithDefaults(config_file) + ser = util.setupSerial(config, time_out=0) + if not ser: + printStdErr("Could not open Serial Port") + exit() + + bg_ser = BackGroundSerial(ser) + bg_ser.start() + + success = 0 + fail = 0 + for i in range(1, 5): + # request control variables 4 times. This would overrun buffer if it was not read in a background thread + # the json decode will then fail, because the message is clipped + bg_ser.write('v') + bg_ser.write('v') + bg_ser.write('v') + bg_ser.write('v') + bg_ser.write('v') + line = True + while(line): + line = bg_ser.read_line() + if line: + if line[0] == 'V': + try: + decoded = simplejson.loads(line[2:]) + print("Success") + success += 1 + except simplejson.JSONDecodeError: + logMessage("Error: invalid JSON parameter string received: " + line) + fail += 1 + else: + print(line) + time.sleep(5) + + print("Successes: {0}, Fails: {1}".format(success,fail)) + diff --git a/brewpi.py b/brewpi.py index ad5947c..30ea24b 100644 --- a/brewpi.py +++ b/brewpi.py @@ -18,8 +18,8 @@ from __future__ import print_function import sys -def printStdErr(*objs): - print("", *objs, file=sys.stderr) +from BrewPiUtil import printStdErr +from BrewPiUtil import logMessage # Check needed software dependencies to nudge users to fix their setup if sys.version_info < (2, 7): @@ -72,6 +72,7 @@ def printStdErr(*objs): import pinList import expandLogMessage import BrewPiProcess +from backgroundserial import BackGroundSerial # Settings will be read from controller, initialize with same defaults as controller @@ -86,16 +87,13 @@ def printStdErr(*objs): cc = dict() # Control variables (json string, sent directly to browser without decoding) -cv = "" +cv = "{}" # listState = "", "d", "h", "dh" to reflect whether the list is up to date for installed (d) and available (h) deviceList = dict(listState="", installed=[], available=[]) lcdText = ['Script starting up', ' ', ' ', ' '] -def logMessage(message): - printStdErr(time.strftime("%b %d %Y %H:%M:%S ") + message) - # Read in command line arguments try: opts, args = getopt.getopt(sys.argv[1:], "hc:sqkfld", @@ -334,33 +332,6 @@ def resumeLogging(): if not ser: exit(1) -serialBuffer = '' - -def lineFromSerial(ser): - global serialBuffer - inWaiting = None - newData = None - try: - inWaiting = ser.inWaiting() - if inWaiting > 0: - newData = ser.read(inWaiting) - except (IOError, OSError, SerialException) as e: - logMessage('Serial Error: {0})'.format(str(e))) - return - if newData: - serialBuffer = serialBuffer + newData - if '\n' in serialBuffer: - lines = serialBuffer.partition('\n') # returns 3-tuple with line, separator, rest - if(lines[1] == ''): - # '\n' not found, first element is incomplete line - serialBuffer = lines[0] - return None - else: - # complete line received, [0] is complete line [1] is separator [2] is the rest - serialBuffer = lines[2] - return util.asciiToUnicode(lines[0]) - - logMessage("Notification: Script started for beer '" + urllib.unquote(config['beerName']) + "'") # wait for 10 seconds to allow an Uno to reboot (in case an Uno is being used) time.sleep(float(config.get('startupDelay', 10))) @@ -387,11 +358,18 @@ def lineFromSerial(ser): "controller version = " + str(hwVersion.log) + ", local copy version = " + str(expandLogMessage.getVersion())) +bg_ser = None + if hwVersion is not None: ser.flush() + + # set up background serial processing, which will continuously read data from serial and put whole lines in a queue + bg_ser = BackGroundSerial(ser) + bg_ser.start() # request settings from controller, processed later when reply is received - ser.write('s') # request control settings cs - ser.write('c') # request control constants cc + bg_ser.write('s') # request control settings cs + bg_ser.write('c') # request control constants cc + bg_ser.write('v') # request control variables cv # answer from controller is received asynchronously later. # create a listening socket to communicate with PHP @@ -499,19 +477,19 @@ def renameTempKey(key): elif messageType == "getControlVariables": conn.send(cv) elif messageType == "refreshControlConstants": - ser.write("c") + bg_ser.write("c") raise socket.timeout elif messageType == "refreshControlSettings": - ser.write("s") + bg_ser.write("s") raise socket.timeout elif messageType == "refreshControlVariables": - ser.write("v") + bg_ser.write("v") raise socket.timeout elif messageType == "loadDefaultControlSettings": - ser.write("S") + bg_ser.write("S") raise socket.timeout elif messageType == "loadDefaultControlConstants": - ser.write("C") + bg_ser.write("C") raise socket.timeout elif messageType == "setBeer": # new constant beer temperature received try: @@ -523,7 +501,7 @@ def renameTempKey(key): cs['mode'] = 'b' # round to 2 dec, python will otherwise produce 6.999999999 cs['beerSet'] = round(newTemp, 2) - ser.write("j{mode:b, beerSet:" + json.dumps(cs['beerSet']) + "}") + bg_ser.write("j{mode:b, beerSet:" + json.dumps(cs['beerSet']) + "}") logMessage("Notification: Beer temperature set to " + str(cs['beerSet']) + " degrees in web interface") @@ -538,7 +516,7 @@ def renameTempKey(key): cs['mode'] = 'f' cs['fridgeSet'] = round(newTemp, 2) - ser.write("j{mode:f, fridgeSet:" + json.dumps(cs['fridgeSet']) + "}") + bg_ser.write("j{mode:f, fridgeSet:" + json.dumps(cs['fridgeSet']) + "}") logMessage("Notification: Fridge temperature set to " + str(cs['fridgeSet']) + " degrees in web interface") @@ -546,14 +524,14 @@ def renameTempKey(key): elif messageType == "setOff": # cs['mode'] set to OFF cs['mode'] = 'o' - ser.write("j{mode:o}") + bg_ser.write("j{mode:o}") logMessage("Notification: Temperature control disabled") raise socket.timeout elif messageType == "setParameters": # receive JSON key:value pairs to set parameters on the controller try: decoded = json.loads(value) - ser.write("j" + json.dumps(decoded)) + bg_ser.write("j" + json.dumps(decoded)) if 'tempFormat' in decoded: changeWwwSetting('tempFormat', decoded['tempFormat']) # change in web interface settings too. except json.JSONDecodeError: @@ -636,10 +614,11 @@ def renameTempKey(key): conn.send("Profile successfully updated") if cs['mode'] is not 'p': cs['mode'] = 'p' - ser.write("j{mode:p}") + bg_ser.write("j{mode:p}") logMessage("Notification: Profile mode enabled") raise socket.timeout # go to serial communication to update controller elif messageType == "programController" or messageType == "programArduino": + bg_ser.stop() ser.close() # close serial port before programming ser = None try: @@ -662,11 +641,11 @@ def renameTempKey(key): elif messageType == "refreshDeviceList": deviceList['listState'] = "" # invalidate local copy if value.find("readValues") != -1: - ser.write("d{r:1}") # request installed devices - ser.write("h{u:-1,v:1}") # request available, but not installed devices + bg_ser.write("d{r:1}") # request installed devices + bg_ser.write("h{u:-1,v:1}") # request available, but not installed devices else: - ser.write("d{}") # request installed devices - ser.write("h{u:-1}") # request available, but not installed devices + bg_ser.write("d{}") # request installed devices + bg_ser.write("h{u:-1}") # request available, but not installed devices elif messageType == "getDeviceList": if deviceList['listState'] in ["dh", "hd"]: response = dict(board=hwVersion.board, @@ -682,7 +661,7 @@ def renameTempKey(key): except json.JSONDecodeError: logMessage("Error: invalid JSON parameter string received: " + value) continue - ser.write("U" + json.dumps(configStringJson)) + bg_ser.write("U" + json.dumps(configStringJson)) deviceList['listState'] = "" # invalidate local copy elif messageType == "writeDevice": try: @@ -690,7 +669,7 @@ def renameTempKey(key): except json.JSONDecodeError: logMessage("Error: invalid JSON parameter string received: " + value) continue - ser.write("d" + json.dumps(configStringJson)) + bg_ser.write("d" + json.dumps(configStringJson)) elif messageType == "getVersion": if hwVersion: response = hwVersion.__dict__ @@ -702,7 +681,7 @@ def renameTempKey(key): conn.send(response_str) elif messageType == "resetController": logMessage("Resetting controller to factory defaults") - ser.write("E") + bg_ser.write("E") else: logMessage("Error: Received invalid message on socket: " + message) @@ -722,17 +701,17 @@ def renameTempKey(key): if(time.time() - prevLcdUpdate) > 5: # request new LCD text prevLcdUpdate += 5 # give the controller some time to respond - ser.write('l') + bg_ser.write('l') if(time.time() - prevSettingsUpdate) > 60: # Request Settings from controller to stay up to date # Controller should send updates on changes, this is a periodical update to ensure it is up to date prevSettingsUpdate += 5 # give the controller some time to respond - ser.write('s') + bg_ser.write('s') # if no new data has been received for serialRequestInteval seconds if (time.time() - prevDataTime) >= float(config['interval']): - ser.write("t") # request new from controller + bg_ser.write("t") # request new from controller prevDataTime += 5 # give the controller some time to respond to prevent requesting twice elif (time.time() - prevDataTime) > float(config['interval']) + 2 * float(config['interval']): @@ -741,7 +720,7 @@ def renameTempKey(key): while True: - line = lineFromSerial(ser) + line = bg_ser.read_line() if line is None: break try: @@ -837,14 +816,15 @@ def renameTempKey(key): if newTemp != cs['beerSet']: cs['beerSet'] = newTemp # if temperature has to be updated send settings to controller - ser.write("j{beerSet:" + json.dumps(cs['beerSet']) + "}") + bg_ser.write("j{beerSet:" + json.dumps(cs['beerSet']) + "}") except socket.error as e: logMessage("Socket error(%d): %s" % (e.errno, e.strerror)) traceback.print_exc() if ser: - ser.close() # close port + if ser.isOpen(): + ser.close() # close port if conn: conn.shutdown(socket.SHUT_RDWR) # close socket conn.close() diff --git a/brewpiVersion.py b/brewpiVersion.py index 6aff311..9503791 100644 --- a/brewpiVersion.py +++ b/brewpiVersion.py @@ -26,6 +26,9 @@ def getVersionFromSerial(ser): retries = 0 oldTimeOut = ser.timeout startTime = time.time() + if not ser.isOpen(): + print "Cannot get version from serial port that is not open." + ser.setTimeout(1) ser.write('n') # request version info while retries < 10: