Skip to content

Commit 19114ad

Browse files
committed
refactor(legacy_soap_api): Get tests passing
1 parent 628f075 commit 19114ad

File tree

12 files changed

+261
-25
lines changed

12 files changed

+261
-25
lines changed

api/src/legacy_soap_api/grantors/services/get_application_zip_response.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import logging
2+
from src.auth.endpoint_access_util import verify_access
3+
from src.legacy_soap_api.legacy_soap_api_schemas import SOAPRequest
24
import uuid
35

46
from botocore.exceptions import ClientError
@@ -8,13 +10,14 @@
810
from src.db.models.competition_models import ApplicationSubmission
911
from src.legacy_soap_api.grantors import schemas as grantor_schemas
1012
from src.legacy_soap_api.legacy_soap_api_constants import LegacySoapApiEvent
13+
from src.legacy_soap_api.legacy_soap_api_auth import validate_certificate, ENDPOINT_PRIVILEGES
1114
from src.util import file_util
1215

1316
logger = logging.getLogger(__name__)
1417

1518

1619
def get_application_zip_response(
17-
db_session: db.Session, get_application_zip_request: grantor_schemas.GetApplicationZipRequest
20+
db_session: db.Session, soap_request: SOAPRequest, get_application_zip_request: grantor_schemas.GetApplicationZipRequest
1821
) -> grantor_schemas.GetApplicationZipResponseSOAPEnvelope:
1922
xop_data_instance = grantor_schemas.XOPIncludeData(
2023
**{"@href": f"cid:{uuid.uuid4()}[email protected]"}
@@ -35,10 +38,19 @@ def get_application_zip_response(
3538
legacy_tracking_number = legacy_tracking_number.split("GRANT")[1]
3639
application = db_session.execute(
3740
select(ApplicationSubmission).where(
38-
ApplicationSubmission.legacy_tracking_number == int(legacy_tracking_number)
41+
ApplicationSubmission.legacy_tracking_number == int(legacy_tracking_number),
3942
)
4043
).scalar()
44+
4145
if application:
46+
certificate = validate_certificate(db_session, soap_auth=soap_request.auth, api_name=soap_request.api_name)
47+
if soap_request.operation_name in ENDPOINT_PRIVILEGES:
48+
verify_access(
49+
certificate.user,
50+
{ENDPOINT_PRIVILEGES[soap_request.operation_name]},
51+
application.application,
52+
)
53+
4254
try:
4355
filestream = file_util.open_stream(application.download_path, mode="rb")
4456
schema._mtom_file_stream = filestream

api/src/legacy_soap_api/legacy_soap_api_auth.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,25 @@
99
from pydantic import BaseModel
1010
from requests.adapters import HTTPAdapter
1111

12+
import src.adapters.db as db
13+
from src.constants.lookup_constants import Privilege
14+
from src.db.models.user_models import LegacyCertificate
15+
from src.legacy_soap_api.legacy_soap_api_config import SimplerSoapAPI
1216
from src.legacy_soap_api.legacy_soap_api_constants import LegacySoapApiEvent
17+
from src.util.datetime_util import get_now_us_eastern_date
1318

1419
logger = logging.getLogger(__name__)
1520

1621
MTLS_CERT_HEADER_KEY = "X-Amzn-Mtls-Clientcert"
1722

23+
ENDPOINT_PRIVILEGES = dict(
24+
GetSubmissionListExpandedRequest=Privilege.LEGACY_AGENCY_VIEWER,
25+
GetApplicationRequest=Privilege.LEGACY_AGENCY_GRANT_RETRIEVER,
26+
GetApplicationZipRequest=Privilege.LEGACY_AGENCY_GRANT_RETRIEVER,
27+
ConfirmApplicationDeliveryRequest=Privilege.LEGACY_AGENCY_GRANT_RETRIEVER,
28+
UpdateApplicationInfoReqest=Privilege.LEGACY_AGENCY_ASSIGNER,
29+
)
30+
1831

1932
class SOAPClientCertificateNotConfigured(Exception):
2033
pass
@@ -100,3 +113,45 @@ def get_soap_client_certificate(urlencoded_cert: str) -> SOAPClientCertificate:
100113
issuer=cert.issuer.rfc4514_string(),
101114
serial_number=cert.serial_number,
102115
)
116+
117+
118+
def validate_certificate(
119+
db_session: db.Session, soap_auth: SOAPAuth | None, api_name: str
120+
) -> LegacyCertificate:
121+
if not soap_auth:
122+
logger.warning(
123+
"soap_client_certificate: no soap auth",
124+
)
125+
raise SOAPClientCertificateLookupError("no soap auth")
126+
127+
serial_number_str = str(soap_auth.certificate.serial_number)
128+
legacy_certificate = (
129+
db_session.query(LegacyCertificate)
130+
.filter(LegacyCertificate.serial_number == serial_number_str)
131+
.first()
132+
)
133+
extra_data = {"serial_number": serial_number_str}
134+
135+
if not legacy_certificate:
136+
logger.warning(
137+
"soap_client_certificate: could not retrieve client cert for serial number",
138+
extra=extra_data,
139+
)
140+
raise SOAPClientCertificateLookupError("could not retrieve client cert for serial number")
141+
142+
extra_data.update({"legacy_certificate_id": str(legacy_certificate.legacy_certificate_id)})
143+
if legacy_certificate.expiration_date <= get_now_us_eastern_date():
144+
logger.warning(
145+
"soap_client_certificate: certificate is expired",
146+
extra=extra_data,
147+
)
148+
raise SOAPClientCertificateLookupError("certificate is expired")
149+
150+
if not legacy_certificate.agency and api_name == SimplerSoapAPI.GRANTORS:
151+
logger.warning(
152+
"soap_client_certificate: certificate does not have agency",
153+
extra=extra_data,
154+
)
155+
raise SOAPClientCertificateLookupError("certificate does not have agency")
156+
157+
return legacy_certificate

api/src/legacy_soap_api/legacy_soap_api_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ class SimplerGrantorsS2SClient(BaseSOAPClient):
225225
def GetApplicationZipRequest(self) -> grantors_schemas.GetApplicationZipResponseSOAPEnvelope:
226226
return get_application_zip_response(
227227
db_session=self.db_session,
228+
soap_request=self.soap_request,
228229
get_application_zip_request=grantors_schemas.GetApplicationZipRequest(
229230
**self.get_soap_request_dict()
230231
),

api/src/legacy_soap_api/legacy_soap_api_routes.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44

55
import src.adapters.db as db
66
import src.adapters.db.flask_db as flask_db
7-
from src.legacy_soap_api.legacy_soap_api_auth import MTLS_CERT_HEADER_KEY, get_soap_auth
7+
from src.legacy_soap_api.legacy_soap_api_auth import (
8+
MTLS_CERT_HEADER_KEY,
9+
get_soap_auth,
10+
)
811
from src.legacy_soap_api.legacy_soap_api_blueprint import legacy_soap_api_blueprint
912
from src.legacy_soap_api.legacy_soap_api_constants import LegacySoapApiEvent
1013
from src.legacy_soap_api.legacy_soap_api_proxy import get_proxy_response

api/tests/src/db/models/factories.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3149,7 +3149,7 @@ class Meta:
31493149

31503150
legacy_certificate_id = Generators.UuidObj
31513151
cert_id = factory.Faker("random_int", min=1000, max=10000000)
3152-
serial_number = factory.Faker("random_int", min=1000, max=10000000)
3152+
serial_number = factory.Sequence(lambda n: f"{n}")
31533153
expiration_date = factory.Faker("future_date", end_date="+2y")
31543154
user_id = factory.LazyAttribute(lambda s: s.user.user_id)
31553155
user = factory.SubFactory(UserFactory)

api/tests/src/legacy_soap_api/test_legacy_soap_api_auth.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
from datetime import date
2+
from src.constants.lookup_constants import Privilege
3+
import uuid
14
from unittest.mock import patch
25

36
import pytest
@@ -8,6 +11,15 @@
811
SOAPClientCertificateLookupError,
912
SOAPClientCertificateNotConfigured,
1013
get_soap_auth,
14+
validate_certificate,
15+
)
16+
from src.legacy_soap_api.legacy_soap_api_config import SimplerSoapAPI
17+
from tests.src.db.models.factories import (
18+
AgencyFactory,
19+
AgencyUserFactory,
20+
AgencyUserRoleFactory,
21+
LegacyAgencyCertificateFactory,
22+
RoleFactory,
1123
)
1224

1325
MOCK_FINGERPRINT = "123"
@@ -19,6 +31,7 @@
1931
fingerprint=MOCK_FINGERPRINT,
2032
serial_number=123,
2133
)
34+
AGENCY_CODE = "ABC123XYZ"
2235

