Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

signed parts #1428

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions src/zeep/ns.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
MIME = "http://schemas.xmlsoap.org/wsdl/mime/"

WSA = "http://www.w3.org/2005/08/addressing"
WSP = "http://schemas.xmlsoap.org/ws/2004/09/policy"
SP = "http://docs.oasis-open.org/ws-sx/ws-securitypolicy/200702"


DS = "http://www.w3.org/2000/09/xmldsig#"
Expand Down
8 changes: 6 additions & 2 deletions src/zeep/wsdl/bindings/soap.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,13 @@ def _create(self, operation, args, kwargs, client=None, options=None):
if client.wsse:
if isinstance(client.wsse, list):
for wsse in client.wsse:
envelope, http_headers = wsse.apply(envelope, http_headers)
envelope, http_headers = wsse.apply(
envelope, http_headers, operation_obj.binding.signatures
)
else:
envelope, http_headers = client.wsse.apply(envelope, http_headers)
envelope, http_headers = client.wsse.apply(
envelope, http_headers, operation_obj.binding.signatures
)

# Add extra http headers from the setings object
if client.settings.extra_http_headers:
Expand Down
6 changes: 6 additions & 0 deletions src/zeep/wsdl/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ def __init__(self, wsdl, name, port_name):
self.port_type = None
self.wsdl = wsdl
self._operations = {}
self.signatures = {
"header": [], # Parts of header, that should be signed
"elements": [], # Arbitrary XPath elements that should be signed
"body": False, # If body should be signed
"everything": False, # If every header should be signed
}

def resolve(self, definitions: Definition) -> None:
self.port_type = definitions.get("port_types", self.port_name.text)
Expand Down
71 changes: 70 additions & 1 deletion src/zeep/wsdl/wsdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from lxml import etree

