-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmmif.py
822 lines (707 loc) · 38.8 KB
/
mmif.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
"""
The :mod:`mmif` module contains the classes used to represent a full MMIF
file as a live Python object.
See the specification docs and the JSON Schema file for more information.
"""
import json
import math
import warnings
from collections import defaultdict
from datetime import datetime
from typing import List, Union, Optional, Dict, cast, Iterator
import jsonschema.validators
import mmif
from mmif import ThingTypesBase
from mmif.serialize.annotation import Annotation, Document
from mmif.serialize.model import MmifObject, DataList
from mmif.serialize.view import View
from mmif.vocabulary import AnnotationTypes, DocumentTypes
__all__ = ['Mmif']
class MmifMetadata(MmifObject):
    """
    Container for the top-level ``metadata`` object of a MMIF file.

    :param metadata_obj: the JSON data of the metadata, given as bytes,
                         a string, or an already-parsed dict
    """

    def __init__(self, metadata_obj: Optional[Union[bytes, str, dict]] = None, *_) -> None:
        # TODO (krim @ 10/7/20): there could be a better name and a better way to give a value to this
        # The spec URL is derived from the spec version exposed by the ``mmif`` package.
        self.mmif: str = f"http://mmif.clams.ai/{mmif.__specver__}"
        # Both attributes are set before the parent initializer receives the raw input.
        self._required_attributes = ["mmif"]
        super().__init__(metadata_obj)
class DocumentsList(DataList[Document]):
    """
    A :class:`mmif.serialize.model.DataList` specialized for
    :class:`mmif.serialize.document.Document` items, keyed by document ID.
    """
    _items: Dict[str, Document]

    def _deserialize(self, input_list: list) -> None:  # pytype: disable=signature-mismatch
        """
        Populates ``items`` as a dict that maps each document ID to the
        :class:`mmif.serialize.document.Document` built from its JSON data.

        :param input_list: the JSON data that defines the list of documents
        :return: None
        """
        documents_by_id = {}
        for raw_document in input_list:
            # read the key first, so a malformed entry fails before a Document is built
            document_id = raw_document['properties']['id']
            documents_by_id[document_id] = Document(raw_document)
        self._items = documents_by_id

    def append(self, value: Document, overwrite=False) -> None:
        """
        Adds a document to the list, keyed by its ID.

        :param value: the :class:`mmif.serialize.document.Document`
                      object to add
        :param overwrite: when True, an existing document with the
                          same ID is replaced
        :raises KeyError: if ``overwrite`` is False and a document
                          with the same ID is already in the list
        :return: None
        """
        super()._append_with_key(value.id, value, overwrite)
class ViewsList(DataList[View]):
    """
    A :class:`mmif.serialize.model.DataList` specialized for
    :class:`mmif.serialize.view.View` items, keyed by view ID.
    """
    _items: Dict[str, View]

    def __init__(self, mmif_obj: Optional[Union[bytes, str, list]] = None, parent_mmif=None, *_):
        # the parent reference must be in place before the parent initializer
        # runs, because it is handed to every View built in ``_deserialize``
        self._parent_mmif = parent_mmif
        self.reserved_names.update(("_parent_mmif", "_id_counts"))
        super().__init__(mmif_obj)

    def _deserialize(self, input_list: list) -> None:  # pytype: disable=signature-mismatch
        """
        Populates ``items`` as a dict that maps each view ID to the
        :class:`mmif.serialize.view.View` built from its JSON data.
        An empty (or falsy) input leaves the current items untouched.

        :param input_list: the JSON data that defines the list of views
        :return: None
        """
        if not input_list:
            return
        views_by_id = {}
        for raw_view in input_list:
            view_id = raw_view['id']
            views_by_id[view_id] = View(raw_view, self._parent_mmif)
        self._items = views_by_id

    def append(self, value: View, overwrite=False) -> None:
        """
        Adds a view to the list, keyed by its ID.

        :param value: the :class:`mmif.serialize.view.View`
                      object to add
        :param overwrite: when True, an existing view with the
                          same ID is replaced
        :raises KeyError: if ``overwrite`` is False and a view
                          with the same ID is already in the list
        :return: None
        """
        super()._append_with_key(value.id, value, overwrite)

    def get_last_contentful_view(self) -> Optional[View]:
        """
        Returns the most recently added view whose metadata has neither
        an ``error`` nor a ``warnings`` entry.

        :return: the view, or None when every view carries an error or warnings
        """
        for view in reversed(self._items.values()):
            if 'error' in view.metadata or 'warnings' in view.metadata:
                continue
            return view
        return None
