diff --git a/metadata-ingestion/setup.cfg b/metadata-ingestion/setup.cfg index b7cf43b80b149..ec58f1f5ab7d7 100644 --- a/metadata-ingestion/setup.cfg +++ b/metadata-ingestion/setup.cfg @@ -15,6 +15,9 @@ warn_unused_configs = yes disallow_untyped_defs = no # try to be a bit more strict in certain areas of the codebase +[mypy-datahub] +# Only for datahub's __init__.py - allow implicit reexport +implicit_reexport = yes [mypy-datahub.*] ignore_missing_imports = no implicit_reexport = no @@ -54,7 +57,7 @@ addopts = --cov=src --cov-report= --cov-config setup.cfg --strict-markers -p no: markers = slow: marks tests that are slow to run, including all docker-based tests (deselect with '-m not slow') integration: marks all integration tests, across all batches (deselect with '-m "not integration"') - integration_batch_0: mark tests to run in batch 0 of integration tests. This is done mainly for parallelisation in CI. Batch 0 is the default batch. + integration_batch_0: mark tests to run in batch 0 of integration tests. This is done mainly for parallelization in CI. Batch 0 is the default batch. integration_batch_1: mark tests to run in batch 1 of integration tests integration_batch_2: mark tests to run in batch 2 of integration tests testpaths = diff --git a/metadata-ingestion/src/datahub/errors.py b/metadata-ingestion/src/datahub/errors.py new file mode 100644 index 0000000000000..cb57251fcb8e1 --- /dev/null +++ b/metadata-ingestion/src/datahub/errors.py @@ -0,0 +1,35 @@ +from datahub.configuration.common import MetaError + +# TODO: Move all other error types to this file. + + +class SdkUsageError(MetaError): + pass + + +class AlreadyExistsError(SdkUsageError): + pass + + +class ItemNotFoundError(SdkUsageError): + pass + + +class MultipleItemsFoundError(SdkUsageError): + pass + + +class SchemaFieldKeyError(SdkUsageError, KeyError): + pass + + +class IngestionAttributionWarning(Warning): + pass + + +class MultipleSubtypesWarning(Warning): + pass + + +class ExperimentalWarning(Warning): + pass diff --git a/metadata-ingestion/src/datahub/sdk/__init__.py b/metadata-ingestion/src/datahub/sdk/__init__.py new file mode 100644 index 0000000000000..54bd18c323047 --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/__init__.py @@ -0,0 +1,33 @@ +import warnings + +import datahub.metadata.schema_classes as models +from datahub.errors import ExperimentalWarning, SdkUsageError +from datahub.ingestion.graph.config import DatahubClientConfig +from datahub.metadata.urns import ( + ChartUrn, + ContainerUrn, + CorpGroupUrn, + CorpUserUrn, + DashboardUrn, + DataPlatformInstanceUrn, + DataPlatformUrn, + DatasetUrn, + DomainUrn, + GlossaryTermUrn, + SchemaFieldUrn, + TagUrn, +) +from datahub.sdk.container import Container +from datahub.sdk.dataset import Dataset +from datahub.sdk.main_client import DataHubClient + +warnings.warn( + "The new datahub SDK (e.g. datahub.sdk.*) is experimental. " + "Our typical backwards-compatibility and stability guarantees do not apply to this code. 
" + "When it's promoted to stable, the import path will change " + "from `from datahub.sdk import ...` to `from datahub import ...`.", + ExperimentalWarning, + stacklevel=2, +) +del warnings +del ExperimentalWarning diff --git a/metadata-ingestion/src/datahub/sdk/_all_entities.py b/metadata-ingestion/src/datahub/sdk/_all_entities.py new file mode 100644 index 0000000000000..04c5fb6045ae6 --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/_all_entities.py @@ -0,0 +1,15 @@ +from typing import Dict, List, Type + +from datahub.sdk._entity import Entity +from datahub.sdk.container import Container +from datahub.sdk.dataset import Dataset + +# TODO: Is there a better way to declare this? +ENTITY_CLASSES_LIST: List[Type[Entity]] = [ + Container, + Dataset, +] + +ENTITY_CLASSES: Dict[str, Type[Entity]] = { + cls.get_urn_type().ENTITY_TYPE: cls for cls in ENTITY_CLASSES_LIST +} diff --git a/metadata-ingestion/src/datahub/sdk/_attribution.py b/metadata-ingestion/src/datahub/sdk/_attribution.py new file mode 100644 index 0000000000000..4faf441f58df7 --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/_attribution.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import contextlib +from typing import Iterator + +from datahub.utilities.str_enum import StrEnum + + +class KnownAttribution(StrEnum): + INGESTION = "INGESTION" + INGESTION_ALTERNATE = "INGESTION_ALTERNATE" + + UI = "UI" + SDK = "SDK" + + PROPAGATION = "PROPAGATION" + + def is_ingestion(self) -> bool: + return self in ( + KnownAttribution.INGESTION, + KnownAttribution.INGESTION_ALTERNATE, + ) + + +_default_attribution = KnownAttribution.SDK + + +def get_default_attribution() -> KnownAttribution: + return _default_attribution + + +def set_default_attribution(attribution: KnownAttribution) -> None: + global _default_attribution + _default_attribution = attribution + + +@contextlib.contextmanager +def change_default_attribution(attribution: KnownAttribution) -> Iterator[None]: + old_attribution = get_default_attribution() + try: + set_default_attribution(attribution) + yield + finally: + set_default_attribution(old_attribution) + + +def is_ingestion_attribution() -> bool: + return get_default_attribution().is_ingestion() diff --git a/metadata-ingestion/src/datahub/sdk/_entity.py b/metadata-ingestion/src/datahub/sdk/_entity.py new file mode 100644 index 0000000000000..34a088a25949c --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/_entity.py @@ -0,0 +1,89 @@ +import abc +from typing import List, Optional, Type, Union + +from typing_extensions import Self + +import datahub.metadata.schema_classes as models +from datahub.emitter.mce_builder import Aspect as AspectTypeVar +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.errors import SdkUsageError +from datahub.metadata.urns import Urn +from datahub.utilities.urns._urn_base import _SpecificUrn + + +class Entity: + __slots__ = ("_urn", "_prev_aspects", "_aspects") + + def __init__(self, /, urn: Urn): + # This method is not meant for direct usage. 
+ if type(self) is Entity: + raise SdkUsageError(f"{Entity.__name__} cannot be instantiated directly.") + + assert isinstance(urn, self.get_urn_type()) + self._urn: _SpecificUrn = urn + + # prev_aspects is None means this was created from scratch + self._prev_aspects: Optional[models.AspectBag] = None + self._aspects: models.AspectBag = {} + + @classmethod + def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self: + # If an init method from a subclass adds required fields, it also needs to override this method. + # An alternative approach would call cls.__new__() to bypass the init method, but it's a bit + # too hacky for my taste. + entity = cls(urn=urn) + return entity._init_from_graph(current_aspects) + + def _init_from_graph(self, current_aspects: models.AspectBag) -> Self: + self._prev_aspects = current_aspects + aspect: models._Aspect + for aspect_name, aspect in (current_aspects or {}).items(): # type: ignore + aspect_copy = type(aspect).from_obj(aspect.to_obj()) + self._aspects[aspect_name] = aspect_copy # type: ignore + return self + + @classmethod + @abc.abstractmethod + def get_urn_type(cls) -> Type[_SpecificUrn]: ... + + @property + def urn(self) -> _SpecificUrn: + return self._urn + + def _get_aspect( + self, + aspect_type: Type[AspectTypeVar], + /, + ) -> Optional[AspectTypeVar]: + return self._aspects.get(aspect_type.ASPECT_NAME) # type: ignore + + def _set_aspect(self, value: AspectTypeVar, /) -> None: + self._aspects[value.ASPECT_NAME] = value # type: ignore + + def _setdefault_aspect(self, default_aspect: AspectTypeVar, /) -> AspectTypeVar: + # Similar semantics to dict.setdefault. + if existing_aspect := self._get_aspect(type(default_aspect)): + return existing_aspect + self._set_aspect(default_aspect) + return default_aspect + + def _as_mcps( + self, + change_type: Union[str, models.ChangeTypeClass] = models.ChangeTypeClass.UPSERT, + ) -> List[MetadataChangeProposalWrapper]: + urn_str = str(self.urn) + + mcps = [] + for aspect in self._aspects.values(): + assert isinstance(aspect, models._Aspect) + mcps.append( + MetadataChangeProposalWrapper( + entityUrn=urn_str, + aspect=aspect, + changeType=change_type, + ) + ) + return mcps + + def __repr__(self) -> str: + return f"{self.__class__.__name__}('{self.urn}')" diff --git a/metadata-ingestion/src/datahub/sdk/_shared.py b/metadata-ingestion/src/datahub/sdk/_shared.py new file mode 100644 index 0000000000000..559b30ddd5288 --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/_shared.py @@ -0,0 +1,338 @@ +import warnings +from datetime import datetime +from typing import ( + TYPE_CHECKING, + List, + Optional, + Tuple, + Union, +) + +from typing_extensions import TypeAlias + +import datahub.metadata.schema_classes as models +from datahub.emitter.mce_builder import ( + make_ts_millis, + make_user_urn, + parse_ts_millis, + validate_ownership_type, +) +from datahub.emitter.mcp_builder import ContainerKey +from datahub.errors import MultipleSubtypesWarning, SdkUsageError +from datahub.metadata.urns import ( + CorpGroupUrn, + CorpUserUrn, + DataJobUrn, + DataPlatformInstanceUrn, + DataPlatformUrn, + DatasetUrn, + DomainUrn, + GlossaryTermUrn, + OwnershipTypeUrn, + TagUrn, + Urn, +) +from datahub.sdk._entity import Entity +from datahub.utilities.urns.error import InvalidUrnError + +if TYPE_CHECKING: + from datahub.sdk.container import Container + +UrnOrStr: TypeAlias = Union[Urn, str] +DatasetUrnOrStr: TypeAlias = Union[str, DatasetUrn] +DatajobUrnOrStr: TypeAlias = Union[str, DataJobUrn] + +ActorUrn: 
TypeAlias = Union[CorpUserUrn, CorpGroupUrn] + + +def make_time_stamp(ts: Optional[datetime]) -> Optional[models.TimeStampClass]: + if ts is None: + return None + return models.TimeStampClass(time=make_ts_millis(ts)) + + +def parse_time_stamp(ts: Optional[models.TimeStampClass]) -> Optional[datetime]: + if ts is None: + return None + return parse_ts_millis(ts.time) + + +class HasPlatformInstance(Entity): + __slots__ = () + + def _set_platform_instance( + self, + platform: Union[str, DataPlatformUrn], + instance: Union[None, str, DataPlatformInstanceUrn], + ) -> None: + platform = DataPlatformUrn(platform) + if instance is not None: + try: + instance = DataPlatformInstanceUrn.from_string(instance) + except InvalidUrnError: + if not isinstance( + instance, DataPlatformInstanceUrn + ): # redundant check to make mypy happy + instance = DataPlatformInstanceUrn(platform, instance) + # At this point, instance is either None or a DataPlatformInstanceUrn. + + self._set_aspect( + models.DataPlatformInstanceClass( + platform=platform.urn(), + instance=instance.urn() if instance else None, + ) + ) + + @property + def platform_instance(self) -> Optional[DataPlatformInstanceUrn]: + dataPlatformInstance = self._get_aspect(models.DataPlatformInstanceClass) + if dataPlatformInstance and dataPlatformInstance.instance: + return DataPlatformInstanceUrn.from_string(dataPlatformInstance.instance) + return None + + +class HasSubtype(Entity): + __slots__ = () + + @property + def subtype(self) -> Optional[str]: + subtypes = self._get_aspect(models.SubTypesClass) + if subtypes and subtypes.typeNames: + if len(subtypes.typeNames) > 1: + warnings.warn( + f"The entity {self.urn} has multiple subtypes: {subtypes.typeNames}. " + "Only the first subtype will be considered.", + MultipleSubtypesWarning, + stacklevel=2, + ) + return subtypes.typeNames[0] + return None + + def set_subtype(self, subtype: str) -> None: + self._set_aspect(models.SubTypesClass(typeNames=[subtype])) + + +OwnershipTypeType: TypeAlias = Union[str, OwnershipTypeUrn] +OwnerInputType: TypeAlias = Union[ + str, + ActorUrn, + Tuple[Union[str, ActorUrn], OwnershipTypeType], + models.OwnerClass, +] +OwnersInputType: TypeAlias = List[OwnerInputType] + + +class HasOwnership(Entity): + __slots__ = () + + @staticmethod + def _parse_owner_class(owner: OwnerInputType) -> models.OwnerClass: + if isinstance(owner, models.OwnerClass): + return owner + + owner_type = models.OwnershipTypeClass.TECHNICAL_OWNER + owner_type_urn = None + + if isinstance(owner, tuple): + raw_owner, raw_owner_type = owner + + if isinstance(raw_owner_type, OwnershipTypeUrn): + owner_type = models.OwnershipTypeClass.CUSTOM + owner_type_urn = str(raw_owner_type) + else: + owner_type, owner_type_urn = validate_ownership_type(raw_owner_type) + else: + raw_owner = owner + + if isinstance(raw_owner, str): + # Tricky: this will gracefully handle a user passing in a group urn as a string. + # TODO: is this the right behavior? or should we require a valid urn here? + return models.OwnerClass( + owner=make_user_urn(raw_owner), + type=owner_type, + typeUrn=owner_type_urn, + ) + elif isinstance(raw_owner, Urn): + return models.OwnerClass( + owner=str(raw_owner), + type=owner_type, + typeUrn=owner_type_urn, + ) + else: + raise SdkUsageError( + f"Invalid owner {owner}: {type(owner)} is not a valid owner type" + ) + + # TODO: Return a custom type with deserialized urns, instead of the raw aspect. + # Ideally we'd also use first-class ownership type urns here, not strings. 
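+ # set_owners() accepts several input shapes (see _parse_owner_class above). A sketch with hypothetical values:
+ #   entity.set_owners(["jdoe", CorpUserUrn("alice"), ("bob", models.OwnershipTypeClass.BUSINESS_OWNER)])
+ # Plain strings are coerced to user urns via make_user_urn(); the tuple form attaches an
+ # ownership type, and TECHNICAL_OWNER is the default otherwise.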
+ @property + def owners(self) -> Optional[List[models.OwnerClass]]: + if owners_aspect := self._get_aspect(models.OwnershipClass): + return owners_aspect.owners + return None + + def set_owners(self, owners: OwnersInputType) -> None: + # TODO: add docs on the default parsing + default ownership type + parsed_owners = [self._parse_owner_class(owner) for owner in owners] + self._set_aspect(models.OwnershipClass(owners=parsed_owners)) + + +ContainerInputType: TypeAlias = Union["Container", ContainerKey] + + +class HasContainer(Entity): + __slots__ = () + + def _set_container(self, container: Optional[ContainerInputType]) -> None: + # We need to allow container to be None. It won't happen for datasets much, but + # will be required for root containers. + from datahub.sdk.container import Container + + browse_path: List[Union[str, models.BrowsePathEntryClass]] = [] + if isinstance(container, Container): + container_urn = container.urn.urn() + + parent_browse_path = container._get_aspect(models.BrowsePathsV2Class) + if parent_browse_path is None: + raise SdkUsageError( + "Parent container does not have a browse path, so cannot generate one for its children." + ) + browse_path = [ + *parent_browse_path.path, + models.BrowsePathEntryClass( + id=container_urn, + urn=container_urn, + ), + ] + elif container is not None: + container_urn = container.as_urn() + + browse_path_reversed = [container_urn] + parent_key = container.parent_key() + while parent_key is not None: + browse_path_reversed.append(parent_key.as_urn()) + parent_key = parent_key.parent_key() + browse_path = list(reversed(browse_path_reversed)) + else: + container_urn = None + browse_path = [] + + if container_urn: + self._set_aspect(models.ContainerClass(container=container_urn)) + + self._set_aspect( + models.BrowsePathsV2Class( + path=[ + ( + entry + if isinstance(entry, models.BrowsePathEntryClass) + else models.BrowsePathEntryClass( + id=entry, + urn=entry, + ) + ) + for entry in browse_path + ] + ) + ) + + +TagInputType: TypeAlias = Union[str, TagUrn, models.TagAssociationClass] +TagsInputType: TypeAlias = List[TagInputType] + + +class HasTags(Entity): + __slots__ = () + + # TODO: Return a custom type with deserialized urns, instead of the raw aspect. + @property + def tags(self) -> Optional[List[models.TagAssociationClass]]: + if tags := self._get_aspect(models.GlobalTagsClass): + return tags.tags + return None + + @classmethod + def _parse_tag_association_class( + cls, tag: TagInputType + ) -> models.TagAssociationClass: + if isinstance(tag, models.TagAssociationClass): + return tag + elif isinstance(tag, str): + assert TagUrn.from_string(tag) + return models.TagAssociationClass(tag=str(tag)) + + def set_tags(self, tags: TagsInputType) -> None: + self._set_aspect( + models.GlobalTagsClass( + tags=[self._parse_tag_association_class(tag) for tag in tags] + ) + ) + + +TermInputType: TypeAlias = Union[ + str, GlossaryTermUrn, models.GlossaryTermAssociationClass +] +TermsInputType: TypeAlias = List[TermInputType] + + +class HasTerms(Entity): + __slots__ = () + + # TODO: Return a custom type with deserialized urns, instead of the raw aspect. 
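+ # set_terms() accepts GlossaryTermAssociationClass values or full urn strings. A sketch with a
+ # hypothetical term: entity.set_terms(["urn:li:glossaryTerm:Classification.Sensitive"]).
+ # Note: the str branch below validates the urn; the declared GlossaryTermUrn variant has no branch here yet.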
+ @property + def terms(self) -> Optional[List[models.GlossaryTermAssociationClass]]: + if glossary_terms := self._get_aspect(models.GlossaryTermsClass): + return glossary_terms.terms + return None + + @classmethod + def _parse_glossary_term_association_class( + cls, term: TermInputType + ) -> models.GlossaryTermAssociationClass: + if isinstance(term, models.GlossaryTermAssociationClass): + return term + elif isinstance(term, str): + assert GlossaryTermUrn.from_string(term) + return models.GlossaryTermAssociationClass(urn=str(term)) + + @classmethod + def _terms_audit_stamp(self) -> models.AuditStampClass: + return models.AuditStampClass( + time=0, + # TODO figure out what to put here + actor=CorpUserUrn("__ingestion").urn(), + ) + + def set_terms(self, terms: TermsInputType) -> None: + self._set_aspect( + models.GlossaryTermsClass( + terms=[ + self._parse_glossary_term_association_class(term) for term in terms + ], + auditStamp=self._terms_audit_stamp(), + ) + ) + + +DomainInputType: TypeAlias = Union[str, DomainUrn] + + +class HasDomain(Entity): + __slots__ = () + + @property + def domain(self) -> Optional[DomainUrn]: + if domains := self._get_aspect(models.DomainsClass): + if len(domains.domains) > 1: + raise SdkUsageError( + f"The entity has multiple domains set, but only one is supported: {domains.domains}" + ) + elif domains.domains: + domain_str = domains.domains[0] + return DomainUrn.from_string(domain_str) + + return None + + def set_domain(self, domain: DomainInputType) -> None: + domain_urn = DomainUrn.from_string(domain) # basically a type assertion + self._set_aspect(models.DomainsClass(domains=[str(domain_urn)])) diff --git a/metadata-ingestion/src/datahub/sdk/container.py b/metadata-ingestion/src/datahub/sdk/container.py new file mode 100644 index 0000000000000..b4edae693bc9e --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/container.py @@ -0,0 +1,193 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Dict, Optional, Type + +from typing_extensions import Self + +import datahub.metadata.schema_classes as models +from datahub.emitter.mce_builder import ALL_ENV_TYPES +from datahub.emitter.mcp_builder import ( + _INCLUDE_ENV_IN_CONTAINER_PROPERTIES, + ContainerKey, +) +from datahub.errors import SdkUsageError +from datahub.metadata.urns import ( + ContainerUrn, + Urn, +) +from datahub.sdk._entity import Entity +from datahub.sdk._shared import ( + DomainInputType, + HasContainer, + HasDomain, + HasOwnership, + HasPlatformInstance, + HasSubtype, + HasTags, + HasTerms, + OwnersInputType, + TagsInputType, + TermsInputType, + make_time_stamp, + parse_time_stamp, +) + + +class Container( + HasPlatformInstance, + HasSubtype, + HasContainer, + HasOwnership, + HasTags, + HasTerms, + HasDomain, + Entity, +): + __slots__ = () + + @classmethod + def get_urn_type(cls) -> Type[ContainerUrn]: + return ContainerUrn + + def __init__( + self, + /, + # Identity. + container_key: ContainerKey | ContainerUrn, + *, + # Container attributes. + display_name: str, + qualified_name: Optional[str] = None, + description: Optional[str] = None, + external_url: Optional[str] = None, + # TODO: call this custom properties? + extra_properties: Optional[Dict[str, str]] = None, + created: Optional[datetime] = None, + last_modified: Optional[datetime] = None, + # Standard aspects. 
+ subtype: Optional[str] = None, + owners: Optional[OwnersInputType] = None, + tags: Optional[TagsInputType] = None, + terms: Optional[TermsInputType] = None, + domain: Optional[DomainInputType] = None, + ): + if isinstance(container_key, ContainerUrn): + urn = container_key + else: + urn = ContainerUrn.from_string(container_key.as_urn()) + super().__init__(urn) + + # This needs to come first to ensure that the display name is registered. + self._ensure_container_props(name=display_name) + + # TODO: Normal usages should require container key. Only the graph init method can accept an urn. + if isinstance(container_key, ContainerKey): + self._set_platform_instance(container_key.platform, container_key.instance) + + self._set_container(container_key.parent_key()) + + self.set_custom_properties( + { + **container_key.property_dict(), + **(extra_properties or {}), + } + ) + + # Extra validation on the env field. + # In certain cases (mainly for backwards compatibility), the env field will actually + # have a platform instance name. + env = container_key.env if container_key.env in ALL_ENV_TYPES else None + if _INCLUDE_ENV_IN_CONTAINER_PROPERTIES and env is not None: + self._ensure_container_props().env = env + + if description is not None: + self.set_description(description) + if external_url is not None: + self.set_external_url(external_url) + if qualified_name is not None: + self.set_qualified_name(qualified_name) + if created is not None: + self.set_created(created) + if last_modified is not None: + self.set_last_modified(last_modified) + + if subtype is not None: + self.set_subtype(subtype) + if owners is not None: + self.set_owners(owners) + if tags is not None: + self.set_tags(tags) + if terms is not None: + self.set_terms(terms) + if domain is not None: + self.set_domain(domain) + + @classmethod + def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self: + assert isinstance(urn, ContainerUrn) + entity = cls(urn, display_name="__dummy_value__") + return entity._init_from_graph(current_aspects) + + def _ensure_container_props( + self, *, name: Optional[str] = None + ) -> models.ContainerPropertiesClass: + # TODO: Not super happy with this method's implementation, but it's + # internal-only and enforces the constraints that we need. + if name is not None: + return self._setdefault_aspect(models.ContainerPropertiesClass(name=name)) + + props = self._get_aspect(models.ContainerPropertiesClass) + if props is None: + raise SdkUsageError("Containers must have a name.") + return props + + @property + def display_name(self) -> str: + return self._ensure_container_props().name + + def set_display_name(self, value: str) -> None: + self._ensure_container_props().name = value + + @property + def description(self) -> Optional[str]: + return self._ensure_container_props().description + + def set_description(self, description: str) -> None: + self._ensure_container_props().description = description + + @property + def custom_properties(self) -> Optional[Dict[str, str]]: + return self._ensure_container_props().customProperties + + def set_custom_properties(self, custom_properties: Dict[str, str]) -> None: + # TODO: How do we ensure that the container key props are always retained? 
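+ # Callers that want to keep the ContainerKey-derived entries can merge them back in
+ # explicitly. A sketch with a hypothetical key: container.set_custom_properties({**my_key.property_dict(), "team": "data-eng"}).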
+ self._ensure_container_props().customProperties = custom_properties + + @property + def external_url(self) -> Optional[str]: + return self._ensure_container_props().externalUrl + + def set_external_url(self, external_url: str) -> None: + self._ensure_container_props().externalUrl = external_url + + @property + def qualified_name(self) -> Optional[str]: + return self._ensure_container_props().qualifiedName + + def set_qualified_name(self, qualified_name: str) -> None: + self._ensure_container_props().qualifiedName = qualified_name + + @property + def created(self) -> Optional[datetime]: + return parse_time_stamp(self._ensure_container_props().created) + + def set_created(self, created: datetime) -> None: + self._ensure_container_props().created = make_time_stamp(created) + + @property + def last_modified(self) -> Optional[datetime]: + return parse_time_stamp(self._ensure_container_props().lastModified) + + def set_last_modified(self, last_modified: datetime) -> None: + self._ensure_container_props().lastModified = make_time_stamp(last_modified) diff --git a/metadata-ingestion/src/datahub/sdk/dataset.py b/metadata-ingestion/src/datahub/sdk/dataset.py new file mode 100644 index 0000000000000..f68b2c0845c80 --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/dataset.py @@ -0,0 +1,584 @@ +from __future__ import annotations + +import warnings +from datetime import datetime +from typing import Dict, List, Optional, Tuple, Type, Union + +from typing_extensions import Self, TypeAlias, assert_never + +import datahub.metadata.schema_classes as models +from datahub.cli.cli_utils import first_non_null +from datahub.emitter.mce_builder import DEFAULT_ENV +from datahub.errors import ( + IngestionAttributionWarning, + ItemNotFoundError, + SchemaFieldKeyError, +) +from datahub.ingestion.source.sql.sql_types import resolve_sql_type +from datahub.metadata.urns import DatasetUrn, SchemaFieldUrn, Urn +from datahub.sdk._attribution import is_ingestion_attribution +from datahub.sdk._entity import Entity +from datahub.sdk._shared import ( + ContainerInputType, + DatasetUrnOrStr, + DomainInputType, + HasContainer, + HasDomain, + HasOwnership, + HasPlatformInstance, + HasSubtype, + HasTags, + HasTerms, + OwnersInputType, + TagsInputType, + TermsInputType, + make_time_stamp, + parse_time_stamp, +) + +SchemaFieldInputType: TypeAlias = Union[ + str, + Tuple[str, str], # (name, type) + Tuple[str, str, str], # (name, type, description) + models.SchemaFieldClass, +] +SchemaFieldsInputType: TypeAlias = Union[ + List[SchemaFieldInputType], + models.SchemaMetadataClass, +] + +UpstreamInputType: TypeAlias = Union[ + # Dataset upstream variants. + DatasetUrnOrStr, + models.UpstreamClass, + # Column upstream variants. + models.FineGrainedLineageClass, +] +# Mapping of { downstream_column -> [upstream_columns] } +ColumnLineageMapping: TypeAlias = Dict[str, List[str]] +UpstreamLineageInputType: TypeAlias = Union[ + models.UpstreamLineageClass, + List[UpstreamInputType], + # Combined variant. 
+ # Map of { upstream_dataset -> { downstream_column -> [upstream_column] } } + Dict[DatasetUrnOrStr, ColumnLineageMapping], +] + + +def _parse_upstream_input( + upstream_input: UpstreamInputType, +) -> Union[models.UpstreamClass, models.FineGrainedLineageClass]: + if isinstance(upstream_input, models.UpstreamClass): + return upstream_input + elif isinstance(upstream_input, models.FineGrainedLineageClass): + return upstream_input + elif isinstance(upstream_input, (str, DatasetUrn)): + return models.UpstreamClass( + dataset=str(upstream_input), + type=models.DatasetLineageTypeClass.TRANSFORMED, + ) + else: + assert_never(upstream_input) + + +def _parse_cll_mapping( + *, + upstream: DatasetUrnOrStr, + downstream: DatasetUrnOrStr, + cll_mapping: ColumnLineageMapping, +) -> List[models.FineGrainedLineageClass]: + cll = [] + for downstream_column, upstream_columns in cll_mapping.items(): + cll.append( + models.FineGrainedLineageClass( + upstreamType=models.FineGrainedLineageUpstreamTypeClass.FIELD_SET, + downstreamType=models.FineGrainedLineageDownstreamTypeClass.FIELD, + upstreams=[ + SchemaFieldUrn(upstream, upstream_column).urn() + for upstream_column in upstream_columns + ], + downstreams=[SchemaFieldUrn(downstream, downstream_column).urn()], + ) + ) + return cll + + +def _parse_upstream_lineage_input( + upstream_input: UpstreamLineageInputType, downstream_urn: DatasetUrn +) -> models.UpstreamLineageClass: + if isinstance(upstream_input, models.UpstreamLineageClass): + return upstream_input + elif isinstance(upstream_input, list): + upstreams = [_parse_upstream_input(upstream) for upstream in upstream_input] + + # Partition into table and column lineages. + tll = [ + upstream + for upstream in upstreams + if isinstance(upstream, models.UpstreamClass) + ] + cll = [ + upstream + for upstream in upstreams + if not isinstance(upstream, models.UpstreamClass) + ] + + # TODO: check that all things in cll are also in tll + return models.UpstreamLineageClass(upstreams=tll, fineGrainedLineages=cll) + elif isinstance(upstream_input, dict): + tll = [] + cll = [] + for dataset_urn, column_lineage in upstream_input.items(): + tll.append( + models.UpstreamClass( + dataset=str(dataset_urn), + type=models.DatasetLineageTypeClass.TRANSFORMED, + ) + ) + cll.extend( + _parse_cll_mapping( + upstream=dataset_urn, + downstream=downstream_urn, + cll_mapping=column_lineage, + ) + ) + + return models.UpstreamLineageClass(upstreams=tll, fineGrainedLineages=cll) + else: + assert_never(upstream_input) + + +class SchemaField: + __slots__ = ("_parent", "_field_path") + + def __init__(self, parent: Dataset, field_path: str): + self._parent = parent + self._field_path = field_path + + def _base_schema_field(self) -> models.SchemaFieldClass: + # This must exist - if it doesn't, we've got a larger bug. + schema_dict = self._parent._schema_dict() + return schema_dict[self._field_path] + + def _get_editable_schema_field( + self, + ) -> Optional[models.EditableSchemaFieldInfoClass]: + # This method does not make any mutations. 
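+ # Contrast with _ensure_editable_schema_field below, which creates the editable aspect
+ # and field entry on demand.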
+ editable_schema = self._parent._get_aspect(models.EditableSchemaMetadataClass) + if editable_schema is None: + return None + for field in editable_schema.editableSchemaFieldInfo: + if field.fieldPath == self._field_path: + return field + return None + + def _ensure_editable_schema_field(self) -> models.EditableSchemaFieldInfoClass: + if is_ingestion_attribution(): + warnings.warn( + "This method should not be used in ingestion mode.", + IngestionAttributionWarning, + stacklevel=2, + ) + editable_schema = self._parent._setdefault_aspect( + models.EditableSchemaMetadataClass(editableSchemaFieldInfo=[]) + ) + for field in editable_schema.editableSchemaFieldInfo: + if field.fieldPath == self._field_path: + return field + + # If we don't have an entry for this field yet, create one. + field = models.EditableSchemaFieldInfoClass(fieldPath=self._field_path) + editable_schema.editableSchemaFieldInfo.append(field) + return field + + @property + def field_path(self) -> str: + return self._field_path + + @property + def mapped_type(self) -> models.SchemaFieldDataTypeClass: + return self._base_schema_field().type + + @property + def native_type(self) -> str: + return self._base_schema_field().nativeDataType + + # TODO expose nullability and primary/foreign key details + + @property + def description(self) -> Optional[str]: + editable_field = self._get_editable_schema_field() + return first_non_null( + [ + editable_field.description if editable_field is not None else None, + self._base_schema_field().description, + ] + ) + + def set_description(self, description: str) -> None: + if is_ingestion_attribution(): + editable_field = self._get_editable_schema_field() + if editable_field and editable_field.description is not None: + warnings.warn( + "The field description will be hidden by UI-based edits. " + "Change the edit mode to OVERWRITE_UI to override this behavior.", + category=IngestionAttributionWarning, + stacklevel=2, + ) + + self._base_schema_field().description = description + else: + self._ensure_editable_schema_field().description = description + + @property + def tags(self) -> Optional[List[models.TagAssociationClass]]: + # Tricky: if either has a non-null globalTags, this will not return None. + tags = None + + if (base_tags := self._base_schema_field().globalTags) is not None: + tags = tags or [] + tags.extend(base_tags.tags) + + if editable_field := self._get_editable_schema_field(): + if (editable_tags := editable_field.globalTags) is not None: + tags = tags or [] + tags.extend(editable_tags.tags) + + return tags + + def set_tags(self, tags: TagsInputType) -> None: + parsed_tags = [self._parent._parse_tag_association_class(tag) for tag in tags] + + if is_ingestion_attribution(): + editable_field = self._get_editable_schema_field() + if editable_field and editable_field.globalTags: + warnings.warn( + "Overwriting non-ingestion tags from ingestion is an anti-pattern.", + category=IngestionAttributionWarning, + stacklevel=2, + ) + editable_field.globalTags = None + + self._base_schema_field().globalTags = models.GlobalTagsClass( + tags=parsed_tags + ) + else: + base_field = self._base_schema_field() + if base_field.globalTags: + base_field.globalTags = None + + self._ensure_editable_schema_field().globalTags = models.GlobalTagsClass( + tags=parsed_tags + ) + + @property + def terms(self) -> Optional[List[models.GlossaryTermAssociationClass]]: + # TODO: Basically the same implementation as tags - can we share code? 
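+ # Returns the merged view: base (ingestion) terms first, then UI edits, mirroring the tags property above.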
+ terms = None + + if (base_terms := self._base_schema_field().glossaryTerms) is not None: + terms = terms or [] + terms.extend(base_terms.terms) + + if editable_field := self._get_editable_schema_field(): + if (editable_terms := editable_field.glossaryTerms) is not None: + terms = terms or [] + terms.extend(editable_terms.terms) + + return terms + + def set_terms(self, terms: List[models.GlossaryTermAssociationClass]) -> None: + parsed_terms = [ + self._parent._parse_glossary_term_association_class(term) for term in terms + ] + + if is_ingestion_attribution(): + editable_field = self._get_editable_schema_field() + if editable_field and editable_field.glossaryTerms: + warnings.warn( + "Overwriting non-ingestion terms from ingestion is an anti-pattern.", + category=IngestionAttributionWarning, + stacklevel=2, + ) + editable_field.glossaryTerms = None + + self._base_schema_field().glossaryTerms = models.GlossaryTermsClass( + terms=parsed_terms, + auditStamp=self._parent._terms_audit_stamp(), + ) + else: + base_field = self._base_schema_field() + if base_field.glossaryTerms: + base_field.glossaryTerms = None + + self._ensure_editable_schema_field().glossaryTerms = ( + models.GlossaryTermsClass( + terms=parsed_terms, + auditStamp=self._parent._terms_audit_stamp(), + ) + ) + + +class Dataset( + HasPlatformInstance, + HasSubtype, + HasContainer, + HasOwnership, + HasTags, + HasTerms, + HasDomain, + Entity, +): + __slots__ = () + + @classmethod + def get_urn_type(cls) -> Type[DatasetUrn]: + return DatasetUrn + + def __init__( + self, + *, + # Identity. + platform: str, + name: str, + platform_instance: Optional[str] = None, + env: str = DEFAULT_ENV, + # Dataset properties. + description: Optional[str] = None, + display_name: Optional[str] = None, + qualified_name: Optional[str] = None, + external_url: Optional[str] = None, + custom_properties: Optional[Dict[str, str]] = None, + created: Optional[datetime] = None, + last_modified: Optional[datetime] = None, + # Standard aspects. + subtype: Optional[str] = None, + container: Optional[ContainerInputType] = None, + owners: Optional[OwnersInputType] = None, + tags: Optional[TagsInputType] = None, + terms: Optional[TermsInputType] = None, + # TODO structured_properties + domain: Optional[DomainInputType] = None, + # Dataset-specific aspects. 
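+ # schema accepts (name, type) or (name, type, description) tuples. A sketch with
+ # hypothetical fields: schema=[("id", "string"), ("amount", "double", "transaction amount")],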
+ schema: Optional[SchemaFieldsInputType] = None, + upstreams: Optional[models.UpstreamLineageClass] = None, + ): + urn = DatasetUrn.create_from_ids( + platform_id=platform, + table_name=name, + platform_instance=platform_instance, + env=env, + ) + super().__init__(urn) + + self._set_platform_instance(urn.platform, platform_instance) + + if schema is not None: + self._set_schema(schema) + if upstreams is not None: + self.set_upstreams(upstreams) + + if description is not None: + self.set_description(description) + if display_name is not None: + self.set_display_name(display_name) + if qualified_name is not None: + self.set_qualified_name(qualified_name) + if external_url is not None: + self.set_external_url(external_url) + if custom_properties is not None: + self.set_custom_properties(custom_properties) + if created is not None: + self.set_created(created) + if last_modified is not None: + self.set_last_modified(last_modified) + + if subtype is not None: + self.set_subtype(subtype) + if container is not None: + self._set_container(container) + if owners is not None: + self.set_owners(owners) + if tags is not None: + self.set_tags(tags) + if terms is not None: + self.set_terms(terms) + if domain is not None: + self.set_domain(domain) + + @classmethod + def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self: + assert isinstance(urn, DatasetUrn) + entity = cls( + platform=urn.platform, + name=urn.name, + env=urn.env, + ) + return entity._init_from_graph(current_aspects) + + @property + def urn(self) -> DatasetUrn: + return self._urn # type: ignore + + def _ensure_dataset_props(self) -> models.DatasetPropertiesClass: + return self._setdefault_aspect(models.DatasetPropertiesClass()) + + def _get_editable_props(self) -> Optional[models.EditableDatasetPropertiesClass]: + return self._get_aspect(models.EditableDatasetPropertiesClass) + + def _ensure_editable_props(self) -> models.EditableDatasetPropertiesClass: + # Note that most of the fields in this aspect are not used. + # The only one that's relevant for us is the description. + return self._setdefault_aspect(models.EditableDatasetPropertiesClass()) + + @property + def description(self) -> Optional[str]: + editable_props = self._get_editable_props() + return first_non_null( + [ + editable_props.description if editable_props is not None else None, + self._ensure_dataset_props().description, + ] + ) + + def set_description(self, description: str) -> None: + if is_ingestion_attribution(): + editable_props = self._get_editable_props() + if editable_props is not None and editable_props.description is not None: + warnings.warn( + "Overwriting non-ingestion description from ingestion is an anti-pattern.", + category=IngestionAttributionWarning, + stacklevel=2, + ) + # Force the ingestion description to show up. 
+ editable_props.description = None + + self._ensure_dataset_props().description = description + else: + self._ensure_editable_props().description = description + + @property + def display_name(self) -> Optional[str]: + return self._ensure_dataset_props().name + + def set_display_name(self, display_name: str) -> None: + self._ensure_dataset_props().name = display_name + + @property + def qualified_name(self) -> Optional[str]: + return self._ensure_dataset_props().qualifiedName + + def set_qualified_name(self, qualified_name: str) -> None: + self._ensure_dataset_props().qualifiedName = qualified_name + + @property + def external_url(self) -> Optional[str]: + return self._ensure_dataset_props().externalUrl + + def set_external_url(self, external_url: str) -> None: + self._ensure_dataset_props().externalUrl = external_url + + @property + def custom_properties(self) -> Dict[str, str]: + return self._ensure_dataset_props().customProperties + + def set_custom_properties(self, custom_properties: Dict[str, str]) -> None: + self._ensure_dataset_props().customProperties = custom_properties + + @property + def created(self) -> Optional[datetime]: + return parse_time_stamp(self._ensure_dataset_props().created) + + def set_created(self, created: datetime) -> None: + self._ensure_dataset_props().created = make_time_stamp(created) + + @property + def last_modified(self) -> Optional[datetime]: + return parse_time_stamp(self._ensure_dataset_props().lastModified) + + def set_last_modified(self, last_modified: datetime) -> None: + self._ensure_dataset_props().lastModified = make_time_stamp(last_modified) + + def _schema_dict(self) -> Dict[str, models.SchemaFieldClass]: + schema_metadata = self._get_aspect(models.SchemaMetadataClass) + if schema_metadata is None: + raise ItemNotFoundError(f"Schema is not set for dataset {self.urn}") + return {field.fieldPath: field for field in schema_metadata.fields} + + @property + def schema(self) -> List[SchemaField]: + # TODO: Add some caching here to avoid iterating over the schema every time. + schema_dict = self._schema_dict() + return [SchemaField(self, field_path) for field_path in schema_dict] + + def _parse_schema_field_input( + self, schema_field_input: SchemaFieldInputType + ) -> models.SchemaFieldClass: + if isinstance(schema_field_input, models.SchemaFieldClass): + return schema_field_input + elif isinstance(schema_field_input, tuple): + # Support (name, type) and (name, type, description) forms + if len(schema_field_input) == 2: + name, field_type = schema_field_input + description = None + elif len(schema_field_input) == 3: + name, field_type, description = schema_field_input + else: + assert_never(schema_field_input) + return models.SchemaFieldClass( + fieldPath=name, + type=models.SchemaFieldDataTypeClass( + resolve_sql_type( + field_type, + platform=self.urn.get_data_platform_urn().platform_name, + ) + or models.NullTypeClass() + ), + nativeDataType=field_type, + description=description, + ) + elif isinstance(schema_field_input, str): + # TODO: Not sure this branch makes sense - we should probably just require types? + return models.SchemaFieldClass( + fieldPath=schema_field_input, + type=models.SchemaFieldDataTypeClass(models.NullTypeClass()), + nativeDataType="unknown", + description=None, + ) + else: + assert_never(schema_field_input) + + def _set_schema(self, schema: SchemaFieldsInputType) -> None: + # This method is not public. Ingestion/restatement users should be setting + # the schema via the constructor. 
SDK users that got a dataset from the graph + # probably shouldn't be adding/removing fields ad-hoc. The field-level mutators + # can be used instead. + if isinstance(schema, models.SchemaMetadataClass): + self._set_aspect(schema) + else: + parsed_schema = [self._parse_schema_field_input(field) for field in schema] + self._set_aspect( + models.SchemaMetadataClass( + fields=parsed_schema, + # The rest of these fields are not used, and so we can set them to dummy/default values. + schemaName="", + platform=self.urn.platform, + version=0, + hash="", + platformSchema=models.SchemalessClass(), + ) + ) + + def __getitem__(self, field_path: str) -> SchemaField: + # TODO: Automatically deal with field path v2? + schema_dict = self._schema_dict() + if field_path not in schema_dict: + raise SchemaFieldKeyError(f"Field {field_path} not found in schema") + return SchemaField(self, field_path) + + @property + def upstreams(self) -> Optional[models.UpstreamLineageClass]: + return self._get_aspect(models.UpstreamLineageClass) + + def set_upstreams(self, upstreams: UpstreamLineageInputType) -> None: + self._set_aspect(_parse_upstream_lineage_input(upstreams, self.urn)) diff --git a/metadata-ingestion/src/datahub/sdk/entity_client.py b/metadata-ingestion/src/datahub/sdk/entity_client.py new file mode 100644 index 0000000000000..99dc7f9a280ab --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/entity_client.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +import warnings +from typing import TYPE_CHECKING, Union, overload + +import datahub.metadata.schema_classes as models +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_patch_builder import MetadataPatchProposal +from datahub.errors import IngestionAttributionWarning, ItemNotFoundError, SdkUsageError +from datahub.ingestion.graph.client import DataHubGraph +from datahub.metadata.urns import ( + ContainerUrn, + DatasetUrn, + Urn, +) +from datahub.sdk._all_entities import ENTITY_CLASSES +from datahub.sdk._entity import Entity +from datahub.sdk._shared import UrnOrStr +from datahub.sdk.container import Container +from datahub.sdk.dataset import Dataset + +if TYPE_CHECKING: + from datahub.sdk.main_client import DataHubClient + + +class EntityClient: + def __init__(self, client: DataHubClient): + self._client = client + + # TODO: Make all of these methods sync by default. + + @property + def _graph(self) -> DataHubGraph: + return self._client._graph + + @overload + def get(self, urn: ContainerUrn) -> Container: ... + @overload + def get(self, urn: DatasetUrn) -> Dataset: ... + @overload + def get(self, urn: Union[Urn, str]) -> Entity: ... + def get(self, urn: UrnOrStr) -> Entity: + if not isinstance(urn, Urn): + urn = Urn.from_string(urn) + + # TODO: add error handling around this with a suggested alternative if not yet supported + EntityClass = ENTITY_CLASSES[urn.entity_type] + + if not self._graph.exists(str(urn)): + raise ItemNotFoundError(f"Entity {urn} not found") + + aspects = self._graph.get_entity_semityped(str(urn)) + + # TODO: save the timestamp so we can use If-Unmodified-Since on the updates + return EntityClass._new_from_graph(urn, aspects) + + def create(self, entity: Entity) -> None: + mcps = [] + + if self._graph.exists(str(entity.urn)): + raise SdkUsageError( + f"Entity {entity.urn} already exists. Use client.entities.upsert() to update it." + ) + + # Extra safety check: by putting this first, we can ensure that + # the request fails if the entity already exists. 
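+ # End-to-end usage sketch (hypothetical server and dataset values):
+ #   client = DataHubClient(server="http://localhost:8080")
+ #   client.entities.create(Dataset(platform="snowflake", name="db.schema.table"))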
+ mcps.append( + MetadataChangeProposalWrapper( + entityUrn=str(entity.urn), + aspect=entity.urn.to_key_aspect(), + changeType=models.ChangeTypeClass.CREATE_ENTITY, + ) + ) + mcps.extend(entity._as_mcps(models.ChangeTypeClass.CREATE)) + + self._graph.emit_mcps(mcps) + + def upsert(self, entity: Entity) -> None: + if entity._prev_aspects is None and self._graph.exists(str(entity.urn)): + warnings.warn( + f"The entity {entity.urn} already exists. This operation will partially overwrite the existing entity.", + IngestionAttributionWarning, + stacklevel=2, + ) + # TODO: If there are no previous aspects but the entity exists, should we delete aspects that are not present here? + + mcps = entity._as_mcps(models.ChangeTypeClass.UPSERT) + self._graph.emit_mcps(mcps) + + def update(self, entity: Union[Entity, MetadataPatchProposal]) -> None: + if isinstance(entity, MetadataPatchProposal): + return self._update_patch(entity) + + if entity._prev_aspects is None: + raise SdkUsageError( + f"For entities created via {entity.__class__.__name__}(...), use client.entities.create() or client.entities.upsert() instead" + ) + + # TODO: respect If-Unmodified-Since? + # -> probably add a "mode" parameter that can be "update" (e.g. if not modified) or "update_force" + + mcps = entity._as_mcps(models.ChangeTypeClass.UPSERT) + self._graph.emit_mcps(mcps) + + def _update_patch( + self, updater: MetadataPatchProposal, check_exists: bool = True + ) -> None: + if check_exists and not self._graph.exists(updater.urn): + raise SdkUsageError( + f"Entity {updater.urn} does not exist, and hence cannot be updated. " + "You can bypass this check by setting check_exists=False." + ) + + mcps = updater.build() + self._graph.emit_mcps(mcps) diff --git a/metadata-ingestion/src/datahub/sdk/main_client.py b/metadata-ingestion/src/datahub/sdk/main_client.py new file mode 100644 index 0000000000000..7243114192a9d --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/main_client.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +from typing import Optional, overload + +from datahub.errors import SdkUsageError +from datahub.ingestion.graph.client import DataHubGraph, get_default_graph +from datahub.ingestion.graph.config import DatahubClientConfig +from datahub.sdk.entity_client import EntityClient +from datahub.sdk.resolver_client import ResolverClient + + +class DataHubClient: + @overload + def __init__(self, *, server: str, token: Optional[str] = None): ... + @overload + def __init__(self, *, config: DatahubClientConfig): ... + @overload + def __init__(self, *, graph: DataHubGraph): ... + def __init__( + self, + *, + server: Optional[str] = None, + token: Optional[str] = None, + graph: Optional[DataHubGraph] = None, + config: Optional[DatahubClientConfig] = None, + ): + if server is not None: + if config is not None: + raise SdkUsageError("Cannot specify both server and config") + if graph is not None: + raise SdkUsageError("Cannot specify both server and graph") + graph = DataHubGraph(config=DatahubClientConfig(server=server, token=token)) + elif config is not None: + if graph is not None: + raise SdkUsageError("Cannot specify both config and graph") + graph = DataHubGraph(config=config) + elif graph is None: + raise SdkUsageError("Must specify either server, config, or graph") + + self._graph = graph + + @classmethod + def from_env(cls) -> "DataHubClient": + # Inspired by the DockerClient.from_env() method. + # TODO: This one also reads from ~/.datahubenv, so the "from_env" name might be a bit confusing. 
+ # That file is part of the "environment", but is not a traditional "env variable". + graph = get_default_graph() + return cls(graph=graph) + + @property + def entities(self) -> EntityClient: + return EntityClient(self) + + @property + def resolve(self) -> ResolverClient: + return ResolverClient(self) diff --git a/metadata-ingestion/src/datahub/sdk/resolver_client.py b/metadata-ingestion/src/datahub/sdk/resolver_client.py new file mode 100644 index 0000000000000..dae2f61a918dd --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/resolver_client.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Optional, overload + +from datahub.errors import ItemNotFoundError, MultipleItemsFoundError, SdkUsageError +from datahub.ingestion.graph.client import DataHubGraph +from datahub.metadata.urns import ( + CorpUserUrn, + DomainUrn, + GlossaryTermUrn, +) + +if TYPE_CHECKING: + from datahub.sdk.main_client import DataHubClient + + +class ResolverClient: + def __init__(self, client: DataHubClient): + self._client = client + + # TODO: add caching to this method + + @property + def _graph(self) -> DataHubGraph: + return self._client._graph + + def domain(self, *, name: str) -> DomainUrn: + urn_str = self._graph.get_domain_urn_by_name(name) + if urn_str is None: + raise ItemNotFoundError(f"Domain with name {name} not found") + return DomainUrn.from_string(urn_str) + + @overload + def user(self, *, name: str) -> CorpUserUrn: ... + @overload + def user(self, *, email: str) -> CorpUserUrn: ... + def user( + self, *, name: Optional[str] = None, email: Optional[str] = None + ) -> CorpUserUrn: + filter_explanation: str + filters = [] + if name is not None: + if email is not None: + raise SdkUsageError("Cannot specify both name and email for auto_user") + # TODO: do we filter on displayName or fullName? + filter_explanation = f"with name {name}" + filters.append( + { + "field": "fullName", + "values": [name], + "condition": "EQUAL", + } + ) + elif email is not None: + filter_explanation = f"with email {email}" + filters.append( + { + "field": "email", + "values": [email], + "condition": "EQUAL", + } + ) + else: + raise SdkUsageError("Must specify either name or email for auto_user") + + users = list( + self._graph.get_urns_by_filter( + entity_types=[CorpUserUrn.ENTITY_TYPE], + extraFilters=filters, + ) + ) + if len(users) == 0: + # TODO: In auto methods, should we just create the user/domain/etc if it doesn't exist? 
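+ # For now we raise; callers can catch ItemNotFoundError and handle creation themselves.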
+ raise ItemNotFoundError(f"User {filter_explanation} not found") + elif len(users) > 1: + raise MultipleItemsFoundError( + f"Multiple users found {filter_explanation}: {users}" + ) + else: + return CorpUserUrn.from_string(users[0]) + + def term(self, *, name: str) -> GlossaryTermUrn: + # TODO: Add some limits on the graph fetch + terms = list( + self._graph.get_urns_by_filter( + entity_types=[GlossaryTermUrn.ENTITY_TYPE], + extraFilters=[ + { + "field": "id", + "values": [name], + "condition": "EQUAL", + } + ], + ) + ) + if len(terms) == 0: + raise ItemNotFoundError(f"Term with name {name} not found") + elif len(terms) > 1: + raise SdkUsageError(f"Multiple terms found with name {name}: {terms}") + else: + return GlossaryTermUrn.from_string(terms[0]) diff --git a/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_basic_golden.json b/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_basic_golden.json new file mode 100644 index 0000000000000..bff58b02c091b --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_basic_golden.json @@ -0,0 +1,91 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "", + "platform": "urn:li:dataPlatform:bigquery", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.Schemaless": {} + }, + "fields": [ + { + "fieldPath": "field1", + "nullable": false, + "description": "field1 description", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "field2", + "nullable": false, + "description": "field2 description", + "type": { + "type": { + "com.linkedin.schema.NullType": {} + } + }, + "nativeDataType": "int64", + "recursive": false, + "isPartOfKey": false + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "tags": [] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_complex_golden.json b/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_complex_golden.json new file mode 100644 index 0000000000000..3b96b4d26ed2e --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_complex_golden.json @@ -0,0 +1,275 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": 
"urn:li:dataPlatform:snowflake", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "", + "platform": "urn:li:dataPlatform:snowflake", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.Schemaless": {} + }, + "fields": [ + { + "fieldPath": "field1", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "field2", + "nullable": false, + "description": "field2 description", + "type": { + "type": { + "com.linkedin.schema.NullType": {} + } + }, + "nativeDataType": "int64", + "recursive": false, + "isPartOfKey": false + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "editableDatasetProperties", + "aspect": { + "json": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "description": "test" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "key1": "value1", + "key2": "value2" + }, + "externalUrl": "https://example.com", + "name": "MY_TABLE", + "qualifiedName": "MY_DB.MY_SCHEMA.MY_TABLE", + "created": { + "time": 1735787045000 + }, + "lastModified": { + "time": 1736391846000 + }, + "tags": [] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c", + "urn": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c" + }, + { + "id": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056", + "urn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056" + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:admin@datahubproject.io", + "type": "TECHNICAL_OWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" 
+ } + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [ + { + "tag": "urn:li:tag:tag1" + }, + { + "tag": "urn:li:tag:tag2" + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "glossaryTerms", + "aspect": { + "json": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:AccountBalance" + } + ], + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:__ingestion" + } + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:Marketing" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "editableSchemaMetadata", + "aspect": { + "json": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "editableSchemaFieldInfo": [ + { + "fieldPath": "field1", + "description": "field1 description", + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:field1_tag1" + }, + { + "tag": "urn:li:tag:field1_tag2" + } + ] + } + }, + { + "fieldPath": "field2", + "glossaryTerms": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:field2_term1" + }, + { + "urn": "urn:li:glossaryTerm:field2_term2" + } + ], + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:__ingestion" + } + } + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_ingestion_golden.json b/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_ingestion_golden.json new file mode 100644 index 0000000000000..3325e5e080e7a --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_ingestion_golden.json @@ -0,0 +1,231 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "", + "platform": "urn:li:dataPlatform:snowflake", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.Schemaless": {} + }, + "fields": [ + { + "fieldPath": "field1", + "nullable": false, + "description": "field1 description", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:field1_tag1" + }, + { + "tag": "urn:li:tag:field1_tag2" + } + ] + }, + "isPartOfKey": false + 
}, + { + "fieldPath": "field2", + "nullable": false, + "description": "field2 description", + "type": { + "type": { + "com.linkedin.schema.NullType": {} + } + }, + "nativeDataType": "int64", + "recursive": false, + "glossaryTerms": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:field2_term1" + }, + { + "urn": "urn:li:glossaryTerm:field2_term2" + } + ], + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:__ingestion" + } + }, + "isPartOfKey": false + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "key1": "value1", + "key2": "value2" + }, + "externalUrl": "https://example.com", + "name": "MY_TABLE", + "qualifiedName": "MY_DB.MY_SCHEMA.MY_TABLE", + "description": "test", + "created": { + "time": 1735787045000 + }, + "lastModified": { + "time": 1736391846000 + }, + "tags": [] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c", + "urn": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c" + }, + { + "id": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056", + "urn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056" + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:admin@datahubproject.io", + "type": "TECHNICAL_OWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [ + { + "tag": "urn:li:tag:tag1" + }, + { + "tag": "urn:li:tag:tag2" + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "glossaryTerms", + "aspect": { + "json": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:AccountBalance" + } + ], + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:__ingestion" + } + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:Marketing" + ] + } + } +} +] \ No newline at end of file diff --git 
a/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_container_db_golden.json b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_container_db_golden.json new file mode 100644 index 0000000000000..6789608570ba8 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_container_db_golden.json @@ -0,0 +1,52 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "snowflake", + "database": "test_db" + }, + "name": "test_db" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Database" + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_container_schema_golden.json b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_container_schema_golden.json new file mode 100644 index 0000000000000..c9729dc33546d --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_container_schema_golden.json @@ -0,0 +1,69 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "snowflake", + "database": "test_db", + "schema": "test_schema" + }, + "name": "test_schema" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de", + "urn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de" + } + ] + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Schema" + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_dataset_creation_golden.json b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_dataset_creation_golden.json new file mode 100644 index 0000000000000..4fd65e3fdfccd --- /dev/null +++ 
b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_dataset_creation_golden.json @@ -0,0 +1,142 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE_ENTITY", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake", + "name": "test_db.test_schema.table_1", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "", + "platform": "urn:li:dataPlatform:snowflake", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.Schemaless": {} + }, + "fields": [ + { + "fieldPath": "col1", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "col2", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "isPartOfKey": false + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE", + "aspectName": "editableDatasetProperties", + "aspect": { + "json": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "description": "test description" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de", + "urn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de" + }, + { + "id": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6", + "urn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6" + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [ + { + "tag": "urn:li:tag:tag1" + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_dataset_update_golden.json b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_dataset_update_golden.json new file mode 100644 index 0000000000000..4e2ba2a59a2b1 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_dataset_update_golden.json 
@@ -0,0 +1,45 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "description": "original description", + "tags": [] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "UPSERT", + "aspectName": "editableDatasetProperties", + "aspect": { + "json": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "description": "updated description" + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/test_client_v2.py b/metadata-ingestion/tests/unit/sdk_v2/test_client_v2.py new file mode 100644 index 0000000000000..7acb0a8d2bf97 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/test_client_v2.py @@ -0,0 +1,55 @@ +from unittest.mock import Mock + +import pytest + +from datahub.errors import ItemNotFoundError, MultipleItemsFoundError, SdkUsageError +from datahub.ingestion.graph.client import DataHubGraph +from datahub.ingestion.graph.config import DatahubClientConfig +from datahub.metadata.urns import CorpUserUrn +from datahub.sdk.main_client import DataHubClient + + +@pytest.fixture +def mock_graph() -> Mock: + graph = Mock(spec=DataHubGraph) + graph.exists.return_value = False + return graph + + +def test_client_creation(mock_graph: Mock) -> None: + assert DataHubClient(graph=mock_graph) + assert DataHubClient(server="https://example.com", token="token") + + +def test_client_init_errors(mock_graph: Mock) -> None: + config = DatahubClientConfig(server="https://example.com", token="token") + + with pytest.raises(SdkUsageError): + DataHubClient(server="https://example.com", graph=mock_graph) # type: ignore + with pytest.raises(SdkUsageError): + DataHubClient(server="https://example.com", config=config) # type: ignore + with pytest.raises(SdkUsageError): + DataHubClient(config=config, graph=mock_graph) # type: ignore + with pytest.raises(SdkUsageError): + DataHubClient() # type: ignore + + +def test_resolve_user(mock_graph: Mock) -> None: + client = DataHubClient(graph=mock_graph) + + # This test doesn't really validate the graphql query or vars. + # It probably makes more sense to test via smoke-tests. 
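+ # From the assertions below, resolve.user() appears to be a thin wrapper
+ # over get_urns_by_filter: zero matches raises ItemNotFoundError, exactly
+ # one match is returned as a CorpUserUrn, and multiple matches raise
+ # MultipleItemsFoundError.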
+ + mock_graph.get_urns_by_filter.return_value = [] + with pytest.raises(ItemNotFoundError): + client.resolve.user(name="User") + + mock_graph.get_urns_by_filter.return_value = ["urn:li:corpuser:user"] + assert client.resolve.user(name="User") == CorpUserUrn("urn:li:corpuser:user") + + mock_graph.get_urns_by_filter.return_value = [ + "urn:li:corpuser:user", + "urn:li:corpuser:user2", + ] + with pytest.raises(MultipleItemsFoundError): + client.resolve.user(name="User") diff --git a/metadata-ingestion/tests/unit/sdk_v2/test_dataset.py b/metadata-ingestion/tests/unit/sdk_v2/test_dataset.py new file mode 100644 index 0000000000000..da041ba3d4e30 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/test_dataset.py @@ -0,0 +1,189 @@ +import pathlib +from datetime import datetime, timezone + +import pytest + +from datahub.emitter.mcp_builder import SchemaKey +from datahub.errors import SchemaFieldKeyError +from datahub.ingestion.source.common.subtypes import DatasetSubTypes +from datahub.metadata.urns import ( + CorpUserUrn, + DatasetUrn, + DomainUrn, + GlossaryTermUrn, + TagUrn, +) +from datahub.sdk._attribution import KnownAttribution, change_default_attribution +from datahub.sdk._entity import Entity +from datahub.sdk.dataset import Dataset +from tests.test_helpers import mce_helpers + +_GOLDEN_DIR = pathlib.Path(__file__).parent / "dataset_golden" + + +def assert_entity_golden( + pytestconfig: pytest.Config, entity: Entity, golden_path: pathlib.Path +) -> None: + mce_helpers.check_goldens_stream( + pytestconfig=pytestconfig, + outputs=entity._as_mcps(), + golden_path=golden_path, + ignore_order=False, + ) + + +def test_dataset_basic(pytestconfig: pytest.Config) -> None: + d = Dataset( + platform="bigquery", + name="proj.dataset.table", + subtype=DatasetSubTypes.TABLE, + schema=[ + ("field1", "string", "field1 description"), + ("field2", "int64", "field2 description"), + ], + ) + + # Check urn setup. + assert Dataset.get_urn_type() == DatasetUrn + assert isinstance(d.urn, DatasetUrn) + assert ( + str(d.urn) + == "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)" + ) + assert str(d.urn) in repr(d) + + # Check most attributes. + assert d.platform_instance is None + assert d.tags is None + assert d.terms is None + assert d.created is None + assert d.last_modified is None + assert d.description is None + assert d.custom_properties == {} + assert d.domain is None + + # TODO: The column descriptions should go in the editable fields, since we're not in ingestion mode. + assert len(d.schema) == 2 + assert d["field1"].description == "field1 description" + + with pytest.raises(SchemaFieldKeyError, match=r"Field .* not found"): + d["should_be_missing"] + + with pytest.raises(AttributeError): + assert d.extra_attribute # type: ignore + with pytest.raises(AttributeError): + d.extra_attribute = "slots should reject extra fields" # type: ignore + with pytest.raises(AttributeError): + # This should fail. Eventually we should make it suggest calling set_owners instead. 
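+ # (owners is readable elsewhere in this file but not assignable: the
+ # getter has no corresponding setter, and __slots__ leaves no instance
+ # __dict__ to absorb the attribute.)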
+ d.owners = [] # type: ignore + + assert_entity_golden( + pytestconfig, d, _GOLDEN_DIR / "test_dataset_basic_golden.json" + ) + + +def _build_complex_dataset() -> Dataset: + schema = SchemaKey( + platform="snowflake", + instance="my_instance", + database="MY_DB", + schema="MY_SCHEMA", + ) + + created = datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc) + updated = datetime(2025, 1, 9, 3, 4, 6, tzinfo=timezone.utc) + + d = Dataset( + platform="snowflake", + platform_instance="my_instance", + name="my_db.my_schema.my_table", + container=schema, + subtype=DatasetSubTypes.TABLE, + schema=[ + ("field1", "string"), + ("field2", "int64", "field2 description"), + ], + display_name="MY_TABLE", + qualified_name="MY_DB.MY_SCHEMA.MY_TABLE", + created=created, + last_modified=updated, + custom_properties={ + "key1": "value1", + "key2": "value2", + }, + description="test", + external_url="https://example.com", + owners=[ + CorpUserUrn("admin@datahubproject.io"), + ], + tags=[ + TagUrn("tag1"), + TagUrn("tag2"), + ], + terms=[ + GlossaryTermUrn("AccountBalance"), + ], + domain=DomainUrn("Marketing"), + ) + assert d.platform_instance is not None + assert ( + str(d.platform_instance) + == "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)" + ) + assert d.subtype == "Table" + assert d.description == "test" + assert d.display_name == "MY_TABLE" + assert d.qualified_name == "MY_DB.MY_SCHEMA.MY_TABLE" + assert d.external_url == "https://example.com" + assert d.created == created + assert d.last_modified == updated + assert d.custom_properties == {"key1": "value1", "key2": "value2"} + + # Check standard aspects. + assert d.domain == DomainUrn("Marketing") + assert d.tags is not None + assert len(d.tags) == 2 + assert d.terms is not None + assert len(d.terms) == 1 + assert d.owners is not None + assert len(d.owners) == 1 + + assert len(d.schema) == 2 + + # Schema field description. + assert d["field1"].description is None + assert d["field2"].description == "field2 description" + d["field1"].set_description("field1 description") + assert d["field1"].description == "field1 description" + + # Schema field tags. + assert d["field1"].tags is None + d["field1"].set_tags([TagUrn("field1_tag1"), TagUrn("field1_tag2")]) + assert d["field1"].tags is not None + assert len(d["field1"].tags) == 2 + + # Schema field terms. 
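+ # (Where these field-level tags/terms land depends on the active
+ # attribution: the complex golden stores them under editableSchemaMetadata,
+ # while the ingestion golden, produced under KnownAttribution.INGESTION,
+ # inlines them directly into the schemaMetadata fields.)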
+ assert d["field2"].terms is None + d["field2"].set_terms( + [GlossaryTermUrn("field2_term1"), GlossaryTermUrn("field2_term2")] + ) + assert d["field2"].terms is not None + assert len(d["field2"].terms) == 2 + + return d + + +def test_dataset_complex(pytestconfig: pytest.Config) -> None: + d = _build_complex_dataset() + assert_entity_golden( + pytestconfig, d, _GOLDEN_DIR / "test_dataset_complex_golden.json" + ) + + +def test_dataset_ingestion(pytestconfig: pytest.Config) -> None: + with change_default_attribution(KnownAttribution.INGESTION): + d = _build_complex_dataset() + + assert_entity_golden( + pytestconfig, d, _GOLDEN_DIR / "test_dataset_ingestion_golden.json" + ) diff --git a/metadata-ingestion/tests/unit/sdk_v2/test_entity_client.py b/metadata-ingestion/tests/unit/sdk_v2/test_entity_client.py new file mode 100644 index 0000000000000..2218ef2f90944 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/test_entity_client.py @@ -0,0 +1,142 @@ +import pathlib +from unittest.mock import Mock + +import pytest + +import datahub.metadata.schema_classes as models +from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey +from datahub.errors import ItemNotFoundError, SdkUsageError +from datahub.ingestion.graph.client import DataHubGraph +from datahub.metadata.urns import DatasetUrn, TagUrn +from datahub.sdk.container import Container +from datahub.sdk.dataset import Dataset +from datahub.sdk.main_client import DataHubClient +from tests.test_helpers import mce_helpers + +_GOLDEN_DIR = pathlib.Path(__file__).parent / "entity_client_goldens" + + +@pytest.fixture +def mock_graph() -> Mock: + graph = Mock(spec=DataHubGraph) + graph.exists.return_value = False + return graph + + +@pytest.fixture +def client(mock_graph: Mock) -> DataHubClient: + return DataHubClient(graph=mock_graph) + + +def assert_client_golden( + pytestconfig: pytest.Config, + client: DataHubClient, + golden_path: pathlib.Path, +) -> None: + mcps = client._graph.emit_mcps.call_args[0][0] # type: ignore + mce_helpers.check_goldens_stream( + pytestconfig=pytestconfig, + outputs=mcps, + golden_path=golden_path, + ignore_order=False, + ) + + +def test_container_creation_flow( + pytestconfig: pytest.Config, client: DataHubClient, mock_graph: Mock +) -> None: + # Create database and schema containers + db = DatabaseKey(platform="snowflake", database="test_db") + schema = SchemaKey(**db.dict(), schema="test_schema") + + db_container = Container(db, display_name="test_db", subtype="Database") + schema_container = Container(schema, display_name="test_schema", subtype="Schema") + + # Test database container creation + client.entities.upsert(db_container) + assert_client_golden( + pytestconfig, client, _GOLDEN_DIR / "test_container_db_golden.json" + ) + + # Test schema container creation + client.entities.upsert(schema_container) + assert_client_golden( + pytestconfig, client, _GOLDEN_DIR / "test_container_schema_golden.json" + ) + + +def test_dataset_creation( + pytestconfig: pytest.Config, client: DataHubClient, mock_graph: Mock +) -> None: + schema = SchemaKey(platform="snowflake", database="test_db", schema="test_schema") + + dataset = Dataset( + platform="snowflake", + name="test_db.test_schema.table_1", + env="prod", + container=schema, + schema=[ + ("col1", "string"), + ("col2", "int"), + ], + description="test description", + tags=[TagUrn("tag1")], + ) + + client.entities.create(dataset) + assert_client_golden( + pytestconfig, client, _GOLDEN_DIR / "test_dataset_creation_golden.json" + ) + + +def 
test_dataset_read_modify_write( + pytestconfig: pytest.Config, client: DataHubClient, mock_graph: Mock +) -> None: + # Setup mock for existing dataset + mock_graph.exists.return_value = True + dataset_urn = DatasetUrn( + platform="snowflake", name="test_db.test_schema.table_1", env="prod" + ) + + # Mock the get_entity_semityped response with initial state + mock_graph.get_entity_semityped.return_value = { + "datasetProperties": models.DatasetPropertiesClass( + description="original description", + customProperties={}, + tags=[], + ) + } + + # Get and update dataset + dataset = client.entities.get(dataset_urn) + dataset.set_description("updated description") + + client.entities.update(dataset) + assert_client_golden( + pytestconfig, client, _GOLDEN_DIR / "test_dataset_update_golden.json" + ) + + +def test_create_existing_dataset_fails(client: DataHubClient, mock_graph: Mock) -> None: + mock_graph.exists.return_value = True + + dataset = Dataset( + platform="snowflake", + name="test_db.test_schema.table_1", + env="prod", + schema=[("col1", "string")], + ) + + with pytest.raises(SdkUsageError, match="Entity .* already exists"): + client.entities.create(dataset) + + +def test_get_nonexistent_dataset_fails(client: DataHubClient, mock_graph: Mock) -> None: + mock_graph.exists.return_value = False + + dataset_urn = DatasetUrn( + platform="snowflake", name="test_db.test_schema.missing_table", env="prod" + ) + + with pytest.raises(ItemNotFoundError, match="Entity .* not found"): + client.entities.get(dataset_urn)
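+
+
+# Illustrative usage sketch (assumptions: a reachable DataHub server and a
+# valid token; the server URL and token below are placeholders). Outside of
+# unit tests, the read-modify-write flow exercised above would look roughly
+# like:
+#
+#   client = DataHubClient(server="https://datahub.example.com", token="...")
+#   urn = DatasetUrn(
+#       platform="snowflake", name="test_db.test_schema.table_1", env="prod"
+#   )
+#   dataset = client.entities.get(urn)
+#   dataset.set_description("updated description")
+#   client.entities.update(dataset)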