Skip to content

Commit 2b4db14

Browse files
committed
Merge branch 'prototype_sdk' of https://github.com/atlanhq/atlan-python into prototype_sdk
Merging Chris's changes
2 parents 4f055f3 + 8ff8bfd commit 2b4db14

15 files changed

+960
-155
lines changed

pyatlan/cache/__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from pyatlan.cache.role_cache import RoleCache
2+
from pyatlan.cache.classification_cache import ClassificationCache
3+
from pyatlan.cache.custom_metadata_cache import CustomMetadataCache

pyatlan/cache/classification_cache.py

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
2+
from pyatlan.client.atlan import AtlanClient
3+
from pyatlan.client.typedef import TypeDefClient
4+
from pyatlan.model.typedef import ClassificationDef
5+
from pyatlan.model.enums import AtlanTypeCategory
6+
7+
from typing import Optional
8+
9+
class ClassificationCache:
10+
11+
cache_by_id: dict[str, ClassificationDef] = dict()
12+
map_id_to_name: dict[str, str] = dict()
13+
map_name_to_id: dict[str, str] = dict()
14+
deleted_ids: set[str] = set()
15+
deleted_names: set[str] = set()
16+
17+
@classmethod
18+
def _refresh_cache(cls) -> None:
19+
response = TypeDefClient(AtlanClient()).get_typedefs(type=AtlanTypeCategory.CLASSIFICATION)
20+
if response is not None:
21+
cls.cache_by_id = dict()
22+
cls.map_id_to_name = dict()
23+
cls.map_name_to_id = dict()
24+
for classification in response.classification_defs:
25+
classification_id = classification.name
26+
classification_name = classification.display_name
27+
cls.cache_by_id[classification_id] = classification
28+
cls.map_id_to_name[classification_id] = classification_name
29+
cls.map_name_to_id[classification_name] = classification_id
30+
31+
@classmethod
32+
def get_id_for_name(cls, name: str) -> Optional[str]:
33+
"""
34+
Translate the provided human-readable classification name to its Atlan-internal ID string.
35+
"""
36+
cls_id = cls.map_name_to_id.get(name)
37+
if not cls_id and not name in cls.deleted_names:
38+
# If not found, refresh the cache and look again (could be stale)
39+
cls._refresh_cache()
40+
cls_id = cls.map_name_to_id.get(name)
41+
if not cls_id:
42+
# If still not found after refresh, mark it as deleted (could be
43+
# an entry in an audit log that refers to a classification that
44+
# no longer exists)
45+
cls.deleted_names.add(name)
46+
return cls_id
47+
48+
@classmethod
49+
def get_name_for_id(cls, idstr: str) -> Optional[str]:
50+
"""
51+
Translate the provided Atlan-internal classification ID string to the human-readable classification name.
52+
"""
53+
cls_name = cls.map_id_to_name.get(idstr)
54+
if not cls_name and not idstr in cls.deleted_ids:
55+
# If not found, refresh the cache and look again (could be stale)
56+
cls._refresh_cache()
57+
cls_name = cls.map_id_to_name.get(idstr)
58+
if not cls_name:
59+
# If still not found after refresh, mark it as deleted (could be
60+
# an entry in an audit log that refers to a classification that
61+
# no longer exists)
62+
cls.deleted_ids.add(idstr)
63+
return cls_name
+150
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
2+
from pyatlan.client.atlan import AtlanClient
3+
from pyatlan.client.typedef import TypeDefClient
4+
from pyatlan.model.typedef import CustomMetadataDef, AttributeDef
5+
from pyatlan.model.enums import AtlanTypeCategory
6+
from pyatlan.error import LogicError
7+
8+
from typing import Optional
9+
10+
class CustomMetadataCache:
11+
12+
cache_by_id: dict[str, CustomMetadataDef] = dict()
13+
map_id_to_name: dict[str, str] = dict()
14+
map_name_to_id: dict[str, str] = dict()
15+
map_attr_id_to_name: dict[str, dict[str, str]] = dict()
16+
map_attr_name_to_id: dict[str, dict[str, str]] = dict()
17+
archived_attr_ids: dict[str, str] = dict()
18+
19+
@classmethod
20+
def _refresh_cache(cls) -> None:
21+
response = TypeDefClient(AtlanClient()).get_typedefs(type=AtlanTypeCategory.CUSTOM_METADATA)
22+
if response is not None:
23+
cls.cache_by_id = dict()
24+
cls.map_id_to_name = dict()
25+
cls.map_name_to_id = dict()
26+
cls.map_attr_id_to_name = dict()
27+
cls.map_attr_name_to_id = dict()
28+
cls.archived_attr_ids = dict()
29+
for cm in response.custom_metadata_defs:
30+
type_id = cm.name
31+
type_name = cm.display_name
32+
cls.cache_by_id[type_id] = cm
33+
cls.map_id_to_name[type_id] = type_name
34+
cls.map_name_to_id[type_name] = type_id
35+
cls.map_attr_id_to_name[type_id] = dict()
36+
cls.map_attr_name_to_id[type_id] = dict()
37+
if cm.attribute_defs:
38+
for attr in cm.attribute_defs:
39+
attr_id = attr.name
40+
attr_name = attr.display_name
41+
cls.map_attr_id_to_name[type_id][attr_id] = attr_name
42+
if attr.options and attr.options.is_archived:
43+
cls.archived_attr_ids[attr_id] = attr_name
44+
else:
45+
if attr_name in cls.map_attr_name_to_id[type_id]:
46+
raise LogicError(
47+
"Multiple custom attributes with exactly the same name (" + attr_name + ") found for: " + type_name,
48+
code="ATLAN-PYTHON-500-100"
49+
)
50+
cls.map_attr_name_to_id[type_id][attr_name] = attr_id
51+
52+
53+
@classmethod
54+
def get_id_for_name(cls, name: str) -> Optional[str]:
55+
"""
56+
Translate the provided human-readable custom metadata set name to its Atlan-internal ID string.
57+
"""
58+
cm_id = cls.map_name_to_id.get(name)
59+
if cm_id:
60+
return cm_id
61+
else:
62+
# If not found, refresh the cache and look again (could be stale)
63+
cls._refresh_cache()
64+
return cls.map_name_to_id.get(name)
65+
66+
@classmethod
67+
def get_name_for_id(cls, idstr: str) -> Optional[str]:
68+
"""
69+
Translate the provided Atlan-internal custom metadata ID string to the human-readable custom metadata set name.
70+
"""
71+
cm_name = cls.map_id_to_name.get(idstr)
72+
if cm_name:
73+
return cm_name
74+
else:
75+
# If not found, refresh the cache and look again (could be stale)
76+
cls._refresh_cache()
77+
return cls.map_id_to_name.get(idstr)
78+
79+
@classmethod
80+
def get_all_custom_attributes(cls, include_deleted: bool=False, force_refresh: bool=False) -> dict[str, list[AttributeDef]]:
81+
"""
82+
Retrieve all the custom metadata attributes. The map will be keyed by custom metadata set
83+
name, and the value will be a listing of all the attributes within that set (with all the details
84+
of each of those attributes).
85+
"""
86+
if len(cls.cache_by_id) == 0 or force_refresh:
87+
cls._refresh_cache()
88+
m = {}
89+
for type_id, cm in cls.cache_by_id.items():
90+
type_name = cls.get_name_for_id(type_id)
91+
attribute_defs = cm.attribute_defs
92+
if include_deleted:
93+
to_include = attribute_defs
94+
else:
95+
to_include = []
96+
if attribute_defs:
97+
for attr in attribute_defs:
98+
if not attr.options or not attr.options.is_archived:
99+
to_include.append(attr)
100+
m[type_name] = to_include
101+
return m
102+
103+
@classmethod
104+
def get_attr_id_for_name(cls, set_name: str, attr_name: str) -> Optional[str]:
105+
"""
106+
Translate the provided human-readable custom metadata set and attribute names to the Atlan-internal ID string
107+
for the attribute.
108+
"""
109+
attr_id = None
110+
set_id = cls.get_id_for_name(set_name)
111+
if set_id:
112+
sub_map = cls.map_attr_name_to_id.get(set_id)
113+
if sub_map:
114+
attr_id = sub_map.get(attr_name)
115+
if attr_id:
116+
# If found, return straight away
117+
return attr_id
118+
else:
119+
# Otherwise, refresh the cache and look again (could be stale)
120+
cls._refresh_cache()
121+
sub_map = cls.map_attr_name_to_id.get(set_id)
122+
if sub_map:
123+
return sub_map.get(attr_name)
124+
return None
125+
126+
@classmethod
127+
def _get_attributes_for_search_results(cls, set_id: str) -> Optional[list[str]]:
128+
sub_map = cls.map_attr_name_to_id.get(set_id)
129+
if sub_map:
130+
attr_ids = sub_map.values()
131+
dot_names = []
132+
for idstr in attr_ids:
133+
dot_names.append(set_id + "." + idstr)
134+
return dot_names
135+
return None
136+
137+
@classmethod
138+
def get_attributes_for_search_results(cls, set_name: str) -> Optional[list[str]]:
139+
"""
140+
Retrieve the full set of custom attributes to include on search results.
141+
"""
142+
set_id = cls.get_id_for_name(set_name)
143+
if set_id:
144+
dot_names = cls._get_attributes_for_search_results(set_id)
145+
if dot_names:
146+
return dot_names
147+
else:
148+
cls._refresh_cache()
149+
return cls._get_attributes_for_search_results(set_id)
150+
return None