class Mmif(MmifObject):
"""
MmifObject that represents a full MMIF file.
:param mmif_obj: the JSON data
:param validate: whether to validate the data against the MMIF JSON schema.
"""
def __init__(self, mmif_obj: Optional[Union[bytes, str, dict]] = None, *, validate: bool = True) -> None:
    """
    Builds a Mmif object from raw MMIF data.

    :param mmif_obj: the JSON data, given as bytes, a string, or a dict
    :param validate: whether to validate the data against the MMIF JSON schema
                     before loading it
    """
    # Default (empty) containers; they exist before the parent initializer
    # is called with the raw input at the end of this method.
    self.metadata: MmifMetadata = MmifMetadata()
    self.documents: DocumentsList = DocumentsList()
    self.views: ViewsList = ViewsList()
    # NOTE(review): the input is validated as given, even when it is None —
    # confirm that callers creating an empty Mmif pass ``validate=False``.
    if validate:
        self.validate(mmif_obj)
    self.disallow_additional_properties()
    # classes used for the three top-level MMIF fields
    self._attribute_classes = {
        'metadata': MmifMetadata,
        'documents': DocumentsList,
        'views': ViewsList
    }
    self._required_attributes = ["metadata", "documents", "views"]
    super().__init__(mmif_obj)
@staticmethod
def validate(json_str: Union[bytes, str, dict]) -> None:
    """
    Checks a MMIF JSON object against the MMIF JSON Schema.

    This runs on the raw input, before MmifObject._load_str processes it,
    so the JSON-LD keys are expected with ``@`` and not ``_``.

    :param json_str: a MMIF JSON given as bytes, a string, or a dict
    :raises jsonschema.exceptions.ValidationError: if the input fails validation
    :return: None
    """
    # NOTE that schema file first needs to be copied to resources directory
    # this is automatically done via setup.py, so for users this shouldn't be a matter
    payload = json_str
    if isinstance(payload, bytes):
        payload = payload.decode('utf8')
    schema = json.loads(mmif.get_mmif_json_schema())
    if isinstance(payload, str):
        payload = json.loads(payload)
    jsonschema.validators.validate(payload, schema)
def serialize(self, pretty: bool = False, sanitize: bool = False, autogenerate_capital_annotations=True) -> str:
    """
    Produces the JSON string of this MMIF object.

    :param pretty: If True, the returned string is indented.
    :param sanitize: If True, runs :meth:`sanitize` before the JSON string
                     is produced. See :meth:`sanitize` for details.
    :param autogenerate_capital_annotations: If True, "pending" temporary
        properties of `Document` objects are first converted to `Annotation`
        objects. See :meth:`generate_capital_annotations` for details.
    :return: JSON string of the MMIF object.
    """
    if autogenerate_capital_annotations:
        self.generate_capital_annotations()
    # sanitization has to come after the `Annotation` annotations are generated
    if sanitize:
        self.sanitize()
    serialized = super().serialize(pretty)
    return serialized
def _deserialize(self, input_dict: dict) -> None:
    """
    Deserializes the MMIF JSON string into a Mmif object.
    After *regular* deserialization, this method will perform the following
    *special* handling of Annotation.properties that allows apps to access
    Annotation/Document properties that are not encoded in the objects
    themselves. This is to allow apps to access in a more intuitive way,
    without having too much hassle to iterate views and manually collect the properties.

    1. This will read in existing *view*-scoped properties from *contains*
       metadata and attach them to the corresponding ``Annotation`` objects.
    2. This will read in existing ``Annotation`` typed annotations and
       attach the document-level properties to the ``Document`` objects,
       using an ephemeral property dict.
    3. For annotations that use the ``targets`` property, this will cache
       ephemeral ``start`` and ``end`` values computed from the targets.
    4. This will cache alignments for ``Alignment`` typed annotations.

    :param input_dict: the MMIF JSON data as a dict
    :raises ValueError: if an annotation has ``targets`` together with
                        ``start`` or ``end``
    """
    super()._deserialize(input_dict)
    for view in self.views:
        # make every view aware of the Mmif object it belongs to
        view._parent_mmif = self
        # this dict will be populated with properties
        # that are not encoded in individual annotations objects themselves
        extrinsic_props = defaultdict(dict)
        for at_type, type_lv_props in view.metadata.contains.items():
            for prop_key, prop_value in type_lv_props.items():
                extrinsic_props[at_type][prop_key] = prop_value
        for ann in view.get_annotations():
            ## for "capital" Annotation properties
            # first add all extrinsic properties to the Annotation objects
            # as "ephemeral" properties
            for prop_key, prop_value in extrinsic_props[ann.at_type].items():
                ann._props_ephemeral[prop_key] = prop_value
            # then, do the same to associated Document objects. Note that,
            # in a view, it is guaranteed that all Annotation objects are not duplicates
            if ann.at_type == AnnotationTypes.Annotation:
                doc_id = ann.get_property('document')
                try:
                    for prop_key, prop_value in ann.properties.items():
                        self.get_document_by_id(doc_id)._props_ephemeral[prop_key] = prop_value
                except KeyError:
                    # an unknown document is reported but does not abort deserialization
                    warnings.warn(f"Annotation {ann.id} (in view {view.id}) has a document ID {doc_id} that "
                                  f"does not exist in the MMIF object. Skipping.", RuntimeWarning)

            ## caching start and end points for time-based annotations
            # add quick access to `start` and `end` values if the annotation is using `targets` property
            if 'targets' in ann.properties:
                if 'start' in ann.properties or 'end' in ann.properties:
                    raise ValueError(f"Annotation {ann.id} (in view {view.id}) has `targets` and `start`/`end/` "
                                     f"properties at the same time. Annotation anchors are ambiguous.")
                ann._props_ephemeral['start'] = self._get_linear_anchor_point(ann, start=True)
                ann._props_ephemeral['end'] = self._get_linear_anchor_point(ann, start=False)

            ## caching alignments
            if ann.at_type == AnnotationTypes.Alignment:
                self._cache_alignment(ann)
