[SDESK-7917] Feature: Celery async worker runtime#3225
Conversation
|
@petrjasek @eos87 I created a test instance so we can test it out before merging. So far publishing seems to work 😄 I can confirm it's using the new async worker thread, from from the log entry: |
| ``CELERY_ASYNC_THREAD_MAX_TASKS`` | ||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | ||
|
|
||
| Default: ``100`` | ||
|
|
||
| The maximum number of active tasks in the async thread. If more tasks are added, task submission | ||
| to the thread is paused (as the client level). | ||
|
|
||
| ``CELERY_ASYNC_THREAD_RESTART_TASKS`` | ||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | ||
|
|
||
| Default: ``20`` | ||
|
|
||
| The maximum number of active tasks in the async thread, before task submission is resumed. |
There was a problem hiding this comment.
what determines these values? I'd assume the hardware but what's the relation?
There was a problem hiding this comment.
The default values are ones that I came up with. It is something we're going to have to try out, and might need different values depending on the environment (hardware capabilities, tasks/sec etc).
They're used for backpressure mitigation of the asyncio event loop. Without this, the event loop could become overloaded slowing down all tasks on the loop, essentially allowing unlimited number of concurrent tasks in the thread.
There was a problem hiding this comment.
Pull request overview
This PR introduces a new “async Celery worker runtime” that offloads task execution onto a dedicated asyncio event loop thread, adds lifecycle hooks to start/stop that thread with Celery worker processes, and adds tests + configuration/docs to support the feature flag.
Changes:
- Added
CeleryAsyncWorkerThread+CeleryAsyncWorkerTaskto execute Celery tasks on an asyncio loop and apply basic high/low watermark backpressure. - Wired Celery worker signals to start/stop the async worker thread and updated Celery initialization when
CELERY_USE_ASYNC_WORKERis enabled. - Added background-worker integration tests and documented new Celery async worker settings.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
superdesk/celery_app/async_worker.py |
Introduces the async worker thread + task wrapper (core runtime behavior). |
superdesk/celery_app/hooks.py |
Connects Celery worker lifecycle signals to start/stop the async thread. |
superdesk/celery_app/__init__.py |
Enables the async runtime via config flag and adjusts Celery configuration. |
superdesk/default_settings.py |
Adds new async worker settings and updates Celery serializer/protocol defaults. |
superdesk/tests/worker.py |
Adds a test Celery app + helper to run a worker subprocess for integration tests. |
tests/celery_app/async_worker.py |
Adds async worker integration/unit tests (including eager-path coverage). |
docs/settings.rst |
Documents new CELERY_USE_ASYNC_WORKER and related async thread settings. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if not has_app_context(): | ||
| await self.wsgi_app.app_context().push() | ||
|
|
||
| try: | ||
| self._num_tasks += 1 | ||
| if isinstance(task, celery.Task): | ||
| result = task.run(*args, **kwargs) | ||
| if isawaitable(result): | ||
| result = await result | ||
| task.backend.mark_as_done(task.request.id, result) | ||
| return result | ||
| else: | ||
| return await task if isawaitable(task) else task | ||
| except self.app_errors as e: | ||
| logger.exception("Error running Celery task") | ||
| if isinstance(task, celery.Task): | ||
| task.backend.mark_as_failure(task.request.id, e) | ||
|
|
||
| return None | ||
| finally: | ||
| self._num_tasks -= 1 |
eos87
left a comment
There was a problem hiding this comment.
Code looks good to me. Left a couple of minor comments.
Purpose
Originally celery workers run their tasks procedurally per thread/process, not starting the next task until the current one is finished. This causes a performance bottleneck, especially when many tasks are queued and async I/O is in use.
What has changed
CeleryAsyncWorkerThreadthread classCeleryAsyncWorkerTaskcelery task, extending from our existing ones, soeagerstill works with the newer runtime.Resolves: SDESK-7917