2336

2437
@patch("src.legacy_soap_api.legacy_soap_api_auth.get_soap_client_certificate")
@@ -47,3 +60,87 @@ def test_client_auth_exceptions():
4760
SOAPClientCertificateLookupError, match="could not retrieve client cert for serial number"
4861
):
4962
auth.certificate.get_pem({MOCK_FINGERPRINT: "not-an-object"})
63+
64+
65+
def test_validate_certificate_raies_error_when_no_legacy_certificate_found(
66+
enable_factory_create, fixture_from_file, db_session
67+
) -> None:
68+
soap_auth = SOAPAuth(
69+
certificate={
70+
"cert": "MOCKED_CERT_STRING_HERE",
71+
"serial_number": "7000",
72+
"fingerprint": "MOCKED_FINGERPRINT",
73+
}
74+
)
75+
with pytest.raises(
76+
SOAPClientCertificateLookupError, match="could not retrieve client cert for serial number"
77+
):
78+
validate_certificate(
79+
db_session, soap_auth, SimplerSoapAPI.GRANTORS
80+
)
81+
82+
83+
def test_validate_certificate_raises_error_when_certificate_expired(
84+
enable_factory_create, db_session
85+
) -> None:
86+
agency = AgencyFactory.create(agency_code=f"XYZ-{uuid.uuid4()}", is_multilevel_agency=False)
87+
legacy_certificate = LegacyAgencyCertificateFactory.create(expiration_date=date(2004, 1, 1), agency=agency)
88+
soap_auth = SOAPAuth(
89+
certificate={
90+
"cert": "MOCKED_CERT_STRING_HERE",
91+
"serial_number": f"{legacy_certificate.serial_number}",
92+
"fingerprint": "MOCKED_FINGERPRINT",
93+
}
94+
)
95+
with pytest.raises(SOAPClientCertificateLookupError, match="certificate is expired"):
96+
validate_certificate(
97+
db_session, soap_auth, SimplerSoapAPI.GRANTORS
98+
)
99+
100+
101+
def test_validate_certificate_raises_error_when_certificate_has_no_agency_when_grantor(
102+
enable_factory_create, db_session
103+
) -> None:
104+
legacy_certificate = LegacyAgencyCertificateFactory.create(agency=None, agency_id=None)
105+
soap_auth = SOAPAuth(
106+
certificate={
107+
"cert": "MOCKED_CERT_STRING_HERE",
108+
"serial_number": f"{legacy_certificate.serial_number}",
109+
"fingerprint": "MOCKED_FINGERPRINT",
110+
}
111+
)
112+
with pytest.raises(SOAPClientCertificateLookupError, match="certificate does not have agency"):
113+
validate_certificate(
114+
db_session, soap_auth, SimplerSoapAPI.GRANTORS
115+
)
116+
117+
118+
def test_validate_certificate_does_not_raise_agency_error_when_certificate_has_no_agency_when_applicant(
119+
enable_factory_create, db_session
120+
) -> None:
121+
legacy_certificate = LegacyAgencyCertificateFactory.create(agency=None, agency_id=None)
122+
soap_auth = SOAPAuth(
123+
certificate={
124+
"cert": "MOCKED_CERT_STRING_HERE",
125+
"serial_number": f"{legacy_certificate.serial_number}",
126+
"fingerprint": "MOCKED_FINGERPRINT",
127+
}
128+
)
129+
result = validate_certificate(db_session, soap_auth, SimplerSoapAPI.APPLICANTS)
130+
assert result.legacy_certificate_id == legacy_certificate.legacy_certificate_id
131+
132+
133+
def test_validate_certificate_raises_error_if_not_soap_auth(
134+
enable_factory_create, db_session
135+
) -> None:
136+
agency = AgencyFactory.create(agency_code=f"XYZ-{uuid.uuid4()}", is_multilevel_agency=False)
137+
legacy_certificate = LegacyAgencyCertificateFactory.create(agency=agency)
138+
agency_user = AgencyUserFactory.create(
139+
agency=legacy_certificate.agency, user=legacy_certificate.user
140+
)
141+
role = RoleFactory(privileges=[Privilege.LEGACY_AGENCY_VIEWER])
142+
AgencyUserRoleFactory.create(agency_user=agency_user, role=role)
143+
with pytest.raises(SOAPClientCertificateLookupError, match="no soap auth"):
144+
validate_certificate(
145+
db_session, None, SimplerSoapAPI.GRANTORS
146+
)

