Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import argparse
import os
from typing import List
from mu_map.file.dicom import dcm_type
def get_files_recursive(_dir: str) -> List[str]:
"""
Recursively get all files in a directory. This means that all
sub-directories are recursively searched until a file is reached.
:param _dir: the directory of which files are listed
:return: a list of files in the directory and its sub-directories
"""
files: List[str] = []
for _file in os.listdir(_dir):
_file = os.path.join(_dir, _file)
if os.path.isdir(_file):
files.extend(get_files_recursive(_file))
else:
files.append(_file)
return files
def is_scatter_corrected(dcm: dcm_type) -> bool:
return not ("NoSC" in dcm.SeriesDescription)
def is_attenuation_corrected(dcm: dcm_type) -> bool:
return not ("NoAC" in dcm.SeriesDescription)
def get_type(dcm: dcm_type) -> str:
description = dcm.SeriesDescription.lower()
if "syn" in description:
return "synthetic"
elif "ct" in description:
return "ct"
else:
return "symbia"
headers = argparse.Namespace()
headers.id = "id"
headers.scatter_correction = "scatter_correction"
headers.sc = headers.scatter_correction
headers.attenuation_correction = "attenuation_correction"
headers.ac = headers.attenuation_correction
headers.mu_map_type = "mu_map_type"
headers.type = headers.mu_map_type
headers.file = "file"
if __name__ == "__main__":
import cv2 as cv
import numpy as np
import pandas as pd
from mu_map.data.prepare import get_protocol
from mu_map.data.prepare import headers as meta_headers
from mu_map.file.dicom import load_dcm, DCM_TAG_PIXEL_SCALE_FACTOR, dcm_type
from mu_map.logging import add_logging_args, get_logger_by_args
parser = argparse.ArgumentParser(
description="Store/Sort in polar maps for further processing",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--polar_map_dir",
type=str,
required=True,
help="directory where the raw DICOM files of polar maps are stored",
)
parser.add_argument(
"--out_dir", type=str, required=True, help="directory where files are output to"
)
parser.add_argument(
"--images_dir",
type=str,
default="images",
help="directory under <out_dir> where images of polar maps are stored",
)
parser.add_argument(
"--csv",
type=str,
default="polar_maps.csv",
help="file unter <out_dir> where meta information is stored",
)
parser.add_argument(
"--meta_csv",
type=str,
required=True,
help="the csv file containing meta information of the dataset of which the generated polar maps are",
)
parser.add_argument(
"--rect",
type=int,
nargs=4,
default=[21, 598, 425, 1002],
help="rectangle as [top, left, bottom, right] coordinates where the polar map is found in the DICOM images",
)
parser.add_argument(
"-i",
"--interactive",
action="store_true",
help="start interactive mode where every polar map is displayed and confirmation is queried",
)
add_logging_args(parser, defaults={"--logfile": "prepare.log"})
args = parser.parse_args()
args.images_dir = os.path.join(args.out_dir, args.images_dir)
args.csv = os.path.join(args.out_dir, args.csv)
args.logfile = os.path.join(args.out_dir, args.logfile)
args.loglevel = "DEBUG" if args.interactive else args.loglevel
if args.interactive:
print(
"""
Start in interactive mode.
This mode automatically sets the loglevel to debug so that you can see what is going in.
Use the following keys on the displayed polar map:
n(ext): go to the next image without saving the polar map
s(ave): save the polar map and go to the next
q(uit): quit the application
"""
)
logger = get_logger_by_args(args)
logger.info(args)
if not os.path.exists(args.out_dir):
os.mkdir(args.out_dir)
if not os.path.exists(args.images_dir):
os.mkdir(args.images_dir)
meta = pd.read_csv(args.meta_csv)
dcm_files = sorted(get_files_recursive(args.polar_map_dir))
data = pd.DataFrame(
{
headers.id: [],
headers.sc: [],
headers.ac: [],
headers.type: [],
headers.file: [],
}
)
data[headers.id] = data[headers.sc].astype(int)
data[headers.sc] = data[headers.sc].astype(bool)
data[headers.ac] = data[headers.ac].astype(bool)
data[headers.type] = data[headers.ac].astype(str)
data[headers.file] = data[headers.ac].astype(str)
for dcm_file in dcm_files:
logger.debug(f"Process file {dcm_file}")
dcm, img = load_dcm(dcm_file)
protocol = get_protocol(dcm)
patient_id = int(dcm.PatientID)
meta_row = meta[
(meta[meta_headers.patient_id] == patient_id)
& (meta[meta_headers.protocol] == protocol)
].iloc[0]
row = {
headers.id: meta_row[meta_headers.id],
headers.sc: is_scatter_corrected(dcm),
headers.ac: is_attenuation_corrected(dcm),
headers.type: get_type(dcm),
}
top, left, bottom, right = args.rect
polar_map = img[top:bottom, left:right]
polar_map = cv.cvtColor(polar_map, cv.COLOR_RGB2BGR)
_ac = "ac" if row[headers.ac] else "nac"
_sc = "sc" if row[headers.sc] else "nsc"
_file = f"{row[headers.id]:04d}-{_ac}_{_sc}_{row[headers.type]}.png"
row[headers.file] = _file
_file = os.path.join(args.images_dir, _file)
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
if (
len(
data[
(data[headers.id] == row[headers.id])
& (data[headers.sc] == row[headers.sc])
& (data[headers.ac] == row[headers.ac])
& (data[headers.type] == row[headers.type])
]
)
> 0
):
logger.warning(f"Skip {dcm_file} as it is a duplicate for row: {row}")
continue
logger.debug(f"For series {dcm.SeriesDescription} store row {row}")
store = not args.interactive
if args.interactive:
while True:
cv.imshow("Polar Map", cv.resize(polar_map, (512, 512)))
key = cv.waitKey(0)
if key == ord("q"):
exit(0)
elif key == ord("n"):
break
elif key == ord("s"):
store = True
break
if store:
cv.imwrite(_file, polar_map)
row = pd.DataFrame(row, index=[0])
data = pd.concat((data, row), ignore_index=True)
data = data.sort_values(by=[headers.id, headers.file])
data.to_csv(args.csv, index=False)