Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions docs/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,42 @@ Default: ``None``

If not set it will be the number of CPUs available.

``CELERY_USE_ASYNC_WORKER``
^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``False``

If enabled, will use the newer Celery async worker thread.

``CELERY_ASYNC_PREFETCH_MULTIPLIER``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``10``

If async workers are enabled, this value is used for the ``CELERY_WORKER_PREFETCH_MULTIPLIER`` config.

``CELERY_ASYNC_THREAD_MAX_TASKS``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``100``

The maximum number of active tasks in the async thread. Once this number is reached, task submission
to the thread is paused (at the client level).
Comment thread
MarkLark86 marked this conversation as resolved.

``CELERY_ASYNC_THREAD_RESTART_TASKS``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``20``

Once task submission has been paused, it is resumed when the number of active tasks in the async thread drops below this value.
Comment on lines +304 to +317
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what determines these values? I'd assume the hardware but what's the relation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default values are ones that I came up with. It is something we're going to have to try out, and might need different values depending on the environment (hardware capabilities, tasks/sec etc).

They're used for backpressure mitigation of the asyncio event loop. Without this, the event loop could become overloaded slowing down all tasks on the loop, essentially allowing unlimited number of concurrent tasks in the thread.


``CELERY_ASYNC_THREAD_BLOCK_SLEEP``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``1``

The number of seconds to pause between checking the number of active tasks, once max tasks has been reached.

``HIGH_PRIORITY_QUEUE_ENABLED``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
13 changes: 13 additions & 0 deletions superdesk/celery_app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from .context_task import HybridAppContextTask, HybridAppContextWorkerTask, celery_wsgi_instance
from .serializer import CELERY_SERIALIZER_NAME, ContextAwareSerializerFactory
from .async_worker import CeleryAsyncWorkerTask
from .hooks import connect_signals

from superdesk.logging import logger
from superdesk.core import get_current_app, get_app_config
Expand Down Expand Up @@ -51,6 +53,17 @@ def init_celery(app: "SuperdeskEve") -> None:
# Registered here rather than at module level to keep startup ordering explicit.
register_type(ObjectId, "bson.objectid", str, ObjectId)

if not IS_BEAT_PROCESS and app.config.get("CELERY_USE_ASYNC_WORKER"):
celery.task_cls = CeleryAsyncWorkerTask
app.config.update(
{
"CELERY_WORKER_PREFETCH_MULTIPLIER": app.config.get("CELERY_ASYNC_PREFETCH_MULTIPLIER", 10),
"CELERY_WORKER_TASK_ACKS_LATE": True,
"CELERY_WORKER_TASK_ACKS_ON_FAILURE_OR_TIMEOUT": True,
}
Comment thread
petrjasek marked this conversation as resolved.
)
connect_signals()

celery_wsgi_instance.set(app)
celery.config_from_object(app.config, namespace="CELERY")
app.celery = celery
Expand Down
257 changes: 257 additions & 0 deletions superdesk/celery_app/async_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
from typing import Any, Protocol, TYPE_CHECKING, Callable
import logging
import asyncio
import threading
from inspect import isawaitable
from time import sleep

import celery
from quart import has_app_context
import werkzeug

from superdesk.errors import SuperdeskError
from .context_task import HybridAppContextWorkerTask

if TYPE_CHECKING:
from superdesk.factory.app import SuperdeskApp
else:
SuperdeskApp = Any


logger = logging.getLogger(__name__)


class ThreadData(Protocol):
    """Structural type for the thread-local storage object used by ``CeleryAsyncWorkerThread``."""

    #: The worker thread instance stored on the thread-local object.
    worker: "CeleryAsyncWorkerThread"


