#! /usr/bin/env python
"""
Module with local/smart PCA (annulus or patch-wise in a multi-processing
fashion) model PSF subtraction for ADI, ADI+SDI (IFS) and ADI+RDI datasets.
.. [ABS13]
| Absil et al. 2013
| **Searching for companions down to 2 AU from beta Pictoris using the
L'-band AGPM coronagraph on VLT/NACO**
| *Astronomy & Astrophysics, Volume 559, Issue 1, p. 12*
| `https://arxiv.org/abs/1311.4298
<https://arxiv.org/abs/1311.4298>`_
"""
__author__ = 'Carlos Alberto Gomez Gonzalez, Valentin Christiaens'
__all__ = ['pca_annular']
import numpy as np
from multiprocessing import cpu_count
from ..preproc import (cube_derotate, cube_collapse, check_pa_vector,
check_scal_vector)
from ..preproc import cube_rescaling_wavelengths as scwave
from ..preproc.derotation import _find_indices_adi, _define_annuli
from ..preproc.rescaling import _find_indices_sdi
from ..config import time_ini, timing
from ..config.utils_conf import pool_map, iterable
from ..var import get_annulus_segments, matrix_scaling
from ..stats import descriptive_stats
from .svd import get_eigenvectors
[docs]def pca_annular(cube, angle_list, cube_ref=None, scale_list=None, radius_int=0,
fwhm=4, asize=4, n_segments=1, delta_rot=(0.1, 1),
delta_sep=(0.1, 1), ncomp=1, svd_mode='lapack', nproc=1,
min_frames_lib=2, max_frames_lib=200, tol=1e-1, scaling=None,
imlib='vip-fft', interpolation='lanczos4', collapse='median',
collapse_ifs='mean', ifs_collapse_range='all', theta_init=0,
weights=None, cube_sig=None, full_output=False, verbose=True,
**rot_options):
""" PCA model PSF subtraction for ADI, ADI+RDI or ADI+mSDI (IFS) data. The
PCA model is computed locally in each annulus (or annular sectors according
to ``n_segments``). For each sector we discard reference frames taking into
account a parallactic angle threshold (``delta_rot``) and optionally a
radial movement threshold (``delta_sep``) for 4d cubes.
For ADI+RDI data, it computes the principal components from the reference
library/cube, forcing pixel-wise temporal standardization. The number of
principal components can be automatically adjusted by the algorithm by
minimizing the residuals inside each patch/region.
References: [AMA12]_ for PCA-ADI; [ABS13]_ for PCA-ADI in concentric annuli
considering a parallactic angle threshold; [CHR19]_ for PCA-ASDI and
PCA-SADI in one or two steps.
Parameters
----------
cube : numpy ndarray, 3d or 4d
Input cube.
angle_list : numpy ndarray, 1d
Corresponding parallactic angle for each frame.
cube_ref : numpy ndarray, 3d, optional
Reference library cube. For Reference Star Differential Imaging.
scale_list : numpy ndarray, 1d, optional
If provided, triggers mSDI reduction. These should be the scaling
factors used to re-scale the spectral channels and align the speckles
in case of IFS data (ADI+mSDI cube). Usually, these can be approximated
by the last channel wavelength divided by the other wavelengths in the
cube (more thorough approaches can be used to get the scaling factors,
e.g. with ``vip_hci.preproc.find_scal_vector``).
radius_int : int, optional
The radius of the innermost annulus. By default is 0, if >0 then the
central circular region is discarded.
fwhm : float, optional
Size of the FHWM in pixels. Default is 4.
asize : float, optional
The size of the annuli, in pixels.
n_segments : int or list of ints or 'auto', optional
The number of segments for each annulus. When a single integer is given
it is used for all annuli. When set to 'auto', the number of segments is
automatically determined for every annulus, based on the annulus width.
delta_rot : float or tuple of floats, optional
Factor for adjusting the parallactic angle threshold, expressed in
FWHM. Default is 1 (excludes 1 FHWM on each side of the considered
frame). If a tuple of two floats is provided, they are used as the lower
and upper intervals for the threshold (grows linearly as a function of
the separation).
delta_sep : float or tuple of floats, optional
The threshold separation in terms of the mean FWHM (for ADI+mSDI data).
If a tuple of two values is provided, they are used as the lower and
upper intervals for the threshold (grows as a function of the
separation).
ncomp : 'auto', int, tuple, 1d numpy array or tuple, optional
How many PCs are used as a lower-dimensional subspace to project the
target (sectors of) frames. Depends on the dimensionality of `cube`.
* ADI and ADI+RDI (``cube`` is a 3d array): if a single integer is
provided, then the same number of PCs will be subtracted at each
separation (annulus). If a tuple is provided, then a different number
of PCs will be used for each annulus (starting with the innermost
one). If ``ncomp`` is set to ``auto`` then the number of PCs are
calculated for each region/patch automatically.
* ADI or ADI+RDI (``cube`` is a 4d array): same input format allowed as
above. If ncomp is a list with the same length as the number of
channels, each element of the list will be used as ``ncomp`` value
(whether int, float or tuple) for each spectral channel.
* ADI+mSDI case: ``ncomp`` must be a tuple (two integers) with the
number of PCs obtained from each multi-spectral frame (for each
sector) and the number of PCs used in the second PCA stage (ADI
fashion, using the residuals of the first stage). If None then the
second PCA stage is skipped and the residuals are de-rotated and
combined.
svd_mode : {'lapack', 'arpack', 'eigen', 'randsvd', 'cupy', 'eigencupy',
'randcupy', 'pytorch', 'eigenpytorch', 'randpytorch'}, str optional
Switch for the SVD method/library to be used.
* ``lapack``: uses the LAPACK linear algebra library through Numpy
and it is the most conventional way of computing the SVD
(deterministic result computed on CPU).
* ``arpack``: uses the ARPACK Fortran libraries accessible through
Scipy (computation on CPU).
* ``eigen``: computes the singular vectors through the
eigendecomposition of the covariance M.M' (computation on CPU).
* ``randsvd``: uses the randomized_svd algorithm implemented in
Sklearn (computation on CPU), proposed in [HAL09]_.
* ``cupy``: uses the Cupy library for GPU computation of the SVD as in
the LAPACK version. `
* ``eigencupy``: offers the same method as with the ``eigen`` option
but on GPU (through Cupy).
* ``randcupy``: is an adaptation of the randomized_svd algorithm,
where all the computations are done on a GPU (through Cupy). `
* ``pytorch``: uses the Pytorch library for GPU computation of the SVD.
* ``eigenpytorch``: offers the same method as with the ``eigen``
option but on GPU (through Pytorch).
* ``randpytorch``: is an adaptation of the randomized_svd algorithm,
where all the linear algebra computations are done on a GPU
(through Pytorch).
nproc : None or int, optional
Number of processes for parallel computing. If None the number of
processes will be set to (cpu_count()/2).
min_frames_lib : int, optional
Minimum number of frames in the PCA reference library.
max_frames_lib : int, optional
Maximum number of frames in the PCA reference library. The more
distant/decorrelated frames are removed from the library.
tol : float, optional
Stopping criterion for choosing the number of PCs when ``ncomp``
is None. Lower values will lead to smaller residuals and more PCs.
scaling : {None, "temp-mean", spat-mean", "temp-standard",
"spat-standard"}, None or str optional
Pixel-wise scaling mode using ``sklearn.preprocessing.scale``
function. If set to None, the input matrix is left untouched. Otherwise:
* ``temp-mean``: temporal px-wise mean is subtracted.
* ``spat-mean``: spatial mean is subtracted.
* ``temp-standard``: temporal mean centering plus scaling pixel values
to unit variance. HIGHLY RECOMMENDED FOR ASDI AND RDI CASES!
* ``spat-standard``: spatial mean centering plus scaling pixel values
to unit variance.
imlib : str, optional
See the documentation of the ``vip_hci.preproc.frame_rotate`` function.
interpolation : str, optional
See the documentation of the ``vip_hci.preproc.frame_rotate`` function.
collapse : {'median', 'mean', 'sum', 'trimmean'}, str optional
Sets the way of collapsing the frames for producing a final image.
collapse_ifs : {'median', 'mean', 'sum', 'trimmean', 'absmean'}, str opt
Sets how spectral residual frames should be combined to produce an
mSDI image.
ifs_collapse_range: str 'all' or tuple of 2 int
If a tuple, it should contain the first and last channels where the mSDI
residual channels will be collapsed (by default collapses all channels).
full_output: boolean, optional
Whether to return the final median combined image only or with other
intermediate arrays.
verbose : bool, optional
If True prints to stdout intermediate info.
theta_init : int
Initial azimuth [degrees] of the first segment, counting from the
positive x-axis counterclockwise (irrelevant if n_segments=1).
weights: 1d numpy array or list, optional
Weights to be applied for a weighted mean. Need to be provided if
collapse mode is 'wmean'.
cube_sig: numpy ndarray, opt
Cube with estimate of significant authentic signals. If provided, this
will be subtracted before projecting cube onto reference cube.
rot_options: dictionary, optional
Dictionary with optional keyword values for "border_mode", "mask_val",
"edge_blend", "interp_zeros", "ker" (see documentation of
``vip_hci.preproc.frame_rotate``)
Returns
-------
frame : numpy ndarray, 2d
[full_output=False] Median combination of the de-rotated cube.
array_out : numpy ndarray, 3d or 4d
[full_output=True] Cube of residuals.
array_der : numpy ndarray, 3d or 4d
[full_output=True] Cube residuals after de-rotation.
frame : numpy ndarray, 2d
[full_output=True] Median combination of the de-rotated cube.
"""
if verbose:
global start_time
start_time = time_ini()
# ADI or ADI+RDI data
if cube.ndim == 3:
res = _pca_adi_rdi(cube, angle_list, radius_int, fwhm, asize,
n_segments, delta_rot, ncomp, svd_mode, nproc,
min_frames_lib, max_frames_lib, tol, scaling, imlib,
interpolation, collapse, True, verbose, cube_ref,
theta_init, weights, cube_sig, **rot_options)
cube_out, cube_der, frame = res
if full_output:
return cube_out, cube_der, frame
else:
return frame
# 4D cube, but no mSDI desired
elif cube.ndim == 4 and scale_list is None:
nch, nz, ny, nx = cube.shape
ifs_adi_frames = np.zeros([nch, ny, nx])
if not isinstance(ncomp, list):
ncomp = [ncomp]*nch
elif isinstance(ncomp, list) and len(ncomp) != nch:
msg = "If ncomp is a list, in the case of a 4d input cube without "
msg += "input scale_list, it should have the same length as the "
msg += "first dimension of the cube."
raise TypeError()
if np.isscalar(fwhm):
fwhm = [fwhm]*nch
cube_out = []
cube_der = []
# ADI or RDI in each channel
for ch in range(nch):
if cube_ref is not None:
if cube_ref[ch].ndim != 3:
msg = "Ref cube has wrong format for 4d input cube"
raise TypeError(msg)
cube_ref_tmp = cube_ref[ch]
else:
cube_ref_tmp = cube_ref
res_pca = _pca_adi_rdi(cube[ch], angle_list, radius_int, fwhm[ch],
asize, n_segments, delta_rot, ncomp[ch],
svd_mode, nproc, min_frames_lib,
max_frames_lib, tol, scaling, imlib,
interpolation, collapse, True, verbose,
cube_ref_tmp, theta_init, weights, cube_sig,
**rot_options)
cube_out.append(res_pca[0])
cube_der.append(res_pca[1])
ifs_adi_frames[ch] = res_pca[-1]
frame = cube_collapse(ifs_adi_frames, mode=collapse_ifs)
# convert to numpy arrays
cube_out = np.array(cube_out)
cube_der = np.array(cube_der)
if full_output:
return cube_out, cube_der, frame
else:
return frame
# ADI+mSDI (IFS) datacubes
elif cube.ndim == 4:
global ARRAY
ARRAY = cube
z, n, y_in, x_in = cube.shape
fwhm = int(np.round(np.mean(fwhm)))
n_annuli = int((y_in / 2 - radius_int) / asize)
if np.array(scale_list).ndim > 1:
raise ValueError('Scaling factors vector is not 1d')
if not scale_list.shape[0] == z:
raise ValueError('Scaling factors vector has wrong length')
if not isinstance(ncomp, tuple):
raise TypeError("`ncomp` must be a tuple of two integers when "
"`cube` is a 4d array")
else:
ncomp2 = ncomp[1]
ncomp = ncomp[0]
if verbose:
print('First PCA subtraction exploiting the spectral variability')
print('{} spectral channels per IFS frame'.format(z))
print('N annuli = {}, mean FWHM = {:.3f}'.format(n_annuli, fwhm))
res = pool_map(nproc, _pca_sdi_fr, iterable(range(n)), scale_list,
radius_int, fwhm, asize, n_segments, delta_sep, ncomp,
svd_mode, tol, scaling, imlib, interpolation,
collapse_ifs, ifs_collapse_range, theta_init,
verbose=verbose)
residuals_cube_channels = np.array(res)
# Exploiting rotational variability
if verbose:
timing(start_time)
print('{} ADI frames'.format(n))
if ncomp2 is None:
if verbose:
print('Skipping the second PCA subtraction')
cube_out = residuals_cube_channels
cube_der = cube_derotate(cube_out, angle_list, nproc=nproc,
imlib=imlib, interpolation=interpolation,
**rot_options)
frame = cube_collapse(cube_der, mode=collapse, w=weights)
else:
if verbose:
print('Second PCA subtraction exploiting the angular '
'variability')
res = _pca_adi_rdi(residuals_cube_channels, angle_list, radius_int,
fwhm, asize, n_segments, delta_rot, ncomp2,
svd_mode, nproc, min_frames_lib, max_frames_lib,
tol, scaling, imlib, interpolation, collapse,
full_output, verbose, None, theta_init, weights,
cube_sig, **rot_options)
if full_output:
cube_out, cube_der, frame = res
else:
frame = res
if full_output:
return cube_out, cube_der, frame
else:
return frame
else:
raise TypeError('Input array is not a 4d or 3d array')
################################################################################
# Functions encapsulating portions of the main algorithm
################################################################################
def _pca_sdi_fr(fr, scal, radius_int, fwhm, asize, n_segments, delta_sep, ncomp,
svd_mode, tol, scaling, imlib, interpolation, collapse,
ifs_collapse_range, theta_init):
""" Optimized PCA subtraction on a multi-spectral frame (IFS data).
"""
z, n, y_in, x_in = ARRAY.shape
scale_list = check_scal_vector(scal)
# rescaled cube, aligning speckles
multispec_fr = scwave(ARRAY[:, fr, :, :], scale_list,
imlib=imlib, interpolation=interpolation)[0]
# Exploiting spectral variability (radial movement)
fwhm = int(np.round(np.mean(fwhm)))
n_annuli = int((y_in / 2 - radius_int) / asize)
if isinstance(n_segments, int):
n_segments = [n_segments for _ in range(n_annuli)]
elif n_segments == 'auto':
n_segments = list()
n_segments.append(2) # for first annulus
n_segments.append(3) # for second annulus
ld = 2 * np.tan(360 / 4 / 2) * asize
for i in range(2, n_annuli): # rest of annuli
radius = i * asize
ang = np.rad2deg(2 * np.arctan(ld / (2 * radius)))
n_segments.append(int(np.ceil(360 / ang)))
cube_res = np.zeros_like(multispec_fr) # shape (z, resc_y, resc_x)
if isinstance(delta_sep, tuple):
delta_sep_vec = np.linspace(delta_sep[0], delta_sep[1], n_annuli)
else:
delta_sep_vec = [delta_sep] * n_annuli
for ann in range(n_annuli):
if ann == n_annuli - 1:
inner_radius = radius_int + (ann * asize - 1)
else:
inner_radius = radius_int + ann * asize
ann_center = inner_radius + (asize / 2)
indices = get_annulus_segments(multispec_fr[0], inner_radius, asize,
n_segments[ann], theta_init)
# Library matrix is created for each segment and scaled if needed
for seg in range(n_segments[ann]):
yy = indices[seg][0]
xx = indices[seg][1]
matrix = multispec_fr[:, yy, xx] # shape (z, npx_annsegm)
matrix = matrix_scaling(matrix, scaling)
for j in range(z):
indices_left = _find_indices_sdi(scal, ann_center, j,
fwhm, delta_sep_vec[ann])
matrix_ref = matrix[indices_left]
curr_frame = matrix[j] # current frame
V = get_eigenvectors(ncomp, matrix_ref, svd_mode,
noise_error=tol, debug=False,
scaling=scaling)
transformed = np.dot(curr_frame, V.T)
reconstructed = np.dot(transformed.T, V)
residuals = curr_frame - reconstructed
# return residuals, V.shape[0], matrix_ref.shape[0]
cube_res[j, yy, xx] = residuals
if ifs_collapse_range == 'all':
idx_ini = 0
idx_fin = z
else:
idx_ini = ifs_collapse_range[0]
idx_fin = ifs_collapse_range[1]
frame_desc = scwave(cube_res[idx_ini:idx_fin], scale_list[idx_ini:idx_fin],
full_output=False, inverse=True,
y_in=y_in, x_in=x_in, imlib=imlib,
interpolation=interpolation, collapse=collapse)
return frame_desc
def _pca_adi_rdi(cube, angle_list, radius_int=0, fwhm=4, asize=2, n_segments=1,
delta_rot=1, ncomp=1, svd_mode='lapack', nproc=None,
min_frames_lib=2, max_frames_lib=200, tol=1e-1, scaling=None,
imlib='vip-fft', interpolation='lanczos4', collapse='median',
full_output=False, verbose=1, cube_ref=None, theta_init=0,
weights=None, cube_sig=None, **rot_options):
""" PCA exploiting angular variability (ADI fashion).
"""
array = cube
if array.ndim != 3:
raise TypeError('Input array is not a cube or 3d array')
if array.shape[0] != angle_list.shape[0]:
raise TypeError('Input vector or parallactic angles has wrong length')
n, y, _ = array.shape
angle_list = check_pa_vector(angle_list)
n_annuli = int((y / 2 - radius_int) / asize)
if isinstance(delta_rot, tuple):
delta_rot = np.linspace(delta_rot[0], delta_rot[1], num=n_annuli)
elif np.isscalar(delta_rot):
delta_rot = [delta_rot] * n_annuli
if isinstance(n_segments, int):
n_segments = [n_segments for _ in range(n_annuli)]
elif n_segments == 'auto':
n_segments = list()
n_segments.append(2) # for first annulus
n_segments.append(3) # for second annulus
ld = 2 * np.tan(360 / 4 / 2) * asize
for i in range(2, n_annuli): # rest of annuli
radius = i * asize
ang = np.rad2deg(2 * np.arctan(ld / (2 * radius)))
n_segments.append(int(np.ceil(360 / ang)))
if verbose:
msg = 'N annuli = {}, FWHM = {:.3f}'
print(msg.format(n_annuli, fwhm))
print('PCA per annulus (or annular sectors):')
if nproc is None: # Hyper-threading "duplicates" the cores -> cpu_count/2
nproc = cpu_count() // 2
# The annuli are built, and the corresponding PA thresholds for frame
# rejection are calculated (at the center of the annulus)
cube_out = np.zeros_like(array)
for ann in range(n_annuli):
if isinstance(ncomp, tuple) or isinstance(ncomp, np.ndarray):
if len(ncomp) == n_annuli:
ncompann = ncomp[ann]
else:
raise TypeError('If `ncomp` is a tuple, it must match the '
'number of annuli')
else:
ncompann = ncomp
n_segments_ann = n_segments[ann]
res_ann_par = _define_annuli(angle_list, ann, n_annuli, fwhm,
radius_int, asize, delta_rot[ann],
n_segments_ann, verbose, True)
pa_thr, inner_radius, ann_center = res_ann_par
indices = get_annulus_segments(array[0], inner_radius, asize,
n_segments_ann, theta_init)
# Library matrix is created for each segment and scaled if needed
for j in range(n_segments_ann):
yy = indices[j][0]
xx = indices[j][1]
matrix_segm = array[:, yy, xx] # shape [nframes x npx_segment]
matrix_segm = matrix_scaling(matrix_segm, scaling)
if cube_ref is not None:
matrix_segm_ref = cube_ref[:, yy, xx]
matrix_segm_ref = matrix_scaling(matrix_segm_ref, scaling)
else:
matrix_segm_ref = None
if cube_sig is not None:
matrix_sig_segm = cube_sig[:, yy, xx]
else:
matrix_sig_segm = None
res = pool_map(nproc, do_pca_patch, matrix_segm, iterable(range(n)),
angle_list, fwhm, pa_thr, ann_center, svd_mode,
ncompann, min_frames_lib, max_frames_lib, tol,
matrix_segm_ref, matrix_sig_segm)
res = np.array(res, dtype=object)
residuals = np.array(res[:, 0])
ncomps = res[:, 1]
nfrslib = res[:, 2]
for fr in range(n):
cube_out[fr][yy, xx] = residuals[fr]
# number of frames in library printed for each annular quadrant
# number of PCs printed for each annular quadrant
if verbose == 2:
descriptive_stats(nfrslib, verbose=verbose,
label='\tLIBsize: ')
descriptive_stats(ncomps, verbose=verbose, label='\tNum PCs: ')
if verbose == 1:
print('Done PCA with {} for current annulus'.format(svd_mode))
timing(start_time)
# Cube is derotated according to the parallactic angle and collapsed
cube_der = cube_derotate(cube_out, angle_list, nproc=nproc, imlib=imlib,
interpolation=interpolation, **rot_options)
frame = cube_collapse(cube_der, mode=collapse, w=weights)
if verbose:
print('Done derotating and combining.')
timing(start_time)
if full_output:
return cube_out, cube_der, frame
else:
return frame
def do_pca_patch(matrix, frame, angle_list, fwhm, pa_threshold, ann_center,
svd_mode, ncomp, min_frames_lib, max_frames_lib, tol,
matrix_ref, matrix_sig_segm):
""" Does the SVD/PCA for each frame patch (small matrix). For each frame we
find the frames to be rejected depending on the amount of rotation. The
library is also truncated on the other end (frames too far or which have
rotated more) which are more decorrelated to keep the computational cost
lower. This truncation is done on the annuli after 10*FWHM and the goal is
to keep min(num_frames/2, 200) in the library.
"""
if pa_threshold != 0:
# if ann_center > fwhm*10:
indices_left = _find_indices_adi(angle_list, frame, pa_threshold,
truncate=True,
max_frames=max_frames_lib)
# else:
# indices_left = _find_indices_adi(angle_list, frame,
# pa_threshold, truncate=False)
msg = 'Too few frames left in the PCA library. '
msg += 'Accepted indices length ({:.0f}) less than {:.0f}. '
msg += 'Try decreasing either delta_rot or min_frames_lib.'
try:
if matrix_sig_segm is not None:
data_ref = matrix[indices_left]-matrix_sig_segm[indices_left]
else:
data_ref = matrix[indices_left]
except IndexError:
if matrix_ref is None:
raise RuntimeError(msg.format(0, min_frames_lib))
data_ref = None
if data_ref.shape[0] < min_frames_lib and matrix_ref is None:
raise RuntimeError(msg.format(len(indices_left), min_frames_lib))
else:
data_ref = None
if matrix_ref is not None:
#data_ref = None
# if matrix_ref is not None:
# Stacking the ref and the target ref (pa thresh) libraries
if data_ref is not None:
data_ref = np.vstack((matrix_ref, data_ref))
else:
data_ref = matrix_ref
elif pa_threshold == 0:
if matrix_sig_segm is not None:
data_ref = matrix-matrix_sig_segm
else:
data_ref = matrix
if matrix_ref is not None and matrix_sig_segm is None:
# Stacking the ref and the target (pa thresh) libraries
if data_ref is not None:
data_ref = np.vstack((matrix_ref, data_ref))
else:
data_ref = matrix_ref
curr_frame = matrix[frame] # current frame
if matrix_sig_segm is not None:
curr_frame_emp = matrix[frame]-matrix_sig_segm[frame]
else:
curr_frame_emp = curr_frame
V = get_eigenvectors(ncomp, data_ref, svd_mode, noise_error=tol)
transformed = np.dot(curr_frame_emp, V.T)
reconstructed = np.dot(transformed.T, V)
residuals = curr_frame - reconstructed
return residuals, V.shape[0], data_ref.shape[0]