class MmifMetadata(MmifObject):
    """
    Basic MmifObject class to contain the top-level metadata of a MMIF file.

    :param metadata_obj: the JSON data
    """

    def __init__(self, metadata_obj: Optional[Union[bytes, str, dict]] = None) -> None:
        # TODO (krim @ 10/7/20): there could be a better name and a better way to give a value to this
        # The default ``mmif`` URL and the required-attribute list are set
        # before the parent initializer receives the JSON data.
        self.mmif: str = f"http://mmif.clams.ai/{mmif.__specver__}"
        self._required_attributes = ["mmif"]
        super().__init__(metadata_obj)


class DocumentsList(DataList[Document]):
    """
    DocumentsList object that implements :class:`mmif.serialize.model.DataList`
    for :class:`mmif.serialize.document.Document`.
    """
    _items: Dict[str, Document]

    def _deserialize(self, input_list: list) -> None:  # pytype: disable=signature-mismatch
        """
        Extends base ``_deserialize`` method to initialize ``items`` as a dict from
        document IDs to :class:`mmif.serialize.document.Document` objects.

        The ID of each document is read from ``item['properties']['id']``.

        :param input_list: the JSON data that defines the list of documents
        :return: None
        """
        self._items = {item['properties']['id']: Document(item) for item in input_list}

    def append(self, value: Document, overwrite=False) -> None:
        """
        Appends a document to the list.

        Fails if there is already a document with the same ID
        in the list, unless ``overwrite`` is set to True.

        :param value: the :class:`mmif.serialize.document.Document`
                      object to add
        :param overwrite: if set to True, will overwrite an
                          existing document with the same ID
        :raises KeyError: if ``overwrite`` is set to False and
                          a document with the same ID exists
                          in the list
        :return: None
        """
        super()._append_with_key(value.id, value, overwrite)


class ViewsList(DataList[View]):
    """
    ViewsList object that implements :class:`mmif.serialize.model.DataList`
    for :class:`mmif.serialize.view.View`.
    """
    _items: Dict[str, View]

    def __init__(self, mmif_obj: Optional[Union[bytes, str, list]] = None):
        super().__init__(mmif_obj)

    def _deserialize(self, input_list: list) -> None:  # pytype: disable=signature-mismatch
        """
        Extends base ``_deserialize`` method to initialize ``items`` as a dict from
        view IDs to :class:`mmif.serialize.view.View` objects.

        The ID of each view is read from ``item['id']``. When ``input_list``
        is empty (or otherwise falsy) ``_items`` is left untouched.

        :param input_list: the JSON data that defines the list of views
        :return: None
        """
        if input_list:
            self._items = {item['id']: View(item) for item in input_list}

    def append(self, value: View, overwrite=False) -> None:
        """
        Appends a view to the list.

        Fails if there is already a view with the same ID
        in the list, unless ``overwrite`` is set to True.

        :param value: the :class:`mmif.serialize.view.View`
                      object to add
        :param overwrite: if set to True, will overwrite an
                          existing view with the same ID
        :raises KeyError: if ``overwrite`` is set to False and
                          a view with the same ID exists
                          in the list
        :return: None
        """
        super()._append_with_key(value.id, value, overwrite)

    def get_last(self) -> Optional[View]:
        """
        Returns the last view in the list whose metadata contains neither
        an ``error`` nor a ``warning`` entry.

        Views are visited in reverse order of the underlying ``_items``
        dictionary, so views that carry an error or a warning are skipped
        in favor of an earlier view without one.

        :return: the latest view without ``error``/``warning`` metadata, or
                 None if the list is empty or every view has one of them
        """
        for view in reversed(self._items.values()):
            if 'error' not in view.metadata and 'warning' not in view.metadata:
                return view
        # Spell out the "nothing qualifies" result promised by Optional[View].
        return None