class CeleryAsyncWorkerThread(threading.Thread):
    """
    Manages an asynchronous event loop thread for handling Celery tasks.

    The thread owns a dedicated asyncio event loop. Tasks are handed to the loop
    with :meth:`submit` (from any other thread) and executed by :meth:`run_task`.
    The number of concurrently active tasks is limited by :meth:`limit_active_tasks`,
    and tasks that are still pending when the loop is stopped are drained (and, if
    needed, cancelled) by :meth:`_cleanup` before the loop is closed.
    """

    #: Thread local storage holding the worker instance created by the calling thread.
    _thread_local: ThreadData = threading.local()

    #: The asyncio event loop used for executing tasks (created in :meth:`run`).
    _loop: asyncio.AbstractEventLoop | None = None

    #: Event set once this instance's loop is ready to run tasks.
    #: Created per instance in ``__init__`` so that a replacement worker never
    #: observes the "ready" flag of a previous, dead worker.
    _loop_ready: threading.Event

    #: The number of currently active tasks.
    _num_tasks: int = 0

    #: A tuple of exception types that are treated as application errors when running Celery tasks.
    app_errors = (SuperdeskError, werkzeug.exceptions.InternalServerError)

    #: The Superdesk application instance used for pushing the application context.
    wsgi_app: SuperdeskApp

    #: The number of active tasks at which task submission is paused.
    _max_tasks: int

    #: Task submission is resumed once the number of active tasks drops below this value.
    _restart_tasks: int

    #: The time, in seconds, to wait between checks of the number of active tasks.
    _monitor_sleep: float

    def __init__(self, *args, **kwargs):
        """
        Create the (not yet started) worker thread and read its limits from the app config.

        :param *args: Positional arguments forwarded to ``threading.Thread``.
        :param **kwargs: Keyword arguments forwarded to ``threading.Thread``
            (``name`` and ``daemon`` are set by this class).
        :raises AssertionError: If ``superdesk.app`` has not been initialised.
        """

        super().__init__(
            *args,
            name="CeleryAsyncWorkerThread",
            daemon=True,
            **kwargs,
        )
        # Imported here rather than at module level, as in the original code.
        import superdesk

        assert superdesk.app is not None, "Superdesk app is not initialized"
        self.wsgi_app = superdesk.app

        # One readiness event per thread instance (see the attribute comment above).
        self._loop_ready = threading.Event()

        self._max_tasks = self.wsgi_app.config.get("CELERY_ASYNC_THREAD_MAX_TASKS", 100)
        self._restart_tasks = self.wsgi_app.config.get("CELERY_ASYNC_THREAD_RESTART_TASKS", 20)
        self._monitor_sleep = self.wsgi_app.config.get("CELERY_ASYNC_THREAD_BLOCK_SLEEP", 1)

    @classmethod
    def get_instance(cls) -> "CeleryAsyncWorkerThread":
        """
        Return the worker thread associated with the calling thread, creating and
        starting a new one if none exists yet or the previous one is no longer alive.

        The instance is kept in thread-local storage, so each calling thread gets
        its own worker.

        :return: A started ``CeleryAsyncWorkerThread`` instance.
        """

        worker = getattr(cls._thread_local, "worker", None)
        if worker is None or not worker.is_alive():
            worker = cls()
            cls._thread_local.worker = worker
            worker.start()

        return worker

    def run(self):
        """
        Main run function for the thread.

        Creates a new asyncio event loop, makes it the current loop of this thread,
        signals readiness and runs the loop until :meth:`stop` is called. Once the
        loop has stopped, pending tasks are drained by :meth:`_cleanup` and the loop
        is closed.
        """

        loop = asyncio.new_event_loop()
        self._loop = loop
        asyncio.set_event_loop(loop)
        self._loop_ready.set()
        try:
            loop.run_forever()
        finally:
            try:
                self._cleanup()
            finally:
                # Release the loop's resources (selector, self-pipe) once we are done.
                loop.close()

    async def run_task(self, task: Callable | celery.Task, *args, **kwargs) -> Any:
        """
        Executes a given task asynchronously, ensuring proper handling of app context
        and task execution lifecycle. Supports integration with Celery tasks.

        For a Celery task, ``task.run`` is called with the supplied arguments, an
        awaitable result is awaited, and the outcome is recorded on the task's
        result backend. Any other awaitable is awaited; a non-awaitable value is
        returned unchanged (the arguments are not used in that case).

        :param task: The task to execute. It can be either a standard coroutine or a Celery task.
        :param *args: Positional arguments to pass to the task upon execution.
        :param **kwargs: Keyword arguments to pass to the task upon execution.
        :return: The result of the task execution, or ``None`` if one of ``app_errors`` was raised.
        """

        if not has_app_context():
            await self.wsgi_app.app_context().push()

        try:
            self._num_tasks += 1
            if isinstance(task, celery.Task):
                result = task.run(*args, **kwargs)
                if isawaitable(result):
                    result = await result
                task.backend.mark_as_done(task.request.id, result)
                return result
            else:
                return await task if isawaitable(task) else task
        except self.app_errors as e:
            logger.exception("Error running Celery task")
            if isinstance(task, celery.Task):
                task.backend.mark_as_failure(task.request.id, e)

            return None
        finally:
            # Always release the slot, whatever the outcome.
            self._num_tasks -= 1

    def submit(self, task: Callable | celery.Task, *args, **kwargs):
        """
        Submits a task to be run in the thread using its event loop.

        Waits until the loop is ready, applies the active-task limit (which may
        block the caller synchronously, see :meth:`limit_active_tasks`) and then
        schedules :meth:`run_task` on the loop.

        :param task: The task to be executed.
        :param *args: Positional arguments to be passed to the task.
        :param **kwargs: Keyword arguments to be passed to the task.
        :return: The future returned by ``asyncio.run_coroutine_threadsafe``.
        :raises RuntimeError: If the event loop has already been closed.
        """

        self._loop_ready.wait()
        assert self._loop is not None, "Event loop is not ready"
        self.limit_active_tasks()
        coro = self.run_task(task, *args, **kwargs)
        try:
            return asyncio.run_coroutine_threadsafe(coro, loop=self._loop)
        except RuntimeError:
            # The loop was closed in the meantime: dispose of the coroutine so it
            # does not trigger a "never awaited" warning, then report the failure.
            coro.close()
            raise

    def limit_active_tasks(self):
        """
        Limits the number of active tasks to prevent overload.

        Returns immediately while fewer than ``_max_tasks`` tasks are active.
        Otherwise blocks the calling thread, polling every ``_monitor_sleep``
        seconds, until the number of active tasks drops below ``_restart_tasks``.
        The wait is abandoned if the worker thread is no longer alive, because the
        counter can never decrease in that case.
        """

        if self._num_tasks < self._max_tasks:
            return

        logger.warning("Celery high load detected (%s). Waiting for tasks to finish", self._num_tasks)
        # A threshold of 0 (or less) could never be satisfied; treat it as 1.
        resume_below = max(self._restart_tasks, 1)
        while self.is_alive():
            sleep(self._monitor_sleep)
            if self._num_tasks < resume_below:
                logger.info("Load stabilized, task queue size reduced to %s", self._num_tasks)
                return

        logger.warning("Async worker thread is not running, no longer waiting for tasks to finish")

    def stop(self):
        """
        Stops the running event loop if it is active and waits for the corresponding thread
        to finish execution (including the cleanup performed at the end of :meth:`run`).
        """

        if self._loop is None or not self._loop.is_running():
            return

        self._loop.call_soon_threadsafe(self._loop.stop)
        self.join()

    def _cleanup(self, timeout: float = 10, cancel_tasks: bool = False):
        """
        Cleans up any tasks still pending on the (stopped) event loop.

        Must be called from the worker thread after ``run_forever`` has returned:
        the loop is driven with ``run_until_complete`` so pending tasks get up to
        ``timeout`` seconds to finish. Tasks that are still active after that are
        cancelled and given one more ``timeout`` period; anything left afterwards
        is only logged.

        :param timeout: The maximum time, in seconds, to wait for background tasks to finish.
        :param cancel_tasks: Whether to send cancellation signals to tasks before waiting for them to terminate.
        """

        loop = self._loop
        # Nothing to do without a loop; a running loop cannot be driven from here.
        if loop is None or loop.is_closed() or loop.is_running():
            return

        pending = asyncio.all_tasks(loop=loop)
        if not pending:
            return

        if cancel_tasks:
            for task in pending:
                task.cancel()

        async def _drain():
            # asyncio.wait neither raises on timeout nor cancels the tasks itself.
            _, still_running = await asyncio.wait(pending, timeout=timeout)
            return still_running

        try:
            still_running = loop.run_until_complete(_drain())
        except Exception:
            logger.exception("Error during background tasks shutdown")
            return

        if not still_running:
            return

        logger.warning("Background tasks shutdown timed out. %s tasks still active.", len(still_running))
        if not cancel_tasks:
            # Second and final pass: cancel whatever is left and wait once more.
            self._cleanup(timeout, cancel_tasks=True)


