From 3c174df4d80e2a0e0edb9bcaee047fcbc4c7ca79 Mon Sep 17 00:00:00 2001 From: rajpatel24 Date: Thu, 14 Nov 2024 21:14:39 +0530 Subject: [PATCH] Add role-based validation tests for organization member permissions --- kobo/apps/organizations/permissions.py | 31 ++++- kobo/apps/organizations/serializers.py | 1 - .../tests/test_organization_members_api.py | 118 ++++++++++++------ kobo/apps/organizations/views.py | 20 +-- 4 files changed, 114 insertions(+), 56 deletions(-) diff --git a/kobo/apps/organizations/permissions.py b/kobo/apps/organizations/permissions.py index 7564a57eda..a29a71f37a 100644 --- a/kobo/apps/organizations/permissions.py +++ b/kobo/apps/organizations/permissions.py @@ -41,4 +41,33 @@ def has_object_permission(self, request, view, obj): return True # Instance must have an attribute named `is_admin` - return obj.is_admin(request.user) + return obj.organization.is_admin(request.user) + + +class IsOrgOwnerOrAdminOrMember(permissions.BasePermission): + def has_permission(self, request, view): + # Ensure the user is authenticated + if not request.user.is_authenticated: + return False + return True + + def has_object_permission(self, request, view, obj): + user_role = obj.organization.get_user_role(request.user) + + # Allow owners to view, update, and delete members + if user_role == 'owner': + return True + + # Allow admins to view and update, but not delete members + if user_role == 'admin': + return request.method in permissions.SAFE_METHODS or request.method == 'PATCH' + + # Allow members to only view other members + if user_role == 'member': + return request.method in permissions.SAFE_METHODS + + # Deny access to external users + if user_role == 'external': + raise Http404() + + return False diff --git a/kobo/apps/organizations/serializers.py b/kobo/apps/organizations/serializers.py index 714b4ced63..7232ba1669 100644 --- a/kobo/apps/organizations/serializers.py +++ b/kobo/apps/organizations/serializers.py @@ -1,4 +1,3 @@ -from constance import config from django.contrib.auth import get_user_model from django.utils.translation import gettext as t from rest_framework import serializers diff --git a/kobo/apps/organizations/tests/test_organization_members_api.py b/kobo/apps/organizations/tests/test_organization_members_api.py index d9cd447a2b..9766600f92 100644 --- a/kobo/apps/organizations/tests/test_organization_members_api.py +++ b/kobo/apps/organizations/tests/test_organization_members_api.py @@ -1,28 +1,25 @@ +from ddt import ddt, data, unpack from django.urls import reverse -from model_bakery import baker from rest_framework import status -from kobo.apps.kobo_auth.shortcuts import User -from kobo.apps.organizations.models import Organization, OrganizationUser -from kpi.tests.kpi_test_case import BaseTestCase +from kobo.apps.organizations.tests.test_organizations_api import ( + BaseOrganizationAssetApiTestCase +) from kpi.urls.router_api_v2 import URL_NAMESPACE - -class OrganizationMemberAPITestCase(BaseTestCase): +@ddt +class OrganizationMemberAPITestCase(BaseOrganizationAssetApiTestCase): fixtures = ['test_data'] URL_NAMESPACE = URL_NAMESPACE def setUp(self): - self.organization = baker.make( - Organization, id='org_12345', mmo_override=True - ) - self.owner_user = baker.make(User, username='owner') - self.member_user = baker.make(User, username='member') + super().setUp() + self.organization = self.someuser.organization + self.owner_user = self.someuser + self.member_user = self.alice + self.admin_user = self.anotheruser + self.external_user = self.bob - self.organization.add_user(self.owner_user) - self.organization.add_user(self.member_user, is_admin=False) - - self.client.force_login(self.owner_user) self.list_url = reverse( self._get_endpoint('organization-members-list'), kwargs={'organization_id': self.organization.id}, @@ -35,36 +32,77 @@ def setUp(self): }, ) - def test_list_members(self): + @data( + ('owner', status.HTTP_200_OK), + ('admin', status.HTTP_200_OK), + ('member', status.HTTP_200_OK), + ('external', status.HTTP_200_OK), + ) + @unpack + def test_list_members_with_different_roles(self, user_role, expected_status): + user = getattr(self, f"{user_role}_user") + self.client.force_login(user) response = self.client.get(self.list_url) - self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertIn( - 'owner', - [member['user__username'] for member in response.data.get('results')] - ) - self.assertIn( - 'member', - [member['user__username'] for member in response.data.get('results')] - ) + self.assertEqual(response.status_code, expected_status) + + @data( + ('owner', status.HTTP_200_OK), + ('admin', status.HTTP_200_OK), + ('member', status.HTTP_200_OK), + ('external', status.HTTP_404_NOT_FOUND), + ) + @unpack + def test_retrieve_member_details_with_different_roles(self, user_role, expected_status): + user = getattr(self, f"{user_role}_user") + self.client.force_login(user) + response = self.client.get(self.detail_url(self.member_user)) + self.assertEqual(response.status_code, expected_status) + + @data( + ('owner', status.HTTP_200_OK), + ('admin', status.HTTP_200_OK), + ('member', status.HTTP_403_FORBIDDEN), + ('external', status.HTTP_404_NOT_FOUND), + ) + @unpack + def test_update_member_role_with_different_roles(self, user_role, expected_status): + user = getattr(self, f"{user_role}_user") + data = {'role': 'admin'} + self.client.force_login(user) + response = self.client.patch(self.detail_url(self.member_user), data) + self.assertEqual(response.status_code, expected_status) - def test_retrieve_member_details(self): - response = self.client.get(self.detail_url('member')) - self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertEqual(response.data['user__username'], 'member') - self.assertEqual(response.data['role'], 'member') + @data( + ('owner', status.HTTP_204_NO_CONTENT), + ('admin', status.HTTP_403_FORBIDDEN), + ('member', status.HTTP_403_FORBIDDEN), + ('external', status.HTTP_404_NOT_FOUND), + ) + @unpack + def test_delete_member_with_different_roles(self, user_role, expected_status): + user = getattr(self, f"{user_role}_user") + self.client.force_login(user) + response = self.client.delete(self.detail_url(self.member_user)) + self.assertEqual(response.status_code, expected_status) + if expected_status == status.HTTP_204_NO_CONTENT: + # Confirm deletion + response = self.client.get(self.detail_url(self.member_user)) + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - def test_update_member_role(self): + @data( + ('owner', status.HTTP_405_METHOD_NOT_ALLOWED), + ('admin', status.HTTP_405_METHOD_NOT_ALLOWED), + ('member', status.HTTP_405_METHOD_NOT_ALLOWED), + ('external', status.HTTP_405_METHOD_NOT_ALLOWED), + ) + @unpack + def test_post_request_is_not_allowed(self, user_role, expected_status): + user = getattr(self, f"{user_role}_user") data = {'role': 'admin'} - response = self.client.patch(self.detail_url('member'), data) - self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertEqual(response.data['role'], 'admin') + self.client.force_login(user) + response = self.client.post(self.list_url, data) + self.assertEqual(response.status_code, expected_status) - def test_delete_member(self): - response = self.client.delete(self.detail_url('member')) - self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) - # Confirm deletion - response = self.client.get(self.detail_url('member')) - self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) def test_list_requires_authentication(self): self.client.logout() diff --git a/kobo/apps/organizations/views.py b/kobo/apps/organizations/views.py index 6230a16d7e..eff0fc2f73 100644 --- a/kobo/apps/organizations/views.py +++ b/kobo/apps/organizations/views.py @@ -30,7 +30,11 @@ from kpi.utils.object_permission import get_database_user from kpi.views.v2.asset import AssetViewSet from .models import Organization, OrganizationOwner, OrganizationUser -from .permissions import IsOrgAdmin, IsOrgAdminOrReadOnly +from .permissions import ( + IsOrgAdmin, + IsOrgAdminOrReadOnly, + IsOrgOwnerOrAdminOrMember +) from .serializers import OrganizationSerializer, OrganizationUserSerializer from ..accounts.mfa.models import MfaMethod from ..stripe.constants import ACTIVE_STRIPE_STATUSES @@ -399,7 +403,7 @@ class OrganizationMemberViewSet(viewsets.ModelViewSet): in updates. """ serializer_class = OrganizationUserSerializer - permission_classes = [IsAuthenticated] + permission_classes = [IsOrgOwnerOrAdminOrMember] pagination_class = OrganizationMemberPagination http_method_names = ['get', 'patch', 'delete'] lookup_field = 'user__username' @@ -432,15 +436,3 @@ def get_queryset(self): has_mfa_enabled=Exists(mfa_subquery) ) return queryset - - def partial_update(self, request, *args, **kwargs): - instance = self.get_object() - serializer = self.get_serializer( - instance, data=request.data, partial=True - ) - serializer.is_valid(raise_exception=True) - role = serializer.validated_data.get('role') - if role: - instance.is_admin = (role == 'admin') - instance.save() - return super().partial_update(request, *args, **kwargs)