forked from diffpy/diffpy.labpdfproc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtools.py
305 lines (244 loc) · 8.69 KB
/
tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
import copy
from pathlib import Path
from diffpy.labpdfproc.mud_calculator import compute_mud
from diffpy.utils.scattering_objects.diffraction_objects import QQUANTITIES, XQUANTITIES
from diffpy.utils.tools import get_package_info, get_user_info
# Wavelength associated with each supported anode type, keyed by element symbol.
# NOTE(review): values are presumably in angstroms -- confirm.
WAVELENGTHS = {
    "Mo": 0.71,
    "Ag": 0.59,
    "Cu": 1.54,
}
# Anode types accepted on the command line, in the order they are declared above.
known_sources = list(WAVELENGTHS)
# Namespace entries that are dropped when the arguments are exported as metadata.
METADATA_KEYS_TO_EXCLUDE = [
    "output_correction",
    "force_overwrite",
    "input",
    "input_paths",
]
def set_output_directory(args):
    """
    Resolve the output directory from the parsed arguments and make sure it exists.

    Parameters
    ----------
    args argparse.Namespace
        the arguments from the parser; only ``output_directory`` is read.
        A truthy value is used as the output directory, otherwise the
        current working directory is used.

    Returns
    -------
    a Path object holding the resolved path of the output directory,
    which is created (including missing parents) if it does not exist yet
    """
    if args.output_directory:
        target = Path(args.output_directory)
    else:
        target = Path.cwd()
    target = target.resolve()
    # exist_ok keeps this a no-op when the directory is already there
    target.mkdir(parents=True, exist_ok=True)
    return target
def _expand_user_input(args):
    """
    Expands the list of inputs by adding files from file lists and wildcards.

    Every input whose name contains "file_list" is opened and replaced by the
    entries it lists (one per line, surrounding whitespace stripped).
    Every input containing "*" is then replaced by the matches of that pattern
    relative to the current directory, skipping matches whose file name
    contains "file_list".

    Parameters
    ----------
    args argparse.Namespace
        the arguments from the parser; ``args.input`` is modified in place

    Returns
    -------
    the arguments with the modified input list
    """
    file_list_inputs = [input_name for input_name in args.input if "file_list" in input_name]
    for file_list_input in file_list_inputs:
        with open(file_list_input, "r") as f:
            # Skip blank lines: an empty entry would later resolve to the
            # current working directory and silently pull in all of its files.
            file_inputs = [line.strip() for line in f if line.strip()]
        args.input.extend(file_inputs)
        args.input.remove(file_list_input)
    # Computed after the file lists are expanded so wildcards listed inside
    # a file list are expanded as well.
    wildcard_inputs = [input_name for input_name in args.input if "*" in input_name]
    for wildcard_input in wildcard_inputs:
        input_files = [str(file) for file in Path(".").glob(wildcard_input) if "file_list" not in file.name]
        args.input.extend(input_files)
        args.input.remove(wildcard_input)
    return args
def set_input_lists(args):
    """
    Set input directory and files.

    It takes cli inputs, checks if they are files or directories and creates
    a list of files to be processed which is stored in the args Namespace
    as ``input_paths``. Duplicates are removed and the order is deterministic:
    inputs are kept in the order they were given, and the files of a directory
    are added in sorted order.

    Parameters
    ----------
    args argparse.Namespace
        the arguments from the parser

    Returns
    -------
    args argparse.Namespace

    Raises
    ------
    FileNotFoundError
        if an input does not exist, or exists but is neither a file nor a directory
    """
    input_paths = []
    args = _expand_user_input(args)
    for input_name in args.input:
        input_path = Path(input_name).resolve()
        if not input_path.exists():
            raise FileNotFoundError(f"Cannot find {input_name}.")
        if input_path.is_file():
            input_paths.append(input_path)
        elif input_path.is_dir():
            # sorted() removes the dependence on filesystem listing order
            input_paths.extend(
                file.resolve()
                for file in sorted(input_path.glob("*"))
                if file.is_file() and "file_list" not in file.name
            )
        else:
            raise FileNotFoundError(
                f"Cannot find {input_name}. Please specify valid input file(s) or directories."
            )
    # dict.fromkeys de-duplicates while keeping first-seen order; a plain set
    # would make the processing order vary from run to run.
    args.input_paths = list(dict.fromkeys(input_paths))
    return args