class CeleryAsyncWorkerTask(HybridAppContextWorkerTask):
    """Celery task class that routes task execution through ``CeleryAsyncWorkerThread``."""

    def __call__(self, *args, **kwargs):
        """
        Run the task via the async worker thread.

        In eager mode the coroutine created by ``run_task`` is returned directly to
        the caller; otherwise the task is submitted to the worker thread's event
        loop and the value returned by ``submit`` is passed back.
        """

        worker_thread = CeleryAsyncWorkerThread.get_instance()
        if not self._is_always_eager():
            return worker_thread.submit(self, *args, **kwargs)

        return worker_thread.run_task(self, *args, **kwargs)
Comment thread
MarkLark86 marked this conversation as resolved.
Comment on lines +256 to +257
26 changes: 26 additions & 0 deletions superdesk/celery_app/hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import logging
from os import getpid

from celery.signals import worker_process_init, worker_shutting_down
from .async_worker import CeleryAsyncWorkerThread

__all__ = ["start_async_thread", "shutdown_async_thread"]


logger = logging.getLogger(__name__)


def start_async_thread(sender, **kwargs):
    """Signal handler that makes sure the async worker thread is running in this process."""
    pid = getpid()
    logger.info(f"on_worker_init[{pid}]")
    # get_instance() creates and starts the thread when there is no live one yet,
    # so simply requesting the instance is enough here.
    CeleryAsyncWorkerThread.get_instance()


def shutdown_async_thread(sender, **kwargs):
    """Signal handler that stops the async worker thread's event loop."""
    pid = getpid()
    logger.info(f"on_worker_shutting_down[{pid}]")
    # NOTE(review): get_instance() starts a new thread when none is alive, so this
    # may create a worker only to stop it again - confirm that is acceptable.
    worker = CeleryAsyncWorkerThread.get_instance()
    worker.stop()
Comment thread
MarkLark86 marked this conversation as resolved.


def connect_signals():
    """Register the async worker thread start/stop handlers on the Celery worker signals."""
    handlers = (
        (worker_process_init, start_async_thread),
        (worker_shutting_down, shutdown_async_thread),
    )
    for signal, handler in handlers:
        signal.connect(handler)
Loading