Skip to content

Commit 46d1d71

Browse files
authored
Merge branch 'develop' into develop
2 parents 4743922 + 08c824b commit 46d1d71

File tree

4 files changed

+41
-61
lines changed

4 files changed

+41
-61
lines changed

README.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,14 @@ In order to use this feature you need to enable the celery plugin and configure
9090
use `current_request_id()` from inside your worker
9191

9292
```python
93-
from flask_log_request_id.extras.celery import RequestIDAwareTask
93+
from flask_log_request_id.extras.celery import enable_request_id_propagation
9494
from flask_log_request_id import current_request_id
9595
from celery.app import Celery
9696
import logging
9797

98-
celery = Celery(task_cls=RequestIDAwareTask)
98+
celery = Celery()
99+
enable_request_id_propagation(celery) # << This step here is critical to propagate request-id to workers
100+
99101
app = Flask()
100102

101103
@celery.task()

examples/server_with_celery/example_app/celery.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
from celery import Celery
2-
from flask_log_request_id.extras.celery import RequestIDAwareTask
1+
from celery import Celery, signals
2+
from flask_log_request_id.extras.celery import enable_request_id_propagation
3+
4+
celery = Celery()
5+
6+
# You need to enable propagation on celery application
7+
enable_request_id_propagation(celery)
38

4-
celery = Celery(task_cls=RequestIDAwareTask)

flask_log_request_id/extras/celery.py

+15-10
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from celery import Task, current_task
1+
from celery import current_task, signals
22
import logging as _logging
33

44
from ..request_id import current_request_id
@@ -9,20 +9,25 @@
99
logger = _logging.getLogger(__name__)
1010

1111

12-
class RequestIDAwareTask(Task):
12+
def enable_request_id_propagation(celery_app):
1313
"""
14-
Task base class that injects request id to task request object with key 'x_request_id'.
14+
Will attach signal on celery application in order to propagate
15+
current request id to workers
16+
:param celery_app: The celery application
1517
"""
18+
signals.before_task_publish.connect(on_before_publish_insert_request_id_header)
1619

17-
def apply_async(self, *args, **kwargs):
18-
# Set default value for 'headers' argument
19-
if 'headers' not in kwargs or kwargs['headers'] is None:
20-
kwargs['headers'] = {}
20+
21+
def on_before_publish_insert_request_id_header(headers, **kwargs):
22+
"""
23+
This function is meant to be used as signal processor for "before_task_publish".
24+
:param Dict headers: The headers of the message
25+
:param kwargs: Any extra keyword arguments
26+
"""
27+
if _CELERY_X_HEADER not in headers:
2128
request_id = current_request_id()
29+
headers[_CELERY_X_HEADER] = request_id
2230
logger.debug("Forwarding request_id '{}' to the task consumer.".format(request_id))
23-
kwargs['headers'][_CELERY_X_HEADER] = request_id
24-
25-
return super(RequestIDAwareTask, self).apply_async(*args, **kwargs)
2631

2732

2833
def ctx_celery_task_get_request_id():

tests/extras/celery_tests.py

+15-46
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
from celery import Celery
2-
import unittest
31
import mock
2+
import unittest
43

5-
6-
from flask_log_request_id.extras.celery import (
7-
RequestIDAwareTask, ctx_celery_task_get_request_id, ExecutedOutsideContext)
4+
from celery import Celery
5+
from flask_log_request_id.extras.celery import (ExecutedOutsideContext,
6+
on_before_publish_insert_request_id_header,
7+
ctx_celery_task_get_request_id)
88

99

1010
class MockedTask(object):
@@ -22,58 +22,27 @@ def apply_async(self, *args, **kwargs):
2222
class CeleryIntegrationTestCase(unittest.TestCase):
2323

2424
@mock.patch('flask_log_request_id.extras.celery.current_request_id')
25-
def test_mixin_injection(self, mocked_current_request_id):
26-
27-
patcher = mock.patch.object(RequestIDAwareTask, '__bases__', (MockedTask,))
28-
29-
with patcher:
30-
patcher.is_local = True
31-
32-
mocked_current_request_id.return_value = 15
33-
task = RequestIDAwareTask()
34-
task.apply_async('test', foo='bar')
35-
self.assertEqual(
36-
task.apply_async_called['args'],
37-
('test', ))
25+
def test_enable_request_id_propagation(self, mocked_current_request_id):
26+
mocked_current_request_id.return_value = 15
3827

39-
self.assertDictEqual(
40-
task.apply_async_called['kwargs'], {
41-
'headers': {'x_request_id': 15},
42-
'foo': 'bar'
43-
})
44-
45-
@mock.patch('flask_log_request_id.extras.celery.current_request_id')
46-
def test_issue21_called_with_headers_None(self, mocked_current_request_id):
47-
48-
patcher = mock.patch.object(RequestIDAwareTask, '__bases__', (MockedTask,))
49-
50-
with patcher:
51-
patcher.is_local = True
52-
53-
mocked_current_request_id.return_value = 15
54-
task = RequestIDAwareTask()
55-
task.apply_async('test', foo='bar', headers=None)
56-
self.assertEqual(
57-
task.apply_async_called['args'],
58-
('test', ))
59-
60-
self.assertDictEqual(
61-
task.apply_async_called['kwargs'], {
62-
'headers': {'x_request_id': 15},
63-
'foo': 'bar'
64-
})
28+
headers = {}
29+
on_before_publish_insert_request_id_header(headers=headers)
30+
self.assertDictEqual(
31+
{
32+
'x_request_id': 15
33+
},
34+
headers)
6535

6636
@mock.patch('flask_log_request_id.extras.celery.current_task')
6737
def test_ctx_fetcher_outside_context(self, mocked_current_task):
68-
6938
mocked_current_task._get_current_object.return_value = None
7039
with self.assertRaises(ExecutedOutsideContext):
7140
ctx_celery_task_get_request_id()
7241

7342
@mock.patch('flask_log_request_id.extras.celery.current_task')
7443
def test_ctx_fetcher_inside_context(self, mocked_current_task):
7544
mocked_current_task._get_current_object.return_value = True
76-
mocked_current_task.request.get.side_effect = lambda a, default: {'x_request_id': 15, 'other':'bar'}[a]
45+
mocked_current_task.request.get.side_effect = lambda a, default: {'x_request_id': 15, 'other': 'bar'}[a]
7746

7847
self.assertEqual(ctx_celery_task_get_request_id(), 15)
7948

0 commit comments

Comments
 (0)