def _cache_alignment(self, alignment_ann: Annotation):
    """
    Registers an alignment on both annotations it connects.

    When the alignment has both ``source`` and ``target`` properties and both
    resolve to Annotation objects, each end caches the alignment together
    with the annotation on the other end. Otherwise a warning is issued.

    :param alignment_ann: the ``Alignment`` annotation to cache
    """
    parent_view = self.views.get_item(alignment_ann.parent)
    if parent_view is None:
        warnings.warn(f"Alignment {alignment_ann.long_id} doesn't have a parent view, but it should.", RuntimeWarning)
        return

    def _resolve_endpoint(endpoint_id):
        # try the ID as local to the alignment's own view first,
        # then fall back to looking it up exactly as given
        prefixed_id = f"{parent_view.id}{self.id_delimiter}{endpoint_id}"
        try:
            return self.__getitem__(prefixed_id)
        except KeyError:
            return self.__getitem__(endpoint_id)

    props = alignment_ann.properties
    if 'source' not in props or 'target' not in props:
        return

    source_ann = _resolve_endpoint(alignment_ann.get_property('source'))
    target_ann = _resolve_endpoint(alignment_ann.get_property('target'))
    if not (isinstance(source_ann, Annotation) and isinstance(target_ann, Annotation)):
        warnings.warn(
            f"Alignment {alignment_ann.long_id} has `source` and `target` properties that do not point to Annotation objects.",
            RuntimeWarning)
        return
    source_ann._cache_alignment(alignment_ann, target_ann)
    target_ann._cache_alignment(alignment_ann, source_ann)
