|
| 1 | +import abc |
| 2 | +from typing import List, Optional, Type, Union |
| 3 | + |
| 4 | +from typing_extensions import Self |
| 5 | + |
| 6 | +import datahub.metadata.schema_classes as models |
| 7 | +from datahub.emitter.mce_builder import Aspect as AspectTypeVar |
| 8 | +from datahub.emitter.mcp import MetadataChangeProposalWrapper |
| 9 | +from datahub.errors import SdkUsageError |
| 10 | +from datahub.metadata.urns import Urn |
| 11 | +from datahub.utilities.urns._urn_base import _SpecificUrn |
| 12 | + |
| 13 | + |
| 14 | +class Entity: |
| 15 | + __slots__ = ("_urn", "_prev_aspects", "_aspects") |
| 16 | + |
| 17 | + def __init__(self, /, urn: Urn): |
| 18 | + # This method is not meant for direct usage. |
| 19 | + if type(self) is Entity: |
| 20 | + raise SdkUsageError(f"{Entity.__name__} cannot be instantiated directly.") |
| 21 | + |
| 22 | + assert isinstance(urn, self.get_urn_type()) |
| 23 | + self._urn: _SpecificUrn = urn |
| 24 | + |
| 25 | + # prev_aspects is None means this was created from scratch |
| 26 | + self._prev_aspects: Optional[models.AspectBag] = None |
| 27 | + self._aspects: models.AspectBag = {} |
| 28 | + |
| 29 | + @classmethod |
| 30 | + def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self: |
| 31 | + # If an init method from a subclass adds required fields, it also needs to override this method. |
| 32 | + # An alternative approach would call cls.__new__() to bypass the init method, but it's a bit |
| 33 | + # too hacky for my taste. |
| 34 | + entity = cls(urn=urn) |
| 35 | + return entity._init_from_graph(current_aspects) |
| 36 | + |
| 37 | + def _init_from_graph(self, current_aspects: models.AspectBag) -> Self: |
| 38 | + self._prev_aspects = current_aspects |
| 39 | + aspect: models._Aspect |
| 40 | + for aspect_name, aspect in (current_aspects or {}).items(): # type: ignore |
| 41 | + aspect_copy = type(aspect).from_obj(aspect.to_obj()) |
| 42 | + self._aspects[aspect_name] = aspect_copy # type: ignore |
| 43 | + return self |
| 44 | + |
| 45 | + @classmethod |
| 46 | + @abc.abstractmethod |
| 47 | + def get_urn_type(cls) -> Type[_SpecificUrn]: ... |
| 48 | + |
| 49 | + @property |
| 50 | + def urn(self) -> _SpecificUrn: |
| 51 | + return self._urn |
| 52 | + |
| 53 | + def _get_aspect( |
| 54 | + self, |
| 55 | + aspect_type: Type[AspectTypeVar], |
| 56 | + /, |
| 57 | + ) -> Optional[AspectTypeVar]: |
| 58 | + return self._aspects.get(aspect_type.ASPECT_NAME) # type: ignore |
| 59 | + |
| 60 | + def _set_aspect(self, value: AspectTypeVar, /) -> None: |
| 61 | + self._aspects[value.ASPECT_NAME] = value # type: ignore |
| 62 | + |
| 63 | + def _setdefault_aspect(self, default_aspect: AspectTypeVar, /) -> AspectTypeVar: |
| 64 | + # Similar semantics to dict.setdefault. |
| 65 | + if existing_aspect := self._get_aspect(type(default_aspect)): |
| 66 | + return existing_aspect |
| 67 | + self._set_aspect(default_aspect) |
| 68 | + return default_aspect |
| 69 | + |
| 70 | + def _as_mcps( |
| 71 | + self, |
| 72 | + change_type: Union[str, models.ChangeTypeClass] = models.ChangeTypeClass.UPSERT, |
| 73 | + ) -> List[MetadataChangeProposalWrapper]: |
| 74 | + urn_str = str(self.urn) |
| 75 | + |
| 76 | + mcps = [] |
| 77 | + for aspect in self._aspects.values(): |
| 78 | + assert isinstance(aspect, models._Aspect) |
| 79 | + mcps.append( |
| 80 | + MetadataChangeProposalWrapper( |
| 81 | + entityUrn=urn_str, |
| 82 | + aspect=aspect, |
| 83 | + changeType=change_type, |
| 84 | + ) |
| 85 | + ) |
| 86 | + return mcps |
| 87 | + |
| 88 | + def __repr__(self) -> str: |
| 89 | + return f"{self.__class__.__name__}('{self.urn}')" |
0 commit comments