Skip to content
Snippets Groups Projects
Commit c513d919 authored by Tamino Huxohl's avatar Tamino Huxohl
Browse files

revise documentation of data module

parent 0aa2cff7
No related branches found
No related tags found
No related merge requests found
"""
Utility script to prepare raw data for further processing.
It is highly dependent on the way the raw DICOM data were exported.
The script creates a folder containing the file `meta.csv` and another
folder containing the DICOM images sorted by an id for each study.
"""
import argparse
from datetime import datetime, timedelta
from enum import Enum
......@@ -8,7 +14,7 @@ import numpy as np
import pandas as pd
import pydicom
from mu_map.file.dicom import DICOMTime, parse_age
from mu_map.file.dicom import DICOM, DICOMTime, parse_age
from mu_map.logging import add_logging_args, get_logger_by_args
......@@ -53,13 +59,20 @@ headers.file_recon_nac_nsc = "file_recon_nac_nsc"
headers.file_mu_map = "file_mu_map"
def get_protocol(projection: pydicom.dataset.FileDataset) -> str:
def get_protocol(projection: DICOM) -> str:
"""
Get the protocol (stress, rest) of a projection image by checking if
it is part of the series description.
:param projection: pydicom image of the projection
:returns: the protocol as a string (Stress or Rest)
Parameters
----------
projection: DICOM
DICOM image of the projection
Returns
-------
str
the protocol as a string (Stress or Rest)
"""
if "stress" in projection.SeriesDescription.lower():
return "Stress"
......@@ -71,13 +84,20 @@ def get_protocol(projection: pydicom.dataset.FileDataset) -> str:
def find_projections(
dicom_images: List[pydicom.dataset.FileDataset],
) -> pydicom.dataset.FileDataset:
dicom_images: List[DICOM],
) -> List[DICOM]:
"""
Find all projections in a list of DICOM images belonging to a study.
Find all projections in a list of DICOM images belonging to a study.
Parameters
----------
dicom_images: list of DICOM
DICOM images of a study
:param dicom_images: list of DICOM images of a study
:return: the extracted DICOM image
Returns
-------
list of DICOM
all projection images in the input list
"""
_filter = filter(lambda image: "TOMO" in image.ImageType, dicom_images)
dicom_images = []
......@@ -85,9 +105,8 @@ def find_projections(
# filter for allowed series descriptions
if dicom_image.SeriesDescription not in ["Stress", "Rest", "Stress_2"]:
logger.warning(
f"Skip projection with unkown protocol [{dicom_image.SeriesDescription}]"
f"Skip projection with unknown protocol [{dicom_image.SeriesDescription}]"
)
# print(f" - {DICOMTime.Study.to_datetime(di)}, {DICOMTime.Series.to_datetime(di)}, {DICOMTime.Content.to_datetime(di)}, {DICOMTime.Acquisition.to_datetime(di)}")
continue
dicom_images.append(dicom_image)
......@@ -99,13 +118,21 @@ def find_projections(
def is_recon_type(
scatter_corrected: bool, attenuation_corrected: bool
) -> Callable[[pydicom.dataset.FileDataset], bool]:
) -> Callable[[DICOM], bool]:
"""
Get a filter function for reconstructions that are (non-)scatter and/or (non-)attenuation corrected.
:param scatter_corrected: if the filter should only return true for scatter corrected reconstructions
:param attenuation_corrected: if the filter should only return true for attenuation corrected reconstructions
:returns: a filter function
Parameters
----------
scatter_corrected: bool
if the filter should only return true for scatter corrected reconstructions
attenuation_corrected: bool
if the filter should only return true for attenuation corrected reconstructions
Returns
-------
Callable[[DICOM], bool]
a filter function that returns true if the DICOM image has the specified corrections
"""
if scatter_corrected and attenuation_corrected:
filter_str = " SC - AC "
......@@ -120,19 +147,29 @@ def is_recon_type(
def find_reconstruction(
dicom_images: List[pydicom.dataset.FileDataset],
projection: pydicom.dataset.FileDataset,
dicom_images: List[DICOM],
projection: DICOM,
scatter_corrected: bool,
attenuation_corrected: bool,
) -> List[pydicom.dataset.FileDataset]:
) -> List[DICOM]:
"""
Find a reconstruction in a list of dicom images of a study belonging to a projection.
:param dicom_images: the list of dicom images belonging to the study
:param projection: the dicom image of the projection
:param scatter_corrected: if it should be searched fo a scatter corrected reconstruction
:param attenuation_corrected: if it should be searched fo a attenuation corrected reconstruction
:returns: the according reconstruction
Find all reconstructions in a list of DICOM images of a study belonging to a projection.
Parameters
----------
dicom_images: list of DICOM
DICOM images belonging to the study
projection: DICOM
the DICOM image of the projection the reconstructions belong to
scatter_corrected: bool
if it should be searched for a scatter corrected reconstruction
attenuation_corrected: bool
if it should be searched for an attenuation corrected reconstruction
Returns
-------
DICOM
the according reconstruction
"""
protocol = get_protocol(projection)
......@@ -145,15 +182,6 @@ def find_reconstruction(
lambda image: "CT" not in image.SeriesDescription, _filter
) # remove µ-maps
_filter = filter(is_recon_type(scatter_corrected, attenuation_corrected), _filter)
# _filter = list(_filter)
# print("DEBUG Reconstructions: ")
# for r in _filter:
# try:
# print(f" - {r.SeriesDescription:>50} at {DICOMTime.Study.to_datetime(r)}, {DICOMTime.Series.to_datetime(r)}, {DICOMTime.Content.to_datetime(r)}, {DICOMTime.Acquisition.to_datetime(r)}")
# except Exception as e:
# print(f"Error {e}")
_filter = filter(
lambda image: DICOMTime.Acquisition.to_datetime(image)
== DICOMTime.Acquisition.to_datetime(projection),
......@@ -177,21 +205,31 @@ def find_reconstruction(
return dicom_images[0]
MAX_TIME_DIFF_S = 30
def find_attenuation_map(
dicom_images: List[pydicom.dataset.FileDataset],
projection: pydicom.dataset.FileDataset,
reconstructions: List[pydicom.dataset.FileDataset],
) -> pydicom.dataset.FileDataset:
dicom_images: List[DICOM],
projection: DICOM,
reconstructions: List[DICOM],
max_time_diff: int = 30,
) -> DICOM:
"""
Find a reconstruction in a list of dicom images of a study belonging to a projection and reconstructions.
:param dicom_images: the list of dicom images belonging to the study
:param projection: the dicom image of the projection
:param reconstructions: dicom images of reconstructions belonging to the projection
:returns: the according attenuation map
Find an attenuation map in a list of DICOM images of a study belonging to a projection and reconstructions.
Parameters
----------
dicom_images: list of DICOM
the list of DICOM images belonging to the study
projection: DICOM
the DICOM image of the projection
reconstructions: list of DICOM
DICOM images of reconstructions belonging to the projection
max_time_diff: int, optional
filter out DICOM files which differ more than this value
in series time to the reconstructions
Returns
-------
DICOM
the according attenuation map
"""
protocol = get_protocol(projection)
recon_times = list(
......@@ -210,7 +248,7 @@ def find_attenuation_map(
lambda recon_time: (
DICOMTime.Series.to_datetime(image) - recon_time
).seconds
< MAX_TIME_DIFF_S,
< max_time_diff,
recon_times,
)
),
......@@ -233,26 +271,31 @@ def find_attenuation_map(
return dicom_images[0]
def get_relevant_images(
patient: pydicom.dataset.FileDataset, dicom_dir: str
) -> List[pydicom.dataset.FileDataset]:
def get_relevant_images(patient: DICOM, dicom_dir: str) -> List[DICOM]:
"""
Get all relevant images of a patient.
:param patient: pydicom dataset of a patient
:param dicom_dir: the directory of the DICOM files
:return: all relevant dicom images
Parameters
----------
patient: DICOM
DICOM dataset of a patient
dicom_dir: str
the directory of the DICOM files
Returns
-------
list of DICOM
all relevant DICOM images
"""
# get all myocardial scintigraphy studies
studies = list(
filter(
lambda child: child.DirectoryRecordType == "STUDY",
# and child.StudyDescription == "Myokardszintigraphie", # filter is disabled because there is a study without this description and only such studies are exported anyway
patient.children,
)
)
# extract all dicom images
# extract all DICOM images
dicom_images = []
for study in studies:
series = list(
......@@ -409,18 +452,6 @@ if __name__ == "__main__":
reconstructions.append(recon)
mu_map = find_attenuation_map(dicom_images, projection, reconstructions)
# for i, projection in enumerate(projections):
# _time = DICOMTime.Series.to_datetime(projection)
# print(f" - Projection: {projection.SeriesDescription:>10} at {DICOMTime.Study.to_datetime(projection)}, {DICOMTime.Series.to_datetime(projection)}, {DICOMTime.Content.to_datetime(projection)}, {DICOMTime.Acquisition.to_datetime(projection)}")
# recons = []
# for sc, ac in [(False, False), (False, True), (True, False), (True, True)]:
# r = find_reconstruction(dicom_images, projection, scatter_corrected=sc, attenuation_corrected=ac)
# print(f" - {r.SeriesDescription:>50} at {DICOMTime.Study.to_datetime(r)}, {DICOMTime.Series.to_datetime(r)}, {DICOMTime.Content.to_datetime(r)}, {DICOMTime.Acquisition.to_datetime(r)}")
# recons.append(r)
# mu_map = find_attenuation_map(dicom_images, projection, recons)
# print(f" - {mu_map.SeriesDescription:>50} at {DICOMTime.Study.to_datetime(mu_map)}, {DICOMTime.Series.to_datetime(mu_map)}, {DICOMTime.Content.to_datetime(mu_map)}")
# print(f" -")
# extract pixel spacings and assert that they are equal for all reconstruction images
_map_lists = map(
lambda image: [*image.PixelSpacing, image.SliceThickness],
......
"""
Module containing a GUI to label contours of a bed to be extracted from
an attenuation map.
Additionally, it contains utility functions to load bed contours, remove or
add the bed.
"""
import json
from typing import Dict, List
......@@ -12,13 +18,21 @@ DEFAULT_BED_CONTOURS_FILENAME = "bed_contours.json"
def load_contours(filename: str, as_ndarry: bool = True) -> Dict[int, np.ndarray]:
"""
Load contours from a json file.
Load contours from a JSON file.
The structure of the file is a dict where the key is the id of the according
image and the value is a numpy array of the contour.
:param filename: filename of a json file containing contours
:param as_ndarry: directly parse contours as numpy arrays
:return: a dict mapping ids to contours
Parameters
----------
filename: str
filename of a JSON file containing contours
as_ndarry: bool
directly parse contours as numpy arrays
Returns
-------
Dict
a dict mapping ids to contours either as lists of int or np.arrays
"""
with open(filename, mode="r") as f:
contours = json.load(f)
......@@ -32,13 +46,21 @@ def load_contours(filename: str, as_ndarry: bool = True) -> Dict[int, np.ndarray
return dict(_map)
def remove_bed(mu_map: np.ndarray, bed_contour: np.ndarray):
def remove_bed(mu_map: np.ndarray, bed_contour: np.ndarray) -> np.ndarray:
"""
Remove the bed defined by a contour from all slices.
:param mu_map: the mu_map from which the bed is removed.
:param bed_contour: the contour describing where the bed is found
:return: the mu_map with the bed removed
Parameters
----------
mu_map: np.ndarray
the mu_map from which the bed is removed
bed_contour: np.ndarray
the contour describing where the bed is found
Returns
-------
np.ndarray
the mu_map with the bed removed
"""
_mu_map = mu_map.copy()
for i in range(_mu_map.shape[0]):
......@@ -50,10 +72,19 @@ def add_bed(without_bed: np.ndarray, with_bed: np.ndarray, bed_contour: np.ndarr
"""
Add the bed to every slice of a mu_map.
:param without_bed: the mu_map without the bed
:param with_bed: the mu_map with the bed
:param bed_contour: the contour defining the location of the bed
:return: the mu_map with the bed added
Parameters
----------
without_bed: np.ndarray
the mu_map without the bed
with_bed: np.ndarray
the mu_map with the bed
bed_contour: np.ndarray
the contour defining the location of the bed
Returns
-------
np.ndarray
the mu_map with the bed added
"""
with_bed, without_bed = align_images(with_bed, without_bed)
......
"""
Module contains a GUI that allows to label if the first and/or last slice
of an attenuation map is broken.
In addition, there is a utility function to remove these labeled slices.
"""
from typing import Optional
import numpy as np
......@@ -8,15 +13,26 @@ HEADER_DISC_FIRST = "discard_first"
HEADER_DISC_LAST = "discard_last"
def discard_slices(row: pd.Series, μ_map: np.ndarray, recon: Optional[np.ndarray] = None) -> np.ndarray:
def discard_slices(
row: pd.Series, μ_map: np.ndarray, recon: Optional[np.ndarray] = None
) -> np.ndarray:
"""
Discard slices based on the flags in the row of th according table.
Discard slices based on the flags in the row of the according table.
The row is expected to contain the flags 'discard_first' and 'discard_last'.
:param row: the row of meta configuration file of a dataset
:param μ_map: the μ_map
:param recon: optional reconstruction of which the same slice is discarded so that the alignment stays the same
:return: the μ_map with according slices removed
Parameters
----------
row: pd.Series
the row of meta configuration file of a dataset
μ_map: np.ndarray
the μ_map
recon: np.ndarray, optional
reconstruction of which the same slice is discarded so that the alignment stays the same
Returns
-------
np.ndarray
the μ_map (and reconstruction) with according slices removed
"""
_res = μ_map
......
"""
Functionality to split a dataset created with the `prepare` script
into train/validation/test splits.
"""
import pandas as pd
from typing import Dict, List
def parse_split_str(_str: str, delimitier: str = "/") -> List[float]:
def parse_split_str(_str: str, delimiter: str = "/") -> List[float]:
"""
Parse a string into a list of proportions representing the split.
The string should have the format 70/15/15, where / can be replaced by a specified delimiter.
The numbers must add up to 100.
:param _str: the string to be parsed
:param delimitier: the delimiter used to split the provided string
:return: a list of floats representing the percentages of the split
Parameters
----------
_str: str
the string to be parsed
delimiter: str, optional
the delimiter used to split the provided string
Returns
-------
list of float:
list representing the percentages of each split
"""
split_as_str = _str.split(delimitier)
split_as_str = _str.split(delimiter)
split_as_int = list(map(int, split_as_str))
if sum(split_as_int) != 100:
......@@ -25,11 +37,19 @@ def parse_split_str(_str: str, delimitier: str = "/") -> List[float]:
def split_csv(data: pd.DataFrame, split_csv: str) -> Dict[str, pd.DataFrame]:
"""
Split a data frames based on a file defining a split.
:param data: the data frame to be split
:param split_csv: the filename of the csv file defining the split
:return: a list of sub-data frames forming the splits
Split a data frame based on a file defining a split.
Parameters
----------
data: pd.DataFrame
the data frame to be split
split_csv: str
the filename of the csv file defining the split
Returns
-------
Dict[str, pd.DataFrame]
a dict of sub-data frames forming the splits
"""
split_data = pd.read_csv(split_csv)
split_names = split_data["split"].unique()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment