diff --git a/pytest_asyncio/plugin.py b/pytest_asyncio/plugin.py index 26a37d48..e8eb02ba 100644 --- a/pytest_asyncio/plugin.py +++ b/pytest_asyncio/plugin.py @@ -196,9 +196,64 @@ def pytest_runtest_setup(item): } +def _event_loop_policy(): + """ + Create a new class for ClockEventLoopPolicy based on the current + class-type produced by `asyncio.get_event_loop_policy()`. This is important + for instances in which the enent-loop-policy has been changed. + """ + class ClockEventLoopPolicy(asyncio.get_event_loop_policy().__class__): + "A custom event loop policy for ClockEventLoop" + + def new_event_loop(self): + parent_loop = super().new_event_loop() + parent_loop.close() + + class ClockEventLoop(parent_loop.__class__): + """ + A custom event loop that explicitly advances time when requested. Otherwise, + this event loop advances time as expected. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._clockoffset = 0 + + def time(self): + """ + Return the time according the event loop's clock. + + This time is adjusted by the stored offset that allows for advancement + with `advance_time`. + """ + return super().time() + self._clockoffset + + def advance_time(self, seconds): + ''' + Advance time by a given offset in seconds. Returns an awaitable + that will complete after all tasks scheduled for after advancement + of time are proceeding. + ''' + if seconds > 0: + # advance the clock by the given offset + self._clockoffset += seconds + + # Once the clock is adjusted, new tasks may have just been + # scheduled for running in the next pass through the event loop + return self.create_task(asyncio.sleep(0)) + + # return the new event loop + return ClockEventLoop() + + # return the new event loop policy + return ClockEventLoopPolicy() + + @pytest.yield_fixture def event_loop(request): """Create an instance of the default event loop for each test case.""" + # reset the event loop policy: modify existing policy to give time advancement + asyncio.set_event_loop_policy(_event_loop_policy()) loop = asyncio.get_event_loop_policy().new_event_loop() yield loop loop.close() diff --git a/tests/test_simple_35.py b/tests/test_simple_35.py index 1e4d697c..ee014df1 100644 --- a/tests/test_simple_35.py +++ b/tests/test_simple_35.py @@ -86,3 +86,24 @@ async def test_asyncio_marker_method(self, event_loop): def test_async_close_loop(event_loop): event_loop.close() return 'ok' + + +@pytest.mark.asyncio +async def test_event_loop_advance_time(event_loop): + """ + Test the sliding time event loop fixture + """ + # A task is created that will sleep some number of seconds + SLEEP_TIME = 10 + + # create the task + task = event_loop.create_task(asyncio.sleep(SLEEP_TIME)) + assert not task.done() + + # start the task + await event_loop.advance_time(0) + assert not task.done() + + # process the timeout + await event_loop.advance_time(SLEEP_TIME) + assert task.done()