Skip to content

Commit

Permalink
Add role-based validation tests for organization member permissions
Browse files Browse the repository at this point in the history
  • Loading branch information
rajpatel24 committed Nov 14, 2024
1 parent 1dec6eb commit 3c174df
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 56 deletions.
31 changes: 30 additions & 1 deletion kobo/apps/organizations/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,33 @@ def has_object_permission(self, request, view, obj):
return True

# Instance must have an attribute named `is_admin`
return obj.is_admin(request.user)
return obj.organization.is_admin(request.user)


class IsOrgOwnerOrAdminOrMember(permissions.BasePermission):
def has_permission(self, request, view):
# Ensure the user is authenticated
if not request.user.is_authenticated:
return False
return True

def has_object_permission(self, request, view, obj):
user_role = obj.organization.get_user_role(request.user)

# Allow owners to view, update, and delete members
if user_role == 'owner':
return True

# Allow admins to view and update, but not delete members
if user_role == 'admin':
return request.method in permissions.SAFE_METHODS or request.method == 'PATCH'

# Allow members to only view other members
if user_role == 'member':
return request.method in permissions.SAFE_METHODS

# Deny access to external users
if user_role == 'external':
raise Http404()

return False
1 change: 0 additions & 1 deletion kobo/apps/organizations/serializers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from constance import config
from django.contrib.auth import get_user_model
from django.utils.translation import gettext as t
from rest_framework import serializers
Expand Down
118 changes: 78 additions & 40 deletions kobo/apps/organizations/tests/test_organization_members_api.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
from ddt import ddt, data, unpack
from django.urls import reverse
from model_bakery import baker
from rest_framework import status

from kobo.apps.kobo_auth.shortcuts import User
from kobo.apps.organizations.models import Organization, OrganizationUser
from kpi.tests.kpi_test_case import BaseTestCase
from kobo.apps.organizations.tests.test_organizations_api import (
BaseOrganizationAssetApiTestCase
)
from kpi.urls.router_api_v2 import URL_NAMESPACE


class OrganizationMemberAPITestCase(BaseTestCase):
@ddt
class OrganizationMemberAPITestCase(BaseOrganizationAssetApiTestCase):
fixtures = ['test_data']
URL_NAMESPACE = URL_NAMESPACE

def setUp(self):
self.organization = baker.make(
Organization, id='org_12345', mmo_override=True
)
self.owner_user = baker.make(User, username='owner')
self.member_user = baker.make(User, username='member')
super().setUp()
self.organization = self.someuser.organization
self.owner_user = self.someuser
self.member_user = self.alice
self.admin_user = self.anotheruser
self.external_user = self.bob

self.organization.add_user(self.owner_user)
self.organization.add_user(self.member_user, is_admin=False)

self.client.force_login(self.owner_user)
self.list_url = reverse(
self._get_endpoint('organization-members-list'),
kwargs={'organization_id': self.organization.id},
Expand All @@ -35,36 +32,77 @@ def setUp(self):
},
)

