-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy path__init__.py
477 lines (384 loc) · 18.1 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
# This code was inspired by https://github.com/andrewdyates/xmldsig
# and includes https://github.com/andrewdyates/rsa_x509_pem with
# permission from the author.
__author__ = 'leifj'
import six
from defusedxml import lxml
from lxml import etree as etree
import logging
import copy
import traceback
from lxml.builder import ElementMaker
from cryptography.x509 import ExtensionNotFound, BasicConstraints, load_pem_x509_certificate
from xmlsec.exceptions import XMLSigException
from xmlsec import constants
from xmlsec.utils import parse_xml, pem2b64, unescape_xml_entities, delete_elt, root_elt, b64d, b64e, etree_to_string
from xmlsec.Signer import Signer
import xmlsec.crypto
import pyconfig
# Prefix -> namespace URI map for XML Signature elements. Used below to build
# "{uri}Tag" search paths and as the ``namespaces`` argument to find().
NS = {'ds': 'http://www.w3.org/2000/09/xmldsig#'}
# The same namespace bound as the default (un-prefixed) namespace; handed to
# the element factory so generated elements carry no prefix.
NSDefault = {None: 'http://www.w3.org/2000/09/xmldsig#'}
# Element factory producing elements in the xmldsig namespace.
DS = ElementMaker(namespace=NS['ds'], nsmap=NSDefault)
# Module-level logger used by the functions in this file.
log = logging.getLogger('xmlsec')
class Config(object):
    """
    Configuration parameters for pyXMLSecurity, each one backed by a ``pyconfig`` setting.

    :param default_signature_alg: The URI of the default signature algorithm (RSA_SHA256 by default)
    :param default_digest_alg: The URI of the default digest algorithm (SHA256 by default)
    :param default_c14n_alg: The URI of the default c14n algorithm (c14n exclusive by default)
    :param debug_write_to_files: Set to True to dump certain XML traces to /tmp. Danger! Not for production!
    :param same_document_is_root: Set to True to treat implicit null same-document-references
        as a reference to the whole document.
    :param id_attributes: A list of attributes to be used as 'id's. By default set to ['ID','id']
    :param c14n_strip_ws: Set to True to strip whitespace in c14n. Only use if you have very special needs.

    Refer to the pyconfig documentation for information on how to override these in your own project.
    """
    default_signature_alg = pyconfig.setting("xmlsec.default_signature_alg", constants.ALGORITHM_SIGNATURE_RSA_SHA256)
    default_digest_alg = pyconfig.setting("xmlsec.default_digest_alg", constants.ALGORITHM_DIGEST_SHA256)
    default_c14n_alg = pyconfig.setting("xmlsec.default_c14n_alg", constants.TRANSFORM_C14N_EXCLUSIVE)
    # NOTE: unlike the other settings, this key carries an extra ".config." segment.
    debug_write_to_files = pyconfig.setting("xmlsec.config.debug_write_to_files", False)
    same_document_is_root = pyconfig.setting("xmlsec.same_document_is_root", False)
    id_attributes = pyconfig.setting("xmlsec.id_attributes", ['ID', 'id'])
    c14n_strip_ws = pyconfig.setting("xmlsec.c14n_strip_ws", False)


# Shared configuration instance read by the helpers in this module.
config = Config()
def _implicit_same_document(t, sig):
    """
    Resolve an implicit same-document reference (no URI, '' or '#').

    :param t: the document being processed
    :param sig: the Signature element owning the reference
    :returns: ``root_elt()`` of a deep copy of *t* when ``config.same_document_is_root``
        is set, otherwise a deep copy of the parent of *sig*
    """
    if not config.same_document_is_root:
        enclosing = sig.getparent()
        return copy.deepcopy(enclosing)
    duplicate = copy.deepcopy(t)
    return root_elt(duplicate)
