Skip to content

Commit 84a930f

Browse files
authored
feat: support tags in sqlmesh models (#20)
1 parent 8d36892 commit 84a930f

File tree

4 files changed

+24
-16
lines changed

4 files changed

+24
-16
lines changed

dagster_sqlmesh/controller/dagster.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def to_asset_outs(
3434
if not model:
3535
# If no model is returned this seems to be an asset dependency
3636
continue
37-
asset_out = translator.get_asset_key_from_model(
37+
asset_key = translator.get_asset_key_from_model(
3838
context,
3939
model,
4040
)
@@ -43,6 +43,8 @@ def to_asset_outs(
4343
for dep in deps
4444
]
4545
internal_asset_deps: t.Set[AssetKey] = set()
46+
asset_tags = translator.get_tags(context, model)
47+
4648
for dep in model_deps:
4749
if dep.model:
4850
internal_asset_deps.add(
@@ -55,7 +57,9 @@ def to_asset_outs(
5557
# create an external dep
5658
depsMap[table.name] = AssetDep(key)
5759
model_key = sqlmesh_model_name_to_key(model.name)
58-
output.outs[model_key] = AssetOut(key=asset_out, is_required=False)
60+
output.outs[model_key] = AssetOut(
61+
key=asset_key, tags=asset_tags, is_required=False
62+
)
5963
output.internal_asset_deps[model_key] = internal_asset_deps
6064

6165
output.deps = list(depsMap.values())

dagster_sqlmesh/translator.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import typing as t
12
import sqlglot
23
from sqlglot import exp
34
from sqlmesh.core.context import Context
@@ -9,27 +10,22 @@ class SQLMeshDagsterTranslator:
910
"""Translates sqlmesh objects for dagster"""
1011

1112
def get_asset_key_from_model(self, context: Context, model: Model) -> AssetKey:
13+
"""Given the sqlmesh context and a model return the asset key"""
1214
return AssetKey(model.view_name)
1315

1416
def get_asset_key_fqn(self, context: Context, fqn: str) -> AssetKey:
17+
"""Given the sqlmesh context and a fqn of a model return an asset key"""
1518
table = self.get_fqn_to_table(context, fqn)
1619
return AssetKey(table.name)
1720

1821
def get_fqn_to_table(self, context: Context, fqn: str) -> exp.Table:
19-
dialect = self.get_context_dialect(context)
22+
"""Given the sqlmesh context and a fqn return the table"""
23+
dialect = self._get_context_dialect(context)
2024
return sqlglot.to_table(fqn, dialect=dialect)
2125

22-
def get_context_dialect(self, context: Context) -> str:
26+
def _get_context_dialect(self, context: Context) -> str:
2327
return context.engine_adapter.dialect
2428

25-
# def get_asset_deps(
26-
# self, context: Context, model: Model, deps: List[SQLMeshModelDep]
27-
# ) -> List[AssetKey]:
28-
# asset_keys: List[AssetKey] = []
29-
# for dep in deps:
30-
# if dep.model:
31-
# asset_keys.append(AssetKey(dep.model.view_name))
32-
# else:
33-
# parsed_fqn = dep.parse_fqn()
34-
# asset_keys.append(AssetKey([parsed_fqn.view_name]))
35-
# return asset_keys
29+
def get_tags(self, context: Context, model: Model) -> t.Dict[str, str]:
30+
"""Given the sqlmesh context and a model return the tags for that model"""
31+
return {k: "true" for k in model.tags}

sample/sqlmesh_project/models/marts/full_model.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ MODEL (
44
cron '@daily',
55
grain item_id,
66
audits (assert_positive_order_ids),
7+
tags (
8+
"mart",
9+
"full",
10+
)
711
);
812

913
SELECT

sample/sqlmesh_project/models/staging/staging_model_1.sql

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@ MODEL (
55
),
66
start '2020-01-01',
77
cron '@daily',
8-
grain (id, event_date)
8+
grain (id, event_date),
9+
tags (
10+
"staging",
11+
"incremental"
12+
)
913
);
1014

1115
SELECT

0 commit comments

Comments
 (0)