from zeep import ns
from zeep.exceptions import IncompleteMessage
from zeep.loader import (
absolute_location,
Expand All @@ -30,7 +31,12 @@
if typing.TYPE_CHECKING:
from zeep.transports import Transport

NSMAP = {"wsdl": "http://schemas.xmlsoap.org/wsdl/"}
NSMAP = {
'wsdl': ns.WSDL,
'wsp': ns.WSP,
'sp': ns.SP,
'wsu': ns.WSU,
}

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -440,6 +446,69 @@ def parse_binding(
logger.debug("Ignoring binding: %s", exc)
continue

# Begin heuristics for signed parts...
binding_policy = binding.name.localname + "_policy"
signed_parts = doc.xpath(
'wsp:Policy[@wsu:Id="{}"]//sp:SignedParts'.format(
binding_policy
),
namespaces=NSMAP,
)
# Initialize a set to keep track of all unique headers
all_headers = set()

for sign in signed_parts:
if len(sign.getchildren()) == 0:
# No children, we should sign everything
binding.signatures["body"] = True
binding.signatures["everything"] = True
break

for child in sign.iterchildren():
if len(child.items()) > 0:
# Header ...
part = frozenset({attr: value for attr, value in child.items()}.items())
all_headers.add(part)
elif child.tag.split("}")[-1].lower() == "body":
# Body ...
binding.signatures["body"] = True

# If we didn't set "everything" to True, update the headers
if not binding.signatures.get("everything", False):
binding.signatures["header"] = [dict(header) for header in all_headers]

# Begin parsing SignedElements assertions
signed_elements = doc.xpath(
'wsp:Policy[@wsu:Id="{}"]//sp:SignedElements'.format(binding_policy),
namespaces=NSMAP,
)

for signed_element in signed_elements:
xpath_version = signed_element.get('XPathVersion', 'http://www.w3.org/TR/1999/REC-xpath-19991116') # Default to XPath 1.0 if not specified

xpath_expressions = signed_element.xpath('sp:XPath', namespaces=NSMAP)

for xpath in xpath_expressions:
xpath_string = xpath.text
if xpath_string:
# Store the XPath expression and its version
binding.signatures.setdefault('elements', []).append({
'xpath': xpath_string,
'xpath_version': xpath_version
})

# If you want to merge multiple SignedElements assertions as per the specification
if 'elements' in binding.signatures:
# Remove duplicates while preserving order
unique_elements = []
seen = set()
for element in binding.signatures['elements']:
element_tuple = (element['xpath'], element['xpath_version'])
if element_tuple not in seen:
seen.add(element_tuple)
unique_elements.append(element)
binding.signatures['elements'] = unique_elements

logger.debug("Adding binding: %s", binding.name.text)
result[binding.name.text] = binding
break
Expand Down
90 changes: 73 additions & 17 deletions src/zeep/wsse/signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from zeep import ns
from zeep.exceptions import SignatureVerificationFailed
from zeep.utils import detect_soap_env
from zeep.wsdl.utils import get_or_create_header
from zeep.wsse.utils import ensure_id, get_security_header

try:
Expand All @@ -24,6 +25,8 @@

# SOAP envelope
SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/"
# Namespaces omitted from signing
OMITTED_HEADERS = [ns.WSSE]


def _read_file(f_name):
Expand Down Expand Up @@ -61,10 +64,10 @@ def __init__(
self.digest_method = digest_method
self.signature_method = signature_method

def apply(self, envelope, headers):
def apply(self, envelope, headers, signatures=None):
key = _make_sign_key(self.key_data, self.cert_data, self.password)
_sign_envelope_with_key(
envelope, key, self.signature_method, self.digest_method
envelope, key, self.signature_method, self.digest_method, signatures
)
return envelope, headers

Expand Down Expand Up @@ -99,10 +102,10 @@ class BinarySignature(Signature):

Place the key information into BinarySecurityElement."""

def apply(self, envelope, headers):
def apply(self, envelope, headers, signatures=None):
key = _make_sign_key(self.key_data, self.cert_data, self.password)
_sign_envelope_with_key_binary(
envelope, key, self.signature_method, self.digest_method
envelope, key, self.signature_method, self.digest_method, signatures
)
return envelope, headers

Expand All @@ -123,6 +126,7 @@ def sign_envelope(
password=None,
signature_method=None,
digest_method=None,
signatures=None,
):
"""Sign given SOAP envelope with WSSE sig using given key and cert.

Expand Down Expand Up @@ -213,10 +217,12 @@ def sign_envelope(
"""
# Load the signing key and certificate.
key = _make_sign_key(_read_file(keyfile), _read_file(certfile), password)
return _sign_envelope_with_key(envelope, key, signature_method, digest_method)
return _sign_envelope_with_key(
envelope, key, signature_method, digest_method, signatures
)


def _signature_prepare(envelope, key, signature_method, digest_method):
def _signature_prepare(envelope, key, signature_method, digest_method, signatures=None):
"""Prepare envelope and sign."""
soap_env = detect_soap_env(envelope)

Expand All @@ -236,15 +242,47 @@ def _signature_prepare(envelope, key, signature_method, digest_method):

# Insert the Signature node in the wsse:Security header.
security = get_security_header(envelope)
security.insert(0, signature)
security.append(signature)

# Perform the actual signing.
ctx = xmlsec.SignatureContext()
ctx.key = key
_sign_node(ctx, signature, envelope.find(QName(soap_env, "Body")), digest_method)
timestamp = security.find(QName(ns.WSU, "Timestamp"))
if timestamp != None:
_sign_node(ctx, signature, timestamp, digest_method)
# Preserve the previous behaviour for backwards compatibility
if signatures is None:
_sign_node(ctx, signature, envelope.find(QName(soap_env, "Body")), digest_method)
timestamp = security.find(QName(ns.WSU, "Timestamp"))
if timestamp != None:
_sign_node(ctx, signature, timestamp, digest_method)
else:
if signatures.get("body") or signatures.get("everything"):
_sign_node(
ctx, signature, envelope.find(QName(soap_env, "Body")), digest_method
)
header = get_or_create_header(envelope)
if signatures.get("everything"):
for node in header.iterchildren():
# Everything doesn't mean everything ...
if node.nsmap.get(node.prefix) not in OMITTED_HEADERS:
_sign_node(ctx, signature, node, digest_method)
else:
for node in signatures.get("header", []):
_sign_node(
ctx,
signature,
header.find(QName(node["Namespace"], node["Name"])),
digest_method,
)
# Sign elements specified by XPath expressions
for element in signatures.get("elements", []):
_sign_node_by_xpath(
ctx,
signature,
envelope,
element["xpath"],
element["xpath_version"],
digest_method
)

ctx.sign(signature)

# Place the X509 data inside a WSSE SecurityTokenReference within
Expand All @@ -254,17 +292,35 @@ def _signature_prepare(envelope, key, signature_method, digest_method):
sec_token_ref = etree.SubElement(key_info, QName(ns.WSSE, "SecurityTokenReference"))
return security, sec_token_ref, x509_data


def _sign_envelope_with_key(envelope, key, signature_method, digest_method):
def _sign_node_by_xpath(ctx, signature, envelope, xpath, xpath_version, digest_method):
# Create an XPath evaluator with the appropriate version
if xpath_version == '1.0':
evaluator = etree.XPath(xpath, namespaces=envelope.nsmap)
else:
evaluator = etree.XPath(xpath, namespaces=envelope.nsmap, extension={('http://www.w3.org/TR/1999/REC-xpath-19991116', 'version'): xpath_version})

# Evaluate the XPath expression
nodes = evaluator(envelope)

# Sign each node found by the XPath expression
for node in nodes:
_sign_node(ctx, signature, node, digest_method)


def _sign_envelope_with_key(
envelope, key, signature_method, digest_method, signatures=None
):
_, sec_token_ref, x509_data = _signature_prepare(
envelope, key, signature_method, digest_method
envelope, key, signature_method, digest_method, signatures=signatures
)
sec_token_ref.append(x509_data)


def _sign_envelope_with_key_binary(envelope, key, signature_method, digest_method):
def _sign_envelope_with_key_binary(
envelope, key, signature_method, digest_method, signatures=None
):
security, sec_token_ref, x509_data = _signature_prepare(
envelope, key, signature_method, digest_method
envelope, key, signature_method, digest_method, signatures=signatures
)
ref = etree.SubElement(
sec_token_ref,
Expand All @@ -285,7 +341,7 @@ def _sign_envelope_with_key_binary(envelope, key, signature_method, digest_metho
)
ref.attrib["URI"] = "#" + ensure_id(bintok)
bintok.text = x509_data.find(QName(ns.DS, "X509Certificate")).text
security.insert(1, bintok)
security.insert(0, bintok)
x509_data.getparent().remove(x509_data)


Expand Down
2 changes: 1 addition & 1 deletion src/zeep/wsse/username.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def __init__(
self.zulu_timestamp = zulu_timestamp
self.hash_password = hash_password

def apply(self, envelope, headers):
def apply(self, envelope, headers, operation_obj=None):
security = utils.get_security_header(envelope)

# The token placeholder might already exists since it is specified in
Expand Down
Loading