From 9a9a4764a977dfad12b825b68882bf659f3a61fa Mon Sep 17 00:00:00 2001 From: Allison Karlitskaya Date: Thu, 1 Feb 2024 21:48:58 +0100 Subject: [PATCH 1/3] test: use pytest.raise() instead of unittest ruff started to complain about this in the venv tests. --- test/test_basic.py | 40 ++++++++++++++++++++++------------------ test/test_p2p.py | 9 ++++----- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/test/test_basic.py b/test/test_basic.py index 37d6b98..166596c 100644 --- a/test/test_basic.py +++ b/test/test_basic.py @@ -20,6 +20,7 @@ import unittest import dbusmock # type: ignore[import] # not typed +import pytest import systemd_ctypes from systemd_ctypes import bus, introspection @@ -141,13 +142,13 @@ def test_int_async(self): def test_int_error(self): # int overflow self.add_method('', 'Inc', 'i', 'i', 'ret = args[0] + 1') - with self.assertRaisesRegex(systemd_ctypes.BusError, 'OverflowError'): + with pytest.raises(systemd_ctypes.BusError, match='OverflowError'): self.bus_user.call_method(*TEST_ADDR, 'Inc', 'i', 0x7FFFFFFF) # uint underflow self.add_method('', 'Dec', 'u', 'u', 'ret = args[0] - 1') - with self.assertRaisesRegex(systemd_ctypes.BusError, - "OverflowError: can't convert negative value to unsigned int"): + with pytest.raises(systemd_ctypes.BusError, + match="OverflowError: can't convert negative value to unsigned int"): self.bus_user.call_method(*TEST_ADDR, 'Dec', 'u', 0) def test_float(self): @@ -213,35 +214,35 @@ def test_base64_binary_decode(self): self.assertEqual(result, ['R8OkbnNlZsO8w59jaGVu']) def test_unknown_method_sync(self): - with self.assertRaisesRegex(systemd_ctypes.BusError, '.*org.freedesktop.DBus.Error.UnknownMethod:.*' - 'Do is not a valid method of interface org.freedesktop.Test.Main'): + with pytest.raises(systemd_ctypes.BusError, match='.*org.freedesktop.DBus.Error.UnknownMethod:.*' + 'Do is not a valid method of interface org.freedesktop.Test.Main'): self.bus_user.call_method(*TEST_ADDR, 'Do') def test_unknown_method_async(self): message = self.bus_user.message_new_method_call(*TEST_ADDR, 'Do') - with self.assertRaisesRegex(systemd_ctypes.BusError, '.*org.freedesktop.DBus.Error.UnknownMethod:.*' - 'Do is not a valid method of interface org.freedesktop.Test.Main'): + with pytest.raises(systemd_ctypes.BusError, match='.*org.freedesktop.DBus.Error.UnknownMethod:.*' + 'Do is not a valid method of interface org.freedesktop.Test.Main'): self.async_call(message).get_body() def test_call_signature_mismatch(self): self.add_method('', 'Inc', 'i', 'i', 'ret = args[0] + 1') # specified signature does not match server, but locally consistent args - with self.assertRaisesRegex(systemd_ctypes.BusError, - '(InvalidArgs|TypeError).*Fewer items.*signature.*arguments'): + with pytest.raises(systemd_ctypes.BusError, + match='(InvalidArgs|TypeError).*Fewer items.*signature.*arguments'): self.bus_user.call_method(*TEST_ADDR, 'Inc', 'ii', 1, 2) - with self.assertRaisesRegex(systemd_ctypes.BusError, 'InvalidArgs|TypeError'): + with pytest.raises(systemd_ctypes.BusError, match='InvalidArgs|TypeError'): self.bus_user.call_method(*TEST_ADDR, 'Inc', 's', 'hello.*dbus.String.*integer') # specified signature does not match arguments - with self.assertRaisesRegex(AssertionError, r'call args \(1, 2\) have different length than signature.*'): + with pytest.raises(AssertionError, match=r'call args \(1, 2\) have different length than signature.*'): self.bus_user.call_method(*TEST_ADDR, 'Inc', 'i', 1, 2) - with self.assertRaisesRegex(TypeError, r'.*str.* as.* integer|int.*str'): + with pytest.raises(TypeError, match=r'.*str.* as.* integer|int.*str'): self.bus_user.call_method(*TEST_ADDR, 'Inc', 'i', 'hello') def test_custom_error(self): self.add_method('', 'Boom', '', '', 'raise dbus.exceptions.DBusException("no good", name="com.example.Error.NoGood")') - with self.assertRaisesRegex(systemd_ctypes.BusError, 'no good'): + with pytest.raises(systemd_ctypes.BusError, match='no good'): self.bus_user.call_method(*TEST_ADDR, 'Boom') def test_introspect(self): @@ -283,17 +284,20 @@ def test_service_replace(self): def test_request_name_errors(self): # name already exists - self.assertRaises(FileExistsError, self.bus_user.request_name, TEST_ADDR[0], bus.Bus.NameFlags.DEFAULT) + with pytest.raises(FileExistsError): + self.bus_user.request_name(TEST_ADDR[0], bus.Bus.NameFlags.DEFAULT) # invalid name - self.assertRaisesRegex(OSError, '.*Invalid argument', - self.bus_user.request_name, '', bus.Bus.NameFlags.DEFAULT) + with pytest.raises(OSError, match='.*Invalid argument'): + self.bus_user.request_name('', bus.Bus.NameFlags.DEFAULT) # invalid flag - self.assertRaisesRegex(OSError, '.*Invalid argument', self.bus_user.request_name, TEST_ADDR[0], 0xFF) + with pytest.raises(OSError, match='.*Invalid argument'): + self.bus_user.request_name(TEST_ADDR[0], 0xFF) # name not taken - self.assertRaises(ProcessLookupError, self.bus_user.release_name, 'com.example.NotThis') + with pytest.raises(ProcessLookupError): + self.bus_user.release_name('com.example.NotThis') if __name__ == '__main__': diff --git a/test/test_p2p.py b/test/test_p2p.py index 1641390..1697541 100644 --- a/test/test_p2p.py +++ b/test/test_p2p.py @@ -179,13 +179,13 @@ async def test(): def test_method_throws(self): async def test(): - with self.assertRaisesRegex(BusError, 'cockpit.Error.ZeroDivisionError: Divide by zero'): + with pytest.raises(BusError, match='cockpit.Error.ZeroDivisionError: Divide by zero'): await self.client.call_method_async(None, '/test', 'cockpit.Test', 'Divide', 'ii', 1554, 0) run_async(test()) def test_method_throws_oserror(self): async def test(): - with self.assertRaisesRegex(BusError, 'org.freedesktop.DBus.Error.FileNotFound: .*notthere.*'): + with pytest.raises(BusError, match='org.freedesktop.DBus.Error.FileNotFound: .*notthere.*'): await self.client.call_method_async(None, '/test', 'cockpit.Test', 'ReadFile', 's', 'notthere') run_async(test()) @@ -206,7 +206,7 @@ async def test(): def test_async_method_throws(self): async def test(): - with self.assertRaisesRegex(BusError, 'cockpit.Error.ZeroDivisionError: Divide by zero'): + with pytest.raises(BusError, match='cockpit.Error.ZeroDivisionError: Divide by zero'): await self.client.call_method_async(None, '/test', 'cockpit.Test', 'DivideSlowly', 'ii', 1554, 0) run_async(test()) @@ -249,8 +249,7 @@ async def test(): # Make sure that dropping the slot results in the object being un-exported self.test_object_slot = None - with self.assertRaisesRegex( - BusError, "org.freedesktop.DBus.Error.UnknownObject: Unknown object '/test'."): + with pytest.raises(BusError, match="org.freedesktop.DBus.Error.UnknownObject: Unknown object '/test'."): await self.client.call_method_async(None, '/test', 'cockpit.Test', 'Divide', 'ii', 1554, 37) run_async(test()) From 3b44156fcf37a8fbcd6e451243315a6bfca69ee6 Mon Sep 17 00:00:00 2001 From: Allison Karlitskaya Date: Thu, 1 Feb 2024 10:29:32 +0100 Subject: [PATCH 2/3] librarywrapper: remove some old debugging code --- src/systemd_ctypes/librarywrapper.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/systemd_ctypes/librarywrapper.py b/src/systemd_ctypes/librarywrapper.py index a2962ed..12605e2 100644 --- a/src/systemd_ctypes/librarywrapper.py +++ b/src/systemd_ctypes/librarywrapper.py @@ -152,9 +152,6 @@ class ReferenceType(ctypes.c_void_p): def _install_cfuncs(cls, cdll: ctypes.CDLL) -> None: logger.debug('Installing stubs for %s:', cls) stubs = tuple(cls.__dict__.items()) - if cls.__name__ == 'sd_bus': - assert True, stubs - for name, stub in stubs: if name.startswith("__"): continue From fbb8e570ffaf3ede768dbfaa584d10018332a6e0 Mon Sep 17 00:00:00 2001 From: Allison Karlitskaya Date: Thu, 1 Feb 2024 21:29:22 +0100 Subject: [PATCH 3/3] Be careful about freeing callback trampolines Our approach to handling Source and Slot objects is fairly clever: we tie the call trampoline and closure to the same object that holds a reference to the source object on the C side. When we are about to `__del__()` that object, we unref the source, preventing any further events from being dispatched. In this way, we can be completely sure that systemd will never call our trampoline after it's been freed. Unfortunately, this isn't good enough: we have a lot of cases where we free a Source while it is currently being dispatched. Until now we've never noticed a problem, but Cockpit recently added a stress-test for inotify (`test_fsinfo_watch_identity_changes`) which dispatches thousand of events and runs long enough that garbage collection gets invoked, freeing trampolines while they are currently running. Python does not hold a reference to the data, and this causes crashes on some architectures. Let's give Source and Slot a common base class (Trampoline) that models their common behaviour. This helper class also changes the `__del__()` behaviour: in case some external caller has requested deferral of the destruction of trampolines, we add them to a list just before we get deleted, to prevent the FFI wrapper from being destroyed with us. We know that the problem described above is only a problem if we're dispatching from systemd's event loop, so setup deferral on entry to the loop and drop the deferred objects on exit. Closes #63 --- src/systemd_ctypes/bus.py | 13 ++++--------- src/systemd_ctypes/event.py | 21 +++++++++++++++------ src/systemd_ctypes/libsystemd.py | 25 ++++++++++++++++++++++--- 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/src/systemd_ctypes/bus.py b/src/systemd_ctypes/bus.py index d992c4d..35a524d 100644 --- a/src/systemd_ctypes/bus.py +++ b/src/systemd_ctypes/bus.py @@ -221,12 +221,7 @@ class Slot(libsystemd.sd_bus_slot): def __init__(self, callback: Callable[[BusMessage], bool]): def handler(message: WeakReference, _data: object, _err: object) -> int: return 1 if callback(BusMessage.ref(message)) else 0 - self.callback = libsystemd.sd_bus_message_handler_t(handler) - self.userdata = None - - def cancel(self) -> None: - self._unref() - self.value = None + self.trampoline = libsystemd.sd_bus_message_handler_t(handler) if typing.TYPE_CHECKING: @@ -363,7 +358,7 @@ async def call_async( timeout: Optional[int] = None ) -> BusMessage: pending = PendingCall() - self._call_async(byref(pending), message, pending.callback, pending.userdata, timeout or 0) + self._call_async(byref(pending), message, pending.trampoline, pending.userdata, timeout or 0) return await pending.future async def call_method_async( @@ -384,12 +379,12 @@ async def call_method_async( def add_match(self, rule: str, handler: Callable[[BusMessage], bool]) -> Slot: slot = Slot(handler) - self._add_match(byref(slot), rule, slot.callback, slot.userdata) + self._add_match(byref(slot), rule, slot.trampoline, slot.userdata) return slot def add_object(self, path: str, obj: 'BaseObject') -> Slot: slot = Slot(obj.message_received) - self._add_object(byref(slot), path, slot.callback, slot.userdata) + self._add_object(byref(slot), path, slot.trampoline, slot.userdata) obj.registered_on_bus(self, path) return slot diff --git a/src/systemd_ctypes/event.py b/src/systemd_ctypes/event.py index 0cafed7..885c625 100644 --- a/src/systemd_ctypes/event.py +++ b/src/systemd_ctypes/event.py @@ -21,14 +21,11 @@ from typing import Callable, ClassVar, Coroutine, List, Optional, Tuple from . import inotify, libsystemd -from .librarywrapper import Callback, Reference, UserData, byref +from .librarywrapper import Reference, UserData, byref class Event(libsystemd.sd_event): class Source(libsystemd.sd_event_source): - callback: Callback - userdata: UserData = None - def cancel(self) -> None: self._unref() self.value = None @@ -52,11 +49,11 @@ def callback(source: libsystemd.sd_event_source, event = _event.contents handler(inotify.Event(event.mask), event.cookie, event.name) return 0 - self.callback = libsystemd.sd_event_inotify_handler_t(callback) + self.trampoline = libsystemd.sd_event_inotify_handler_t(callback) def add_inotify(self, path: str, mask: inotify.Event, handler: InotifyHandler) -> InotifySource: source = Event.InotifySource(handler) - self._add_inotify(byref(source), path, mask, source.callback, source.userdata) + self._add_inotify(byref(source), path, mask, source.trampoline, source.userdata) return source def add_inotify_fd(self, fd: int, mask: inotify.Event, handler: InotifyHandler) -> InotifySource: @@ -78,6 +75,14 @@ def __init__(self, event: Optional[Event] = None) -> None: def select( self, timeout: Optional[float] = None ) -> List[Tuple[selectors.SelectorKey, int]]: + # It's common to drop the last reference to a Source or Slot object on + # a dispatch of that same source/slot from the main loop. If we happen + # to garbage collect before returning, the trampoline could be + # destroyed before we're done using it. Provide a mechanism to defer + # the destruction of trampolines for as long as we might be + # dispatching. This gets cleared again at the bottom, before return. + libsystemd.Trampoline.deferred = [] + while self.sd_event.prepare(): self.sd_event.dispatch() ready = super().select(timeout) @@ -87,6 +92,10 @@ def select( self.sd_event.dispatch() while self.sd_event.prepare(): self.sd_event.dispatch() + + # We can be sure we're not dispatching callbacks anymore + libsystemd.Trampoline.deferred = None + # This could return zero events with infinite timeout, but nobody seems to mind. return [(key, events) for (key, events) in ready if key != self.key] diff --git a/src/systemd_ctypes/libsystemd.py b/src/systemd_ctypes/libsystemd.py index 6cbe7db..7d2d636 100644 --- a/src/systemd_ctypes/libsystemd.py +++ b/src/systemd_ctypes/libsystemd.py @@ -18,7 +18,7 @@ import ctypes import os import sys -from typing import List, Optional, Tuple, Union +from typing import ClassVar, List, Optional, Tuple, Union from .inotify import inotify_event from .librarywrapper import ( @@ -33,6 +33,25 @@ from .typing import Annotated +class Trampoline(ReferenceType): + deferred: 'ClassVar[list[Callback] | None]' = None + trampoline: Callback + userdata: UserData = None + + def cancel(self) -> None: + self._unref() + self.value = None + + def __del__(self) -> None: + # This might be the currently-dispatching callback — make sure we don't + # destroy the trampoline before we return. We drop the deferred list + # from the event loop when we're sure we're not doing any dispatches. + if Trampoline.deferred is not None: + Trampoline.deferred.append(self.trampoline) + if self.value is not None: + self._unref() + + class sd_bus_error(ctypes.Structure): # This is ABI, so we are safe to assume it doesn't change. # Unfortunately, we lack anything like sd_bus_error_new(). @@ -65,7 +84,7 @@ class sd_id128(ctypes.Structure): ) -class sd_event_source(ReferenceType): +class sd_event_source(Trampoline): ... @@ -105,7 +124,7 @@ def _default(ret: Reference['sd_event']) -> Union[None, Errno]: ... -class sd_bus_slot(ReferenceType): +class sd_bus_slot(Trampoline): ...