Skip to content

Commit bd5e405

Browse files
authored
Merge pull request #44 from atlanhq/create_lineage
Add create method to Process class
2 parents 612049f + 30e2a82 commit bd5e405

File tree

6 files changed

+442
-3
lines changed

6 files changed

+442
-3
lines changed

HISTORY.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 0.0.27 (May 8, 2023)
2+
3+
* Add a create method to Process
4+
15
## 0.0.26 (May 4, 2023)
26

37
* Add remove_terms method to AtlanClient

pyatlan/generator/templates/entity.jinja2

+91-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from __future__ import annotations
55

66
import sys
7+
from io import StringIO
8+
import hashlib
79
from datetime import datetime
810
from typing import Any, ClassVar, Dict, List, Optional, TypeVar
911

@@ -65,12 +67,14 @@ def validate_single_required_field(field_names: list[str], values: list[Any]):
6567
f"Only one of the following parameters are allowed: {', '.join(names)}"
6668
)
6769

68-
def validate_required_fields(field_names:list[str], values:list[Any]):
69-
for field_name, value in zip(field_names, values):
70+
def validate_required_fields(field_names: list[str], values: list[Any]):
71+
for field_name, value in zip(field_names, values):
7072
if value is None:
7173
raise ValueError(f"{field_name} is required")
7274
if isinstance(value, str) and not value.strip():
7375
raise ValueError(f"{field_name} cannot be blank")
76+
if isinstance(value, list) and len(value) == 0:
77+
raise ValueError(f"{field_name} cannot be an empty list")
7478
{%- macro gen_properties(attribute_defs, additional_names=[]) %}
7579
_convience_properties: ClassVar[list[str]] = [
7680
{%- for attribute_def in attribute_defs %}
@@ -455,6 +459,70 @@ class {{ entity_def.name }}({{super_classes[0]}} {%- if "Asset" in super_classes
455459
raise ValueError(
456460
"One of admin_user, admin_groups or admin_roles is required"
457461
)
462+
{%- elif entity_def.name == "Process" %}
463+
@staticmethod
464+
def generate_qualified_name(
465+
name: str,
466+
connection_qualified_name: str,
467+
inputs: list["Catalog"],
468+
outputs: list["Catalog"],
469+
parent: Optional["Process"] = None,
470+
process_id: Optional[str] = None,
471+
) -> str:
472+
def append_relationship(output: StringIO, relationship: Asset):
473+
if relationship.guid:
474+
output.write(relationship.guid)
475+
476+
def append_relationships(output: StringIO, relationships: list["Catalog"]):
477+
for catalog in relationships:
478+
append_relationship(output, catalog)
479+
480+
validate_required_fields(
481+
["name", "connection_qualified_name", "inputs", "outputs"],
482+
[name, connection_qualified_name, inputs, outputs],
483+
)
484+
if process_id and process_id.strip():
485+
return f"{connection_qualified_name}/{process_id}"
486+
buffer = StringIO()
487+
buffer.write(name)
488+
buffer.write(connection_qualified_name)
489+
if parent:
490+
append_relationship(buffer, parent)
491+
append_relationships(buffer, inputs)
492+
append_relationships(buffer, outputs)
493+
ret_value = hashlib.md5(
494+
buffer.getvalue().encode(), usedforsecurity=False
495+
).hexdigest()
496+
buffer.close()
497+
return ret_value
498+
499+
@classmethod
500+
def create(
501+
cls,
502+
name: str,
503+
connection_qualified_name: str,
504+
inputs: list["Catalog"],
505+
outputs: list["Catalog"],
506+
process_id: Optional[str] = None,
507+
parent: Optional[Process] = None,
508+
) -> Process.Attributes:
509+
qualified_name = Process.Attributes.generate_qualified_name(
510+
name=name,
511+
connection_qualified_name=connection_qualified_name,
512+
process_id=process_id,
513+
inputs=inputs,
514+
outputs=outputs,
515+
parent=parent,
516+
)
517+
connector_name = connection_qualified_name.split("/")[1]
518+
return Process.Attributes(
519+
name=name,
520+
qualified_name=qualified_name,
521+
connector_name=connector_name,
522+
connection_qualified_name=connection_qualified_name,
523+
inputs=inputs,
524+
outputs=outputs,
525+
)
458526
{%- elif entity_def.name == "Readme" %}
459527
@classmethod
460528
# @validate_arguments()
@@ -755,6 +823,27 @@ class {{ entity_def.name }}({{super_classes[0]}} {%- if "Asset" in super_classes
755823
raise ValueError(
756824
"One of admin_user, admin_groups or admin_roles is required"
757825
)
826+
{%- elif entity_def.name == "Process" %}
827+
@classmethod
828+
def create(
829+
cls,
830+
name: str,
831+
connection_qualified_name: str,
832+
inputs: list["Catalog"],
833+
outputs: list["Catalog"],
834+
process_id: Optional[str] = None,
835+
parent: Optional[Process] = None,
836+
) -> Process:
837+
return Process(
838+
attributes=Process.Attributes.create(
839+
name=name,
840+
connection_qualified_name=connection_qualified_name,
841+
process_id=process_id,
842+
inputs=inputs,
843+
outputs=outputs,
844+
parent=parent,
845+
)
846+
)
758847
{%- elif entity_def.name == "Database" %}
759848
@classmethod
760849
# @validate_arguments()

pyatlan/model/assets.py

+89
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
# Based on original code from https://github.com/apache/atlas (under Apache-2.0 license)
44
from __future__ import annotations
55

6+
import hashlib
67
import sys
78
from datetime import datetime
9+
from io import StringIO
810
from typing import Any, ClassVar, Dict, List, Optional, TypeVar
911
from urllib.parse import quote, unquote
1012

@@ -79,6 +81,8 @@ def validate_required_fields(field_names: list[str], values: list[Any]):
7981
raise ValueError(f"{field_name} is required")
8082
if isinstance(value, str) and not value.strip():
8183
raise ValueError(f"{field_name} cannot be blank")
84+
if isinstance(value, list) and len(value) == 0:
85+
raise ValueError(f"{field_name} cannot be an empty list")
8286

8387

8488
SelfAsset = TypeVar("SelfAsset", bound="Asset")
@@ -3081,12 +3085,97 @@ class Attributes(Asset.Attributes):
30813085
None, description="", alias="meanings"
30823086
) # relationship
30833087

3088+
@staticmethod
3089+
def generate_qualified_name(
3090+
name: str,
3091+
connection_qualified_name: str,
3092+
inputs: list["Catalog"],
3093+
outputs: list["Catalog"],
3094+
parent: Optional["Process"] = None,
3095+
process_id: Optional[str] = None,
3096+
) -> str:
3097+
def append_relationship(output: StringIO, relationship: Asset):
3098+
if relationship.guid:
3099+
output.write(relationship.guid)
3100+
3101+
def append_relationships(output: StringIO, relationships: list["Catalog"]):
3102+
for catalog in relationships:
3103+
append_relationship(output, catalog)
3104+
3105+
validate_required_fields(
3106+
["name", "connection_qualified_name", "inputs", "outputs"],
3107+
[name, connection_qualified_name, inputs, outputs],
3108+
)
3109+
if process_id and process_id.strip():
3110+
return f"{connection_qualified_name}/{process_id}"
3111+
buffer = StringIO()
3112+
buffer.write(name)
3113+
buffer.write(connection_qualified_name)
3114+
if parent:
3115+
append_relationship(buffer, parent)
3116+
append_relationships(buffer, inputs)
3117+
append_relationships(buffer, outputs)
3118+
ret_value = hashlib.md5(
3119+
buffer.getvalue().encode(), usedforsecurity=False
3120+
).hexdigest()
3121+
buffer.close()
3122+
return ret_value
3123+
3124+
@classmethod
3125+
def create(
3126+
cls,
3127+
name: str,
3128+
connection_qualified_name: str,
3129+
inputs: list["Catalog"],
3130+
outputs: list["Catalog"],
3131+
process_id: Optional[str] = None,
3132+
parent: Optional[Process] = None,
3133+
) -> Process.Attributes:
3134+
qualified_name = Process.Attributes.generate_qualified_name(
3135+
name=name,
3136+
connection_qualified_name=connection_qualified_name,
3137+
process_id=process_id,
3138+
inputs=inputs,
3139+
outputs=outputs,
3140+
parent=parent,
3141+
)
3142+
connector_name = connection_qualified_name.split("/")[1]
3143+
return Process.Attributes(
3144+
name=name,
3145+
qualified_name=qualified_name,
3146+
connector_name=connector_name,
3147+
connection_qualified_name=connection_qualified_name,
3148+
inputs=inputs,
3149+
outputs=outputs,
3150+
)
3151+
30843152
attributes: "Process.Attributes" = Field(
30853153
None,
30863154
description="Map of attributes in the instance and their values. The specific keys of this map will vary by "
30873155
"type, so are described in the sub-types of this schema.\n",
30883156
)
30893157

3158+
@classmethod
3159+
def create(
3160+
cls,
3161+
name: str,
3162+
connection_qualified_name: str,
3163+
inputs: list["Catalog"],
3164+
outputs: list["Catalog"],
3165+
process_id: Optional[str] = None,
3166+
parent: Optional[Process] = None,
3167+
) -> Process:
3168+
return Process(
3169+
attributes=Process.Attributes.create(
3170+
name=name,
3171+
connection_qualified_name=connection_qualified_name,
3172+
process_id=process_id,
3173+
inputs=inputs,
3174+
outputs=outputs,
3175+
parent=parent,
3176+
)
3177+
)
3178+
30903179

30913180
class AtlasGlossaryCategory(Asset, type_name="AtlasGlossaryCategory"):
30923181
"""Description"""

pyatlan/version.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.0.26
1+
0.0.27

tests/integration/test_entity_model.py

+50
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import os
44
import random
55
import string
6+
from typing import Callable, Generator
67

78
import pytest
89
import requests
@@ -18,7 +19,9 @@
1819
Column,
1920
Connection,
2021
Database,
22+
Process,
2123
Readme,
24+
S3Object,
2225
Schema,
2326
Table,
2427
View,
@@ -43,6 +46,7 @@
4346
"57f5463d-cc2a-4859-bf28-e4fa52002e8e",
4447
}
4548
TEMP_CONNECTION_GUID = "b3a5c49a-0c7c-4e66-8453-f4da8d9ce222"
49+
S3_CONNECTION_GUID = "25f2dc21-cd53-47fe-bbed-be10759d087a"
4650

4751

4852
@pytest.fixture(scope="module")
@@ -189,6 +193,7 @@ def cleanup(atlan_host, headers, atlan_api_key):
189193
"Connection",
190194
"View",
191195
"Column",
196+
"Process",
192197
"Readme",
193198
]
194199
for type_name in type_names:
@@ -825,3 +830,48 @@ def test_create_readme(client: AtlanClient):
825830
assert len(reaadmes) == 1
826831
assert (glossaries := response.assets_updated(asset_type=AtlasGlossary))
827832
assert len(glossaries) == 1
833+
834+
835+
@pytest.fixture()
836+
def make_s3_object(
837+
client: AtlanClient,
838+
) -> Generator[Callable[[str], S3Object], None, None]:
839+
created_guids = []
840+
connection = client.get_asset_by_guid(S3_CONNECTION_GUID, Connection)
841+
842+
def _make_s3_object(name: str) -> S3Object:
843+
s3_object = S3Object.create(
844+
connection_qualified_name=connection.qualified_name,
845+
name=name,
846+
aws_arn=f"arn:aws:s3:::{name}",
847+
)
848+
s3_object = client.upsert(s3_object).assets_created(S3Object)[0]
849+
created_guids.append(s3_object.guid)
850+
return s3_object
851+
852+
yield _make_s3_object
853+
854+
for guid in created_guids:
855+
client.purge_entity_by_guid(guid=guid)
856+
857+
858+
def test_process_create(client: AtlanClient, make_s3_object: Callable[[str], S3Object]):
859+
connection = client.get_asset_by_guid(S3_CONNECTION_GUID, Connection)
860+
861+
input_object = make_s3_object("Integration Source")
862+
863+
output_object = make_s3_object("Integration Target")
864+
865+
process = Process.create(
866+
name="Integration Process Test",
867+
connection_qualified_name=connection.qualified_name,
868+
process_id="doit",
869+
inputs=[input_object],
870+
outputs=[output_object],
871+
)
872+
873+
response = client.upsert(process)
874+
assert (processes := response.assets_created(Process))
875+
assert len(processes) == 1
876+
assert (assets := response.assets_updated(S3Object))
877+
assert len(assets) == 2

0 commit comments

Comments
 (0)