Commit
Merge branch 'master' into include-system-timestamp-for-metadata-modification
deepgarg-visa authored Feb 8, 2025
2 parents 394a57d + 32c62e5 commit bf1faaa
Showing 44 changed files with 3,858 additions and 443 deletions.
@@ -111,4 +111,8 @@ public static String getEntityTypeUrn(String name) {
}
return ENTITY_NAME_TO_ENTITY_TYPE_URN.get(name);
}

public static boolean isValidEntityType(String entityTypeUrn) {
return ENTITY_TYPE_URN_TO_NAME.containsKey(entityTypeUrn);
}
}
@@ -20,6 +20,7 @@
import com.linkedin.datahub.graphql.generated.StructuredPropertySettings;
import com.linkedin.datahub.graphql.generated.TypeQualifier;
import com.linkedin.datahub.graphql.types.common.mappers.util.MappingHelper;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeUrnMapper;
import com.linkedin.datahub.graphql.types.mappers.MapperUtils;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import com.linkedin.entity.EntityResponse;
@@ -30,7 +31,9 @@
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class StructuredPropertyMapper
implements ModelMapper<EntityResponse, StructuredPropertyEntity> {

@@ -141,8 +144,21 @@ private TypeQualifier mapTypeQualifier(final StringArrayMap gmsTypeQualifier) {
final TypeQualifier typeQualifier = new TypeQualifier();
List<String> allowedTypes = gmsTypeQualifier.get(ALLOWED_TYPES);
if (allowedTypes != null) {
// keep only allowedTypes that are valid entity type urns
List<String> validAllowedTypes =
allowedTypes.stream()
.filter(EntityTypeUrnMapper::isValidEntityType)
.collect(Collectors.toList());
if (validAllowedTypes.size() != allowedTypes.size()) {
log.error(
String.format(
"Property has invalid allowed types set. Current list of allowed types: %s",
allowedTypes));
}
typeQualifier.setAllowedTypes(
allowedTypes.stream().map(this::createEntityTypeEntity).collect(Collectors.toList()));
validAllowedTypes.stream()
.map(this::createEntityTypeEntity)
.collect(Collectors.toList()));
}
return typeQualifier;
}
@@ -84,6 +84,38 @@ export function computeLines(chartData: TimeSeriesChartType, insertBlankPoints:
return returnLines;
}

const formatAxisDate = (value: number, chartData: TimeSeriesChartType) => {
const date = new Date(value);

switch (chartData.interval) {
case 'MONTH':
return date.toLocaleDateString('en-US', {
month: 'short',
year: 'numeric',
timeZone: 'UTC',
});
case 'WEEK':
return date.toLocaleDateString('en-US', {
month: 'short',
day: 'numeric',
timeZone: 'UTC',
});
case 'DAY':
return date.toLocaleDateString('en-US', {
weekday: 'short',
day: 'numeric',
timeZone: 'UTC',
});
default:
return date.toLocaleDateString('en-US', {
month: 'short',
day: 'numeric',
year: 'numeric',
timeZone: 'UTC',
});
}
};

export const TimeSeriesChart = ({
chartData,
width,
@@ -117,6 +149,7 @@ export const TimeSeriesChart = ({
strokeWidth={style?.axisWidth}
tickLabelProps={{ fill: 'black', fontFamily: 'inherit', fontSize: 10 }}
numTicks={3}
tickFormat={(value) => formatAxisDate(value, chartData)}
/>
<Axis
orientation="right"
@@ -151,9 +184,7 @@
tooltipData?.nearestDatum && (
<div>
<div>
{new Date(
Number(accessors.xAccessor(tooltipData.nearestDatum.datum)),
).toDateString()}
{formatAxisDate(accessors.xAccessor(tooltipData.nearestDatum.datum), chartData)}
</div>
<div>{accessors.yAccessor(tooltipData.nearestDatum.datum)}</div>
</div>
2 changes: 1 addition & 1 deletion docs/cli.md
@@ -735,7 +735,7 @@ Please see our [Integrations page](https://datahubproject.io/integrations) if yo
| [bigquery](./generated/ingestion/sources/bigquery.md) | `pip install 'acryl-datahub[bigquery]'` | BigQuery source |
| [datahub-lineage-file](./generated/ingestion/sources/file-based-lineage.md) | _no additional dependencies_ | Lineage File source |
| [datahub-business-glossary](./generated/ingestion/sources/business-glossary.md) | _no additional dependencies_ | Business Glossary File source |
| [dbt](./generated/ingestion/sources/dbt.md) | _no additional dependencies_ | dbt source |
| [dbt](./generated/ingestion/sources/dbt.md) | `pip install 'acryl-datahub[dbt]'` | dbt source |
| [dremio](./generated/ingestion/sources/dremio.md) | `pip install 'acryl-datahub[dremio]'` | Dremio Source |
| [druid](./generated/ingestion/sources/druid.md) | `pip install 'acryl-datahub[druid]'` | Druid Source |
| [feast](./generated/ingestion/sources/feast.md) | `pip install 'acryl-datahub[feast]'` | Feast source (0.26.0) |
@@ -125,6 +125,50 @@ The Helm chart [datahub-executor-worker](https://executor-helm.acryl.io/index.ya
--set image.tag=v0.3.1 \
acryl datahub-executor-worker
```
9. As of DataHub Cloud `v0.3.8.2`, it is possible to pass secrets to ingestion recipes using Kubernetes Secret CRDs, as shown below. This allows secrets to be updated at runtime without restarting the Remote Executor process.
```
# 1. Create K8s Secret object in remote executor namespace, e.g.
apiVersion: v1
kind: Secret
metadata:
name: datahub-secret-store
data:
REDSHIFT_PASSWORD: cmVkc2hpZnQtc2VjcmV0Cg==
SNOWFLAKE_PASSWORD: c25vd2ZsYWtlLXNlY3JldAo=
# 2. Add secret into your Remote Executor deployment:
extraVolumes:
- name: datahub-secret-store
secret:
secretName: datahub-secret-store
# 3. Mount it under /mnt/secrets directory
extraVolumeMounts:
- mountPath: /mnt/secrets
name: datahub-secret-store
```
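For reference, the `data` values in the Secret above are base64-encoded. A minimal sketch of producing them; the plaintext values are the placeholders from this example, not real credentials:

```python
import base64

# Kubernetes Secret `data:` values must be base64-encoded.
# The plaintext secrets below are placeholders matching the example above.
print(base64.b64encode(b"redshift-secret\n").decode())   # cmVkc2hpZnQtc2VjcmV0Cg==
print(base64.b64encode(b"snowflake-secret\n").decode())  # c25vd2ZsYWtlLXNlY3JldAo=
```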
You can then reference the mounted secrets directly in the ingestion recipe:
```yaml
source:
type: redshift
config:
host_port: '<redshift host:port>'
username: connector_test
table_lineage_mode: mixed
include_table_lineage: true
include_tables: true
include_views: true
profiling:
enabled: true
profile_table_level_only: false
stateful_ingestion:
enabled: true
password: '${REDSHIFT_PASSWORD}'
```
By default the executor looks for secret files mounted under `/mnt/secrets`; this can be overridden by setting the env var `DATAHUB_EXECUTOR_FILE_SECRET_BASEDIR` to a different location (default: `/mnt/secrets`).

These files are expected to be under 1MB in size by default. To increase this limit, set a higher value via `DATAHUB_EXECUTOR_FILE_SECRET_MAXLEN` (default: `1024768`, size in bytes).
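A conceptual sketch (not the executor's actual implementation) of how a mounted secret such as `${REDSHIFT_PASSWORD}` could be resolved from that directory while honoring the two environment variables above:

```python
import os
from pathlib import Path

# Assumed behavior based on the description above; the real executor code may differ.
BASE_DIR = Path(os.environ.get("DATAHUB_EXECUTOR_FILE_SECRET_BASEDIR", "/mnt/secrets"))
MAX_LEN = int(os.environ.get("DATAHUB_EXECUTOR_FILE_SECRET_MAXLEN", "1024768"))


def resolve_file_secret(name: str) -> str:
    """Read a secret mounted as a file, e.g. REDSHIFT_PASSWORD -> /mnt/secrets/REDSHIFT_PASSWORD."""
    data = (BASE_DIR / name).read_bytes()
    if len(data) > MAX_LEN:
        raise ValueError(f"Secret {name} exceeds {MAX_LEN} bytes")
    return data.decode("utf-8").rstrip("\n")
```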

## FAQ

2 changes: 1 addition & 1 deletion docs/managed-datahub/release-notes/v_0_3_8.md
@@ -3,7 +3,7 @@

Release Availability Date
---
21-Jan-2025
29-Jan-2025

Recommended CLI/SDK
---
2 changes: 2 additions & 0 deletions docs/modeling/extending-the-metadata-model.md
@@ -361,6 +361,8 @@ It takes the following parameters:
This annotation is applied to fields inside an Aspect. It instructs DataHub to index the field so it can be retrieved
via the search APIs.

:::note If you are adding @Searchable to a field that already has data, you'll want to restore indices [via api](https://datahubproject.io/docs/api/restli/restore-indices/) or [via upgrade step](https://github.com/datahub-project/datahub/blob/master/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndices.java) to have it be populated with existing data.

It takes the following parameters:

- **fieldType**: string - The settings for how each field is indexed is defined by the field type. Each field type is
5 changes: 4 additions & 1 deletion metadata-ingestion/setup.cfg
@@ -15,6 +15,9 @@ warn_unused_configs = yes
disallow_untyped_defs = no

# try to be a bit more strict in certain areas of the codebase
[mypy-datahub]
# Only for datahub's __init__.py - allow implicit reexport
implicit_reexport = yes
[mypy-datahub.*]
ignore_missing_imports = no
implicit_reexport = no
@@ -54,7 +57,7 @@ addopts = --cov=src --cov-report= --cov-config setup.cfg --strict-markers -p no:
markers =
slow: marks tests that are slow to run, including all docker-based tests (deselect with '-m not slow')
integration: marks all integration tests, across all batches (deselect with '-m "not integration"')
integration_batch_0: mark tests to run in batch 0 of integration tests. This is done mainly for parallelisation in CI. Batch 0 is the default batch.
integration_batch_0: mark tests to run in batch 0 of integration tests. This is done mainly for parallelization in CI. Batch 0 is the default batch.
integration_batch_1: mark tests to run in batch 1 of integration tests
integration_batch_2: mark tests to run in batch 2 of integration tests
testpaths =
35 changes: 35 additions & 0 deletions metadata-ingestion/src/datahub/errors.py
@@ -0,0 +1,35 @@
from datahub.configuration.common import MetaError

# TODO: Move all other error types to this file.


class SdkUsageError(MetaError):
pass


class AlreadyExistsError(SdkUsageError):
pass


class ItemNotFoundError(SdkUsageError):
pass


class MultipleItemsFoundError(SdkUsageError):
pass


class SchemaFieldKeyError(SdkUsageError, KeyError):
pass


class IngestionAttributionWarning(Warning):
pass


class MultipleSubtypesWarning(Warning):
pass


class ExperimentalWarning(Warning):
pass
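A minimal sketch of how callers might work with this new exception hierarchy; `create_dataset` and its arguments are hypothetical stand-ins for illustration, not part of the DataHub SDK:

```python
from datahub.errors import AlreadyExistsError, SdkUsageError


def create_dataset(name: str, existing: set) -> None:
    # Hypothetical helper used only to illustrate raising/catching the new errors.
    if name in existing:
        raise AlreadyExistsError(f"Dataset {name!r} already exists")
    existing.add(name)


try:
    create_dataset("orders", {"orders"})
except SdkUsageError as e:
    # AlreadyExistsError subclasses SdkUsageError, so it is caught here.
    print(f"Skipping creation: {e}")
```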
@@ -1673,10 +1673,11 @@ def to_platform_resource(
primary_key="",
)

# Extract user email mappings
# Extract user email mappings.
# Sort it to ensure the order is deterministic.
user_email_cache = {
user_id: user.email
for user_id, user in self._user_cache.items()
for user_id, user in sorted(self._user_cache.items())
if user.email
}
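A small sketch of why the `sorted(...)` matters here: Python dicts preserve insertion order, so sorting the source items makes the resulting mapping (and anything serialized from it) deterministic regardless of how the cache was populated. The cache contents below are illustrative:

```python
# Two caches with the same entries inserted in different orders.
cache_a = {"u2": "b@example.com", "u1": "a@example.com"}
cache_b = {"u1": "a@example.com", "u2": "b@example.com"}

det_a = {user_id: email for user_id, email in sorted(cache_a.items()) if email}
det_b = {user_id: email for user_id, email in sorted(cache_b.items()) if email}

# Same key order either way, so downstream output is stable.
assert list(det_a) == list(det_b) == ["u1", "u2"]
```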

@@ -6,6 +6,7 @@
from typing import Callable, Dict, Iterable, List, MutableMapping, Optional

from datahub.ingestion.api.report import SupportsAsObj
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection
from datahub.ingestion.source.snowflake.snowflake_query import (
@@ -100,6 +101,9 @@ class SnowflakeTable(BaseTable):
def is_hybrid(self) -> bool:
return self.type is not None and self.type == "HYBRID TABLE"

def get_subtype(self) -> DatasetSubTypes:
return DatasetSubTypes.TABLE


@dataclass
class SnowflakeView(BaseView):
@@ -109,6 +113,9 @@ class SnowflakeView(BaseView):
column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict)
is_secure: bool = False

def get_subtype(self) -> DatasetSubTypes:
return DatasetSubTypes.VIEW


@dataclass
class SnowflakeSchema:
@@ -154,6 +161,9 @@ class SnowflakeStream:
column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict)
last_altered: Optional[datetime] = None

def get_subtype(self) -> DatasetSubTypes:
return DatasetSubTypes.SNOWFLAKE_STREAM


class _SnowflakeTagCache:
def __init__(self) -> None:
@@ -21,7 +21,6 @@
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
DatasetSubTypes,
)
from datahub.ingestion.source.snowflake.constants import (
GENERIC_PERMISSION_ERROR_KEY,
@@ -467,7 +466,13 @@ def _process_schema(
context=f"{db_name}.{schema_name}",
)

def _process_tags(self, snowflake_schema, schema_name, db_name, domain):
def _process_tags(
self,
snowflake_schema: SnowflakeSchema,
schema_name: str,
db_name: str,
domain: str,
) -> None:
snowflake_schema.tags = self.tag_extractor.get_tags_on_object(
schema_name=schema_name, db_name=db_name, domain=domain
)
@@ -837,15 +842,7 @@ def gen_dataset_workunits(
if dpi_aspect:
yield dpi_aspect

subTypes = SubTypes(
typeNames=(
[DatasetSubTypes.SNOWFLAKE_STREAM]
if isinstance(table, SnowflakeStream)
else [DatasetSubTypes.VIEW]
if isinstance(table, SnowflakeView)
else [DatasetSubTypes.TABLE]
)
)
subTypes = SubTypes(typeNames=[table.get_subtype()])

yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=subTypes
@@ -932,9 +929,9 @@ def get_dataset_properties(
"OWNER_ROLE_TYPE": table.owner_role_type,
"TABLE_NAME": table.table_name,
"BASE_TABLES": table.base_tables,
"STALE_AFTER": table.stale_after.isoformat()
if table.stale_after
else None,
"STALE_AFTER": (
table.stale_after.isoformat() if table.stale_after else None
),
}.items()
if v
}
16 changes: 10 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -352,6 +352,15 @@ def __init__(self, config: SQLCommonConfig, ctx: PipelineContext, platform: str)
)
self.report.sql_aggregator = self.aggregator.report

def _add_default_options(self, sql_config: SQLCommonConfig) -> None:
"""Add default SQLAlchemy options. Can be overridden by subclasses to add additional defaults."""
# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
if sql_config.is_profiling_enabled():
sql_config.options.setdefault(
"max_overflow", sql_config.profiling.max_workers
)

@classmethod
def test_connection(cls, config_dict: dict) -> TestConnectionReport:
test_report = TestConnectionReport()
@@ -519,12 +528,7 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit
# Known issue with sqlalchemy https://stackoverflow.com/questions/60804288/pycharm-duplicated-log-for-sqlalchemy-echo-true
sqlalchemy_log._add_default_handler = lambda x: None # type: ignore

# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
if sql_config.is_profiling_enabled():
sql_config.options.setdefault(
"max_overflow", sql_config.profiling.max_workers
)
self._add_default_options(sql_config)

for inspector in self.get_inspectors():
profiler = None
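Since the new `_add_default_options` hook is documented as overridable, here is a sketch of how a source subclass might extend it. The class name `SQLAlchemySource`, the import paths, and the extra `pool_pre_ping` option are assumptions for illustration, not part of this commit:

```python
# Assumed import paths; adjust to the actual module layout.
from datahub.ingestion.source.sql.sql_common import SQLAlchemySource, SQLCommonConfig


class MyCustomSQLSource(SQLAlchemySource):  # hypothetical subclass
    def _add_default_options(self, sql_config: SQLCommonConfig) -> None:
        # Keep the base defaults (e.g. max_overflow when profiling is enabled)...
        super()._add_default_options(sql_config)
        # ...and layer on an extra SQLAlchemy engine option of our own.
        sql_config.options.setdefault("pool_pre_ping", True)
```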