Skip to content

Commit 7587356

Browse files
authored
Merge pull request #51 from atlanhq/get_lineage
Get lineage
2 parents e682638 + d2f81b3 commit 7587356

File tree

10 files changed

+5782
-4
lines changed

10 files changed

+5782
-4
lines changed

HISTORY.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1+
## 0.0.31 (May 15, 2023)
2+
3+
* Added the following classes to support lineage retrieval
4+
* LineageRelation
5+
* DirectedPair
6+
* LineageGraph
7+
* LineageResponse
8+
* Added the get_lineage method to AtlanClient
9+
* Modify create_typdef in client to handle creating EnumDef
10+
* Refresh caches on any SDK-driven creates or deletes
11+
112
## 0.0.30 (May 11, 2023)
213

314
* Fix problem where custom metadata created via the SDK failed to show up in the UI

pyatlan/client/atlan.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
GET_ALL_TYPE_DEFS,
3030
GET_ENTITY_BY_GUID,
3131
GET_ENTITY_BY_UNIQUE_ATTRIBUTE,
32+
GET_LINEAGE,
3233
GET_ROLES,
3334
INDEX_SEARCH,
3435
PARTIAL_UPDATE_ENTITY_BY_ATTRIBUTE,
@@ -67,12 +68,14 @@
6768
AtlanTypeCategory,
6869
CertificateStatus,
6970
)
71+
from pyatlan.model.lineage import LineageRequest, LineageResponse
7072
from pyatlan.model.response import AssetMutationResponse
7173
from pyatlan.model.role import RoleResponse
7274
from pyatlan.model.search import DSL, IndexSearchRequest, Term
7375
from pyatlan.model.typedef import (
7476
ClassificationDef,
7577
CustomMetadataDef,
78+
EnumDef,
7679
TypeDef,
7780
TypeDefResponse,
7881
)
@@ -472,6 +475,16 @@ def create_typedef(self, typedef: TypeDef) -> TypeDefResponse:
472475
relationship_defs=[],
473476
custom_metadata_defs=[typedef],
474477
)
478+
elif isinstance(typedef, EnumDef):
479+
# Set up the request payload...
480+
payload = TypeDefResponse(
481+
classification_defs=[],
482+
enum_defs=[typedef],
483+
struct_defs=[],
484+
entity_defs=[],
485+
relationship_defs=[],
486+
custom_metadata_defs=[],
487+
)
475488
else:
476489
raise InvalidRequestException(
477490
"Unable to create new type definitions of category: "
@@ -725,8 +738,10 @@ def find_connections_by_name(
725738
self,
726739
name: str,
727740
connector_type: AtlanConnectorType,
728-
attributes: list[str] = None,
741+
attributes: Optional[list[str]] = None,
729742
) -> list[Connection]:
743+
if attributes is None:
744+
attributes = []
730745
query = (
731746
Term.with_state("ACTIVE")
732747
+ Term.with_type_name("CONNECTION")
@@ -740,3 +755,9 @@ def find_connections_by_name(
740755
)
741756
results = self.search(search_request)
742757
return [asset for asset in results if isinstance(asset, Connection)]
758+
759+
def get_lineage(self, lineage_request: LineageRequest) -> LineageResponse:
760+
raw_json = self._call_api(
761+
GET_LINEAGE, None, lineage_request, exclude_unset=False
762+
)
763+
return LineageResponse(**raw_json)

pyatlan/client/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
BULK_HEADERS = "bulk/headers"
2828

2929
BULK_UPDATE = API(ENTITY_BULK_API, HTTPMethod.POST, HTTPStatus.OK)
30+
# Lineage APIs
31+
GET_LINEAGE = API(f"{BASE_URI}lineage/getlineage", HTTPMethod.POST, HTTPStatus.OK)
3032
# Entity APIs
3133
GET_ENTITY_BY_GUID = API(f"{ENTITY_API}guid", HTTPMethod.GET, HTTPStatus.OK)
3234
GET_ENTITY_BY_UNIQUE_ATTRIBUTE = API(

pyatlan/model/enums.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,3 +256,9 @@ def to_qualified_name(self):
256256
class SortOrder(str, Enum):
257257
ASCENDING = "asc"
258258
DESCENDING = "desc"
259+
260+
261+
class LineageDirection(str, Enum):
262+
UPSTREAM = "INPUT"
263+
DOWNSTREAM = "OUTPUT"
264+
BOTH = "BOTH"

pyatlan/model/lineage.py

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
# Copyright 2022 Atlan Pte. Ltd.
3+
# Based on original code from https://github.com/apache/atlas (under Apache-2.0 license)
4+
from collections import deque
5+
from typing import TYPE_CHECKING, Any, Optional
6+
7+
from pydantic import Field
8+
9+
if TYPE_CHECKING:
10+
from dataclasses import dataclass
11+
else:
12+
from pydantic.dataclasses import dataclass
13+
14+
from pyatlan.error import InvalidRequestError
15+
from pyatlan.model.assets import Asset
16+
from pyatlan.model.core import AtlanObject
17+
from pyatlan.model.enums import LineageDirection
18+
19+
20+
class LineageRelation(AtlanObject):
21+
from_entity_id: Optional[str]
22+
to_entity_id: Optional[str]
23+
process_id: Optional[str]
24+
relationship_id: Optional[str]
25+
26+
@property
27+
def is_full_link(self):
28+
return self.process_id is not None
29+
30+
31+
@dataclass(frozen=True)
32+
class DirectedPair:
33+
process_guid: str
34+
target_guid: str
35+
36+
37+
@dataclass(frozen=True)
38+
class LineageGraph:
39+
downstream_list: dict[str, dict[DirectedPair, None]]
40+
upstream_list: dict[str, dict[DirectedPair, None]]
41+
42+
@classmethod
43+
def create(cls, relations: list[LineageRelation]) -> "LineageGraph":
44+
downstream_list: dict[str, dict[DirectedPair, None]] = {}
45+
upstream_list: dict[str, dict[DirectedPair, None]] = {}
46+
47+
def add_relation(relation: LineageRelation):
48+
if (
49+
relation.from_entity_id
50+
and relation.process_id
51+
and relation.to_entity_id
52+
):
53+
add_edges(
54+
relation.from_entity_id, relation.process_id, relation.to_entity_id
55+
)
56+
57+
def add_edges(source_guid: str, process_guid: str, target_guid: str):
58+
if source_guid not in downstream_list:
59+
downstream_list[source_guid] = {}
60+
if target_guid not in upstream_list:
61+
upstream_list[target_guid] = {}
62+
downstream_list[source_guid][
63+
DirectedPair(process_guid=process_guid, target_guid=target_guid)
64+
] = None
65+
upstream_list[target_guid][
66+
DirectedPair(process_guid=process_guid, target_guid=source_guid)
67+
] = None
68+
69+
for relation in relations:
70+
if relation.is_full_link:
71+
add_relation(relation)
72+
else:
73+
raise InvalidRequestError(
74+
param="",
75+
code="ATLAN-JAVA-400-013",
76+
message="Lineage was retrieved using hideProces=false. "
77+
"We do not provide a graph view in this case.",
78+
)
79+
return cls(downstream_list=downstream_list, upstream_list=upstream_list)
80+
81+
@staticmethod
82+
def get_asset_guids(
83+
guid: str, guids: dict[str, dict[DirectedPair, None]]
84+
) -> list[str]:
85+
if guid in guids:
86+
return list({pair.target_guid: None for pair in guids[guid].keys()}.keys())
87+
return []
88+
89+
@staticmethod
90+
def get_process_guids(
91+
guid: str, guids: dict[str, dict[DirectedPair, None]]
92+
) -> list[str]:
93+
if guid in guids:
94+
return list({pair.process_guid: None for pair in guids[guid].keys()}.keys())
95+
return []
96+
97+
def get_downstream_asset_guids(self, guid: str) -> list[str]:
98+
return LineageGraph.get_asset_guids(guid, self.downstream_list)
99+
100+
def get_downstream_process_guids(self, guid: str) -> list[str]:
101+
return LineageGraph.get_process_guids(guid, self.downstream_list)
102+
103+
def get_upstream_asset_guids(self, guid: str) -> list[str]:
104+
return LineageGraph.get_asset_guids(guid, self.upstream_list)
105+
106+
def get_upstream_process_guids(self, guid: str) -> list[str]:
107+
return LineageGraph.get_process_guids(guid, self.upstream_list)
108+
109+
def get_all_downstream_asset_guids_dfs(self, guid: str) -> list[str]:
110+
visited: dict[str, None] = {}
111+
stack: deque[str] = deque()
112+
stack.append(guid)
113+
while len(stack) > 0:
114+
to_traverse = stack.pop()
115+
if to_traverse not in visited:
116+
visited[to_traverse] = None
117+
for downstream_guid in self.get_downstream_asset_guids(to_traverse):
118+
if downstream_guid not in visited:
119+
stack.append(downstream_guid)
120+
return list(visited.keys())
121+
122+
def get_all_upstream_asset_guids_dfs(self, guid: str) -> list[str]:
123+
visited: dict[str, None] = {}
124+
stack: deque[str] = deque()
125+
stack.append(guid)
126+
while len(stack) > 0:
127+
to_traverse = stack.pop()
128+
if to_traverse not in visited:
129+
visited[to_traverse] = None
130+
for upstream_guid in self.get_upstream_asset_guids(to_traverse):
131+
if upstream_guid not in visited:
132+
stack.append(upstream_guid)
133+
return list(visited.keys())
134+
135+
136+
class LineageResponse(AtlanObject):
137+
base_entity_guid: str
138+
lineage_direction: LineageDirection
139+
lineage_depth: int
140+
limit: int
141+
offset: int
142+
has_more_upstream_vertices: bool
143+
has_more_downstream_vertices: bool
144+
guid_entity_map: dict[str, Asset]
145+
relations: list[LineageRelation]
146+
vertex_children_info: Optional[dict[str, Any]]
147+
graph: Optional[LineageGraph] = None
148+
149+
def get_graph(self):
150+
if self.graph is None:
151+
self.graph = LineageGraph.create(self.relations)
152+
return self.graph
153+
154+
def get_all_downstream_asset_guids_dfs(
155+
self, guid: Optional[str] = None
156+
) -> list[str]:
157+
return self.get_graph().get_all_downstream_asset_guids_dfs(
158+
guid if guid else self.base_entity_guid
159+
)
160+
161+
def get_all_downstream_assets_dfs(self, guid: Optional[str] = None) -> list[Asset]:
162+
return [
163+
self.guid_entity_map[guid]
164+
for guid in self.get_graph().get_all_downstream_asset_guids_dfs(
165+
guid if guid else self.base_entity_guid
166+
)
167+
]
168+
169+
def get_all_upstream_asset_guids_dfs(self, guid: Optional[str] = None) -> list[str]:
170+
return self.get_graph().get_all_upstream_asset_guids_dfs(
171+
guid if guid else self.base_entity_guid
172+
)
173+
174+
def get_all_upstream_assets_dfs(self, guid: Optional[str] = None) -> list[Asset]:
175+
return [
176+
self.guid_entity_map[guid]
177+
for guid in self.get_graph().get_all_upstream_asset_guids_dfs(
178+
guid if guid else self.base_entity_guid
179+
)
180+
]
181+
182+
def get_downstream_asset_guids(self, guid: Optional[str] = None) -> list[str]:
183+
return self.get_graph().get_downstream_asset_guids(
184+
guid if guid else self.base_entity_guid
185+
)
186+
187+
def get_downstream_assets(self, guid: Optional[str] = None) -> list[Asset]:
188+
return [
189+
self.guid_entity_map[guid]
190+
for guid in self.get_graph().get_downstream_asset_guids(
191+
guid if guid else self.base_entity_guid
192+
)
193+
]
194+
195+
def get_downstream_process_guids(self, guid: Optional[str] = None) -> list[str]:
196+
return self.get_graph().get_downstream_process_guids(
197+
guid if guid else self.base_entity_guid
198+
)
199+
200+
def get_upstream_asset_guids(self, guid: Optional[str] = None) -> list[str]:
201+
return self.get_graph().get_upstream_asset_guids(
202+
guid if guid else self.base_entity_guid
203+
)
204+
205+
def get_upstream_assets(self, guid: Optional[str] = None) -> list[Asset]:
206+
return [
207+
self.guid_entity_map[guid]
208+
for guid in self.get_graph().get_upstream_asset_guids(
209+
guid if guid else self.base_entity_guid
210+
)
211+
]
212+
213+
def get_upstream_process_guids(self, guid: Optional[str] = None) -> list[str]:
214+
return self.get_graph().get_upstream_process_guids(
215+
guid if guid else self.base_entity_guid
216+
)
217+
218+
219+
class LineageRequest(AtlanObject):
220+
guid: str
221+
depth: int = Field(default=0)
222+
direction: LineageDirection = Field(default=LineageDirection.BOTH)
223+
hide_process: bool = Field(default=True)
224+
allow_deleted_process: bool = Field(default=False)

pyatlan/model/typedef.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ class Options(AtlanObject):
131131
is_archived: Optional[bool] = Field(
132132
None,
133133
description="Whether the attribute has been deleted (true) or is still active (false).\n",
134-
example=True
134+
example=True,
135135
)
136136
archived_at: Optional[int] = Field(
137137
None, description="When the attribute was deleted.\n"
@@ -148,7 +148,7 @@ class Options(AtlanObject):
148148
is_new: Optional[bool] = Field(
149149
True,
150150
description="Whether the attribute is being newly created (true) or not (false).",
151-
example=True
151+
example=True,
152152
)
153153
cardinality: Optional[Cardinality] = Field(
154154
"SINGLE",

pyatlan/version.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.0.30
1+
0.0.31

tests/integration/test_client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from pyatlan.client.atlan import AtlanClient
77
from pyatlan.model.assets import AtlasGlossary, AtlasGlossaryTerm, Connection, Database
88
from pyatlan.model.enums import AtlanConnectorType
9+
from pyatlan.model.lineage import LineageRequest
910

1011
iter_count = count(1)
1112

@@ -230,3 +231,11 @@ def test_find_connections_by_name(client: AtlanClient):
230231
)
231232
assert len(connections) == 1
232233
assert connections[0].connector_name == AtlanConnectorType.SNOWFLAKE.value
234+
235+
236+
def test_get_lineage(client: AtlanClient):
237+
response = client.get_lineage(
238+
LineageRequest(guid="75474eab-3105-4ef9-9f84-709e386a7d3e")
239+
)
240+
for guid, asset in response.guid_entity_map.items():
241+
assert guid == asset.guid

0 commit comments

Comments
 (0)