-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdcmmeta2tsv.py
executable file
·243 lines (192 loc) · 7.34 KB
/
dcmmeta2tsv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
#!/usr/bin/env python3
"""
Give a tab separated metadata value line per dicom file.
"""
import logging
import os
import re
import sys
import warnings
from typing import Optional, TypedDict
import pydicom
with warnings.catch_warnings():
warnings.simplefilter("ignore")
# UserWarning: The DICOM readers are highly experimental...
from nibabel.nicom import csareader
# Root logger level is taken from the LOGLEVEL environment variable when it
# is set; otherwise it falls back to logging.INFO.
logging.basicConfig(level=os.environ.get("LOGLEVEL", logging.INFO))
class NULLVAL:
    """Stand-in for a dicom tag that does not exist.

    Carries a ``value`` attribute so code that expects an object with
    ``obj.value`` (as for elements of a ``pydicom.dcmread`` result) can use
    it in place of a missing element.
    The text "null" matches what AFNI's dicom_hinfo prints for missing tags.
    """

    value: str = "null"
#: (group, element) pair identifying a dicom header tag
TagTuple = tuple[int, int]


def tagpair_to_hex(csv_str) -> TagTuple:
    """
    Turn tag text from our text files, like "0051,1017", into a pair usable
    as a pydicom index.

    Every comma separated piece is parsed as base 16 and rendered again with
    :py:func:`hex`, so the result holds ``'0x..'`` strings without zero
    padding.

    :param csv_str: comma separated string to convert
    :return: dicom header tag hex pair

    >>> tagpair_to_hex("0051,1017")
    ('0x51', '0x1017')
    """
    # NOTE(review): the annotation promises ints but hex() yields str --
    # confirm which one is intended before tightening the types
    hex_parts = []
    for piece in csv_str.split(","):
        hex_parts.append(hex(int(piece, 16)))
    return tuple(hex_parts)
#: one dictionary per tag to extract: ``name`` (label used as output key),
#: ``tag`` (where to look), ``loc`` (which part of the dicom holds it) and
#: ``desc`` (free text description)
#: NOTE(review): ``tag`` is annotated as TagTuple, but ``read_known_tags``
#: leaves tags that are not "GGGG,EEEE" pairs as plain text -- confirm type
TagDicts = list[
TypedDict("Tag", {"name": str, "tag": TagTuple, "loc": str, "desc": str})
]
#: keys are names from ``taglist.txt``, also has ``dcm_path`` key for file
TagValues = dict[str, str]
def read_known_tags(tagfile="taglist.txt") -> TagDicts:
    """
    Read the tab separated tag list (columns: name, tag, desc).

    Comment lines (starting with ``#``), the header line (starting with
    ``name``) and blank lines are skipped.  Every remaining row becomes a
    dictionary that additionally gets a ``loc`` key:

    * ``"header"`` -- ``tag`` starts with a 4 hex digit group and a comma
      (like ``0018,0080``) and was converted with :py:func:`tagpair_to_hex`
    * ``"asccov"`` -- the row is named ``shims`` (any letter case)
    * ``"csa"`` -- anything else; ``tag`` is kept as text

    :param tagfile: text tsv file to get name,tag(hex pair),desc from
    :return: file parsed into a list of dictionaries, in file order
    :raises KeyError: if a data row has no tag column
    """
    columns = ("name", "tag", "desc")
    skip_line = re.compile(r"^name|^#")
    # hex digits, not only decimal ones: groups such as 7FE0 are valid too
    header_tag = re.compile(r"^[0-9A-Fa-f]{4},")

    tags = []
    with open(tagfile, "r") as f:
        for line in f:
            # drop the line ending so it does not leak into the last column
            line = line.rstrip("\r\n")
            if not line.strip() or skip_line.search(line):
                continue
            tag = dict(zip(columns, line.split("\t")))
            if header_tag.search(tag["tag"]):
                # '0018,0080' becomes a (group, element) pair
                tag["tag"] = tagpair_to_hex(tag["tag"])
                tag["loc"] = "header"
            elif tag["name"].lower() == "shims":
                tag["loc"] = "asccov"
            else:
                tag["loc"] = "csa"
            tags.append(tag)
    return tags