pyatlan/cache/role_cache.py

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
2+
from pyatlan.client.atlan import AtlanClient
3+
from pyatlan.client.role import RoleClient
4+
from pyatlan.model.role import AtlanRole
5+
6+
from typing import Optional
7+
8+
class RoleCache:
9+
10+
cache_by_id: dict[str, AtlanRole] = dict()
11+
map_id_to_name: dict[str, str] = dict()
12+
map_name_to_id: dict[str, str] = dict()
13+
14+
@classmethod
15+
def _refresh_cache(cls) -> None:
16+
response = RoleClient(AtlanClient()).get_all_roles()
17+
if response is not None:
18+
cls.cache_by_id = dict()
19+
cls.map_id_to_name = dict()
20+
cls.map_name_to_id = dict()
21+
for role in response.records:
22+
role_id = role.id
23+
role_name = role.name
24+
cls.cache_by_id[role_id] = role
25+
cls.map_id_to_name[role_id] = role_name
26+
cls.map_name_to_id[role_name] = role_id
27+
28+
@classmethod
29+
def get_id_for_name(cls, name: str) -> Optional[str]:
30+
"""
31+
Translate the provided human-readable role name to its GUID.
32+
"""
33+
role_id = cls.map_name_to_id.get(name)
34+
if role_id:
35+
return role_id
36+
else:
37+
cls._refresh_cache()
38+
return cls.map_name_to_id.get(name)
39+
40+
@classmethod
41+
def get_name_for_id(cls, idstr: str) -> Optional[str]:
42+
"""
43+
Translate the provided role GUID to the human-readable role name.
44+
"""
45+
role_name = cls.map_id_to_name.get(idstr)
46+
if role_name:
47+
return role_name
48+
else:
49+
cls._refresh_cache()
50+
return cls.map_id_to_name.get(idstr)

