Commit

Merge branch 'master' into pinakipb2-patch-1

pinakipb2 authored Jan 21, 2025
2 parents ef16e3a + 8ac35fa commit 4b583b2
Showing 30 changed files with 1,077 additions and 216 deletions.
36 changes: 23 additions & 13 deletions .github/pr-labeler-config.yml
@@ -1,22 +1,32 @@
ingestion:
-  - "metadata-ingestion/**/*"
-  - "metadata-ingestion-modules/**/*"
-  - "metadata-integration/**/*"
+  - changed-files:
+      - any-glob-to-any-file:
+          - "metadata-ingestion/**/*"
+          - "metadata-ingestion-modules/**/*"
+          - "metadata-integration/**/*"

devops:
-  - "docker/**/*"
-  - ".github/**/*"
-  - "perf-test/**/*"
-  - "metadata-service/**/*"
+  - changed-files:
+      - any-glob-to-any-file:
+          - "docker/**/*"
+          - ".github/**/*"
+          - "perf-test/**/*"
+          - "metadata-service/**/*"

product:
-  - "datahub-web-react/**/*"
-  - "datahub-frontend/**/*"
-  - "datahub-graphql-core/**/*"
-  - "metadata-io/**/*"
+  - changed-files:
+      - any-glob-to-any-file:
+          - "datahub-web-react/**/*"
+          - "datahub-frontend/**/*"
+          - "datahub-graphql-core/**/*"
+          - "metadata-io/**/*"

docs:
-  - "docs/**/*"
+  - changed-files:
+      - any-glob-to-any-file:
+          - "docs/**/*"

smoke_test:
-  - "smoke-test/**/*"
+  - changed-files:
+      - any-glob-to-any-file:
+          - "smoke-test/**/*"
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test.yml
@@ -63,7 +63,7 @@ jobs:
sudo apt-get remove 'dotnet-*' azure-cli || true
sudo rm -rf /usr/local/lib/android/ || true
sudo docker image prune -a -f || true
-   - uses: szenius/set-timezone@v1.2
+   - uses: szenius/set-timezone@v2.0
with:
timezoneLinux: ${{ matrix.timezone }}
- name: Check out the repo
9 changes: 7 additions & 2 deletions .github/workflows/pr-labeler.yml
@@ -3,6 +3,10 @@ on:
pull_request_target:
types: [opened, reopened]

+ permissions:
+   contents: read
+   pull-requests: write

jobs:
triage:
permissions:
@@ -25,7 +29,6 @@ jobs:
"chriscollins3456",
"david-leifker",
"shirshanka",
"sid-acryl",
"swaroopjagadish",
"treff7es",
"yoonhyejin",
@@ -46,7 +49,9 @@ jobs:
"kevinkarchacryl",
"sgomezvillamor",
"acrylJonny",
"chakru-r"
"chakru-r",
"brock-acryl",
"mminichino"
]'),
github.actor
)
2 changes: 1 addition & 1 deletion .github/workflows/python-build-pages.yml
@@ -37,7 +37,7 @@ jobs:
with:
distribution: "zulu"
java-version: 17
-   - uses: gradle/actions/setup-gradle@v3
+   - uses: gradle/actions/setup-gradle@v4
- uses: acryldata/sane-checkout-action@v3
- uses: actions/setup-python@v5
with:
@@ -77,9 +77,6 @@ public class EntityTypeUrnMapper {
.put(
Constants.BUSINESS_ATTRIBUTE_ENTITY_NAME,
"urn:li:entityType:datahub.businessAttribute")
- .put(
-     Constants.DATA_PROCESS_INSTANCE_ENTITY_NAME,
-     "urn:li:entityType:datahub.dataProcessInstance")
.build();

private static final Map<String, String> ENTITY_TYPE_URN_TO_NAME =
@@ -0,0 +1,20 @@
package com.linkedin.datahub.graphql.types.entitytype;

import static org.testng.Assert.*;

import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.metadata.Constants;
import org.testng.annotations.Test;

public class EntityTypeMapperTest {

@Test
public void testGetType() throws Exception {
assertEquals(EntityTypeMapper.getType(Constants.DATASET_ENTITY_NAME), EntityType.DATASET);
}

@Test
public void testGetName() throws Exception {
assertEquals(EntityTypeMapper.getName(EntityType.DATASET), Constants.DATASET_ENTITY_NAME);
}
}
@@ -0,0 +1,30 @@
package com.linkedin.datahub.graphql.types.entitytype;

import static org.testng.Assert.*;

import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.metadata.Constants;
import org.testng.annotations.Test;

public class EntityTypeUrnMapperTest {

@Test
public void testGetName() throws Exception {
assertEquals(
EntityTypeUrnMapper.getName("urn:li:entityType:datahub.dataset"),
Constants.DATASET_ENTITY_NAME);
}

@Test
public void testGetEntityType() throws Exception {
assertEquals(
EntityTypeUrnMapper.getEntityType("urn:li:entityType:datahub.dataset"), EntityType.DATASET);
}

@Test
public void testGetEntityTypeUrn() throws Exception {
assertEquals(
EntityTypeUrnMapper.getEntityTypeUrn(Constants.DATASET_ENTITY_NAME),
"urn:li:entityType:datahub.dataset");
}
}
@@ -1,3 +1,4 @@
source:
type: redshift
config:
# Coordinates
18 changes: 16 additions & 2 deletions metadata-ingestion/src/datahub/cli/delete_cli.py
@@ -265,6 +265,11 @@ def undo_by_filter(
type=str,
help="Urn of the entity to delete, for single entity deletion",
)
+ @click.option(
+     "--urn-file",
+     required=False,
+     help="Path of file with urns (one per line) to be deleted",
+ )
@click.option(
"-a",
"--aspect",
@@ -353,6 +358,7 @@ def undo_by_filter(
@telemetry.with_telemetry()
def by_filter(
urn: Optional[str],
+ urn_file: Optional[str],
aspect: Optional[str],
force: bool,
soft: bool,
Expand All @@ -373,6 +379,7 @@ def by_filter(
# Validate the cli arguments.
_validate_user_urn_and_filters(
urn=urn,
+ urn_file=urn_file,
entity_type=entity_type,
platform=platform,
env=env,
@@ -429,6 +436,12 @@ def by_filter(
batch_size=batch_size,
)
)
+ elif urn_file:
+     with open(urn_file, "r") as r:
+         urns = []
+         for line in r.readlines():
+             urn = line.strip().strip('"')
+             urns.append(urn)
else:
urns = list(
graph.get_urns_by_filter(
@@ -537,6 +550,7 @@ def process_urn(urn):

def _validate_user_urn_and_filters(
urn: Optional[str],
+ urn_file: Optional[str],
entity_type: Optional[str],
platform: Optional[str],
env: Optional[str],
@@ -549,9 +563,9 @@ def _validate_user_urn_and_filters(
raise click.UsageError(
"You cannot provide both an urn and a filter rule (entity-type / platform / env / query)."
)
- elif not urn and not (entity_type or platform or env or query):
+ elif not urn and not urn_file and not (entity_type or platform or env or query):
raise click.UsageError(
"You must provide either an urn or at least one filter (entity-type / platform / env / query) in order to delete entities."
"You must provide either an urn or urn_file or at least one filter (entity-type / platform / env / query) in order to delete entities."
)
elif query:
logger.warning(
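For context on the `--urn-file` option added in this file: it expects a plain text file with one URN per line, and each line is stripped of whitespace and surrounding double quotes before deletion. A minimal sketch of producing such a file (the file name and URNs are illustrative, and the exact `datahub delete` invocation may vary):

```python
# Illustrative only: write a file that the new --urn-file option can consume.
# One URN per line; surrounding double quotes are tolerated because the CLI
# strips whitespace and quotes from each line before deleting.
urns = [
    "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
    '"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)"',
]
with open("urns_to_delete.txt", "w") as f:
    f.write("\n".join(urns) + "\n")
```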
@@ -2,7 +2,6 @@
import re
from base64 import b32decode
from collections import defaultdict
- from itertools import groupby
from typing import Dict, Iterable, List, Optional, Set, Type, Union, cast

from google.cloud.bigquery.table import TableListItem
@@ -101,6 +100,7 @@
from datahub.metadata.urns import TagUrn
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.utilities.file_backed_collections import FileBackedDict
+ from datahub.utilities.groupby import groupby_unsorted
from datahub.utilities.hive_schema_to_avro import (
HiveColumnToAvroConverter,
get_schema_fields_for_hive_column,
@@ -730,7 +730,7 @@ def gen_foreign_keys(
foreign_keys: List[BigqueryTableConstraint] = list(
filter(lambda x: x.type == "FOREIGN KEY", table.constraints)
)
- for key, group in groupby(
+ for key, group in groupby_unsorted(
foreign_keys,
lambda x: f"{x.referenced_project_id}.{x.referenced_dataset}.{x.referenced_table_name}",
):
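The switch from `itertools.groupby` to `groupby_unsorted` here (and in the dbt source below) matters because `itertools.groupby` only merges adjacent items with equal keys, so unsorted foreign-key rows would be split into fragmented groups. A minimal sketch of the difference, using a stand-in for `datahub.utilities.groupby.groupby_unsorted` (the real helper's implementation may differ):

```python
from collections import defaultdict
from itertools import groupby

data = ["a1", "b1", "a2"]  # keys ('a', 'b', 'a') are not sorted/adjacent

# itertools.groupby only groups *adjacent* equal keys, so 'a' appears twice:
print([(k, list(g)) for k, g in groupby(data, key=lambda s: s[0])])
# [('a', ['a1']), ('b', ['b1']), ('a', ['a2'])]


def groupby_unsorted(iterable, key):
    """Stand-in for datahub.utilities.groupby.groupby_unsorted: group items
    by key without requiring the input to be sorted first."""
    groups = defaultdict(list)
    for item in iterable:
        groups[key(item)].append(item)
    return groups.items()


print([(k, list(g)) for k, g in groupby_unsorted(data, key=lambda s: s[0])])
# [('a', ['a1', 'a2']), ('b', ['b1'])]
```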
@@ -1,4 +1,3 @@
- import itertools
import logging
import re
from abc import abstractmethod
@@ -111,6 +110,7 @@
parse_statements_and_pick,
try_format_query,
)
+ from datahub.utilities.groupby import groupby_unsorted
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.mapping import Constants, OperationProcessor
from datahub.utilities.time import datetime_to_ts_millis
@@ -1929,7 +1929,7 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
else None
),
)
- for downstream, upstreams in itertools.groupby(
+ for downstream, upstreams in groupby_unsorted(
node.upstream_cll, lambda x: x.downstream_col
)
]
@@ -167,6 +167,10 @@ class PlatformDetail(ConfigModel):
description="The database that all assets produced by this connector belong to. "
"For destinations, this defaults to the fivetran log config's database.",
)
+ include_schema_in_urn: bool = pydantic.Field(
+     default=True,
+     description="Include schema in the dataset URN. In some cases, the schema is not relevant to the dataset URN and Fivetran sets it to the source and destination table names in the connector.",
+ )


class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
@@ -119,21 +119,31 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
)

for lineage in connector.lineage:
+ source_table = (
+     lineage.source_table
+     if source_details.include_schema_in_urn
+     else lineage.source_table.split(".", 1)[1]
+ )
input_dataset_urn = DatasetUrn.create_from_ids(
platform_id=source_details.platform,
table_name=(
f"{source_details.database.lower()}.{lineage.source_table}"
f"{source_details.database.lower()}.{source_table}"
if source_details.database
- else lineage.source_table
+ else source_table
),
env=source_details.env,
platform_instance=source_details.platform_instance,
)
input_dataset_urn_list.append(input_dataset_urn)

+ destination_table = (
+     lineage.destination_table
+     if destination_details.include_schema_in_urn
+     else lineage.destination_table.split(".", 1)[1]
+ )
output_dataset_urn = DatasetUrn.create_from_ids(
platform_id=destination_details.platform,
table_name=f"{destination_details.database.lower()}.{lineage.destination_table}",
table_name=f"{destination_details.database.lower()}.{destination_table}",
env=destination_details.env,
platform_instance=destination_details.platform_instance,
)
@@ -176,12 +186,12 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
**{
f"source.{k}": str(v)
for k, v in source_details.dict().items()
- if v is not None
+ if v is not None and not isinstance(v, bool)
},
**{
f"destination.{k}": str(v)
for k, v in destination_details.dict().items()
- if v is not None
+ if v is not None and not isinstance(v, bool)
},
)

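A sketch of what the new `include_schema_in_urn` flag changes about the table name that ends up in the dataset URN, mirroring the logic above (the database and table names are illustrative); the related `isinstance(v, bool)` filter keeps the flag itself from being emitted as a DataJob property:

```python
def build_table_name(database: str, table: str, include_schema_in_urn: bool) -> str:
    # `table` arrives from Fivetran lineage as "<schema>.<table>".
    if not include_schema_in_urn:
        table = table.split(".", 1)[1]  # drop the schema component
    return f"{database.lower()}.{table}" if database else table


print(build_table_name("Postgres_DB", "public.orders", include_schema_in_urn=True))
# postgres_db.public.orders
print(build_table_name("Postgres_DB", "public.orders", include_schema_in_urn=False))
# postgres_db.orders
```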
@@ -88,6 +88,7 @@ def __init__(self, config: GCSSourceConfig, ctx: PipelineContext):
super().__init__(config, ctx)
self.config = config
self.report = GCSSourceReport()
+ self.platform: str = PLATFORM_GCS
self.s3_source = self.create_equivalent_s3_source(ctx)

@classmethod
@@ -135,7 +136,7 @@ def create_equivalent_s3_path_specs(self):

def create_equivalent_s3_source(self, ctx: PipelineContext) -> S3Source:
config = self.create_equivalent_s3_config()
- return self.s3_source_overrides(S3Source(config, ctx))
+ return self.s3_source_overrides(S3Source(config, PipelineContext(ctx.run_id)))

def s3_source_overrides(self, source: S3Source) -> S3Source:
source.source_config.platform = PLATFORM_GCS
