Commit

implemented executor to track jobs in run_workflow
Acribbs committed Oct 30, 2024
1 parent 7f5edf3 commit 81f5abc
Showing 2 changed files with 118 additions and 15 deletions.
47 changes: 32 additions & 15 deletions cgatcore/pipeline/control.py
@@ -50,7 +50,7 @@
from cgatcore.experiment import get_header, MultiLineFormatter
from cgatcore.pipeline.utils import get_caller, get_caller_locals, is_test
from cgatcore.pipeline.execution import execute, start_session, \
close_session
close_session, Executor


# redirect os.stat and other OS utilities to cached versions to speed
@@ -1266,6 +1266,10 @@ def run_workflow(args, argv=None, pipeline=None):
logger.debug("starting run_workflow with action {}".format(
args.pipeline_action))

# Instantiate Executor to manage job tracking and cleanup
executor = Executor(job_threads=args.multiprocess, work_dir=get_params()["work_dir"])
executor.setup_signal_handlers() # Set up signal handlers for cleanup on interruption

if args.force_run:
if args.force_run == "all":
forcedtorun_tasks = ruffus.pipeline_get_task_names()
@@ -1358,20 +1362,33 @@ def run_workflow(args, argv=None, pipeline=None):

logger.info("current directory is {}".format(os.getcwd()))

ruffus.pipeline_run(
args.pipeline_targets,
forcedtorun_tasks=forcedtorun_tasks,
logger=logger,
verbose=args.loglevel,
log_exceptions=args.log_exceptions,
exceptions_terminate_immediately=args.exceptions_terminate_immediately,
checksum_level=args.ruffus_checksums_level,
pipeline=pipeline,
one_second_per_job=False,
**opts
)

close_session()
# Iterate over tasks and track each job's lifecycle
for task in stream.getvalue().splitlines():
job_info = {"task": task, "status": "running"}
executor.start_job(job_info) # Track job start

try:

ruffus.pipeline_run(
args.pipeline_targets,
forcedtorun_tasks=forcedtorun_tasks,
logger=logger,
verbose=args.loglevel,
log_exceptions=args.log_exceptions,
exceptions_terminate_immediately=args.exceptions_terminate_immediately,
checksum_level=args.ruffus_checksums_level,
pipeline=pipeline,
one_second_per_job=False,
**opts
)
executor.finish_job(job_info) # Mark job as finished
except Exception as e:
logger.error(f"Error in job {task}: {e}")
executor.cleanup_all_jobs()
raise
finally:
if not args.without_cluster:
close_session()

elif args.pipeline_action == "show":
ruffus.pipeline_printout(
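
The control.py change above wraps ruffus.pipeline_run in a try/except/finally block: each task is registered with the Executor via start_job, cleanup_all_jobs removes tracked work when the run fails, and close_session now runs in the finally block only when args.without_cluster is not set. The following is a minimal, self-contained sketch of the same track/clean-up pattern; SimpleTracker and run_pipeline are hypothetical stand-ins, not part of cgatcore (the real Executor additionally installs signal handlers and knows about job output files):

    # sketch_job_tracking.py -- illustrative only; SimpleTracker and
    # run_pipeline are hypothetical stand-ins, not cgatcore APIs.
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("sketch")


    class SimpleTracker:
        """Keep a list of active jobs; drop them on success, clear them on failure."""

        def __init__(self):
            self.active_jobs = []

        def start_job(self, job_info):
            self.active_jobs.append(job_info)
            logger.info("job started: %s", job_info)

        def finish_job(self, job_info):
            if job_info in self.active_jobs:
                self.active_jobs.remove(job_info)
            logger.info("job completed: %s", job_info)

        def cleanup_all_jobs(self):
            logger.info("cleaning up %d active job(s)", len(self.active_jobs))
            self.active_jobs.clear()


    def run_pipeline(fail=False):
        """Stand-in for ruffus.pipeline_run."""
        if fail:
            raise RuntimeError("simulated task failure")


    tracker = SimpleTracker()
    job_info = {"task": "example_task", "status": "running"}
    tracker.start_job(job_info)
    try:
        run_pipeline(fail=False)
        tracker.finish_job(job_info)
    except Exception as exc:
        logger.error("error in job %s: %s", job_info["task"], exc)
        tracker.cleanup_all_jobs()
        raise
    finally:
        logger.info("closing session")  # close_session() would go here on a cluster run
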
86 changes: 86 additions & 0 deletions cgatcore/pipeline/execution.py
@@ -400,6 +400,7 @@ def __init__(self, **kwargs):
self.queue_manager = None
self.run_on_cluster = will_run_on_cluster(kwargs)
self.job_threads = kwargs.get("job_threads", 1)
self.active_jobs = [] # List to track active jobs

if "job_memory" in kwargs and "job_total_memory" in kwargs:
raise ValueError(
@@ -459,6 +460,8 @@ def __init__(self, **kwargs):
if self.monitor_interval_running is None:
self.monitor_interval_running = get_params()["cluster"].get(
'monitor_interval_running_default', GEVENT_TIMEOUT_WAIT)
# Set up signal handlers for clean-up on interruption
self.setup_signal_handlers()

def __enter__(self):
return self
@@ -735,6 +738,89 @@ def get_val(d, v, alt):

return benchmark_data

def start_job(self, job_info):
"""Add a job to active_jobs list when it starts."""
self.active_jobs.append(job_info)
self.logger.info(f"Job started: {job_info}")

def finish_job(self, job_info):
"""Remove a job from active_jobs list when it finishes."""
if job_info in self.active_jobs:
self.active_jobs.remove(job_info)
self.logger.info(f"Job completed: {job_info}")

def cleanup_all_jobs(self):
"""Clean up all remaining active jobs on interruption."""
self.logger.info("Cleaning up all job outputs due to pipeline interruption")
for job_info in self.active_jobs:
self.cleanup_failed_job(job_info)
self.active_jobs.clear() # Clear the list after cleanup

def setup_signal_handlers(self):
"""Set up signal handlers to clean up jobs on SIGINT and SIGTERM."""

def signal_handler(signum, frame):
self.logger.info(f"Received signal {signum}. Starting clean-up.")
self.cleanup_all_jobs()
exit(1)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

def cleanup_failed_job(self, job_info):
"""Clean up files generated by a failed job."""
if "outfile" in job_info:
outfiles = [job_info["outfile"]]
elif "outfiles" in job_info:
outfiles = job_info["outfiles"]
else:
self.logger.warning(f"No output files found for job {job_info.get('job_name', 'unknown')}")
return

for outfile in outfiles:
if os.path.exists(outfile):
try:
os.remove(outfile)
self.logger.info(f"Removed failed job output file: {outfile}")
except OSError as e:
self.logger.error(f"Error removing file {outfile}: {str(e)}")
else:
self.logger.info(f"Output file not found (already removed or not created): {outfile}")

def run(self, statement_list):
"""Run a list of statements and track each job's lifecycle."""
benchmark_data = []
for statement in statement_list:
job_info = {"statement": statement}
self.start_job(job_info) # Add job to active_jobs

try:
# Execute job
full_statement, job_path = self.build_job_script(statement)
process = subprocess.Popen(
full_statement, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = process.communicate()

if process.returncode != 0:
raise OSError(
f"Job failed with return code {process.returncode}.\n"
f"stderr: {stderr.decode('utf-8')}\nstatement: {statement}"
)

# Collect benchmark data if job was successful
benchmark_data.append(
self.collect_benchmark_data([statement], resource_usage=[{"job_id": process.pid}])
)
self.finish_job(job_info) # Remove job from active_jobs

except Exception as e:
self.logger.error(f"Job failed: {e}")
self.cleanup_failed_job(job_info)
continue

return benchmark_data


class GridExecutor(Executor):

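
The new Executor.run method above executes each statement with subprocess.Popen, raises OSError on a non-zero return code, and hands the failed job to cleanup_failed_job, which deletes whatever output files the job left behind. Below is a small, self-contained sketch of that clean-up-on-failure behaviour; the free function cleanup_failed_job only mirrors the method added above and is not the cgatcore API, and a POSIX shell is assumed:

    # sketch_cleanup_on_failure.py -- illustrative only; cleanup_failed_job
    # below mirrors the Executor method but is not part of cgatcore.
    import logging
    import os
    import subprocess
    import tempfile

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("sketch")


    def cleanup_failed_job(job_info):
        """Remove any output files left behind by a failed statement."""
        for outfile in job_info.get("outfiles", []):
            if os.path.exists(outfile):
                os.remove(outfile)
                logger.info("removed failed job output: %s", outfile)


    # A statement that writes a partial output file and then fails (POSIX shell).
    outfile = os.path.join(tempfile.mkdtemp(), "result.txt")
    job_info = {"statement": f"echo partial > {outfile} && false",
                "outfiles": [outfile]}

    process = subprocess.Popen(job_info["statement"], shell=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()

    if process.returncode != 0:
        logger.error("job failed with return code %s", process.returncode)
        cleanup_failed_job(job_info)  # the partial result.txt is deleted here

    assert not os.path.exists(outfile)
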
