
Watcher skip on producer failures#2430

Open
johnhoran wants to merge 66 commits into astronomer:main from johnhoran:empty-gate
Conversation

@johnhoran (Contributor) commented Mar 4, 2026

Description

Ran into an issue where the watcher_producer task failed (due to pod preemption); the sensor tasks retried and ran successfully, but the DAG run then failed because the watcher_producer had failed.

The solution I'm proposing is for the producer to handle failures by converting its exceptions into AirflowSkipException; the watcher sensors then treat the resulting skipped state as a failure and act accordingly. I've also added an empty operator whose purpose is to stop the skip state from propagating to any downstream tasks.
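The conversion being proposed could be sketched roughly like this (a minimal standalone sketch, not the PR's actual code; AirflowSkipException is stubbed here so the snippet runs without Airflow installed — in a real operator it comes from airflow.exceptions, and genuine skips such as those from skip_exit_code should be re-raised untouched):

```python
class AirflowSkipException(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException."""


def run_producer(dbt_build):
    """Run the dbt build callable; turn hard failures into a skip so the
    producer does not fail the whole DAG run -- the watcher sensors are
    the ones that decide how to react to the skipped producer."""
    try:
        return dbt_build()
    except AirflowSkipException:
        raise  # preserve skips raised intentionally by lower layers
    except Exception as exc:
        raise AirflowSkipException("Skipping producer after failure") from exc
```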

I also think the empty watcher gate should be downstream of all the dbt tasks, at least when depends_on_past is set to true, and that if depends_on_past is true for the tasks, then wait_for_downstream should also be true for the tasks upstream of the gate task. The logic here is that the task group can then be treated as one atomic unit. If wait_for_downstream isn't set, it's possible for one producer task to fail and cause the sensors to retry, while another producer task starts running. In the current state we are basically relying on the person on call to be aware of this possibility and to avoid clearing tasks in a way that triggers it.
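In default_args terms, the pairing described above would look something like this (a hypothetical configuration fragment, not part of this PR; note that in Airflow, wait_for_downstream=True implies depends_on_past=True and makes each run of a task wait for the immediate downstream tasks of its previous run to succeed first):

```python
# Hypothetical default_args for the dbt tasks upstream of the gate.
default_args = {
    "depends_on_past": True,
    # Each producer run waits for its previous run's immediate downstream
    # tasks (the sensors) to succeed before starting, which is what keeps
    # the task group behaving as one atomic unit across DAG runs.
    "wait_for_downstream": True,
}
```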

Related Issue(s)

closes #2429

Breaking Change?

Checklist

  • I have made corresponding changes to the documentation (if required)
  • I have added tests that prove my fix is effective or that my feature works

Copilot AI review requested due to automatic review settings March 10, 2026 10:28
Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.



Comment thread cosmos/operators/watcher.py
Comment thread cosmos/operators/_watcher/triggerer.py
Comment thread cosmos/operators/watcher.py Outdated
codecov bot commented Mar 10, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 98.06%. Comparing base (39b9bfb) to head (a099ea0).
⚠️ Report is 9 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2430   +/-   ##
=======================================
  Coverage   98.05%   98.06%           
=======================================
  Files         103      103           
  Lines        7238     7269   +31     
=======================================
+ Hits         7097     7128   +31     
  Misses        141      141           

☔ View full report in Codecov by Sentry.

Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.



Comment thread tests/operators/test_watcher_kubernetes_unit.py Outdated
Comment thread cosmos/operators/_watcher/base.py
Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 1 comment.



Comment thread cosmos/operators/watcher_kubernetes.py Outdated
Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated no new comments.



@tatiana (Collaborator) commented Mar 19, 2026

@johnhoran how do you feel this approach relates to the one in #2283?
I still hope we'll be able to find a good solution for running retries more gracefully in the producer itself (#1978 (comment)).

Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (1)

cosmos/operators/_watcher/triggerer.py:223

  • WatcherTrigger.run() now treats a producer task state of skipped as a terminal failure, but _log_startup_events() (called before the polling loop) only exits when the producer is in failed/success. If the producer is skipped (either due to converted failures or retry-attempt skips), the trigger can loop indefinitely waiting for startup events that will never arrive. Update _log_startup_events() to treat skipped as terminal (and likely upstream_failed too if applicable), or otherwise short-circuit when the producer is not going to emit events.
    async def run(self) -> AsyncIterator[TriggerEvent]:
        logger.info("Starting WatcherTrigger for model: %s", self.model_unique_id)
        await self._log_startup_events()

        while True:
            producer_task_state = await self._get_producer_task_status()
            dbt_log_event = await self.get_xcom_val(f"{self.model_unique_id.replace('.', '__')}_dbt_event")
            _log_dbt_event(dbt_log_event)
            dbt_node_status, compiled_sql = await self._parse_dbt_node_status_and_compiled_sql()
            if is_dbt_node_status_success(dbt_node_status):
                logger.debug("dbt node '%s' succeeded", self.model_unique_id)
                event_data: dict[str, Any] = {"status": EventStatus.SUCCESS}
                if compiled_sql:
                    event_data["compiled_sql"] = compiled_sql
                yield TriggerEvent(event_data)  # type: ignore[no-untyped-call]
                return
            elif is_dbt_node_status_failed(dbt_node_status):
                logger.warning("dbt node '%s' failed", self.model_unique_id)
                event_data = {"status": EventStatus.FAILED, "reason": "model_failed"}
                if compiled_sql:
                    event_data["compiled_sql"] = compiled_sql
                yield TriggerEvent(event_data)  # type: ignore[no-untyped-call]
                return
            elif producer_task_state == "failed" or producer_task_state == "skipped":
                logger.error(
                    "Watcher producer task '%s' %s before delivering results for node '%s'",
                    self.producer_task_id,
                    producer_task_state,
                    self.model_unique_id,
                )
                yield TriggerEvent({"status": EventStatus.FAILED, "reason": "producer_failed"})  # type: ignore[no-untyped-call]
                return


Comment thread cosmos/operators/_watcher/base.py
Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (2)

cosmos/operators/_watcher/triggerer.py:223

  • WatcherTrigger._log_startup_events only exits when producer_task_state is in ("failed", "success"). With this PR converting producer exceptions into AirflowSkipException, the producer can now end up in state "skipped", which will cause deferrable watcher tasks to hang indefinitely in _log_startup_events before they ever reach the run() loop that handles "skipped". Please treat "skipped" as a terminal producer state in _log_startup_events as well (and any other similar stop conditions).
                    event_data["compiled_sql"] = compiled_sql
                yield TriggerEvent(event_data)  # type: ignore[no-untyped-call]
                return
            elif producer_task_state == "failed" or producer_task_state == "skipped":
                logger.error(
                    "Watcher producer task '%s' %s before delivering results for node '%s'",
                    self.producer_task_id,
                    producer_task_state,
                    self.model_unique_id,
                )
                yield TriggerEvent({"status": EventStatus.FAILED, "reason": "producer_failed"})  # type: ignore[no-untyped-call]
                return

cosmos/operators/watcher.py:357

  • DbtTestWatcherOperator.__init__ is forcing depends_on_past based only on explicit kwargs/default_args passed to the operator. Since tasks are created via cosmos.core.airflow.get_airflow_task() (which does not pass default_args), this will override DAG-level default_args and incorrectly set depends_on_past=False even when the DAG/task_group default_args has depends_on_past=True. A safer approach is to not pass depends_on_past unless explicitly provided, or set wait_for_downstream after super().__init__ based on self.depends_on_past so DAG defaults are respected.
    def __init__(self, *args: Any, **kwargs: Any):
        default_args = kwargs.get("default_args", {})
        desired_keys = ("dag", "task_group", "task_id")
        new_kwargs = {key: value for key, value in kwargs.items() if key in desired_keys}
        depends_on_past = kwargs.get("depends_on_past", False) or default_args.get("depends_on_past", False)
        super().__init__(depends_on_past=depends_on_past, wait_for_downstream=depends_on_past, **new_kwargs)  # type: ignore[no-untyped-call]



Comment on lines 216 to 221

  logger.error(
-     "Watcher producer task '%s' failed before delivering results for node '%s'",
+     "Watcher producer task '%s' %s before delivering results for node '%s'",
      self.producer_task_id,
+     producer_task_state,
      self.model_unique_id,
  )

Copilot AI Mar 20, 2026


The error log message now reads "Watcher producer task '%s' %s before delivering results...", which is missing a verb and produces awkward output (e.g., "... skipped before ..."). Consider changing it to something like "was %s" or expanding the message so it remains grammatically correct for both states.

  if self.poke_retry_number > 0:
      raise AirflowException(
-         f"The dbt build command failed in producer task. Please check the log of task {self.producer_task_id} for details."
+         f"The dbt build command {producer_task_state} in the producer task. Please check the log of task {self.producer_task_id} for details."

Copilot AI Mar 20, 2026


The new exception message is ungrammatical for the skipped case: "The dbt build command skipped in the producer task...". Consider adding a verb (e.g., "was {state}" / "{state} before completing") so the message reads correctly for both "failed" and "skipped".

Suggested change
- f"The dbt build command {producer_task_state} in the producer task. Please check the log of task {self.producer_task_id} for details."
+ f"The dbt build command was {producer_task_state} in the producer task. Please check the log of task {self.producer_task_id} for details."

Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.



Comment on lines +239 to 246

  elif producer_task_state == "failed" or producer_task_state == "skipped":
      logger.error(
-         "Watcher producer task '%s' failed before delivering results for node '%s'",
+         "Watcher producer task '%s' %s before delivering results for node '%s'",
          self.producer_task_id,
+         producer_task_state,
          self.model_unique_id,
      )
      yield TriggerEvent({"status": EventStatus.FAILED, "reason": WatcherEventReason.PRODUCER_FAILED})  # type: ignore[no-untyped-call]

Copilot AI Mar 26, 2026


In run(), producer_task_state == "skipped" is treated as a producer failure, but _log_startup_events() still only exits when the producer state is in (failed, success). If the producer gets skipped before emitting startup events (the new behavior), _log_startup_events() can loop indefinitely because node status will remain None. Update _log_startup_events() to also return when producer state is skipped (and potentially other terminal states) so deferrable sensors don’t hang.

Comment on lines +223 to +226

  except Exception as e:
      safe_xcom_push(task_instance=context["ti"], key="task_status", value="completed")
-     raise
+     self.log.exception("DbtProducerWatcherOperator execution failed")
+     raise AirflowSkipException("Skipping execution due to task failure") from e

Copilot AI Mar 26, 2026


The broad except Exception converts all exceptions from super().execute() into AirflowSkipException, including an AirflowSkipException intentionally raised by lower layers (e.g., skip_exit_code in DbtLocalBaseOperator). That will incorrectly log it as a failure and replace the original skip reason/message. Consider explicitly re-raising AirflowSkipException (and any other control-flow exceptions you want preserved) before the generic handler, similar to the Kubernetes variant.

Comment on lines +275 to +286

  def test_dbt_producer_watcher_operator_logs_retry_message(caplog):
      op = DbtProducerWatcherOperator(project_dir=".", profile_config=None)
      ti = _MockTI()
-     ti.try_number = 2
+     ti.try_number = 1
      context = {"ti": ti}

-     with patch("cosmos.operators.local.DbtLocalBaseOperator.execute") as mock_execute:
+     with patch("cosmos.operators.local.DbtLocalBaseOperator.execute", return_value="ok") as mock_execute:
          with caplog.at_level(logging.INFO):
-             result = op.execute(context=context)
+             op.execute(context=context)

      mock_execute.assert_called_once()
      assert any("forces Airflow retries to 0" in message for message in caplog.messages)

Copilot AI Mar 26, 2026


This test asserts the producer logs a message containing "forces Airflow retries to 0", but that string doesn’t appear anywhere in the implementation and the patched DbtLocalBaseOperator.execute bypasses any potential logging from the base class. As written, the assertion will fail consistently. Either add the corresponding log line in DbtProducerWatcherOperator.execute (if it’s a desired contract) or remove/adjust the expectation to assert behavior that actually occurs (e.g., that execute calls the base execute and returns).

@tatiana (Collaborator) commented Apr 7, 2026

Hi @johnhoran - sorry for not following up after our chat on 19 March.

I’ve discussed this with the team, and there’s some hesitation about further modifying the DbtDag / DbtTaskGroup topology by introducing an additional “gateway” task.

In the meantime, we addressed the problematic default reported in #2429 with the fix in PR #2479.

I agree that having the watcher producer marked as skipped during retries - while still allowing its downstream tasks to run normally - would be a cleaner approach than marking it as success when it hasn’t actually done any work.

What do you think about marking the producer as skipped and configuring the downstream consumer tasks with trigger_rule=TriggerRule.ALL_DONE, so they continue to run even if the parent task is skipped?
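For reference, Airflow's ALL_DONE trigger rule fires once every upstream task has reached a terminal state, whether that is success, failed, or skipped. A toy model of that semantics (not Airflow's implementation; names are illustrative):

```python
TERMINAL_STATES = {"success", "failed", "skipped"}


def all_done_should_run(upstream_states):
    """trigger_rule=TriggerRule.ALL_DONE: run once all upstream tasks are
    in a terminal state, regardless of whether any of them succeeded."""
    return all(state in TERMINAL_STATES for state in upstream_states)
```

So a consumer with ALL_DONE would still run after a skipped producer, which is the behavior the suggestion relies on.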

@johnhoran (Contributor, Author)

The consumer tasks aren't downstream of the producer one; otherwise they wouldn't start until the producer task completes. And making them downstream would be unfortunate, because then you'd be waiting for the producer task to complete before you start working through the consumer tasks.

You could maybe have the very last consumers in the task group be downstream of the producer task and use the NONE_FAILED trigger rule. I'm still hesitant about this for selfish reasons, just because some of our task groups are very flat, so it's essentially the same problem as above.

Other than that, I think it could work. My other concern was around the wait_for_downstream, but that should be okay.

@johnhoran (Contributor, Author) commented Apr 8, 2026

Also, and I'm probably wrong about this, but I thought the reason retries was set to zero was so you could have:

elif producer_task_state == "failed":
    logger.error(
        "Watcher producer task '%s' failed before delivering results for node '%s'",
        self.producer_task_id,
        self.model_unique_id,
    )
    yield TriggerEvent({"status": EventStatus.FAILED, "reason": WatcherEventReason.PRODUCER_FAILED})  # type: ignore[no-untyped-call]
    return
elif producer_task_state == "success" and dbt_node_status is None:
    logger.info(
        "The producer task '%s' succeeded. There is no information about the node '%s' execution.",
        self.producer_task_id,
        self.model_unique_id,
    )
    yield TriggerEvent({"status": EventStatus.SUCCESS, "reason": WatcherEventReason.NODE_NOT_RUN})  # type: ignore[no-untyped-call]
    return

If the producer task fails and then retries and gets marked as successful (because it's a retry and not doing anything), won't the consumer tasks also succeed? Or at least there might be a race condition: if a consumer is running while the producer fails, then it's okay, but if it is waiting on an upstream task, then by the time it gets its turn the producer has retried and succeeded.
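The race described here can be sketched with a toy decision function mirroring the two branches quoted above (a simplified stand-in, not Cosmos code): which branch fires depends on whether the sensor observes the producer's failed first attempt or its auto-succeeded retry.

```python
def sensor_verdict(producer_state, dbt_node_status):
    """Simplified mirror of the trigger branches quoted above."""
    if producer_state == "failed":
        return "PRODUCER_FAILED"  # consumer is failed
    if producer_state == "success" and dbt_node_status is None:
        return "NODE_NOT_RUN"     # consumer is marked successful
    return "KEEP_POLLING"


# A consumer that polls during the failed first attempt fails...
first_poll = sensor_verdict("failed", None)
# ...but one that only gets its turn after the no-op retry was
# auto-marked successful would succeed instead.
late_poll = sensor_verdict("success", None)
```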


Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Watcher pattern producer max retries forced to zero

3 participants