api/tests/src/legacy_soap_api/test_legacy_soap_api_client.py

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import logging
2+
from src.constants.lookup_constants import Privilege
3+
from apiflask import HTTPError
24
import uuid
35
from collections.abc import Iterator
46
from unittest.mock import MagicMock, patch
@@ -24,10 +26,17 @@
2426
from src.util.datetime_util import parse_grants_gov_date
2527
from tests.lib.db_testing import cascade_delete_from_db_table
2628
from tests.src.db.models.factories import (
29+
AgencyFactory,
2730
ApplicationSubmissionFactory,
2831
CompetitionFactory,
2932
OpportunityAssistanceListingFactory,
3033
OpportunityFactory,
34+
LegacyAgencyCertificateFactory,
35+
AgencyUserFactory,
36+
RoleFactory,
37+
AgencyUserRoleFactory,
38+
ApplicationUserFactory,
39+
ApplicationUserRoleFactory,
3140
)
3241
from tests.src.legacy_soap_api.soap_request_templates import (
3342
get_opportunity_list_requests as mock_requests,
@@ -357,10 +366,21 @@ def test_get_simpler_soap_response_when_operation_is_not_get_opportunity_list_re
357366

358367

359368
class TestSimplerSOAPGetApplicationZip:
369+
@patch("src.legacy_soap_api.grantors.services.get_application_zip_response.validate_certificate")
360370
def test_get_simpler_soap_response_returns_mtom_xml(
361-
self, db_session, enable_factory_create, mock_s3_bucket
371+
self, mock_validate_certificate, db_session, enable_factory_create, mock_s3_bucket
362372
):
373+
agency = AgencyFactory.create(agency_code=f"123-{uuid.uuid4()}")
374+
legacy_certificate = LegacyAgencyCertificateFactory.create(agency=agency)
375+
mock_validate_certificate.return_value = legacy_certificate
363376
submission = ApplicationSubmissionFactory.create()
377+
agency_user = AgencyUserFactory.create(
378+
agency=legacy_certificate.agency, user=legacy_certificate.user
379+
)
380+
role = RoleFactory.create(privileges=[Privilege.LEGACY_AGENCY_GRANT_RETRIEVER])
381+
AgencyUserRoleFactory.create(agency_user=agency_user, role=role)
382+
application_user = ApplicationUserFactory.create(application=submission.application, user=legacy_certificate.user)
383+
ApplicationUserRoleFactory.create(application_user=application_user, role=role)
364384
response = requests.get(submission.download_path, timeout=10)
365385
submission_text = response.content.decode()
366386
request_xml_bytes = (
@@ -406,11 +426,52 @@ def test_get_simpler_soap_response_returns_mtom_xml(
406426
"MIME-Version": "1.0",
407427
}
408428

429+
@patch("src.legacy_soap_api.grantors.services.get_application_zip_response.validate_certificate")
430+
def test_get_simpler_soap_response_returns_error_if_certificate_user_does_not_have_permissions(
431+
self, mock_validate_certificate, db_session, enable_factory_create, mock_s3_bucket
432+
):
433+
submission = ApplicationSubmissionFactory.create()
434+
request_xml_bytes = (
435+
'<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" '
436+
'xmlns:agen="http://apply.grants.gov/services/AgencyWebServices-V2.0" '
437+
'xmlns:gran="http://apply.grants.gov/system/GrantsCommonElements-V1.0">'
438+
"<soapenv:Header/>"
439+
"<soapenv:Body>"
440+
"<agen:GetApplicationZipRequest>"
441+
f"<gran:GrantsGovTrackingNumber>{submission.legacy_tracking_number}</gran:GrantsGovTrackingNumber>"
442+
"</agen:GetApplicationZipRequest>"
443+
"</soapenv:Body>"
444+
"</soapenv:Envelope>"
445+
).encode("utf-8")
446+
soap_request = SOAPRequest(
447+
data=request_xml_bytes,
448+
full_path="x",
449+
headers={},
450+
method="POST",
451+
api_name=SimplerSoapAPI.GRANTORS,
452+
operation_name="GetApplicationZipRequest",
453+
)
454+
mock_proxy_response = SOAPResponse(data=b"", status_code=500, headers={})
455+
client = SimplerGrantorsS2SClient(soap_request, db_session)
456+
with pytest.raises(HTTPError):
457+
client.get_simpler_soap_response(mock_proxy_response)
458+
459+
@patch("src.legacy_soap_api.grantors.services.get_application_zip_response.validate_certificate")
409460
def test_get_simpler_soap_response_logging_if_downloading_the_file_from_s3_fails(
410-
self, db_session, enable_factory_create, caplog
461+
self, mock_validate_certificate, db_session, enable_factory_create, caplog
411462
):
412463
caplog.set_level(logging.INFO)
464+
agency = AgencyFactory.create(agency_code=f"123-{uuid.uuid4()}")
465+
legacy_certificate = LegacyAgencyCertificateFactory.create(agency=agency)
466+
mock_validate_certificate.return_value = legacy_certificate
413467
submission = ApplicationSubmissionFactory.create()
468+
agency_user = AgencyUserFactory.create(
469+
agency=legacy_certificate.agency, user=legacy_certificate.user
470+
)
471+
role = RoleFactory.create(privileges=[Privilege.LEGACY_AGENCY_GRANT_RETRIEVER])
472+
AgencyUserRoleFactory.create(agency_user=agency_user, role=role)
473+
application_user = ApplicationUserFactory.create(application=submission.application, user=legacy_certificate.user)
474+
ApplicationUserRoleFactory.create(application_user=application_user, role=role)
414475
request_xml_bytes = (
415476
'<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" '
416477
'xmlns:agen="http://apply.grants.gov/services/AgencyWebServices-V2.0" '

0 commit comments

Comments
 (0)