Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xcom pull is failing when is key is not provided #46417

Open
1 of 2 tasks
vatsrahul1001 opened this issue Feb 4, 2025 · 1 comment
Open
1 of 2 tasks

Xcom pull is failing when is key is not provided #46417

vatsrahul1001 opened this issue Feb 4, 2025 · 1 comment
Assignees
Labels
affected_version:3.0.0alpha For all 3.0.0 alpha releases area:core kind:bug This is a clearly a bug

Comments

@vatsrahul1001
Copy link
Collaborator

Apache Airflow version

3.0.0a1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

pulled_value_1 = ti.xcom_pull(key=None, task_ids="push") in failing in AF3, however, same code works with 2.10.4. Looks like when I provide the key it works fine. I am raising this issue as there is a deviation of behaviour from AF2 here. Maybe we can handle this in task-sdk

3.0.0a1

Image

2.10.4

Image

{"timestamp":"2025-02-04T11:01:20.886364","level":"error","event":"Task failed with exception","logger":"task","error_detail":[{"exc_type":"ValidationError","exc_value":"1 validation error for GetXCom\nkey\n Input should be a valid string [type=string_type, input_value=None, input_type=NoneType]\n For further information visit [https://errors.pydantic.dev/2.10/v/string_type","syntax_error":null,"is_cause":false,"frames":[](https://errors.pydantic.dev/2.10/v/string_type%22,%22syntax_error%22:null,%22is_cause%22:false,%22frames%22:[){"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":545,"name":"run"},{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":645,"name":"_execute_task"},{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":173,"name":"wrapper"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":196,"name":"execute"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":222,"name":"execute_callable"},{"filename":"/opt/airflow/airflow/utils/operator_helpers.py","lineno":261,"name":"run"},{"filename":"/files/dags/example_xcom.py","lineno":34,"name":"puller"},{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":266,"name":"xcom_pull"},{"filename":"/usr/local/lib/python3.9/site-packages/pydantic/main.py","lineno":214,"name":"__init__"}]}]}

What you think should happen instead?

Same code working in AF2 should work in AF3 as well

How to reproduce

Use below DAG to replicate
DAG CODE

"""Example DAG demonstrating the usage of XComs."""
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    "example_xcom",
    start_date=datetime(2023, 11, 28),
    default_args={"owner": "airflow"},
    schedule="@daily",
    catchup=False,
    tags=["core"],
)

value_1 = [1, 2, 3]
value_2 = {"a": "b"}


def push(**kwargs):
    """Pushes an XCom without a specific target"""
    kwargs["ti"].xcom_push(key="value from pusher 1", value=value_1)


def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


def puller(**kwargs):
    """Pull all previously pushed XComs and check if the pushed values match the pulled values."""
    ti = kwargs["ti"]

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids="push")
    if pulled_value_1 != value_1:
        raise ValueError(f"The two values differ {pulled_value_1} and {value_1}")

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids="push_by_returning")
    if pulled_value_2 != value_2:
        raise ValueError(f"The two values differ {pulled_value_2} and {value_2}")

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(
        key=None, task_ids=["push", "push_by_returning"]
    )
    print(f"pulled_value_1 is {pulled_value_1}")
    print(f"pulled_value_2 is {pulled_value_2}")
    if pulled_value_1 != value_1:
        raise ValueError(f"The two values differ {pulled_value_1} and {value_1}")
    if pulled_value_2 != value_2:
        raise ValueError(f"The two values differ {pulled_value_2} and {value_2}")


push1 = PythonOperator(
    task_id="push",
    dag=dag,
    python_callable=push,
    depends_on_past=True,
)

push2 = PythonOperator(
    task_id="push_by_returning",
    dag=dag,
    python_callable=push_by_returning,
)

pull = PythonOperator(
    task_id="puller",
    dag=dag,
    python_callable=puller,
)

pull << [push1, push2]

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@vatsrahul1001 vatsrahul1001 added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Feb 4, 2025
@vatsrahul1001 vatsrahul1001 added affected_version:3.0.0alpha For all 3.0.0 alpha releases and removed needs-triage label for new issues that we didn't triage yet labels Feb 4, 2025
@amoghrajesh
Copy link
Contributor

Nice observation. Will pick it up soon

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
affected_version:3.0.0alpha For all 3.0.0 alpha releases area:core kind:bug This is a clearly a bug
Projects
None yet
Development

No branches or pull requests

2 participants