def generate_capital_annotations(self):
    """
    Automatically convert any "pending" temporary properties from
    `Document` objects to `Annotation` objects. The generated `Annotation`
    objects are then added to a view that already annotates the document,
    falling back to the last contentful `View` in the views list.

    Nothing is done when there is no contentful view.

    See https://github.com/clamsproject/mmif-python/issues/226 for rationale
    behind this behavior and discussion.
    """
    # this view will be the default kitchen sink for all generated annotations.
    last_view = self.views.get_last_contentful_view()

    # proceed only when there's at least one view
    if last_view:

        # this app name is used to check a view is generated by the "currently running" app.
        # knowing the currently running app is important so that properties of `Document` objects generated by the
        # current app can be properly recorded inside the `Document` objects (since they are "writable" to the
        # current app), instead of being recorded in a separate `Annotation` object.
        current_app = last_view.metadata.app

        # to avoid duplicate property recording, this will be populated with
        # existing Annotation objects from all existing views
        existing_anns = defaultdict(lambda: defaultdict(dict))
        # ideally, if we can "de-duplicate" props at `add_property()` time, that'd be more efficient,
        # but that is impossible without looking for the target `document` across other views and top documents list

        # new properties to record in the current serialization call
        anns_to_write = defaultdict(dict)
        for view in self.views:
            doc_id = None
            if AnnotationTypes.Annotation in view.metadata.contains:
                if 'document' in view.metadata.contains[AnnotationTypes.Annotation]:
                    doc_id = view.metadata.contains[AnnotationTypes.Annotation]['document']
                for ann in view.get_annotations(AnnotationTypes.Annotation):
                    # NOTE(review): `doc_id` is only read from an annotation while it is
                    # still None, so once set it is reused for every later annotation
                    # in this view — confirm this is intended when a view has
                    # annotations on several documents.
                    if doc_id is None:
                        doc_id = ann.get_property('document')
                    # only if we are sure that the document ID is unique across all views... (with v_id prefix)
                    # TODO (krim @ 7/15/24): update id checking once https://github.com/clamsproject/mmif/issues/228 is resolved
                    if not any([doc_id == doc.id for doc in self.documents]) and Mmif.id_delimiter not in doc_id:
                        doc_id = f"{view.id}{Mmif.id_delimiter}{doc_id}"
                    existing_anns[doc_id].update(ann.properties)
            for doc in view.get_documents():
                anns_to_write[doc.long_id].update(doc._props_pending)
        for doc in self.documents:
            anns_to_write[doc.long_id].update(doc._props_pending)
        # additional iteration of views, to find a proper view to add the
        # generated annotations. If none found, use the last view as the kitchen sink
        last_view_for_docs = defaultdict(lambda: last_view)
        doc_ids = set(anns_to_write.keys())
        for doc_id in doc_ids:
            if len(last_view.annotations) == 0:
                # meaning, this new app didn't generate any annotation except for these document properties
                # thus, we should add capital annotations to the last (empty) view
                last_view_for_docs[doc_id] = last_view
                # remaining documents fall back to `last_view` through the defaultdict
                break
            for view in reversed(self.views):
                # first try to find out if this view "contains" any annotation to the doc
                # then, check for individual annotations
                # TODO (krim @ 7/15/24): update id checking once https://github.com/clamsproject/mmif/issues/228 is resolved
                if [cont for cont in view.metadata.contains.values() if doc_id.endswith(cont.get('document', 'TODO:this endswith test is a temporal solution we use until long_id is forced everywhere'))] \
                        or list(view.get_annotations(document=doc_id)):
                    last_view_for_docs[doc_id] = view
                    break
        for doc_id, found_props in anns_to_write.items():
            # ignore the "empty" id property from temporary dict
            # `id` is "required" attribute for `AnnotationProperty` class
            # thus will always be present in `props` dict as a key with emtpy value
            # also ignore duplicate k-v pairs
            props = {}
            for k, v in found_props.items():
                if k != 'id' and existing_anns[doc_id][k] != v:
                    props[k] = v
            if props:
                view_to_write = last_view_for_docs[doc_id]
                # a document created by the current app in this very view takes the
                # properties directly; otherwise they go to a new `Annotation` object
                if view_to_write.metadata.app == current_app and view_to_write.annotations.get_item(doc_id) is not None:
                    view_to_write.get_document_by_id(doc_id).properties.update(props)
                else:
                    if len(anns_to_write) == 1:
                        # if there's only one document, we can record the doc_id in the contains metadata
                        view_to_write.metadata.new_contain(AnnotationTypes.Annotation, document=doc_id)
                        props.pop('document', None)
                    else:
                        # otherwise, doc_id needs to be recorded in the annotation property
                        props['document'] = doc_id
                    view_to_write.new_annotation(AnnotationTypes.Annotation, **props)
def sanitize(self):
    """
    Runs safeguards on this Mmif object, in this order:

    #. remove annotation types from each view's ``contains`` metadata
       when the view has no annotation of that type
    #. serialize the object and validate the output using the built-in
       MMIF jsonschema
    """
    for view in self.views:
        used_at_types = {annotation.at_type for annotation in view.annotations}
        contains = view.metadata.contains
        # collect first, pop afterwards: the mapping must not change while it is iterated
        stale_at_types = {declared for declared in contains.keys() if declared not in used_at_types}
        for stale in stale_at_types:
            contains.pop(stale)
    self.validate(self.serialize())
def new_view_id(self) -> str:
    """
    Generates an ID that no existing view uses.

    Candidates are the view prefix followed by a number, starting from
    the current number of views and counting up until an unused ID is found.

    :return: the ID
    """
    number = len(self.views)
    while True:
        candidate = self.view_prefix + str(number)
        if candidate not in self.views:
            return candidate
        number += 1
def new_view(self) -> View:
    """
    Creates an empty view, gives it a fresh ID and the current time as its
    timestamp, and adds it to the views list.

    :return: a reference to the new View object
    """
    created = View()
    created.id = self.new_view_id()
    created.metadata.timestamp = datetime.now()
    self.add_view(created)
    return created
def add_view(self, view: View, overwrite=False) -> None:
    """
    Adds a View object to the views list and makes this MMIF object its parent.

    Fails if there is already a view with the same ID in the MMIF object,
    unless ``overwrite`` is set to True.

    :param view: the View object to add
    :param overwrite: if set to True, will overwrite
                      an existing view with the same ID
    :return: None
    """
    # the parent reference is set before the view is handed to the list
    view._parent_mmif = self
    self.views.append(view, overwrite)
