1
1
from collections import defaultdict
2
2
import json
3
+ import os
3
4
from pathlib import Path
4
- from typing import List , Dict , Tuple , Set
5
+ from typing import List , Dict , Tuple , Set , Optional
5
6
6
7
from packaging .version import parse as parse_version
7
8
12
13
logger = getLogger (__name__ )
13
14
14
15
15
- def import_dbt ():
16
def import_dbt_dependencies():
    """Lazily import the optional dbt-related packages.

    Returns:
        A 5-tuple ``(parse_run_results, parse_manifest, ProfileRenderer, yaml,
        dbt_runner)``. ``dbt_runner`` is a ``dbtRunner()`` instance when
        ``dbt.cli.main.dbtRunner`` can be imported, otherwise ``None``.

    Raises:
        RuntimeError: if any of the required imports fails.
    """
    try:
        from dbt_artifacts_parser.parser import parse_run_results, parse_manifest
        from dbt.config.renderer import ProfileRenderer
        import yaml
    except ImportError:
        raise RuntimeError("Could not import 'dbt' package. You can install it using: pip install 'data-diff[dbt]'.")

    # dbt 1.5+ specific stuff to power selection of models
    try:
        from dbt.cli.main import dbtRunner
    except ImportError:
        # The runner is optional: callers receive None when it is unavailable.
        dbt_runner = None
    else:
        # Instantiated outside the try body so only the import itself is guarded.
        dbt_runner = dbtRunner()

    return parse_run_results, parse_manifest, ProfileRenderer, yaml, dbt_runner
24
36
25
37
26
38
RUN_RESULTS_PATH = "target/run_results.json"
27
39
MANIFEST_PATH = "target/manifest.json"
28
40
PROJECT_FILE = "dbt_project.yml"
29
41
PROFILES_FILE = "profiles.yml"
30
42
LOWER_DBT_V = "1.0.0"
31
- UPPER_DBT_V = "1.4.7 "
43
+ UPPER_DBT_V = "1.6.0 "
32
44
33
45
34
46
# https://github.com/dbt-labs/dbt-core/blob/c952d44ec5c2506995fbad75320acbae49125d3d/core/dbt/cli/resolvers.py#L6
@@ -49,7 +61,13 @@ def legacy_profiles_dir() -> Path:
49
61
50
62
class DbtParser :
51
63
def __init__ (self , profiles_dir_override : str , project_dir_override : str ) -> None :
52
- self .parse_run_results , self .parse_manifest , self .ProfileRenderer , self .yaml = import_dbt ()
64
+ (
65
+ self .parse_run_results ,
66
+ self .parse_manifest ,
67
+ self .ProfileRenderer ,
68
+ self .yaml ,
69
+ self .dbt_runner ,
70
+ ) = import_dbt_dependencies ()
53
71
self .profiles_dir = Path (profiles_dir_override or default_profiles_dir ())
54
72
self .project_dir = Path (project_dir_override or default_project_dir ())
55
73
self .connection = None
@@ -68,7 +86,60 @@ def get_datadiff_variables(self) -> dict:
68
86
vars = get_from_dict_with_raise (self .project_dict , "vars" , error_message )
69
87
return get_from_dict_with_raise (vars , "data_diff" , error_message )
70
88
71
- def get_models (self ):
89
def get_models(self, dbt_selection: Optional[str] = None):
    """Return the dbt models to operate on.

    Args:
        dbt_selection: optional dbt selection expression (the `--select`
            value). When empty or ``None``, the models come from
            ``get_run_results_models()`` instead.

    Returns:
        The result of ``get_dbt_selection_models(dbt_selection)`` when a
        selection is given, otherwise the result of
        ``get_run_results_models()``.

    Raises:
        Exception: if a selection is given but the parsed ``self.dbt_version``
            is older than 1.5, or no dbt runner is available.
    """
    # Parsed up front, before any branching, exactly as the checks below expect.
    manifest_version = parse_version(self.dbt_version)

    if not dbt_selection:
        return self.get_run_results_models()

    if (manifest_version.major, manifest_version.minor) < (1, 5):
        raise Exception(
            f"Use of the `--select` feature requires dbt >= 1.5. Found dbt manifest: v{manifest_version}"
        )

    # edge case if running data-diff from a separate env than dbt (likely local development)
    if not self.dbt_runner:
        raise Exception(
            "data-diff is using a dbt-core version < 1.5, update the environment's dbt-core version via pip install 'dbt-core>=1.5' in order to use `--select`"
        )

    return self.get_dbt_selection_models(dbt_selection)
106
+
107
def get_dbt_selection_models(self, dbt_selection: str) -> List:
    """Resolve a dbt `--select` expression to the matching manifest nodes.

    Invokes ``dbt ls`` through ``self.dbt_runner`` and maps every returned
    ``unique_id`` to its entry in ``self.manifest_obj.nodes``.

    Args:
        dbt_selection: dbt selection expression, forwarded to `--select`.

    Returns:
        One entry per selected model, in the order dbt listed them. An entry
        is ``None`` when ``self.manifest_obj.nodes`` has no node for that id.

    Raises:
        BaseException: the exception reported by the dbt invocation, if any.
        Exception: if the selection matched no models, or the invocation
            failed without reporting an exception.
    """
    # log level and format settings needed to prevent dbt from printing to stdout
    # ls command is used to get the list of model unique_ids
    results = self.dbt_runner.invoke(
        [
            "--log-format",
            "json",
            "--log-level",
            "none",
            "ls",
            "--select",
            dbt_selection,
            "--resource-type",
            "model",
            "--output",
            "json",
            "--output-keys",
            "unique_id",
            "--project-dir",
            # project_dir is a pathlib.Path; the argument list is CLI-style strings.
            str(self.project_dir),
        ]
    )

    # Check the reported exception before the empty-result case: otherwise a
    # failed invocation with no result is misreported as "No dbt models found"
    # and the real error is lost.
    if results.exception:
        raise results.exception

    if results.success and results.result:
        unique_ids = [json.loads(model)["unique_id"] for model in results.result]
        return [self.manifest_obj.nodes.get(unique_id) for unique_id in unique_ids]

    if not results.result:
        raise Exception(f"No dbt models found for `--select {dbt_selection}`")

    # Unsuccessful run that still produced output and no exception object.
    logger.debug(str(results))
    raise Exception("Encountered an error while finding `--select` models")
141
+
142
+ def get_run_results_models (self ):
72
143
with open (self .project_dir / RUN_RESULTS_PATH ) as run_results :
73
144
logger .info (f"Parsing file { RUN_RESULTS_PATH } " )
74
145
run_results_dict = json .load (run_results )
@@ -80,11 +151,11 @@ def get_models(self):
80
151
self .profiles_dir = legacy_profiles_dir ()
81
152
82
153
if dbt_version < parse_version (LOWER_DBT_V ):
83
- raise Exception (
84
- f"Found dbt: v{ dbt_version } Expected the dbt project's version to be >= { LOWER_DBT_V } "
85
- )
154
+ raise Exception (f"Found dbt: v{ dbt_version } Expected the dbt project's version to be >= { LOWER_DBT_V } " )
86
155
elif dbt_version >= parse_version (UPPER_DBT_V ):
87
- logger .warning (f"{ dbt_version } is a recent version of dbt and may not be fully tested with data-diff! \n Please report any issues to https://github.com/datafold/data-diff/issues" )
156
+ logger .warning (
157
+ f"{ dbt_version } is a recent version of dbt and may not be fully tested with data-diff! \n Please report any issues to https://github.com/datafold/data-diff/issues"
158
+ )
88
159
89
160
success_models = [x .unique_id for x in run_results_obj .results if x .status .name == "success" ]
90
161
models = [self .manifest_obj .nodes .get (x ) for x in success_models ]
0 commit comments