
Commit f5cee3d

Merge pull request #69 from fact-project/add_local_option_to_runlist
Add local_output option to process scripts
2 parents d173003 + eb25baf commit f5cee3d

File tree

5 files changed: +142 -36 lines changed

erna/scripts/process_fact_data.py

Lines changed: 63 additions & 9 deletions
@@ -9,21 +9,52 @@
 from gridmap import Job
 
 import erna
-from erna import stream_runner
+from erna.utils import create_filename_from_format
+from erna import stream_runner as stream_runner_std
+from erna import stream_runner_local_output as stream_runner_local
+
 import erna.datacheck_conditions as dcc
 
 logger = logging.getLogger(__name__)
 
 
-def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine, queue, vmem, num_runs_per_bunch, walltime):
+def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine, queue, vmem, num_runs_per_bunch, walltime, output_path=None, filename_format="{basename}_{num}.json"):
     jobs = []
+
+    if output_path:
+        logger.info("Using stream runner for local output")
+    else:
+        logger.debug("Using std stream runner gathering output from all nodes")
+
     # create job objects
+
     df_mapping["bunch_index"]= np.arange(len(df_mapping)) // num_runs_per_bunch
     for num, df in df_mapping.groupby("bunch_index"):
         df=df.copy()
         df["bunch_index"] = num
-        job = Job(stream_runner.run, [jar, xml, df, aux_source_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
-        jobs.append(job)
+
+        if output_path:
+            # create the filenames for each single local run
+            file_name, _ = path.splitext(path.basename(output_path))
+            file_name = create_filename_from_format(filename_format, file_name, num)
+            out_path = path.dirname(output_path)
+            run = [jar, xml, df, path.join(out_path, file_name), aux_source_path]
+            stream_runner = stream_runner_local
+        else:
+            run = [jar, xml, df, aux_source_path]
+            stream_runner = stream_runner_std
+
+        jobs.append(
+            Job(stream_runner.run,
+                run,
+                queue=queue,
+                walltime=walltime,
+                engine=engine,
+                mem_free='{}mb'.format(vmem)
+                )
+            )
+    avg_num_files = np.mean([len(part) for num, part in df_mapping.groupby("bunch_index")])
+    logger.info("Created {} jobs with {} files each.".format(len(jobs), avg_num_files))
 
     return jobs
 
@@ -47,9 +78,20 @@ def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine,
 @click.option('--conditions', help='Name of the data conditions as given in datacheck_conditions.py e.g standard', default='standard')
 @click.option('--max_delta_t', default=30, help='Maximum time difference (minutes) allowed between drs and data files.', type=click.INT)
 @click.option('--local', default=False,is_flag=True, help='Flag indicating whether jobs should be executed localy .')
+@click.option('--local_output', default=False, is_flag=True,
+              help='Flag indicating whether jobs write their output locally '
+              + 'to disk without gathering everything in the mother '
+              + 'process. In this case the output file only contains a '
+              + 'summary of the processed jobs. The data output will be '
+              + 'in separate files.',
+              show_default=True)
+@click.option('--local_output_format', default="{basename}_{num}.json",
+              help="Format string for the local output functionality. "
+              + "{basename} is replaced by the basename of the output file and {num} by the job number. "
+              + "Default is: '{basename}_{num}.json'. Only works with option --local_output.")
+@click.password_option(help='password to read from the always awesome RunDB')
 @click.option('--yes', help="Assume 'yes'if your asked to continue processing and start jobs", default=False, is_flag=True)
 @click.password_option(help='password to read from the always awesome RunDB')
-def main(earliest_night, latest_night, data_dir, jar, xml, aux_source, out, queue, walltime, engine, num_runs, vmem, log_level, port, source, conditions, max_delta_t, local, yes, password):
+def main(earliest_night, latest_night, data_dir, jar, xml, aux_source, out, queue, walltime, engine, num_runs, vmem, log_level, port, source, conditions, max_delta_t, local, local_output, local_output_format, yes, password):
 
     level=logging.INFO
     if log_level is 'DEBUG':
@@ -74,19 +116,31 @@ def main(earliest_night, latest_night, data_dir, jar, xml, aux_source, out, queu
     factdb = sqlalchemy.create_engine("mysql+pymysql://factread:{}@129.194.168.95/factdata".format(password))
     data_conditions=dcc.conditions[conditions]
     df_runs = erna.load(earliest_night, latest_night, data_dir, source_name=source, timedelta_in_minutes=max_delta_t, factdb=factdb, data_conditions=data_conditions)
-
+
     # check for missing data and fix possible wrong file extension (.fz->.gz)
     df = erna.test_data_path(df_runs, "data_path")
+
     df_runs = df[df['data_file_exists']]
     df_runs_missing = df[np.logical_not(df['data_file_exists'])]
-
-    logger.warn("Missing {} dataruns due to missing datafiles".format(len(df_runs_missing)))
 
+    logger.warn("Missing {} dataruns due to missing datafiles".format(len(df_runs_missing)))
     logger.info("Would process {} jobs with {} runs per job".format(len(df_runs)//num_runs, num_runs))
     if not yes:
         click.confirm('Do you want to continue processing and start jobs?', abort=True)
 
-    job_list = make_jobs(jarpath, xmlpath, aux_source_path, output_directory, df_runs, engine, queue, vmem, num_runs, walltime)
+    if local_output:
+        job_list = make_jobs(jarpath, xmlpath, aux_source_path,
+                             output_directory, df_runs, engine, queue,
+                             vmem, num_runs, walltime,
+                             output_path=local_output_dir,
+                             filename_format=local_output_format
+                             )
+    else:
+        job_list = make_jobs(jarpath, xmlpath, aux_source_path,
+                             output_directory, df_runs, engine, queue,
+                             vmem, num_runs, walltime
+                             )
+
     job_outputs = gridmap.process_jobs(job_list, max_processes=len(job_list), local=local)
     erna.collect_output(job_outputs, out, df_runs)
 
erna/scripts/process_fact_mc.py

Lines changed: 5 additions & 15 deletions
@@ -5,6 +5,7 @@
 from os import path
 
 import erna
+from erna.utils import create_filename_from_format
 from erna import stream_runner as stream_runner_std
 from erna import stream_runner_local_output as stream_runner_local
 
@@ -15,33 +16,22 @@
 
 logger = logging.getLogger(__name__)
 
-import re
-
-def create_filename_from_format(filename_format, basename, num):
-    """
-    Given a special format string, create a filename_format with the basename and a given number.
-    There are two named variables that can be used, one is basename which inserts the basename
-    and the second one is num which is mandatory.
-    """
-    m = re.search('\{num', filename_format)
-    if not m:
-        raise ValueError("Missing named placeholder 'num' in format string")
-    return filename_format.format({"basename":basename, "num":num})
-
-
 def make_jobs(jar, xml, data_paths, drs_paths,
               engine, queue, vmem, num_jobs, walltime, output_path=None, filename_format="{basename}_{num}.json"):
     jobs = []
 
     data_partitions = np.array_split(data_paths, num_jobs)
     drs_partitions = np.array_split(drs_paths, num_jobs)
     if output_path:
-        logger.info("Using stream runner für local output")
+        logger.info("Using stream runner for local output")
     else:
         logger.debug("Using std stream runner gathering output from all nodes")
 
     for num, (data, drs) in enumerate(zip(data_partitions, drs_partitions)):
         df = pd.DataFrame({'data_path': data, 'drs_path': drs})
+        df=df.copy()
+        df["bunch_index"] = num
+
         if output_path:
             # create the filenames for each single local run
             file_name, _ = path.splitext(path.basename(output_path))
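For context, np.array_split (used above to build the bunches) accepts a partition count that does not evenly divide the number of files, so bunch sizes may differ by one. A minimal sketch with invented file names:

    import numpy as np

    data_paths = ["data_{}.fits.fz".format(i) for i in range(10)]
    drs_paths = ["drs_{}.drs.fits.gz".format(i) for i in range(10)]

    data_partitions = np.array_split(data_paths, 3)
    drs_partitions = np.array_split(drs_paths, 3)

    for num, (data, drs) in enumerate(zip(data_partitions, drs_partitions)):
        print(num, len(data), len(drs))  # bunch sizes come out as 4, 3 and 3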

erna/scripts/process_fact_run_list.py

Lines changed: 60 additions & 11 deletions
@@ -5,7 +5,9 @@
 from os import path
 import os
 import erna
-from erna import stream_runner
+from erna.utils import create_filename_from_format
+from erna import stream_runner as stream_runner_std
+from erna import stream_runner_local_output as stream_runner_local
 import gridmap
 from gridmap import Job
 
@@ -14,18 +16,44 @@
 
 
 def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine, queue,
-              vmem, num_jobs, walltime):
+              vmem, num_jobs, walltime, output_path=None, filename_format="{basename}_{num}.json"):
     jobs = []
+
+    if output_path:
+        logger.info("Using stream runner for local output")
+    else:
+        logger.debug("Using std stream runner gathering output from all nodes")
+
     # create job objects
     split_indices = np.array_split(np.arange(len(df_mapping)), num_jobs)
     for num, indices in enumerate(split_indices):
         df = df_mapping[indices.min(): indices.max()]
-
-        job = Job(stream_runner.run, [jar, xml, df, aux_source_path],
-                  queue=queue, walltime=walltime, engine=engine,
-                  mem_free='{}mb'.format(vmem))
-        jobs.append(job)
-
+        df=df.copy()
+        df["bunch_index"] = num
+
+        if output_path:
+            # create the filenames for each single local run
+            file_name, _ = path.splitext(path.basename(output_path))
+            file_name = create_filename_from_format(filename_format, file_name, num)
+            out_path = path.dirname(output_path)
+            run = [jar, xml, df, path.join(out_path, file_name), aux_source_path]
+            stream_runner = stream_runner_local
+        else:
+            run = [jar, xml, df, aux_source_path]
+            stream_runner = stream_runner_std
+
+        jobs.append(
+            Job(stream_runner.run,
+                run,
+                queue=queue,
+                walltime=walltime,
+                engine=engine,
+                mem_free='{}mb'.format(vmem)
+                )
+            )
+
+    avg_num_files = np.mean([len(part) for part in split_indices])
+    logger.info("Created {} jobs with {} files each.".format(len(jobs), avg_num_files))
     return jobs
 
 
@@ -45,7 +73,17 @@ def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine,
 @click.option("--log_level", type=click.Choice(['INFO', 'DEBUG', 'WARN']), help='increase output verbosity', default='INFO')
 @click.option('--port', help='The port through which to communicate with the JobMonitor', default=12856, type=int)
 @click.option('--local', default=False,is_flag=True, help='Flag indicating whether jobs should be executed localy .')
-def main(file_list, jar, xml, aux_source, out, queue, walltime, engine, num_jobs, vmem, log_level, port, local):
+@click.option('--local_output', default=False, is_flag=True,
+              help='Flag indicating whether jobs write their output locally '
+              + 'to disk without gathering everything in the mother '
+              + 'process. In this case the output file only contains a '
+              + 'summary of the processed jobs. The data output will be '
+              + 'in separate files.',
+              show_default=True)
+@click.option('--local_output_format', default="{basename}_{num}.json",
+              help="Format string for the local output functionality. "
+              + "{basename} is replaced by the basename of the output file and {num} by the job number. "
+              + "Default is: '{basename}_{num}.json'. Only works with option --local_output.")
+def main(file_list, jar, xml, aux_source, out, queue, walltime, engine, num_jobs, vmem, log_level, port, local, local_output, local_output_format):
     '''
     Specify the path to a .json file as created by the fetch_runs.py script via the FILE_LIST argument.
     num_jobs will be created and executed on the cluster.
@@ -74,8 +112,19 @@ def main(file_list, jar, xml, aux_source, out, queue, walltime, engine, num_jobs
     os.makedirs(output_directory, exist_ok=True)
     logger.info("Writing output and temporary data to {}".format(output_directory))
 
-
-    job_list = make_jobs(jarpath, xmlpath, aux_source_path, output_directory, df, engine, queue, vmem, num_jobs, walltime)
+    if local_output:
+        job_list = make_jobs(jarpath, xmlpath, aux_source_path,
+                             output_directory, df, engine, queue,
+                             vmem, num_jobs, walltime,
+                             output_path=local_output_dir,
+                             filename_format=local_output_format
+                             )
+    else:
+        job_list = make_jobs(jarpath, xmlpath, aux_source_path,
+                             output_directory, df, engine, queue,
+                             vmem, num_jobs, walltime,
+                             )
+
     job_outputs = gridmap.process_jobs(job_list, max_processes=num_jobs, local=local)
     erna.collect_output(job_outputs, out, df)
 
erna/utils.py

Lines changed: 13 additions & 0 deletions
@@ -93,3 +93,16 @@ def check_environment_on_node():
     subprocess.check_call(['which', 'java'])
     subprocess.check_call(['free', '-m'])
     subprocess.check_call(['java', '-Xmx512m', '-version'])
+
+import re
+
+def create_filename_from_format(filename_format, basename, num):
+    """
+    Given a format string, create a filename from the basename and a given number.
+    There are two named placeholders that can be used: basename, which inserts the basename,
+    and num, which is mandatory.
+    """
+    m = re.search(r'\{num', filename_format)
+    if not m:
+        raise ValueError("Missing named placeholder 'num' in format string")
+    return filename_format.format(basename=basename, num=num)
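A quick usage sketch of this helper:

    from erna.utils import create_filename_from_format

    print(create_filename_from_format("{basename}_{num}.json", "crab_data", 7))
    # crab_data_7.json

    create_filename_from_format("{basename}.json", "crab_data", 7)
    # raises ValueError: Missing named placeholder 'num' in format string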

setup.py

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 
 setup(
     name='erna',
-    version='0.8.1',
+    version='0.8.2',
     description='Easy RuN Access. Tools that help to do batch processing of FACT data',
     url='https://github.com/fact-project/erna',
     author='Kai Brügge, Jens Buss, Maximilian Nöthe',
