Skip to content

Commit c3e4af1

Browse files
authored
Merge pull request #479 from atlanhq/FT-653
FT-653: Provide helper to create OpenLineage connection
2 parents cd4c452 + 01e10e4 commit c3e4af1

File tree

3 files changed

+204
-2
lines changed

3 files changed

+204
-2
lines changed

pyatlan/client/constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,12 @@
512512
HTTPStatus.OK,
513513
endpoint=EndPoint.HERACLES,
514514
)
515+
CREATE_OL_CREDENTIALS = API(
516+
CREDENTIALS_API,
517+
HTTPMethod.POST,
518+
HTTPStatus.OK,
519+
EndPoint.HERACLES,
520+
)
515521
AUDIT_API = "entity/auditSearch"
516522
AUDIT_SEARCH = API(AUDIT_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.ATLAS)
517523
SEARCH_LOG_API = "search/searchlog"

pyatlan/client/open_lineage.py

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
from http import HTTPStatus
2+
from typing import List, Optional
23

34
from pydantic.v1 import validate_arguments
45

6+
from pyatlan import utils
57
from pyatlan.client.common import ApiCaller
6-
from pyatlan.client.constants import OPEN_LINEAGE_SEND_EVENT_API
8+
from pyatlan.client.constants import CREATE_OL_CREDENTIALS, OPEN_LINEAGE_SEND_EVENT_API
79
from pyatlan.errors import AtlanError, ErrorCode
10+
from pyatlan.model.assets import Connection
11+
from pyatlan.model.credential import CredentialResponse
812
from pyatlan.model.enums import AtlanConnectorType
913
from pyatlan.model.open_lineage.event import OpenLineageEvent
14+
from pyatlan.model.response import AssetMutationResponse
1015

1116

1217
class OpenLineageClient:
@@ -21,6 +26,68 @@ def __init__(self, client: ApiCaller):
2126
)
2227
self._client = client
2328

29+
def _create_credential(self, connector_name: str) -> CredentialResponse:
30+
"""
31+
Creates an OpenLineage credential for the specified connector.
32+
33+
:param connector_name: of the connection that should be OpenLineage event
34+
:return: details of the created credential
35+
"""
36+
body = {
37+
"authType": "atlan_api_key",
38+
"name": f"default-{connector_name}-{int(utils.get_epoch_timestamp())}-0",
39+
"connectorConfigName": f"atlan-connectors-{connector_name}",
40+
"connector": f"{connector_name}",
41+
"connectorType": "event",
42+
"extra": {
43+
"events.enable-partial-assets": True,
44+
"events.enabled": True,
45+
"events.topic": f"openlineage_{connector_name}",
46+
"events.urlPath": f"/events/openlineage/{connector_name}/api/v1/lineage",
47+
},
48+
}
49+
raw_json = self._client._call_api(
50+
api=CREATE_OL_CREDENTIALS.format_path_with_params(),
51+
query_params={"testCredential": "true"},
52+
request_obj=body,
53+
)
54+
return CredentialResponse(**raw_json)
55+
56+
@validate_arguments
57+
def create_connection(
58+
self,
59+
name: str,
60+
connector_type: AtlanConnectorType = AtlanConnectorType.SPARK,
61+
admin_users: Optional[List[str]] = None,
62+
admin_roles: Optional[List[str]] = None,
63+
admin_groups: Optional[List[str]] = None,
64+
) -> AssetMutationResponse:
65+
"""
66+
Creates a connection for OpenLineage.
67+
68+
:param name: name for the new connection
69+
:param connector_type: for the new connection to be associated with
70+
:param admin_users: list of admin users to associate with this connection
71+
:param admin_roles: list of admin roles to associate with this connection
72+
:param admin_groups:list of admin groups to associate with this connection
73+
:return: details of the connection created
74+
"""
75+
from pyatlan.client.atlan import AtlanClient
76+
77+
client = AtlanClient.get_default_client()
78+
79+
response = self._create_credential(connector_name=connector_type.value)
80+
connection = Connection.creator(
81+
name=name,
82+
connector_type=connector_type,
83+
admin_users=admin_users,
84+
admin_groups=admin_groups,
85+
admin_roles=admin_roles,
86+
)
87+
88+
connection.default_credential_guid = response.id
89+
return client.asset.save(connection)
90+
2491
@validate_arguments
2592
def send(
2693
self, request: OpenLineageEvent, connector_type: AtlanConnectorType
@@ -37,7 +104,7 @@ def send(
37104
self._client._call_api(
38105
request_obj=request,
39106
api=OPEN_LINEAGE_SEND_EVENT_API.format_path(
40-
{"connector_type": connector_type}
107+
{"connector_type": connector_type.value}
41108
),
42109
text_response=True,
43110
)
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import time
2+
3+
import pytest
4+
5+
from pyatlan.cache.role_cache import RoleCache
6+
from pyatlan.client.atlan import AtlanClient
7+
from pyatlan.model.assets import Asset, Connection, Process, SparkJob
8+
from pyatlan.model.audit import AuditSearchRequest
9+
from pyatlan.model.enums import OpenLineageEventType
10+
from pyatlan.model.open_lineage.event import OpenLineageEvent
11+
from pyatlan.model.open_lineage.job import OpenLineageJob
12+
from pyatlan.model.open_lineage.run import OpenLineageRun
13+
from tests.integration.client import TestId, delete_asset
14+
15+
MODULE_NAME = TestId.make_unique("OL")
16+
17+
18+
@pytest.fixture(scope="module")
19+
def connection(client: AtlanClient):
20+
admin_role_guid = RoleCache.get_id_for_name("$admin")
21+
assert admin_role_guid
22+
response = client.open_lineage.create_connection(
23+
name=MODULE_NAME, admin_roles=[admin_role_guid]
24+
)
25+
result = response.assets_created(asset_type=Connection)[0]
26+
yield client.asset.get_by_guid(
27+
result.guid, asset_type=Connection, ignore_relationships=False
28+
)
29+
delete_asset(client, asset_type=Connection, guid=result.guid)
30+
31+
32+
def test_open_lineage_integration(connection: Connection, client: AtlanClient):
33+
34+
assert connection is not None
35+
assert connection.name == MODULE_NAME
36+
37+
namespace = "snowflake://abc123.snowflakecomputing.com"
38+
producer = "https://your.orchestrator/unique/id/123"
39+
job = OpenLineageJob.creator(
40+
connection_name=MODULE_NAME, job_name="dag_123", producer=producer
41+
)
42+
run = OpenLineageRun.creator(job=job)
43+
id = job.create_input(namespace=namespace, asset_name="OPS.DEFAULT.RUN_STATS")
44+
od = job.create_output(namespace=namespace, asset_name="OPS.DEFAULT.FULL_STATS")
45+
od.to_fields = [
46+
{
47+
"COLUMN": [
48+
id.from_field(field_name="COLUMN"),
49+
id.from_field(field_name="ONE"),
50+
id.from_field(field_name="TWO"),
51+
]
52+
},
53+
{
54+
"ANOTHER": [
55+
id.from_field(field_name="THREE"),
56+
]
57+
},
58+
]
59+
start = OpenLineageEvent.creator(run=run, event_type=OpenLineageEventType.START)
60+
start.inputs = [
61+
id,
62+
job.create_input(namespace=namespace, asset_name="SOME.OTHER.TBL"),
63+
job.create_input(namespace=namespace, asset_name="AN.OTHER.TBL"),
64+
]
65+
start.outputs = [
66+
od,
67+
job.create_output(namespace=namespace, asset_name="AN.OTHER.VIEW"),
68+
]
69+
start.emit()
70+
71+
complete = OpenLineageEvent.creator(
72+
run=run, event_type=OpenLineageEventType.COMPLETE
73+
)
74+
complete.emit()
75+
76+
assert job
77+
assert start.event_type == OpenLineageEventType.START
78+
assert complete.event_type == OpenLineageEventType.COMPLETE
79+
80+
# Awaiting the creation and storage of the Job asset in the backend
81+
time.sleep(30)
82+
83+
job_qualified_name = f"{connection.qualified_name}/{job.name}"
84+
85+
# Use the audit search, similar to UI calls,
86+
# to retrieve complete information (process, inputs, outputs) about Spark jobs
87+
results = client.audit.search(
88+
AuditSearchRequest.by_qualified_name(
89+
type_name=SparkJob.__name__,
90+
qualified_name=job_qualified_name,
91+
)
92+
)
93+
assert results and results.current_page() and len(results.current_page()) > 0
94+
job_asset = results.current_page()[0]
95+
assert (
96+
job_asset
97+
and job_asset.detail
98+
and isinstance(job_asset.detail, Asset)
99+
and job_asset.detail.relationship_attributes
100+
)
101+
assert job_asset.detail.name == job.name
102+
assert job_asset.detail.qualified_name == job_qualified_name
103+
104+
inputs = job_asset.detail.relationship_attributes.get("inputs")
105+
outputs = job_asset.detail.relationship_attributes.get("outputs")
106+
process = job_asset.detail.relationship_attributes.get("process")
107+
108+
assert inputs
109+
assert outputs
110+
assert process
111+
112+
input_qns = {
113+
input.get("uniqueAttributes", {}).get("qualifiedName") for input in inputs
114+
}
115+
assert f"{connection.qualified_name}/OPS/DEFAULT/RUN_STATS" in input_qns
116+
assert f"{connection.qualified_name}/SOME/OTHER/TBL" in input_qns
117+
assert f"{connection.qualified_name}/AN/OTHER/TBL" in input_qns
118+
119+
outputs_qns = {
120+
output.get("uniqueAttributes", {}).get("qualifiedName") for output in outputs
121+
}
122+
assert f"{connection.qualified_name}/OPS/DEFAULT/FULL_STATS" in outputs_qns
123+
assert f"{connection.qualified_name}/AN/OTHER/VIEW" in outputs_qns
124+
assert (
125+
process.get("uniqueAttributes", {}).get("qualifiedName")
126+
== f"{connection.qualified_name}/dag_123/process"
127+
)
128+
delete_asset(client, asset_type=Process, guid=process.get("guid"))
129+
delete_asset(client, asset_type=SparkJob, guid=job_asset.detail.guid)

0 commit comments

Comments
 (0)