#! /usr/bin/env python
"""
Full-frame PCA algorithm for ADI, (ADI+)RDI and (ADI+)mSDI (IFS data) cubes.
Options :
- *Full-frame PCA*, using the whole cube as the PCA reference library in the
case of ADI or ADI+mSDI (IFS cube), or a sequence of reference frames
(reference star) in the case of RDI. For ADI a big data matrix NxP, where N
is the number of frames and P the number of pixels in a frame is created. Then
PCA is done through eigen-decomposition of the covariance matrix (~$DD^T$) or
the SVD of the data matrix. SVD can be calculated using different libraries
including the fast randomized SVD.
- *Full-frame incremental PCA* for big (larger than available memory) cubes.
.. [AMA12]
| Amara & Quanz 2012
| **PYNPOINT: an image processing package for finding exoplanets**
| *MNRAS, Volume 427, Issue 1, pp. 948-955*
| `https://arxiv.org/abs/1207.6637
<https://arxiv.org/abs/1207.6637>`_
.. [CHR19]
| Christiaens et al. 2019
| **Separating extended disc features from the protoplanet in PDS 70 using
VLT/SINFONI**
| *MNRAS, Volume 486, Issue 4, pp. 5819-5837*
| `https://arxiv.org/abs/1905.01860
<https://arxiv.org/abs/1905.01860>`_
.. [HAL09]
| Halko et al. 2009
| **Finding structure with randomness: Probabilistic algorithms for
constructing approximate matrix decompositions**
| *arXiv e-prints*
| `https://arxiv.org/abs/0909.4061
<https://arxiv.org/abs/0909.4061>`_
.. [REN23]
| Ren 2023
| **Karhunen-Loève data imputation in high-contrast imaging**
| *Astronomy & Astrophysics, Volume 679, p. 8*
| `https://arxiv.org/abs/2308.16912
<https://arxiv.org/abs/2308.16912>`_
"""
__author__ = "C.A. Gomez Gonzalez, V. Christiaens, T. Bédrine"
__all__ = ["pca", "PCA_Params"]
import numpy as np
from multiprocessing import cpu_count
from typing import Tuple, Union, List
from dataclasses import dataclass
from enum import Enum
from .svd import svd_wrapper, SVDecomposer
from .utils_pca import pca_incremental, pca_grid
from ..config import (timing, time_ini, check_enough_memory, Progressbar,
check_array)
from ..config.paramenum import (
SvdMode,
Adimsdi,
Interpolation,
Imlib,
Collapse,
ALGO_KEY,
)
from ..config.utils_conf import pool_map, iterable
from ..config.utils_param import setup_parameters, separate_kwargs_dict
from ..preproc.derotation import _find_indices_adi, _compute_pa_thresh
from ..preproc import cube_rescaling_wavelengths as scwave
from ..preproc import (
cube_derotate,
cube_collapse,
cube_subtract_sky_pca,
check_pa_vector,
check_scal_vector,
cube_crop_frames,
)
from ..stats import descriptive_stats
from ..var import (
frame_center,
dist,
prepare_matrix,
reshape_matrix,
cube_filter_lowpass,
mask_circle,
)
[docs]
@dataclass
class PCA_Params:
"""
Set of parameters for the PCA module.
See function `pca` below for the documentation.
"""
cube: np.ndarray = None
angle_list: np.ndarray = None
cube_ref: np.ndarray = None
scale_list: np.ndarray = None
ncomp: Union[Tuple, List, float, int] = 1
svd_mode: Enum = SvdMode.LAPACK
scaling: Enum = None
mask_center_px: int = None
source_xy: Tuple[int] = None
delta_rot: int = None
fwhm: float = 4
adimsdi: Enum = Adimsdi.SINGLE
crop_ifs: bool = True
imlib: Enum = Imlib.VIPFFT
imlib2: Enum = Imlib.VIPFFT
interpolation: Enum = Interpolation.LANCZOS4
collapse: Enum = Collapse.MEDIAN
collapse_ifs: Enum = Collapse.MEAN
ifs_collapse_range: Union[str, Tuple[int]] = "all"
mask_rdi: np.ndarray = None
check_memory: bool = True
batch: Union[int, float] = None
nproc: int = 1
full_output: bool = False
verbose: bool = True
weights: np.ndarray = None
left_eigv: bool = False
min_frames_pca: int = 10
cube_sig: np.ndarray = None
[docs]
def pca(*all_args: List, **all_kwargs: dict):
"""Full-frame PCA algorithm applied to PSF substraction.
The reference PSF and the quasi-static speckle pattern are modeled using
Principal Component Analysis. Depending on the input parameters this PCA
function can work in ADI, RDI or mSDI (IFS data) mode.
ADI: the target ``cube`` itself is used to learn the PCs and to obtain a
low-rank approximation model PSF (star + speckles). Both `cube_ref`` and
``scale_list`` must be None. The full-frame ADI-PCA implementation is based
on [AMA12]_ and [SOU12]_. If ``batch`` is provided then the cube is
processed with incremental PCA as described in [GOM17]_.
(ADI+)RDI: if a reference cube is provided (``cube_ref``), its PCs are used
to reconstruct the target frames to obtain the model PSF (star + speckles).
(ADI+)mSDI (IFS data): if a scaling vector is provided (``scale_list``) and
the cube is a 4d array [# channels, # adi-frames, Y, X], it's assumed it
contains several multi-spectral frames acquired in pupil-stabilized mode.
A single or two stages PCA can be performed, depending on ``adimsdi``, as
explained in [CHR19]_.
Parameters
----------
all_args: list, optional
Positionnal arguments for the PCA algorithm. Full list of parameters
below.
all_kwargs: dictionary, optional
Mix of keyword arguments that can initialize a PCA_Params and the
optional 'rot_options' dictionnary (with keyword values ``border_mode``,
``mask_val``, ``edge_blend``, ``interp_zeros``, ``ker``; see docstring
of ``vip_hci.preproc.frame_rotate``). Can also contain a PCA_Params
dictionary named `algo_params`.
PCA parameters
----------
cube : str or numpy ndarray, 3d or 4d
Input cube (ADI or ADI+mSDI). If 4D, the first dimension should be
spectral. If a string is given, it must correspond to the path to the
fits file to be opened in memmap mode (incremental PCA-ADI of 3D cubes
only).
angle_list : numpy ndarray, 1d
Corresponding parallactic angle for each frame.
cube_ref : 3d or 4d numpy ndarray, or list of 3D numpy ndarray, optional
Reference library cube for Reference Star Differential Imaging. Should
be 3D, except if input cube is 4D and no scale_list is provided,
reference cube can then either be 4D or a list of 3D cubes (i.e.
providing the reference cube for each individual spectral cube).
scale_list : numpy ndarray, 1d, optional
If provided, triggers mSDI reduction. These should be the scaling
factors used to re-scale the spectral channels and align the speckles
in case of IFS data (ADI+mSDI cube). Usually, the
scaling factors are the last channel wavelength divided by the
other wavelengths in the cube (more thorough approaches can be used
to get the scaling factors, e.g. with
``vip_hci.preproc.find_scal_vector``).
ncomp : int, float, tuple of int/None, or list, optional
How many PCs are used as a lower-dimensional subspace to project the
target frames.
* ADI (``cube`` is a 3d array): if an int is provided, ``ncomp`` is the
number of PCs extracted from ``cube`` itself. If ``ncomp`` is a float
in the interval [0, 1] then it corresponds to the desired cumulative
explained variance ratio (the corresponding number of components is
estimated). If ``ncomp`` is a tuple of two integers, then it
corresponds to an interval of PCs in which final residual frames are
computed (optionally, if a tuple of 3 integers is passed, the third
value is the step). If ``ncomp`` is a list of int, these will be used to
calculate residual frames. When ``ncomp`` is a tuple or list, and
``source_xy`` is not None, then the S/Ns (mean value in a 1xFWHM
circular aperture) of the given (X,Y) coordinates are computed.
* ADI+RDI (``cube`` and ``cube_ref`` are 3d arrays): ``ncomp`` is the
number of PCs obtained from ``cube_ref``. If ``ncomp`` is a tuple,
then it corresponds to an interval of PCs (obtained from ``cube_ref``)
in which final residual frames are computed. If ``ncomp`` is a list of
int, these will be used to calculate residual frames. When ``ncomp`` is
a tuple or list, and ``source_xy`` is not None, then the S/Ns (mean
value in a 1xFWHM circular aperture) of the given (X,Y) coordinates are
computed.
* ADI or ADI+RDI (``cube`` is a 4d array): same input format allowed as
above. If ``ncomp`` is a list with the same length as the number of
channels, each element of the list will be used as ``ncomp`` value
(be it int, float or tuple) for each spectral channel. If not a
list or a list with a different length as the number of spectral
channels, these will be tested for all spectral channels respectively.
* ADI+mSDI (``cube`` is a 4d array and ``adimsdi="single"``): ``ncomp``
is the number of PCs obtained from the whole set of frames
(n_channels * n_adiframes). If ``ncomp`` is a float in the interval
(0, 1] then it corresponds to the desired CEVR, and the corresponding
number of components will be estimated. If ``ncomp`` is a tuple, then
it corresponds to an interval of PCs in which final residual frames
are computed. If ``ncomp`` is a list of int, these will be used to
calculate residual frames. When ``ncomp`` is a tuple or list, and
``source_xy`` is not None, then the S/Ns (mean value in a 1xFWHM
circular aperture) of the given (X,Y) coordinates are computed.
* ADI+mSDI (``cube`` is a 4d array and ``adimsdi="double"``): ``ncomp``
must be a tuple, where the first value is the number of PCs obtained
from each multi-spectral frame (if None then this stage will be
skipped and the spectral channels will be combined without
subtraction); the second value sets the number of PCs used in the
second PCA stage, ADI-like using the residuals of the first stage (if
None then the second PCA stage is skipped and the residuals are
de-rotated and combined).
svd_mode : Enum, see `vip_hci.config.paramenum.SvdMode`
Switch for the SVD method/library to be used.
scaling : Enum, see `vip_hci.config.paramenum.Scaling`
Pixel-wise scaling mode using ``sklearn.preprocessing.scale``
function. If set to None, the input matrix is left untouched.
mask_center_px : None or int
If None, no masking is done. If an integer > 1 then this value is the
radius of the circular mask.
source_xy : tuple of int, optional
For ADI-PCA, this triggers a frame rejection in the PCA library, with
``source_xy`` as the coordinates X,Y of the center of the annulus where
the PA criterion is estimated. When ``ncomp`` is a tuple, a PCA grid is
computed and the S/Ns (mean value in a 1xFWHM circular aperture) of the
given (X,Y) coordinates are computed.
delta_rot : int, optional
Factor for tuning the parallactic angle threshold, expressed in FWHM.
Default is 1 (excludes 1xFWHM on each side of the considered frame).
fwhm : float, list or 1d numpy array, optional
Known size of the FWHM in pixels to be used. Default value is 4.
Can be a list or 1d numpy array for a 4d input cube with no scale_list.
adimsdi : Enum, see `vip_hci.config.paramenum.Adimsdi`
Changes the way the 4d cubes (ADI+mSDI) are processed. Basically it
determines whether a single or double pass PCA is going to be computed.
crop_ifs: bool, optional
[adimsdi='single'] If True cube is cropped at the moment of frame
rescaling in wavelength. This is recommended for large FOVs such as the
one of SPHERE, but can remove significant amount of information close to
the edge of small FOVs (e.g. SINFONI).
imlib : Enum, see `vip_hci.config.paramenum.Imlib`
See the documentation of ``vip_hci.preproc.frame_rotate``.
imlib2 : Enum, see `vip_hci.config.paramenum.Imlib`
See the documentation of ``vip_hci.preproc.cube_rescaling_wavelengths``.
interpolation : Enum, see `vip_hci.config.paramenum.Interpolation`
See the documentation of the ``vip_hci.preproc.frame_rotate`` function.
collapse : Enum, see `vip_hci.config.paramenum.Collapse`
Sets how temporal residual frames should be combined to produce an
ADI image.
collapse_ifs : Enum, see `vip_hci.config.paramenum.Collapse`
Sets how spectral residual frames should be combined to produce an
mSDI image.
ifs_collapse_range: str 'all' or tuple of 2 int
If a tuple, it should contain the first and last channels where the mSDI
residual channels will be collapsed (by default collapses all channels).
mask_rdi: tuple of two numpy array or one signle 2d numpy array, opt
If provided, binary mask(s) will be used either in RDI mode or in
ADI+mSDI (2 steps) mode. They will be used as boat and anchor masks
following the procedure described in [REN23], which is useful to avoid
self-subtraction in the presence of a bright disc signal. If only one
mask is provided, the boat images will be unmasked
(i.e., full frames will be used).
check_memory : bool, optional
If True, it checks that the input cube is smaller than the available
system memory.
batch : None, int or float, optional
When it is not None, it triggers the incremental PCA (for ADI and
ADI+mSDI cubes). If an int is given, it corresponds to the number of
frames in each sequential mini-batch. If a float (0, 1] is given, it
corresponds to the size of the batch is computed wrt the available
memory in the system.
nproc : None or int, optional
Number of processes for parallel computing. If None the number of
processes will be set to (cpu_count()/2). Defaults to ``nproc=1``.
full_output: bool, optional
Whether to return the final median combined image only or with other
intermediate arrays.
verbose : bool, optional
If True prints intermediate info and timing.
weights: 1d numpy array or list, optional
Weights to be applied for a weighted mean. Need to be provided if
collapse mode is 'wmean'.
left_eigv : bool, optional
Whether to use rather left or right singularvectors
This mode is not compatible with 'mask_rdi' and 'batch'
min_frames_pca : int, optional
Minimum number of frames required in the PCA library. An error is raised
if less than such number of frames can be found.
cube_sig: numpy ndarray, opt
Cube with estimate of significant authentic signals. If provided, this
will be subtracted before projecting considering the science cube as
reference cube.
Return
-------
final_residuals_cube : List of numpy ndarray
[(ncomp is tuple or list) & (source_xy=None or full_output=True)] List
of residual final PCA frames obtained for a grid of PC values.
frame : numpy ndarray
[ncomp is scalar or source_xy!=None] 2D array, median combination of the
de-rotated/re-scaled residuals cube.
pcs : numpy ndarray
[full_output=True, source_xy=None] Principal components. Valid for
ADI cubes 3D or 4D (i.e. ``scale_list=None``). This is also returned
when ``batch`` is not None (incremental PCA).
recon_cube, recon : numpy ndarray
[full_output=True] Reconstructed cube. Valid for ADI cubes 3D or 4D
(i.e. ``scale_list=None``)
residuals_cube : numpy ndarray
[full_output=True] Residuals cube. Valid for ADI cubes 3D or 4D
(i.e. ``scale_list=None``)
residuals_cube_ : numpy ndarray
[full_output=True] Derotated residuals cube. Valid for ADI cubes 3D or
4D (i.e. ``scale_list=None``)
residuals_cube_channels : numpy ndarray
[full_output=True, adimsdi='double'] Residuals for each multi-spectral
cube. Valid for ADI+mSDI (4D) cubes (when ``scale_list`` is provided)
residuals_cube_channels_ : numpy ndarray
[full_output=True, adimsdi='double'] Derotated final residuals. Valid
for ADI+mSDI (4D) cubes (when ``scale_list`` is provided)
cube_allfr_residuals : numpy ndarray
[full_output=True, adimsdi='single'] Residuals cube (of the big cube
with channels and time processed together). Valid for ADI+mSDI (4D)
cubes (when ``scale_list`` is provided)
cube_adi_residuals : numpy ndarray
[full_output=True, adimsdi='single'] Residuals cube (of the big cube
with channels and time processed together) after de-scaling the wls.
Valid for ADI+mSDI (4D) (when ``scale_list`` is provided).
medians : numpy ndarray
[full_output=True, source_xy=None] This is also returned when ``batch``
is not None (incremental PCA).
ifs_adi_frames : numpy ndarray
[full_output=True, 4D input cube, ``scale_list=None``] This is the cube
of individual ADI reductions for each channel of the IFS cube.
"""
# Separating the parameters of the ParamsObject from optional rot_options
class_params, rot_options = separate_kwargs_dict(
initial_kwargs=all_kwargs, parent_class=PCA_Params
)
# Extracting the object of parameters (if any)
algo_params = None
if ALGO_KEY in rot_options.keys():
algo_params = rot_options[ALGO_KEY]
del rot_options[ALGO_KEY]
if algo_params is None:
algo_params = PCA_Params(*all_args, **class_params)
# by default, interpolate masked area before derotation if a mask is used
if algo_params.mask_center_px and len(rot_options) == 0:
rot_options['mask_val'] = 0
rot_options['ker'] = 1
rot_options['interp_zeros'] = True
start_time = time_ini(algo_params.verbose)
if algo_params.batch is None:
check_array(algo_params.cube, (3, 4), msg="cube")
else:
if not isinstance(algo_params.cube, (str, np.ndarray)):
raise TypeError(
"`cube` must be a numpy (3d or 4d) array or a str "
"with the full path on disk"
)
if algo_params.left_eigv:
if (
algo_params.batch is not None
or algo_params.mask_rdi is not None
or algo_params.cube_ref is not None
):
raise NotImplementedError(
"left_eigv is not compatible with 'mask_rdi' nor 'batch'"
)
# checking memory (if in-memory numpy array is provided)
if not isinstance(algo_params.cube, str):
input_bytes = (
algo_params.cube_ref.nbytes
if algo_params.cube_ref is not None
else algo_params.cube.nbytes
)
mem_msg = (
"Set check_memory=False to override this memory check or "
"set `batch` to run incremental PCA (valid for ADI or "
"ADI+mSDI single-pass)"
)
check_enough_memory(
input_bytes,
1.0,
raise_error=algo_params.check_memory,
error_msg=mem_msg,
verbose=algo_params.verbose,
)
if algo_params.nproc is None:
algo_params.nproc = cpu_count() // 2 # Hyper-threading doubles # cores
# All possible outputs for any PCA usage must be pre-declared to None
# Default possible outputs
(
frame,
final_residuals_cube,
pclist,
pcs,
medians,
recon,
residuals_cube,
residuals_cube_,
) = (None for _ in range(8))
# Full_output/cube dimension dependant variables
(
table,
cube_allfr_residuals,
cube_adi_residuals,
residuals_cube_channels,
residuals_cube_channels_,
ifs_adi_frames,
) = (None for _ in range(6))
# ADI + mSDI. Shape of cube: (n_channels, n_adi_frames, y, x)
# isinstance(cube, np.ndarray) and cube.ndim == 4:
if algo_params.scale_list is not None:
if algo_params.adimsdi == Adimsdi.DOUBLE:
add_params = {"start_time": start_time}
func_params = setup_parameters(
params_obj=algo_params, fkt=_adimsdi_doublepca, **add_params
)
res_pca = _adimsdi_doublepca(
**func_params,
**rot_options,
)
residuals_cube_channels, residuals_cube_channels_, frame = res_pca
elif algo_params.adimsdi == Adimsdi.SINGLE:
add_params = {"start_time": start_time}
func_params = setup_parameters(
params_obj=algo_params, fkt=_adimsdi_singlepca, **add_params
)
res_pca = _adimsdi_singlepca(
**func_params,
**rot_options,
)
if isinstance(algo_params.ncomp, (int, float)):
cube_allfr_residuals, cube_adi_residuals, frame = res_pca
elif isinstance(algo_params.ncomp, (tuple, list)):
if algo_params.source_xy is None:
if algo_params.full_output:
final_residuals_cube, pclist = res_pca
else:
final_residuals_cube = res_pca
else:
final_residuals_cube, frame, table, _ = res_pca
else:
raise ValueError("`adimsdi` mode not recognized")
# 4D cube, but no mSDI desired
elif algo_params.cube.ndim == 4:
nch, nz, ny, nx = algo_params.cube.shape
ifs_adi_frames = np.zeros([nch, ny, nx])
if not isinstance(algo_params.ncomp, list):
ncomp = [algo_params.ncomp] * nch
elif len(algo_params.ncomp) != nch:
nnpc = len(algo_params.ncomp)
ifs_adi_frames = np.zeros([nch, nnpc, ny, nx])
ncomp = [algo_params.ncomp] * nch
else:
ncomp = algo_params.ncomp
if np.isscalar(algo_params.fwhm):
algo_params.fwhm = [algo_params.fwhm] * nch
pcs = []
recon = []
residuals_cube = []
residuals_cube_ = []
# (ADI+)RDI
if algo_params.cube_ref is not None:
for ch in range(nch):
if algo_params.cube_ref[ch].ndim != 3:
msg = "Ref cube has wrong format for 4d input cube"
raise TypeError(msg)
add_params = {
"start_time": start_time,
"cube": algo_params.cube[ch],
"cube_ref": algo_params.cube_ref[ch],
"ncomp": ncomp[ch],
}
func_params = setup_parameters(
params_obj=algo_params, fkt=_adi_rdi_pca, **add_params
)
res_pca = _adi_rdi_pca(
**func_params,
**rot_options,
)
pcs.append(res_pca[0])
recon.append(res_pca[1])
residuals_cube.append(res_pca[2])
residuals_cube_.append(res_pca[3])
ifs_adi_frames[ch] = res_pca[-1]
# ADI
else:
final_residuals_cube = []
table = []
recon_cube = []
residuals_cube = []
residuals_cube_ = []
pclist = []
medians = []
for ch in range(nch):
add_params = {
"start_time": start_time,
"cube": algo_params.cube[ch],
"ncomp": ncomp[ch], # algo_params.ncomp[ch],
"fwhm": algo_params.fwhm[ch],
"full_output": True,
}
func_params = setup_parameters(
params_obj=algo_params, fkt=_adi_rdi_pca, **add_params
)
res_pca = _adi_rdi_pca(
**func_params,
**rot_options,
)
grid_case = False
if algo_params.batch is None:
if algo_params.source_xy is not None:
# PCA grid, computing S/Ns
if isinstance(ncomp[ch], (tuple, list)):
final_residuals_cube.append(res_pca[0])
ifs_adi_frames[ch] = res_pca[1]
table.append(res_pca[2])
# full-frame PCA with rotation threshold
else:
recon_cube.append(res_pca[0])
residuals_cube.append(res_pca[1])
residuals_cube_.append(res_pca[2])
ifs_adi_frames[ch] = res_pca[-1]
else:
# PCA grid
if isinstance(ncomp[ch], (tuple, list)):
ifs_adi_frames[ch] = res_pca[0]
pclist.append(res_pca[1])
grid_case = True
# full-frame standard PCA
else:
pcs.append(res_pca[0])
recon.append(res_pca[1])
residuals_cube.append(res_pca[2])
residuals_cube_.append(res_pca[3])
ifs_adi_frames[ch] = res_pca[-1]
# full-frame incremental PCA
else:
ifs_adi_frames[ch] = res_pca[0]
pcs.append(res_pca[2])
medians.append(res_pca[3])
if grid_case:
for i in range(len(ncomp[0])):
frame = [cube_collapse(ifs_adi_frames[:, i],
mode=algo_params.collapse_ifs)]
final_residuals_cube = frame
else:
frame = cube_collapse(ifs_adi_frames,
mode=algo_params.collapse_ifs)
# convert to numpy arrays when relevant
if len(pcs) > 0:
pcs = np.array(pcs)
if len(recon) > 0:
recon = np.array(recon)
if len(residuals_cube) > 0:
residuals_cube = np.array(residuals_cube)
if len(residuals_cube_) > 0:
residuals_cube_ = np.array(residuals_cube_)
if len(final_residuals_cube) > 0:
final_residuals_cube = np.array(final_residuals_cube)
if len(recon_cube) > 0:
recon_cube = np.array(recon_cube)
if len(medians) > 0:
medians = np.array(medians)
# ADI + RDI
# elif algo_params.cube_ref is not None:
# add_params = {
# "start_time": start_time,
# "full_output": True,
# }
# func_params = setup_parameters(
# params_obj=algo_params, fkt=_adi_rdi_pca, **add_params
# )
# res_pca = _adi_rdi_pca(
# **func_params,
# **rot_options,
# )
# pcs, recon, residuals_cube, residuals_cube_, frame = res_pca
# ADI+RDI OR ADI. Shape of cube: (n_adi_frames, y, x)
else:
add_params = {
"start_time": start_time,
"full_output": True,
}
func_params = setup_parameters(params_obj=algo_params,
fkt=_adi_rdi_pca, **add_params)
if algo_params.cube_ref is not None and algo_params.batch is not None:
raise ValueError("RDI not compatible with batch mode")
res_pca = _adi_rdi_pca(**func_params, **rot_options)
if algo_params.batch is None:
if algo_params.source_xy is not None:
# PCA grid, computing S/Ns
if isinstance(algo_params.ncomp, (tuple, list)):
if algo_params.full_output:
final_residuals_cube, frame, table, _ = res_pca
else:
# returning only the optimal residual
final_residuals_cube = res_pca[1]
# full-frame PCA with rotation threshold
else:
recon_cube, residuals_cube, residuals_cube_, frame = res_pca
else:
# PCA grid
if isinstance(algo_params.ncomp, (tuple, list)):
final_residuals_cube, pclist = res_pca
# full-frame standard PCA
else:
pcs, recon, residuals_cube, residuals_cube_, frame = res_pca
# full-frame incremental PCA
else:
frame, _, pcs, medians = res_pca
# else:
# raise RuntimeError(
# "Only ADI, ADI+RDI and ADI+mSDI observing techniques are supported"
# )
# --------------------------------------------------------------------------
# Returns for each case (ADI, ADI+RDI and ADI+mSDI) and combination of
# parameters: full_output, source_xy, batch, ncomp
# --------------------------------------------------------------------------
isarr = isinstance(algo_params.cube, np.ndarray)
if isarr and algo_params.scale_list is not None:
# ADI+mSDI double-pass PCA
if algo_params.adimsdi == Adimsdi.DOUBLE:
if algo_params.full_output:
return frame, residuals_cube_channels, residuals_cube_channels_
else:
return frame
elif algo_params.adimsdi == Adimsdi.SINGLE:
# ADI+mSDI single-pass PCA
if isinstance(algo_params.ncomp, (float, int)):
if algo_params.full_output:
return frame, cube_allfr_residuals, cube_adi_residuals
else:
return frame
# ADI+mSDI single-pass PCA grid
elif isinstance(algo_params.ncomp, (tuple, list)):
if algo_params.source_xy is None:
if algo_params.full_output:
return final_residuals_cube, pclist
else:
return final_residuals_cube
else:
if algo_params.full_output:
return final_residuals_cube, frame, table
else:
return frame
else:
msg = "ncomp value should only be a float, an int or a tuple of"
msg += f" those, not a {type(algo_params.ncomp)}."
raise ValueError(msg)
else:
msg = f"ADIMSDI value should only be {Adimsdi.SINGLE} or"
msg += f" {Adimsdi.DOUBLE}."
raise ValueError(msg)
# ADI and ADI+RDI (3D or 4D)
elif isinstance(algo_params.cube, str) or algo_params.scale_list is None:
if algo_params.source_xy is None and algo_params.full_output:
# incremental PCA
if algo_params.batch is not None:
final_res = [frame, pcs, medians]
else:
# PCA grid
if isinstance(algo_params.ncomp, (tuple, list)):
final_res = [final_residuals_cube, pclist]
# full-frame standard PCA or ADI+RDI
else:
final_res = [frame, pcs, recon, residuals_cube,
residuals_cube_]
if algo_params.cube.ndim == 4:
final_res.append(ifs_adi_frames)
return tuple(final_res)
elif algo_params.source_xy is not None and algo_params.full_output:
# PCA grid, computing S/Ns
if isinstance(algo_params.ncomp, (tuple, list)):
final_res = [final_residuals_cube, frame, table]
# full-frame PCA with rotation threshold
else:
final_res = [frame, recon_cube, residuals_cube, residuals_cube_]
return tuple(final_res)
elif not algo_params.full_output:
# PCA grid
if isinstance(algo_params.ncomp, (tuple, list)):
return final_residuals_cube
# full-frame standard PCA or ADI+RDI
else:
return frame
else:
msg = "cube value should only be a str or a numpy.ndarray, not a "
msg += f"{type(algo_params.cube)}."
raise ValueError(msg)
def _adi_rdi_pca(
cube,
cube_ref,
angle_list,
ncomp,
batch,
source_xy,
delta_rot,
fwhm,
scaling,
mask_center_px,
svd_mode,
imlib,
interpolation,
collapse,
verbose,
start_time,
nproc,
full_output,
weights=None,
mask_rdi=None,
cube_sig=None,
left_eigv=False,
min_frames_pca=10,
**rot_options,
):
"""Handle the ADI or ADI+RDI PCA post-processing."""
(
frame,
pcs,
recon,
residuals_cube,
residuals_cube_,
) = (None for _ in range(5))
# Full/Single ADI processing, incremental PCA
if batch is not None:
result = pca_incremental(
cube,
angle_list,
batch=batch,
ncomp=ncomp,
collapse=collapse,
verbose=verbose,
full_output=full_output,
start_time=start_time,
weights=weights,
nproc=nproc,
imlib=imlib,
interpolation=interpolation,
**rot_options,
)
return result
else:
# Full/Single ADI processing
n, y, x = cube.shape
angle_list = check_pa_vector(angle_list)
if not n == angle_list.shape[0]:
raise ValueError(
"`angle_list` vector has wrong length. It must "
"equal the number of frames in the cube"
)
if not isinstance(ncomp, (int, float, tuple, list)):
msg = "`ncomp` must be an int, float, tuple or list in the ADI case"
raise TypeError(msg)
if np.isscalar(ncomp):
if isinstance(ncomp, int) and ncomp > n:
ncomp = min(ncomp, n)
print(
"Number of PCs too high (max PCs={}), using {} PCs "
"instead.".format(n, ncomp)
)
if mask_rdi is None:
if source_xy is None:
residuals_result = _project_subtract(
cube,
cube_ref,
ncomp,
scaling,
mask_center_px,
svd_mode,
verbose,
full_output,
cube_sig=cube_sig,
left_eigv=left_eigv,
)
if verbose:
timing(start_time)
if full_output:
residuals_cube = residuals_result[0]
reconstructed = residuals_result[1]
V = residuals_result[2]
pcs = reshape_matrix(V, y, x) if not left_eigv else V.T
recon = reshape_matrix(reconstructed, y, x)
else:
residuals_cube = residuals_result
# A rotation threshold is applied
else:
if delta_rot is None or fwhm is None:
msg = "Delta_rot or fwhm parameters missing. Needed for"
msg += "PA-based rejection of frames from the library"
raise TypeError(msg)
nfrslib = []
residuals_cube = np.zeros_like(cube)
recon_cube = np.zeros_like(cube)
yc, xc = frame_center(cube[0], False)
x1, y1 = source_xy
ann_center = dist(yc, xc, y1, x1)
pa_thr = _compute_pa_thresh(ann_center, fwhm, delta_rot)
for frame in range(n):
ind = _find_indices_adi(angle_list, frame, pa_thr)
res_result = _project_subtract(
cube,
cube_ref,
ncomp,
scaling,
mask_center_px,
svd_mode,
verbose,
full_output,
ind,
frame,
cube_sig=cube_sig,
left_eigv=left_eigv,
min_frames_pca=min_frames_pca,
)
if full_output:
nfrslib.append(res_result[0])
residual_frame = res_result[1]
recon_frame = res_result[2]
residuals_cube[frame] = residual_frame.reshape((y,
x))
recon_cube[frame] = recon_frame.reshape((y, x))
else:
nfrslib.append(res_result[0])
residual_frame = res_result[1]
residuals_cube[frame] = residual_frame.reshape((y,
x))
# number of frames in library printed for each ann. quadrant
if verbose:
descriptive_stats(nfrslib, verbose=verbose,
label="Size LIB: ")
else:
residuals_result = cube_subtract_sky_pca(
cube, cube_ref, mask_rdi, ncomp=ncomp, full_output=True
)
residuals_cube = residuals_result[0]
pcs = residuals_result[2]
recon = residuals_result[-1]
residuals_cube_ = cube_derotate(
residuals_cube,
angle_list,
nproc=nproc,
imlib=imlib,
interpolation=interpolation,
**rot_options,
)
if mask_center_px:
residuals_cube_ = mask_circle(residuals_cube_, mask_center_px)
frame = cube_collapse(residuals_cube_, mode=collapse, w=weights)
if verbose:
print("Done de-rotating and combining")
timing(start_time)
if source_xy is not None:
if full_output:
return (recon_cube,
residuals_cube,
residuals_cube_,
frame)
else:
return frame
else:
if full_output:
return (pcs,
recon,
residuals_cube,
residuals_cube_,
frame)
else:
return frame
# When ncomp is a tuple, pca_grid is called
else:
gridre = pca_grid(
cube,
angle_list,
fwhm,
range_pcs=ncomp,
source_xy=source_xy,
cube_ref=cube_ref,
mode="fullfr",
svd_mode=svd_mode,
scaling=scaling,
mask_center_px=mask_center_px,
fmerit="mean",
collapse=collapse,
verbose=verbose,
full_output=full_output,
debug=False,
plot=verbose,
start_time=start_time,
weights=weights,
nproc=nproc,
imlib=imlib,
interpolation=interpolation,
**rot_options,
)
return gridre
def _adimsdi_singlepca(
cube,
angle_list,
scale_list,
ncomp,
fwhm,
source_xy,
scaling,
mask_center_px,
svd_mode,
imlib,
imlib2,
interpolation,
collapse,
collapse_ifs,
ifs_collapse_range,
verbose,
start_time,
nproc,
crop_ifs,
batch,
full_output,
weights=None,
left_eigv=False,
min_frames_pca=10,
**rot_options,
):
"""Handle the full-frame ADI+mSDI single PCA post-processing."""
z, n, y_in, x_in = cube.shape
angle_list = check_pa_vector(angle_list)
if not angle_list.shape[0] == n:
msg = "Angle list vector has wrong length. It must equal the number"
msg += " frames in the cube"
raise ValueError(msg)
if scale_list is None:
raise ValueError("`scale_list` must be provided")
else:
check_array(scale_list, dim=1, msg="scale_list")
if not scale_list.shape[0] == z:
raise ValueError("`scale_list` has wrong length")
scale_list = check_scal_vector(scale_list)
big_cube = []
if verbose:
print("Rescaling the spectral channels to align the speckles")
for i in Progressbar(range(n), verbose=verbose):
cube_resc = scwave(cube[:, i, :, :], scale_list, imlib=imlib2,
interpolation=interpolation)[0]
if crop_ifs:
cube_resc = cube_crop_frames(cube_resc, size=y_in, verbose=False)
big_cube.append(cube_resc)
big_cube = np.array(big_cube)
big_cube = big_cube.reshape(z * n, big_cube.shape[2], big_cube.shape[3])
if verbose:
timing(start_time)
print("{} total frames".format(n * z))
print("Performing single-pass PCA")
if isinstance(ncomp, (int, float)):
# When ncomp is a int and batch is not None, incremental ADI-PCA is run
if batch is not None:
res_cube = pca_incremental(
big_cube,
angle_list,
batch,
ncomp,
collapse,
verbose,
return_residuals=True,
start_time=start_time,
weights=weights,
nproc=nproc,
imlib=imlib,
interpolation=interpolation,
**rot_options,
)
# When ncomp is a int/float and batch is None, standard ADI-PCA is run
else:
res_cube = _project_subtract(
big_cube,
None,
ncomp,
scaling,
mask_center_px,
svd_mode,
verbose,
False,
left_eigv=left_eigv,
min_frames_pca=min_frames_pca,
)
if verbose:
timing(start_time)
resadi_cube = np.zeros((n, y_in, x_in))
if verbose:
print("Descaling the spectral channels")
if ifs_collapse_range == "all":
idx_ini = 0
idx_fin = z
else:
idx_ini = ifs_collapse_range[0]
idx_fin = ifs_collapse_range[1]
for i in Progressbar(range(n), verbose=verbose):
frame_i = scwave(
res_cube[i * z + idx_ini:i * z + idx_fin],
scale_list[idx_ini:idx_fin],
full_output=False,
inverse=True,
y_in=y_in,
x_in=x_in,
imlib=imlib2,
interpolation=interpolation,
collapse=collapse_ifs,
)
resadi_cube[i] = frame_i
if verbose:
print("De-rotating and combining residuals")
timing(start_time)
der_res = cube_derotate(
resadi_cube,
angle_list,
nproc=nproc,
imlib=imlib,
interpolation=interpolation,
**rot_options,
)
if mask_center_px:
der_res = mask_circle(der_res, mask_center_px)
frame = cube_collapse(der_res, mode=collapse, w=weights)
cube_allfr_residuals = res_cube
cube_adi_residuals = resadi_cube
return cube_allfr_residuals, cube_adi_residuals, frame
# When ncomp is a tuple, pca_grid is called
elif isinstance(ncomp, (tuple, list)):
gridre = pca_grid(
big_cube,
angle_list,
fwhm,
range_pcs=ncomp,
source_xy=source_xy,
cube_ref=None,
mode="fullfr",
svd_mode=svd_mode,
scaling=scaling,
mask_center_px=mask_center_px,
fmerit="mean",
collapse=collapse,
ifs_collapse_range=ifs_collapse_range,
verbose=verbose,
full_output=full_output,
debug=False,
plot=verbose,
start_time=start_time,
scale_list=scale_list,
initial_4dshape=cube.shape,
weights=weights,
nproc=nproc,
imlib=imlib,
interpolation=interpolation,
**rot_options,
)
return gridre
else:
raise TypeError(
"`ncomp` must be an int, float, tuple or list for single-pass PCA"
)
def _adimsdi_doublepca(
cube,
angle_list,
scale_list,
ncomp,
scaling,
mask_center_px,
svd_mode,
imlib,
imlib2,
interpolation,
collapse,
collapse_ifs,
ifs_collapse_range,
verbose,
start_time,
nproc,
weights=None,
fwhm=4,
conv=False,
mask_rdi=None,
cube_sig=None,
left_eigv=False,
**rot_options,
):
"""Handle the full-frame ADI+mSDI double PCA post-processing."""
z, n, y_in, x_in = cube.shape
global ARRAY
ARRAY = cube # to be passed to _adimsdi_doublepca_ifs
if not isinstance(ncomp, tuple):
raise TypeError(
"`ncomp` must be a tuple when a double pass PCA" " is performed"
)
else:
ncomp_ifs, ncomp_adi = ncomp
angle_list = check_pa_vector(angle_list)
if not angle_list.shape[0] == n:
msg = "Angle list vector has wrong length. It must equal the number"
msg += " frames in the cube"
raise ValueError(msg)
if scale_list is None:
raise ValueError("Scaling factors vector must be provided")
else:
if np.array(scale_list).ndim > 1:
raise ValueError("Scaling factors vector is not 1d")
if not scale_list.shape[0] == cube.shape[0]:
raise ValueError("Scaling factors vector has wrong length")
scale_list = check_scal_vector(scale_list)
if verbose:
print("{} spectral channels in IFS cube".format(z))
if ncomp_ifs is None:
print("Combining multi-spectral frames (skipping PCA)")
else:
print("First PCA stage exploiting spectral variability")
if ncomp_ifs is not None and ncomp_ifs > z:
ncomp_ifs = min(ncomp_ifs, z)
msg = "Number of PCs too high (max PCs={}), using {} PCs instead"
print(msg.format(z, ncomp_ifs))
res = pool_map(
nproc,
_adimsdi_doublepca_ifs,
iterable(range(n)),
ncomp_ifs,
scale_list,
scaling,
mask_center_px,
svd_mode,
imlib2,
interpolation,
collapse_ifs,
ifs_collapse_range,
fwhm,
conv,
mask_rdi,
left_eigv,
)
residuals_cube_channels = np.array(res)
if verbose:
timing(start_time)
# de-rotation of the PCA processed channels, ADI fashion
if ncomp_adi is None:
if verbose:
print("{} ADI frames".format(n))
print("De-rotating and combining frames (skipping PCA)")
residuals_cube_channels_ = cube_derotate(
residuals_cube_channels,
angle_list,
nproc=nproc,
imlib=imlib,
interpolation=interpolation,
**rot_options,
)
frame = cube_collapse(residuals_cube_channels_, mode=collapse,
w=weights)
if verbose:
timing(start_time)
else:
if ncomp_adi > n:
ncomp_adi = n
msg = "Number of PCs too high, using maximum of {} PCs instead"
print(msg.format(n))
if verbose:
print("{} ADI frames".format(n))
print("Second PCA stage exploiting rotational variability")
res_ifs_adi = _project_subtract(
residuals_cube_channels,
None,
ncomp_adi,
scaling,
mask_center_px,
svd_mode,
verbose=False,
full_output=False,
cube_sig=cube_sig,
left_eigv=left_eigv,
)
if verbose:
print("De-rotating and combining residuals")
der_res = cube_derotate(
res_ifs_adi,
angle_list,
nproc=nproc,
imlib=imlib,
interpolation=interpolation,
**rot_options,
)
residuals_cube_channels_ = der_res
frame = cube_collapse(residuals_cube_channels_, mode=collapse,
w=weights)
if verbose:
timing(start_time)
return residuals_cube_channels, residuals_cube_channels_, frame
def _adimsdi_doublepca_ifs(
fr,
ncomp,
scale_list,
scaling,
mask_center_px,
svd_mode,
imlib,
interpolation,
collapse,
ifs_collapse_range,
fwhm,
conv,
mask_rdi=None,
left_eigv=False,
):
"""Call by _adimsdi_doublepca with pool_map."""
global ARRAY
z, n, y_in, x_in = ARRAY.shape
multispec_fr = ARRAY[:, fr, :, :]
if ifs_collapse_range == "all":
idx_ini = 0
idx_fin = z
else:
idx_ini = ifs_collapse_range[0]
idx_fin = ifs_collapse_range[1]
if ncomp is None:
frame_i = cube_collapse(multispec_fr[idx_ini:idx_fin])
else:
cube_resc = scwave(
multispec_fr, scale_list, imlib=imlib, interpolation=interpolation
)[0]
if conv:
# convolve all frames with the same kernel
cube_resc = cube_filter_lowpass(
cube_resc, mode="gauss", fwhm_size=fwhm, verbose=False
)
if mask_rdi is None:
residuals = _project_subtract(
cube_resc,
None,
ncomp,
scaling,
mask_center_px,
svd_mode,
verbose=False,
full_output=False,
left_eigv=left_eigv,
)
else:
residuals = np.zeros_like(cube_resc)
for i in range(z):
cube_tmp = np.array([cube_resc[i]])
cube_ref = np.array([cube_resc[j] for j in range(z) if j != i])
residuals[i] = cube_subtract_sky_pca(
cube_tmp, cube_ref, mask_rdi, ncomp=ncomp, full_output=False
)
frame_i = scwave(
residuals[idx_ini:idx_fin],
scale_list[idx_ini:idx_fin],
full_output=False,
inverse=True,
y_in=y_in,
x_in=x_in,
imlib=imlib,
interpolation=interpolation,
collapse=collapse,
)
if mask_center_px:
frame_i = mask_circle(frame_i, mask_center_px)
return frame_i
# def _adi_rdi_pca(
# cube,
# cube_ref,
# angle_list,
# ncomp,
# source_xy,
# delta_rot,
# fwhm,
# scaling,
# mask_center_px,
# svd_mode,
# imlib,
# interpolation,
# collapse,
# verbose,
# start_time,
# nproc,
# weights=None,
# mask_rdi=None,
# cube_sig=None,
# left_eigv=False,
# **rot_options,
# ):
# """Handle the ADI+RDI post-processing."""
# n, y, x = cube.shape
# n_ref, y_ref, x_ref = cube_ref.shape
# angle_list = check_pa_vector(angle_list)
# if not isinstance(ncomp, int):
# raise TypeError("`ncomp` must be an int in the ADI+RDI case")
# if ncomp > n_ref:
# msg = (
# "Requested number of PCs ({}) higher than the number of frames "
# + "in the reference cube ({}); using the latter instead."
# )
# print(msg.format(ncomp, n_ref))
# ncomp = n_ref
# if not cube_ref.ndim == 3:
# msg = "Input reference array is not a cube or 3d array"
# raise ValueError(msg)
# if not y_ref == y and x_ref == x:
# msg = "Reference and target frames have different shape"
# raise TypeError(msg)
# if mask_rdi is None:
# if source_xy is None:
# residuals_result = _project_subtract(
# cube,
# cube_ref,
# ncomp,
# scaling,
# mask_center_px,
# svd_mode,
# verbose,
# True,
# cube_sig=cube_sig,
# left_eigv=left_eigv,
# )
# residuals_cube = residuals_result[0]
# reconstructed = residuals_result[1]
# V = residuals_result[2]
# pcs = reshape_matrix(V, y, x) if not left_eigv else V.T
# recon = reshape_matrix(reconstructed, y, x)
# # A rotation threshold is applied
# else:
# if delta_rot is None or fwhm is None:
# msg = "Delta_rot or fwhm parameters missing. Needed for the"
# msg += "PA-based rejection of frames from the library"
# raise TypeError(msg)
# nfrslib = []
# residuals_cube = np.zeros_like(cube)
# recon_cube = np.zeros_like(cube)
# yc, xc = frame_center(cube[0], False)
# x1, y1 = source_xy
# ann_center = dist(yc, xc, y1, x1)
# pa_thr = _compute_pa_thresh(ann_center, fwhm, delta_rot)
# mid_range = np.abs(np.max(angle_list) - np.min(angle_list)) / 2
# if pa_thr >= mid_range - mid_range * 0.1:
# new_pa_th = float(mid_range - mid_range * 0.1)
# if verbose:
# msg = "PA threshold {:.2f} is too big, will be set to "
# msg += "{:.2f}"
# print(msg.format(pa_thr, new_pa_th))
# pa_thr = new_pa_th
# for frame in range(n):
# if ann_center > fwhm * 3: # TODO: 3 optimal value? new par?
# ind = _find_indices_adi(
# angle_list, frame, pa_thr, truncate=True
# )
# else:
# ind = _find_indices_adi(angle_list, frame, pa_thr)
# res_result = _project_subtract(
# cube,
# cube_ref,
# ncomp,
# scaling,
# mask_center_px,
# svd_mode,
# verbose,
# full_output,
# ind,
# frame,
# cube_sig=cube_sig,
# left_eigv=left_eigv,
# min_frames_pca=min_frames_pca,
# )
# if full_output:
# nfrslib.append(res_result[0])
# residual_frame = res_result[1]
# recon_frame = res_result[2]
# residuals_cube[frame] = residual_frame.reshape((y, x))
# recon_cube[frame] = recon_frame.reshape((y, x))
# else:
# nfrslib.append(res_result[0])
# residual_frame = res_result[1]
# residuals_cube[frame] = residual_frame.reshape((y, x))
# # number of frames in library printed for each annular quadrant
# if verbose:
# descriptive_stats(nfrslib, verbose=verbose,
# label="Size LIB: ")
# else:
# residuals_result = cube_subtract_sky_pca(
# cube, cube_ref, mask_rdi, ncomp=ncomp, full_output=True
# )
# residuals_cube = residuals_result[0]
# pcs = residuals_result[2]
# recon = residuals_result[-1]
# residuals_cube_ = cube_derotate(
# residuals_cube,
# angle_list,
# nproc=nproc,
# imlib=imlib,
# interpolation=interpolation,
# **rot_options,
# )
# frame = cube_collapse(residuals_cube_, mode=collapse, w=weights)
# if mask_center_px:
# frame = mask_circle(frame, mask_center_px)
# if verbose:
# print("Done de-rotating and combining")
# timing(start_time)
# return pcs, recon, residuals_cube, residuals_cube_, frame
def _project_subtract(
cube,
cube_ref,
ncomp,
scaling,
mask_center_px,
svd_mode,
verbose,
full_output,
indices=None,
frame=None,
cube_sig=None,
left_eigv=False,
min_frames_pca=10,
):
"""
PCA projection and model PSF subtraction.
Used as a helping function by each of the PCA modes (ADI, ADI+RDI,
ADI+mSDI).
Parameters
----------
cube : numpy ndarray
Input cube.
cube_ref : numpy ndarray
Reference cube.
ncomp : int
Number of principal components.
scaling : str
Scaling of pixel values. See ``pca`` docstrings.
mask_center_px : int
Masking out a centered circular aperture.
svd_mode : str
Mode for SVD computation. See ``pca`` docstrings.
verbose : bool
Verbosity.
full_output : bool
Whether to return intermediate arrays or not.
left_eigv : bool, optional
Whether to use rather left or right singularvectors
indices : list
Indices to be used to discard frames (a rotation threshold is used).
frame : int
Index of the current frame (when indices is a list and a rotation
threshold was applied).
cube_sig: numpy ndarray, opt
Cube with estimate of significant authentic signals. If provided, this
will be subtracted from both the cube and the PCA library, before
projecting the cube onto the principal components.
Returns
-------
ref_lib_shape : int
[indices is not None, frame is not None] Number of
rows in the reference library for the given frame.
residuals: numpy ndarray
Residuals, returned in every case.
reconstructed : numpy ndarray
[full_output=True] The reconstructed array.
V : numpy ndarray
[full_output=True, indices is None, frame is None]
The right singular vectors of the input matrix, as returned by
``svd/svd_wrapper()``
"""
_, y, x = cube.shape
if isinstance(ncomp, (int, np.int_)):
if indices is not None and frame is not None:
matrix = prepare_matrix(
cube, scaling, mask_center_px, mode="fullfr", verbose=False
)
elif left_eigv:
matrix = prepare_matrix(cube, scaling, mask_center_px,
mode="fullfr", verbose=verbose,
discard_mask_pix=True)
else:
matrix = prepare_matrix(
cube, scaling, mask_center_px, mode="fullfr", verbose=verbose
)
if cube_sig is None:
matrix_emp = matrix.copy()
else:
if left_eigv:
matrix_sig = prepare_matrix(cube_sig, scaling, mask_center_px,
mode="fullfr", verbose=verbose,
discard_mask_pix=True)
else:
nfr = cube_sig.shape[0]
matrix_sig = np.reshape(cube_sig, (nfr, -1))
matrix_emp = matrix - matrix_sig
if cube_ref is not None:
if left_eigv:
matrix_ref = prepare_matrix(cube_sig, scaling, mask_center_px,
mode="fullfr", verbose=verbose,
discard_mask_pix=True)
else:
matrix_ref = prepare_matrix(cube_ref, scaling, mask_center_px,
mode="fullfr", verbose=verbose)
# check whether indices are well defined (i.e. not empty)
msg = "{} frames comply to delta_rot condition < less than "
msg1 = msg + "min_frames_pca ({}). Try decreasing delta_rot or "
msg1 += "min_frames_pca"
msg2 = msg + "ncomp ({}). Try decreasing the parameter delta_rot or "
msg2 += "ncomp"
if indices is not None and frame is not None:
try:
ref_lib = matrix_emp[indices]
except IndexError:
indices = None
if cube_ref is None and indices is None:
raise RuntimeError(msg1.format(0, min_frames_pca))
# a rotation threshold is used (frames are processed one by one)
if indices is not None and frame is not None:
if cube_ref is not None:
ref_lib = np.concatenate((ref_lib, matrix_ref))
if ref_lib.shape[0] < min_frames_pca:
raise RuntimeError(msg1.format(ref_lib.shape[0],
min_frames_pca))
if ref_lib.shape[0] < ncomp:
raise RuntimeError(msg2.format(ref_lib.shape[0], ncomp))
curr_frame = matrix[frame] # current frame
curr_frame_emp = matrix_emp[frame]
if left_eigv:
V = svd_wrapper(ref_lib, svd_mode, ncomp, False,
left_eigv=left_eigv)
transformed = np.dot(curr_frame_emp.T, V)
reconstructed = np.dot(V, transformed.T)
else:
V = svd_wrapper(ref_lib, svd_mode, ncomp, False)
transformed = np.dot(curr_frame_emp, V.T)
reconstructed = np.dot(transformed.T, V)
residuals = curr_frame - reconstructed
if full_output:
return ref_lib.shape[0], residuals, reconstructed
else:
return ref_lib.shape[0], residuals
# the whole matrix is processed at once
else:
if cube_ref is not None:
ref_lib = matrix_ref
else:
ref_lib = matrix_emp
if left_eigv:
V = svd_wrapper(ref_lib, svd_mode, ncomp, verbose,
left_eigv=left_eigv)
transformed = np.dot(matrix_emp.T, V)
reconstructed = np.dot(V, transformed.T)
else:
V = svd_wrapper(ref_lib, svd_mode, ncomp, verbose)
transformed = np.dot(V, matrix_emp.T)
reconstructed = np.dot(transformed.T, V)
residuals = matrix - reconstructed
residuals_res = reshape_matrix(residuals, y, x)
if full_output:
return residuals_res, reconstructed, V
else:
return residuals_res
elif isinstance(ncomp, (float, np.float16, np.float32, np.float64)):
if not 1 > ncomp > 0:
raise ValueError(
"when `ncomp` is float, it must lie in the " "interval (0,1]"
)
svdecomp = SVDecomposer(cube, mode="fullfr", svd_mode=svd_mode,
scaling=scaling, verbose=verbose)
_ = svdecomp.get_cevr(plot=False)
# in this case ncomp is the desired CEVR
cevr = ncomp
ncomp = svdecomp.cevr_to_ncomp(cevr)
V = svdecomp.v[:ncomp]
transformed = np.dot(V, svdecomp.matrix.T)
reconstructed = np.dot(transformed.T, V)
residuals = svdecomp.matrix - reconstructed
residuals_res = reshape_matrix(residuals, y, x)
if verbose and isinstance(cevr, float):
print("Components used : {}".format(V.shape[0]))
if full_output:
return residuals_res, reconstructed, V
else:
return residuals_res
else:
raise TypeError("Type not recognized for ncomp, should be int or float")