10
10
import warnings
11
11
from collections import defaultdict
12
12
from datetime import datetime
13
- from typing import List , Union , Optional , Dict , cast , Iterator , Tuple
13
+ from typing import List , Union , Optional , Dict , cast , Iterator
14
14
15
15
import jsonschema .validators
16
16
@@ -235,14 +235,39 @@ def _deserialize(self, input_dict: dict) -> None:
235
235
# add quick access to `start` and `end` values if the annotation is using `targets` property
236
236
if 'targets' in ann .properties :
237
237
if 'start' in ann .properties or 'end' in ann .properties :
238
- raise ValueError (f"Annotation { ann .id } (in view { view .id } ) has `targes ` and `start`/`end/` "
238
+ raise ValueError (f"Annotation { ann .id } (in view { view .id } ) has `targets ` and `start`/`end/` "
239
239
f"properties at the same time. Annotation anchors are ambiguous." )
240
240
ann ._props_ephemeral ['start' ] = self ._get_linear_anchor_point (ann , start = True )
241
241
ann ._props_ephemeral ['end' ] = self ._get_linear_anchor_point (ann , start = False )
242
242
243
243
## caching alignments
244
244
if ann .at_type == AnnotationTypes .Alignment :
245
- view ._cache_alignment (ann )
245
+ self ._cache_alignment (ann )
246
+
247
+ def _cache_alignment (self , alignment_ann : Annotation ):
248
+ view = self .views .get (alignment_ann .parent )
249
+ if view is None :
250
+ warnings .warn (f"Alignment { alignment_ann .long_id } doesn't have a parent view, but it should." , RuntimeWarning )
251
+ return
252
+
253
+ ## caching alignments
254
+ def _desprately_search_annotation_object (ann_short_id ):
255
+ ann_long_id = f"{ view .id } { self .id_delimiter } { ann_short_id } "
256
+ try :
257
+ return self .__getitem__ (ann_long_id )
258
+ except KeyError :
259
+ return self .__getitem__ (ann_short_id )
260
+
261
+ if all (map (lambda x : x in alignment_ann .properties , ('source' , 'target' ))):
262
+ source_ann = _desprately_search_annotation_object (alignment_ann .get ('source' ))
263
+ target_ann = _desprately_search_annotation_object (alignment_ann .get ('target' ))
264
+ if isinstance (source_ann , Annotation ) and isinstance (target_ann , Annotation ):
265
+ source_ann ._cache_alignment (alignment_ann , target_ann )
266
+ target_ann ._cache_alignment (alignment_ann , source_ann )
267
+ else :
268
+ warnings .warn (
269
+ f"Alignment { alignment_ann .long_id } has `source` and `target` properties that do not point to Annotation objects." ,
270
+ RuntimeWarning )
246
271
247
272
def generate_capital_annotations (self ):
248
273
"""
@@ -566,19 +591,16 @@ def get_all_views_with_error(self) -> List[View]:
566
591
567
592
get_views_with_error = get_all_views_with_error
568
593
569
- def get_all_views_contain (self , at_types : Union [ThingTypesBase , str , List [ Union [ str , ThingTypesBase ]] ]) -> List [View ]:
594
+ def get_all_views_contain (self , * at_types : Union [ThingTypesBase , str ]) -> List [View ]:
570
595
"""
571
596
Returns the list of all views in the MMIF if given types
572
597
are present in that view's 'contains' metadata.
573
598
574
599
:param at_types: a list of types or just a type to check for. When given more than one types, all types must be found.
575
600
:return: the list of views that contain the type
576
601
"""
577
- if isinstance (at_types , list ):
578
- return [view for view in self .views
579
- if all (map (lambda x : x in view .metadata .contains , at_types ))]
580
- else :
581
- return [view for view in self .views if at_types in view .metadata .contains ]
602
+ return [view for view in self .views
603
+ if all (map (lambda x : x in view .metadata .contains , at_types ))]
582
604
583
605
get_views_contain = get_all_views_contain
584
606
@@ -621,35 +643,20 @@ def get_view_contains(self, at_types: Union[ThingTypesBase, str, List[Union[str,
621
643
return view
622
644
return None
623
645
624
- def _is_in_time_range (self , ann : Annotation , start : Union [int , float ], end : Union [int , float ]) -> bool :
646
+ def _is_in_time_range (self , ann : Annotation , range_s : Union [int , float ], range_e : Union [int , float ]) -> bool :
625
647
"""
626
- Checks if the annotation is anchored within the given time range.
648
+ Checks if the annotation is anchored within the given time range. Any overlap is considered included.
627
649
628
- :param ann: the Annotation object to check
629
- :param start : the start time point in milliseconds
630
- :param end : the end time point in milliseconds
650
+ :param ann: the Annotation object to check, must be time-based itself or anchored to time-based annotations
651
+ :param range_s : the start time point of the range ( in milliseconds)
652
+ :param range_e : the end time point of the range ( in milliseconds)
631
653
632
654
:return: True if the annotation is anchored within the time range, False otherwise
633
655
"""
634
- s , e = self .get_start (ann ), self .get_end (ann )
635
- return (s < start < e ) or (s < end < e ) or (s > start and e < end )
636
-
637
- def _handle_time_unit (self , input_unit : str , ann_unit : str ,
638
- start : int , end : int ) -> Tuple [Union [int , float , str ], Union [int , float , str ]]:
639
- """
640
- Helper method to convert time unit defined by user to the unit in mmif object.
656
+ ann_s , ann_e = self .get_start (ann ), self .get_end (ann )
657
+ return (ann_s < range_s < ann_e ) or (ann_s < range_e < ann_e ) or (ann_s > range_s and ann_e < range_e )
641
658
642
- :param input_unit: the time unit defined by user
643
- :param ann_unit: the time unit in mmif object
644
- :param start: the start time point in the unit of `input_unit`
645
- :param end: the end time point in the unit of `input_unit`
646
-
647
- :return: the start and end time points in the unit of `ann_unit`
648
- """
649
- from mmif .utils .timeunit_helper import convert
650
- return convert (start , input_unit , ann_unit , 1 ), convert (end , input_unit , ann_unit , 1 )
651
-
652
- def get_annotations_between_time (self , start : Union [int , float ], end : Union [int , float ],
659
+ def get_annotations_between_time (self , start : Union [int , float ], end : Union [int , float ],
653
660
time_unit : str = "ms" ) -> Iterator [Annotation ]:
654
661
"""
655
662
Finds annotations that are anchored between the given time points.
@@ -662,34 +669,24 @@ def get_annotations_between_time(self, start: Union[int, float], end: Union[int,
662
669
assert start < end , f"Start time point must be smaller than the end time point, given { start } and { end } "
663
670
assert start >= 0 , f"Start time point must be non-negative, given { start } "
664
671
assert end >= 0 , f"End time point must be non-negative, given { end } "
672
+
673
+ from mmif .utils .timeunit_helper import convert
665
674
666
- tf_in_range = []
667
- tf_to_anns = defaultdict (list )
675
+ time_anchors_in_range = []
668
676
669
- # Runtime: O(V * (TF * AL))
670
- for view in self .get_all_views_contain ([AnnotationTypes .TimeFrame , AnnotationTypes .Alignment ]):
677
+ for view in self .get_all_views_contain (AnnotationTypes .TimeFrame ) + self .get_all_views_contain (AnnotationTypes .TimePoint ):
671
678
time_unit_in_view = view .metadata .contains .get (AnnotationTypes .TimeFrame )["timeUnit" ]
672
- start_time , end_time = self ._handle_time_unit (time_unit , time_unit_in_view , start , end )
673
-
674
- tf_anns = view .get_annotations (AnnotationTypes .TimeFrame )
675
- al_anns = view .get_annotations (AnnotationTypes .Alignment )
676
-
677
- for tf_ann in tf_anns :
678
- if self ._is_in_time_range (tf_ann , start_time , end_time ):
679
- tf_in_range .append (tf_ann )
680
- tf_to_anns [self .get_start (tf_ann )] = []
681
-
682
- for al_ann in al_anns :
683
- for tf in tf_in_range :
684
- target_ann_long_id = tf .aligned_to_by (al_ann .long_id )
685
- if target_ann_long_id :
686
- tf_to_anns [self .get_start (tf )].append (view .get_annotation_by_id (target_ann_long_id ))
687
- break
688
-
689
- # Runtime: O(TF + AL)
690
- for start_point , anns in dict (sorted (tf_to_anns .items ())).items ():
691
- for ann in anns :
692
- yield ann
679
+
680
+ start_time = convert (start , time_unit , time_unit_in_view , 1 )
681
+ end_time = convert (end , time_unit , time_unit_in_view , 1 )
682
+ for ann in view .get_annotations ():
683
+ if ann .at_type in (AnnotationTypes .TimeFrame , AnnotationTypes .TimePoint ) and self ._is_in_time_range (ann , start_time , end_time ):
684
+ time_anchors_in_range .append (ann )
685
+ time_anchors_in_range .sort (key = lambda x : self .get_start (x ))
686
+ for time_anchor in time_anchors_in_range :
687
+ yield time_anchor
688
+ for aligned in time_anchor .get_all_aligned ():
689
+ yield aligned
693
690
694
691
def _get_linear_anchor_point (self , ann : Annotation , targets_sorted = False , start : bool = True ) -> Union [int , float ]:
695
692
# TODO (krim @ 2/5/24): Update the return type once timeunits are unified to `ms` as integers (https://github.com/clamsproject/mmif/issues/192)
0 commit comments