Skip to content
This repository was archived by the owner on Jun 13, 2025. It is now read-only.

Commit ab61f47

Browse files
Show Okta Enforced Repos when Impersonating (#786)
Co-authored-by: Michelle Tran <[email protected]>
1 parent a39ae25 commit ab61f47

File tree

8 files changed

+129
-16
lines changed

8 files changed

+129
-16
lines changed

codecov_auth/middleware.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def process_request(self, request: HttpRequest) -> None:
8282
if current_user and not current_user.is_anonymous:
8383
impersonating_ownerid = request.COOKIES.get("staff_user")
8484
if impersonating_ownerid is None:
85+
request.impersonation = False
8586
return
8687

8788
log.info(
@@ -129,6 +130,9 @@ def process_request(self, request: HttpRequest) -> None:
129130
impersonating_ownerid=impersonating_ownerid,
130131
),
131132
)
133+
request.impersonation = True
134+
else:
135+
request.impersonation = False
132136

133137

134138
class CorsMiddleware(BaseCorsMiddleware):

codecov_auth/tests/unit/test_authentication.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
from datetime import datetime, timedelta
22
from http.cookies import SimpleCookie
3+
from unittest.mock import call, patch
34

45
import pytest
56
from django.conf import settings
67
from django.test import TestCase, TransactionTestCase, override_settings
78
from django.urls import ResolverMatch
89
from rest_framework.exceptions import AuthenticationFailed, PermissionDenied
910
from rest_framework.test import APIRequestFactory
11+
from shared.django_apps.core.tests.factories import RepositoryFactory
1012

1113
from codecov_auth.authentication import (
1214
InternalTokenAuthentication,
@@ -168,7 +170,7 @@ def test_bearer_token_default_token_envar_and_same_string_as_header(self):
168170
class ImpersonationTests(TransactionTestCase):
169171
def setUp(self):
170172
self.owner_to_impersonate = OwnerFactory(
171-
username="impersonateme", service="github"
173+
username="impersonateme", service="github", user=UserFactory(is_staff=False)
172174
)
173175
self.staff_user = UserFactory(is_staff=True)
174176
self.non_staff_user = UserFactory(is_staff=False)
@@ -184,6 +186,47 @@ def test_impersonation(self):
184186
)
185187
assert res.json()["data"]["me"] == {"user": {"username": "impersonateme"}}
186188

189+
@patch("core.commands.repository.repository.RepositoryCommands.fetch_repository")
190+
def test_impersonation_with_okta(self, mock_call_to_fetch_repository):
191+
repo = RepositoryFactory(author=self.owner_to_impersonate, private=True)
192+
query_repositories = """{ owner(username: "%s") { repository(name: "%s") { ... on Repository { name } } } }"""
193+
query = query_repositories % (repo.author.username, repo.name)
194+
195+
# not impersonating
196+
del self.client.cookies["staff_user"]
197+
self.client.force_login(user=self.owner_to_impersonate.user)
198+
self.client.post(
199+
"/graphql/gh",
200+
{"query": query},
201+
content_type="application/json",
202+
)
203+
204+
# impersonating, same query
205+
self.client.cookies = SimpleCookie({"staff_user": self.owner_to_impersonate.pk})
206+
self.client.force_login(user=self.staff_user)
207+
self.client.post(
208+
"/graphql/gh",
209+
{"query": query},
210+
content_type="application/json",
211+
)
212+
213+
mock_call_to_fetch_repository.assert_has_calls(
214+
[
215+
call(
216+
self.owner_to_impersonate,
217+
repo.name,
218+
[],
219+
exclude_okta_enforced_repos=True,
220+
),
221+
call(
222+
self.owner_to_impersonate,
223+
repo.name,
224+
[],
225+
exclude_okta_enforced_repos=False,
226+
),
227+
]
228+
)
229+
187230
def test_impersonation_non_staff(self):
188231
self.client.force_login(user=self.non_staff_user)
189232
with pytest.raises(PermissionDenied):

