'RuntimeTaskInstance' object has no attribute 'get_dagrun' #46411

Open

atul-astronomer opened this issue Feb 4, 2025 · 0 comments
Labels: affected_version:3.0.0alpha, area:core, area:core-operators, kind:bug


atul-astronomer commented Feb 4, 2025

Apache Airflow version

Tag: 3.0.0a1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The DAG fails on the BranchSQLOperator with AttributeError: 'RuntimeTaskInstance' object has no attribute 'get_dagrun'.

Stack trace: see the attached task log
dag_id=sql_check_operator_run_id=manual__2025-02-04T05_54_46.258183+00_00_task_id=branch_sql_attempt=1.log
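
For context, the failure appears to come from the branch-skipping step rather than the SQL query itself. The sketch below paraphrases the Airflow 2-era code path (the exact 3.0.0a1 sources may differ slightly):

# BranchSQLOperator.execute() finishes by skipping the branch that was not
# taken:
#
#     self.skip_all_except(context["ti"], follow_branch)
#
# and SkipMixin.skip_all_except() resolves the current dag run through the
# task instance:
#
#     dag_run = ti.get_dagrun()
#
# Under the Airflow 3 Task SDK, context["ti"] is a RuntimeTaskInstance, which
# does not implement get_dagrun(), hence:
#
#     AttributeError: 'RuntimeTaskInstance' object has no attribute 'get_dagrun'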

What you think should happen instead?

The BranchSQLOperator should execute the query and follow the matching branch instead of failing.

How to reproduce

The following DAG can be used to reproduce the issue.
It requires a Postgres connection to be configured before you run it.
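
If a connection still needs to be created, something along these lines should work from the CLI (the connection id and URI here are placeholders; the DAG itself reads its actual conn id from a local plugin module):

airflow connections add 'my_postgres_conn' \
    --conn-uri 'postgres://user:password@host:5432/dbname'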

from datetime import date, timedelta

from airflow.models import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import (
    BranchSQLOperator,
    SQLCheckOperator,
    SQLExecuteQueryOperator,
    SQLIntervalCheckOperator,
    SQLThresholdCheckOperator,
    SQLValueCheckOperator,
)
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.state import TaskInstanceState
from pendulum import today

from dags.plugins.elephantsql_kashin import conn_id as CONN_ID

from dags.plugins.airflow_dag_introspection import assert_the_task_states

docs = """
####Info
This dag requires a postgres connection to be configured before you run it
####Purpose
This dag tests BranchSQLOperator, SQLCheckOperator, SQLIntervalCheckOperator, SQLThresholdCheckOperator and SQLValueCheckOperator
####Expected Behavior
This dag has 11 tasks 9 of which are expected to succeed and 2 tasks that are expected to be skipped.\n
This dag should pass.

"""

DATES = []
for i in range(6):
    DATES.append((date.today() - timedelta(days=i)).strftime("%Y-%m-%d"))

TABLE = "checktable"
DROP = f"DROP TABLE IF EXISTS {TABLE} CASCADE;"
CREATE = f"CREATE TABLE IF NOT EXISTS {TABLE}(state varchar, temp integer, date date)"
INSERT = f"""
    INSERT INTO {TABLE}(state, temp, date)
    VALUES ('Lagos', 23, '{DATES[4]}'),
        ('Enugu', 25, '{DATES[3]}'),
        ('Delta', 25, '{DATES[2]}'),
        ('California', 28, '{DATES[1]}'),
        ('Abuja', 25, '{DATES[0]}')
    """

SQLBOOL_QUERY = f"""
SELECT CAST(CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END AS BIT)
FROM {TABLE} WHERE temp = 30;
"""


def prepare_data():
    postgres = PostgresHook(CONN_ID)
    with postgres.get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(DROP)
            cur.execute(CREATE)
            cur.execute(INSERT)
        conn.commit()


with DAG(
    dag_id="sql_check_operator",
    default_args={"owner": "airflow", "start_date": today('UTC').add(days=-2)},
    schedule=None,
    tags=["core", "psql"],
    doc_md=docs,
) as dag:
    t1 = PythonOperator(task_id="prepare_table", python_callable=prepare_data)

    t2 = SQLCheckOperator(
        task_id="sql_check", sql=f"SELECT COUNT(*) FROM {TABLE}", conn_id=CONN_ID
    )
    t3 = SQLValueCheckOperator(
        task_id="sql_check_value",
        sql=f"SELECT COUNT(*) FROM {TABLE}",
        pass_value=5,
        conn_id=CONN_ID,
    )
    t4 = BranchSQLOperator(
        conn_id=CONN_ID,
        task_id="branch_sql",
        sql=SQLBOOL_QUERY,
        follow_task_ids_if_false="add_state",
        follow_task_ids_if_true="remove_state",
    )
    t5 = SQLExecuteQueryOperator(
        conn_id=CONN_ID,
        task_id="remove_state",
        sql=f"DELETE FROM {TABLE} WHERE name='Bob'",
    )

    d0 = EmptyOperator(
        task_id="dummy0"
    )

    t6 = SQLExecuteQueryOperator(
        conn_id=CONN_ID,
        task_id="add_state",
        sql=f"INSERT INTO {TABLE} (state, temp, date) VALUES ('Abia', 25, '{DATES[5]}')",
    )
    t7 = SQLThresholdCheckOperator(
        conn_id=CONN_ID,
        task_id="check_threshold",
        min_threshold=5,
        max_threshold=7,
        sql=f"SELECT COUNT(*) FROM {TABLE}",
    )
    t8 = SQLIntervalCheckOperator(
        conn_id=CONN_ID,
        task_id="interval_check",
        table=TABLE,
        days_back=3,
        date_filter_column="date",
        metrics_thresholds={"temp": 24},
    )

    t9 = SQLExecuteQueryOperator(
        conn_id=CONN_ID,
        task_id="drop_table_last",
        sql=f"DROP TABLE IF EXISTS {TABLE} CASCADE;",
    )

    t10 = PythonOperator(
        task_id="check_task_states",
        python_callable=assert_the_task_states,
        op_kwargs={"task_ids_and_assertions": {
            "prepare_table": TaskInstanceState.SUCCESS,
            "sql_check": TaskInstanceState.SUCCESS,
            "sql_check_value": TaskInstanceState.SUCCESS,
            "branch_sql": TaskInstanceState.SUCCESS,
            "add_state": TaskInstanceState.SUCCESS,
            "check_threshold": TaskInstanceState.SUCCESS,
            "interval_check": TaskInstanceState.SUCCESS,
            "drop_table_last": TaskInstanceState.SUCCESS,
            "remove_state": TaskInstanceState.SKIPPED,
            "dummy0": TaskInstanceState.SKIPPED,
            "check_threshold": TaskInstanceState.SUCCESS,

        }},
        trigger_rule=TriggerRule.ALL_DONE
    )

    t1 >> t2 >> t3 >> t4 >> [t6, t5]
    t6 >> t7 >> t8 >> t9 >> t10
    t5 >> d0
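
A trimmed-down DAG along the following lines should hit the same code path (an untested sketch: "pg_conn" is a placeholder conn id, and it assumes any reachable Postgres database):

from airflow.models import DAG
from airflow.providers.common.sql.operators.sql import BranchSQLOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from pendulum import today

with DAG(
    dag_id="branch_sql_minimal",
    start_date=today("UTC").add(days=-1),
    schedule=None,
) as dag:
    # "SELECT true;" makes the operator follow the "true" branch; the failure
    # happens right after, when skip_all_except() tries ti.get_dagrun() to
    # skip the untaken branch.
    branch = BranchSQLOperator(
        task_id="branch_sql",
        conn_id="pg_conn",  # placeholder; any configured Postgres connection
        sql="SELECT true;",
        follow_task_ids_if_true="when_true",
        follow_task_ids_if_false="when_false",
    )
    branch >> [
        EmptyOperator(task_id="when_true"),
        EmptyOperator(task_id="when_false"),
    ]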

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
