RuntimeError(Event loop is closed) while trying to invalidate the cache in FastAPI with redis.asyncio #3492

Open
Vykp00 opened this issue Jan 31, 2025 · 3 comments


Vykp00 commented Jan 31, 2025

Our server raises RuntimeError(Event loop is closed) every time we try to invalidate multiple keys.
I followed this issue and upgraded to redis==5.2.1, but our server still raises the error while invalidating the cache in FastAPI. I also tried using a lock to prevent race conditions, but the error persists.

Version: redis 5.2.1

Platform: Python 3.8.10 on Ubuntu (Docker container: python:3.8.10)

For example:

import asyncio
import hashlib

import redis.asyncio as redis

# Mutex for preventing race conditions
async def acquire_lock(redis_client: redis.Redis, lock_key: str, timeout: int = 5):
    """
        Acquires a lock using a Redis key

        @param redis_client: Redis client
        @param lock_key: Redis key
        @param timeout: Timeout in seconds for ``ex`` - sets an expiry flag on key ``name`` for ``ex`` seconds.

        @return: True if the lock is acquired, False otherwise.
    """
    # ``nx`` if set to True, set the value at key ``name`` to ``value`` only  if it does not exist.
    return await redis_client.set(lock_key, "locked", ex=timeout, nx=True)


async def release_lock(redis_client: redis.Redis, lock_key: str):
    """
        Releases the lock by deleting the lock key.

        @param redis_client: Redis client
        @param lock_key: Redis key
    """

    await redis_client.delete(lock_key)


async def delete_key_safely(redis_client_params: redis.Redis, key: str):
    """
    Safely deletes a Redis key and raises an exception if the deletion fails.
    """
    try:
        result = await redis_client_params.delete(key)
        if result == 0:
            raise ValueError(f"Key {key} could not be deleted.")
    except Exception as delete_error:
        # Log delete_error here (the logging call is omitted from this snippet)
        raise  # Re-raise the exception to propagate it to the caller


async def invalidate_cache_based_on_params(namespace: str, func, **params):
    """
        Invalidate cache keys based on a namespace and function name with locking and concurrency control.
    """
    try:
        # Ensure the event loop is running
        asyncio.get_running_loop()
        # Generate the initial part of the cache key using the custom key builder approach
        # Note: We're not using func directly here since we're constructing a pattern, not a specific key
        combined_params = []  # string-valued parameters, kept readable in the key
        params_to_be_hashed = []  # non-string parameters, hashed into the key
        for k, v in list(params.items()):
            if v is not None:
                if isinstance(v, str):
                    combined_params.append(f"{k}={v}")
                else:
                    params_to_be_hashed.append(f"{k}={v}")
        params_str = ":".join(combined_params)
        hashed_params = hashlib.md5("".join(params_to_be_hashed).encode()).hexdigest()

        pattern = (
            f"{namespace}:{settings.OUR_ENVIRONMENT}:{func}:{params_str}*"
            if not params_to_be_hashed
            else f"{namespace}:{settings.OUR_ENVIRONMENT}:{func}:{params_str}:{hashed_params}*"
        )

        # Mutex to prevent race conditions
        lock_key = f"lock:{pattern}"
        if await acquire_lock(redis_client, lock_key):
            try:
                # Collect every key that matches the pattern
                matching_keys = []
                async for key in redis_client.scan_iter(match=pattern):
                    matching_keys.append(key)

                if not matching_keys:
                    # No matching keys; nothing to invalidate
                    return

                # Concurrently delete matching keys, allowing the function to delete multiple keys simultaneously.
                delete_tasks = [delete_key_safely(redis_client, key) for key in matching_keys]
                await asyncio.gather(*delete_tasks)

            # Always release the lock, whatever the outcome
            finally:
                await release_lock(redis_client, lock_key)
        else:
            # If the cache key is locked, return
            return
    except RuntimeError as redis_error:
        if "Event loop is closed" in str(redis_error):
            # Log issue
            return
        else:
            raise  # Re-raise for other runtime errors
    except Exception as other_error:
        # Re-raise all other exceptions
        raise

This is the log from my development setup.
OS: Windows 10

Event loop is closed 
 Traceback (most recent call last):
  File "C:\Users\hello\OneDrive\Desktop\BE\backend-py3-11-6-env\Lib\site-packages\redis\asyncio\connection.py", line 472, in send_packed_command
    self._writer.writelines(command)
  File "C:\Users\hello\AppData\Local\Programs\Python\Python311\Lib\asyncio\streams.py", line 334, in writelines
    self._transport.writelines(data)
  File "C:\Users\hello\AppData\Local\Programs\Python\Python311\Lib\asyncio\transports.py", line 123, in writelines
    self.write(data)
  File "C:\Users\hello\AppData\Local\Programs\Python\Python311\Lib\asyncio\proactor_events.py", line 365, in write
    self._loop_writing(data=bytes(data))
  File "C:\Users\hello\AppData\Local\Programs\Python\Python311\Lib\asyncio\proactor_events.py", line 401, in _loop_writing
    self._write_fut = self._loop._proactor.send(self._sock, data)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'send'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\hello\OneDrive\Desktop\BE\main.py", line 9597, in get_user_details
    await invalidate_cache_based_on_params("backend", "get_user_info_query", user_id=initiator_uid)
  File "C:\Users\hello\OneDrive\Desktop\BE\main.py", line 654, in invalidate_cache_based_on_params
    async for key in redis_client.scan_iter(match=pattern):
  File "C:\Users\hello\OneDrive\Desktop\BE\backend-py3-11-6-env\Lib\site-packages\redis\commands\core.py", line 3223, in scan_iter
    cursor, data = await self.scan(
                   ^^^^^^^^^^^^^^^^
  File "C:\Users\hello\OneDrive\Desktop\BE\backend-py3-11-6-env\Lib\site-packages\redis\asyncio\client.py", line 612, in execute_command
    return await conn.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\hello\OneDrive\Desktop\BE\backend-py3-11-6-env\Lib\site-packages\redis\asyncio\retry.py", line 59, in call_with_retry
    return await do()
           ^^^^^^^^^^
  File "C:\Users\hello\OneDrive\Desktop\BE\backend-py3-11-6-env\Lib\site-packages\redis\asyncio\client.py", line 585, in _send_command_parse_response
    await conn.send_command(*args)
  File "C:\Users\hello\OneDrive\Desktop\BE\backend-py3-11-6-env\Lib\site-packages\redis\asyncio\connection.py", line 497, in send_command
    await self.send_packed_command(
  File "C:\Users\hello\OneDrive\Desktop\BE\backend-py3-11-6-env\Lib\site-packages\redis\asyncio\connection.py", line 492, in send_packed_command
    await self.disconnect(nowait=True)
  File "C:\Users\hello\OneDrive\Desktop\BE\backend-py3-11-6-env\Lib\site-packages\redis\asyncio\connection.py", line 417, in disconnect
    self._writer.close()  # type: ignore[union-attr]
    ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\hello\AppData\Local\Programs\Python\Python311\Lib\asyncio\streams.py", line 343, in close
    return self._transport.close()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\hello\AppData\Local\Programs\Python\Python311\Lib\asyncio\proactor_events.py", line 109, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "C:\Users\hello\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 761, in call_soon
    self._check_closed()
  File "C:\Users\hello\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 519, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
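
The last two frames of the traceback (`call_soon` followed by `_check_closed`) can be reproduced without Redis or FastAPI. The snippet below is only a minimal sketch of that mechanism: it closes a loop by hand and then schedules a callback on it. The function name `schedule_on_closed_loop` is made up for illustration, and the snippet does not claim to show where the loop gets closed in the application above.

```python
import asyncio


def schedule_on_closed_loop() -> str:
    """Schedule a callback on a loop that has already been closed."""
    loop = asyncio.new_event_loop()
    loop.run_until_complete(asyncio.sleep(0))  # use the loop once
    loop.close()  # from here on the loop rejects new work

    try:
        # Transports call loop.call_soon() when they are closed, which is
        # the call that fails at the bottom of the traceback.
        loop.call_soon(print, "never runs")
    except RuntimeError as exc:
        return str(exc)
    return "no error"


message = schedule_on_closed_loop()
print(message)
```

Any object that keeps a reference to a loop, such as an open connection, hits the same check if it is used after that loop has been closed.
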
@vladvildanov
Collaborator

@Vykp00 Hey! Let me clarify a few things. You're using Redis as a cache and have implemented a caching strategy in an app built on the FastAPI framework, with redis-py as the transport? And the OS where you experienced the issue is Windows 10?


Vykp00 commented Jan 31, 2025

@vladvildanov I experienced it on both Ubuntu and Windows 10. Our production server is Ubuntu-latest (Docker container: python:3.8.10). The server logs this error, and I managed to reproduce it in my Windows 10 environment too. The issue often occurs when a FastAPI endpoint calls Redis to invalidate caches.

@vladvildanov
Collaborator

@Vykp00 It looks like the issue is with the event loop lifecycle itself. The redis-py async API doesn't manage the event loop; it works in the context of an existing one. In your code you're trying to send a command after the event loop was already closed, so the loop reference had already been flushed by the GC. Please try to debug why the event loop is closed at this point when it shouldn't be.
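
One way to start that debugging is to record which loop is running when the client is created and compare it with the loop that is running when the client is used. The sketch below uses only the standard library. `LoopGuard` is a hypothetical helper, not part of redis-py, and the two `asyncio.run()` calls merely stand in for whatever creates a second loop in a real application, which is an assumption here.

```python
import asyncio
from typing import Optional


class LoopGuard:
    """Hypothetical helper: remembers the loop a resource was created on."""

    def __init__(self) -> None:
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def bind(self) -> None:
        # Call from the coroutine that creates the client.
        self._loop = asyncio.get_running_loop()

    def describe(self) -> str:
        # Call right before the client is used.
        current = asyncio.get_running_loop()
        if self._loop is None:
            return "unbound"
        if self._loop.is_closed():
            return "bound loop is closed"
        if self._loop is not current:
            return "bound to a different loop"
        return "ok"


guard = LoopGuard()


async def create_client() -> None:
    guard.bind()


async def use_client() -> str:
    return guard.describe()


async def create_and_use() -> str:
    guard.bind()
    return guard.describe()


# Created and used on the same loop.
same_loop = asyncio.run(create_and_use())

# Created on one loop, used on another; asyncio.run() closes its loop on exit.
asyncio.run(create_client())
other_loop = asyncio.run(use_client())

print(same_loop, "|", other_loop)
```

Logging the result of `describe()` next to the failing `scan_iter` call would show whether the client outlived the loop it was created on.
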
