"""
These are the built-in "pipes" - functions that can be used to put together a processing pipeline for pyFF.
"""
import base64
import hashlib
import json
import operator
import os
import re
import sys
import traceback
from copy import deepcopy
from datetime import datetime
from io import BytesIO
from str2bool import str2bool
from typing import Dict, Optional
import ipaddress
import six
import xmlsec
from lxml import etree
from lxml.etree import DocumentInvalid
from six.moves.urllib_parse import quote_plus, urlparse
from pyff.constants import NS
from pyff.decorators import deprecated
from pyff.exceptions import MetadataException
from pyff.logs import get_log
from pyff.pipes import PipeException, PipelineCallback, Plumbing, pipe, registry
from pyff.samlmd import (
    annotate_entity,
    discojson_sp_t,
    discojson_sp_attr_t,
    discojson_t,
    entitiesdescriptor,
    find_in_document,
    iter_entities,
    resolve_entities,
    set_entity_attributes,
    set_nodecountry,
    set_pubinfo,
    set_reginfo,
    sort_entities,
)
from pyff.utils import (
    datetime2iso,
    dumptree,
    duration2timedelta,
    hash_id,
    iso2datetime,
    parse_xml,
    root,
    safe_write,
    total_seconds,
    utc_now,
    validate_document,
    with_tree,
    xslt_transform,
)

__author__ = 'leifj'

FILESPEC_REGEX = r'([^ \t\n\r\f\v]+)\s+as\s+([^ \t\n\r\f\v]+)'

log = get_log(__name__)


@pipe
def dump(req: Plumbing.Request, *opts):
    """
    Print a representation of the entities set on stdout. Useful for testing.

    :param req: The request
    :param opts: Options (unused)
    :return: None
    """
    if req.t is not None:
        print(dumptree(req.t))
    else:
        print("<EntitiesDescriptor xmlns=\"{}\"/>".format(NS['md']))


@pipe(name="map")
def _map(req: Plumbing.Request, *opts):
    """
    Loop over the entities in a selection.

    :param req: The request
    :param opts: Options (unused)
    :return: None

    **Examples**

    .. code-block:: yaml

        - map:
            - ...statements...

    Executes a set of statements in parallel (using a thread pool).
    """

    def _p(e):
        entity_id = e.get('entityID')
        ip = Plumbing(pipeline=req.args, pid="{}.each[{}]".format(req.plumbing.pid, entity_id))
        ireq = Plumbing.Request(ip, req.md, t=e, scheduler=req.scheduler)
        ireq.set_id(entity_id)
        ireq.set_parent(req)
        return ip.iprocess(ireq)

    from multiprocessing.pool import ThreadPool

    pool = ThreadPool()
    result = pool.map(_p, iter_entities(req.t), chunksize=10)
    log.info("processed {} entities".format(len(result)))


@pipe(name="then")
def _then(req: Plumbing.Request, *opts):
    """
    Call a named 'when' clause and return - akin to macro invocations for pyFF.
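
    For example, assuming a named 'when' clause called 'cleanup' is defined elsewhere in the pipeline
    configuration (the name here is purely illustrative), an invocation could look like this sketch:

    .. code-block:: yaml

        - then cleanup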
"""
    for cb in [PipelineCallback(p, req, store=req.md.store) for p in opts]:
        req.t = cb(req.t)
    return req.t


@pipe(name="log_entity")
def _log_entity(req: Plumbing.Request, *opts):
    """
    Log the request id as it is processed (typically the entity_id).
    """
    log.info(str(req.id))
    return req.t


@pipe(name="print")
def _print_t(req: Plumbing.Request, *opts):
    """
    Print whatever is in the active tree without transformation.

    :param req: The request
    :param opts: Options (unused)
    :return: None

    **Examples**

    .. code-block:: yaml

        - print
           output: "somewhere.foo"
    """
    fn = None
    if isinstance(req.args, dict):
        fn = req.args.get('output', None)
    if fn is not None:
        safe_write(fn, req.t)
    else:
        print(req.t)


@pipe
def end(req: Plumbing.Request, *opts):
    """
    Exit with optional error code and message.

    :param req: The request
    :param opts: Options (unused)
    :return: None

    **Examples**

    .. code-block:: yaml

        - end
        - unreachable

    **Warning** This is very bad if used with pyffd - the server will stop running. If you just want to
    break out of the pipeline, use break instead.
    """
    code = 0
    if isinstance(req.args, dict):
        code = req.args.get('code', 0)
        msg = req.args.get('message', None)
        if msg is not None:
            print(msg)
    sys.exit(code)


@pipe
def fork(req: Plumbing.Request, *opts):
    """
    Make a copy of the working tree and process the arguments as a pipeline. This essentially resets the working
    tree and allows a new plumbing to run. Useful for producing multiple outputs from a single source.

    :param req: The request
    :param opts: Options (unused)
    :return: None

    **Examples**

    .. code-block:: yaml

        - select  # select all entities
        - fork:
            - certreport
            - publish:
                output: "/tmp/annotated.xml"
        - fork:
            - xslt:
                stylesheet: tidy.xml
            - publish:
                output: "/tmp/clean.xml"

    The second fork in this example is strictly speaking not necessary since the main plumbing is still active,
    but it may help to structure your plumbings this way.

    **Merging**

    Normally the result of the "inner" plumbing is discarded - unless published or emitted to a calling client
    in the case of the MDX server - but by adding 'merge' to the options, with an optional merge strategy, the
    behaviour can be changed to merge the result of the inner pipeline back into the parent working document.

    The default merge strategy is 'replace_existing', which replaces each EntityDescriptor found in the resulting
    document in the parent document (using the entityID as a pointer). Any python module path ('a.mod.u.le:callable')
    ending in a callable is accepted. If the path doesn't contain a ':' then it is assumed to reference one of the
    standard merge strategies in pyff.merge_strategies.

    For instance the following block can be used to set an attribute on a single entity:

    .. code-block:: yaml

        - fork merge:
            - select: http://sp.example.com/shibboleth-sp
            - setattr:
                attribute: value
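
    A custom merge strategy is referenced by its module path, given as the last option after 'merge'. In the
    following sketch, 'my.package:merge_sp' stands in for any callable you provide yourself; the name is
    purely illustrative:

    .. code-block:: yaml

        - fork merge my.package:merge_sp:
            - select: http://sp.example.com/shibboleth-sp
            - setattr:
                attribute: value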
    Note that unless you have a select statement before your fork merge you'll be merging into an empty
    active document, which with the default merge strategy of replace_existing will result in an empty
    active document. To avoid this, do a select before your fork, thus:

    .. code-block:: yaml

        - select
        - fork merge:
            - select: http://sp.example.com/shibboleth-sp
            - setattr:
                attribute: value

    **parsecopy**

    Due to a hard-to-find bug, fork (which uses deepcopy) can lose some namespaces. The parsecopy argument is a
    workaround: it uses brute-force serialisation and deserialisation to get around the bug.

    .. code-block:: yaml

        - select  # select all entities
        - fork parsecopy:
            - certreport
            - publish:
                output: "/tmp/annotated.xml"
        - fork:
            - xslt:
                stylesheet: tidy.xml
            - publish:
                output: "/tmp/clean.xml"
    """
    nt = None
    if req.t is not None:
        if 'parsecopy' in opts:
            nt = root(parse_xml(BytesIO(dumptree(req.t))))
        else:
            nt = deepcopy(req.t)

    if not isinstance(req.args, list):
        raise ValueError('Non-list arguments to "fork" not allowed')

    ip = Plumbing(pipeline=req.args, pid=f'{req.plumbing.pid}.fork')
    ireq = Plumbing.Request(ip, req.md, t=nt, scheduler=req.scheduler)
    ireq.set_id(req.id)
    ireq.set_parent(req)
    ip.iprocess(ireq)

    if req.t is not None and ireq.t is not None and len(root(ireq.t)) > 0:
        if 'merge' in opts:
            sn = "pyff.merge_strategies:replace_existing"
            if opts[-1] != 'merge':
                sn = opts[-1]
            req.md.store.merge(req.t, ireq.t, strategy_name=sn)

    return req.t


@deprecated(reason="any pipeline has been replaced by other behaviour")
@pipe(name='any')
def _any(lst, d):
    for x in lst:
        if x in d:
            if type(d) == dict:
                return d[x]
            else:
                return True
    return False


@pipe(name='break')
def _break(req: Plumbing.Request, *opts):
    """
    Break out of a pipeline.

    :param req: The request
    :param opts: Options (unused)
    :return: None

    This sets the 'done' request property to True, which causes the pipeline to terminate at that point. The method
    name is '_break' but the keyword is 'break' to avoid conflicting with the python keyword.

    **Examples**

    .. code-block:: yaml

        - one
        - two
        - break
        - unreachable
    """
    req.done = True
    return req.t


@pipe(name='pipe')
def _pipe(req: Plumbing.Request, *opts):
    """
    Run the argument list as a pipeline.

    :param req: The request
    :param opts: Options (unused)
    :return: None

    Unlike fork, pipe does not copy the working document but instead operates on the current active document. The
    done request property is reset to False after the pipeline has been processed. This allows for a classical
    switch/case flow using the following construction:

    .. code-block:: yaml

        - pipe:
            - when a:
                - one
                - break
            - when b:
                - two
                - break

    In this case if 'a' is present in the request state, then 'one' will be executed and the 'when b' condition
    will not be tested at all. Note that at the topmost level the pipe is implicit and may be left out.

    .. code-block:: yaml

        - pipe:
            - one
            - two

    is equivalent to

    .. code-block:: yaml

        - one
        - two
    """
    if not isinstance(req.args, list):
        raise ValueError('Non-list arguments to "pipe" not allowed')

    ot = Plumbing(pipeline=req.args, pid=f'{req.plumbing.id}.pipe').iprocess(req)
    req.done = False
    return ot


@pipe
def when(req: Plumbing.Request, condition: str, *values):
    """
    Conditionally execute part of the pipeline.

    :param req: The request
    :param condition: The condition key
    :param values: The condition values
    :return: None

    The inner pipeline is executed if at least one of the condition values is present for the specified key in
    the request state.

    **Examples**

    .. code-block:: yaml

        - when foo
            - something
        - when bar bill
            - other

    The condition operates on the state: if 'foo' is present in the state (with any value), then the something
    branch is followed. If 'bar' is present in the state with the value 'bill' then the other branch is followed.
    """
    c = req.state.get(condition, None)
    if c is None:
        log.debug(f'Condition {repr(condition)} not present in state {req.state}')
    if c is not None and (not values or _any(values, c)):
        if not isinstance(req.args, list):
            raise ValueError('Non-list arguments to "when" not allowed')

        return Plumbing(pipeline=req.args, pid="%s.when" % req.plumbing.id).iprocess(req)
    return req.t


@pipe
def info(req: Plumbing.Request, *opts):
    """
    Dumps the working document on stdout. Useful for testing.

    :param req: The request
    :param opts: Options (unused)
    :return: None
    """
    if req.t is None:
        raise PipeException("Your pipeline is missing a select statement.")

    for e in req.t.xpath("//md:EntityDescriptor", namespaces=NS, smart_strings=False):
        print(e.get('entityID'))
    return req.t


@pipe
def sort(req: Plumbing.Request, *opts):
    """
    Sorts the working entities by the value returned by the given xpath.

    By default (when the 'order_by [xpath]' option is omitted) entities are sorted by 'entityID'; when an xpath
    is given, the entityID is used as a secondary sort criterion. Entities where no value exists for the given
    xpath are sorted last.

    :param req: The request
    :param opts: Options: <order_by [xpath]> (see below)
    :return: None

    Options are put directly after "sort". E.g:

    .. code-block:: yaml

        - sort order_by [xpath]

    **Options**

    - order_by [xpath] : xpath expression selecting the value used for sorting the entities.
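
    For example, the following sketch would sort entities by their OrganizationName (the xpath is
    illustrative, not the only valid choice):

    .. code-block:: yaml

        - sort order_by .//md:Organization/md:OrganizationName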
"""
    if req.t is None:
        raise PipeException("Unable to sort empty document.")

    _opts: Dict[str, Optional[str]] = dict(list(zip(opts[0:1], [" ".join(opts[1:])])))
    if 'order_by' not in _opts:
        _opts['order_by'] = None
    sort_entities(req.t, _opts['order_by'])

    return req.t


@pipe
def publish(req: Plumbing.Request, *opts):
    """
    Publish the working document in XML form.

    :param req: The request
    :param opts: Options (unused)
    :return: None

    Publish takes one argument: the path to a file where the document tree will be written.

    **Examples**

    .. code-block:: yaml

        - publish: /tmp/idp.xml

    The full set of options with their corresponding defaults:

    .. code-block:: yaml

        - publish:
            output: output
            raw: false
            pretty_print: false
            urlencode_filenames: false
            hash_link: false
            update_store: true
            ext: .xml

    If output is an existing directory, publish will write the working tree to a filename in the directory
    based on the @entityID or @Name attribute. Unless 'raw' is set to true the working tree will be serialized
    to a string before writing, formatted if 'pretty_print' is true (see the 'indent' action for more
    extensive control). If true, 'hash_link' will generate a symlink based on the hash id (sha1) for
    compatibility with MDQ. Unless false, 'update_store' will cause the current store to be updated with
    the published artifact. Setting 'ext' allows control over the file extension.
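
    The output value also accepts a "<path> as <name>" form: the first token is the file to write and the
    second token is the resource name used when updating the store. A hypothetical sketch (both names are
    illustrative):

    .. code-block:: yaml

        - publish:
            output: "/tmp/idp.xml as /idp.xml"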
"""
    if req.t is None:
        raise PipeException("Empty document submitted for publication")

    if req.args is None:
        raise PipeException("Publish must at least specify output")

    if not isinstance(req.args, dict):
        req.args = dict(output=req.args[0])

    for t in ('raw', 'pretty_print', 'update_store', 'hash_link', 'urlencode_filenames'):
        if t in req.args and type(req.args[t]) is not bool:
            req.args[t] = str2bool(str(req.args[t]))

    req.args.setdefault('ext', '.xml')
    req.args.setdefault('output_file', 'output')
    req.args.setdefault('raw', False)
    req.args.setdefault('pretty_print', False)
    req.args.setdefault('update_store', True)
    req.args.setdefault('hash_link', False)
    req.args.setdefault('urlencode_filenames', False)

    output_file = req.args.get("output", None)

    if not req.args.get('raw'):
        try:
            validate_document(req.t)
        except DocumentInvalid as ex:
            log.error(ex.error_log)
            raise PipeException("XML schema validation failed")

    def _nop(x):
        return x

    enc = _nop
    if req.args.get('urlencode_filenames'):
        enc = quote_plus

    if output_file is not None:
        output_file = output_file.strip()
        resource_name = output_file
        m = re.match(FILESPEC_REGEX, output_file)
        if m:
            output_file = m.group(1)
            resource_name = m.group(2)
        out = output_file

        data = req.t
        if not req.args.get('raw'):
            data = dumptree(req.t, pretty_print=req.args.get('pretty_print'))

        if os.path.isdir(output_file):
            file_name = "{}{}".format(enc(req.id), req.args.get('ext'))
            out = os.path.join(output_file, file_name)
            safe_write(out, data, mkdirs=True)
            if req.args.get('hash_link'):
                link_name = "{}{}".format(enc(hash_id(req.id)), req.args.get('ext'))
                link_path = os.path.join(output_file, link_name)
                if os.path.exists(link_path):
                    os.unlink(link_path)
                os.symlink(file_name, link_path)
        else:
            safe_write(out, data, mkdirs=True)

        if req.args.get('update_store'):
            req.store.update(req.t, tid=resource_name)  # TODO maybe this is not the right thing to do anymore

    return req.t


@pipe
@deprecated(reason="stats subsystem was removed")
def loadstats(req: Plumbing.Request, *opts):
    """
    Log (INFO) information about the result of the last call to load.

    :param req: The request
    :param opts: Options: (none)
    :return: None
    """
    log.info("pyff loadstats has been deprecated")


@pipe
@deprecated(reason="replaced with load")
def remote(req: Plumbing.Request, *opts):
    """
    Deprecated. Calls :py:mod:`pyff.pipes.builtins.load`.
    """
    return load(req, *opts)


@pipe
@deprecated(reason="replaced with load")
def local(req: Plumbing.Request, *opts):
    """
    Deprecated. Calls :py:mod:`pyff.pipes.builtins.load`.
    """
    return load(req, *opts)


@pipe
@deprecated(reason="replaced with load")
def _fetch(req: Plumbing.Request, *opts):
    return load(req, *opts)


@pipe
def load(req: Plumbing.Request, *opts):
    """
    General-purpose resource fetcher.

    :param req: The request
    :param opts: Options: See "Options" below
    :return: None

    Supports both remote and local resources. Fetching remote resources is done in parallel using threads.

    Note: When downloading remote files over HTTPS the TLS server certificate is not validated by default.
    Note: The default behaviour is to ignore metadata files, or entities in metadata files, that cannot be loaded.

    Options are put directly after "load". E.g:

    .. code-block:: yaml

        - load fail_on_error True filter_invalid False:
            - http://example.com/some_remote_metadata.xml
            - local_file.xml
            - /opt/directory_containing_md_files/

    **Options**

    Defaults are marked with (*)

    - max_workers <5> : Number of parallel threads to use for loading MD files
    - timeout <120> : Socket timeout when downloading files
    - validate <True*|False> : When true, downloaded metadata files are validated (schema validation)
    - fail_on_error <True|False*> : Controls whether an error during download, parsing or (optional) validation
                                    of an MD file aborts processing of the pipeline. When true, a failure aborts
                                    processing and causes pyff to exit with a non-zero exit code. Otherwise
                                    errors are logged but ignored.
    - filter_invalid <True*|False> : Controls validation behaviour. When true, entities that fail validation are
                                    filtered out, i.e. not loaded. When false, the entire metadata file is either
                                    loaded or not; fail_on_error controls whether a failure to validate the
                                    entire MD file aborts processing of the pipeline.
    - verify_tls <True|False*> : Controls validation of the host's TLS certificate when fetching resources
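
    For example, a load that validates TLS server certificates and checks a resource against a local signing
    certificate might look like the following sketch (the URL and certificate path are illustrative):

    .. code-block:: yaml

        - load verify_tls True:
            - https://md.example.com/metadata.xml verify certs/metadata-signer.crt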
"""
    _opts = dict(list(zip(opts[::2], opts[1::2])))
    _opts.setdefault('timeout', 120)
    _opts.setdefault('max_workers', 5)
    _opts.setdefault('validate', "True")
    _opts.setdefault('fail_on_error', "False")
    _opts.setdefault('filter_invalid', "True")
    _opts.setdefault('verify_tls', "False")
    _opts['validate'] = bool(str2bool(_opts['validate']))
    _opts['fail_on_error'] = bool(str2bool(_opts['fail_on_error']))
    _opts['filter_invalid'] = bool(str2bool(_opts['filter_invalid']))
    _opts['verify_tls'] = bool(str2bool(_opts['verify_tls']))

    if not isinstance(req.args, list):
        raise ValueError('Non-list args to "load" not allowed')

    for x in req.args:
        x = x.strip()
        log.debug(f"load parsing '{x}'")
        r = x.split()

        assert len(r) in range(1, 8), PipeException(
            "Usage: load resource [as url] [[verify] verification] [via pipeline] [cleanup pipeline]"
        )

        url = r.pop(0)

        # Copy parent node opts as a starting point
        child_opts = req.md.rm.opts.copy(update={"via": [], "cleanup": [], "verify": None, "alias": url})

        while len(r) > 0:
            elt = r.pop(0)
            if elt in ("as", "verify", "via", "cleanup"):
                # These elements have an argument
                if len(r) > 0:
                    value = r.pop(0)
                    if elt == "as":
                        child_opts.alias = value
                    elif elt == "verify":
                        child_opts.verify = value
                    elif elt == "via":
                        child_opts.via.append(PipelineCallback(value, req, store=req.md.store))
                    elif elt == "cleanup":
                        child_opts.cleanup.append(PipelineCallback(value, req, store=req.md.store))
                    else:
                        raise ValueError(f'Unhandled resource option {elt}')
                else:
                    raise PipeException(
                        "Usage: load resource [as url] [[verify] verification] [via pipeline]* [cleanup pipeline]*"
                    )
            else:
                child_opts.verify = elt

        # Override anything in child_opts with what is in opts
        child_opts = child_opts.copy(update=_opts)

        req.md.rm.add_child(url, child_opts)

    log.debug("Refreshing all resources")

    req.md.rm.reload(fail_on_error=bool(_opts['fail_on_error']))


def _select_args(req):
    args = req.args
    if args is None and 'select' in req.state:
        args = [req.state.get('select')]
    if args is None:
        args = req.store.collections()
    if args is None or not args:
        args = req.store.lookup('entities')
    if args is None or not args:
        args = []

    log.info("selecting using args: %s" % args)

    return args


@pipe
def select(req: Plumbing.Request, *opts):
    """
    Select a set of EntityDescriptor elements as the working document.

    :param req: The request
    :param opts: Options - see Options below
    :return: returns the result of the operation as a working document

    Select picks and expands elements (with optional filtering) from the active repository you set up using calls
    to :py:mod:`pyff.pipes.builtins.load`. See :py:mod:`pyff.mdrepo.MDRepository.lookup` for a description of the
    syntax for selectors.

    **Examples**

    .. code-block:: yaml

        - select

    This would select all entities in the active repository.

    .. code-block:: yaml

        - select: "/var/local-metadata"

    This would select all entities found in the directory /var/local-metadata. You must have a call to local to
    load entities from this directory before the select statement.

    .. code-block:: yaml

        - select: "/var/local-metadata!//md:EntityDescriptor[md:IDPSSODescriptor]"

    This would select all IdPs from /var/local-metadata.

    .. code-block:: yaml

        - select: "!//md:EntityDescriptor[md:SPSSODescriptor]"

    This would select all SPs.

    Select statements are not cumulative - a select followed by another select in the plumbing resets the
    working document to the result of the second select.

    Most statements except local and remote depend on having a select somewhere in your plumbing and will
    stop the plumbing if the current working document is empty. For instance, running

    .. code-block:: yaml

        - select: "!//md:EntityDescriptor[md:SPSSODescriptor]"

    would terminate the plumbing at select if there are no SPs in the local repository. This is useful in
    combination with fork for handling multiple cases in your plumbings.

    Options are put directly after "select". E.g:

    .. code-block:: yaml

        - select as /foo-2.0 dedup True: "!//md:EntityDescriptor[md:IDPSSODescriptor]"

    **Options**

    Defaults are marked with (*)

    - as <name> : The 'as' keyword allows a select to be stored as an alias in the local repository. For instance

    .. code-block:: yaml

        - select as /foo-2.0: "!//md:EntityDescriptor[md:IDPSSODescriptor]"

    would allow you to use /foo-2.0.json to refer to the JSON-version of all IdPs in the current repository.
    Note that you should not include an extension in your "as foo-bla-something" since that would make your
    alias invisible for anything except the corresponding mime type.

    - dedup <True*|False> : Whether to deduplicate the results by entityID.

      Note: When select is used after a load pipe with more than one source, if dedup is set to True
      and there are entity properties that may differ from one source to another, these will be squashed
      rather than merged.
    """
    opt_names = ('as', 'dedup')
    if len(opts) % 2 == 0:
        _opts = dict(list(zip(opts[::2], opts[1::2])))
    else:
        _opts = {}
        for i in range(0, len(opts), 2):
            if opts[i] in opt_names:
                _opts[opts[i]] = opts[i + 1]
            else:
                _opts['as'] = opts[i]
                if i + 1 < len(opts):
                    more_opts = opts[i + 1:]
                    _opts.update(dict(list(zip(more_opts[::2], more_opts[1::2]))))
                break

    _opts.setdefault('dedup', "True")
    _opts.setdefault('name', req.plumbing.id)
    _opts['dedup'] = bool(str2bool(_opts['dedup']))

    args = _select_args(req)
    name = _opts['name']
    dedup = _opts['dedup']

    if len(opts) > 0:
        if opts[0] != 'as' and len(opts) == 1:
            name = opts[0]
        if opts[0] == 'as' and len(opts) == 2:
            name = opts[1]

    entities = resolve_entities(args, lookup_fn=req.md.store.select, dedup=dedup)

    if req.state.get('match', None):  # TODO - allow this to be passed in via normal arguments
        match = req.state['match']

        if isinstance(match, six.string_types):
            query = [match.lower()]

        def _strings(elt):
            # Collect the human-readable strings of an entity that a match query is tested against
            lst = []
            for attr in [
                './/{%s}UIInfo/{%s}DisplayName' % (NS['mdui'], NS['mdui']),
                '{%s}ServiceName' % NS['md'],
                '{%s}OrganizationDisplayName' % NS['md'],
                '{%s}OrganizationName' % NS['md'],
                './/{%s}UIInfo/{%s}Keywords' % (NS['mdui'], NS['mdui']),
                '{%s}Scope' % NS['shibmd'],
            ]:
                lst.extend([s.text for s in elt.iterfind(attr)])
            lst.append(elt.get('entityID'))
            return [item for item in lst if item is not None]

        def _ip_networks(elt):
            return [ipaddress.ip_network(x.text) for x in elt.iter('{%s}IPHint' % NS['mdui'])]

        def _match(q, elt):
            q = q.strip()
            if ':' in q or '.' in q:
                try:
                    nets = _ip_networks(elt)
                    for net in nets:
                        if ipaddress.ip_address(q) in net:
                            return net
                except ValueError:
                    pass

            if q is not None and len(q) > 0:
                tokens = _strings(elt)
                p = re.compile(r'\b{}'.format(q), re.IGNORECASE)
                for tstr in tokens:
                    if p.search(tstr):
                        return tstr
            return None

        log.debug("matching {} in {} entities".format(match, len(entities)))
        entities = list(filter(lambda e: _match(match, e) is not None, entities))
        log.debug("returning {} entities after match".format(len(entities)))

    ot = entitiesdescriptor(entities, name)
    if ot is None:
        raise PipeException("empty select - stop")

    if req.plumbing.id != name:
        log.debug("storing synthetic collection {}".format(name))
        req.store.update(ot, name)

    return ot


@pipe(name="filter")
def _filter(req: Plumbing.Request, *opts):
    """
    Refines the working document by applying a filter. The filter expression is a subset of the
    select semantics and syntax:

    .. code-block:: yaml

        - filter:
            - "!//md:EntityDescriptor[md:SPSSODescriptor]"
            - "https://idp.example.com/shibboleth"

    This would select all SPs and any entity with entityID "https://idp.example.com/shibboleth"
    from the current working document and return them as the new working document. Filter also supports
    the "as <alias>" construction from select, allowing new synthetic collections to be created
    from filtered documents.
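
    For instance, the following sketch would store the filtered SPs under a synthetic collection
    name (the alias '/my-sps' is illustrative):

    .. code-block:: yaml

        - filter as /my-sps:
            - "!//md:EntityDescriptor[md:SPSSODescriptor]"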
"""
    if req.t is None:
        raise PipeException("Unable to filter on an empty document - use select first")

    name = req.plumbing.id
    alias = False
    if len(opts) > 0:
        if opts[0] != 'as' and len(opts) == 1:
            name = opts[0]
            alias = True
        if opts[0] == 'as' and len(opts) == 2:
            name = opts[1]
            alias = True

    args = req.args
    if args is None or not args:
        args = []

    ot = entitiesdescriptor(args, name, lookup_fn=lambda member: find_in_document(req.t, member), copy=False)
    if alias:
        req.store.update(ot, name)

    req.t = None

    if ot is None:
        raise PipeException("empty filter - stop")

    # print "filter returns %s" % [e for e in iter_entities(ot)]
    return ot


@pipe
def pick(req: Plumbing.Request, *opts):
    """
    Select a set of EntityDescriptor elements as a working document but don't validate it.

    :param req: The request
    :param opts: Options (unused)
    :return: returns the result of the operation as a working document

    Useful for testing. See :py:mod:`pyff.pipes.builtins.select` for more information about selecting the document.
    """
    args = _select_args(req)
    ot = entitiesdescriptor(args, req.plumbing.id, lookup_fn=req.md.store.lookup, validate=False)
    if ot is None:
        raise PipeException("empty select '%s' - stop" % ",".join(args))
    return ot


@pipe
def first(req: Plumbing.Request, *opts):
    """
    If the working document is a single EntityDescriptor, strip the outer EntitiesDescriptor element and return it.

    :param req: The request
    :param opts: Options (unused)
    :return: returns the first entity descriptor if the working document only contains one

    When running an MDX pipeline, for instance, it is usually expected that a single EntityDescriptor is
    returned without the outer EntitiesDescriptor wrapper. This method does exactly that.
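
    A typical MDX-style use might look like this sketch (the entityID is illustrative):

    .. code-block:: yaml

        - select: "https://idp.example.com/shibboleth"
        - first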
"""
    if req.t is None:
        raise PipeException("Your pipeline is missing a select statement.")

    gone = object()  # sentinel
    entities = iter_entities(req.t)
    one = next(entities, gone)
    if one is gone:
        return req.t  # empty tree - return it as is

    two = next(entities, gone)
    if two is gone:  # one EntityDescriptor in tree - return just that one
        return one

    return req.t


@pipe(name='discojson')
def _discojson(req: Plumbing.Request, *opts):
    """
    Return a discojuice-compatible json representation of the tree