diff --git a/CHANGELOG b/CHANGELOG
index 331ffd3..ef7793b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,7 @@
 0.9.2
  - ci: turn tests into a submodule
  - ref: add pyqtSlot decorators where missing
+ - ref: cleanup threads to isolate #14
 0.9.1
  - fix: don't print upload job when upload is completed in CLI
  - fix: daemonize background thread in CLI dcoraid-upload-task
diff --git a/dcoraid/download/job.py b/dcoraid/download/job.py
index a5a8007..adb9167 100644
--- a/dcoraid/download/job.py
+++ b/dcoraid/download/job.py
@@ -23,7 +23,7 @@
 ]
 
 
-class DownloadJob(object):
+class DownloadJob:
     def __init__(self, api, resource_id, download_path):
         """Wrapper for resource downloads
@@ -78,16 +78,20 @@ def __getstate__(self):
         }
         return dj_state
 
-    @property
-    def file_size(self):
-        return self.get_resource_dict()["size"]
-
     @staticmethod
     def from_download_job_state(dj_state, api):
         """Reinstantiate a job from a `DownloadJob.__getstate__` dict"""
         return DownloadJob(api=api, **dj_state)
 
+    @property
+    def file_size(self):
+        return self.get_resource_dict()["size"]
+
+    @property
+    def id(self):
+        return self.resource_id
+
     @functools.lru_cache(maxsize=100)
     def get_resource_dict(self):
         """Return resource dictionary"""
@@ -223,7 +227,7 @@ def task_download_resource(self):
         """Start the download
 
         The progress of the download is monitored and written
-        to attributes. The current status can be retrieved
+        to the attributes. The current status can be retrieved
         via :func:`DownloadJob.get_status`.
         """
         if self.state in ["init", "wait-disk"]:
@@ -247,7 +251,7 @@ def task_download_resource(self):
             if shutil.disk_usage(self.path_temp.parent).free < size:
                 # there is not enough space on disk for the download
                 self.set_state("wait-disk")
-                time.sleep(.2)
+                time.sleep(1)
             else:
                 # proceed with download
                 # reset everything
diff --git a/dcoraid/download/queue.py b/dcoraid/download/queue.py
index 9689d4b..b597f47 100644
--- a/dcoraid/download/queue.py
+++ b/dcoraid/download/queue.py
@@ -1,4 +1,5 @@
 import pathlib
+import time
 import warnings
 
 from ..worker import Daemon
@@ -97,6 +98,13 @@ def __init__(self, api, path_persistent_job_list=None):
     def __contains__(self, download_job):
         return download_job in self.jobs
 
+    def __del__(self):
+        self.daemon_download.shutdown_flag.set()
+        self.daemon_verify.shutdown_flag.set()
+        time.sleep(.2)
+        self.daemon_download.terminate()
+        self.daemon_verify.terminate()
+
     def __getitem__(self, index):
         return self.jobs[index]
diff --git a/dcoraid/gui/main.py b/dcoraid/gui/main.py
index 08368ee..1907cf6 100644
--- a/dcoraid/gui/main.py
+++ b/dcoraid/gui/main.py
@@ -140,14 +140,6 @@ def close(self):
             self.timer.stop()
         if self.panel_upload.widget_jobs.timer is not None:
             self.panel_upload.widget_jobs.timer.stop()
-        self.panel_upload.jobs.daemon_compress.join()
-        self.panel_upload.jobs.daemon_upload.join()
-        self.panel_upload.jobs.daemon_verify.join()
-        self.panel_upload.jobs.daemon_compress.terminate()
-        self.panel_upload.jobs.daemon_upload.terminate()
-        self.panel_upload.jobs.daemon_verify.terminate()
-        QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents,
-                                             3000)
         super(DCORAid, self).close()
 
     @QtCore.pyqtSlot()
@@ -258,10 +250,14 @@ def refresh_login_status(self):
             text = "{}".format(fullname)
             tip = "user '{}'".format(name)
             icon = "user-lock"
-        self.status_widget.set_status(text=text,
-                                      tooltip=tip,
-                                      icon=icon,
-                                      server=api.server)
+        try:
+            self.status_widget.set_status(text=text,
+                                          tooltip=tip,
+                                          icon=icon,
+                                          server=api.server)
+        except BaseException:
+            # Probably the application was killed
+            pass
 
 
 class StatusWidget(QtWidgets.QWidget):
diff --git a/dcoraid/upload/job.py b/dcoraid/upload/job.py
index 9387889..c323a2e 100644
--- a/dcoraid/upload/job.py
+++ b/dcoraid/upload/job.py
@@ -144,6 +144,10 @@ def from_upload_job_state(uj_state, api, cache_dir=None):
         """
         return UploadJob(api=api, cache_dir=cache_dir, **uj_state)
 
+    @property
+    def id(self):
+        return self.dataset_id
+
     def cleanup(self):
         """cleanup temporary files in the user's cache directory"""
         shutil.rmtree(self.cache_dir, ignore_errors=True)
@@ -362,7 +366,7 @@ def task_compress_resources(self):
                     # As long as there is less space free than the
                     # input file size, we stall here.
                     self.set_state("wait-disk")
-                    time.sleep(0.2)
+                    time.sleep(1)
                 self.set_state("compress")
                 compress(path_out=path_out, path_in=path)
                 # replace current path_out with compressed path
diff --git a/dcoraid/upload/queue.py b/dcoraid/upload/queue.py
index 8295a92..2b5fe03 100644
--- a/dcoraid/upload/queue.py
+++ b/dcoraid/upload/queue.py
@@ -1,4 +1,5 @@
 import pathlib
+import time
 import warnings
 
 from ..api import APINotFoundError
@@ -143,6 +144,15 @@ def __init__(self, api, path_persistent_job_list=None, cache_dir=None):
     def __contains__(self, upload_job):
         return upload_job in self.jobs
 
+    def __del__(self):
+        self.daemon_upload.shutdown_flag.set()
+        self.daemon_verify.shutdown_flag.set()
+        self.daemon_compress.shutdown_flag.set()
+        time.sleep(.2)
+        self.daemon_upload.terminate()
+        self.daemon_verify.terminate()
+        self.daemon_compress.terminate()
+
     def __getitem__(self, index):
         return self.jobs[index]
 
@@ -180,6 +190,7 @@ def abort_job(self, dataset_id):
         elif job.state == "compress":
             job.set_state("abort")
             self.daemon_compress.terminate()
+            self.daemon_compress.shutdown_flag.set()
             self.daemon_compress = CompressDaemon(self.jobs)
 
     def add_job(self, upload_job):
diff --git a/dcoraid/worker/daemon.py b/dcoraid/worker/daemon.py
index 2875bc5..5e648dd 100644
--- a/dcoraid/worker/daemon.py
+++ b/dcoraid/worker/daemon.py
@@ -1,40 +1,35 @@
+import atexit
 import logging
+import threading
 import traceback
 import time
 
 from ..common import ConnectionTimeoutErrors
-from .kthread import KThread
+from .kthread import KThread, KThreadExit
 
 
 class Daemon(KThread):
     def __init__(self, queue, job_trigger_state, job_function_name):
         """Daemon base class for running uploads/downloads in the background"""
         self.queue = queue
-        self.state = "running"
         self.job_trigger_state = job_trigger_state
         self.job_function_name = job_function_name
         super(Daemon, self).__init__()
         self.daemon = True  # We don't have to worry about ending this thread
-        self.start()
 
-    def join(self, *args, **kwargs):
-        """Join thread by breaking the while loop"""
-        self.state = "exiting"
-        super(Daemon, self).join(*args, **kwargs)
-        assert self.state == "exited"
+        # The shutdown_flag is a threading.Event object that
+        # indicates whether the thread should be terminated.
+        self.shutdown_flag = threading.Event()
+
+        atexit.register(self.shutdown_flag.set)
+
+        self.start()
 
     def run(self):
-        while True:
-            if self.state == "exiting":
-                self.state = "exited"
-                break
-            elif self.state != "running":
-                # Don't do anything
-                time.sleep(.1)
-                continue
-            else:
+        try:
+            while not self.shutdown_flag.is_set():
                 # Get the first job that is in the trigger state
                 for job in self.queue:
                     if job.state == self.job_trigger_state:
                         break
@@ -56,16 +51,26 @@ def run(self):
                         job.traceback = traceback.format_exc(limit=1) \
                             + "\nDCOR-Aid will retry in 10s!"
                         logger.error(
-                            f"(dataset {job.dataset_id}) {traceback.format_exc()}")
+                            f"(dataset {job.id}) {traceback.format_exc()}")
                         time.sleep(10)
                         job.set_state(self.job_trigger_state)
-                    except SystemExit:
+                    except KThreadExit:
                         job.set_state("abort")
-                        logger.error(f"(dataset {job.dataset_id}) Aborted!")
+                        logger.error(f"{job.__class__.__name__} {job.id} Aborted!")
+                    except SystemExit:
+                        # nothing to do
+                        self.terminate()
                     except BaseException:
-                        # Set job to error state and let the user figure
-                        # out what to do next.
-                        job.set_state("error")
-                        job.traceback = traceback.format_exc()
-                        logger.error(
-                            f"(dataset {job.dataset_id}) {traceback.format_exc()}")
+                        if not self.shutdown_flag.is_set():
+                            # Only log if the thread is supposed to be running.
+                            # Set job to error state and let the user figure
+                            # out what to do next.
+                            job.set_state("error")
+                            job.traceback = traceback.format_exc()
+                            logger.error(
+                                f"(dataset {job.id}) {traceback.format_exc()}")
+        except KThreadExit:
+            # killed by KThread
+            pass
+        except SystemExit:
+            self.terminate()
diff --git a/dcoraid/worker/kthread.py b/dcoraid/worker/kthread.py
index 13d59a1..4211877 100644
--- a/dcoraid/worker/kthread.py
+++ b/dcoraid/worker/kthread.py
@@ -1,18 +1,20 @@
 """https://github.com/munshigroup/kthread"""
+import atexit
 import ctypes
-import inspect
-import threading
 import time
+import threading
+
+
+class KThreadExit(BaseException):
+    pass
 
 
 def _async_raise(tid, exctype):
     """Raises the exception, causing the thread to exit"""
-    if not inspect.isclass(exctype):
-        raise TypeError("Only types can be raised (not instances)")
     res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
         ctypes.c_long(tid), ctypes.py_object(exctype))
     if res == 0:
-        raise ValueError("Invalid thread ID")
+        pass  # ignore
     elif res != 1:
         # """if it returns a number greater than one, you're in trouble,
         # and you should call it again with exc=NULL to revert the effect"""
@@ -23,16 +25,20 @@ def _async_raise(tid, exctype):
 
 class KThread(threading.Thread):
     """Killable thread. See terminate() for details."""
 
+    def __init__(self, *args, **kwargs):
+        super(KThread, self).__init__(*args, **kwargs)
+        atexit.register(self.terminate)
+
     def _get_my_tid(self):
         """Determines the instance's thread ID"""
         if not self.is_alive():
-            raise threading.ThreadError("Thread is not active")
+            return None  # Thread is not active
 
         # do we have it cached?
         if hasattr(self, "_thread_id"):
             return self._thread_id
 
-        # no, look for it in the _active dict
+        # look for it in the _active dict
        for tid, tobj in threading._active.items():
             if tobj is self:
                 self._thread_id = tid
@@ -42,7 +48,9 @@
 
     def raise_exc(self, exctype):
         """raises the given exception type in the context of this thread"""
-        _async_raise(self._get_my_tid(), exctype)
+        thread_id = self._get_my_tid()
+        if thread_id:
+            _async_raise(thread_id, exctype)
 
     def terminate(self):
         """raises SystemExit in the context of the given thread, which should
@@ -50,6 +58,7 @@ def terminate(self):
         # WARNING: using terminate() can introduce instability in your
         # programs. It is worth noting that terminate() will NOT work if the
         # thread in question is blocked by a syscall (accept(), recv(), etc.).
+        atexit.unregister(self.terminate)
         while self.is_alive():
-            self.raise_exc(SystemExit)
-            time.sleep(0.01)
+            self.raise_exc(KThreadExit)
+            time.sleep(.05)
diff --git a/tests/conftest.py b/tests/conftest.py
index 57c2942..bf8fbe7 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -52,7 +52,7 @@ def pytest_configure(config):
     settings.value("user scenario", "dcor-dev")
     settings.setValue("auth/server", "dcor-dev.mpl.mpg.de")
     settings.setValue("auth/api key", common.get_api_key())
-    settings.setValue("debug/without timers", 1)
+    settings.setValue("debug/without timers", "1")
     settings.sync()
     # cleanup
     cleanup_dcoraid_tasks()
diff --git a/tests/test_download_queue.py b/tests/test_download_queue.py
index d6e9c54..f668798 100644
--- a/tests/test_download_queue.py
+++ b/tests/test_download_queue.py
@@ -1,5 +1,6 @@
 import pathlib
 import tempfile
+import time
 
 import pytest
 
@@ -45,8 +46,9 @@ def test_queue_remove_job():
     joblist = DownloadQueue(api=api,
                             path_persistent_job_list=pdjl_path)
     # disable all daemons, so no downloading happens
-    joblist.daemon_download.join()
-    joblist.daemon_verify.join()
+    joblist.daemon_download.shutdown_flag.set()
+    joblist.daemon_verify.shutdown_flag.set()
+    time.sleep(.2)
     resource_id = ds_dict["resources"][0]["id"]
     dj = joblist.new_job(resource_id=resource_id,
                          download_path=td)
@@ -109,8 +111,9 @@ def test_persistent_download_joblist_job_added_in_queue():
     pdjl = PersistentDownloadJobList(pdjl_path)
     uq = DownloadQueue(api=api,
                        path_persistent_job_list=pdjl_path)
-    uq.daemon_download.join()
-    uq.daemon_verify.join()
+    uq.daemon_download.shutdown_flag.set()
+    uq.daemon_verify.shutdown_flag.set()
+    time.sleep(.2)
 
     assert pdjl.num_queued == 0
     uq.add_job(dj)
@@ -145,8 +148,9 @@ def test_persistent_download_joblist_skip_queued_resources():
     pdjl.immortalize_job(dj)
 
     dq = DownloadQueue(api=api,
                        path_persistent_job_list=pdjl_path)
-    dq.daemon_download.join()
-    dq.daemon_verify.join()
+    dq.daemon_download.shutdown_flag.set()
+    dq.daemon_verify.shutdown_flag.set()
+    time.sleep(.2)
     assert len(dq) == 1
     assert dq.jobs_eternal.num_queued == 1
diff --git a/tests/test_gui.py b/tests/test_gui.py
index e73500a..63a4f9d 100644
--- a/tests/test_gui.py
+++ b/tests/test_gui.py
@@ -11,7 +11,7 @@
 from dcoraid.gui.upload import widget_upload
 
 import pytest
-from PyQt5 import QtCore, QtWidgets
+from PyQt5 import QtCore, QtWidgets, QtTest
 from PyQt5.QtWidgets import QInputDialog, QMessageBox
 
 from . import common
@@ -24,8 +24,7 @@ def run_around_tests():
     # Run test
     yield
     # Make sure that all daemons are gone
-    QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents,
-                                         3000)
+    QtTest.QTest.qWait(1000)
     QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents,
                                          3000)
@@ -52,6 +51,8 @@ def test_anonymous(qtbot):
         ]))
     try:
         mw = DCORAid()
+        qtbot.addWidget(mw)
+        QtWidgets.QApplication.setActiveWindow(mw)
         QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents, 3000)
         # sanity check
         assert mw.settings.value("user scenario") == "anonymous"
@@ -73,6 +74,8 @@ def test_mydata_dataset_add_to_collection(qtbot, monkeypatch):
     task_id = str(uuid.uuid4())
     tpath = pathlib.Path(common.make_upload_task(task_id=task_id))
     mw = DCORAid()
+    qtbot.addWidget(mw)
+    QtWidgets.QApplication.setActiveWindow(mw)
 
     # monkeypatch success message box
     monkeypatch.setattr(QMessageBox, "information", lambda *args: None)
@@ -121,6 +124,8 @@ def test_mydata_dataset_add_to_collection(qtbot, monkeypatch):
 def test_upload_simple(qtbot, monkeypatch):
     """Upload a test dataset"""
     mw = DCORAid()
+    qtbot.addWidget(mw)
+    QtWidgets.QApplication.setActiveWindow(mw)
     QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents, 300)
 
     dlg = UploadDialog(mw.panel_upload)
@@ -143,6 +148,8 @@ def test_upload_task(qtbot, monkeypatch):
     task_id = str(uuid.uuid4())
     tpath = common.make_upload_task(task_id=task_id)
     mw = DCORAid()
+    qtbot.addWidget(mw)
+    QtWidgets.QApplication.setActiveWindow(mw)
     QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents, 300)
     monkeypatch.setattr(QtWidgets.QFileDialog, "getOpenFileNames",
                         lambda *args: ([tpath], None))
@@ -165,6 +172,8 @@ def test_upload_task_bad_dataset_id_no(qtbot, monkeypatch):
                                    dataset_id="wrong_id",
                                    dataset_dict=dataset_dict)
     mw = DCORAid()
+    qtbot.addWidget(mw)
+    QtWidgets.QApplication.setActiveWindow(mw)
     QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents, 300)
     # monkeypatch file selection dialog
     monkeypatch.setattr(QtWidgets.QFileDialog, "getOpenFileNames",
@@ -192,6 +201,8 @@ def test_upload_task_bad_dataset_id_yes(qtbot, monkeypatch):
                                    dataset_id="wrong_id",
                                    dataset_dict=dataset_dict)
     mw = DCORAid()
+    qtbot.addWidget(mw)
+    QtWidgets.QApplication.setActiveWindow(mw)
     QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents, 300)
     # monkeypatch file selection dialog
     monkeypatch.setattr(QtWidgets.QFileDialog, "getOpenFileNames",
@@ -207,6 +218,8 @@ def test_upload_task_bad_dataset_id_yes(qtbot, monkeypatch):
     mw.panel_upload.on_upload_task(action=act)
     uj = mw.panel_upload.jobs[-1]
     assert uj.task_id == task_id
+    mw.panel_upload.jobs.daemon_compress.shutdown_flag.set()
+    mw.panel_upload.jobs.daemon_compress.join()
     mw.close()
@@ -218,6 +231,8 @@ def test_upload_task_missing_circle(qtbot, monkeypatch):
     tpath = common.make_upload_task(task_id=task_id,
                                     dataset_dict=dataset_dict)
     mw = DCORAid()
+    qtbot.addWidget(mw)
+    QtWidgets.QApplication.setActiveWindow(mw)
     QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents, 300)
     monkeypatch.setattr(QtWidgets.QFileDialog, "getOpenFileNames",
                         lambda *args: ([tpath], None))
@@ -258,6 +273,8 @@ def test_upload_task_missing_circle_multiple(qtbot, monkeypatch):
     shutil.copytree(tpath2.parent, tdir / tpath2.parent.name)
 
     mw = DCORAid()
+    qtbot.addWidget(mw)
+    QtWidgets.QApplication.setActiveWindow(mw)
     QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents, 300)
     monkeypatch.setattr(QtWidgets.QFileDialog, "getExistingDirectory",
                         lambda *args: str(tdir))
@@ -286,6 +303,8 @@ def test_upload_task_missing_circle_multiple(qtbot, monkeypatch):
 def test_upload_private(qtbot, monkeypatch):
     """Upload a private test dataset"""
     mw = DCORAid()
+    qtbot.addWidget(mw)
+    QtWidgets.QApplication.setActiveWindow(mw)
     QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents, 300)
 
     dlg = UploadDialog(mw.panel_upload)
@@ -306,8 +325,12 @@ def test_upload_private(qtbot, monkeypatch):
     assert dataset_id is not None
 
     common.wait_for_job(upload_queue=mw.panel_upload.jobs,
-                        dataset_id=mw.panel_upload.jobs[0].dataset_id)
+                        dataset_id=dataset_id)
     mw.close()
+    QtTest.QTest.qWait(1000)
+    QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents,
+                                         3000)
+
     # make sure the dataset is private
     api = common.get_api()
     dataset_dict = api.get(api_call="package_show", id=dataset_id)
diff --git a/tests/test_upload_queue.py b/tests/test_upload_queue.py
index 26c5e51..943a233 100644
--- a/tests/test_upload_queue.py
+++ b/tests/test_upload_queue.py
@@ -1,5 +1,6 @@
 import pathlib
 import tempfile
+import time
 import uuid
 
 import pytest
@@ -63,9 +64,10 @@ def test_queue_find_zombie_caches():
     realcache = pathlib.Path(cache_dir) / f"compress-{data['id']}"
     joblist = UploadQueue(api=api, cache_dir=cache_dir)
     # disable all daemons, so no uploading happens
-    joblist.daemon_compress.join()
-    joblist.daemon_upload.join()
-    joblist.daemon_verify.join()
+    joblist.daemon_compress.shutdown_flag.set()
+    joblist.daemon_upload.shutdown_flag.set()
+    joblist.daemon_verify.shutdown_flag.set()
+    time.sleep(.2)
     uj = joblist.new_job(
         dataset_id=data["id"],
         paths=[data_path / "calibration_beads_47_nocomp.rtdc"])
@@ -88,11 +90,11 @@ def test_queue_remove_job():
     joblist = UploadQueue(api=api,
                           path_persistent_job_list=pujl_path)
     # disable all daemons, so no uploading happens
-    joblist.daemon_compress.join()
-    joblist.daemon_upload.join()
-    joblist.daemon_verify.join()
-    uj = joblist.new_job(dataset_id=data["id"],
-                         paths=[dpath])
+    joblist.daemon_compress.shutdown_flag.set()
+    joblist.daemon_upload.shutdown_flag.set()
+    joblist.daemon_verify.shutdown_flag.set()
+    time.sleep(.2)
+    uj = joblist.new_job(dataset_id=data["id"], paths=[dpath])
     assert uj.state == "init"
     joblist.remove_job(uj.dataset_id)
     assert uj not in joblist
@@ -145,9 +147,10 @@ def test_persistent_upload_joblist_job_added_in_queue():
     uj = load_task(task_path, api=api)
 
     uq = UploadQueue(api=api, path_persistent_job_list=pujl_path)
-    uq.daemon_compress.join()
-    uq.daemon_upload.join()
-    uq.daemon_verify.join()
+    uq.daemon_compress.shutdown_flag.set()
+    uq.daemon_upload.shutdown_flag.set()
+    uq.daemon_verify.shutdown_flag.set()
+    time.sleep(.2)
 
     assert pujl.num_queued == 0
     uq.add_job(uj)
@@ -253,9 +256,10 @@ def test_persistent_upload_joblist_skip_missing_resources():
     assert not task_dir.exists()
     with pytest.warns(DCORAidQueueWarning, match="resources are missing"):
         uq = UploadQueue(api=api, path_persistent_job_list=pujl_path)
-    uq.daemon_compress.join()
-    uq.daemon_upload.join()
-    uq.daemon_verify.join()
+    uq.daemon_compress.shutdown_flag.set()
+    uq.daemon_upload.shutdown_flag.set()
+    uq.daemon_verify.shutdown_flag.set()
+    time.sleep(.2)
     # sanity checks
     assert len(uq) == 0
     assert uq.jobs_eternal.num_queued == 1
@@ -280,9 +284,10 @@ def test_persistent_upload_joblist_skip_queued_resources():
     pujl.immortalize_job(uj)
 
     uq = UploadQueue(api=api, path_persistent_job_list=pujl_path)
-    uq.daemon_compress.join()
-    uq.daemon_upload.join()
-    uq.daemon_verify.join()
+    uq.daemon_compress.shutdown_flag.set()
+    uq.daemon_upload.shutdown_flag.set()
+    uq.daemon_verify.shutdown_flag.set()
+    time.sleep(.2)
     assert len(uq) == 1
     assert uq.jobs_eternal.num_queued == 1
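
Reviewer note (not part of the patch): the core change replaces the old `state`-string polling in `Daemon` with a cooperative `threading.Event` plus an `atexit` hook. The sketch below distills that pattern outside of dcoraid, assuming only the standard library; `Worker` and its polling body are illustrative names, not dcoraid API.

```python
# Minimal sketch of the shutdown pattern introduced in daemon.py.
# `Worker` is a hypothetical stand-in for dcoraid's Daemon.
import atexit
import threading
import time


class Worker(threading.Thread):
    def __init__(self):
        super(Worker, self).__init__()
        self.daemon = True
        # Cooperative kill switch: the run loop checks this Event and
        # exits cleanly instead of being interrupted mid-job.
        self.shutdown_flag = threading.Event()
        # Make sure the flag gets set at interpreter exit so the loop
        # cannot outlive the main thread.
        atexit.register(self.shutdown_flag.set)
        self.start()

    def run(self):
        while not self.shutdown_flag.is_set():
            # ... poll a queue and process jobs here ...
            time.sleep(.1)


if __name__ == "__main__":
    worker = Worker()
    worker.shutdown_flag.set()  # request shutdown (as the tests now do)
    worker.join()               # returns once the loop observes the flag
```

This also explains the `shutdown_flag.set()` followed by `time.sleep(.2)` idiom in the tests and in the new `__del__` methods: the flag only takes effect the next time the loop checks it, so callers give the thread a moment to stop on its own before falling back to the harsher `terminate()`.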
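The killable-thread half of the patch rests on CPython's `PyThreadState_SetAsyncExc`. Below is a self-contained sketch of that general recipe, not dcoraid's exact code; `Exit` plays the role of `KThreadExit`, and deriving it from `BaseException` keeps ordinary `except Exception` handlers from swallowing the kill signal.

```python
# Sketch of the async-exception mechanism behind KThread.terminate().
import ctypes
import threading
import time


class Exit(BaseException):  # mirrors KThreadExit
    pass


def async_raise(tid, exctype):
    """Schedule `exctype` to be raised in the thread with id `tid`."""
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(tid), ctypes.py_object(exctype))
    if res > 1:
        # More than one thread state affected: revert with exc=NULL.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)


def work():
    try:
        while True:
            time.sleep(.01)  # short sleeps; the exception fires between them
    except Exit:
        pass  # cleanup point, analogous to Daemon.run() catching KThreadExit


t = threading.Thread(target=work, daemon=True)
t.start()
async_raise(t.ident, Exit)
t.join()
```

The exception is only delivered when the target thread is executing Python bytecode, which is why `terminate()` re-raises inside a `while self.is_alive()` loop and why, as the WARNING comment in kthread.py notes, it cannot interrupt a thread blocked in a syscall such as `recv()` or `accept()`.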