Skip to content

Commit aeb9edd

Browse files
committed
Update order_by and add unit test for concurrency
1 parent 45dbc3d commit aeb9edd

File tree

2 files changed

+55
-3
lines changed

2 files changed

+55
-3
lines changed

kpi/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def cleanup_anonymous_exports(**kwargs):
9494
date_created__lt=cutoff_time,
9595
).exclude(
9696
status=ImportExportStatusChoices.PROCESSING
97-
).order_by('id').values_list('pk', flat=True)[:BATCH_SIZE]
97+
).order_by('date_created').values_list('pk', flat=True)[:BATCH_SIZE]
9898

9999
if not old_export_ids:
100100
logging.info('No old anonymous exports to clean up.')

kpi/tests/test_cleanup_anonymous_exports.py

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
import threading
2+
import time
13
from datetime import timedelta
24

5+
from django.db import transaction
36
from django.utils import timezone
4-
from django.test import TestCase
7+
from django.test import TransactionTestCase
58

69
from kpi.models.import_export_task import (
710
ImportExportStatusChoices,
@@ -11,7 +14,7 @@
1114
from kpi.utils.object_permission import get_anonymous_user
1215

1316

14-
class AnonymousExportCleanupTestCase(TestCase):
17+
class AnonymousExportCleanupTestCase(TransactionTestCase):
1518
def _create_export_task(
1619
self, status=ImportExportStatusChoices.COMPLETE, minutes_old=60
1720
):
@@ -59,3 +62,52 @@ def test_processing_exports_are_not_deleted(self):
5962
uid=processing_export.uid
6063
).exists()
6164
)
65+
66+
def test_concurrency_locked_rows_are_skipped(self):
67+
exports = [self._create_export_task(minutes_old=120) for _ in range(5)]
68+
export_pks = [e.pk for e in exports]
69+
locked_pks = export_pks[:3]
70+
unlocked_pks = export_pks[3:]
71+
72+
def lock_and_hold():
73+
"""
74+
Acquire lock on first 3 exports and hold for 5 seconds,
75+
this will block cleanup from acquiring the lock on these rows and
76+
in the meantime other rows should be cleaned up
77+
"""
78+
with transaction.atomic():
79+
# Acquire lock on first 3 exports
80+
list(
81+
SubmissionExportTask.objects
82+
.select_for_update()
83+
.filter(pk__in=locked_pks)
84+
)
85+
86+
# Hold lock for 5 seconds,
87+
# during this time cleanup should run
88+
time.sleep(5)
89+
90+
# Start thread that will hold the lock
91+
lock_thread = threading.Thread(target=lock_and_hold, daemon=True)
92+
lock_thread.start()
93+
94+
# Give thread time to acquire lock
95+
time.sleep(1)
96+
97+
# Run cleanup while lock is held
98+
cleanup_anonymous_exports()
99+
100+
# Wait for thread to finish
101+
lock_thread.join(timeout=10)
102+
103+
# Verify locked rows were not deleted
104+
remaining_locked = SubmissionExportTask.objects.filter(
105+
pk__in=locked_pks
106+
).count()
107+
self.assertEqual(remaining_locked, 3)
108+
109+
# Verify unlocked rows were deleted
110+
remaining_unlocked = SubmissionExportTask.objects.filter(
111+
pk__in=unlocked_pks
112+
).count()
113+
self.assertEqual(remaining_unlocked, 0)

0 commit comments

Comments
 (0)