Skip to content

Commit c916bb8

Browse files
[td] Don’t raise errors when adding dbt files (mage-ai#4191)
* [td] Don’t raise errors when adding dbt files * fix * fix
1 parent 74d9e0f commit c916bb8

File tree

5 files changed

+185
-89
lines changed

5 files changed

+185
-89
lines changed

mage_ai/data_preparation/models/block/dbt/block_sql.py

+34-17
Original file line numberDiff line numberDiff line change
@@ -186,23 +186,40 @@ def upstream_dbt_blocks(self, read_only=False) -> List['DBTBlockSQL']:
186186
"""
187187
# Get upstream nodes via dbt list
188188
with Profiles(self.project_path, self.pipeline.variables) as profiles:
189-
args = [
190-
'list',
191-
# project-dir
192-
# /home/src/default_repo/default_platform/default_repo/dbt/demo
193-
'--project-dir', self.project_path,
194-
'--profiles-dir', str(profiles.profiles_dir),
195-
'--select', '+' + Path(self.__get_original_file_path()).stem,
196-
'--output', 'json',
197-
'--output-keys', 'unique_id original_file_path depends_on',
198-
'--resource-type', 'model',
199-
'--resource-type', 'snapshot'
200-
]
201-
res, _success = DBTCli(args).invoke()
202-
if res:
203-
nodes_init = [simplejson.loads(node) for node in res]
204-
else:
205-
return []
189+
try:
190+
args = [
191+
'list',
192+
# project-dir
193+
# /home/src/default_repo/default_platform/default_repo/dbt/demo
194+
'--project-dir', self.project_path,
195+
'--profiles-dir', str(profiles.profiles_dir),
196+
'--select', '+' + Path(self.__get_original_file_path()).stem,
197+
'--output', 'json',
198+
'--output-keys', 'unique_id original_file_path depends_on',
199+
'--resource-type', 'model',
200+
'--resource-type', 'snapshot'
201+
]
202+
res, _success = DBTCli(args).invoke()
203+
204+
if res:
205+
nodes_init = [simplejson.loads(node) for node in res]
206+
else:
207+
return []
208+
except Exception as err:
209+
print(f'[ERROR] DBTBlockSQL.upstream_dbt_blocks: {err}.')
210+
return [
211+
self.build_dbt_block(
212+
block_class=DBTBlock,
213+
block_dict=dict(
214+
block_type=self.type,
215+
configuration=self.configuration,
216+
language=self.language,
217+
name=self.uuid,
218+
pipeline=self.pipeline,
219+
uuid=self.uuid,
220+
),
221+
)
222+
]
206223

207224
# transform List into dict and remove unnecessary fields
208225
file_path = self.__get_original_file_path()

mage_ai/data_preparation/models/block/dbt/profiles.py

+12-11
Original file line numberDiff line numberDiff line change
@@ -135,18 +135,19 @@ def interpolate(self) -> Union[str, os.PathLike]:
135135

136136
# write interpolated profiles.yml
137137
interpoalted_profiles_full_path = interpolated_profiles_dir / PROFILES_FILE_NAME
138-
try:
139-
with interpoalted_profiles_full_path.open('w') as f:
140-
yaml.safe_dump(self.profiles, f)
141-
except Exception as e:
142-
raise ProfilesError(
143-
f'Failed to write interpolated `{PROFILES_FILE_NAME}`' +
144-
f' to `{interpolated_profiles_dir}`: {e}'
145-
)
138+
if os.path.exists(interpoalted_profiles_full_path):
139+
try:
140+
with interpoalted_profiles_full_path.open('w') as f:
141+
yaml.safe_dump(self.profiles, f)
142+
except Exception as e:
143+
raise ProfilesError(
144+
f'Failed to write interpolated `{PROFILES_FILE_NAME}`' +
145+
f' to `{interpolated_profiles_dir}`: {e}'
146+
)
146147

147-
self.__interpolated_profiles_dir = str(interpolated_profiles_dir)
148-
self.is_interpolated = True
149-
return self.__interpolated_profiles_dir
148+
self.__interpolated_profiles_dir = str(interpolated_profiles_dir)
149+
self.is_interpolated = True
150+
return self.__interpolated_profiles_dir
150151

151152
def __del__(self) -> None:
152153
self.clean()

mage_ai/data_preparation/models/block/platform/mixins.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ def build_dbt_block(
201201
self,
202202
block_class,
203203
block_dict,
204-
node: Dict,
204+
node: Dict = None,
205205
):
206206
block_type = block_dict['block_type']
207207
configuration = block_dict['configuration'] or {}
@@ -219,7 +219,7 @@ def build_dbt_block(
219219
path=remove_base_repo_directory_name(
220220
os.path.join(
221221
self.project_path,
222-
node['original_file_path'],
222+
(node.get('original_file_path') or '') if node else '',
223223
),
224224
),
225225
)

mage_ai/tests/data_preparation/models/block/dbt/test_dbt_adapter.py

+96-44
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import os
12
import shutil
23
from decimal import Decimal
34
from pathlib import Path
5+
from unittest.mock import patch
46

57
from dbt.include.starter_project import PACKAGE_PATH as starter_project_directory
68

@@ -58,55 +60,105 @@ def test_execute(self):
5860
Test the Project Interface by reading the original dbt_project.yml
5961
and return the dictionary.
6062
"""
61-
with DBTAdapter(str(self.project_dir)) as dbt_adapter:
62-
# create df
63-
import pandas as pd
64-
df = pd.DataFrame([[1, 'foo'], [2, 'bar']], columns=['id', 'text'])
65-
df_dict = df.to_dict(orient='list')
66-
67-
# create agate table from df
68-
from agate import Table
69-
table = Table(
70-
rows=list(map(list, zip(*[v for _, v in df_dict.items()]))),
71-
column_names=df_dict.keys()
72-
)
73-
74-
relation = dbt_adapter.get_relation(
75-
database='test',
76-
schema='main',
77-
identifier='test',
78-
)
79-
relation_context = dict(this=relation)
80-
81-
dbt_adapter.execute_macro(
82-
'reset_csv_table',
83-
context_overide=relation_context,
84-
model={'config': {}},
85-
old_relation=relation,
86-
full_refresh=True,
87-
agate_table=table
88-
)
89-
90-
dbt_adapter.execute_macro(
91-
'load_csv_rows',
92-
context_overide=relation_context,
93-
model={'config': {}},
94-
agate_table=table
95-
)
63+
value = 'mage'
9664

97-
_res, df = dbt_adapter.execute('select * from test.main.test', fetch=True)
65+
file_path = os.path.join(
66+
self.repo_path,
67+
'dbt_test_project',
68+
f'.mage_temp_profiles_{value}',
69+
'profiles.yml',
70+
)
9871

99-
self.assertEqual(
100-
df.to_dict(),
101-
{0: {'id': Decimal('1'), 'text': Decimal('2')}, 1: {'id': 'foo', 'text': 'bar'}}
102-
)
72+
os.makedirs(os.path.dirname(file_path), exist_ok=True)
73+
with open(file_path, 'w') as f:
74+
f.write('')
75+
76+
class MockUUID:
77+
def __str__(self):
78+
return value
79+
80+
@property
81+
def hex(self) -> str:
82+
return value
83+
84+
with patch(
85+
'mage_ai.data_preparation.models.block.dbt.profiles.uuid.uuid4',
86+
lambda: MockUUID(),
87+
):
88+
with DBTAdapter(str(self.project_dir)) as dbt_adapter:
89+
# create df
90+
import pandas as pd
91+
df = pd.DataFrame([[1, 'foo'], [2, 'bar']], columns=['id', 'text'])
92+
df_dict = df.to_dict(orient='list')
93+
94+
# create agate table from df
95+
from agate import Table
96+
table = Table(
97+
rows=list(map(list, zip(*[v for _, v in df_dict.items()]))),
98+
column_names=df_dict.keys()
99+
)
100+
101+
relation = dbt_adapter.get_relation(
102+
database='test',
103+
schema='main',
104+
identifier='test',
105+
)
106+
relation_context = dict(this=relation)
107+
108+
dbt_adapter.execute_macro(
109+
'reset_csv_table',
110+
context_overide=relation_context,
111+
model={'config': {}},
112+
old_relation=relation,
113+
full_refresh=True,
114+
agate_table=table
115+
)
116+
117+
dbt_adapter.execute_macro(
118+
'load_csv_rows',
119+
context_overide=relation_context,
120+
model={'config': {}},
121+
agate_table=table
122+
)
123+
124+
_res, df = dbt_adapter.execute('select * from test.main.test', fetch=True)
125+
126+
self.assertEqual(
127+
df.to_dict(),
128+
{0: {'id': Decimal('1'), 'text': Decimal('2')}, 1: {'id': 'foo', 'text': 'bar'}}
129+
)
103130

104131
def test_credentials(self):
105132
"""
106133
Test the Project Interface by reading the original dbt_project.yml
107134
and return the dictionary.
108135
"""
109-
with DBTAdapter(str(self.project_dir)) as dbt_adapter:
110-
credentials = dbt_adapter.credentials
111-
self.assertEqual(credentials.schema, 'main')
112-
self.assertEqual(credentials.database, 'test')
136+
value = 'mage'
137+
138+
file_path = os.path.join(
139+
self.repo_path,
140+
'dbt_test_project',
141+
f'.mage_temp_profiles_{value}',
142+
'profiles.yml',
143+
)
144+
145+
os.makedirs(os.path.dirname(file_path), exist_ok=True)
146+
with open(file_path, 'w') as f:
147+
f.write('')
148+
149+
class MockUUID:
150+
def __str__(self):
151+
return value
152+
153+
@property
154+
def hex(self) -> str:
155+
return value
156+
157+
with patch(
158+
'mage_ai.data_preparation.models.block.dbt.profiles.uuid.uuid4',
159+
lambda: MockUUID(),
160+
):
161+
with DBTAdapter(str(self.project_dir)) as dbt_adapter:
162+
credentials = dbt_adapter.credentials
163+
self.assertEqual(credentials.schema, 'main')
164+
self.assertEqual(credentials.database, 'test')

mage_ai/tests/data_preparation/models/block/dbt/test_profiles.py

+41-15
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import os
12
from pathlib import Path
3+
from unittest.mock import patch
24

35
import yaml
46

@@ -58,25 +60,49 @@ def test_interpolate_clean(self):
5860
- checking the file contents
5961
- cleaning up the interpolated profiles.yml
6062
"""
61-
with Profiles(self.repo_path, self.variables) as profiles:
62-
self.assertNotEquals(
63-
profiles.profiles_dir,
64-
self.repo_path
65-
)
63+
value = 'mage'
6664

67-
interpolated_profiles_full_path = Path(profiles.profiles_dir) / 'profiles.yml'
68-
with interpolated_profiles_full_path.open('r') as f:
69-
interpolated_profiles = yaml.safe_load(f.read())
70-
self.assertEqual(
71-
self.interpolated_profiles,
72-
interpolated_profiles
73-
)
65+
file_path = os.path.join(
66+
self.repo_path,
67+
f'.mage_temp_profiles_{value}',
68+
'profiles.yml',
69+
)
70+
71+
os.makedirs(os.path.dirname(file_path), exist_ok=True)
72+
with open(file_path, 'w') as f:
73+
f.write('')
74+
75+
class MockUUID:
76+
def __str__(self):
77+
return value
78+
79+
@property
80+
def hex(self) -> str:
81+
return value
82+
83+
with patch(
84+
'mage_ai.data_preparation.models.block.dbt.profiles.uuid.uuid4',
85+
lambda: MockUUID(),
86+
):
87+
with Profiles(self.repo_path, self.variables) as profiles:
88+
self.assertNotEquals(
89+
profiles.profiles_dir,
90+
self.repo_path
91+
)
92+
93+
interpolated_profiles_full_path = Path(profiles.profiles_dir) / 'profiles.yml'
94+
with interpolated_profiles_full_path.open('r') as f:
95+
interpolated_profiles = yaml.safe_load(f.read())
96+
self.assertEqual(
97+
self.interpolated_profiles,
98+
interpolated_profiles
99+
)
74100

75-
self.assertTrue(interpolated_profiles_full_path.exists())
101+
self.assertTrue(interpolated_profiles_full_path.exists())
76102

77-
profiles.clean()
103+
profiles.clean()
78104

79-
self.assertFalse(interpolated_profiles_full_path.exists())
105+
self.assertFalse(interpolated_profiles_full_path.exists())
80106

81107
def test_profiles(self):
82108
profiles = Profiles(self.repo_path, self.variables)

0 commit comments

Comments
 (0)