Source code for vip_hci.psfsub.pca_local

#! /usr/bin/env python
"""
Module with local/smart PCA (annulus or patch-wise in a multi-processing
fashion) model PSF subtraction for ADI, ADI+SDI (IFS) and ADI+RDI datasets.

.. [ABS13]
   | Absil et al. 2013
   | **Searching for companions down to 2 AU from beta Pictoris using the
     L'-band AGPM coronagraph on VLT/NACO**
   | *Astronomy & Astrophysics, Volume 559, Issue 1, p. 12*
   | `https://arxiv.org/abs/1311.4298
     <https://arxiv.org/abs/1311.4298>`_

"""

__author__ = "Carlos Alberto Gomez Gonzalez, Valentin Christiaens, Thomas Bédrine"
__all__ = ["pca_annular", "PCA_ANNULAR_Params"]

import numpy as np
from multiprocessing import cpu_count
from typing import Tuple, List, Union
from enum import Enum
from dataclasses import dataclass
from .svd import get_eigenvectors
from ..preproc import (cube_derotate, cube_collapse, check_pa_vector,
                       check_scal_vector)
from ..preproc import cube_rescaling_wavelengths as scwave
from ..preproc.derotation import _find_indices_adi, _define_annuli
from ..preproc.rescaling import _find_indices_sdi
from ..config import time_ini, timing
from ..config.paramenum import SvdMode, Imlib, Interpolation, Collapse, ALGO_KEY
from ..config.utils_conf import pool_map, iterable
from ..config.utils_param import setup_parameters, separate_kwargs_dict
from ..stats import descriptive_stats
from ..var import get_annulus_segments, matrix_scaling
AUTO = "auto"