pyatlan/client/atlan.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def call_api(self, api, query_params=None, request_obj=None):
8585
return None
8686
elif response.status_code == api.expected_status:
8787
try:
88-
if response.content is None:
88+
if response.content is None or response.status_code == HTTPStatus.NO_CONTENT:
8989
return None
9090
if LOGGER.isEnabledFor(logging.DEBUG):
9191
LOGGER.debug(

pyatlan/client/role.py

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#!/usr/bin/env/python
2+
# Copyright 2022 Atlan Pte, Ltd
3+
# Copyright [2015-2021] The Apache Software Foundation
4+
#
5+
# Licensed to the Apache Software Foundation (ASF) under one
6+
# or more contributor license agreements. See the NOTICE file
7+
# distributed with this work for additional information
8+
# regarding copyright ownership. The ASF licenses this file
9+
# to you under the Apache License, Version 2.0 (the
10+
# "License"); you may not use this file except in compliance
11+
# with the License. You may obtain a copy of the License at
12+
#
13+
# http://www.apache.org/licenses/LICENSE-2.0
14+
#
15+
# Unless required by applicable law or agreed to in writing, software
16+
# distributed under the License is distributed on an "AS IS" BASIS,
17+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
# See the License for the specific language governing permissions and
19+
# limitations under the License.
20+
from pyatlan.client.atlan import AtlanClient
21+
from typing import Optional
22+
from pyatlan.model.role import (
23+
RoleResponse
24+
)
25+
from pyatlan.utils import (
26+
API,
27+
ADMIN_URI,
28+
HTTPMethod,
29+
HTTPStatus,
30+
)
31+
32+
33+
class RoleClient:
34+
ROLE_API = ADMIN_URI + "roles"
35+
36+
# Role APIs
37+
GET_ROLES = API(ROLE_API, HTTPMethod.GET, HTTPStatus.OK)
38+
39+
def __init__(self, client: AtlanClient):
40+
self.client = client
41+
42+
43+
def get_roles(self, limit: int, filter: Optional[str]=None, sort: Optional[str]=None, count: bool=True, offset: int=0) -> RoleResponse:
44+
if filter is None:
45+
filter = ""
46+
if sort is None:
47+
sort = ""
48+
query_params = {
49+
"filter": filter,
50+
"sort": sort,
51+
"count": count,
52+
"offset": offset,
53+
"limit": limit
54+
}
55+
raw_json = self.client.call_api(RoleClient.GET_ROLES.format_path_with_params(), query_params)
56+
response = RoleResponse(**raw_json)
57+
return response
58+
59+
60+
def get_all_roles(self) -> RoleResponse:
61+
"""
62+
Retrieve all roles defined in Atlan.
63+
"""
64+
raw_json = self.client.call_api(RoleClient.GET_ROLES.format_path_with_params())
65+
response = RoleResponse(**raw_json)
66+
return response

0 commit comments

Comments
 (0)