diff --git a/continuousprint/__init__.py b/continuousprint/__init__.py index 85eece7..19bb907 100644 --- a/continuousprint/__init__.py +++ b/continuousprint/__init__.py @@ -15,9 +15,11 @@ import octoprint.filemanager from octoprint.filemanager.util import StreamWrapper from octoprint.filemanager.destinations import FileDestinations +from octoprint.util import RepeatedTimer + from .print_queue import PrintQueue, QueueItem -from .driver import ContinuousPrintDriver +from .driver import ContinuousPrintDriver, Action as DA, Printer as DP QUEUE_KEY = "cp_queue" CLEARING_SCRIPT_KEY = "cp_bed_clearing_script" @@ -82,6 +84,10 @@ def get_settings_defaults(self): d[BED_COOLDOWN_TIMEOUT_KEY] = 60 return d + def _active(self): + return self.d.state != self.d._state_inactive if hasattr(self, "d") else False + + def _rm_temp_files(self): # Clean up any file references from prior runs for path in TEMP_FILES.values(): @@ -108,25 +114,40 @@ def on_after_startup(self): self.q = PrintQueue(self._settings, QUEUE_KEY) self.d = ContinuousPrintDriver( queue=self.q, - finish_script_fn=self.run_finish_script, - clear_bed_fn=self.clear_bed, - start_print_fn=self.start_print, - cancel_print_fn=self.cancel_print, + script_runner=self, logger=self._logger, ) + self.update(DA.DEACTIVATE) # Initializes and passes printer state self._update_driver_settings() self._rm_temp_files() self.next_pause_is_spaghetti = False + + # It's possible to miss events or for some weirdness to occur in conditionals. Adding a watchdog + # timer with a periodic tick ensures that the driver knows what the state of the printer is. + self.watchdog = RepeatedTimer(5.0, lambda: self.update(DA.TICK)) + self.watchdog.start() self._logger.info("Continuous Print Plugin started") + def update(self, a: DA): + # Access current file via `get_current_job` instead of `is_current_file` because the latter may go away soon + # See https://docs.octoprint.org/en/master/modules/printer.html#octoprint.printer.PrinterInterface.is_current_file + # Avoid using payload.get('path') as some events may not express path info. + path = self._printer.get_current_job().get("file", {}).get("name") + pstate = self._printer.get_state_id() + p = DP.BUSY + if pstate == "OPERATIONAL": + p = DP.IDLE + elif pstate == "PAUSED": + p = DP.PAUSED + + if self.d.action(a, p, path): + self._msg(type="reload") # Reload UI when new state is added + # part of EventHandlerPlugin def on_event(self, event, payload): if not hasattr(self, "d"): # Ignore any messages arriving before init return - # Access current file via `get_current_job` instead of `is_current_file` because the latter may go away soon - # See https://docs.octoprint.org/en/master/modules/printer.html#octoprint.printer.PrinterInterface.is_current_file - # Avoid using payload.get('path') as some events may not express path info. current_file = self._printer.get_current_job().get("file", {}).get("name") is_current_path = current_file == self.d.current_path() is_finish_script = current_file == TEMP_FILES[FINISHED_SCRIPT_KEY] @@ -146,23 +167,17 @@ def on_event(self, event, payload): if self._printer.is_current_file(path, sd=False): return self._rm_temp_files() - elif (is_current_path or is_finish_script) and event == Events.PRINT_DONE: - self.d.on_print_success(is_finish_script) - self.paused = False - self._msg(type="reload") # reload UI - elif ( - is_current_path - and event == Events.PRINT_FAILED - and payload["reason"] != "cancelled" - ): + elif event == Events.PRINT_DONE: + self.update(DA.SUCCESS) + elif event == Events.PRINT_FAILED: # Note that cancelled events are already handled directly with Events.PRINT_CANCELLED - self.d.on_print_failed() - self.paused = False - self._msg(type="reload") # reload UI - elif is_current_path and event == Events.PRINT_CANCELLED: - self.d.on_print_cancelled(initiator=payload.get('user', None)) - self.paused = False - self._msg(type="reload") # reload UI + self.update(DA.FAILURE) + elif event == Events.PRINT_CANCELLED: + print(payload.get('user')) + if payload.get('user') is not None: + self.update(DA.DEACTIVATE) + else: + self.update(DA.TICK) elif ( is_current_path and tsd_command is not None @@ -170,40 +185,20 @@ def on_event(self, event, payload): and payload.get("cmd") == "pause" and payload.get("initiator") == "system" ): - self._logger.info( - "Got spaghetti detection event; flagging next pause event for restart" - ) - self.next_pause_is_spaghetti = True + self.update(DA.SPAGHETTI) elif is_current_path and event == Events.PRINT_PAUSED: - self.d.on_print_paused( - is_temp_file=(payload["path"] in TEMP_FILES.values()), - is_spaghetti=self.next_pause_is_spaghetti, - ) - self.next_pause_is_spaghetti = False - self.paused = True - self._msg(type="reload") # reload UI + self.update(DA.TICK) elif is_current_path and event == Events.PRINT_RESUMED: - self.d.on_print_resumed() - self.paused = False - self._msg(type="reload") + self.update(DA.TICK) elif ( event == Events.PRINTER_STATE_CHANGED and self._printer.get_state_id() == "OPERATIONAL" ): - self._msg(type="reload") # reload UI + self.update(DA.TICK) elif event == Events.UPDATED_FILES: self._msg(type="updatefiles") elif event == Events.SETTINGS_UPDATED: self._update_driver_settings() - # Play out actions until printer no longer in a state where we can run commands - # Note that PAUSED state is respected so that gcode can include `@pause` commands. - # See https://docs.octoprint.org/en/master/features/atcommands.html - while ( - self._printer.get_state_id() == "OPERATIONAL" - and self.d.pending_actions() > 0 - ): - self._logger.warning("on_printer_ready") - self.d.on_printer_ready() def _write_temp_gcode(self, key): gcode = self._settings.get([key]) @@ -221,6 +216,7 @@ def run_finish_script(self): self._msg("Print Queue Complete", type="complete") path = self._write_temp_gcode(FINISHED_SCRIPT_KEY) self._printer.select_file(path, sd=False, printAfterSelect=True) + return path def cancel_print(self): self._msg("Print cancelled", type="error") @@ -252,6 +248,7 @@ def clear_bed(self): self.wait_for_bed_cooldown() path = self._write_temp_gcode(CLEARING_SCRIPT_KEY) self._printer.select_file(path, sd=False, printAfterSelect=True) + return path def start_print(self, item, clear_bed=True): self._msg("Starting print: " + item.name) @@ -264,6 +261,7 @@ def start_print(self, item, clear_bed=True): self._msg("File not found: " + item.path, type="error") except InvalidFileType: self._msg("File not gcode: " + item.path, type="error") + return item.path def state_json(self, extra_message=None): # Values are stored json-serialized, so we need to create a json string and inject them into it @@ -278,19 +276,18 @@ def state_json(self, extra_message=None): # IMPORTANT: Non-additive changes to this response string must be released in a MAJOR version bump # (e.g. 1.4.1 -> 2.0.0). resp = '{"active": %s, "status": "%s", "queue": %s%s}' % ( - "true" if hasattr(self, "d") and self.d.active else "false", + "true" if self._active() else "false", "Initializing" if not hasattr(self, "d") else self.d.status, q, extra_message, ) return resp - # Listen for resume from printer ("M118 //action:queuego"), only act if actually paused. #from @grtrenchman + # Listen for resume from printer ("M118 //action:queuego") #from @grtrenchman def resume_action_handler(self, comm, line, action, *args, **kwargs): if not action == "queuego": return - if self.paused: - self.d.set_active() + self.update(DA.ACTIVATE) # Public API method returning the full state of the plugin in JSON format. # See `state_json()` for return values. @@ -308,10 +305,7 @@ def set_active(self): if not Permissions.PLUGIN_CONTINUOUSPRINT_STARTQUEUE.can(): return flask.make_response("Insufficient Rights", 403) self._logger.info("attempt failed due to insufficient permissions.") - self.d.set_active( - flask.request.form["active"] == "true", - printer_ready=(self._printer.get_state_id() == "OPERATIONAL"), - ) + self.update(DA.ACTIVATE if flask.request.form["active"] == "true" else DA.DEACTIVATE) return self.state_json() # PRIVATE API method - may change without warning. diff --git a/continuousprint/driver.py b/continuousprint/driver.py index 013fa56..4983370 100644 --- a/continuousprint/driver.py +++ b/continuousprint/driver.py @@ -1,5 +1,18 @@ import time +from enum import Enum, auto +class Action(Enum): + ACTIVATE = auto() + DEACTIVATE = auto() + SUCCESS = auto() + FAILURE = auto() + SPAGHETTI = auto() + TICK = auto() + +class Printer(Enum): + IDLE = auto() + PAUSED = auto() + BUSY = auto() # Inspired by answers at # https://stackoverflow.com/questions/6108819/javascript-timestamp-to-relative-time @@ -16,31 +29,201 @@ class ContinuousPrintDriver: def __init__( self, queue, - finish_script_fn, - clear_bed_fn, - start_print_fn, - cancel_print_fn, + script_runner, logger, ): + self._logger = logger + self.status = None + self._set_status("Initializing") self.q = queue - self.active = False + self.state = self._state_unknown self.retries = 0 - self._logger = logger self.retry_on_pause = False self.max_retries = 0 self.retry_threshold_seconds = 0 self.first_print = True - self.actions = [] + self._runner = script_runner + self._intent = None # Intended file path + self._update_ui = False + + def action(self, a: Action, p: Printer, path: str=None): + self._logger.debug(f"{a.name}, {p.name}, path={path}") + if path is not None: + self._cur_path = path + nxt = self.state(a, p) + if nxt is not None: + self._logger.info(f"{self.state.__name__} -> {nxt.__name__}") + self.state = nxt + self._update_ui = True + + + if self._update_ui: + self._update_ui = False + return True + return False + + def _state_unknown(self, a: Action, p: Printer): + if a == Action.DEACTIVATE: + return self._state_inactive + + def _state_inactive(self, a: Action, p: Printer): + self.retries = 0 + + if a == Action.ACTIVATE: + if p != Printer.IDLE: + return self._state_printing + else: + # TODO "clear bed on startup" setting + return self._state_start_print + + if p == Printer.IDLE: + self._set_status("Inactive (click Start Managing)") + else: + self._set_status("Inactive (active print continues unmanaged)") + + + def _state_start_print(self, a: Action, p: Printer): + if a == Action.DEACTIVATE: + return self._state_inactive + + if p != Printer.IDLE: + self._set_status("Waiting for printer to be ready") + return + + # The next print may not be the *immediately* next print + # e.g. if we skip over a print or start mid-print + idx = self._next_available_idx() + if idx is not None: + p = self.q[idx] + p.start_ts = int(time.time()) + p.end_ts = None + p.retries = self.retries + self.q[idx] = p + self._intent = self._runner.start_print(p) + return self._state_printing + else: + return self._state_inactive + + + def _elapsed(self): + return (time.time() - self.q[self._cur_idx()].start_ts) + + def _state_printing(self, a: Action, p: Printer, elapsed=None): + if a == Action.DEACTIVATE: + return self._state_inactive + elif a == Action.FAILURE: + return self._state_failure + elif a == Action.SPAGHETTI: + elapsed = self._elapsed() + if self.retry_on_pause and elapsed < self.retry_threshold_seconds: + return self._state_spaghetti_recovery + else: + self._set_status( + f"Print paused {timeAgo(elapsed)} into print (over auto-restart threshold of {timeAgo(self.retry_threshold_seconds)}); awaiting user input" + ) + return self._state_paused + elif a == Action.SUCCESS: + return self._state_success + + if p == Printer.BUSY: + idx = self._cur_idx() + if idx is not None: + self._set_status(f"Printing {self.q[idx].name}") + elif p == Printer.PAUSED: + return self._state_paused + elif p == Printer.IDLE: # Idle state without event; assume success + return self._state_success + + def _state_paused(self, a: Action, p: Printer): + self._set_status("Queue paused") + if a == Action.DEACTIVATE or p == Printer.IDLE: + return self._state_inactive + elif p == Printer.BUSY: + return self._state_printing + + def _state_spaghetti_recovery(self, a: Action, p: Printer): + self._set_status(f"Cancelling print (spaghetti seen early in print)") + if p == Printer.PAUSED: + self._runner.cancel_print() + self._intent = None + return self._state_failure + + def _state_failure(self, a: Action, p: Printer): + if p != Printer.IDLE: + return + + if self.retries + 1 < self.max_retries: + self.retries += 1 + return self._state_start_clearing + else: + idx = self._cur_idx() + if idx is not None: + self._complete_item(idx, "failure") + return self._state_inactive + + def _state_success(self, a: Action, p: Printer): + idx = self._cur_idx() + + # Complete prior queue item if that's what we just finished + if idx is not None: + path = self.q[idx].path + if self._intent == path and self._cur_path == path: + self._complete_item(idx, "success") + else: + self._logger.info(f"Current queue item {path} not matching intent {self._intent}, current path {self._cur_path} - no completion") + self.retries = 0 + + # Clear bed if we have a next queue item, otherwise run finishing script + idx = self._next_available_idx() + if idx is not None: + return self._state_start_clearing + else: + return self._state_start_finishing + + def _state_start_clearing(self, a: Action, p: Printer): + if a == Action.DEACTIVATE: + return self._state_inactive + if p != Printer.IDLE: + self._set_status("Waiting for printer to be ready") + return + + self._intent = self._runner.clear_bed() + return self._state_clearing + + def _state_clearing(self, a: Action, p: Printer): + if a == Action.DEACTIVATE: + return self._state_inactive + if p != Printer.IDLE: + return + + self._set_status("Clearing bed") + return self._state_start_print + + def _state_start_finishing(self, a: Action, p: Printer): + if a == Action.DEACTIVATE: + return self._state_inactive + if p != Printer.IDLE: + self._set_status("Waiting for printer to be ready") + return + + self._intent = self._runner.run_finish_script() + return self._state_finishing - self.finish_script_fn = finish_script_fn - self.clear_bed_fn = clear_bed_fn - self.start_print_fn = start_print_fn - self.cancel_print_fn = cancel_print_fn - self._set_status("Initialized (click Start Managing to run the queue)") + def _state_finishing(self, a: Action, p: Printer): + if a == Action.DEACTIVATE: + return self._state_inactive + if p != Printer.IDLE: + return + + self._set_status("Finising up") + + return self._state_inactive def _set_status(self, status): - self.status = status - self._logger.info(status) + if status != self.status: + self._update_ui = True + self.status = status + self._logger.info(status) def set_retry_on_pause( self, enabled, max_retries=3, retry_threshold_seconds=60 * 60 @@ -48,27 +231,10 @@ def set_retry_on_pause( self.retry_on_pause = enabled self.max_retries = max_retries self.retry_threshold_seconds = retry_threshold_seconds - self._logger.info( + self._logger.debug( f"Retry on pause: {enabled} (max_retries {max_retries}, threshold {retry_threshold_seconds}s)" ) - def set_active(self, active=True, printer_ready=True): - if active and not self.active: - self.active = True - self.first_print = True - self.retries = 0 - if not printer_ready: - self._set_status("Waiting for printer to be ready") - else: - self._begin_next_available_print() - self.on_printer_ready() - elif self.active and not active: - self.active = False - if not printer_ready: - self._set_status("Inactive (active prints continue unmanaged)") - else: - self._set_status("Inactive (ready - click Start Managing)") - def _cur_idx(self): for (i, item) in enumerate(self.q): if item.start_ts is not None and item.end_ts is None: @@ -85,126 +251,9 @@ def _next_available_idx(self): return i return None - def _begin_next_available_print(self): - # The next print may not be the *immediately* next print - # e.g. if we skip over a print or start mid-print - idx = self._next_available_idx() - if idx is not None: - p = self.q[idx] - p.start_ts = int(time.time()) - p.end_ts = None - p.retries = self.retries - self.q[idx] = p - if not self.first_print: - self.actions.append(self._clear_bed) - self.actions.append(lambda: self._start_print(p)) - self.first_print = False - else: - self.actions.append(self._finish) - - def _finish(self): - self._set_status("Running finish script") - self.finish_script_fn() - - def _clear_bed(self): - self._set_status("Running bed clearing script") - self.clear_bed_fn() - - def _start_print(self, p): - if self.retries > 0: - self._set_status( - f"Printing {p.name} (attempt {self.retries+1}/{self.max_retries})" - ) - else: - self._set_status(f"Printing {p.name}") - self.start_print_fn(p) - def _complete_item(self, idx, result): + self._logger.debug(f"Completing q[{idx}] - {result}") item = self.q[idx] item.end_ts = int(time.time()) item.result = result self.q[idx] = item # TODO necessary? - - def pending_actions(self): - return len(self.actions) - - def on_printer_ready(self): - if len(self.actions) > 0: - a = self.actions.pop(0) - self._logger.info("Printer ready; performing next action %s" % a.__repr__()) - a() - return True - else: - return False - - def on_print_success(self, is_finish_script=False): - if not self.active: - return - - idx = self._cur_idx() - if idx is not None: - self._complete_item(idx, "success") - - self.retries = 0 - - if is_finish_script: - self.set_active(False) - else: - self.actions.append(self._begin_next_available_print) - - def on_print_failed(self): - if not self.active: - return - self._complete_item(self._cur_idx(), "failure") - self.active = False - self._set_status("Inactive (print failed)") - self.first_print = True - - def on_print_cancelled(self, initiator): - self.first_print = True - if not self.active: - return - - idx = self._cur_idx() - - if initiator is not None: - self._complete_item(idx, f"failure (user {initiator} cancelled)") - self.active = False - self._set_status(f"Inactive (print cancelled by user {initiator})") - elif self.retries + 1 < self.max_retries: - self.retries += 1 - self.actions.append( - self._begin_next_available_print - ) # same print, not finished - else: - self._complete_item(idx, "failure (max retries)") - self.active = False - self._set_status("Inactive (print cancelled with too many retries)") - - def on_print_paused(self, elapsed=None, is_temp_file=False, is_spaghetti=False): - if ( - not self.active - or not self.retry_on_pause - or is_temp_file - or not is_spaghetti - ): - self._set_status("Print paused") - return - - elapsed = elapsed or (time.time() - self.q[self._cur_idx()].start_ts) - if elapsed < self.retry_threshold_seconds: - self._set_status( - "Cancelling print (spaghetti detected {timeAgo(elapsed)} into print)" - ) - self.cancel_print_fn() - # self.actions.append(self.cancel_print_fn) - else: - self._set_status( - f"Print paused {timeAgo(elapsed)} into print (over auto-restart threshold of {timeAgo(self.retry_threshold_seconds)}); awaiting user input" - ) - - def on_print_resumed(self): - # This happens after pause & manual resume - idx = self._cur_idx() - if idx is not None: - self._set_status(f"Printing {self.q[idx].name}") diff --git a/continuousprint/driver_test.py b/continuousprint/driver_test.py index 1d6ca9f..64c8f21 100644 --- a/continuousprint/driver_test.py +++ b/continuousprint/driver_test.py @@ -1,13 +1,22 @@ import unittest from unittest.mock import MagicMock from print_queue import PrintQueue, QueueItem -from driver import ContinuousPrintDriver +from driver import ContinuousPrintDriver, Action as DA, Printer as DP from mock_settings import MockSettings import logging logging.basicConfig(level=logging.DEBUG) +class Runner(): + def __init__(self): + self.run_finish_script = MagicMock() + self.start_print = MagicMock() + self.cancel_print = MagicMock() + self.clear_bed = MagicMock() + + + def setupTestQueueAndDriver(self, num_complete): self.s = MockSettings("q") self.q = PrintQueue(self.s, "q") @@ -25,18 +34,11 @@ def setupTestQueueAndDriver(self, num_complete): ) self.d = ContinuousPrintDriver( queue=self.q, - finish_script_fn=MagicMock(), - start_print_fn=MagicMock(), - cancel_print_fn=MagicMock(), - clear_bed_fn=MagicMock(), + script_runner=Runner(), logger=logging.getLogger(), ) self.d.set_retry_on_pause(True) - - -def flush(d): - while d.pending_actions() > 0: - d.on_printer_ready() + self.d.action(DA.DEACTIVATE, DP.IDLE) class TestQueueManagerFromInitialState(unittest.TestCase): @@ -44,155 +46,194 @@ def setUp(self): setupTestQueueAndDriver(self, 0) def test_activate_not_printing(self): - self.d.set_active() - flush(self.d) - self.d.start_print_fn.assert_called_once() - self.assertEqual(self.d.start_print_fn.call_args[0][0], self.q[0]) + self.d.action(DA.ACTIVATE, DP.IDLE) + self.d.action(DA.TICK, DP.IDLE) + self.d._runner.start_print.assert_called_once() + self.assertEqual(self.d._runner.start_print.call_args[0][0], self.q[0]) + self.assertEqual(self.d.state, self.d._state_printing) def test_activate_already_printing(self): - self.d.set_active(printer_ready=False) - self.d.start_print_fn.assert_not_called() + self.d.action(DA.ACTIVATE, DP.BUSY) + self.d.action(DA.TICK, DP.BUSY) + self.d._runner.start_print.assert_not_called() + self.assertEqual(self.d.state, self.d._state_printing) def test_events_cause_no_action_when_inactive(self): def assert_nocalls(): - self.d.finish_script_fn.assert_not_called() - self.d.start_print_fn.assert_not_called() - - self.d.on_print_success() - assert_nocalls() - self.d.on_print_failed() - assert_nocalls() - self.d.on_print_cancelled(initiator=None) - assert_nocalls() - self.d.on_print_paused(0) - assert_nocalls() - self.d.on_print_resumed() - assert_nocalls() + self.d._runner.run_finish_script.assert_not_called() + self.d._runner.start_print.assert_not_called() + + for p in [DP.IDLE, DP.BUSY, DP.PAUSED]: + for a in [DA.SUCCESS, DA.FAILURE, DA.TICK, DA.DEACTIVATE, DA.SPAGHETTI]: + self.d.action(a, p) + assert_nocalls() + self.assertEqual(self.d.state, self.d._state_inactive) def test_completed_print_not_in_queue(self): - self.d.set_active(printer_ready=False) - self.d.on_print_success() - flush(self.d) + self.d.action(DA.ACTIVATE, DP.BUSY) + self.d.action(DA.SUCCESS, DP.IDLE, "otherprint.gcode") # -> success + self.d.action(DA.TICK, DP.IDLE) # -> start_clearing + + self.assertEqual(self.q[0].end_ts, None) # Queue item not completed since print not in queue + self.assertEqual(self.q[0].result, None) + + self.d.action(DA.TICK, DP.IDLE) # -> clearing + self.d.action(DA.SUCCESS, DP.IDLE) # -> start_print + self.d.action(DA.TICK, DP.IDLE) # -> printing # Non-queue print completion while the driver is active # should kick off a new print from the head of the queue - self.d.start_print_fn.assert_called_once() - self.assertEqual(self.d.start_print_fn.call_args[0][0], self.q[0]) - + self.d._runner.start_print.assert_called_once() + self.assertEqual(self.d._runner.start_print.call_args[0][0], self.q[0]) + def test_completed_first_print(self): - self.d.set_active() - self.d.start_print_fn.reset_mock() - - self.d.on_print_success() - flush(self.d) - self.assertEqual(self.d.first_print, False) - self.d.clear_bed_fn.assert_called_once() - self.d.start_print_fn.assert_called_once() - self.assertEqual(self.d.start_print_fn.call_args[0][0], self.q[1]) - + self.d.action(DA.ACTIVATE, DP.IDLE) # -> start_print + self.d.action(DA.TICK, DP.IDLE) # -> printing + self.d._runner.start_print.reset_mock() + + self.d._intent = self.q[0].path + self.d._cur_path = self.d._intent + + self.d.action(DA.SUCCESS, DP.IDLE, self.d._intent) # -> success + self.d.action(DA.TICK, DP.IDLE) # -> start_clearing + self.d.action(DA.TICK, DP.IDLE) # -> clearing + self.d._runner.clear_bed.assert_called_once() + + self.d.action(DA.SUCCESS, DP.IDLE) # -> start_print + self.d.action(DA.TICK, DP.IDLE) # -> printing + self.d._runner.start_print.assert_called_once() + self.assertEqual(self.d._runner.start_print.call_args[0][0], self.q[1]) + + def test_start_clearing_waits_for_idle(self): + self.d.state = self.d._state_start_clearing + self.d.action(DA.TICK, DP.BUSY) + self.assertEqual(self.d.state, self.d._state_start_clearing) + self.d._runner.clear_bed.assert_not_called() + self.d.action(DA.TICK, DP.PAUSED) + self.assertEqual(self.d.state, self.d._state_start_clearing) + self.d._runner.clear_bed.assert_not_called() class TestQueueManagerPartiallyComplete(unittest.TestCase): def setUp(self): setupTestQueueAndDriver(self, 1) def test_success(self): - self.d.set_active() - self.d.start_print_fn.reset_mock() + self.d.action(DA.ACTIVATE, DP.IDLE) # -> start_print + self.d.action(DA.TICK, DP.IDLE) # -> printing + self.d._runner.start_print.reset_mock() + + self.d._intent = self.q[1].path + self.d._cur_path = self.d._intent - self.d.on_print_success() - flush(self.d) - self.d.start_print_fn.assert_called_once() - self.assertEqual(self.d.start_print_fn.call_args[0][0], self.q[2]) + self.d.action(DA.SUCCESS, DP.IDLE) # -> success + self.d.action(DA.TICK, DP.IDLE) # -> start_clearing + self.d.action(DA.TICK, DP.IDLE) # -> clearing + self.d.action(DA.TICK, DP.IDLE) # -> start_print + self.d.action(DA.TICK, DP.IDLE) # -> printing + self.d._runner.start_print.assert_called_once() + self.assertEqual(self.d._runner.start_print.call_args[0][0], self.q[2]) def test_success_after_queue_prepend_starts_prepended(self): - self.d.set_active() - self.d.start_print_fn.reset_mock() + self.d.action(DA.ACTIVATE, DP.IDLE) # -> start_print + self.d.action(DA.TICK, DP.IDLE) # -> printing + self.d._runner.start_print.reset_mock() n = QueueItem("new", "/new.gco", True) self.q.add([n], idx=0) - self.d.on_print_success() - flush(self.d) - self.d.start_print_fn.assert_called_once - self.assertEqual(self.d.start_print_fn.call_args[0][0], n) + self.d._intent = self.q[1].path + self.d._cur_path = self.d._intent - def test_paused_with_spaghetti_early_triggers_cancel(self): - self.d.set_active() - - self.d.on_print_paused(self.d.retry_threshold_seconds - 1, is_spaghetti=True) - flush(self.d) - self.d.cancel_print_fn.assert_called_once_with() - - def test_paused_manually_early_falls_through(self): - self.d.set_active() + self.d.action(DA.SUCCESS, DP.IDLE) # -> success + self.d.action(DA.TICK, DP.IDLE) # -> start_clearing + self.d.action(DA.TICK, DP.IDLE) # -> clearing + self.d.action(DA.TICK, DP.IDLE) # -> start_print + self.d.action(DA.TICK, DP.IDLE) # -> printing + self.d._runner.start_print.assert_called_once + self.assertEqual(self.d._runner.start_print.call_args[0][0], n) - self.d.on_print_paused(self.d.retry_threshold_seconds - 1, is_spaghetti=False) - flush(self.d) - self.d.cancel_print_fn.assert_not_called() + def test_paused_with_spaghetti_early_triggers_cancel(self): + self.d.action(DA.ACTIVATE, DP.IDLE) # -> start_print + self.d.action(DA.TICK, DP.IDLE) # -> printing + + self.d._elapsed = lambda: 10 + self.d.action(DA.SPAGHETTI, DP.BUSY) # -> spaghetti_recovery + self.d.action(DA.TICK, DP.PAUSED) # -> cancel + failure + self.d._runner.cancel_print.assert_called() + self.assertEqual(self.d.state, self.d._state_failure) + + def test_paused_with_spaghetti_late_waits_for_user(self): + self.d.action(DA.ACTIVATE, DP.IDLE) # -> start_print + self.d.action(DA.TICK, DP.IDLE) # -> printing + + self.d._elapsed = lambda: self.d.retry_threshold_seconds + 1 + self.d.action(DA.SPAGHETTI, DP.BUSY) # -> printing (ignore spaghetti) + self.d.action(DA.TICK, DP.PAUSED) # -> paused + self.d._runner.cancel_print.assert_not_called() + self.assertEqual(self.d.state, self.d._state_paused) + + def test_paused_manually_early_waits_for_user(self): + self.d.action(DA.ACTIVATE, DP.IDLE) # -> start_print + self.d.action(DA.TICK, DP.IDLE) # -> printing + + self.d._elapsed = lambda: 10 + self.d.action(DA.TICK, DP.PAUSED) # -> paused + self.d.action(DA.TICK, DP.PAUSED) # stay in paused state + self.d._runner.cancel_print.assert_not_called() + self.assertEqual(self.d.state, self.d._state_paused) + + def test_paused_manually_late_waits_for_user(self): + self.d.action(DA.ACTIVATE, DP.IDLE) # -> start_print + self.d.action(DA.TICK, DP.IDLE) # -> printing + + self.d._elapsed = lambda: 1000 + self.d.action(DA.TICK, DP.PAUSED) # -> paused + self.d.action(DA.TICK, DP.PAUSED) # stay in paused state + self.d._runner.cancel_print.assert_not_called() + self.assertEqual(self.d.state, self.d._state_paused) def test_paused_on_temp_file_falls_through(self): - self.d.set_active() - self.d.start_print_fn.reset_mock() - self.d.on_print_paused(is_temp_file=True, is_spaghetti=True) - self.d.cancel_print_fn.assert_not_called() - self.assertEqual(self.d.pending_actions(), 0) - - def test_user_cancelled_sets_inactive_and_fails_print(self): - self.d.set_active() - self.d.start_print_fn.reset_mock() - - self.d.on_print_cancelled(initiator="admin") - flush(self.d) - self.d.start_print_fn.assert_not_called() - self.assertEqual(self.d.active, False) - self.assertRegex(self.q[1].result, "^failure.*") - - def test_cancelled_triggers_retry(self): - self.d.set_active() - self.d.start_print_fn.reset_mock() - - self.d.on_print_cancelled(initiator=None) - flush(self.d) - self.d.start_print_fn.assert_called_once() - self.assertEqual(self.d.start_print_fn.call_args[0][0], self.q[1]) - self.assertEqual(self.d.retries, 1) - - def test_set_active_clears_retries(self): + self.d.state = self.d._state_clearing # -> clearing + self.d.action(DA.TICK, DP.PAUSED) + self.d._runner.cancel_print.assert_not_called() + self.assertEqual(self.d.state, self.d._state_clearing) + + self.d.state = self.d._state_finishing # -> finishing + self.d.action(DA.TICK, DP.PAUSED) + self.d._runner.cancel_print.assert_not_called() + self.assertEqual(self.d.state, self.d._state_finishing) + + def test_user_deactivate_sets_inactive(self): + self.d.action(DA.ACTIVATE, DP.IDLE) # -> start_print + self.d.action(DA.TICK, DP.IDLE) # -> printing + self.d._runner.start_print.reset_mock() + + self.d.action(DA.DEACTIVATE, DP.IDLE) # -> inactive + self.assertEqual(self.d.state, self.d._state_inactive) + self.d._runner.start_print.assert_not_called() + self.assertEqual(self.q[1].result, None) + + def test_retry_after_failure(self): + self.d.state = self.d._state_failure + self.d.retries = self.d.max_retries-2 + self.d.action(DA.TICK, DP.IDLE) # Start clearing + self.assertEqual(self.d.retries, self.d.max_retries-1) + self.assertEqual(self.d.state, self.d._state_start_clearing) + + def test_activate_clears_retries(self): self.d.retries = self.d.max_retries - 1 - self.d.set_active() + self.d.action(DA.ACTIVATE, DP.IDLE) # -> start_print self.assertEqual(self.d.retries, 0) - def test_cancelled_with_max_retries_sets_inactive(self): - self.d.set_active() - self.d.start_print_fn.reset_mock() - self.d.retries = self.d.max_retries - - self.d.on_print_cancelled(initiator=None) - flush(self.d) - self.d.start_print_fn.assert_not_called() - self.assertEqual(self.d.active, False) - - def test_paused_late_waits_for_user(self): - self.d.set_active() - self.d.start_print_fn.reset_mock() - - self.d.on_print_paused(self.d.retry_threshold_seconds + 1, is_spaghetti=True) - self.d.start_print_fn.assert_not_called() - - def test_failure_sets_inactive(self): - self.d.set_active() - self.d.start_print_fn.reset_mock() - - self.d.on_print_failed() - flush(self.d) - self.d.start_print_fn.assert_not_called() - self.assertEqual(self.d.active, False) - - def test_resume_sets_status(self): - self.d.set_active() - self.d.start_print_fn.reset_mock() + def test_failure_with_max_retries_sets_inactive(self): + self.d.state = self.d._state_failure + self.d.retries = self.d.max_retries - 1 + self.d.action(DA.TICK, DP.IDLE) # -> inactive + self.assertEqual(self.d.state, self.d._state_inactive) - self.d.on_print_resumed() - self.assertTrue("paused" not in self.d.status.lower()) + def test_resume_from_pause(self): + self.d.state = self.d._state_paused + self.d.action(DA.TICK, DP.BUSY) + self.assertEqual(self.d.state, self.d._state_printing) class TestQueueManagerOnLastPrint(unittest.TestCase): @@ -200,14 +241,22 @@ def setUp(self): setupTestQueueAndDriver(self, 2) def test_completed_last_print(self): - self.d.set_active() - self.d.start_print_fn.reset_mock() - - self.d.on_print_success() - flush(self.d) - self.d.on_print_success(is_finish_script=True) - self.d.finish_script_fn.assert_called_once_with() - self.assertEqual(self.d.active, False) + self.d.action(DA.ACTIVATE, DP.IDLE) # -> start_print + self.d.action(DA.TICK, DP.IDLE) # -> printing + self.d._runner.start_print.reset_mock() + + self.d._intent = self.q[-1].path + self.d._cur_path = self.d._intent + + self.d.action(DA.SUCCESS, DP.IDLE, self.d._intent) # -> success + self.d.action(DA.TICK, DP.IDLE) # -> start_finishing + self.d.action(DA.TICK, DP.IDLE) # -> finishing + self.d._runner.run_finish_script.assert_called() + self.assertEqual(self.d.state, self.d._state_finishing) + + self.d.action(DA.TICK, DP.IDLE) # -> inactive + self.assertEqual(self.d.state, self.d._state_inactive) + if __name__ == "__main__":