Skip to content

Commit 39f6e5e

Browse files
committed
using lineagex
1 parent 70aa297 commit 39f6e5e

File tree

2 files changed

+21
-19
lines changed

2 files changed

+21
-19
lines changed

lineage.py

+17-15
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
import json
33
from fal import FalDbt
44

5-
from column_lineage import ColumnLineage
6-
from utils import _preprocess_sql, _produce_json
5+
# from column_lineage import ColumnLineage
6+
from lineagex.ColumnLineage import ColumnLineage
7+
from utils import dbt_preprocess_sql, dbt_produce_json, dbt_find_column
78
from typing import List
8-
#from itertools import islice
9+
# from itertools import islice
910
# for key, value in islice(manifest['nodes'].items(), 3):
1011

1112

@@ -33,34 +34,35 @@ def _run_lineage(self) -> None:
3334
:return: the output_dict object will be the final output with each model name being key
3435
"""
3536
self.part_tables = self._get_part_tables()
36-
#key = 'model.mimic.age_histogram_test'
37-
#value = self.manifest['nodes'][key]
37+
# key = 'model.mimic.age_histogram_test'
38+
# value = self.manifest['nodes'][key]
3839
for key, value in self.manifest["nodes"].items():
39-
#for key, value in islice(self.manifest['nodes'].items(), 3):
40-
print(key)
40+
# for key, value in islice(self.manifest['nodes'].items(), 3):
41+
print(key, " completed")
4142
table_name = value["schema"] + "." + value["name"]
4243
self.output_dict[key] = {}
43-
ret_sql = _preprocess_sql(value)
44+
ret_sql = dbt_preprocess_sql(value)
4445
# self.output_dict[key]["sql"] = value["compiled_code"].replace('\n', '')
45-
#self.output_dict[key]["sql"] = ret_sql
46+
# self.output_dict[key]["sql"] = ret_sql
4647
ret_fal = self.faldbt.execute_sql(
4748
"EXPLAIN (VERBOSE TRUE, FORMAT JSON, COSTS FALSE) {}".format(ret_sql)
4849
)
4950
plan = json.loads(ret_fal.iloc[0]["QUERY PLAN"][1:-1])
50-
#col_names_new = self.table_cols_df[self.table_cols_df["table"] == table_name]
51-
#print(self.table_cols_df, col_names)
51+
# col_names_new = self.table_cols_df[self.table_cols_df["table"] == table_name]
52+
# print(self.table_cols_df, col_names)
53+
cols = dbt_find_column(table_name=table_name, engine=self.faldbt)
5254
col_lineage = ColumnLineage(
5355
plan=plan["Plan"],
5456
sql=ret_sql,
55-
table_name=table_name,
56-
faldbt=self.faldbt,
57+
columns=cols,
58+
conn=self.faldbt,
5759
part_tables=self.part_tables,
5860
)
5961
self.output_dict[key]["tables"] = col_lineage.table_list
6062
self.output_dict[key]["columns"] = col_lineage.column_dict
6163
self.output_dict[key]["table_name"] = table_name
62-
#self.output_dict[key]["plan"] = plan["Plan"]
63-
_produce_json(self.output_dict, self.faldbt)
64+
# self.output_dict[key]["plan"] = plan["Plan"]
65+
dbt_produce_json(self.output_dict, self.faldbt)
6466

6567
def _get_part_tables(self) -> dict:
6668
"""

utils.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def _remove_comments(str1: str = "") -> str:
2525
return str1
2626

2727

28-
def _preprocess_sql(node: dict = None) -> str:
28+
def dbt_preprocess_sql(node: dict = None) -> str:
2929
"""
3030
Process the sql, remove database name in the clause/datetime_add/datetime_sub adding quotes
3131
:param node: the node containing the original sql, file: file name for the sql
@@ -79,7 +79,7 @@ def _preprocess_sql(node: dict = None) -> str:
7979
return ret_sql
8080

8181

82-
def _find_column(table_name: str = "", engine: FalDbt = None) -> List:
82+
def dbt_find_column(table_name: str = "", engine: FalDbt = None) -> List:
8383
"""
8484
Find the columns for the base table in the database
8585
:param engine: the connection engine
@@ -100,7 +100,7 @@ def _find_column(table_name: str = "", engine: FalDbt = None) -> List:
100100
return list(cols_fal["col"])
101101

102102

103-
def _produce_json(output_dict: dict = None, engine: FalDbt = None) -> dict:
103+
def dbt_produce_json(output_dict: dict = None, engine: FalDbt = None) -> dict:
104104
table_to_model_dict = {}
105105
for key, val in output_dict.items():
106106
table_to_model_dict[val["table_name"]] = key
@@ -134,7 +134,7 @@ def _produce_json(output_dict: dict = None, engine: FalDbt = None) -> dict:
134134
base_table_dict[key] = {}
135135
base_table_dict[key]["tables"] = [""]
136136
base_table_dict[key]["columns"] = {}
137-
cols = _find_column(key, engine)
137+
cols = dbt_find_column(key, engine)
138138
for i in cols:
139139
base_table_dict[key]["columns"][i] = [""]
140140
base_table_dict[key]["table_name"] = str(key)

0 commit comments

Comments
 (0)