def add_document(self, document: Document, overwrite=False) -> None:
    """
    Adds a Document object to the top-level documents list.

    Fails if there is already a document with the same ID in the MMIF object,
    unless ``overwrite`` is set to True.

    :param document: the Document object to add
    :param overwrite: if set to True, will overwrite
                      an existing document with the same ID
    :return: None
    """
    self.documents.append(document, overwrite)
def get_documents_in_view(self, vid: Optional[str] = None) -> List[Document]:
    """
    Gets all document objects that live in the view with the given ID.

    :param vid: the ID of the view to look into
    :return: the documents of that view, or an empty list if the view is not found
    """
    view = self.views.get_item(vid)
    if view is None:
        return []
    return view.get_documents()
def get_documents_by_type(self, doc_type: Union[str, DocumentTypes]) -> List[Document]:
    """
    Gets all documents of a particular document type, which should be one of the CLAMS document types.

    Documents found in views come first, followed by top-level documents.

    :param doc_type: the type of documents to search for, must be one of ``Document`` type defined in the CLAMS vocabulary.
    :return: a list of documents matching the requested type, or an empty list if none found.
    """
    matched = []
    # although only `TextDocument`s are allowed in view:annotations list, this implementation is more future-proof
    for view in self.views:
        for candidate in view.get_documents():
            if candidate.is_type(doc_type):
                matched.append(candidate)
    for candidate in self.documents:
        if candidate.is_type(doc_type):
            matched.append(candidate)
    return matched
def get_documents_by_app(self, app_id: str) -> List[Document]:
    """
    Gets all documents found in views that were produced by the given app.

    :param app_id: the app name to search for
    :return: a list of documents from views whose metadata names that app,
             or an empty list if no such view exists
    """
    return [
        document
        for view in self.views
        if view.metadata.app == app_id
        for document in view.get_documents()
    ]
def get_documents_by_property(self, prop_key: str, prop_value: str) -> List[Document]:
    """
    Retrieves documents by an arbitrary key-value pair in the document properties objects.

    Documents found in views come first, followed by top-level documents.
    A document that does not have the property at all simply does not match.

    :param prop_key: the metadata key to search for
    :param prop_value: the metadata value to match
    :return: a list of documents matching the requested metadata key-value pair
    """
    docs = []
    for view in self.views:
        for doc in view.get_documents():
            if prop_key in doc and doc.get_property(prop_key) == prop_value:
                docs.append(doc)
    for document in self.documents:
        # Top-level documents get the same protection as documents in views:
        # a missing property must not abort the search. Presumably
        # ``get_property`` raises KeyError for an unknown key — verify.
        try:
            if document.get_property(prop_key) == prop_value:
                docs.append(document)
        except KeyError:
            continue
    return docs
def get_documents_locations(self, m_type: Union[DocumentTypes, str], path_only=False) -> List[Union[str, None]]:
    """
    Returns the locations of the documents of the given type.
    Only top-level documents have locations, so we only check them.
    Documents whose location is None are left out.

    :param m_type: the type to search for
    :param path_only: if True, each value is what the document's
                      ``location_path()`` returns instead of its ``location``
    :return: a list of the values of the location fields in the corresponding documents
    """
    located = []
    for document in self.documents:
        if document.is_type(m_type) and document.location is not None:
            located.append(document)
    if path_only:
        return [document.location_path() for document in located]
    return [document.location for document in located]
def get_document_location(self, m_type: Union[DocumentTypes, str], path_only=False) -> Optional[str]:
    """
    Gets the location of the *first* document of the given type.

    :param m_type: the type to search for
    :param path_only: passed on to :meth:`get_documents_locations`
    :return: the first location found, or None if there is none
    """
    # TODO (krim @ 8/10/20): Is returning the first location desirable?
    locations = self.get_documents_locations(m_type, path_only=path_only)
    if len(locations) > 0:
        return locations[0]
    return None
def get_document_by_id(self, doc_id: str) -> Document:
    """
    Finds a Document object with the given ID.

    An ID that contains the ID delimiter is read as ``<view ID><delimiter><document ID>``
    and looked up inside that view; any other ID is looked up in the
    top-level documents list.

    :param doc_id: the ID to search for
    :return: a reference to the corresponding document, if it exists
    :raises KeyError: if there is no corresponding document
    """
    if self.id_delimiter in doc_id:
        # Split on the first delimiter only: an ID with more than one delimiter
        # must end in a lookup failure, not in a ValueError from tuple unpacking.
        vid, did = doc_id.split(self.id_delimiter, 1)
        view = self[vid]
        if isinstance(view, View):
            return view.get_document_by_id(did)
        raise KeyError("{} view not found".format(vid))
    doc_found = self.documents.get_item(doc_id)
    if doc_found is None:
        raise KeyError("{} document not found".format(doc_id))
    return cast(Document, doc_found)