def _signed_value_pkcs1_v1_5(data, key_size, do_pad, hash_alg):  # TODO Do proper asn1 CMS
    """Return the unencrypted PKCS#1 v1.5 signed value for the digest `data`.

    The resulting signed value will be in the form:
    (01 | FF* | 00 | prefix | digest) [RSA-SHA1]
    where "digest" is the digest of the generated c14n xml for <SignedInfo>
    and "prefix" is the ASN.1 algorithm designator looked up for `hash_alg`.

    :param data: digest bytes to wrap
    :param key_size: key length in bits; only required when `do_pad` is true
    :param do_pad: prepend the 01 | FF* | 00 padding so the result is one
        octet shorter than the RSA modulus
    :param hash_alg: Hash algorithm as string
    :returns: `prefix + data`, preceded by the padding when `do_pad` is true
    :raises XMLSigException: if `hash_alg` is unknown, or if padding is
        requested and `key_size` is missing or too small to hold the digest
    :type data: bytes
    :type key_size: None | int
    :type do_pad: bool
    :type hash_alg: string
    :rtype: bytes
    """
    prefix = constants.ASN1_BER_ALG_DESIGNATOR_PREFIX.get(hash_alg)
    if not prefix:
        raise XMLSigException("Unknown hash algorithm %s" % hash_alg)

    asn_digest = prefix + data
    if not do_pad:
        return asn_digest

    if key_size is None:
        # Previously a TypeError, which callers catching XMLSigException missed.
        raise XMLSigException("Key size is required for PKCS#1 v1.5 padding")

    # Pad to "one octet shorter than the RSA modulus" [RSA-SHA1]
    # WARNING: key size is in bits, not bytes!
    padded_size = key_size // 8 - 1
    pad_size = padded_size - len(asn_digest) - 2
    if pad_size < 0:
        # bytes * negative is b'', which would silently produce a block that
        # is longer than the modulus allows.
        raise XMLSigException("Key size %s bits is too small for a %s digest" % (key_size, hash_alg))

    pad = b'\x01' + b'\xFF' * pad_size + b'\x00'
    return pad + asn_digest
def _get_by_id(t, id_v):
    """
    Find the first element of *t* whose id attribute equals *id_v*.

    Each attribute name in ``config.id_attributes`` is tried in order.

    :param t: lxml element or tree to search (searched from the document root)
    :param id_v: the id value to look for; taken from a Reference URI fragment,
        so it must be treated as untrusted input
    :returns: the first matching element, or None when nothing matches
    """
    for id_a in config.id_attributes:
        log.debug("Looking for #%s using id attribute '%s'", id_v, id_a)
        # Bind the value as an XPath variable instead of formatting it into the
        # expression: a quote in the fragment would otherwise break the query
        # or let the document rewrite the predicate.
        elts = t.xpath("//*[@%s=$ref_id]" % id_a, ref_id=id_v)
        if elts:
            return elts[0]
    return None
def _alg(elt):
"""
Return the xmldsig name of an Algorithm. Hopefully.
:returns: None or string
"""
return elt.get('Algorithm', None)
def _remove_child_comments(t):
    """
    Delete every comment and processing instruction found while iterating *t*.

    :param t: lxml element, modified in place
    :returns: *t*
    """
    for node in t.iter():
        kind = node.tag
        if kind is etree.Comment or kind is etree.PI:
            delete_elt(node)
    return t
