From 7e385b25955a8abf00143440ba2d66f38de93ab0 Mon Sep 17 00:00:00 2001 From: David Date: Sat, 21 Sep 2024 22:25:02 +0200 Subject: [PATCH] Add support for sp:SignedElements --- src/zeep/wsdl/definitions.py | 1 + src/zeep/wsdl/wsdl.py | 33 +++++++++++++++++++++++++++++++++ src/zeep/wsse/signature.py | 25 +++++++++++++++++++++++++ tests/test_wsdl.py | 28 ++++++++++++++++++++++++---- 4 files changed, 83 insertions(+), 4 deletions(-) diff --git a/src/zeep/wsdl/definitions.py b/src/zeep/wsdl/definitions.py index 59e399f8..1dbee0e6 100644 --- a/src/zeep/wsdl/definitions.py +++ b/src/zeep/wsdl/definitions.py @@ -137,6 +137,7 @@ def __init__(self, wsdl, name, port_name): self._operations = {} self.signatures = { "header": [], # Parts of header, that should be signed + "elements": [], # Arbitrary XPath elements that should be signed "body": False, # If body should be signed "everything": False, # If every header should be signed } diff --git a/src/zeep/wsdl/wsdl.py b/src/zeep/wsdl/wsdl.py index 09752934..4ea55199 100644 --- a/src/zeep/wsdl/wsdl.py +++ b/src/zeep/wsdl/wsdl.py @@ -476,6 +476,39 @@ def parse_binding( # If we didn't set "everything" to True, update the headers if not binding.signatures.get("everything", False): binding.signatures["header"] = [dict(header) for header in all_headers] + + # Begin parsing SignedElements assertions + signed_elements = doc.xpath( + 'wsp:Policy[@wsu:Id="{}"]//sp:SignedElements'.format(binding_policy), + namespaces=NSMAP, + ) + + for signed_element in signed_elements: + xpath_version = signed_element.get('XPathVersion', 'http://www.w3.org/TR/1999/REC-xpath-19991116') # Default to XPath 1.0 if not specified + + xpath_expressions = signed_element.xpath('sp:XPath', namespaces=NSMAP) + + for xpath in xpath_expressions: + xpath_string = xpath.text + if xpath_string: + # Store the XPath expression and its version + binding.signatures.setdefault('elements', []).append({ + 'xpath': xpath_string, + 'xpath_version': xpath_version + }) + + # If you want to merge multiple SignedElements assertions as per the specification + if 'elements' in binding.signatures: + # Remove duplicates while preserving order + unique_elements = [] + seen = set() + for element in binding.signatures['elements']: + element_tuple = (element['xpath'], element['xpath_version']) + if element_tuple not in seen: + seen.add(element_tuple) + unique_elements.append(element) + binding.signatures['elements'] = unique_elements + logger.debug("Adding binding: %s", binding.name.text) result[binding.name.text] = binding break diff --git a/src/zeep/wsse/signature.py b/src/zeep/wsse/signature.py index 3459fa20..25a7fda7 100644 --- a/src/zeep/wsse/signature.py +++ b/src/zeep/wsse/signature.py @@ -272,6 +272,17 @@ def _signature_prepare(envelope, key, signature_method, digest_method, signature header.find(QName(node["Namespace"], node["Name"])), digest_method, ) + # Sign elements specified by XPath expressions + for element in signatures.get("elements", []): + _sign_node_by_xpath( + ctx, + signature, + envelope, + element["xpath"], + element["xpath_version"], + digest_method + ) + ctx.sign(signature) # Place the X509 data inside a WSSE SecurityTokenReference within @@ -281,6 +292,20 @@ def _signature_prepare(envelope, key, signature_method, digest_method, signature sec_token_ref = etree.SubElement(key_info, QName(ns.WSSE, "SecurityTokenReference")) return security, sec_token_ref, x509_data +def _sign_node_by_xpath(ctx, signature, envelope, xpath, xpath_version, digest_method): + # Create an XPath evaluator with the appropriate version + if xpath_version == '1.0': + evaluator = etree.XPath(xpath, namespaces=envelope.nsmap) + else: + evaluator = etree.XPath(xpath, namespaces=envelope.nsmap, extension={('http://www.w3.org/TR/1999/REC-xpath-19991116', 'version'): xpath_version}) + + # Evaluate the XPath expression + nodes = evaluator(envelope) + + # Sign each node found by the XPath expression + for node in nodes: + _sign_node(ctx, signature, node, digest_method) + def _sign_envelope_with_key( envelope, key, signature_method, digest_method, signatures=None diff --git a/tests/test_wsdl.py b/tests/test_wsdl.py index 627d72e9..cc2721e5 100644 --- a/tests/test_wsdl.py +++ b/tests/test_wsdl.py @@ -1377,7 +1377,7 @@ def test_parse_bindings_signed_unknown(): document = wsdl.Document(content, None) assert document.bindings[ "{http://tests.python-zeep.org/xsd-main}TestBinding" - ].signatures == {"body": False, "everything": False, "header": []} + ].signatures == {"body": False, "everything": False, "header": [], "elements": []} def test_parse_bindings_signed_body(): policy = """ @@ -1391,7 +1391,7 @@ def test_parse_bindings_signed_body(): document = wsdl.Document(content, None) assert document.bindings[ "{http://tests.python-zeep.org/xsd-main}TestBinding" - ].signatures == {"body": True, "everything": False, "header": []} + ].signatures == {"body": True, "everything": False, "header": [], "elements": []} def test_parse_bindings_signed_everything(): @@ -1404,7 +1404,7 @@ def test_parse_bindings_signed_everything(): document = wsdl.Document(content, None) assert document.bindings[ "{http://tests.python-zeep.org/xsd-main}TestBinding" - ].signatures == {"body": True, "everything": True, "header": []} + ].signatures == {"body": True, "everything": True, "header": [], "elements": []} def test_parse_bindings_signed_headers(): @@ -1423,12 +1423,32 @@ def test_parse_bindings_signed_headers(): "body": False, "everything": False, "header": [{"Name": "To", "Namespace": "http://www.w3.org/2005/08/addressing"}], + "elements": [] } +def test_parse_bindings_signed_elements(): + policy = """ + + + //wsse:Security/wsu:Timestamp + + + """ + content = StringIO(BASE_WSDL.format(policy=policy).strip()) + document = wsdl.Document(content, None) + assert document.bindings[ + "{http://tests.python-zeep.org/xsd-main}TestBinding" + ].signatures == { + "body": False, + "everything": False, + "header": [], + "elements": [{"xpath": "//wsse:Security/wsu:Timestamp", "xpath_version": "http://www.w3.org/TR/1999/REC-xpath-19991116"}] + } + def test_parse_bindings_signed_nothing(): content = StringIO(BASE_WSDL.format(policy="").strip()) document = wsdl.Document(content, None) assert document.bindings[ "{http://tests.python-zeep.org/xsd-main}TestBinding" - ].signatures == {"body": False, "everything": False, "header": []} + ].signatures == {"body": False, "everything": False, "header": [], "elements": []}