23
23
ADD_BUSINESS_ATTRIBUTE_BY_ID ,
24
24
BULK_UPDATE ,
25
25
CREATE_TYPE_DEFS ,
26
+ UPDATE_TYPE_DEFS ,
26
27
DELETE_ENTITY_BY_ATTRIBUTE ,
27
28
DELETE_ENTITY_BY_GUID ,
28
29
DELETE_TYPE_DEF_BY_NAME ,
@@ -122,6 +123,61 @@ def get_session():
122
123
return session
123
124
124
125
126
+ def _build_typdef_request (typedef : TypeDef ) -> TypeDefResponse :
127
+ if isinstance (typedef , ClassificationDef ):
128
+ # Set up the request payload...
129
+ payload = TypeDefResponse (
130
+ classification_defs = [typedef ],
131
+ enum_defs = [],
132
+ struct_defs = [],
133
+ entity_defs = [],
134
+ relationship_defs = [],
135
+ custom_metadata_defs = [],
136
+ )
137
+ elif isinstance (typedef , CustomMetadataDef ):
138
+ # Set up the request payload...
139
+ payload = TypeDefResponse (
140
+ classification_defs = [],
141
+ enum_defs = [],
142
+ struct_defs = [],
143
+ entity_defs = [],
144
+ relationship_defs = [],
145
+ custom_metadata_defs = [typedef ],
146
+ )
147
+ elif isinstance (typedef , EnumDef ):
148
+ # Set up the request payload...
149
+ payload = TypeDefResponse (
150
+ classification_defs = [],
151
+ enum_defs = [typedef ],
152
+ struct_defs = [],
153
+ entity_defs = [],
154
+ relationship_defs = [],
155
+ custom_metadata_defs = [],
156
+ )
157
+ else :
158
+ raise InvalidRequestException (
159
+ "Unable to update type definitions of category: " + typedef .category .value ,
160
+ param = "category" ,
161
+ )
162
+ # Throw an invalid request exception
163
+ return payload
164
+
165
+
166
+ def _refresh_caches (typedef : TypeDef ) -> None :
167
+ if isinstance (typedef , ClassificationDef ):
168
+ from pyatlan .cache .classification_cache import ClassificationCache
169
+
170
+ ClassificationCache .refresh_cache ()
171
+ if isinstance (typedef , CustomMetadataDef ):
172
+ from pyatlan .cache .custom_metadata_cache import CustomMetadataCache
173
+
174
+ CustomMetadataCache .refresh_cache ()
175
+ if isinstance (typedef , EnumDef ):
176
+ from pyatlan .cache .enum_cache import EnumCache
177
+
178
+ EnumCache .refresh_cache ()
179
+
180
+
125
181
class AtlanClient (BaseSettings ):
126
182
_default_client : "ClassVar[Optional[AtlanClient]]" = None
127
183
base_url : HttpUrl
@@ -409,6 +465,44 @@ def upsert(
409
465
raw_json = self ._call_api (BULK_UPDATE , query_params , request )
410
466
return AssetMutationResponse (** raw_json )
411
467
468
+ def upsert_merging_cm (
469
+ self , entity : Union [Asset , list [Asset ]], replace_classifications : bool = False
470
+ ) -> AssetMutationResponse :
471
+ query_params = {
472
+ "replaceClassifications" : replace_classifications ,
473
+ "replaceBusinessAttributes" : True ,
474
+ "overwriteBusinessAttributes" : False ,
475
+ }
476
+ entities : list [Asset ] = []
477
+ if isinstance (entity , list ):
478
+ entities .extend (entity )
479
+ else :
480
+ entities .append (entity )
481
+ for asset in entities :
482
+ asset .validate_required ()
483
+ request = BulkRequest [Asset ](entities = entities )
484
+ raw_json = self ._call_api (BULK_UPDATE , query_params , request )
485
+ return AssetMutationResponse (** raw_json )
486
+
487
+ def upsert_replacing_cm (
488
+ self , entity : Union [Asset , list [Asset ]], replace_classifications : bool = False
489
+ ) -> AssetMutationResponse :
490
+ query_params = {
491
+ "replaceClassifications" : replace_classifications ,
492
+ "replaceBusinessAttributes" : True ,
493
+ "overwriteBusinessAttributes" : True ,
494
+ }
495
+ entities : list [Asset ] = []
496
+ if isinstance (entity , list ):
497
+ entities .extend (entity )
498
+ else :
499
+ entities .append (entity )
500
+ for asset in entities :
501
+ asset .validate_required ()
502
+ request = BulkRequest [Asset ](entities = entities )
503
+ raw_json = self ._call_api (BULK_UPDATE , query_params , request )
504
+ return AssetMutationResponse (** raw_json )
505
+
412
506
def purge_entity_by_guid (self , guid ) -> AssetMutationResponse :
413
507
raw_json = self ._call_api (
414
508
DELETE_ENTITY_BY_GUID .format_path_with_params (guid ),
@@ -455,54 +549,19 @@ def get_typedefs(self, type_category: AtlanTypeCategory) -> TypeDefResponse:
455
549
return TypeDefResponse (** raw_json )
456
550
457
551
def create_typedef (self , typedef : TypeDef ) -> TypeDefResponse :
458
- if isinstance (typedef , ClassificationDef ):
459
- # Set up the request payload...
460
- payload = TypeDefResponse (
461
- classification_defs = [typedef ],
462
- enum_defs = [],
463
- struct_defs = [],
464
- entity_defs = [],
465
- relationship_defs = [],
466
- custom_metadata_defs = [],
467
- )
468
- elif isinstance (typedef , CustomMetadataDef ):
469
- # Set up the request payload...
470
- payload = TypeDefResponse (
471
- classification_defs = [],
472
- enum_defs = [],
473
- struct_defs = [],
474
- entity_defs = [],
475
- relationship_defs = [],
476
- custom_metadata_defs = [typedef ],
477
- )
478
- elif isinstance (typedef , EnumDef ):
479
- # Set up the request payload...
480
- payload = TypeDefResponse (
481
- classification_defs = [],
482
- enum_defs = [typedef ],
483
- struct_defs = [],
484
- entity_defs = [],
485
- relationship_defs = [],
486
- custom_metadata_defs = [],
487
- )
488
- else :
489
- raise InvalidRequestException (
490
- "Unable to create new type definitions of category: "
491
- + typedef .category .value ,
492
- param = "category" ,
493
- )
494
- # Throw an invalid request exception
552
+ payload = _build_typdef_request (typedef )
495
553
raw_json = self ._call_api (
496
- CREATE_TYPE_DEFS , request_obj = payload , exclude_unset = False
554
+ CREATE_TYPE_DEFS , request_obj = payload , exclude_unset = True
497
555
)
498
- if isinstance (typedef , ClassificationDef ):
499
- from pyatlan .cache .classification_cache import ClassificationCache
500
-
501
- ClassificationCache .refresh_cache ()
502
- if isinstance (typedef , CustomMetadataDef ):
503
- from pyatlan .cache .custom_metadata_cache import CustomMetadataCache
556
+ _refresh_caches (typedef )
557
+ return TypeDefResponse (** raw_json )
504
558
505
- CustomMetadataCache .refresh_cache ()
559
+ def update_typedef (self , typedef : TypeDef ) -> TypeDefResponse :
560
+ payload = _build_typdef_request (typedef )
561
+ raw_json = self ._call_api (
562
+ UPDATE_TYPE_DEFS , request_obj = payload , exclude_unset = True
563
+ )
564
+ _refresh_caches (typedef )
506
565
return TypeDefResponse (** raw_json )
507
566
508
567
def purge_typedef (self , internal_name : str ) -> None :
@@ -511,9 +570,11 @@ def purge_typedef(self, internal_name: str) -> None:
511
570
# to refresh that particular cache
512
571
from pyatlan .cache .classification_cache import ClassificationCache
513
572
from pyatlan .cache .custom_metadata_cache import CustomMetadataCache
573
+ from pyatlan .cache .enum_cache import EnumCache
514
574
515
575
ClassificationCache .refresh_cache ()
516
576
CustomMetadataCache .refresh_cache ()
577
+ EnumCache .refresh_cache ()
517
578
518
579
@validate_arguments ()
519
580
def add_classifications (
@@ -628,8 +689,35 @@ def remove_announcement(
628
689
asset .remove_announcement ()
629
690
return self ._update_asset_by_attribute (asset , asset_type , qualified_name )
630
691
692
+ def update_custom_metadata_attributes (
693
+ self , guid : str , custom_metadata : CustomMetadata
694
+ ):
695
+ custom_metadata_request = CustomMetadataReqest (__root__ = custom_metadata )
696
+ self ._call_api (
697
+ ADD_BUSINESS_ATTRIBUTE_BY_ID .format_path (
698
+ {"entity_guid" : guid , "bm_id" : custom_metadata ._meta_data_type_id }
699
+ ),
700
+ None ,
701
+ custom_metadata_request ,
702
+ )
703
+
631
704
def replace_custom_metadata (self , guid : str , custom_metadata : CustomMetadata ):
632
- # TODO: This endpoint is not currently functioning correctly on the server
705
+ from pyatlan .cache .custom_metadata_cache import CustomMetadataCache
706
+
707
+ # Iterate through the custom metadata provided and explicitly set every
708
+ # single attribute, so that they are all serialized out (forcing removal
709
+ # of any empty ones)
710
+ for k , v in custom_metadata .items ():
711
+ # Need to translate the hashed-string key here back to an attribute name
712
+ attr_name = str (
713
+ CustomMetadataCache .get_attr_name_for_id (
714
+ set_id = custom_metadata ._meta_data_type_id , attr_id = k
715
+ )
716
+ )
717
+ if not v :
718
+ setattr (custom_metadata , attr_name , None )
719
+ else :
720
+ setattr (custom_metadata , attr_name , v )
633
721
custom_metadata_request = CustomMetadataReqest (__root__ = custom_metadata )
634
722
self ._call_api (
635
723
ADD_BUSINESS_ATTRIBUTE_BY_ID .format_path (
@@ -639,6 +727,24 @@ def replace_custom_metadata(self, guid: str, custom_metadata: CustomMetadata):
639
727
custom_metadata_request ,
640
728
)
641
729
730
+ def remove_custom_metadata (self , guid : str , cm_name : str ):
731
+ from pyatlan .cache .custom_metadata_cache import CustomMetadataCache
732
+
733
+ # Ensure the custom metadata exists first - let this throw an error if not
734
+ if cm_id := CustomMetadataCache .get_id_for_name (cm_name ):
735
+ # Initialize a dict of empty attributes for the custom metadata, and then
736
+ # send that so that they are removed accordingly
737
+ if cm_type := CustomMetadataCache .get_type_for_id (cm_id ):
738
+ custom_metadata = cm_type ()
739
+ custom_metadata_request = CustomMetadataReqest (__root__ = custom_metadata )
740
+ self ._call_api (
741
+ ADD_BUSINESS_ATTRIBUTE_BY_ID .format_path (
742
+ {"entity_guid" : guid , "bm_id" : cm_id }
743
+ ),
744
+ None ,
745
+ custom_metadata_request ,
746
+ )
747
+
642
748
@validate_arguments ()
643
749
def append_terms (
644
750
self ,
0 commit comments