diff --git a/CHANGELOG.md b/CHANGELOG.md index 6549f7e2..e8ae9e18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,11 @@ While the project is still on major version 0, breaking changes may be introduce ## Unreleased +### Added + +- Project artifacts method: `HarborAsyncClient.get_project_artifacts()` +- `with_sbom_overview` parameter for `HarborAsyncClient.get_artifact()` and `HarborAsyncClient.get_artifacts()` to include SBOM overview in the response. + ### Changed - Models updated to API schema from [c97253f](https://github.com/goharbor/harbor/blob/4a12623459a754ff4d07fbd1cddb4df436e8524c/api/v2.0/swagger.yaml) diff --git a/harborapi/client.py b/harborapi/client.py index 11923dda..c81ca9b2 100644 --- a/harborapi/client.py +++ b/harborapi/client.py @@ -127,6 +127,7 @@ from .retry import retry from .utils import get_artifact_path from .utils import get_basicauth +from .utils import get_mime_type_header from .utils import get_params from .utils import get_project_headers from .utils import get_repo_path @@ -2336,6 +2337,100 @@ async def get_project_summary( ) return self.construct_model(ProjectSummary, summary) + # GET /projects/{project_name_or_id}/artifacts + async def get_project_artifacts( + self, + project_name_or_id: Union[str, int], + query: Optional[str] = None, + sort: Optional[str] = None, + page: int = 1, + page_size: int = 10, + limit: Optional[int] = None, + latest: bool = False, + with_tag: bool = False, + with_label: bool = False, + with_scan_overview: bool = False, + with_sbom_overview: bool = False, + with_immutable_status: bool = False, + with_accessory: bool = False, + mime_type: Union[str, Sequence[str]] = DEFAULT_MIME_TYPES, + ) -> List[Artifact]: + """Get artifatcs for a project. + + Parameters + ---------- + project_name_or_id : Union[str, int] + The name or ID of the project + String arguments are treated as project names. + Integer arguments are treated as project IDs. + query : Optional[str] + Query string to filter the artifacts. + + Supported query patterns are: + + * exact match(`"k=v"`) + * fuzzy match(`"k=~v"`) + * range(`"k=[min~max]"`) + * list with union releationship(`"k={v1 v2 v3}"`) + * list with intersection relationship(`"k=(v1 v2 v3)`). + + The value of range and list can be: + + * string(enclosed by `"` or `'`) + * integer + * time(in format `"2020-04-09 02:36:00"`) + + All of these query patterns should be put in the query string + and separated by `","`. e.g. `"k1=v1,k2=~v2,k3=[min~max]"` + sort : Optional[str] + The sort order of the artifacts. + page : int + The page of results to return + page_size : int + The number of results to return per page + limit : Optional[int] + The maximum number of results to return + latest : bool + Whether to return only the latest version of each artifact + with_tag : bool + Whether to include the tags of the artifacts + with_label : bool + Whether to include the labels of the artifacts + with_scan_overview : bool + Whether to include the scan overview of the artifacts + with_sbom_overview : bool + Whether to include the SBOM overview of the artifacts + with_immutable_status : bool + Whether to include the immutable status of the artifacts + with_accessory : bool + Whether to include the accessory of the artifacts + mime_type : Union[str, Sequence[str]] + MIME types for the scan report or scan summary. The first mime type will be used when a report is found for it. + Can be a list of MIME types or a single MIME type. + """ + params = get_params( + q=query, + sort=sort, + page=page, + page_size=page_size, + latest=latest, + with_tag=with_tag, + with_label=with_label, + with_scan_overview=with_scan_overview, + with_sbom_overview=with_sbom_overview, + with_immutable_status=with_immutable_status, + with_accessory=with_accessory, + ) + headers = get_project_headers(project_name_or_id) + headers.update(get_mime_type_header(mime_type)) + resp = await self.get( + f"/projects/{project_name_or_id}/artifacts", + params=params, + headers=headers, + limit=limit, + ) + return self.construct_model(Artifact, resp, is_list=True) + # GET /projects/{project_name_or_id}/_deletable async def get_project_deletable( self, project_name_or_id: Union[str, int] @@ -3512,7 +3607,7 @@ async def copy_artifact( ) return urldecode_header(resp, "Location") - # GET /projects/{project_name}/repositories/{repository_name}/artifacts + # GET /projects/{project_name_or_id}/repositories/{repository_name}/artifacts async def get_artifacts( self, project_name: str, @@ -3525,10 +3620,11 @@ async def get_artifacts( with_tag: bool = True, with_label: bool = False, with_scan_overview: bool = False, + with_sbom_overview: bool = False, with_signature: bool = False, with_immutable_status: bool = False, with_accessory: bool = False, - mime_type: str = "application/vnd.security.vulnerability.report; version=1.1", + mime_type: Union[str, Sequence[str]] = DEFAULT_MIME_TYPES, **kwargs: Any, ) -> List[Artifact]: """Get the artifacts in a repository. @@ -3572,6 +3668,8 @@ async def get_artifacts( Whether to include the labels of the artifact in the response with_scan_overview : bool Whether to include the scan overview of the artifact in the response + with_sbom_overview : bool + Whether to include the SBOM overview of the artifacts with_signature : bool Whether the signature is included inside the tags of the returning artifacts. Only works when setting `with_tag==True`. @@ -3579,14 +3677,9 @@ async def get_artifacts( Whether the immutable status is included inside the tags of the returning artifacts. with_accessory : bool Whether the accessories are included of the returning artifacts. - mime_type : str - A comma-separated lists of MIME types for the scan report or scan summary. - The first mime type will be used when the report found for it. - Currently the mime type supports: - - * application/vnd.scanner.adapter.vuln.report.harbor+json; version=1.0 - * application/vnd.security.vulnerability.report; version=1.1 - + mime_type : Union[str, Sequence[str]] + MIME types for the scan report or scan summary. The first mime type will be used when a report is found for it. + Can be a list of MIME types or a single MIME type. Returns ------- List[Artifact] @@ -3601,6 +3694,7 @@ async def get_artifacts( with_tag=with_tag, with_label=with_label, with_scan_overview=with_scan_overview, + with_sbom_overview=with_scan_overview, with_signature=with_signature, with_immutable_status=with_immutable_status, with_accessory=with_accessory, @@ -3608,7 +3702,7 @@ async def get_artifacts( resp = await self.get( path, params=params, - headers={"X-Accept-Vulnerabilities": mime_type}, + headers=get_mime_type_header(mime_type), limit=limit, ) return self.construct_model(Artifact, resp, is_list=True) @@ -3655,7 +3749,8 @@ async def get_artifact( with_signature: bool = False, with_immutable_status: bool = False, with_accessory: bool = False, - mime_type: str = "application/vnd.security.vulnerability.report; version=1.1", + with_sbom_overview: bool = False, + mime_type: Union[str, Sequence[str]] = DEFAULT_MIME_TYPES, ) -> Artifact: """Get an artifact. @@ -3684,6 +3779,8 @@ async def get_artifact( Whether the immutable status is included inside the tags of the returning artifact. with_accessory : bool Whether the accessories are included of the returning artifact. + with_sbom_overview : bool + Whether the sbom overview is included of the returning artifact. mime_type : str A comma-separated lists of MIME types for the scan report or scan summary. The first mime type will be used when the report found for it. @@ -3709,8 +3806,9 @@ async def get_artifact( "with_signature": with_signature, "with_immutable_status": with_immutable_status, "with_accessory": with_accessory, + "with_sbom_overview": with_sbom_overview, }, - headers={"X-Accept-Vulnerabilities": mime_type}, + headers=get_mime_type_header(mime_type), ) return self.construct_model(Artifact, resp) @@ -3841,15 +3939,7 @@ async def get_artifact_vulnerability_reports( """ path = get_artifact_path(project_name, repository_name, reference) url = f"{path}/additions/vulnerabilities" - # NOTE: in the offical API spec, a comma AND space is used to separate: - # https://github.com/goharbor/harbor/blob/df4ab856c7597e6fe28b466ba8419257de8a1af7/api/v2.0/swagger.yaml#L6256 - if not isinstance(mime_type, str): - mime_type_param = ", ".join(mime_type) - else: - mime_type_param = mime_type - resp = await self.get( - url, headers={"X-Accept-Vulnerabilities": mime_type_param} - ) + resp = await self.get(url, headers=get_mime_type_header(mime_type)) if not isinstance(resp, dict): raise UnprocessableEntity(f"Unable to process response from {url}: {resp}") reports: FirstDict[str, HarborVulnerabilityReport] = FirstDict() diff --git a/harborapi/utils.py b/harborapi/utils.py index 62aeb7a2..5eaf8280 100644 --- a/harborapi/utils.py +++ b/harborapi/utils.py @@ -5,6 +5,7 @@ from json import JSONDecodeError from typing import Dict from typing import Optional +from typing import Sequence from typing import Union from typing import cast from urllib.parse import quote_plus @@ -255,6 +256,16 @@ def get_project_headers(project_name_or_id: Union[str, int]) -> Dict[str, str]: return {"X-Is-Resource-Name": str(isinstance(project_name_or_id, str)).lower()} +def get_mime_type_header(mime_type: Union[str, Sequence[str]]) -> Dict[str, str]: + # NOTE: in the offical API spec, a comma AND space is used to separate: + # https://github.com/goharbor/harbor/blob/df4ab856c7597e6fe28b466ba8419257de8a1af7/api/v2.0/swagger.yaml#L6256 + if not isinstance(mime_type, str): + mime_type_param = ", ".join(mime_type) + else: + mime_type_param = mime_type + return {"X-Accept-Vulnerabilities": mime_type_param} + + def get_params(**kwargs: QueryParamValue) -> QueryParamMapping: """Get parameters for an API call as a dict, where `None` values are ignored. diff --git a/tests/endpoints/test_projects.py b/tests/endpoints/test_projects.py index ecd4d39d..b2d105ce 100644 --- a/tests/endpoints/test_projects.py +++ b/tests/endpoints/test_projects.py @@ -11,6 +11,7 @@ from pytest_httpserver import HTTPServer from harborapi.client import HarborAsyncClient +from harborapi.models.models import Artifact from harborapi.models.models import AuditLog from harborapi.models.models import Project from harborapi.models.models import ProjectDeletable @@ -207,6 +208,23 @@ async def test_get_project_summary_mock( assert resp == project_summary +@pytest.mark.asyncio +@given(st.lists(st.builds(Artifact))) +@settings(suppress_health_check=[HealthCheck.function_scoped_fixture]) +async def test_get_project_artifacts_mock( + async_client: HarborAsyncClient, + httpserver: HTTPServer, + artifacts: List[Artifact], +): + httpserver.expect_oneshot_request( + "/api/v2.0/projects/1234/artifacts", + method="GET", + ).respond_with_data(json_from_list(artifacts), content_type="application/json") + + resp = await async_client.get_project_artifacts("1234") + assert resp == artifacts + + @pytest.mark.asyncio @given(st.builds(ProjectDeletable)) @settings(suppress_health_check=[HealthCheck.function_scoped_fixture])