def set_wavelength(args):
    """
    Set the wavelength based on the given input arguments

    An explicitly given wavelength wins and ``anode_type`` is then removed
    from the Namespace. Otherwise the wavelength is looked up in WAVELENGTHS
    from ``anode_type``, falling back to the "Mo" entry when no anode type is given.

    Parameters
    ----------
    args argparse.Namespace
        the arguments from the parser

    Returns
    -------
    args argparse.Namespace

    Raises
    ------
    ValueError
        if the input wavelength is non-positive
        or if the input anode_type is not one of the known sources
    """
    wavelength = args.wavelength
    if wavelength is not None and wavelength <= 0:
        raise ValueError(
            "No valid wavelength. Please rerun specifying a known anode_type or a positive wavelength."
        )
    if wavelength:
        # the user-supplied wavelength takes precedence over any anode type
        delattr(args, "anode_type")
        return args
    anode = args.anode_type
    if anode and anode not in WAVELENGTHS:
        raise ValueError(
            f"Anode type not recognized. Please rerun specifying an anode_type from {*known_sources, }."
        )
    args.wavelength = WAVELENGTHS[anode] if anode else WAVELENGTHS["Mo"]
    return args
def set_xtype(args):
    """
    Set the xtype based on the given input arguments

    The check is case-insensitive. A valid xtype is normalized to "q" when its
    lowercase form is in QQUANTITIES and to "tth" otherwise.

    Parameters
    ----------
    args argparse.Namespace
        the arguments from the parser

    Returns
    -------
    args argparse.Namespace

    Raises
    ------
    ValueError
        if xtype is not one of the values in XQUANTITIES
    """
    # NOTE: this must stay a plain string literal; an f-string in this position
    # is not stored as the function's __doc__.
    xtype = args.xtype.lower()
    if xtype not in XQUANTITIES:
        raise ValueError(f"Unknown xtype: {args.xtype}. Allowed xtypes are {*XQUANTITIES, }.")
    args.xtype = "q" if xtype in QQUANTITIES else "tth"
    return args
def set_mud(args):
    """
    Set the mud based on the given input arguments

    When ``z_scan_file`` is given, it is resolved to a full path, stored back
    as a string, and ``mud`` is replaced by the result of compute_mud on that
    file. Without a ``z_scan_file`` the arguments are returned untouched.

    Parameters
    ----------
    args argparse.Namespace
        the arguments from the parser

    Returns
    -------
    args argparse.Namespace

    Raises
    ------
    FileNotFoundError
        if ``z_scan_file`` is given but does not point to an existing file
    """
    if not args.z_scan_file:
        return args
    scan_path = Path(args.z_scan_file).resolve()
    if not scan_path.is_file():
        raise FileNotFoundError(f"Cannot find {args.z_scan_file}. Please specify a valid file path.")
    args.z_scan_file = str(scan_path)
    args.mud = compute_mud(scan_path)
    return args
def _load_key_value_pair(s):
    """
    Split a "key=value" string into its key and value.

    Only the first "=" separates key from value, so the value may itself
    contain "=" characters. The key is stripped of surrounding whitespace;
    the value is returned exactly as given.

    Parameters
    ----------
    s str
        the string to split, in the format key=value

    Returns
    -------
    a (key, value) tuple of strings

    Raises
    ------
    ValueError
        if the string contains no "="
    """
    key, separator, value = s.partition("=")
    if not separator:
        # previously this path failed with an UnboundLocalError on `value`
        raise ValueError(f"Cannot split {s!r} into a key-value pair. Please use the format key=value.")
    return (key.strip(), value)
