DAG Processor Crashes on MySQL Connection Failure During Import Error Recording
Description
The DAG processor crashes and enters a restart loop when the MySQL connection fails while recording DAG import errors to the database. The root cause is missing session cleanup (session.rollback()) after caught exceptions, which leaves the SQLAlchemy session in an invalid state and makes the subsequent session.flush() fail unhandled.
Apache Airflow Version
Affected Version: 3.1.3 (likely affects 3.0.x and 3.1.x series)
Environment
- Deployment: Kubernetes, Docker Compose
- Database: MySQL 8.0
- Executor: KubernetesExecutor, LocalExecutor
Problem Description
Expected Behavior
When a DAG has import errors (e.g., ModuleNotFoundError):
- Import error should be caught during parsing
- Error should be recorded to the database
- If database operation fails, it should be logged or retried
- DAG processor should continue processing other DAGs
- Import errors should eventually appear in the Airflow UI
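For concreteness, a DAG file like the following is enough to trigger this path (file and module names are hypothetical): the top-level import fails during parsing, and the DAG processor then tries to record the resulting import error in the metadata database.

```python
# dags/example_broken.py (hypothetical): the unresolved top-level import makes
# parsing fail with ModuleNotFoundError, which the DAG processor then tries to
# record as an import error in the metadata database.
import some_module_that_is_not_installed  # placeholder; raises ModuleNotFoundError

from airflow.sdk import dag, task  # Airflow 3 authoring API


@dag(schedule=None)
def example_broken():
    @task
    def noop():
        pass

    noop()


example_broken()
```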
Actual Behavior
When the MySQL connection fails during import error recording:
- Import error is caught during parsing ✓
- Database operation fails (connection timeout, pool exhaustion, network issue) ✗
- Exception is caught but session is NOT rolled back ✗
- session.flush() attempts to flush the invalid session ✗
- New exception is raised (OperationalError or PendingRollbackError) ✗
- DAG processor process crashes with exit code 1 ✗
- In production with restart policy: continuous restart loop ✗
Production Impact
- DAG processor restarted 1,259 times in 4 days (~13 restarts/hour)
- Connection pool exhaustion
- Cascading failures across Airflow components
- Import errors not visible in UI
- System instability
Root Cause
File: airflow-core/src/airflow/dag_processing/collection.py
Function: update_dag_parsing_results_in_db()
Lines: ~430-447
```python
# Current problematic code:
try:
    _update_import_errors(...)
except Exception:
    log.exception("Error logging import errors!")
    # ❌ MISSING: session.rollback()

try:
    _update_dag_warnings(...)
except Exception:
    log.exception("Error logging DAG warnings.")
    # ❌ MISSING: session.rollback()

session.flush()  # ❌ NO ERROR HANDLING - crashes if session invalid
```
The Issue:
- MySQL connection fails during _update_import_errors()
- Exception is caught but session.rollback() is NOT called
- Session remains in invalid transaction state
- session.flush() attempts to flush the invalid session
- New exception (OperationalError / PendingRollbackError) is raised
- This exception is NOT caught → process crashes
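Independent of Airflow, the same session mechanics can be reproduced in a few lines of SQLAlchemy. The following is a minimal sketch, not Airflow code: an in-memory SQLite database and a NOT NULL violation stand in for MySQL and the lost connection, and the model/table names are illustrative only.

```python
# Standalone sketch of the failure mode (not Airflow code): a flush fails
# mid-transaction, the exception is swallowed without rollback(), and the
# next flush() raises PendingRollbackError - the same exception class the
# DAG processor hits after MySQL drops the connection.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ImportErrorRow(Base):  # illustrative stand-in for the import-error table
    __tablename__ = "import_error"
    id = Column(Integer, primary_key=True)
    filename = Column(String(250), nullable=False)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

try:
    # Stand-in for _update_import_errors() failing mid-transaction
    # (NOT NULL violation here; a lost MySQL connection in production).
    session.add(ImportErrorRow(filename=None))
    session.flush()
except Exception:
    print("caught, but session.rollback() is never called")

session.add(ImportErrorRow(filename="dags/ok.py"))
session.flush()  # raises sqlalchemy.exc.PendingRollbackError
```

Calling session.rollback() inside the except block resets the transaction, after which the final flush() succeeds; that is exactly the change proposed below.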
Proposed Solution
Add session.rollback() calls after caught exceptions and wrap session.flush() in error handling:
```python
# Fixed code:
try:
    _update_import_errors(...)
except Exception:
    log.exception("Error logging import errors!")
    session.rollback()  # ✅ FIX: Clean up invalid session state

try:
    _update_dag_warnings(...)
except Exception:
    log.exception("Error logging DAG warnings.")
    session.rollback()  # ✅ FIX: Clean up invalid session state

try:
    session.flush()  # ✅ FIX: Wrapped in error handling
except Exception:
    log.exception("Error flushing session after parsing results")
    session.rollback()  # ✅ FIX: Don't crash - continue processing
```
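The fixed pattern is easy to unit test with mocks. The sketch below illustrates the testing approach only; the helper is a hypothetical stand-in for the code path in collection.py, not the real update_dag_parsing_results_in_db() signature.

```python
# Illustrative pytest-style test: a DB failure while recording import errors
# must not propagate, and the session must be rolled back.
from unittest import mock

from sqlalchemy.exc import OperationalError


def _record_parsing_results(update_import_errors, update_dag_warnings, session, log):
    """Hypothetical stand-in that mirrors the proposed error handling."""
    try:
        update_import_errors()
    except Exception:
        log.exception("Error logging import errors!")
        session.rollback()
    try:
        update_dag_warnings()
    except Exception:
        log.exception("Error logging DAG warnings.")
        session.rollback()
    try:
        session.flush()
    except Exception:
        log.exception("Error flushing session after parsing results")
        session.rollback()


def test_db_failure_during_import_error_recording_does_not_crash():
    session, log = mock.MagicMock(), mock.MagicMock()
    failing_update = mock.MagicMock(
        side_effect=OperationalError("INSERT ...", {}, Exception("lost connection"))
    )
    # Must not raise even though the first DB call fails.
    _record_parsing_results(failing_update, mock.MagicMock(), session, log)
    session.rollback.assert_called()
```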
Error Messages
Lost Connection (Transient)
```
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError)
(2013, 'Lost connection to MySQL server during query')
```
Connection Refused (MySQL Down)
```
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError)
(2003, "Can't connect to MySQL server on 'mysql'")
```
Invalid Session State (After Caught Exception)
```
sqlalchemy.exc.PendingRollbackError:
Can't reconnect until invalid transaction is rolled back.
Please rollback() fully before proceeding
```
All of these errors end in a crash because session.flush() then runs on the invalidated session with no error handling.
Workarounds (Temporary)
While these reduce frequency, they don't prevent crashes:
- Increase connection pool settings
- Reduce DAG processor resource limits
- Increase liveness probe timeout
- Monitor and alert on high restart counts
None of these are real solutions - the code needs proper session cleanup.
Why This is Important
This violates distributed systems best practices:
- Network hiccups are normal, expected events in distributed systems
- Database timeouts happen under load
- Systems should retry transient failures automatically
- Code must handle recoverable errors gracefully
Other Airflow components (Scheduler, Triggerer) handle database failures properly - only the DAG processor lacks proper error recovery for this code path.
Benefits of Fix
- Graceful degradation: DAG processor continues running during database issues
- Automatic recovery: Import errors saved when connection recovers
- No restart loops: Eliminates cascading failures
- Better resilience: Handles network hiccups and transient errors
- Production stability: Prevents connection pool exhaustion from restart storms
Additional Context
- Reproduced in both Docker Compose and Kubernetes environments
- 100% reproducible when DAGs have import errors and MySQL connection fails
- Configuration tuning (pool size, timeouts) does not address the root cause
- The fix is straightforward and consistent with error handling elsewhere in Airflow
Are you willing to submit PR?
Yes, I have already prepared the fix with comprehensive unit tests.