Skip to content

Commit 1461b22

Browse files
authored
fix: implement missing functions from abstract class (#13)
1 parent 8c3703b commit 1461b22

File tree

5 files changed

+220
-338
lines changed

5 files changed

+220
-338
lines changed

dagster_sqlmesh/console.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import unittest
66
import logging
77

8+
from sqlglot.expressions import Alter
89
from sqlmesh.core.console import Console
910
from sqlmesh.core.plan import EvaluatablePlan
1011
from sqlmesh.core.context_diff import ContextDiff
@@ -16,6 +17,7 @@
1617
SnapshotChangeCategory,
1718
SnapshotInfoLike,
1819
)
20+
from sqlmesh.utils.concurrency import NodeExecutionFailedError
1921

2022
logger = logging.getLogger(__name__)
2123

@@ -193,11 +195,35 @@ class LogError:
193195
message: str
194196

195197

198+
@dataclass
199+
class LogWarning:
200+
message: str
201+
202+
196203
@dataclass
197204
class LogSuccess:
198205
message: str
199206

200207

208+
@dataclass
209+
class LogFailedModels:
210+
errors: t.List[NodeExecutionFailedError]
211+
212+
213+
@dataclass
214+
class LogSkippedModels:
215+
snapshot_names: t.Set[str]
216+
217+
218+
@dataclass
219+
class LogDestructiveChange:
220+
snapshot_name: str
221+
dropped_column_names: t.List[str]
222+
alter_expressions: t.List[Alter]
223+
dialect: str
224+
error: bool = True
225+
226+
201227
@dataclass
202228
class LoadingStart:
203229
message: t.Optional[str] = None
@@ -254,7 +280,11 @@ class ConsoleException:
254280
ShowSQL,
255281
LogStatusUpdate,
256282
LogError,
283+
LogWarning,
257284
LogSuccess,
285+
LogFailedModels,
286+
LogSkippedModels,
287+
LogDestructiveChange,
258288
LoadingStart,
259289
LoadingStop,
260290
ShowSchemaDiff,
@@ -444,9 +474,32 @@ def log_status_update(self, message: str) -> None:
444474
def log_error(self, message: str) -> None:
445475
self.publish(LogError(message))
446476

477+
def log_warning(self, message):
478+
self.publish(LogWarning(message))
479+
447480
def log_success(self, message: str) -> None:
448481
self.publish(LogSuccess(message))
449482

483+
def log_failed_models(self, errors):
484+
self.publish(LogFailedModels(errors))
485+
486+
def log_skipped_models(self, snapshot_names):
487+
self.publish(LogSkippedModels(snapshot_names))
488+
489+
def log_destructive_change(
490+
self,
491+
snapshot_name,
492+
dropped_column_names,
493+
alter_expressions,
494+
dialect,
495+
error=True,
496+
):
497+
self.publish(
498+
LogDestructiveChange(
499+
snapshot_name, dropped_column_names, alter_expressions, dialect, error
500+
)
501+
)
502+
450503
def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
451504
event_id = uuid.uuid4()
452505
self.publish(LoadingStart(message, event_id))

dagster_sqlmesh/controller/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from sqlmesh.core.context import Context
99
from sqlmesh.core.plan import PlanBuilder
1010
from sqlmesh.core.config import CategorizerConfig
11-
from sqlmesh.core.console import Console
11+
from sqlmesh.core.console import Console, set_console
1212
from sqlmesh.core.model import Model
1313

1414
from ..events import ConsoleGenerator
@@ -378,10 +378,10 @@ def _create_context(self):
378378
options: t.Dict[str, t.Any] = dict(
379379
paths=self.config.path,
380380
gateway=self.config.gateway,
381-
console=self.console,
382381
)
383382
if self.config.sqlmesh_config:
384383
options["config"] = self.config.sqlmesh_config
384+
set_console(self.console)
385385
return Context(**options)
386386

387387
@contextmanager

dagster_sqlmesh/scheduler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import typing as t
22

3-
from sqlmesh.core.scheduler import Scheduler
3+
from sqlmesh.core.scheduler import Scheduler, CompletionStatus
44

55

66
class DagsterSQLMeshScheduler(Scheduler):
@@ -11,7 +11,7 @@ def __init__(self, selected_snapshots: t.Optional[t.Set[str]], *args, **kwargs):
1111
super().__init__(*args, **kwargs)
1212
self._selected_snapshots: t.Set[str] = selected_snapshots or set()
1313

14-
def run(self, *args, **kwargs) -> bool:
14+
def run(self, *args, **kwargs) -> CompletionStatus:
1515
if len(self._selected_snapshots) > 0:
1616
kwargs["selected_snapshots"] = self._selected_snapshots
1717
return super().run(*args, **kwargs)

0 commit comments

Comments
 (0)