def get_view_by_id(self, req_view_id: str) -> View:
    """
    Finds a View object with the given ID.

    :param req_view_id: the ID to search for
    :return: a reference to the corresponding view, if it exists
    :raises KeyError: if there is no corresponding view
    """
    view_found = self.views.get_item(req_view_id)
    if view_found is not None:
        return view_found
    raise KeyError("{} view not found".format(req_view_id))
def get_alignments(self, at_type1: Union[str, ThingTypesBase], at_type2: Union[str, ThingTypesBase]) -> Dict[str, List[Annotation]]:
    """
    Finds views where alignments between two given annotation types occurred.

    When a view declares ``sourceType`` and ``targetType`` in its ``contains``
    metadata for ``Alignment``, those decide for the whole view. Otherwise each
    alignment is checked through the types of the annotations it points to.

    :param at_type1: one of the two aligned annotation types
    :param at_type2: the other aligned annotation type, must differ from ``at_type1``
    :return: a dict that keyed by view IDs (str) and has lists of alignment Annotation objects as values.
    :raises AssertionError: if the two given types are the same
    """
    v_and_a = {}
    at_type1, at_type2 = [ThingTypesBase.from_str(x) if isinstance(x, str) else x for x in (at_type1, at_type2)]
    assert at_type1 != at_type2, f"Alignment must be between two different types, given only one: {at_type1}"
    for alignment_view in self.get_all_views_contain(AnnotationTypes.Alignment):
        alignments = []
        contains_meta = alignment_view.metadata.contains[AnnotationTypes.Alignment]
        if 'sourceType' in contains_meta and 'targetType' in contains_meta:
            aligned_types = [ThingTypesBase.from_str(x)
                             for x in {contains_meta['sourceType'], contains_meta['targetType']}]
            if len(aligned_types) == 2 and at_type1 in aligned_types and at_type2 in aligned_types:
                # Only `Alignment` annotations belong in the result; a view that holds
                # alignments usually holds other annotation types as well.
                alignments.extend(alignment_view.get_annotations(AnnotationTypes.Alignment))
        else:
            for alignment in alignment_view.get_annotations(AnnotationTypes.Alignment):
                aligned_types = set()
                for ann_id in [alignment.get_property('target'), alignment.get_property('source')]:
                    ann_id = cast(str, ann_id)
                    if self.id_delimiter in ann_id:
                        view_id, ann_id = ann_id.split(self.id_delimiter)
                        aligned_type = cast(Annotation, self[view_id][ann_id]).at_type
                    else:
                        aligned_type = cast(Annotation, alignment_view[ann_id]).at_type
                    aligned_types.add(aligned_type)
                aligned_types = list(aligned_types)  # because membership check for sets also checks hash() values
                if len(aligned_types) == 2 and at_type1 in aligned_types and at_type2 in aligned_types:
                    alignments.append(alignment)
        if len(alignments) > 0:
            v_and_a[alignment_view.id] = alignments
    return v_and_a
def get_views_for_document(self, doc_id: str) -> List[View]:
    """
    Returns the list of all views that have annotations anchored on a particular document.
    Note that when the document is inside a view (generated during the pipeline's running),
    doc_id must be prefixed with the view_id.

    Each view is first searched with the full ``doc_id``. If that finds nothing
    and ``doc_id`` carries a view ID prefix naming this very view, the view is
    searched again with the prefix trimmed off.

    :param doc_id: the ID of the document to search for
    :return: the views that have at least one annotation on the document
    """
    views = []
    # `partition` splits on the first delimiter only and never fails to unpack,
    # so an ID with several delimiters yields no match instead of a ValueError.
    vid, delimiter_found, did = doc_id.partition(self.id_delimiter)
    nothing = object()
    for view in self.views:
        if next(view.get_annotations(document=doc_id), nothing) is not nothing:
            views.append(view)
        elif delimiter_found and view.id == vid:
            # search failed by the full doc_id string,
            # now try trimming the view_id from the string and re-do the search
            if next(view.get_annotations(document=did), nothing) is not nothing:
                views.append(view)
    return views
def get_all_views_with_error(self) -> List[View]:
    """
    Returns the list of all views in the MMIF that have errors.

    :return: the views for which ``has_error()`` is true, in the order of the views list
    """
    erroneous = []
    for view in self.views:
        if view.has_error():
            erroneous.append(view)
    return erroneous

get_views_with_error = get_all_views_with_error
def get_all_views_contain(self, *at_types: Union[ThingTypesBase, str]) -> List[View]:
    """
    Returns the list of all views in the MMIF whose 'contains' metadata
    has every one of the given types.

    :param at_types: a list of types or just a type to check for. When given more than one types, all types must be found.
    :return: the list of views that contain the type
    """
    selected = []
    for view in self.views:
        if all(required in view.metadata.contains for required in at_types):
            selected.append(view)
    return selected

