diff --git a/inputremapper/injection/mapping_handlers/combination_handler.py b/inputremapper/injection/mapping_handlers/combination_handler.py index 52429e251..320926234 100644 --- a/inputremapper/injection/mapping_handlers/combination_handler.py +++ b/inputremapper/injection/mapping_handlers/combination_handler.py @@ -19,7 +19,7 @@ from __future__ import annotations # needed for the TYPE_CHECKING import -from typing import TYPE_CHECKING, Dict, Hashable +from typing import TYPE_CHECKING, Dict, Hashable, Tuple import evdev from evdev.ecodes import EV_ABS, EV_REL @@ -44,9 +44,12 @@ class CombinationHandler(MappingHandler): # map of InputEvent.input_match_hash -> bool , keep track of the combination state _pressed_keys: Dict[Hashable, bool] - _output_state: bool # the last update we sent to a sub-handler + # the last update we sent to a sub-handler. If this is true, the output key is + # still being held down. + _output_previously_active: bool _sub_handler: InputEventHandler _handled_input_hashes: list[Hashable] + _requires_a_release: Dict[Tuple[int, int], bool] def __init__( self, @@ -59,8 +62,9 @@ def __init__( logger.debug(str(mapping)) super().__init__(combination, mapping, global_uinputs) self._pressed_keys = {} - self._output_state = False + self._output_previously_active = False self._context = context + self._requires_a_release = {} # prepare a key map for all events with non-zero value for input_config in combination: @@ -101,58 +105,118 @@ def notify( # we are not responsible for the event return False - was_activated = self.is_activated() - # update the state # The value of non-key input should have been changed to either 0 or 1 at this # point by other handlers. is_pressed = event.value == 1 self._pressed_keys[event.input_match_hash] = is_pressed # maybe this changes the activation status (triggered/not-triggered) - is_activated = self.is_activated() + changed = self._is_activated() != self._output_previously_active - if is_activated == was_activated or is_activated == self._output_state: - # nothing changed - if self._output_state: - # combination is active, consume the event - return True + if changed: + if is_pressed: + return self._handle_freshly_activated(suppress, event, source) else: - # combination inactive, forward the event - return False - - if is_activated: - # send key up events to the forwarded uinput - self.forward_release() - event = event.modify(value=1) + return self._handle_freshly_deactivated(event, source) else: - if self._output_state or self.mapping.is_axis_mapping(): - # we ignore the suppress argument for release events - # otherwise we might end up with stuck keys - # (test_event_pipeline.test_combination) - - # we also ignore it if the mapping specifies an output axis - # this will enable us to activate multiple axis with the same button - suppress = False - event = event.modify(value=0) + if is_pressed: + return self._handle_no_change_press(event) + else: + return self._handle_no_change_release(event) + def _handle_no_change_press(self, event: InputEvent) -> bool: + """A key was pressed, but this doesn't change the combinations activation state. + Can only happen if either the combination wasn't already active, or a duplicate + key-down event arrived (EV_ABS?) + """ + # self._output_previously_active is negated, because if the output is active, a + # key-down event triggered it, which then did not get forwarded, therefore + # it doesn't require a release. + self._require_release_later(not self._output_previously_active, event) + # output is active: consume the event + # output inactive: forward the event + return self._output_previously_active + + def _handle_no_change_release(self, event: InputEvent) -> bool: + """One of the combinations keys was released, but it didn't untrigger the + combination yet.""" + # Negate: `False` means that the event-reader will forward the release. + return not self._should_release_event(event) + + def _handle_freshly_activated( + self, + suppress: bool, + event: InputEvent, + source: evdev.InputDevice, + ) -> bool: + """The combination was deactivated, but is activated now.""" if suppress: return False + # Send key up events to the forwarded uinput if configured to do so. + self._forward_release() + logger.debug("Sending %s to sub-handler", self.mapping.input_combination) - self._output_state = bool(event.value) - return self._sub_handler.notify(event, source, suppress) + self._output_previously_active = bool(event.value) + sub_handler_result = self._sub_handler.notify(event, source, suppress) + + # Only if the sub-handler return False, we need a release-event later. + # If it handled the event, the user never sees this key-down event. + self._require_release_later(not sub_handler_result, event) + return sub_handler_result + + def _handle_freshly_deactivated( + self, + event: InputEvent, + source: evdev.InputDevice, + ) -> bool: + """The combination was activated, but is deactivated now.""" + # We ignore the `suppress` argument for release events. Otherwise, we + # might end up with stuck keys (test_event_pipeline.test_combination). + # In the case of output axis, this will enable us to activate multiple + # axis with the same button. + + logger.debug("Sending %s to sub-handler", self.mapping.input_combination) + self._output_previously_active = bool(event.value) + self._sub_handler.notify(event, source, suppress=False) + + # Negate: `False` means that the event-reader will forward the release. + return not self._should_release_event(event) + + def _should_release_event(self, event: InputEvent) -> bool: + """Check if the key-up event should be forwarded by the event-reader. + + After this, the release event needs to be injected by someone, otherwise the + dictionary was modified erroneously. If there is no entry, we assume that there + was no key-down event to release. Maybe a duplicate event arrived. + """ + # Ensure that all injected key-down events will get their release event + # injected eventually. + # If a key-up event arrives that will inactivate the combination, but + # for which previously a key-down event was injected (because it was + # an earlier key in the combination chain), then we need to ensure that its + # release is injected as well. So we get two release events in that case: + # one for the key, and one for the output. + assert event.value == 0 + return self._requires_a_release.pop(event.type_and_code, False) + + def _require_release_later(self, require: bool, event: InputEvent) -> None: + """Remember if this key-down event will need a release event later on.""" + assert event.value == 1 + self._requires_a_release[event.type_and_code] = require def reset(self) -> None: self._sub_handler.reset() for key in self._pressed_keys: self._pressed_keys[key] = False - self._output_state = False + self._requires_a_release = {} + self._output_previously_active = False - def is_activated(self) -> bool: + def _is_activated(self) -> bool: """Return if all keys in the keymap are set to True.""" return False not in self._pressed_keys.values() - def forward_release(self) -> None: + def _forward_release(self) -> None: """Forward a button release for all keys if this is a combination. This might cause duplicate key-up events but those are ignored by evdev anyway @@ -168,6 +232,9 @@ def forward_release(self) -> None: logger.debug("Forwarding release for %s", self.mapping.input_combination) for input_config in keys_to_release: + if not self._requires_a_release.get(input_config.type_and_code): + continue + origin_hash = input_config.origin_hash if origin_hash is None: logger.error( @@ -180,6 +247,9 @@ def forward_release(self) -> None: forward_to.write(*input_config.type_and_code, 0) forward_to.syn() + # We are done with this key, forget about it + del self._requires_a_release[input_config.type_and_code] + def needs_ranking(self) -> bool: return bool(self.input_configs) diff --git a/inputremapper/injection/mapping_handlers/key_handler.py b/inputremapper/injection/mapping_handlers/key_handler.py index 871128506..abe0bc299 100644 --- a/inputremapper/injection/mapping_handlers/key_handler.py +++ b/inputremapper/injection/mapping_handlers/key_handler.py @@ -69,7 +69,6 @@ def child(self): # used for logging def notify(self, event: InputEvent, *_, **__) -> bool: """Inject event.value to the target key.""" - event_tuple = (*self._maps_to, event.value) try: self.global_uinputs.write(event_tuple, self.mapping.target_uinput) diff --git a/tests/unit/test_event_pipeline/test_event_pipeline.py b/tests/unit/test_event_pipeline/test_event_pipeline.py index bd542d9e5..fbbeb0fe4 100644 --- a/tests/unit/test_event_pipeline/test_event_pipeline.py +++ b/tests/unit/test_event_pipeline/test_event_pipeline.py @@ -43,7 +43,9 @@ ABS_HAT0Y, KEY_B, KEY_C, + KEY_D, BTN_TL, + KEY_1, ) from inputremapper.configs.mapping import ( @@ -65,6 +67,7 @@ from tests.lib.logger import logger from tests.lib.constants import MAX_ABS, MIN_ABS from tests.lib.fixtures import Fixture, fixtures +from tests.lib.pipes import uinput_write_history from tests.lib.test_setup import test_setup @@ -77,7 +80,7 @@ def setUp(self): self.mapping_parser = MappingParser(self.global_uinputs) self.global_uinputs.is_service = True self.global_uinputs.prepare_all() - self.forward_uinput = evdev.UInput() + self.forward_uinput = evdev.UInput(name="test-forward-uinput") self.stop_event = asyncio.Event() def tearDown(self) -> None: @@ -116,7 +119,7 @@ def create_event_reader( @test_setup -class TestIdk(EventPipelineTestBase): +class TestCombination(EventPipelineTestBase): async def test_any_event_as_button(self): """As long as there is an event handler and a mapping we should be able to map anything to a button""" @@ -424,6 +427,10 @@ async def test_combination(self): output_symbol="c", ) + assert mapping_1.release_combination_keys + assert mapping_2.release_combination_keys + assert mapping_3.release_combination_keys + preset = Preset() preset.add(mapping_1) preset.add(mapping_2) @@ -448,6 +455,8 @@ async def test_combination(self): forwarded_history = self.forward_uinput.write_history + # I don't remember the specifics. I guess if there is a combination ongoing, + # it shouldn't trigger ABS_X -> a? self.assertNotIn((EV_KEY, a, 1), keyboard_history) # c and b should have been written, because the input from send_events @@ -473,6 +482,296 @@ async def test_combination(self): self.assertEqual(keyboard_history.count((EV_KEY, c, 0)), 1) self.assertEqual(keyboard_history.count((EV_KEY, b, 0)), 1) + async def test_combination_manual_release_in_press_order(self): + """Test if combinations map to keys properly.""" + # release_combination_keys is off + # press 5, then 6, then release 5, then release 6 + in_1 = keyboard_layout.get("7") + in_2 = keyboard_layout.get("8") + out = keyboard_layout.get("a") + + origin = fixtures.foo_device_2_keyboard + origin_hash = origin.get_device_hash() + + input_combination = InputCombination( + [ + InputConfig( + type=EV_KEY, + code=in_1, + origin_hash=origin_hash, + ), + InputConfig( + type=EV_KEY, + code=in_2, + origin_hash=origin_hash, + ), + ] + ) + + mapping = Mapping( + input_combination=input_combination.to_config(), + target_uinput="keyboard", + output_symbol="a", + release_combination_keys=False, + ) + + assert not mapping.release_combination_keys + + preset = Preset() + preset.add(mapping) + + event_reader = self.create_event_reader(preset, origin) + + keyboard_history = self.global_uinputs.get_uinput("keyboard").write_history + forwarded_history = self.forward_uinput.write_history + + # press the first key of the combination + await self.send_events([InputEvent.key(in_1, 1, origin_hash)], event_reader) + self.assertListEqual(forwarded_history, [(EV_KEY, in_1, 1)]) + + # then the second, it should trigger the combination + await self.send_events([InputEvent.key(in_2, 1, origin_hash)], event_reader) + self.assertListEqual(forwarded_history, [(EV_KEY, in_1, 1)]) + self.assertListEqual(keyboard_history, [(EV_KEY, out, 1)]) + + # release the first key. A key-down event was injected for it previously, so + # now we find a key-up event here as well. + await self.send_events([InputEvent.key(in_1, 0, origin_hash)], event_reader) + self.assertListEqual(forwarded_history, [(EV_KEY, in_1, 1), (EV_KEY, in_1, 0)]) + self.assertListEqual(keyboard_history, [(EV_KEY, out, 1), (EV_KEY, out, 0)]) + + # release the second key. No key-down event was injected, so we don't have a + # key-up event here either. + await self.send_events([InputEvent.key(in_2, 0, origin_hash)], event_reader) + self.assertListEqual(forwarded_history, [(EV_KEY, in_1, 1), (EV_KEY, in_1, 0)]) + self.assertListEqual(keyboard_history, [(EV_KEY, out, 1), (EV_KEY, out, 0)]) + + async def test_releases_before_triggering(self): + origin = fixtures.foo_device_2_keyboard + origin_hash = origin.get_device_hash() + + input_combination = InputCombination( + [ + InputConfig( + type=EV_KEY, + code=KEY_A, + origin_hash=origin_hash, + ), + InputConfig( + type=EV_KEY, + code=KEY_B, + origin_hash=origin_hash, + ), + ] + ) + + mapping = Mapping( + input_combination=input_combination.to_config(), + target_uinput="keyboard", + output_symbol="1", + release_combination_keys=True, + ) + + preset = Preset() + preset.add(mapping) + + event_reader = self.create_event_reader(preset, origin) + + await self.send_events( + [ + InputEvent.key(KEY_A, 1, origin_hash), + InputEvent.key(KEY_B, 1, origin_hash), + ], + event_reader, + ) + + # Other tests check forwarded_history and keyboard_history individually, + # so here is one that looks at the order in uinput_write_history + self.assertListEqual( + uinput_write_history, + [ + (EV_KEY, KEY_A, 1), + (EV_KEY, KEY_A, 0), + (EV_KEY, KEY_1, 1), + ], + ) + + async def test_suppressed_doesnt_forward_releases(self): + origin = fixtures.foo_device_2_keyboard + origin_hash = origin.get_device_hash() + + input_combination_1 = InputCombination( + [ + InputConfig( + type=EV_KEY, + code=KEY_A, + origin_hash=origin_hash, + ), + InputConfig( + type=EV_KEY, + code=KEY_B, + origin_hash=origin_hash, + ), + InputConfig( + type=EV_KEY, + code=KEY_C, + origin_hash=origin_hash, + ), + ] + ) + + mapping_1 = Mapping( + input_combination=input_combination_1.to_config(), + target_uinput="keyboard", + output_symbol="1", + release_combination_keys=False, + ) + + input_combination_2 = InputCombination( + [ + InputConfig( + type=EV_KEY, + code=KEY_A, + origin_hash=origin_hash, + ), + InputConfig( + type=EV_KEY, + code=KEY_C, + origin_hash=origin_hash, + ), + ] + ) + + mapping_2 = Mapping( + input_combination=input_combination_2.to_config(), + target_uinput="keyboard", + output_symbol="2", + release_combination_keys=True, + ) + + preset = Preset() + preset.add(mapping_1) + preset.add(mapping_2) + + event_reader = self.create_event_reader(preset, origin) + # Trigger mapping_1, mapping_2 should be suppressed + await self.send_events( + [ + InputEvent.key(KEY_A, 1, origin_hash), + InputEvent.key(KEY_B, 1, origin_hash), + InputEvent.key(KEY_C, 1, origin_hash), + ], + event_reader, + ) + + keyboard_history = self.global_uinputs.get_uinput("keyboard").write_history + forwarded_history = self.forward_uinput.write_history + # There used to be an incorrect KEY_A release, caused by + # `release_combination_keys` on the suppressed mapping. + self.assertListEqual( + forwarded_history, + [ + (EV_KEY, KEY_A, 1), + (EV_KEY, KEY_B, 1), + ], + ) + self.assertListEqual( + keyboard_history, + [ + (EV_KEY, KEY_1, 1), + ], + ) + + async def test_no_stuck_key(self): + origin = fixtures.foo_device_2_keyboard + origin_hash = origin.get_device_hash() + + input_combination_1 = InputCombination( + [ + InputConfig( + type=EV_KEY, + code=KEY_A, + origin_hash=origin_hash, + ), + InputConfig( + type=EV_KEY, + code=KEY_B, + origin_hash=origin_hash, + ), + InputConfig( + type=EV_KEY, + code=KEY_D, + origin_hash=origin_hash, + ), + ] + ) + + mapping_1 = Mapping( + input_combination=input_combination_1.to_config(), + target_uinput="keyboard", + output_symbol="1", + release_combination_keys=False, + ) + + input_combination_2 = InputCombination( + [ + InputConfig( + type=EV_KEY, + code=KEY_C, + origin_hash=origin_hash, + ), + InputConfig( + type=EV_KEY, + code=KEY_D, + origin_hash=origin_hash, + ), + ] + ) + + mapping_2 = Mapping( + input_combination=input_combination_2.to_config(), + target_uinput="keyboard", + output_symbol="2", + release_combination_keys=False, + ) + + preset = Preset() + preset.add(mapping_1) + preset.add(mapping_2) + + event_reader = self.create_event_reader(preset, origin) + # Trigger mapping_1, mapping_2 should be suppressed + await self.send_events( + [ + InputEvent.key(KEY_A, 1, origin_hash), + InputEvent.key(KEY_B, 1, origin_hash), + InputEvent.key(KEY_C, 1, origin_hash), + InputEvent.key(KEY_D, 1, origin_hash), + # Release c -> mapping_2 transitions from suppressed to passive. + # The release of c should be forwarded. + InputEvent.key(KEY_C, 0, origin_hash), + ], + event_reader, + ) + + keyboard_history = self.global_uinputs.get_uinput("keyboard").write_history + forwarded_history = self.forward_uinput.write_history + self.assertListEqual( + forwarded_history, + [ + (EV_KEY, KEY_A, 1), + (EV_KEY, KEY_B, 1), + (EV_KEY, KEY_C, 1), + (EV_KEY, KEY_C, 0), + ], + ) + self.assertListEqual( + keyboard_history, + [ + (EV_KEY, KEY_1, 1), + ], + ) + async def test_ignore_hold(self): # hold as in event-value 2, not in macro-hold. # linux will generate events with value 2 after input-remapper injected diff --git a/tests/unit/test_event_pipeline/test_mapping_handlers.py b/tests/unit/test_event_pipeline/test_mapping_handlers.py index 361d5d335..785d51236 100644 --- a/tests/unit/test_event_pipeline/test_mapping_handlers.py +++ b/tests/unit/test_event_pipeline/test_mapping_handlers.py @@ -21,7 +21,6 @@ """See TestEventPipeline for more tests.""" - import asyncio import unittest from unittest.mock import MagicMock @@ -59,7 +58,10 @@ from inputremapper.injection.mapping_handlers.hierarchy_handler import HierarchyHandler from inputremapper.injection.mapping_handlers.key_handler import KeyHandler from inputremapper.injection.mapping_handlers.macro_handler import MacroHandler -from inputremapper.injection.mapping_handlers.mapping_handler import MappingHandler +from inputremapper.injection.mapping_handlers.mapping_handler import ( + MappingHandler, + InputEventHandler, +) from inputremapper.injection.mapping_handlers.rel_to_abs_handler import RelToAbsHandler from inputremapper.input_event import InputEvent, EventActions @@ -309,27 +311,30 @@ def setUp(self): input_combination=input_combination.to_config(), target_uinput="mouse", output_symbol="BTN_LEFT", + release_combination_keys=True, ), self.context_mock, global_uinputs=self.global_uinputs, ) - def test_forward_correctly(self): - # In the past, if a mapping has inputs from two different sub devices, it - # always failed to send the release events to the correct one. - # Nowadays, self._context.get_forward_uinput(origin_hash) is used to - # release them correctly. - mock = MagicMock() - self.handler.set_sub_handler(mock) + sub_handler_mock = MagicMock(InputEventHandler) + self.handler.set_sub_handler(sub_handler_mock) # insert our own test-uinput to see what is being written to it - uinputs = { + self.uinputs = { self.mouse_hash: evdev.UInput(), self.keyboard_hash: evdev.UInput(), self.gamepad_hash: evdev.UInput(), } - self.context_mock.get_forward_uinput = lambda origin_hash: uinputs[origin_hash] + self.context_mock.get_forward_uinput = lambda origin_hash: self.uinputs[ + origin_hash + ] + def test_forward_correctly(self): + # In the past, if a mapping has inputs from two different sub devices, it + # always failed to send the release events to the correct one. + # Nowadays, self._context.get_forward_uinput(origin_hash) is used to + # release them correctly. # 1. trigger the combination self.handler.notify( InputEvent.rel( @@ -359,30 +364,23 @@ def test_forward_correctly(self): # 2. expect release events to be written to the correct devices, as indicated # by the origin_hash of the InputConfigs self.assertListEqual( - uinputs[self.mouse_hash].write_history, + self.uinputs[self.mouse_hash].write_history, [InputEvent.rel(self.input_combination[0].code, 0)], ) self.assertListEqual( - uinputs[self.keyboard_hash].write_history, + self.uinputs[self.keyboard_hash].write_history, [InputEvent.key(self.input_combination[1].code, 0)], ) - self.assertListEqual( - uinputs[self.gamepad_hash].write_history, - [InputEvent.key(self.input_combination[2].code, 0)], - ) + + # We do not expect a release event for this, because there was no key-down + # event when the combination triggered. + # self.assertListEqual( + # self.uinputs[self.gamepad_hash].write_history, + # [InputEvent.key(self.input_combination[2].code, 0)], + # ) def test_no_forwards(self): # if a combination is not triggered, nothing is released - mock = MagicMock() - self.handler.set_sub_handler(mock) - - # insert our own test-uinput to see what is being written to it - uinputs = { - self.mouse_hash: evdev.UInput(), - self.keyboard_hash: evdev.UInput(), - } - self.context_mock.get_forward_uinput = lambda origin_hash: uinputs[origin_hash] - # 1. inject any two events self.handler.notify( InputEvent.rel( @@ -402,8 +400,8 @@ def test_no_forwards(self): ) # 2. expect no release events to be written - self.assertListEqual(uinputs[self.mouse_hash].write_history, []) - self.assertListEqual(uinputs[self.keyboard_hash].write_history, []) + self.assertListEqual(self.uinputs[self.mouse_hash].write_history, []) + self.assertListEqual(self.uinputs[self.keyboard_hash].write_history, []) @test_setup diff --git a/tests/unit/test_injector.py b/tests/unit/test_injector.py index 7fc2fa921..ccdc8d8cd 100644 --- a/tests/unit/test_injector.py +++ b/tests/unit/test_injector.py @@ -61,6 +61,7 @@ from inputremapper.injection.macros.parse import parse from inputremapper.injection.numlock import is_numlock_on from inputremapper.input_event import InputEvent + from tests.lib.constants import EVENT_READ_TIMEOUT from tests.lib.fixtures import fixtures from tests.lib.fixtures import keyboard_keys @@ -456,9 +457,10 @@ def test_injector(self): [ # should execute a macro... InputEvent.key(8, 1), # forwarded - InputEvent.key(9, 1), # triggers macro - InputEvent.key(8, 0), # releases macro - InputEvent.key(9, 0), # forwarded + InputEvent.key(9, 1), # triggers macro, not forwarding + # macro runs now and injects a few more keys + InputEvent.key(8, 0), # releases macro (needs to be forwarded as well) + InputEvent.key(9, 0), # not forwarded, just like the down-event ], ) @@ -495,20 +497,15 @@ def test_injector(self): history = read_write_history_pipe() # 1 event before the combination was triggered - # 2 events for releasing the combination trigger (by combination handler) # 4 events for the macro - # 1 release of the event that didn't release the macro + # 1 event for releasing the previous key-down event # 2 for mapped keys # 3 for forwarded events - self.assertEqual(len(history), 13) + self.assertEqual(len(history), 11) # the first bit is ordered properly self.assertEqual(history[0], (EV_KEY, 8, 1)) # forwarded del history[0] - self.assertIn((EV_KEY, 8, 0), history[0:2]) # released by combination handler - self.assertIn((EV_KEY, 9, 0), history[0:2]) # released by combination handler - del history[0] - del history[0] # since the macro takes a little bit of time to execute, its # keystrokes are all over the place. @@ -530,10 +527,12 @@ def test_injector(self): del history[index_q_0] del history[index_q_1] - # the rest should be in order now. - # first the released combination key which did not release the macro. - # the combination key which released the macro won't appear here. - self.assertEqual(history[0], (EV_KEY, 9, 0)) + # The rest should be in order now. + # First the released combination key which did not release the macro. + # The combination key which released the macro won't appear here, because + # it also didn't have a key-down event and therefore doesn't need to be + # released itself. + self.assertEqual(history[0], (EV_KEY, 8, 0)) # value should be 1, even if the input event was -1. # Injected keycodes should always be either 0 or 1 self.assertEqual(history[1], (EV_KEY, code_a, 1))