def csa_fetch(csa_tr: dict, itemname: str) -> str:
    """
    Look up ``itemname`` in a ``csa_tr`` dicom dictionary without raising
    when a level of the nesting is missing.

    The expected layout is ``'tags'->itemname->'items'->[0]``; the first
    entry of ``items`` is returned as stored.  A missing key or an empty
    ``items`` gives ``NULLVAL.value`` instead.
    Only the first array item is used; other items are ignored for now.

    >>> csa_fetch({'notags':'badinput'}, 'PhaseEncodingDirectionPositive')
    'null'
    >>> csa_fetch({'tags':{'ImaPATModeText': {'items': [1]}}}, 'ImaPATModeText')
    1
    """
    try:
        items = csa_tr["tags"][itemname]["items"]
        if items:
            return items[0]
    except KeyError:
        pass
    return NULLVAL.value
def read_shims(csa_s: Optional[dict]) -> list:
    """
    Pull shim settings out of the ``MrPhoenixProtocol`` text.

    :param csa_s: parsed ``0x0029,0x1020`` CSA **Series** Header Info, e.g.
        ``read_csa(dcm.get((0x0029, 0x1020)))``; may be ``None``
    :return: list of matched values (as text) in the order they appear in
        the protocol.  When the protocol text cannot be found, a list of ten
        ``NULLVAL.value`` entries is returned instead.

    Values are collected for the fields the CHM matlab code concatenates::

        sAdjData.uiAdjShimMode
        sGRADSPEC.asGPAData[0].lOffset{X,Y,Z}
        sGRADSPEC.alShimCurrent[0] .. [4]
        sTXSPEC.asNucleusInfo[0].lFrequency

    >>> csa_s = pydicom.dcmread('example_dicoms/RewardedAnti_good.dcm').get((0x0029, 0x1020))
    >>> ",".join(read_shims(read_csa(csa_s)))
    '1174,-2475,4575,531,-20,59,54,-8,123160323,4'
    >>> read_shims(None)  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
    ['null', ...'null']
    """
    if csa_s is None:
        csa_s = {}
    try:
        asccov = csa_s["tags"]["MrPhoenixProtocol"]["items"][0]
    except (KeyError, IndexError, TypeError):
        # missing level, empty ``items`` or a level that is not a container
        return [NULLVAL.value] * 10

    # dots and brackets are escaped so only the literal field names match
    fields = "|".join(
        [
            r"sAdjData\.uiAdjShimMode",
            r"sGRADSPEC\.asGPAData\[0\]\.lOffset[XYZ]",
            r"sGRADSPEC\.alShimCurrent\[[0-4]\]",
            r"sTXSPEC\.asNucleusInfo\[0\]\.lFrequency",
        ]
    )
    # protocol lines look like
    #   sGRADSPEC.asGPAData[0].lOffsetX\t = \t1174
    reg = re.compile(rf"({fields})\s*=\s*(\S+)")
    # could be more rigorous about order by moving the (field, value)
    # pairs into a dict; for now keep the order found in the text
    return [value for _field, value in reg.findall(asccov)]
def read_csa(csa) -> Optional[dict]:
    """
    Parse a siemens CSA element with nibabel's csareader.

    :param csa: siemens private tag element, e.g. (0x0029, 0x1010); its
        ``.value`` is what gets parsed.  ``None`` is passed straight through.
    :return: nibabel's csareader dictionary or None if cannot read

    >>> read_csa(None) is None
    True
    """
    if csa is None:
        return None

    raw = csa.value
    try:
        parsed = csareader.read(raw)
    except csareader.CSAReadError:
        # unreadable CSA content is reported the same way as a missing tag
        parsed = None
    return parsed