get_views_contain = get_all_views_contain
def get_view_with_error(self) -> Optional[View]:
    """
    Returns the last view appended that contains an error.

    :return: the view, or None if no error is found
    """
    views_latest_first = reversed(self.views)
    return next((view for view in views_latest_first if view.has_error()), None)
def get_last_error(self) -> Optional[str]:
    """
    Returns the error message of the last view that has an error.

    :return: the error message in human-readable format, or None if no error is found
    """
    erroneous_view = self.get_view_with_error()
    if erroneous_view is None:
        return None
    return erroneous_view.get_error()
def get_view_contains(self, at_types: Union[ThingTypesBase, str, List[Union[str, ThingTypesBase]]]) -> Optional[View]:
    """
    Returns the last view appended whose 'contains' metadata has the given types.

    :param at_types: a list of types or just a type to check for. When given more than one types, all types must be found.
    :return: the view, or None if the type is not found
    """
    # a single type is handled as a one-element list of required types
    required_types = at_types if isinstance(at_types, list) else [at_types]
    # will return the *latest* view
    # works as of python 3.6+ (checked by setup.py) because dicts are deterministically ordered by insertion order
    for view in reversed(self.views):
        if all(required in view.metadata.contains for required in required_types):
            return view
    return None
def _is_in_time_range(self, ann: Annotation, range_s: Union[int, float], range_e: Union[int, float]) -> bool:
"""
Checks if the annotation is anchored within the given time range. Any overlap is considered included.
:param ann: the Annotation object to check, must be time-based itself or anchored to time-based annotations
:param range_s: the start time point of the range (in milliseconds)
:param range_e: the end time point of the range (in milliseconds)
:return: True if the annotation is anchored within the time range, False otherwise
"""
ann_s, ann_e = self.get_start(ann), self.get_end(ann)
return (ann_s < range_s < ann_e) or (ann_s < range_e < ann_e) or (ann_s > range_s and ann_e < range_e)
def get_annotations_between_time(self, start: Union[int, float], end: Union[int, float], time_unit: str = "ms",
                                 at_types: Optional[List[Union[ThingTypesBase, str]]] = None) -> Iterator[Annotation]:
    """
    Finds annotations that are anchored between the given time points.

    Time-based anchors (``TimeFrame`` and ``TimePoint`` annotations) in range are
    visited in the order of their start points; each one is followed by the
    annotations aligned to it.

    :param start: the start time point in the unit of `time_unit`
    :param end: the end time point in the unit of `time_unit`
    :param time_unit: the unit of the input time points. Default is `ms`.
    :param at_types: a list of annotation types to filter with. Any type in this list will be included in the return.
                     When empty or not given, no filtering is done.
    :return: an iterator of Annotation objects that are anchored between the given time points
    :raises KeyError: if a view with time-based annotations does not declare a ``timeUnit``
    """
    assert start < end, f"Start time point must be smaller than the end time point, given {start} and {end}"
    assert start >= 0, f"Start time point must be non-negative, given {start}"
    assert end >= 0, f"End time point must be non-negative, given {end}"

    from mmif.utils.timeunit_helper import convert

    wanted_types = set(at_types) if at_types else set()
    time_types = (AnnotationTypes.TimeFrame, AnnotationTypes.TimePoint)

    # A view that contains both time-based types shows up in both lookups;
    # keep the first occurrence only, so its annotations are not yielded twice.
    time_views = []
    seen_views = set()
    for view in self.get_all_views_contain(AnnotationTypes.TimeFrame) + self.get_all_views_contain(AnnotationTypes.TimePoint):
        if id(view) not in seen_views:
            seen_views.add(id(view))
            time_views.append(view)

    time_anchors_in_range = []
    for view in time_views:
        # The time unit is read from the `TimeFrame` metadata when available, and
        # from the `TimePoint` metadata otherwise (views with time points only).
        time_unit_in_view = None
        for time_type in time_types:
            type_metadata = view.metadata.contains.get(time_type)
            if type_metadata is not None and "timeUnit" in type_metadata:
                time_unit_in_view = type_metadata["timeUnit"]
                break
        if time_unit_in_view is None:
            raise KeyError(f"View {view.id} does not declare `timeUnit` for its time-based annotations")

        start_time = convert(start, time_unit, time_unit_in_view, 1)
        end_time = convert(end, time_unit, time_unit_in_view, 1)
        for ann in view.get_annotations():
            if ann.at_type in time_types and self._is_in_time_range(ann, start_time, end_time):
                time_anchors_in_range.append(ann)
    time_anchors_in_range.sort(key=lambda x: self.get_start(x))
    for time_anchor in time_anchors_in_range:
        if not wanted_types or time_anchor.at_type in wanted_types:
            yield time_anchor
        for aligned in time_anchor.get_all_aligned():
            if not wanted_types or aligned.at_type in wanted_types:
                yield aligned