[docs] @dataclass class PCA_ANNULAR_Params: """ Set of parameters for the annular PCA module. """ cube: np.ndarray = None angle_list: np.ndarray = None cube_ref: np.ndarray = None scale_list: np.ndarray = None radius_int: int = 0 fwhm: float = 4 asize: float = 4 n_segments: Union[int, List[int], AUTO] = 1 delta_rot: Union[float, Tuple[float]] = (0.1, 1) delta_sep: Union[float, Tuple[float]] = (0.1, 1) ncomp: Union[int, Tuple, np.ndarray, AUTO] = 1 svd_mode: Enum = SvdMode.LAPACK nproc: int = 1 min_frames_lib: int = 2 max_frames_lib: int = 200 tol: float = 1e-1 scaling: Enum = None imlib: Enum = Imlib.VIPFFT interpolation: Enum = Interpolation.LANCZOS4 collapse: Enum = Collapse.MEDIAN collapse_ifs: Enum = Collapse.MEAN ifs_collapse_range: Union["all", Tuple[int]] = "all" theta_init: int = 0 weights: np.ndarray = None cube_sig: np.ndarray = None full_output: bool = False verbose: bool = True left_eigv: bool = False
[docs] def pca_annular(*all_args: List, **all_kwargs: dict): """PCA model PSF subtraction for ADI, ADI+RDI or ADI+mSDI (IFS) data. The PCA model is computed locally in each annulus (or annular sectors according to ``n_segments``). For each sector we discard reference frames taking into account a parallactic angle threshold (``delta_rot``) and optionally a radial movement threshold (``delta_sep``) for 4d cubes. For ADI+RDI data, it computes the principal components from the reference library/cube, forcing pixel-wise temporal standardization. The number of principal components can be automatically adjusted by the algorithm by minimizing the residuals inside each patch/region. References: [AMA12]_ for PCA-ADI; [ABS13]_ for PCA-ADI in concentric annuli considering a parallactic angle threshold; [CHR19]_ for PCA-ASDI and PCA-SADI in one or two steps. Parameters ---------- all_args: list, optional Positionnal arguments for the PCA annular algorithm. Full list of parameters below. all_kwargs: dictionary, optional Mix of keyword arguments that can initialize a PCA_ANNULAR_Params and the optional 'rot_options' dictionnary, with keyword values for "border_mode", "mask_val", "edge_blend", "interp_zeros", "ker" (see documentation of ``vip_hci.preproc.frame_rotate``). Can also contain a PCA_ANNULAR_Params named as `algo_params`. PCA annular parameters ---------- cube : numpy ndarray, 3d or 4d Input cube. angle_list : numpy ndarray, 1d Corresponding parallactic angle for each frame. cube_ref : numpy ndarray, 3d, optional Reference library cube. For Reference Star Differential Imaging. scale_list : numpy ndarray, 1d, optional If provided, triggers mSDI reduction. These should be the scaling factors used to re-scale the spectral channels and align the speckles in case of IFS data (ADI+mSDI cube). Usually, these can be approximated by the last channel wavelength divided by the other wavelengths in the cube (more thorough approaches can be used to get the scaling factors, e.g. with ``vip_hci.preproc.find_scal_vector``). radius_int : int, optional The radius of the innermost annulus. By default is 0, if >0 then the central circular region is discarded. fwhm : float, optional Size of the FWHM in pixels. Default is 4. asize : float, optional The size of the annuli, in pixels. n_segments : int or list of ints or 'auto', optional The number of segments for each annulus. When a single integer is given it is used for all annuli. When set to 'auto', the number of segments is automatically determined for every annulus, based on the annulus width. delta_rot : float or tuple of floats, optional Factor for adjusting the parallactic angle threshold, expressed in FWHM. Default is 1 (excludes 1 FWHM on each side of the considered frame). If a tuple of two floats is provided, they are used as the lower and upper intervals for the threshold (grows linearly as a function of the separation). delta_sep : float or tuple of floats, optional The threshold separation in terms of the mean FWHM (for ADI+mSDI data). If a tuple of two values is provided, they are used as the lower and upper intervals for the threshold (grows as a function of the separation). ncomp : 'auto', int, tuple/1d numpy array of int, list, tuple of lists, opt How many PCs are used as a lower-dimensional subspace to project the target (sectors of) frames. Depends on the dimensionality of `cube`. * ADI and ADI+RDI (``cube`` is a 3d array): if a single integer is provided, then the same number of PCs will be subtracted at each separation (annulus). If a tuple is provided, then a different number of PCs will be used for each annulus (starting with the innermost one). If ``ncomp`` is set to ``auto`` then the number of PCs are calculated for each region/patch automatically. If a list of int is provided, several npc will be tried at once, but the same value of npc will be used for all annuli. If a tuple of lists of int is provided, the length of tuple should match the number of annuli and different sets of npc will be calculated simultaneously for each annulus, with the exact values of npc provided in the respective lists. * ADI or ADI+RDI (``cube`` is a 4d array): same input format allowed as above, but with a slightly different behaviour if ncomp is a list: if it has the same length as the number of channels, each element of the list will be used as ``ncomp`` value (whether int, float or tuple) for each spectral channel. Otherwise the same behaviour as above is assumed. * ADI+mSDI case: ``ncomp`` must be a tuple of two integers or a list of tuples of two integers, with the number of PCs obtained from each multi-spectral frame (for each sector) and the number of PCs used in the second PCA stage (ADI fashion, using the residuals of the first stage). If None then the second PCA stage is skipped and the residuals are de-rotated and combined. svd_mode : Enum, see `vip_hci.config.paramenum.SvdMode` Switch for the SVD method/library to be used. nproc : None or int, optional Number of processes for parallel computing. If None the number of processes will be set to (cpu_count()/2). min_frames_lib : int, optional Minimum number of frames in the PCA reference library. max_frames_lib : int, optional Maximum number of frames in the PCA reference library. The more distant/decorrelated frames are removed from the library. tol : float, optional Stopping criterion for choosing the number of PCs when ``ncomp`` is None. Lower values will lead to smaller residuals and more PCs. scaling : Enum, see `vip_hci.config.paramenum.Scaling` Pixel-wise scaling mode using ``sklearn.preprocessing.scale`` function. If set to None, the input matrix is left untouched. imlib : Enum, see `vip_hci.config.paramenum.Imlib` See the documentation of the ``vip_hci.preproc.frame_rotate`` function. interpolation : Enum, see `vip_hci.config.paramenum.Interpolation` See the documentation of the ``vip_hci.preproc.frame_rotate`` function. collapse : Enum, see `vip_hci.config.paramenum.Collapse` Sets the way of collapsing the frames for producing a final image. collapse_ifs : Enum, see `vip_hci.config.paramenum.Collapse` Sets how spectral residual frames should be combined to produce an mSDI image. ifs_collapse_range: str 'all' or tuple of 2 int If a tuple, it should contain the first and last channels where the mSDI residual channels will be collapsed (by default collapses all channels). full_output: boolean, optional Whether to return the final median combined image only or with other intermediate arrays. verbose : bool, optional If True prints to stdout intermediate info. theta_init : int Initial azimuth [degrees] of the first segment, counting from the positive x-axis counterclockwise (irrelevant if n_segments=1). weights: 1d numpy array or list, optional Weights to be applied for a weighted mean. Need to be provided if collapse mode is 'wmean'. cube_sig: numpy ndarray, opt Cube with estimate of significant authentic signals. If provided, this will be subtracted before projecting cube onto reference cube. Returns ------- frame : numpy ndarray, 2d [full_output=False] Median combination of the de-rotated cube. array_out : numpy ndarray, 3d or 4d [full_output=True] Cube of residuals. array_der : numpy ndarray, 3d or 4d [full_output=True] Cube residuals after de-rotation. frame : numpy ndarray, 2d [full_output=True] Median combination of the de-rotated cube. """ # Separating the parameters of the ParamsObject from the optionnal rot_options class_params, rot_options = separate_kwargs_dict(initial_kwargs=all_kwargs, parent_class=PCA_ANNULAR_Params ) # Extracting the object of parameters (if any) algo_params = None if ALGO_KEY in rot_options.keys(): algo_params = rot_options[ALGO_KEY] del rot_options[ALGO_KEY] if algo_params is None: algo_params = PCA_ANNULAR_Params(*all_args, **class_params) # by default, interpolate masked area before derotation if a mask is used if algo_params.radius_int and len(rot_options) == 0: rot_options['mask_val'] = 0 rot_options['ker'] = 1 rot_options['interp_zeros'] = True global start_time start_time = time_ini() if algo_params.left_eigv: if ( (algo_params.cube_ref is not None) or (algo_params.cube_sig is not None) or (algo_params.ncomp == "auto") ): raise NotImplementedError( "left_eigv is not compatible" "with 'cube_ref', 'cube_sig', ncomp='auto'" ) # ADI or ADI+RDI data if algo_params.cube.ndim == 3: add_params = {"start_time": start_time, "full_output": True} func_params = setup_parameters( params_obj=algo_params, fkt=_pca_adi_rdi, **add_params ) res = _pca_adi_rdi(**func_params, **rot_options) cube_out, cube_der, frame = res if algo_params.full_output: return cube_out, cube_der, frame else: return frame # 4D cube, but no mSDI desired elif algo_params.cube.ndim == 4 and algo_params.scale_list is None: nch, nz, ny, nx = algo_params.cube.shape ifs_adi_frames = np.zeros([nch, ny, nx]) if not isinstance(algo_params.ncomp, list): algo_params.ncomp = [algo_params.ncomp] * nch elif len(algo_params.ncomp) != nch: algo_params.ncomp = [algo_params.ncomp] * nch if np.isscalar(algo_params.fwhm): algo_params.fwhm = [algo_params.fwhm] * nch cube_out = [] cube_der = [] # ADI or RDI in each channel for ch in range(nch): if algo_params.cube_ref is not None: if algo_params.cube_ref[ch].ndim != 3: msg = "Ref cube has wrong format for 4d input cube" raise TypeError(msg) cube_ref_tmp = algo_params.cube_ref[ch] else: cube_ref_tmp = algo_params.cube_ref add_params = { "cube": algo_params.cube[ch], "fwhm": algo_params.fwhm[ch], "ncomp": algo_params.ncomp[ch], "full_output": True, "cube_ref": cube_ref_tmp, } func_params = setup_parameters( params_obj=algo_params, fkt=_pca_adi_rdi, **add_params ) res_pca = _pca_adi_rdi(**func_params, **rot_options) cube_out.append(res_pca[0]) cube_der.append(res_pca[1]) ifs_adi_frames[ch] = res_pca[-1] frame = cube_collapse(ifs_adi_frames, mode=algo_params.collapse_ifs) # convert to numpy arrays cube_out = np.array(cube_out) cube_der = np.array(cube_der) if algo_params.full_output: return cube_out, cube_der, frame else: return frame # ADI+mSDI (IFS) datacubes elif algo_params.cube.ndim == 4: global ARRAY ARRAY = algo_params.cube z, n, y_in, x_in = algo_params.cube.shape algo_params.fwhm = int(np.round(np.mean(algo_params.fwhm))) n_annuli = int((y_in / 2 - algo_params.radius_int) / algo_params.asize) if np.array(algo_params.scale_list).ndim > 1: raise ValueError("Scaling factors vector is not 1d") if not algo_params.scale_list.shape[0] == z: raise ValueError("Scaling factors vector has wrong length") if not isinstance(algo_params.ncomp, tuple): msg = "`ncomp` must be a tuple of two integers when " msg += "`cube` is a 4d array" raise TypeError(msg) else: ncomp2 = algo_params.ncomp[1] algo_params.ncomp = algo_params.ncomp[0] if algo_params.verbose: print("First PCA subtraction exploiting the spectral variability") print("{} spectral channels per IFS frame".format(z)) print( "N annuli = {}, mean FWHM = {:.3f}".format( n_annuli, algo_params.fwhm) ) add_params = { "fr": iterable(range(n)), "scal": algo_params.scale_list, "collapse": algo_params.collapse_ifs, } func_params = setup_parameters( params_obj=algo_params, fkt=_pca_sdi_fr, as_list=True, **add_params ) res = pool_map( algo_params.nproc, _pca_sdi_fr, verbose=algo_params.verbose, *func_params, ) residuals_cube_channels = np.array(res) # Exploiting rotational variability if algo_params.verbose: timing(start_time) print("{} ADI frames".format(n)) if ncomp2 is None: if algo_params.verbose: print("Skipping the second PCA subtraction") cube_out = residuals_cube_channels cube_der = cube_derotate( cube_out, algo_params.angle_list, nproc=algo_params.nproc, imlib=algo_params.imlib, interpolation=algo_params.interpolation, **rot_options, ) frame = cube_collapse( cube_der, mode=algo_params.collapse, w=algo_params.weights ) else: if algo_params.verbose: print("Second PCA subtraction exploiting angular variability") add_params = { "cube": residuals_cube_channels, "ncomp": ncomp2, "cube_ref": None, } func_params = setup_parameters( params_obj=algo_params, fkt=_pca_adi_rdi, **add_params ) res = _pca_adi_rdi(**func_params, **rot_options) if algo_params.full_output: cube_out, cube_der, frame = res else: frame = res if algo_params.full_output: return cube_out, cube_der, frame else: return frame else: raise TypeError("Input array is not a 4d or 3d array")
################################################################################ # Functions encapsulating portions of the main algorithm ################################################################################ def _pca_sdi_fr( fr, scal, radius_int, fwhm, asize, n_segments, delta_sep, ncomp, svd_mode, tol, scaling, imlib, interpolation, collapse, ifs_collapse_range, theta_init, ): """Optimized PCA subtraction on a multi-spectral frame (IFS data).""" z, n, y_in, x_in = ARRAY.shape scale_list = check_scal_vector(scal) # rescaled cube, aligning speckles multispec_fr = scwave( ARRAY[:, fr, :, :], scale_list, imlib=imlib, interpolation=interpolation )[0] # Exploiting spectral variability (radial movement) fwhm = int(np.round(np.mean(fwhm))) n_annuli = int((y_in / 2 - radius_int) / asize) if isinstance(n_segments, int): n_segments = [n_segments for _ in range(n_annuli)] elif n_segments == "auto": n_segments = list() n_segments.append(2) # for first annulus n_segments.append(3) # for second annulus ld = 2 * np.tan(360 / 4 / 2) * asize for i in range(2, n_annuli): # rest of annuli radius = i * asize ang = np.rad2deg(2 * np.arctan(ld / (2 * radius))) n_segments.append(int(np.ceil(360 / ang))) cube_res = np.zeros_like(multispec_fr) # shape (z, resc_y, resc_x) if isinstance(delta_sep, tuple): delta_sep_vec = np.linspace(delta_sep[0], delta_sep[1], n_annuli) else: delta_sep_vec = [delta_sep] * n_annuli for ann in range(n_annuli): if ann == n_annuli - 1: inner_radius = radius_int + (ann * asize - 1) else: inner_radius = radius_int + ann * asize ann_center = inner_radius + (asize / 2) indices = get_annulus_segments( multispec_fr[0], inner_radius, asize, n_segments[ann], theta_init ) # Library matrix is created for each segment and scaled if needed for seg in range(n_segments[ann]): yy = indices[seg][0] xx = indices[seg][1] matrix = multispec_fr[:, yy, xx] # shape (z, npx_annsegm) matrix = matrix_scaling(matrix, scaling) for j in range(z): indices_left = _find_indices_sdi( scal, ann_center, j, fwhm, delta_sep_vec[ann] ) matrix_ref = matrix[indices_left] curr_frame = matrix[j] # current frame V = get_eigenvectors( ncomp, matrix_ref, svd_mode, noise_error=tol, debug=False, scaling=scaling, ) transformed = np.dot(curr_frame, V.T) reconstructed = np.dot(transformed.T, V) residuals = curr_frame - reconstructed # return residuals, V.shape[0], matrix_ref.shape[0] cube_res[j, yy, xx] = residuals if ifs_collapse_range == "all": idx_ini = 0 idx_fin = z else: idx_ini = ifs_collapse_range[0] idx_fin = ifs_collapse_range[1] frame_desc = scwave( cube_res[idx_ini:idx_fin], scale_list[idx_ini:idx_fin], full_output=False, inverse=True, y_in=y_in, x_in=x_in, imlib=imlib, interpolation=interpolation, collapse=collapse, ) return frame_desc def _pca_adi_rdi( cube, angle_list, radius_int=0, fwhm=4, asize=2, n_segments=1, delta_rot=1, ncomp=1, svd_mode="lapack", nproc=None, min_frames_lib=2, max_frames_lib=200, tol=1e-1, scaling=None, imlib="vip-fft", interpolation="lanczos4", collapse="median", full_output=False, verbose=1, cube_ref=None, theta_init=0, weights=None, cube_sig=None, left_eigv=False, **rot_options, ): """PCA exploiting angular variability (ADI fashion).""" array = cube if array.ndim != 3: raise TypeError("Input array is not a cube or 3d array") if array.shape[0] != angle_list.shape[0]: raise TypeError("Input vector or parallactic angles has wrong length") n, y, x = array.shape angle_list = check_pa_vector(angle_list) n_annuli = int((y / 2 - radius_int) / asize) if isinstance(delta_rot, tuple): delta_rot = np.linspace(delta_rot[0], delta_rot[1], num=n_annuli) elif np.isscalar(delta_rot): delta_rot = [delta_rot] * n_annuli if isinstance(n_segments, int): n_segments = [n_segments for _ in range(n_annuli)] elif n_segments == "auto": n_segments = list() n_segments.append(2) # for first annulus n_segments.append(3) # for second annulus ld = 2 * np.tan(360 / 4 / 2) * asize for i in range(2, n_annuli): # rest of annuli radius = i * asize ang = np.rad2deg(2 * np.arctan(ld / (2 * radius))) n_segments.append(int(np.ceil(360 / ang))) if verbose: msg = "N annuli = {}, FWHM = {:.3f}" print(msg.format(n_annuli, fwhm)) print("PCA per annulus (or annular sectors):") if nproc is None: # Hyper-threading "duplicates" the cores -> cpu_count/2 nproc = cpu_count() // 2 # The annuli are built, and the corresponding PA thresholds for frame # rejection are calculated (at the center of the annulus) cube_out = np.zeros_like(array) if isinstance(ncomp, list): nncomp = len(ncomp) cube_out = np.zeros([nncomp, array.shape[0], array.shape[1], array.shape[2]]) for ann in range(n_annuli): if isinstance(ncomp, tuple) or isinstance(ncomp, np.ndarray): if len(ncomp) == n_annuli: ncompann = ncomp[ann] else: msg = "If `ncomp` is a tuple, its length must match the number " msg += "of annuli" raise TypeError(msg) else: ncompann = ncomp n_segments_ann = n_segments[ann] res_ann_par = _define_annuli( angle_list, ann, n_annuli, fwhm, radius_int, asize, delta_rot[ann], n_segments_ann, verbose, True, ) pa_thr, inner_radius, ann_center = res_ann_par indices = get_annulus_segments( array[0], inner_radius, asize, n_segments_ann, theta_init ) if left_eigv: indices_out = get_annulus_segments(array[0], inner_radius, asize, n_segments_ann, theta_init, out=True) # Library matrix is created for each segment and scaled if needed for j in range(n_segments_ann): yy = indices[j][0] xx = indices[j][1] matrix_segm = array[:, yy, xx] # shape [nframes x npx_segment] matrix_segm = matrix_scaling(matrix_segm, scaling) if cube_ref is not None: matrix_segm_ref = cube_ref[:, yy, xx] matrix_segm_ref = matrix_scaling(matrix_segm_ref, scaling) else: matrix_segm_ref = None if cube_sig is not None: matrix_sig_segm = cube_sig[:, yy, xx] else: matrix_sig_segm = None if not left_eigv: res = pool_map( nproc, do_pca_patch, matrix_segm, iterable(range(n)), angle_list, fwhm, pa_thr, ann_center, svd_mode, ncompann, min_frames_lib, max_frames_lib, tol, matrix_segm_ref, matrix_sig_segm, ) if isinstance(ncomp, list): nncomp = len(ncomp) residuals = [] for nn in range(nncomp): tmp = np.array([res[i][0][nn] for i in range(n)]) residuals.append(tmp) else: res = np.array(res, dtype=object) residuals = np.array(res[:, 0]) ncomps = res[:, 1] nfrslib = res[:, 2] else: yy_out = indices_out[j][0] xx_out = indices_out[j][1] matrix_out_segm = array[ :, yy_out, xx_out ] # shape [nframes x npx_out_segment] matrix_out_segm = matrix_scaling(matrix_out_segm, scaling) if isinstance(ncomp, list): npc = max(ncomp) else: npc = ncomp V = get_eigenvectors(npc, matrix_out_segm, svd_mode, noise_error=tol, left_eigv=True) if isinstance(ncomp, list): residuals = [] for nn, npc_tmp in enumerate(ncomp): transformed = np.dot(V[:npc_tmp], matrix_segm) reconstructed = np.dot(transformed.T, V[:npc_tmp]) residuals.append(matrix_segm - reconstructed.T) else: transformed = np.dot(V, matrix_segm) reconstructed = np.dot(transformed.T, V) residuals = matrix_segm - reconstructed.T nfrslib = matrix_out_segm.shape[0] if isinstance(ncomp, list): for nn, npc in enumerate(ncomp): for fr in range(n): cube_out[nn, fr][yy, xx] = residuals[nn][fr] else: for fr in range(n): cube_out[fr][yy, xx] = residuals[fr] # number of frames in library printed for each annular quadrant # number of PCs printed for each annular quadrant if verbose == 2 and not isinstance(ncomp, list): descriptive_stats(nfrslib, verbose=verbose, label="\tLIBsize: ") descriptive_stats(ncomps, verbose=verbose, label="\tNum PCs: ") if verbose == 1: print("Done PCA with {} for current annulus".format(svd_mode)) timing(start_time) if isinstance(ncomp, list): cube_der = np.zeros_like(cube_out) frame = [] for nn, npc in enumerate(ncomp): cube_der[nn] = cube_derotate(cube_out[nn], angle_list, nproc=nproc, imlib=imlib, interpolation=interpolation, **rot_options) frame.append(cube_collapse(cube_der[nn], mode=collapse, w=weights)) else: # Cube is derotated according to the parallactic angle and collapsed cube_der = cube_derotate( cube_out, angle_list, nproc=nproc, imlib=imlib, interpolation=interpolation, **rot_options, ) frame = cube_collapse(cube_der, mode=collapse, w=weights) if verbose: print("Done derotating and combining.") timing(start_time) if full_output: return cube_out, cube_der, frame else: return frame def do_pca_patch( matrix, frame, angle_list, fwhm, pa_threshold, ann_center, svd_mode, ncomp, min_frames_lib, max_frames_lib, tol, matrix_ref, matrix_sig_segm, ): """Do the SVD/PCA for each frame patch (small matrix). For each frame, frames to be rejected from the PCA library are found depending on the criterion in field rotation. The library is also truncated on the other end (frames too far in time, which have rotated more) which are more decorrelated, to keep the computational cost lower. This truncation is done on the annuli beyong 10*FWHM radius and the goal is to keep min(num_frames/2, 200) in the library. """ if pa_threshold != 0: indices_left = _find_indices_adi(angle_list, frame, pa_threshold, truncate=True, max_frames=max_frames_lib) msg = "Too few frames left in the PCA library. " msg += "Accepted indices length ({:.0f}) less than {:.0f}. " msg += "Try decreasing either delta_rot or min_frames_lib." try: if matrix_sig_segm is not None: data_ref = matrix[indices_left] - matrix_sig_segm[indices_left] else: data_ref = matrix[indices_left] except IndexError: if matrix_ref is None: raise RuntimeError(msg.format(0, min_frames_lib)) data_ref = None if data_ref.shape[0] < min_frames_lib and matrix_ref is None: raise RuntimeError(msg.format(len(indices_left), min_frames_lib)) else: if matrix_sig_segm is not None: data_ref = matrix - matrix_sig_segm else: data_ref = matrix if matrix_ref is not None: # Stacking the ref and the target ref (pa thresh) libraries if data_ref is not None: data_ref = np.vstack((matrix_ref, data_ref)) else: data_ref = matrix_ref curr_frame = matrix[frame] # current frame if matrix_sig_segm is not None: curr_frame_emp = matrix[frame] - matrix_sig_segm[frame] else: curr_frame_emp = curr_frame if isinstance(ncomp, list): npc = max(ncomp) else: npc = ncomp V = get_eigenvectors(npc, data_ref, svd_mode, noise_error=tol) if isinstance(ncomp, list): residuals = [] for nn, npc_tmp in enumerate(ncomp): transformed = np.dot(curr_frame_emp, V[:npc_tmp].T) reconstructed = np.dot(transformed.T, V[:npc_tmp]) residuals.append(curr_frame - reconstructed) else: transformed = np.dot(curr_frame_emp, V.T) reconstructed = np.dot(transformed.T, V) residuals = curr_frame - reconstructed return residuals, V.shape[0], data_ref.shape[0]