-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathscale.py
153 lines (132 loc) · 5.42 KB
/
scale.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import os, sys
import statistics, math
import json
from pathlib import Path
import subprocess
import shutil
from argparse import ArgumentParser
#return average and stdev of number of accepted reflections for a given dictionary
def avg_reflections(d):
for entry, value in d.items():
rlst = [y['accepted_reflections'] for (x,y) in value.items() if y['processing_successful'] and y['accepted_reflections'] is not None ]
return statistics.mean(rlst), statistics.stdev(rlst)
#removes datasets with accepted reflections below some multiple of the stdev
def filter_datasets(d, threshold=2):
l = {}
avg, sd = avg_reflections(d)
for entry, value in d.items():
for entry2, value2 in value.items():
if value2['accepted_reflections'] is not None and int(value2['accepted_reflections']) > avg - (threshold * sd) :
l[entry2] = value2
return l
def generate_xscale_directory(path):
xs_path = Path(path / 'xscale')
xs_path.mkdir(exist_ok=True)
return xs_path
def get_xscale_directory(path):
return Path(path / 'xscale')
def generate_xs_header(sg, unitcell, output_file="temp.ahkl"):
return """OUTPUT_FILE={c}
!SPACE_GROUP_NUMBER={a}
!UNIT_CELL_CONSTANTS={b}
SAVE_CORRECTION_IMAGES=FALSE
WFAC1=1
PRINT_CORRELATIONS=FALSE\n""".format(a=sg, b=unitcell, c=output_file)
def generate_xscaleINP(d, args, xsdir):
l = filter_datasets(d)
if args.assert_P1 or args.spacegroup == 0:
with open(Path(xsdir / 'XSCALE.INP'), 'w') as f:
f.write(generate_xs_header(1, [y for x,y in l.items()][0]['unit_cell']))
f.write(generate_xs_input(l))
else:
with open(Path(xsdir / 'XSCALE.INP'),'w') as f:
f.write(generate_xs_header(args.spacegroup,args.unitcell))
f.write(generate_xs_input(l))
def generate_xs_input(d):
inputstr = """"""
for entry, value in d.items():
inputstr += "INPUT_FILE={}/XDS_ASCII.HKL\n".format(value['path'])
return inputstr
def run_xscale(xsdir):
# Run XDS in the datawell derectory:
f = open(Path(xsdir / 'xscale.log'), "w")
proc = subprocess.Popen(r"xscale_par", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, cwd=xsdir)
for line in proc.stdout:
sys.stdout.write(line)
f.write(line)
proc.wait()
f.close()
with open(Path(xsdir / 'XSCALE.LP'),'r') as f:
line = f.readline()
while not 'SUBSET OF INTENSITY DATA WITH SIGNAL/NOISE >= -3.0 AS FUNCTION OF RESOLUTION' in line:
line = f.readline()
while not 'total' in line:
print(line, end='')
line = f.readline()
print(line)
def run_isocluster(xsdir,out='temp.ahkl'):
f = open(Path(xsdir / 'xscale_isocluster.log'), "w")
subprocess.call(r"xscale_isocluster {a}".format(a=out), stdout=f, shell=True, cwd=xsdir)
f.close()
def sort_isocluster(xsdir):
input_list = []
try:
with open(Path(xsdir / 'XSCALE.1.INP')) as f:
line = f.readline()
while line:
if line.startswith('INPUT_FILE'):
inp = line.rstrip()
line = f.readline()
x = line.split()
y = float(x[6]) * math.cos(float(x[7])*(3.14159/180))
input_list.append([inp, y, x[8]])
else:
line = f.readline()
result = sorted(input_list, key=lambda x:(x[1] ), reverse=True)
except:
sys.exit("XSCALE_isocluster failed.")
return result
def filter_isocluster(input_list, threshold=0.6):
result = []
for line in input_list:
if line[1] >= threshold:
result.append(line)
if len(result) == 0:
sys.exit("Cluster strengths are too low for a reasonable solution.")
return result
def copy_xscale_results(xsdir, suffix='old'):
if Path(xsdir / 'XSCALE.INP').is_file():
shutil.copy(Path(xsdir / 'XSCALE.INP'), Path(xsdir / 'XSCALE_{}.INP'.format(suffix)))
if Path(xsdir / 'XSCALE.LP').is_file():
shutil.copy(Path(xsdir / 'XSCALE.LP'), Path(xsdir / 'XSCALE_{}.LP'.format(suffix)))
if Path(xsdir / 'XSCALE.log').is_file():
shutil.copy(Path(xsdir / 'XSCALE.log'), Path(xsdir / 'XSCALE_{}.log'.format(suffix)))
def gen_sorted_xscaleINP(md_list, args, xsdir):
with open(Path(xsdir / 'XSCALE_isocluster.INP'),'w') as f:
f.write(generate_xs_header(args.spacegroup,args.unitcell))
for line in md_list:
f.write('{}\n'.format(line[0]))
f.write('NBATCH=3 CORRECTIONS=ALL\n')
def rerun_xscale(xsdir):
shutil.copy(Path(xsdir / 'XSCALE_isocluster.INP'), Path(xsdir / 'XSCALE.INP'))
run_xscale(xsdir)
def dict_from_json(jsonfile):
return json.load(open(jsonfile))
def main():
parser = ArgumentParser(description=
"""
Run xscale and xscale_isocluster.
""")
parser.add_argument('-i', '--input', type=lambda p: Path(p, exists=True).absolute(),
help='Path of restuls.json file')
parser.add_argument('--assert_P1', action='store_true', help="Assert P1 Space Group")
parser.add_argument('-s', '--spacegroup', type=int, help='Space Group Number')
parser.add_argument('-u', '--unitcell', type=str, help='Unit Cell')
parser.parse_args()
args = parser.parse_args()
xsdir = generate_xscale_directory(args.input.parent)
d = dict_from_json(args.input)
generate_xscaleINP(d, args, xsdir)
run_xscale(xsdir)
if __name__=='__main__':
main()