def load_user_metadata(args):
    """
    Load user metadata into the provided argparse Namespace, raise ValueError if in incorrect format

    Each entry of ``args.user_metadata`` must look like key=value. The pairs are
    set as attributes on the Namespace and ``user_metadata`` itself is removed.

    Parameters
    ----------
    args argparse.Namespace
        the arguments from the parser

    Returns
    -------
    the updated argparse Namespace with user metadata inserted as key-value pairs

    Raises
    ------
    ValueError
        if an entry has no "=", if a key clashes with an existing argument name,
        or if the same key is given more than once
    """
    # Snapshot the argument names up front. vars(args).keys() is a live view, so
    # without the copy every key added below would immediately look "reserved"
    # and the repeated-key check could never be reached.
    reserved_keys = set(vars(args))
    if args.user_metadata:
        for item in args.user_metadata:
            if "=" not in item:
                raise ValueError(
                    "Please provide key-value pairs in the format key=value. "
                    "For more information, use `labpdfproc --help.`"
                )
            key, value = _load_key_value_pair(item)
            if key in reserved_keys:
                raise ValueError(f"{key} is a reserved name. Please rerun using a different key name. ")
            # anything present now but not reserved was added by an earlier pair
            if hasattr(args, key):
                raise ValueError(f"Please do not specify repeated keys: {key}. ")
            setattr(args, key, value)
    delattr(args, "user_metadata")
    return args
def load_user_info(args):
    """
    Update username and email using get_user_info function from diffpy.utils

    The current ``username`` and ``email`` are handed to get_user_info and
    replaced by the values it returns under the same keys.

    Parameters
    ----------
    args argparse.Namespace
        the arguments from the parser

    Returns
    -------
    the updated argparse Namespace with username and email inserted
    """
    user_info = get_user_info({"username": args.username, "email": args.email})
    args.username = user_info["username"]
    args.email = user_info["email"]
    return args
def load_package_info(args):
    """
    Load diffpy.labpdfproc package information into args using get_package_info function from diffpy.utils

    The "package_info" entry returned by get_package_info("diffpy.labpdfproc")
    is stored on the Namespace as ``package_info``.

    Parameters
    ----------
    args argparse.Namespace
        the arguments from the parser

    Returns
    -------
    the updated argparse Namespace with package_info inserted
    """
    package_metadata = get_package_info("diffpy.labpdfproc")
    args.package_info = package_metadata["package_info"]
    return args
def preprocessing_args(args):
    """
    Perform preprocessing on the provided argparse Namespace

    The steps run in a fixed order: package info, user info, input lists,
    output directory, wavelength, xtype, mud, and finally user metadata.

    Parameters
    ----------
    args argparse.Namespace
        the arguments from the parser

    Returns
    -------
    the updated argparse Namespace with arguments preprocessed
    """
    for step in (load_package_info, load_user_info, set_input_lists):
        args = step(args)
    # set_output_directory returns a Path rather than the Namespace,
    # so it cannot be part of the chained steps
    args.output_directory = set_output_directory(args)
    for step in (set_wavelength, set_xtype, set_mud, load_user_metadata):
        args = step(args)
    return args
def load_metadata(args, filepath):
    """
    Load relevant metadata from args

    The Namespace is deep-copied into a dictionary, the entries named in
    METADATA_KEYS_TO_EXCLUDE are left out, and ``args`` itself is not modified.

    Parameters
    ----------
    args argparse.Namespace
        the arguments from the parser
    filepath
        the value recorded, converted to a string, under "input_directory"

    Returns
    -------
    A dictionary with relevant arguments from the parser, where
    "output_directory" is converted to a string
    """
    snapshot = copy.deepcopy(vars(args))
    metadata = {
        name: value for name, value in snapshot.items() if name not in METADATA_KEYS_TO_EXCLUDE
    }
    metadata["input_directory"] = str(filepath)
    metadata["output_directory"] = str(metadata["output_directory"])
    return metadata