|
31 | 31 | build_unique_component_name,
|
32 | 32 | )
|
33 | 33 | from reana_db.database import Session
|
34 |
| -from reana_db.models import Job, JobCache, Workflow, RunStatus |
| 34 | +from reana_db.models import Job, JobCache, Workflow, RunStatus, JobStatus |
35 | 35 | from sqlalchemy.exc import SQLAlchemyError
|
36 | 36 | from sqlalchemy.orm.attributes import flag_modified
|
37 | 37 |
|
@@ -118,11 +118,40 @@ def on_message(self, body, message):
|
118 | 118 | )
|
119 | 119 | elif workflow and workflow.status not in ALIVE_STATUSES:
|
120 | 120 | logging.warning(
|
121 |
| - f"Event for not alive workflow {workflow.id_} with DB status {workflow.status} received:\n" |
122 |
| - f"{body}\nIgnoring..." |
| 121 | + f"Event for not alive workflow {workflow.id_} with DB status {workflow.status} received." |
| 122 | + ) |
| 123 | + try: |
| 124 | + _delete_workflow_job(workflow) |
| 125 | + logging.info( |
| 126 | + f"Remove batch-pod for not alive {workflow.id_} workflow." |
| 127 | + ) |
| 128 | + except REANAWorkflowControllerError as exception: |
| 129 | + logging.error( |
| 130 | + f"Could not clean up not alive workflow {workflow.id_} batch pod for workflow." |
| 131 | + f" Error: {exception}" |
| 132 | + ) |
| 133 | + jobs = Session.query(Job).filter( |
| 134 | + Job.workflow_uuid == workflow_uuid, |
123 | 135 | )
|
| 136 | + for job in jobs: |
| 137 | + job_name = build_unique_component_name("run-job", job.id_) |
| 138 | + if job.status in [ |
| 139 | + JobStatus.running, |
| 140 | + JobStatus.queued, |
| 141 | + JobStatus.created, |
| 142 | + ]: |
| 143 | + try: |
| 144 | + current_k8s_batchv1_api_client.delete_namespaced_job( |
| 145 | + name=job_name, |
| 146 | + namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, |
| 147 | + propagation_policy="Background", |
| 148 | + ) |
| 149 | + except ApiException as e: |
| 150 | + logging.error( |
| 151 | + f"run-job pod {job_name} for {workflow.id_} could not be deleted. Error: {e}" |
| 152 | + ) |
124 | 153 | else:
|
125 |
| - logging.warning( |
| 154 | + logging.error( |
126 | 155 | f"Event for workflow {workflow_uuid} that doesn't exist in DB received:\n"
|
127 | 156 | f"{body}\nIgnoring..."
|
128 | 157 | )
|
|
0 commit comments