Skip to content

Commit b89d02c

Browse files
authored
Further Improvements for sqlmesh as a lib tooling (#5)
* Further Improvements * fix tests * Fix type checks * docs and refactoring * fix tests
1 parent 111ab2b commit b89d02c

File tree

12 files changed

+1114
-790
lines changed

12 files changed

+1114
-790
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ from dagster_sqlmesh import sqlmesh_assets, SQLMeshContextConfig, SQLMeshResourc
2828

2929
sqlmesh_config = SQLMeshContextConfig(path="/home/foo/sqlmesh_project", gateway="name-of-your-gateway")
3030

31-
@sqlmesh_assets(config=sqlmesh_config)
31+
@sqlmesh_assets(environment="dev", config=sqlmesh_config)
3232
def sqlmesh_project(context: AssetExecutionContext, sqlmesh: SQLMeshResource):
3333
yield from sqlmesh.run(context)
3434

dagster_sqlmesh/asset.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
RetryPolicy,
77
)
88

9-
from dagster_sqlmesh.controller.dagster import DagsterSQLMeshController
9+
from dagster_sqlmesh.controller import DagsterSQLMeshController
1010
from dagster_sqlmesh.translator import SQLMeshDagsterTranslator
1111

1212
from .config import SQLMeshContextConfig
@@ -17,6 +17,7 @@
1717
# Define a SQLMesh Asset
1818
def sqlmesh_assets(
1919
*,
20+
environment: str,
2021
config: SQLMeshContextConfig,
2122
name: t.Optional[str] = None,
2223
dagster_sqlmesh_translator: t.Optional[SQLMeshDagsterTranslator] = None,
@@ -25,10 +26,10 @@ def sqlmesh_assets(
2526
required_resource_keys: t.Optional[t.Set[str]] = None,
2627
retry_policy: t.Optional[RetryPolicy] = None,
2728
):
28-
controller = DagsterSQLMeshController.setup(config)
29+
controller = DagsterSQLMeshController.setup_with_config(config)
2930
if not dagster_sqlmesh_translator:
3031
dagster_sqlmesh_translator = SQLMeshDagsterTranslator()
31-
conversion = controller.to_asset_outs(dagster_sqlmesh_translator)
32+
conversion = controller.to_asset_outs(environment, dagster_sqlmesh_translator)
3233

3334
return multi_asset(
3435
name=name,

dagster_sqlmesh/conftest.py

+22-4
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,16 @@ def sample_sqlmesh_project():
5151

5252
@dataclass
5353
class SQLMeshTestContext:
54+
"""A test context for running SQLMesh"""
55+
5456
db_path: str
5557
context_config: SQLMeshContextConfig
5658

5759
def create_controller(self, enable_debug_console: bool = False):
5860
console = None
5961
if enable_debug_console:
6062
console = get_console()
61-
return DagsterSQLMeshController.setup(
63+
return DagsterSQLMeshController.setup_with_config(
6264
self.context_config, debug_console=console
6365
)
6466

@@ -96,17 +98,33 @@ def append_to_test_source(self, df: polars.DataFrame):
9698
"""
9799
)
98100

99-
def run(
101+
def plan_and_run(
100102
self,
101103
*,
102104
environment: str,
103-
apply: bool = False,
104105
execution_time: t.Optional[TimeLike] = None,
105106
enable_debug_console: bool = False,
106107
start: t.Optional[TimeLike] = None,
107108
end: t.Optional[TimeLike] = None,
108109
restate_models: t.Optional[t.List[str]] = None,
109110
):
111+
"""Runs plan and run on SQLMesh with the given configuration and record all of the generated events.
112+
113+
Args:
114+
environment (str): The environment to run SQLMesh in.
115+
execution_time (TimeLike, optional): The execution timestamp for the run. Defaults to None.
116+
enable_debug_console (bool, optional): Flag to enable debug console. Defaults to False.
117+
start (TimeLike, optional): Start time for the run interval. Defaults to None.
118+
end (TimeLike, optional): End time for the run interval. Defaults to None.
119+
restate_models (List[str], optional): List of models to restate. Defaults to None.
120+
121+
Returns:
122+
None: The function records events to a debug console but doesn't return anything.
123+
124+
Note:
125+
TimeLike can be any time-like object that SQLMesh accepts (datetime, str, etc.).
126+
The function creates a controller and recorder to capture all SQLMesh events during execution.
127+
"""
110128
controller = self.create_controller(enable_debug_console=enable_debug_console)
111129
recorder = ConsoleRecorder()
112130
# controller.add_event_handler(ConsoleRecorder())
@@ -126,7 +144,7 @@ def run(
126144
plan_options["end"] = end
127145
run_options["end"] = end
128146

129-
for _context, event in controller.plan_and_run(
147+
for event in controller.plan_and_run(
130148
environment,
131149
plan_options=plan_options,
132150
run_options=run_options,

dagster_sqlmesh/console.py

+14
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,18 @@ class ConsoleException:
274274

275275

276276
class EventConsole(Console):
277+
"""
278+
A console implementation that manages and publishes events related to
279+
SQLMesh operations. The sqlmesh console implementation is mostly for it's
280+
CLI application and doesn't take into account using sqlmesh as a library.
281+
This event pub/sub interface allows us to capture events and choose how we
282+
wish to handle it with N number of handlers.
283+
284+
This class extends the Console class and provides functionality to handle
285+
various events during SQLMesh processes such as plan evaluation, creation,
286+
promotion, migration, and testing.
287+
"""
288+
277289
categorizer: t.Optional[SnapshotCategorizer] = None
278290

279291
def __init__(self, log_override: t.Optional[logging.Logger] = None):
@@ -475,6 +487,8 @@ def exception(self, exc: Exception):
475487

476488

477489
class DebugEventConsole(EventConsole):
490+
"""A console that wraps an existing console and logs all events to a logger"""
491+
478492
def __init__(self, console: Console):
479493
super().__init__()
480494
self._console = console
+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# ruff: noqa: F403 F401
2+
from .base import SQLMeshController, SQLMeshInstance, PlanOptions, RunOptions
3+
from .dagster import DagsterSQLMeshController

0 commit comments

Comments
 (0)