diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeUrnMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeUrnMapper.java index 334faf753cb8b5..205b6b7d11443e 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeUrnMapper.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeUrnMapper.java @@ -111,4 +111,8 @@ public static String getEntityTypeUrn(String name) { } return ENTITY_NAME_TO_ENTITY_TYPE_URN.get(name); } + + public static boolean isValidEntityType(String entityTypeUrn) { + return ENTITY_TYPE_URN_TO_NAME.containsKey(entityTypeUrn); + } } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/structuredproperty/StructuredPropertyMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/structuredproperty/StructuredPropertyMapper.java index 5dc73d9ad09388..98d48d7aa143e3 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/structuredproperty/StructuredPropertyMapper.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/structuredproperty/StructuredPropertyMapper.java @@ -20,6 +20,7 @@ import com.linkedin.datahub.graphql.generated.StructuredPropertySettings; import com.linkedin.datahub.graphql.generated.TypeQualifier; import com.linkedin.datahub.graphql.types.common.mappers.util.MappingHelper; +import com.linkedin.datahub.graphql.types.entitytype.EntityTypeUrnMapper; import com.linkedin.datahub.graphql.types.mappers.MapperUtils; import com.linkedin.datahub.graphql.types.mappers.ModelMapper; import com.linkedin.entity.EntityResponse; @@ -30,7 +31,9 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class StructuredPropertyMapper implements ModelMapper { @@ -141,8 +144,21 @@ private TypeQualifier mapTypeQualifier(final StringArrayMap gmsTypeQualifier) { final TypeQualifier typeQualifier = new TypeQualifier(); List allowedTypes = gmsTypeQualifier.get(ALLOWED_TYPES); if (allowedTypes != null) { + // filter out correct allowedTypes + List validAllowedTypes = + allowedTypes.stream() + .filter(EntityTypeUrnMapper::isValidEntityType) + .collect(Collectors.toList()); + if (validAllowedTypes.size() != allowedTypes.size()) { + log.error( + String.format( + "Property has invalid allowed types set. 
Current list of allowed types: %s", + allowedTypes)); + } typeQualifier.setAllowedTypes( - allowedTypes.stream().map(this::createEntityTypeEntity).collect(Collectors.toList())); + validAllowedTypes.stream() + .map(this::createEntityTypeEntity) + .collect(Collectors.toList())); } return typeQualifier; } diff --git a/datahub-web-react/src/app/analyticsDashboard/components/TimeSeriesChart.tsx b/datahub-web-react/src/app/analyticsDashboard/components/TimeSeriesChart.tsx index 68851a950bcc55..85fa7843d108e5 100644 --- a/datahub-web-react/src/app/analyticsDashboard/components/TimeSeriesChart.tsx +++ b/datahub-web-react/src/app/analyticsDashboard/components/TimeSeriesChart.tsx @@ -84,6 +84,38 @@ export function computeLines(chartData: TimeSeriesChartType, insertBlankPoints: return returnLines; } +const formatAxisDate = (value: number, chartData: TimeSeriesChartType) => { + const date = new Date(value); + + switch (chartData.interval) { + case 'MONTH': + return date.toLocaleDateString('en-US', { + month: 'short', + year: 'numeric', + timeZone: 'UTC', + }); + case 'WEEK': + return date.toLocaleDateString('en-US', { + month: 'short', + day: 'numeric', + timeZone: 'UTC', + }); + case 'DAY': + return date.toLocaleDateString('en-US', { + weekday: 'short', + day: 'numeric', + timeZone: 'UTC', + }); + default: + return date.toLocaleDateString('en-US', { + month: 'short', + day: 'numeric', + year: 'numeric', + timeZone: 'UTC', + }); + } +}; + export const TimeSeriesChart = ({ chartData, width, @@ -117,6 +149,7 @@ export const TimeSeriesChart = ({ strokeWidth={style?.axisWidth} tickLabelProps={{ fill: 'black', fontFamily: 'inherit', fontSize: 10 }} numTicks={3} + tickFormat={(value) => formatAxisDate(value, chartData)} />
-                    {new Date(
-                        Number(accessors.xAccessor(tooltipData.nearestDatum.datum)),
-                    ).toDateString()}
+                    {formatAxisDate(accessors.xAccessor(tooltipData.nearestDatum.datum), chartData)}
{accessors.yAccessor(tooltipData.nearestDatum.datum)}
diff --git a/docs/cli.md b/docs/cli.md index c9cd85fd0e8155..f332f77d9d21a5 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -735,7 +735,7 @@ Please see our [Integrations page](https://datahubproject.io/integrations) if yo | [bigquery](./generated/ingestion/sources/bigquery.md) | `pip install 'acryl-datahub[bigquery]'` | BigQuery source | | [datahub-lineage-file](./generated/ingestion/sources/file-based-lineage.md) | _no additional dependencies_ | Lineage File source | | [datahub-business-glossary](./generated/ingestion/sources/business-glossary.md) | _no additional dependencies_ | Business Glossary File source | -| [dbt](./generated/ingestion/sources/dbt.md) | _no additional dependencies_ | dbt source | +| [dbt](./generated/ingestion/sources/dbt.md) | `pip install 'acryl-datahub[dbt]'` | dbt source | | [dremio](./generated/ingestion/sources/dremio.md) | `pip install 'acryl-datahub[dremio]'` | Dremio Source | | [druid](./generated/ingestion/sources/druid.md) | `pip install 'acryl-datahub[druid]'` | Druid Source | | [feast](./generated/ingestion/sources/feast.md) | `pip install 'acryl-datahub[feast]'` | Feast source (0.26.0) | diff --git a/docs/managed-datahub/operator-guide/setting-up-remote-ingestion-executor.md b/docs/managed-datahub/operator-guide/setting-up-remote-ingestion-executor.md index 68ba9af38dd2ea..e5b0562734273d 100644 --- a/docs/managed-datahub/operator-guide/setting-up-remote-ingestion-executor.md +++ b/docs/managed-datahub/operator-guide/setting-up-remote-ingestion-executor.md @@ -125,6 +125,50 @@ The Helm chart [datahub-executor-worker](https://executor-helm.acryl.io/index.ya --set image.tag=v0.3.1 \ acryl datahub-executor-worker ``` +9. As of DataHub Cloud `v0.3.8.2` It is possible to pass secrets to ingestion recipes using Kubernetes Secret CRDs as shown below. This allows to update secrets at runtime without restarting Remote Executor process. + ``` + # 1. Create K8s Secret object in remote executor namespace, e.g. + apiVersion: v1 + kind: Secret + metadata: + name: datahub-secret-store + data: + REDSHIFT_PASSWORD: cmVkc2hpZnQtc2VjcmV0Cg== + SNOWFLAKE_PASSWORD: c25vd2ZsYWtlLXNlY3JldAo= + # 2. Add secret into your Remote Executor deployment: + extraVolumes: + - name: datahub-secret-store + secret: + secretName: datahub-secret-store + # 3. Mount it under /mnt/secrets directory + extraVolumeMounts: + - mountPath: /mnt/secrets + name: datahub-secret-store + ``` +You can then reference the mounted secrets directly in the ingestion recipe: +```yaml +source: + type: redshift + config: + host_port: '' + username: connector_test + table_lineage_mode: mixed + include_table_lineage: true + include_tables: true + include_views: true + profiling: + enabled: true + profile_table_level_only: false + stateful_ingestion: + enabled: true + password: '${REDSHIFT_PASSWORD}' +``` + +By default the executor will look for files mounted in `/mnt/secrets`, this is override-able by setting the env var: +`DATAHUB_EXECUTOR_FILE_SECRET_BASEDIR` to a different location (default: `/mnt/secrets`) + +These files are expected to be under 1MB in data by default. 
To increase this limit set a higher value using: +`DATAHUB_EXECUTOR_FILE_SECRET_MAXLEN` (default: `1024768`, size in bytes) ## FAQ diff --git a/docs/managed-datahub/release-notes/v_0_3_8.md b/docs/managed-datahub/release-notes/v_0_3_8.md index 18a40915ad22a9..488ce0e7ac772e 100644 --- a/docs/managed-datahub/release-notes/v_0_3_8.md +++ b/docs/managed-datahub/release-notes/v_0_3_8.md @@ -3,7 +3,7 @@ Release Availability Date --- -21-Jan-2025 +29-Jan-2025 Recommended CLI/SDK --- diff --git a/docs/modeling/extending-the-metadata-model.md b/docs/modeling/extending-the-metadata-model.md index 8b308fb65d243c..b1a8655481365c 100644 --- a/docs/modeling/extending-the-metadata-model.md +++ b/docs/modeling/extending-the-metadata-model.md @@ -361,6 +361,8 @@ It takes the following parameters: This annotation is applied to fields inside an Aspect. It instructs DataHub to index the field so it can be retrieved via the search APIs. +:::note If you are adding @Searchable to a field that already has data, you'll want to restore indices [via api](https://datahubproject.io/docs/api/restli/restore-indices/) or [via upgrade step](https://github.com/datahub-project/datahub/blob/master/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndices.java) to have it be populated with existing data. + It takes the following parameters: - **fieldType**: string - The settings for how each field is indexed is defined by the field type. Each field type is diff --git a/metadata-ingestion/setup.cfg b/metadata-ingestion/setup.cfg index b7cf43b80b149e..ec58f1f5ab7d7b 100644 --- a/metadata-ingestion/setup.cfg +++ b/metadata-ingestion/setup.cfg @@ -15,6 +15,9 @@ warn_unused_configs = yes disallow_untyped_defs = no # try to be a bit more strict in certain areas of the codebase +[mypy-datahub] +# Only for datahub's __init__.py - allow implicit reexport +implicit_reexport = yes [mypy-datahub.*] ignore_missing_imports = no implicit_reexport = no @@ -54,7 +57,7 @@ addopts = --cov=src --cov-report= --cov-config setup.cfg --strict-markers -p no: markers = slow: marks tests that are slow to run, including all docker-based tests (deselect with '-m not slow') integration: marks all integration tests, across all batches (deselect with '-m "not integration"') - integration_batch_0: mark tests to run in batch 0 of integration tests. This is done mainly for parallelisation in CI. Batch 0 is the default batch. + integration_batch_0: mark tests to run in batch 0 of integration tests. This is done mainly for parallelization in CI. Batch 0 is the default batch. integration_batch_1: mark tests to run in batch 1 of integration tests integration_batch_2: mark tests to run in batch 2 of integration tests testpaths = diff --git a/metadata-ingestion/src/datahub/errors.py b/metadata-ingestion/src/datahub/errors.py new file mode 100644 index 00000000000000..cb57251fcb8e1b --- /dev/null +++ b/metadata-ingestion/src/datahub/errors.py @@ -0,0 +1,35 @@ +from datahub.configuration.common import MetaError + +# TODO: Move all other error types to this file. 
+ + +class SdkUsageError(MetaError): + pass + + +class AlreadyExistsError(SdkUsageError): + pass + + +class ItemNotFoundError(SdkUsageError): + pass + + +class MultipleItemsFoundError(SdkUsageError): + pass + + +class SchemaFieldKeyError(SdkUsageError, KeyError): + pass + + +class IngestionAttributionWarning(Warning): + pass + + +class MultipleSubtypesWarning(Warning): + pass + + +class ExperimentalWarning(Warning): + pass diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py index abe9b5684f8f1f..950c1ccf5bfb41 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py @@ -1673,10 +1673,11 @@ def to_platform_resource( primary_key="", ) - # Extract user email mappings + # Extract user email mappings. + # Sort it to ensure the order is deterministic. user_email_cache = { user_id: user.email - for user_id, user in self._user_cache.items() + for user_id, user in sorted(self._user_cache.items()) if user.email } diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py index c1d29127699aaf..e3cf7147fd27d8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py @@ -6,6 +6,7 @@ from typing import Callable, Dict, Iterable, List, MutableMapping, Optional from datahub.ingestion.api.report import SupportsAsObj +from datahub.ingestion.source.common.subtypes import DatasetSubTypes from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection from datahub.ingestion.source.snowflake.snowflake_query import ( @@ -100,6 +101,9 @@ class SnowflakeTable(BaseTable): def is_hybrid(self) -> bool: return self.type is not None and self.type == "HYBRID TABLE" + def get_subtype(self) -> DatasetSubTypes: + return DatasetSubTypes.TABLE + @dataclass class SnowflakeView(BaseView): @@ -109,6 +113,9 @@ class SnowflakeView(BaseView): column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict) is_secure: bool = False + def get_subtype(self) -> DatasetSubTypes: + return DatasetSubTypes.VIEW + @dataclass class SnowflakeSchema: @@ -154,6 +161,9 @@ class SnowflakeStream: column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict) last_altered: Optional[datetime] = None + def get_subtype(self) -> DatasetSubTypes: + return DatasetSubTypes.SNOWFLAKE_STREAM + class _SnowflakeTagCache: def __init__(self) -> None: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index 0490b323bda9ea..03f83a4e7320ae 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -21,7 +21,6 @@ from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage from datahub.ingestion.source.common.subtypes import ( DatasetContainerSubTypes, - DatasetSubTypes, ) from datahub.ingestion.source.snowflake.constants import ( GENERIC_PERMISSION_ERROR_KEY, @@ -467,7 +466,13 @@ def _process_schema( context=f"{db_name}.{schema_name}", ) - def 
_process_tags(self, snowflake_schema, schema_name, db_name, domain): + def _process_tags( + self, + snowflake_schema: SnowflakeSchema, + schema_name: str, + db_name: str, + domain: str, + ) -> None: snowflake_schema.tags = self.tag_extractor.get_tags_on_object( schema_name=schema_name, db_name=db_name, domain=domain ) @@ -837,15 +842,7 @@ def gen_dataset_workunits( if dpi_aspect: yield dpi_aspect - subTypes = SubTypes( - typeNames=( - [DatasetSubTypes.SNOWFLAKE_STREAM] - if isinstance(table, SnowflakeStream) - else [DatasetSubTypes.VIEW] - if isinstance(table, SnowflakeView) - else [DatasetSubTypes.TABLE] - ) - ) + subTypes = SubTypes(typeNames=[table.get_subtype()]) yield MetadataChangeProposalWrapper( entityUrn=dataset_urn, aspect=subTypes @@ -932,9 +929,9 @@ def get_dataset_properties( "OWNER_ROLE_TYPE": table.owner_role_type, "TABLE_NAME": table.table_name, "BASE_TABLES": table.base_tables, - "STALE_AFTER": table.stale_after.isoformat() - if table.stale_after - else None, + "STALE_AFTER": ( + table.stale_after.isoformat() if table.stale_after else None + ), }.items() if v } diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index a0bd9ce0760bd1..5b1b9b1c2952c3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -352,6 +352,15 @@ def __init__(self, config: SQLCommonConfig, ctx: PipelineContext, platform: str) ) self.report.sql_aggregator = self.aggregator.report + def _add_default_options(self, sql_config: SQLCommonConfig) -> None: + """Add default SQLAlchemy options. Can be overridden by subclasses to add additional defaults.""" + # Extra default SQLAlchemy option for better connection pooling and threading. + # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow + if sql_config.is_profiling_enabled(): + sql_config.options.setdefault( + "max_overflow", sql_config.profiling.max_workers + ) + @classmethod def test_connection(cls, config_dict: dict) -> TestConnectionReport: test_report = TestConnectionReport() @@ -519,12 +528,7 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit # Known issue with sqlalchemy https://stackoverflow.com/questions/60804288/pycharm-duplicated-log-for-sqlalchemy-echo-true sqlalchemy_log._add_default_handler = lambda x: None # type: ignore - # Extra default SQLAlchemy option for better connection pooling and threading. 
- # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow - if sql_config.is_profiling_enabled(): - sql_config.options.setdefault( - "max_overflow", sql_config.profiling.max_workers - ) + self._add_default_options(sql_config) for inspector in self.get_inspectors(): profiler = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index e6319f668ecb8c..c52eceb726955e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -22,6 +22,7 @@ from sqlalchemy.engine import Engine from sqlalchemy.engine.base import Connection from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.pool import QueuePool from sqlalchemy.sql.expression import text from teradatasqlalchemy.dialect import TeradataDialect from teradatasqlalchemy.options import configure @@ -678,6 +679,16 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): if self.config.stateful_ingestion: self.config.stateful_ingestion.remove_stale_metadata = False + def _add_default_options(self, sql_config: SQLCommonConfig) -> None: + """Add Teradata-specific default options""" + super()._add_default_options(sql_config) + if sql_config.is_profiling_enabled(): + # Sqlalchemy uses QueuePool by default however Teradata uses SingletonThreadPool. + # SingletonThreadPool does not support parellel connections. For using profiling, we need to use QueuePool. + # https://docs.sqlalchemy.org/en/20/core/pooling.html#connection-pool-configuration + # https://github.com/Teradata/sqlalchemy-teradata/issues/96 + sql_config.options.setdefault("poolclass", QueuePool) + @classmethod def create(cls, config_dict, ctx): config = TeradataConfig.parse_obj(config_dict) @@ -705,6 +716,7 @@ def get_inspectors(self): # This method can be overridden in the case that you want to dynamically # run on multiple databases. url = self.config.get_sql_alchemy_url() + logger.debug(f"sql_alchemy_url={url}") engine = create_engine(url, **self.config.options) with engine.connect() as conn: diff --git a/metadata-ingestion/src/datahub/sdk/__init__.py b/metadata-ingestion/src/datahub/sdk/__init__.py new file mode 100644 index 00000000000000..54bd18c3230476 --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/__init__.py @@ -0,0 +1,33 @@ +import warnings + +import datahub.metadata.schema_classes as models +from datahub.errors import ExperimentalWarning, SdkUsageError +from datahub.ingestion.graph.config import DatahubClientConfig +from datahub.metadata.urns import ( + ChartUrn, + ContainerUrn, + CorpGroupUrn, + CorpUserUrn, + DashboardUrn, + DataPlatformInstanceUrn, + DataPlatformUrn, + DatasetUrn, + DomainUrn, + GlossaryTermUrn, + SchemaFieldUrn, + TagUrn, +) +from datahub.sdk.container import Container +from datahub.sdk.dataset import Dataset +from datahub.sdk.main_client import DataHubClient + +warnings.warn( + "The new datahub SDK (e.g. datahub.sdk.*) is experimental. " + "Our typical backwards-compatibility and stability guarantees do not apply to this code. 
" + "When it's promoted to stable, the import path will change " + "from `from datahub.sdk import ...` to `from datahub import ...`.", + ExperimentalWarning, + stacklevel=2, +) +del warnings +del ExperimentalWarning diff --git a/metadata-ingestion/src/datahub/sdk/_all_entities.py b/metadata-ingestion/src/datahub/sdk/_all_entities.py new file mode 100644 index 00000000000000..04c5fb6045ae61 --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/_all_entities.py @@ -0,0 +1,15 @@ +from typing import Dict, List, Type + +from datahub.sdk._entity import Entity +from datahub.sdk.container import Container +from datahub.sdk.dataset import Dataset + +# TODO: Is there a better way to declare this? +ENTITY_CLASSES_LIST: List[Type[Entity]] = [ + Container, + Dataset, +] + +ENTITY_CLASSES: Dict[str, Type[Entity]] = { + cls.get_urn_type().ENTITY_TYPE: cls for cls in ENTITY_CLASSES_LIST +} diff --git a/metadata-ingestion/src/datahub/sdk/_attribution.py b/metadata-ingestion/src/datahub/sdk/_attribution.py new file mode 100644 index 00000000000000..4faf441f58df76 --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/_attribution.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import contextlib +from typing import Iterator + +from datahub.utilities.str_enum import StrEnum + + +class KnownAttribution(StrEnum): + INGESTION = "INGESTION" + INGESTION_ALTERNATE = "INGESTION_ALTERNATE" + + UI = "UI" + SDK = "SDK" + + PROPAGATION = "PROPAGATION" + + def is_ingestion(self) -> bool: + return self in ( + KnownAttribution.INGESTION, + KnownAttribution.INGESTION_ALTERNATE, + ) + + +_default_attribution = KnownAttribution.SDK + + +def get_default_attribution() -> KnownAttribution: + return _default_attribution + + +def set_default_attribution(attribution: KnownAttribution) -> None: + global _default_attribution + _default_attribution = attribution + + +@contextlib.contextmanager +def change_default_attribution(attribution: KnownAttribution) -> Iterator[None]: + old_attribution = get_default_attribution() + try: + set_default_attribution(attribution) + yield + finally: + set_default_attribution(old_attribution) + + +def is_ingestion_attribution() -> bool: + return get_default_attribution().is_ingestion() diff --git a/metadata-ingestion/src/datahub/sdk/_entity.py b/metadata-ingestion/src/datahub/sdk/_entity.py new file mode 100644 index 00000000000000..34a088a25949c5 --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/_entity.py @@ -0,0 +1,89 @@ +import abc +from typing import List, Optional, Type, Union + +from typing_extensions import Self + +import datahub.metadata.schema_classes as models +from datahub.emitter.mce_builder import Aspect as AspectTypeVar +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.errors import SdkUsageError +from datahub.metadata.urns import Urn +from datahub.utilities.urns._urn_base import _SpecificUrn + + +class Entity: + __slots__ = ("_urn", "_prev_aspects", "_aspects") + + def __init__(self, /, urn: Urn): + # This method is not meant for direct usage. 
+ if type(self) is Entity: + raise SdkUsageError(f"{Entity.__name__} cannot be instantiated directly.") + + assert isinstance(urn, self.get_urn_type()) + self._urn: _SpecificUrn = urn + + # prev_aspects is None means this was created from scratch + self._prev_aspects: Optional[models.AspectBag] = None + self._aspects: models.AspectBag = {} + + @classmethod + def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self: + # If an init method from a subclass adds required fields, it also needs to override this method. + # An alternative approach would call cls.__new__() to bypass the init method, but it's a bit + # too hacky for my taste. + entity = cls(urn=urn) + return entity._init_from_graph(current_aspects) + + def _init_from_graph(self, current_aspects: models.AspectBag) -> Self: + self._prev_aspects = current_aspects + aspect: models._Aspect + for aspect_name, aspect in (current_aspects or {}).items(): # type: ignore + aspect_copy = type(aspect).from_obj(aspect.to_obj()) + self._aspects[aspect_name] = aspect_copy # type: ignore + return self + + @classmethod + @abc.abstractmethod + def get_urn_type(cls) -> Type[_SpecificUrn]: ... + + @property + def urn(self) -> _SpecificUrn: + return self._urn + + def _get_aspect( + self, + aspect_type: Type[AspectTypeVar], + /, + ) -> Optional[AspectTypeVar]: + return self._aspects.get(aspect_type.ASPECT_NAME) # type: ignore + + def _set_aspect(self, value: AspectTypeVar, /) -> None: + self._aspects[value.ASPECT_NAME] = value # type: ignore + + def _setdefault_aspect(self, default_aspect: AspectTypeVar, /) -> AspectTypeVar: + # Similar semantics to dict.setdefault. + if existing_aspect := self._get_aspect(type(default_aspect)): + return existing_aspect + self._set_aspect(default_aspect) + return default_aspect + + def _as_mcps( + self, + change_type: Union[str, models.ChangeTypeClass] = models.ChangeTypeClass.UPSERT, + ) -> List[MetadataChangeProposalWrapper]: + urn_str = str(self.urn) + + mcps = [] + for aspect in self._aspects.values(): + assert isinstance(aspect, models._Aspect) + mcps.append( + MetadataChangeProposalWrapper( + entityUrn=urn_str, + aspect=aspect, + changeType=change_type, + ) + ) + return mcps + + def __repr__(self) -> str: + return f"{self.__class__.__name__}('{self.urn}')" diff --git a/metadata-ingestion/src/datahub/sdk/_shared.py b/metadata-ingestion/src/datahub/sdk/_shared.py new file mode 100644 index 00000000000000..80651115cad5fc --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/_shared.py @@ -0,0 +1,345 @@ +import warnings +from datetime import datetime +from typing import ( + TYPE_CHECKING, + List, + Optional, + Tuple, + Union, +) + +from typing_extensions import TypeAlias + +import datahub.metadata.schema_classes as models +from datahub.emitter.mce_builder import ( + make_ts_millis, + make_user_urn, + parse_ts_millis, + validate_ownership_type, +) +from datahub.emitter.mcp_builder import ContainerKey +from datahub.errors import MultipleSubtypesWarning, SdkUsageError +from datahub.metadata.urns import ( + CorpGroupUrn, + CorpUserUrn, + DataJobUrn, + DataPlatformInstanceUrn, + DataPlatformUrn, + DatasetUrn, + DomainUrn, + GlossaryTermUrn, + OwnershipTypeUrn, + TagUrn, + Urn, +) +from datahub.sdk._entity import Entity +from datahub.utilities.urns.error import InvalidUrnError + +if TYPE_CHECKING: + from datahub.sdk.container import Container + +UrnOrStr: TypeAlias = Union[Urn, str] +DatasetUrnOrStr: TypeAlias = Union[str, DatasetUrn] +DatajobUrnOrStr: TypeAlias = Union[str, DataJobUrn] + +ActorUrn: 
TypeAlias = Union[CorpUserUrn, CorpGroupUrn] + + +def make_time_stamp(ts: Optional[datetime]) -> Optional[models.TimeStampClass]: + if ts is None: + return None + return models.TimeStampClass(time=make_ts_millis(ts)) + + +def parse_time_stamp(ts: Optional[models.TimeStampClass]) -> Optional[datetime]: + if ts is None: + return None + return parse_ts_millis(ts.time) + + +class HasPlatformInstance(Entity): + __slots__ = () + + def _set_platform_instance( + self, + platform: Union[str, DataPlatformUrn], + instance: Union[None, str, DataPlatformInstanceUrn], + ) -> None: + platform = DataPlatformUrn(platform) + if instance is not None: + try: + instance = DataPlatformInstanceUrn.from_string(instance) + except InvalidUrnError: + if not isinstance( + instance, DataPlatformInstanceUrn + ): # redundant check to make mypy happy + instance = DataPlatformInstanceUrn(platform, instance) + # At this point, instance is either None or a DataPlatformInstanceUrn. + + self._set_aspect( + models.DataPlatformInstanceClass( + platform=platform.urn(), + instance=instance.urn() if instance else None, + ) + ) + + @property + def platform_instance(self) -> Optional[DataPlatformInstanceUrn]: + dataPlatformInstance = self._get_aspect(models.DataPlatformInstanceClass) + if dataPlatformInstance and dataPlatformInstance.instance: + return DataPlatformInstanceUrn.from_string(dataPlatformInstance.instance) + return None + + +class HasSubtype(Entity): + __slots__ = () + + @property + def subtype(self) -> Optional[str]: + subtypes = self._get_aspect(models.SubTypesClass) + if subtypes and subtypes.typeNames: + if len(subtypes.typeNames) > 1: + warnings.warn( + f"The entity {self.urn} has multiple subtypes: {subtypes.typeNames}. " + "Only the first subtype will be considered.", + MultipleSubtypesWarning, + stacklevel=2, + ) + return subtypes.typeNames[0] + return None + + def set_subtype(self, subtype: str) -> None: + self._set_aspect(models.SubTypesClass(typeNames=[subtype])) + + +OwnershipTypeType: TypeAlias = Union[str, OwnershipTypeUrn] +OwnerInputType: TypeAlias = Union[ + str, + ActorUrn, + Tuple[Union[str, ActorUrn], OwnershipTypeType], + models.OwnerClass, +] +OwnersInputType: TypeAlias = List[OwnerInputType] + + +class HasOwnership(Entity): + __slots__ = () + + @staticmethod + def _parse_owner_class(owner: OwnerInputType) -> models.OwnerClass: + if isinstance(owner, models.OwnerClass): + return owner + + owner_type = models.OwnershipTypeClass.TECHNICAL_OWNER + owner_type_urn = None + + if isinstance(owner, tuple): + raw_owner, raw_owner_type = owner + + if isinstance(raw_owner_type, OwnershipTypeUrn): + owner_type = models.OwnershipTypeClass.CUSTOM + owner_type_urn = str(raw_owner_type) + else: + owner_type, owner_type_urn = validate_ownership_type(raw_owner_type) + else: + raw_owner = owner + + if isinstance(raw_owner, str): + # Tricky: this will gracefully handle a user passing in a group urn as a string. + # TODO: is this the right behavior? or should we require a valid urn here? + return models.OwnerClass( + owner=make_user_urn(raw_owner), + type=owner_type, + typeUrn=owner_type_urn, + ) + elif isinstance(raw_owner, Urn): + return models.OwnerClass( + owner=str(raw_owner), + type=owner_type, + typeUrn=owner_type_urn, + ) + else: + raise SdkUsageError( + f"Invalid owner {owner}: {type(owner)} is not a valid owner type" + ) + + # TODO: Return a custom type with deserialized urns, instead of the raw aspect. + # Ideally we'd also use first-class ownership type urns here, not strings. 
+ @property + def owners(self) -> Optional[List[models.OwnerClass]]: + if owners_aspect := self._get_aspect(models.OwnershipClass): + return owners_aspect.owners + return None + + def set_owners(self, owners: OwnersInputType) -> None: + # TODO: add docs on the default parsing + default ownership type + parsed_owners = [self._parse_owner_class(owner) for owner in owners] + self._set_aspect(models.OwnershipClass(owners=parsed_owners)) + + +ContainerInputType: TypeAlias = Union["Container", ContainerKey] + + +class HasContainer(Entity): + __slots__ = () + + def _set_container(self, container: Optional[ContainerInputType]) -> None: + # We need to allow container to be None. It won't happen for datasets much, but + # will be required for root containers. + from datahub.sdk.container import Container + + browse_path: List[Union[str, models.BrowsePathEntryClass]] = [] + if isinstance(container, Container): + container_urn = container.urn.urn() + + parent_browse_path = container._get_aspect(models.BrowsePathsV2Class) + if parent_browse_path is None: + raise SdkUsageError( + "Parent container does not have a browse path, so cannot generate one for its children." + ) + browse_path = [ + *parent_browse_path.path, + models.BrowsePathEntryClass( + id=container_urn, + urn=container_urn, + ), + ] + elif container is not None: + container_urn = container.as_urn() + + browse_path_reversed = [container_urn] + parent_key = container.parent_key() + while parent_key is not None: + browse_path_reversed.append(parent_key.as_urn()) + parent_key = parent_key.parent_key() + if container.instance is not None: + browse_path_reversed.append( + DataPlatformInstanceUrn( + container.platform, container.instance + ).urn() + ) + + browse_path = list(reversed(browse_path_reversed)) + else: + container_urn = None + browse_path = [] + + if container_urn: + self._set_aspect(models.ContainerClass(container=container_urn)) + + self._set_aspect( + models.BrowsePathsV2Class( + path=[ + ( + entry + if isinstance(entry, models.BrowsePathEntryClass) + else models.BrowsePathEntryClass( + id=entry, + urn=entry, + ) + ) + for entry in browse_path + ] + ) + ) + + +TagInputType: TypeAlias = Union[str, TagUrn, models.TagAssociationClass] +TagsInputType: TypeAlias = List[TagInputType] + + +class HasTags(Entity): + __slots__ = () + + # TODO: Return a custom type with deserialized urns, instead of the raw aspect. + @property + def tags(self) -> Optional[List[models.TagAssociationClass]]: + if tags := self._get_aspect(models.GlobalTagsClass): + return tags.tags + return None + + @classmethod + def _parse_tag_association_class( + cls, tag: TagInputType + ) -> models.TagAssociationClass: + if isinstance(tag, models.TagAssociationClass): + return tag + elif isinstance(tag, str): + assert TagUrn.from_string(tag) + return models.TagAssociationClass(tag=str(tag)) + + def set_tags(self, tags: TagsInputType) -> None: + self._set_aspect( + models.GlobalTagsClass( + tags=[self._parse_tag_association_class(tag) for tag in tags] + ) + ) + + +TermInputType: TypeAlias = Union[ + str, GlossaryTermUrn, models.GlossaryTermAssociationClass +] +TermsInputType: TypeAlias = List[TermInputType] + + +class HasTerms(Entity): + __slots__ = () + + # TODO: Return a custom type with deserialized urns, instead of the raw aspect. 
+ @property + def terms(self) -> Optional[List[models.GlossaryTermAssociationClass]]: + if glossary_terms := self._get_aspect(models.GlossaryTermsClass): + return glossary_terms.terms + return None + + @classmethod + def _parse_glossary_term_association_class( + cls, term: TermInputType + ) -> models.GlossaryTermAssociationClass: + if isinstance(term, models.GlossaryTermAssociationClass): + return term + elif isinstance(term, str): + assert GlossaryTermUrn.from_string(term) + return models.GlossaryTermAssociationClass(urn=str(term)) + + @classmethod + def _terms_audit_stamp(self) -> models.AuditStampClass: + return models.AuditStampClass( + time=0, + # TODO figure out what to put here + actor=CorpUserUrn("__ingestion").urn(), + ) + + def set_terms(self, terms: TermsInputType) -> None: + self._set_aspect( + models.GlossaryTermsClass( + terms=[ + self._parse_glossary_term_association_class(term) for term in terms + ], + auditStamp=self._terms_audit_stamp(), + ) + ) + + +DomainInputType: TypeAlias = Union[str, DomainUrn] + + +class HasDomain(Entity): + __slots__ = () + + @property + def domain(self) -> Optional[DomainUrn]: + if domains := self._get_aspect(models.DomainsClass): + if len(domains.domains) > 1: + raise SdkUsageError( + f"The entity has multiple domains set, but only one is supported: {domains.domains}" + ) + elif domains.domains: + domain_str = domains.domains[0] + return DomainUrn.from_string(domain_str) + + return None + + def set_domain(self, domain: DomainInputType) -> None: + domain_urn = DomainUrn.from_string(domain) # basically a type assertion + self._set_aspect(models.DomainsClass(domains=[str(domain_urn)])) diff --git a/metadata-ingestion/src/datahub/sdk/container.py b/metadata-ingestion/src/datahub/sdk/container.py new file mode 100644 index 00000000000000..0555773cd202f3 --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/container.py @@ -0,0 +1,193 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Dict, Optional, Type + +from typing_extensions import Self + +import datahub.metadata.schema_classes as models +from datahub.emitter.mce_builder import ALL_ENV_TYPES +from datahub.emitter.mcp_builder import ( + _INCLUDE_ENV_IN_CONTAINER_PROPERTIES, + ContainerKey, +) +from datahub.errors import SdkUsageError +from datahub.metadata.urns import ( + ContainerUrn, + Urn, +) +from datahub.sdk._entity import Entity +from datahub.sdk._shared import ( + DomainInputType, + HasContainer, + HasDomain, + HasOwnership, + HasPlatformInstance, + HasSubtype, + HasTags, + HasTerms, + OwnersInputType, + TagsInputType, + TermsInputType, + make_time_stamp, + parse_time_stamp, +) + + +class Container( + HasPlatformInstance, + HasSubtype, + HasContainer, + HasOwnership, + HasTags, + HasTerms, + HasDomain, + Entity, +): + __slots__ = () + + @classmethod + def get_urn_type(cls) -> Type[ContainerUrn]: + return ContainerUrn + + def __init__( + self, + /, + # Identity. + container_key: ContainerKey | ContainerUrn, + *, + # Container attributes. + display_name: str, + qualified_name: Optional[str] = None, + description: Optional[str] = None, + external_url: Optional[str] = None, + # TODO: call this custom properties? + extra_properties: Optional[Dict[str, str]] = None, + created: Optional[datetime] = None, + last_modified: Optional[datetime] = None, + # Standard aspects. 
+ subtype: Optional[str] = None, + owners: Optional[OwnersInputType] = None, + tags: Optional[TagsInputType] = None, + terms: Optional[TermsInputType] = None, + domain: Optional[DomainInputType] = None, + ): + if isinstance(container_key, ContainerUrn): + urn = container_key + else: + urn = ContainerUrn.from_string(container_key.as_urn()) + super().__init__(urn) + + # This needs to come first to ensure that the display name is registered. + self._ensure_container_props(name=display_name) + + # TODO: Normal usages should require container key. Only the graph init method can accept an urn. + if isinstance(container_key, ContainerKey): + self._set_platform_instance(container_key.platform, container_key.instance) + + self._set_container(container_key.parent_key()) + + self.set_custom_properties( + { + **container_key.property_dict(), + **(extra_properties or {}), + } + ) + + # Extra validation on the env field. + # In certain cases (mainly for backwards compatibility), the env field will actually + # have a platform instance name. + env = container_key.env if container_key.env in ALL_ENV_TYPES else None + if _INCLUDE_ENV_IN_CONTAINER_PROPERTIES and env is not None: + self._ensure_container_props().env = env + + if description is not None: + self.set_description(description) + if external_url is not None: + self.set_external_url(external_url) + if qualified_name is not None: + self.set_qualified_name(qualified_name) + if created is not None: + self.set_created(created) + if last_modified is not None: + self.set_last_modified(last_modified) + + if subtype is not None: + self.set_subtype(subtype) + if owners is not None: + self.set_owners(owners) + if tags is not None: + self.set_tags(tags) + if terms is not None: + self.set_terms(terms) + if domain is not None: + self.set_domain(domain) + + @classmethod + def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self: + assert isinstance(urn, ContainerUrn) + entity = cls(urn, display_name="__dummy_value__") + return entity._init_from_graph(current_aspects) + + def _ensure_container_props( + self, *, name: Optional[str] = None + ) -> models.ContainerPropertiesClass: + # TODO: Not super happy with this method's implementation, but it's + # internal-only and enforces the constraints that we need. + if name is not None: + return self._setdefault_aspect(models.ContainerPropertiesClass(name=name)) + + props = self._get_aspect(models.ContainerPropertiesClass) + if props is None: + raise SdkUsageError("Containers must have a name.") + return props + + @property + def display_name(self) -> str: + return self._ensure_container_props().name + + def set_display_name(self, value: str) -> None: + self._ensure_container_props(name=value).name = value + + @property + def description(self) -> Optional[str]: + return self._ensure_container_props().description + + def set_description(self, description: str) -> None: + self._ensure_container_props().description = description + + @property + def custom_properties(self) -> Optional[Dict[str, str]]: + return self._ensure_container_props().customProperties + + def set_custom_properties(self, custom_properties: Dict[str, str]) -> None: + # TODO: How do we ensure that the container key props are always retained? 
+ self._ensure_container_props().customProperties = custom_properties + + @property + def external_url(self) -> Optional[str]: + return self._ensure_container_props().externalUrl + + def set_external_url(self, external_url: str) -> None: + self._ensure_container_props().externalUrl = external_url + + @property + def qualified_name(self) -> Optional[str]: + return self._ensure_container_props().qualifiedName + + def set_qualified_name(self, qualified_name: str) -> None: + self._ensure_container_props().qualifiedName = qualified_name + + @property + def created(self) -> Optional[datetime]: + return parse_time_stamp(self._ensure_container_props().created) + + def set_created(self, created: datetime) -> None: + self._ensure_container_props().created = make_time_stamp(created) + + @property + def last_modified(self) -> Optional[datetime]: + return parse_time_stamp(self._ensure_container_props().lastModified) + + def set_last_modified(self, last_modified: datetime) -> None: + self._ensure_container_props().lastModified = make_time_stamp(last_modified) diff --git a/metadata-ingestion/src/datahub/sdk/dataset.py b/metadata-ingestion/src/datahub/sdk/dataset.py new file mode 100644 index 00000000000000..f68b2c0845c803 --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/dataset.py @@ -0,0 +1,584 @@ +from __future__ import annotations + +import warnings +from datetime import datetime +from typing import Dict, List, Optional, Tuple, Type, Union + +from typing_extensions import Self, TypeAlias, assert_never + +import datahub.metadata.schema_classes as models +from datahub.cli.cli_utils import first_non_null +from datahub.emitter.mce_builder import DEFAULT_ENV +from datahub.errors import ( + IngestionAttributionWarning, + ItemNotFoundError, + SchemaFieldKeyError, +) +from datahub.ingestion.source.sql.sql_types import resolve_sql_type +from datahub.metadata.urns import DatasetUrn, SchemaFieldUrn, Urn +from datahub.sdk._attribution import is_ingestion_attribution +from datahub.sdk._entity import Entity +from datahub.sdk._shared import ( + ContainerInputType, + DatasetUrnOrStr, + DomainInputType, + HasContainer, + HasDomain, + HasOwnership, + HasPlatformInstance, + HasSubtype, + HasTags, + HasTerms, + OwnersInputType, + TagsInputType, + TermsInputType, + make_time_stamp, + parse_time_stamp, +) + +SchemaFieldInputType: TypeAlias = Union[ + str, + Tuple[str, str], # (name, type) + Tuple[str, str, str], # (name, type, description) + models.SchemaFieldClass, +] +SchemaFieldsInputType: TypeAlias = Union[ + List[SchemaFieldInputType], + models.SchemaMetadataClass, +] + +UpstreamInputType: TypeAlias = Union[ + # Dataset upstream variants. + DatasetUrnOrStr, + models.UpstreamClass, + # Column upstream variants. + models.FineGrainedLineageClass, +] +# Mapping of { downstream_column -> [upstream_columns] } +ColumnLineageMapping: TypeAlias = Dict[str, List[str]] +UpstreamLineageInputType: TypeAlias = Union[ + models.UpstreamLineageClass, + List[UpstreamInputType], + # Combined variant. 
+ # Map of { upstream_dataset -> { downstream_column -> [upstream_column] } } + Dict[DatasetUrnOrStr, ColumnLineageMapping], +] + + +def _parse_upstream_input( + upstream_input: UpstreamInputType, +) -> Union[models.UpstreamClass, models.FineGrainedLineageClass]: + if isinstance(upstream_input, models.UpstreamClass): + return upstream_input + elif isinstance(upstream_input, models.FineGrainedLineageClass): + return upstream_input + elif isinstance(upstream_input, (str, DatasetUrn)): + return models.UpstreamClass( + dataset=str(upstream_input), + type=models.DatasetLineageTypeClass.TRANSFORMED, + ) + else: + assert_never(upstream_input) + + +def _parse_cll_mapping( + *, + upstream: DatasetUrnOrStr, + downstream: DatasetUrnOrStr, + cll_mapping: ColumnLineageMapping, +) -> List[models.FineGrainedLineageClass]: + cll = [] + for downstream_column, upstream_columns in cll_mapping.items(): + cll.append( + models.FineGrainedLineageClass( + upstreamType=models.FineGrainedLineageUpstreamTypeClass.FIELD_SET, + downstreamType=models.FineGrainedLineageDownstreamTypeClass.FIELD, + upstreams=[ + SchemaFieldUrn(upstream, upstream_column).urn() + for upstream_column in upstream_columns + ], + downstreams=[SchemaFieldUrn(downstream, downstream_column).urn()], + ) + ) + return cll + + +def _parse_upstream_lineage_input( + upstream_input: UpstreamLineageInputType, downstream_urn: DatasetUrn +) -> models.UpstreamLineageClass: + if isinstance(upstream_input, models.UpstreamLineageClass): + return upstream_input + elif isinstance(upstream_input, list): + upstreams = [_parse_upstream_input(upstream) for upstream in upstream_input] + + # Partition into table and column lineages. + tll = [ + upstream + for upstream in upstreams + if isinstance(upstream, models.UpstreamClass) + ] + cll = [ + upstream + for upstream in upstreams + if not isinstance(upstream, models.UpstreamClass) + ] + + # TODO: check that all things in cll are also in tll + return models.UpstreamLineageClass(upstreams=tll, fineGrainedLineages=cll) + elif isinstance(upstream_input, dict): + tll = [] + cll = [] + for dataset_urn, column_lineage in upstream_input.items(): + tll.append( + models.UpstreamClass( + dataset=str(dataset_urn), + type=models.DatasetLineageTypeClass.TRANSFORMED, + ) + ) + cll.extend( + _parse_cll_mapping( + upstream=dataset_urn, + downstream=downstream_urn, + cll_mapping=column_lineage, + ) + ) + + return models.UpstreamLineageClass(upstreams=tll, fineGrainedLineages=cll) + else: + assert_never(upstream_input) + + +class SchemaField: + __slots__ = ("_parent", "_field_path") + + def __init__(self, parent: Dataset, field_path: str): + self._parent = parent + self._field_path = field_path + + def _base_schema_field(self) -> models.SchemaFieldClass: + # This must exist - if it doesn't, we've got a larger bug. + schema_dict = self._parent._schema_dict() + return schema_dict[self._field_path] + + def _get_editable_schema_field( + self, + ) -> Optional[models.EditableSchemaFieldInfoClass]: + # This method does not make any mutations. 
+ editable_schema = self._parent._get_aspect(models.EditableSchemaMetadataClass) + if editable_schema is None: + return None + for field in editable_schema.editableSchemaFieldInfo: + if field.fieldPath == self._field_path: + return field + return None + + def _ensure_editable_schema_field(self) -> models.EditableSchemaFieldInfoClass: + if is_ingestion_attribution(): + warnings.warn( + "This method should not be used in ingestion mode.", + IngestionAttributionWarning, + stacklevel=2, + ) + editable_schema = self._parent._setdefault_aspect( + models.EditableSchemaMetadataClass(editableSchemaFieldInfo=[]) + ) + for field in editable_schema.editableSchemaFieldInfo: + if field.fieldPath == self._field_path: + return field + + # If we don't have an entry for this field yet, create one. + field = models.EditableSchemaFieldInfoClass(fieldPath=self._field_path) + editable_schema.editableSchemaFieldInfo.append(field) + return field + + @property + def field_path(self) -> str: + return self._field_path + + @property + def mapped_type(self) -> models.SchemaFieldDataTypeClass: + return self._base_schema_field().type + + @property + def native_type(self) -> str: + return self._base_schema_field().nativeDataType + + # TODO expose nullability and primary/foreign key details + + @property + def description(self) -> Optional[str]: + editable_field = self._get_editable_schema_field() + return first_non_null( + [ + editable_field.description if editable_field is not None else None, + self._base_schema_field().description, + ] + ) + + def set_description(self, description: str) -> None: + if is_ingestion_attribution(): + editable_field = self._get_editable_schema_field() + if editable_field and editable_field.description is not None: + warnings.warn( + "The field description will be hidden by UI-based edits. " + "Change the edit mode to OVERWRITE_UI to override this behavior.", + category=IngestionAttributionWarning, + stacklevel=2, + ) + + self._base_schema_field().description = description + else: + self._ensure_editable_schema_field().description = description + + @property + def tags(self) -> Optional[List[models.TagAssociationClass]]: + # Tricky: if either has a non-null globalTags, this will not return None. + tags = None + + if (base_tags := self._base_schema_field().globalTags) is not None: + tags = tags or [] + tags.extend(base_tags.tags) + + if editable_field := self._get_editable_schema_field(): + if (editable_tags := editable_field.globalTags) is not None: + tags = tags or [] + tags.extend(editable_tags.tags) + + return tags + + def set_tags(self, tags: TagsInputType) -> None: + parsed_tags = [self._parent._parse_tag_association_class(tag) for tag in tags] + + if is_ingestion_attribution(): + editable_field = self._get_editable_schema_field() + if editable_field and editable_field.globalTags: + warnings.warn( + "Overwriting non-ingestion tags from ingestion is an anti-pattern.", + category=IngestionAttributionWarning, + stacklevel=2, + ) + editable_field.globalTags = None + + self._base_schema_field().globalTags = models.GlobalTagsClass( + tags=parsed_tags + ) + else: + base_field = self._base_schema_field() + if base_field.globalTags: + base_field.globalTags = None + + self._ensure_editable_schema_field().globalTags = models.GlobalTagsClass( + tags=parsed_tags + ) + + @property + def terms(self) -> Optional[List[models.GlossaryTermAssociationClass]]: + # TODO: Basically the same implementation as tags - can we share code? 
+ terms = None + + if (base_terms := self._base_schema_field().glossaryTerms) is not None: + terms = terms or [] + terms.extend(base_terms.terms) + + if editable_field := self._get_editable_schema_field(): + if (editable_terms := editable_field.glossaryTerms) is not None: + terms = terms or [] + terms.extend(editable_terms.terms) + + return terms + + def set_terms(self, terms: List[models.GlossaryTermAssociationClass]) -> None: + parsed_terms = [ + self._parent._parse_glossary_term_association_class(term) for term in terms + ] + + if is_ingestion_attribution(): + editable_field = self._get_editable_schema_field() + if editable_field and editable_field.glossaryTerms: + warnings.warn( + "Overwriting non-ingestion terms from ingestion is an anti-pattern.", + category=IngestionAttributionWarning, + stacklevel=2, + ) + editable_field.glossaryTerms = None + + self._base_schema_field().glossaryTerms = models.GlossaryTermsClass( + terms=parsed_terms, + auditStamp=self._parent._terms_audit_stamp(), + ) + else: + base_field = self._base_schema_field() + if base_field.glossaryTerms: + base_field.glossaryTerms = None + + self._ensure_editable_schema_field().glossaryTerms = ( + models.GlossaryTermsClass( + terms=parsed_terms, + auditStamp=self._parent._terms_audit_stamp(), + ) + ) + + +class Dataset( + HasPlatformInstance, + HasSubtype, + HasContainer, + HasOwnership, + HasTags, + HasTerms, + HasDomain, + Entity, +): + __slots__ = () + + @classmethod + def get_urn_type(cls) -> Type[DatasetUrn]: + return DatasetUrn + + def __init__( + self, + *, + # Identity. + platform: str, + name: str, + platform_instance: Optional[str] = None, + env: str = DEFAULT_ENV, + # Dataset properties. + description: Optional[str] = None, + display_name: Optional[str] = None, + qualified_name: Optional[str] = None, + external_url: Optional[str] = None, + custom_properties: Optional[Dict[str, str]] = None, + created: Optional[datetime] = None, + last_modified: Optional[datetime] = None, + # Standard aspects. + subtype: Optional[str] = None, + container: Optional[ContainerInputType] = None, + owners: Optional[OwnersInputType] = None, + tags: Optional[TagsInputType] = None, + terms: Optional[TermsInputType] = None, + # TODO structured_properties + domain: Optional[DomainInputType] = None, + # Dataset-specific aspects. 
+ schema: Optional[SchemaFieldsInputType] = None, + upstreams: Optional[models.UpstreamLineageClass] = None, + ): + urn = DatasetUrn.create_from_ids( + platform_id=platform, + table_name=name, + platform_instance=platform_instance, + env=env, + ) + super().__init__(urn) + + self._set_platform_instance(urn.platform, platform_instance) + + if schema is not None: + self._set_schema(schema) + if upstreams is not None: + self.set_upstreams(upstreams) + + if description is not None: + self.set_description(description) + if display_name is not None: + self.set_display_name(display_name) + if qualified_name is not None: + self.set_qualified_name(qualified_name) + if external_url is not None: + self.set_external_url(external_url) + if custom_properties is not None: + self.set_custom_properties(custom_properties) + if created is not None: + self.set_created(created) + if last_modified is not None: + self.set_last_modified(last_modified) + + if subtype is not None: + self.set_subtype(subtype) + if container is not None: + self._set_container(container) + if owners is not None: + self.set_owners(owners) + if tags is not None: + self.set_tags(tags) + if terms is not None: + self.set_terms(terms) + if domain is not None: + self.set_domain(domain) + + @classmethod + def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self: + assert isinstance(urn, DatasetUrn) + entity = cls( + platform=urn.platform, + name=urn.name, + env=urn.env, + ) + return entity._init_from_graph(current_aspects) + + @property + def urn(self) -> DatasetUrn: + return self._urn # type: ignore + + def _ensure_dataset_props(self) -> models.DatasetPropertiesClass: + return self._setdefault_aspect(models.DatasetPropertiesClass()) + + def _get_editable_props(self) -> Optional[models.EditableDatasetPropertiesClass]: + return self._get_aspect(models.EditableDatasetPropertiesClass) + + def _ensure_editable_props(self) -> models.EditableDatasetPropertiesClass: + # Note that most of the fields in this aspect are not used. + # The only one that's relevant for us is the description. + return self._setdefault_aspect(models.EditableDatasetPropertiesClass()) + + @property + def description(self) -> Optional[str]: + editable_props = self._get_editable_props() + return first_non_null( + [ + editable_props.description if editable_props is not None else None, + self._ensure_dataset_props().description, + ] + ) + + def set_description(self, description: str) -> None: + if is_ingestion_attribution(): + editable_props = self._get_editable_props() + if editable_props is not None and editable_props.description is not None: + warnings.warn( + "Overwriting non-ingestion description from ingestion is an anti-pattern.", + category=IngestionAttributionWarning, + stacklevel=2, + ) + # Force the ingestion description to show up. 
+ editable_props.description = None + + self._ensure_dataset_props().description = description + else: + self._ensure_editable_props().description = description + + @property + def display_name(self) -> Optional[str]: + return self._ensure_dataset_props().name + + def set_display_name(self, display_name: str) -> None: + self._ensure_dataset_props().name = display_name + + @property + def qualified_name(self) -> Optional[str]: + return self._ensure_dataset_props().qualifiedName + + def set_qualified_name(self, qualified_name: str) -> None: + self._ensure_dataset_props().qualifiedName = qualified_name + + @property + def external_url(self) -> Optional[str]: + return self._ensure_dataset_props().externalUrl + + def set_external_url(self, external_url: str) -> None: + self._ensure_dataset_props().externalUrl = external_url + + @property + def custom_properties(self) -> Dict[str, str]: + return self._ensure_dataset_props().customProperties + + def set_custom_properties(self, custom_properties: Dict[str, str]) -> None: + self._ensure_dataset_props().customProperties = custom_properties + + @property + def created(self) -> Optional[datetime]: + return parse_time_stamp(self._ensure_dataset_props().created) + + def set_created(self, created: datetime) -> None: + self._ensure_dataset_props().created = make_time_stamp(created) + + @property + def last_modified(self) -> Optional[datetime]: + return parse_time_stamp(self._ensure_dataset_props().lastModified) + + def set_last_modified(self, last_modified: datetime) -> None: + self._ensure_dataset_props().lastModified = make_time_stamp(last_modified) + + def _schema_dict(self) -> Dict[str, models.SchemaFieldClass]: + schema_metadata = self._get_aspect(models.SchemaMetadataClass) + if schema_metadata is None: + raise ItemNotFoundError(f"Schema is not set for dataset {self.urn}") + return {field.fieldPath: field for field in schema_metadata.fields} + + @property + def schema(self) -> List[SchemaField]: + # TODO: Add some caching here to avoid iterating over the schema every time. + schema_dict = self._schema_dict() + return [SchemaField(self, field_path) for field_path in schema_dict] + + def _parse_schema_field_input( + self, schema_field_input: SchemaFieldInputType + ) -> models.SchemaFieldClass: + if isinstance(schema_field_input, models.SchemaFieldClass): + return schema_field_input + elif isinstance(schema_field_input, tuple): + # Support (name, type) and (name, type, description) forms + if len(schema_field_input) == 2: + name, field_type = schema_field_input + description = None + elif len(schema_field_input) == 3: + name, field_type, description = schema_field_input + else: + assert_never(schema_field_input) + return models.SchemaFieldClass( + fieldPath=name, + type=models.SchemaFieldDataTypeClass( + resolve_sql_type( + field_type, + platform=self.urn.get_data_platform_urn().platform_name, + ) + or models.NullTypeClass() + ), + nativeDataType=field_type, + description=description, + ) + elif isinstance(schema_field_input, str): + # TODO: Not sure this branch makes sense - we should probably just require types? + return models.SchemaFieldClass( + fieldPath=schema_field_input, + type=models.SchemaFieldDataTypeClass(models.NullTypeClass()), + nativeDataType="unknown", + description=None, + ) + else: + assert_never(schema_field_input) + + def _set_schema(self, schema: SchemaFieldsInputType) -> None: + # This method is not public. Ingestion/restatement users should be setting + # the schema via the constructor. 
SDK users that got a dataset from the graph + # probably shouldn't be adding/removing fields ad-hoc. The field-level mutators + # can be used instead. + if isinstance(schema, models.SchemaMetadataClass): + self._set_aspect(schema) + else: + parsed_schema = [self._parse_schema_field_input(field) for field in schema] + self._set_aspect( + models.SchemaMetadataClass( + fields=parsed_schema, + # The rest of these fields are not used, and so we can set them to dummy/default values. + schemaName="", + platform=self.urn.platform, + version=0, + hash="", + platformSchema=models.SchemalessClass(), + ) + ) + + def __getitem__(self, field_path: str) -> SchemaField: + # TODO: Automatically deal with field path v2? + schema_dict = self._schema_dict() + if field_path not in schema_dict: + raise SchemaFieldKeyError(f"Field {field_path} not found in schema") + return SchemaField(self, field_path) + + @property + def upstreams(self) -> Optional[models.UpstreamLineageClass]: + return self._get_aspect(models.UpstreamLineageClass) + + def set_upstreams(self, upstreams: UpstreamLineageInputType) -> None: + self._set_aspect(_parse_upstream_lineage_input(upstreams, self.urn)) diff --git a/metadata-ingestion/src/datahub/sdk/entity_client.py b/metadata-ingestion/src/datahub/sdk/entity_client.py new file mode 100644 index 00000000000000..99dc7f9a280aba --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/entity_client.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +import warnings +from typing import TYPE_CHECKING, Union, overload + +import datahub.metadata.schema_classes as models +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_patch_builder import MetadataPatchProposal +from datahub.errors import IngestionAttributionWarning, ItemNotFoundError, SdkUsageError +from datahub.ingestion.graph.client import DataHubGraph +from datahub.metadata.urns import ( + ContainerUrn, + DatasetUrn, + Urn, +) +from datahub.sdk._all_entities import ENTITY_CLASSES +from datahub.sdk._entity import Entity +from datahub.sdk._shared import UrnOrStr +from datahub.sdk.container import Container +from datahub.sdk.dataset import Dataset + +if TYPE_CHECKING: + from datahub.sdk.main_client import DataHubClient + + +class EntityClient: + def __init__(self, client: DataHubClient): + self._client = client + + # TODO: Make all of these methods sync by default. + + @property + def _graph(self) -> DataHubGraph: + return self._client._graph + + @overload + def get(self, urn: ContainerUrn) -> Container: ... + @overload + def get(self, urn: DatasetUrn) -> Dataset: ... + @overload + def get(self, urn: Union[Urn, str]) -> Entity: ... + def get(self, urn: UrnOrStr) -> Entity: + if not isinstance(urn, Urn): + urn = Urn.from_string(urn) + + # TODO: add error handling around this with a suggested alternative if not yet supported + EntityClass = ENTITY_CLASSES[urn.entity_type] + + if not self._graph.exists(str(urn)): + raise ItemNotFoundError(f"Entity {urn} not found") + + aspects = self._graph.get_entity_semityped(str(urn)) + + # TODO: save the timestamp so we can use If-Unmodified-Since on the updates + return EntityClass._new_from_graph(urn, aspects) + + def create(self, entity: Entity) -> None: + mcps = [] + + if self._graph.exists(str(entity.urn)): + raise SdkUsageError( + f"Entity {entity.urn} already exists. Use client.entities.upsert() to update it." + ) + + # Extra safety check: by putting this first, we can ensure that + # the request fails if the entity already exists. 
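A usage sketch of the create/upsert split defined in this file (assumes a reachable DataHub server and the connection configured for from_env; names and values are placeholders): create() checks existence and emits the key aspect with CREATE_ENTITY first so the server also rejects duplicates, while upsert() would partially overwrite an existing entity instead.

from datahub.metadata.urns import TagUrn
from datahub.sdk.dataset import Dataset
from datahub.sdk.main_client import DataHubClient

client = DataHubClient.from_env()  # assumes ~/.datahubenv or equivalent is configured

dataset = Dataset(
    platform="snowflake",
    name="test_db.test_schema.table_1",
    description="test description",
    tags=[TagUrn("tag1")],
)

# Fails with SdkUsageError if the entity already exists; use client.entities.upsert(dataset)
# to overwrite instead.
client.entities.create(dataset)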
+ mcps.append( + MetadataChangeProposalWrapper( + entityUrn=str(entity.urn), + aspect=entity.urn.to_key_aspect(), + changeType=models.ChangeTypeClass.CREATE_ENTITY, + ) + ) + mcps.extend(entity._as_mcps(models.ChangeTypeClass.CREATE)) + + self._graph.emit_mcps(mcps) + + def upsert(self, entity: Entity) -> None: + if entity._prev_aspects is None and self._graph.exists(str(entity.urn)): + warnings.warn( + f"The entity {entity.urn} already exists. This operation will partially overwrite the existing entity.", + IngestionAttributionWarning, + stacklevel=2, + ) + # TODO: If there are no previous aspects but the entity exists, should we delete aspects that are not present here? + + mcps = entity._as_mcps(models.ChangeTypeClass.UPSERT) + self._graph.emit_mcps(mcps) + + def update(self, entity: Union[Entity, MetadataPatchProposal]) -> None: + if isinstance(entity, MetadataPatchProposal): + return self._update_patch(entity) + + if entity._prev_aspects is None: + raise SdkUsageError( + f"For entities created via {entity.__class__.__name__}(...), use client.entities.create() or client.entities.upsert() instead" + ) + + # TODO: respect If-Unmodified-Since? + # -> probably add a "mode" parameter that can be "update" (e.g. if not modified) or "update_force" + + mcps = entity._as_mcps(models.ChangeTypeClass.UPSERT) + self._graph.emit_mcps(mcps) + + def _update_patch( + self, updater: MetadataPatchProposal, check_exists: bool = True + ) -> None: + if check_exists and not self._graph.exists(updater.urn): + raise SdkUsageError( + f"Entity {updater.urn} does not exist, and hence cannot be updated. " + "You can bypass this check by setting check_exists=False." + ) + + mcps = updater.build() + self._graph.emit_mcps(mcps) diff --git a/metadata-ingestion/src/datahub/sdk/main_client.py b/metadata-ingestion/src/datahub/sdk/main_client.py new file mode 100644 index 00000000000000..7243114192a9dc --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/main_client.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +from typing import Optional, overload + +from datahub.errors import SdkUsageError +from datahub.ingestion.graph.client import DataHubGraph, get_default_graph +from datahub.ingestion.graph.config import DatahubClientConfig +from datahub.sdk.entity_client import EntityClient +from datahub.sdk.resolver_client import ResolverClient + + +class DataHubClient: + @overload + def __init__(self, *, server: str, token: Optional[str] = None): ... + @overload + def __init__(self, *, config: DatahubClientConfig): ... + @overload + def __init__(self, *, graph: DataHubGraph): ... + def __init__( + self, + *, + server: Optional[str] = None, + token: Optional[str] = None, + graph: Optional[DataHubGraph] = None, + config: Optional[DatahubClientConfig] = None, + ): + if server is not None: + if config is not None: + raise SdkUsageError("Cannot specify both server and config") + if graph is not None: + raise SdkUsageError("Cannot specify both server and graph") + graph = DataHubGraph(config=DatahubClientConfig(server=server, token=token)) + elif config is not None: + if graph is not None: + raise SdkUsageError("Cannot specify both config and graph") + graph = DataHubGraph(config=config) + elif graph is None: + raise SdkUsageError("Must specify either server, config, or graph") + + self._graph = graph + + @classmethod + def from_env(cls) -> "DataHubClient": + # Inspired by the DockerClient.from_env() method. + # TODO: This one also reads from ~/.datahubenv, so the "from_env" name might be a bit confusing. 
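For reference, the keyword-only constructor overloads above allow three mutually exclusive ways to build a client, plus from_env(); a sketch with placeholder server URL and token:

from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.graph.config import DatahubClientConfig
from datahub.sdk.main_client import DataHubClient

# Option 1: server URL (and optional token).
client = DataHubClient(server="http://localhost:8080", token="<token>")

# Option 2: a full client config object.
client = DataHubClient(config=DatahubClientConfig(server="http://localhost:8080"))

# Option 3: wrap an existing DataHubGraph instance.
graph = DataHubGraph(config=DatahubClientConfig(server="http://localhost:8080"))
client = DataHubClient(graph=graph)

# Option 4: read connection details from the local environment / ~/.datahubenv.
client = DataHubClient.from_env()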
+ # That file is part of the "environment", but is not a traditional "env variable". + graph = get_default_graph() + return cls(graph=graph) + + @property + def entities(self) -> EntityClient: + return EntityClient(self) + + @property + def resolve(self) -> ResolverClient: + return ResolverClient(self) diff --git a/metadata-ingestion/src/datahub/sdk/resolver_client.py b/metadata-ingestion/src/datahub/sdk/resolver_client.py new file mode 100644 index 00000000000000..dae2f61a918dd0 --- /dev/null +++ b/metadata-ingestion/src/datahub/sdk/resolver_client.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Optional, overload + +from datahub.errors import ItemNotFoundError, MultipleItemsFoundError, SdkUsageError +from datahub.ingestion.graph.client import DataHubGraph +from datahub.metadata.urns import ( + CorpUserUrn, + DomainUrn, + GlossaryTermUrn, +) + +if TYPE_CHECKING: + from datahub.sdk.main_client import DataHubClient + + +class ResolverClient: + def __init__(self, client: DataHubClient): + self._client = client + + # TODO: add caching to this method + + @property + def _graph(self) -> DataHubGraph: + return self._client._graph + + def domain(self, *, name: str) -> DomainUrn: + urn_str = self._graph.get_domain_urn_by_name(name) + if urn_str is None: + raise ItemNotFoundError(f"Domain with name {name} not found") + return DomainUrn.from_string(urn_str) + + @overload + def user(self, *, name: str) -> CorpUserUrn: ... + @overload + def user(self, *, email: str) -> CorpUserUrn: ... + def user( + self, *, name: Optional[str] = None, email: Optional[str] = None + ) -> CorpUserUrn: + filter_explanation: str + filters = [] + if name is not None: + if email is not None: + raise SdkUsageError("Cannot specify both name and email for auto_user") + # TODO: do we filter on displayName or fullName? + filter_explanation = f"with name {name}" + filters.append( + { + "field": "fullName", + "values": [name], + "condition": "EQUAL", + } + ) + elif email is not None: + filter_explanation = f"with email {email}" + filters.append( + { + "field": "email", + "values": [email], + "condition": "EQUAL", + } + ) + else: + raise SdkUsageError("Must specify either name or email for auto_user") + + users = list( + self._graph.get_urns_by_filter( + entity_types=[CorpUserUrn.ENTITY_TYPE], + extraFilters=filters, + ) + ) + if len(users) == 0: + # TODO: In auto methods, should we just create the user/domain/etc if it doesn't exist? 
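Taken together, the resolver methods in this file turn human-readable names into urns; a usage sketch (the names are made up, and each call raises ItemNotFoundError or MultipleItemsFoundError when zero or several matches come back, as implemented below):

from datahub.sdk.main_client import DataHubClient

client = DataHubClient.from_env()

marketing_domain = client.resolve.domain(name="Marketing")
owner = client.resolve.user(email="admin@datahubproject.io")
term = client.resolve.term(name="AccountBalance")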
+ raise ItemNotFoundError(f"User {filter_explanation} not found") + elif len(users) > 1: + raise MultipleItemsFoundError( + f"Multiple users found {filter_explanation}: {users}" + ) + else: + return CorpUserUrn.from_string(users[0]) + + def term(self, *, name: str) -> GlossaryTermUrn: + # TODO: Add some limits on the graph fetch + terms = list( + self._graph.get_urns_by_filter( + entity_types=[GlossaryTermUrn.ENTITY_TYPE], + extraFilters=[ + { + "field": "id", + "values": [name], + "condition": "EQUAL", + } + ], + ) + ) + if len(terms) == 0: + raise ItemNotFoundError(f"Term with name {name} not found") + elif len(terms) > 1: + raise SdkUsageError(f"Multiple terms found with name {name}: {terms}") + else: + return GlossaryTermUrn.from_string(terms[0]) diff --git a/metadata-ingestion/src/datahub/sql_parsing/split_statements.py b/metadata-ingestion/src/datahub/sql_parsing/split_statements.py index 42dda4e62158b0..0cb402d5fa1925 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/split_statements.py +++ b/metadata-ingestion/src/datahub/sql_parsing/split_statements.py @@ -1,6 +1,10 @@ import re from enum import Enum -from typing import Generator, List, Tuple +from typing import Iterator, List, Tuple + +SELECT_KEYWORD = "SELECT" +CASE_KEYWORD = "CASE" +END_KEYWORD = "END" CONTROL_FLOW_KEYWORDS = [ "GO", @@ -9,18 +13,36 @@ "BEGIN", r"END\w+TRY", r"END\w+CATCH", - "END", + # This isn't strictly correct, but we assume that IF | (condition) | (block) should all be split up + # This mainly ensures that IF statements don't get tacked onto the previous statement incorrectly + "IF", + # For things like CASE, END does not mean the end of a statement. + # We have special handling for this. + END_KEYWORD, + # "ELSE", # else is also valid in CASE, so we we can't use it here. ] # There's an exception to this rule, which is when the statement -# is preceeded by a CTE. -FORCE_NEW_STATEMENT_KEYWORDS = [ +# is preceded by a CTE. For those, we have to check if the character +# before this is a ")". +NEW_STATEMENT_KEYWORDS = [ # SELECT is used inside queries as well, so we can't include it here. + "CREATE", "INSERT", "UPDATE", "DELETE", "MERGE", ] +STRICT_NEW_STATEMENT_KEYWORDS = [ + # For these keywords, a SELECT following it does indicate a new statement. + "DROP", + "TRUNCATE", +] + + +class _AlreadyIncremented(Exception): + # Using exceptions for control flow isn't great - but the code is clearer so it's fine. + pass class ParserState(Enum): @@ -30,134 +52,199 @@ class ParserState(Enum): MULTILINE_COMMENT = 4 -def _is_keyword_at_position(sql: str, pos: int, keyword: str) -> bool: - """ - Check if a keyword exists at the given position using regex word boundaries. - """ - if pos + len(keyword) > len(sql): - return False +class _StatementSplitter: + def __init__(self, sql: str): + self.sql = sql - # If we're not at a word boundary, we can't generate a keyword. - if pos > 0 and not ( - bool(re.match(r"\w\W", sql[pos - 1 : pos + 1])) - or bool(re.match(r"\W\w", sql[pos - 1 : pos + 1])) - ): - return False + # Main parser state. + self.i = 0 + self.state = ParserState.NORMAL + self.current_statement: List[str] = [] - pattern = rf"^{re.escape(keyword)}\b" - match = re.match(pattern, sql[pos:], re.IGNORECASE) - return bool(match) + # Additional parser state. + # If we see a SELECT, should we start a new statement? + # If we previously saw a drop/truncate/etc, a SELECT does mean a new statement. + # But if we're in a select/create/etc, a select could just be a subquery. 
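A usage sketch of the intent behind this flag (an assumption based on the comment above, not a verified output): a SELECT right after DROP/TRUNCATE should begin a new statement, while the SELECT inside an INSERT ... SELECT should stay attached to the INSERT.

from datahub.sql_parsing.split_statements import split_statements

sql = (
    "TRUNCATE TABLE staging.orders "
    "SELECT * FROM raw.orders "
    "INSERT INTO staging.orders SELECT * FROM raw.orders"
)
for statement in split_statements(sql):
    print(statement)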
+ self.does_select_mean_new_statement = False -def _look_ahead_for_keywords( - sql: str, pos: int, keywords: List[str] -) -> Tuple[bool, str, int]: - """ - Look ahead for SQL keywords at the current position. - """ + # The END keyword terminates CASE and BEGIN blocks. + # We need to match the CASE statements with END blocks to determine + # what a given END is closing. + self.current_case_statements = 0 - for keyword in keywords: - if _is_keyword_at_position(sql, pos, keyword): - return True, keyword, len(keyword) - return False, "", 0 + def _is_keyword_at_position(self, pos: int, keyword: str) -> bool: + """ + Check if a keyword exists at the given position using regex word boundaries. + """ + sql = self.sql + if pos + len(keyword) > len(sql): + return False -def split_statements(sql: str) -> Generator[str, None, None]: - """ - Split T-SQL code into individual statements, handling various SQL constructs. - """ - if not sql or not sql.strip(): - return + # If we're not at a word boundary, we can't generate a keyword. + if pos > 0 and not ( + bool(re.match(r"\w\W", sql[pos - 1 : pos + 1])) + or bool(re.match(r"\W\w", sql[pos - 1 : pos + 1])) + ): + return False + + pattern = rf"^{re.escape(keyword)}\b" + match = re.match(pattern, sql[pos:], re.IGNORECASE) + return bool(match) - current_statement: List[str] = [] - state = ParserState.NORMAL - i = 0 + def _look_ahead_for_keywords(self, keywords: List[str]) -> Tuple[bool, str, int]: + """ + Look ahead for SQL keywords at the current position. + """ - def yield_if_complete() -> Generator[str, None, None]: - statement = "".join(current_statement).strip() + for keyword in keywords: + if self._is_keyword_at_position(self.i, keyword): + return True, keyword, len(keyword) + return False, "", 0 + + def _yield_if_complete(self) -> Iterator[str]: + statement = "".join(self.current_statement).strip() if statement: + # Subtle - to avoid losing full whitespace, they get merged into the next statement. yield statement - current_statement.clear() - - prev_real_char = "\0" # the most recent non-whitespace, non-comment character - while i < len(sql): - c = sql[i] - next_char = sql[i + 1] if i < len(sql) - 1 else "\0" - - if state == ParserState.NORMAL: - if c == "'": - state = ParserState.STRING - current_statement.append(c) - prev_real_char = c - elif c == "-" and next_char == "-": - state = ParserState.COMMENT - current_statement.append(c) - current_statement.append(next_char) - i += 1 - elif c == "/" and next_char == "*": - state = ParserState.MULTILINE_COMMENT - current_statement.append(c) - current_statement.append(next_char) - i += 1 - else: - most_recent_real_char = prev_real_char - if not c.isspace(): + self.current_statement.clear() + + # Reset current_statement-specific state. 
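The current_case_statements counter exists so that an END closing a CASE expression is not treated as a control-flow boundary. A simplified, token-based sketch of that balancing rule (a standalone illustration, not the character-level parser in this file):

from typing import Tuple

def is_control_flow_end(token: str, open_cases: int) -> Tuple[bool, int]:
    # Returns (is_statement_boundary, updated_open_case_count).
    upper = token.upper()
    if upper == "CASE":
        return False, open_cases + 1
    if upper == "END":
        if open_cases > 0:
            return False, open_cases - 1  # closes a CASE expression, keep scanning
        return True, open_cases  # closes a BEGIN/TRY/CATCH-style block
    return False, open_cases

open_cases = 0
boundaries = []
for tok in "SELECT CASE WHEN x THEN 1 ELSE 0 END AS flag FROM t END".split():
    is_boundary, open_cases = is_control_flow_end(tok, open_cases)
    boundaries.append(is_boundary)
# Only the final, unmatched END acts as a statement boundary.
assert boundaries[-1] is True and boundaries.count(True) == 1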
+ self.does_select_mean_new_statement = False + if self.current_case_statements != 0: + breakpoint() + self.current_case_statements = 0 + + def process(self) -> Iterator[str]: + if not self.sql or not self.sql.strip(): + return + + prev_real_char = "\0" # the most recent non-whitespace, non-comment character + while self.i < len(self.sql): + c = self.sql[self.i] + next_char = self.sql[self.i + 1] if self.i < len(self.sql) - 1 else "\0" + + if self.state == ParserState.NORMAL: + if c == "'": + self.state = ParserState.STRING + self.current_statement.append(c) prev_real_char = c - - is_control_keyword, keyword, keyword_len = _look_ahead_for_keywords( - sql, i, keywords=CONTROL_FLOW_KEYWORDS - ) - if is_control_keyword: - # Yield current statement if any - yield from yield_if_complete() - # Yield keyword as its own statement - yield keyword - i += keyword_len - continue - - ( - is_force_new_statement_keyword, - keyword, - keyword_len, - ) = _look_ahead_for_keywords( - sql, i, keywords=FORCE_NEW_STATEMENT_KEYWORDS - ) - if ( - is_force_new_statement_keyword and most_recent_real_char != ")" - ): # usually we'd have a close paren that closes a CTE - # Force termination of current statement - yield from yield_if_complete() - - current_statement.append(keyword) - i += keyword_len - continue - - elif c == ";": - yield from yield_if_complete() + elif c == "-" and next_char == "-": + self.state = ParserState.COMMENT + self.current_statement.append(c) + self.current_statement.append(next_char) + self.i += 1 + elif c == "/" and next_char == "*": + self.state = ParserState.MULTILINE_COMMENT + self.current_statement.append(c) + self.current_statement.append(next_char) + self.i += 1 else: - current_statement.append(c) - - elif state == ParserState.STRING: - current_statement.append(c) - if c == "'" and next_char == "'": - current_statement.append(next_char) - i += 1 - elif c == "'": - state = ParserState.NORMAL - - elif state == ParserState.COMMENT: - current_statement.append(c) - if c == "\n": - state = ParserState.NORMAL - - elif state == ParserState.MULTILINE_COMMENT: - current_statement.append(c) - if c == "*" and next_char == "/": - current_statement.append(next_char) - i += 1 - state = ParserState.NORMAL - - i += 1 - - # Handle the last statement - yield from yield_if_complete() + most_recent_real_char = prev_real_char + if not c.isspace(): + prev_real_char = c + + try: + yield from self._process_normal( + most_recent_real_char=most_recent_real_char + ) + except _AlreadyIncremented: + # Skip the normal i += 1 step. 
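The _AlreadyIncremented exception is used purely for control flow: a handler that has already advanced self.i raises it so the main loop's shared increment is skipped. A self-contained toy version of the same pattern (assumed names, not the parser itself):

from typing import List

class AlreadyAdvanced(Exception):
    """Raised by a handler that has already moved the cursor forward."""

def tokenize(text: str, keywords: List[str]) -> List[str]:
    tokens: List[str] = []
    i = 0
    while i < len(text):
        try:
            for kw in keywords:
                if text.startswith(kw, i):
                    tokens.append(kw)
                    i += len(kw)  # cursor already advanced by the handler
                    raise AlreadyAdvanced()
            tokens.append(text[i])
        except AlreadyAdvanced:
            continue  # skip the shared single-character advance below
        i += 1
    return tokens

assert tokenize("GO;GO", ["GO"]) == ["GO", ";", "GO"]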
+ continue + + elif self.state == ParserState.STRING: + self.current_statement.append(c) + if c == "'" and next_char == "'": + self.current_statement.append(next_char) + self.i += 1 + elif c == "'": + self.state = ParserState.NORMAL + + elif self.state == ParserState.COMMENT: + self.current_statement.append(c) + if c == "\n": + self.state = ParserState.NORMAL + + elif self.state == ParserState.MULTILINE_COMMENT: + self.current_statement.append(c) + if c == "*" and next_char == "/": + self.current_statement.append(next_char) + self.i += 1 + self.state = ParserState.NORMAL + + self.i += 1 + + # Handle the last statement + yield from self._yield_if_complete() + + def _process_normal(self, most_recent_real_char: str) -> Iterator[str]: + c = self.sql[self.i] + + if self._is_keyword_at_position(self.i, CASE_KEYWORD): + self.current_case_statements += 1 + + is_control_keyword, keyword, keyword_len = self._look_ahead_for_keywords( + keywords=CONTROL_FLOW_KEYWORDS + ) + if ( + is_control_keyword + and keyword == END_KEYWORD + and self.current_case_statements > 0 + ): + # If we're closing a CASE statement with END, we can just decrement the counter and continue. + self.current_case_statements -= 1 + elif is_control_keyword: + # Yield current statement if any + yield from self._yield_if_complete() + # Yield keyword as its own statement + yield keyword + self.i += keyword_len + self.does_select_mean_new_statement = True + raise _AlreadyIncremented() + + ( + is_strict_new_statement_keyword, + keyword, + keyword_len, + ) = self._look_ahead_for_keywords(keywords=STRICT_NEW_STATEMENT_KEYWORDS) + if is_strict_new_statement_keyword: + yield from self._yield_if_complete() + self.current_statement.append(keyword) + self.i += keyword_len + self.does_select_mean_new_statement = True + raise _AlreadyIncremented() + + ( + is_force_new_statement_keyword, + keyword, + keyword_len, + ) = self._look_ahead_for_keywords( + keywords=( + NEW_STATEMENT_KEYWORDS + + ([SELECT_KEYWORD] if self.does_select_mean_new_statement else []) + ), + ) + if ( + is_force_new_statement_keyword and most_recent_real_char != ")" + ): # usually we'd have a close paren that closes a CTE + # Force termination of current statement + yield from self._yield_if_complete() + + self.current_statement.append(keyword) + self.i += keyword_len + raise _AlreadyIncremented() + + if c == ";": + yield from self._yield_if_complete() + else: + self.current_statement.append(c) + + +def split_statements(sql: str) -> Iterator[str]: + """ + Split T-SQL code into individual statements, handling various SQL constructs. 
+ """ + + splitter = _StatementSplitter(sql) + yield from splitter.process() diff --git a/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json b/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json index 4b3bd16deceeae..3e9a889640c973 100644 --- a/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json +++ b/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json @@ -4240,295 +4240,6 @@ "lastRunId": "no-run-id-provided" } }, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1615443388097, - "runId": "snowflake-2025_01_28-00_01_52-5vkne0", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)", - "changeType": "UPSERT", - "aspectName": "schemaMetadata", - "aspect": { - "json": { - "schemaName": "test_db.test_schema.stream_1", - "platform": "urn:li:dataPlatform:snowflake", - "version": 0, - "created": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "hash": "", - "platformSchema": { - "com.linkedin.schema.MySqlDDL": { - "tableSchema": "" - } - }, - "fields": [ - { - "fieldPath": "col_1", - "nullable": false, - "description": "Comment for column", - "type": { - "type": { - "com.linkedin.schema.NumberType": {} - } - }, - "nativeDataType": "NUMBER(38,0)", - "recursive": false, - "isPartOfKey": false - }, - { - "fieldPath": "col_2", - "nullable": false, - "description": "Comment for column", - "type": { - "type": { - "com.linkedin.schema.StringType": {} - } - }, - "nativeDataType": "VARCHAR(255)", - "recursive": false, - "isPartOfKey": false - }, - { - "fieldPath": "col_3", - "nullable": false, - "description": "Comment for column", - "type": { - "type": { - "com.linkedin.schema.StringType": {} - } - }, - "nativeDataType": "VARCHAR(255)", - "recursive": false, - "isPartOfKey": false - }, - { - "fieldPath": "col_4", - "nullable": false, - "description": "Comment for column", - "type": { - "type": { - "com.linkedin.schema.StringType": {} - } - }, - "nativeDataType": "VARCHAR(255)", - "recursive": false, - "isPartOfKey": false - }, - { - "fieldPath": "col_5", - "nullable": false, - "description": "Comment for column", - "type": { - "type": { - "com.linkedin.schema.StringType": {} - } - }, - "nativeDataType": "VARCHAR(255)", - "recursive": false, - "isPartOfKey": false - }, - { - "fieldPath": "col_6", - "nullable": false, - "description": "Comment for column", - "type": { - "type": { - "com.linkedin.schema.StringType": {} - } - }, - "nativeDataType": "VARCHAR(255)", - "recursive": false, - "isPartOfKey": false - }, - { - "fieldPath": "col_7", - "nullable": false, - "description": "Comment for column", - "type": { - "type": { - "com.linkedin.schema.StringType": {} - } - }, - "nativeDataType": "VARCHAR(255)", - "recursive": false, - "isPartOfKey": false - }, - { - "fieldPath": "col_8", - "nullable": false, - "description": "Comment for column", - "type": { - "type": { - "com.linkedin.schema.StringType": {} - } - }, - "nativeDataType": "VARCHAR(255)", - "recursive": false, - "isPartOfKey": false - }, - { - "fieldPath": "col_9", - "nullable": false, - "description": "Comment for column", - "type": { - "type": { - 
"com.linkedin.schema.StringType": {} - } - }, - "nativeDataType": "VARCHAR(255)", - "recursive": false, - "isPartOfKey": false - }, - { - "fieldPath": "col_10", - "nullable": false, - "description": "Comment for column", - "type": { - "type": { - "com.linkedin.schema.StringType": {} - } - }, - "nativeDataType": "VARCHAR(255)", - "recursive": false, - "isPartOfKey": false - }, - { - "fieldPath": "metadata$action", - "nullable": false, - "description": "Type of DML operation (INSERT/DELETE)", - "type": { - "type": { - "com.linkedin.schema.StringType": {} - } - }, - "nativeDataType": "VARCHAR(10)", - "recursive": false, - "isPartOfKey": false - }, - { - "fieldPath": "metadata$isupdate", - "nullable": false, - "description": "Whether row is from UPDATE operation", - "type": { - "type": { - "com.linkedin.schema.BooleanType": {} - } - }, - "nativeDataType": "BOOLEAN", - "recursive": false, - "isPartOfKey": false - }, - { - "fieldPath": "metadata$row_id", - "nullable": false, - "description": "Unique row identifier", - "type": { - "type": { - "com.linkedin.schema.NumberType": {} - } - }, - "nativeDataType": "NUMBER(38,0)", - "recursive": false, - "isPartOfKey": false - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1615443388097, - "runId": "snowflake-2025_01_28-00_01_52-5vkne0", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)", - "changeType": "UPSERT", - "aspectName": "datasetProperties", - "aspect": { - "json": { - "customProperties": { - "SOURCE_TYPE": "Table", - "TYPE": "DELTA", - "STALE": "false", - "MODE": "DEFAULT", - "OWNER_ROLE_TYPE": "ROLE", - "TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_1", - "BASE_TABLES": "TEST_DB.TEST_SCHEMA.TABLE_1", - "STALE_AFTER": "2021-06-22T00:00:00+00:00" - }, - "externalUrl": "https://app.snowflake.com/ap-south-1.aws/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/view/STREAM_1/", - "name": "STREAM_1", - "qualifiedName": "TEST_DB.TEST_SCHEMA.STREAM_1", - "description": "Comment for Stream 1", - "created": { - "time": 1623110400000 - }, - "lastModified": { - "time": 1623110400000 - }, - "tags": [] - } - }, - "systemMetadata": { - "lastObserved": 1615443388097, - "runId": "snowflake-2025_01_28-00_01_52-5vkne0", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)", - "changeType": "UPSERT", - "aspectName": "container", - "aspect": { - "json": { - "container": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c" - } - }, - "systemMetadata": { - "lastObserved": 1615443388097, - "runId": "snowflake-2025_01_28-00_01_52-5vkne0", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)", - "changeType": "UPSERT", - "aspectName": "subTypes", - "aspect": { - "json": { - "typeNames": [ - "Snowflake Stream" - ] - } - }, - "systemMetadata": { - "lastObserved": 1615443388097, - "runId": "snowflake-2025_01_28-00_01_52-5vkne0", - "lastRunId": "no-run-id-provided" - } -}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)", diff --git a/metadata-ingestion/tests/test_helpers/sdk_v2_helpers.py b/metadata-ingestion/tests/test_helpers/sdk_v2_helpers.py new file mode 100644 index 00000000000000..76715b93f1cf8b --- /dev/null +++ 
b/metadata-ingestion/tests/test_helpers/sdk_v2_helpers.py @@ -0,0 +1,17 @@ +import pathlib + +import pytest + +from datahub.sdk._entity import Entity +from tests.test_helpers import mce_helpers + + +def assert_entity_golden( + pytestconfig: pytest.Config, entity: Entity, golden_path: pathlib.Path +) -> None: + mce_helpers.check_goldens_stream( + pytestconfig=pytestconfig, + outputs=entity._as_mcps(), + golden_path=golden_path, + ignore_order=False, + ) diff --git a/metadata-ingestion/tests/unit/sdk_v2/container_golden/test_container_basic_golden.json b/metadata-ingestion/tests/unit/sdk_v2/container_golden/test_container_basic_golden.json new file mode 100644 index 00000000000000..d812e83fa36e71 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/container_golden/test_container_basic_golden.json @@ -0,0 +1,52 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:1e476e4c36434ae8a7ea78e467e5b59d", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "bigquery", + "database": "my_bq_project" + }, + "name": "my_bq_project" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:1e476e4c36434ae8a7ea78e467e5b59d", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:1e476e4c36434ae8a7ea78e467e5b59d", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:1e476e4c36434ae8a7ea78e467e5b59d", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Project" + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/container_golden/test_container_complex_golden.json b/metadata-ingestion/tests/unit/sdk_v2/container_golden/test_container_complex_golden.json new file mode 100644 index 00000000000000..6735cccdf5e879 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/container_golden/test_container_complex_golden.json @@ -0,0 +1,157 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "snowflake", + "instance": "my_instance", + "database": "MY_DB", + "schema": "MY_SCHEMA", + "key1": "value1", + "key2": "value2" + }, + "externalUrl": "https://example.com", + "name": "MY_SCHEMA", + "qualifiedName": "MY_DB.MY_SCHEMA", + "description": "test", + "created": { + "time": 1735787045000 + }, + "lastModified": { + "time": 1736391846000 + } + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056", + "changeType": 
"UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)" + }, + { + "id": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c", + "urn": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c" + } + ] + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Schema" + ] + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:admin@datahubproject.io", + "type": "TECHNICAL_OWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [ + { + "tag": "urn:li:tag:tag1" + }, + { + "tag": "urn:li:tag:tag2" + } + ] + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056", + "changeType": "UPSERT", + "aspectName": "glossaryTerms", + "aspect": { + "json": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:AccountBalance" + } + ], + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:__ingestion" + } + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:Marketing" + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_basic_golden.json b/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_basic_golden.json new file mode 100644 index 00000000000000..bff58b02c091b0 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_basic_golden.json @@ -0,0 +1,91 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "", + "platform": "urn:li:dataPlatform:bigquery", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.Schemaless": {} + }, + "fields": [ + { + "fieldPath": "field1", + "nullable": false, + "description": "field1 description", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "field2", + "nullable": false, + "description": "field2 description", + "type": { + "type": { + "com.linkedin.schema.NullType": {} + } + }, + "nativeDataType": "int64", + "recursive": false, + "isPartOfKey": false + } + ] + 
} + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "tags": [] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_complex_golden.json b/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_complex_golden.json new file mode 100644 index 00000000000000..b224c8c7090dd3 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_complex_golden.json @@ -0,0 +1,279 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "", + "platform": "urn:li:dataPlatform:snowflake", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.Schemaless": {} + }, + "fields": [ + { + "fieldPath": "field1", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "field2", + "nullable": false, + "description": "field2 description", + "type": { + "type": { + "com.linkedin.schema.NullType": {} + } + }, + "nativeDataType": "int64", + "recursive": false, + "isPartOfKey": false + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "editableDatasetProperties", + "aspect": { + "json": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "description": "test" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "key1": "value1", + "key2": "value2" + }, + "externalUrl": "https://example.com", + "name": "MY_TABLE", + "qualifiedName": "MY_DB.MY_SCHEMA.MY_TABLE", + "created": { + "time": 1735787045000 + }, + "lastModified": { + "time": 1736391846000 + }, + "tags": [] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": 
"urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)" + }, + { + "id": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c", + "urn": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c" + }, + { + "id": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056", + "urn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056" + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:admin@datahubproject.io", + "type": "TECHNICAL_OWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [ + { + "tag": "urn:li:tag:tag1" + }, + { + "tag": "urn:li:tag:tag2" + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "glossaryTerms", + "aspect": { + "json": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:AccountBalance" + } + ], + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:__ingestion" + } + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:Marketing" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "editableSchemaMetadata", + "aspect": { + "json": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "editableSchemaFieldInfo": [ + { + "fieldPath": "field1", + "description": "field1 description", + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:field1_tag1" + }, + { + "tag": "urn:li:tag:field1_tag2" + } + ] + } + }, + { + "fieldPath": "field2", + "glossaryTerms": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:field2_term1" + }, + { + "urn": "urn:li:glossaryTerm:field2_term2" + } + ], + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:__ingestion" + } + } + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_ingestion_golden.json b/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_ingestion_golden.json new file mode 100644 index 00000000000000..ca3a9a6d64bbe7 --- /dev/null +++ 
b/metadata-ingestion/tests/unit/sdk_v2/dataset_golden/test_dataset_ingestion_golden.json @@ -0,0 +1,235 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "", + "platform": "urn:li:dataPlatform:snowflake", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.Schemaless": {} + }, + "fields": [ + { + "fieldPath": "field1", + "nullable": false, + "description": "field1 description", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:field1_tag1" + }, + { + "tag": "urn:li:tag:field1_tag2" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "field2", + "nullable": false, + "description": "field2 description", + "type": { + "type": { + "com.linkedin.schema.NullType": {} + } + }, + "nativeDataType": "int64", + "recursive": false, + "glossaryTerms": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:field2_term1" + }, + { + "urn": "urn:li:glossaryTerm:field2_term2" + } + ], + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:__ingestion" + } + }, + "isPartOfKey": false + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "key1": "value1", + "key2": "value2" + }, + "externalUrl": "https://example.com", + "name": "MY_TABLE", + "qualifiedName": "MY_DB.MY_SCHEMA.MY_TABLE", + "description": "test", + "created": { + "time": 1735787045000 + }, + "lastModified": { + "time": 1736391846000 + }, + "tags": [] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)" + }, + { + "id": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c", + "urn": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c" + }, + { + "id": 
"urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056", + "urn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056" + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:admin@datahubproject.io", + "type": "TECHNICAL_OWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [ + { + "tag": "urn:li:tag:tag1" + }, + { + "tag": "urn:li:tag:tag2" + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "glossaryTerms", + "aspect": { + "json": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:AccountBalance" + } + ], + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:__ingestion" + } + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:Marketing" + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_container_db_golden.json b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_container_db_golden.json new file mode 100644 index 00000000000000..6789608570ba8a --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_container_db_golden.json @@ -0,0 +1,52 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "snowflake", + "database": "test_db" + }, + "name": "test_db" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Database" + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_container_schema_golden.json b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_container_schema_golden.json new file mode 100644 index 00000000000000..c9729dc33546d3 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_container_schema_golden.json @@ -0,0 +1,69 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { 
+ "customProperties": { + "platform": "snowflake", + "database": "test_db", + "schema": "test_schema" + }, + "name": "test_schema" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de", + "urn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de" + } + ] + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Schema" + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_dataset_creation_golden.json b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_dataset_creation_golden.json new file mode 100644 index 00000000000000..4fd65e3fdfccd9 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_dataset_creation_golden.json @@ -0,0 +1,142 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE_ENTITY", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake", + "name": "test_db.test_schema.table_1", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "", + "platform": "urn:li:dataPlatform:snowflake", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.Schemaless": {} + }, + "fields": [ + { + "fieldPath": "col1", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "col2", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "isPartOfKey": false + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE", + "aspectName": "editableDatasetProperties", + "aspect": { + "json": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": 
"urn:li:corpuser:unknown" + }, + "description": "test description" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de", + "urn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de" + }, + { + "id": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6", + "urn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6" + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "CREATE", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [ + { + "tag": "urn:li:tag:tag1" + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_dataset_update_golden.json b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_dataset_update_golden.json new file mode 100644 index 00000000000000..4e2ba2a59a2b18 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/entity_client_goldens/test_dataset_update_golden.json @@ -0,0 +1,45 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "description": "original description", + "tags": [] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "UPSERT", + "aspectName": "editableDatasetProperties", + "aspect": { + "json": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "description": "updated description" + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sdk_v2/test_client_v2.py b/metadata-ingestion/tests/unit/sdk_v2/test_client_v2.py new file mode 100644 index 00000000000000..7acb0a8d2bf976 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/test_client_v2.py @@ -0,0 +1,55 @@ +from unittest.mock import Mock + +import pytest + +from datahub.errors import ItemNotFoundError, MultipleItemsFoundError, SdkUsageError +from datahub.ingestion.graph.client import DataHubGraph +from datahub.ingestion.graph.config import DatahubClientConfig +from datahub.metadata.urns import CorpUserUrn +from datahub.sdk.main_client import DataHubClient + + +@pytest.fixture +def mock_graph() -> Mock: + graph = Mock(spec=DataHubGraph) + graph.exists.return_value = False + return graph + + +def test_client_creation(mock_graph: Mock) -> None: + assert DataHubClient(graph=mock_graph) + assert DataHubClient(server="https://example.com", token="token") + + +def 
test_client_init_errors(mock_graph: Mock) -> None: + config = DatahubClientConfig(server="https://example.com", token="token") + + with pytest.raises(SdkUsageError): + DataHubClient(server="https://example.com", graph=mock_graph) # type: ignore + with pytest.raises(SdkUsageError): + DataHubClient(server="https://example.com", config=config) # type: ignore + with pytest.raises(SdkUsageError): + DataHubClient(config=config, graph=mock_graph) # type: ignore + with pytest.raises(SdkUsageError): + DataHubClient() # type: ignore + + +def test_resolve_user(mock_graph: Mock) -> None: + client = DataHubClient(graph=mock_graph) + + # This test doesn't really validate the graphql query or vars. + # It probably makes more sense to test via smoke-tests. + + mock_graph.get_urns_by_filter.return_value = [] + with pytest.raises(ItemNotFoundError): + client.resolve.user(name="User") + + mock_graph.get_urns_by_filter.return_value = ["urn:li:corpuser:user"] + assert client.resolve.user(name="User") == CorpUserUrn("urn:li:corpuser:user") + + mock_graph.get_urns_by_filter.return_value = [ + "urn:li:corpuser:user", + "urn:li:corpuser:user2", + ] + with pytest.raises(MultipleItemsFoundError): + client.resolve.user(name="User") diff --git a/metadata-ingestion/tests/unit/sdk_v2/test_container.py b/metadata-ingestion/tests/unit/sdk_v2/test_container.py new file mode 100644 index 00000000000000..3cd2d928fac08c --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/test_container.py @@ -0,0 +1,135 @@ +import pathlib +from datetime import datetime, timezone + +import pytest + +from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey +from datahub.ingestion.source.common.subtypes import ( + DatasetContainerSubTypes, +) +from datahub.metadata.urns import ( + ContainerUrn, + CorpUserUrn, + DomainUrn, + GlossaryTermUrn, + TagUrn, +) +from datahub.sdk.container import Container +from tests.test_helpers.sdk_v2_helpers import assert_entity_golden + +_GOLDEN_DIR = pathlib.Path(__file__).parent / "container_golden" + + +def test_container_basic(pytestconfig: pytest.Config) -> None: + db_key = DatabaseKey( + platform="bigquery", + database="my_bq_project", + ) + + c = Container( + db_key, + display_name="my_bq_project", + subtype=DatasetContainerSubTypes.BIGQUERY_PROJECT, + ) + + # Check urn setup. + assert Container.get_urn_type() == ContainerUrn + assert isinstance(c.urn, ContainerUrn) + assert str(c.urn) == "urn:li:container:1e476e4c36434ae8a7ea78e467e5b59d" + assert str(c.urn) in repr(c) + + # Check most attributes. + assert c.platform_instance is None + assert c.tags is None + assert c.terms is None + assert c.created is None + assert c.last_modified is None + assert c.description is None + assert c.custom_properties == { + "platform": "bigquery", + "database": "my_bq_project", + } + assert c.domain is None + + # Check slots. + with pytest.raises(AttributeError): + assert c.extra_attribute # type: ignore + with pytest.raises(AttributeError): + c.extra_attribute = "slots should reject extra fields" # type: ignore + with pytest.raises(AttributeError): + # This should fail. Eventually we should make it suggest calling set_owners instead. 
+ c.owners = [] # type: ignore + + assert_entity_golden( + pytestconfig, c, _GOLDEN_DIR / "test_container_basic_golden.json" + ) + + +def test_container_complex(pytestconfig: pytest.Config) -> None: + schema_key = SchemaKey( + platform="snowflake", + instance="my_instance", + database="MY_DB", + schema="MY_SCHEMA", + ) + created = datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc) + updated = datetime(2025, 1, 9, 3, 4, 6, tzinfo=timezone.utc) + + d = Container( + schema_key, + display_name="MY_SCHEMA", + qualified_name="MY_DB.MY_SCHEMA", + subtype=DatasetContainerSubTypes.SCHEMA, + created=created, + last_modified=updated, + extra_properties={ + "key1": "value1", + "key2": "value2", + }, + description="test", + external_url="https://example.com", + owners=[ + CorpUserUrn("admin@datahubproject.io"), + ], + tags=[ + TagUrn("tag1"), + TagUrn("tag2"), + ], + terms=[ + GlossaryTermUrn("AccountBalance"), + ], + domain=DomainUrn("Marketing"), + ) + assert d.platform_instance is not None + assert ( + str(d.platform_instance) + == "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)" + ) + assert d.subtype == "Schema" + assert d.description == "test" + assert d.display_name == "MY_SCHEMA" + assert d.qualified_name == "MY_DB.MY_SCHEMA" + assert d.external_url == "https://example.com" + assert d.created == created + assert d.last_modified == updated + assert d.custom_properties == { + "platform": "snowflake", + "instance": "my_instance", + "database": "MY_DB", + "schema": "MY_SCHEMA", + "key1": "value1", + "key2": "value2", + } + + # Check standard aspects. + assert d.domain == DomainUrn("Marketing") + assert d.tags is not None + assert len(d.tags) == 2 + assert d.terms is not None + assert len(d.terms) == 1 + assert d.owners is not None + assert len(d.owners) == 1 + + assert_entity_golden( + pytestconfig, d, _GOLDEN_DIR / "test_container_complex_golden.json" + ) diff --git a/metadata-ingestion/tests/unit/sdk_v2/test_dataset.py b/metadata-ingestion/tests/unit/sdk_v2/test_dataset.py new file mode 100644 index 00000000000000..d64302d8fdd911 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/test_dataset.py @@ -0,0 +1,177 @@ +import pathlib +from datetime import datetime, timezone + +import pytest + +from datahub.emitter.mcp_builder import SchemaKey +from datahub.errors import SchemaFieldKeyError +from datahub.ingestion.source.common.subtypes import DatasetSubTypes +from datahub.metadata.urns import ( + CorpUserUrn, + DatasetUrn, + DomainUrn, + GlossaryTermUrn, + TagUrn, +) +from datahub.sdk._attribution import KnownAttribution, change_default_attribution +from datahub.sdk.dataset import Dataset +from tests.test_helpers.sdk_v2_helpers import assert_entity_golden + +_GOLDEN_DIR = pathlib.Path(__file__).parent / "dataset_golden" + + +def test_dataset_basic(pytestconfig: pytest.Config) -> None: + d = Dataset( + platform="bigquery", + name="proj.dataset.table", + subtype=DatasetSubTypes.TABLE, + schema=[ + ("field1", "string", "field1 description"), + ("field2", "int64", "field2 description"), + ], + ) + + # Check urn setup. + assert Dataset.get_urn_type() == DatasetUrn + assert isinstance(d.urn, DatasetUrn) + assert ( + str(d.urn) + == "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)" + ) + assert str(d.urn) in repr(d) + + # Check most attributes. 
+ assert d.platform_instance is None + assert d.tags is None + assert d.terms is None + assert d.created is None + assert d.last_modified is None + assert d.description is None + assert d.custom_properties == {} + assert d.domain is None + + # TODO: The column descriptions should go in the editable fields, since we're not in ingestion mode. + assert len(d.schema) == 2 + assert d["field1"].description == "field1 description" + + with pytest.raises(SchemaFieldKeyError, match=r"Field .* not found"): + d["should_be_missing"] + + with pytest.raises(AttributeError): + assert d.extra_attribute # type: ignore + with pytest.raises(AttributeError): + d.extra_attribute = "slots should reject extra fields" # type: ignore + with pytest.raises(AttributeError): + # This should fail. Eventually we should make it suggest calling set_owners instead. + d.owners = [] # type: ignore + + assert_entity_golden( + pytestconfig, d, _GOLDEN_DIR / "test_dataset_basic_golden.json" + ) + + +def _build_complex_dataset() -> Dataset: + schema = SchemaKey( + platform="snowflake", + instance="my_instance", + database="MY_DB", + schema="MY_SCHEMA", + ) + + created = datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc) + updated = datetime(2025, 1, 9, 3, 4, 6, tzinfo=timezone.utc) + + d = Dataset( + platform="snowflake", + platform_instance="my_instance", + name="my_db.my_schema.my_table", + container=schema, + subtype=DatasetSubTypes.TABLE, + schema=[ + ("field1", "string"), + ("field2", "int64", "field2 description"), + ], + display_name="MY_TABLE", + qualified_name="MY_DB.MY_SCHEMA.MY_TABLE", + created=created, + last_modified=updated, + custom_properties={ + "key1": "value1", + "key2": "value2", + }, + description="test", + external_url="https://example.com", + owners=[ + CorpUserUrn("admin@datahubproject.io"), + ], + tags=[ + TagUrn("tag1"), + TagUrn("tag2"), + ], + terms=[ + GlossaryTermUrn("AccountBalance"), + ], + domain=DomainUrn("Marketing"), + ) + assert d.platform_instance is not None + assert ( + str(d.platform_instance) + == "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)" + ) + assert d.subtype == "Table" + assert d.description == "test" + assert d.display_name == "MY_TABLE" + assert d.qualified_name == "MY_DB.MY_SCHEMA.MY_TABLE" + assert d.external_url == "https://example.com" + assert d.created == created + assert d.last_modified == updated + assert d.custom_properties == {"key1": "value1", "key2": "value2"} + + # Check standard aspects. + assert d.domain == DomainUrn("Marketing") + assert d.tags is not None + assert len(d.tags) == 2 + assert d.terms is not None + assert len(d.terms) == 1 + assert d.owners is not None + assert len(d.owners) == 1 + + assert len(d.schema) == 2 + + # Schema field description. + assert d["field1"].description is None + assert d["field2"].description == "field2 description" + d["field1"].set_description("field1 description") + assert d["field1"].description == "field1 description" + + # Schema field tags. + assert d["field1"].tags is None + d["field1"].set_tags([TagUrn("field1_tag1"), TagUrn("field1_tag2")]) + assert d["field1"].tags is not None + assert len(d["field1"].tags) == 2 + + # Schema field terms. 
+ assert d["field2"].terms is None + d["field2"].set_terms( + [GlossaryTermUrn("field2_term1"), GlossaryTermUrn("field2_term2")] + ) + assert d["field2"].terms is not None + assert len(d["field2"].terms) == 2 + + return d + + +def test_dataset_complex(pytestconfig: pytest.Config) -> None: + d = _build_complex_dataset() + assert_entity_golden( + pytestconfig, d, _GOLDEN_DIR / "test_dataset_complex_golden.json" + ) + + +def test_dataset_ingestion(pytestconfig: pytest.Config) -> None: + with change_default_attribution(KnownAttribution.INGESTION): + d = _build_complex_dataset() + + assert_entity_golden( + pytestconfig, d, _GOLDEN_DIR / "test_dataset_ingestion_golden.json" + ) diff --git a/metadata-ingestion/tests/unit/sdk_v2/test_entity_client.py b/metadata-ingestion/tests/unit/sdk_v2/test_entity_client.py new file mode 100644 index 00000000000000..2218ef2f909448 --- /dev/null +++ b/metadata-ingestion/tests/unit/sdk_v2/test_entity_client.py @@ -0,0 +1,142 @@ +import pathlib +from unittest.mock import Mock + +import pytest + +import datahub.metadata.schema_classes as models +from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey +from datahub.errors import ItemNotFoundError, SdkUsageError +from datahub.ingestion.graph.client import DataHubGraph +from datahub.metadata.urns import DatasetUrn, TagUrn +from datahub.sdk.container import Container +from datahub.sdk.dataset import Dataset +from datahub.sdk.main_client import DataHubClient +from tests.test_helpers import mce_helpers + +_GOLDEN_DIR = pathlib.Path(__file__).parent / "entity_client_goldens" + + +@pytest.fixture +def mock_graph() -> Mock: + graph = Mock(spec=DataHubGraph) + graph.exists.return_value = False + return graph + + +@pytest.fixture +def client(mock_graph: Mock) -> DataHubClient: + return DataHubClient(graph=mock_graph) + + +def assert_client_golden( + pytestconfig: pytest.Config, + client: DataHubClient, + golden_path: pathlib.Path, +) -> None: + mcps = client._graph.emit_mcps.call_args[0][0] # type: ignore + mce_helpers.check_goldens_stream( + pytestconfig=pytestconfig, + outputs=mcps, + golden_path=golden_path, + ignore_order=False, + ) + + +def test_container_creation_flow( + pytestconfig: pytest.Config, client: DataHubClient, mock_graph: Mock +) -> None: + # Create database and schema containers + db = DatabaseKey(platform="snowflake", database="test_db") + schema = SchemaKey(**db.dict(), schema="test_schema") + + db_container = Container(db, display_name="test_db", subtype="Database") + schema_container = Container(schema, display_name="test_schema", subtype="Schema") + + # Test database container creation + client.entities.upsert(db_container) + assert_client_golden( + pytestconfig, client, _GOLDEN_DIR / "test_container_db_golden.json" + ) + + # Test schema container creation + client.entities.upsert(schema_container) + assert_client_golden( + pytestconfig, client, _GOLDEN_DIR / "test_container_schema_golden.json" + ) + + +def test_dataset_creation( + pytestconfig: pytest.Config, client: DataHubClient, mock_graph: Mock +) -> None: + schema = SchemaKey(platform="snowflake", database="test_db", schema="test_schema") + + dataset = Dataset( + platform="snowflake", + name="test_db.test_schema.table_1", + env="prod", + container=schema, + schema=[ + ("col1", "string"), + ("col2", "int"), + ], + description="test description", + tags=[TagUrn("tag1")], + ) + + client.entities.create(dataset) + assert_client_golden( + pytestconfig, client, _GOLDEN_DIR / "test_dataset_creation_golden.json" + ) + + +def 
test_dataset_read_modify_write( + pytestconfig: pytest.Config, client: DataHubClient, mock_graph: Mock +) -> None: + # Setup mock for existing dataset + mock_graph.exists.return_value = True + dataset_urn = DatasetUrn( + platform="snowflake", name="test_db.test_schema.table_1", env="prod" + ) + + # Mock the get_entity_semityped response with initial state + mock_graph.get_entity_semityped.return_value = { + "datasetProperties": models.DatasetPropertiesClass( + description="original description", + customProperties={}, + tags=[], + ) + } + + # Get and update dataset + dataset = client.entities.get(dataset_urn) + dataset.set_description("updated description") + + client.entities.update(dataset) + assert_client_golden( + pytestconfig, client, _GOLDEN_DIR / "test_dataset_update_golden.json" + ) + + +def test_create_existing_dataset_fails(client: DataHubClient, mock_graph: Mock) -> None: + mock_graph.exists.return_value = True + + dataset = Dataset( + platform="snowflake", + name="test_db.test_schema.table_1", + env="prod", + schema=[("col1", "string")], + ) + + with pytest.raises(SdkUsageError, match="Entity .* already exists"): + client.entities.create(dataset) + + +def test_get_nonexistent_dataset_fails(client: DataHubClient, mock_graph: Mock) -> None: + mock_graph.exists.return_value = False + + dataset_urn = DatasetUrn( + platform="snowflake", name="test_db.test_schema.missing_table", env="prod" + ) + + with pytest.raises(ItemNotFoundError, match="Entity .* not found"): + client.entities.get(dataset_urn) diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_split_statements.py b/metadata-ingestion/tests/unit/sql_parsing/test_split_statements.py index 06e0e84ede5547..7b012f5ee0f225 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_split_statements.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_split_statements.py @@ -2,6 +2,7 @@ def test_split_statements_complex() -> None: + # Purposely leaving the preceding whitespace on every line to test that we ignore it. 
test_sql = """ CREATE TABLE Users (Id INT); -- Comment here @@ -49,3 +50,70 @@ def test_split_statements_cte() -> None: assert statements == [ test_sql, ] + + +def test_split_statement_drop() -> None: + test_sql = """\ +DROP TABLE #temp1 +select 'foo' into #temp1 +drop table #temp1 + """ + statements = [statement.strip() for statement in split_statements(test_sql)] + assert statements == [ + "DROP TABLE #temp1", + "SELECT 'foo' into #temp1", + "DROP table #temp1", + ] + + +def test_single_statement_with_case() -> None: + test_sql = """\ +SELECT + a, + b as B, + CASE + WHEN a = 1 THEN 'one' + WHEN a = 2 THEN 'two' + ELSE 'other' + END AS c, + ROW_NUMBER() + OVER ( + PARTITION BY a + ORDER BY b ASC + ) AS d +INTO #temp1 +FROM foo +LEFT JOIN + bar + ON + foo.a = bar.a + AND foo.b < bar.b + AND bar.f = 'product' +WHERE foo.a > 2 +""" + + statements = [statement.strip() for statement in split_statements(test_sql)] + assert statements == [ + test_sql.strip(), + ] + + +def test_split_strict_keywords() -> None: + test_sql = """\ +CREATE TABLE prev (a INT) +IF OBJECT_ID('#foo') IS NOT NULL + DROP TABLE #foo +SELECT 1 as a INTO #foo +TRUNCATE TABLE #foo +SELECT 1 as a INTO #foo + """ + statements = [statement.strip() for statement in split_statements(test_sql)] + assert statements == [ + "CREATE TABLE prev (a INT)", + "IF", + "OBJECT_ID('#foo') IS NOT NULL", + "DROP TABLE #foo", + "SELECT 1 as a INTO #foo", + "TRUNCATE TABLE #foo", + "SELECT 1 as a INTO #foo", + ] diff --git a/metadata-io/src/main/java/com/linkedin/metadata/structuredproperties/validation/PropertyDefinitionValidator.java b/metadata-io/src/main/java/com/linkedin/metadata/structuredproperties/validation/PropertyDefinitionValidator.java index 6e047c12da9a9f..18553d6930b8d4 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/structuredproperties/validation/PropertyDefinitionValidator.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/structuredproperties/validation/PropertyDefinitionValidator.java @@ -1,13 +1,14 @@ package com.linkedin.metadata.structuredproperties.validation; -import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; -import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME; +import static com.linkedin.metadata.Constants.*; import static com.linkedin.structured.PropertyCardinality.*; import com.google.common.collect.ImmutableSet; import com.linkedin.common.Status; import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.GetMode; +import com.linkedin.data.template.StringArrayMap; import com.linkedin.entity.Aspect; import com.linkedin.metadata.Constants; import com.linkedin.metadata.aspect.AspectRetriever; @@ -24,6 +25,8 @@ import com.linkedin.structured.StructuredPropertyDefinition; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -42,6 +45,8 @@ public class PropertyDefinitionValidator extends AspectPayloadValidator { private AspectPluginConfig config; + private static String ALLOWED_TYPES = "allowedTypes"; + /** * Prevent deletion of the definition or key aspect (only soft delete) * @@ -92,6 +97,9 @@ public static Stream validateDefinitionUpserts( urnIdCheck(item).ifPresent(exceptions::addException); qualifiedNameCheck(item, newDefinition.getQualifiedName()) .ifPresent(exceptions::addException); + allowedTypesCheck( + item, newDefinition.getTypeQualifier(), 
retrieverContext.getAspectRetriever())
+          .ifPresent(exceptions::addException);
 
       if (item.getPreviousSystemAspect() != null) {
@@ -211,4 +219,47 @@ private static Optional<AspectValidationException> qualifiedNameCheck(
     }
     return Optional.empty();
   }
+
+  private static Optional<AspectValidationException> allowedTypesCheck(
+      MCPItem item, @Nullable StringArrayMap typeQualifier, AspectRetriever aspectRetriever) {
+    if (typeQualifier == null || typeQualifier.get(ALLOWED_TYPES) == null) {
+      return Optional.empty();
+    }
+    List<String> allowedTypes = typeQualifier.get(ALLOWED_TYPES);
+    try {
+      List<Urn> allowedTypesUrns =
+          allowedTypes.stream().map(UrnUtils::getUrn).collect(Collectors.toList());
+
+      // ensure all types are entityTypes
+      if (allowedTypesUrns.stream()
+          .anyMatch(t -> !t.getEntityType().equals(ENTITY_TYPE_ENTITY_NAME))) {
+        return Optional.of(
+            AspectValidationException.forItem(
+                item,
+                String.format(
+                    "Provided allowedType that is not an entityType entity. List of allowedTypes: %s",
+                    allowedTypes)));
+      }
+
+      // ensure all types exist as entities
+      Map<Urn, Boolean> existsMap = aspectRetriever.entityExists(new HashSet<>(allowedTypesUrns));
+      if (existsMap.containsValue(false)) {
+        return Optional.of(
+            AspectValidationException.forItem(
+                item,
+                String.format(
+                    "Provided allowedType that does not exist. List of allowedTypes: %s",
+                    allowedTypes)));
+      }
+    } catch (Exception e) {
+      return Optional.of(
+          AspectValidationException.forItem(
+              item,
+              String.format(
+                  "Issue resolving allowedTypes inside of typeQualifier. These must be entity type urns. List of allowedTypes: %s",
+                  allowedTypes)));
+    }
+
+    return Optional.empty();
+  }
 }
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/structuredproperties/validators/PropertyDefinitionValidatorTest.java b/metadata-io/src/test/java/com/linkedin/metadata/structuredproperties/validators/PropertyDefinitionValidatorTest.java
index 18949f0566dd19..681b31a2dbc7f1 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/structuredproperties/validators/PropertyDefinitionValidatorTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/structuredproperties/validators/PropertyDefinitionValidatorTest.java
@@ -1,5 +1,6 @@
 package com.linkedin.metadata.structuredproperties.validators;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -8,6 +9,8 @@
 import com.linkedin.common.urn.Urn;
 import com.linkedin.common.urn.UrnUtils;
 import com.linkedin.data.template.SetMode;
+import com.linkedin.data.template.StringArray;
+import com.linkedin.data.template.StringArrayMap;
 import com.linkedin.metadata.aspect.AspectRetriever;
 import com.linkedin.metadata.aspect.GraphRetriever;
 import com.linkedin.metadata.aspect.RetrieverContext;
@@ -22,6 +25,7 @@
 import com.linkedin.test.metadata.aspect.TestEntityRegistry;
 import com.linkedin.test.metadata.aspect.batch.TestMCP;
 import java.net.URISyntaxException;
+import java.util.HashMap;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
@@ -37,6 +41,8 @@ public void init() {
     testPropertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:foo.bar");
     AspectRetriever mockAspectRetriever = mock(AspectRetriever.class);
     when(mockAspectRetriever.getEntityRegistry()).thenReturn(entityRegistry);
+    HashMap<Urn, Boolean> map = new HashMap<>();
+    when(mockAspectRetriever.entityExists(any())).thenReturn(map);
     GraphRetriever mockGraphRetriever = mock(GraphRetriever.class);
     mockRetrieverContext = mock(RetrieverContext.class);
when(mockRetrieverContext.getAspectRetriever()).thenReturn(mockAspectRetriever); @@ -433,4 +439,110 @@ public void testQualifiedNameWithSpace() .count(), 1); } + + @Test + public void testValidAllowedTypes() + throws URISyntaxException, CloneNotSupportedException, AspectValidationException { + Urn propertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:foo.bar"); + StructuredPropertyDefinition newProperty = createValidPropertyDefinition(); + StringArrayMap typeQualifier = new StringArrayMap(); + typeQualifier.put("allowedTypes", new StringArray("urn:li:entityType:datahub.dataset")); + newProperty.setTypeQualifier(typeQualifier); + assertEquals( + PropertyDefinitionValidator.validateDefinitionUpserts( + TestMCP.ofOneMCP(propertyUrn, null, newProperty, entityRegistry), + mockRetrieverContext) + .count(), + 0); + } + + @Test + public void testInvalidUrnsInAllowedTypes() + throws URISyntaxException, CloneNotSupportedException, AspectValidationException { + Urn propertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:foo.bar"); + StructuredPropertyDefinition newProperty = createValidPropertyDefinition(); + StringArrayMap typeQualifier = new StringArrayMap(); + // invalid urn here + typeQualifier.put("allowedTypes", new StringArray("invalidUrn")); + newProperty.setTypeQualifier(typeQualifier); + assertEquals( + PropertyDefinitionValidator.validateDefinitionUpserts( + TestMCP.ofOneMCP(propertyUrn, null, newProperty, entityRegistry), + mockRetrieverContext) + .count(), + 1); + } + + @Test + public void testNotEntityTypeInAllowedTypes() + throws URISyntaxException, CloneNotSupportedException, AspectValidationException { + Urn propertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:foo.bar"); + StructuredPropertyDefinition newProperty = createValidPropertyDefinition(); + StringArrayMap typeQualifier = new StringArrayMap(); + // urn that is not an entityType + typeQualifier.put("allowedTypes", new StringArray("urn:li:dataPlatform:snowflake")); + newProperty.setTypeQualifier(typeQualifier); + assertEquals( + PropertyDefinitionValidator.validateDefinitionUpserts( + TestMCP.ofOneMCP(propertyUrn, null, newProperty, entityRegistry), + mockRetrieverContext) + .count(), + 1); + } + + @Test + public void testEntityTypeDoesNotExistInAllowedTypes() + throws URISyntaxException, CloneNotSupportedException, AspectValidationException { + AspectRetriever mockAspectRetriever = mock(AspectRetriever.class); + when(mockAspectRetriever.getEntityRegistry()).thenReturn(entityRegistry); + HashMap map = new HashMap<>(); + map.put(UrnUtils.getUrn("urn:li:entityType:datahub.fakeEntity"), false); + when(mockAspectRetriever.entityExists(any())).thenReturn(map); + GraphRetriever mockGraphRetriever = mock(GraphRetriever.class); + RetrieverContext retrieverContext = mock(RetrieverContext.class); + when(retrieverContext.getAspectRetriever()).thenReturn(mockAspectRetriever); + when(retrieverContext.getGraphRetriever()).thenReturn(mockGraphRetriever); + + Urn propertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:foo.bar"); + StructuredPropertyDefinition newProperty = createValidPropertyDefinition(); + StringArrayMap typeQualifier = new StringArrayMap(); + // urn that doesn't exist + typeQualifier.put("allowedTypes", new StringArray("urn:li:entityType:datahub.fakeEntity")); + newProperty.setTypeQualifier(typeQualifier); + assertEquals( + PropertyDefinitionValidator.validateDefinitionUpserts( + TestMCP.ofOneMCP(propertyUrn, null, newProperty, entityRegistry), retrieverContext) + .count(), + 1); + } + + @Test + public void 
testAllowedTypesMixOfValidAndInvalid() + throws URISyntaxException, CloneNotSupportedException, AspectValidationException { + Urn propertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:foo.bar"); + StructuredPropertyDefinition newProperty = createValidPropertyDefinition(); + StringArrayMap typeQualifier = new StringArrayMap(); + // urn that is not an entityType + typeQualifier.put( + "allowedTypes", + new StringArray("urn:li:entityType:datahub.dataset", "urn:li:dataPlatform:snowflake")); + newProperty.setTypeQualifier(typeQualifier); + assertEquals( + PropertyDefinitionValidator.validateDefinitionUpserts( + TestMCP.ofOneMCP(propertyUrn, null, newProperty, entityRegistry), + mockRetrieverContext) + .count(), + 1); + } + + private StructuredPropertyDefinition createValidPropertyDefinition() throws URISyntaxException { + StructuredPropertyDefinition newProperty = new StructuredPropertyDefinition(); + newProperty.setEntityTypes( + new UrnArray(Urn.createFromString("urn:li:entityType:datahub.dataset"))); + newProperty.setDisplayName("oldProp"); + newProperty.setQualifiedName("foo.bar"); + newProperty.setCardinality(PropertyCardinality.MULTIPLE); + newProperty.setValueType(Urn.createFromString("urn:li:dataType:datahub.urn")); + return newProperty; + } } diff --git a/metadata-service/iceberg-catalog/requirements.txt b/metadata-service/iceberg-catalog/requirements.txt index de0d0e77672fd2..0bd935018d504e 100644 --- a/metadata-service/iceberg-catalog/requirements.txt +++ b/metadata-service/iceberg-catalog/requirements.txt @@ -3,7 +3,7 @@ pytest-dependency>=0.5.1 pyspark==3.5.3 -e ../../metadata-ingestion[iceberg-catalog] # libaries for linting below this -black==23.7.0 +black==24.3.0 isort==5.12.0 mypy==1.5.1 ruff==0.0.287
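
Taken together, the new unit tests above exercise the intended SDK v2 workflow: construct a DataHubClient, build a Dataset (or Container), and write it through client.entities. The following is a minimal usage sketch assembled from the calls that appear in test_client_v2.py and test_entity_client.py; the server URL and token are placeholders, and error handling is omitted.

# Illustrative sketch only; imports and calls mirror the tests in this diff.
from datahub.metadata.urns import TagUrn
from datahub.sdk.dataset import Dataset
from datahub.sdk.main_client import DataHubClient

# Connect to a DataHub instance (placeholder server/token).
client = DataHubClient(server="https://example.com", token="<token>")

# Build a dataset entity with a simple schema, a description, and a tag.
dataset = Dataset(
    platform="snowflake",
    name="test_db.test_schema.table_1",
    env="prod",
    schema=[
        ("col1", "string"),
        ("col2", "int"),
    ],
    description="test description",
    tags=[TagUrn("tag1")],
)

# create() fails if the entity already exists; upsert() overwrites it.
client.entities.create(dataset)

# Read-modify-write: fetch the entity, change it, and write it back.
existing = client.entities.get(dataset.urn)
existing.set_description("updated description")
client.entities.update(existing)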