def _get_linear_anchor_point(self, ann: Annotation, targets_sorted=False, start: bool = True) -> Union[int, float]:
# TODO (krim @ 2/5/24): Update the return type once timeunits are unified to `ms` as integers (https://github.com/clamsproject/mmif/issues/192)
"""
Retrieves the anchor point of the annotation. Currently, this method only supports linear anchors,
namely time and text, hence does not work with spatial anchors (polygons or video-object).
:param ann: An Annotation object that has a linear anchor point. Namely, some subtypes of `Region` vocabulary type.
:param start: If True, returns the start anchor point. Otherwise, returns the end anchor point. N/A for `timePoint` anchors.
:param targets_sorted: If True, the method will assume that the targets are sorted in the order of the anchor points.
:return: the anchor point of the annotation. 1d for linear regions (time, text)
"""
props = ann.properties
if 'timePoint' in props:
return ann.get_property('timePoint')
elif 'targets' in props:
def get_target_ann(cur_ann, target_id):
if self.id_delimiter not in target_id:
target_id = self.id_delimiter.join((cur_ann.parent, target_id))
return self.__getitem__(target_id)
if not targets_sorted:
point = math.inf if start else -1
comp = min if start else max
for target_id in ann.get_property('targets'):
target = get_target_ann(ann, target_id)
point = comp(point, self._get_linear_anchor_point(target, start=start))
return point
target_id = ann.get_property('targets')[0 if start else -1]
target = get_target_ann(ann, target_id)
return self._get_linear_anchor_point(target, start=start)
elif (start and 'start' in props) or (not start and 'end' in props):
return ann.get_property('start' if start else 'end')
else:
raise ValueError(f"{ann.id} ({ann.at_type}) does not have a valid anchor point. Is it a valid 'Region' type?")
def get_start(self, annotation: Annotation) -> Union[int, float]:
    """
    Returns the start anchor point of the annotation.
    A shortcut for the `_get_linear_anchor_point` method with `start=True`.

    :param annotation: the annotation to get the start point of
    :return: the start anchor point
    """
    return self._get_linear_anchor_point(annotation, start=True)
def get_end(self, annotation: Annotation) -> Union[int, float]:
    """
    Returns the end anchor point of the annotation.
    A shortcut for the `_get_linear_anchor_point` method with `start=False`.

    :param annotation: the annotation to get the end point of
    :return: the end anchor point
    """
    return self._get_linear_anchor_point(annotation, start=False)
def __getitem__(self, item: str) \
-> Union[Document, View, Annotation, MmifMetadata, DocumentsList, ViewsList]:
"""
getitem implementation for Mmif. This will try to find any object, given an identifier or an immediate
attribute name. When nothing is found, this will raise an error rather than returning a None
:raises KeyError: if the item is not found or if the search results are ambiguous
:param item: an attribute name or an object identifier (a document ID, a view ID, or an annotation ID). When
annotation ID is given as a "short" ID (without view ID prefix), the method will try to find a
match from the first view, and return immediately if found.
:return: the object searched for
:raise KeyError: if the item is not found or multiple objects are found with the same ID
"""
if item in self._named_attributes():
return self.__dict__[item]
split_attempt = item.split(self.id_delimiter)
found = []
if len(split_attempt) == 1:
found.append(self.documents.get_item(split_attempt[0]))
found.append(self.views.get_item(split_attempt[0]))
for view in self.views:
found.append(view.annotations.get_item(split_attempt[0]))
elif len(split_attempt) == 2:
v = self.get_view_by_id(split_attempt[0])
if v is not None:
found.append(v.annotations.get_item(split_attempt[1]))
else:
raise KeyError("Tried to subscript into a view that doesn't exist")
found = [x for x in found if x is not None]
if len(found) > 1:
raise KeyError("Ambiguous ID search result")
elif len(found) == 0:
raise KeyError("ID not found: %s" % item)
else:
return found[-1]
def get(self, key: str, default=None) -> Optional[Union[Document,
                                                         View,
                                                         Annotation,
                                                         MmifMetadata,
                                                         DocumentsList,
                                                         ViewsList]]:
    """
    Looks up ``key`` like the subscript operator does, but falls back to
    ``default`` instead of raising when the key is not found.

    :param key: the key to search for
    :param default: the default value to return if the key is not found
    :return: the value of the key, or the default value if the key is not found
    """
    try:
        value = self.__getitem__(key)
    except KeyError:
        return default
    return value