Skip to content
Snippets Groups Projects
Commit a39d7b07 authored by Tamino Huxohl's avatar Tamino Huxohl
Browse files
parents 7966db83 c2d5166e
No related branches found
No related tags found
No related merge requests found
from dataclasses import dataclass
from typing import Tuple
import cv2 as cv
import numpy as np
import pytesseract
@dataclass
class PolarMapSegment:
id: int
rect: Tuple[int, int, int, int]
name: str
location: str
# see https://www.ahajournals.org/doi/full/10.1161/hc0402.102975
SEGMENTS = [
PolarMapSegment( id=1, location="basal", name="anterior", rect=( 14, 190, 41, 217)),
PolarMapSegment( id=2, location="basal", name="anteroseptal", rect=(101, 36, 128, 63)),
PolarMapSegment( id=3, location="basal", name="inferoseptal", rect=(277, 36, 304, 63)),
PolarMapSegment( id=4, location="basal", name="inferior", rect=(365, 190, 392, 217)),
PolarMapSegment( id=5, location="basal", name="inferolateral", rect=(277, 341, 304, 370)),
PolarMapSegment( id=6, location="basal", name="anterolateral", rect=(101, 340, 128, 367)),
PolarMapSegment( id=7, location="mid", name="anterior", rect=( 64, 190, 91, 217)),
PolarMapSegment( id=8, location="mid", name="anteroseptal", rect=(128, 80, 155, 107)),
PolarMapSegment( id=9, location="mid", name="inferoseptal", rect=(251, 80, 278, 107)),
PolarMapSegment(id=10, location="mid", name="inferior", rect=(313, 188, 340, 215)),
PolarMapSegment(id=11, location="mid", name="inferolateral", rect=(251, 298, 278, 325)),
PolarMapSegment(id=12, location="mid", name="anterolateral", rect=(128, 298, 155, 325)),
PolarMapSegment(id=13, location="apical", name="anterior", rect=(115, 190, 142, 217)),
PolarMapSegment(id=14, location="apical", name="septal", rect=(190, 113, 217, 140)),
PolarMapSegment(id=15, location="apical", name="inferior", rect=(265, 188, 292, 215)),
PolarMapSegment(id=16, location="apical", name="lateral", rect=(190, 265, 217, 292)),
PolarMapSegment(id=17, location="apex", name="apex", rect=(190, 190, 217, 217)),
]
hsv_green_lower = np.array([40, 100, 100])
hsv_green_upper = np.array([80, 255, 255])
if __name__ == "__main__":
import argparse
import os
import pandas as pd
from mu_map.data.prepare_polar_maps import headers
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--polar_map_dir", type=str, required=True, help="directory where files are output to"
)
parser.add_argument(
"--images_dir",
type=str,
default="images",
help="directory under <out_dir> where images of polar maps are stored",
)
parser.add_argument(
"--csv",
type=str,
default="polar_maps.csv",
help="file unter <out_dir> where meta information is stored",
)
parser.add_argument(
"--perfusion_csv",
type=str,
default="perfusion.csv",
help="csv file output by this script under polar_map_dir",
)
parser.add_argument(
"--number_res",
type=int,
default=128,
help="numbers cutouts are rescaled to this resolution for better number recognition",
)
args = parser.parse_args()
args.images_dir = os.path.join(args.polar_map_dir, args.images_dir)
args.csv = os.path.join(args.polar_map_dir, args.csv)
args.perfusion_csv = os.path.join(args.polar_map_dir, args.perfusion_csv)
data = pd.read_csv(args.csv)
rows = []
for i in range(len(data)):
row = data.iloc[i]
_id = row[headers.id]
_file = os.path.join(args.images_dir, row[headers.file])
polar_map = cv.imread(_file)
for segment in SEGMENTS:
# extract number segment
top, left, bottom, right = segment.rect
img_number = polar_map[top:bottom, left:right]
# process segment for improved automatic number detection
img_number = cv.resize(img_number, (args.number_res, args.number_res))
img_number = cv.cvtColor(img_number, cv.COLOR_BGR2HSV)
img_number = cv.inRange(img_number, hsv_green_lower, hsv_green_upper)
img_number = cv.morphologyEx(img_number, cv.MORPH_CLOSE, np.ones((4, 4), np.uint8))
img_number = cv.morphologyEx(img_number, cv.MORPH_OPEN, np.ones((2, 2), np.uint8))
_, img_number = cv.threshold(img_number, 0, 255, cv.THRESH_BINARY_INV)
# try to recognize number
str_number = pytesseract.image_to_string(img_number, config="-c tessedit_char_whitelist=[1,2,3,4,5,6,7,8,9,0] --psm 7")
str_number = str_number.strip()
# prepare image for visualization
_polar_map = polar_map.copy()
_polar_map = cv.rectangle(_polar_map, (left, top), (right, bottom), (255, 255, 255), 1)
_polar_map = cv.resize(_polar_map, (512, 512))
img_number = img_number.repeat(3).reshape((*img_number.shape, 3))
img_number = cv.resize(img_number, (512, 512))
space_h = np.full((512, 10, 3), 239, np.uint8)
cv.imshow("Polar Map - Segment", np.hstack((_polar_map, space_h, img_number)))
cv.waitKey(50)
while True:
_input = input(f"Number is {str_number} (y/other): ")
try:
if _input == "y":
number = int(str_number)
break
elif _input == "q":
exit(0)
else:
number = int(_input)
break
except ValueError:
print(f"Cannot parse {_input} as a number. Please enter a valid number.")
_value = pd.Series({f"segment_{segment.id}": number})
row = pd.concat([row, _value])
rows.append(row)
pd.DataFrame(rows).to_csv(args.perfusion_csv)
import argparse
import os
from typing import List
from mu_map.file.dicom import dcm_type
def get_files_recursive(_dir: str) -> List[str]:
"""
Recursively get all files in a directory. This means that all
sub-directories are recursively searched until a file is reached.
:param _dir: the directory of which files are listed
:return: a list of files in the directory and its sub-directories
"""
files: List[str] = []
for _file in os.listdir(_dir):
_file = os.path.join(_dir, _file)
if os.path.isdir(_file):
files.extend(get_files_recursive(_file))
else:
files.append(_file)
return files
def is_scatter_corrected(dcm: dcm_type) -> bool:
return not ("NoSC" in dcm.SeriesDescription)
def is_attenuation_corrected(dcm: dcm_type) -> bool:
return not ("NoAC" in dcm.SeriesDescription)
def get_type(dcm: dcm_type) -> str:
description = dcm.SeriesDescription.lower()
if "syn" in description:
return "synthetic"
elif "ct" in description:
return "ct"
else:
return "symbia"
headers = argparse.Namespace()
headers.id = "id"
headers.scatter_correction = "scatter_correction"
headers.sc = headers.scatter_correction
headers.attenuation_correction = "attenuation_correction"
headers.ac = headers.attenuation_correction
headers.mu_map_type = "mu_map_type"
headers.type = headers.mu_map_type
headers.file = "file"
if __name__ == "__main__":
import cv2 as cv
import numpy as np
import pandas as pd
from mu_map.data.prepare import get_protocol
from mu_map.data.prepare import headers as meta_headers
from mu_map.file.dicom import load_dcm, DCM_TAG_PIXEL_SCALE_FACTOR, dcm_type
from mu_map.logging import add_logging_args, get_logger_by_args
parser = argparse.ArgumentParser(
description="Store/Sort in polar maps for further processing",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--polar_map_dir",
type=str,
required=True,
help="directory where the raw DICOM files of polar maps are stored",
)
parser.add_argument(
"--out_dir", type=str, required=True, help="directory where files are output to"
)
parser.add_argument(
"--images_dir",
type=str,
default="images",
help="directory under <out_dir> where images of polar maps are stored",
)
parser.add_argument(
"--csv",
type=str,
default="polar_maps.csv",
help="file unter <out_dir> where meta information is stored",
)
parser.add_argument(
"--meta_csv",
type=str,
required=True,
help="the csv file containing meta information of the dataset of which the generated polar maps are",
)
parser.add_argument(
"--rect",
type=int,
nargs=4,
default=[21, 598, 425, 1002],
help="rectangle as [top, left, bottom, right] coordinates where the polar map is found in the DICOM images",
)
parser.add_argument(
"-i",
"--interactive",
action="store_true",
help="start interactive mode where every polar map is displayed and confirmation is queried",
)
add_logging_args(parser, defaults={"--logfile": "prepare.log"})
args = parser.parse_args()
args.images_dir = os.path.join(args.out_dir, args.images_dir)
args.csv = os.path.join(args.out_dir, args.csv)
args.logfile = os.path.join(args.out_dir, args.logfile)
args.loglevel = "DEBUG" if args.interactive else args.loglevel
if args.interactive:
print(
"""
Start in interactive mode.
This mode automatically sets the loglevel to debug so that you can see what is going in.
Use the following keys on the displayed polar map:
n(ext): go to the next image without saving the polar map
s(ave): save the polar map and go to the next
q(uit): quit the application
"""
)
logger = get_logger_by_args(args)
logger.info(args)
if not os.path.exists(args.out_dir):
os.mkdir(args.out_dir)
if not os.path.exists(args.images_dir):
os.mkdir(args.images_dir)
meta = pd.read_csv(args.meta_csv)
dcm_files = sorted(get_files_recursive(args.polar_map_dir))
data = pd.DataFrame(
{
headers.id: [],
headers.sc: [],
headers.ac: [],
headers.type: [],
headers.file: [],
}
)
data[headers.id] = data[headers.sc].astype(int)
data[headers.sc] = data[headers.sc].astype(bool)
data[headers.ac] = data[headers.ac].astype(bool)
data[headers.type] = data[headers.ac].astype(str)
data[headers.file] = data[headers.ac].astype(str)
for dcm_file in dcm_files:
logger.debug(f"Process file {dcm_file}")
dcm, img = load_dcm(dcm_file)
protocol = get_protocol(dcm)
patient_id = int(dcm.PatientID)
meta_row = meta[
(meta[meta_headers.patient_id] == patient_id)
& (meta[meta_headers.protocol] == protocol)
].iloc[0]
row = {
headers.id: meta_row[meta_headers.id],
headers.sc: is_scatter_corrected(dcm),
headers.ac: is_attenuation_corrected(dcm),
headers.type: get_type(dcm),
}
top, left, bottom, right = args.rect
polar_map = img[top:bottom, left:right]
polar_map = cv.cvtColor(polar_map, cv.COLOR_RGB2BGR)
_ac = "ac" if row[headers.ac] else "nac"
_sc = "sc" if row[headers.sc] else "nsc"
_file = f"{row[headers.id]:04d}-{_ac}_{_sc}_{row[headers.type]}.png"
row[headers.file] = _file
_file = os.path.join(args.images_dir, _file)
if (
len(
data[
(data[headers.id] == row[headers.id])
& (data[headers.sc] == row[headers.sc])
& (data[headers.ac] == row[headers.ac])
& (data[headers.type] == row[headers.type])
]
)
> 0
):
logger.warning(f"Skip {dcm_file} as it is a duplicate for row: {row}")
continue
logger.debug(f"For series {dcm.SeriesDescription} store row {row}")
store = not args.interactive
if args.interactive:
while True:
cv.imshow("Polar Map", cv.resize(polar_map, (512, 512)))
key = cv.waitKey(0)
if key == ord("q"):
exit(0)
elif key == ord("n"):
break
elif key == ord("s"):
store = True
break
if store:
cv.imwrite(_file, polar_map)
row = pd.DataFrame(row, index=[0])
data = pd.concat((data, row), ignore_index=True)
data = data.sort_values(by=[headers.id, headers.file])
data.to_csv(args.csv, index=False)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment