Source code for mdreg.fit_models

import os
from typing import Union, Tuple

from tqdm import tqdm
import numpy as np
import zarr
import dask
import dask.array as da
from scipy.integrate import cumulative_trapezoid
from scipy.optimize import curve_fit
from sklearn.decomposition import PCA

from mdreg import pixel_models, io


def _func_init(xdata, ydata, p0):
    return p0


def _fit_func(func, func_init, xdata, ydata, p0, bounds, **kwargs):

    p0 = func_init(xdata, ydata, p0)

    for i, p in enumerate(p0):
        if np.isscalar(bounds[0]):
            if p<bounds[0]:
                raise ValueError(f"Initial value {p} for parameter {i} is "
                                 f"below the lower bound {bounds[0]}.")
        else:
            if p<bounds[0][i]:
                raise ValueError(f"Initial value {p} for parameter {i} is "
                                 f"below the lower bound {bounds[0][i]}.")
        if np.isscalar(bounds[1]):
            if p>bounds[1]:
                raise ValueError(f"Initial value {p} for parameter {i} is "
                                 f"above the upper bound {bounds[1]}.")
        else:
            if p>bounds[1][i]:
                raise ValueError(f"Initial value {p} for parameter {i} is "
                                 f"above the upper bound {bounds[1][i]}.")
            
    try:
        p, _ = curve_fit(func, 
            xdata = xdata, 
            ydata = ydata, 
            p0 = p0, 
            bounds = bounds, 
            **kwargs, 
        )
        return p
    except RuntimeError:
        return p0
    

