diff --git a/cgatcore/pipeline/executors.py b/cgatcore/pipeline/executors.py
index 75c906e..818fd78 100644
--- a/cgatcore/pipeline/executors.py
+++ b/cgatcore/pipeline/executors.py
@@ -137,23 +137,40 @@ def get_default_partition(self):
         self.default_partition = partition
 
     def run(self, statement_list):
+        """Run a list of statements on the cluster.
+
+        Parameters
+        ----------
+        statement_list : list
+            List of commands to execute.
+
+        Returns
+        -------
+        list : Benchmark data for each job.
+        """
         benchmark_data = []
+
         for statement in statement_list:
             self.logger.info(f"Running statement on Slurm: {statement}")
 
-            full_statement, job_path = self.build_job_script(statement)
+            # Get job parameters from config
+            job_threads = str(self.config.get("job_threads", self.config.get("num_cpus", "1")))
+            job_memory = self.config.get("memory", "4G")
+            job_queue = self.config.get("queue", self.default_partition)
+            job_time = self.config.get("runtime", "01:00:00")
 
-            # Build complete sbatch command with all necessary parameters
+            # Build job script
+            full_statement, job_path = self.build_job_script(statement)
+
+            # Build sbatch command with explicit parameters
             slurm_command = [
                 "sbatch",
                 "--parsable",
-                f"--partition={self.default_partition}",
-                f"--mem={self.config.get('memory', '4G')}",
-                f"--cpus-per-task={self.config.get('num_cpus', '1')}",
-                f"--time={self.config.get('runtime', '01:00:00')}",
+                f"--partition={job_queue}",
+                f"--mem={job_memory}",
+                f"--cpus-per-task={job_threads}",
+                f"--time={job_time}",
                 "--export=ALL",
-                f"--output={job_path}.out",
-                f"--error={job_path}.err",
                 job_path
             ]
 
@@ -173,7 +190,15 @@ def run(self, statement_list):
                 self.monitor_job_completion(job_id)
                 benchmark_data.append(self.collect_benchmark_data([statement], resource_usage=[]))
             except Exception as e:
-                self.logger.error(f"Error monitoring job {job_id}: {str(e)}")
+                # Try to get error file content
+                try:
+                    err_file = os.path.join(os.getcwd(), "slurm_jobs", f"{os.path.basename(job_path)}.err")
+                    if os.path.exists(err_file):
+                        with open(err_file, 'r') as f:
+                            error_content = f.read()
+                        self.logger.error(f"Job error output:\n{error_content}")
+                except Exception as err_e:
+                    self.logger.error(f"Could not read error file: {str(err_e)}")
                 raise
 
         return benchmark_data
@@ -189,12 +214,6 @@ def build_job_script(self, statement):
         script_name = f"slurm_job_{timestamp}"
         script_path = os.path.join(job_path, f"{script_name}.sh")
 
-        # Get SLURM parameters from config or use defaults
-        memory = self.config.get("memory", "4G")
-        queue = self.config.get("queue", self.default_partition)
-        num_cpus = str(self.config.get("job_threads", self.config.get("num_cpus", "1")))
-        runtime = self.config.get("runtime", "01:00:00")
-
         # Create output/error file paths in job directory
         out_file = os.path.join(job_path, f"{script_name}.out")
         err_file = os.path.join(job_path, f"{script_name}.err")
@@ -202,16 +221,12 @@ def build_job_script(self, statement):
         # Write SLURM script with proper headers
         with open(script_path, "w") as script_file:
             script_file.write("#!/bin/bash\n")
-            script_file.write(f"#SBATCH --partition={queue}\n")
-            script_file.write(f"#SBATCH --mem={memory}\n")
-            script_file.write(f"#SBATCH --cpus-per-task={num_cpus}\n")
-            script_file.write(f"#SBATCH --time={runtime}\n")
-            script_file.write("#SBATCH --export=ALL\n")
             script_file.write(f"#SBATCH --output={out_file}\n")
             script_file.write(f"#SBATCH --error={err_file}\n\n")
 
             # Load required modules
             script_file.write("# Load required modules\n")
+            script_file.write("set -e\n")  # Exit on error
             script_file.write("module purge\n")
             script_file.write("module load kallisto\n\n")