
Commit a4c600c

Merge branch 'master' into feat/update-mlflow-ui

2 parents f7336bc + 995857c

File tree: 19 files changed (+99, -174 lines)

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java (-10)

@@ -3108,16 +3108,6 @@ private void configureDataProcessInstanceResolvers(final RuntimeWiring.Builder b
                       ? dataProcessInstance.getDataPlatformInstance().getUrn()
                       : null;
                 }))
-        .dataFetcher(
-            "platform",
-            new LoadableTypeResolver<>(
-                dataPlatformType,
-                (env) -> {
-                  final DataProcessInstance dataProcessInstance = env.getSource();
-                  return dataProcessInstance.getPlatform() != null
-                      ? dataProcessInstance.getPlatform().getUrn()
-                      : null;
-                }))
         .dataFetcher("parentContainers", new ParentContainersResolver(entityClient))
         .dataFetcher(
             "container",

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataprocessinst/mappers/DataProcessInstanceMapper.java (-5)

@@ -8,7 +8,6 @@
 import com.linkedin.data.DataMap;
 import com.linkedin.data.template.RecordTemplate;
 import com.linkedin.datahub.graphql.QueryContext;
-import com.linkedin.datahub.graphql.generated.DataPlatform;
 import com.linkedin.datahub.graphql.generated.DataProcessInstance;
 import com.linkedin.datahub.graphql.generated.EntityType;
 import com.linkedin.datahub.graphql.types.common.mappers.AuditStampMapper;
@@ -80,10 +79,6 @@ public DataProcessInstance apply(
           DataPlatformInstance dataPlatformInstance = new DataPlatformInstance(dataMap);
           dataProcessInstance.setDataPlatformInstance(
               DataPlatformInstanceAspectMapper.map(context, dataPlatformInstance));
-          DataPlatform dataPlatform = new DataPlatform();
-          dataPlatform.setUrn(dataPlatformInstance.getPlatform().toString());
-          dataPlatform.setType(EntityType.DATA_PLATFORM);
-          dataProcessInstance.setPlatform(dataPlatform);
         });
     mappingHelper.mapToResult(
         SUB_TYPES_ASPECT_NAME,

datahub-graphql-core/src/main/resources/entity.graphql (-5)

@@ -13054,11 +13054,6 @@ extend type DataProcessInstance {
   """
   container: Container

-  """
-  Standardized platform urn where the data process instance is defined
-  """
-  platform: DataPlatform!
-
   """
   Recursively get the lineage of containers for this entity
   """

datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/dataprocessinst/mappers/DataProcessInstanceMapperTest.java (+4, -3)

@@ -80,9 +80,10 @@ public void testMapPlatformInstance() throws Exception {
     DataProcessInstance instance = DataProcessInstanceMapper.map(null, entityResponse);

     assertNotNull(instance.getDataPlatformInstance());
-    assertNotNull(instance.getPlatform());
-    assertEquals(instance.getPlatform().getUrn(), TEST_PLATFORM_URN);
-    assertEquals(instance.getPlatform().getType(), EntityType.DATA_PLATFORM);
+    assertNotNull(instance.getDataPlatformInstance().getPlatform());
+    assertEquals(instance.getDataPlatformInstance().getPlatform().getUrn(), TEST_PLATFORM_URN);
+    assertEquals(
+        instance.getDataPlatformInstance().getPlatform().getType(), EntityType.DATA_PLATFORM);
   }

   @Test

datahub-web-react/src/app/entity/dataProcessInstance/DataProcessInstanceEntity.tsx (+6, -4)

@@ -144,6 +144,7 @@ export class DataProcessInstanceEntity implements Entity<DataProcessInstance> {
         return {
             name,
             externalUrl,
+            platform: processInstance?.dataPlatformInstance?.platform,
         };
     };

@@ -195,9 +196,10 @@
                 subType={data.subTypes?.typeNames?.[0]}
                 description=""
                 platformName={
-                    data?.platform?.properties?.displayName || capitalizeFirstLetterOnly(data?.platform?.name)
+                    data?.dataPlatformInstance?.platform?.properties?.displayName ||
+                    capitalizeFirstLetterOnly(data?.dataPlatformInstance?.platform?.name)
                 }
-                platformLogo={data.platform.properties?.logoUrl}
+                platformLogo={data.dataPlatformInstance?.platform?.properties?.logoUrl}
                 platformInstanceId={data.dataPlatformInstance?.instanceId}
                 owners={null}
                 globalTags={null}
@@ -225,8 +227,8 @@ export class DataProcessInstanceEntity implements Entity<DataProcessInstance> {
             name: this.displayName(entity),
             type: EntityType.DataProcessInstance,
             subtype: entity?.subTypes?.typeNames?.[0],
-            icon: entity?.platform?.properties?.logoUrl || undefined,
-            platform: entity?.platform,
+            icon: entity?.dataPlatformInstance?.platform?.properties?.logoUrl || undefined,
+            platform: entity?.dataPlatformInstance?.platform,
             container: entity?.container,
             // health: entity?.health || undefined,
         };

datahub-web-react/src/graphql/dataProcessInstance.graphql (-6)

@@ -67,9 +67,6 @@ fragment processInstanceRelationshipResults on EntityRelationshipsResult {
 fragment dataProcessInstanceFields on DataProcessInstance {
     urn
     type
-    platform {
-        ...platformFields
-    }
     parentContainers {
         ...parentContainersFields
     }
@@ -125,9 +122,6 @@ query getDataProcessInstance($urn: String!) {
     dataProcessInstance(urn: $urn) {
         urn
         type
-        platform {
-            ...platformFields
-        }
         parentContainers {
             ...parentContainersFields
         }

docs/how/updating-datahub.md (+2)

@@ -20,6 +20,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

 ### Breaking Changes

+- #12408: The `platform` field in the `DataProcessInstance` GraphQL type is removed. Clients need to retrieve the platform via the optional `dataPlatformInstance` field.
+
 ### Potential Downtime

 ### Deprecations
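For clients affected by this breaking change, a minimal migration sketch in Python follows: it selects the platform through `dataPlatformInstance` instead of the removed top-level `platform` field. The endpoint URL, urn, and token are hypothetical placeholders, not part of this commit.

```python
# Hedged sketch: fetch a DataProcessInstance's platform via the optional
# dataPlatformInstance field. Endpoint, urn, and token are placeholders.
import requests

QUERY = """
query getDataProcessInstancePlatform($urn: String!) {
  dataProcessInstance(urn: $urn) {
    dataPlatformInstance {
      platform { urn name }
    }
  }
}
"""

resp = requests.post(
    "http://localhost:8080/api/graphql",  # assumed DataHub GraphQL endpoint
    json={"query": QUERY, "variables": {"urn": "urn:li:dataProcessInstance:example"}},
    headers={"Authorization": "Bearer <token>"},  # placeholder credential
)
resp.raise_for_status()
# dataPlatformInstance is optional, so guard against a null result.
dpi = resp.json()["data"]["dataProcessInstance"].get("dataPlatformInstance")
print(dpi["platform"] if dpi else "no platform instance set")
```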

metadata-ingestion/docs/sources/redshift/redshift_recipe.yml (+1)

@@ -1,3 +1,4 @@
+source:
     type: redshift
     config:
         # Coordinates

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py (+2, -2)

@@ -2,7 +2,6 @@
 import re
 from base64 import b32decode
 from collections import defaultdict
-from itertools import groupby
 from typing import Dict, Iterable, List, Optional, Set, Type, Union, cast

 from google.cloud.bigquery.table import TableListItem
@@ -101,6 +100,7 @@
 from datahub.metadata.urns import TagUrn
 from datahub.sql_parsing.schema_resolver import SchemaResolver
 from datahub.utilities.file_backed_collections import FileBackedDict
+from datahub.utilities.groupby import groupby_unsorted
 from datahub.utilities.hive_schema_to_avro import (
     HiveColumnToAvroConverter,
     get_schema_fields_for_hive_column,
@@ -730,7 +730,7 @@ def gen_foreign_keys(
         foreign_keys: List[BigqueryTableConstraint] = list(
             filter(lambda x: x.type == "FOREIGN KEY", table.constraints)
         )
-        for key, group in groupby(
+        for key, group in groupby_unsorted(
             foreign_keys,
             lambda x: f"{x.referenced_project_id}.{x.referenced_dataset}.{x.referenced_table_name}",
         ):
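The swap above (repeated in the files that follow) matters because `itertools.groupby` only merges *adjacent* equal keys, and constraint rows are not guaranteed to arrive sorted by referenced table. A short sketch of the failure mode, with made-up keys:

```python
# itertools.groupby splits one logical group apart on unsorted input.
from itertools import groupby

keys = ["proj.ds.t1", "proj.ds.t2", "proj.ds.t1"]
print([(k, list(g)) for k, g in groupby(keys)])
# [('proj.ds.t1', ['proj.ds.t1']), ('proj.ds.t2', ['proj.ds.t2']),
#  ('proj.ds.t1', ['proj.ds.t1'])]  <- 'proj.ds.t1' appears twice
```

With `groupby_unsorted` (added later in this commit), the two `proj.ds.t1` rows land in a single group regardless of input order.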

metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py (+2, -2)

@@ -1,4 +1,3 @@
-import itertools
 import logging
 import re
 from abc import abstractmethod
@@ -111,6 +110,7 @@
     parse_statements_and_pick,
     try_format_query,
 )
+from datahub.utilities.groupby import groupby_unsorted
 from datahub.utilities.lossy_collections import LossyList
 from datahub.utilities.mapping import Constants, OperationProcessor
 from datahub.utilities.time import datetime_to_ts_millis
@@ -1929,7 +1929,7 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
                     else None
                 ),
             )
-            for downstream, upstreams in itertools.groupby(
+            for downstream, upstreams in groupby_unsorted(
                 node.upstream_cll, lambda x: x.downstream_col
             )
         ]

metadata-ingestion/src/datahub/ingestion/source/sql/hive_metastore.py (+3, -3)

@@ -2,7 +2,6 @@
 import json
 import logging
 from collections import namedtuple
-from itertools import groupby
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

 from pydantic.dataclasses import dataclass
@@ -58,6 +57,7 @@
     SubTypesClass,
     ViewPropertiesClass,
 )
+from datahub.utilities.groupby import groupby_unsorted
 from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column
 from datahub.utilities.str_enum import StrEnum

@@ -490,7 +490,7 @@ def loop_tables(

         iter_res = self._alchemy_client.execute_query(statement)

-        for key, group in groupby(iter_res, self._get_table_key):
+        for key, group in groupby_unsorted(iter_res, self._get_table_key):
             schema_name = (
                 f"{db_name}.{key.schema}"
                 if self.config.include_catalog_name_in_ids
@@ -647,7 +647,7 @@ def get_hive_view_columns(self, inspector: Inspector) -> Iterable[ViewDataset]:
         )

         iter_res = self._alchemy_client.execute_query(statement)
-        for key, group in groupby(iter_res, self._get_table_key):
+        for key, group in groupby_unsorted(iter_res, self._get_table_key):
             db_name = self.get_db_name(inspector)

             schema_name = (

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py (+2, -2)

@@ -3,7 +3,6 @@
 from dataclasses import dataclass
 from datetime import datetime
 from functools import lru_cache
-from itertools import groupby
 from typing import (
     Any,
     Dict,
@@ -59,6 +58,7 @@
 from datahub.metadata.schema_classes import SchemaMetadataClass
 from datahub.sql_parsing.schema_resolver import SchemaResolver
 from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage
+from datahub.utilities.groupby import groupby_unsorted

 logger: logging.Logger = logging.getLogger(__name__)

@@ -286,7 +286,7 @@ def grouper(fk_row):

         # TODO: Check if there's a better way
         fk_dicts = list()
-        for constraint_info, constraint_cols in groupby(res, grouper):
+        for constraint_info, constraint_cols in groupby_unsorted(res, grouper):
             fk_dict = {
                 "name": str(constraint_info["name"]),
                 "constrained_columns": list(),

metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py (+3, -3)

@@ -2,7 +2,6 @@
 import dataclasses
 import enum
 import functools
-import itertools
 import json
 import logging
 import os
@@ -63,6 +62,7 @@
     FileBackedDict,
     FileBackedList,
 )
+from datahub.utilities.groupby import groupby_unsorted
 from datahub.utilities.lossy_collections import LossyDict, LossyList
 from datahub.utilities.ordered_set import OrderedSet
 from datahub.utilities.perf_timer import PerfTimer
@@ -1314,8 +1314,8 @@ def _gen_lineage_for_downstream(
         upstream_aspect.fineGrainedLineages = []
         for downstream_column, all_upstream_columns in cll.items():
             # Group by query ID.
-            for query_id, upstream_columns_for_query in itertools.groupby(
-                sorted(all_upstream_columns.items(), key=lambda x: x[1]),
+            for query_id, upstream_columns_for_query in groupby_unsorted(
+                all_upstream_columns.items(),
                 key=lambda x: x[1],
             ):
                 upstream_columns = [x[0] for x in upstream_columns_for_query]
metadata-ingestion/src/datahub/utilities/groupby.py (new file, +17; filename inferred from the `datahub.utilities.groupby` imports above)

@@ -0,0 +1,17 @@
+import collections
+from typing import Callable, Iterable, Tuple, TypeVar
+
+T = TypeVar("T")
+K = TypeVar("K")
+
+
+def groupby_unsorted(
+    iterable: Iterable[T], key: Callable[[T], K]
+) -> Iterable[Tuple[K, Iterable[T]]]:
+    """The default itertools.groupby() requires that the iterable is already sorted by the key.
+    This method is similar to groupby() but without the pre-sorted requirement."""
+
+    values = collections.defaultdict(list)
+    for v in iterable:
+        values[key(v)].append(v)
+    return values.items()
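A quick usage sketch of the new helper. Note that groups come back in first-occurrence order (dict insertion order) rather than sorted key order, which is why the golden lineage files below have their entries reordered:

```python
from datahub.utilities.groupby import groupby_unsorted

rows = [("b", 1), ("a", 2), ("b", 3)]
groups = groupby_unsorted(rows, key=lambda row: row[0])

# Non-adjacent equal keys are merged, and groups follow first occurrence.
assert [(k, list(g)) for k, g in groups] == [
    ("b", [("b", 1), ("b", 3)]),
    ("a", [("a", 2)]),
]
```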

metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json (+4, -4)

@@ -38,26 +38,26 @@
     {
       "upstreamType": "FIELD_SET",
       "upstreams": [
-        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD),a)"
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD),a)"
       ],
       "downstreamType": "FIELD",
       "downstreams": [
         "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.downstream,PROD),a)"
       ],
       "confidenceScore": 0.2,
-      "query": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1"
+      "query": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e"
     },
     {
       "upstreamType": "FIELD_SET",
       "upstreams": [
-        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD),a)"
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD),a)"
       ],
       "downstreamType": "FIELD",
       "downstreams": [
         "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.downstream,PROD),a)"
       ],
       "confidenceScore": 0.2,
-      "query": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e"
+      "query": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1"
     },
     {
       "upstreamType": "FIELD_SET",

metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts_from_temp_tables.json (+8, -8)

@@ -64,26 +64,26 @@
     {
       "upstreamType": "FIELD_SET",
       "upstreams": [
-        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),customer_id)"
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),customer_id)"
      ],
       "downstreamType": "FIELD",
       "downstreams": [
         "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),customer_id)"
       ],
       "confidenceScore": 0.2,
-      "query": "urn:li:query:composite_6b5a11e96e3d2b742e4e4ec3310bb538d0f5c0c6496b84e4bfe0e8014d5f5b45"
+      "query": "urn:li:query:composite_adc1c41c0ad37c643776d9d93d524e6c435a7e70633da1ce7e3222dda4bb9fb8"
     },
     {
       "upstreamType": "FIELD_SET",
       "upstreams": [
-        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),customer_id)"
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),customer_id)"
       ],
       "downstreamType": "FIELD",
       "downstreams": [
         "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),customer_id)"
       ],
       "confidenceScore": 0.2,
-      "query": "urn:li:query:composite_adc1c41c0ad37c643776d9d93d524e6c435a7e70633da1ce7e3222dda4bb9fb8"
+      "query": "urn:li:query:composite_6b5a11e96e3d2b742e4e4ec3310bb538d0f5c0c6496b84e4bfe0e8014d5f5b45"
     },
     {
       "upstreamType": "FIELD_SET",
@@ -100,26 +100,26 @@
     {
       "upstreamType": "FIELD_SET",
       "upstreams": [
-        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),return_date)"
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),return_date)"
       ],
       "downstreamType": "FIELD",
       "downstreams": [
         "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),return_date)"
       ],
       "confidenceScore": 0.2,
-      "query": "urn:li:query:composite_6b5a11e96e3d2b742e4e4ec3310bb538d0f5c0c6496b84e4bfe0e8014d5f5b45"
+      "query": "urn:li:query:composite_adc1c41c0ad37c643776d9d93d524e6c435a7e70633da1ce7e3222dda4bb9fb8"
     },
     {
       "upstreamType": "FIELD_SET",
       "upstreams": [
-        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),return_date)"
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),return_date)"
       ],
       "downstreamType": "FIELD",
       "downstreams": [
         "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),return_date)"
       ],
       "confidenceScore": 0.2,
-      "query": "urn:li:query:composite_adc1c41c0ad37c643776d9d93d524e6c435a7e70633da1ce7e3222dda4bb9fb8"
+      "query": "urn:li:query:composite_6b5a11e96e3d2b742e4e4ec3310bb538d0f5c0c6496b84e4bfe0e8014d5f5b45"
     },
     {
       "upstreamType": "FIELD_SET",

metadata-ingestion/tests/unit/utilities/test_utilities.py (+11)

@@ -6,6 +6,7 @@
 from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage
 from datahub.testing.doctest import assert_doctest
 from datahub.utilities.delayed_iter import delayed_iter
+from datahub.utilities.groupby import groupby_unsorted
 from datahub.utilities.is_pytest import is_pytest_running
 from datahub.utilities.urns.dataset_urn import DatasetUrn

@@ -335,3 +336,13 @@ def test_logging_name_extraction() -> None:

 def test_is_pytest_running() -> None:
     assert is_pytest_running()
+
+
+def test_groupby_unsorted():
+    grouped = groupby_unsorted("ABCAC", key=lambda x: x)
+
+    assert list(grouped) == [
+        ("A", ["A", "A"]),
+        ("B", ["B"]),
+        ("C", ["C", "C"]),
+    ]