def read_tags(dcm_path: os.PathLike, tags: TagDicts) -> TagValues:
    """
    Read dicom header and isolate tags

    :param dcm_path: dicom file with headers to extract
    :param tags: list of tag dictionaries with ``name``, ``tag`` and ``loc``
        keys; header tags use a hex pair, see :py:func:`tagpair_to_hex`
    :return: dict[name,value] in the same order as ``tags``, followed by a
        ``dcm_path`` entry.  If the file cannot be parsed as dicom, every
        value is ``NULLVAL.value``.
    :raises FileNotFoundError: if ``dcm_path`` is not an existing file

    NOTE(review): the ``DNE.dcm`` example below only gives nulls if that
    path exists but is not readable as dicom; a missing path raises --
    confirm against the example files.

    >>> tr = {'name': 'TR', 'tag': (0x0018,0x0080), 'loc': 'header'}
    >>> ipat = {'name': 'iPAT', 'tag': 'ImaPATModeText', 'loc': 'csa'}
    >>> list(read_tags('example_dicoms/RewardedAnti_good.dcm', [ipat, tr]).values())
    ['p2', '1300.0', 'example_dicoms/RewardedAnti_good.dcm']
    >>> list(read_tags('example_dicoms/DNE.dcm', [ipat,tr]).values())
    ['null', 'null', 'example_dicoms/DNE.dcm']
    """
    if not os.path.isfile(dcm_path):
        # FileNotFoundError is still caught by callers using ``except Exception``
        raise FileNotFoundError(f"Bad path to dicom: '{dcm_path}' DNE")

    try:
        dcm = pydicom.dcmread(dcm_path)
    except pydicom.errors.InvalidDicomError:
        logging.error("cannot read header in %s", dcm_path)
        nulldict = {tag["name"]: NULLVAL.value for tag in tags}
        nulldict["dcm_path"] = dcm_path
        return nulldict

    out = {}
    # image level CSA header is shared by every 'csa' tag, so parse it once
    csa = read_csa(dcm.get((0x0029, 0x1010)))
    for tag in tags:
        k = tag["name"]
        # 20241118: add shims
        # accept either spelling of the name as well as the 'asccov'
        # location, so tag lists using "shims" are handled like "Shims"
        if k.lower() == "shims" or tag.get("loc") == "asccov":
            csa_s = read_csa(dcm.get((0x0029, 0x1020)))
            out[k] = ",".join(read_shims(csa_s))
        elif tag["loc"] == "csa":
            out[k] = csa_fetch(csa, tag["tag"]) if csa is not None else NULLVAL.value
        else:
            out[k] = dcm.get(tag["tag"], NULLVAL).value

        # 20241120: watch out for comments with newlines or tabs
        # can maybe just change 'Comments' instead of everything
        if isinstance(out[k], str):
            out[k] = out[k].replace("\t", " ").replace("\n", " ")

    out["dcm_path"] = dcm_path
    return out
class DicomTagReader:
    """Class to cache :py:func:`read_known_tags` output

    The tag list is read once when the object is created and reused for
    every file passed to :py:meth:`read_dicom_tags`.
    """

    def __init__(self, tagfile: Optional[str] = None):
        """
        :param tagfile: tag list file to load.  When omitted, the default
            of :py:func:`read_known_tags` is used.
        """
        if tagfile is None:
            self.tags = read_known_tags()
        else:
            self.tags = read_known_tags(tagfile)

    def read_dicom_tags(self, dcm_path: os.PathLike) -> TagValues:
        """return values of dicom header priority fields
        ordered as defined in ``taglist.txt``

        :param dcm_path: path to dicom
        :return: dict[taglist.txt tagname, tag value]

        >>> dtr = DicomTagReader()
        >>> hdr = dtr.read_dicom_tags('example_dicoms/RewardedAnti_good.dcm')
        >>> list(hdr.values()) # doctest: +ELLIPSIS
        [1, 'p2', '154833.265000', '20220913', ...
        >>> list(hdr.values())[-1]
        'example_dicoms/RewardedAnti_good.dcm'
        """
        return read_tags(dcm_path, self.tags)
if __name__ == "__main__":
    # every command line argument is a dicom file; each one becomes a single
    # tab separated line on stdout
    reader = DicomTagReader()
    logging.info("processing %d dicom files", len(sys.argv) - 1)
    for dicom_file in sys.argv[1:]:
        row = reader.read_dicom_tags(dicom_file)
        print("\t".join(map(str, row.values())))