def test_list_members(self):
@data(
('owner', status.HTTP_200_OK),
('admin', status.HTTP_200_OK),
('member', status.HTTP_200_OK),
('external', status.HTTP_200_OK),
)
@unpack
def test_list_members_with_different_roles(self, user_role, expected_status):
user = getattr(self, f"{user_role}_user")
self.client.force_login(user)
response = self.client.get(self.list_url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn(
'owner',
[member['user__username'] for member in response.data.get('results')]
)
self.assertIn(
'member',
[member['user__username'] for member in response.data.get('results')]
)
self.assertEqual(response.status_code, expected_status)

@data(
('owner', status.HTTP_200_OK),
('admin', status.HTTP_200_OK),
('member', status.HTTP_200_OK),
('external', status.HTTP_404_NOT_FOUND),
)
@unpack
def test_retrieve_member_details_with_different_roles(self, user_role, expected_status):
user = getattr(self, f"{user_role}_user")
self.client.force_login(user)
response = self.client.get(self.detail_url(self.member_user))
self.assertEqual(response.status_code, expected_status)

@data(
('owner', status.HTTP_200_OK),
('admin', status.HTTP_200_OK),
('member', status.HTTP_403_FORBIDDEN),
('external', status.HTTP_404_NOT_FOUND),
)
@unpack
def test_update_member_role_with_different_roles(self, user_role, expected_status):
user = getattr(self, f"{user_role}_user")
data = {'role': 'admin'}
self.client.force_login(user)
response = self.client.patch(self.detail_url(self.member_user), data)
self.assertEqual(response.status_code, expected_status)

def test_retrieve_member_details(self):
response = self.client.get(self.detail_url('member'))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data['user__username'], 'member')
self.assertEqual(response.data['role'], 'member')
@data(
('owner', status.HTTP_204_NO_CONTENT),
('admin', status.HTTP_403_FORBIDDEN),
('member', status.HTTP_403_FORBIDDEN),
('external', status.HTTP_404_NOT_FOUND),
)
@unpack
def test_delete_member_with_different_roles(self, user_role, expected_status):
user = getattr(self, f"{user_role}_user")
self.client.force_login(user)
response = self.client.delete(self.detail_url(self.member_user))
self.assertEqual(response.status_code, expected_status)
if expected_status == status.HTTP_204_NO_CONTENT:
# Confirm deletion
response = self.client.get(self.detail_url(self.member_user))
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

def test_update_member_role(self):
@data(
('owner', status.HTTP_405_METHOD_NOT_ALLOWED),
('admin', status.HTTP_405_METHOD_NOT_ALLOWED),
('member', status.HTTP_405_METHOD_NOT_ALLOWED),
('external', status.HTTP_405_METHOD_NOT_ALLOWED),
)
@unpack
def test_post_request_is_not_allowed(self, user_role, expected_status):
user = getattr(self, f"{user_role}_user")
data = {'role': 'admin'}
response = self.client.patch(self.detail_url('member'), data)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data['role'], 'admin')
self.client.force_login(user)
response = self.client.post(self.list_url, data)
self.assertEqual(response.status_code, expected_status)

def test_delete_member(self):
response = self.client.delete(self.detail_url('member'))
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
# Confirm deletion
response = self.client.get(self.detail_url('member'))
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

def test_list_requires_authentication(self):
self.client.logout()
Expand Down
20 changes: 6 additions & 14 deletions kobo/apps/organizations/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
from kpi.utils.object_permission import get_database_user
from kpi.views.v2.asset import AssetViewSet
from .models import Organization, OrganizationOwner, OrganizationUser
from .permissions import IsOrgAdmin, IsOrgAdminOrReadOnly
from .permissions import (
IsOrgAdmin,
IsOrgAdminOrReadOnly,
IsOrgOwnerOrAdminOrMember
)
from .serializers import OrganizationSerializer, OrganizationUserSerializer
from ..accounts.mfa.models import MfaMethod
from ..stripe.constants import ACTIVE_STRIPE_STATUSES
Expand Down Expand Up @@ -399,7 +403,7 @@ class OrganizationMemberViewSet(viewsets.ModelViewSet):
in updates.
"""
serializer_class = OrganizationUserSerializer
permission_classes = [IsAuthenticated]
permission_classes = [IsOrgOwnerOrAdminOrMember]
pagination_class = OrganizationMemberPagination
http_method_names = ['get', 'patch', 'delete']
lookup_field = 'user__username'
Expand Down Expand Up @@ -432,15 +436,3 @@ def get_queryset(self):
has_mfa_enabled=Exists(mfa_subquery)
)
return queryset

def partial_update(self, request, *args, **kwargs):
instance = self.get_object()
serializer = self.get_serializer(
instance, data=request.data, partial=True
)
serializer.is_valid(raise_exception=True)
role = serializer.validated_data.get('role')
if role:
instance.is_admin = (role == 'admin')
instance.save()
return super().partial_update(request, *args, **kwargs)

0 comments on commit 3c174df

Please sign in to comment.