From d36ac5e85405ecdfe0637d2c1653a32d513b773c Mon Sep 17 00:00:00 2001 From: Bohan Jiang Date: Tue, 20 Aug 2024 16:26:53 -0400 Subject: [PATCH 1/3] Updated get/__getitem__ method in dict-like class --- mmif/serialize/__init__.py | 38 ++++++++ mmif/serialize/annotation.py | 178 ++++++++++++++++++----------------- mmif/serialize/mmif.py | 136 ++++++++++++++------------ mmif/serialize/model.py | 65 ++++++++----- mmif/serialize/view.py | 91 ++++++++++-------- 5 files changed, 297 insertions(+), 211 deletions(-) diff --git a/mmif/serialize/__init__.py b/mmif/serialize/__init__.py index 18523bac..7962010c 100644 --- a/mmif/serialize/__init__.py +++ b/mmif/serialize/__init__.py @@ -1,3 +1,41 @@ +""" +Aggregatitive summary for mmif.serialize package: + +''Prerequisites'': recall CLAMS MMIF structure at https://mmif.clams.ai/1.0.5/#the-structure-of-mmif-files + +The MMIF follows JSON schema which consists of two data structures: dictionary/hash and list. +Therefore, for the purpose of best practice in Python's OOP, MMIF overwrites its own 'dict-like' +and 'list-like' classes. + +As a high-level overview of the package, the following parent classes are defined first: + +- `MmifObject`: a base class for MMIF objects that are ''dict-like'' +- `DataList`: a base class for MMIF data fields that are ''list-like'' +- `DataDict`: a base class for MMIF data fields that are ''dict-like'' + +Then, the following classes are defined and categorized into either ''dict-like'' +or ''list-like'' child classes: + +'''dict-like''': + - `Mmif`: a class for MMIF objects + - `MmifMetaData`: a class for MMIF metadata + - `View`: a class for MMIF view + - `ViewMetaData`: a class for MMIF view metadata + - `ErrorDict`: a class for a specific view that contains error + - `ContainsDict`: a class for `View`'s 'contains' field + - `Annotation` & `Document`: a class for MMIF annotation and document + - `AnnotationProperties` & `DocumentProperties`: a class for MMIF annotation properties + - `Text`: a class for `Document`'s text field + +'''list-like''': + - `DocumentsList`: a class for a list of `Document` objects + - `ViewsList`: a class for a list of `View` objects + - `AnnotationsList`: a class for a list of `Annotation` objects + +By doing so, the any field of a ''dict-like'' class should be accessed if and +only if by either bracket `[key]` or `.get(key)`. For a ''list-like'' class, the +elements are ordered and accessed by index, for example, `[idx]`. +""" from .annotation import * from .annotation import __all__ as anno_all from .mmif import * diff --git a/mmif/serialize/annotation.py b/mmif/serialize/annotation.py index d1b7a245..8436805d 100644 --- a/mmif/serialize/annotation.py +++ b/mmif/serialize/annotation.py @@ -59,59 +59,59 @@ def __init__(self, anno_obj: Optional[Union[bytes, str, dict]] = None, *_) -> No self.disallow_additional_properties() self._required_attributes = ["_type", "properties"] super().__init__(anno_obj) - + def __hash__(self): return hash(self.serialize()) - + def _deserialize(self, input_dict: dict) -> None: self.at_type = input_dict.pop('_type', '') # TODO (krim @ 6/1/21): If annotation IDs must follow a certain string format, # (e.g. currently auto-generated IDs will always have "prefix"_"number" format) - # here is the place to parse formatted IDs and store prefixes in the parent mmif object. + # here is the place to parse formatted IDs and store prefixes in the parent mmif object. # (see https://github.com/clamsproject/mmif/issues/64#issuecomment-849241309 for discussion) super()._deserialize(input_dict) for k, v in self.properties.items(): self._add_prop_aliases(k, v) - + def _cache_alignment(self, alignment_ann: 'Annotation', alignedto_ann: 'Annotation') -> None: """ - Cache alignment information. This cache will not be serialized. - + Cache alignment information. This cache will not be serialized. + :param alignment_ann: the Alignment annotation that has this annotation on one side :param alignedto_ann: the annotation that this annotation is aligned to (other side of Alignment) """ self._alignments[alignment_ann] = alignedto_ann - + def aligned_to_by(self, alignment: 'Annotation') -> Optional['Annotation']: """ - Retrieves the other side of ``Alignment`` annotation that has this annotation on one side. - + Retrieves the other side of ``Alignment`` annotation that has this annotation on one side. + :param alignment: ``Alignment`` annotation that has this annotation on one side - :return: the annotation that this annotation is aligned to (other side of ``Alignment``), + :return: the annotation that this annotation is aligned to (other side of ``Alignment``), or None if this annotation is not used in the ``Alignment``. """ return self._alignments.get(alignment) - + def get_all_aligned(self) -> Iterator['Annotation']: """ Generator to iterate through all alignments and aligned annotations. Note that this generator will yield the `Alignment` annotations as well. Every odd-numbered yield will be an `Alignment` annotation, and every even-numbered yield will be the aligned annotation. If there's a specific annotation type that you're looking - for, you need to filter the generated results outside. - + for, you need to filter the generated results outside. + :return: yields the alignment annotation and the aligned annotation. The order is decided by the order of appearance of Alignment annotations in the MMIF """ for alignment, aligned in self._alignments.items(): yield alignment yield aligned - - + + def _add_prop_aliases(self, key_to_add, val_to_add): """ Method to handle aliases of the same property. - Annotation property aliases were first introduced in MMIF 1.0.2, - with addition of general `label` property to all `Annotation` + Annotation property aliases were first introduced in MMIF 1.0.2, + with addition of general `label` property to all `Annotation` subtypes, and effectively deprecated `frameType` and `boxType` in `TimeFrame` and `BoundingBox` respectively. """ @@ -164,31 +164,31 @@ def id(self) -> str: @id.setter def id(self, aid: str) -> None: self.properties.id = aid - + @property def long_id(self) -> str: if self.parent is not None and len(self.parent) > 0: return f"{self.parent}{self.id_delimiter}{self.id}" else: return self.id - + @long_id.setter def long_id(self, long_id: str) -> None: if self.id_delimiter in long_id: self.parent, self.id = long_id.split(self.id_delimiter) else: self.id = long_id - + @staticmethod def check_prop_value_is_simple_enough( value: Union[PRMTV_TYPES, LIST_PRMTV, LIST_LIST_PRMTV, DICT_PRMTV, DICT_LIST_PRMTV]) -> bool: - - def json_primitives(x): + + def json_primitives(x): return isinstance(x, typing.get_args(PRMTV_TYPES)) - + def json_primitives_list(x): return isinstance(x, list) and all(map(json_primitives, x)) - + def json_primitives_list_of_list(x): return all(map(lambda elem: isinstance(elem, list), x) and map(json_primitives, [subelem for elem in x for subelem in elem])) @@ -201,7 +201,7 @@ def add_property(self, name: str, value: Union[PRMTV_TYPES, LIST_PRMTV, LIST_LIST_PRMTV, DICT_PRMTV, DICT_LIST_PRMTV]) -> None: """ Adds a property to the annotation's properties. - + :param name: the name of the property :param value: the property's desired value :return: None @@ -215,13 +215,22 @@ def add_property(self, name: str, # f"(\"{name}\": \"{str(value)}\"") self._add_prop_aliases(name, value) - def get(self, prop_name: str) -> Union['AnnotationProperties', PRMTV_TYPES, LIST_PRMTV, LIST_LIST_PRMTV, DICT_PRMTV, DICT_LIST_PRMTV]: + def get_property(self, prop_name: str, default=None) -> Union['AnnotationProperties', + PRMTV_TYPES, + LIST_PRMTV, + LIST_LIST_PRMTV, + DICT_PRMTV, + DICT_LIST_PRMTV]: """ A special getter for Annotation properties. This is to allow for directly accessing properties without having to go through the - properties object, or view-level annotation properties encoded in the - ``view.metadata.contains`` dict. Note that the regular props will take + properties object, or view-level annotation properties encoded in the + ``view.metadata.contains`` dict. Note that the regular props will take the priority over the ephemeral props when there are conflicts. + + :param prop_name: the name of the property to retrieve + :param default: the default value to return if the property does not exist + :return: the value of the property, or the default value if it does not exist """ if prop_name in {'at_type', '@type'}: return str(self._type) @@ -232,27 +241,22 @@ def get(self, prop_name: str) -> Union['AnnotationProperties', PRMTV_TYPES, LIST elif prop_name in self._props_ephemeral: return self._props_ephemeral[prop_name] else: - raise KeyError(f"Property {prop_name} does not exist in this annotation.") + return default - get_property = get - - def __getitem__(self, prop_name: str): - return self.get(prop_name) - def __contains__(self, item): try: self.get(item) return True except KeyError: return False - + def _get_label(self) -> str: """ Another prototypical method to handle property aliases. - See :meth:`.Annotation._add_prop_aliases` for more details on + See :meth:`.Annotation._add_prop_aliases` for more details on what property aliases are. - Not recommended to use this method as `_add_prop_aliases` method - is preferred. + Not recommended to use this method as `_add_prop_aliases` method + is preferred. """ if 'label' in self: return str(self.get('label')) @@ -262,7 +266,7 @@ def _get_label(self) -> str: return str(self.get('boxType')) else: raise KeyError("No label found in this annotation.") - + def is_document(self): return isinstance(self._type, DocumentTypesBase) @@ -287,45 +291,45 @@ def __init__(self, doc_obj: Optional[Union[bytes, str, dict]] = None, *_) -> Non self._props_original: DocumentProperties = DocumentProperties() self._props_pending: AnnotationProperties = AnnotationProperties() self.reserved_names.update(('_props_original', '_props_pending')) - + self._type: Union[ThingTypesBase, DocumentTypesBase] = ThingTypesBase('') self.properties = self._props_original self.disallow_additional_properties() self._attribute_classes = {'properties': DocumentProperties} super().__init__(doc_obj) - + def add_property(self, name: str, value: Union[PRMTV_TYPES, LIST_PRMTV] ) -> None: """ Adds a property to the document's properties. - - Unlike the parent :class:`Annotation` class, added properties of a - ``Document`` object can be lost during serialization unless it belongs - to somewhere in a ``Mmif`` object. This is because we want to keep - ``Document`` object as "read-only" as possible. Thus, if you want to add - a property to a ``Document`` object, - - * add the document to a ``Mmif`` object (either in the documents list or + + Unlike the parent :class:`Annotation` class, added properties of a + ``Document`` object can be lost during serialization unless it belongs + to somewhere in a ``Mmif`` object. This is because we want to keep + ``Document`` object as "read-only" as possible. Thus, if you want to add + a property to a ``Document`` object, + + * add the document to a ``Mmif`` object (either in the documents list or in a view from the views list), or * directly write to ``Document.properties`` instead of using this method - (which is not recommended). - - With the former method, the SDK will record the added property as a - `Annotation` annotation object, separate from the original `Document` + (which is not recommended). + + With the former method, the SDK will record the added property as a + `Annotation` annotation object, separate from the original `Document` object. See :meth:`.Mmif.generate_capital_annotations()` for more. - + A few notes to keep in mind: - - #. You can't overwrite an existing property of a ``Document`` object. - #. A MMIF can have multiple ``Annotation`` objects with the same + + #. You can't overwrite an existing property of a ``Document`` object. + #. A MMIF can have multiple ``Annotation`` objects with the same property name but different values. When this happens, the SDK will - only keep the latest value (in order of appearances in views list) of + only keep the latest value (in order of appearances in views list) of the property, effectively overwriting the previous values. """ # we don't checking if this k-v already exists in _original (new props) or _ephemeral (read from existing MMIF) # because it is impossible to keep the _original updated when a new annotation is added (via `new_annotation`) - # without look across other views and top-level documents list. Also see + # without look across other views and top-level documents list. Also see # ``mmif.serialize.mmif.Mmif.generate_capital_annotations`` for where the "de-duplication" happens. if name == "text": self.properties.text = Text(value) @@ -339,14 +343,14 @@ def add_property(self, name: str, else: super().add_property(name, value) - def get(self, prop_name): + def get_property(self, prop_name: str, default=None): """ A special getter for Document properties. The major difference from - the super class's :py:meth:`Annotation.get` method is that Document - class has one more set of *"pending"* properties, that are added after - the Document object is created and will be serialized as a separate - :py:class:`Annotation` object of which ``@type = Annotation``. The - pending properties will take the priority over the regular properties + the super class's :py:meth:`Annotation.get` method is that Document + class has one more set of *"pending"* properties, that are added after + the Document object is created and will be serialized as a separate + :py:class:`Annotation` object of which ``@type = Annotation``. The + pending properties will take the priority over the regular properties when there are conflicts. """ if prop_name == 'id': @@ -362,10 +366,8 @@ class has one more set of *"pending"* properties, that are added after elif prop_name in self._props_ephemeral: return self._props_ephemeral[prop_name] else: - return super().get(prop_name) + return super().get_property(prop_name, default) - get_property = get - @property def text_language(self) -> str: if self._type == DocumentTypes.TextDocument: @@ -433,7 +435,7 @@ def location_path(self, nonexist_ok=True) -> Optional[str]: To obtain the original value of the "path" part in the location string (before resolving), use ``properties.location_path_literal`` method. Returns None when no location is set. - + :param nonexist_ok: if False, raise FileNotFoundError when the resolved path doesn't exist """ return self.properties.location_path_resolved(nonexist_ok=nonexist_ok) @@ -455,22 +457,22 @@ def __delitem__(self, key: str) -> None: else: raise AttributeError(f'Cannot delete a required attribute "{key}"!') raise KeyError(f'Key "{key}" not found.') - + def __iter__(self) -> Iterator[str]: """ - ``__iter__`` on Mapping should basically work as ``keys()`` method + ``__iter__`` on Mapping should basically work as ``keys()`` method of vanilla dict. """ for key in itertools.chain(self._named_attributes(), self._unnamed_attributes): yield key - - def __getitem__(self, key): + + def __getitem__(self, key: str) -> Optional[T]: """ - Parent MmifObject class has a __getitem__ method that checks if - the value is empty when asked for an unnamed attribute. But for - AnnotationProperties, any arbitrary property that's added - explicitly by the user (developer) should not be ignored and - returned even the value is empty. + Parent MmifObject class has a __getitem__ method that checks if + the value is empty when asked for an unnamed attribute. But for + AnnotationProperties, any arbitrary property that's added + explicitly by the user (developer) should not be ignored and + returned even the value is empty. """ if key in self._named_attributes(): return self.__dict__[key] @@ -504,9 +506,9 @@ def __init__(self, mmif_obj: Optional[Union[bytes, str, dict]] = None, *_) -> No self.text: Text = Text() self._attribute_classes = {'text': Text} # in theory, either `location` or `text` should appear in a `document` - # but with current implementation, there's no easy way to set a condition - # for `oneOf` requirement - # see MmifObject::_required_attributes in model.py + # but with current implementation, there's no easy way to set a condition + # for `oneOf` requirement + # see MmifObject::_required_attributes in model.py super().__init__(mmif_obj) def _deserialize(self, input_dict: dict) -> None: @@ -519,7 +521,7 @@ def _serialize(self, alt_container: Optional[Dict] = None) -> dict: if "location_" in serialized: serialized["location"] = serialized.pop("location_") return serialized - + @property def text_language(self) -> str: return self.text.lang @@ -539,8 +541,8 @@ def text_value(self, s: str) -> None: @property def location(self) -> Optional[str]: """ - ``location`` property must be a legitimate URI. That is, should the document be a local file - then the file:// scheme must be used. + ``location`` property must be a legitimate URI. That is, should the document be a local file + then the file:// scheme must be used. Returns None when no location is set. """ return self.location_ if len(self.location_) > 0 else None @@ -578,14 +580,14 @@ def location_address(self) -> Optional[str]: def location_path(self) -> Optional[str]: warnings.warn('location_path() is deprecated. Use location_path_resolved() instead.', DeprecationWarning) return self.location_path_resolved() - + def location_path_resolved(self, nonexist_ok=True) -> Optional[str]: """ - Retrieves only path name of the document location (hostname is ignored), + Retrieves only path name of the document location (hostname is ignored), and then try to resolve the path name in the local file system. This method should be used when the document scheme is ``file`` or empty. For other schemes, users should install ``mmif-locdoc-`` plugin. - + Returns None when no location is set. Raise ValueError when no code found to resolve the given location scheme. """ @@ -605,7 +607,7 @@ def location_path_resolved(self, nonexist_ok=True) -> Optional[str]: def location_path_literal(self) -> Optional[str]: """ - Retrieves only path name of the document location (hostname is ignored). + Retrieves only path name of the document location (hostname is ignored). Returns None when no location is set. """ if self.location is None: diff --git a/mmif/serialize/mmif.py b/mmif/serialize/mmif.py index 1900a0d4..a357853d 100644 --- a/mmif/serialize/mmif.py +++ b/mmif/serialize/mmif.py @@ -122,14 +122,14 @@ def get_last_contentful_view(self) -> Optional[View]: for view in reversed(self._items.values()): if 'error' not in view.metadata and 'warnings' not in view.metadata: return view - + def get_last_view(self) -> Optional[View]: """ Returns the last view appended. """ if self._items: return self._items[list(self._items.keys())[-1]] - + def get_last(self) -> Optional[View]: warnings.warn('get_last() is deprecated, use get_last_contentful_view() instead.', DeprecationWarning) return self.get_last_contentful_view() @@ -183,11 +183,11 @@ def serialize(self, pretty: bool = False, sanitize: bool = False, autogenerate_c """ Serializes the MMIF object to a JSON string. - :param sanitize: If True, performs some sanitization of before returning + :param sanitize: If True, performs some sanitization of before returning the JSON string. See :meth:`sanitize` for details. - :param autogenerate_capital_annotations: If True, automatically convert - any "pending" temporary properties from `Document` objects to - `Annotation` objects. See :meth:`generate_capital_annotations` for + :param autogenerate_capital_annotations: If True, automatically convert + any "pending" temporary properties from `Document` objects to + `Annotation` objects. See :meth:`generate_capital_annotations` for details. :param pretty: If True, returns string representation with indentation. :return: JSON string of the MMIF object. @@ -202,24 +202,24 @@ def serialize(self, pretty: bool = False, sanitize: bool = False, autogenerate_c def _deserialize(self, input_dict: dict) -> None: """ Deserializes the MMIF JSON string into a Mmif object. - After *regular* deserialization, this method will perform the following - *special* handling of Annotation.properties that allows apps to access - Annotation/Document properties that are not encoded in the objects - themselves. This is to allow apps to access in a more intuitive way, + After *regular* deserialization, this method will perform the following + *special* handling of Annotation.properties that allows apps to access + Annotation/Document properties that are not encoded in the objects + themselves. This is to allow apps to access in a more intuitive way, without having too much hassle to iterate views and manually collect the properties. - + 1. This will read in existing *view*-scoped properties from *contains* metadata and attach them to the corresponding ``Annotation`` objects. - 1. This will read in existing ``Annotation`` typed annotations and - attach the document-level properties to the ``Document`` objects, - using an ephemeral property dict. - + 1. This will read in existing ``Annotation`` typed annotations and + attach the document-level properties to the ``Document`` objects, + using an ephemeral property dict. + """ super()._deserialize(input_dict) for view in self.views: view._parent_mmif = self - # this dict will be populated with properties + # this dict will be populated with properties # that are not encoded in individual annotations objects themselves extrinsic_props = defaultdict(dict) for at_type, type_lv_props in view.metadata.contains.items(): @@ -231,7 +231,7 @@ def _deserialize(self, input_dict: dict) -> None: # as "ephemeral" properties for prop_key, prop_value in extrinsic_props[ann.at_type].items(): ann._props_ephemeral[prop_key] = prop_value - # then, do the same to associated Document objects. Note that, + # then, do the same to associated Document objects. Note that, # in a view, it is guaranteed that all Annotation objects are not duplicates if ann.at_type == AnnotationTypes.Annotation: doc_id = ann.get_property('document') @@ -241,7 +241,7 @@ def _deserialize(self, input_dict: dict) -> None: except KeyError: warnings.warn(f"Annotation {ann.id} (in view {view.id}) has a document ID {doc_id} that " f"does not exist in the MMIF object. Skipping.", RuntimeWarning) - + ## caching start and end points for time-based annotations # add quick access to `start` and `end` values if the annotation is using `targets` property if 'targets' in ann.properties: @@ -250,11 +250,11 @@ def _deserialize(self, input_dict: dict) -> None: f"properties at the same time. Annotation anchors are ambiguous.") ann._props_ephemeral['start'] = self._get_linear_anchor_point(ann, start=True) ann._props_ephemeral['end'] = self._get_linear_anchor_point(ann, start=False) - + ## caching alignments if ann.at_type == AnnotationTypes.Alignment: self._cache_alignment(ann) - + def _cache_alignment(self, alignment_ann: Annotation): view = self.views.get(alignment_ann.parent) if view is None: @@ -282,31 +282,31 @@ def _desprately_search_annotation_object(ann_short_id): def generate_capital_annotations(self): """ - Automatically convert any "pending" temporary properties from - `Document` objects to `Annotation` objects . The generated `Annotation` - objects are then added to the last `View` in the views lists. - + Automatically convert any "pending" temporary properties from + `Document` objects to `Annotation` objects . The generated `Annotation` + objects are then added to the last `View` in the views lists. + See https://github.com/clamsproject/mmif-python/issues/226 for rationale behind this behavior and discussion. """ # this view will be the default kitchen sink for all generated annotations. last_view = self.views.get_last_contentful_view() - + # proceed only when there's at least one view if last_view: - + # this app name is used to check a view is generated by the "currently running" app. - # knowing the currently running app is important so that properties of `Document` objects generated by the - # current app can be properly recorded inside the `Document` objects (since they are "writable" to the + # knowing the currently running app is important so that properties of `Document` objects generated by the + # current app can be properly recorded inside the `Document` objects (since they are "writable" to the # current app), instead of being recorded in a separate `Annotation` object. current_app = last_view.metadata.app # to avoid duplicate property recording, this will be populated with # existing Annotation objects from all existing views existing_anns = defaultdict(lambda: defaultdict(dict)) - # ideally, if we can "de-duplicate" props at `add_property()` time, that'd be more efficient, + # ideally, if we can "de-duplicate" props at `add_property()` time, that'd be more efficient, # but that is impossible without looking for the target `document` across other views and top documents list - + # new properties to record in the current serialization call anns_to_write = defaultdict(dict) for view in self.views: @@ -326,7 +326,7 @@ def generate_capital_annotations(self): anns_to_write[doc.long_id].update(doc._props_pending) for doc in self.documents: anns_to_write[doc.long_id].update(doc._props_pending) - # additional iteration of views, to find a proper view to add the + # additional iteration of views, to find a proper view to add the # generated annotations. If none found, use the last view as the kitchen sink last_view_for_docs = defaultdict(lambda: last_view) doc_ids = set(anns_to_write.keys()) @@ -345,8 +345,8 @@ def generate_capital_annotations(self): last_view_for_docs[doc_id] = view break for doc_id, found_props in anns_to_write.items(): - # ignore the "empty" id property from temporary dict - # `id` is "required" attribute for `AnnotationProperty` class + # ignore the "empty" id property from temporary dict + # `id` is "required" attribute for `AnnotationProperty` class # thus will always be present in `props` dict as a key with emtpy value # also ignore duplicate k-v pairs props = {} @@ -371,10 +371,10 @@ def sanitize(self): """ Sanitizes a Mmif object by running some safeguards. Concretely, it performs the following before returning the JSON string. - + #. validating output using built-in MMIF jsonschema #. remove non-existing annotation types from ``contains`` metadata - + """ for view in self.views: existing_at_types = set(annotation.at_type for annotation in view.annotations) @@ -532,7 +532,7 @@ def get_document_by_id(self, doc_id: str) -> Document: vid, did = doc_id.split(self.id_delimiter) view = self[vid] if isinstance(view, View): - return view.get_document_by_id(did) + return view.get_document_by_id(did) else: raise KeyError("{} view not found".format(vid)) else: @@ -567,7 +567,7 @@ def get_alignments(self, at_type1: Union[str, ThingTypesBase], at_type2: Union[s alignments = [] contains_meta = alignment_view.metadata.contains[AnnotationTypes.Alignment] if 'sourceType' in contains_meta and 'targetType' in contains_meta: - aligned_types = [ThingTypesBase.from_str(x) + aligned_types = [ThingTypesBase.from_str(x) for x in {contains_meta['sourceType'], contains_meta['targetType']}] if len(aligned_types) == 2 and at_type1 in aligned_types and at_type2 in aligned_types: alignments.extend(alignment_view.annotations) @@ -602,7 +602,7 @@ def get_views_for_document(self, doc_id: str) -> List[View]: next(annotations) views.append(view) except StopIteration: - # means search failed by the full doc_id string, + # means search failed by the full doc_id string, # now try trimming the view_id from the string and re-do the search if self.id_delimiter in doc_id: vid, did = doc_id.split(self.id_delimiter) @@ -618,14 +618,14 @@ def get_views_for_document(self, doc_id: str) -> List[View]: def get_all_views_with_error(self) -> List[View]: """ - Returns the list of all views in the MMIF that have errors. - + Returns the list of all views in the MMIF that have errors. + :return: the list of views that contain errors but no annotations """ return [v for v in self.views if v.has_error()] - + get_views_with_error = get_all_views_with_error - + def get_all_views_contain(self, *at_types: Union[ThingTypesBase, str]) -> List[View]: """ Returns the list of all views in the MMIF if given types @@ -638,22 +638,22 @@ def get_all_views_contain(self, *at_types: Union[ThingTypesBase, str]) -> List[V if all(map(lambda x: x in view.metadata.contains, at_types))] get_views_contain = get_all_views_contain - + def get_view_with_error(self) -> Optional[View]: """ Returns the last view appended that contains an error. - + :return: the view, or None if no error is found """ for view in reversed(self.views): if view.has_error(): return view return None - + def get_last_error(self) -> Optional[str]: """ Returns the last error message found in the views. - + :return: the error message in human-readable format, or None if no error is found """ v = self.get_view_with_error() @@ -680,7 +680,7 @@ def get_view_contains(self, at_types: Union[ThingTypesBase, str, List[Union[str, def _is_in_time_range(self, ann: Annotation, range_s: Union[int, float], range_e: Union[int, float]) -> bool: """ - Checks if the annotation is anchored within the given time range. Any overlap is considered included. + Checks if the annotation is anchored within the given time range. Any overlap is considered included. :param ann: the Annotation object to check, must be time-based itself or anchored to time-based annotations :param range_s: the start time point of the range (in milliseconds) @@ -705,7 +705,7 @@ def get_annotations_between_time(self, start: Union[int, float], end: Union[int, assert start < end, f"Start time point must be smaller than the end time point, given {start} and {end}" assert start >= 0, f"Start time point must be non-negative, given {start}" assert end >= 0, f"End time point must be non-negative, given {end}" - + from mmif.utils.timeunit_helper import convert time_anchors_in_range = [] @@ -713,7 +713,7 @@ def get_annotations_between_time(self, start: Union[int, float], end: Union[int, for view in self.get_all_views_contain(AnnotationTypes.TimeFrame) + self.get_all_views_contain(AnnotationTypes.TimePoint): time_unit_in_view = view.metadata.contains.get(AnnotationTypes.TimeFrame)["timeUnit"] - + start_time = convert(start, time_unit, time_unit_in_view, 1) end_time = convert(end, time_unit, time_unit_in_view, 1) for ann in view.get_annotations(): @@ -730,9 +730,9 @@ def get_annotations_between_time(self, start: Union[int, float], end: Union[int, def _get_linear_anchor_point(self, ann: Annotation, targets_sorted=False, start: bool = True) -> Union[int, float]: # TODO (krim @ 2/5/24): Update the return type once timeunits are unified to `ms` as integers (https://github.com/clamsproject/mmif/issues/192) """ - Retrieves the anchor point of the annotation. Currently, this method only supports linear anchors, + Retrieves the anchor point of the annotation. Currently, this method only supports linear anchors, namely time and text, hence does not work with spatial anchors (polygons or video-object). - + :param ann: An Annotation object that has a linear anchor point. Namely, some subtypes of `Region` vocabulary type. :param start: If True, returns the start anchor point. Otherwise, returns the end anchor point. N/A for `timePoint` anchors. :param targets_sorted: If True, the method will assume that the targets are sorted in the order of the anchor points. @@ -742,12 +742,12 @@ def _get_linear_anchor_point(self, ann: Annotation, targets_sorted=False, start: if 'timePoint' in props: return ann.get_property('timePoint') elif 'targets' in props: - + def get_target_ann(cur_ann, target_id): if self.id_delimiter not in target_id: target_id = self.id_delimiter.join((cur_ann.parent, target_id)) return self.__getitem__(target_id) - + if not targets_sorted: point = math.inf if start else -1 comp = min if start else max @@ -762,13 +762,13 @@ def get_target_ann(cur_ann, target_id): return ann.get_property('start' if start else 'end') else: raise ValueError(f"{ann.id} ({ann.at_type}) does not have a valid anchor point. Is it a valid 'Region' type?") - + def get_start(self, annotation: Annotation) -> Union[int, float]: """ An alias to `get_anchor_point` method with `start=True`. """ return self._get_linear_anchor_point(annotation, start=True) - + def get_end(self, annotation: Annotation) -> Union[int, float]: """ An alias to `get_anchor_point` method with `start=False`. @@ -778,12 +778,12 @@ def get_end(self, annotation: Annotation) -> Union[int, float]: def __getitem__(self, item: str) \ -> Union[Document, View, Annotation, MmifMetadata, DocumentsList, ViewsList]: """ - getitem implementation for Mmif. This will try to find any object, given an identifier or an immediate - attribute name. When nothing is found, this will raise an error rather than returning a None + getitem implementation for Mmif. This will try to find any object, given an identifier or an immediate + attribute name. When nothing is found, this will raise an error rather than returning a None :raises KeyError: if the item is not found or if the search results are ambiguous - :param item: an attribute name or an object identifier (a document ID, a view ID, or an annotation ID). When - annotation ID is given as a "short" ID (without view ID prefix), the method will try to find a + :param item: an attribute name or an object identifier (a document ID, a view ID, or an annotation ID). When + annotation ID is given as a "short" ID (without view ID prefix), the method will try to find a match from the first view, and return immediately if found. :return: the object searched for :raise KeyError: if the item is not found or multiple objects are found with the same ID @@ -813,3 +813,21 @@ def __getitem__(self, item: str) \ raise KeyError("ID not found: %s" % item) else: return found[-1] + + def get(self, key: str, default=None) -> Optional[Union[Document, + View, + Annotation, + MmifMetadata, + DocumentsList, + ViewsList]]: + """ + A get method that returns the default value if the key is not found. + + :param key: the key to search for + :param default: the default value to return if the key is not found + :return: the value of the key, or the default value if the key is not found + """ + try: + return self.__getitem__(key) + except KeyError: + return default \ No newline at end of file diff --git a/mmif/serialize/model.py b/mmif/serialize/model.py index 3e49fbad..7071802a 100644 --- a/mmif/serialize/model.py +++ b/mmif/serialize/model.py @@ -44,49 +44,49 @@ class MmifObject(object): 1. _unnamed_attributes: Only can be either None or an empty dictionary. If it's set to None, - it means the class won't take any ``Additional Attributes`` in the - JSON schema sense. If it's an empty dict, users can throw any k-v - pairs to the class, as long as the key is not a "reserved" name, + it means the class won't take any ``Additional Attributes`` in the + JSON schema sense. If it's an empty dict, users can throw any k-v + pairs to the class, as long as the key is not a "reserved" name, and those additional attributes will be stored in this dict while - in memory. + in memory. 2. _attribute_classes: This is a dict from a key name to a specific python class to use for deserialize the value. Note that a key name in this dict does NOT have to be a *named* attribute, but is recommended to be one. 3. _required_attributes: - This is a simple list of names of attributes that are required in - the object. When serialize, an object will skip its *empty* (e.g. - zero-length, or None) attributes unless they are in this list. - Otherwise, the serialized JSON string would have empty + This is a simple list of names of attributes that are required in + the object. When serialize, an object will skip its *empty* (e.g. + zero-length, or None) attributes unless they are in this list. + Otherwise, the serialized JSON string would have empty representations (e.g. ``""``, ``[]``). 4. _exclude_from_diff: - This is a simple list of names of attributes that should be excluded - from the diff calculation in ``__eq__``. + This is a simple list of names of attributes that should be excluded + from the diff calculation in ``__eq__``. # TODO (krim @ 8/17/20): this dict is however, a duplicate with the type hints in the class definition. - Maybe there is a better way to utilize type hints (e.g. getting them - as a programmatically), but for now developers should be careful to + Maybe there is a better way to utilize type hints (e.g. getting them + as a programmatically), but for now developers should be careful to add types to hints as well as to this dict. Also note that those special attributes MUST be set in the __init__() before calling super method, otherwise deserialization will not work. And also, a subclass that has one or more *named* attributes, it must - set those attributes in the __init__() before calling super method. - When serializing a MmifObject, all *empty* attributes will be ignored, - so for optional named attributes, you must leave the values empty - (len == 0), but NOT None. Any None-valued named attributes will cause + set those attributes in the __init__() before calling super method. + When serializing a MmifObject, all *empty* attributes will be ignored, + so for optional named attributes, you must leave the values empty + (len == 0), but NOT None. Any None-valued named attributes will cause issues with current implementation. :param mmif_obj: JSON string or `dict` to initialize an object. If not given, an empty object will be initialized, sometimes with an ID value automatically generated, based on its parent object. """ - + view_prefix: ClassVar[str] = 'v_' id_delimiter: ClassVar[str] = ':' - # these are the reserved names that cannot be used as attribute names, and + # these are the reserved names that cannot be used as attribute names, and # they won't be serialized reserved_names: Set[str] = { 'reserved_names', @@ -274,9 +274,9 @@ def __eq__(self, other) -> bool: def __len__(self) -> int: """ - Returns number of attributes that are not *empty*. + Returns number of attributes that are not *empty*. """ - return (sum([named in self and not self.is_empty(self[named]) for named in self._named_attributes()]) + return (sum([named in self and not self.is_empty(self[named]) for named in self._named_attributes()]) + (len(self._unnamed_attributes) if self._unnamed_attributes else 0)) def __setitem__(self, key, value) -> None: @@ -307,13 +307,28 @@ def __getitem__(self, key) -> Union['MmifObject', str, datetime]: value = self.__dict__[key] elif self._unnamed_attributes is None: raise AttributeError(f"Additional properties are disallowed by {self.__class__}: {key}") - else: + else: value = self._unnamed_attributes[key] if key not in self._required_attributes and self.is_empty(value): raise KeyError(f"Property not found: {key} (is it set?)") - else: + else: return value + def get(self, key, default=None) -> Optional[Union['MmifObject', str, datetime]]: + """ + The dict-like get(key, default) method. + If the key is not found, it returns the default value instead of raising KeyError. + Note: Although KeyError is not raised, AttributeError can be raised if the key is not disallowed. + + :param key: the key to search for + :param default: the default value to return if the key is not found + :return: the value matching that key or the default value + """ + try: + return self.__getitem__(key) + except KeyError: + return default + class MmifObjectEncoder(json.JSONEncoder): """ @@ -362,14 +377,14 @@ def deserialize(self, mmif_json: Union[str, list]) -> None: # pytype: disable=s """ Passes the input data into the internal deserializer. """ - super().deserialize(mmif_json) + super().deserialize(mmif_json) @staticmethod def _load_json(json_list: Union[list, str]) -> list: if type(json_list) is str: json_list = json.loads(json_list) return [MmifObject._load_json(obj) for obj in json_list] - + def _deserialize(self, input_list: list) -> None: raise NotImplementedError() @@ -492,6 +507,6 @@ def __len__(self): def __contains__(self, item): return item in self._items - + def empty(self): self._items = {} diff --git a/mmif/serialize/view.py b/mmif/serialize/view.py index 58413f41..be557113 100644 --- a/mmif/serialize/view.py +++ b/mmif/serialize/view.py @@ -37,7 +37,7 @@ def __init__(self, view_obj: Optional[Union[bytes, str, dict]] = None, parent_mm self._parent_mmif = parent_mmif self.reserved_names.update(("_parent_mmif", "_id_counts")) self._exclude_from_diff = {"_id_counts"} - + self.id: str = '' self.metadata: ViewMetadata = ViewMetadata() self.annotations: AnnotationsList = AnnotationsList() @@ -63,7 +63,7 @@ def new_contain(self, at_type: Union[str, ThingTypesBase], **contains_metadata) raise ValueError("@type must not be empty.") else: return self.metadata.new_contain(at_type, **contains_metadata) - + def _set_ann_id(self, annotation: Annotation, identifier): if identifier is not None: annotation.id = identifier @@ -73,8 +73,8 @@ def _set_ann_id(self, annotation: Annotation, identifier): new_id = f'{prefix}_{new_num}' self._id_counts[prefix] = new_num annotation.id = new_id - - def new_annotation(self, at_type: Union[str, ThingTypesBase], aid: Optional[str] = None, + + def new_annotation(self, at_type: Union[str, ThingTypesBase], aid: Optional[str] = None, overwrite=False, **properties) -> 'Annotation': """ Generates a new :class:`mmif.serialize.annotation.Annotation` @@ -84,9 +84,9 @@ def new_annotation(self, at_type: Union[str, ThingTypesBase], aid: Optional[str] in the view, unless ``overwrite`` is set to True. :param at_type: the desired ``@type`` of the annotation. - :param aid: the desired ID of the annotation, when not given, - the mmif SDK tries to automatically generate an ID based on - Annotation type and existing annotations in the view. + :param aid: the desired ID of the annotation, when not given, + the mmif SDK tries to automatically generate an ID based on + Annotation type and existing annotations in the view. :param overwrite: if set to True, will overwrite an existing annotation with the same ID. :raises KeyError: if ``overwrite`` is set to False and @@ -125,8 +125,8 @@ def add_annotation(self, annotation: 'Annotation', overwrite=False) -> 'Annotati if annotation.at_type == AnnotationTypes.Alignment: self._parent_mmif._cache_alignment(annotation) return annotation - - def new_textdocument(self, text: str, lang: str = "en", did: Optional[str] = None, + + def new_textdocument(self, text: str, lang: str = "en", did: Optional[str] = None, overwrite=False, **properties) -> 'Document': """ Generates a new :class:`mmif.serialize.annotation.Document` @@ -137,9 +137,9 @@ def new_textdocument(self, text: str, lang: str = "en", did: Optional[str] = Non :param text: text content of the new document :param lang: ISO 639-1 code of the language used in the new document - :param did: the desired ID of the document, when not given, - the mmif SDK tries to automatically generate an ID based on - Annotation type and existing documents in the view. + :param did: the desired ID of the document, when not given, + the mmif SDK tries to automatically generate an ID based on + Annotation type and existing documents in the view. :param overwrite: if set to True, will overwrite an existing document with the same ID :raises KeyError: if ``overwrite`` is set to False and @@ -170,7 +170,7 @@ def add_document(self, document: Document, overwrite=False) -> Annotation: """ return self.add_annotation(document, overwrite) - def get_annotations(self, at_type: Optional[Union[str, ThingTypesBase]] = None, + def get_annotations(self, at_type: Optional[Union[str, ThingTypesBase]] = None, **properties) -> Generator[Annotation, None, None]: """ Look for certain annotations in this view, specified by parameters @@ -186,10 +186,10 @@ def prop_check(k, v, *props): for annotation in self.annotations: at_type_metadata = self.metadata.contains.get(annotation.at_type, {}) if not at_type or (at_type and annotation.at_type == at_type): - if all(map(lambda kv: prop_check(kv[0], kv[1], annotation.properties, at_type_metadata), + if all(map(lambda kv: prop_check(kv[0], kv[1], annotation.properties, at_type_metadata), properties.items())): yield annotation - + def get_annotation_by_id(self, ann_id) -> Annotation: if self.id_delimiter in ann_id and not ann_id.startswith(self.id): try: @@ -205,7 +205,7 @@ def get_annotation_by_id(self, ann_id) -> Annotation: raise KeyError(f"Annotation \"{ann_id}\" is not found in view {self.id}.") else: return ann_found - + def get_documents(self) -> List[Document]: return [cast(Document, annotation) for annotation in self.annotations if annotation.is_document()] @@ -238,15 +238,28 @@ def __getitem__(self, key: str) -> 'Annotation': if not anno_result: raise KeyError("Annotation ID not found: %s" % key) return anno_result - + + def get(self, key: str, default=None) -> Optional['Annotation']: + """ + get implementation for View. + + :param key: the search string. + :param default: the default value to return if the key is not found + :return: the :class:`mmif.serialize.annotation.Annotation` object searched for + """ + try: + return self.__getitem__(key) + except KeyError: + return default + def set_error(self, err_message: str, err_trace: str) -> None: self.metadata.set_error(err_message, err_trace) self.annotations.empty() - + def get_error(self) -> Optional[str]: """ - Get the "text" representation of the error occurred during - processing. Text representation is supposed to be human-readable. + Get the "text" representation of the error occurred during + processing. Text representation is supposed to be human-readable. When ths view does not have any error, returns None. """ if self.has_error(): @@ -283,26 +296,26 @@ def __init__(self, viewmetadata_obj: Optional[Union[bytes, str, dict]] = None, * 'contains': ContainsDict } # in theory, *oneOf* `contains`, `error`, or `warnings` should appear in a `view` - # but with current implementation, there's no easy way to set a condition - # for `oneOf` requirement - # see MmifObject::_required_attributes in model.py + # but with current implementation, there's no easy way to set a condition + # for `oneOf` requirement + # see MmifObject::_required_attributes in model.py # also see this class' `_serialize()` override implementation super().__init__(viewmetadata_obj) def _serialize(self, alt_container: Optional[Dict] = None) -> dict: serialized = super()._serialize() - # `_serialize()` eliminates any *empty* attributes, so + # `_serialize()` eliminates any *empty* attributes, so # when no "contains", "errors", nor "warnings", at least add an empty contains back if not (self.contains.items() or self.error or self.warnings): serialized['contains'] = {} return serialized - + def has_error(self) -> bool: return len(self.error) > 0 - + def has_warnings(self): return len(self.warnings) > 0 - + def get_error_as_text(self) -> str: if self.has_error(): if isinstance(self.error, ErrorDict): @@ -313,7 +326,7 @@ def get_error_as_text(self) -> str: return f"Error (unknown error format): {self.error}" else: raise KeyError(f"No error found") - + def new_contain(self, at_type: Union[str, ThingTypesBase], **contains_metadata) -> Optional['Contain']: """ Adds a new element to the ``contains`` dictionary. @@ -324,12 +337,12 @@ def new_contain(self, at_type: Union[str, ThingTypesBase], **contains_metadata) """ if isinstance(at_type, str): at_type = ThingTypesBase.from_str(at_type) - + if at_type not in self.contains: new_contain = Contain(contains_metadata) self.add_contain(new_contain, at_type) return new_contain - + def add_contain(self, contain: 'Contain', at_type: Union[str, ThingTypesBase]) -> None: self.contains[at_type] = contain @@ -371,11 +384,11 @@ def get_parameter(self, param_key: str) -> str: return self.parameters[param_key] except KeyError: raise KeyError(f"parameter \"{param_key}\" is not set in the view: {self.serialize()}") - + def set_error(self, message: str, stack_trace: str): self.error = ErrorDict({"message": message, "stackTrace": stack_trace}) self.contains.empty() - + def add_warnings(self, *warnings: Warning): for warning in warnings: self.warnings.append(f'{warning.__class__.__name__}: {" - ".join(warning.args)}') @@ -386,16 +399,16 @@ def emtpy_warnings(self): class ErrorDict(MmifObject): """ - Error object that stores information about error occurred during processing. + Error object that stores information about error occurred during processing. """ def __init__(self, error_obj: Optional[Union[bytes, str, dict]] = None, *_) -> None: self.message: str = '' self.stackTrace: str = '' super().__init__(error_obj) - + def __str__(self): return f"({self.message})\n\n{self.stackTrace}" - + class Contain(DataDict[str, str]): """ @@ -440,11 +453,11 @@ def append(self, value: Union[Annotation, Document], overwrite=False) -> None: :return: None """ super()._append_with_key(value.id, value, overwrite) - + def __getitem__(self, key: str): """ specialized getter implementation to workaround https://github.com/clamsproject/mmif/issues/228 - # TODO (krim @ 7/12/24): annotation ids must be in the long form in the future, so this check will be unnecessary once https://github.com/clamsproject/mmif/issues/228 is resolved. + # TODO (krim @ 7/12/24): annotation ids must be in the long form in the future, so this check will be unnecessary once https://github.com/clamsproject/mmif/issues/228 is resolved. """ if ":" in key: _, aid = key.split(":") @@ -465,12 +478,12 @@ def update(self, other: Union[dict, 'ContainsDict'], overwrite=False): if isinstance(k, str): k = ThingTypesBase.from_str(k) self._append_with_key(k, v, overwrite=overwrite) - + def get(self, key: Union[str, ThingTypesBase], default=None): if isinstance(key, str): key = ThingTypesBase.from_str(key) return self._items.get(key, default) - + def __contains__(self, item: Union[str, ThingTypesBase]): if isinstance(item, str): string_keys = [str(k) for k in self._items.keys()] From 70c4843b20d000e3dd5923da7e20e7d5aed450a8 Mon Sep 17 00:00:00 2001 From: Bohan Jiang Date: Wed, 21 Aug 2024 22:23:02 -0400 Subject: [PATCH 2/3] Updated list-like property for target classes --- mmif/serialize/annotation.py | 6 +-- mmif/serialize/mmif.py | 31 +++++----------- mmif/serialize/model.py | 72 +++++++++++++++++++++++++----------- mmif/serialize/view.py | 16 ++------ 4 files changed, 67 insertions(+), 58 deletions(-) diff --git a/mmif/serialize/annotation.py b/mmif/serialize/annotation.py index 8436805d..2596137a 100644 --- a/mmif/serialize/annotation.py +++ b/mmif/serialize/annotation.py @@ -259,11 +259,11 @@ def _get_label(self) -> str: is preferred. """ if 'label' in self: - return str(self.get('label')) + return str(self.get_property('label')) elif self._type.shortname == 'TimeFrame' and 'frameType' in self: - return str(self.get('frameType')) + return str(self.get_property('frameType')) elif self._type.shortname == 'BoundingBox' and 'boxType' in self: - return str(self.get('boxType')) + return str(self.get_property('boxType')) else: raise KeyError("No label found in this annotation.") diff --git a/mmif/serialize/mmif.py b/mmif/serialize/mmif.py index a357853d..bd268ab9 100644 --- a/mmif/serialize/mmif.py +++ b/mmif/serialize/mmif.py @@ -123,17 +123,6 @@ def get_last_contentful_view(self) -> Optional[View]: if 'error' not in view.metadata and 'warnings' not in view.metadata: return view - def get_last_view(self) -> Optional[View]: - """ - Returns the last view appended. - """ - if self._items: - return self._items[list(self._items.keys())[-1]] - - def get_last(self) -> Optional[View]: - warnings.warn('get_last() is deprecated, use get_last_contentful_view() instead.', DeprecationWarning) - return self.get_last_contentful_view() - class Mmif(MmifObject): """ @@ -256,7 +245,7 @@ def _deserialize(self, input_dict: dict) -> None: self._cache_alignment(ann) def _cache_alignment(self, alignment_ann: Annotation): - view = self.views.get(alignment_ann.parent) + view = self.views.get_item(alignment_ann.parent) if view is None: warnings.warn(f"Alignment {alignment_ann.long_id} doesn't have a parent view, but it should.", RuntimeWarning) return @@ -355,7 +344,7 @@ def generate_capital_annotations(self): props[k] = v if props: view_to_write = last_view_for_docs[doc_id] - if view_to_write.metadata.app == current_app and view_to_write.annotations.get(doc_id) is not None: + if view_to_write.metadata.app == current_app and view_to_write.annotations.get_item(doc_id) is not None: view_to_write.get_document_by_id(doc_id).properties.update(props) else: if len(anns_to_write) == 1: @@ -446,7 +435,7 @@ def get_documents_in_view(self, vid: Optional[str] = None) -> List[Document]: :param vid: the source view ID to search for :return: a list of documents matching the requested source view ID, or an empty list if the view not found """ - view = self.views.get(vid) + view = self.views.get_item(vid) if view is not None: return view.get_documents() else: @@ -490,7 +479,7 @@ def get_documents_by_property(self, prop_key: str, prop_value: str) -> List[Docu docs = [] for view in self.views: for doc in view.get_documents(): - if prop_key in doc and doc.get(prop_key) == prop_value: + if prop_key in doc and doc.get_property(prop_key) == prop_value: docs.append(doc) docs.extend([document for document in self.documents if document[prop_key] == prop_value]) return docs @@ -536,7 +525,7 @@ def get_document_by_id(self, doc_id: str) -> Document: else: raise KeyError("{} view not found".format(vid)) else: - doc_found = self.documents.get(doc_id) + doc_found = self.documents.get_item(doc_id) if doc_found is None: raise KeyError("{} document not found".format(doc_id)) return cast(Document, doc_found) @@ -549,7 +538,7 @@ def get_view_by_id(self, req_view_id: str) -> View: :return: a reference to the corresponding view, if it exists :raises Exception: if there is no corresponding view """ - result = self.views.get(req_view_id) + result = self.views.get_item(req_view_id) if result is None: raise KeyError("{} view not found".format(req_view_id)) return result @@ -795,14 +784,14 @@ def __getitem__(self, item: str) \ found = [] if len(split_attempt) == 1: - found.append(self.documents.get(split_attempt[0])) - found.append(self.views.get(split_attempt[0])) + found.append(self.documents.get_item(split_attempt[0])) + found.append(self.views.get_item(split_attempt[0])) for view in self.views: - found.append(view.annotations.get(split_attempt[0])) + found.append(view.annotations.get_item(split_attempt[0])) elif len(split_attempt) == 2: v = self.get_view_by_id(split_attempt[0]) if v is not None: - found.append(v.annotations.get(split_attempt[1])) + found.append(v.annotations.get_item(split_attempt[1])) else: raise KeyError("Tried to subscript into a view that doesn't exist") found = [x for x in found if x is not None] diff --git a/mmif/serialize/model.py b/mmif/serialize/model.py index 7071802a..97708de2 100644 --- a/mmif/serialize/model.py +++ b/mmif/serialize/model.py @@ -14,7 +14,8 @@ import json from datetime import datetime -from typing import Union, Any, Dict, Optional, TypeVar, Generic, Generator, Iterator, Type, Set, ClassVar +from collections import OrderedDict +from typing import Union, Any, Dict, Optional, TypeVar, Generic, Generator, Iterator, Type, Set, ClassVar, List from deepdiff import DeepDiff @@ -325,7 +326,7 @@ def get(self, key, default=None) -> Optional[Union['MmifObject', str, datetime]] :return: the value matching that key or the default value """ try: - return self.__getitem__(key) + return self[key] except KeyError: return default @@ -388,18 +389,24 @@ def _load_json(json_list: Union[list, str]) -> list: def _deserialize(self, input_list: list) -> None: raise NotImplementedError() - def get(self, key: str) -> Optional[T]: + def get_item(self, key: str) -> Optional[T]: """ - Standard dictionary-style get() method, albeit with no ``default`` - parameter. Relies on the implementation of __getitem__. - - Will return ``None`` if the key is not found. + Mimic to standard dictionary-style get() method, + albeit with no ``default`` parameter (which will return ``None``). + However, it doesn't rely on the implementation of __getitem__, + so it can be understood as a backdoor underneath the list property of the class. :param key: the key to search for :return: the value matching that key """ + key_id = None + if ":" in key: + _, key_id = key.split(":") try: - return self[key] + if key_id: + return self._items[key_id] + else: + return self._items[key] except KeyError: return None @@ -426,28 +433,51 @@ def _append_with_key(self, key: str, value: T, overwrite=False) -> None: def append(self, value, overwrite): raise NotImplementedError() - def __getitem__(self, key: str) -> T: - if key not in self.reserved_names: - return self._items.__getitem__(key) - else: - raise KeyError("Don't use __getitem__ to access a reserved name") + def __getitem__(self, index: int) -> T: + """ + Index-based access for list-like behavior. - def __setitem__(self, key: str, value: T): - if key not in self.reserved_names: - self._items.__setitem__(key, value) - else: - super().__setitem__(key, value) + FIXME: + -------- + Since this implementation makes use of python's built-in list, + I didn't explicitly handle the `IndexError` manually. Instead, + I depended on the built-in list's behavior to raise the error. + -------- + + :param idx: the index of the item to access + :return: the item at the specified index + """ + return list(self._items.values()).__getitem__(index) + + def __setitem__(self, index: int, value: T) -> None: + """ + Set up the item at the specified index. + + IMPORTANT: + -------- + For any subclass that needs to implement this method, it's list-like but + assumed to be read-only. That's to say, once the data list is built, the + items are not supposed to be changed. If any attempt that changes the items, + an error would be raised by this method. + -------- + + :param index: the index of the item to set + :param value: the value to set at the specified index + + :raise TypeError: the list is assumed to be read-only + """ + raise TypeError("The list is read-only.") def __iter__(self) -> Iterator[T]: - return self._items.values().__iter__() + return self._items.__iter__() def __len__(self) -> int: return self._items.__len__() def __reversed__(self) -> Iterator[T]: - return reversed(self._items.values()) + return reversed(self._items) - def __contains__(self, item) -> bool: + def __contains__(self, item: T) -> bool: return item in self._items def empty(self): diff --git a/mmif/serialize/view.py b/mmif/serialize/view.py index be557113..cdc68281 100644 --- a/mmif/serialize/view.py +++ b/mmif/serialize/view.py @@ -197,7 +197,7 @@ def get_annotation_by_id(self, ann_id) -> Annotation: except KeyError: ann_found = None else: - ann_found = self.annotations.get(ann_id.split(self.id_delimiter)[-1]) + ann_found = self.annotations.get_item(ann_id.split(self.id_delimiter)[-1]) if ann_found is None or not isinstance(ann_found, Annotation): if self.id_delimiter in ann_id: raise KeyError(f"Annotation \"{ann_id}\" is not found in the MMIF.") @@ -210,7 +210,7 @@ def get_documents(self) -> List[Document]: return [cast(Document, annotation) for annotation in self.annotations if annotation.is_document()] def get_document_by_id(self, doc_id) -> Document: - doc_found = self.annotations.get(doc_id) + doc_found = self.annotations.get_item(doc_id) if doc_found is None or not isinstance(doc_found, Document): raise KeyError(f"Document \"{doc_id}\" not found in view {self.id}.") else: @@ -234,7 +234,7 @@ def __getitem__(self, key: str) -> 'Annotation': """ if key in self._named_attributes(): return self.__dict__[key] - anno_result = self.annotations.get(key) + anno_result = self.annotations.get_item(key) if not anno_result: raise KeyError("Annotation ID not found: %s" % key) return anno_result @@ -454,16 +454,6 @@ def append(self, value: Union[Annotation, Document], overwrite=False) -> None: """ super()._append_with_key(value.id, value, overwrite) - def __getitem__(self, key: str): - """ - specialized getter implementation to workaround https://github.com/clamsproject/mmif/issues/228 - # TODO (krim @ 7/12/24): annotation ids must be in the long form in the future, so this check will be unnecessary once https://github.com/clamsproject/mmif/issues/228 is resolved. - """ - if ":" in key: - _, aid = key.split(":") - return self._items.__getitem__(aid) - return self._items.get(key, None) - class ContainsDict(DataDict[ThingTypesBase, Contain]): From 0a4ce6c4ec8feb09c8a07a5d724d8bf2db4d6207 Mon Sep 17 00:00:00 2001 From: Bohan Jiang Date: Fri, 23 Aug 2024 15:26:36 -0400 Subject: [PATCH 3/3] Fixed and updated features proposed in PR#301 for both unittests and codebase --- documentation/target-versions.csv | 1 + mmif/serialize/annotation.py | 8 +--- mmif/serialize/mmif.py | 8 ++-- mmif/serialize/model.py | 6 +-- mmif/utils/sequence_helper.py | 11 +++-- tests/test_serialize.py | 71 ++++++++++++++++--------------- tests/test_utils.py | 8 +++- tests/test_vocab.py | 2 +- 8 files changed, 61 insertions(+), 54 deletions(-) diff --git a/documentation/target-versions.csv b/documentation/target-versions.csv index 361ce8df..1a718c40 100644 --- a/documentation/target-versions.csv +++ b/documentation/target-versions.csv @@ -1,4 +1,5 @@ "``mmif-python`` version","Target MMIF Specification" +1.0.20.dev1,"1.0.5" 1.0.16,"1.0.4" 1.0.15,"1.0.4" 1.0.14,"1.0.4" diff --git a/mmif/serialize/annotation.py b/mmif/serialize/annotation.py index 2596137a..5598ded0 100644 --- a/mmif/serialize/annotation.py +++ b/mmif/serialize/annotation.py @@ -244,11 +244,7 @@ def get_property(self, prop_name: str, default=None) -> Union['AnnotationPropert return default def __contains__(self, item): - try: - self.get(item) - return True - except KeyError: - return False + return self.get_property(item) def _get_label(self) -> str: """ @@ -466,7 +462,7 @@ def __iter__(self) -> Iterator[str]: for key in itertools.chain(self._named_attributes(), self._unnamed_attributes): yield key - def __getitem__(self, key: str) -> Optional[T]: + def __getitem__(self, key): """ Parent MmifObject class has a __getitem__ method that checks if the value is empty when asked for an unnamed attribute. But for diff --git a/mmif/serialize/mmif.py b/mmif/serialize/mmif.py index bd268ab9..eadb2488 100644 --- a/mmif/serialize/mmif.py +++ b/mmif/serialize/mmif.py @@ -259,8 +259,8 @@ def _desprately_search_annotation_object(ann_short_id): return self.__getitem__(ann_short_id) if all(map(lambda x: x in alignment_ann.properties, ('source', 'target'))): - source_ann = _desprately_search_annotation_object(alignment_ann.get('source')) - target_ann = _desprately_search_annotation_object(alignment_ann.get('target')) + source_ann = _desprately_search_annotation_object(alignment_ann.get_property('source')) + target_ann = _desprately_search_annotation_object(alignment_ann.get_property('target')) if isinstance(source_ann, Annotation) and isinstance(target_ann, Annotation): source_ann._cache_alignment(alignment_ann, target_ann) target_ann._cache_alignment(alignment_ann, source_ann) @@ -481,7 +481,7 @@ def get_documents_by_property(self, prop_key: str, prop_value: str) -> List[Docu for doc in view.get_documents(): if prop_key in doc and doc.get_property(prop_key) == prop_value: docs.append(doc) - docs.extend([document for document in self.documents if document[prop_key] == prop_value]) + docs.extend([document for document in self.documents if document.get_property(prop_key) == prop_value]) return docs def get_documents_locations(self, m_type: Union[DocumentTypes, str], path_only=False) -> List[Union[str, None]]: @@ -563,7 +563,7 @@ def get_alignments(self, at_type1: Union[str, ThingTypesBase], at_type2: Union[s else: for alignment in alignment_view.get_annotations(AnnotationTypes.Alignment): aligned_types = set() - for ann_id in [alignment['target'], alignment['source']]: + for ann_id in [alignment.get_property('target'), alignment.get_property('source')]: ann_id = cast(str, ann_id) if self.id_delimiter in ann_id: view_id, ann_id = ann_id.split(self.id_delimiter) diff --git a/mmif/serialize/model.py b/mmif/serialize/model.py index 97708de2..1f1f6d67 100644 --- a/mmif/serialize/model.py +++ b/mmif/serialize/model.py @@ -428,7 +428,7 @@ def _append_with_key(self, key: str, value: T, overwrite=False) -> None: if not overwrite and key in self._items: raise KeyError(f"Key {key} already exists") else: - self[key] = value + self._items[key] = value def append(self, value, overwrite): raise NotImplementedError() @@ -469,13 +469,13 @@ def __setitem__(self, index: int, value: T) -> None: raise TypeError("The list is read-only.") def __iter__(self) -> Iterator[T]: - return self._items.__iter__() + return self._items.values().__iter__() def __len__(self) -> int: return self._items.__len__() def __reversed__(self) -> Iterator[T]: - return reversed(self._items) + return reversed(self._items.values()) def __contains__(self, item: T) -> bool: return item in self._items diff --git a/mmif/utils/sequence_helper.py b/mmif/utils/sequence_helper.py index 17e29e87..813c0c78 100644 --- a/mmif/utils/sequence_helper.py +++ b/mmif/utils/sequence_helper.py @@ -241,10 +241,13 @@ def validate_labelset(annotations: Iterable[Annotation]) -> List[str]: # and validate that all annotations have the same label set for a in annotations: - if set(a.get_property('labelset')) != src_labels: - raise ValueError("All annotations must have the same label set, " - f"but found {a.get_property('labelset')}, " - f"different from {src_labels}") + if ann_labels := a.get_property('labelset'): + if set(ann_labels) != src_labels: + raise ValueError("All annotations must have the same label set, " + f"but found {a.get_property('labelset')}, " + f"different from {src_labels}") + else: + return ann_labels return list(src_labels) diff --git a/tests/test_serialize.py b/tests/test_serialize.py index ae8c3ad0..80ad208a 100644 --- a/tests/test_serialize.py +++ b/tests/test_serialize.py @@ -330,8 +330,8 @@ def test_cache_alignment(self): for vid, alignments in views_and_alignments.items(): v = mmif_obj.get_view_by_id(vid) for alignment in alignments: - s = v.get_annotation_by_id(alignment.get('source')) - t = v.get_annotation_by_id(alignment.get('target')) + s = v.get_annotation_by_id(alignment.get_property('source')) + t = v.get_annotation_by_id(alignment.get_property('target')) self.assertTrue(s.aligned_to_by(alignment).long_id.endswith(t.long_id)) self.assertTrue(t.aligned_to_by(alignment).long_id.endswith(s.long_id)) @@ -500,9 +500,14 @@ def test_get_label(self): self.assertEqual(a._get_label(), "text") a = v.new_annotation(AnnotationTypes.BoundingBox, boxType="text") self.assertEqual(a._get_label(), "text") - with self.assertRaises(KeyError): - a = v.new_annotation(AnnotationTypes.BoundingBox) - _ = a._get_label() + + # NOTE: + # In PR #304 (https://github.com/clamsproject/mmif-python/pull/304): + # ``get_property`` no longer raise ``KeyError`` + + # with self.assertRaises(KeyError): + # a = v.new_annotation(AnnotationTypes.BoundingBox) + # _ = a._get_label() def test_get_anchor_point(self): mmif = Mmif(validate=False) @@ -582,7 +587,7 @@ def setUp(self) -> None: def test_mmif_getitem_document(self): try: m1 = self.mmif_obj['m1'] - self.assertIs(m1, self.mmif_obj.documents.get('m1')) + self.assertIs(m1, self.mmif_obj.documents.get_item('m1')) except TypeError: self.fail("__getitem__ not implemented") except KeyError: @@ -614,7 +619,7 @@ def test_mmif_getitem_idconflict(self): def test_mmif_getitem_view(self): try: v1 = self.mmif_obj['v1'] - self.assertIs(v1, self.mmif_obj.views.get('v1')) + self.assertIs(v1, self.mmif_obj.views.get_item('v1')) except TypeError: self.fail("__getitem__ not implemented") except KeyError: @@ -623,7 +628,7 @@ def test_mmif_getitem_view(self): def test_mmif_getitem_annotation(self): try: v1_bb1 = self.mmif_obj['v5:bb1'] - self.assertIs(v1_bb1, self.mmif_obj.views.get('v5').annotations.get('bb1')) + self.assertIs(v1_bb1, self.mmif_obj.views.get_item('v5').annotations.get_item('bb1')) except TypeError: self.fail("__getitem__ not implemented") except KeyError: @@ -653,7 +658,7 @@ def test_mmif_fail_getitem_no_annotation(self): def test_view_getitem(self): try: s1 = self.mmif_obj['v1:s1'] - self.assertIs(s1, self.mmif_obj.get_view_by_id('v1').annotations.get('s1')) + self.assertIs(s1, self.mmif_obj.get_view_by_id('v1').annotations.get_item('s1')) except TypeError: self.fail("__getitem__ not implemented") except KeyError: @@ -780,7 +785,7 @@ def test_new_textdocument(self): self.assertTrue(td1.properties.text_value == td1.text_value) self.assertNotEqual(td1.text_language, td2.text_language) self.assertEqual(english_text, td1.text_value) - self.assertEqual(td1, self.view_obj.annotations.get(td1.id)) + self.assertEqual(td1, self.view_obj.annotations.get_item(td1.id)) td3 = self.view_obj.new_textdocument(english_text, mime='plain/text') self.assertEqual(td1.text_value, td3.text_value) self.assertEqual(len(td1.properties), len(td3.properties) - 1) @@ -937,8 +942,8 @@ def test_empty_annotation_property(self): 'empty_lst_prop': [] } }) - self.assertEqual(a['empty_str_prop'], "") - self.assertEqual(a['empty_lst_prop'], []) + self.assertEqual(a.get_property('empty_str_prop'), "") + self.assertEqual(a.get_property('empty_lst_prop'), []) self.assertEqual(4, len(a.properties.keys())) a_serialized = a.serialize() json.loads(a_serialized) @@ -976,7 +981,7 @@ def test_add_property(self): props.pop(removed_prop_key) try: new_mmif = Mmif(datum['json']) - new_mmif.get_view_by_id(view_id).annotations[anno_id].add_property(removed_prop_key, removed_prop_value) + new_mmif.get_view_by_id(view_id).annotations.get_item(anno_id).add_property(removed_prop_key, removed_prop_value) self.assertEqual(json.loads(datum['string'])['views'][j], json.loads(new_mmif.serialize())['views'][j], f'Failed on {i}, {view_id}') @@ -990,8 +995,6 @@ def test_get_property(self): self.assertEqual(a.properties['prop1'], 'value1') self.assertEqual(a['properties']['prop1'], 'value1') self.assertEqual(a.get_property('prop1'), 'value1') - self.assertEqual(a['prop1'], 'value1') - self.assertEqual(a.id, a['id']) self.assertEqual(a.id, a.get_property('id')) def test_get_property_with_alias(self): @@ -1109,7 +1112,7 @@ def test_document_added_properties(self): # `mime` is a special prop and shouldn't be added to "temporary" props doc1.add_property('mime', 'text') doc1_roundtrip = Document(doc1.serialize()) - self.assertTrue(doc1_roundtrip.get('mime'), 'text') + self.assertTrue(doc1_roundtrip.get_property('mime'), 'text') # but a generic prop should be added to "temporary" props # ("temporary" props dict will be lost after `Document`-level serialization) doc1.add_property('author', 'me') @@ -1178,9 +1181,9 @@ def test_document_adding_duplicate_properties(self): doc1.add_property('publisher', 'they') self.assertEqual(2, len(doc1._props_pending)) mmif_roundtrip3 = Mmif(mmif_roundtrip2.serialize()) - r0_v_anns = list(mmif_roundtrip3.views[r0_vid].get_annotations(AnnotationTypes.Annotation)) - r1_v_anns = list(mmif_roundtrip3.views[r1_vid].get_annotations(AnnotationTypes.Annotation)) - r2_v_anns = list(mmif_roundtrip3.views[r2_vid].get_annotations(AnnotationTypes.Annotation)) + r0_v_anns = list(mmif_roundtrip3.views.get_item(r0_vid).get_annotations(AnnotationTypes.Annotation)) + r1_v_anns = list(mmif_roundtrip3.views.get_item(r1_vid).get_annotations(AnnotationTypes.Annotation)) + r2_v_anns = list(mmif_roundtrip3.views.get_item(r2_vid).get_annotations(AnnotationTypes.Annotation)) # two props (`author` and `publisher`) are serialized to one `Annotation` objects self.assertEqual(1, len(r0_v_anns)) self.assertEqual(0, len(r1_v_anns)) @@ -1266,7 +1269,7 @@ def test_capital_annotation_generation_viewfinder(self): mmif.get_document_by_id(f'doc{i+1}').add_property('author', authors[i]) mmif_roundtrip = Mmif(mmif.serialize()) for i in range(1, 3): - cap_anns = list(mmif_roundtrip.views[f'v{i}'].get_annotations(AnnotationTypes.Annotation)) + cap_anns = list(mmif_roundtrip.views.get_item(f'v{i}').get_annotations(AnnotationTypes.Annotation)) self.assertEqual(1, len(cap_anns)) self.assertEqual(authors[i-1], cap_anns[0].get_property('author')) @@ -1343,16 +1346,16 @@ def setUp(self) -> None: self.mmif_obj = Mmif(MMIF_EXAMPLES['everything']) self.datalist = self.mmif_obj.views - def test_setitem(self): - self.datalist['v1'] = View({'id': 'v1'}) - self.datalist['v2'] = View({'id': 'v2'}) + def test_setitem_raise(self): + with self.assertRaises(TypeError): + self.datalist[0] = View({'id': 'v1'}) def test_getitem(self): - self.assertIs(self.mmif_obj['v1'], self.datalist['v1']) + self.assertIs(self.mmif_obj['v1'], self.datalist[0]) + self.assertIs(self.mmif_obj['v1'], self.datalist.get_item('v1')) - def test_getitem_raises(self): - with self.assertRaises(KeyError): - _ = self.datalist['reserved_names'] + def test_getitem_no_raises(self): + self.assertFalse(self.datalist.get_item('reserved_names')) def test_append(self): self.assertTrue('v256' not in self.datalist._items) @@ -1377,18 +1380,18 @@ def test_membership(self): self.assertIn('v1', self.datalist) self.assertNotIn('v200', self.datalist) - self.datalist['v200'] = View({'id': 'v200'}) + self.datalist.append(View({'id': 'v200'})) self.assertIn('v200', self.datalist) def test_len(self): self.assertEqual(8, len(self.datalist)) for i in range(9, 19): - self.datalist[f'v{i}'] = View({'id': f'v{i}'}) + self.datalist.append(View({'id': f'v{i}'})) self.assertEqual(i, len(self.datalist)) def test_iter(self): for i in range(9, 19): - self.datalist[f'v{i}'] = View({'id': f'v{i}'}) + self.datalist.append(View({'id': f'v{i}'})) for expected_index, (actual_index, item) in zip(range(18), enumerate(self.datalist)): self.assertEqual(expected_index, actual_index, "here") @@ -1397,13 +1400,13 @@ def test_iter(self): def test_setitem_fail_on_reserved_name(self): for i, name in enumerate(self.datalist.reserved_names): try: - self.datalist[name] = View({'id': f'v{i+1}'}) + self.datalist[i] = View({'id': f'v{i+1}'}) self.fail("was able to setitem on reserved name") - except KeyError as ke: - self.assertEqual("can't set item on a reserved name", ke.args[0]) + except TypeError as te: + self.assertEqual("The list is read-only.", te.args[0]) def test_get(self): - self.assertEqual(self.datalist['v1'], self.datalist.get('v1')) + self.assertEqual(self.datalist[0], self.datalist.get_item('v1')) def test_update(self): other_contains = """{ diff --git a/tests/test_utils.py b/tests/test_utils.py index 415fff00..3252fc23 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -118,8 +118,12 @@ def test_validate_labelset(self): self.assertFalse(sqh.validate_labelset(anns)) anns.pop() anns.append(view.new_annotation(AnnotationTypes.TimePoint)) - with pytest.raises(KeyError): - self.assertFalse(sqh.validate_labelset(anns)) + + # NOTE: As of PR #301, ``get_property`` no longer raises ``KeyError`` + # with pytest.raises(KeyError): + # self.assertFalse(sqh.validate_labelset(anns)) + + self.assertFalse(sqh.validate_labelset(anns)) def test_build_remapper(self): self.assertEqual({'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd', 'e': 'e', 'f': 'f'}, diff --git a/tests/test_vocab.py b/tests/test_vocab.py index eef3e76d..8d5d05d9 100644 --- a/tests/test_vocab.py +++ b/tests/test_vocab.py @@ -30,7 +30,7 @@ def test_use_in_mmif(self): def test_type_checking(self): mmif_obj = Mmif(MMIF_EXAMPLES['everything']) - ann_obj = mmif_obj.get_view_by_id('v1').annotations['s1'] + ann_obj = mmif_obj.get_view_by_id('v1').annotations.get_item('s1') self.assertTrue(ann_obj.is_type(ann_obj.at_type)) self.assertTrue(ann_obj.is_type(str(ann_obj.at_type))) self.assertFalse(ann_obj.is_type(DocumentTypes.VideoDocument))