diff --git a/inputremapper/injection/macros/macro.py b/inputremapper/injection/macros/macro.py index 90eb4f57f..a4adaeb00 100644 --- a/inputremapper/injection/macros/macro.py +++ b/inputremapper/injection/macros/macro.py @@ -40,6 +40,7 @@ import copy import math import re +import random from typing import List, Callable, Awaitable, Tuple, Optional, Union, Any from evdev.ecodes import ( @@ -468,12 +469,19 @@ async def task(handler: Callable): self.tasks.append(task) - def add_wait(self, time: Union[int, float]): + def add_wait(self, time: Union[str, float, int], max_time=None): """Wait time in milliseconds.""" - time = self._type_check(time, [int, float], "wait", 1) + time = self._type_check(time, [float, int], "wait", 1) + max_time = self._type_check(max_time, [float, int, None], "wait", 2) async def task(_): - await asyncio.sleep(self._resolve(time, [int, float]) / 1000) + resolved_time = self._resolve(time, [float, int]) + resolved_max_time = self._resolve(max_time, [float, int]) + + if resolved_max_time is not None and resolved_max_time > resolved_time: + resolved_time = random.uniform(resolved_time, resolved_max_time) + + await asyncio.sleep(resolved_time / 1000) self.tasks.append(task) diff --git a/tests/unit/test_macros.py b/tests/unit/test_macros.py index 6f0b5ce44..8dbfd6efb 100644 --- a/tests/unit/test_macros.py +++ b/tests/unit/test_macros.py @@ -1344,6 +1344,56 @@ def set_foo(value): ) +@test_setup +class TestWait(MacroTestBase): + async def assert_time_randomized( + self, + macro: Macro, + min_: float, + max_: float, + ): + for _ in range(100): + start = time.time() + await macro.run(self.handler) + time_taken = time.time() - start + + # Any of the runs should be within the defined range, to prove that they + # are indeed random. + if min_ < time_taken < max_: + return + + raise AssertionError("`wait` was not randomized") + + async def test_wait_1_core(self): + mapping = DummyMapping() + mapping.macro_key_sleep_ms = 0 + macro = parse("repeat(5, wait(50))", self.context, mapping, True) + + start = time.time() + await macro.run(self.handler) + time_per_iteration = (time.time() - start) / 5 + + self.assertLess(abs(time_per_iteration - 0.05), 0.005) + + async def test_wait_2_ranged(self): + mapping = DummyMapping() + mapping.macro_key_sleep_ms = 0 + macro = parse("wait(1, 100)", self.context, mapping, True) + await self.assert_time_randomized(macro, 0.02, 0.08) + + async def test_wait_3_ranged_single_get(self): + mapping = DummyMapping() + mapping.macro_key_sleep_ms = 0 + macro = parse("set(a, 100).wait(1, $a)", self.context, mapping, True) + await self.assert_time_randomized(macro, 0.02, 0.08) + + async def test_wait_4_ranged_double_get(self): + mapping = DummyMapping() + mapping.macro_key_sleep_ms = 0 + macro = parse("set(a, 1).set(b, 100).wait($a, $b)", self.context, mapping, True) + await self.assert_time_randomized(macro, 0.02, 0.08) + + @test_setup class TestIfSingle(MacroTestBase): async def test_if_single(self):