@@ -560,131 +658,41 @@ def get_end(self, annotation: Annotation) -> Union[int, float]: """ return self._get_linear_anchor_point(annotation, start=False) - # pytype: disable=bad-return-type - def __getitem__(self, item: str) -> Union[Document, View, Annotation]: + def __getitem__(self, item: str) \ + -> Union[Document, View, Annotation, MmifMetadata, DocumentsList, ViewsList]: """ - getitem implementation for Mmif. When nothing is found, this will raise an error - rather than returning a None (although pytype doesn't think so...) + getitem implementation for Mmif. This will try to find any object, given an identifier or an immediate + attribute name. When nothing is found, this will raise an error rather than returning a None :raises KeyError: if the item is not found or if the search results are ambiguous - :param item: the search string, a document ID, a view ID, or a view-scoped annotation ID + :param item: an attribute name or an object identifier (a document ID, a view ID, or an annotation ID). When + annotation ID is given as a "short" ID (without view ID prefix), the method will try to find a + match from the first view, and return immediately if found. 
:return: the object searched for + :raise KeyError: if the item is not found or multiple objects are found with the same ID """ if item in self._named_attributes(): return self.__dict__[item] split_attempt = item.split(self.id_delimiter) - document_result = self.documents.get(split_attempt[0]) - view_result = self.views.get(split_attempt[0]) + found = [] if len(split_attempt) == 1: - anno_result = None - elif view_result: - anno_result = view_result[split_attempt[1]] + found.append(self.documents.get(split_attempt[0])) + found.append(self.views.get(split_attempt[0])) + for view in self.views: + found.append(view.annotations.get(split_attempt[0])) + elif len(split_attempt) == 2: + v = self.get_view_by_id(split_attempt[0]) + if v is not None: + found.append(v.annotations.get(split_attempt[1])) else: raise KeyError("Tried to subscript into a view that doesn't exist") + found = [x for x in found if x is not None] - if view_result and document_result: + if len(found) > 1: raise KeyError("Ambiguous ID search result") - if not (view_result or document_result): + elif len(found) == 0: raise KeyError("ID not found: %s" % item) - return anno_result or view_result or document_result - # pytype: enable=bad-return-type - - -class MmifMetadata(MmifObject): - """ - Basic MmifObject class to contain the top-level metadata of a MMIF file. - - :param metadata_obj: the JSON data - """ - - def __init__(self, metadata_obj: Optional[Union[bytes, str, dict]] = None) -> None: - # TODO (krim @ 10/7/20): there could be a better name and a better way to give a value to this - self.mmif: str = f"http://mmif.clams.ai/{mmif.__specver__}" - self._required_attributes = ["mmif"] - super().__init__(metadata_obj) - - -class DocumentsList(DataList[Document]): - """ - DocumentsList object that implements :class:`mmif.serialize.model.DataList` - for :class:`mmif.serialize.document.Document`. 
- """ - _items: Dict[str, Document] - - def _deserialize(self, input_list: list) -> None: # pytype: disable=signature-mismatch - """ - Extends base ``_deserialize`` method to initialize ``items`` as a dict from - document IDs to :class:`mmif.serialize.document.Document` objects. - - :param input_list: the JSON data that defines the list of documents - :return: None - """ - self._items = {item['properties']['id']: Document(item) for item in input_list} - - def append(self, value: Document, overwrite=False) -> None: - """ - Appends a document to the list. - - Fails if there is already a document with the same ID - in the list, unless ``overwrite`` is set to True. - - :param value: the :class:`mmif.serialize.document.Document` - object to add - :param overwrite: if set to True, will overwrite an - existing document with the same ID - :raises KeyError: if ``overwrite`` is set to False and - a document with the same ID exists - in the list - :return: None - """ - super()._append_with_key(value.id, value, overwrite) - - -class ViewsList(DataList[View]): - """ - ViewsList object that implements :class:`mmif.serialize.model.DataList` - for :class:`mmif.serialize.view.View`. - """ - _items: Dict[str, View] - - def __init__(self, mmif_obj: Optional[Union[bytes, str, list]] = None): - super().__init__(mmif_obj) - - def _deserialize(self, input_list: list) -> None: # pytype: disable=signature-mismatch - """ - Extends base ``_deserialize`` method to initialize ``items`` as a dict from - view IDs to :class:`mmif.serialize.view.View` objects. - - :param input_list: the JSON data that defines the list of views - :return: None - """ - if input_list: - self._items = {item['id']: View(item) for item in input_list} - - def append(self, value: View, overwrite=False) -> None: - """ - Appends a view to the list. - - Fails if there is already a view with the same ID - in the list, unless ``overwrite`` is set to True. 
- - :param value: the :class:`mmif.serialize.view.View` - object to add - :param overwrite: if set to True, will overwrite an - existing view with the same ID - :raises KeyError: if ``overwrite`` is set to False and - a view with the same ID exists - in the list - :return: None - """ - super()._append_with_key(value.id, value, overwrite) - - def get_last(self) -> Optional[View]: - """ - Returns the last view appended to the list. - """ - for view in reversed(self._items.values()): - if 'error' not in view.metadata and 'warning' not in view.metadata: - return view + else: + return found[-1] diff --git a/mmif/utils/video_document_helper.py b/mmif/utils/video_document_helper.py index 425fde69..ec3cebac 100644 --- a/mmif/utils/video_document_helper.py +++ b/mmif/utils/video_document_helper.py @@ -6,7 +6,7 @@ import mmif from mmif import Annotation, Document, Mmif from mmif.utils.timeunit_helper import convert -from mmif.vocabulary import DocumentTypes +from mmif.vocabulary import DocumentTypes, AnnotationTypes for cv_dep in ('cv2', 'ffmpeg', 'PIL'): try: @@ -83,14 +83,16 @@ def extract_frames_as_images(video_document: Document, framenums: List[int], as_ frames = [] video = capture(video_document) cur_f = 0 + tot_fcount = video_document.get_property(FRAMECOUNT_DOCPROP_KEY) while True: - if not framenums or cur_f > video_document.get_property(FRAMECOUNT_DOCPROP_KEY): + if not framenums or cur_f > tot_fcount: break ret, frame = video.read() if cur_f == framenums[0]: if not ret: sec = convert(cur_f, 'f', 's', video_document.get_property(FPS_DOCPROP_KEY)) warnings.warn(f'Frame #{cur_f} ({sec}s) could not be read from the video {video_document.id}.') + cur_f += 1 continue frames.append(Image.fromarray(frame[:, :, ::-1]) if as_PIL else frame) framenums.pop(0) @@ -125,6 +127,42 @@ def extract_mid_frame(mmif: Mmif, time_frame: Annotation, as_PIL: bool = False): return extract_frames_as_images(vd, [get_mid_framenum(mmif, time_frame)], as_PIL=as_PIL)[0] +def 
def get_representative_framenum(mmif: Mmif, time_frame: Annotation):
    """
    Calculates the representative frame number from an annotation.

    Only the first entry of the ``representatives`` property is used. A short ID
    (without view ID prefix) is looked up in the view the time frame belongs to;
    an ID that already contains the ID delimiter is used as given.

    :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance
    :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation containing a `representatives` property (``"@type": ".../TimeFrame/..."``)
    :return: representative frame number, as returned by ``convert`` to the ``'frame'`` unit
    :raises ValueError: if the time frame has no (or an empty) ``representatives`` property,
                        or if the referenced time point cannot be found
    """
    if 'representatives' not in time_frame.properties:
        raise ValueError(f'The time frame {time_frame.id} does not have a representative.')
    representatives = time_frame.get_property('representatives')
    if not representatives:
        # An empty list would otherwise surface as an IndexError below.
        raise ValueError(f'The time frame {time_frame.id} does not have a representative.')

    video_document = mmif[time_frame.get_property('document')]
    fps = get_framerate(video_document)

    top_representative_id = representatives[0]
    delimiter = time_frame.id_delimiter
    if delimiter in top_representative_id:
        # Already a long ID; adding the view prefix again could never resolve.
        representative_long_id = top_representative_id
    else:
        representative_long_id = time_frame._parent_view_id + delimiter + top_representative_id
    try:
        representative_timepoint_anno = mmif[representative_long_id]
    except KeyError as err:
        raise ValueError(f'Representative timepoint {top_representative_id} not found in any view.') from err

    # Prefer the unit declared on the time point itself; fall back to the time
    # frame's unit when the time point does not carry one in its own properties.
    if 'timeUnit' in representative_timepoint_anno.properties:
        timeunit = representative_timepoint_anno.get_property('timeUnit')
    else:
        timeunit = time_frame.get_property('timeUnit')
    return convert(representative_timepoint_anno.get_property('timePoint'), timeunit, 'frame', fps)


def extract_representative_frame(mmif: Mmif, time_frame: Annotation, as_PIL: bool = False):
    """
    Extracts the representative frame of an annotation as a numpy ndarray or PIL Image.

    :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance
    :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``)
    :param as_PIL: return :py:class:`~PIL.Image.Image` instead of :py:class:`~numpy.ndarray`
    :return: frame as a :py:class:`numpy.ndarray` or :py:class:`PIL.Image.Image`
    :raises ValueError: propagated from :func:`get_representative_framenum`
    """
    video_document = mmif[time_frame.get_property('document')]
    rep_frame_num = get_representative_framenum(mmif, time_frame)
    # NOTE(review): the [0] assumes exactly one image comes back; presumably an
    # unreadable frame yields an empty list and an IndexError here - confirm.
    return extract_frames_as_images(video_document, [rep_frame_num], as_PIL=as_PIL)[0]
    def test_extract_representative_frame(self):
        """
        Checks ``vdh.get_representative_framenum`` on a time frame with a valid
        representative, with no ``representatives`` property, and with a
        representative ID that does not exist.
        """
        # A time point at 1500 ms and a time frame over 1000-2000 ms, both on document 'd1'.
        tp = self.a_view.new_annotation(AnnotationTypes.TimePoint, timePoint=1500, timeUnit='milliseconds', document='d1')
        tf = self.a_view.new_annotation(AnnotationTypes.TimeFrame, start=1000, end=2000, timeUnit='milliseconds', document='d1')
        tf.add_property('representatives', [tp.id])
        rep_frame_num = vdh.get_representative_framenum(self.mmif_obj, tf)
        # The expected value is the frame number of the time point's own timestamp.
        expected_frame_num = vdh.millisecond_to_framenum(self.video_doc, tp.get_property('timePoint'))
        self.assertEqual(expected_frame_num, rep_frame_num)
        # check there is an error if no representatives
        tf = self.a_view.new_annotation(AnnotationTypes.TimeFrame, start=1000, end=2000, timeUnit='milliseconds', document='d1')
        with pytest.raises(ValueError):
            vdh.get_representative_framenum(self.mmif_obj, tf)
        # check there is an error if there is a representative referencing a timepoint that
        # does not exist
        tf.add_property('representatives', ['fake_tp_id'])
        with pytest.raises(ValueError):
            vdh.get_representative_framenum(self.mmif_obj, tf)