graphql_api/actions/repository.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,26 +35,35 @@ def list_repository_for_owner(
3535
owner: Owner,
3636
filters: dict[str, Any] | None,
3737
okta_account_auths: list[int],
38+
exclude_okta_enforced_repos: bool = True,
3839
) -> QuerySet:
40+
queryset = Repository.objects.viewable_repos(current_owner)
41+
42+
if exclude_okta_enforced_repos:
43+
queryset = queryset.exclude_accounts_enforced_okta(okta_account_auths)
44+
3945
queryset = (
40-
Repository.objects.viewable_repos(current_owner)
41-
.exclude_accounts_enforced_okta(okta_account_auths)
42-
.with_recent_coverage()
43-
.with_latest_commit_at()
44-
.filter(author=owner)
46+
queryset.with_recent_coverage().with_latest_commit_at().filter(author=owner)
4547
)
48+
4649
queryset = apply_filters_to_queryset(queryset, filters)
4750
return queryset
4851

4952

5053
def search_repos(
51-
current_owner: Owner, filters: dict[str, Any] | None, okta_account_auths: list[int]
54+
current_owner: Owner,
55+
filters: dict[str, Any] | None,
56+
okta_account_auths: list[int],
57+
exclude_okta_enforced_repos: bool = True,
5258
) -> QuerySet:
5359
authors_from = [current_owner.ownerid] + (current_owner.organizations or [])
60+
queryset = Repository.objects.viewable_repos(current_owner)
61+
62+
if exclude_okta_enforced_repos:
63+
queryset = queryset.exclude_accounts_enforced_okta(okta_account_auths)
64+
5465
queryset = (
55-
Repository.objects.viewable_repos(current_owner)
56-
.exclude_accounts_enforced_okta(okta_account_auths)
57-
.with_recent_coverage()
66+
queryset.with_recent_coverage()
5867
.with_latest_commit_at()
5968
.filter(author__ownerid__in=authors_from)
6069
)

graphql_api/tests/helper.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
from http.cookies import SimpleCookie
2+
3+
from shared.django_apps.codecov_auth.tests.factories import OwnerFactory, UserFactory
4+
15
from codecov_auth.views.okta_cloud import OKTA_SIGNED_IN_ACCOUNTS_SESSION_KEY
26
from utils.test_utils import Client
37

@@ -11,12 +15,21 @@ def gql_request(
1115
variables=None,
1216
with_errors=False,
1317
okta_signed_in_accounts=[],
18+
impersonate_owner=False,
1419
):
1520
url = f"/graphql/{provider}"
1621

1722
if owner:
1823
self.client = Client()
19-
self.client.force_login_owner(owner)
24+
25+
if impersonate_owner:
26+
staff_owner = OwnerFactory(
27+
name="staff_user", service="github", user=UserFactory(is_staff=True)
28+
)
29+
self.client.cookies = SimpleCookie({"staff_user": owner.pk})
30+
self.client.force_login_owner(staff_owner)
31+
else:
32+
self.client.force_login_owner(owner)
2033

2134
if okta_signed_in_accounts:
2235
session = self.client.session

graphql_api/tests/test_current_user_ariadne.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ def test_fetch_terms_agreement_and_business_email_when_owner_profile_is_null(sel
187187
assert data == {"me": {"businessEmail": None, "termsAgreement": False}}
188188

189189
def test_fetch_null_terms_agreement_for_user_without_owner(self):
190-
# There is an edge where a owner without user can call the me endpoint
190+
# There is an edge where an owner without user can call the "me" endpoint
191191
# via impersonation, in that case return null for terms agreement
192192
owner_to_impersonate = OwnerFactory()
193193
owner_to_impersonate.user.delete()
@@ -281,6 +281,22 @@ def test_fetching_viewable_repositories(self):
281281
]
282282
)
283283

284+
# Test with impersonation
285+
data = self.gql_request(query, owner=current_user, impersonate_owner=True)
286+
repos = paginate_connection(data["me"]["viewableRepositories"])
287+
repos_name = [repo["name"] for repo in repos]
288+
assert (
289+
sorted(repos_name)
290+
== [
291+
"1", # public repo in org of user
292+
"2", # private repo in org of user and in user permission
293+
"6", # personal private repo
294+
"7", # personal public repo
295+
"okta_enforced_repo_authed", # Okta repo should show up for impersonated users
296+
"okta_enforced_repo_unauthed", # Okta repo should show up for impersonated users
297+
]
298+
)
299+
284300
def test_fetching_viewable_repositories_ordering(self):
285301
current_user = OwnerFactory()
286302
query = """

graphql_api/tests/test_owner.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def setUp(self):
6161
author=self.owner, active=True, activated=True, private=True, name="a"
6262
)
6363
RepositoryFactory(
64-
author=self.owner, active=False, private=False, activated=False, name="b"
64+
author=self.owner, active=False, activated=False, private=False, name="b"
6565
)
6666
RepositoryFactory(
6767
author=random_user, active=True, activated=False, private=True, name="not"
@@ -247,6 +247,12 @@ def test_fetching_repositories_filter_out_okta_enforced(self):
247247
repos = paginate_connection(data["owner"]["repositories"])
248248
assert repos == []
249249

250+
def test_fetching_repositories_impersonation_show_okta_enforced(self):
251+
query = query_repositories % (self.owner.username, "", "")
252+
data = self.gql_request(query, owner=self.owner, impersonate_owner=True)
253+
repos = paginate_connection(data["owner"]["repositories"])
254+
assert repos == [{"name": "a"}, {"name": "b"}]
255+
250256
def test_is_part_of_org_when_unauthenticated(self):
251257
query = query_repositories % (self.owner.username, "", "")
252258
data = self.gql_request(query)

graphql_api/types/me/me.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,14 @@ def resolve_viewable_repositories(
5353
okta_authenticated_accounts: list[int] = info.context["request"].session.get(
5454
OKTA_SIGNED_IN_ACCOUNTS_SESSION_KEY, []
5555
)
56-
queryset = search_repos(current_user, filters, okta_authenticated_accounts)
56+
is_impersonation = info.context["request"].impersonation
57+
# If the user is impersonating another user, we want to show all the Okta repos.
58+
# This means we do not want to filter out the Okta enforced repos
59+
exclude_okta_enforced_repos = not is_impersonation
60+
61+
queryset = search_repos(
62+
current_user, filters, okta_authenticated_accounts, exclude_okta_enforced_repos
63+
)
5764
return queryset_to_connection(
5865
queryset,
5966
ordering=(ordering, RepositoryOrdering.ID),

graphql_api/types/owner/owner.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,14 @@ def resolve_repositories(
5555
okta_account_auths: list[int] = info.context["request"].session.get(
5656
OKTA_SIGNED_IN_ACCOUNTS_SESSION_KEY, []
5757
)
58+
59+
is_impersonation = info.context["request"].impersonation
60+
# If the user is impersonating another user, we want to show all the Okta repos.
61+
# This means we do not want to filter out the Okta enforced repos
62+
exclude_okta_enforced_repos = not is_impersonation
63+
5864
queryset = list_repository_for_owner(
59-
current_owner, owner, filters, okta_account_auths
65+
current_owner, owner, filters, okta_account_auths, exclude_okta_enforced_repos
6066
)
6167
return queryset_to_connection(
6268
queryset,
@@ -126,8 +132,17 @@ async def resolve_repository(owner: Owner, info, name):
126132
okta_authenticated_accounts: list[int] = info.context["request"].session.get(
127133
OKTA_SIGNED_IN_ACCOUNTS_SESSION_KEY, []
128134
)
135+
136+
is_impersonation = info.context["request"].impersonation
137+
# If the user is impersonating another user, we want to show all the Okta repos.
138+
# This means we do not want to filter out the Okta enforced repos
139+
exclude_okta_enforced_repos = not is_impersonation
140+
129141
repository: Optional[Repository] = await command.fetch_repository(
130-
owner, name, okta_authenticated_accounts
142+
owner,
143+
name,
144+
okta_authenticated_accounts,
145+
exclude_okta_enforced_repos=exclude_okta_enforced_repos,
131146
)
132147

133148
if repository is None:

0 commit comments

Comments
 (0)