From 31117b9972d84cfd45ecc64814584f5510d418db Mon Sep 17 00:00:00 2001 From: Thomas Steen Rasmussen Date: Thu, 11 Apr 2024 07:30:34 +0200 Subject: [PATCH] more linting and api polishing, introduce pytest, unfinished commit --- pyproject.toml | 7 +++ src/albums/api.py | 49 ++++++++++------ src/albums/schema.py | 3 +- src/albums/tests.py | 30 +++++----- src/bma/api.py | 1 - src/files/api.py | 107 ++++++++++++++++----------------- src/files/models.py | 40 +++++++++++++ src/files/schema.py | 4 +- src/files/tests.py | 127 ++++++++++++++++++++-------------------- src/files/validators.py | 8 +++ src/utils/api.py | 13 ++-- src/utils/schema.py | 5 +- src/utils/tests.py | 59 +++++++++---------- 13 files changed, 257 insertions(+), 196 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3b908208..b20de0b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ dev = [ test = [ "coverage==7.2.2", "factory-boy==3.2.1", + "pytest-django==4.8.0", ] [project.urls] @@ -95,3 +96,9 @@ convention = "google" "*/migrations/*" = [ "D" # https://docs.astral.sh/ruff/rules/#pydocstyle-d ] + +[tool.pytest.ini_options] +DJANGO_SETTINGS_MODULE = "bma.settings" +python_files = ["tests.py"] +log_cli = 1 +log_cli_level = "DEBUG" diff --git a/src/albums/api.py b/src/albums/api.py index 43e4173c..2ad6b87c 100644 --- a/src/albums/api.py +++ b/src/albums/api.py @@ -1,17 +1,18 @@ +"""The albums API.""" import logging import operator import uuid from functools import reduce +from django.core.exceptions import ValidationError from django.db.models import Q from django.http import HttpRequest from django.shortcuts import get_object_or_404 from guardian.shortcuts import assign_perm from ninja import Query from ninja import Router -from utils.api import wrap_response +from utils.api import AlbumApiResponseType from utils.schema import ApiMessageSchema -from utils.schema import ApiResponseSchema from .filters import AlbumFilters from .models import Album @@ -25,15 +26,18 @@ router = Router() # https://django-ninja.rest-framework.com/guides/input/query-params/#using-schema -query = Query(...) # type: ignore +query = Query(...) # type: ignore[type-arg] @router.post( "/create/", - response={201: SingleAlbumResponseSchema}, + response={ + 201: SingleAlbumResponseSchema, + 422: ApiMessageSchema, + }, summary="Create a new album", ) -def album_create(request: HttpRequest, payload: AlbumRequestSchema) -> tuple[int, ApiResponseSchema]: +def album_create(request: HttpRequest, payload: AlbumRequestSchema) -> AlbumApiResponseType: """Use this endpoint to create a new album, with or without files.""" album = Album() for k, v in payload.dict().items(): @@ -41,15 +45,20 @@ def album_create(request: HttpRequest, payload: AlbumRequestSchema) -> tuple[int album.files.set(v) else: setattr(album, k, v) - album.owner = request.user # type: ignore + album.owner = request.user # type: ignore[assignment] + + try: + album.full_clean() + except ValidationError: + logger.exception("validation failed") + return 422, ApiMessageSchema(message="Validation error") + album.save() - # assign permissions + # assign permissions and return response assign_perm("change_album", request.user, album) assign_perm("delete_album", request.user, album) - - # return response - return 200, wrap_response(album) + return 201, {"bma_response": album} @router.get( @@ -58,10 +67,10 @@ def album_create(request: HttpRequest, payload: AlbumRequestSchema) -> tuple[int summary="Return an album.", auth=None, ) -def album_get(request: HttpRequest, album_uuid: uuid.UUID) -> tuple[int, ApiResponseSchema]: +def album_get(request: HttpRequest, album_uuid: uuid.UUID) -> AlbumApiResponseType: """Return an album.""" album = get_object_or_404(Album, uuid=album_uuid) - return 200, wrap_response(album) + return 200, {"bma_response": album} @router.get( @@ -70,7 +79,7 @@ def album_get(request: HttpRequest, album_uuid: uuid.UUID) -> tuple[int, ApiResp summary="Return a list of albums.", auth=None, ) -def album_list(request: HttpRequest, filters: AlbumFilters = query) -> tuple[int, ApiResponseSchema]: +def album_list(request: HttpRequest, filters: AlbumFilters = query) -> AlbumApiResponseType: """Return a list of albums.""" albums = Album.objects.all() @@ -98,7 +107,7 @@ def album_list(request: HttpRequest, filters: AlbumFilters = query) -> tuple[int if filters.limit: albums = albums[: filters.limit] - return 200, wrap_response(albums) + return 200, {"bma_response": albums} @router.put( @@ -127,8 +136,9 @@ def album_update( request: HttpRequest, album_uuid: uuid.UUID, payload: AlbumRequestSchema, + *, check: bool = False, -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +) -> AlbumApiResponseType: """Update (PATCH) or replace (PUT) an Album.""" album = get_object_or_404(Album, uuid=album_uuid) if not request.user.has_perm("change_album", album): @@ -150,12 +160,12 @@ def album_update( if attr == "files": # handle the m2m seperate continue - else: - setattr(album, attr, value) + # set the attribute on the album + setattr(album, attr, value) album.save() if "files" in payload.dict(): album.files.set(payload.dict()["files"]) - return 200, wrap_response(album) + return 200, {"bma_response": album} @router.delete( @@ -164,8 +174,9 @@ def album_update( summary="Delete an album.", ) def album_delete( - request: HttpRequest, album_uuid: uuid.UUID, check: bool = False + request: HttpRequest, album_uuid: uuid.UUID, *, check: bool = False ) -> tuple[int, ApiMessageSchema | None]: + """Delete an album.""" album = get_object_or_404(Album, uuid=album_uuid) if not request.user.has_perm("delete_album", album): # no permission diff --git a/src/albums/schema.py b/src/albums/schema.py index cb4a628d..937a3171 100644 --- a/src/albums/schema.py +++ b/src/albums/schema.py @@ -1,5 +1,6 @@ """Schemas for album API calls.""" import uuid +from collections.abc import Sequence from django.http import HttpRequest from django.urls import reverse @@ -14,7 +15,7 @@ class AlbumRequestSchema(ModelSchema): title: str = "" description: str = "" - files: list[uuid.UUID] + files: Sequence[uuid.UUID] = [] class Config: """Set model and fields.""" diff --git a/src/albums/tests.py b/src/albums/tests.py index 77707daf..14269f55 100644 --- a/src/albums/tests.py +++ b/src/albums/tests.py @@ -30,7 +30,7 @@ def test_album_create( content_type="application/json", ) assert response.status_code == 201 - self.album_uuid = response.json()["uuid"] + self.album_uuid = response.json()["bma_response"]["uuid"] def test_album_create_with_files( self, @@ -84,9 +84,9 @@ def test_album_update(self): content_type="application/json", ) assert response.status_code == 200 - assert len(response.json()["files"]) == 2 - assert response.json()["title"] == "new title" - assert response.json()["description"] == "description here" + assert len(response.json()["bma_response"]["files"]) == 2 + assert response.json()["bma_response"]["title"] == "new title" + assert response.json()["bma_response"]["description"] == "description here" # update the album with more files response = self.client.patch( @@ -96,7 +96,7 @@ def test_album_update(self): content_type="application/json", ) assert response.status_code == 200 - assert len(response.json()["files"]) == 10 + assert len(response.json()["bma_response"]["files"]) == 10 # update to remove all files response = self.client.patch( @@ -106,7 +106,7 @@ def test_album_update(self): content_type="application/json", ) assert response.status_code == 200 - assert len(response.json()["files"]) == 0 + assert len(response.json()["bma_response"]["files"]) == 0 def test_album_delete(self): """Test deleting an album.""" @@ -154,16 +154,16 @@ def test_album_list(self): self.test_album_create_with_files(title=f"album{i}") response = self.client.get(reverse("api-v1-json:album_list"), headers={"authorization": self.user1.auth}) assert response.status_code == 200 - assert len(response.json()) == 10 + assert len(response.json()["bma_response"]) == 10 # test the file filter with files in different albums response = self.client.get( reverse("api-v1-json:album_list"), - data={"files": [self.files[0], response.json()[1]["files"][0]]}, + data={"files": [self.files[0], response.json()["bma_response"][1]["files"][0]]}, headers={"authorization": self.user1.auth}, ) assert response.status_code == 200 - assert len(response.json()) == 0 + assert len(response.json()["bma_response"]) == 0 # test with files in the same album response = self.client.get( reverse("api-v1-json:album_list"), @@ -171,14 +171,14 @@ def test_album_list(self): headers={"authorization": self.user1.auth}, ) assert response.status_code == 200 - assert len(response.json()) == 1 + assert len(response.json()["bma_response"]) == 1 # test search response = self.client.get( reverse("api-v1-json:album_list"), data={"search": "album4"}, headers={"authorization": self.user1.auth} ) assert response.status_code == 200 - assert len(response.json()) == 1 + assert len(response.json()["bma_response"]) == 1 # test sorting response = self.client.get( @@ -187,8 +187,8 @@ def test_album_list(self): headers={"authorization": self.user1.auth}, ) assert response.status_code == 200 - assert len(response.json()) == 10 - assert response.json()[0]["title"] == "album9" + assert len(response.json()["bma_response"]) == 10 + assert response.json()["bma_response"][0]["title"] == "album9" # test offset response = self.client.get( @@ -197,5 +197,5 @@ def test_album_list(self): headers={"authorization": self.user1.auth}, ) assert response.status_code == 200 - assert len(response.json()) == 5 - assert response.json()[0]["title"] == "album5" + assert len(response.json()["bma_response"]) == 5 + assert response.json()["bma_response"][0]["title"] == "album5" diff --git a/src/bma/api.py b/src/bma/api.py index 0706163c..3260f591 100644 --- a/src/bma/api.py +++ b/src/bma/api.py @@ -20,7 +20,6 @@ version="1", # we require CSRF but disable it for non-session auth requests # inside ExemptOauthFromCSRFMiddleware - csrf=True, parser=ORJSONParser(), renderer=ORJSONRenderer(), urls_namespace="api-v1-json", diff --git a/src/files/api.py b/src/files/api.py index 5874da34..01a606c3 100644 --- a/src/files/api.py +++ b/src/files/api.py @@ -18,9 +18,8 @@ from ninja import Router from ninja.files import UploadedFile from pictures.models import Picture -from utils.api import wrap_response +from utils.api import FileApiResponseType from utils.schema import ApiMessageSchema -from utils.schema import ApiResponseSchema from videos.models import Video from .filters import FileFilters @@ -53,9 +52,7 @@ }, summary="Upload a new file.", ) -def upload( - request: HttpRequest, f: UploadedFile, metadata: UploadRequestSchema -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +def upload(request: HttpRequest, f: UploadedFile, metadata: UploadRequestSchema) -> FileApiResponseType: """API endpoint for file uploads.""" # find the filetype using libmagic by reading the first bit of the file mime = magic.from_buffer(f.read(512), mime=True) @@ -104,14 +101,14 @@ def upload( ): # use the large_thumbnail size as default uploaded_file.thumbnail_url = uploaded_file.large_thumbnail.url - uploaded_file.save() + uploaded_file.save(update_fields=["thumbnail_url", "updated"]) # assign permissions (publish_basefile and unpublish_basefile are assigned after moderation) assign_perm("view_basefile", request.user, uploaded_file) assign_perm("change_basefile", request.user, uploaded_file) assign_perm("delete_basefile", request.user, uploaded_file) - return 201, wrap_response(uploaded_file) + return 201, {"bma_response": uploaded_file, "message": f"File {uploaded_file.uuid} uploaded OK!"} ############## LIST ########################################################### @@ -121,7 +118,7 @@ def upload( summary="Return a list of metadata for files.", auth=None, ) -def file_list(request: HttpRequest, filters: FileFilters = query) -> tuple[int, ApiResponseSchema]: # noqa: C901,PLR0912 +def file_list(request: HttpRequest, filters: FileFilters = query) -> FileApiResponseType: # noqa: C901,PLR0912 """Return a list of metadata for files.""" # start out with a list of all PUBLISHED files plus whatever else the user has explicit access to files = BaseFile.objects.filter(status="PUBLISHED") | get_objects_for_user( @@ -185,43 +182,52 @@ def file_list(request: HttpRequest, filters: FileFilters = query) -> tuple[int, if filters.limit: files = files[: filters.limit] - return 200, wrap_response(files) + return 200, {"bma_response": files} ############## GENERIC FILE ACTION ############################################ def api_file_action( # noqa: PLR0913 request: HttpRequest, - uuids: list[uuid.UUID], + file_uuids: list[uuid.UUID] | uuid.UUID, permission: str, old_status: str, action: str, *, check: bool, -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +) -> FileApiResponseType: """Perform an action on one or more files.""" - file_filter: dict[str, str | list[str]] = {"uuid__in": [str(u) for u in uuids]} + if isinstance(file_uuids, uuid.UUID): + single = True + file_uuids = [file_uuids] + else: + single = False + file_filter: dict[str, str | list[str]] = {"uuid__in": [str(u) for u in file_uuids]} if old_status: file_filter["status"] = old_status - dbfiles = get_objects_for_user(request.user, permission, klass=BaseFile.objects.filter(**file_filter)) - if len(uuids) != dbfiles.count(): - errors = len(uuids) - dbfiles.count() - return 403, ApiMessageSchema(message=f"Wrong status/no permission to {action} {errors} of {len(uuids)} files)") + db_files = get_objects_for_user(request.user, permission, klass=BaseFile.objects.filter(**file_filter)) + db_uuids = list(db_files.values_list("uuid", flat=True)) + logger.debug( + f"user {request.user} wants to {action} {len(file_uuids)} files, has perm {permission} for {len(db_uuids)}" + ) + if len(file_uuids) != db_files.count(): + errors = len(file_uuids) - db_files.count() + return 403, ApiMessageSchema( + message=f"Wrong status/no permission to {action} {errors} of {len(file_uuids)} files)" + ) if check: return 202, ApiMessageSchema(message="OK") - updated = getattr(dbfiles, action)() - logger.debug(f"{action} {updated} files") - return 200, ApiResponseSchema( - bma_request={}, - bma_response=BaseFile.objects.filter( - uuid__in=dbfiles.values_list("uuid", flat=True), - ), + updated = getattr(db_files, action)() + logger.debug(f"{action} {updated} OK") + db_files = BaseFile.objects.filter( + uuid__in=db_uuids, ) + if single: + db_files = db_files.get() + return 200, {"bma_response": db_files, "message": f"{action} {len(db_uuids)} files OK"} ############## APPROVE ######################################################## -def approve( - request: HttpRequest, uuids: list[uuid.UUID], *, check: bool -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +def approve(request: HttpRequest, uuids: list[uuid.UUID] | uuid.UUID, *, check: bool) -> FileApiResponseType: """Approve one or more files by changing the status of a list of files from PENDING_MODERATION to UNPUBLISHED.""" return api_file_action( request, @@ -244,9 +250,9 @@ def approve( ) def approve_file( request: HttpRequest, file_uuid: SingleFileRequestSchema, *, check: bool = False -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +) -> FileApiResponseType: """API endpoint to change the status of a single file from PENDING_MODERATION to UNPUBLISHED.""" - return approve(request, [file_uuid.file_uuid], check=check) + return approve(request, file_uuid.file_uuid, check=check) @router.patch( @@ -260,16 +266,14 @@ def approve_file( ) def approve_files( request: HttpRequest, payload: MultipleFileRequestSchema, *, check: bool = False -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +) -> FileApiResponseType: """API endpoint to change the status of multiple files from PENDING_MODERATION to UNPUBLISHED.""" uuids = payload.dict()["files"] return approve(request, uuids, check=check) ############## UNAPPROVE ###################################################### -def unapprove( - request: HttpRequest, uuids: list[uuid.UUID], *, check: bool -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +def unapprove(request: HttpRequest, uuids: list[uuid.UUID] | uuid.UUID, *, check: bool) -> FileApiResponseType: """Unapprove one or more files by changing the status of a list of files to PENDING_MODERATION.""" return api_file_action( request, @@ -292,9 +296,9 @@ def unapprove( ) def unapprove_file( request: HttpRequest, file_uuid: SingleFileRequestSchema, *, check: bool = False -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +) -> FileApiResponseType: """API endpoint to change the status of a single file to PENDING_MODERATION.""" - return unapprove(request, [file_uuid.file_uuid], check=check) + return unapprove(request, file_uuid.file_uuid, check=check) @router.patch( @@ -308,16 +312,14 @@ def unapprove_file( ) def unapprove_files( request: HttpRequest, payload: MultipleFileRequestSchema, *, check: bool = False -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +) -> FileApiResponseType: """API endpoint to change the status of multiple files to PENDING_MODERATION.""" uuids = payload.dict()["files"] return unapprove(request, uuids, check=check) ############## PUBLISH ######################################################## -def publish( - request: HttpRequest, uuids: list[uuid.UUID], *, check: bool -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +def publish(request: HttpRequest, uuids: list[uuid.UUID] | uuid.UUID, *, check: bool) -> FileApiResponseType: """Publish a list of files by changing the status from UNPUBLISHED to PUBLISHED.""" return api_file_action( request, @@ -340,9 +342,9 @@ def publish( ) def publish_file( request: HttpRequest, file_uuid: SingleFileRequestSchema, *, check: bool = False -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +) -> FileApiResponseType: """API endpoint to change the status of a single file from UNPUBLISHED to PUBLISHED.""" - return publish(request, [file_uuid.file_uuid], check=check) + return publish(request, file_uuid.file_uuid, check=check) @router.patch( @@ -354,18 +356,14 @@ def publish_file( }, summary="Publish multiple files (change status from UNPUBLISHED to PUBLISHED).", ) -def publish_files( - request: HttpRequest, data: MultipleFileRequestSchema, *, check: bool = False -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +def publish_files(request: HttpRequest, data: MultipleFileRequestSchema, *, check: bool = False) -> FileApiResponseType: """Change the status of files from UNPUBLISHED to PUBLISHED.""" files = data.dict()["files"] return publish(request, files, check=check) ############## UNPUBLISH ######################################################## -def unpublish( - request: HttpRequest, uuids: list[uuid.UUID], *, check: bool -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +def unpublish(request: HttpRequest, uuids: list[uuid.UUID] | uuid.UUID, *, check: bool) -> FileApiResponseType: """Unpublish a list of files by changing the status from PUBLISHED to UNPUBLISHED.""" return api_file_action( request, @@ -388,9 +386,9 @@ def unpublish( ) def unpublish_file( request: HttpRequest, file_uuid: SingleFileRequestSchema, *, check: bool = False -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +) -> FileApiResponseType: """API endpoint to change the status of a single file from PUBLISHED to UNPUBLISHED.""" - return unpublish(request, [file_uuid.file_uuid], check=check) + return unpublish(request, file_uuid.file_uuid, check=check) @router.patch( @@ -404,7 +402,7 @@ def unpublish_file( ) def unpublish_files( request: HttpRequest, data: MultipleFileRequestSchema, *, check: bool = False -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +) -> FileApiResponseType: """Change the status of files from PUBLISHED to UNPUBLISHED.""" files = data.dict()["files"] return unpublish(request, files, check=check) @@ -421,14 +419,14 @@ def unpublish_files( summary="Return the metadata of a file.", auth=None, ) -def file_get(request: HttpRequest, file_uuid: uuid.UUID) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +def file_get(request: HttpRequest, file_uuid: uuid.UUID) -> FileApiResponseType: """Return a file object.""" basefile = get_object_or_404(BaseFile, uuid=file_uuid) if basefile.status == "PUBLISHED" or request.user.has_perm( "view_basefile", basefile, ): - return 200, wrap_response(basefile) + return 200, {"bma_response": basefile} return 403, ApiMessageSchema(message="Permission denied.") @@ -462,7 +460,7 @@ def file_update( metadata: FileUpdateRequestSchema, *, check: bool = False, -) -> tuple[int, ApiMessageSchema | ApiResponseSchema]: +) -> FileApiResponseType: """Update (PATCH) or replace (PUT) a file metadata object.""" basefile = get_object_or_404(BaseFile, uuid=file_uuid) if not request.user.has_perm("change_basefile", basefile): @@ -492,7 +490,7 @@ def file_update( basefile.full_clean() except ValidationError: return 422, ApiMessageSchema(message="Validation error") - return 200, wrap_response(basefile) + return 200, {"bma_response": basefile, "message": "File updated."} ############## DELETE ######################################################### @@ -517,6 +515,5 @@ def file_delete( # check mode requested, don't change anything return 202, ApiMessageSchema(message="OK") # ok go but we don't let users fully delete files for now - basefile.status = "PENDING_DELETION" - basefile.save() + basefile.update_status(new_status="PENDING_DELETION") return 204, None diff --git a/src/files/models.py b/src/files/models.py index 9b41d36a..b24299f0 100644 --- a/src/files/models.py +++ b/src/files/models.py @@ -1,18 +1,25 @@ """This file contains the main BMA model BaseFile and related classes.""" import contextlib +import logging import uuid from django.conf import settings +from django.core.exceptions import ValidationError from django.db import models from django.http import HttpRequest from django.urls import reverse +from guardian.shortcuts import assign_perm +from guardian.shortcuts import remove_perm from polymorphic.managers import PolymorphicQuerySet from polymorphic.models import PolymorphicManager from polymorphic.models import PolymorphicModel from users.sentinel import get_sentinel_user +from .validators import validate_file_status from .validators import validate_thumbnail_url +logger = logging.getLogger("bma") + class StatusChoices(models.TextChoices): """The possible status choices for a file.""" @@ -155,6 +162,7 @@ class Meta: status = models.CharField( max_length=20, blank=False, + validators=[validate_file_status], choices=StatusChoices.choices, default="PENDING_MODERATION", help_text="The status of this file. Only published files are visible on the public website.", @@ -202,6 +210,7 @@ def resolve_links(self, request: HttpRequest | None = None) -> dict[str, str | d """ links: dict[str, str | dict[str, str]] = { "self": reverse("api-v1-json:file_get", kwargs={"file_uuid": self.uuid}), + "html": self.get_absolute_url(), } downloads: dict[str, str] = { "original": self.original.url, @@ -243,3 +252,34 @@ def resolve_links(self, request: HttpRequest | None = None) -> dict[str, str | d ) links["downloads"] = downloads return links + + def update_status(self, new_status: str) -> int: + """Update the status of a file.""" + self.status = new_status + try: + self.full_clean() + self.save(update_fields=["status", "updated"]) + except ValidationError: + logger.exception("Invalid file status.") + return 0 + return 1 + + def approve(self) -> int: + """Approve this file and add publish/unpublish permissions to the owner.""" + assign_perm("publish_basefile", self.owner, self) + assign_perm("unpublish_basefile", self.owner, self) + return self.unpublish() + + def unapprove(self) -> int: + """Unapprove this file and remove publish/unpublish permissions to the owner.""" + remove_perm("publish_basefile", self.owner, self) + remove_perm("unpublish_basefile", self.owner, self) + return self.update_status(new_status="PENDING_MODERATION") + + def publish(self) -> int: + """Publish this file.""" + return self.update_status(new_status="PUBLISHED") + + def unpublish(self) -> int: + """Unpublish this file.""" + return self.update_status(new_status="UNPUBLISHED") diff --git a/src/files/schema.py b/src/files/schema.py index d7b1c3c6..1c5afbf4 100644 --- a/src/files/schema.py +++ b/src/files/schema.py @@ -70,7 +70,7 @@ class SingleFileRequestSchema(Schema): class MultipleFileRequestSchema(Schema): """The schema used for requests involving multiple files.""" - file_uuids: list[uuid.UUID] + files: list[uuid.UUID] """Response schemas below here.""" @@ -123,7 +123,7 @@ def resolve_filename(obj: BaseFile, context: dict[str, HttpRequest]) -> str: @staticmethod def resolve_size_bytes(obj: BaseFile, context: dict[str, HttpRequest]) -> int: """Get the value for the size_bytes field, return 0 if file is not found.""" - if Path.exists(obj.original.path): + if Path(obj.original.path).exists(): return int(obj.original.size) return 0 diff --git a/src/files/tests.py b/src/files/tests.py index 8ddab7de..5f8b0146 100644 --- a/src/files/tests.py +++ b/src/files/tests.py @@ -43,7 +43,7 @@ def test_file_upload(self): """Test file upload cornercases.""" data = self.file_upload(title="", return_full=True) assert data["title"] == data["original_filename"] - self.file_upload(license="notalicense", expect_status_code=422) + self.file_upload(file_license="notalicense", expect_status_code=422) self.file_upload(thumbnail_url="/foo/wrong.tar", expect_status_code=422) def test_file_list(self): @@ -53,7 +53,7 @@ def test_file_list(self): files.append(self.file_upload(title=f"title{i}")) response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user1.auth}) assert response.status_code == 200 - assert len(response.json()) == 15 + assert len(response.json()["bma_response"]) == 15 # test sorting response = self.client.get( @@ -61,17 +61,17 @@ def test_file_list(self): data={"limit": 5, "sorting": "title_asc"}, headers={"authorization": self.user1.auth}, ) - assert len(response.json()) == 5 - assert response.json()[0]["title"] == "title0" - assert response.json()[1]["title"] == "title1" - assert response.json()[2]["title"] == "title10" - assert response.json()[4]["title"] == "title12" + assert len(response.json()["bma_response"]) == 5 + assert response.json()["bma_response"][0]["title"] == "title0" + assert response.json()["bma_response"][1]["title"] == "title1" + assert response.json()["bma_response"][2]["title"] == "title10" + assert response.json()["bma_response"][4]["title"] == "title12" response = self.client.get( reverse("api-v1-json:file_list"), data={"limit": 1, "sorting": "created_desc"}, headers={"authorization": self.user1.auth}, ) - assert response.json()[0]["title"] == "title14" + assert response.json()["bma_response"][0]["title"] == "title14" # test offset response = self.client.get( @@ -79,8 +79,8 @@ def test_file_list(self): data={"offset": 5, "sorting": "created_asc"}, headers={"authorization": self.user1.auth}, ) - assert response.json()[0]["title"] == "title5" - assert response.json()[4]["title"] == "title9" + assert response.json()["bma_response"][0]["title"] == "title5" + assert response.json()["bma_response"][4]["title"] == "title9" # test owner filter response = self.client.get( @@ -88,20 +88,20 @@ def test_file_list(self): data={"owners": [self.user1.uuid, self.user2.uuid]}, headers={"authorization": self.user1.auth}, ) - assert len(response.json()) == 15 + assert len(response.json()["bma_response"]) == 15 response = self.client.get( reverse("api-v1-json:file_list"), data={"owners": [self.user2.uuid]}, headers={"authorization": self.user1.auth}, ) - assert len(response.json()) == 0 + assert len(response.json()["bma_response"]) == 0 # test search response = self.client.get( reverse("api-v1-json:file_list"), data={"search": "title7"}, headers={"authorization": self.user1.auth} ) - assert len(response.json()) == 1 - assert response.json()[0]["title"] == "title7" + assert len(response.json()["bma_response"]) == 1 + assert response.json()["bma_response"][0]["title"] == "title7" # create an album with some files response = self.client.post( @@ -114,7 +114,7 @@ def test_file_list(self): content_type="application/json", ) assert response.status_code == 201 - self.album_uuid = response.json()["uuid"] + self.album_uuid = response.json()["bma_response"]["uuid"] # test album filter response = self.client.get( @@ -122,7 +122,8 @@ def test_file_list(self): data={"albums": [self.album_uuid]}, headers={"authorization": self.user1.auth}, ) - assert len(response.json()) == 3 + print(response.content) + assert len(response.json()["bma_response"]) == 3 # create another empty album response = self.client.post( @@ -134,7 +135,7 @@ def test_file_list(self): content_type="application/json", ) assert response.status_code == 201 - uuid = response.json()["uuid"] + uuid = response.json()["bma_response"]["uuid"] # test filtering for multiple albums response = self.client.get( @@ -142,33 +143,33 @@ def test_file_list(self): data={"albums": [self.album_uuid, uuid]}, headers={"authorization": self.user1.auth}, ) - assert len(response.json()) == 3 + assert len(response.json()["bma_response"]) == 3 # test file size filter response = self.client.get( reverse("api-v1-json:file_list"), data={"size": 9478}, headers={"authorization": self.user1.auth} ) - assert len(response.json()) == 15 + assert len(response.json()["bma_response"]) == 15 # test file size_lt filter response = self.client.get( reverse("api-v1-json:file_list"), data={"size_lt": 10000}, headers={"authorization": self.user1.auth} ) - assert len(response.json()) == 15 + assert len(response.json()["bma_response"]) == 15 response = self.client.get( reverse("api-v1-json:file_list"), data={"size_lt": 1000}, headers={"authorization": self.user1.auth} ) - assert len(response.json()) == 0 + assert len(response.json()["bma_response"]) == 0 # test file size_gt filter response = self.client.get( reverse("api-v1-json:file_list"), data={"size_gt": 10000}, headers={"authorization": self.user1.auth} ) - assert len(response.json()) == 0 + assert len(response.json()["bma_response"]) == 0 response = self.client.get( reverse("api-v1-json:file_list"), data={"size_gt": 1000}, headers={"authorization": self.user1.auth} ) - assert len(response.json()) == 15 + assert len(response.json()["bma_response"]) == 15 # test file type filter response = self.client.get( @@ -176,13 +177,13 @@ def test_file_list(self): data={"filetypes": ["picture"]}, headers={"authorization": self.user1.auth}, ) - assert len(response.json()) == 15 + assert len(response.json()["bma_response"]) == 15 response = self.client.get( reverse("api-v1-json:file_list"), data={"filetypes": ["audio", "video", "document"]}, headers={"authorization": self.user1.auth}, ) - assert len(response.json()) == 0 + assert len(response.json()["bma_response"]) == 0 # test file license filter response = self.client.get( @@ -190,13 +191,13 @@ def test_file_list(self): data={"licenses": ["CC_ZERO_1_0"]}, headers={"authorization": self.user1.auth}, ) - assert len(response.json()) == 15 + assert len(response.json()["bma_response"]) == 15 response = self.client.get( reverse("api-v1-json:file_list"), data={"licenses": ["CC_BY_4_0", "CC_BY_SA_4_0"]}, headers={"authorization": self.user1.auth}, ) - assert len(response.json()) == 0 + assert len(response.json()["bma_response"]) == 0 def test_file_list_permissions(self): """Test various permissions stuff for the file_list endpoint.""" @@ -207,44 +208,44 @@ def test_file_list_permissions(self): # no files should be visible response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user2.auth}) assert response.status_code == 200 - assert len(response.json()) == 0 + assert len(response.json()["bma_response"]) == 0 # the superuser can see all files response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.superuser.auth}) assert response.status_code == 200 - assert len(response.json()) == 15 + assert len(response.json()["bma_response"]) == 15 # attempt to publish a file before approval response = self.client.patch( - reverse("api-v1-json:file_publish", kwargs={"file_uuid": files[0]}), + reverse("api-v1-json:publish_file", kwargs={"file_uuid": files[0]}), headers={"authorization": self.user1.auth}, ) assert response.status_code == 403 # approve the file without permission response = self.client.patch( - reverse("api-v1-json:file_approve", kwargs={"file_uuid": files[0]}), + reverse("api-v1-json:approve_file", kwargs={"file_uuid": files[0]}), headers={"authorization": self.user1.auth}, ) assert response.status_code == 403 # approve the file, check mode response = self.client.patch( - reverse("api-v1-json:file_approve", kwargs={"file_uuid": files[0]}) + "?check=true", + reverse("api-v1-json:approve_file", kwargs={"file_uuid": files[0]}) + "?check=true", headers={"authorization": self.superuser.auth}, ) assert response.status_code == 202 # really approve the file response = self.client.patch( - reverse("api-v1-json:file_approve", kwargs={"file_uuid": files[0]}), + reverse("api-v1-json:approve_file", kwargs={"file_uuid": files[0]}), headers={"authorization": self.superuser.auth}, ) assert response.status_code == 200 # try again with wrong status response = self.client.patch( - reverse("api-v1-json:file_approve", kwargs={"file_uuid": files[0]}), + reverse("api-v1-json:approve_file", kwargs={"file_uuid": files[0]}), headers={"authorization": self.superuser.auth}, ) assert response.status_code == 403 @@ -255,18 +256,18 @@ def test_file_list_permissions(self): data={"statuses": ["UNPUBLISHED"]}, headers={"authorization": self.user1.auth}, ) - assert len(response.json()) == 1 + assert len(response.json()["bma_response"]) == 1 # publish a file, check mode response = self.client.patch( - reverse("api-v1-json:file_publish", kwargs={"file_uuid": files[0]}) + "?check=true", + reverse("api-v1-json:publish_file", kwargs={"file_uuid": files[0]}) + "?check=true", headers={"authorization": self.user1.auth}, ) assert response.status_code == 202 # publish the file response = self.client.patch( - reverse("api-v1-json:file_publish", kwargs={"file_uuid": files[0]}), + reverse("api-v1-json:publish_file", kwargs={"file_uuid": files[0]}), headers={"authorization": self.user1.auth}, ) assert response.status_code == 200 @@ -274,32 +275,32 @@ def test_file_list_permissions(self): # make sure someone else can see it response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user2.auth}) assert response.status_code == 200 - assert len(response.json()) == 1 + assert len(response.json()["bma_response"]) == 1 # make sure anonymous can see it response = self.client.get( reverse("api-v1-json:file_list"), ) assert response.status_code == 200 - assert len(response.json()) == 1 + assert len(response.json()["bma_response"]) == 1 # unpublish the file without permission response = self.client.patch( - reverse("api-v1-json:file_unpublish", kwargs={"file_uuid": files[0]}), + reverse("api-v1-json:unpublish_file", kwargs={"file_uuid": files[0]}), headers={"authorization": self.user2.auth}, ) assert response.status_code == 403 # unpublish the file, check mode response = self.client.patch( - reverse("api-v1-json:file_unpublish", kwargs={"file_uuid": files[0]}) + "?check=true", + reverse("api-v1-json:unpublish_file", kwargs={"file_uuid": files[0]}) + "?check=true", headers={"authorization": self.user1.auth}, ) assert response.status_code == 202 # unpublish the file response = self.client.patch( - reverse("api-v1-json:file_unpublish", kwargs={"file_uuid": files[0]}), + reverse("api-v1-json:unpublish_file", kwargs={"file_uuid": files[0]}), headers={"authorization": self.user1.auth}, ) assert response.status_code == 200 @@ -307,14 +308,14 @@ def test_file_list_permissions(self): # make sure it is not visible anymore response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user2.auth}) assert response.status_code == 200 - assert len(response.json()) == 0 + assert len(response.json()["bma_response"]) == 0 # make sure it is not visible anymore to anonymous response = self.client.get( reverse("api-v1-json:file_list"), ) assert response.status_code == 200 - assert len(response.json()) == 0 + assert len(response.json()["bma_response"]) == 0 def test_metadata_get(self): """Get file metadata from the API.""" @@ -324,19 +325,19 @@ def test_metadata_get(self): headers={"authorization": self.user1.auth}, ) assert response.status_code == 200 - assert "uuid" in response.json() - assert response.json()["uuid"] == self.file_uuid + assert "uuid" in response.json()["bma_response"] + assert response.json()["bma_response"]["uuid"] == self.file_uuid def test_file_download(self): """Test downloading a file after uploading it.""" self.file_upload() metadata = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user1.auth}).json()[ - 0 - ] + "bma_response" + ][0] url = metadata["links"]["downloads"]["original"] # try download of unpublished file without auth response = self.client.get(url) - assert response.status_code == 404 + assert response.status_code == 403 # try again with auth self.client.force_login(self.user1) response = self.client.get(url) @@ -353,7 +354,7 @@ def test_file_metadata_update(self): headers={"authorization": self.user1.auth}, ) assert response.status_code == 200 - original_metadata = response.json() + original_metadata = response.json()["bma_response"] updates = { "title": "some title", "description": "some description", @@ -397,14 +398,14 @@ def test_file_metadata_update(self): ) assert response.status_code == 200 original_metadata.update(updates) - for k, v in response.json().items(): + for k, v in response.json()["bma_response"].items(): # "updated" will have changed of course, if k == "updated": assert v != original_metadata[k] # and "source" was initially set but not specified in the PUT call, - # so it should be blank now + # so it should be blank now, so it should return the files detail url elif k == "source": - assert v == "" + assert v == original_metadata["links"]["html"] # everything else should be the same else: assert v == original_metadata[k] @@ -436,8 +437,8 @@ def test_file_metadata_update(self): assert response.status_code == 200 # make sure we updated only the source attribute with the PATCH request - assert response.json()["source"] == "https://example.com/foo.png" - assert response.json()["attribution"] == "some attribution" + assert response.json()["bma_response"]["source"] == "https://example.com/foo.png" + assert response.json()["bma_response"]["attribution"] == "some attribution" # update thumbnail to an invalid value response = self.client.patch( @@ -535,15 +536,15 @@ def test_metadata_get_403(self): ) assert response.status_code == 403 - def test_file_approve_multiple(self): + def test_approve_files(self): """Approve multiple files.""" for _ in range(10): self.file_upload() response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user1.auth}) - files = [f["uuid"] for f in response.json()] + files = [f["uuid"] for f in response.json()["bma_response"]] # first try with no permissions response = self.client.patch( - reverse("api-v1-json:file_approve_multiple"), + reverse("api-v1-json:approve_files"), {"files": files[0:5]}, headers={"authorization": self.user1.auth}, content_type="application/json", @@ -552,7 +553,7 @@ def test_file_approve_multiple(self): # then check mode response = self.client.patch( - reverse("api-v1-json:file_approve_multiple") + "?check=true", + reverse("api-v1-json:approve_files") + "?check=true", {"files": files[0:5]}, headers={"authorization": self.superuser.auth}, content_type="application/json", @@ -561,7 +562,7 @@ def test_file_approve_multiple(self): # then with permission response = self.client.patch( - reverse("api-v1-json:file_approve_multiple"), + reverse("api-v1-json:approve_files"), {"files": files[0:5]}, headers={"authorization": self.superuser.auth}, content_type="application/json", @@ -570,7 +571,7 @@ def test_file_approve_multiple(self): # then try approving the same files again response = self.client.patch( - reverse("api-v1-json:file_approve_multiple"), + reverse("api-v1-json:approve_files"), {"files": files[0:5]}, headers={"authorization": self.superuser.auth}, content_type="application/json", @@ -583,7 +584,7 @@ def test_file_approve_multiple(self): data={"statuses": ["UNPUBLISHED"]}, headers={"authorization": self.user1.auth}, ) - assert len(response.json()) == 5 + assert len(response.json()["bma_response"]) == 5 def test_file_missing_on_disk(self): """Test the case where a file has gone missing from disk for some reason.""" @@ -598,4 +599,4 @@ def test_file_missing_on_disk(self): headers={"authorization": self.user1.auth}, ) assert response.status_code == 200 - assert response.json()["size_bytes"] == 0 + assert response.json()["bma_response"]["size_bytes"] == 0 diff --git a/src/files/validators.py b/src/files/validators.py index 18a227d2..e5fdf1b4 100644 --- a/src/files/validators.py +++ b/src/files/validators.py @@ -6,3 +6,11 @@ def validate_thumbnail_url(value: str) -> None: """Make sure thumbnail URLs are local relative URLs under /static/images/ or /media/.""" if not value.startswith("/static/images/") and not value.startswith("/media/"): raise ValidationError("non-local") + + +def validate_file_status(value: str) -> None: + """Make sure the file status is valid.""" + from .models import StatusChoices + + if value not in StatusChoices.names: + raise ValidationError("bad-status") diff --git a/src/utils/api.py b/src/utils/api.py index f4cee00b..ad2f3795 100644 --- a/src/utils/api.py +++ b/src/utils/api.py @@ -1,13 +1,12 @@ """API related utility functions.""" +from typing import TypeAlias + from albums.models import Album from django.db.models import QuerySet from files.models import BaseFile -from utils.schema import ApiResponseSchema - +from .schema import ApiMessageSchema -def wrap_response( - payload: list[str] | dict[str, str] | QuerySet[Album] | QuerySet[BaseFile] | Album | BaseFile, -) -> ApiResponseSchema: - """Wrap the response payload in the api envelope.""" - return ApiResponseSchema(bma_request=None, bma_response=payload) +# type aliases to make API return types more readable +FileApiResponseType: TypeAlias = tuple[int, ApiMessageSchema | dict[str, BaseFile | QuerySet[BaseFile] | str]] +AlbumApiResponseType: TypeAlias = tuple[int, ApiMessageSchema | dict[str, Album | QuerySet[Album] | str]] diff --git a/src/utils/schema.py b/src/utils/schema.py index 4010ae3b..dc704400 100644 --- a/src/utils/schema.py +++ b/src/utils/schema.py @@ -39,8 +39,9 @@ def resolve_client_ip(obj: dict[str, str], context: dict[str, HttpRequest]) -> s class ApiMessageSchema(Schema): """The schema used for all API responses which are just messages.""" - bma_request: RequestMetadataSchema - message: str | None = None + # TODO(tykling): figure out why 1) bma_request needs a default to work, and 2) why that default can't be the schema + bma_request: RequestMetadataSchema = None # type: ignore[assignment] + message: str = "OK" details: dict[str, str] | None = None diff --git a/src/utils/tests.py b/src/utils/tests.py index ebe749bc..0b1d86d0 100644 --- a/src/utils/tests.py +++ b/src/utils/tests.py @@ -3,7 +3,7 @@ import hashlib import json import logging -import random +import secrets import string from pathlib import Path from urllib.parse import parse_qs @@ -37,10 +37,7 @@ def setUpTestData(cls) -> None: # create 4 regular users and 1 superuser for i in range(6): - if i == 5: - username = "superuser" - else: - username = f"user{i}" + username = "superuser" if i == 5 else f"user{i}" user = UserFactory.create(username=username) user.set_password("secret") if i == 5: @@ -55,7 +52,7 @@ def setUpTestData(cls) -> None: client_type=Application.CLIENT_PUBLIC, authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE, client_id=f"client_id_{username}", - client_secret="client_secret", + client_secret="client_secret", # noqa: S106 skip_authorization=True, ) user.auth = cls.get_access_token(user) @@ -63,12 +60,11 @@ def setUpTestData(cls) -> None: cls.client.logout() @classmethod - def get_access_token(cls, user): + def get_access_token(cls, user) -> str: # noqa: ANN001 """Test the full oauth2 public client authorization code pkce token flow.""" - # prepare to get access token - code_verifier = "".join( - random.choice(string.ascii_uppercase + string.digits) for _ in range(random.randint(43, 128)) - ) + # generate a verifier string from 43-128 chars + alphabet = string.ascii_uppercase + string.digits + code_verifier = "".join(secrets.choice(alphabet) for i in range(43 + secrets.randbelow(86))) code_verifier = base64.urlsafe_b64encode(code_verifier.encode("utf-8")) code_challenge = hashlib.sha256(code_verifier).digest() code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8").replace("=", "") @@ -111,47 +107,48 @@ def get_access_token(cls, user): user.tokeninfo = json.loads(response.content) return f"Bearer {user.tokeninfo['access_token']}" - def file_upload( - cls, - filepath="static_src/images/logo_wide_black_500_RGB.png", - title="some title", - license="CC_ZERO_1_0", - attribution="fotoarne", - source="https://example.com/something.png", - thumbnail_url=None, - return_full=False, - expect_status_code=201, - ): + def file_upload( # noqa: PLR0913 + self, + *, + filepath: str = "static_src/images/logo_wide_black_500_RGB.png", + title: str = "some title", + file_license: str = "CC_ZERO_1_0", + attribution: str = "fotoarne", + source: str = "https://example.com/something.png", + thumbnail_url: str = "", + return_full: bool = False, + expect_status_code: int = 201, + ) -> str | dict[str, str]: + """The upload method used by many tests.""" metadata = { "title": title, - "license": license, + "license": file_license, "attribution": attribution, "source": source, } if thumbnail_url: metadata["thumbnail_url"] = thumbnail_url - with open(filepath, "rb") as f: - response = cls.client.post( + with Path(filepath).open("rb") as f: + response = self.client.post( reverse("api-v1-json:upload"), { "f": f, "metadata": json.dumps(metadata), }, - HTTP_AUTHORIZATION=cls.user1.auth, + headers={"authorization": self.user1.auth}, ) assert response.status_code == expect_status_code if expect_status_code == 422: return None - data = response.json() + data = response.json()["bma_response"] assert "uuid" in data if not title: title = Path(filepath).name assert data["title"] == title assert data["attribution"] == attribution - assert data["license"] == license + assert data["license"] == file_license assert data["source"] == source - cls.file_uuid = data["uuid"] + self.file_uuid = data["uuid"] if return_full: return data - else: - return data["uuid"] + return data["uuid"]