[docs] def fit_pixels(ydata, model = None, p0 = None, xdata = None, func_init = _func_init, parallel = True, progress_bar = True, path = None, bounds = (-np.inf, +np.inf), memdim = 2, **kwargs, ): """ Fit a model pixel-wise Parameters ---------- ydata : numpy.ndarray | zarr.Array 2D or 3D array of signal intensities model : function Model function to fit to the data (required). Model functions need to have one argument *xdata* followed by the free parameters of the model. The return value is the signal at each value of *xdata*. p0 : array Initial guess for the model parameters (required) xdata : array-like Independent variables for the model. If this is not provided, an index array is used. func_init : function Function to initialize the model parameters. It must take *xdata* and *ydata* as arguments, followed by *p0* (see below). It returns estimates for the free model parameters as a tuple. parallel : bool Option to perform fitting in parallel. Default is True. progress_bar : bool Option to display a progress bar. This option is ignored when parallel = True. Default is True. path : str, optional Path on disk where to save the results. If no path is provided, the results are not saved to disk. Defaults to None. memdim : int Number of array dimensions to be held in memory at any one time. Possible values for memdim range from 0 (pixel-by-pixel processing) to ydata.ndim-1 (process the whole array at once). With memdim=1, data are loaded and processed row-by-row, with memdim=2 the are processed slice-by-slice, and so on. Default is 2. bounds : tuple Bounds for the model parameters, in the format required by scipy.curve_fit. Note the initial value p0 needs to be contained in the bounds. **kwargs : Any additional arguments accepted by scipy.curve_fit(). Returns ------- fit : numpy.ndarray | zarr.Array The fitted signal model with the same dimensions as *arr*. pars : numpy.ndarray | zarr.Array The parameters of the fitted signal model with dimensions (x,y,n) or (x,y,z,n), where n is the number of free parameters of the signal model. """ # Check parameters if model is None: raise ValueError('model is a required argument') if p0 is None: raise ValueError('p0 is a required argument') if xdata is None: xdata = np.arange(ydata.shape[-1]) if isinstance(ydata, zarr.Array): fit, par = _fit_pixels_zarr( ydata, model, xdata, func_init, parallel, progress_bar, bounds, p0, path, memdim, **kwargs) else: fit, par = _fit_pixels_numpy( ydata, model, xdata, func_init, parallel, progress_bar, bounds, p0, **kwargs) if path is not None: np.save(os.path.join(path, 'fit'), fit) np.save(os.path.join(path, 'pars'), par) return fit, par
def _fit_pixels_numpy( ydata, model, xdata, func_init, parallel, progress_bar, bounds, p0, **kwargs): shape = ydata.shape ydata = ydata.reshape((-1,shape[-1])) nx, nt = ydata.shape if not parallel: p = [] for x in tqdm( range(nx), desc='Fitting pixels', disable=not progress_bar, ): p_x = _fit_func( model, func_init, xdata, ydata[x,:], p0, bounds, **kwargs, ) p.append(p_x) else: tasks = [] for x in range(nx): task_x = dask.delayed(_fit_func)( model, func_init, xdata, ydata[x,:], p0, bounds, **kwargs, ) tasks.append(task_x) p = dask.compute(*tasks) # Compute output arrays n = len(p[0]) par = np.empty((nx, n)) fit = np.empty((nx, nt)) for x in range(nx): par[x,:] = p[x] fit[x,:] = model(xdata, *tuple(p[x])) fit = fit.reshape(shape) par = par.reshape(shape[:-1]+(n,)) return fit, par def _fit_pixels_zarr( ydata, model, xdata, func_init, parallel, progress_bar, bounds, p0, path, memdim, **kwargs): if memdim is None: memdim = ydata.ndim-1 # Dimension of data in memory if memdim < 0: raise ValueError("memdim cannot be negative.") if memdim > ydata.ndim-1: raise ValueError("memdim cannot be larger than ydata.ndim-1.") # Get the shape and number of slice dimensions shape = ydata.shape[memdim:-1] n = int(np.prod(shape)) # All indices in slice and time dimensions p = tuple([slice(None) for _ in range(memdim)]) t = (slice(None), ) for k in tqdm( range(n), desc='Fitting zarray', disable=(not progress_bar) or (n==1), ): # Convert flat index to multi-index z = np.unravel_index(k, shape) # Load slice z into memory ydata_k = ydata[p + z + t] # Fit in memory fit_k, par_k = _fit_pixels_numpy( ydata_k, model, xdata, func_init, parallel, progress_bar=progress_bar and (n==1), bounds=bounds, p0=p0, **kwargs) # If this is the first slice, create the zarrays if k==0: fit, par = io._fit_models_init(ydata, path, par_k.shape[-1]) # Save results for slize z in the zarray fit[p + z + t] = fit_k par[p + z + t] = par_k return fit, par
[docs] def fit_deconvolution(signals: np.ndarray, aif=None, tol=0.2, n0=1): """Fits DCE signals with a model-free deconvolution Args: signals (np.ndarray): Input array aif (np.ndarray, optional): Arterial input signal (1D). Defaults to None. tol (float, optional): Cut-off value for the singular values. Defaults to 0.2. n0 (int, optional): Baseline length. Defaults to 1. Returns: tuple: reconstructed signals, None """ shape = signals.shape signals = signals.reshape(-1, shape[-1]) # Build signal change ca = aif - np.mean(aif[:n0]) S0 = np.mean(signals[...,:n0], axis=-1)[..., None] conc = signals - S0 # Build matrix n = len(ca) mat = np.zeros((n,n)) for i in range(0,n): for j in range(0,i+1): mat[i,j] = ca[i-j] # Invert matrix U, s, Vt = np.linalg.svd(mat, full_matrices=False) svmin = tol*np.amax(s) s_inv = np.array([1/x if x > svmin else 0 for x in s]) mat_inv = np.dot(Vt.T * s_inv, U.T) # Apply matrices conc_rec = (mat @ mat_inv) @ conc.T signal_rec = conc_rec.T + S0 return signal_rec.reshape(shape), None
[docs] def fit_pca(data_4d, n_components=None): """ Performs Principal Component Analysis (PCA) on a 4D dataset (3D spatial + 1D time). The function reshapes the 4D array into a 2D matrix where each row represents the time series of a single voxel. PCA is then applied to this matrix to identify the principal components of the temporal variations. Args: data_4d (np.ndarray): The input 4D array with shape (X, Y, Z, T), where T is the time dimension. n_components (int, optional): The number of principal components to keep. If None, all components are kept. Defaults to None. Returns: tuple: A tuple containing: - components (np.ndarray): The principal components (eigen-curves) of the time series. Shape: (n_components, T). - spatial_maps (np.ndarray): The 3D spatial weights (scores) for each component. Shape: (X, Y, Z, n_components). - explained_variance (np.ndarray): The amount of variance explained by each component. """ # --- 2. Reshape the 4D data to 2D --- # The new shape will be (number_of_voxels, time_points) # This is the format required by scikit-learn's PCA reshaped_data = data_4d.reshape(-1, data_4d.shape[-1]) # --- 3. Perform PCA --- pca = PCA(n_components=n_components) # fit_transform calculates the principal components and projects the data onto them scores = pca.fit_transform(reshaped_data) # The principal components are the "eigen-curves" components = pca.components_ # --- 4. Reshape the scores back to 3D spatial maps --- # This gives us a 3D map for each component, showing its spatial distribution num_actual_components = components.shape[0] spatial_maps = scores.reshape(data_4d.shape[:-1] + (num_actual_components, )) pca_fit = _reconstruct_from_pca(spatial_maps, components) return pca_fit, spatial_maps
def _reconstruct_from_pca(spatial_maps, components): """ Reconstructs the 4D signal from its PCA components and spatial maps. This is the inverse operation of the PCA decomposition. It performs a matrix multiplication of the scores (spatial maps) and the components to rebuild the time series for each voxel. Args: spatial_maps (np.ndarray): The 3D spatial weights (scores) for each component. Shape: (X, Y, Z, n_components). components (np.ndarray): The principal components (eigen-curves). Shape: (n_components, T). Returns: np.ndarray: The reconstructed 4D data array. Shape: (X, Y, Z, T). """ # Get original dimensions t_dim = components.shape[1] # Reshape spatial maps from (X, Y, Z, n_components) to (X*Y*Z, n_components) scores = spatial_maps.reshape(-1, spatial_maps.shape[-1]) # Reconstruct the 2D data matrix by matrix multiplication # (N_voxels, n_components) @ (n_components, T) -> (N_voxels, T) reconstructed_2d = scores @ components # Reshape the 2D data back to the original 4D shape reconstructed_4d = reconstructed_2d.reshape(spatial_maps.shape[:-1] + (t_dim, )) return reconstructed_4d
[docs] def fit_constant(signal: Union[np.ndarray, zarr.Array]): r""" Fit to a constant. .. math:: S(\mathbf{r},t) = S_0(\mathbf{r}) Parameters ---------- signal : numpy.ndarray | zarr.Array 3D or 4D array with signal intensities. Dimensions are (x, y, t) or (x, y, z, t). Returns ------- fit : numpy.ndarray | zarr.Array Fit to the signal data, with same dimensions as the signal array. pars : numpy.ndarray | zarr.Array Fitted model parameter S0. Dimensions are (x,y,1) or (x,y,z,1). """ if isinstance(signal, zarr.Array): signal = da.from_zarr(signal) else: signal = da.from_array(signal) shape = signal.shape avr = da.mean(signal, axis=-1) par = da.reshape(avr, shape[:-1] + (1,)) par.compute() fit = da.repeat(par, repeats=shape[-1], axis=-1) fit.compute() return fit, par
[docs] def fit_exp_decay( signal, time=None, parallel=True, bounds=([0,0], [np.inf, np.inf]), p0=[1,1], **kwargs): r""" Fit to an exponential decay. .. math:: S(\mathbf{r},t) = S_0(\mathbf{r}) e^{-t/T(\mathbf{r})} Parameters ---------- signal : numpy.ndarray | zarr.Array 3D or 4D array with signal intensities. Dimensions are (x, y, t) or (x, y, z, t). time : numpy.ndarray Timepoints of the signal data parallel : bool If True, use parallel processing. Default is False bounds : tuple Bounds for the fit as (lower_bound, upper_bound) where lower_bound and upper_bound are either a scalar or a list of 2 values. p0 : list Initial values as a 2-element list. **kwargs : Additional keyword arguments accepted by fit_pixels. Returns ------- fit : numpy.ndarray | zarr.Array Fitted to the signal data, with same dimensions as the signal array. pars : numpy.ndarray | zarr.Array Fitted model parameters S0 and T. Dimensions are (x,y,2) or (x,y,z,2). """ if time is None: raise ValueError('time is a required argument.') return fit_pixels(signal, model = pixel_models.exp_decay, xdata = np.array(time), func_init = pixel_models.exp_decay_init, parallel = parallel, bounds = bounds, p0 = p0, **kwargs, )
[docs] def fit_abs_exp_recovery_2p( signal, TI=None, parallel=True, bounds=([0,0], [np.inf, np.inf]), p0=[1,1.3], **kwargs, ): r""" 2-parameter fit to an absolute exponential-recovery model fit .. math:: S(\mathbf{r},T_I) = \left| S_0(\mathbf{r}) \left( 1 - 2 e^{-T_I/T(\mathbf{r})} \right) \right| Parameters ---------- signal : numpy.ndarray | zarr.Array 3D or 4D array with signal intensities. Dimensions are (x, y, t) or (x, y, z, t). TI : numpy.array Inversion times parallel : bool If True, use parallel processing. Default is False. bounds : tuple Bounds for the fit as (lower_bound, upper_bound) where lower_bound and upper_bound are either a scalar or a list of 2 values. p0 : list Initial values as a 2-element list. **kwargs : Additional keyword arguments accepted by fit_pixels. Returns ------- fit : numpy.ndarray | zarr.Array Fit to the signal data, with same dimensions as the signal array. pars : numpy.ndarray | zarr.Array Fitted model parameters S0 and T. Dimensions are (x,y,2) or (x,y,z,2). """ if TI is None: raise ValueError('TI is a required parameter.') return fit_pixels(signal, model=pixel_models.abs_exp_recovery_2p, xdata = np.array(TI), func_init = pixel_models.abs_exp_recovery_2p_init, bounds = bounds, p0 = p0, parallel = parallel, **kwargs, )
[docs] def fit_exp_recovery_2p(signal, TI=None, parallel=True, bounds=([0,0], [np.inf, np.inf]), p0=[1,1.3], **kwargs): r""" 2-parameter fit to an exponential recovery model .. math:: S(\mathbf{r},T_I) = S_0(\mathbf{r}) \left( 1 - 2 e^{-T_I/T(\mathbf{r})} \right) Parameters ---------- signal : numpy.ndarray | zarr.Array 3D or 4D array with signal intensities. Dimensions are (x, y, t) or (x, y, z, t). TI : numpy.array Inversion times parallel : bool If True, use parallel processing. Default is False. bounds : tuple Bounds for the fit as (lower_bound, upper_bound) where lower_bound and upper_bound are either a scalar or a list of 2 values. p0 : list Initial values as a 2-element list. **kwargs : Additional keyword arguments accepted by fit_pixels. Returns ------- fit : numpy.ndarray | zarr.Array Fit to the signal data, with same dimensions as the signal array. pars : numpy.ndarray | zarr.Array Fitted model parameters S0 and T. Dimensions are (x,y,2) or (x,y,z,2). """ if TI is None: raise ValueError('TI is a required parameter.') return fit_pixels(signal, model=pixel_models.exp_recovery_2p, xdata=np.array(TI), func_init=pixel_models.exp_recovery_2p_init, parallel=parallel, bounds=bounds, p0=p0, **kwargs, )
[docs] def fit_abs_exp_recovery_3p(signal, TI=None, parallel=True, bounds=([0,0,0], [np.inf, np.inf, 2]), p0=[1, 1.3, 2], **kwargs): r""" 2-parameter fit to an absolute exponential-recovery model fit. .. math:: S(\mathbf{r},T_I) = \left| S_0(\mathbf{r}) \left( 1 - A(\mathbf{r}) e^{-T_I/T(\mathbf{r})} \right) \right| Parameters ---------- signal : numpy.ndarray | zarr.Array 3D or 4D array with signal intensities. Dimensions are (x, y, t) or (x, y, z, t). TI : numpy.array Inversion times parallel : bool If True, use parallel processing. Default is False. bounds : tuple Bounds for the fit as (lower_bound, upper_bound) where lower_bound and upper_bound are either a scalar or a list of 3 values. p0 : list Initial values as a 3-element list. **kwargs : Additional keyword arguments accepted by fit_pixels. Returns ------- fit : numpy.ndarray | zarr.Array Fit to the signal data, with same dimensions as the signal array. pars : numpy.ndarray | zarr.Array Fitted model parameters S0, T and A. Dimensions are (x,y,3) or (x,y,z,3). """ if TI is None: raise ValueError('TI is a required parameter.') return fit_pixels(signal, model=pixel_models.abs_exp_recovery_3p, xdata = np.array(TI), func_init = pixel_models.abs_exp_recovery_3p_init, bounds = bounds, p0 = p0, parallel = parallel, **kwargs, )
[docs] def fit_exp_recovery_3p(signal, TI=None, parallel=True, bounds=([0,0,0],[np.inf, np.inf, 2]), p0=[1,1.3,2], **kwargs, ): r""" 3-parameter fit to an exponential-recovery model fit. .. math:: S(\mathbf{r},T_I) = S_0(\mathbf{r}) \left( 1 - A(\mathbf{r}) e^{-T_I/T(\mathbf{r})} \right) Parameters ---------- signal : numpy.ndarray | zarr.Array 3D or 4D array with signal intensities. Dimensions are (x, y, t) or (x, y, z, t). TI : numpy.array Inversion times parallel : bool If True, use parallel processing. Default is False. bounds : tuple Bounds for the fit as (lower_bound, upper_bound) where lower_bound and upper_bound are either a scalar or a list of 3 values. p0 : list Initial values as a 3-element list. **kwargs : Additional keyword arguments accepted by fit_pixels. Returns ------- fit : numpy.ndarray | zarr.Array Fit to the signal data, with same dimensions as the signal array. pars : numpy.ndarray | zarr.Array Fitted model parameters S0, T and A. Dimensions are (x,y,3) or (x,y,z,3). """ if TI is None: raise ValueError('TI is a required parameter.') return fit_pixels(signal, model=pixel_models.exp_recovery_3p, xdata = np.array(TI), func_init = pixel_models.exp_recovery_3p_init, parallel = parallel, bounds = bounds, p0 = p0, **kwargs, )
[docs] def fit_spgr_vfa(signal, FA=None, parallel=True, bounds=([0, 0], [np.inf, 1]), p0=[1, 0.5], **kwargs, ): r""" Non-linear fit to a variable flip angle model. .. math:: S(\mathbf{r},\alpha) = S_0(\mathbf{r}) \sin(\alpha) \frac{1 - e^{-T_R/T_1(\mathbf{r})}}{1 - \cos(\alpha) e^{-T_R/T_1(\mathbf{r})}} Parameters ---------- signal : numpy.ndarray | zarr.Array Array with signal intensities for different flip angles (FA). Dimensions are (x,y,FA) or (x,y,z,FA) FA : array-like Flip angles in degrees (required). This is a 1D array with the same length as the last dimension of the signal array. Defaults to None. parallel : bool If True, use parallel processing. Default is False. bounds : tuple Bounds for the fit as (lower_bound, upper_bound) where lower_bound and upper_bound are either a scalar or a list of 3 values. p0 : list Initial values as a 3-element list. **kwargs : Additional keyword arguments accepted by fit_pixels. Returns ------- fit : numpy.ndarray | zarr.Array Fit to the signal data, with same dimensions as the signal array. pars : numpy.ndarray | zarr.Array Fitted model parameters S0 and E. Dimensions are (x,y,2) or (x,y,z,2). """ if FA is None: raise ValueError('Flip angle (FA) is a required parameter.') return fit_pixels(signal, model=pixel_models.spgr_vfa, xdata=FA, func_init=pixel_models.spgr_vfa_init, parallel=parallel, bounds=bounds, p0=p0, **kwargs, )
[docs] def fit_spgr_vfa_lin( signal:np.ndarray, FA=None, path=None, memdim=2, parallel=True, progress_bar=False, ): r""" Linear fit to a variable flip angle model. .. math:: S(\mathbf{r},\alpha)=S_0(\mathbf{r})\sin{\alpha} \frac{1-e^{-T_R/T_1(\mathbf{r})}}{1-\cos{\alpha}\,e^{-T_R/T_1(\mathbf{r})}} Here :math:`S(\alpha)` is the signal at flip angle :math:`\alpha`, :math:`S_0` a scaling factor, :math:`T_R` the repetition time and :math:`T_1` the longitudinal relaxation time. Parameters ---------- signal : numpy.ndarray | zarr.Array Array with signal intensities for different flip angles (FA). Dimensions are (x,y,FA) or (x,y,z,FA) FA : array Flip Angles in degrees (required). This is a 1D array with the same length as the last dimension of the signal array. Defaults to None. path : str, optional Path on disk where to save the results. If no path is provided, the results are not saved to disk. Defaults to None. memdim : int For zarrays, the number of array dimensions to be held in memory at any one time. This keyword is ignored when the argument is a numpy array. Possible values for memdim range from 0 (pixel-by-pixel processing) to ydata.ndim-1 (process the whole array at once). With memdim=1, data are loaded and processed row-by-row, with memdim=2 the are processed slice-by-slice, and so on. Default is 2. parallel : bool Option to perform fitting in parallel. This is only available for zarrays when memdim is provided. progress_bar: bool Display a progress bar (default = False). Returns ------- fit : numpy.ndarray | zarr.Array Fit to the signal data, with same dimensions as the signal array. pars : numpy.ndarray | zarr.Array Fitted model parameters S0 and E. Dimensions are (x,y,2) or (x,y,z,2). Notes: To derive the linear version of the model, the equation can be rewritten as: .. math:: Y(\alpha) = AX(\alpha)+B with the variables defined as: .. math:: X = S(\alpha/\sin{\alpha};~~~~Y=S(\alpha)\cos{\alpha} / \sin{\alpha} and the constants: .. math:: E=e^{-T_R/T_1};~~~~A=\frac{1}{E};~~~~B=-S_0\frac{1-E}{E}~ Plotting :math:`Y(\alpha)` against :math:`X(\alpha)` produces a straight line with slope :math:`A` and intercept :math:`B`. After solving for :math:`A, B` these constants can then be used reconstruct the signal: .. math:: S(\alpha)=\frac{B\sin{\alpha}}{\cos{\alpha}-A} """ if isinstance(signal, zarr.Array): raise ValueError("fit_spgr_vfa_lin is not yet available for zarrays.") if FA is None: raise ValueError('Flip angle (FA) is a required parameter.') if isinstance(signal, np.ndarray): fit, par = _fit_spgr_vfa_lin_compute(signal, FA, progress_bar) if path is not None: np.save(os.path.join(path, 'fit'), fit) np.save(os.path.join(path, 'pars'), par) return fit, par if memdim is None: memdim = signal.ndim-1 # Dimension of data in memory if memdim < 0: raise ValueError("memdim cannot be negative.") if memdim > signal.ndim-1: raise ValueError("memdim cannot be larger than signal.ndim-1.") # Build stores for outputs fit, par = io._fit_models_init(signal, path, 2) # Get the shape and number of slice dimensions shape = signal.shape[memdim:-1] n = int(np.prod(shape)) # All indices in slice dimensions p = tuple([slice(None) for _ in range(memdim)]) if parallel: pbar = False tasks = [ dask.delayed(_fit_spgr_vfa_lin_slice)( k, signal, shape, p, FA, fit, par, pbar, ) for k in range(n) ] dask.compute(*tasks) else: for k in tqdm( range(n), desc='Fitting vfa', disable=(not progress_bar) or (n==1), ): pbar = progress_bar and (n==1) _fit_spgr_vfa_lin_slice( k, signal, shape, p, FA, fit, par, pbar, ) return fit, par
def _fit_spgr_vfa_lin_slice( k, signal, shape, p, FA, fit, par, progress_bar): # Convert flat index to multi-index z = np.unravel_index(k, shape) # Load all values for slice z into memory t = (slice(None), ) signal_k = signal[p + z + t] # Compute fit_k, par_k = _fit_spgr_vfa_lin_compute(signal_k, FA, progress_bar) # Save results for slize z in the zarray fit[p + z + t] = fit_k par[p + z + t] = par_k def _fit_spgr_vfa_lin_compute(signal, FA, progress_bar): FA = np.deg2rad(FA) # Construct FA array in matching shape to signal data FA_array = np.ones_like(signal)*FA sFA_array = np.sin(FA_array) X = signal / sFA_array Y = signal * np.cos(FA_array) / sFA_array shape = signal.shape X = X.reshape(-1, shape[-1]) Y = Y.reshape(-1, shape[-1]) signal = signal.reshape(-1, shape[-1]) pars = np.empty((X.shape[0], 2)) fit = np.empty(X.shape) sFA, cFA = np.sin(FA), np.cos(FA) ones = np.ones(shape[-1]) for i in tqdm( range(X.shape[0]), desc='Fitting VFA model', disable=not progress_bar, ): A = np.vstack([X[i,:], ones]) pars[i,:] = np.linalg.lstsq(A.T, Y[i,:], rcond=None)[0] if 0 in (cFA - pars[i,0]): fit[i,:] = signal[i,:] else: fit[i,:] = pars[i,1] * sFA / (cFA - pars[i,0]) smax = np.amax(signal[i,:]) fit[i,:][fit[i,:] > smax] = smax fit[fit<0] = 0 fit[np.isnan(fit)] = 0 # Convert to T1 and S0 A = pars[:,0] B = pars[:,1] S0 = -B/(A-1) E = 1/A pars[:,0] = S0 pars[:,1] = E return fit.reshape(shape), pars.reshape(shape[:-1] + (2,))
[docs] def fit_2cm_lin( signal: Union[np.ndarray, zarr.Array], aif=None, time=None, baseline=1, path=None, memdim=2, parallel=False, progress_bar=True, input_corr=False, ) -> Tuple[Union[np.ndarray, zarr.Array], Union[np.ndarray, zarr.Array]]: """ Linearised 2-compartment model fit Parameters ---------- signal : numpy.ndarray | zarr.Array Array with signal intensities at different times. Dimensions are (x,y,t) or (x,y,z,t) aif : numpy.ndarray Arterial input function. 1D array of input artery signal intensities, length equal to the number of time points in the signal data. time : numpy.ndarray Timepoints of the signal data baseline : int Baseline. Number of time points to use for the baseline signal. Default is 1. path : str, optional Path on disk where to save the results. If no path is provided, the results are not saved to disk. Defaults to None. memdim : int For zarrays, the number of array dimensions to be held in memory at any one time. This keyword is ignored when the argument is a numpy array. Possible values for memdim range from 0 (pixel-by-pixel processing) to ydata.ndim-1 (process the whole array at once). With memdim=1, data are loaded and processed row-by-row, with memdim=2 the are processed slice-by-slice, and so on. Default is 2. parallel : bool Option to perform fitting in parallel. This is only available for zarrays when memdim is provided. progress_bar : bool Set to True to display a progress bar during the computations. This is ignored if parallel=True. Returns ------- fit : numpy.ndarray | zarr.Array Fit to the signal data with same dimensions as the signal array. pars : numpy.ndarray | zarr.Array Fitted model parameters Fb, Tb, PS, Te. Dimensions are (x,y,4) or (x,y,z,4). """ # Check arguments if aif is None: raise ValueError('aif is a required parameter.') if time is None: raise ValueError('Time is a required parameter.') if parallel: if progress_bar: raise ValueError( "A progress bar cannot be shown when parallel=True. " "Set parallel=False or progress_bar=False. " ) if isinstance(signal, np.ndarray): fit, par = _fit_2cm_lin_compute( signal, aif, time, baseline, input_corr, progress_bar, ) if path is not None: np.save(os.path.join(path, 'fit'), fit) np.save(os.path.join(path, 'pars'), par) return fit, par if memdim is None: memdim = signal.ndim-1 # Dimension of data in memory if memdim < 0: raise ValueError("memdim cannot be negative.") if memdim > signal.ndim-1: raise ValueError("memdim cannot be larger than signal.ndim-1.") # Build stores for outputs npar = 5 if input_corr else 4 fit, par = io._fit_models_init(signal, path, npar) # Get the shape and number of slice dimensions shape = signal.shape[memdim:-1] n = int(np.prod(shape)) # All indices in slice dimensions p = tuple([slice(None) for _ in range(memdim)]) if parallel: pbar = False tasks = [ dask.delayed(_fit_2cm_lin_slice)( k, signal, shape, p, aif, time, baseline, input_corr, fit, par, pbar, ) for k in range(n) ] dask.compute(*tasks) else: for k in tqdm( range(n), desc='Fitting 2cm', disable=(not progress_bar) or (n==1), ): pbar = progress_bar and (n==1) _fit_2cm_lin_slice( k, signal, shape, p, aif, time, baseline, input_corr, fit, par, pbar, ) return fit, par
def _fit_2cm_lin_slice(k, signal, shape, p, aif, time, baseline, input_corr, fit, par, progress_bar): # Convert flat index to multi-index z = np.unravel_index(k, shape) # Load all values for slice z into memory t = (slice(None), ) signal_k = signal[p + z + t] # Compute fit_k, par_k = _fit_2cm_lin_compute( signal_k, aif, time, baseline, input_corr, progress_bar, ) # Save results for slize z in the zarray fit[p + z + t] = fit_k par[p + z + t] = par_k def _fit_2cm_lin_compute(signal, aif, time, baseline, input_corr, progress_bar): npar = 5 if input_corr else 4 # Reshape to 2D (x,t) shape = signal.shape signal = signal.reshape((-1,shape[-1])) S0 = np.mean(signal[:,:baseline], axis=1) ca = aif-np.mean(aif[:baseline]) A = np.empty((signal.shape[1],npar)) A[:,2], A[:,3] = _ddint(ca, time) if input_corr: A[:,4] = ca fit = np.empty(signal.shape) par = np.empty((signal.shape[0], npar)) for x in tqdm( range(signal.shape[0]), desc='Fitting 2-comp model', disable=not progress_bar, ): c = signal[x,:] - S0[x] ctii, cti = _ddint(c, time) A[:,0] = -ctii A[:,1] = -cti p = np.linalg.lstsq(A, c, rcond=None)[0] fit[x,:] = S0[x] + p[0]*A[:,0] + p[1]*A[:,1] + p[2]*A[:,2] + p[3]*A[:,3] if input_corr: fit[x,:] += p[4]*A[:,4] if input_corr: par[x,:] = _2cm_lin_params(p[:4]) + [p[4]] else: par[x,:] = _2cm_lin_params(p) # Apply bounds smax = np.amax(signal) fit[fit<0]=0 fit[fit>2*smax]=2*smax # Return in original shape fit = fit.reshape(shape) par = par.reshape(shape[:-1] + (npar,)) return fit, par def _ddint(c, t): ci = cumulative_trapezoid(c, t, initial=0) cii = cumulative_trapezoid(ci, t, initial=0) return cii, ci def _2cm_lin_params(X): alpha = X[0] beta = X[1] gamma = X[2] Fp = X[3] if alpha == 0: if beta == 0: return [Fp, 0, 0, 0] else: return [Fp, 1/beta, 0, 0] nom = 2*alpha det = beta**2 - 4*alpha if det < 0 : Tp = beta/nom Te = Tp else: root = np.sqrt(det) Tp = (beta - root)/nom Te = (beta + root)/nom if Te == 0: PS = 0 else: if Fp == 0: PS = 0 else: T = gamma/(alpha*Fp) PS = Fp*(T-Tp)/Te return [Fp, Tp, PS, Te]