Skip to content
Snippets Groups Projects
Commit 673dfc18 authored by Tamino Huxohl's avatar Tamino Huxohl
Browse files

document and update osem recon

parent 3858361b
No related branches found
No related tags found
No related merge requests found
......@@ -5,8 +5,8 @@ import tempfile
import numpy as np
import stir
from mu_map.file.util import load_as_interfile
from mu_map.file.interfile import (
Interfile,
parse_interfile_header_str,
load_interfile,
write_interfile,
......@@ -16,6 +16,9 @@ from mu_map.file.interfile import (
from mu_map.recon.filter import GaussianFilter
"""
Template for a STIR OSEM reconstruction configuration.
"""
TEMPLATE_RECON_PARAMS = """
OSMAPOSLParameters :=
......@@ -67,7 +70,14 @@ END :=
"""
def uniform_estimate(projection: Tuple[Dict[str, str], np.ndarray]):
def uniform_estimate(projection: Interfile) -> Interfile:
"""
Create a uniform estimate (image with all ones) for a projection.
This serves as the initial estimate for the EM reconstruction.
:param projection: the projection for which the uniform estimate is created
:return: an interfile image with all ones in a size matching the projection
"""
header_proj, image_proj = projection
image = np.ones(
......@@ -83,28 +93,48 @@ def uniform_estimate(projection: Tuple[Dict[str, str], np.ndarray]):
header = header.replace("{SPACING_Z}", header_proj[InterfileKeys.spacing(1)])
header = header.replace("{OFFSET_X}", f"{offset:.4f}")
header = header.replace("{OFFSET_Y}", f"{offset:.4f}")
header = header.replace(
"{PATIENT_ORIENTATION}", header_proj[InterfileKeys.patient_orientation]
)
header = header.replace(
"{PATIENT_ROTATION}", header_proj[InterfileKeys.patient_rotation]
)
header = parse_interfile_header_str(header)
return header, image
return Interfile(header, image)
def reconstruct(
projection: Tuple[Dict[str, str], np.ndarray],
mu_map: Optional[Tuple[Dict[str, str], np.ndarray]] = None,
mask: Optional[Tuple[Dict[str, str], np.ndarray]] = None,
init: Optional[Tuple[Dict[str, str], np.ndarray]] = None,
projection: Interfile,
mu_map: Optional[Interfile] = None,
mask: Optional[Interfile] = None,
init: Optional[Interfile] = None,
n_subsets: Optional[int] = 4,
n_iterations: Optional[int] = 10,
postfilter: Optional[GaussianFilter] = None,
**kwargs,
):
# sanitize parameters
"""
Perform OSEM reconstruction with STIR.
:param projection: the projection to reconstruct
:param mu_map: the attenuation map used for reconstruction
:param mask: a mask defining which voxels should be reconstructed
- if not given all voxels are reconstructed
- if an attenuation map is given only positive pixels in the
attenuation map are reconstructed
:param init: an initial estimate for the reconstruction which defaults to
all ones
:param n_subsets: number of subsets
:param n_iterations: number of iterations
:param postfilter: optional filter applied after the reconstruction
:returns: a reconstruction of the projection
"""
# sanitize parameters for STIR
n_subiterations = n_subsets * n_iterations
save_intervals = n_subiterations
dir_tmp = tempfile.TemporaryDirectory()
filename_projection = os.path.join(dir_tmp.name, "projection.hv")
write_interfile(filename_projection, *projection)
write_interfile(filename_projection, projection)
params = TEMPLATE_RECON_PARAMS.replace("{PROJECTION}", filename_projection)
output_prefix = os.path.join(dir_tmp.name, "out")
......@@ -114,18 +144,20 @@ def reconstruct(
params = params.replace("{N_SUBITERATIONS}", str(n_subiterations))
params = params.replace("{SAVE_INTERVALS}", str(save_intervals))
# prepare attenuation parameters
if mu_map is not None:
filename_mu_map = os.path.join(dir_tmp.name, "mu_map.hv")
write_interfile(filename_mu_map, *mu_map)
write_interfile(filename_mu_map, mu_map)
params = params.replace("{ATTENUATION_TYPE}", "Full")
params = params.replace("{ATTENUATION_MAP}", filename_mu_map)
else:
params = params.replace("{ATTENUATION_TYPE}", "No")
params = params.replace("attenuation map", ";attenuation map")
# prepare mask parameters
if mask is not None:
filename_mask = os.path.join(dir_tmp.name, "mask.hv")
write_interfile(filename_mask, *mask)
write_interfile(filename_mask, mask)
params = params.replace("{MASK_TYPE}", "Explicit Mask")
params = params.replace("{MASK_FILE}", filename_mask)
else:
......@@ -136,7 +168,7 @@ def reconstruct(
init = uniform_estimate(projection) if init is None else init
filename_init = os.path.join(dir_tmp.name, "init.hv")
write_interfile(filename_init, *init)
write_interfile(filename_init, init)
params = params.replace("{INIT_FILE}", filename_init)
if postfilter:
......@@ -155,11 +187,11 @@ def reconstruct(
if __name__ == "__main__":
import argparse
from mu_map.file import load_as_interfile
from mu_map.file.util import load_as_interfile
from mu_map.recon.project import forward_project
parser = argparse.ArgumentParser(
description="Reconstruct a projection or another reconstruction",
description="Reconstruct a projection or another reconstruction with attenuation correction",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
......@@ -168,25 +200,23 @@ if __name__ == "__main__":
parser.add_argument(
"--recon",
type=str,
help="a reconstruction in DICOM or INTERFILE format - if specified the projection will be overwritten",
help="a reconstruction in DICOM or Interfile format - if specified the projection will be overwritten",
)
parser.add_argument(
"--mu_map",
type=str,
help="a mu map for attenuation correction in DICOM or INTERFILE format",
help="a mu map for attenuation correction in DICOM or Interfile format",
)
parser.add_argument(
"--skip_mu_map_z_inv",
action="store_true",
help="per default mu maps need to be inverted in z direction for attenuation correction to work correctly - this parameter can skip this",
)
parser.add_argument(
"--out", type=str, help="the filename to store the reconstruction"
"--out",
type=str,
required=True,
help="the filename to store the reconstruction",
)
parser.add_argument(
"--n_subsets",
type=int,
default=4,
default=3,
help="the number of subsets for OSEM reconstruction",
)
parser.add_argument(
......@@ -208,7 +238,7 @@ if __name__ == "__main__":
help="the filter witdth for the postfilter is based on the spacing in the projection and can be modified with this factor",
)
parser.add_argument(
"-v", "--verbosity", type=int, default=0, help="configure the verbosity of STIR"
"-v", "--verbosity", type=int, default=1, help="configure the verbosity of STIR"
)
args = parser.parse_args()
......@@ -218,13 +248,11 @@ if __name__ == "__main__":
stir.Verbosity_set(args.verbosity)
mu_map = load_as_interfile(args.mu_map) if args.mu_map else None
if not args.skip_mu_map_z_inv:
mu_map = (mu_map[0], mu_map[1][::-1])
mu_map_slices = None if mu_map is None else mu_map[1].shape[0]
mu_map_slices = None if mu_map is None else mu_map.image.shape[0]
if args.recon:
recon = load_as_interfile(args.recon)
projection = forward_project(*recon, n_slices=mu_map_slices)
projection = forward_project(recon, n_slices=mu_map_slices)
else:
projection = load_as_interfile(args.projection)
......@@ -232,15 +260,16 @@ if __name__ == "__main__":
if args.postfilter == "gaussian":
postfilter = GaussianFilter(projection, args.postfilter_width)
kwargs = vars(args)
del kwargs["mu_map"]
del kwargs["projection"]
del kwargs["postfilter"]
header, image = reconstruct(
projection, mu_map=mu_map, postfilter=postfilter, **kwargs
recon_header, recon_image = reconstruct(
projection,
mu_map=mu_map,
n_subsets=args.n_subsets,
n_iterations=args.n_iterations,
postfilter=postfilter,
)
image = image[
recon_image = recon_image[
:, :, ::-1
] # STIR creates reconstructions flipped on the x-axes, this is reverted here
write_interfile(args.out, header, image)
recon = Interfile(recon_header, recon_image)
write_interfile(args.out, recon)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment