diff --git a/mu_map/recon/osem.py b/mu_map/recon/osem.py index 52f2411c3a4c6e1c7988a7208011ad00dec6c147..894b87effd0c2345c977a4f7517e9b2d5f1ccf51 100644 --- a/mu_map/recon/osem.py +++ b/mu_map/recon/osem.py @@ -5,8 +5,8 @@ import tempfile import numpy as np import stir -from mu_map.file.util import load_as_interfile from mu_map.file.interfile import ( + Interfile, parse_interfile_header_str, load_interfile, write_interfile, @@ -16,6 +16,9 @@ from mu_map.file.interfile import ( from mu_map.recon.filter import GaussianFilter +""" +Template for a STIR OSEM reconstruction configuration. +""" TEMPLATE_RECON_PARAMS = """ OSMAPOSLParameters := @@ -67,7 +70,14 @@ END := """ -def uniform_estimate(projection: Tuple[Dict[str, str], np.ndarray]): +def uniform_estimate(projection: Interfile) -> Interfile: + """ + Create a uniform estimate (image with all ones) for a projection. + This serves as the initial estimate for the EM reconstruction. + + :param projection: the projection for which the uniform estimate is created + :return: an interfile image with all ones in a size matching the projection + """ header_proj, image_proj = projection image = np.ones( @@ -83,28 +93,48 @@ def uniform_estimate(projection: Tuple[Dict[str, str], np.ndarray]): header = header.replace("{SPACING_Z}", header_proj[InterfileKeys.spacing(1)]) header = header.replace("{OFFSET_X}", f"{offset:.4f}") header = header.replace("{OFFSET_Y}", f"{offset:.4f}") + header = header.replace( + "{PATIENT_ORIENTATION}", header_proj[InterfileKeys.patient_orientation] + ) + header = header.replace( + "{PATIENT_ROTATION}", header_proj[InterfileKeys.patient_rotation] + ) header = parse_interfile_header_str(header) - - return header, image + return Interfile(header, image) def reconstruct( - projection: Tuple[Dict[str, str], np.ndarray], - mu_map: Optional[Tuple[Dict[str, str], np.ndarray]] = None, - mask: Optional[Tuple[Dict[str, str], np.ndarray]] = None, - init: Optional[Tuple[Dict[str, str], np.ndarray]] = None, + projection: Interfile, + mu_map: Optional[Interfile] = None, + mask: Optional[Interfile] = None, + init: Optional[Interfile] = None, n_subsets: Optional[int] = 4, n_iterations: Optional[int] = 10, postfilter: Optional[GaussianFilter] = None, - **kwargs, ): - # sanitize parameters + """ + Perform OSEM reconstruction with STIR. + + :param projection: the projection to reconstruct + :param mu_map: the attenuation map used for reconstruction + :param mask: a mask defining which voxels should be reconstructed + - if not given all voxels are reconstructed + - if an attenuation map is given only positive pixels in the + attenuation map are reconstructed + :param init: an initial estimate for the reconstruction which defaults to + all ones + :param n_subsets: number of subsets + :param n_iterations: number of iterations + :param postfilter: optional filter applied after the reconstruction + :returns: a reconstruction of the projection + """ + # sanitize parameters for STIR n_subiterations = n_subsets * n_iterations save_intervals = n_subiterations dir_tmp = tempfile.TemporaryDirectory() filename_projection = os.path.join(dir_tmp.name, "projection.hv") - write_interfile(filename_projection, *projection) + write_interfile(filename_projection, projection) params = TEMPLATE_RECON_PARAMS.replace("{PROJECTION}", filename_projection) output_prefix = os.path.join(dir_tmp.name, "out") @@ -114,18 +144,20 @@ def reconstruct( params = params.replace("{N_SUBITERATIONS}", str(n_subiterations)) params = params.replace("{SAVE_INTERVALS}", str(save_intervals)) + # prepare attenuation parameters if mu_map is not None: filename_mu_map = os.path.join(dir_tmp.name, "mu_map.hv") - write_interfile(filename_mu_map, *mu_map) + write_interfile(filename_mu_map, mu_map) params = params.replace("{ATTENUATION_TYPE}", "Full") params = params.replace("{ATTENUATION_MAP}", filename_mu_map) else: params = params.replace("{ATTENUATION_TYPE}", "No") params = params.replace("attenuation map", ";attenuation map") + # prepare mask parameters if mask is not None: filename_mask = os.path.join(dir_tmp.name, "mask.hv") - write_interfile(filename_mask, *mask) + write_interfile(filename_mask, mask) params = params.replace("{MASK_TYPE}", "Explicit Mask") params = params.replace("{MASK_FILE}", filename_mask) else: @@ -136,7 +168,7 @@ def reconstruct( init = uniform_estimate(projection) if init is None else init filename_init = os.path.join(dir_tmp.name, "init.hv") - write_interfile(filename_init, *init) + write_interfile(filename_init, init) params = params.replace("{INIT_FILE}", filename_init) if postfilter: @@ -155,11 +187,11 @@ def reconstruct( if __name__ == "__main__": import argparse - from mu_map.file import load_as_interfile + from mu_map.file.util import load_as_interfile from mu_map.recon.project import forward_project parser = argparse.ArgumentParser( - description="Reconstruct a projection or another reconstruction", + description="Reconstruct a projection or another reconstruction with attenuation correction", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument( @@ -168,25 +200,23 @@ if __name__ == "__main__": parser.add_argument( "--recon", type=str, - help="a reconstruction in DICOM or INTERFILE format - if specified the projection will be overwritten", + help="a reconstruction in DICOM or Interfile format - if specified the projection will be overwritten", ) parser.add_argument( "--mu_map", type=str, - help="a mu map for attenuation correction in DICOM or INTERFILE format", + help="a mu map for attenuation correction in DICOM or Interfile format", ) parser.add_argument( - "--skip_mu_map_z_inv", - action="store_true", - help="per default mu maps need to be inverted in z direction for attenuation correction to work correctly - this parameter can skip this", - ) - parser.add_argument( - "--out", type=str, help="the filename to store the reconstruction" + "--out", + type=str, + required=True, + help="the filename to store the reconstruction", ) parser.add_argument( "--n_subsets", type=int, - default=4, + default=3, help="the number of subsets for OSEM reconstruction", ) parser.add_argument( @@ -208,7 +238,7 @@ if __name__ == "__main__": help="the filter witdth for the postfilter is based on the spacing in the projection and can be modified with this factor", ) parser.add_argument( - "-v", "--verbosity", type=int, default=0, help="configure the verbosity of STIR" + "-v", "--verbosity", type=int, default=1, help="configure the verbosity of STIR" ) args = parser.parse_args() @@ -218,13 +248,11 @@ if __name__ == "__main__": stir.Verbosity_set(args.verbosity) mu_map = load_as_interfile(args.mu_map) if args.mu_map else None - if not args.skip_mu_map_z_inv: - mu_map = (mu_map[0], mu_map[1][::-1]) - mu_map_slices = None if mu_map is None else mu_map[1].shape[0] + mu_map_slices = None if mu_map is None else mu_map.image.shape[0] if args.recon: recon = load_as_interfile(args.recon) - projection = forward_project(*recon, n_slices=mu_map_slices) + projection = forward_project(recon, n_slices=mu_map_slices) else: projection = load_as_interfile(args.projection) @@ -232,15 +260,16 @@ if __name__ == "__main__": if args.postfilter == "gaussian": postfilter = GaussianFilter(projection, args.postfilter_width) - kwargs = vars(args) - del kwargs["mu_map"] - del kwargs["projection"] - del kwargs["postfilter"] - - header, image = reconstruct( - projection, mu_map=mu_map, postfilter=postfilter, **kwargs + recon_header, recon_image = reconstruct( + projection, + mu_map=mu_map, + n_subsets=args.n_subsets, + n_iterations=args.n_iterations, + postfilter=postfilter, ) - image = image[ + + recon_image = recon_image[ :, :, ::-1 ] # STIR creates reconstructions flipped on the x-axes, this is reverted here - write_interfile(args.out, header, image) + recon = Interfile(recon_header, recon_image) + write_interfile(args.out, recon)