def _process_references(t, sig, verify_mode=True, sig_path=".//{%s}Signature" % NS['ds'], drop_signature=False):
    """
    Dereference, transform and digest every Reference found under *sig*.

    In verify mode the computed digest is compared with the DigestValue in the
    document; in signing mode the computed digest is written into DigestValue.

    :param t: the document the signature belongs to
    :param sig: the Signature element being processed
    :param verify_mode: True to check digests, False to fill them in
    :param sig_path: path expression locating Signature elements
    :param drop_signature: in verify mode, remove elements matching *sig_path*
        from the copy of the referenced object that is returned
    :returns: in verify mode a dict mapping each Reference element whose digest
        matched to a copy of the object it refers to; None in signing mode
    :raises XMLSigException: if a reference cannot be resolved or has no usable DigestValue
    """
    verified_objects = {}
    for ref in sig.findall(".//{%s}Reference" % NS['ds']):
        obj = None
        uri = ref.get('URI', None)
        if uri is None or uri == '#' or uri == '':
            ref_obj = _implicit_same_document(t, sig)
            if ref_obj is None:
                raise XMLSigException("Unable to find reference while processing implicit same document reference")
            ct = _remove_child_comments(ref_obj)
            obj = root_elt(ct)
        elif uri.startswith('#'):
            ct = copy.deepcopy(t)
            ref_obj = _get_by_id(ct, uri[1:])
            if ref_obj is None:
                raise XMLSigException("Unable to find reference while processing '%s'" % uri)
            obj = _remove_child_comments(ref_obj)
        else:
            raise XMLSigException("Unknown reference %s" % uri)

        if obj is None:
            raise XMLSigException("Unable to dereference Reference URI='%s'" % uri)

        # Snapshot of the dereferenced object taken before the transforms run;
        # this is what is handed back to the caller for a verified reference.
        obj_copy = obj
        if verify_mode:
            obj_copy = copy.deepcopy(obj)
            if drop_signature:
                # Use a distinct name here: rebinding `sig` would make the next
                # implicit reference resolve against a detached element.
                for nested_sig in obj_copy.findall(sig_path):
                    nested_sig.getparent().remove(nested_sig)

        if config.debug_write_to_files:
            with open("/tmp/foo-pre-transform.xml", "w") as fd:
                fd.write(etree_to_string(obj))

        for tr in ref.findall(".//{%s}Transform" % NS['ds']):
            obj = _transform(_alg(tr), obj, tr=tr, sig_path=sig_path)
            nslist = _find_nslist(tr)
            if nslist is not None:
                r = root_elt(t)
                for nsprefix in nslist:
                    if nsprefix in r.nsmap:
                        obj_copy.nsmap[nsprefix] = r.nsmap[nsprefix]

        # If no transform serialized the object, canonicalize it now so that
        # there is a string to digest.
        if not isinstance(obj, six.string_types):
            if config.debug_write_to_files:
                with open("/tmp/foo-pre-serialize.xml", "w") as fd:
                    fd.write(etree_to_string(obj))
            obj = _transform(constants.TRANSFORM_C14N_INCLUSIVE, obj)

        if config.debug_write_to_files:
            with open("/tmp/foo-obj.xml", "w") as fd:
                if six.PY2:
                    obj = obj.encode('utf-8')
                fd.write(obj)

        hash_alg = _ref_digest(ref)
        log.debug("using hash algorithm %s" % hash_alg)
        digest = xmlsec.crypto._digest(obj, hash_alg)
        log.debug("computed %s digest %s for ref %s" % (hash_alg, digest, uri))
        dv = ref.find(".//{%s}DigestValue" % NS['ds'])
        if dv is None:
            raise XMLSigException("Unable to find DigestValue for Reference@URI {!s}".format(uri))

        if verify_mode:
            if not dv.text:
                raise XMLSigException("Empty DigestValue for Reference@URI {!s}".format(uri))
            log.debug("found %s digest %s for ref %s" % (hash_alg, dv.text, uri))
            computed_digest_binary = b64d(digest)
            digest_binary = b64d(dv.text)
            if digest_binary == computed_digest_binary:  # no point in verifying signature if the digest doesn't match
                verified_objects[ref] = obj_copy
            else:
                log.error("not returning ref %s - digest mismatch" % uri)
        else:  # signing - lets store the digest
            log.debug("replacing digest in %s" % etree.tostring(dv))
            dv.text = digest

    if verify_mode:
        return verified_objects
    else:
        return None
def _ref_digest(ref):
    """
    Determine the internal hash algorithm name for a Reference element.

    :param ref: Reference element
    :returns: result of ``constants.sign_alg_xmldsig_digest_to_internal`` for the
        lower-cased DigestMethod algorithm URI
    :raises XMLSigException: if DigestMethod or its Algorithm attribute is missing
    """
    method = ref.find(".//{%s}DigestMethod" % NS['ds'])
    if method is None:
        raise XMLSigException("Unable to find DigestMethod for Reference@URI {!s}".format(ref.get('URI')))

    uri = _alg(method)
    if uri is None:
        raise XMLSigException("No suitable DigestMethod")

    return constants.sign_alg_xmldsig_digest_to_internal(uri.lower())
def _enveloped_signature(t, sig_path=".//{%s}Signature" % NS['ds']):
    """
    Apply the enveloped-signature transform: delete the first element matching *sig_path*.

    :param t: element to process, modified in place
    :param sig_path: path expression locating the Signature element
    :returns: *t*
    """
    first_match = t.find(sig_path)
    if first_match is not None:
        delete_elt(first_match)

    if not config.debug_write_to_files:
        return t

    # Debug aid only: dump the result of the transform.
    with open("/tmp/foo-env.xml", "w") as fd:
        fd.write(etree_to_string(t))
    return t
def _c14n(t, exclusive, with_comments, inclusive_prefix_list=None, schema=None):
    """
    Perform XML canonicalization (c14n) on an lxml.etree.

    :param t: XML as lxml.etree
    :param exclusive: boolean, use exclusive canonicalization
    :param with_comments: boolean, keep comments or not
    :param inclusive_prefix_list: namespace prefixes passed to lxml as ``inclusive_ns_prefixes``
    :param schema: passed to ``parse_xml`` when the element has to be re-parsed
    :returns: canonical XML as a text string (decoded from UTF-8)
    :raises XMLSigException: if the canonical output does not look like an XML element
    """
    doc = t
    if root_elt(doc).getparent() is not None:
        # Serialize and re-parse so the element is canonicalized as a
        # document of its own.
        xml_str = etree_to_string(doc)
        doc = parse_xml(xml_str, remove_whitespace=config.c14n_strip_ws, remove_comments=not with_comments, schema=schema)
        del xml_str

    buf = six.text_type(
        etree.tostring(doc,
                       method='c14n',
                       exclusive=exclusive,
                       with_comments=with_comments,
                       inclusive_ns_prefixes=inclusive_prefix_list),
        'utf-8')

    # Explicit checks instead of assert: asserts disappear under -O, and
    # indexing an empty buffer would raise IndexError.
    if not buf.startswith('<'):
        raise XMLSigException("C14N buffer doesn't start with '<'")
    if not buf.endswith('>'):
        raise XMLSigException("C14N buffer doesn't end with '>'")
    return buf
def _find_nslist(tr):
nslist = None
if tr is not None:
elt = tr.find(".//{%s}InclusiveNamespaces" % 'http://www.w3.org/2001/10/xml-exc-c14n#')
if elt is not None:
nslist = elt.get('PrefixList', '').split()
return nslist
def _transform(uri, t, tr=None, schema=None, sig_path=".//{%s}Signature" % NS['ds']):
    """
    Apply the transform identified by *uri* to *t*.

    :param uri: transform algorithm URI
    :param t: the object to transform
    :param tr: the Transform element, consulted for an inclusive prefix list
        by the exclusive c14n variants
    :param schema: forwarded to ``_c14n``
    :param sig_path: forwarded to ``_enveloped_signature``
    :returns: whatever the selected transform returns
    :raises XMLSigException: for an unknown transform URI
    """
    if uri == constants.TRANSFORM_ENVELOPED_SIGNATURE:
        return _enveloped_signature(t, sig_path)

    if uri == constants.TRANSFORM_C14N_EXCLUSIVE_WITH_COMMENTS:
        keep_comments = True
    elif uri == constants.TRANSFORM_C14N_EXCLUSIVE:
        keep_comments = False
    elif uri == constants.TRANSFORM_C14N_INCLUSIVE:
        return _c14n(t, exclusive=False, with_comments=False, schema=schema)
    else:
        raise XMLSigException("unknown or unimplemented transform %s" % uri)

    # Only the exclusive variants reach this point.
    prefixes = _find_nslist(tr)
    return _c14n(t, exclusive=True, with_comments=keep_comments, inclusive_prefix_list=prefixes, schema=schema)
def setID(ids):
    """
    Set the attribute names that are treated as element ids.

    :param ids: list of attribute names
    """
    # Kept for backward compatibility with code that reads the value from constants.
    constants.id_attributes = ids
    # Id lookups in this module read config.id_attributes, which is backed by
    # this pyconfig key; without this the call had no effect on them.
    pyconfig.set("xmlsec.id_attributes", ids)
def _verify(t, keyspec, sig_path=".//{%s}Signature" % NS['ds'], drop_signature=False):
    """
    Verify the signature(s) in an XML document.

    Every element matching *sig_path* is tried in turn. A signature that fails
    with XMLSigException or ValueError is logged and the next one is tried.

    :param t: XML as lxml.etree
    :param keyspec: X.509 cert filename, string with fingerprint or X.509 cert as string
    :param sig_path: path expression locating the Signature elements in *t*
    :param drop_signature: forwarded to ``_process_references``
    :returns: list of the referenced objects that were successfully validated
    :raises XMLSigException: if no signature could be validated
    """
    if config.debug_write_to_files:
        with open("/tmp/foo-sig.xml", "w") as fd:
            fd.write(etree_to_string(t))

    validated = []
    for sig in t.findall(sig_path):
        try:
            sv = sig.findtext(".//{%s}SignatureValue" % NS['ds'])
            if not sv:
                raise XMLSigException("No SignatureValue")

            log.debug("SignatureValue: {!s}".format(sv))
            log.debug("KeySpec: {!s}".format(keyspec))
            this_cert = xmlsec.crypto.from_keyspec(keyspec, signature_element=sig)
            log.debug("key size: {!s} bits".format(this_cert.keysize))

            # Try verification by CA signed signing certificate
            bc = None
            try:
                bc = this_cert.key.extensions.get_extension_for_class(BasicConstraints)
            except ExtensionNotFound:
                pass
            else:
                log.debug("CA=true cert")
                # If this_cert a CA cert it is probably not the signing cert
                if bc.value.ca is True:
                    # Find X509Certificate in signature that is child of the root element
                    cert = t.find(".//ds:Signature/ds:KeyInfo/ds:X509Data/ds:X509Certificate", namespaces=NS)
                    if cert is not None:
                        certspec = "-----BEGIN CERTIFICATE-----\n" + cert.text.strip() + "\n-----END CERTIFICATE-----"
                        embedded_cert = load_pem_x509_certificate(certspec.encode())
                        try:
                            embedded_cert.verify_directly_issued_by(this_cert.key)
                        except Exception:
                            # Best effort: the embedded certificate was not issued
                            # by the configured one, so keep the configured key.
                            pass
                        else:
                            this_cert.key = embedded_cert

            si = sig.find(".//{%s}SignedInfo" % NS['ds'])
            if si is None:
                # Report it like any other broken signature instead of failing
                # later with an unrelated exception type.
                raise XMLSigException("No SignedInfo")
            log.debug("Found signedinfo {!s}".format(etree.tostring(si)))
            cm_alg = _cm_alg(si)
            try:
                sig_uri = _sig_uri(si)
            except AttributeError:
                raise XMLSigException("Failed to validate {!s} because of unsupported hash format".format(etree.tostring(sig)))
            refmap = _process_references(t, sig, verify_mode=True, sig_path=sig_path, drop_signature=drop_signature)
            for obj in refmap.values():
                log.debug("transform %s on %s" % (cm_alg, etree.tostring(si)))
                sic = _transform(cm_alg, si)
                # Use the module logger (not the root logger) like the rest of the file.
                log.debug("SignedInfo C14N: %s" % sic)
                if this_cert.do_digest:  # assume pkcs1 v1.5 right now
                    hash_alg = constants.sign_alg_xmldsig_sig_to_hashalg(sig_uri)
                    digest = xmlsec.crypto._digest(sic, hash_alg)
                    log.debug("SignedInfo digest: %s" % digest)
                    b_digest = b64d(digest)
                    actual = _signed_value_pkcs1_v1_5(b_digest, this_cert.keysize, True, hash_alg)
                else:
                    actual = sic

                log.debug("Verifying signature (computed) {} to (actual) {}".format(sv, actual))
                if not this_cert.verify(b64d(sv), actual, sig_uri):
                    raise XMLSigException("Failed to validate {!s} using sig sig method {!s}".format(etree.tostring(sig), sig_uri))
                validated.append(obj)
        except (XMLSigException, ValueError) as ex:  # we will try the next available signature
            log.error(ex)

    if not validated:
        raise XMLSigException("No valid ds:Signature elements found")

    return validated
def verify(t, keyspec, sig_path=".//{%s}Signature" % NS['ds']):
    """
    Check the signature(s) in *t* against *keyspec*.

    :param t: XML as lxml.etree
    :param keyspec: key/certificate reference handed to ``_verify``
    :param sig_path: path expression locating the Signature elements
    :returns: True when ``_verify`` returned at least one validated object
    """
    validated_objects = _verify(t, keyspec, sig_path)
    return len(validated_objects) > 0
def verified(t, keyspec, sig_path=".//{%s}Signature" % NS['ds'], drop_signature=False):
    """
    Return the objects in *t* that are covered by a valid signature.

    :param t: XML as lxml.etree
    :param keyspec: key/certificate reference handed to ``_verify``
    :param sig_path: path expression locating the Signature elements
    :param drop_signature: forwarded to ``_verify``
    :returns: the list produced by ``_verify``
    """
    validated_objects = _verify(t, keyspec, sig_path, drop_signature)
    return validated_objects
## TODO - support transforms with arguments
def _signed_info_transforms(transforms):
    """
    Build a Transforms element containing one Transform child per algorithm URI.

    :param transforms: iterable of transform algorithm URIs
    :returns: the Transforms element
    """
    children = []
    for algorithm in transforms:
        children.append(DS.Transform(Algorithm=algorithm))
    return DS.Transforms(*children)
# standard enveloped signature
def _enveloped_signature_template(c14n_method,
                                  digest_alg,
                                  transforms,
                                  reference_uri,
                                  signature_alg):
    """
    Build an empty Signature element (a template) with a single Reference.

    :param c14n_method: Algorithm URI for CanonicalizationMethod
    :param digest_alg: Algorithm URI for DigestMethod
    :param transforms: iterable of transform algorithm URIs for the Reference
    :param reference_uri: value of the Reference URI attribute
    :param signature_alg: Algorithm URI for SignatureMethod
    :returns: the Signature element; its DigestValue is left empty
    """
    canonicalization = DS.CanonicalizationMethod(Algorithm=c14n_method)
    signature_method = DS.SignatureMethod(Algorithm=signature_alg)
    reference = DS.Reference(
        _signed_info_transforms(transforms),
        DS.DigestMethod(Algorithm=digest_alg),
        DS.DigestValue(),
        URI=reference_uri
    )
    signed_info = DS.SignedInfo(canonicalization, signature_method, reference)
    return DS.Signature(signed_info)
def add_enveloped_signature(t,
                            c14n_method=config.default_c14n_alg,
                            digest_alg=config.default_digest_alg,
                            signature_alg=config.default_signature_alg,
                            transforms=None,
                            reference_uri='',
                            pos=0):
    """
    Insert an enveloped-signature template into the root element of *t*.

    Note that the algorithm defaults are read from ``config`` once, when this
    function is defined.

    :param t: XML as lxml.etree, modified in place
    :param c14n_method: Algorithm URI for CanonicalizationMethod
    :param digest_alg: Algorithm URI for DigestMethod
    :param signature_alg: Algorithm URI for SignatureMethod
    :param transforms: iterable of transform URIs; defaults to enveloped-signature
        followed by exclusive c14n with comments
    :param reference_uri: value of the Reference URI attribute
    :param pos: child index at which to insert the template; -1 appends it
    :returns: the inserted Signature template element
    """
    if transforms is None:
        transforms = (constants.TRANSFORM_ENVELOPED_SIGNATURE,
                      constants.TRANSFORM_C14N_EXCLUSIVE_WITH_COMMENTS)

    template = _enveloped_signature_template(c14n_method, digest_alg, transforms, reference_uri, signature_alg)

    document_root = root_elt(t)
    if pos != -1:
        document_root.insert(pos, template)
    else:
        document_root.append(template)
    return template
def _is_template(sig):
    """
    Tell whether a Signature element is still an unfilled template.

    :param sig: Signature element
    :returns: True when SignedInfo exists and neither DigestValue (under
        SignedInfo) nor SignatureValue carries any text; False otherwise
    """
    signed_info = sig.find(".//{%s}SignedInfo" % NS['ds'])
    if signed_info is None:
        return False

    digest_value = signed_info.find(".//{%s}DigestValue" % NS['ds'])
    signature_value = sig.find(".//{%s}SignatureValue" % NS['ds'])
    for node in (digest_value, signature_value):
        # Any non-empty text means the signature has already been completed.
        if node is not None and node.text:
            return False
    return True
def sign(t, key_spec, cert_spec=None, reference_uri='', insert_index=0, sig_path=".//{%s}Signature" % NS['ds']):
    """
    Sign an XML document, i.e. 'complete' all Signature elements in the XML.

    The work is delegated to a ``Signer`` built from *key_spec* and *cert_spec*.

    :param t: XML as lxml.etree
    :param key_spec: private key reference, see xmlsec.crypto.from_keyspec() for syntax.
    :param cert_spec: None or public key reference (to add cert to document),
                      see xmlsec.crypto.from_keyspec() for syntax.
    :param reference_uri: Envelope signature reference URI
    :param insert_index: Insertion point for the Signature element,
                         Signature is inserted at beginning by default
    :param sig_path: An xpath expression identifying the Signature template element
    :returns: XML as lxml.etree (for convenience, 't' is modified in-place)
    """
    return Signer(key_spec=key_spec, cert_spec=cert_spec).sign(t, reference_uri, insert_index, sig_path)
def _cm_alg(si):
    """
    Return the canonicalization algorithm URI declared in a SignedInfo element.

    :param si: SignedInfo element
    :returns: value of the CanonicalizationMethod ``Algorithm`` attribute
    :raises XMLSigException: if the element or its Algorithm attribute is missing
    """
    cm = si.find(".//{%s}CanonicalizationMethod" % NS['ds'])
    # Check for the element before reading its attribute; reading first raised
    # AttributeError on None and the intended error below was never reached.
    if cm is None:
        raise XMLSigException("No CanonicalizationMethod")
    cm_alg = _alg(cm)
    if cm_alg is None:
        raise XMLSigException("No CanonicalizationMethod")
    return cm_alg
def _sig_uri(si):
    """
    Return the signature algorithm URI declared in a SignedInfo element.

    :param si: SignedInfo element
    :returns: value of the SignatureMethod ``Algorithm`` attribute
    :raises XMLSigException: if the element or its Algorithm attribute is missing
    """
    sm = si.find(".//{%s}SignatureMethod" % NS['ds'])
    # Check for the element before reading its attribute; reading first raised
    # AttributeError on None and the intended error below was never reached.
    if sm is None:
        raise XMLSigException("No SignatureMethod")
    sig_uri = _alg(sm)
    if sig_uri is None:
        raise XMLSigException("No SignatureMethod")
    return sig_uri