Merge branch 'master' into jk-fix-empty-schema-field
jayasimhankv authored Feb 7, 2025
2 parents fe375a4 + bd47b11 commit bc38d04
Showing 29 changed files with 2,970 additions and 307 deletions.
2 changes: 1 addition & 1 deletion docs/cli.md
@@ -735,7 +735,7 @@ Please see our [Integrations page](https://datahubproject.io/integrations) if yo
| [bigquery](./generated/ingestion/sources/bigquery.md) | `pip install 'acryl-datahub[bigquery]'` | BigQuery source |
| [datahub-lineage-file](./generated/ingestion/sources/file-based-lineage.md) | _no additional dependencies_ | Lineage File source |
| [datahub-business-glossary](./generated/ingestion/sources/business-glossary.md) | _no additional dependencies_ | Business Glossary File source |
| [dbt](./generated/ingestion/sources/dbt.md) | _no additional dependencies_ | dbt source |
| [dbt](./generated/ingestion/sources/dbt.md) | `pip install 'acryl-datahub[dbt]'` | dbt source |
| [dremio](./generated/ingestion/sources/dremio.md) | `pip install 'acryl-datahub[dremio]'` | Dremio Source |
| [druid](./generated/ingestion/sources/druid.md) | `pip install 'acryl-datahub[druid]'` | Druid Source |
| [feast](./generated/ingestion/sources/feast.md) | `pip install 'acryl-datahub[feast]'` | Feast source (0.26.0) |
@@ -125,6 +125,50 @@ The Helm chart [datahub-executor-worker](https://executor-helm.acryl.io/index.ya
--set image.tag=v0.3.1 \
acryl datahub-executor-worker
```
9. As of DataHub Cloud `v0.3.8.2`, it is possible to pass secrets to ingestion recipes using Kubernetes Secret CRDs, as shown below. This allows secrets to be updated at runtime without restarting the Remote Executor process.
```yaml
# 1. Create a K8s Secret object in the remote executor namespace, e.g.
apiVersion: v1
kind: Secret
metadata:
  name: datahub-secret-store
data:
  REDSHIFT_PASSWORD: cmVkc2hpZnQtc2VjcmV0Cg==
  SNOWFLAKE_PASSWORD: c25vd2ZsYWtlLXNlY3JldAo=
# 2. Add the secret to your Remote Executor deployment:
extraVolumes:
  - name: datahub-secret-store
    secret:
      secretName: datahub-secret-store
# 3. Mount it under the /mnt/secrets directory
extraVolumeMounts:
  - mountPath: /mnt/secrets
    name: datahub-secret-store
```
You can then reference the mounted secrets directly in the ingestion recipe:
```yaml
source:
  type: redshift
  config:
    host_port: '<redshift host:port>'
    username: connector_test
    table_lineage_mode: mixed
    include_table_lineage: true
    include_tables: true
    include_views: true
    profiling:
      enabled: true
      profile_table_level_only: false
    stateful_ingestion:
      enabled: true
    password: '${REDSHIFT_PASSWORD}'
```
By default, the executor looks for secret files mounted under `/mnt/secrets`. This location can be overridden by setting the environment variable `DATAHUB_EXECUTOR_FILE_SECRET_BASEDIR` to a different path (default: `/mnt/secrets`).

These files are expected to be under 1 MB in size by default. To increase this limit, set a higher value using `DATAHUB_EXECUTOR_FILE_SECRET_MAXLEN` (default: `1024768`, size in bytes).

## FAQ

2 changes: 1 addition & 1 deletion docs/managed-datahub/release-notes/v_0_3_8.md
@@ -3,7 +3,7 @@

Release Availability Date
---
21-Jan-2025
29-Jan-2025

Recommended CLI/SDK
---
5 changes: 4 additions & 1 deletion metadata-ingestion/setup.cfg
@@ -15,6 +15,9 @@ warn_unused_configs = yes
disallow_untyped_defs = no

# try to be a bit more strict in certain areas of the codebase
[mypy-datahub]
# Only for datahub's __init__.py - allow implicit reexport
implicit_reexport = yes
[mypy-datahub.*]
ignore_missing_imports = no
implicit_reexport = no
@@ -54,7 +57,7 @@ addopts = --cov=src --cov-report= --cov-config setup.cfg --strict-markers -p no:
markers =
    slow: marks tests that are slow to run, including all docker-based tests (deselect with '-m not slow')
    integration: marks all integration tests, across all batches (deselect with '-m "not integration"')
    integration_batch_0: mark tests to run in batch 0 of integration tests. This is done mainly for parallelisation in CI. Batch 0 is the default batch.
    integration_batch_0: mark tests to run in batch 0 of integration tests. This is done mainly for parallelization in CI. Batch 0 is the default batch.
    integration_batch_1: mark tests to run in batch 1 of integration tests
    integration_batch_2: mark tests to run in batch 2 of integration tests
testpaths =
35 changes: 35 additions & 0 deletions metadata-ingestion/src/datahub/errors.py
@@ -0,0 +1,35 @@
from datahub.configuration.common import MetaError

# TODO: Move all other error types to this file.


class SdkUsageError(MetaError):
    pass


class AlreadyExistsError(SdkUsageError):
    pass


class ItemNotFoundError(SdkUsageError):
    pass


class MultipleItemsFoundError(SdkUsageError):
    pass


class SchemaFieldKeyError(SdkUsageError, KeyError):
    pass


class IngestionAttributionWarning(Warning):
    pass


class MultipleSubtypesWarning(Warning):
    pass


class ExperimentalWarning(Warning):
    pass
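
For illustration (not part of this commit), a brief usage sketch of the new error hierarchy; the `find_unique` helper below is hypothetical, and only the exception classes come from `datahub/errors.py`:

```python
from typing import Callable, List, TypeVar

from datahub.errors import ItemNotFoundError, MultipleItemsFoundError, SdkUsageError

T = TypeVar("T")


def find_unique(items: List[T], predicate: Callable[[T], bool]) -> T:
    # Hypothetical helper illustrating when each error type might be raised.
    matches = [item for item in items if predicate(item)]
    if not matches:
        raise ItemNotFoundError("no matching item found")
    if len(matches) > 1:
        raise MultipleItemsFoundError(f"expected one match, found {len(matches)}")
    return matches[0]


try:
    find_unique([], lambda _: True)
except SdkUsageError as e:
    # ItemNotFoundError and MultipleItemsFoundError both derive from SdkUsageError,
    # so callers can catch either the specific or the general type.
    print(f"SDK usage error: {e}")
```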
@@ -6,6 +6,7 @@
from typing import Callable, Dict, Iterable, List, MutableMapping, Optional

from datahub.ingestion.api.report import SupportsAsObj
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection
from datahub.ingestion.source.snowflake.snowflake_query import (
@@ -100,6 +101,9 @@ class SnowflakeTable(BaseTable):
    def is_hybrid(self) -> bool:
        return self.type is not None and self.type == "HYBRID TABLE"

    def get_subtype(self) -> DatasetSubTypes:
        return DatasetSubTypes.TABLE


@dataclass
class SnowflakeView(BaseView):
@@ -109,6 +113,9 @@ class SnowflakeView(BaseView):
    column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict)
    is_secure: bool = False

    def get_subtype(self) -> DatasetSubTypes:
        return DatasetSubTypes.VIEW


@dataclass
class SnowflakeSchema:
@@ -154,6 +161,9 @@ class SnowflakeStream:
    column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict)
    last_altered: Optional[datetime] = None

    def get_subtype(self) -> DatasetSubTypes:
        return DatasetSubTypes.SNOWFLAKE_STREAM


class _SnowflakeTagCache:
    def __init__(self) -> None:
@@ -21,7 +21,6 @@
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.ingestion.source.common.subtypes import (
    DatasetContainerSubTypes,
    DatasetSubTypes,
)
from datahub.ingestion.source.snowflake.constants import (
    GENERIC_PERMISSION_ERROR_KEY,
@@ -467,7 +466,13 @@ def _process_schema(
                context=f"{db_name}.{schema_name}",
            )

    def _process_tags(self, snowflake_schema, schema_name, db_name, domain):
    def _process_tags(
        self,
        snowflake_schema: SnowflakeSchema,
        schema_name: str,
        db_name: str,
        domain: str,
    ) -> None:
        snowflake_schema.tags = self.tag_extractor.get_tags_on_object(
            schema_name=schema_name, db_name=db_name, domain=domain
        )
@@ -837,15 +842,7 @@ def gen_dataset_workunits(
        if dpi_aspect:
            yield dpi_aspect

        subTypes = SubTypes(
            typeNames=(
                [DatasetSubTypes.SNOWFLAKE_STREAM]
                if isinstance(table, SnowflakeStream)
                else [DatasetSubTypes.VIEW]
                if isinstance(table, SnowflakeView)
                else [DatasetSubTypes.TABLE]
            )
        )
        subTypes = SubTypes(typeNames=[table.get_subtype()])

        yield MetadataChangeProposalWrapper(
            entityUrn=dataset_urn, aspect=subTypes
@@ -932,9 +929,9 @@ def get_dataset_properties(
                "OWNER_ROLE_TYPE": table.owner_role_type,
                "TABLE_NAME": table.table_name,
                "BASE_TABLES": table.base_tables,
                "STALE_AFTER": table.stale_after.isoformat()
                if table.stale_after
                else None,
                "STALE_AFTER": (
                    table.stale_after.isoformat() if table.stale_after else None
                ),
            }.items()
            if v
        }
33 changes: 33 additions & 0 deletions metadata-ingestion/src/datahub/sdk/__init__.py
@@ -0,0 +1,33 @@
import warnings

import datahub.metadata.schema_classes as models
from datahub.errors import ExperimentalWarning, SdkUsageError
from datahub.ingestion.graph.config import DatahubClientConfig
from datahub.metadata.urns import (
    ChartUrn,
    ContainerUrn,
    CorpGroupUrn,
    CorpUserUrn,
    DashboardUrn,
    DataPlatformInstanceUrn,
    DataPlatformUrn,
    DatasetUrn,
    DomainUrn,
    GlossaryTermUrn,
    SchemaFieldUrn,
    TagUrn,
)
from datahub.sdk.container import Container
from datahub.sdk.dataset import Dataset
from datahub.sdk.main_client import DataHubClient

warnings.warn(
    "The new datahub SDK (e.g. datahub.sdk.*) is experimental. "
    "Our typical backwards-compatibility and stability guarantees do not apply to this code. "
    "When it's promoted to stable, the import path will change "
    "from `from datahub.sdk import ...` to `from datahub import ...`.",
    ExperimentalWarning,
    stacklevel=2,
)
del warnings
del ExperimentalWarning
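
For illustration (not part of this commit), a minimal sketch of what importing the experimental SDK looks like from user code, assuming a build that includes this change; the module-level `ExperimentalWarning` above can be filtered explicitly:

```python
import warnings

from datahub.errors import ExperimentalWarning

# The import below triggers the module-level warnings.warn(...) shown in the diff;
# suppress it deliberately once the experimental status is understood.
with warnings.catch_warnings():
    warnings.simplefilter("ignore", ExperimentalWarning)
    from datahub.sdk import DataHubClient, DatasetUrn

# DataHubClient and DatasetUrn are re-exported by datahub.sdk per the imports above.
print(DataHubClient, DatasetUrn)
```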
15 changes: 15 additions & 0 deletions metadata-ingestion/src/datahub/sdk/_all_entities.py
@@ -0,0 +1,15 @@
from typing import Dict, List, Type

from datahub.sdk._entity import Entity
from datahub.sdk.container import Container
from datahub.sdk.dataset import Dataset

# TODO: Is there a better way to declare this?
ENTITY_CLASSES_LIST: List[Type[Entity]] = [
    Container,
    Dataset,
]

ENTITY_CLASSES: Dict[str, Type[Entity]] = {
    cls.get_urn_type().ENTITY_TYPE: cls for cls in ENTITY_CLASSES_LIST
}
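
For illustration (not part of this commit), a short sketch of looking up an SDK entity class by entity type; it assumes the URN classes expose the same `ENTITY_TYPE` attribute that the dict comprehension above relies on:

```python
from datahub.metadata.urns import ContainerUrn, DatasetUrn
from datahub.sdk._all_entities import ENTITY_CLASSES

# Keys of ENTITY_CLASSES are the entity type strings taken from each URN class.
dataset_cls = ENTITY_CLASSES[DatasetUrn.ENTITY_TYPE]
container_cls = ENTITY_CLASSES[ContainerUrn.ENTITY_TYPE]
print(dataset_cls.__name__, container_cls.__name__)  # Dataset Container
```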
48 changes: 48 additions & 0 deletions metadata-ingestion/src/datahub/sdk/_attribution.py
@@ -0,0 +1,48 @@
from __future__ import annotations

import contextlib
from typing import Iterator

from datahub.utilities.str_enum import StrEnum


class KnownAttribution(StrEnum):
    INGESTION = "INGESTION"
    INGESTION_ALTERNATE = "INGESTION_ALTERNATE"

    UI = "UI"
    SDK = "SDK"

    PROPAGATION = "PROPAGATION"

    def is_ingestion(self) -> bool:
        return self in (
            KnownAttribution.INGESTION,
            KnownAttribution.INGESTION_ALTERNATE,
        )


_default_attribution = KnownAttribution.SDK


def get_default_attribution() -> KnownAttribution:
    return _default_attribution


def set_default_attribution(attribution: KnownAttribution) -> None:
    global _default_attribution
    _default_attribution = attribution


@contextlib.contextmanager
def change_default_attribution(attribution: KnownAttribution) -> Iterator[None]:
    old_attribution = get_default_attribution()
    try:
        set_default_attribution(attribution)
        yield
    finally:
        set_default_attribution(old_attribution)


def is_ingestion_attribution() -> bool:
    return get_default_attribution().is_ingestion()
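
For illustration (not part of this commit), a small usage sketch of the attribution context manager; everything referenced below comes from the new module itself:

```python
from datahub.sdk._attribution import (
    KnownAttribution,
    change_default_attribution,
    get_default_attribution,
    is_ingestion_attribution,
)

assert get_default_attribution() == KnownAttribution.SDK  # module default
assert not is_ingestion_attribution()

with change_default_attribution(KnownAttribution.INGESTION):
    # Inside the block, callers observe the temporarily overridden attribution.
    assert is_ingestion_attribution()

# The previous default is restored when the block exits.
assert get_default_attribution() == KnownAttribution.SDK
```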