Skip to content

Commit

Permalink
add error for logs
Browse files (browse the repository at this point in the history)
  • Loading branch information
Acribbs committed Jan 17, 2025
1 parent 3b86927 commit 51c437f
Showing 1 changed file with 35 additions and 20 deletions.
55 changes: 35 additions & 20 deletions cgatcore/pipeline/executors.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -137,23 +137,40 @@ def get_default_partition(self):
self.default_partition = partition

def run(self, statement_list):
"""Run a list of statements on the cluster.
Parameters
----------
statement_list : list
List of commands to execute.
Returns
-------
list : Benchmark data for each job.
"""
benchmark_data = []

for statement in statement_list:
self.logger.info(f"Running statement on Slurm: {statement}")

full_statement, job_path = self.build_job_script(statement)
# Get job parameters from config
job_threads = str(self.config.get("job_threads", self.config.get("num_cpus", "1")))
job_memory = self.config.get("memory", "4G")
job_queue = self.config.get("queue", self.default_partition)
job_time = self.config.get("runtime", "01:00:00")

# Build complete sbatch command with all necessary parameters
# Build job script
full_statement, job_path = self.build_job_script(statement)

# Build sbatch command with explicit parameters
slurm_command = [
"sbatch",
"--parsable",
f"--partition={self.default_partition}",
f"--mem={self.config.get('memory', '4G')}",
f"--cpus-per-task={self.config.get('num_cpus', '1')}",
f"--time={self.config.get('runtime', '01:00:00')}",
f"--partition={job_queue}",
f"--mem={job_memory}",
f"--cpus-per-task={job_threads}",
f"--time={job_time}",
"--export=ALL",
f"--output={job_path}.out",
f"--error={job_path}.err",
job_path
]

Expand All @@ -173,7 +190,15 @@ def run(self, statement_list):
self.monitor_job_completion(job_id)
benchmark_data.append(self.collect_benchmark_data([statement], resource_usage=[]))
except Exception as e:
self.logger.error(f"Error monitoring job {job_id}: {str(e)}")
# Try to get error file content
try:
err_file = os.path.join(os.getcwd(), "slurm_jobs", f"{os.path.basename(job_path)}.err")
if os.path.exists(err_file):
with open(err_file, 'r') as f:
error_content = f.read()
self.logger.error(f"Job error output:\n{error_content}")
except Exception as err_e:
self.logger.error(f"Could not read error file: {str(err_e)}")
raise

return benchmark_data
Expand All @@ -189,29 +214,19 @@ def build_job_script(self, statement):
script_name = f"slurm_job_{timestamp}"
script_path = os.path.join(job_path, f"{script_name}.sh")

# Get SLURM parameters from config or use defaults
memory = self.config.get("memory", "4G")
queue = self.config.get("queue", self.default_partition)
num_cpus = str(self.config.get("job_threads", self.config.get("num_cpus", "1")))
runtime = self.config.get("runtime", "01:00:00")

# Create output/error file paths in job directory
out_file = os.path.join(job_path, f"{script_name}.out")
err_file = os.path.join(job_path, f"{script_name}.err")

# Write SLURM script with proper headers
with open(script_path, "w") as script_file:
script_file.write("#!/bin/bash\n")
script_file.write(f"#SBATCH --partition={queue}\n")
script_file.write(f"#SBATCH --mem={memory}\n")
script_file.write(f"#SBATCH --cpus-per-task={num_cpus}\n")
script_file.write(f"#SBATCH --time={runtime}\n")
script_file.write("#SBATCH --export=ALL\n")
script_file.write(f"#SBATCH --output={out_file}\n")
script_file.write(f"#SBATCH --error={err_file}\n\n")

# Load required modules
script_file.write("# Load required modules\n")
script_file.write("set -e\n") # Exit on error
script_file.write("module purge\n")
script_file.write("module load kallisto\n\n")

Expand Down

0 comments on commit 51c437f

Please sign in to comment.