Skip to content

Commit 750cf6e

Browse files
committed
test(exports): remove time.sleep() from unit concurrency tests
1 parent aeb9edd commit 750cf6e

File tree

3 files changed

+45
-55
lines changed

3 files changed

+45
-55
lines changed

kobo/settings/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@
477477
),
478478
'ANONYMOUS_EXPORTS_GRACE_PERIOD': (
479479
30,
480-
'Number of minutes after which anonymous export tasks are cleaned up.'
480+
'Number of minutes after which anonymous export tasks are cleaned up.',
481481
),
482482
'LIMIT_ATTACHMENT_REMOVAL_GRACE_PERIOD': (
483483
90,
@@ -1453,7 +1453,7 @@ def dj_stripe_request_callback_method():
14531453
'cleanup-anonymous-exports': {
14541454
'task': 'kpi.tasks.cleanup_anonymous_exports',
14551455
'schedule': crontab(minute='*/5'),
1456-
'options': {'queue': 'kpi_low_priority_queue'}
1456+
'options': {'queue': 'kpi_low_priority_queue'},
14571457
},
14581458
# Schedule every 15 minutes
14591459
'refresh-user-report-snapshot': {

kpi/tasks.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,15 @@ def cleanup_anonymous_exports(**kwargs):
8989
minutes=config.ANONYMOUS_EXPORTS_GRACE_PERIOD
9090
)
9191

92-
old_export_ids = SubmissionExportTask.objects.filter(
93-
user=get_anonymous_user(),
94-
date_created__lt=cutoff_time,
95-
).exclude(
96-
status=ImportExportStatusChoices.PROCESSING
97-
).order_by('date_created').values_list('pk', flat=True)[:BATCH_SIZE]
92+
old_export_ids = (
93+
SubmissionExportTask.objects.filter(
94+
user=get_anonymous_user(),
95+
date_created__lt=cutoff_time,
96+
)
97+
.exclude(status=ImportExportStatusChoices.PROCESSING)
98+
.order_by('date_created')
99+
.values_list('pk', flat=True)[:BATCH_SIZE]
100+
)
98101

99102
if not old_export_ids:
100103
logging.info('No old anonymous exports to clean up.')
@@ -104,7 +107,7 @@ def cleanup_anonymous_exports(**kwargs):
104107
for export_id in old_export_ids:
105108
try:
106109
with transaction.atomic():
107-
# Acquire a row level lock without waiting
110+
# Acquire a row-level lock without waiting
108111
export = (
109112
SubmissionExportTask.objects.only('pk', 'uid', 'result')
110113
.select_for_update(nowait=True)
@@ -121,9 +124,7 @@ def cleanup_anonymous_exports(**kwargs):
121124
export.delete()
122125
deleted_count += 1
123126
except DatabaseError:
124-
logging.info(
125-
f'Export {export_id} is currently being processed. Skipping.'
126-
)
127+
logging.info(f'Export {export_id} is currently being processed. Skipping.')
127128
logging.info(f'Cleaned up {deleted_count} old anonymous exports.')
128129

129130

Lines changed: 32 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
import threading
2-
import time
32
from datetime import timedelta
43

54
from django.db import transaction
6-
from django.utils import timezone
75
from django.test import TransactionTestCase
6+
from django.utils import timezone
87

98
from kpi.models.import_export_task import (
109
ImportExportStatusChoices,
11-
SubmissionExportTask
10+
SubmissionExportTask,
1211
)
1312
from kpi.tasks import cleanup_anonymous_exports
1413
from kpi.utils.object_permission import get_anonymous_user
@@ -52,62 +51,52 @@ def test_processing_exports_are_not_deleted(self):
5251
Test that exports with PROCESSING status are never deleted
5352
"""
5453
processing_export = self._create_export_task(
55-
status=ImportExportStatusChoices.PROCESSING,
56-
minutes_old=100
54+
status=ImportExportStatusChoices.PROCESSING, minutes_old=100
5755
)
5856

5957
cleanup_anonymous_exports()
6058
self.assertTrue(
61-
SubmissionExportTask.objects.filter(
62-
uid=processing_export.uid
63-
).exists()
59+
SubmissionExportTask.objects.filter(uid=processing_export.uid).exists()
6460
)
6561

6662
def test_concurrency_locked_rows_are_skipped(self):
6763
exports = [self._create_export_task(minutes_old=120) for _ in range(5)]
68-
export_pks = [e.pk for e in exports]
69-
locked_pks = export_pks[:3]
70-
unlocked_pks = export_pks[3:]
71-
72-
def lock_and_hold():
73-
"""
74-
Acquire lock on first 3 exports and hold for 5 seconds,
75-
this will block cleanup from acquiring the lock on these rows and
76-
in the meantime other rows should be cleaned up
77-
"""
64+
locked_export = exports[0]
65+
not_locked_exports = exports[1:]
66+
67+
lock_acquired = threading.Event()
68+
release_lock = threading.Event()
69+
70+
def lock_row():
7871
with transaction.atomic():
79-
# Acquire lock on first 3 exports
80-
list(
81-
SubmissionExportTask.objects
82-
.select_for_update()
83-
.filter(pk__in=locked_pks)
72+
# Lock the row
73+
SubmissionExportTask.objects.select_for_update().get(
74+
pk=locked_export.pk
8475
)
8576

86-
# Hold lock for 5 seconds,
87-
# during this time cleanup should run
88-
time.sleep(5)
77+
# Signal that the lock is active
78+
lock_acquired.set()
79+
80+
# Wait until the main thread signals we can release the lock
81+
release_lock.wait()
8982

90-
# Start thread that will hold the lock
91-
lock_thread = threading.Thread(target=lock_and_hold, daemon=True)
92-
lock_thread.start()
83+
t = threading.Thread(target=lock_row)
84+
t.start()
9385

94-
# Give thread time to acquire lock
95-
time.sleep(1)
86+
# Wait for the row-level lock to actually be acquired
87+
lock_acquired.wait()
9688

97-
# Run cleanup while lock is held
89+
# Now cleanup should try select_for_update(nowait=True), causing DatabaseError
9890
cleanup_anonymous_exports()
9991

100-
# Wait for thread to finish
101-
lock_thread.join(timeout=10)
92+
# Let the locking thread finish its transaction
93+
release_lock.set()
94+
t.join()
10295

103-
# Verify locked rows were not deleted
104-
remaining_locked = SubmissionExportTask.objects.filter(
105-
pk__in=locked_pks
106-
).count()
107-
self.assertEqual(remaining_locked, 3)
96+
# Verify the locked row was not deleted
97+
assert SubmissionExportTask.objects.filter(pk=locked_export.pk).exists()
10898

10999
# Verify unlocked rows were deleted
110-
remaining_unlocked = SubmissionExportTask.objects.filter(
111-
pk__in=unlocked_pks
112-
).count()
113-
self.assertEqual(remaining_unlocked, 0)
100+
assert not SubmissionExportTask.objects.filter(
101+
pk__in=[e.pk for e in not_locked_exports]
102+
).exists()

0 commit comments

Comments
 (0)