Skip to content

Commit

Permalink
handle both darwin and gnu time
Browse files Browse the repository at this point in the history
  • Loading branch information
Acribbs committed Oct 29, 2024
1 parent 8a87322 commit c885781
Showing 1 changed file with 31 additions and 46 deletions.
77 changes: 31 additions & 46 deletions cgatcore/pipeline/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import math
import shutil
import gevent
import shlex
import cgatcore.experiment as E
import cgatcore.iotools as iotools

Expand All @@ -43,6 +44,9 @@
HAS_DRMAA = False
import platform

# Resolve the external timing executable once, at import time.
# Preference order:
#   1. ``gtime`` -- the name previously selected on Darwin for GNU time,
#   2. any ``time`` executable found on PATH,
#   3. the conventional ``/usr/bin/time`` location as a last resort.
# The value must be a path to a real binary and never the bare word
# ``time``: the command line is executed through /bin/bash, where an
# unescaped ``time`` is the shell's reserved word, which does not accept
# the ``--output=``/``-f`` options passed to it.
TIME_CMD = shutil.which("gtime") or shutil.which("time") or "/usr/bin/time"

# global drmaa session
GLOBAL_SESSION = None

Expand Down Expand Up @@ -960,20 +964,16 @@ def _convert(key, v):
return JobInfo(jobId=process.pid, resourceUsage=data)

def run(self, statement_list):

benchmark_data = []
for statement in statement_list:
self.logger.info("running statement:\n%s" % statement)

full_statement, job_path = self.build_job_script(statement)

time_command = "gtime" if platform.system() == "Darwin" else "time"

# max_vmem is set to max_rss, not available by /usr/bin/time
# Use `shlex.quote()` to wrap `job_path` safely in the full statement
full_statement = (
f"\\{time_command} --output={job_path}.times "
f"-f '"
f"exit_status\t%x\n"
f"{shlex.quote(TIME_CMD)} --output={shlex.quote(job_path + '.times')} "
f"-f 'exit_status\t%x\n"
f"user_t\t%U\n"
f"sys_t\t%S\n"
f"wall_t\t%e\n"
Expand All @@ -995,57 +995,42 @@ def run(self, statement_list):
f"socket_sent\t%s\n"
f"major_page_fault\t%F\n"
f"unshared_data\t%D\n' "
f"{job_path}"
f"{shlex.quote(job_path)}"
)

while 1:
start_time = time.time()

os.environ.update(
{'BASH_ENV': os.path.join(os.environ['HOME'], '.bashrc')})
process = subprocess.Popen(
full_statement,
cwd=self.work_dir,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=os.environ.copy(),
close_fds=True,
executable="/bin/bash")

# process.stdin.close()
stdout, stderr = process.communicate()

end_time = time.time()

if process.returncode == 126:
self.logger.warn(
"repeating execution: message={}".format(stderr))
time.sleep(1)
continue

break
stdout = stdout.decode("utf-8")
stderr = stderr.decode("utf-8")
start_time = time.time()
os.environ.update({'BASH_ENV': os.path.join(os.environ['HOME'], '.bashrc')})
process = subprocess.Popen(
full_statement,
cwd=self.work_dir,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=os.environ.copy(),
close_fds=True,
executable="/bin/bash"
)

stdout, stderr = process.communicate()
end_time = time.time()

if process.returncode != 0 and not self.ignore_errors:
raise OSError(
"---------------------------------------\n"
"Child was terminated by signal %i: \n"
"The stderr was: \n%s\n%s\n"
"-----------------------------------------" %
(-process.returncode, stderr, statement))
(-process.returncode, stderr.decode("utf-8"), statement)
)

resource_usage = self.collect_metric_data(process,
start_time,
end_time,
time_data_file=job_path + ".times")
resource_usage = self.collect_metric_data(
process, start_time, end_time, time_data_file=job_path + ".times"
)

benchmark_data.extend(
self.collect_benchmark_data(
[statement],
resource_usage=[resource_usage]))
self.collect_benchmark_data([statement], resource_usage=[resource_usage])
)

try:
os.unlink(job_path)
Expand Down

0 comments on commit c885781

Please sign in to comment.