From 28eefe5f813aa8089c5df51a0164d879bfe84144 Mon Sep 17 00:00:00 2001 From: rajpatel24 Date: Mon, 5 Aug 2024 18:07:29 +0530 Subject: [PATCH] 862 Avoid duplicate submissions to be saved --- .github/workflows/pytest.yml | 1 + .../apps/api/tests/fixtures/users.json | 18 ++ .../viewsets/test_xform_submission_api.py | 123 +++++++++ kobo/apps/openrosa/apps/logger/exceptions.py | 31 ++- .../commands/clean_duplicated_submissions.py | 233 +++++++++++------- .../openrosa/apps/logger/models/__init__.py | 1 - .../apps/openrosa/apps/logger/models/xform.py | 2 +- .../logger/tests/test_simple_submission.py | 4 +- .../apps/logger/xform_instance_parser.py | 60 +++-- kobo/apps/openrosa/libs/utils/logger_tools.py | 112 +++++---- kobo/settings/testing.py | 2 + 11 files changed, 419 insertions(+), 168 deletions(-) create mode 100644 kobo/apps/openrosa/apps/api/tests/fixtures/users.json diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 5bc93e6559..7dddb7443e 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -20,6 +20,7 @@ jobs: CACHE_URL: redis://localhost:6379/3 ENKETO_REDIS_MAIN_URL: redis://localhost:6379/0 KOBOCAT_MEDIA_ROOT: /tmp/test_media + SKIP_TESTS_WITH_CONCURRENCY: "True" strategy: matrix: python-version: ['3.8', '3.10'] diff --git a/kobo/apps/openrosa/apps/api/tests/fixtures/users.json b/kobo/apps/openrosa/apps/api/tests/fixtures/users.json new file mode 100644 index 0000000000..72ad40c86a --- /dev/null +++ b/kobo/apps/openrosa/apps/api/tests/fixtures/users.json @@ -0,0 +1,18 @@ +[ + { + "model": "auth.user", + "pk": 2, + "fields": { + "username": "bob", + "password": "pbkdf2_sha256$260000$T1eA0O4Ub6c6FAaCsb0fqU$6vX4qMw1VV9tMXFf1d9pL/5z5/2T1MQYYn7vB3p+I2Y=", + "email": "bob@columbia.edu", + "first_name": "bob", + "last_name": "bob", + "is_active": true, + "is_staff": false, + "is_superuser": false, + "last_login": null, + "date_joined": "2015-02-12T19:52:14.406Z" + } + } +] diff --git a/kobo/apps/openrosa/apps/api/tests/viewsets/test_xform_submission_api.py b/kobo/apps/openrosa/apps/api/tests/viewsets/test_xform_submission_api.py index 8bc0a561b7..80001a7d1f 100644 --- a/kobo/apps/openrosa/apps/api/tests/viewsets/test_xform_submission_api.py +++ b/kobo/apps/openrosa/apps/api/tests/viewsets/test_xform_submission_api.py @@ -1,12 +1,24 @@ # coding: utf-8 +import multiprocessing import os import uuid +from collections import defaultdict +from functools import partial +import pytest +import requests import simplejson as json from django.conf import settings from django.contrib.auth.models import AnonymousUser from django.core.files.uploadedfile import InMemoryUploadedFile +from django.test.testcases import LiveServerTestCase +from django.urls import reverse from django_digest.test import DigestAuth +from rest_framework.authtoken.models import Token + +from kobo.apps.kobo_auth.shortcuts import User +from kobo.apps.openrosa.apps.main.models import UserProfile +from kobo.apps.openrosa.libs.tests.mixins.request_mixin import RequestMixin from kobo.apps.openrosa.libs.utils.guardian import assign_perm from kobo_service_account.utils import get_request_headers from rest_framework import status @@ -15,6 +27,7 @@ TestAbstractViewSet from kobo.apps.openrosa.apps.api.viewsets.xform_submission_api import XFormSubmissionApi from kobo.apps.openrosa.apps.logger.models import Attachment +from kobo.apps.openrosa.apps.main import tests as main_tests from kobo.apps.openrosa.libs.constants import ( CAN_ADD_SUBMISSIONS ) @@ -510,3 +523,113 @@ def test_submission_blocking_flag(self): ) response = self.view(request, username=username) self.assertEqual(response.status_code, status.HTTP_201_CREATED) + + +class ConcurrentSubmissionTestCase(RequestMixin, LiveServerTestCase): + """ + Inherit from LiveServerTestCase to be able to test concurrent requests + to submission endpoint in different transactions (and different processes). + Otherwise, DB is populated only on the first request but still empty on + subsequent ones. + """ + fixtures = ['kobo/apps/openrosa/apps/api/tests/fixtures/users'] + + def setUp(self): + self.user = User.objects.get(username='bob') + self.token, _ = Token.objects.get_or_create(user=self.user) + new_profile, created = UserProfile.objects.get_or_create( + user=self.user + ) + + def publish_xls_form(self): + path = os.path.join( + settings.OPENROSA_APP_DIR, + 'apps', + 'main', + 'tests', + 'fixtures', + 'transportation', + 'transportation.xls', + ) + + xform_list_url = reverse('xform-list') + service_account_meta = self.get_meta_from_headers( + get_request_headers(self.user.username) + ) + service_account_meta['HTTP_HOST'] = settings.TEST_HTTP_HOST + + with open(path, 'rb') as xls_file: + post_data = {'xls_file': xls_file} + response = self.client.post(xform_list_url, data=post_data, **service_account_meta) + + assert response.status_code == status.HTTP_201_CREATED + + @pytest.mark.skipif( + settings.SKIP_TESTS_WITH_CONCURRENCY, reason='GitLab does not seem to support multi-processes' + ) + def test_post_concurrent_same_submissions(self): + DUPLICATE_SUBMISSIONS_COUNT = 2 # noqa + + self.publish_xls_form() + username = 'bob' + survey = 'transport_2011-07-25_19-05-49' + results = defaultdict(int) + + with multiprocessing.Pool() as pool: + for result in pool.map( + partial( + submit_data, + live_server_url=self.live_server_url, + survey_=survey, + username_=username, + token_=self.token.key + ), + range(DUPLICATE_SUBMISSIONS_COUNT), + ): + results[result] += 1 + + assert results[status.HTTP_201_CREATED] == 1 + assert results[status.HTTP_409_CONFLICT] == DUPLICATE_SUBMISSIONS_COUNT - 1 + + +def submit_data(identifier, survey_, username_, live_server_url, token_): + """ + Submit data to live server. + + It has to be outside `ConcurrentSubmissionTestCase` class to be pickled by + `multiprocessing.Pool().map()`. + """ + media_file = '1335783522563.jpg' + main_directory = os.path.dirname(main_tests.__file__) + path = os.path.join( + main_directory, + 'fixtures', + 'transportation', + 'instances', + survey_, + media_file, + ) + with open(path, 'rb') as f: + f = InMemoryUploadedFile( + f, + 'media_file', + media_file, + 'image/jpg', + os.path.getsize(path), + None, + ) + submission_path = os.path.join( + main_directory, + 'fixtures', + 'transportation', + 'instances', + survey_, + f'{survey_}.xml', + ) + with open(submission_path) as sf: + files = {'xml_submission_file': sf, 'media_file': f} + headers = {'Authorization': f'Token {token_}'} + response = requests.post( + f'{live_server_url}/{username_}/submission', files=files, headers=headers + ) + return response.status_code diff --git a/kobo/apps/openrosa/apps/logger/exceptions.py b/kobo/apps/openrosa/apps/logger/exceptions.py index 74b31a19bd..1aa2bbb074 100644 --- a/kobo/apps/openrosa/apps/logger/exceptions.py +++ b/kobo/apps/openrosa/apps/logger/exceptions.py @@ -1,5 +1,11 @@ # coding: utf-8 -from django.utils.translation import gettext as t +class ConflictingXMLHashInstanceError(Exception): + pass + + +class DuplicateInstanceError(Exception): + def __init__(self, message='Duplicate Instance'): + super().__init__(message) class DuplicateUUIDError(Exception): @@ -10,5 +16,28 @@ class FormInactiveError(Exception): pass +class InstanceEmptyError(Exception): + def __init__(self, message='Empty instance'): + super().__init__(message) + + +class InstanceInvalidUserError(Exception): + def __init__(self, message='Could not determine the user'): + super().__init__(message) + + +class InstanceMultipleNodeError(Exception): + pass + + +class InstanceParseError(Exception): + def __init__(self, message='The instance could not be parsed'): + super().__init__(message) + + class TemporarilyUnavailableError(Exception): pass + + +class XLSFormError(Exception): + pass diff --git a/kobo/apps/openrosa/apps/logger/management/commands/clean_duplicated_submissions.py b/kobo/apps/openrosa/apps/logger/management/commands/clean_duplicated_submissions.py index 5ad0d71b8c..e16c7e7991 100644 --- a/kobo/apps/openrosa/apps/logger/management/commands/clean_duplicated_submissions.py +++ b/kobo/apps/openrosa/apps/logger/management/commands/clean_duplicated_submissions.py @@ -1,29 +1,25 @@ #!/usr/bin/env python # vim: ai ts=4 sts=4 et sw=4 fileencoding=utf-8 # coding: utf-8 +from collections import defaultdict + from django.conf import settings -from django.core.management.base import BaseCommand, CommandError +from django.core.management.base import BaseCommand from django.db import transaction -from django.db.models import Sum +from django.db.models import F from django.db.models.aggregates import Count -from django.utils import timezone +from kobo.apps.openrosa.apps.logger.models import DailyXFormSubmissionCounter, MonthlyXFormSubmissionCounter from kobo.apps.openrosa.apps.logger.models.attachment import Attachment from kobo.apps.openrosa.apps.logger.models.instance import Instance from kobo.apps.openrosa.apps.viewer.models.parsed_instance import ParsedInstance -from kobo.apps.openrosa.apps.logger.models.xform import XForm -from kobo.apps.openrosa.libs.utils.common_tags import MONGO_STRFTIME +from kobo.apps.openrosa.apps.logger.xform_instance_parser import set_meta class Command(BaseCommand): help = "Deletes duplicated submissions (i.e same `uuid` and same `xml`)" - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.__vaccuum = False - self.__users = set([]) - def add_arguments(self, parser): super().add_arguments(parser) @@ -39,11 +35,20 @@ def add_arguments(self, parser): help="Specify a XForm's `id_string` to clean up only this form", ) + parser.add_argument( + "--delete-unique-uuids", + action='store_true', + default=False, + help="Delete duplicates with identical uuid", + ) + def handle(self, *args, **options): username = options['user'] xform_id_string = options['xform'] + self._delete_unique_uuids = options['delete_unique_uuids'] + self._verbosity = options['verbosity'] - # Retrieve all instances with the same `uuid`. + # Retrieve all instances with the same `xml_hash`. query = Instance.objects if xform_id_string: query = query.filter(xform__id_string=xform_id_string) @@ -51,87 +56,137 @@ def handle(self, *args, **options): if username: query = query.filter(xform__user__username=username) - query = query.values_list('uuid', flat=True)\ - .annotate(count_uuid=Count('uuid'))\ - .filter(count_uuid__gt=1)\ + query = ( + query.values_list('xml_hash', flat=True) + .annotate(count_xml_hash=Count('xml_hash')) + .filter(count_xml_hash__gt=1) .distinct() + ) - for uuid in query.all(): - - duplicated_query = Instance.objects.filter(uuid=uuid) - - instances_with_same_uuid = duplicated_query.values_list('id', - 'xml_hash')\ - .order_by('xml_hash', 'date_created') - xml_hash_ref = None - instance_id_ref = None - - duplicated_instance_ids = [] - for instance_with_same_uuid in instances_with_same_uuid: - instance_id = instance_with_same_uuid[0] - instance_xml_hash = instance_with_same_uuid[1] - - if instance_xml_hash != xml_hash_ref: - self.__clean_up(instance_id_ref, - duplicated_instance_ids) - xml_hash_ref = instance_xml_hash - instance_id_ref = instance_id - duplicated_instance_ids = [] - continue - - duplicated_instance_ids.append(instance_id) - - self.__clean_up(instance_id_ref, - duplicated_instance_ids) - - if not self.__vaccuum: - self.stdout.write('No instances have been purged.') - else: - # Update number of submissions for each user. - for user_ in list(self.__users): - result = XForm.objects.filter(user_id=user_.id)\ - .aggregate(count=Sum('num_of_submissions')) - user_.profile.num_of_submissions = result['count'] - self.stdout.write( - "\tUpdating `{}`'s number of submissions".format( - user_.username)) - user_.profile.save(update_fields=['num_of_submissions']) + for xml_hash in query.iterator(): + + duplicates_queryset = Instance.objects.filter(xml_hash=xml_hash) + + instances_with_same_xml_hash = duplicates_queryset.values( + 'id', 'uuid', 'xform_id' + ).order_by('xform_id', 'uuid', 'date_created') + + duplicates_by_xform = self._get_duplicates_by_xform( + instances_with_same_xml_hash + ) + + for ( + xform_id, + instances_with_same_xml_hash, + ) in duplicates_by_xform.items(): + instance_ref = instances_with_same_xml_hash.pop(0) + self._clean_up(instance_ref, instances_with_same_xml_hash) + + def _clean_up(self, instance_ref, duplicated_instances): + + if duplicated_instances: + + if self._replace_duplicates(duplicated_instances): + return + + self._delete_duplicates(instance_ref, duplicated_instances) + + def _delete_duplicates( + self, instance_ref: dict, duplicated_instances: list[dict] + ): + + duplicated_instance_ids = [i['id'] for i in duplicated_instances] + + if self._verbosity >= 1: + self.stdout.write( + f"Deleting instance #{instance_ref['id']} duplicates…" + ) + + with transaction.atomic(): + # Update attachments + Attachment.objects.select_for_update().filter( + instance_id__in=duplicated_instance_ids + ).update(instance_id=instance_ref['id']) + if self._verbosity >= 2: self.stdout.write( - '\t\tDone! New number: {}'.format(result['count'])) - - def __clean_up(self, instance_id_ref, duplicated_instance_ids): - if instance_id_ref is not None and len(duplicated_instance_ids) > 0: - self.__vaccuum = True - with transaction.atomic(): - self.stdout.write('Link attachments to instance #{}'.format( - instance_id_ref)) - # Update attachments - Attachment.objects.select_for_update()\ - .filter(instance_id__in=duplicated_instance_ids)\ - .update(instance_id=instance_id_ref) - - # Update Mongo - main_instance = Instance.objects.select_for_update()\ - .get(id=instance_id_ref) - main_instance.parsed_instance.save() - - self.stdout.write('\tPurging instances: {}'.format( - duplicated_instance_ids)) - Instance.objects.select_for_update()\ - .filter(id__in=duplicated_instance_ids).delete() - ParsedInstance.objects.select_for_update()\ - .filter(instance_id__in=duplicated_instance_ids).delete() - settings.MONGO_DB.instances.remove( - {'_id': {'$in': duplicated_instance_ids}} + f"\tLinked attachments to instance #{instance_ref['id']}" ) - # Update number of submissions - xform = main_instance.xform - self.stdout.write( - '\tUpdating number of submissions of XForm #{} ({})'.format( - xform.id, xform.id_string)) - xform_submission_count = xform.submission_count(force_update=True) + + # Update Mongo + main_instance = Instance.objects.select_for_update().get( + id=instance_ref['id'] + ) + main_instance.parsed_instance.save() + + ParsedInstance.objects.filter( + instance_id__in=duplicated_instance_ids + ).delete() + + instance_queryset = Instance.objects.filter( + id__in=duplicated_instance_ids + ) + # update counters + for instance in instance_queryset.values( + 'xform_id', 'date_created__date', 'xform__user_id' + ): + MonthlyXFormSubmissionCounter.objects.filter( + year=instance['date_created__date'].year, + month=instance['date_created__date'].month, + user_id=instance['xform__user_id'], + xform_id=instance['xform_id'], + ).update(counter=F('counter') - 1) + + DailyXFormSubmissionCounter.objects.filter( + date=instance['date_created__date'], + xform_id=instance['xform_id'], + ).update(counter=F('counter') - 1) + + instance_queryset.delete() + + settings.MONGO_DB.instances.remove( + {'_id': {'$in': duplicated_instance_ids}} + ) + if self._verbosity > 1: self.stdout.write( - '\t\tDone! New number: {}'.format(xform_submission_count)) - self.stdout.write('') + f'\tPurged instance IDs: {duplicated_instance_ids}' + ) + + def _replace_duplicates(self, duplicated_instances: list) -> bool: + uniq__uuids = set([i['uuid'] for i in duplicated_instances]) + + if len(uniq__uuids) > 1 or self._delete_unique_uuids: + return False + + duplicates = [] + + for idx, duplicated_instance in enumerate(duplicated_instances): + try: + instance = Instance.objects.get(pk=duplicated_instance['id']) + except Instance.DoesNotExist: + pass + else: + if self._verbosity > 1: + self.stdout.write( + f'\tUpdating instance #{instance.pk} ({instance.uuid})…' + ) + + instance.uuid = f'DUPLICATE {idx} {instance.uuid}' + instance.xml = set_meta( + instance.xml, 'instanceID', instance.uuid + ) + instance.xml_hash = instance.get_hash(instance.xml) + duplicates.append(instance) + + if duplicates: + Instance.objects.bulk_update( + duplicates, fields=['uuid', 'xml', 'xml_hash'] + ) + + return True + + def _get_duplicates_by_xform(self, queryset): + duplicates_by_xform = defaultdict(list) + for record in queryset: + duplicates_by_xform[record['xform_id']].append(record) - self.__users.add(xform.user) + return duplicates_by_xform diff --git a/kobo/apps/openrosa/apps/logger/models/__init__.py b/kobo/apps/openrosa/apps/logger/models/__init__.py index 01a8162a96..6defed3784 100644 --- a/kobo/apps/openrosa/apps/logger/models/__init__.py +++ b/kobo/apps/openrosa/apps/logger/models/__init__.py @@ -3,7 +3,6 @@ from kobo.apps.openrosa.apps.logger.models.instance import Instance from kobo.apps.openrosa.apps.logger.models.survey_type import SurveyType from kobo.apps.openrosa.apps.logger.models.xform import XForm -from kobo.apps.openrosa.apps.logger.xform_instance_parser import InstanceParseError from kobo.apps.openrosa.apps.logger.models.note import Note from kobo.apps.openrosa.apps.logger.models.daily_xform_submission_counter import ( DailyXFormSubmissionCounter, diff --git a/kobo/apps/openrosa/apps/logger/models/xform.py b/kobo/apps/openrosa/apps/logger/models/xform.py index 11ad293827..eb07bbb5a4 100644 --- a/kobo/apps/openrosa/apps/logger/models/xform.py +++ b/kobo/apps/openrosa/apps/logger/models/xform.py @@ -16,7 +16,7 @@ from kobo.apps.kobo_auth.shortcuts import User from kobo.apps.openrosa.apps.logger.fields import LazyDefaultBooleanField -from kobo.apps.openrosa.apps.logger.xform_instance_parser import XLSFormError +from kobo.apps.openrosa.apps.logger.exceptions import XLSFormError from kobo.apps.openrosa.koboform.pyxform_utils import convert_csv_to_xls from kobo.apps.openrosa.libs.models.base_model import BaseModel from kobo.apps.openrosa.libs.constants import ( diff --git a/kobo/apps/openrosa/apps/logger/tests/test_simple_submission.py b/kobo/apps/openrosa/apps/logger/tests/test_simple_submission.py index c3d845af45..a7646eda95 100644 --- a/kobo/apps/openrosa/apps/logger/tests/test_simple_submission.py +++ b/kobo/apps/openrosa/apps/logger/tests/test_simple_submission.py @@ -3,7 +3,7 @@ from pyxform import SurveyElementBuilder from kobo.apps.kobo_auth.shortcuts import User -from kobo.apps.openrosa.apps.logger.xform_instance_parser import DuplicateInstance +from kobo.apps.openrosa.apps.logger.exceptions import DuplicateInstanceError from kobo.apps.openrosa.apps.main.models.user_profile import UserProfile from kobo.apps.openrosa.apps.viewer.models.data_dictionary import DataDictionary from kobo.apps.openrosa.libs.utils.logger_tools import ( @@ -39,7 +39,7 @@ def _submit_at_hour(self, hour): '_time>' % hour try: create_instance(self.user.username, TempFileProxy(st_xml), []) - except DuplicateInstance: + except DuplicateInstanceError: pass def _submit_simple_yes(self): diff --git a/kobo/apps/openrosa/apps/logger/xform_instance_parser.py b/kobo/apps/openrosa/apps/logger/xform_instance_parser.py index ecfeda23b0..4b220cac53 100644 --- a/kobo/apps/openrosa/apps/logger/xform_instance_parser.py +++ b/kobo/apps/openrosa/apps/logger/xform_instance_parser.py @@ -1,7 +1,10 @@ # coding: utf-8 +from __future__ import annotations + import logging import re import sys +from typing import Union from xml.dom import Node import dateutil.parser @@ -10,38 +13,13 @@ from django.utils.encoding import smart_str from django.utils.translation import gettext as t +from kobo.apps.openrosa.apps.logger.exceptions import InstanceEmptyError from kobo.apps.openrosa.libs.utils.common_tags import XFORM_ID_STRING -class XLSFormError(Exception): - pass - - -class DuplicateInstance(Exception): - def __str__(self): - return t("Duplicate Instance") - - -class InstanceInvalidUserError(Exception): - def __str__(self): - return t("Could not determine the user.") - - -class InstanceParseError(Exception): - def __str__(self): - return t("The instance could not be parsed.") - - -class InstanceEmptyError(InstanceParseError): - def __str__(self): - return t("Empty instance") - - -class InstanceMultipleNodeError(Exception): - pass - - -def get_meta_from_xml(xml_str, meta_name): +def get_meta_node_from_xml( + xml_str: str, meta_name: str +) -> Union[None, tuple[str, minidom.Document]]: xml = clean_and_parse_xml(xml_str) children = xml.childNodes # children ideally contains a single element @@ -66,8 +44,13 @@ def get_meta_from_xml(xml_str, meta_name): return None uuid_tag = uuid_tags[0] - return uuid_tag.firstChild.nodeValue.strip() if uuid_tag.firstChild\ - else None + return uuid_tag, xml + + +def get_meta_from_xml(xml_str: str, meta_name: str) -> str: + if node_and_root := get_meta_node_from_xml(xml_str, meta_name): + node, _ = node_and_root + return node.firstChild.nodeValue.strip() if node.firstChild else None def get_uuid_from_xml(xml): @@ -119,13 +102,26 @@ def get_deprecated_uuid_from_xml(xml): return None -def clean_and_parse_xml(xml_string: str) -> Node: +def clean_and_parse_xml(xml_string: str) -> minidom.Document: clean_xml_str = xml_string.strip() clean_xml_str = re.sub(r'>\s+<', '><', smart_str(clean_xml_str)) xml_obj = minidom.parseString(clean_xml_str) return xml_obj +def set_meta(xml_str: str, meta_name: str, new_value: str) -> str: + + if not (node_and_root := get_meta_node_from_xml(xml_str, meta_name)): + raise ValueError(f'{meta_name} node not found') + + node, root = node_and_root + + if node.firstChild: + node.firstChild.nodeValue = new_value + + return root.toxml() + + def _xml_node_to_dict(node: Node, repeats: list = []) -> dict: assert isinstance(node, Node) if len(node.childNodes) == 0: diff --git a/kobo/apps/openrosa/libs/utils/logger_tools.py b/kobo/apps/openrosa/libs/utils/logger_tools.py index 3d15d8d8b3..cfb18f0b3f 100644 --- a/kobo/apps/openrosa/libs/utils/logger_tools.py +++ b/kobo/apps/openrosa/libs/utils/logger_tools.py @@ -1,6 +1,7 @@ # coding: utf-8 from __future__ import annotations +import contextlib import logging import os import re @@ -18,7 +19,7 @@ from django.conf import settings from django.core.exceptions import ValidationError, PermissionDenied from django.core.mail import mail_admins -from django.db import IntegrityError, transaction +from django.db import connection, IntegrityError, transaction from django.db.models import Q from django.http import ( HttpResponse, @@ -39,6 +40,7 @@ from wsgiref.util import FileWrapper from kobo.apps.openrosa.apps.logger.exceptions import ( + ConflictingXMLHashInstanceError, DuplicateUUIDError, FormInactiveError, TemporarilyUnavailableError, @@ -60,11 +62,13 @@ update_xform_monthly_counter, update_xform_submission_count, ) -from kobo.apps.openrosa.apps.logger.xform_instance_parser import ( +from kobo.apps.openrosa.apps.logger.exceptions import ( + DuplicateInstanceError, InstanceEmptyError, InstanceInvalidUserError, InstanceMultipleNodeError, - DuplicateInstance, +) +from kobo.apps.openrosa.apps.logger.xform_instance_parser import ( clean_and_parse_xml, get_uuid_from_xml, get_deprecated_uuid_from_xml, @@ -134,7 +138,6 @@ def check_edit_submission_permissions( )) -@transaction.atomic # paranoia; redundant since `ATOMIC_REQUESTS` set to `True` def create_instance( username: str, xml_file: str, @@ -161,43 +164,47 @@ def create_instance( # get new and deprecated uuid's new_uuid = get_uuid_from_xml(xml) - # Dorey's rule from 2012 (commit 890a67aa): - # Ignore submission as a duplicate IFF - # * a submission's XForm collects start time - # * the submitted XML is an exact match with one that - # has already been submitted for that user. - # The start-time requirement protected submissions with identical responses - # from being rejected as duplicates *before* KoBoCAT had the concept of - # submission UUIDs. Nowadays, OpenRosa requires clients to send a UUID (in - # ``) within every submission; if the incoming XML has a UUID - # and still exactly matches an existing submission, it's certainly a - # duplicate (https://docs.opendatakit.org/openrosa-metadata/#fields). - if xform.has_start_time or new_uuid is not None: - # XML matches are identified by identical content hash OR, when a - # content hash is not present, by string comparison of the full - # content, which is slow! Use the management command - # `populate_xml_hashes_for_instances` to hash existing submissions - existing_instance = Instance.objects.filter( - Q(xml_hash=xml_hash) | Q(xml_hash=Instance.DEFAULT_XML_HASH, xml=xml), - xform__user=xform.user, - ).first() - else: - existing_instance = None - - if existing_instance: - existing_instance.check_active(force=False) - # ensure we have saved the extra attachments - new_attachments, _ = save_attachments(existing_instance, media_files) - if not new_attachments: - raise DuplicateInstance() + with get_instance_lock(xml_hash) as lock_acquired: + if not lock_acquired: + raise ConflictingXMLHashInstanceError() + # Dorey's rule from 2012 (commit 890a67aa): + # Ignore submission as a duplicate IFF + # * a submission's XForm collects start time + # * the submitted XML is an exact match with one that + # has already been submitted for that user. + # The start-time requirement protected submissions with identical responses + # from being rejected as duplicates *before* KoBoCAT had the concept of + # submission UUIDs. Nowadays, OpenRosa requires clients to send a UUID (in + # ``) within every submission; if the incoming XML has a UUID + # and still exactly matches an existing submission, it's certainly a + # duplicate (https://docs.opendatakit.org/openrosa-metadata/#fields). + + if xform.has_start_time or new_uuid is not None: + # XML matches are identified by identical content hash OR, when a + # content hash is not present, by string comparison of the full + # content, which is slow! Use the management command + # `populate_xml_hashes_for_instances` to hash existing submissions + existing_instance = Instance.objects.filter( + Q(xml_hash=xml_hash) | Q(xml_hash=Instance.DEFAULT_XML_HASH, xml=xml), + xform__user=xform.user, + ).first() else: - # Update Mongo via the related ParsedInstance - existing_instance.parsed_instance.save(asynchronous=False) - return existing_instance - else: - instance = save_submission(request, xform, xml, media_files, new_uuid, - status, date_created_override) - return instance + existing_instance = None + + if existing_instance: + existing_instance.check_active(force=False) + # ensure we have saved the extra attachments + new_attachments, _ = save_attachments(existing_instance, media_files) + if not new_attachments: + raise DuplicateInstanceError() + else: + # Update Mongo via the related ParsedInstance + existing_instance.parsed_instance.save(asynchronous=False) + return existing_instance + else: + instance = save_submission(request, xform, xml, media_files, new_uuid, + status, date_created_override) + return instance def disposition_ext_and_date(name, extension, show_date=True): @@ -216,6 +223,22 @@ def dict2xform(jsform, form_id): return xml_head + dict2xml(jsform) + xml_tail +@contextlib.contextmanager +def get_instance_lock(xml_hash: str) -> bool: + int_lock = int(xml_hash, 16) & 0xfffffffffffffff + acquired = False + + with transaction.atomic(): + try: + cur = connection.cursor() + cur.execute('SELECT pg_try_advisory_lock(%s::bigint);', (int_lock,)) + acquired = cur.fetchone()[0] + yield acquired + finally: + cur.execute('SELECT pg_advisory_unlock(%s::bigint);', (int_lock,)) + cur.close() + + def get_instance_or_404(**criteria): """ Mimic `get_object_or_404` but handles duplicate records. @@ -551,8 +574,13 @@ def safe_create_instance(username, xml_file, media_files, uuid, request): ) except ExpatError as e: error = OpenRosaResponseBadRequest(t("Improperly formatted XML.")) - except DuplicateInstance: - response = OpenRosaResponse(t("Duplicate submission")) + except ConflictingXMLHashInstanceError: + response = OpenRosaResponse(t('Conflict with already existing instance')) + response.status_code = 409 + response['Location'] = request.build_absolute_uri(request.path) + error = response + except DuplicateInstanceError: + response = OpenRosaResponse(t('Duplicate instance')) response.status_code = 202 response['Location'] = request.build_absolute_uri(request.path) error = response diff --git a/kobo/settings/testing.py b/kobo/settings/testing.py index 4ae7e54ea8..3e204192df 100644 --- a/kobo/settings/testing.py +++ b/kobo/settings/testing.py @@ -53,4 +53,6 @@ SERVICE_ACCOUNT['WHITELISTED_HOSTS'] = ['testserver'] SERVICE_ACCOUNT['NAMESPACE'] = 'kobo-service-account-test' +SKIP_TESTS_WITH_CONCURRENCY = os.getenv('SKIP_TESTS_WITH_CONCURRENCY', False) + OPENROSA_DB_ALIAS = DEFAULT_DB_ALIAS