diff --git a/hub/models/extra_user_detail.py b/hub/models/extra_user_detail.py
index 012a28b676..95d27d650b 100644
--- a/hub/models/extra_user_detail.py
+++ b/hub/models/extra_user_detail.py
@@ -78,5 +78,6 @@ def _sync_org_name(self):
except (KeyError, AttributeError):
organization_name = None
- user_organization.name = organization_name
- user_organization.save(update_fields=['name'])
+ if organization_name:
+ user_organization.name = organization_name
+ user_organization.save(update_fields=['name'])
diff --git a/kobo/apps/hook/tests/hook_test_case.py b/kobo/apps/hook/tests/hook_test_case.py
index 030610ee60..6dc51ce796 100644
--- a/kobo/apps/hook/tests/hook_test_case.py
+++ b/kobo/apps/hook/tests/hook_test_case.py
@@ -1,22 +1,12 @@
-# coding: utf-8
import json
import uuid
-import pytest
-import responses
-from django.conf import settings
-from django.urls import reverse
-from rest_framework import status
-
-from kpi.constants import SUBMISSION_FORMAT_TYPE_JSON, SUBMISSION_FORMAT_TYPE_XML
-from kpi.exceptions import BadFormatException
from kpi.tests.kpi_test_case import KpiTestCase
-from ..constants import HOOK_LOG_FAILED
-from ..exceptions import HookRemoteServerDownError
-from ..models import Hook, HookLog
+from ..models import Hook
+from ..utils.tests.mixins import HookTestCaseMixin
-class HookTestCase(KpiTestCase):
+class HookTestCase(HookTestCaseMixin, KpiTestCase):
def setUp(self):
self.client.login(username='someuser', password='someuser')
@@ -49,115 +39,8 @@ def setUp(self):
self.asset.deploy(backend='mock', active=True)
self.asset.save()
self.hook = Hook()
- self._submission_pk = 1
-
- settings.CELERY_TASK_ALWAYS_EAGER = True
-
- def _create_hook(self, return_response_only=False, **kwargs):
-
- format_type = kwargs.get('format_type', SUBMISSION_FORMAT_TYPE_JSON)
- if format_type not in [
- SUBMISSION_FORMAT_TYPE_JSON,
- SUBMISSION_FORMAT_TYPE_XML,
- ]:
- raise BadFormatException(
- 'The format {} is not supported'.format(format_type)
- )
-
- self.__prepare_submission()
-
- url = reverse('hook-list', args=(self.asset.uid,))
- data = {
- 'name': kwargs.get('name', 'some external service with token'),
- 'endpoint': kwargs.get('endpoint', 'http://external.service.local/'),
- 'settings': kwargs.get('settings', {
- 'custom_headers': {
- 'X-Token': '1234abcd'
- }
- }),
- 'export_type': format_type,
- 'active': kwargs.get('active', True),
- 'subset_fields': kwargs.get('subset_fields', []),
- 'payload_template': kwargs.get('payload_template', None)
- }
-
- response = self.client.post(url, data, format='json')
- if return_response_only:
- return response
- else:
- self.assertEqual(
- response.status_code, status.HTTP_201_CREATED, msg=response.data
- )
- hook = self.asset.hooks.last()
- self.assertTrue(hook.active)
- return hook
-
- def _send_and_fail(self):
- """
- The public method which calls this method needs to be decorated by
- `@responses.activate`
-
- :return: dict
- """
- first_hooklog_response = self._send_and_wait_for_retry()
-
- # Fakes Celery n retries by forcing status to `failed`
- # (where n is `settings.HOOKLOG_MAX_RETRIES`)
- first_hooklog = HookLog.objects.get(uid=first_hooklog_response.get('uid'))
- first_hooklog.change_status(HOOK_LOG_FAILED)
-
- return first_hooklog_response
-
- def _send_and_wait_for_retry(self):
- self.hook = self._create_hook()
-
- ServiceDefinition = self.hook.get_service_definition()
- submissions = self.asset.deployment.get_submissions(self.asset.owner)
- submission_id = submissions[0]['_id']
- service_definition = ServiceDefinition(self.hook, submission_id)
- first_mock_response = {'error': 'gateway timeout'}
-
- # Mock first request's try
- responses.add(
- responses.POST,
- self.hook.endpoint,
- json=first_mock_response,
- status=status.HTTP_504_GATEWAY_TIMEOUT,
- )
-
- # Mock next requests' tries
- responses.add(
- responses.POST,
- self.hook.endpoint,
- status=status.HTTP_200_OK,
- content_type='application/json',
- )
-
- # Try to send data to external endpoint
- with pytest.raises(HookRemoteServerDownError):
- service_definition.send()
-
- # Retrieve the corresponding log
- url = reverse('hook-log-list', kwargs={
- 'parent_lookup_asset': self.hook.asset.uid,
- 'parent_lookup_hook': self.hook.uid
- })
-
- response = self.client.get(url)
- first_hooklog_response = response.data.get('results')[0]
-
- # Result should match first try
- self.assertEqual(
- first_hooklog_response.get('status_code'),
- status.HTTP_504_GATEWAY_TIMEOUT,
- )
- self.assertEqual(
- json.loads(first_hooklog_response.get('message')),
- first_mock_response,
- )
- return first_hooklog_response
- def __prepare_submission(self):
+ def _add_submissions(self):
v_uid = self.asset.latest_deployed_version.uid
self.submission = {
'__version__': v_uid,
diff --git a/kobo/apps/hook/utils/tests/__init__.py b/kobo/apps/hook/utils/tests/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/kobo/apps/hook/utils/tests/mixins.py b/kobo/apps/hook/utils/tests/mixins.py
new file mode 100644
index 0000000000..6453751044
--- /dev/null
+++ b/kobo/apps/hook/utils/tests/mixins.py
@@ -0,0 +1,118 @@
+import json
+
+import pytest
+import responses
+from django.urls import reverse
+from rest_framework import status
+
+from kpi.constants import SUBMISSION_FORMAT_TYPE_JSON, SUBMISSION_FORMAT_TYPE_XML
+from kpi.exceptions import BadFormatException
+from kobo.apps.hook.constants import HOOK_LOG_FAILED
+from kobo.apps.hook.exceptions import HookRemoteServerDownError
+from kobo.apps.hook.models import HookLog
+
+
+class HookTestCaseMixin:
+
+ def _create_hook(self, return_response_only=False, **kwargs):
+
+ format_type = kwargs.get('format_type', SUBMISSION_FORMAT_TYPE_JSON)
+ if format_type not in [
+ SUBMISSION_FORMAT_TYPE_JSON,
+ SUBMISSION_FORMAT_TYPE_XML,
+ ]:
+ raise BadFormatException(
+ 'The format {} is not supported'.format(format_type)
+ )
+
+ self._add_submissions()
+
+ url = reverse('hook-list', args=(self.asset.uid,))
+ data = {
+ 'name': kwargs.get('name', 'some external service with token'),
+ 'endpoint': kwargs.get('endpoint', 'http://external.service.local/'),
+ 'settings': kwargs.get('settings', {
+ 'custom_headers': {
+ 'X-Token': '1234abcd'
+ }
+ }),
+ 'export_type': format_type,
+ 'active': kwargs.get('active', True),
+ 'subset_fields': kwargs.get('subset_fields', []),
+ 'payload_template': kwargs.get('payload_template', None)
+ }
+
+ response = self.client.post(url, data, format='json')
+ if return_response_only:
+ return response
+ else:
+ self.assertEqual(
+ response.status_code, status.HTTP_201_CREATED, msg=response.data
+ )
+ hook = self.asset.hooks.last()
+ self.assertTrue(hook.active)
+ return hook
+
+ def _send_and_fail(self) -> dict:
+ """
+ The public method which calls this method needs to be decorated by
+ `@responses.activate`
+ """
+
+ first_hooklog_response = self._send_and_wait_for_retry()
+
+ # Fakes Celery n retries by forcing status to `failed`
+ # (where n is `settings.HOOKLOG_MAX_RETRIES`)
+ first_hooklog = HookLog.objects.get(uid=first_hooklog_response.get('uid'))
+ first_hooklog.change_status(HOOK_LOG_FAILED)
+
+ return first_hooklog_response
+
+ def _send_and_wait_for_retry(self):
+ self.hook = self._create_hook()
+
+ ServiceDefinition = self.hook.get_service_definition()
+ submissions = self.asset.deployment.get_submissions(self.asset.owner)
+ submission_id = submissions[0]['_id']
+ service_definition = ServiceDefinition(self.hook, submission_id)
+ first_mock_response = {'error': 'gateway timeout'}
+
+ # Mock first request's try
+ responses.add(
+ responses.POST,
+ self.hook.endpoint,
+ json=first_mock_response,
+ status=status.HTTP_504_GATEWAY_TIMEOUT,
+ )
+
+ # Mock next requests' tries
+ responses.add(
+ responses.POST,
+ self.hook.endpoint,
+ status=status.HTTP_200_OK,
+ content_type='application/json',
+ )
+
+ # Try to send data to external endpoint
+ with pytest.raises(HookRemoteServerDownError):
+ service_definition.send()
+
+ # Retrieve the corresponding log
+ url = reverse('hook-log-list', kwargs={
+ 'parent_lookup_asset': self.hook.asset.uid,
+ 'parent_lookup_hook': self.hook.uid
+ })
+
+ response = self.client.get(url)
+ first_hooklog_response = response.data.get('results')[0]
+
+ # Result should match first try
+ self.assertEqual(
+ first_hooklog_response.get('status_code'),
+ status.HTTP_504_GATEWAY_TIMEOUT,
+ )
+ self.assertEqual(
+ json.loads(first_hooklog_response.get('message')),
+ first_mock_response,
+ )
+ return first_hooklog_response
diff --git a/kobo/apps/kobo_auth/models.py b/kobo/apps/kobo_auth/models.py
index 36a5c239db..e9798b4dde 100644
--- a/kobo/apps/kobo_auth/models.py
+++ b/kobo/apps/kobo_auth/models.py
@@ -8,6 +8,7 @@
from kobo.apps.openrosa.libs.permissions import get_model_permission_codenames
from kobo.apps.organizations.models import create_organization, Organization
from kpi.utils.database import update_autofield_sequence, use_db
+from kpi.utils.permissions import is_user_anonymous
class User(AbstractUser):
@@ -52,6 +53,9 @@ def is_org_owner(self):
@property
@cache_for_request
def organization(self):
+ if is_user_anonymous(self):
+ return
+
# Database allows multiple organizations per user, but we restrict it to one.
if organization := Organization.objects.filter(
organization_users__user=self
diff --git a/kobo/apps/openrosa/apps/api/viewsets/xform_submission_api.py b/kobo/apps/openrosa/apps/api/viewsets/xform_submission_api.py
index 0df1022d27..7854c2ab0f 100644
--- a/kobo/apps/openrosa/apps/api/viewsets/xform_submission_api.py
+++ b/kobo/apps/openrosa/apps/api/viewsets/xform_submission_api.py
@@ -164,9 +164,11 @@ def create(self, request, *args, **kwargs):
username = user.username
if request.method.upper() == 'HEAD':
- return Response(status=status.HTTP_204_NO_CONTENT,
- headers=self.get_openrosa_headers(request),
- template_name=self.template_name)
+ return Response(
+ status=status.HTTP_204_NO_CONTENT,
+ headers=self.get_openrosa_headers(request),
+ template_name=self.template_name,
+ )
is_json_request = is_json(request)
diff --git a/kobo/apps/openrosa/apps/logger/models/xform.py b/kobo/apps/openrosa/apps/logger/models/xform.py
index ed124fd439..c277cb8218 100644
--- a/kobo/apps/openrosa/apps/logger/models/xform.py
+++ b/kobo/apps/openrosa/apps/logger/models/xform.py
@@ -7,6 +7,7 @@
from django.apps import apps
from django.conf import settings
+from django.contrib.auth.management import DEFAULT_DB_ALIAS
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.urls import reverse
@@ -28,7 +29,9 @@
)
from kpi.fields.file import ExtendedFileField
from kpi.models.abstract_models import AbstractTimeStampedModel
+from kpi.utils.database import use_db
from kpi.utils.hash import calculate_hash
+from kpi.utils.object_permission import perm_parse
from kpi.utils.xml import XMLFormWithDisclaimer
XFORM_TITLE_LENGTH = 255
@@ -60,7 +63,9 @@ class XForm(AbstractTimeStampedModel):
description = models.TextField(default='', null=True)
xml = models.TextField()
- user = models.ForeignKey(User, related_name='xforms', null=True, on_delete=models.CASCADE)
+ user = models.ForeignKey(
+ User, related_name='xforms', null=True, on_delete=models.CASCADE
+ )
require_auth = models.BooleanField(
default=True,
verbose_name=t('Require authentication to see form and submit data'),
@@ -124,25 +129,31 @@ def asset(self):
See kpi.utils.xml.XMLFormWithDisclaimer for more details.
"""
Asset = apps.get_model('kpi', 'Asset') # noqa
- if not hasattr(self, '_cache_asset'):
+ if not hasattr(self, '_cached_asset'):
# We only need to load the PK because XMLFormWithDisclaimer
# uses an Asset object only to narrow down a query with a filter,
# thus uses only asset PK
try:
- asset = Asset.objects.only('pk').get(uid=self.kpi_asset_uid)
+ asset = Asset.objects.only('pk', 'name', 'uid', 'owner_id').get(
+ uid=self.kpi_asset_uid
+ )
except Asset.DoesNotExist:
try:
- asset = Asset.objects.only('pk').get(
- _deployment_data__formid=self.pk
- )
+ asset = Asset.objects.only(
+ 'pk', 'name', 'uid', 'owner_id'
+ ).get(_deployment_data__formid=self.pk)
except Asset.DoesNotExist:
# An `Asset` object needs to be returned to avoid 500 while
# Enketo is fetching for project XML (e.g: /formList, /manifest)
- asset = Asset(uid=self.id_string)
+ asset = Asset(
+ uid=self.id_string,
+ name=self.title,
+ owner_id=self.user.id,
+ )
- setattr(self, '_cache_asset', asset)
+ setattr(self, '_cached_asset', asset)
- return getattr(self, '_cache_asset')
+ return getattr(self, '_cached_asset')
@property
def can_be_replaced(self):
@@ -160,6 +171,7 @@ def data_dictionary(self, use_cache: bool = False):
xform_dict = deepcopy(self.__dict__)
xform_dict.pop('_state', None)
+ xform_dict.pop('_cached_asset', None)
return DataDictionary(**xform_dict)
def file_name(self):
@@ -173,6 +185,31 @@ def geocoded_submission_count(self):
def has_instances_with_geopoints(self):
return self.instances_with_geopoints
+ def has_mapped_perm(self, user_obj: User, perm: str) -> bool:
+ """
+ Checks if a role-based user (e.g., an organization admin) has access to an
+ object by validating against equivalent permissions defined in KPI.
+
+ In the context of OpenRosa, roles such as organization admins do not have
+ permissions explicitly recorded in the database. Since django-guardian cannot
+ determine access for such roles directly, this method maps the role to
+ its equivalent permissions in KPI, allowing for accurate permission validation.
+ """
+ _, codename = perm_parse(perm)
+
+ with use_db(DEFAULT_DB_ALIAS):
+ kc_permission_map = self.asset.KC_PERMISSIONS_MAP
+ try:
+ kpi_perm = list(kc_permission_map.keys())[
+ list(kc_permission_map.values()).index(codename)
+ ]
+ except ValueError:
+ return False
+
+ has_perm = self.asset.has_perm(user_obj, kpi_perm)
+
+ return has_perm
+
@property
def md5_hash(self):
return calculate_hash(self.xml)
@@ -227,18 +264,6 @@ def submission_count(self, force_update=False):
submission_count.short_description = t('Submission Count')
- def update(self, *args, **kwargs):
- super().save(*args, **kwargs)
-
- def url(self):
- return reverse(
- 'download_xform',
- kwargs={
- 'username': self.user.username,
- 'pk': self.pk
- }
- )
-
def time_of_last_submission(self):
if self.last_submission_time is None and self.num_of_submissions > 0:
try:
@@ -258,6 +283,18 @@ def time_of_last_submission_update(self):
except ObjectDoesNotExist:
pass
+ def update(self, *args, **kwargs):
+ super().save(*args, **kwargs)
+
+ def url(self):
+ return reverse(
+ 'download_xform',
+ kwargs={
+ 'username': self.user.username,
+ 'pk': self.pk
+ }
+ )
+
@property
def xml_with_disclaimer(self):
return XMLFormWithDisclaimer(self).get_object().xml
diff --git a/kobo/apps/openrosa/libs/filters.py b/kobo/apps/openrosa/libs/filters.py
index e5f52186be..61b178d057 100644
--- a/kobo/apps/openrosa/libs/filters.py
+++ b/kobo/apps/openrosa/libs/filters.py
@@ -1,4 +1,4 @@
-# coding: utf-8
+from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.http import Http404
from django.shortcuts import get_object_or_404
@@ -6,14 +6,16 @@
from rest_framework.exceptions import ParseError
from rest_framework.filters import BaseFilterBackend
+from kobo.apps.kobo_auth.shortcuts import User
from kobo.apps.openrosa.apps.logger.models import Instance, XForm
from kobo.apps.openrosa.libs.utils.guardian import get_objects_for_user
+from kpi.utils.object_permission import get_database_user
class GuardianObjectPermissionsFilter(BaseFilterBackend):
"""
Copy from django-rest-framework-guardian `ObjectPermissionsFilter`
- Avoid importing the library (which does not seem to be maintained anymore)
+ Avoid importing the library (which does not seem to be maintained anymore)
"""
perm_format = '%(app_label)s.view_%(model_name)s'
@@ -21,6 +23,13 @@ class GuardianObjectPermissionsFilter(BaseFilterBackend):
'accept_global_perms': False,
}
+ ORG_ADMIN_EXEMPT_VIEWS = {
+ 'XFormListApi': [
+ 'manifest',
+ 'list',
+ ]
+ }
+
def filter_queryset(self, request, queryset, view):
user = request.user
permission = self.perm_format % {
@@ -28,10 +37,52 @@ def filter_queryset(self, request, queryset, view):
'model_name': queryset.model._meta.model_name,
}
+ if org_admin_queryset := self._get_objects_for_org_admin(
+ request, queryset, view
+ ):
+ return org_admin_queryset
+
return get_objects_for_user(
user, permission, queryset, **self.shortcut_kwargs
)
+ def _get_objects_for_org_admin(self, request, queryset, view):
+ """
+ Bypasses the Django Guardian permission mechanism to retrieve all objects
+ belonging to the owner of an organization.
+
+ If the current user is an admin of the organization associated with the
+ owner of the given object, this method returns all related objects
+ without applying Guardian's usual filtering.
+ """
+
+ # Only check for specific view and action
+ if not (
+ view.action
+ in self.ORG_ADMIN_EXEMPT_VIEWS.get(view.__class__.__name__, {})
+ ):
+ return
+
+ xform_id_string = request.query_params.get('formID', False)
+ object_id = request.parser_context['kwargs'].get('pk', False)
+
+ if xform_id_string or object_id:
+
+ xform_filter = (
+ {'xforms__id_string': xform_id_string}
+ if xform_id_string
+ else {'xforms__pk': object_id}
+ )
+
+ if (
+ owner := User.objects.using(settings.OPENROSA_DB_ALIAS)
+ .filter(**xform_filter)
+ .first()
+ ):
+ user = get_database_user(request.user)
+ if owner.organization.is_admin_only(user):
+ return queryset.filter(user=owner)
+
class AnonDjangoObjectPermissionFilter(GuardianObjectPermissionsFilter):
def filter_queryset(self, request, queryset, view):
@@ -58,7 +109,7 @@ def filter_queryset(self, request, queryset, view):
# some of its objects are not allowed to accessed by `request.user`.
# We need to avoid `guardian` filter to allow:
# - anonymous user to see public data
- # - superuser to take actions on all objects.
+ # - superuser to take actions on all objects
# The permissions validation is handled by the permission classes and
# should deny access to forbidden data.
if request.user.is_anonymous or request.user.is_superuser:
diff --git a/kobo/apps/openrosa/libs/utils/logger_tools.py b/kobo/apps/openrosa/libs/utils/logger_tools.py
index fc03f679f3..28436f749b 100644
--- a/kobo/apps/openrosa/libs/utils/logger_tools.py
+++ b/kobo/apps/openrosa/libs/utils/logger_tools.py
@@ -137,7 +137,7 @@ def check_edit_submission_permissions(
))
-@transaction.atomic # paranoia; redundant since `ATOMIC_REQUESTS` set to `True`
+@transaction.atomic
def create_instance(
username: str,
xml_file: File,
diff --git a/kobo/apps/organizations/constants.py b/kobo/apps/organizations/constants.py
index dd2f1525b5..790f5984f5 100644
--- a/kobo/apps/organizations/constants.py
+++ b/kobo/apps/organizations/constants.py
@@ -1,4 +1,4 @@
-ADMIN_ORG_ROLE = 'admin'
-EXTERNAL_ORG_ROLE = 'external'
-MEMBER_ORG_ROLE = 'member'
-OWNER_ORG_ROLE = 'owner'
+ORG_ADMIN_ROLE = 'admin'
+ORG_EXTERNAL_ROLE = 'external'
+ORG_MEMBER_ROLE = 'member'
+ORG_OWNER_ROLE = 'owner'
diff --git a/kobo/apps/organizations/exceptions.py b/kobo/apps/organizations/exceptions.py
new file mode 100644
index 0000000000..2fb1b71493
--- /dev/null
+++ b/kobo/apps/organizations/exceptions.py
@@ -0,0 +1,2 @@
+class NotMultiMemberOrganizationException(Exception):
+ pass
diff --git a/kobo/apps/organizations/migrations/0006_update_organization_name.py b/kobo/apps/organizations/migrations/0006_update_organization_name.py
index a7d171696b..6c1e3b8ee0 100644
--- a/kobo/apps/organizations/migrations/0006_update_organization_name.py
+++ b/kobo/apps/organizations/migrations/0006_update_organization_name.py
@@ -31,10 +31,12 @@ def update_organization_names(apps, schema_editor):
except (KeyError, AttributeError):
continue
- organization.name = organization_name
- organizations.append(organization)
+ if organization_name:
+ organization.name = organization_name
+ organizations.append(organization)
- Organization.objects.bulk_update(organizations, ['name'])
+ if organizations:
+ Organization.objects.bulk_update(organizations, ['name'])
def noop(apps, schema_editor):
diff --git a/kobo/apps/organizations/models.py b/kobo/apps/organizations/models.py
index 4fc9a111ce..d03c01bb3b 100644
--- a/kobo/apps/organizations/models.py
+++ b/kobo/apps/organizations/models.py
@@ -20,14 +20,16 @@
from kobo.apps.stripe.constants import ACTIVE_STRIPE_STATUSES
from kpi.fields import KpiUidField
from .constants import (
- ADMIN_ORG_ROLE,
- EXTERNAL_ORG_ROLE,
- MEMBER_ORG_ROLE,
- OWNER_ORG_ROLE,
+ ORG_ADMIN_ROLE,
+ ORG_EXTERNAL_ROLE,
+ ORG_MEMBER_ROLE,
+ ORG_OWNER_ROLE,
)
+from .exceptions import NotMultiMemberOrganizationException
+
OrganizationRole = Literal[
- ADMIN_ORG_ROLE, EXTERNAL_ORG_ROLE, MEMBER_ORG_ROLE, OWNER_ORG_ROLE
+ ORG_ADMIN_ROLE, ORG_EXTERNAL_ROLE, ORG_MEMBER_ROLE, ORG_OWNER_ROLE
]
@@ -37,6 +39,13 @@ class Organization(AbstractOrganization):
default=False, verbose_name='Multi-members override'
)
+ def add_user(self, user, is_admin=False):
+ if not self.is_mmo and self.users.all().count():
+ raise NotMultiMemberOrganizationException
+
+ user.organization.delete()
+ super().add_user(user, is_admin=is_admin)
+
@cache_for_request
def active_subscription_billing_details(self):
"""
@@ -84,7 +93,7 @@ def canceled_subscription_billing_cycle_anchor(self):
def email(self):
"""
As organization is our customer model for Stripe, Stripe requires that
- it has an email address attribute
+ it has an email address attribute.
"""
try:
return self.owner_user_object.email
@@ -95,23 +104,24 @@ def email(self):
def get_user_role(self, user: 'User') -> OrganizationRole:
if not self.users.filter(pk=user.pk).exists():
- return EXTERNAL_ORG_ROLE
+ return ORG_EXTERNAL_ROLE
if self.is_owner(user):
- return OWNER_ORG_ROLE
+ return ORG_OWNER_ROLE
if self.is_admin(user):
- return ADMIN_ORG_ROLE
+ return ORG_ADMIN_ROLE
- return MEMBER_ORG_ROLE
+ return ORG_MEMBER_ROLE
@cache_for_request
def is_admin(self, user: 'User') -> bool:
"""
Only extends super() to add decorator @cache_for_request and avoid
- multiple calls to DB in the same request
+ multiple calls to DB in the same request.
"""
+ # Be aware: Owners are also Admins
return super().is_admin(user)
@property
@@ -128,13 +138,21 @@ def is_mmo(self):
return self.mmo_override or bool(self.active_subscription_billing_details())
@cache_for_request
- def is_owner(self, user):
+ def is_admin_only(self, user: 'User') -> bool:
+
+ # Be aware: Owners are also Admins
+ return super().is_admin(user) and not self.is_owner(user)
+
+ @cache_for_request
+ def is_owner(self, user: 'User') -> bool:
"""
- Only extends super() to add decorator @cache_for_request and avoid
- multiple calls to DB in the same request
+ Overrides `is_owner()` with `owner_user_object()` instead of
+ using `super().is_owner()` to take advantage of `@cache_for_request`
+ in both scenarios.
+ (i.e., when calling either `is_owner()` or `owner_user_object()`).
"""
- return super().is_owner(user)
+ return self.owner_user_object == user
@property
@cache_for_request
diff --git a/kobo/apps/organizations/permissions.py b/kobo/apps/organizations/permissions.py
index 51a5cc399f..7564a57eda 100644
--- a/kobo/apps/organizations/permissions.py
+++ b/kobo/apps/organizations/permissions.py
@@ -1,25 +1,44 @@
+from django.http import Http404
from rest_framework import permissions
+from kobo.apps.organizations.constants import ORG_EXTERNAL_ROLE
from kpi.mixins.validation_password_permission import ValidationPasswordPermissionMixin
+from kpi.utils.object_permission import get_database_user
-class IsOrgAdminOrReadOnly(
+class IsOrgAdmin(
ValidationPasswordPermissionMixin, permissions.BasePermission
):
"""
- Object-level permission to only allow admin members of an object to edit it.
+ Object-level permission to only allow admin members of an object to access it.
Assumes the model instance has an `is_admin` attribute.
"""
-
def has_permission(self, request, view):
self.validate_password(request)
return super().has_permission(request=request, view=view)
def has_object_permission(self, request, view, obj):
+ user = get_database_user(request.user)
+ if obj.get_user_role(user) == ORG_EXTERNAL_ROLE:
+ # Do not expose organization existence
+ raise Http404()
+
+ # Instance must have an attribute named `is_admin`.
+ return obj.is_admin(user)
+
+
+class IsOrgAdminOrReadOnly(IsOrgAdmin):
+ """
+ Object-level permission to only allow admin members of an object to edit it.
+ Assumes the model instance has an `is_admin` attribute.
+ """
+
+ def has_object_permission(self, request, view, obj):
+
# Read permissions are allowed to any request,
# so we'll always allow GET, HEAD or OPTIONS requests.
if request.method in permissions.SAFE_METHODS:
return True
- # Instance must have an attribute named `owner`.
+ # Instance must have an attribute named `is_admin`
return obj.is_admin(request.user)
diff --git a/kobo/apps/organizations/serializers.py b/kobo/apps/organizations/serializers.py
index 21d73e131d..2c0e0fdcd1 100644
--- a/kobo/apps/organizations/serializers.py
+++ b/kobo/apps/organizations/serializers.py
@@ -7,7 +7,7 @@
OrganizationUser,
)
from kpi.utils.object_permission import get_database_user
-from .constants import EXTERNAL_ORG_ROLE
+from .constants import ORG_EXTERNAL_ROLE
class OrganizationUserSerializer(serializers.ModelSerializer):
@@ -66,4 +66,4 @@ def get_request_user_role(self, organization):
user = get_database_user(request.user)
return organization.get_user_role(user)
- return EXTERNAL_ORG_ROLE
+ return ORG_EXTERNAL_ROLE
diff --git a/kobo/apps/organizations/tests/test_organization.py b/kobo/apps/organizations/tests/test_organizations.py
similarity index 79%
rename from kobo/apps/organizations/tests/test_organization.py
rename to kobo/apps/organizations/tests/test_organizations.py
index 73a2d054fc..24d4a6acae 100644
--- a/kobo/apps/organizations/tests/test_organization.py
+++ b/kobo/apps/organizations/tests/test_organizations.py
@@ -3,10 +3,10 @@
from kobo.apps.kobo_auth.shortcuts import User
from kobo.apps.organizations.models import Organization
from kobo.apps.organizations.constants import (
- ADMIN_ORG_ROLE,
- EXTERNAL_ORG_ROLE,
- MEMBER_ORG_ROLE,
- OWNER_ORG_ROLE,
+ ORG_ADMIN_ROLE,
+ ORG_EXTERNAL_ROLE,
+ ORG_MEMBER_ROLE,
+ ORG_OWNER_ROLE,
)
@@ -16,9 +16,10 @@ class OrganizationTestCase(TestCase):
def setUp(self):
self.someuser = User.objects.get(username='someuser')
-
# someuser is the only member their organization, and the owner as well.
self.organization = self.someuser.organization
+ self.organization.mmo_override = True
+ self.organization.save(update_fields=['mmo_override'])
def test_owner_user_object_property(self):
anotheruser = User.objects.get(username='anotheruser')
@@ -33,10 +34,10 @@ def test_get_user_role(self):
)
self.organization.add_user(anotheruser, is_admin=True)
self.organization.add_user(alice)
- assert self.organization.get_user_role(self.someuser) == OWNER_ORG_ROLE
- assert self.organization.get_user_role(anotheruser) == ADMIN_ORG_ROLE
- assert self.organization.get_user_role(alice) == MEMBER_ORG_ROLE
- assert self.organization.get_user_role(external) == EXTERNAL_ORG_ROLE
+ assert self.organization.get_user_role(self.someuser) == ORG_OWNER_ROLE
+ assert self.organization.get_user_role(anotheruser) == ORG_ADMIN_ROLE
+ assert self.organization.get_user_role(alice) == ORG_MEMBER_ROLE
+ assert self.organization.get_user_role(external) == ORG_EXTERNAL_ROLE
def test_create_organization_on_user_creation(self):
assert not Organization.objects.filter(name__startswith='alice').exists()
@@ -57,6 +58,12 @@ def test_sync_org_name_on_save(self):
"""
# someuser is the owner
assert self.organization.name == 'someuser’s organization'
+ someuser_extra_details = self.someuser.extra_details
+ someuser_extra_details.data['organization'] = ''
+ someuser_extra_details.save()
+ self.organization.refresh_from_db()
+ assert self.organization.name == 'someuser’s organization'
+
someuser_extra_details = self.someuser.extra_details
someuser_extra_details.data['organization'] = 'SomeUser Technologies'
someuser_extra_details.save()
diff --git a/kobo/apps/organizations/tests/test_organizations_api.py b/kobo/apps/organizations/tests/test_organizations_api.py
index 961614cbd9..02665895ea 100644
--- a/kobo/apps/organizations/tests/test_organizations_api.py
+++ b/kobo/apps/organizations/tests/test_organizations_api.py
@@ -1,18 +1,34 @@
from unittest.mock import patch
+import responses
+from django.contrib.auth.models import Permission
from django.urls import reverse
+from ddt import ddt, data, unpack
from model_bakery import baker
from rest_framework import status
from kobo.apps.kobo_auth.shortcuts import User
+from kobo.apps.hook.utils.tests.mixins import HookTestCaseMixin
from kobo.apps.organizations.models import Organization
-from kpi.tests.kpi_test_case import BaseTestCase
+from kpi.constants import (
+ PERM_ADD_SUBMISSIONS,
+ PERM_MANAGE_ASSET,
+ PERM_VIEW_ASSET,
+)
+from kpi.models.asset import Asset
+from kpi.tests.base_test_case import BaseTestCase, BaseAssetTestCase
+from kpi.tests.utils.mixins import (
+ AssetFileTestCaseMixin,
+ SubmissionEditTestCaseMixin,
+ SubmissionDeleteTestCaseMixin,
+ SubmissionValidationStatusTestCaseMixin,
+ SubmissionViewTestCaseMixin,
+)
from kpi.urls.router_api_v2 import URL_NAMESPACE
-
from kpi.utils.fuzzy_int import FuzzyInt
-class OrganizationTestCase(BaseTestCase):
+class OrganizationApiTestCase(BaseTestCase):
fixtures = ['test_data']
URL_NAMESPACE = URL_NAMESPACE
DEFAULT_SUBSCRIPTION_DETAILS = {
@@ -26,12 +42,10 @@ def setUp(self):
self.url_list = reverse(self._get_endpoint('organizations-list'))
def _insert_data(self, mmo_override=False):
- self.organization = baker.make(
- Organization,
- id='org_abcd1234',
- mmo_override=mmo_override
- )
- self.organization.add_user(user=self.user, is_admin=True)
+ self.organization = self.user.organization
+ self.organization.mmo_override = mmo_override
+ self.organization.save(update_fields=['mmo_override'])
+
self.url_detail = reverse(
self._get_endpoint('organizations-detail'),
kwargs={'id': self.organization.id},
@@ -52,7 +66,7 @@ def test_create(self):
def test_list(self):
self._insert_data()
- with self.assertNumQueries(FuzzyInt(10, 16)):
+ with self.assertNumQueries(FuzzyInt(8, 16)):
res = self.client.get(self.url_list)
self.assertContains(res, self.organization.name)
@@ -64,7 +78,7 @@ def test_api_returns_org_data(self):
self.assertContains(response, self.organization.name)
def test_update(self):
- self._insert_data()
+ self._insert_data(mmo_override=True)
data = {'name': 'edit'}
with self.assertNumQueries(FuzzyInt(10, 16)):
res = self.client.patch(self.url_detail, data)
@@ -129,9 +143,667 @@ def test_api_response_includes_is_mmo_with_override_and_subscription(
):
"""
Test that is_mmo is True when both mmo_override and active
- subscription are present.
+ subscription is present.
"""
self._insert_data(mmo_override=True)
response = self.client.get(self.url_detail)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data['is_mmo'], True)
+
+
+class BaseOrganizationAssetApiTestCase(BaseAssetTestCase):
+ """
+ This test suite (e.g. classes which inherit from this one) does not cover
+ scenarios where owners or admins are also the creators of the objects.
+
+ The primary focus is to evaluate access for organization admins without explicit
+ permission assignments, while ensuring that permissions for other users work
+ as expected.
+
+ - someuser is the owner
+ - anotheruser is an admin
+ - alice is a member
+ - bob is external to the organization
+ """
+
+ fixtures = ['test_data']
+ URL_NAMESPACE = URL_NAMESPACE
+
+ def setUp(self):
+ self.someuser = User.objects.get(username='someuser')
+ self.organization = self.someuser.organization
+ self.organization.mmo_override = True
+ self.organization.save(update_fields=['mmo_override'])
+
+ # anotheruser is an admin of someuser's organization
+ self.anotheruser = User.objects.get(username='anotheruser')
+ self.organization.add_user(self.anotheruser, is_admin=True)
+
+ # alice is a regular member of someuser's organization
+ self.alice = User.objects.create_user(
+ username='alice', password='alice', email='alice@alice.com'
+ )
+ self.organization.add_user(self.alice, is_admin=False)
+
+ # bob is external to someuser's organization
+ self.bob = User.objects.create_user(
+ username='bob', password='bob', email='bob@bob.com'
+ )
+
+ # Assign permissions to someuser's assets
+ for asset in self.someuser.assets.all():
+ asset.save()
+
+ self.client.force_login(self.someuser)
+ self.org_assets_list_url = reverse(
+ self._get_endpoint('organizations-assets'),
+ kwargs={'id': self.organization.id},
+ )
+
+ def _create_asset_by_alice(self):
+
+ self.client.force_login(self.alice)
+ response = self.create_asset(
+ name='Breakfast',
+ content={
+ 'survey': [
+ {
+ 'name': 'egg',
+ 'type': 'integer',
+ 'label': 'how many eggs?',
+ },
+ {
+ 'name': 'bacon',
+ 'type': 'integer',
+ 'label': 'how many slices of bacon',
+ },
+ ],
+ },
+ )
+ assert response.status_code == status.HTTP_201_CREATED
+ assert response.data['owner__username'] == self.someuser.username
+ assert_detail_url = reverse(
+ self._get_endpoint('asset-detail'), kwargs={'uid': response.data['uid']}
+ )
+ response = self.client.get(assert_detail_url)
+
+ asset = Asset.objects.get(uid=response.data['uid'])
+ asset.deploy(backend='mock', active=True)
+ # Ensure creator received "manage_asset" permission
+ assert asset.has_perm(self.alice, PERM_MANAGE_ASSET)
+
+ assert response.status_code == status.HTTP_200_OK
+ return response
+
+ def _create_asset_by_bob(self):
+
+ self.client.force_login(self.bob)
+
+ response = self.create_asset(
+ name='Report',
+ content={
+ 'survey': [
+ {
+ 'name': 'number_of_pages',
+ 'type': 'integer',
+ 'label': 'How many pages?',
+ }
+ ],
+ }
+ )
+ assert response.status_code == status.HTTP_201_CREATED
+ assert response.data['owner__username'] == self.bob.username
+ assert_detail_url = reverse(
+ self._get_endpoint('asset-detail'), kwargs={'uid': response.data['uid']}
+ )
+ response = self.client.get(assert_detail_url)
+ assert response.status_code == status.HTTP_200_OK
+
+ asset = Asset.objects.get(uid=response.data['uid'])
+ asset.deploy(backend='mock', active=True)
+ asset.assign_perm(self.someuser, PERM_MANAGE_ASSET)
+
+ return response
+
+
+class BaseOrganizationAdminsDataApiTestCase(BaseOrganizationAssetApiTestCase):
+ """
+ Base test case for testing organization admin permissions.
+
+ This test suite serves as the foundation for classes that verify the permissions
+ of organization admins. It focuses on key access points and basic cases to ensure
+ that admins have the same rights as organization owners over assets and associated
+ data, even without explicitly assigned permissions.
+
+ This suite is intentionally not exhaustive to avoid redundancy with tests covering
+ regular user flows. Only the admin role is tested here, as owners and regular
+ members follow the standard user scenario.
+ """
+
+ def setUp(self):
+ super().setUp()
+ response = self._create_asset_by_alice()
+ self.asset = Asset.objects.get(uid=response.data['uid'])
+ self.asset.deploy(backend='mock', active=True)
+ self.submission = {
+ 'egg': 2,
+ 'bacon': 1,
+ }
+ self.asset.deployment.mock_submissions([self.submission])
+ self.data_url = reverse(
+ self._get_endpoint('submission-list'),
+ kwargs={'parent_lookup_asset': self.asset.uid},
+ )
+ self.submission_url = reverse(
+ self._get_endpoint('submission-detail'),
+ kwargs={
+ 'parent_lookup_asset': self.asset.uid,
+ 'pk': self.submission['_id'],
+ },
+ )
+ self.submission_list_url = reverse(
+ self._get_endpoint('submission-list'),
+ kwargs={'parent_lookup_asset': self.asset.uid},
+ )
+ self.client.force_login(self.anotheruser)
+
+
+@ddt
+class OrganizationAssetListApiTestCase(BaseOrganizationAssetApiTestCase):
+ """
+ The organization asset endpoint returns a list of all assets owned by the
+ organization, while the regular asset endpoint only returns assets for which
+ the user has explicit permissions.
+
+ Admins can view organization assets based on their role, not through assigned
+ permissions.
+ """
+
+ @data(
+ ('someuser', status.HTTP_200_OK),
+ ('anotheruser', status.HTTP_200_OK),
+ ('alice', status.HTTP_403_FORBIDDEN),
+ ('bob', status.HTTP_404_NOT_FOUND),
+ )
+ @unpack
+ def test_can_list(self, username, expected_status_code):
+ user = User.objects.get(username=username)
+
+ self.client.force_login(user)
+ response = self.client.get(self.org_assets_list_url)
+ assert response.status_code == expected_status_code
+
+ def test_list_not_found_as_anonymous(self):
+ self.client.logout()
+ response = self.client.get(self.org_assets_list_url)
+ assert response.status_code == status.HTTP_404_NOT_FOUND
+
+ def test_list_only_organization_assets(self):
+ # The organization's assets endpoint only returns assets where the `owner`
+ # matches the User object who owns the organization.
+ # The user assets list endpoint returns all assets to which a permission
+ # has been assigned.
+ # As a result, for an owner, each of the assets they own will appear in
+ # both lists until the code excludes organization assets from the user
+ # list.
+
+ self.client.force_login(self.someuser)
+
+ # Ensure someuser see already created assets (from fixture).
+ asset_list_url = reverse(self._get_endpoint('asset-list'))
+ response = self.client.get(self.org_assets_list_url)
+ someuser_org_asset_uids = [a['uid'] for a in response.data['results']]
+ assert response.data['count'] == 2
+
+ response = self.client.get(asset_list_url)
+ someuser_asset_uids = [a['uid'] for a in response.data['results']]
+ assert response.data['count'] == 2
+ assert someuser_org_asset_uids == someuser_asset_uids
+
+ response = self._create_asset_by_bob()
+ bob_asset = Asset.objects.get(uid=response.data['uid'])
+ bob_asset.assign_perm(self.someuser, PERM_VIEW_ASSET)
+
+ self.client.force_login(self.someuser)
+ # someuser should see only their org projects on the org assets list
+ response = self.client.get(self.org_assets_list_url)
+ assert response.data['count'] == 2
+ assert someuser_org_asset_uids == [a['uid'] for a in response.data['results']]
+
+ # someuser should see only projects shared with them on the regular assets list
+ response = self.client.get(asset_list_url)
+ assert response.data['count'] == 3
+ assert response.data['results'][0]['uid'] == bob_asset.uid
+
+ self.client.force_login(self.anotheruser)
+ # anotheruser, as an admin, should see only someuser org projects on the
+ # org assets list
+ response = self.client.get(self.org_assets_list_url)
+ assert response.data['count'] == 2
+ assert someuser_org_asset_uids == [a['uid'] for a in response.data['results']]
+
+ # anotheruser should see no assets on the regular assets list because
+ # no assets are shared with them
+ response = self.client.get(asset_list_url)
+ assert response.data['count'] == 0
+
+
+@ddt
+class OrganizationAssetDetailApiTestCase(BaseOrganizationAssetApiTestCase):
+ """
+ Owners and Admins have complete control over organization projects, including
+ the ability to delete them.
+ Members who create projects can manage them entirely, except for deletion.
+ Externals do not have any permissions on organization projects. However,
+ Owners and Admins can only interact with externals' projects if these projects
+ have been explicitly shared with them.
+ """
+
+ def test_create_asset_is_owned_by_organization(self):
+ self._create_asset_by_alice()
+
+ @data(
+ ('someuser', True, status.HTTP_200_OK),
+ ('someuser', False, status.HTTP_200_OK),
+ ('anotheruser', True, status.HTTP_200_OK),
+ ('anotheruser', False, status.HTTP_404_NOT_FOUND),
+ ('alice', True, status.HTTP_200_OK),
+ ('alice', False, status.HTTP_404_NOT_FOUND),
+ ('bob', True, status.HTTP_404_NOT_FOUND),
+ ('bob', False, status.HTTP_200_OK),
+ )
+ @unpack
+ def test_get_asset_detail(
+ self, username: str, owned_by_org: bool, expected_status_code: int
+ ):
+
+ if owned_by_org:
+ response = self._create_asset_by_alice()
+ else:
+ response = self._create_asset_by_bob()
+
+ asset_uid = response.data['uid']
+ user = User.objects.get(username=username)
+ assert_detail_url = reverse(
+ self._get_endpoint('asset-detail'), kwargs={'uid': asset_uid}
+ )
+
+ self.client.force_login(user)
+ response = self.client.get(assert_detail_url)
+ assert response.status_code == expected_status_code
+
+ if expected_status_code == status.HTTP_200_OK:
+ assert response.data['uid'] == response.data['uid']
+
+ @data(
+ ('someuser', True, status.HTTP_200_OK),
+ ('someuser', False, status.HTTP_200_OK),
+ ('anotheruser', True, status.HTTP_200_OK),
+ ('anotheruser', False, status.HTTP_404_NOT_FOUND),
+ ('alice', True, status.HTTP_200_OK),
+ ('alice', False, status.HTTP_404_NOT_FOUND),
+ ('bob', True, status.HTTP_404_NOT_FOUND),
+ ('bob', False, status.HTTP_200_OK),
+ )
+ @unpack
+ def test_can_update_asset(
+ self, username: str, owned_by_org: bool, expected_status_code: int
+ ):
+
+ if owned_by_org:
+ response = self._create_asset_by_alice()
+ else:
+ response = self._create_asset_by_bob()
+
+ asset_uid = response.data['uid']
+ user = User.objects.get(username=username)
+ assert_detail_url = reverse(
+ self._get_endpoint('asset-detail'), kwargs={'uid': asset_uid}
+ )
+ data = {
+ 'name': 'Week-end breakfast'
+ }
+
+ self.client.force_login(user)
+ response = self.client.patch(assert_detail_url, data)
+ assert response.status_code == expected_status_code
+
+ if expected_status_code == status.HTTP_200_OK:
+ assert response.data['name'] == response.data['name']
+
+ @data(
+ ('someuser', True, status.HTTP_204_NO_CONTENT),
+ ('someuser', False, status.HTTP_403_FORBIDDEN),
+ ('anotheruser', True, status.HTTP_204_NO_CONTENT),
+ ('anotheruser', False, status.HTTP_404_NOT_FOUND),
+ ('alice', True, status.HTTP_403_FORBIDDEN),
+ ('alice', False, status.HTTP_404_NOT_FOUND),
+ ('bob', True, status.HTTP_404_NOT_FOUND),
+ ('bob', False, status.HTTP_204_NO_CONTENT),
+ )
+ @unpack
+ def test_can_delete_asset(
+ self, username: str, owned_by_org: bool, expected_status_code: int
+ ):
+
+ if owned_by_org:
+ response = self._create_asset_by_alice()
+ else:
+ response = self._create_asset_by_bob()
+ asset_uid = response.data['uid']
+ user = User.objects.get(username=username)
+ assert_detail_url = reverse(
+ self._get_endpoint('asset-detail'),
+ # Use JSON format to prevent HtmlRenderer from returning a 200 status
+ # instead of 204.
+ kwargs={'uid': asset_uid, 'format': 'json'}
+ )
+
+ self.client.force_login(user)
+ response = self.client.delete(assert_detail_url)
+ assert response.status_code == expected_status_code
+
+ @data(
+ ('someuser', True, True, status.HTTP_200_OK),
+ ('someuser', True, False, status.HTTP_200_OK),
+ ('someuser', False, True, status.HTTP_200_OK),
+ ('someuser', False, False, status.HTTP_200_OK),
+ ('anotheruser', True, True, status.HTTP_200_OK),
+ ('anotheruser', True, False, status.HTTP_200_OK),
+ ('anotheruser', False, True, status.HTTP_404_NOT_FOUND),
+ ('anotheruser', False, False, status.HTTP_404_NOT_FOUND),
+ ('alice', True, True, status.HTTP_200_OK),
+ ('alice', True, False, status.HTTP_200_OK),
+ ('alice', False, True, status.HTTP_404_NOT_FOUND),
+ ('alice', False, False, status.HTTP_404_NOT_FOUND),
+ ('bob', True, True, status.HTTP_404_NOT_FOUND),
+ ('bob', True, False, status.HTTP_404_NOT_FOUND),
+ ('bob', False, True, status.HTTP_200_OK),
+ ('bob', False, False, status.HTTP_200_OK),
+ )
+ @unpack
+ def test_can_archive_or_unarchive(
+ self,
+ username: str,
+ owned_by_org: bool,
+ is_active: bool,
+ expected_status_code: int,
+ ):
+
+ if owned_by_org:
+ response = self._create_asset_by_alice()
+ else:
+ response = self._create_asset_by_bob()
+ asset_uid = response.data['uid']
+ user = User.objects.get(username=username)
+ assert_detail_url = reverse(
+ self._get_endpoint('asset-deployment'), kwargs={'uid': asset_uid}
+ )
+ data = {'active': is_active}
+
+ self.client.force_login(user)
+ response = self.client.patch(assert_detail_url, data)
+ assert response.status_code == expected_status_code
+
+ if expected_status_code == status.HTTP_200_OK:
+ assert response.data['asset']['deployment__active'] == is_active
+
+ @data(
+ ('someuser', True, status.HTTP_200_OK),
+ ('someuser', False, status.HTTP_200_OK),
+ ('anotheruser', True, status.HTTP_200_OK),
+ ('anotheruser', False, status.HTTP_404_NOT_FOUND),
+ ('alice', True, status.HTTP_200_OK),
+ ('alice', False, status.HTTP_404_NOT_FOUND),
+ ('bob', True, status.HTTP_404_NOT_FOUND),
+ ('bob', False, status.HTTP_200_OK),
+ )
+ @unpack
+ def test_can_assign_permissions(
+ self,
+ username: str,
+ owned_by_org: bool,
+ expected_status_code: int,
+ ):
+ if owned_by_org:
+ response = self._create_asset_by_alice()
+ else:
+ response = self._create_asset_by_bob()
+
+ asset_uid = response.data['uid']
+ user = User.objects.get(username=username)
+
+ payload = {
+ 'user': self.obj_to_url(self.alice),
+ 'permission': self.obj_to_url(
+ Permission.objects.get(codename=PERM_VIEW_ASSET)
+ ),
+ }
+
+ self.client.force_login(user)
+ url = reverse(
+ self._get_endpoint('asset-permission-assignment-bulk-assignments'),
+ kwargs={'parent_lookup_asset': asset_uid},
+ )
+ response = self.client.post(url, data=payload)
+ response.status_code == expected_status_code
+
+
+class OrganizationAdminsDataApiTestCase(
+ SubmissionEditTestCaseMixin,
+ SubmissionDeleteTestCaseMixin,
+ SubmissionViewTestCaseMixin,
+ BaseOrganizationAdminsDataApiTestCase
+):
+ """
+ This test suite shares logic with `SubmissionEditApiTests`,
+ `SubmissionViewApiTests` and uses the mixins to call the same code for consistency
+ and reusability.
+ """
+
+ def test_can_access_data(self):
+ response = self.client.get(self.data_url)
+ assert response.status_code == status.HTTP_200_OK
+ assert response.data['count'] == 1
+
+ response = self.client.get(self.submission_url)
+ assert response.status_code == status.HTTP_200_OK
+ assert response.data['_id'] == self.submission['_id']
+
+ def test_can_bulk_delete_data(self):
+ self.submission_bulk_url = reverse(
+ self._get_endpoint('submission-bulk'),
+ kwargs={
+ 'parent_lookup_asset': self.asset.uid,
+ },
+ )
+ self._delete_submissions()
+
+ def test_can_delete_data(self):
+ self._delete_submission(self.submission)
+
+ @responses.activate
+ def test_can_get_edit_link(self):
+ self._get_edit_link()
+
+ @responses.activate
+ def test_can_get_view_link(self):
+ self._get_view_link()
+
+ def test_can_submit_data(self):
+ """
+ Test that anotheruser can submit data if they have the necessary permissions.
+
+ This test verifies that anotheruser retains the "PERM_ADD_SUBMISSIONS"
+ permission, ensuring that it is not temporarily added by `mock_submissions`.
+ The `mock_submissions` function internally calls `create_instance`, which
+ validates the user's permission (via `_submitted_by`) before saving the data
+ to the database.
+
+ If `create_instance` completes without raising an error, this confirms that the
+ user has the required permissions to submit data.
+ """
+
+ submission = {
+ 'egg': 3,
+ 'bacon': 0,
+ '_submitted_by': self.anotheruser,
+ }
+
+ self.asset.has_perm(self.anotheruser, PERM_ADD_SUBMISSIONS)
+ self.asset.deployment.mock_submissions([submission])
+ self.asset.has_perm(self.anotheruser, PERM_ADD_SUBMISSIONS)
+
+
+class OrganizationAdminsAssetFileApiTestCase(
+ AssetFileTestCaseMixin, BaseOrganizationAssetApiTestCase
+):
+ """
+ This test suite shares logic with `AssetFileTest` and uses the
+ mixin to call the same code for consistency and reusability.
+ """
+
+ def setUp(self):
+ super().setUp()
+ response = self._create_asset_by_alice()
+ self.asset = Asset.objects.get(uid=response.data['uid'])
+ self.current_username = 'anotheruser'
+ self.list_url = reverse(
+ self._get_endpoint('asset-file-list'), args=[self.asset.uid]
+ )
+ self.client.force_login(self.anotheruser)
+
+ def test_can_get_asset_files(self):
+ self.client.force_login(self.someuser)
+ self.current_username = 'someuser'
+ af_uid = self.verify_asset_file(self.create_asset_file())
+
+ self.client.force_login(self.anotheruser)
+ self.current_username = 'anotheruser'
+ response = self.client.get(self.list_url)
+ assert response.status_code == status.HTTP_200_OK
+ assert response.data['results'][0]['uid'] == af_uid
+
+ def test_can_post_asset_files(self):
+ response = self.create_asset_file()
+ self.verify_asset_file(response)
+
+ def test_can_delete_asset_files(self):
+ self.delete_asset_file()
+
+
+class OrganizationAdminsRestServiceApiTestCase(
+ HookTestCaseMixin, BaseOrganizationAssetApiTestCase
+):
+ """
+ This test suite shares logic with `HookTestCase` and uses the mixin to call the
+ same code for consistency and reusability.
+ """
+
+ def setUp(self):
+ super().setUp()
+ response = self._create_asset_by_alice()
+ self.asset = Asset.objects.get(uid=response.data['uid'])
+ self.asset.deploy(backend='mock', active=True)
+ self.client.force_login(self.anotheruser)
+
+ def test_can_add_rest_services(self):
+ self._create_hook()
+
+ def test_can_list_rest_services(self):
+ hook = self._create_hook()
+ list_url = reverse(self._get_endpoint('hook-list'), kwargs={
+ 'parent_lookup_asset': self.asset.uid
+ })
+
+ response = self.client.get(list_url)
+ assert response.status_code == status.HTTP_200_OK
+ assert response.data['count'] == 1
+ assert response.data['results'][0]['uid'] == hook.uid
+
+ detail_url = reverse('hook-detail', kwargs={
+ 'parent_lookup_asset': self.asset.uid,
+ 'uid': hook.uid,
+ })
+
+ response = self.client.get(detail_url)
+ assert response.status_code == status.HTTP_200_OK
+ assert response.data['uid'] == hook.uid
+
+ def test_can_delete_rest_services(self):
+ hook = self._create_hook()
+ detail_url = reverse(
+ self._get_endpoint('hook-detail'),
+ kwargs={'parent_lookup_asset': self.asset.uid, 'uid': hook.uid},
+ )
+ response = self.client.delete(detail_url)
+ assert response.status_code == status.HTTP_204_NO_CONTENT
+
+ def test_can_update_rest_services(self):
+ hook = self._create_hook()
+ detail_url = reverse(
+ self._get_endpoint('hook-detail'),
+ kwargs={'parent_lookup_asset': self.asset.uid, 'uid': hook.uid},
+ )
+ data = {'name': 'some disabled external service', 'active': False}
+ response = self.client.patch(detail_url, data)
+ assert response.status_code == status.HTTP_200_OK
+ hook.refresh_from_db()
+ assert not hook.active
+ assert hook.name == 'some disabled external service'
+
+ def _add_submissions(self):
+ submission = {
+ 'egg': 2,
+ 'bacon': 1,
+ }
+ self.asset.deployment.mock_submissions([submission])
+
+
+class OrganizationAdminsValidationStatusApiTestCase(
+ SubmissionValidationStatusTestCaseMixin, BaseOrganizationAdminsDataApiTestCase
+):
+ """
+ This test suite shares logic with `SubmissionValidationStatusApiTests` and uses
+ the mixin to call the same code for consistency and reusability.
+ """
+
+ def setUp(self):
+ super().setUp()
+ self.validation_status_url = reverse(
+ self._get_endpoint('submission-validation-status'),
+ kwargs={
+ 'parent_lookup_asset': self.asset.uid,
+ 'pk': self.submission['_id'],
+ },
+ )
+ self.validation_statuses_url = reverse(
+ self._get_endpoint('submission-validation-statuses'),
+ kwargs={'parent_lookup_asset': self.asset.uid, 'format': 'json'},
+ )
+
+ def test_can_access_validation_status(self):
+ response = self.client.get(self.submission_url)
+ assert response.status_code == status.HTTP_200_OK
+ assert response.data['_validation_status'] == {}
+
+ response = self.client.get(self.validation_status_url)
+ assert response.status_code == status.HTTP_200_OK
+ assert response.data == {}
+
+ def test_can_update_validation_status(self):
+ self._update_status('anotheruser')
+
+ def test_can_delete_validation_status(self):
+ self._update_status('anotheruser')
+ self._delete_status()
+
+ def test_can_bulk_validate_statuses(self):
+ self._validate_statuses(empty=True)
+ self._update_statuses(status_uid='validation_status_not_approved')
+ self._validate_statuses(
+ uid='validation_status_not_approved', username='anotheruser'
+ )
+
+ def test_can_bulk_delete_statuses(self):
+ self._delete_statuses()
diff --git a/kobo/apps/organizations/views.py b/kobo/apps/organizations/views.py
index 7102f513bd..af959e6cc0 100644
--- a/kobo/apps/organizations/views.py
+++ b/kobo/apps/organizations/views.py
@@ -8,8 +8,10 @@
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
+from rest_framework.request import Request
from kpi.constants import ASSET_TYPE_SURVEY
+from kpi.filters import AssetOrderingFilter, SearchFilter
from kpi.models.asset import Asset
from kpi.paginators import AssetUsagePagination
from kpi.permissions import IsAuthenticated
@@ -18,12 +20,47 @@
ServiceUsageSerializer,
)
from kpi.utils.object_permission import get_database_user
+from kpi.views.v2.asset import AssetViewSet
from .models import Organization
-from .permissions import IsOrgAdminOrReadOnly
+from .permissions import IsOrgAdmin, IsOrgAdminOrReadOnly
from .serializers import OrganizationSerializer
from ..stripe.constants import ACTIVE_STRIPE_STATUSES
+class OrganizationAssetViewSet(AssetViewSet):
+ """
+ This class is specifically designed for the `assets` action of the
+ OrganizationViewSet below.
+
+ It overrides the queryset of the parent class (AssetViewSet), limiting
+ results to assets owned by the organization. The `permission_classes`
+ attribute is deliberately left empty to prevent duplicate permission checks
+ with OrganizationViewSet.asset(). It relies on `permissions_checked` being
+ passed as a `self.request` attribute to confirm that permissions have been
+ properly validated beforehand.
+ """
+
+ permission_classes = []
+ filter_backends = [
+ SearchFilter,
+ AssetOrderingFilter,
+ ]
+
+ def get_queryset(self, *args, **kwargs):
+ if not getattr(self.request, 'permissions_checked', False):
+ # Perform a sanity check to ensure that permissions have been properly
+ # validated within `OrganizationViewSet.assets()`.
+ raise AttributeError('`permissions_checked` is missing')
+
+ queryset = super().get_queryset(*args, **kwargs)
+ if self.action == 'list':
+ return queryset.filter(
+ owner=self.request.user.organization.owner_user_object
+ )
+ else:
+ raise NotImplementedError
+
+
@method_decorator(cache_page(settings.ENDPOINT_CACHE_DURATION), name='service_usage')
# django uses the Vary header in its caching, and each middleware can potentially add more Vary headers
# we use this decorator to remove any Vary headers except 'origin' (we don't want to cache between different installs)
@@ -43,8 +80,34 @@ class OrganizationViewSet(viewsets.ModelViewSet):
permission_classes = (IsAuthenticated, IsOrgAdminOrReadOnly)
pagination_class = AssetUsagePagination
+ @action(
+ detail=True,
+ methods=['GET'],
+ permission_classes=[IsOrgAdmin]
+ )
+ def assets(self, request: Request, *args, **kwargs):
+ """
+ ### Retrieve Organization Assets
+
+ This endpoint returns all assets associated with a specific organization.
+ The assets listed here are restricted to those owned by the specified
+ organization.
+
+ Only the owner or administrators of the organization can access this endpoint.
+
+ ### Additional Information
+ For more details, please refer to `/api/v2/assets/`.
+ """
+ self.get_object() # Call check permissions
+
+ # Permissions check is done by `OrganizationAssetViewSet` permission classes
+ asset_view = OrganizationAssetViewSet.as_view({'get': 'list'})
+ django_http_request = request._request
+ django_http_request.permissions_checked = True
+ return asset_view(request=django_http_request)
+
def get_queryset(self) -> QuerySet:
- user = self.request.user
+ user = get_database_user(self.request.user)
return super().get_queryset().filter(users=user)
@action(detail=True, methods=['get'])
diff --git a/kobo/apps/stripe/tests/test_customer_portal_api.py b/kobo/apps/stripe/tests/test_customer_portal_api.py
index 9faeaca8ff..9e6421e0d8 100644
--- a/kobo/apps/stripe/tests/test_customer_portal_api.py
+++ b/kobo/apps/stripe/tests/test_customer_portal_api.py
@@ -28,11 +28,14 @@ def _get_url(query_params):
return f'{url}?{urlencode(query_params)}'
def _create_stripe_data(self, create_subscription=True, product_type='plan'):
- self.organization = baker.make(Organization, id='orgSALFMLFMSDGmgdlsgmsd')
- self.customer = baker.make(Customer, subscriber=self.organization, livemode=False)
+ self.organization = baker.make(
+ Organization, id='orgSALFMLFMSDGmgdlsgmsd', mmo_override=True
+ )
+ self.customer = baker.make(
+ Customer, subscriber=self.organization, livemode=False
+ )
self.product = baker.make(
- Product,
- metadata={'product_type': product_type}
+ Product, metadata={'product_type': product_type}
)
self.price = baker.make(
Price,
diff --git a/kobo/apps/stripe/tests/test_organization_usage.py b/kobo/apps/stripe/tests/test_organization_usage.py
index fb0d62e907..ef2c57a82f 100644
--- a/kobo/apps/stripe/tests/test_organization_usage.py
+++ b/kobo/apps/stripe/tests/test_organization_usage.py
@@ -52,7 +52,7 @@ def setUpTestData(cls):
cls.now = timezone.now()
cls.organization = baker.make(
- Organization, id=cls.org_id, name='test organization'
+ Organization, id=cls.org_id, name='test organization', mmo_override=True
)
cls.organization.add_user(cls.anotheruser, is_admin=True)
assets = create_mock_assets([cls.anotheruser], cls.assets_per_user)
diff --git a/kobo/apps/stripe/tests/test_subscription_api.py b/kobo/apps/stripe/tests/test_subscription_api.py
index 28b8d20899..901b25c077 100644
--- a/kobo/apps/stripe/tests/test_subscription_api.py
+++ b/kobo/apps/stripe/tests/test_subscription_api.py
@@ -5,7 +5,6 @@
from rest_framework import status
from kobo.apps.kobo_auth.shortcuts import User
-from kobo.apps.organizations.models import Organization
from kpi.tests.kpi_test_case import BaseTestCase
@@ -19,8 +18,7 @@ def setUp(self):
self.url_list = reverse('subscriptions-list')
def _insert_data(self):
- organization = baker.make(Organization)
- organization.add_user(self.someuser, is_admin=True)
+ organization = self.someuser.organization
customer = baker.make(Customer, subscriber=organization)
self.subscription = baker.make(
'djstripe.Subscription',
diff --git a/kobo/settings/base.py b/kobo/settings/base.py
index a19b516e8c..0e632527bc 100644
--- a/kobo/settings/base.py
+++ b/kobo/settings/base.py
@@ -18,6 +18,7 @@
from kobo.apps.stripe.constants import FREE_TIER_EMPTY_DISPLAY, FREE_TIER_NO_THRESHOLDS
from kpi.utils.json import LazyJSONSerializable
+from kpi.constants import PERM_DELETE_ASSET, PERM_MANAGE_ASSET
from ..static_lists import EXTRA_LANG_INFO, SECTOR_CHOICE_DEFAULTS
env = environ.Env()
@@ -1793,3 +1794,8 @@ def dj_stripe_request_callback_method():
SILENCED_SYSTEM_CHECKS = ['guardian.W001']
DIGEST_LOGIN_FACTORY = 'django_digest.NoEmailLoginFactory'
+
+# Admins will not be explicitly granted these permissions, (i.e., not referenced
+# in the ObjectPermission table), but the code will still conduct the permission
+# checks as if they were.
+ADMIN_ORG_INHERITED_PERMS = [PERM_DELETE_ASSET, PERM_MANAGE_ASSET]
diff --git a/kpi/backends.py b/kpi/backends.py
index c2b77d2949..8fe4f65d3c 100644
--- a/kpi/backends.py
+++ b/kpi/backends.py
@@ -39,6 +39,11 @@ def has_perm(self, user_obj, perm, obj=None):
# Obey limits on anonymous users' permissions
if perm not in settings.ALLOWED_ANONYMOUS_PERMISSIONS:
return False
+
+ if hasattr(obj, 'has_mapped_perm'):
+ if obj.has_mapped_perm(user_obj, perm):
+ return True
+
return super().has_perm(user_obj, perm, obj)
if not user_obj.is_active:
# Inactive users are denied immediately
diff --git a/kpi/db_routers.py b/kpi/db_routers.py
index 75840d5e2e..43cb1cf83f 100644
--- a/kpi/db_routers.py
+++ b/kpi/db_routers.py
@@ -13,7 +13,7 @@ class DefaultDatabaseRouter:
def db_for_read(self, model, **hints):
"""
- Reads go to KoBoCAT database when `model` is a ShadowModel
+ Reads go to KoboCAT database when `model` is a ShadowModel
"""
if (
model._meta.app_label in SHADOW_MODEL_APP_LABELS
@@ -25,7 +25,7 @@ def db_for_read(self, model, **hints):
def db_for_write(self, model, **hints):
"""
- Writes go to KoBoCAT database when `model` is a ShadowModel
+ Writes go to KoboCAT database when `model` is a ShadowModel
"""
if getattr(model, 'read_only', False):
diff --git a/kpi/filters.py b/kpi/filters.py
index ba6d2e242f..7b3eac83a9 100644
--- a/kpi/filters.py
+++ b/kpi/filters.py
@@ -1,4 +1,3 @@
-# coding: utf-8
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import FieldError
@@ -36,8 +35,9 @@
from kpi.utils.django_orm_helper import OrderCustomCharField
from kpi.utils.query_parser import get_parsed_parameters, parse, ParseError
from kpi.utils.object_permission import (
- get_objects_for_user,
get_anonymous_user,
+ get_database_user,
+ get_objects_for_user,
get_perm_ids_from_code_names,
)
from kpi.utils.permissions import is_user_anonymous
@@ -59,7 +59,7 @@ class AssetOwnerFilterBackend(filters.BaseFilterBackend):
"""
def filter_queryset(self, request, queryset, view):
- fields = {"asset__owner": request.user}
+ fields = {'asset__owner': request.user}
return queryset.filter(**fields)
@@ -164,6 +164,7 @@ class KpiObjectPermissionsFilter:
DATA_SHARING_PARAMETER = 'data_sharing__enabled'
def filter_queryset(self, request, queryset, view):
+
user = request.user
if user.is_superuser and view.action != 'list':
# For a list, we won't deluge the superuser with everyone else's
@@ -403,17 +404,31 @@ class RelatedAssetPermissionsFilter(KpiObjectPermissionsFilter):
"""
Uses KpiObjectPermissionsFilter to determine which assets the user
may access, and then filters the provided queryset to include only objects
- related to those assets. The queryset's model must be related to `Asset`
- via a field named `asset`.
+ related to those assets.
+ If the current user is an admin of an organization, all assets of that organization
+ should be added too.
+ The queryset's model must be related to `Asset` via a field named `asset`.
"""
def filter_queryset(self, request, queryset, view):
+
+ user = get_database_user(request.user)
+ organization = user.organization
+ if organization.is_admin_only(user):
+ # Admins do not receive explicit permission assignments,
+ # but they have the same access to assets as the organization owner.
+ org_assets = Asset.objects.filter(
+ owner=organization.owner_user_object
+ )
+ else:
+ org_assets = Asset.objects.none()
+
available_assets = super().filter_queryset(
request=request,
queryset=Asset.objects.all(),
view=view
)
- return queryset.filter(asset__in=available_assets)
+ return queryset.filter(asset__in=available_assets | org_assets)
class SearchFilter(filters.BaseFilterBackend):
diff --git a/kpi/mixins/object_permission.py b/kpi/mixins/object_permission.py
index 61d05eba0a..a5f33b5909 100644
--- a/kpi/mixins/object_permission.py
+++ b/kpi/mixins/object_permission.py
@@ -555,20 +555,42 @@ def assign_perm(self, user_obj, perm, deny=False, defer_recalc=False,
self.recalculate_descendants_perms()
return new_permission
+ @classmethod
+ def get_org_admin_inherited_perms(
+ cls, for_instance: Optional['kpi.models.Asset'] = None
+ ):
+ return set(
+ perm
+ for inherit_perm in settings.ADMIN_ORG_INHERITED_PERMS
+ for perm in [
+ inherit_perm,
+ *cls.get_implied_perms(inherit_perm, for_instance=for_instance),
+ ]
+ )
+
def get_perms(self, user_obj: settings.AUTH_USER_MODEL) -> list[str]:
"""
Return a list of codenames of all effective grant permissions that
user_obj has on this object.
"""
- user_perm_ids = self._get_effective_perms(user=user_obj)
+ user = get_database_user(user_obj)
+ user_perm_ids = self._get_effective_perms(user=user)
perm_ids = [x[1] for x in user_perm_ids]
- assigned_perms = Permission.objects.filter(pk__in=perm_ids).values_list(
- 'codename', flat=True
+ assigned_perms = list(
+ Permission.objects.filter(pk__in=perm_ids).values_list(
+ 'codename', flat=True
+ )
)
project_views_perms = get_project_view_user_permissions_for_asset(
- self, user_obj
+ self, user
)
- return list(set(list(assigned_perms) + project_views_perms))
+
+ other_perms = []
+ if self.owner and self.owner.organization.is_admin_only(user):
+ # Admins do not receive explicit permission assignments.
+ other_perms = list(self.get_org_admin_inherited_perms(self))
+
+ return list(set(assigned_perms + project_views_perms + other_perms))
def get_partial_perms(self, user_id, with_filters=False):
"""
@@ -622,7 +644,17 @@ def has_perm(self, user_obj: User, perm: str) -> bool:
user=user_obj,
codename=codename
)) == 1
+
if not result and not is_anonymous:
+ org_admin_perms = self.get_org_admin_inherited_perms()
+
+ if (
+ self.owner
+ and self.owner.organization.is_admin_only(user_obj)
+ and codename in org_admin_perms
+ ):
+ return True
+
if perm in ProjectView.ALLOWED_PERMISSIONS:
result = user_has_project_view_asset_perm(self, user_obj, perm)
@@ -911,9 +943,9 @@ def build_dict(user_id_, object_permissions_):
@staticmethod
@cache_for_request
- def __get_permissions_for_content_type(content_type_id,
- codename=None,
- codename__startswith=None):
+ def __get_permissions_for_content_type(
+ content_type_id, codename=None, codename__startswith=None
+ ):
"""
Gets permissions for specific content type and permission's codename
This method is cached per request because it can be called several times
@@ -936,8 +968,9 @@ def __get_permissions_for_content_type(content_type_id,
if codename__startswith is not None:
filters['codename__startswith'] = codename__startswith
- permissions = Permission.objects.filter(**filters). \
- values_list('pk', 'codename')
+ permissions = Permission.objects.filter(**filters).values_list(
+ 'pk', 'codename'
+ )
return permissions
@@ -945,7 +978,6 @@ def __get_permissions_for_content_type(content_type_id,
class ObjectPermissionViewSetMixin:
def cache_all_assets_perms(self, asset_ids: list) -> dict:
-
object_permissions = ObjectPermission.objects.filter(
asset_id__in=asset_ids,
deny=False,
diff --git a/kpi/paginators.py b/kpi/paginators.py
index 4b0b9ea667..29823ea1a4 100644
--- a/kpi/paginators.py
+++ b/kpi/paginators.py
@@ -121,7 +121,7 @@ class DataPagination(LimitOffsetPagination):
offset_query_param = 'start'
max_limit = settings.SUBMISSION_LIST_LIMIT
-
+
class FastAssetPagination(Paginated):
"""
Pagination class optimized for faster counting for DISTINCT queries on large tables.
diff --git a/kpi/permissions.py b/kpi/permissions.py
index 9d282ee4a1..18a70c8a2f 100644
--- a/kpi/permissions.py
+++ b/kpi/permissions.py
@@ -84,7 +84,9 @@ def _get_parent_object(cls, view):
return cls._get_asset(view)
def _get_user_permissions(
- self, object_: Union['kpi.Asset', 'kpi.Collection'], user: settings.AUTH_USER_MODEL
+ self,
+ object_: Union['kpi.Asset', 'kpi.Collection'],
+ user: settings.AUTH_USER_MODEL,
) -> list[str]:
"""
Returns a list of `user`'s permission for `asset`
@@ -264,6 +266,37 @@ class AssetPermissionAssignmentPermission(AssetNestedObjectPermission):
perms_map['DELETE'] = perms_map['GET']
+class AssetSnapshotPermission(AssetPermission):
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ # Do NOT mutate `perms_map` from the parent class! Doing so will affect
+ # *every* instance of `DjangoObjectPermissions` and all its subclasses
+ app_label = Asset._meta.app_label
+ model_name = Asset._meta.model_name
+
+ self.perms_map = self.perms_map.copy()
+ for action in self.perms_map.keys():
+ for idx, perm in enumerate(self.perms_map[action]):
+ self.perms_map[action][idx] = perm % {
+ 'app_label': app_label,
+ 'model_name': model_name,
+ }
+
+ def has_object_permission(self, request, view, obj):
+ if (
+ view.action == 'submission'
+ or (
+ view.action == 'retrieve'
+ and request.accepted_renderer.format == 'xml'
+ )
+ ):
+ return True
+
+ asset = obj.asset
+ return super().has_object_permission(request, view, asset)
+
+
class AssetVersionReadOnlyPermission(AssetNestedObjectPermission):
required_permissions = ['%(app_label)s.view_asset']
diff --git a/kpi/serializers/v2/asset.py b/kpi/serializers/v2/asset.py
index faad5f8518..1ac3840738 100644
--- a/kpi/serializers/v2/asset.py
+++ b/kpi/serializers/v2/asset.py
@@ -18,6 +18,7 @@
from rest_framework.reverse import reverse
from rest_framework.utils.serializer_helpers import ReturnList
+from kobo.apps.organizations.constants import ORG_ADMIN_ROLE
from kobo.apps.reports.constants import FUZZY_VERSION_PATTERN
from kobo.apps.reports.report_data import build_formpack
from kobo.apps.subsequences.utils.deprecation import WritableAdvancedFeaturesField
@@ -199,6 +200,10 @@ def _has_perms(self, payload: dict, asset_uids: list[str]):
if not asset_uids or self.__user.is_superuser:
return
+ user_filter = [self.__user]
+ if self.__user.organization.is_admin(self.__user):
+ user_filter.append(self.__user.organization.owner_user_object)
+
if not delete_request:
if ProjectTrash.objects.filter(asset__uid__in=asset_uids).exists():
raise exceptions.PermissionDenied()
@@ -206,14 +211,14 @@ def _has_perms(self, payload: dict, asset_uids: list[str]):
code_names = get_cached_code_names(Asset)
perm_dict = code_names[PERM_MANAGE_ASSET]
objects_count = ObjectPermission.objects.filter(
- user=self.__user,
+ user__in=user_filter,
permission_id=perm_dict['id'],
asset__uid__in=asset_uids,
deny=False
).count()
else:
objects_count = Asset.objects.filter(
- owner=self.__user,
+ owner__in=user_filter,
uid__in=asset_uids,
).count()
@@ -668,10 +673,11 @@ def get_status(self, asset):
# No need to read all permissions if `AnonymousUser`'s permissions
# are found.
# We assume that `settings.ANONYMOUS_USER_ID` equals -1.
- perm_assignments = asset.permissions. \
- values('user_id', 'permission__codename'). \
- exclude(user_id=asset.owner_id). \
- order_by('user_id', 'permission__codename')
+ perm_assignments = (
+ asset.permissions.values('user_id', 'permission__codename')
+ .exclude(user_id=asset.owner_id)
+ .order_by('user_id', 'permission__codename')
+ )
return self._get_status(perm_assignments)
@@ -809,6 +815,10 @@ def get_access_types(self, obj):
if request.user.is_superuser:
access_types.append('superuser')
+ if obj.owner.organization.get_user_role(request.user) == ORG_ADMIN_ROLE:
+ access_types.extend(['shared', 'org-admin'])
+ access_types = list(set(access_types))
+
if not access_types:
raise Exception(
f'{request.user.username} has unexpected access to {obj.uid}'
@@ -962,9 +972,7 @@ def _set_asset_ids_cache(self, asset):
def _table_url(self, obj):
request = self.context.get('request', None)
- return reverse('asset-table-view',
- args=(obj.uid,),
- request=request)
+ return reverse('asset-table-view', args=(obj.uid,), request=request)
class AssetListSerializer(AssetSerializer):
@@ -1012,7 +1020,7 @@ def get_permissions(self, asset):
except KeyError:
# Maybe overkill, there are no reasons to enter here.
# in the list context, `object_permissions_per_asset` should
- # be always a property of `self.context`
+ # always be a property of `self.context`
return super().get_permissions(asset)
context = self.context
diff --git a/kpi/tests/api/v2/test_api_asset_permission_assignment.py b/kpi/tests/api/v2/test_api_asset_permission_assignment.py
index 3477c8b4ac..122dab9716 100644
--- a/kpi/tests/api/v2/test_api_asset_permission_assignment.py
+++ b/kpi/tests/api/v2/test_api_asset_permission_assignment.py
@@ -19,11 +19,14 @@
PERM_VIEW_SUBMISSIONS,
)
from kpi.tests.kpi_test_case import KpiTestCase
+from kpi.tests.utils.mixins import PermissionAssignmentTestCaseMixin
from kpi.urls.router_api_v2 import URL_NAMESPACE as ROUTER_URL_NAMESPACE
from kpi.utils.object_permission import get_anonymous_user
-class BaseApiAssetPermissionTestCase(KpiTestCase):
+class BaseApiAssetPermissionTestCase(
+ PermissionAssignmentTestCaseMixin, KpiTestCase
+):
fixtures = ['test_data']
URL_NAMESPACE = ROUTER_URL_NAMESPACE
diff --git a/kpi/tests/api/v2/test_api_assets.py b/kpi/tests/api/v2/test_api_assets.py
index 1b4503d51b..2680653313 100644
--- a/kpi/tests/api/v2/test_api_assets.py
+++ b/kpi/tests/api/v2/test_api_assets.py
@@ -1,9 +1,6 @@
-# coding: utf-8
-import base64
import copy
import json
import os
-from io import StringIO
import dateutil.parser
from django.urls import reverse
@@ -33,6 +30,7 @@
from kpi.utils.hash import calculate_hash
from kpi.utils.object_permission import get_anonymous_user
from kpi.utils.project_views import get_region_for_view
+from kpi.tests.utils.mixins import AssetFileTestCaseMixin
class AssetListApiTests(BaseAssetTestCase):
@@ -1307,7 +1305,7 @@ class AssetExportTaskTest(BaseTestCase):
pass
-class AssetFileTest(BaseTestCase):
+class AssetFileTest(AssetFileTestCaseMixin, BaseTestCase):
fixtures = ['test_data']
URL_NAMESPACE = ROUTER_URL_NAMESPACE
@@ -1331,133 +1329,16 @@ def setUp(self):
),
)
- def get_asset_file_content(self, url):
- response = self.client.get(url)
- return b''.join(response.streaming_content)
-
- @property
- def asset_file_payload(self):
- geojson_ = StringIO(
- json.dumps(
- {
- 'type': 'Feature',
- 'geometry': {'type': 'Point', 'coordinates': [125.6, 10.1]},
- 'properties': {'name': 'Dinagat Islands'},
- }
- )
- )
- geojson_.name = 'dingagat_island.geojson'
- return {
- 'file_type': AssetFile.MAP_LAYER,
- 'description': 'Dinagat Islands',
- 'content': geojson_,
- 'metadata': json.dumps({'source': 'http://geojson.org/'}),
- }
-
def switch_user(self, *args, **kwargs):
self.client.logout()
self.client.login(*args, **kwargs)
self.current_username = kwargs['username']
- def create_asset_file(self,
- payload=None,
- status_code=status.HTTP_201_CREATED):
- payload = self.asset_file_payload if payload is None else payload
-
- response = self.client.get(self.list_url)
- self.assertEqual(response.status_code, status.HTTP_200_OK)
- self.assertEqual(json.loads(response.content)['count'], 0)
- response = self.client.post(self.list_url, payload)
- self.assertEqual(response.status_code, status_code)
- return response
-
- def verify_asset_file(self, response, payload=None, form_media=False):
- posted_payload = self.asset_file_payload if payload is None else payload
- response_dict = json.loads(response.content)
- self.assertEqual(
- response_dict['asset'],
- self.absolute_reverse(
- self._get_endpoint('asset-detail'), args=[self.asset.uid]
- ),
- )
- self.assertEqual(
- response_dict['user'],
- self.absolute_reverse(
- self._get_endpoint('user-kpi-detail'),
- args=[self.current_username],
- ),
- )
- self.assertEqual(
- response_dict['user__username'],
- self.current_username,
- )
-
- # Some metadata properties are added when file is created.
- # Let's compare without them
- response_metadata = dict(response_dict['metadata'])
-
- if not form_media:
- # `filename` is only mandatory with form media files
- response_metadata.pop('filename', None)
-
- response_metadata.pop('mimetype', None)
- response_metadata.pop('hash', None)
-
- self.assertEqual(
- json.dumps(response_metadata),
- posted_payload['metadata']
- )
- for field in 'file_type', 'description':
- self.assertEqual(response_dict[field], posted_payload[field])
-
- # Content uploaded as binary
- try:
- posted_payload['content'].seek(0)
- except KeyError:
- pass
- else:
- expected_content = posted_payload['content'].read().encode()
- self.assertEqual(
- self.get_asset_file_content(response_dict['content']),
- expected_content
- )
- return response_dict['uid']
-
- # Content uploaded as base64
- try:
- base64_encoded = posted_payload['base64Encoded']
- except KeyError:
- pass
- else:
- media_content = base64_encoded[base64_encoded.index('base64') + 7:]
- expected_content = base64.decodebytes(media_content.encode())
- self.assertEqual(
- self.get_asset_file_content(response_dict['content']),
- expected_content
- )
- return response_dict['uid']
-
- # Content uploaded as a URL
- metadata = json.loads(posted_payload['metadata'])
- payload_url = metadata['redirect_url']
- # if none of the other upload methods have been chosen,
- # `redirect_url` should be present in the response because user
- # must have provided a redirect url. Otherwise, a validation error
- # should have been raised about invalid payload.
- response_url = response_dict['metadata']['redirect_url']
- assert response_url == payload_url and response_url != ''
- return response_dict['uid']
-
def test_owner_can_create_file(self):
self.verify_asset_file(self.create_asset_file())
def test_owner_can_delete_file(self):
- af_uid = self.verify_asset_file(self.create_asset_file())
- detail_url = reverse(self._get_endpoint('asset-file-detail'),
- args=(self.asset.uid, af_uid))
- response = self.client.delete(detail_url)
- self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
- # TODO: test that the file itself is removed
+ self.delete_asset_file()
def test_editor_can_create_file(self):
anotheruser = User.objects.get(username='anotheruser')
diff --git a/kpi/tests/api/v2/test_api_collections.py b/kpi/tests/api/v2/test_api_collections.py
index be84adbd2b..d435ddc7f0 100644
--- a/kpi/tests/api/v2/test_api_collections.py
+++ b/kpi/tests/api/v2/test_api_collections.py
@@ -225,30 +225,31 @@ def test_collection_statuses_and_access_types(self):
public_collection = Asset.objects.create(
asset_type=ASSET_TYPE_COLLECTION,
name='public collection',
- owner=another_user
+ owner=another_user,
)
shared_collection = Asset.objects.create(
asset_type=ASSET_TYPE_COLLECTION,
name='shared collection',
- owner=another_user
+ owner=another_user,
)
subscribed_collection = Asset.objects.create(
asset_type=ASSET_TYPE_COLLECTION,
name='subscribed collection',
- owner=another_user
+ owner=another_user,
)
shared_subscribed_collection = Asset.objects.create(
asset_type=ASSET_TYPE_COLLECTION,
name='shared & subscribed collection',
- owner=another_user
+ owner=another_user,
)
# Make `public_collection` and `subscribed_collection` public-discoverable
public_collection.assign_perm(AnonymousUser(), PERM_DISCOVER_ASSET)
subscribed_collection.assign_perm(AnonymousUser(), PERM_DISCOVER_ASSET)
- shared_subscribed_collection.assign_perm(AnonymousUser(),
- PERM_DISCOVER_ASSET)
+ shared_subscribed_collection.assign_perm(
+ AnonymousUser(), PERM_DISCOVER_ASSET
+ )
# Make `shared_collection` and `shared_subscribed_collection` shared
shared_collection.assign_perm(self.someuser, PERM_VIEW_ASSET)
@@ -256,20 +257,25 @@ def test_collection_statuses_and_access_types(self):
# Subscribe `someuser` to `subscribed_collection`.
subscription_url = reverse(
- self._get_endpoint('userassetsubscription-list'))
+ self._get_endpoint('userassetsubscription-list')
+ )
asset_detail_url = self.absolute_reverse(
self._get_endpoint('asset-detail'),
- kwargs={'uid': subscribed_collection.uid})
- response = self.client.post(subscription_url, data={
- 'asset': asset_detail_url})
+ kwargs={'uid': subscribed_collection.uid},
+ )
+ response = self.client.post(
+ subscription_url, data={'asset': asset_detail_url}
+ )
assert response.status_code == status.HTTP_201_CREATED
# Subscribe `someuser` to `shared_subscribed_collection`.
asset_detail_url = self.absolute_reverse(
self._get_endpoint('asset-detail'),
- kwargs={'uid': shared_subscribed_collection.uid})
- response = self.client.post(subscription_url, data={
- 'asset': asset_detail_url})
+ kwargs={'uid': shared_subscribed_collection.uid},
+ )
+ response = self.client.post(
+ subscription_url, data={'asset': asset_detail_url}
+ )
assert response.status_code == status.HTTP_201_CREATED
list_url = reverse(self._get_endpoint('asset-list'))
@@ -281,31 +287,30 @@ def test_collection_statuses_and_access_types(self):
expected = {
'public collection': {
'status': 'public-discoverable',
- 'access_types': ['public']
+ 'access_types': ['public'],
},
'shared collection': {
'status': 'shared',
- 'access_types': ['shared']
+ 'access_types': ['shared'],
},
'subscribed collection': {
'status': 'public-discoverable',
- 'access_types': ['public', 'subscribed']
- },
- 'test collection': {
- 'status': 'private',
- 'access_types': ['owned']
+ 'access_types': ['public', 'subscribed'],
},
+ 'test collection': {'status': 'private', 'access_types': ['owned']},
'shared & subscribed collection': {
'status': 'public-discoverable',
- 'access_types': ['public', 'shared', 'subscribed']
- }
+ 'access_types': ['public', 'shared', 'subscribed'],
+ },
}
for collection in response.data['results']:
expected_collection = expected[collection.get('name')]
assert expected_collection['status'] == collection['status']
- assert expected_collection['access_types'] \
- == collection['access_types']
+ assert (
+ expected_collection['access_types']
+ == collection['access_types']
+ )
def test_collection_subscribe(self):
public_collection = Asset.objects.create(
diff --git a/kpi/tests/api/v2/test_api_permissions.py b/kpi/tests/api/v2/test_api_permissions.py
index 2917d4d001..b71f979b61 100644
--- a/kpi/tests/api/v2/test_api_permissions.py
+++ b/kpi/tests/api/v2/test_api_permissions.py
@@ -1,4 +1,3 @@
-# coding: utf-8
from django.contrib.auth.models import Permission
from django.urls import reverse
from rest_framework import status
@@ -13,6 +12,7 @@
from kpi.models import Asset, ObjectPermission
from kpi.tests.kpi_test_case import KpiTestCase
from kpi.urls.router_api_v2 import URL_NAMESPACE as ROUTER_URL_NAMESPACE
+from kpi.tests.utils.mixins import PermissionAssignmentTestCaseMixin
from kpi.utils.object_permission import get_anonymous_user
@@ -658,7 +658,9 @@ def test_inherited_viewable_collection_not_deletable(self):
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
-class ApiAssignedPermissionsTestCase(KpiTestCase):
+class ApiAssignedPermissionsTestCase(
+ PermissionAssignmentTestCaseMixin, KpiTestCase
+):
"""
An obnoxiously large amount of code to test that the endpoint for listing
assigned permissions complies with the following rules:
diff --git a/kpi/tests/api/v2/test_api_submissions.py b/kpi/tests/api/v2/test_api_submissions.py
index b659b4d28f..890f881a27 100644
--- a/kpi/tests/api/v2/test_api_submissions.py
+++ b/kpi/tests/api/v2/test_api_submissions.py
@@ -5,10 +5,7 @@
import string
import uuid
from datetime import datetime
-try:
- from zoneinfo import ZoneInfo
-except ImportError:
- from backports.zoneinfo import ZoneInfo
+from zoneinfo import ZoneInfo
from unittest import mock
@@ -40,6 +37,12 @@
)
from kpi.models import Asset
from kpi.tests.base_test_case import BaseTestCase
+from kpi.tests.utils.mixins import (
+ SubmissionDeleteTestCaseMixin,
+ SubmissionEditTestCaseMixin,
+ SubmissionValidationStatusTestCaseMixin,
+ SubmissionViewTestCaseMixin,
+)
from kpi.tests.utils.mock import (
enketo_edit_instance_response,
enketo_edit_instance_response_with_root_name_validation,
@@ -127,7 +130,9 @@ def _add_submissions(self, other_fields: dict = None):
self.submissions = submissions
-class BulkDeleteSubmissionsApiTests(BaseSubmissionTestCase):
+class BulkDeleteSubmissionsApiTests(
+ SubmissionDeleteTestCaseMixin, BaseSubmissionTestCase
+):
# TODO, Add tests with ids and query
@@ -151,14 +156,7 @@ def test_delete_submissions_as_owner(self):
someuser is the project owner
someuser can delete their own data
"""
- data = {'payload': {'confirm': True}}
- response = self.client.delete(
- self.submission_bulk_url, data=data, format='json'
- )
- self.assertEqual(response.status_code, status.HTTP_200_OK)
- response = self.client.get(self.submission_list_url, {'format': 'json'})
-
- self.assertEqual(response.data['count'], 0)
+ self._delete_submissions()
def test_audit_log_on_bulk_delete(self):
"""
@@ -177,7 +175,7 @@ def test_audit_log_on_bulk_delete(self):
# No submissions have been deleted yet
assert audit_log_count == 0
# Delete all submissions
- self.test_delete_submissions_as_owner()
+ self._delete_submissions()
# All submissions have been deleted and should be logged
deleted_submission_ids = AuditLog.objects.values_list('pk', flat=True).filter(
@@ -386,7 +384,7 @@ def test_cant_delete_view_only_submissions_with_partial_perms_as_anotheruser(sel
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
-class SubmissionApiTests(BaseSubmissionTestCase):
+class SubmissionApiTests(SubmissionDeleteTestCaseMixin, BaseSubmissionTestCase):
def setUp(self):
super().setUp()
@@ -786,19 +784,7 @@ def test_delete_submission_as_owner(self):
someuser can delete their own data.
"""
submission = self.submissions_submitted_by_someuser[0]
- url = reverse(
- self._get_endpoint('submission-detail'),
- kwargs={
- 'parent_lookup_asset': self.asset.uid,
- 'pk': submission['_id'],
- },
- )
-
- response = self.client.delete(url, HTTP_ACCEPT='application/json')
- self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
- response = self.client.get(self.submission_list_url, {'format': 'json'})
-
- self.assertEqual(response.data['count'], len(self.submissions) - 1)
+ self._delete_submission(submission)
def test_audit_log_on_delete(self):
"""
@@ -1151,7 +1137,7 @@ def test_attachments_rewrite(self):
assert attachment['question_xpath'] == expected_question_xpaths[idx]
-class SubmissionEditApiTests(BaseSubmissionTestCase):
+class SubmissionEditApiTests(SubmissionEditTestCaseMixin, BaseSubmissionTestCase):
"""
Tests for editin submissions.
@@ -1210,23 +1196,7 @@ def test_get_edit_link_submission_as_owner(self):
someuser is the owner of the project.
someuser can retrieve enketo edit link
"""
- ee_url = (
- f'{settings.ENKETO_URL}/{settings.ENKETO_EDIT_INSTANCE_ENDPOINT}'
- )
- # Mock Enketo response
- responses.add_callback(
- responses.POST, ee_url,
- callback=enketo_edit_instance_response,
- content_type='application/json',
- )
-
- response = self.client.get(self.submission_url, {'format': 'json'})
- assert response.status_code == status.HTTP_200_OK
- expected_response = {
- 'url': f"{settings.ENKETO_URL}/edit/{self.submission['_uuid']}",
- 'version_uid': self.asset.latest_deployed_version.uid,
- }
- self.assertEqual(response.data, expected_response)
+ self._get_edit_link()
@responses.activate
def test_get_edit_submission_redirect_as_owner(self):
@@ -1719,7 +1689,7 @@ def test_edit_submission_snapshot_missing_unauthenticated(self):
self.assertEqual(req.status_code, status.HTTP_404_NOT_FOUND)
-class SubmissionViewApiTests(BaseSubmissionTestCase):
+class SubmissionViewApiTests(SubmissionViewTestCaseMixin, BaseSubmissionTestCase):
def setUp(self):
super().setUp()
@@ -1745,25 +1715,7 @@ def test_get_view_link_submission_as_owner(self):
someuser is the owner of the project.
someuser can get enketo view link
"""
- ee_url = (
- f'{settings.ENKETO_URL}/{settings.ENKETO_VIEW_INSTANCE_ENDPOINT}'
- )
-
- # Mock Enketo response
- responses.add_callback(
- responses.POST, ee_url,
- callback=enketo_view_instance_response,
- content_type='application/json',
- )
-
- response = self.client.get(self.submission_view_link_url, {'format': 'json'})
- assert response.status_code == status.HTTP_200_OK
-
- expected_response = {
- 'url': f"{settings.ENKETO_URL}/view/{self.submission['_uuid']}",
- 'version_uid': self.asset.latest_deployed_version.uid,
- }
- assert response.data == expected_response
+ self._get_view_link()
@responses.activate
def test_get_view_submission_redirect_as_owner(self):
@@ -1972,7 +1924,6 @@ def test_duplicate_submission_as_owner_allowed(self):
someuser is the owner of the project.
someuser is allowed to duplicate their own data
"""
- print('URL :', self.submission_url, flush=True)
response = self.client.post(self.submission_url, {'format': 'json'})
assert response.status_code == status.HTTP_201_CREATED
self._check_duplicate(response)
@@ -2311,7 +2262,9 @@ def test_bulk_update_submissions_as_anotheruser_with_partial_perms(self):
self._check_bulk_update(response)
-class SubmissionValidationStatusApiTests(BaseSubmissionTestCase):
+class SubmissionValidationStatusApiTests(
+ SubmissionValidationStatusTestCaseMixin, BaseSubmissionTestCase
+):
def setUp(self):
super().setUp()
@@ -2374,13 +2327,8 @@ def test_delete_status_as_owner(self):
someuser is the owner of the project.
someuser can delete the validation status of submissions
"""
- response = self.client.delete(self.validation_status_url)
- self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
-
- # Ensure delete worked.
- response = self.client.get(self.validation_status_url)
- self.assertEqual(response.status_code, status.HTTP_200_OK)
- self.assertEqual(response.data, {})
+ self._update_status('someuser')
+ self._delete_status()
def test_cannot_delete_status_of_not_shared_submission_as_anotheruser(self):
"""
@@ -2427,17 +2375,7 @@ def test_edit_status_as_owner(self):
someuser is the owner of the project.
someuser can update validation status.
"""
- data = {
- 'validation_status.uid': 'validation_status_not_approved'
- }
- response = self.client.patch(self.validation_status_url, data=data)
- self.assertEqual(response.status_code, status.HTTP_200_OK)
-
- # Ensure update worked.
- response = self.client.get(self.validation_status_url)
- self.assertEqual(response.status_code, status.HTTP_200_OK)
- self.assertEqual(response.data['by_whom'], 'someuser')
- self.assertEqual(response.data['uid'], data['validation_status.uid'])
+ self._update_status('someuser')
def test_cannot_edit_status_of_not_shared_submission_as_anotheruser(self):
"""
@@ -2535,7 +2473,9 @@ def test_edit_status_with_partial_perms_as_anotheruser(self):
self.assertEqual(response.data['uid'], data['validation_status.uid'])
-class SubmissionValidationStatusesApiTests(BaseSubmissionTestCase):
+class SubmissionValidationStatusesApiTests(
+ SubmissionValidationStatusTestCaseMixin, BaseSubmissionTestCase
+):
def setUp(self):
super().setUp()
@@ -2549,64 +2489,23 @@ def setUp(self):
kwargs={'parent_lookup_asset': self.asset.uid, 'format': 'json'},
)
- # Ensure all submissions have no validation status
- response = self.client.get(self.submission_list_url, format='json')
- self.assertEqual(response.status_code, status.HTTP_200_OK)
- emptied = [not s['_validation_status'] for s in response.data['results']]
- self.assertTrue(all(emptied))
-
- # Make the owner change validation status of all submissions
- data = {
- 'payload': {
- 'validation_status.uid': 'validation_status_not_approved',
- 'confirm': True,
- }
- }
- response = self.client.patch(
- self.validation_statuses_url, data=data, format='json'
- )
- self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self._validate_statuses(empty=True)
+ self._update_statuses(status_uid='validation_status_not_approved')
def test_all_validation_statuses_applied(self):
- # ensure all submissions are not approved
- response = self.client.get(self.submission_list_url, format='json')
- self.assertEqual(response.status_code, status.HTTP_200_OK)
- applied = [
- s['_validation_status']['uid'] == 'validation_status_not_approved'
- for s in response.data['results']
- ]
- self.assertTrue(all(applied))
+ self._validate_statuses(
+ uid='validation_status_not_approved', username='someuser'
+ )
- def test_delete_all_status_as_owner(self):
+ def test_delete_all_statuses_as_owner(self):
"""
someuser is the owner of the project.
someuser can bulk delete the status of all their submissions.
"""
- data = {
- 'payload': {
- 'validation_status.uid': None,
- }
- }
- response = self.client.delete(self.validation_statuses_url,
- data=data,
- format='json')
- # `confirm` must be sent within the payload (when all submissions are
- # modified). Otherwise, a ValidationError is raised
- self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
-
- data['payload']['confirm'] = True
- response = self.client.delete(self.validation_statuses_url,
- data=data,
- format='json')
- self.assertEqual(response.status_code, status.HTTP_200_OK)
+ response = self._delete_statuses()
count = self._deployment.calculated_submission_count(self.someuser)
expected_response = {'detail': f'{count} submissions have been updated'}
- self.assertEqual(response.data, expected_response)
-
- # Ensure update worked.
- response = self.client.get(self.submission_list_url)
- for submission in response.data['results']:
- self.assertEqual(submission['_validation_status'], {})
+ assert response.data == expected_response
def test_delete_some_statuses_as_owner(self):
"""
diff --git a/kpi/tests/base_test_case.py b/kpi/tests/base_test_case.py
index bf73b708e1..9955115b0f 100644
--- a/kpi/tests/base_test_case.py
+++ b/kpi/tests/base_test_case.py
@@ -1,10 +1,15 @@
-# coding: utf-8
-from django.urls import reverse
+import json
+import re
+
+from django.contrib.auth.models import Permission
from formpack.utils.expand_content import SCHEMA_VERSION
from rest_framework import status
+from rest_framework.reverse import reverse
from rest_framework.test import APITestCase
-from kpi.models import Asset
+from kobo.apps.kobo_auth.shortcuts import User
+from kpi.models.asset import Asset
+from kpi.models.object_permission import ObjectPermission
# `baker_generators` needs to be imported to give baker extra support
from kpi.tests.utils import baker_generators # noqa
from kpi.urls.router_api_v2 import URL_NAMESPACE as ROUTER_URL_NAMESPACE
@@ -18,33 +23,84 @@ class BaseTestCase(APITestCase):
def absolute_reverse(*args, **kwargs):
return 'http://testserver/' + reverse(*args, **kwargs).lstrip('/')
- def _get_endpoint(self, endpoint):
- if hasattr(self, 'URL_NAMESPACE') and self.URL_NAMESPACE is not None:
- endpoint = '{}:{}'.format(self.URL_NAMESPACE, endpoint) \
- if self.URL_NAMESPACE else endpoint
- return endpoint
-
def login_as_other_user(self, username, password):
self.client.logout()
self.client.login(username=username, password=password)
+ def obj_to_url(self, obj):
+ # Add more types as you need them
+ if isinstance(obj, ObjectPermission):
+ return reverse(
+ self._get_endpoint('asset-permission-assignment-detail'),
+ kwargs={'parent_lookup_asset': obj.asset.uid, 'uid': obj.uid},
+ )
+ if isinstance(obj, Permission):
+ return reverse(
+ self._get_endpoint('permission-detail'),
+ kwargs={'codename': obj.codename},
+ )
+ elif isinstance(obj, User):
+ return reverse(
+ self._get_endpoint('user-kpi-detail'),
+ kwargs={'username': obj.username},
+ )
+ raise NotImplementedError
+
+ def url_to_obj(self, url):
+ uid = self._url_to_uid(url)
+ if uid.startswith('a'):
+ klass = Asset
+ elif uid.startswith('p'):
+ klass = ObjectPermission
+ else:
+ raise NotImplementedError()
+ obj = klass.objects.get(uid=uid)
+ return obj
+
+ @staticmethod
+ def _url_to_uid(url):
+ return re.match(r'.+/(.+)/.*$', url).groups()[0]
+
+ def _get_endpoint(self, endpoint):
+ if hasattr(self, 'URL_NAMESPACE') and self.URL_NAMESPACE is not None:
+ endpoint = (
+ '{}:{}'.format(self.URL_NAMESPACE, endpoint)
+ if self.URL_NAMESPACE
+ else endpoint
+ )
+ return endpoint
+
class BaseAssetTestCase(BaseTestCase):
EMPTY_SURVEY = {'survey': [], 'schema': SCHEMA_VERSION, 'settings': {}}
- def create_asset(self, asset_type='survey'):
- """ Create a new, empty asset as the currently logged-in user """
+ def create_asset(
+ self, asset_type='survey', content: dict = None, name: str = None
+ ):
+ """
+ Create a new, empty asset as the currently logged-in user
+ """
+ if not content:
+ content = {}
+
data = {
- 'content': '{}',
+ 'content': json.dumps(content),
'asset_type': asset_type,
}
+ if name:
+ data['name'] = name
+
list_url = reverse(self._get_endpoint('asset-list'))
response = self.client.post(list_url, data, format='json')
- self.assertEqual(response.status_code, status.HTTP_201_CREATED,
- msg=response.data)
- sa = Asset.objects.order_by('date_created').last()
- self.assertEqual(sa.content, self.EMPTY_SURVEY)
+ self.assertEqual(
+ response.status_code, status.HTTP_201_CREATED, msg=response.data
+ )
+
+ if not content:
+ sa = Asset.objects.order_by('date_created').last()
+ self.assertEqual(sa.content, self.EMPTY_SURVEY)
+
return response
diff --git a/kpi/tests/kpi_test_case.py b/kpi/tests/kpi_test_case.py
index 87fd920da9..0bd4dcdce5 100644
--- a/kpi/tests/kpi_test_case.py
+++ b/kpi/tests/kpi_test_case.py
@@ -1,24 +1,16 @@
-# coding: utf-8
"""
Created on Apr 6, 2015
@author: esmail
"""
-import re
-
-from django.contrib.auth import get_user_model
-from django.contrib.auth.models import Permission
from django.urls import reverse
from rest_framework import status
from kpi.constants import ASSET_TYPE_COLLECTION
-
-from ..models.asset import Asset
-from ..models.object_permission import ObjectPermission
-
# FIXME: Remove the following line when the permissions API is in place.
from .base_test_case import BaseTestCase
from .test_permissions import BasePermissionsTestCase
+from ..models.asset import Asset
class KpiTestCase(BaseTestCase, BasePermissionsTestCase):
@@ -40,55 +32,6 @@ def login(self, username=None, password=None, expect_success=True):
kwargs = {'username': username, 'password': password}
self.assertEqual(self.client.login(**kwargs), expect_success)
- @staticmethod
- def _url_to_uid(url):
- return re.match(r'.+/(.+)/.*$', url).groups()[0]
-
- def url_to_obj(self, url):
- uid = self._url_to_uid(url)
- if uid.startswith('a'):
- klass = Asset
- elif uid.startswith('p'):
- klass = ObjectPermission
- else:
- raise NotImplementedError()
- obj = klass.objects.get(uid=uid)
- return obj
-
- def obj_to_url(self, obj):
- # Add more types as you need them
- if isinstance(obj, ObjectPermission):
- return reverse(
- self._get_endpoint('asset-permission-assignment-detail'),
- kwargs={'parent_lookup_asset': obj.asset.uid, 'uid': obj.uid},
- )
- if isinstance(obj, Permission):
- return reverse(
- self._get_endpoint('permission-detail'),
- kwargs={'codename': obj.codename},
- )
- elif isinstance(obj, get_user_model()):
- return reverse(
- self._get_endpoint('user-kpi-detail'),
- kwargs={'username': obj.username},
- )
- raise NotImplementedError
-
- def get_asset_perm_assignment_list_url(self, asset):
- return reverse(
- self._get_endpoint('asset-permission-assignment-list'),
- kwargs={'parent_lookup_asset': asset.uid}
- )
-
- def get_urls_for_asset_perm_assignment_objs(self, perm_assignments, asset):
- return [
- self.absolute_reverse(
- self._get_endpoint('asset-permission-assignment-detail'),
- kwargs={'uid': uid, 'parent_lookup_asset': asset.uid},
- )
- for uid in perm_assignments.values_list('uid', flat=True)
- ]
-
def create_collection(self, name, owner=None, owner_password=None,
**kwargs):
if owner and owner_password:
diff --git a/kpi/tests/test_organization.py b/kpi/tests/test_organization.py
deleted file mode 100644
index c52e7b32ee..0000000000
--- a/kpi/tests/test_organization.py
+++ /dev/null
@@ -1,49 +0,0 @@
-from django.test import TestCase
-from model_bakery import baker
-
-from kobo.apps.kobo_auth.shortcuts import User
-from kobo.apps.organizations.models import Organization
-from kobo.apps.organizations.constants import (
- ADMIN_ORG_ROLE,
- EXTERNAL_ORG_ROLE,
- MEMBER_ORG_ROLE,
- OWNER_ORG_ROLE,
-)
-
-
-class OrganizationTestCase(TestCase):
-
- fixtures = ['test_data']
-
- def setUp(self):
- self.user = User.objects.get(username='someuser')
- self.organization = baker.make(
- Organization,
- id='orgSALFMLFMSDGmgdlsgmsd',
- slug='orgSALFMLFMSDGmgdlsgmsd',
- )
-
- def test_owner_user_object_property(self):
- # The organization has no members yet, the first user is set as the owner
- self.organization.add_user(self.user)
- anotheruser = User.objects.get(username='anotheruser')
- self.organization.add_user(anotheruser)
- assert self.organization.owner_user_object == self.user
- assert not (self.organization.owner_user_object == anotheruser)
-
- def test_no_owner_user_object_property(self):
- assert self.organization.owner_user_object is None
-
- def test_get_user_role(self):
- anotheruser = User.objects.get(username='anotheruser')
- alice = User.objects.create(username='alice', email='alice@alice.com')
- external = User.objects.create(
- username='external', email='external@external.com'
- )
- self.organization.add_user(self.user)
- self.organization.add_user(anotheruser, is_admin=True)
- self.organization.add_user(alice)
- assert self.organization.get_user_role(self.user) == OWNER_ORG_ROLE
- assert self.organization.get_user_role(anotheruser) == ADMIN_ORG_ROLE
- assert self.organization.get_user_role(alice) == MEMBER_ORG_ROLE
- assert self.organization.get_user_role(external) == EXTERNAL_ORG_ROLE
diff --git a/kpi/tests/test_permissions.py b/kpi/tests/test_permissions.py
index cc1693f3bf..15b5fa3f2e 100644
--- a/kpi/tests/test_permissions.py
+++ b/kpi/tests/test_permissions.py
@@ -21,6 +21,7 @@
)
from kpi.exceptions import BadPermissionsException
from kpi.utils.object_permission import get_all_objects_for_user
+from ..models import ObjectPermission
from ..models.asset import Asset
@@ -861,5 +862,48 @@ def test_user_without_perms_get_anonymous_perms(self):
self.assertFalse(anonymous_user.has_perm(PERM_VIEW_SUBMISSIONS, asset))
asset.assign_perm(anonymous_user, PERM_VIEW_SUBMISSIONS)
self.assertTrue(grantee.has_perm(PERM_VIEW_SUBMISSIONS, asset))
- self.assertTrue(asset.get_perms(grantee),
- asset.get_perms(anonymous_user))
+ self.assertTrue(
+ asset.get_perms(grantee), asset.get_perms(anonymous_user)
+ )
+
+ def test_org_admin_inherited_and_implied_permissions(self):
+ """
+ Test the inherited (and implied) permissions for an admin within
+ an organization.
+
+ This test ensures that admin users receive the correct permissions,
+ both directly inherited and those implied by their role,
+ within the organization context, even if they are not granted explicitly.
+ """
+ expected_perms = [
+ PERM_ADD_SUBMISSIONS,
+ PERM_CHANGE_ASSET,
+ PERM_CHANGE_SUBMISSIONS,
+ PERM_DELETE_ASSET,
+ PERM_DELETE_SUBMISSIONS,
+ PERM_DISCOVER_ASSET,
+ PERM_MANAGE_ASSET,
+ PERM_VALIDATE_SUBMISSIONS,
+ PERM_VIEW_ASSET,
+ PERM_VIEW_SUBMISSIONS,
+ ]
+ assert (
+ list(self.admin_asset.get_org_admin_inherited_perms()).sort()
+ == expected_perms.sort()
+ )
+
+ # Add anotheruser to someuser's org as an admin
+ organization = self.someuser.organization
+ organization.mmo_override = True
+ organization.save(update_fields=['mmo_override'])
+ organization.add_user(self.anotheruser, is_admin=True)
+ for asset in self.someuser.assets.all():
+ # Set permission assignments
+ asset.save()
+ # No permissions are explicitly assigned to anotheruser…
+ assert not ObjectPermission.objects.filter(
+ asset=asset, user=self.anotheruser
+ ).exists()
+ # …but they still access to someuser's org projects
+ for expected_perm in expected_perms:
+ assert asset.has_perm(self.anotheruser, expected_perm)
diff --git a/kpi/tests/test_usage_calculator.py b/kpi/tests/test_usage_calculator.py
index 8cf9dad719..8c4b455082 100644
--- a/kpi/tests/test_usage_calculator.py
+++ b/kpi/tests/test_usage_calculator.py
@@ -191,7 +191,7 @@ def test_no_data(self):
@override_settings(STRIPE_ENABLED=True)
def test_organization_setup(self):
- organization = baker.make(Organization, id='org_abcd1234')
+ organization = baker.make(Organization, id='org_abcd1234', mmo_override=True)
organization.add_user(user=self.anotheruser, is_admin=True)
organization.add_user(user=self.someuser, is_admin=True)
generate_enterprise_subscription(organization)
diff --git a/kpi/tests/utils/mixins.py b/kpi/tests/utils/mixins.py
new file mode 100644
index 0000000000..96f4e3158a
--- /dev/null
+++ b/kpi/tests/utils/mixins.py
@@ -0,0 +1,346 @@
+import base64
+import json
+from io import StringIO
+
+import responses
+from django.conf import settings
+from rest_framework import status
+from rest_framework.reverse import reverse
+
+from kpi.models.asset_file import AssetFile
+from kpi.tests.utils.mock import (
+ enketo_edit_instance_response,
+ enketo_view_instance_response,
+)
+
+
+class AssetFileTestCaseMixin:
+
+ @property
+ def asset_file_payload(self):
+ geojson_ = StringIO(
+ json.dumps(
+ {
+ 'type': 'Feature',
+ 'geometry': {'type': 'Point', 'coordinates': [125.6, 10.1]},
+ 'properties': {'name': 'Dinagat Islands'},
+ }
+ )
+ )
+ geojson_.name = 'dingagat_island.geojson'
+ return {
+ 'file_type': AssetFile.MAP_LAYER,
+ 'description': 'Dinagat Islands',
+ 'content': geojson_,
+ 'metadata': json.dumps({'source': 'http://geojson.org/'}),
+ }
+
+ def create_asset_file(
+ self, payload=None, status_code=status.HTTP_201_CREATED
+ ):
+ payload = self.asset_file_payload if payload is None else payload
+
+ response = self.client.get(self.list_url)
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(json.loads(response.content)['count'], 0)
+ response = self.client.post(self.list_url, payload)
+ self.assertEqual(response.status_code, status_code)
+ return response
+
+ def delete_asset_file(self):
+ af_uid = self.verify_asset_file(self.create_asset_file())
+ detail_url = reverse(
+ self._get_endpoint('asset-file-detail'),
+ args=(self.asset.uid, af_uid),
+ )
+ response = self.client.delete(detail_url)
+ self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
+ # TODO: test that the file itself is removed
+
+ def get_asset_file_content(self, url):
+ response = self.client.get(url)
+ return b''.join(response.streaming_content)
+
+ def verify_asset_file(self, response, payload=None, form_media=False):
+ posted_payload = self.asset_file_payload if payload is None else payload
+ response_dict = json.loads(response.content)
+ self.assertEqual(
+ response_dict['asset'],
+ self.absolute_reverse(
+ self._get_endpoint('asset-detail'), args=[self.asset.uid]
+ ),
+ )
+ self.assertEqual(
+ response_dict['user'],
+ self.absolute_reverse(
+ self._get_endpoint('user-kpi-detail'),
+ args=[self.current_username],
+ ),
+ )
+ self.assertEqual(
+ response_dict['user__username'],
+ self.current_username,
+ )
+
+ # Some metadata properties are added when file is created.
+ # Let's compare without them
+ response_metadata = dict(response_dict['metadata'])
+
+ if not form_media:
+ # `filename` is only mandatory with form media files
+ response_metadata.pop('filename', None)
+
+ response_metadata.pop('mimetype', None)
+ response_metadata.pop('hash', None)
+
+ self.assertEqual(
+ json.dumps(response_metadata), posted_payload['metadata']
+ )
+ for field in 'file_type', 'description':
+ self.assertEqual(response_dict[field], posted_payload[field])
+
+ # Content uploaded as binary
+ try:
+ posted_payload['content'].seek(0)
+ except KeyError:
+ pass
+ else:
+ expected_content = posted_payload['content'].read().encode()
+ self.assertEqual(
+ self.get_asset_file_content(response_dict['content']),
+ expected_content,
+ )
+ return response_dict['uid']
+
+ # Content uploaded as base64
+ try:
+ base64_encoded = posted_payload['base64Encoded']
+ except KeyError:
+ pass
+ else:
+ media_content = base64_encoded[base64_encoded.index('base64') + 7:]
+ expected_content = base64.decodebytes(media_content.encode())
+ self.assertEqual(
+ self.get_asset_file_content(response_dict['content']),
+ expected_content,
+ )
+ return response_dict['uid']
+
+ # Content uploaded as a URL
+ metadata = json.loads(posted_payload['metadata'])
+ payload_url = metadata['redirect_url']
+ # If none of the other upload methods have been chosen,
+ # `redirect_url` should be present in the response because user
+ # must have provided a redirect url. Otherwise, a validation error
+ # should have been raised about invalid payload.
+ response_url = response_dict['metadata']['redirect_url']
+ assert response_url == payload_url and response_url != ''
+ return response_dict['uid']
+
+
+class SubmissionDeleteTestCaseMixin:
+
+ def _delete_submission(self, submission: dict):
+ response = self.client.get(self.submission_list_url)
+ submission_count = response.data['count']
+
+ url = reverse(
+ self._get_endpoint('submission-detail'),
+ kwargs={
+ 'parent_lookup_asset': self.asset.uid,
+ 'pk': submission['_id'],
+ },
+ )
+
+ response = self.client.delete(url, HTTP_ACCEPT='application/json')
+ assert response.status_code == status.HTTP_204_NO_CONTENT
+
+ response = self.client.get(self.submission_list_url)
+ assert response.data['count'] == submission_count - 1
+
+ def _delete_submissions(self):
+ response = self.client.get(self.submission_list_url)
+ assert response.data['count'] != 0
+
+ response = self.client.delete(self.submission_bulk_url, format='json')
+ # `confirm` must be sent within the payload (when all submissions are
+ # deleted). Otherwise, a ValidationError is raised
+ assert response.status_code == status.HTTP_400_BAD_REQUEST
+
+ data = {'payload': {'confirm': True}}
+ response = self.client.delete(
+ self.submission_bulk_url, data=data, format='json'
+ )
+ assert response.status_code == status.HTTP_200_OK
+
+ response = self.client.get(self.submission_list_url)
+ assert response.data['count'] == 0
+
+
+class SubmissionEditTestCaseMixin:
+
+ def _get_edit_link(self):
+ ee_url = (
+ f'{settings.ENKETO_URL}/{settings.ENKETO_EDIT_INSTANCE_ENDPOINT}'
+ )
+ # Mock Enketo response
+ responses.add_callback(
+ responses.POST, ee_url,
+ callback=enketo_edit_instance_response,
+ content_type='application/json',
+ )
+
+ submission_edit_link_url_legacy = reverse(
+ self._get_endpoint('submission-enketo-edit'),
+ kwargs={
+ 'parent_lookup_asset': self.asset.uid,
+ 'pk': self.submission['_id'],
+ },
+ )
+ submission_edit_link_url = submission_edit_link_url_legacy.replace(
+ 'edit', 'enketo/edit'
+ )
+
+ response = self.client.get(submission_edit_link_url, {'format': 'json'})
+ assert response.status_code == status.HTTP_200_OK
+ expected_response = {
+ 'url': f"{settings.ENKETO_URL}/edit/{self.submission['_uuid']}",
+ 'version_uid': self.asset.latest_deployed_version.uid,
+ }
+ assert response.data == expected_response
+
+
+class SubmissionValidationStatusTestCaseMixin:
+
+ def _delete_status(self):
+ response = self.client.delete(self.validation_status_url)
+ assert response.status_code == status.HTTP_204_NO_CONTENT
+
+ # Ensure delete worked.
+ response = self.client.get(self.validation_status_url)
+ assert response.status_code == status.HTTP_200_OK
+ assert response.data == {}
+
+ def _delete_statuses(self):
+ data = {
+ 'payload': {
+ 'validation_status.uid': None,
+ }
+ }
+ response = self.client.delete(
+ self.validation_statuses_url, data=data, format='json'
+ )
+ # `confirm` must be sent within the payload (when all submissions are
+ # modified). Otherwise, a ValidationError is raised
+ assert response.status_code == status.HTTP_400_BAD_REQUEST
+
+ data['payload']['confirm'] = True
+ response = self.client.delete(
+ self.validation_statuses_url, data=data, format='json'
+ )
+ assert response.status_code == status.HTTP_200_OK
+
+ # Ensure update worked.
+ response_list = self.client.get(self.submission_list_url)
+ for submission in response_list.data['results']:
+ assert submission['_validation_status'] == {}
+
+ return response
+
+ def _update_status(
+ self, username: str, status_uid: str = 'validation_status_not_approved'
+ ):
+ data = {
+ 'validation_status.uid': status_uid
+ }
+ response = self.client.patch(self.validation_status_url, data=data)
+ assert response.status_code == status.HTTP_200_OK
+
+ # Ensure update worked.
+ response = self.client.get(self.validation_status_url)
+ assert response.status_code == status.HTTP_200_OK
+ assert response.data['by_whom'] == username
+ assert response.data['uid'] == data['validation_status.uid']
+
+ def _update_statuses(self, status_uid: str = 'validation_status_not_approved'):
+ # Make the owner change validation status of all submissions
+ data = {
+ 'payload': {
+ 'validation_status.uid': status_uid,
+ 'confirm': True,
+ }
+ }
+ response = self.client.patch(
+ self.validation_statuses_url, data=data, format='json'
+ )
+ assert response.status_code == status.HTTP_200_OK
+
+ def _validate_statuses(
+ self, empty: bool = False, uid: str = None, username: str = None
+ ):
+
+ response = self.client.get(self.submission_list_url, format='json')
+ assert response.status_code == status.HTTP_200_OK
+
+ if empty:
+ statuses = [
+ not s['_validation_status'] for s in response.data['results']
+ ]
+ else:
+ statuses = [
+ s['_validation_status']['uid'] == uid
+ and s['_validation_status']['by_whom'] == username
+ for s in response.data['results']
+ ]
+
+ assert all(statuses)
+
+
+class SubmissionViewTestCaseMixin:
+
+ def _get_view_link(self):
+ ee_url = (
+ f'{settings.ENKETO_URL}/{settings.ENKETO_VIEW_INSTANCE_ENDPOINT}'
+ )
+
+ # Mock Enketo response
+ responses.add_callback(
+ responses.POST, ee_url,
+ callback=enketo_view_instance_response,
+ content_type='application/json',
+ )
+
+ submission_view_link_url = reverse(
+ self._get_endpoint('submission-enketo-view'),
+ kwargs={
+ 'parent_lookup_asset': self.asset.uid,
+ 'pk': self.submission['_id'],
+ },
+ )
+
+ response = self.client.get(submission_view_link_url, {'format': 'json'})
+ assert response.status_code == status.HTTP_200_OK
+
+ expected_response = {
+ 'url': f"{settings.ENKETO_URL}/view/{self.submission['_uuid']}",
+ 'version_uid': self.asset.latest_deployed_version.uid,
+ }
+ assert response.data == expected_response
+
+
+class PermissionAssignmentTestCaseMixin:
+
+ def get_asset_perm_assignment_list_url(self, asset):
+ return reverse(
+ self._get_endpoint('asset-permission-assignment-list'),
+ kwargs={'parent_lookup_asset': asset.uid}
+ )
+
+ def get_urls_for_asset_perm_assignment_objs(self, perm_assignments, asset):
+ return [
+ self.absolute_reverse(
+ self._get_endpoint('asset-permission-assignment-detail'),
+ kwargs={'uid': uid, 'parent_lookup_asset': asset.uid},
+ )
+ for uid in perm_assignments.values_list('uid', flat=True)
+ ]
diff --git a/kpi/utils/project_views.py b/kpi/utils/project_views.py
index 47a3ccf2c1..04d48985a3 100644
--- a/kpi/utils/project_views.py
+++ b/kpi/utils/project_views.py
@@ -44,6 +44,8 @@ def user_has_project_view_asset_perm(
Returns True if user has specified permission for asset within project view
if not explicitly granted through Asset.assign_perm()
"""
+ if not asset:
+ return False
return perm in get_project_view_user_permissions_for_asset(asset, user)
diff --git a/kpi/views/v2/asset.py b/kpi/views/v2/asset.py
index 5ec5604a14..829302b6fe 100644
--- a/kpi/views/v2/asset.py
+++ b/kpi/views/v2/asset.py
@@ -1,4 +1,3 @@
-# coding: utf-8
import copy
import json
from collections import OrderedDict, defaultdict
@@ -54,7 +53,10 @@ class AssetViewSet(
ObjectPermissionViewSetMixin, NestedViewSetMixin, AuditLoggedModelViewSet
):
"""
- * Assign a asset to a collection partially implemented
+ * Assign an asset to a collection
+
+ partially implemented
+
* Run a partial update of a asset TODO
## List of asset endpoints
@@ -392,28 +394,6 @@ class AssetViewSet(
]
log_type = AuditType.PROJECT_HISTORY
- def get_object_override(self):
- if self.request.method in ['PATCH', 'GET']:
- try:
- asset = Asset.objects.get(uid=self.kwargs['uid'])
- except Asset.DoesNotExist:
- raise Http404
-
- self.check_object_permissions(self.request, asset)
-
- # Cope with kobotoolbox/formpack#322, which wrote invalid content
- # into the database. For performance, consider only the current
- # content, not previous versions. Previous versions are handled in
- # `kobo.apps.reports.report_data.build_formpack()`
- if self.request.method == 'GET':
- repair_file_column_content_and_save(
- asset, include_versions=False
- )
-
- return asset
-
- return super().get_object_override()
-
@action(
detail=False,
methods=['POST'],
@@ -561,6 +541,30 @@ def finalize_response(self, request, response, *args, **kwargs):
return super().finalize_response(
request, response, *args, **kwargs)
+ def get_object_override(self):
+ """
+ This `get_object` method bypasses the filter backends because the UID
+ already explicitly filters the object to be retrieved.
+ It relies on `check_object_permissions` to validate access to the object.
+ """
+ try:
+ asset = Asset.objects.get(uid=self.kwargs['uid'])
+ except Asset.DoesNotExist:
+ raise Http404
+
+ self.check_object_permissions(self.request, asset)
+
+ # Cope with kobotoolbox/formpack#322, which wrote invalid content
+ # into the database. For performance, consider only the current
+ # content, not previous versions. Previous versions are handled in
+ # `kobo.apps.reports.report_data.build_formpack()`
+ if self.request.method == 'GET':
+ repair_file_column_content_and_save(
+ asset, include_versions=False
+ )
+
+ return asset
+
def get_queryset(self, *args, **kwargs):
queryset = super().get_queryset(*args, **kwargs)
if self.action == 'list':
@@ -706,8 +710,12 @@ def get_serializer_context(self):
# 4) Get children count per asset
# Ordering must be cleared otherwise group_by is wrong
# (i.e. default ordered field `date_modified` must be removed)
- records = Asset.objects.filter(parent_id__in=asset_ids). \
- values('parent_id').annotate(children_count=Count('id')).order_by()
+ records = (
+ Asset.objects.filter(parent_id__in=asset_ids)
+ .values('parent_id')
+ .annotate(children_count=Count('id'))
+ .order_by()
+ )
children_count_per_asset = {
r.get('parent_id'): r.get('children_count', 0)
diff --git a/kpi/views/v2/asset_file.py b/kpi/views/v2/asset_file.py
index 062255fb13..ecde9f8a1a 100644
--- a/kpi/views/v2/asset_file.py
+++ b/kpi/views/v2/asset_file.py
@@ -178,7 +178,8 @@ class PrivateContentView(PrivateStorageDetailView):
# permissions
def can_access_file(self, private_file):
return private_file.request.user.has_perm(
- PERM_VIEW_ASSET, private_file.parent_object.asset)
+ PERM_VIEW_ASSET, private_file.parent_object.asset
+ )
@action(detail=True, methods=['GET'])
def content(self, *args, **kwargs):
diff --git a/kpi/views/v2/asset_snapshot.py b/kpi/views/v2/asset_snapshot.py
index 5763ca8de4..efbae3f12b 100644
--- a/kpi/views/v2/asset_snapshot.py
+++ b/kpi/views/v2/asset_snapshot.py
@@ -1,5 +1,4 @@
import copy
-from typing import Optional
import requests
from django.conf import settings
@@ -11,12 +10,11 @@
from kobo.apps.openrosa.libs.utils.logger_tools import http_open_rosa_error_handler
from kpi.authentication import DigestAuthentication, EnketoSessionAuthentication
-from kpi.constants import PERM_VIEW_ASSET
from kpi.exceptions import SubmissionIntegrityError
from kpi.filters import RelatedAssetPermissionsFilter
from kpi.highlighters import highlight_xform
from kpi.models import AssetFile, AssetSnapshot, PairedData
-from kpi.permissions import EditSubmissionPermission
+from kpi.permissions import AssetSnapshotPermission, EditSubmissionPermission
from kpi.renderers import (
OpenRosaFormListRenderer,
OpenRosaManifestRenderer,
@@ -25,8 +23,6 @@
from kpi.serializers.v2.asset_snapshot import AssetSnapshotSerializer
from kpi.serializers.v2.open_rosa import FormListSerializer, ManifestSerializer
from kpi.tasks import enketo_flush_cached_preview
-from kpi.utils.object_permission import get_database_user
-from kpi.utils.project_views import user_has_project_view_asset_perm
from kpi.utils.xml import XMLFormWithDisclaimer
from kpi.views.no_update_model import NoUpdateModelViewSet
from kpi.views.v2.open_rosa import OpenRosaViewSetMixin
@@ -43,6 +39,7 @@ class AssetSnapshotViewSet(OpenRosaViewSetMixin, NoUpdateModelViewSet):
serializer_class = AssetSnapshotSerializer
lookup_field = 'uid'
queryset = AssetSnapshot.objects.all()
+ permission_classes = [AssetSnapshotPermission]
renderer_classes = NoUpdateModelViewSet.renderer_classes + [
XMLRenderer,
@@ -51,7 +48,7 @@ class AssetSnapshotViewSet(OpenRosaViewSetMixin, NoUpdateModelViewSet):
@property
def asset(self):
if not hasattr(self, '_asset'):
- self._set_asset()
+ self.get_object()
return self._asset
def filter_queryset(self, queryset):
@@ -76,8 +73,13 @@ def filter_queryset(self, queryset):
owned_snapshots = queryset.none()
if not user.is_anonymous:
owned_snapshots = queryset.filter(owner=user)
- return owned_snapshots | RelatedAssetPermissionsFilter(
- ).filter_queryset(self.request, queryset, view=self)
+
+ return (
+ owned_snapshots
+ | RelatedAssetPermissionsFilter().filter_queryset(
+ self.request, queryset, view=self
+ )
+ )
@action(
detail=True,
@@ -88,7 +90,7 @@ def form_list(self, request, *args, **kwargs):
"""
Implements part of the OpenRosa Form List API.
This route is used by Enketo when it fetches external resources.
- It let us specify manifests for preview
+ It lets us specify manifests for preview
"""
if request.method == 'HEAD':
return self.get_response_for_head_request()
@@ -101,31 +103,16 @@ def form_list(self, request, *args, **kwargs):
def get_object(self):
try:
- # Trivial case, try access the object with normal flow
- snapshot = super().get_object()
- except Http404 as e:
- # If 404, fall back on project view permissions
- try:
- snapshot = self.queryset.select_related('asset').defer(
- 'asset__content'
- ).get(uid=self.kwargs[self.lookup_field])
- except AssetSnapshot.DoesNotExist:
- raise e
-
- user = get_database_user(self.request.user)
-
- if (
- self.request.method == 'GET'
- and user_has_project_view_asset_perm(
- snapshot.asset, user, PERM_VIEW_ASSET
- )
- ):
- return self._add_disclaimer(snapshot)
- else:
- # Access to user is still denied, raise 404
- raise Http404
- else:
- return self._add_disclaimer(snapshot)
+ snapshot = self.queryset.select_related('asset').defer(
+ 'asset__content'
+ ).get(uid=self.kwargs[self.lookup_field])
+ except AssetSnapshot.DoesNotExist:
+ raise Http404
+
+ self._asset = snapshot.asset
+ self.check_object_permissions(self.request, snapshot)
+
+ return self._add_disclaimer(snapshot)
@action(
detail=True,
@@ -281,15 +268,5 @@ def _add_disclaimer(self, snapshot: AssetSnapshot) -> AssetSnapshot:
if not self.action == 'xml_with_disclaimer':
return snapshot
- self._set_asset(snapshot)
+ # self._set_asset(snapshot)
return XMLFormWithDisclaimer(snapshot).get_object()
-
- def _set_asset(self, snapshot: Optional[AssetSnapshot] = None):
- if not snapshot:
- snapshot = self.get_object()
- # Calling `snapshot.asset.__class__` instead of `Asset` to avoid circular
- # import
- snapshot.asset = snapshot.asset.__class__.objects.defer(
- 'content'
- ).get(pk=snapshot.asset_id)
- setattr(self, '_asset', snapshot.asset)