Skip to content

Commit 8d36892

Browse files
authored
feat: enable subsetting behind flag (#19)
1 parent 35c5afe commit 8d36892

File tree

6 files changed

+56
-22
lines changed

6 files changed

+56
-22
lines changed

dagster_sqlmesh/asset.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,8 @@ def sqlmesh_assets(
2525
op_tags: t.Optional[t.Mapping[str, t.Any]] = None,
2626
required_resource_keys: t.Optional[t.Set[str]] = None,
2727
retry_policy: t.Optional[RetryPolicy] = None,
28+
# For now we don't set this by default
29+
enabled_subsetting: bool = False,
2830
):
2931
controller = DagsterSQLMeshController.setup_with_config(config)
3032
if not dagster_sqlmesh_translator:
@@ -39,5 +41,6 @@ def sqlmesh_assets(
3941
op_tags=op_tags,
4042
compute_kind=compute_kind,
4143
retry_policy=retry_policy,
44+
can_subset=enabled_subsetting,
4245
required_resource_keys=required_resource_keys,
4346
)

dagster_sqlmesh/console.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ def __init__(self, log_override: t.Optional[logging.Logger] = None):
335335
self._handlers: Dict[str, ConsoleEventHandler] = {}
336336
self.logger = log_override or logger
337337
self.id = str(uuid.uuid4())
338+
self.logger.debug(f"EventConsole[{self.id}]: created")
338339
self.categorizer = None
339340

340341
def add_snapshot_categorizer(self, categorizer: SnapshotCategorizer):
@@ -459,7 +460,9 @@ def plan(
459460
no_diff: bool = False,
460461
no_prompts: bool = False,
461462
) -> None:
463+
self.logger.debug("building plan created")
462464
plan = plan_builder.build()
465+
self.logger.debug(f"plan created: {plan}")
463466

464467
for snapshot in plan.uncategorized:
465468
if self.categorizer:

dagster_sqlmesh/controller/base.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ def run_sqlmesh_thread(
164164
plan_options: PlanOptions,
165165
default_catalog: str,
166166
):
167+
logger.debug("dagster-sqlmesh: thread started")
167168
try:
168169
builder = t.cast(
169170
PlanBuilder,
@@ -180,13 +181,16 @@ def run_sqlmesh_thread(
180181
)
181182
except Exception as e:
182183
controller.console.exception(e)
184+
except: # noqa: E722
185+
controller.console.exception(Exception("Unknown error during plan"))
183186

184187
generator = ConsoleGenerator(self.logger)
185188

186189
if categorizer:
187190
self.console.add_snapshot_categorizer(categorizer)
188191

189192
with self.console_context(generator):
193+
self.logger.debug("starting sqlmesh plan thread")
190194
thread = threading.Thread(
191195
target=run_sqlmesh_thread,
192196
args=(
@@ -200,6 +204,7 @@ def run_sqlmesh_thread(
200204
)
201205
thread.start()
202206

207+
self.logger.debug("waiting for events")
203208
for event in generator.events(thread):
204209
match event:
205210
case ConsoleException(e):
@@ -243,6 +248,8 @@ def run_sqlmesh_thread(
243248
context.run(environment=environment, **run_options)
244249
except Exception as e:
245250
controller.console.exception(e)
251+
except: # noqa: E722
252+
controller.console.exception(Exception("Unknown error during plan"))
246253

247254
generator = ConsoleGenerator(self.logger)
248255
with self.console_context(generator):
@@ -277,8 +284,17 @@ def plan_and_run(
277284
run_options = run_options or {}
278285
plan_options = plan_options or {}
279286

280-
yield from self.plan(categorizer, default_catalog, **plan_options)
281-
yield from self.run(**run_options)
287+
try:
288+
self.logger.debug("starting sqlmesh plan")
289+
yield from self.plan(categorizer, default_catalog, **plan_options)
290+
self.logger.debug("starting sqlmesh run")
291+
yield from self.run(**run_options)
292+
except Exception as e:
293+
self.logger.error(f"Error during sqlmesh plan and run: {e}")
294+
raise e
295+
except:
296+
self.logger.error("Error during sqlmesh plan and run")
297+
raise
282298

283299
def models(self):
284300
return self.context.models

dagster_sqlmesh/events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def events(self, thread: threading.Thread) -> Iterator[console.ConsoleEvent]:
7373
while thread.is_alive() or not self._queue.empty():
7474
try:
7575
# Get arguments from the queue with a timeout
76-
args = self._queue.get(timeout=0.1)
76+
args = self._queue.get(timeout=0.5)
7777
yield args
7878
except queue.Empty:
7979
continue

dagster_sqlmesh/resource.py

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -120,22 +120,27 @@ def process_events(
120120
notify = self._tracker.notify_queue_next()
121121
while notify is not None:
122122
completed_name, update_status = notify
123+
124+
# If the model is not in the context, we can skip any notification
125+
# This will happen for external models
123126
if not sqlmesh_context.get_model(completed_name):
124127
notify = self._tracker.notify_queue_next()
125128
continue
126-
model = self._models_map[completed_name]
127-
output_key = sqlmesh_model_name_to_key(model.name)
128-
asset_key = self._context.asset_key_for_output(output_key)
129-
# asset_key = translator.get_asset_key_from_model(
130-
# controller.context, model
131-
# )
132-
yield MaterializeResult(
133-
asset_key=asset_key,
134-
metadata={
135-
"updated": update_status,
136-
"duration_ms": 0,
137-
},
138-
)
129+
130+
model = self._models_map.get(completed_name)
131+
132+
# We allow selecting models. That value is mapped to models_map.
133+
# If the model is not in models_map, we can skip any notification
134+
if model:
135+
output_key = sqlmesh_model_name_to_key(model.name)
136+
asset_key = self._context.asset_key_for_output(output_key)
137+
yield MaterializeResult(
138+
asset_key=asset_key,
139+
metadata={
140+
"updated": update_status,
141+
"duration_ms": 0,
142+
},
143+
)
139144
notify = self._tracker.notify_queue_next()
140145

141146
def report_event(self, event: console.ConsoleEvent):
@@ -189,14 +194,14 @@ def report_event(self, event: console.ConsoleEvent):
189194
raise Exception("sqlmesh failed during run")
190195
case console.LogError(message):
191196
log_context.error(
192-
message,
197+
f"sqlmesh reported an error: {message}",
193198
)
194199
case console.LogFailedModels(models):
195-
log_context.error(
196-
"\n".join(
200+
if len(models) != 0:
201+
failed_models = "\n".join(
197202
[f"{str(model)}\n{str(model.__cause__)}" for model in models]
198-
),
199-
)
203+
)
204+
log_context.error(f"sqlmesh failed models: {failed_models}")
200205
case _:
201206
log_context.debug("Received event")
202207

@@ -244,6 +249,8 @@ def run(
244249
dag = mesh.models_dag()
245250

246251
plan_options["select_models"] = []
252+
plan_options["backfill_models"] = []
253+
run_options["select_models"] = []
247254

248255
models = mesh.models()
249256
models_map = models.copy()
@@ -254,7 +261,12 @@ def run(
254261
sqlmesh_model_name_to_key(model.name)
255262
in context.selected_output_names
256263
):
264+
logger.info(f"selected model: {model.name}")
265+
257266
models_map[key] = model
267+
plan_options["select_models"].append(model.name)
268+
plan_options["backfill_models"].append(model.name)
269+
run_options["select_models"].append(model.name)
258270

259271
event_handler = DagsterSQLMeshEventHandler(
260272
context, models_map, dag, "sqlmesh: "

sample/dagster_project/definitions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def test_source() -> pl.DataFrame:
5151
)
5252

5353

54-
@sqlmesh_assets(environment="dev", config=sqlmesh_config)
54+
@sqlmesh_assets(environment="dev", config=sqlmesh_config, enabled_subsetting=True)
5555
def sqlmesh_project(context: AssetExecutionContext, sqlmesh: SQLMeshResource):
5656
yield from sqlmesh.run(context)
5757

0 commit comments

Comments
 (0)