From 0245b318f1a85bab4f84113587e8f886c7f72180 Mon Sep 17 00:00:00 2001 From: Florian Bruhin Date: Fri, 21 Mar 2025 22:32:35 +0100 Subject: [PATCH 1/9] Fix and clean up timer handling in signal/callback blockers - Always create the timeout `QTimer`, even if it's not going to be used. - Connect the signal once in `_AbstractSignalBlocker`/`CallbackBlocker.__init__` and never disconnect it. - When checking whether a timeout was set, simply check `self._timeout` instead of `self._timer`. Before this change, we conditionally created a `QTimer` and deleted it again when cleaning up. In addition to coming with hard to follow complexity, this also caused multiple issues around this timer: Warnings about disconnecting signal =================================== In `AbstractSignalBlocker._cleanup`, we attempted to disconnect `self._timer.timeout()` from `self._quit_loop_by_timeout`, under the condition that a timer was created in `__init__` (i.e. `self.timeout` was not `0` or `None`). However, the signal only got connected in `AbstractSignalBlocker.wait()`. Thus, if `AbstractSignalBlocker` is used as a context manager with a timeout and the signal is emitted inside it: - `self._timer` is present - The signal calls `self._quit_loop_by_signal()` - `self._cleanup()` gets called from there - That tries to disconnect `self._timer.timeout()` from `self._quit_loop_by_timeout()` - Since `self.wait()` was never called, the signal was never connected - Which then results in either an exception (PyQt), an internal SystemError (older PySide6) or a warning (newer PySide6). In 560f565c062abb60fbba44719649e50e56f49cca and later 81b317c5e2170b135076958a730ef62bdb448317 this was fixed by ignoring `TypeError`/`RuntimeError`, but that turned out to not be sufficient for newer PySide versions: #552, #558. The proper fix for this is to not attempt to disconnect a signal that was never connected, which makes all the PySide6 warnings go away (and thus we can run our testsuite with `-Werror` now). As a drive-by fix, this also removes the old `filterwarnings` mark definition, which was introduced in 95fee8b20ccd20cde728dc4e05a488797fd19b93 (perhaps as a stop-gap for older pytest versions?) but shouldn't be needed anymore (and is unused since e33880994f85bdb336f943764ffb6ad0dacc366e). AttributeError when a signal/callback is emitted from a different thread ======================================================================== If a signal is emitted from a thread, `_AbstractSignalBlocker._cleanup()` also gets called from the non-main thread (Qt will complain: "QObject::killTimer: Timers cannot be stopped from another thread"). If the signal emission just happened to happen between the None-check and calling `.start()` in `wait()`, it failed with an `AttributeError`: Main thread in `AbstractSignalBlocker.wait()`: ```python if self._timer is not None: # <--- this was true... self._timer.timeout.connect(self._quit_loop_by_timeout) # <--- ...now here, but self._timer is None now! self._timer.start() ```` Emitting thread in `AbstractSignalBlocker._cleanup()` via `._quit_loop_by_signal()`: ```python if self._timer is not None: ... self._timer = None # <--- here ``` In SignalBlocker.connect, we used: ```python actual_signal.connect(self._quit_loop_by_signal) ``` which by default is supposed to use a `QueuedConnection` if the signal gets emitted in a different thread: https://doc.qt.io/qt-6/qt.html#ConnectionType-enum (Default) If the receiver lives in the thread that emits the signal, `Qt::DirectConnection` is used. Otherwise, `Qt::QueuedConnection` is used. The connection type is determined when the signal is emitted. though then that page says: https://doc.qt.io/qt-6/qobject.html#thread-affinity Note: If a `QObject` has no thread affinity (that is, if `thread()` returns zero), or if it lives in a thread that has no running event loop, then it cannot receive queued signals or posted events. Which means `AbstractSignalBlocker` needs to be a `QObject` for this to work. However, that means we need to use `qt_api.QtCore.QObject` as subclass, i.e. at import time of `wait_signal.py`. Yet, `qt_api` only gets initialized in `pytest_configure` so that it's configurable via a pytest setting. Unfortunately, doing that is tricky, as it means we can't import `wait_signal.py` at all during import time, and people might do so for e.g. type annotations. With this refactoring, the `AttributeError` is out of the way, though there are other subtle failures with multi-threaded signals now: 1) `_quit_loop_by_signal()` -> `_cleanup()` now simply calls `self._timer.stop()` without setting `self._timer` to `None`. This still results in the same Qt message quoted above (after all, the timer still doesn't belong to the calling thread!), but it looks like the test terminates without any timeout anyways. From what I can gather, while the "low level timer" continues to run (and waste a minimal amount of resources), the QTimer still "detaches" from it and stops running. The commit adds a test to catch this case (currently marked as xfail). 2) The main thread in `wait()` can now still call `self._timer.start()` without an `AttributeError`. However, in theory this could restart the timer after it was already stopped by the signal emission, with a race between `_cleanup()` and `wait()`. See #586. This fixes the test-case posted by the reporter (added to the testsuite in a simplified version), but not the additional considerations above. The same fix is also applied to `CallbackBlocker`, though the test there is way more unreliable in triggering the issue, and thus is skipped for taking too long. --- CHANGELOG.rst | 17 ++++- pytest.ini | 3 +- src/pytestqt/wait_signal.py | 41 +++++------- tests/test_wait_signal.py | 129 ++++++++++++++++++++++++++++++++++++ 4 files changed, 162 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5f354b4..88aeb9b 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,9 +1,20 @@ UNRELEASED ---------- -* Added official support for Python 3.13. -* Dropped support for EOL Python 3.8. -* Dropped support for EOL PySide 2. +- Added official support for Python 3.13. +- Dropped support for EOL Python 3.8. +- Dropped support for EOL PySide 2. +- Fixed PySide6 exceptions / warnings about being unable to disconnect signals + with ``qtbot.waitSignal`` (`#552`_, `#558`_). +- Reduced the likelyhood of trouble when using ``qtbot.waitSignal(s)`` and + ``qtbot.waitCallback`` where the signal/callback is emitted from a non-main + thread. In theory, more problems remain and this isn't a proper fix yet. In + practice, it seems impossible to provoke any problems in pytest-qt's testsuite. + (`#586`_) + +.. _#552: https://github.com/pytest-dev/pytest-qt/issues/552 +.. _#558: https://github.com/pytest-dev/pytest-qt/issues/558 +.. _#586: https://github.com/pytest-dev/pytest-qt/issues/586 4.4.0 (2024-02-07) ------------------ diff --git a/pytest.ini b/pytest.ini index 9ade678..26b0a16 100644 --- a/pytest.ini +++ b/pytest.ini @@ -2,5 +2,4 @@ testpaths = tests addopts = --strict-markers --strict-config xfail_strict = true -markers = - filterwarnings: pytest's filterwarnings marker +filterwarnings = error diff --git a/src/pytestqt/wait_signal.py b/src/pytestqt/wait_signal.py index 8da3836..04c66b6 100644 --- a/src/pytestqt/wait_signal.py +++ b/src/pytestqt/wait_signal.py @@ -24,12 +24,12 @@ def __init__(self, timeout=5000, raising=True): self.raising = raising self._signals = None # will be initialized by inheriting implementations self._timeout_message = "" - if timeout is None or timeout == 0: - self._timer = None - else: - self._timer = qt_api.QtCore.QTimer(self._loop) - self._timer.setSingleShot(True) + + self._timer = qt_api.QtCore.QTimer(self._loop) + self._timer.setSingleShot(True) + if timeout is not None: self._timer.setInterval(timeout) + self._timer.timeout.connect(self._quit_loop_by_timeout) def wait(self): """ @@ -43,11 +43,13 @@ def wait(self): return if self.timeout is None and not self._signals: raise ValueError("No signals or timeout specified.") - if self._timer is not None: - self._timer.timeout.connect(self._quit_loop_by_timeout) - self._timer.start() if self.timeout != 0: + if self.timeout is not None: + # asserts as a stop-gap for possible multithreading issues + assert not self.signal_triggered + self._timer.start() + assert not self.signal_triggered qt_api.exec(self._loop) if not self.signal_triggered and self.raising: @@ -62,10 +64,7 @@ def _quit_loop_by_timeout(self): def _cleanup(self): # store timeout message before the data to construct it is lost self._timeout_message = self._get_timeout_error_message() - if self._timer is not None: - _silent_disconnect(self._timer.timeout, self._quit_loop_by_timeout) - self._timer.stop() - self._timer = None + self._timer.stop() def _get_timeout_error_message(self): """Subclasses have to implement this, returning an appropriate error message for a TimeoutError.""" @@ -649,12 +648,12 @@ def __init__(self, timeout=5000, raising=True): self.kwargs = None self.called = False self._loop = qt_api.QtCore.QEventLoop() - if timeout is None: - self._timer = None - else: - self._timer = qt_api.QtCore.QTimer(self._loop) - self._timer.setSingleShot(True) + + self._timer = qt_api.QtCore.QTimer(self._loop) + self._timer.setSingleShot(True) + if timeout is not None: self._timer.setInterval(timeout) + self._timer.timeout.connect(self._quit_loop_by_timeout) def wait(self): """ @@ -664,8 +663,7 @@ def wait(self): __tracebackhide__ = True if self.called: return - if self._timer is not None: - self._timer.timeout.connect(self._quit_loop_by_timeout) + if self.timeout is not None: self._timer.start() qt_api.exec(self._loop) if not self.called and self.raising: @@ -687,10 +685,7 @@ def _quit_loop_by_timeout(self): self._loop.quit() def _cleanup(self): - if self._timer is not None: - _silent_disconnect(self._timer.timeout, self._quit_loop_by_timeout) - self._timer.stop() - self._timer = None + self._timer.stop() def __call__(self, *args, **kwargs): # Not inside the try: block, as if self.called is True, we did quit the diff --git a/tests/test_wait_signal.py b/tests/test_wait_signal.py index 87ca324..8903dd5 100644 --- a/tests/test_wait_signal.py +++ b/tests/test_wait_signal.py @@ -1364,3 +1364,132 @@ def test_timeout_not_raising(self, qtbot): assert not callback.called assert callback.args is None assert callback.kwargs is None + + +@pytest.mark.parametrize( + "check_stderr, count", + [ + # Checking stderr messages + pytest.param( + True, # check stderr + 200, # gets output reliably even with only few runs (often the first) + id="stderr", + ), + # Triggering AttributeError + pytest.param( + False, # don't check stderr + # Hopefully enough to trigger the AttributeError race condition reliably. + # With 500 runs, only 1 of 5 Windows PySide6 CI jobs triggered it (but all + # Ubuntu/macOS jobs did). With 1500 runs, Windows jobs still only triggered + # it 0-2 times. + # + # On my machine (Linux, Intel Core Ultra 9 185H), 500 runs trigger it + # reliably and take ~1s in total. + 2500 if sys.platform == "win32" else 500, + id="attributeerror", + ), + ], +) +@pytest.mark.parametrize("multi_blocker", [True, False]) +def test_signal_raised_from_thread( + pytester: pytest.Pytester, check_stderr: bool, multi_blocker: bool, count: int +) -> None: + """Wait for a signal with a thread. + + Extracted from https://github.com/pytest-dev/pytest-qt/issues/586 + """ + pytester.makepyfile( + f""" + import pytest + from pytestqt.qt_compat import qt_api + + + class Worker(qt_api.QtCore.QObject): + signal = qt_api.Signal() + + + @pytest.mark.parametrize("_", range({count})) + def test_thread(qtbot, capfd, _): + worker = Worker() + thread = qt_api.QtCore.QThread() + worker.moveToThread(thread) + thread.start() + + try: + if {multi_blocker}: # multi_blocker + with qtbot.waitSignals([worker.signal], timeout=500) as blocker: + worker.signal.emit() + else: + with qtbot.waitSignal(worker.signal, timeout=500) as blocker: + worker.signal.emit() + finally: + thread.quit() + thread.wait() + + if {check_stderr}: # check_stderr + out, err = capfd.readouterr() + assert not err + """ + ) + + res = pytester.runpytest_subprocess("-x", "-s") + outcomes = res.parseoutcomes() + + if outcomes.get("failed", 0) and check_stderr and qt_api.pytest_qt_api == "pyside6": + # The test succeeds on PyQt (unsure why!), but we can't check + # qt_api.pytest_qt_api at import time, so we can't use + # pytest.mark.xfail conditionally. + pytest.xfail( + "Qt error: QObject::killTimer: " + "Timers cannot be stopped from another thread" + ) + + res.assert_outcomes(passed=outcomes["passed"]) # no failed/error + + +@pytest.mark.skip(reason="Runs ~1min to reproduce bug reliably") +def test_callback_in_thread(pytester: pytest.Pytester) -> None: + """Wait for a callback with a thread. + + Inspired by https://github.com/pytest-dev/pytest-qt/issues/586 + """ + # Hopefully enough to trigger the bug reliably. + # + # On my machine (Linux, Intel Core Ultra 9 185H), sometimes the bug only + # triggers after ~30k runs (~44s). Thus, we skip this test by default. + count = 50_000 + + pytester.makepyfile( + f""" + import pytest + from pytestqt.qt_compat import qt_api + + + class Worker(qt_api.QtCore.QObject): + def __init__(self, callback): + super().__init__() + self.callback = callback + + def call_callback(self): + self.callback() + + + @pytest.mark.parametrize("_", range({count})) + def test_thread(qtbot, _): + thread = qt_api.QtCore.QThread() + + try: + with qtbot.waitCallback() as callback: + worker = Worker(callback) + worker.moveToThread(thread) + thread.started.connect(worker.call_callback) + thread.start() + finally: + thread.quit() + thread.wait() + """ + ) + + res = pytester.runpytest_subprocess("-x") + outcomes = res.parseoutcomes() + res.assert_outcomes(passed=outcomes["passed"]) # no failed/error From ef0f5cd39b613365c04a9bc5244d8ebf7339bf0c Mon Sep 17 00:00:00 2001 From: Florian Bruhin Date: Sun, 23 Mar 2025 14:31:36 +0100 Subject: [PATCH 2/9] tests: Simplify Timer utility class After enabling -Werror for tests, for unknown reasons, tests/test_wait_signal.py::test_zero_timeout[True] consistently fails on macOS + Python 3.13 + PySide6 (only there!) with: RuntimeWarning: Failed to disconnect (functools.partial(.Timer._emit of >, )) from signal "timeout()". Qt supports connecting signals to signals, so we can save the detour via Python and functools.partial. --- tests/conftest.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 6010c31..e09ea8b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,3 @@ -import functools import time import pytest @@ -63,10 +62,9 @@ def shutdown(self): def single_shot(self, signal, delay): t = qt_api.QtCore.QTimer(self) t.setSingleShot(True) - slot = functools.partial(self._emit, signal) - t.timeout.connect(slot) + t.timeout.connect(signal) t.start(delay) - self.timers_and_slots.append((t, slot)) + self.timers_and_slots.append((t, signal)) def single_shot_callback(self, callback, delay): t = qt_api.QtCore.QTimer(self) @@ -75,9 +73,6 @@ def single_shot_callback(self, callback, delay): t.start(delay) self.timers_and_slots.append((t, callback)) - def _emit(self, signal): - signal.emit() - timer = Timer() yield timer timer.shutdown() From 584b8ec745b8a26ecb32917058b56da8facab1a3 Mon Sep 17 00:00:00 2001 From: Florian Bruhin Date: Sun, 23 Mar 2025 14:59:50 +0100 Subject: [PATCH 3/9] Move SignalEmittedError / CallbackCalledTwiceError to pytestqt.exceptions Preparation for avoiding early wait_signal.py imports, but also seems more consistent. --- docs/wait_callback.rst | 2 +- src/pytestqt/exceptions.py | 19 +++++++++++++++++++ src/pytestqt/qtbot.py | 9 ++++++--- src/pytestqt/wait_signal.py | 24 +++++------------------- tests/test_wait_signal.py | 4 ++-- 5 files changed, 33 insertions(+), 25 deletions(-) diff --git a/docs/wait_callback.rst b/docs/wait_callback.rst index 26f27bf..72a7f52 100644 --- a/docs/wait_callback.rst +++ b/docs/wait_callback.rst @@ -26,7 +26,7 @@ Anything following the ``with`` block will be run only after the callback has be If the callback doesn't get called during the given timeout, :class:`qtbot.TimeoutError ` is raised. If it is called more than once, -:class:`qtbot.CallbackCalledTwiceError ` is raised. +:class:`qtbot.CallbackCalledTwiceError ` is raised. raising parameter ----------------- diff --git a/src/pytestqt/exceptions.py b/src/pytestqt/exceptions.py index d342876..ceab023 100644 --- a/src/pytestqt/exceptions.py +++ b/src/pytestqt/exceptions.py @@ -116,3 +116,22 @@ class ScreenshotError(Exception): Access via ``qtbot.ScreenshotError``. """ + + +class SignalEmittedError(Exception): + """ + .. versionadded:: 1.11 + + The exception thrown by :meth:`pytestqt.qtbot.QtBot.assertNotEmitted` if a + signal was emitted unexpectedly. + """ + + +class CallbackCalledTwiceError(Exception): + """ + .. versionadded:: 3.1 + + The exception thrown by :meth:`pytestqt.qtbot.QtBot.waitCallback` if a + callback was called twice. + """ + diff --git a/src/pytestqt/qtbot.py b/src/pytestqt/qtbot.py index 321b2a3..6052c0e 100644 --- a/src/pytestqt/qtbot.py +++ b/src/pytestqt/qtbot.py @@ -2,15 +2,18 @@ import weakref import warnings -from pytestqt.exceptions import TimeoutError, ScreenshotError +from pytestqt.exceptions import ( + TimeoutError, + ScreenshotError, + SignalEmittedError, + CallbackCalledTwiceError, +) from pytestqt.qt_compat import qt_api from pytestqt.wait_signal import ( SignalBlocker, MultiSignalBlocker, SignalEmittedSpy, - SignalEmittedError, CallbackBlocker, - CallbackCalledTwiceError, ) diff --git a/src/pytestqt/wait_signal.py b/src/pytestqt/wait_signal.py index 04c66b6..7b8a4c8 100644 --- a/src/pytestqt/wait_signal.py +++ b/src/pytestqt/wait_signal.py @@ -1,6 +1,10 @@ import functools -from pytestqt.exceptions import TimeoutError +from pytestqt.exceptions import ( + TimeoutError, + SignalEmittedError, + CallbackCalledTwiceError, +) from pytestqt.qt_compat import qt_api @@ -710,24 +714,6 @@ def __exit__(self, type, value, traceback): self.wait() -class SignalEmittedError(Exception): - """ - .. versionadded:: 1.11 - - The exception thrown by :meth:`pytestqt.qtbot.QtBot.assertNotEmitted` if a - signal was emitted unexpectedly. - """ - - -class CallbackCalledTwiceError(Exception): - """ - .. versionadded:: 3.1 - - The exception thrown by :meth:`pytestqt.qtbot.QtBot.waitCallback` if a - callback was called twice. - """ - - def _silent_disconnect(signal, slot): """Disconnects a signal from a slot, ignoring errors. Sometimes Qt might disconnect a signal automatically for unknown reasons. diff --git a/tests/test_wait_signal.py b/tests/test_wait_signal.py index 8903dd5..60f0ffa 100644 --- a/tests/test_wait_signal.py +++ b/tests/test_wait_signal.py @@ -5,10 +5,10 @@ import sys from pytestqt.qt_compat import qt_api -from pytestqt.wait_signal import ( +from pytestqt.wait_signal import SignalAndArgs +from pytestqt.exceptions import ( SignalEmittedError, TimeoutError, - SignalAndArgs, CallbackCalledTwiceError, ) From dcaebcf77ff2cd4e118693b9acd6b2143aad7b72 Mon Sep 17 00:00:00 2001 From: Florian Bruhin Date: Sun, 23 Mar 2025 15:01:36 +0100 Subject: [PATCH 4/9] exceptions: Add qtbot note to all exceptions --- src/pytestqt/exceptions.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/pytestqt/exceptions.py b/src/pytestqt/exceptions.py index ceab023..051ca47 100644 --- a/src/pytestqt/exceptions.py +++ b/src/pytestqt/exceptions.py @@ -124,6 +124,8 @@ class SignalEmittedError(Exception): The exception thrown by :meth:`pytestqt.qtbot.QtBot.assertNotEmitted` if a signal was emitted unexpectedly. + + Access via ``qtbot.SignalEmittedError``. """ @@ -133,5 +135,7 @@ class CallbackCalledTwiceError(Exception): The exception thrown by :meth:`pytestqt.qtbot.QtBot.waitCallback` if a callback was called twice. + + Access via ``qtbot.CallbackCalledTwiceError``. """ From 3506da88026cbee9959efcaf8c1ae107b74b74ec Mon Sep 17 00:00:00 2001 From: Florian Bruhin Date: Sun, 23 Mar 2025 15:08:29 +0100 Subject: [PATCH 5/9] wait_signal: Simplify utils --- src/pytestqt/wait_signal.py | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/pytestqt/wait_signal.py b/src/pytestqt/wait_signal.py index 7b8a4c8..138bfb8 100644 --- a/src/pytestqt/wait_signal.py +++ b/src/pytestqt/wait_signal.py @@ -1,4 +1,6 @@ import functools +import dataclasses +from typing import Any from pytestqt.exceptions import ( TimeoutError, @@ -261,12 +263,12 @@ def _get_timeout_error_message(self): ) +@dataclasses.dataclass class SignalAndArgs: - def __init__(self, signal_name, args): - self.signal_name = signal_name - self.args = args + signal_name: str + args: list[Any] - def _get_readable_signal_with_optional_args(self): + def __str__(self) -> str: args = repr(self.args) if self.args else "" # remove signal parameter signature, e.g. turn "some_signal(str,int)" to "some_signal", because we're adding @@ -276,18 +278,9 @@ def _get_readable_signal_with_optional_args(self): return signal_name + args - def __str__(self): - return self._get_readable_signal_with_optional_args() - - def __eq__(self, other): - if isinstance(other, self.__class__): - return self.__dict__ == other.__dict__ - else: - return False - -# Returns e.g. "3rd" for 3, or "21st" for 21 -def get_ordinal_str(n): +def get_ordinal_str(n: int) -> str: + """Return e.g. "3rd" for 3, or "21st" for 21.""" return "%d%s" % (n, {1: "st", 2: "nd", 3: "rd"}.get(n if n < 20 else n % 10, "th")) From 9c5a2d8d5eb45d3a5250521e20beb855280f248f Mon Sep 17 00:00:00 2001 From: Florian Bruhin Date: Mon, 24 Mar 2025 00:24:53 +0100 Subject: [PATCH 6/9] Move wait signal implementations to a separate file With this, we can inherit from qt_api.QtCore.QObject without needing to entirely avoid importing wait_signal.py at initial import time. --- src/pytestqt/qtbot.py | 17 +- src/pytestqt/utils.py | 20 + src/pytestqt/wait_signal.py | 709 ++----------------------------- src/pytestqt/wait_signal_impl.py | 663 +++++++++++++++++++++++++++++ 4 files changed, 716 insertions(+), 693 deletions(-) create mode 100644 src/pytestqt/wait_signal_impl.py diff --git a/src/pytestqt/qtbot.py b/src/pytestqt/qtbot.py index 6052c0e..52cb751 100644 --- a/src/pytestqt/qtbot.py +++ b/src/pytestqt/qtbot.py @@ -9,12 +9,7 @@ CallbackCalledTwiceError, ) from pytestqt.qt_compat import qt_api -from pytestqt.wait_signal import ( - SignalBlocker, - MultiSignalBlocker, - SignalEmittedSpy, - CallbackBlocker, -) +from pytestqt import wait_signal def _parse_ini_boolean(value): @@ -353,7 +348,7 @@ def waitSignal(self, signal, *, timeout=5000, raising=None, check_params_cb=None f"Passing None as signal isn't supported anymore, use qtbot.wait({timeout}) instead." ) raising = self._should_raise(raising) - blocker = SignalBlocker( + blocker = wait_signal.SignalBlocker( timeout=timeout, raising=raising, check_params_cb=check_params_cb ) blocker.connect(signal) @@ -440,7 +435,7 @@ def waitSignals( len(check_params_cbs), len(signals) ) ) - blocker = MultiSignalBlocker( + blocker = wait_signal.MultiSignalBlocker( timeout=timeout, raising=raising, order=order, @@ -458,7 +453,7 @@ def wait(self, ms): While waiting, events will be processed and your test will stay responsive to user interface events or network communication. """ - blocker = MultiSignalBlocker(timeout=ms, raising=False) + blocker = wait_signal.MultiSignalBlocker(timeout=ms, raising=False) blocker.wait() @contextlib.contextmanager @@ -478,7 +473,7 @@ def assertNotEmitted(self, signal, *, wait=0): .. note:: This method is also available as ``assert_not_emitted`` (pep-8 alias) """ - spy = SignalEmittedSpy(signal) + spy = wait_signal.SignalEmittedSpy(signal) with spy, self.waitSignal(signal, timeout=wait, raising=False): yield spy.assert_not_emitted() @@ -592,7 +587,7 @@ def waitCallback(self, *, timeout=5000, raising=None): .. note:: This method is also available as ``wait_callback`` (pep-8 alias) """ raising = self._should_raise(raising) - blocker = CallbackBlocker(timeout=timeout, raising=raising) + blocker = wait_signal.CallbackBlocker(timeout=timeout, raising=raising) return blocker @contextlib.contextmanager diff --git a/src/pytestqt/utils.py b/src/pytestqt/utils.py index 79703c0..d4949e0 100644 --- a/src/pytestqt/utils.py +++ b/src/pytestqt/utils.py @@ -1,3 +1,7 @@ +import dataclasses +from typing import Any + + def get_marker(item, name): """Get a marker from a pytest item. @@ -9,3 +13,19 @@ def get_marker(item, name): except AttributeError: # pytest < 3.6 return item.get_marker(name) + + +@dataclasses.dataclass +class SignalAndArgs: + signal_name: str + args: list[Any] + + def __str__(self) -> str: + args = repr(self.args) if self.args else "" + + # remove signal parameter signature, e.g. turn "some_signal(str,int)" to "some_signal", because we're adding + # the actual parameters anyways + signal_name = self.signal_name + signal_name = signal_name.partition("(")[0] + + return signal_name + args diff --git a/src/pytestqt/wait_signal.py b/src/pytestqt/wait_signal.py index 138bfb8..c19c707 100644 --- a/src/pytestqt/wait_signal.py +++ b/src/pytestqt/wait_signal.py @@ -1,579 +1,30 @@ -import functools -import dataclasses -from typing import Any - -from pytestqt.exceptions import ( - TimeoutError, - SignalEmittedError, - CallbackCalledTwiceError, -) -from pytestqt.qt_compat import qt_api - - -class _AbstractSignalBlocker: - """ - Base class for :class:`SignalBlocker` and :class:`MultiSignalBlocker`. - - Provides :meth:`wait` and a context manager protocol, but no means to add - new signals and to detect when the signals should be considered "done". - This needs to be implemented by subclasses. - - Subclasses also need to provide ``self._signals`` which should evaluate to - ``False`` if no signals were configured. - - """ - - def __init__(self, timeout=5000, raising=True): - self._loop = qt_api.QtCore.QEventLoop() - self.timeout = timeout - self.signal_triggered = False - self.raising = raising - self._signals = None # will be initialized by inheriting implementations - self._timeout_message = "" - - self._timer = qt_api.QtCore.QTimer(self._loop) - self._timer.setSingleShot(True) - if timeout is not None: - self._timer.setInterval(timeout) - self._timer.timeout.connect(self._quit_loop_by_timeout) - - def wait(self): - """ - Waits until either a connected signal is triggered or timeout is reached. - - :raise ValueError: if no signals are connected and timeout is None; in - this case it would wait forever. - """ - __tracebackhide__ = True - if self.signal_triggered: - return - if self.timeout is None and not self._signals: - raise ValueError("No signals or timeout specified.") - - if self.timeout != 0: - if self.timeout is not None: - # asserts as a stop-gap for possible multithreading issues - assert not self.signal_triggered - self._timer.start() - assert not self.signal_triggered - qt_api.exec(self._loop) - - if not self.signal_triggered and self.raising: - raise TimeoutError(self._timeout_message) - - def _quit_loop_by_timeout(self): - try: - self._cleanup() - finally: - self._loop.quit() - - def _cleanup(self): - # store timeout message before the data to construct it is lost - self._timeout_message = self._get_timeout_error_message() - self._timer.stop() - - def _get_timeout_error_message(self): - """Subclasses have to implement this, returning an appropriate error message for a TimeoutError.""" - raise NotImplementedError # pragma: no cover - - def _extract_pyqt_signal_name(self, potential_pyqt_signal): - signal_name = potential_pyqt_signal.signal # type: str - if not isinstance(signal_name, str): - raise TypeError( - "Invalid 'signal' attribute in {}. " - "Expected str but got {}".format(signal_name, type(signal_name)) - ) - # strip magic number "2" that PyQt prepends to the signal names - signal_name = signal_name.lstrip("2") - return signal_name - - def _extract_signal_from_signal_tuple(self, potential_signal_tuple): - if isinstance(potential_signal_tuple, tuple): - if len(potential_signal_tuple) != 2: - raise ValueError( - "Signal tuple must have length of 2 (first element is the signal, " - "the second element is the signal's name)." - ) - signal_tuple = potential_signal_tuple - signal_name = signal_tuple[1] - if not isinstance(signal_name, str): - raise TypeError( - "Invalid type for provided signal name, " - "expected str but got {}".format(type(signal_name)) - ) - if not signal_name: - raise ValueError("The provided signal name may not be empty") - return signal_name - return "" - - def determine_signal_name(self, potential_signal_tuple): - """ - Attempts to determine the signal's name. If the user provided the signal name as 2nd value of the tuple, this - name has preference. Bad values cause a ``ValueError``. - Otherwise it attempts to get the signal from the ``signal`` attribute of ``signal`` (which only exists for - PyQt signals). - :returns: str name of the signal, an empty string if no signal name can be determined, or raises an error - in case the user provided an invalid signal name manually - """ - signal_name = self._extract_signal_from_signal_tuple(potential_signal_tuple) - - if not signal_name: - try: - signal_name = self._extract_pyqt_signal_name(potential_signal_tuple) - except AttributeError: - # not a PyQt signal - # -> no signal name could be determined - signal_name = "" - - return signal_name - - def get_callback_name(self, callback): - """Attempts to extract the name of the callback. Returns empty string in case of failure.""" - try: - name = callback.__name__ - except AttributeError: - try: - name = ( - callback.func.__name__ - ) # e.g. for callbacks wrapped with functools.partial() - except AttributeError: - name = "" - return name - - @staticmethod - def get_signal_from_potential_signal_tuple(signal_tuple): - if isinstance(signal_tuple, tuple): - return signal_tuple[0] - return signal_tuple - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - __tracebackhide__ = True - if value is None: - # only wait if no exception happened inside the "with" block - self.wait() - - -class SignalBlocker(_AbstractSignalBlocker): - """ - Returned by :meth:`pytestqt.qtbot.QtBot.waitSignal` method. - - :ivar int timeout: maximum time to wait for a signal to be triggered. Can - be changed before :meth:`wait` is called. - - :ivar bool signal_triggered: set to ``True`` if a signal (or all signals in - case of :class:`MultipleSignalBlocker`) was triggered, or - ``False`` if timeout was reached instead. Until :meth:`wait` is called, - this is set to ``None``. - - :ivar bool raising: - If :class:`qtbot.TimeoutError ` should be raised - if a timeout occurred. - - .. note:: contrary to the parameter of same name in - :meth:`pytestqt.qtbot.QtBot.waitSignal`, this parameter does not - consider the :ref:`qt_default_raising` option. - - :ivar list args: - The arguments which were emitted by the signal, or None if the signal - wasn't emitted at all. - - .. versionadded:: 1.10 - The *args* attribute. - - .. automethod:: wait - .. automethod:: connect - """ - - def __init__(self, timeout=5000, raising=True, check_params_cb=None): - super().__init__(timeout, raising=raising) - self._signals = [] - self.args = None - self.all_args = [] - self.check_params_callback = check_params_cb - self.signal_name = "" - - def connect(self, signal): - """ - Connects to the given signal, making :meth:`wait()` return once - this signal is emitted. - - More than one signal can be connected, in which case **any** one of - them will make ``wait()`` return. - - :param signal: QtCore.Signal or tuple (QtCore.Signal, str) - """ - self.signal_name = self.determine_signal_name(potential_signal_tuple=signal) - actual_signal = self.get_signal_from_potential_signal_tuple(signal) - actual_signal.connect(self._quit_loop_by_signal) - self._signals.append(actual_signal) - - def _quit_loop_by_signal(self, *args): - """ - quits the event loop and marks that we finished because of a signal. - """ - if self.check_params_callback: - self.all_args.append(args) - if not self.check_params_callback(*args): - return # parameter check did not pass - try: - self.signal_triggered = True - self.args = list(args) - self._cleanup() - finally: - self._loop.quit() - - def _cleanup(self): - super()._cleanup() - for signal in self._signals: - _silent_disconnect(signal, self._quit_loop_by_signal) - self._signals = [] - - def get_params_as_str(self): - if not self.all_args: - return "" - - if len(self.all_args[0]) == 1: - # we have a list of tuples with 1 element each (i.e. the signal has 1 parameter), it doesn't make sense - # to return something like "[(someParam,), (someParam,)]", it's just ugly. Instead return something like - # "[someParam, someParam]" - args_list = [arg[0] for arg in self.all_args] - else: - args_list = self.all_args - - return str(args_list) - - def _get_timeout_error_message(self): - if self.check_params_callback is not None: - return ( - "Signal {signal_name} emitted with parameters {params} " - "within {timeout} ms, but did not satisfy " - "the {cb_name} callback" - ).format( - signal_name=self.signal_name, - params=self.get_params_as_str(), - timeout=self.timeout, - cb_name=self.get_callback_name(self.check_params_callback), - ) - else: - return "Signal {signal_name} not emitted after {timeout} ms".format( - signal_name=self.signal_name, timeout=self.timeout - ) - - -@dataclasses.dataclass -class SignalAndArgs: - signal_name: str - args: list[Any] - - def __str__(self) -> str: - args = repr(self.args) if self.args else "" - - # remove signal parameter signature, e.g. turn "some_signal(str,int)" to "some_signal", because we're adding - # the actual parameters anyways - signal_name = self.signal_name - signal_name = signal_name.partition("(")[0] - - return signal_name + args - - -def get_ordinal_str(n: int) -> str: - """Return e.g. "3rd" for 3, or "21st" for 21.""" - return "%d%s" % (n, {1: "st", 2: "nd", 3: "rd"}.get(n if n < 20 else n % 10, "th")) - - -class NoMatchingIndexFoundError(Exception): - pass - - -class MultiSignalBlocker(_AbstractSignalBlocker): - """ - Returned by :meth:`pytestqt.qtbot.QtBot.waitSignals` method, blocks until - all signals connected to it are triggered or the timeout is reached. - - Variables identical to :class:`SignalBlocker`: - - ``timeout`` - - ``signal_triggered`` - - ``raising`` - - .. automethod:: wait - """ - - def __init__(self, timeout=5000, raising=True, check_params_cbs=None, order="none"): - super().__init__(timeout, raising=raising) - self._order = order - self._check_params_callbacks = check_params_cbs - self._signals_emitted = ( - [] - ) # list of booleans, indicates whether the signal was already emitted - self._signals_map = ( - {} - ) # maps from a unique Signal to a list of indices where to expect signal instance emits - self._signals = ( - [] - ) # list of all Signals (for compatibility with _AbstractSignalBlocker) - self._slots = [] # list of slot functions - self._signal_expected_index = 0 # only used when forcing order - self._strict_order_violated = False - self._actual_signal_and_args_at_violation = None - self._signal_names = ( - {} - ) # maps from the unique Signal to the name of the signal (as string) - self.all_signals_and_args = [] # list of SignalAndArgs instances - - def add_signals(self, signals): - """ - Adds the given signal to the list of signals which :meth:`wait()` waits - for. - - :param list signals: list of QtCore.Signal`s or tuples (QtCore.Signal, str) - """ - self._determine_unique_signals(signals) - self._create_signal_emitted_indices(signals) - self._connect_unique_signals() - - def _get_timeout_error_message(self): - if not self._are_signal_names_available(): - error_message = self._get_degenerate_error_message() - else: - error_message = self._get_expected_and_actual_signals_message() - if self._strict_order_violated: - error_message = self._get_order_violation_message() + error_message - - return error_message - - def _determine_unique_signals(self, signals): - # create a map that maps from a unique signal to a list of indices - # (positions) where this signal is expected (in case order matters) - signals_as_str = [ - str(self.get_signal_from_potential_signal_tuple(signal)) - for signal in signals - ] - # maps from a signal-string to one of the signal instances (the first one found) - signal_str_to_unique_signal = {} - for index, signal_str in enumerate(signals_as_str): - signal = self.get_signal_from_potential_signal_tuple(signals[index]) - potential_tuple = signals[index] - if signal_str not in signal_str_to_unique_signal: - unique_signal_tuple = potential_tuple - signal_str_to_unique_signal[signal_str] = signal - self._signals_map[signal] = [index] # create a new list - else: - # append to existing list - unique_signal = signal_str_to_unique_signal[signal_str] - self._signals_map[unique_signal].append(index) - unique_signal_tuple = signals[index] - - self._determine_and_save_signal_name(unique_signal_tuple) - - def _determine_and_save_signal_name(self, unique_signal_tuple): - signal_name = self.determine_signal_name(unique_signal_tuple) - if signal_name: # might be an empty string if no name could be determined - unique_signal = self.get_signal_from_potential_signal_tuple( - unique_signal_tuple - ) - self._signal_names[unique_signal] = signal_name - - def _create_signal_emitted_indices(self, signals): - for signal in signals: - self._signals_emitted.append(False) - - def _connect_unique_signals(self): - for unique_signal in self._signals_map: - slot = functools.partial(self._unique_signal_emitted, unique_signal) - self._slots.append(slot) - unique_signal.connect(slot) - self._signals.append(unique_signal) - - def _unique_signal_emitted(self, unique_signal, *args): - """ - Called when a given signal is emitted. - - If all expected signals have been emitted, quits the event loop and - marks that we finished because signals. - """ - self._record_emitted_signal_if_possible(unique_signal, *args) - - self._check_signal_match(unique_signal, *args) - - if self._all_signals_emitted(): - self.signal_triggered = True - try: - self._cleanup() - finally: - self._loop.quit() - - def _record_emitted_signal_if_possible(self, unique_signal, *args): - if self._are_signal_names_available(): - self.all_signals_and_args.append( - SignalAndArgs(signal_name=self._signal_names[unique_signal], args=args) - ) - - def _check_signal_match(self, unique_signal, *args): - if self._order == "none": - # perform the test for every matching index (stop after the first one that matches) - try: - successful_index = self._get_first_matching_index(unique_signal, *args) - self._signals_emitted[successful_index] = True - except NoMatchingIndexFoundError: # none found - pass - elif self._order == "simple": - if self._check_signal_matches_expected_index(unique_signal, *args): - self._signals_emitted[self._signal_expected_index] = True - self._signal_expected_index += 1 - else: # self.order == "strict" - if not self._strict_order_violated: - # only do the check if the strict order has not been violated yet - self._strict_order_violated = ( - True # assume the order has been violated this time - ) - if self._check_signal_matches_expected_index(unique_signal, *args): - self._signals_emitted[self._signal_expected_index] = True - self._signal_expected_index += 1 - self._strict_order_violated = ( - False # order has not been violated after all! - ) - else: - if self._are_signal_names_available(): - self._actual_signal_and_args_at_violation = SignalAndArgs( - signal_name=self._signal_names[unique_signal], args=args - ) - - def _all_signals_emitted(self): - return not self._strict_order_violated and all(self._signals_emitted) - - def _get_first_matching_index(self, unique_signal, *args): - successfully_emitted = False - successful_index = -1 - potential_indices = self._get_unemitted_signal_indices(unique_signal) - for potential_index in potential_indices: - if not self._violates_callback_at_index(potential_index, *args): - successful_index = potential_index - successfully_emitted = True - break - if not successfully_emitted: - raise NoMatchingIndexFoundError - - return successful_index - - def _check_signal_matches_expected_index(self, unique_signal, *args): - potential_indices = self._get_unemitted_signal_indices(unique_signal) - if potential_indices: - if self._signal_expected_index == potential_indices[0]: - if not self._violates_callback_at_index( - self._signal_expected_index, *args - ): - return True - return False - - def _violates_callback_at_index(self, index, *args): - """ - Checks if there's a callback at the provided index that is violates due to invalid parameters. Returns False if - there is no callback for that index, or if a callback exists but it wasn't violated (returned True). - Returns True otherwise. - """ - if self._check_params_callbacks: - callback_func = self._check_params_callbacks[index] - if callback_func: - if not callback_func(*args): - return True - return False - - def _get_unemitted_signal_indices(self, signal): - """Returns the indices for the provided signal for which NO signal instance has been emitted yet.""" - return [ - index - for index in self._signals_map[signal] - if not self._signals_emitted[index] - ] - - def _are_signal_names_available(self): - if self._signal_names: - return True - return False - - def _get_degenerate_error_message(self): - received_signals = sum(self._signals_emitted) - total_signals = len(self._signals_emitted) - return ( - "Received {actual} of the {total} expected signals. " - "To improve this error message, provide the names of the signals " - "in the waitSignals() call." - ).format(actual=received_signals, total=total_signals) - - def _get_expected_and_actual_signals_message(self): - if not self.all_signals_and_args: - emitted_signals = "None" - else: - emitted_signal_string_list = [str(_) for _ in self.all_signals_and_args] - emitted_signals = self._format_as_array(emitted_signal_string_list) - - missing_signal_strings = [] - for missing_signal_index in self._get_missing_signal_indices(): - missing_signal_strings.append( - self._get_signal_string_representation_for_index(missing_signal_index) - ) - missing_signals = self._format_as_array(missing_signal_strings) - - return "Emitted signals: {}. Missing: {}".format( - emitted_signals, missing_signals - ) - - @staticmethod - def _format_as_array(list_of_strings): - return "[{}]".format(", ".join(list_of_strings)) - - def _get_order_violation_message(self): - expected_signal_as_str = self._get_signal_string_representation_for_index( - self._signal_expected_index - ) - actual_signal_as_str = str(self._actual_signal_and_args_at_violation) - return ( - "Signal order violated! Expected {expected} as {ordinal} signal, " - "but received {actual} instead. " - ).format( - expected=expected_signal_as_str, - ordinal=get_ordinal_str(self._signal_expected_index + 1), - actual=actual_signal_as_str, - ) - - def _get_missing_signal_indices(self): - return [ - index - for index, value in enumerate(self._signals_emitted) - if not self._signals_emitted[index] - ] - - def _get_signal_string_representation_for_index(self, index): - """Returns something like or (callback: )""" - signal = self._get_signal_for_index(index) - signal_str_repr = self._signal_names[signal] - - if self._check_params_callbacks: - potential_callback = self._check_params_callbacks[index] - if potential_callback: - callback_name = self.get_callback_name(potential_callback) - if callback_name: - signal_str_repr += f" (callback: {callback_name})" - - return signal_str_repr - - def _get_signal_for_index(self, index): - for signal in self._signals_map: - if index in self._signals_map[signal]: - return signal - - def _cleanup(self): - super()._cleanup() - for i in range(len(self._signals)): - signal = self._signals[i] - slot = self._slots[i] - _silent_disconnect(signal, slot) - del self._signals_emitted[:] - self._signals_map.clear() - del self._slots[:] +from typing import TYPE_CHECKING + +from pytestqt.exceptions import SignalEmittedError +from pytestqt.utils import SignalAndArgs as SignalAndArgs +if TYPE_CHECKING: + from pytestqt.wait_signal_impl import ( + SignalBlocker as SignalBlocker, + MultiSignalBlocker as MultiSignalBlocker, + CallbackBlocker as CallbackBlocker, + ) + + +def __getattr__(name: str) -> type: + """Avoid importing wait_signal_impl at the top level as qt_api is uninitialized.""" + from pytestqt.wait_signal_impl import ( + SignalBlocker, + MultiSignalBlocker, + CallbackBlocker, + ) + if name == "SignalBlocker": + return SignalBlocker + elif name == "MultiSignalBlocker": + return MultiSignalBlocker + elif name == "CallbackBlocker": + return CallbackBlocker + else: + raise AttributeError(f"module {__name__} has no attribute {name}") class SignalEmittedSpy: @@ -609,109 +60,3 @@ def assert_not_emitted(self): ) else: raise SignalEmittedError(f"Signal {self.signal!r} unexpectedly emitted") - - -class CallbackBlocker: - """ - .. versionadded:: 3.1 - - An object which checks if the returned callback gets called. - - Intended to be used as a context manager. - - :ivar int timeout: maximum time to wait for the callback to be called. - - :ivar bool raising: - If :class:`qtbot.TimeoutError ` should be raised if - a timeout occurred. - - .. note:: contrary to the parameter of same name in - :meth:`pytestqt.qtbot.QtBot.waitCallback`, this parameter does not - consider the :ref:`qt_default_raising` option. - - :ivar list args: - The arguments with which the callback was called, or None if the - callback wasn't called at all. - - :ivar dict kwargs: - The keyword arguments with which the callback was called, or None if - the callback wasn't called at all. - """ - - def __init__(self, timeout=5000, raising=True): - self.timeout = timeout - self.raising = raising - self.args = None - self.kwargs = None - self.called = False - self._loop = qt_api.QtCore.QEventLoop() - - self._timer = qt_api.QtCore.QTimer(self._loop) - self._timer.setSingleShot(True) - if timeout is not None: - self._timer.setInterval(timeout) - self._timer.timeout.connect(self._quit_loop_by_timeout) - - def wait(self): - """ - Waits until either the returned callback is called or timeout is - reached. - """ - __tracebackhide__ = True - if self.called: - return - if self.timeout is not None: - self._timer.start() - qt_api.exec(self._loop) - if not self.called and self.raising: - raise TimeoutError("Callback wasn't called after %sms." % self.timeout) - - def assert_called_with(self, *args, **kwargs): - """ - Check that the callback was called with the same arguments as this - function. - """ - assert self.called - assert self.args == list(args) - assert self.kwargs == kwargs - - def _quit_loop_by_timeout(self): - try: - self._cleanup() - finally: - self._loop.quit() - - def _cleanup(self): - self._timer.stop() - - def __call__(self, *args, **kwargs): - # Not inside the try: block, as if self.called is True, we did quit the - # loop already. - if self.called: - raise CallbackCalledTwiceError("Callback called twice") - try: - self.args = list(args) - self.kwargs = kwargs - self.called = True - self._cleanup() - finally: - self._loop.quit() - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - __tracebackhide__ = True - if value is None: - # only wait if no exception happened inside the "with" block - self.wait() - - -def _silent_disconnect(signal, slot): - """Disconnects a signal from a slot, ignoring errors. Sometimes - Qt might disconnect a signal automatically for unknown reasons. - """ - try: - signal.disconnect(slot) - except (TypeError, RuntimeError): # pragma: no cover - pass diff --git a/src/pytestqt/wait_signal_impl.py b/src/pytestqt/wait_signal_impl.py new file mode 100644 index 0000000..bbf2204 --- /dev/null +++ b/src/pytestqt/wait_signal_impl.py @@ -0,0 +1,663 @@ +import functools + +from pytestqt.exceptions import TimeoutError, CallbackCalledTwiceError +from pytestqt.utils import SignalAndArgs +from pytestqt.qt_compat import qt_api + + +if not hasattr(qt_api, "QtCore"): + raise ImportError( + "wait_signal_impl.py got imported too early (before qt_api is initialized)! " + "To access the [...]Blocker classes for type annotations, " + "guard the imports with `if TYPE_CHECKING:`. For any other usage, " + "use `from pytestqt import wait_signal` and access `wait_signal.[...]Blocker`." + ) + + +def get_ordinal_str(n: int) -> str: + """Return e.g. "3rd" for 3, or "21st" for 21.""" + return "%d%s" % (n, {1: "st", 2: "nd", 3: "rd"}.get(n if n < 20 else n % 10, "th")) + + +class NoMatchingIndexFoundError(Exception): + pass + + +def _silent_disconnect(signal, slot): + """Disconnects a signal from a slot, ignoring errors. Sometimes + Qt might disconnect a signal automatically for unknown reasons. + """ + try: + signal.disconnect(slot) + except (TypeError, RuntimeError): # pragma: no cover + pass + + +class _AbstractSignalBlocker(qt_api.QtCore.QObject): + """ + Base class for :class:`SignalBlocker` and :class:`MultiSignalBlocker`. + + Provides :meth:`wait` and a context manager protocol, but no means to add + new signals and to detect when the signals should be considered "done". + This needs to be implemented by subclasses. + + Subclasses also need to provide ``self._signals`` which should evaluate to + ``False`` if no signals were configured. + + """ + + def __init__(self, timeout=5000, raising=True): + self._loop = qt_api.QtCore.QEventLoop() + self.timeout = timeout + self.signal_triggered = False + self.raising = raising + self._signals = None # will be initialized by inheriting implementations + self._timeout_message = "" + + self._timer = qt_api.QtCore.QTimer(self._loop) + self._timer.setSingleShot(True) + if timeout is not None: + self._timer.setInterval(timeout) + self._timer.timeout.connect(self._quit_loop_by_timeout) + + def wait(self): + """ + Waits until either a connected signal is triggered or timeout is reached. + + :raise ValueError: if no signals are connected and timeout is None; in + this case it would wait forever. + """ + __tracebackhide__ = True + if self.signal_triggered: + return + if self.timeout is None and not self._signals: + raise ValueError("No signals or timeout specified.") + + if self.timeout != 0: + if self.timeout is not None: + # asserts as a stop-gap for possible multithreading issues + assert not self.signal_triggered + self._timer.start() + assert not self.signal_triggered + qt_api.exec(self._loop) + + if not self.signal_triggered and self.raising: + raise TimeoutError(self._timeout_message) + + def _quit_loop_by_timeout(self): + try: + self._cleanup() + finally: + self._loop.quit() + + def _cleanup(self): + # store timeout message before the data to construct it is lost + self._timeout_message = self._get_timeout_error_message() + self._timer.stop() + + def _get_timeout_error_message(self): + """Subclasses have to implement this, returning an appropriate error message for a TimeoutError.""" + raise NotImplementedError # pragma: no cover + + def _extract_pyqt_signal_name(self, potential_pyqt_signal): + signal_name = potential_pyqt_signal.signal # type: str + if not isinstance(signal_name, str): + raise TypeError( + "Invalid 'signal' attribute in {}. Expected str but got {}".format( + signal_name, type(signal_name) + ) + ) + # strip magic number "2" that PyQt prepends to the signal names + signal_name = signal_name.lstrip("2") + return signal_name + + def _extract_signal_from_signal_tuple(self, potential_signal_tuple): + if isinstance(potential_signal_tuple, tuple): + if len(potential_signal_tuple) != 2: + raise ValueError( + "Signal tuple must have length of 2 (first element is the signal, " + "the second element is the signal's name)." + ) + signal_tuple = potential_signal_tuple + signal_name = signal_tuple[1] + if not isinstance(signal_name, str): + raise TypeError( + "Invalid type for provided signal name, " + "expected str but got {}".format(type(signal_name)) + ) + if not signal_name: + raise ValueError("The provided signal name may not be empty") + return signal_name + return "" + + def determine_signal_name(self, potential_signal_tuple): + """ + Attempts to determine the signal's name. If the user provided the signal name as 2nd value of the tuple, this + name has preference. Bad values cause a ``ValueError``. + Otherwise it attempts to get the signal from the ``signal`` attribute of ``signal`` (which only exists for + PyQt signals). + :returns: str name of the signal, an empty string if no signal name can be determined, or raises an error + in case the user provided an invalid signal name manually + """ + signal_name = self._extract_signal_from_signal_tuple(potential_signal_tuple) + + if not signal_name: + try: + signal_name = self._extract_pyqt_signal_name(potential_signal_tuple) + except AttributeError: + # not a PyQt signal + # -> no signal name could be determined + signal_name = "" + + return signal_name + + def get_callback_name(self, callback): + """Attempts to extract the name of the callback. Returns empty string in case of failure.""" + try: + name = callback.__name__ + except AttributeError: + try: + name = ( + callback.func.__name__ + ) # e.g. for callbacks wrapped with functools.partial() + except AttributeError: + name = "" + return name + + @staticmethod + def get_signal_from_potential_signal_tuple(signal_tuple): + if isinstance(signal_tuple, tuple): + return signal_tuple[0] + return signal_tuple + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + __tracebackhide__ = True + if value is None: + # only wait if no exception happened inside the "with" block + self.wait() + + +class SignalBlocker(_AbstractSignalBlocker): + """ + Returned by :meth:`pytestqt.qtbot.QtBot.waitSignal` method. + + :ivar int timeout: maximum time to wait for a signal to be triggered. Can + be changed before :meth:`wait` is called. + + :ivar bool signal_triggered: set to ``True`` if a signal (or all signals in + case of :class:`MultipleSignalBlocker`) was triggered, or + ``False`` if timeout was reached instead. Until :meth:`wait` is called, + this is set to ``None``. + + :ivar bool raising: + If :class:`qtbot.TimeoutError ` should be raised + if a timeout occurred. + + .. note:: contrary to the parameter of same name in + :meth:`pytestqt.qtbot.QtBot.waitSignal`, this parameter does not + consider the :ref:`qt_default_raising` option. + + :ivar list args: + The arguments which were emitted by the signal, or None if the signal + wasn't emitted at all. + + .. versionadded:: 1.10 + The *args* attribute. + + .. automethod:: wait + .. automethod:: connect + """ + + def __init__(self, timeout=5000, raising=True, check_params_cb=None): + super().__init__(timeout, raising=raising) + self._signals = [] + self.args = None + self.all_args = [] + self.check_params_callback = check_params_cb + self.signal_name = "" + + def connect(self, signal): + """ + Connects to the given signal, making :meth:`wait()` return once + this signal is emitted. + + More than one signal can be connected, in which case **any** one of + them will make ``wait()`` return. + + :param signal: QtCore.Signal or tuple (QtCore.Signal, str) + """ + self.signal_name = self.determine_signal_name(potential_signal_tuple=signal) + actual_signal = self.get_signal_from_potential_signal_tuple(signal) + actual_signal.connect(self._quit_loop_by_signal) + self._signals.append(actual_signal) + + def _quit_loop_by_signal(self, *args): + """ + quits the event loop and marks that we finished because of a signal. + """ + if self.check_params_callback: + self.all_args.append(args) + if not self.check_params_callback(*args): + return # parameter check did not pass + try: + self.signal_triggered = True + self.args = list(args) + self._cleanup() + finally: + self._loop.quit() + + def _cleanup(self): + super()._cleanup() + for signal in self._signals: + _silent_disconnect(signal, self._quit_loop_by_signal) + self._signals = [] + + def get_params_as_str(self): + if not self.all_args: + return "" + + if len(self.all_args[0]) == 1: + # we have a list of tuples with 1 element each (i.e. the signal has 1 parameter), it doesn't make sense + # to return something like "[(someParam,), (someParam,)]", it's just ugly. Instead return something like + # "[someParam, someParam]" + args_list = [arg[0] for arg in self.all_args] + else: + args_list = self.all_args + + return str(args_list) + + def _get_timeout_error_message(self): + if self.check_params_callback is not None: + return ( + "Signal {signal_name} emitted with parameters {params} " + "within {timeout} ms, but did not satisfy " + "the {cb_name} callback" + ).format( + signal_name=self.signal_name, + params=self.get_params_as_str(), + timeout=self.timeout, + cb_name=self.get_callback_name(self.check_params_callback), + ) + else: + return "Signal {signal_name} not emitted after {timeout} ms".format( + signal_name=self.signal_name, timeout=self.timeout + ) + + +class MultiSignalBlocker(_AbstractSignalBlocker): + """ + Returned by :meth:`pytestqt.qtbot.QtBot.waitSignals` method, blocks until + all signals connected to it are triggered or the timeout is reached. + + Variables identical to :class:`SignalBlocker`: + - ``timeout`` + - ``signal_triggered`` + - ``raising`` + + .. automethod:: wait + """ + + def __init__(self, timeout=5000, raising=True, check_params_cbs=None, order="none"): + super().__init__(timeout, raising=raising) + self._order = order + self._check_params_callbacks = check_params_cbs + self._signals_emitted = [] # list of booleans, indicates whether the signal was already emitted + self._signals_map = {} # maps from a unique Signal to a list of indices where to expect signal instance emits + self._signals = [] # list of all Signals (for compatibility with _AbstractSignalBlocker) + self._slots = [] # list of slot functions + self._signal_expected_index = 0 # only used when forcing order + self._strict_order_violated = False + self._actual_signal_and_args_at_violation = None + self._signal_names = {} # maps from the unique Signal to the name of the signal (as string) + self.all_signals_and_args = [] # list of SignalAndArgs instances + + def add_signals(self, signals): + """ + Adds the given signal to the list of signals which :meth:`wait()` waits + for. + + :param list signals: list of QtCore.Signal`s or tuples (QtCore.Signal, str) + """ + self._determine_unique_signals(signals) + self._create_signal_emitted_indices(signals) + self._connect_unique_signals() + + def _get_timeout_error_message(self): + if not self._are_signal_names_available(): + error_message = self._get_degenerate_error_message() + else: + error_message = self._get_expected_and_actual_signals_message() + if self._strict_order_violated: + error_message = self._get_order_violation_message() + error_message + + return error_message + + def _determine_unique_signals(self, signals): + # create a map that maps from a unique signal to a list of indices + # (positions) where this signal is expected (in case order matters) + signals_as_str = [ + str(self.get_signal_from_potential_signal_tuple(signal)) + for signal in signals + ] + # maps from a signal-string to one of the signal instances (the first one found) + signal_str_to_unique_signal = {} + for index, signal_str in enumerate(signals_as_str): + signal = self.get_signal_from_potential_signal_tuple(signals[index]) + potential_tuple = signals[index] + if signal_str not in signal_str_to_unique_signal: + unique_signal_tuple = potential_tuple + signal_str_to_unique_signal[signal_str] = signal + self._signals_map[signal] = [index] # create a new list + else: + # append to existing list + unique_signal = signal_str_to_unique_signal[signal_str] + self._signals_map[unique_signal].append(index) + unique_signal_tuple = signals[index] + + self._determine_and_save_signal_name(unique_signal_tuple) + + def _determine_and_save_signal_name(self, unique_signal_tuple): + signal_name = self.determine_signal_name(unique_signal_tuple) + if signal_name: # might be an empty string if no name could be determined + unique_signal = self.get_signal_from_potential_signal_tuple( + unique_signal_tuple + ) + self._signal_names[unique_signal] = signal_name + + def _create_signal_emitted_indices(self, signals): + for signal in signals: + self._signals_emitted.append(False) + + def _connect_unique_signals(self): + for unique_signal in self._signals_map: + slot = functools.partial(self._unique_signal_emitted, unique_signal) + self._slots.append(slot) + unique_signal.connect(slot) + self._signals.append(unique_signal) + + def _unique_signal_emitted(self, unique_signal, *args): + """ + Called when a given signal is emitted. + + If all expected signals have been emitted, quits the event loop and + marks that we finished because signals. + """ + self._record_emitted_signal_if_possible(unique_signal, *args) + + self._check_signal_match(unique_signal, *args) + + if self._all_signals_emitted(): + self.signal_triggered = True + try: + self._cleanup() + finally: + self._loop.quit() + + def _record_emitted_signal_if_possible(self, unique_signal, *args): + if self._are_signal_names_available(): + self.all_signals_and_args.append( + SignalAndArgs(signal_name=self._signal_names[unique_signal], args=args) + ) + + def _check_signal_match(self, unique_signal, *args): + if self._order == "none": + # perform the test for every matching index (stop after the first one that matches) + try: + successful_index = self._get_first_matching_index(unique_signal, *args) + self._signals_emitted[successful_index] = True + except NoMatchingIndexFoundError: # none found + pass + elif self._order == "simple": + if self._check_signal_matches_expected_index(unique_signal, *args): + self._signals_emitted[self._signal_expected_index] = True + self._signal_expected_index += 1 + else: # self.order == "strict" + if not self._strict_order_violated: + # only do the check if the strict order has not been violated yet + self._strict_order_violated = ( + True # assume the order has been violated this time + ) + if self._check_signal_matches_expected_index(unique_signal, *args): + self._signals_emitted[self._signal_expected_index] = True + self._signal_expected_index += 1 + self._strict_order_violated = ( + False # order has not been violated after all! + ) + else: + if self._are_signal_names_available(): + self._actual_signal_and_args_at_violation = SignalAndArgs( + signal_name=self._signal_names[unique_signal], args=args + ) + + def _all_signals_emitted(self): + return not self._strict_order_violated and all(self._signals_emitted) + + def _get_first_matching_index(self, unique_signal, *args): + successfully_emitted = False + successful_index = -1 + potential_indices = self._get_unemitted_signal_indices(unique_signal) + for potential_index in potential_indices: + if not self._violates_callback_at_index(potential_index, *args): + successful_index = potential_index + successfully_emitted = True + break + if not successfully_emitted: + raise NoMatchingIndexFoundError + + return successful_index + + def _check_signal_matches_expected_index(self, unique_signal, *args): + potential_indices = self._get_unemitted_signal_indices(unique_signal) + if potential_indices: + if self._signal_expected_index == potential_indices[0]: + if not self._violates_callback_at_index( + self._signal_expected_index, *args + ): + return True + return False + + def _violates_callback_at_index(self, index, *args): + """ + Checks if there's a callback at the provided index that is violates due to invalid parameters. Returns False if + there is no callback for that index, or if a callback exists but it wasn't violated (returned True). + Returns True otherwise. + """ + if self._check_params_callbacks: + callback_func = self._check_params_callbacks[index] + if callback_func: + if not callback_func(*args): + return True + return False + + def _get_unemitted_signal_indices(self, signal): + """Returns the indices for the provided signal for which NO signal instance has been emitted yet.""" + return [ + index + for index in self._signals_map[signal] + if not self._signals_emitted[index] + ] + + def _are_signal_names_available(self): + if self._signal_names: + return True + return False + + def _get_degenerate_error_message(self): + received_signals = sum(self._signals_emitted) + total_signals = len(self._signals_emitted) + return ( + "Received {actual} of the {total} expected signals. " + "To improve this error message, provide the names of the signals " + "in the waitSignals() call." + ).format(actual=received_signals, total=total_signals) + + def _get_expected_and_actual_signals_message(self): + if not self.all_signals_and_args: + emitted_signals = "None" + else: + emitted_signal_string_list = [str(_) for _ in self.all_signals_and_args] + emitted_signals = self._format_as_array(emitted_signal_string_list) + + missing_signal_strings = [] + for missing_signal_index in self._get_missing_signal_indices(): + missing_signal_strings.append( + self._get_signal_string_representation_for_index(missing_signal_index) + ) + missing_signals = self._format_as_array(missing_signal_strings) + + return "Emitted signals: {}. Missing: {}".format( + emitted_signals, missing_signals + ) + + @staticmethod + def _format_as_array(list_of_strings): + return "[{}]".format(", ".join(list_of_strings)) + + def _get_order_violation_message(self): + expected_signal_as_str = self._get_signal_string_representation_for_index( + self._signal_expected_index + ) + actual_signal_as_str = str(self._actual_signal_and_args_at_violation) + return ( + "Signal order violated! Expected {expected} as {ordinal} signal, " + "but received {actual} instead. " + ).format( + expected=expected_signal_as_str, + ordinal=get_ordinal_str(self._signal_expected_index + 1), + actual=actual_signal_as_str, + ) + + def _get_missing_signal_indices(self): + return [ + index + for index, value in enumerate(self._signals_emitted) + if not self._signals_emitted[index] + ] + + def _get_signal_string_representation_for_index(self, index): + """Returns something like or (callback: )""" + signal = self._get_signal_for_index(index) + signal_str_repr = self._signal_names[signal] + + if self._check_params_callbacks: + potential_callback = self._check_params_callbacks[index] + if potential_callback: + callback_name = self.get_callback_name(potential_callback) + if callback_name: + signal_str_repr += f" (callback: {callback_name})" + + return signal_str_repr + + def _get_signal_for_index(self, index): + for signal in self._signals_map: + if index in self._signals_map[signal]: + return signal + + def _cleanup(self): + super()._cleanup() + for i in range(len(self._signals)): + signal = self._signals[i] + slot = self._slots[i] + _silent_disconnect(signal, slot) + del self._signals_emitted[:] + self._signals_map.clear() + del self._slots[:] + + +class CallbackBlocker: + """ + .. versionadded:: 3.1 + + An object which checks if the returned callback gets called. + + Intended to be used as a context manager. + + :ivar int timeout: maximum time to wait for the callback to be called. + + :ivar bool raising: + If :class:`qtbot.TimeoutError ` should be raised if + a timeout occurred. + + .. note:: contrary to the parameter of same name in + :meth:`pytestqt.qtbot.QtBot.waitCallback`, this parameter does not + consider the :ref:`qt_default_raising` option. + + :ivar list args: + The arguments with which the callback was called, or None if the + callback wasn't called at all. + + :ivar dict kwargs: + The keyword arguments with which the callback was called, or None if + the callback wasn't called at all. + """ + + def __init__(self, timeout=5000, raising=True): + self.timeout = timeout + self.raising = raising + self.args = None + self.kwargs = None + self.called = False + self._loop = qt_api.QtCore.QEventLoop() + + self._timer = qt_api.QtCore.QTimer(self._loop) + self._timer.setSingleShot(True) + if timeout is not None: + self._timer.setInterval(timeout) + self._timer.timeout.connect(self._quit_loop_by_timeout) + + def wait(self): + """ + Waits until either the returned callback is called or timeout is + reached. + """ + __tracebackhide__ = True + if self.called: + return + if self.timeout is not None: + self._timer.start() + qt_api.exec(self._loop) + if not self.called and self.raising: + raise TimeoutError("Callback wasn't called after %sms." % self.timeout) + + def assert_called_with(self, *args, **kwargs): + """ + Check that the callback was called with the same arguments as this + function. + """ + assert self.called + assert self.args == list(args) + assert self.kwargs == kwargs + + def _quit_loop_by_timeout(self): + try: + self._cleanup() + finally: + self._loop.quit() + + def _cleanup(self): + self._timer.stop() + + def __call__(self, *args, **kwargs): + # Not inside the try: block, as if self.called is True, we did quit the + # loop already. + if self.called: + raise CallbackCalledTwiceError("Callback called twice") + try: + self.args = list(args) + self.kwargs = kwargs + self.called = True + self._cleanup() + finally: + self._loop.quit() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + __tracebackhide__ = True + if value is None: + # only wait if no exception happened inside the "with" block + self.wait() From dd0f762680a2edad10cde67579a35bf1987803e1 Mon Sep 17 00:00:00 2001 From: Florian Bruhin Date: Mon, 24 Mar 2025 01:00:53 +0100 Subject: [PATCH 7/9] WIP: Inherit signal blockers from QObject --- src/pytestqt/wait_signal_impl.py | 15 +++++++++--- tests/test_wait_signal.py | 42 +++++++++++++++++--------------- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/src/pytestqt/wait_signal_impl.py b/src/pytestqt/wait_signal_impl.py index bbf2204..65abb7b 100644 --- a/src/pytestqt/wait_signal_impl.py +++ b/src/pytestqt/wait_signal_impl.py @@ -47,6 +47,7 @@ class _AbstractSignalBlocker(qt_api.QtCore.QObject): """ def __init__(self, timeout=5000, raising=True): + super().__init__() self._loop = qt_api.QtCore.QEventLoop() self.timeout = timeout self.signal_triggered = False @@ -54,7 +55,7 @@ def __init__(self, timeout=5000, raising=True): self._signals = None # will be initialized by inheriting implementations self._timeout_message = "" - self._timer = qt_api.QtCore.QTimer(self._loop) + self._timer = qt_api.QtCore.QTimer(self) self._timer.setSingleShot(True) if timeout is not None: self._timer.setInterval(timeout) @@ -84,6 +85,7 @@ def wait(self): if not self.signal_triggered and self.raising: raise TimeoutError(self._timeout_message) + @qt_api.Slot() def _quit_loop_by_timeout(self): try: self._cleanup() @@ -91,6 +93,7 @@ def _quit_loop_by_timeout(self): self._loop.quit() def _cleanup(self): + # assert self._timer.thread() == qt_api.QtCore.QThread.currentThread() # store timeout message before the data to construct it is lost self._timeout_message = self._get_timeout_error_message() self._timer.stop() @@ -234,6 +237,7 @@ def connect(self, signal): actual_signal.connect(self._quit_loop_by_signal) self._signals.append(actual_signal) + @qt_api.Slot() def _quit_loop_by_signal(self, *args): """ quits the event loop and marks that we finished because of a signal. @@ -251,6 +255,8 @@ def _quit_loop_by_signal(self, *args): def _cleanup(self): super()._cleanup() + # FIXME move to _AbstractSignalBlocker once we got MultiSignalBlocker correct + assert self._timer.thread() == qt_api.QtCore.QThread.currentThread() for signal in self._signals: _silent_disconnect(signal, self._quit_loop_by_signal) self._signals = [] @@ -567,7 +573,7 @@ def _cleanup(self): del self._slots[:] -class CallbackBlocker: +class CallbackBlocker(qt_api.QtCore.QObject): """ .. versionadded:: 3.1 @@ -595,6 +601,7 @@ class CallbackBlocker: """ def __init__(self, timeout=5000, raising=True): + super().__init__() self.timeout = timeout self.raising = raising self.args = None @@ -602,7 +609,7 @@ def __init__(self, timeout=5000, raising=True): self.called = False self._loop = qt_api.QtCore.QEventLoop() - self._timer = qt_api.QtCore.QTimer(self._loop) + self._timer = qt_api.QtCore.QTimer(self) self._timer.setSingleShot(True) if timeout is not None: self._timer.setInterval(timeout) @@ -631,6 +638,7 @@ def assert_called_with(self, *args, **kwargs): assert self.args == list(args) assert self.kwargs == kwargs + @qt_api.Slot() def _quit_loop_by_timeout(self): try: self._cleanup() @@ -638,6 +646,7 @@ def _quit_loop_by_timeout(self): self._loop.quit() def _cleanup(self): + assert self._timer.thread() == qt_api.QtCore.QThread.currentThread() self._timer.stop() def __call__(self, *args, **kwargs): diff --git a/tests/test_wait_signal.py b/tests/test_wait_signal.py index 60f0ffa..8b9dde3 100644 --- a/tests/test_wait_signal.py +++ b/tests/test_wait_signal.py @@ -1367,17 +1367,17 @@ def test_timeout_not_raising(self, qtbot): @pytest.mark.parametrize( - "check_stderr, count", + "check_warnings, count", [ - # Checking stderr messages + # Checking for warnings pytest.param( - True, # check stderr + True, # check warnings 200, # gets output reliably even with only few runs (often the first) id="stderr", ), # Triggering AttributeError pytest.param( - False, # don't check stderr + False, # don't check warnings # Hopefully enough to trigger the AttributeError race condition reliably. # With 500 runs, only 1 of 5 Windows PySide6 CI jobs triggered it (but all # Ubuntu/macOS jobs did). With 1500 runs, Windows jobs still only triggered @@ -1392,7 +1392,11 @@ def test_timeout_not_raising(self, qtbot): ) @pytest.mark.parametrize("multi_blocker", [True, False]) def test_signal_raised_from_thread( - pytester: pytest.Pytester, check_stderr: bool, multi_blocker: bool, count: int + monkeypatch: pytest.MonkeyPatch, + pytester: pytest.Pytester, + check_warnings: bool, + multi_blocker: bool, + count: int, ) -> None: """Wait for a signal with a thread. @@ -1409,7 +1413,7 @@ class Worker(qt_api.QtCore.QObject): @pytest.mark.parametrize("_", range({count})) - def test_thread(qtbot, capfd, _): + def test_thread(qtbot, _): worker = Worker() thread = qt_api.QtCore.QThread() worker.moveToThread(thread) @@ -1425,25 +1429,23 @@ def test_thread(qtbot, capfd, _): finally: thread.quit() thread.wait() - - if {check_stderr}: # check_stderr - out, err = capfd.readouterr() - assert not err """ ) - + if check_warnings: + monkeypatch.setenv("QT_FATAL_WARNINGS", "1") res = pytester.runpytest_subprocess("-x", "-s") - outcomes = res.parseoutcomes() - if outcomes.get("failed", 0) and check_stderr and qt_api.pytest_qt_api == "pyside6": - # The test succeeds on PyQt (unsure why!), but we can't check - # qt_api.pytest_qt_api at import time, so we can't use - # pytest.mark.xfail conditionally. - pytest.xfail( - "Qt error: QObject::killTimer: " - "Timers cannot be stopped from another thread" - ) + qtimer_message = "QObject::killTimer: Timers cannot be stopped from another thread" + if ( + qtimer_message in res.stderr.str() + and multi_blocker + and check_warnings + and qt_api.pytest_qt_api == "pyside6" + ): + # We haven't fixed MultiSignalBlocker yet... + pytest.xfail(f"Qt error: {qtimer_message}") + outcomes = res.parseoutcomes() res.assert_outcomes(passed=outcomes["passed"]) # no failed/error From 32ef058d4c30ccb69b098ef0f53739a325b47e2b Mon Sep 17 00:00:00 2001 From: Florian Bruhin Date: Mon, 24 Mar 2025 01:05:33 +0100 Subject: [PATCH 8/9] Fix import --- tests/test_wait_signal.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_wait_signal.py b/tests/test_wait_signal.py index 8b9dde3..3295db3 100644 --- a/tests/test_wait_signal.py +++ b/tests/test_wait_signal.py @@ -5,7 +5,7 @@ import sys from pytestqt.qt_compat import qt_api -from pytestqt.wait_signal import SignalAndArgs +from pytestqt.utils import SignalAndArgs from pytestqt.exceptions import ( SignalEmittedError, TimeoutError, From f655c03260dd511e7a9088ca5fc32ccb91d64884 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 24 Mar 2025 00:07:39 +0000 Subject: [PATCH 9/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/pytestqt/exceptions.py | 1 - src/pytestqt/wait_signal.py | 3 ++- src/pytestqt/wait_signal_impl.py | 16 ++++++++++++---- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/pytestqt/exceptions.py b/src/pytestqt/exceptions.py index 051ca47..a02567a 100644 --- a/src/pytestqt/exceptions.py +++ b/src/pytestqt/exceptions.py @@ -138,4 +138,3 @@ class CallbackCalledTwiceError(Exception): Access via ``qtbot.CallbackCalledTwiceError``. """ - diff --git a/src/pytestqt/wait_signal.py b/src/pytestqt/wait_signal.py index c19c707..d5b3c9a 100644 --- a/src/pytestqt/wait_signal.py +++ b/src/pytestqt/wait_signal.py @@ -1,7 +1,7 @@ from typing import TYPE_CHECKING from pytestqt.exceptions import SignalEmittedError -from pytestqt.utils import SignalAndArgs as SignalAndArgs + if TYPE_CHECKING: from pytestqt.wait_signal_impl import ( SignalBlocker as SignalBlocker, @@ -17,6 +17,7 @@ def __getattr__(name: str) -> type: MultiSignalBlocker, CallbackBlocker, ) + if name == "SignalBlocker": return SignalBlocker elif name == "MultiSignalBlocker": diff --git a/src/pytestqt/wait_signal_impl.py b/src/pytestqt/wait_signal_impl.py index 65abb7b..1b59161 100644 --- a/src/pytestqt/wait_signal_impl.py +++ b/src/pytestqt/wait_signal_impl.py @@ -310,14 +310,22 @@ def __init__(self, timeout=5000, raising=True, check_params_cbs=None, order="non super().__init__(timeout, raising=raising) self._order = order self._check_params_callbacks = check_params_cbs - self._signals_emitted = [] # list of booleans, indicates whether the signal was already emitted - self._signals_map = {} # maps from a unique Signal to a list of indices where to expect signal instance emits - self._signals = [] # list of all Signals (for compatibility with _AbstractSignalBlocker) + self._signals_emitted = ( + [] + ) # list of booleans, indicates whether the signal was already emitted + self._signals_map = ( + {} + ) # maps from a unique Signal to a list of indices where to expect signal instance emits + self._signals = ( + [] + ) # list of all Signals (for compatibility with _AbstractSignalBlocker) self._slots = [] # list of slot functions self._signal_expected_index = 0 # only used when forcing order self._strict_order_violated = False self._actual_signal_and_args_at_violation = None - self._signal_names = {} # maps from the unique Signal to the name of the signal (as string) + self._signal_names = ( + {} + ) # maps from the unique Signal to the name of the signal (as string) self.all_signals_and_args = [] # list of SignalAndArgs instances def add_signals(self, signals):