Newer
Older
import argparse
import os
from typing import List
def get_files_recursive(_dir: str) -> List[str]:
"""
Recursively get all files in a directory. This means that all
sub-directories are recursively searched until a file is reached.
:param _dir: the directory of which files are listed
:return: a list of files in the directory and its sub-directories
"""
files: List[str] = []
for _file in os.listdir(_dir):
_file = os.path.join(_dir, _file)
if os.path.isdir(_file):
files.extend(get_files_recursive(_file))
else:
files.append(_file)
return files
def is_scatter_corrected(dcm: DICOM) -> bool:
def is_attenuation_corrected(dcm: DICOM) -> bool:
return not ("NoAC" in dcm.SeriesDescription)
def shows_segments(dcm: DICOM) -> bool:
description = dcm.SeriesDescription.lower()
if "DLAC" in description:
return "dl"
elif "CTAC" in description:
return "ct"
else:
return "symbia"
headers = argparse.Namespace()
headers.id = "id"
headers.scatter_correction = "scatter_correction"
headers.sc = headers.scatter_correction
headers.attenuation_correction = "attenuation_correction"
headers.ac = headers.attenuation_correction
headers.mu_map_type = "mu_map_type"
headers.type = headers.mu_map_type
headers.file = "file"
if __name__ == "__main__":
import cv2 as cv
import numpy as np
import pandas as pd
from mu_map.data.prepare import get_protocol
from mu_map.data.prepare import headers as meta_headers
from mu_map.file.dicom import load_dcm, DCM_TAG_PIXEL_SCALE_FACTOR, DICOM
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from mu_map.logging import add_logging_args, get_logger_by_args
parser = argparse.ArgumentParser(
description="Store/Sort in polar maps for further processing",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--polar_map_dir",
type=str,
required=True,
help="directory where the raw DICOM files of polar maps are stored",
)
parser.add_argument(
"--out_dir", type=str, required=True, help="directory where files are output to"
)
parser.add_argument(
"--images_dir",
type=str,
default="images",
help="directory under <out_dir> where images of polar maps are stored",
)
parser.add_argument(
"--csv",
type=str,
default="polar_maps.csv",
help="file unter <out_dir> where meta information is stored",
)
parser.add_argument(
"--meta_csv",
type=str,
required=True,
help="the csv file containing meta information of the dataset of which the generated polar maps are",
)
parser.add_argument(
type=int,
nargs=4,
default=[22, 1499, 22 + 588, 1500 + 588],
help="rectangle as [top, left, bottom, right] coordinates where the polar map is found in the DICOM images",
)
parser.add_argument(
"--rect_lower",
type=int,
nargs=4,
default=[634, 1499, 634 + 588, 1500 + 588],
help="rectangle as [top, left, bottom, right] coordinates where the polar map is found in the DICOM images",
)
parser.add_argument(
"-i",
"--interactive",
action="store_true",
help="start interactive mode where every polar map is displayed and confirmation is queried",
)
add_logging_args(parser, defaults={"--logfile": "prepare.log"})
args = parser.parse_args()
args.images_dir = os.path.join(args.out_dir, args.images_dir)
args.csv = os.path.join(args.out_dir, args.csv)
args.logfile = os.path.join(args.out_dir, args.logfile)
args.loglevel = "DEBUG" if args.interactive else args.loglevel
if args.interactive:
print(
"""
Start in interactive mode.
This mode automatically sets the loglevel to debug so that you can see what is going on.
Use the following keys on the displayed polar map:
n(ext): go to the next image without saving the polar map
s(ave): save the polar map and go to the next
q(uit): quit the application
"""
)
logger = get_logger_by_args(args)
logger.info(args)
if not os.path.exists(args.out_dir):
os.mkdir(args.out_dir)
if not os.path.exists(args.images_dir):
os.mkdir(args.images_dir)
meta = pd.read_csv(args.meta_csv)
meta[meta_headers.datetime_acquisition] = pd.to_datetime(meta[meta_headers.datetime_acquisition])
dcm_files = sorted(get_files_recursive(args.polar_map_dir))
data = pd.DataFrame(
{
headers.id: [],
headers.sc: [],
headers.ac: [],
headers.type: [],
headers.file: [],
}
)
data[headers.id] = data[headers.id].astype(int)
data[headers.sc] = data[headers.sc].astype(bool)
data[headers.ac] = data[headers.ac].astype(bool)
data[headers.segments] = data[headers.segments].astype(bool)
data[headers.type] = data[headers.type].astype(str)
data[headers.file] = data[headers.file].astype(str)
for dcm_file in dcm_files:
logger.debug(f"Process file {dcm_file}")
dcm, img = load_dcm(dcm_file)
protocol = get_protocol(dcm)
patient_id = int(dcm.PatientID)
(meta[meta_headers.patient_id] == patient_id)
& (meta[meta_headers.protocol] == protocol)
& (meta[meta_headers.datetime_acquisition] == DICOMTime.Acquisition.to_datetime(dcm))
]
assert len(meta_rows) == 1
meta_row = meta_rows.iloc[0]
row = {
headers.id: meta_row[meta_headers.id],
headers.sc: is_scatter_corrected(dcm),
headers.ac: is_attenuation_corrected(dcm),
headers.segments: shows_segments(dcm),
types = dcm.SeriesDescription.split("-")[-1]
types = types[:-1].strip()
types = types.split("/")
for _type, rect in zip(types, [args.rect_upper, args.rect_lower]):
top, left, bottom, right = rect
polar_map = img[top:bottom, left:right]
polar_map = cv.cvtColor(polar_map, cv.COLOR_RGB2BGR)
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
row[headers.type] = _type[:2].lower()
_seg = "segments" if row[headers.segments] else "gray"
_file = f"{row[headers.id]:04d}-{_seg}_ac_nsc_{row[headers.type]}.png"
row[headers.file] = _file
_file = os.path.join(args.images_dir, _file)
logger.debug(f"For series {dcm.SeriesDescription} store row {row}")
store = not args.interactive
if args.interactive:
while True:
cv.imshow("Polar Map", cv.resize(polar_map, (1024, 1024)))
key = cv.waitKey(0)
if key == ord("q"):
exit(0)
elif key == ord("n"):
break
elif key == ord("s"):
store = True
break
if store:
cv.imwrite(_file, polar_map)
data = pd.concat((data, pd.DataFrame(row, index=[0])), ignore_index=True)
data = data.sort_values(by=[headers.id, headers.file])
data.to_csv(args.csv, index=False)