|
23 | 23 | import pretend
|
24 | 24 | import pytest
|
25 | 25 |
|
| 26 | +from pypi_attestation_models import ( |
| 27 | + Attestation, |
| 28 | + Envelope, |
| 29 | + VerificationError, |
| 30 | + VerificationMaterial, |
| 31 | +) |
26 | 32 | from pyramid.httpexceptions import HTTPBadRequest, HTTPForbidden, HTTPTooManyRequests
|
| 33 | +from sigstore.verify import Verifier |
27 | 34 | from sqlalchemy import and_, exists
|
28 | 35 | from sqlalchemy.exc import IntegrityError
|
29 | 36 | from sqlalchemy.orm import joinedload
|
@@ -2385,6 +2392,85 @@ def test_upload_fails_without_oidc_publisher_permission(
|
2385 | 2392 | "See /the/help/url/ for more information."
|
2386 | 2393 | ).format(project.name)
|
2387 | 2394 |
|
| 2395 | + def test_upload_attestation_fails_without_oidc_publisher( |
| 2396 | + self, |
| 2397 | + monkeypatch, |
| 2398 | + pyramid_config, |
| 2399 | + db_request, |
| 2400 | + metrics, |
| 2401 | + project_service, |
| 2402 | + macaroon_service, |
| 2403 | + ): |
| 2404 | + project = ProjectFactory.create() |
| 2405 | + owner = UserFactory.create() |
| 2406 | + maintainer = UserFactory.create() |
| 2407 | + RoleFactory.create(user=owner, project=project, role_name="Owner") |
| 2408 | + RoleFactory.create(user=maintainer, project=project, role_name="Maintainer") |
| 2409 | + |
| 2410 | + EmailFactory.create(user=maintainer) |
| 2411 | + db_request.user = maintainer |
| 2412 | + raw_macaroon, macaroon = macaroon_service.create_macaroon( |
| 2413 | + "fake location", |
| 2414 | + "fake description", |
| 2415 | + [caveats.RequestUser(user_id=str(maintainer.id))], |
| 2416 | + user_id=maintainer.id, |
| 2417 | + ) |
| 2418 | + identity = UserTokenContext(maintainer, macaroon) |
| 2419 | + |
| 2420 | + filename = "{}-{}.tar.gz".format(project.name, "1.0") |
| 2421 | + attestation = Attestation( |
| 2422 | + version=1, |
| 2423 | + verification_material=VerificationMaterial( |
| 2424 | + certificate="some_cert", transparency_entries=[dict()] |
| 2425 | + ), |
| 2426 | + envelope=Envelope( |
| 2427 | + statement="somebase64string", |
| 2428 | + signature="somebase64string", |
| 2429 | + ), |
| 2430 | + ) |
| 2431 | + |
| 2432 | + pyramid_config.testing_securitypolicy(identity=identity) |
| 2433 | + db_request.POST = MultiDict( |
| 2434 | + { |
| 2435 | + "metadata_version": "1.2", |
| 2436 | + "name": project.name, |
| 2437 | + "attestations": f"[{attestation.model_dump_json()}]", |
| 2438 | + "version": "1.0", |
| 2439 | + "filetype": "sdist", |
| 2440 | + "md5_digest": _TAR_GZ_PKG_MD5, |
| 2441 | + "content": pretend.stub( |
| 2442 | + filename=filename, |
| 2443 | + file=io.BytesIO(_TAR_GZ_PKG_TESTDATA), |
| 2444 | + type="application/tar", |
| 2445 | + ), |
| 2446 | + } |
| 2447 | + ) |
| 2448 | + |
| 2449 | + storage_service = pretend.stub(store=lambda path, filepath, meta: None) |
| 2450 | + extract_http_macaroon = pretend.call_recorder(lambda r, _: raw_macaroon) |
| 2451 | + monkeypatch.setattr( |
| 2452 | + security_policy, "_extract_http_macaroon", extract_http_macaroon |
| 2453 | + ) |
| 2454 | + |
| 2455 | + db_request.find_service = lambda svc, name=None, context=None: { |
| 2456 | + IFileStorage: storage_service, |
| 2457 | + IMacaroonService: macaroon_service, |
| 2458 | + IMetricsService: metrics, |
| 2459 | + IProjectService: project_service, |
| 2460 | + }.get(svc) |
| 2461 | + db_request.user_agent = "warehouse-tests/6.6.6" |
| 2462 | + |
| 2463 | + with pytest.raises(HTTPBadRequest) as excinfo: |
| 2464 | + legacy.file_upload(db_request) |
| 2465 | + |
| 2466 | + resp = excinfo.value |
| 2467 | + |
| 2468 | + assert resp.status_code == 400 |
| 2469 | + assert resp.status == ( |
| 2470 | + "400 Attestations are currently only supported when using Trusted " |
| 2471 | + "Publishing with GitHub Actions." |
| 2472 | + ) |
| 2473 | + |
2388 | 2474 | @pytest.mark.parametrize(
|
2389 | 2475 | "plat",
|
2390 | 2476 | [
|
@@ -3293,6 +3379,247 @@ def test_upload_succeeds_creates_release(
|
3293 | 3379 | ),
|
3294 | 3380 | ]
|
3295 | 3381 |
|
| 3382 | + def test_upload_with_valid_attestation_succeeds( |
| 3383 | + self, |
| 3384 | + monkeypatch, |
| 3385 | + pyramid_config, |
| 3386 | + db_request, |
| 3387 | + metrics, |
| 3388 | + ): |
| 3389 | + from warehouse.events.models import HasEvents |
| 3390 | + |
| 3391 | + project = ProjectFactory.create() |
| 3392 | + version = "1.0" |
| 3393 | + publisher = GitHubPublisherFactory.create(projects=[project]) |
| 3394 | + claims = { |
| 3395 | + "sha": "somesha", |
| 3396 | + "repository": f"{publisher.repository_owner}/{publisher.repository_name}", |
| 3397 | + "workflow": "workflow_name", |
| 3398 | + } |
| 3399 | + identity = PublisherTokenContext(publisher, SignedClaims(claims)) |
| 3400 | + db_request.oidc_publisher = identity.publisher |
| 3401 | + db_request.oidc_claims = identity.claims |
| 3402 | + |
| 3403 | + db_request.db.add(Classifier(classifier="Environment :: Other Environment")) |
| 3404 | + db_request.db.add(Classifier(classifier="Programming Language :: Python")) |
| 3405 | + |
| 3406 | + filename = "{}-{}.tar.gz".format(project.name, "1.0") |
| 3407 | + attestation = Attestation( |
| 3408 | + version=1, |
| 3409 | + verification_material=VerificationMaterial( |
| 3410 | + certificate="somebase64string", transparency_entries=[dict()] |
| 3411 | + ), |
| 3412 | + envelope=Envelope( |
| 3413 | + statement="somebase64string", |
| 3414 | + signature="somebase64string", |
| 3415 | + ), |
| 3416 | + ) |
| 3417 | + |
| 3418 | + pyramid_config.testing_securitypolicy(identity=identity) |
| 3419 | + db_request.user = None |
| 3420 | + db_request.user_agent = "warehouse-tests/6.6.6" |
| 3421 | + db_request.POST = MultiDict( |
| 3422 | + { |
| 3423 | + "metadata_version": "1.2", |
| 3424 | + "name": project.name, |
| 3425 | + "attestations": f"[{attestation.model_dump_json()}]", |
| 3426 | + "version": version, |
| 3427 | + "summary": "This is my summary!", |
| 3428 | + "filetype": "sdist", |
| 3429 | + "md5_digest": _TAR_GZ_PKG_MD5, |
| 3430 | + "content": pretend.stub( |
| 3431 | + filename=filename, |
| 3432 | + file=io.BytesIO(_TAR_GZ_PKG_TESTDATA), |
| 3433 | + type="application/tar", |
| 3434 | + ), |
| 3435 | + } |
| 3436 | + ) |
| 3437 | + |
| 3438 | + storage_service = pretend.stub(store=lambda path, filepath, meta: None) |
| 3439 | + db_request.find_service = lambda svc, name=None, context=None: { |
| 3440 | + IFileStorage: storage_service, |
| 3441 | + IMetricsService: metrics, |
| 3442 | + }.get(svc) |
| 3443 | + |
| 3444 | + record_event = pretend.call_recorder( |
| 3445 | + lambda self, *, tag, request=None, additional: None |
| 3446 | + ) |
| 3447 | + monkeypatch.setattr(HasEvents, "record_event", record_event) |
| 3448 | + |
| 3449 | + verify = pretend.call_recorder(lambda _self, _verifier, _policy, _dist: None) |
| 3450 | + monkeypatch.setattr(Attestation, "verify", verify) |
| 3451 | + monkeypatch.setattr(Verifier, "production", lambda: pretend.stub()) |
| 3452 | + |
| 3453 | + resp = legacy.file_upload(db_request) |
| 3454 | + |
| 3455 | + assert resp.status_code == 200 |
| 3456 | + |
| 3457 | + assert len(verify.calls) == 1 |
| 3458 | + |
| 3459 | + def test_upload_with_malformed_attestation_fails( |
| 3460 | + self, |
| 3461 | + monkeypatch, |
| 3462 | + pyramid_config, |
| 3463 | + db_request, |
| 3464 | + metrics, |
| 3465 | + ): |
| 3466 | + from warehouse.events.models import HasEvents |
| 3467 | + |
| 3468 | + project = ProjectFactory.create() |
| 3469 | + version = "1.0" |
| 3470 | + publisher = GitHubPublisherFactory.create(projects=[project]) |
| 3471 | + claims = { |
| 3472 | + "sha": "somesha", |
| 3473 | + "repository": f"{publisher.repository_owner}/{publisher.repository_name}", |
| 3474 | + "workflow": "workflow_name", |
| 3475 | + } |
| 3476 | + identity = PublisherTokenContext(publisher, SignedClaims(claims)) |
| 3477 | + db_request.oidc_publisher = identity.publisher |
| 3478 | + db_request.oidc_claims = identity.claims |
| 3479 | + |
| 3480 | + db_request.db.add(Classifier(classifier="Environment :: Other Environment")) |
| 3481 | + db_request.db.add(Classifier(classifier="Programming Language :: Python")) |
| 3482 | + |
| 3483 | + filename = "{}-{}.tar.gz".format(project.name, "1.0") |
| 3484 | + |
| 3485 | + pyramid_config.testing_securitypolicy(identity=identity) |
| 3486 | + db_request.user = None |
| 3487 | + db_request.user_agent = "warehouse-tests/6.6.6" |
| 3488 | + db_request.POST = MultiDict( |
| 3489 | + { |
| 3490 | + "metadata_version": "1.2", |
| 3491 | + "name": project.name, |
| 3492 | + "attestations": "[{'a_malformed_attestation': 3}]", |
| 3493 | + "version": version, |
| 3494 | + "summary": "This is my summary!", |
| 3495 | + "filetype": "sdist", |
| 3496 | + "md5_digest": _TAR_GZ_PKG_MD5, |
| 3497 | + "content": pretend.stub( |
| 3498 | + filename=filename, |
| 3499 | + file=io.BytesIO(_TAR_GZ_PKG_TESTDATA), |
| 3500 | + type="application/tar", |
| 3501 | + ), |
| 3502 | + } |
| 3503 | + ) |
| 3504 | + |
| 3505 | + storage_service = pretend.stub(store=lambda path, filepath, meta: None) |
| 3506 | + db_request.find_service = lambda svc, name=None, context=None: { |
| 3507 | + IFileStorage: storage_service, |
| 3508 | + IMetricsService: metrics, |
| 3509 | + }.get(svc) |
| 3510 | + |
| 3511 | + record_event = pretend.call_recorder( |
| 3512 | + lambda self, *, tag, request=None, additional: None |
| 3513 | + ) |
| 3514 | + monkeypatch.setattr(HasEvents, "record_event", record_event) |
| 3515 | + |
| 3516 | + with pytest.raises(HTTPBadRequest) as excinfo: |
| 3517 | + legacy.file_upload(db_request) |
| 3518 | + |
| 3519 | + resp = excinfo.value |
| 3520 | + |
| 3521 | + assert resp.status_code == 400 |
| 3522 | + assert resp.status.startswith( |
| 3523 | + "400 Error while decoding the included attestation:" |
| 3524 | + ) |
| 3525 | + |
| 3526 | + @pytest.mark.parametrize( |
| 3527 | + "verify_exception, expected_msg", |
| 3528 | + [ |
| 3529 | + ( |
| 3530 | + VerificationError, |
| 3531 | + "400 Could not verify the uploaded artifact using the included " |
| 3532 | + "attestation", |
| 3533 | + ), |
| 3534 | + ( |
| 3535 | + ValueError, |
| 3536 | + "400 Unknown error while trying to verify included attestations", |
| 3537 | + ), |
| 3538 | + ], |
| 3539 | + ) |
| 3540 | + def test_upload_with_failing_attestation_verification( |
| 3541 | + self, |
| 3542 | + monkeypatch, |
| 3543 | + pyramid_config, |
| 3544 | + db_request, |
| 3545 | + metrics, |
| 3546 | + verify_exception, |
| 3547 | + expected_msg, |
| 3548 | + ): |
| 3549 | + from warehouse.events.models import HasEvents |
| 3550 | + |
| 3551 | + project = ProjectFactory.create() |
| 3552 | + version = "1.0" |
| 3553 | + publisher = GitHubPublisherFactory.create(projects=[project]) |
| 3554 | + claims = { |
| 3555 | + "sha": "somesha", |
| 3556 | + "repository": f"{publisher.repository_owner}/{publisher.repository_name}", |
| 3557 | + "workflow": "workflow_name", |
| 3558 | + } |
| 3559 | + identity = PublisherTokenContext(publisher, SignedClaims(claims)) |
| 3560 | + db_request.oidc_publisher = identity.publisher |
| 3561 | + db_request.oidc_claims = identity.claims |
| 3562 | + |
| 3563 | + db_request.db.add(Classifier(classifier="Environment :: Other Environment")) |
| 3564 | + db_request.db.add(Classifier(classifier="Programming Language :: Python")) |
| 3565 | + |
| 3566 | + filename = "{}-{}.tar.gz".format(project.name, "1.0") |
| 3567 | + attestation = Attestation( |
| 3568 | + version=1, |
| 3569 | + verification_material=VerificationMaterial( |
| 3570 | + certificate="somebase64string", transparency_entries=[dict()] |
| 3571 | + ), |
| 3572 | + envelope=Envelope( |
| 3573 | + statement="somebase64string", |
| 3574 | + signature="somebase64string", |
| 3575 | + ), |
| 3576 | + ) |
| 3577 | + |
| 3578 | + pyramid_config.testing_securitypolicy(identity=identity) |
| 3579 | + db_request.user = None |
| 3580 | + db_request.user_agent = "warehouse-tests/6.6.6" |
| 3581 | + db_request.POST = MultiDict( |
| 3582 | + { |
| 3583 | + "metadata_version": "1.2", |
| 3584 | + "name": project.name, |
| 3585 | + "attestations": f"[{attestation.model_dump_json()}]", |
| 3586 | + "version": version, |
| 3587 | + "summary": "This is my summary!", |
| 3588 | + "filetype": "sdist", |
| 3589 | + "md5_digest": _TAR_GZ_PKG_MD5, |
| 3590 | + "content": pretend.stub( |
| 3591 | + filename=filename, |
| 3592 | + file=io.BytesIO(_TAR_GZ_PKG_TESTDATA), |
| 3593 | + type="application/tar", |
| 3594 | + ), |
| 3595 | + } |
| 3596 | + ) |
| 3597 | + |
| 3598 | + storage_service = pretend.stub(store=lambda path, filepath, meta: None) |
| 3599 | + db_request.find_service = lambda svc, name=None, context=None: { |
| 3600 | + IFileStorage: storage_service, |
| 3601 | + IMetricsService: metrics, |
| 3602 | + }.get(svc) |
| 3603 | + |
| 3604 | + record_event = pretend.call_recorder( |
| 3605 | + lambda self, *, tag, request=None, additional: None |
| 3606 | + ) |
| 3607 | + monkeypatch.setattr(HasEvents, "record_event", record_event) |
| 3608 | + |
| 3609 | + def failing_verify(_self, _verifier, _policy, _dist): |
| 3610 | + raise verify_exception("error") |
| 3611 | + |
| 3612 | + monkeypatch.setattr(Attestation, "verify", failing_verify) |
| 3613 | + monkeypatch.setattr(Verifier, "production", lambda: pretend.stub()) |
| 3614 | + |
| 3615 | + with pytest.raises(HTTPBadRequest) as excinfo: |
| 3616 | + legacy.file_upload(db_request) |
| 3617 | + |
| 3618 | + resp = excinfo.value |
| 3619 | + |
| 3620 | + assert resp.status_code == 400 |
| 3621 | + assert resp.status.startswith(expected_msg) |
| 3622 | + |
3296 | 3623 | @pytest.mark.parametrize(
|
3297 | 3624 | "version, expected_version",
|
3298 | 3625 | [
|
|
0 commit comments