
DAG Processor crashes on MySQL connection failure during import error recording #59166

@AmosG

Description

The DAG processor crashes and enters a restart loop when the MySQL connection fails while DAG import errors are being recorded to the database. The cause is missing session cleanup (session.rollback()) after caught exceptions, which leaves the SQLAlchemy session in an invalid state, so the subsequent session.flush() raises and the process exits.

Apache Airflow Version

Affected Version: 3.1.3 (likely affects 3.0.x and 3.1.x series)

Environment

  • Deployment: Kubernetes, Docker Compose
  • Database: MySQL 8.0
  • Executor: KubernetesExecutor, LocalExecutor

Problem Description

Expected Behavior

When a DAG has import errors (e.g., ModuleNotFoundError):

  1. Import error should be caught during parsing
  2. Error should be recorded to the database
  3. If database operation fails, it should be logged or retried
  4. DAG processor should continue processing other DAGs
  5. Import errors should eventually appear in the Airflow UI

Actual Behavior

When MySQL connection fails during import error recording:

  1. Import error is caught during parsing ✓
  2. Database operation fails (connection timeout, pool exhaustion, network issue) ✗
  3. Exception is caught but session is NOT rolled back
  4. session.flush() attempts to flush the invalid session ✗
  5. New exception is raised (OperationalError or PendingRollbackError) ✗
  6. DAG processor process crashes with exit code 1
  7. In production with restart policy: continuous restart loop

Production Impact

  • DAG processor restarted 1,259 times in 4 days (~13 restarts/hour)
  • Connection pool exhaustion
  • Cascading failures across Airflow components
  • Import errors not visible in UI
  • System instability

Root Cause

File: airflow-core/src/airflow/dag_processing/collection.py
Function: update_dag_parsing_results_in_db()
Lines: ~430-447

# Current problematic code:
try:
    _update_import_errors(...)
except Exception:
    log.exception("Error logging import errors!")
    # ❌ MISSING: session.rollback()

try:
    _update_dag_warnings(...)
except Exception:
    log.exception("Error logging DAG warnings.")
    # ❌ MISSING: session.rollback()

session.flush()  # ❌ NO ERROR HANDLING - crashes if session invalid

The Issue:

  1. MySQL connection fails during _update_import_errors()
  2. Exception is caught but session.rollback() is NOT called
  3. Session remains in invalid transaction state
  4. session.flush() attempts to flush invalid session
  5. New exception (OperationalError/PendingRollbackError) is raised
  6. This exception is NOT caught → process crashes (reproduced in the sketch below)
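
This failure mode can be reproduced outside Airflow with a few lines of SQLAlchemy. The sketch below is not Airflow code: it uses SQLite and a NOT NULL violation as a stand-in for the MySQL connection failure, and the ImportErrorRow model is made up for the example. It shows that catching the first exception without session.rollback() leaves the session unusable, so the next session.flush() raises PendingRollbackError.

# Minimal reproduction sketch (not Airflow code, requires SQLAlchemy 1.4+).
# A failed flush leaves the session in an invalid transaction state; without
# rollback(), the next flush() raises PendingRollbackError.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import IntegrityError, PendingRollbackError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class ImportErrorRow(Base):  # hypothetical table standing in for Airflow's import-error table
    __tablename__ = "import_error"
    id = Column(Integer, primary_key=True)
    filename = Column(String, nullable=False)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    try:
        session.add(ImportErrorRow(filename=None))  # NOT NULL violation -> flush fails
        session.flush()
    except IntegrityError:
        pass  # exception caught, but session.rollback() is NOT called

    session.add(ImportErrorRow(filename="dags/example.py"))  # more pending work
    try:
        session.flush()  # same unguarded flush pattern as collection.py
    except PendingRollbackError as exc:
        print(f"second flush failed: {exc}")
        session.rollback()  # the missing cleanup step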

Proposed Solution

Add session.rollback() calls after caught exceptions and wrap session.flush() in error handling:

# Fixed code:
try:
    _update_import_errors(...)
except Exception:
    log.exception("Error logging import errors!")
    session.rollback()  # ✅ FIX: Clean up invalid session state

try:
    _update_dag_warnings(...)
except Exception:
    log.exception("Error logging DAG warnings.")
    session.rollback()  # ✅ FIX: Clean up invalid session state

try:
    session.flush()  # ✅ FIX: Wrapped in error handling
except Exception:
    log.exception("Error flushing session after parsing results")
    session.rollback()  # ✅ FIX: Don't crash - continue processing
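
The same pattern could also be factored into a small helper so all three call sites stay consistent. This is only an illustrative sketch of the proposed behavior, not part of the actual patch; the name suppress_and_rollback and its arguments are made up for this example.

# Illustrative helper (not part of the patch): log the failure and restore the
# session to a usable state so later operations in the same loop do not crash.
import contextlib
import logging

log = logging.getLogger(__name__)

@contextlib.contextmanager
def suppress_and_rollback(session, what: str):
    try:
        yield
    except Exception:
        log.exception("Error while %s", what)
        session.rollback()

# Usage mirroring the fixed code above:
#   with suppress_and_rollback(session, "logging import errors"):
#       _update_import_errors(...)
#   with suppress_and_rollback(session, "logging DAG warnings"):
#       _update_dag_warnings(...)
#   with suppress_and_rollback(session, "flushing parsing results"):
#       session.flush()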

Error Messages

Lost Connection (Transient)

sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) 
(2013, 'Lost connection to MySQL server during query')

Connection Refused (MySQL Down)

sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) 
(2003, "Can't connect to MySQL server on 'mysql'")

Invalid Session State (After Caught Exception)

sqlalchemy.exc.PendingRollbackError: 
Can't reconnect until invalid transaction is rolled back.
Please rollback() fully before proceeding

All of these errors lead to a crash because the final session.flush() is not wrapped in error handling and the session is never rolled back.
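
If a retry were ever layered on top of the rollback, the errors above could first be classified as transient. The sketch below is an assumption-laden illustration, not something Airflow ships today: is_transient_db_error and TRANSIENT_MYSQL_CODES are made up, and code 2006 ("server has gone away") is added beyond the two codes shown in the tracebacks.

# Rough sketch of classifying the errors above as transient/connection-level.
from sqlalchemy.exc import OperationalError, PendingRollbackError

TRANSIENT_MYSQL_CODES = {2003, 2013, 2006}  # 2003/2013 from the tracebacks above; 2006 is an assumption

def is_transient_db_error(exc: BaseException) -> bool:
    if isinstance(exc, PendingRollbackError):
        return True  # recoverable once session.rollback() is issued
    if isinstance(exc, OperationalError) and exc.orig is not None:
        code = exc.orig.args[0] if exc.orig.args else None  # pymysql errors carry (code, message)
        return code in TRANSIENT_MYSQL_CODES
    return False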

Workarounds (Temporary)

While these reduce frequency, they don't prevent crashes:

  • Increase connection pool settings (see the sketch below)
  • Reduce DAG processor resource limits
  • Increase liveness probe timeout
  • Monitor and alert on high restart counts

None of these are real solutions - the code needs proper session cleanup.
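
For concreteness, the pool-related tuning in the first workaround corresponds roughly to the SQLAlchemy engine options below. In Airflow these are normally set through the sql_alchemy_* configuration options rather than a manual create_engine() call; the DSN and values here are placeholders.

# Illustrative only: the kind of engine/pool tuning the workarounds refer to,
# expressed directly in SQLAlchemy terms.
from sqlalchemy import create_engine

engine = create_engine(
    "mysql+pymysql://user:password@mysql/airflow",  # placeholder DSN
    pool_size=10,        # more headroom before checkouts start waiting
    max_overflow=20,     # extra short-lived connections beyond pool_size
    pool_recycle=1800,   # recycle connections before MySQL's wait_timeout
    pool_pre_ping=True,  # detect dead connections before handing them out
)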

Why This is Important

This violates distributed systems best practices:

  • Network hiccups are normal, expected events in distributed systems
  • Database timeouts happen under load
  • Systems should retry transient failures automatically
  • Code must handle recoverable errors gracefully

Other Airflow components (Scheduler, Triggerer) handle database failures properly - only the DAG processor lacks proper error recovery for this code path.

Benefits of Fix

  1. Graceful degradation: DAG processor continues running during database issues
  2. Automatic recovery: Import errors saved when connection recovers
  3. No restart loops: Eliminates cascading failures
  4. Better resilience: Handles network hiccups and transient errors
  5. Production stability: Prevents connection pool exhaustion from restart storms

Additional Context

  • Reproduced in both Docker Compose and Kubernetes environments
  • 100% reproducible when a DAG has import errors and the MySQL connection fails
  • Configuration tuning (pool size, timeouts) does not address the root cause
  • The fix is straightforward and consistent with error handling elsewhere in Airflow

Are you willing to submit PR?

Yes, I have already prepared the fix with comprehensive unit tests.

Labels

area:core
backport-to-v3-1-test (Mark PR with this label to backport to v3-1-test branch)
kind:bug (This is clearly a bug)
