Skip to content

Commit 3c79a16

Browse files
committed
Add endpoint for fetching currently running csv imports
1 parent 1097761 commit 3c79a16

File tree

4 files changed

+103
-9
lines changed

4 files changed

+103
-9
lines changed

.pylintrc

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ indent-string=' '
124124
max-line-length=100
125125

126126
# Maximum number of lines in a module
127-
max-module-lines=1000
127+
max-module-lines=1005
128128

129129
# List of optional constructs for which whitespace checking is disabled. `dict-
130130
# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.

onadata/apps/api/viewsets/xform_viewset.py

+12
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
get_form_url,
9898
)
9999
from onadata.settings.common import CSV_EXTENSION, XLS_EXTENSIONS
100+
from onadata.libs.utils.async_status import get_active_tasks
100101

101102
ENKETO_AUTH_COOKIE = getattr(settings, "ENKETO_AUTH_COOKIE", "__enketo")
102103
ENKETO_META_UID_COOKIE = getattr(
@@ -876,6 +877,17 @@ def versions(self, request, *args, **kwargs):
876877

877878
return Response(data=serializer.data, status=status.HTTP_200_OK)
878879

880+
@action(methods=["GET"], detail=True)
881+
def active_imports(self, request, *args, **kwargs):
882+
"""Returns csv import async tasks that belong to this form"""
883+
xform = self.get_object()
884+
task_names = ["onadata.libs.utils.csv_import.submit_csv_async"]
885+
return Response(
886+
data=get_active_tasks(task_names, xform),
887+
status=status.HTTP_200_OK,
888+
content_type="application/json",
889+
)
890+
879891
@action(methods=["GET"], detail=True)
880892
def export_async(self, request, *args, **kwargs):
881893
"""Returns the status of an async export."""

onadata/libs/tests/utils/test_async_status.py

+36
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1+
from unittest.mock import MagicMock
2+
13
from onadata.apps.main.tests.test_base import TestBase
24
from celery import states
5+
from onadata.celeryapp import app
36
from onadata.libs.utils import async_status
7+
from onadata.apps.logger.models.xform import XForm
48

59

610
class TestAsyncStatus(TestBase):
@@ -34,3 +38,35 @@ def test_async_status(self):
3438
.get('error'))
3539
self.assertFalse(async_status.
3640
async_status(async_status.SUCCESSFUL).get('error'))
41+
42+
def test_get_active_tasks(self):
43+
"""test get_active_tasks"""
44+
xform = XForm()
45+
self.assertEqual(
46+
async_status.get_active_tasks(
47+
['onadata.libs.utils.csv_import.submit_csv_async'], xform
48+
),
49+
'[]',
50+
)
51+
inspect = MagicMock()
52+
inspect.active = MagicMock(
53+
return_value={
54+
'celery-worker@onadata-id-1': [
55+
{
56+
'args': [None, xform.pk],
57+
'id': '11',
58+
'time_start': '2021-02-26T03:28:19.512875-05:00',
59+
'name': 'onadata.libs.utils.csv_import.submit_csv_async',
60+
}
61+
]
62+
}
63+
)
64+
app.control.inspect = MagicMock(return_value=inspect)
65+
66+
self.assertEqual(
67+
async_status.get_active_tasks(
68+
['onadata.libs.utils.csv_import.submit_csv_async'], xform
69+
),
70+
'[{"job_uuid": "11", "time_start"'
71+
+ ': "2021-02-26T03:28:19.512875-05:00"}]',
72+
)

onadata/libs/utils/async_status.py

+54-8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,14 @@
1+
"""
2+
Utilities for celery asyncronous tasks
3+
"""
4+
import json
5+
6+
from typing import List
17
from celery import states
8+
from django.utils.translation import gettext
9+
10+
from onadata.celeryapp import app
11+
from onadata.apps.logger.models.xform import XForm
212

313
PENDING = 0
414
SUCCESSFUL = 1
@@ -7,21 +17,57 @@
717
RETRY = 4
818
STARTED = 5
919

10-
status_msg = {PENDING: 'PENDING', SUCCESSFUL: 'SUCCESS', FAILED: 'FAILURE',
11-
PROGRESS: 'PROGRESS', RETRY: 'RETRY', STARTED: 'STARTED'}
20+
status_msg = {
21+
PENDING: 'PENDING',
22+
SUCCESSFUL: 'SUCCESS',
23+
FAILED: 'FAILURE',
24+
PROGRESS: 'PROGRESS',
25+
RETRY: 'RETRY',
26+
STARTED: 'STARTED',
27+
}
1228

1329

1430
def celery_state_to_status(state):
15-
status_map = {states.PENDING: PENDING, states.STARTED: STARTED,
16-
states.RETRY: RETRY, states.SUCCESS: SUCCESSFUL,
17-
states.FAILURE: FAILED, 'PROGRESS': PROGRESS}
31+
status_map = {
32+
states.PENDING: PENDING,
33+
states.STARTED: STARTED,
34+
states.RETRY: RETRY,
35+
states.SUCCESS: SUCCESSFUL,
36+
states.FAILURE: FAILED,
37+
'PROGRESS': PROGRESS,
38+
}
1839
return status_map[state] if state in status_map else FAILED
1940

2041

2142
def async_status(status, error=None):
22-
status = {
23-
'job_status': status_msg[status]
24-
}
43+
status = {'job_status': status_msg[status]}
2544
if error:
2645
status['error'] = error
2746
return status
47+
48+
49+
def get_active_tasks(task_names: List[str], xform: XForm):
50+
"""Get active celery tasks"""
51+
inspect = app.control.inspect()
52+
inspect_active = inspect.active()
53+
data = []
54+
if inspect_active:
55+
task_list = list(inspect_active.values())
56+
data = list(
57+
filter(
58+
lambda task: xform.pk == task['args'][1] and task['name'] in task_names,
59+
task_list[0],
60+
)
61+
)
62+
63+
return json.dumps(
64+
list(
65+
map(
66+
lambda i: {
67+
'job_uuid': gettext(i['id']),
68+
'time_start': i['time_start'],
69+
},
70+
data,
71+
)
72+
)
73+
)

0 commit comments

Comments
 (0)