#! /usr/bin/env python
"""
Module with helping functions for PCA.
"""
__author__ = 'Carlos Alberto Gomez Gonzalez'
__all__ = ['pca_annulus',
'pca_grid']
import numpy as np
from sklearn.decomposition import IncrementalPCA
from pandas import DataFrame
from skimage.draw import disk
from matplotlib import pyplot as plt
from ..fits import open_fits
from ..preproc import cube_rescaling_wavelengths as scwave
from .svd import svd_wrapper
from ..preproc import cube_derotate, cube_collapse, check_pa_vector
from ..config import timing, time_ini, check_array, get_available_memory
from ..config.utils_conf import vip_figsize, vip_figdpi
from ..var import frame_center, dist, prepare_matrix, reshape_matrix, get_circle
[docs]
def pca_grid(cube, angle_list, fwhm=None, range_pcs=None, source_xy=None,
cube_ref=None, mode='fullfr', annulus_width=20, svd_mode='lapack',
scaling=None, mask_center_px=None, fmerit='mean',
collapse='median', ifs_collapse_range='all', verbose=True,
full_output=False, debug=False, plot=True, save_plot=None,
start_time=None, scale_list=None, initial_4dshape=None,
weights=None, exclude_negative_lobes=False, **rot_options):
"""
Compute a grid, depending on ``range_pcs``, of residual PCA frames out of a
3d ADI cube (or a reference cube). If ``source_xy`` is provided, the number
of principal components are optimized by measuring the S/N at this location
on the frame (ADI, RDI). The metric used, set by ``fmerit``, could be the
given pixel's S/N, the maximum S/N in a FWHM circular aperture centered on
the given coordinates or the mean S/N in the same circular aperture.
Parameters
----------
cube : numpy ndarray, 3d
Input cube.
angle_list : numpy ndarray, 1d
Corresponding parallactic angle for each frame.
fwhm : None or float, optional
Size of the FWHM in pixels, used for computing S/Ns when ``source_xy``
is passed.
range_pcs : None or tuple or list, optional
The interval of PCs to be tried. If ``range_pcs`` is a tuple entered as
``(PC_INI, PC_MAX)`` a sequential grid will be evaluated between
``PC_INI`` and ``PC_MAX`` with step of 1. If ``range_pcs`` is a tuple
entered as ``(PC_INI, PC_MAX, STEP]`` a grid will be evaluated between
``PC_INI`` and ``PC_MAX`` with the given ``STEP``. If ``range_pcs`` is
None, ``PC_INI=1``, ``PC_MAX=n_frames-1`` and ``STEP=1``, which will
result in longer running time. If ``range_pcs`` is a list, it should
correspond to the number of PCs to be tested.
source_xy : None or tuple of floats
X and Y coordinates of the pixel where the source is located and whose
SNR is going to be maximized.
cube_ref : numpy ndarray, 3d, optional
Reference library cube. For Reference Star Differential Imaging.
mode : {'fullfr', 'annular'}, optional
Mode for PCA processing (full-frame or just in an annulus). There is a
catch: the optimal number of PCs in full-frame may not coincide with the
one in annular mode. This is due to the fact that the annulus matrix is
smaller (less noisy, probably not containing the central star) and also
its intrinsic rank (smaller that in the full frame case).
annulus_width : float, optional
Width in pixels of the annulus in the case of the "annular" mode.
svd_mode : {'lapack', 'arpack', 'eigen', 'randsvd', 'cupy', 'eigencupy',
'randcupy', 'pytorch', 'eigenpytorch', 'randpytorch'}, str optional
Switch for the SVD method/library to be used.
* ``lapack``: uses the LAPACK linear algebra library through Numpy
and it is the most conventional way of computing the SVD
(deterministic result computed on CPU).
* ``arpack``: uses the ARPACK Fortran libraries accessible through
Scipy (computation on CPU).
* ``eigen``: computes the singular vectors through the
eigendecomposition of the covariance M.M' (computation on CPU).
* ``randsvd``: uses the randomized_svd algorithm implemented in
Sklearn (computation on CPU), proposed in [HAL09]_.
* ``cupy``: uses the Cupy library for GPU computation of the SVD as in
the LAPACK version. `
* ``eigencupy``: offers the same method as with the ``eigen`` option
but on GPU (through Cupy).
* ``randcupy``: is an adaptation of the randomized_svd algorithm,
where all the computations are done on a GPU (through Cupy). `
* ``pytorch``: uses the Pytorch library for GPU computation of the SVD.
* ``eigenpytorch``: offers the same method as with the ``eigen``
option but on GPU (through Pytorch).
* ``randpytorch``: is an adaptation of the randomized_svd algorithm,
where all the linear algebra computations are done on a GPU
(through Pytorch).
scaling : {None, "temp-mean", spat-mean", "temp-standard",
"spat-standard"}, None or str optional
Pixel-wise scaling mode using ``sklearn.preprocessing.scale``
function. If set to None, the input matrix is left untouched. Otherwise:
* ``temp-mean``: temporal px-wise mean is subtracted.
* ``spat-mean``: spatial mean is subtracted.
* ``temp-standard``: temporal mean centering plus scaling pixel values
to unit variance (temporally).
* ``spat-standard``: spatial mean centering plus scaling pixel values
to unit variance (spatially).
DISCLAIMER: Using ``temp-mean`` or ``temp-standard`` scaling can improve
the speckle subtraction for ASDI or (A)RDI reductions. Nonetheless, this
involves a sort of c-ADI preprocessing, which (i) can be dangerous for
datasets with low amount of rotation (strong self-subtraction), and (ii)
should probably be referred to as ARDI (i.e. not RDI stricto sensu).
mask_center_px : None or int, optional
If None, no masking is done. If an integer > 1 then this value is the
radius of the circular mask.
fmerit : {'px', 'max', 'mean'}
The function of merit to be maximized. 'px' is *source_xy* pixel's SNR,
'max' the maximum SNR in a FWHM circular aperture centered on
``source_xy`` and 'mean' is the mean SNR in the same circular aperture.
collapse : {'median', 'mean', 'sum', 'trimmean'}, str optional
Sets the way of collapsing the frames for producing a final image.
ifs_collapse_range: str 'all' or tuple of 2 int
If a tuple, it should contain the first and last channels where the mSDI
residual channels will be collapsed (by default collapses all channels).
verbose : bool, optional
If True prints intermediate info and timing.
full_output : bool, optional
If True it returns the optimal number of PCs, the final PCA frame for
the optimal PCs and a cube with all the final frames for each number
of PC that was tried.
debug : bool, bool optional
Whether to print debug information or not.
plot : bool, optional
Whether to plot the SNR and flux as functions of PCs and final PCA
frame or not.
save_plot: string
If provided, the pc optimization plot will be saved to that path.
start_time : None or datetime.datetime, optional
Used when embedding this function in the main ``pca`` function. The
object datetime.datetime is the global starting time. If None, it
initiates its own counter.
scale_list : None or numpy ndarray, optional
Scaling factors in case of IFS data (ADI+mSDI cube). They will be used
to descale the spectral cubes and obtain the right residual frames,
assuming ``cube`` is a 4d ADI+mSDI cube turned into 3d.
initial_4dshape : None or tuple, optional
Shape of the initial ADI+mSDI cube.
weights: 1d numpy array or list, optional
Weights to be applied for a weighted mean. Need to be provided if
collapse mode is 'wmean'.
exclude_negative_lobes : bool, opt
Whether to include the adjacent aperture lobes to the tested location
or not. Can be set to True if the image shows significant neg lobes.
rot_options: dictionary, optional
Dictionary with optional keyword values for "nproc", "imlib",
"interpolation", "border_mode", "mask_val", "edge_blend",
"interp_zeros", "ker" (see documentation of
``vip_hci.preproc.frame_rotate``)
Returns
-------
cubeout : numpy ndarray
3D array with the residuals frames.
pclist : list
[full_output=True, source_xy is None] The list of PCs at which the
residual frames are computed.
finalfr : numpy ndarray
[source_xy is not None] Residual frame with the highest S/N for
``source_xy``.
df : pandas Dataframe
[source_xy is not None] Dataframe with the pcs, measured fluxed and
S/Ns for ``source_xy``.
opt_npc : int
[source_xy is not None] Optimal number of PCs for ``source_xy``.
"""
from ..metrics import snr, frame_report
def truncate_svd_get_finframe(matrix, angle_list, ncomp, V):
""" Projection, subtraction, derotation plus combination in one frame.
Only for full-frame"""
transformed = np.dot(V[:ncomp], matrix.T)
reconstructed = np.dot(transformed.T, V[:ncomp])
residuals = matrix - reconstructed
frsize = int(np.sqrt(matrix.shape[1])) # only for square frames
residuals_res = reshape_matrix(residuals, frsize, frsize)
# For the case of ADI+mSDI data (assuming crop_ifs=True), we descale
# and collapse each spectral residuals cube
if scale_list is not None and initial_4dshape is not None:
print("Descaling the spectral channels and obtaining a final frame")
z, n_adi, y_in, x_in = initial_4dshape
residuals_reshaped = np.zeros((n_adi, y_in, y_in))
if ifs_collapse_range == 'all':
idx_ini = 0
idx_fin = z
else:
idx_ini = ifs_collapse_range[0]
idx_fin = ifs_collapse_range[1]
for i in range(n_adi):
frame_i = scwave(residuals_res[i*z+idx_ini:i*z+idx_fin, :, :],
scale_list[idx_ini:idx_fin], full_output=False,
inverse=True, y_in=y_in, x_in=x_in,
collapse=collapse)
residuals_reshaped[i] = frame_i
else:
residuals_reshaped = residuals_res
residuals_res_der = cube_derotate(residuals_reshaped, angle_list,
**rot_options)
res_frame = cube_collapse(residuals_res_der, mode=collapse, w=weights)
return res_frame
def truncate_svd_get_finframe_ann(matrix, indices, angle_list, ncomp, V):
""" Projection, subtraction, derotation plus combination in one frame.
Only for annular mode"""
transformed = np.dot(V[:ncomp], matrix.T)
reconstructed = np.dot(transformed.T, V[:ncomp])
residuals_ann = matrix - reconstructed
residuals_res = np.zeros_like(cube)
residuals_res[:, indices[0], indices[1]] = residuals_ann
residuals_res_der = cube_derotate(residuals_res, angle_list,
**rot_options)
res_frame = cube_collapse(residuals_res_der, mode=collapse, w=weights)
return res_frame
def get_snr(frame, y, x, fwhm, fmerit):
"""
"""
if fmerit == 'max':
yy, xx = disk((y, x), fwhm / 2.)
res = [snr(frame, (x_, y_), fwhm, plot=False, verbose=False,
exclude_negative_lobes=exclude_negative_lobes,
full_output=True)
for y_, x_ in zip(yy, xx)]
snr_pixels = np.array(res, dtype=object)[:, -1]
fluxes = np.array(res, dtype=object)[:, 2]
argm = np.argmax(snr_pixels)
# integrated fluxes for the max snr
return np.max(snr_pixels), fluxes[argm]
elif fmerit == 'px':
res = snr(frame, (x, y), fwhm, plot=False, verbose=False,
exclude_negative_lobes=exclude_negative_lobes,
full_output=True)
snrpx = res[-1]
fluxpx = np.array(res, dtype=object)[2]
# integrated fluxes for the given px
return snrpx, fluxpx
elif fmerit == 'mean':
yy, xx = disk((y, x), fwhm / 2.)
res = [snr(frame, (x_, y_), fwhm, plot=False, verbose=False,
exclude_negative_lobes=exclude_negative_lobes,
full_output=True) for y_, x_
in zip(yy, xx)]
snr_pixels = np.array(res, dtype=object)[:, -1]
fluxes = np.array(res, dtype=object)[:, 2]
# mean of the integrated fluxes (shifting the aperture)
return np.mean(snr_pixels), np.mean(fluxes)
# --------------------------------------------------------------------------
check_array(cube, dim=3, msg='cube')
if start_time is None:
start_time = time_ini(verbose)
n = cube.shape[0]
if source_xy is not None:
if fwhm is None:
raise ValueError('if source_xy is provided, so should fwhm')
x, y = source_xy
else:
x = None
y = None
if isinstance(range_pcs, list):
pclist = range_pcs
pcmax = max(pclist)
else:
if range_pcs is None:
pcmin = 1
pcmax = n - 1
step = 1
elif len(range_pcs) == 2:
pcmin, pcmax = range_pcs
pcmax = min(pcmax, n)
step = 1
elif len(range_pcs) == 3:
pcmin, pcmax, step = range_pcs
pcmax = min(pcmax, n)
else:
raise TypeError('`range_pcs` must be None or a tuple, corresponding'
'to (PC_INI, PC_MAX) or (PC_INI, PC_MAX, STEP)')
pclist = list(range(pcmin, pcmax+1, step))
# Getting `pcmax` principal components once
if mode == 'fullfr':
matrix = prepare_matrix(cube, scaling, mask_center_px, verbose=False)
if cube_ref is not None:
ref_lib = prepare_matrix(cube_ref, scaling, mask_center_px,
verbose=False)
else:
ref_lib = matrix
elif mode == 'annular':
y_cent, x_cent = frame_center(cube[0])
ann_radius = dist(y_cent, x_cent, y, x)
inrad = int(ann_radius - annulus_width / 2.)
outrad = int(ann_radius + annulus_width / 2.)
matrix, annind = prepare_matrix(cube, scaling, None, mode='annular',
inner_radius=inrad, outer_radius=outrad,
verbose=False)
if cube_ref is not None:
ref_lib, _ = prepare_matrix(cube_ref, scaling, mask_center_px,
'annular', inner_radius=inrad,
outer_radius=outrad, verbose=False)
else:
ref_lib = matrix
else:
raise RuntimeError('Wrong mode. Choose either fullfr or annular')
V = svd_wrapper(ref_lib, svd_mode, pcmax, verbose)
if verbose:
timing(start_time)
snrlist = []
fluxlist = []
frlist = []
for pc in pclist:
if mode == 'fullfr':
frame = truncate_svd_get_finframe(matrix, angle_list, pc, V)
elif mode == 'annular':
frame = truncate_svd_get_finframe_ann(matrix, annind,
angle_list, pc, V)
else:
raise RuntimeError('Wrong mode. Choose either full or annular')
if x is not None and y is not None and fwhm is not None:
snr_value, flux = get_snr(frame, y, x, fwhm, fmerit)
if np.isnan(snr_value):
snr_value = 0
snrlist.append(snr_value)
fluxlist.append(flux)
frlist.append(frame)
cubeout = np.array((frlist))
# measuring the S/Ns for the given position
if x is not None and y is not None and fwhm is not None:
argmax = np.argmax(snrlist)
opt_npc = pclist[argmax]
df = DataFrame({'PCs': pclist, 'S/Ns': snrlist, 'fluxes': fluxlist})
if debug:
print(df, '\n')
if verbose:
print('Number of steps', len(pclist))
msg = 'Optimal number of PCs = {}, for S/N={:.3f}'
print(msg.format(opt_npc, snrlist[argmax]))
# Plot of SNR and flux as function of PCs
if plot:
plt.figure(figsize=vip_figsize, dpi=vip_figdpi)
ax1 = plt.subplot(211)
ax1.plot(pclist, snrlist, '-', alpha=0.5, color='C0')
ax1.plot(pclist, snrlist, 'o', alpha=0.5, color='C0')
ax1.set_xlim(np.array(pclist).min(), np.array(pclist).max())
ax1.set_ylim(min(snrlist), np.array(snrlist).max() + 1)
ax1.set_ylabel('S/N')
ax1.minorticks_on()
ax1.grid('on', 'major', linestyle='solid', alpha=0.4)
ax1.set_title('Optimal # PCs: {}'.format(opt_npc))
ax2 = plt.subplot(212)
ax2.plot(pclist, fluxlist, '-', alpha=0.5, color='C1')
ax2.plot(pclist, fluxlist, 'o', alpha=0.5, color='C1')
ax2.set_xlim(np.array(pclist).min(), np.array(pclist).max())
ax2.set_ylim(min(fluxlist), np.array(fluxlist).max() + 1)
ax2.set_xlabel('Principal components')
ax2.set_ylabel('Flux in FWHM ap. [ADUs]')
ax2.minorticks_on()
ax2.grid('on', 'major', linestyle='solid', alpha=0.4)
if save_plot is not None:
plt.savefig(save_plot, dpi=100, bbox_inches='tight')
finalfr = cubeout[argmax]
_ = frame_report(finalfr, fwhm, (x, y), verbose=verbose)
return cubeout, finalfr, df, opt_npc
else:
if verbose:
print('Computed residual frames for PCs interval: '
'{}'.format(range_pcs))
print('Number of steps', len(pclist))
if verbose:
timing(start_time)
if full_output:
return cubeout, pclist
else:
return cubeout
def pca_incremental(cube, angle_list, batch=0.25, ncomp=1, collapse='median',
verbose=True, full_output=False, return_residuals=False,
start_time=None, weights=None, **rot_options):
""" Computes the full-frame PCA-ADI algorithm in batches, for processing
fits files larger than the available system memory. It uses the incremental
PCA algorithm from Sklearn. There is no ``scaling`` parameter as in other
PCA algorithms in ``VIP``, but by default this implementation returns a
temporally mean-centered frame ("temp-mean").
Parameters
----------
cube : str or numpy ndarray
Input cube as numpy array or string with the path to the fits file to be
opened in memmap mode.
angle_list : str or numpy ndarray
Corresponding parallactic angle for each frame.
batch : int or float, optional
When int it corresponds to the number of frames in each batch. If a
float (0, 1] is passed then it is the size of the batch is computed wrt
the available memory in the system.
ncomp : int, optional
How many PCs are used as a lower-dimensional subspace to project the
target frames.
collapse : {'median', 'mean', 'sum', 'trimmean'}, str optional
Sets the way of collapsing the frames for producing a final image.
rot_options: dictionary, optional
Dictionary with optional keyword values for "nproc", "imlib",
"interpolation, "border_mode", "mask_val", "edge_blend",
"interp_zeros", "ker" (see documentation of
``vip_hci.preproc.frame_rotate``)
verbose : {True, False}, bool optional
If True prints intermediate info and timing.
full_output : boolean, optional
Whether to return the final median combined image only or with other
intermediate arrays.
return_residuals : bool, optional
If True, only the cube of residuals is returned (before de-rotating).
start_time : None or datetime.datetime, optional
Used when embedding this function in the main ``pca`` function. The
object datetime.datetime is the global starting time. If None, it
initiates its own counter.
weights: 1d numpy array or list, optional
Weights to be applied for a weighted mean. Need to be provided if
collapse mode is 'wmean'.
Returns
-------
frame : numpy ndarray
[return_residuals=False] Final frame (2d array).
ipca : scikit-learn model
[full_output=True, return_residuals=False] The incremental PCA model of
scikit-learn.
pcs : numpy ndarray
[full_output=True, return_residuals=False] Principal components reshaped
into images.
medians : numpy ndarray
[full_output=True, return_residuals=False] The median of the derotated
residuals for each batch.
cube_residuals : numpy ndarray
[return_residuals=True] Cube of residuals.
"""
if start_time is None:
start_time = time_ini(verbose)
verbose_memcheck = True
else:
verbose_memcheck = False
# checking cube and angle_list data types
if not isinstance(cube, (np.ndarray, str)):
raise TypeError('`cube` must be a str (full path on disk) or a numpy '
'array')
if not isinstance(angle_list, (np.ndarray, str)):
raise TypeError('`angle_list` must be a str (full path on disk) or a '
'numpy array')
# opening data
if isinstance(cube, str):
# assuming the first HDULIST contains the datacube
hdulist = open_fits(cube, n=0, return_memmap=True)
cube = hdulist.data
if not cube.ndim > 2:
raise TypeError('Input array is not a 3d array')
n_frames, y, x = cube.shape
# checking angles length and ncomp
if isinstance(angle_list, str):
angle_list = open_fits(angle_list)
angle_list = check_pa_vector(angle_list)
if not n_frames == angle_list.shape[0] and not return_residuals:
raise TypeError('`angle_list` vector has wrong length. It must be the '
'same as the number of frames in the cube')
if not isinstance(ncomp, (int, float)):
raise TypeError("`ncomp` must be an int or a float in the ADI case")
if ncomp > n_frames:
ncomp = min(ncomp, n_frames)
msg = 'Number of PCs too high (max PCs={}), using {} PCs instead.'
print(msg.format(n_frames, ncomp))
# checking memory and determining batch size
cube_size = cube.nbytes
aval_mem = get_available_memory(verbose_memcheck)
if isinstance(batch, int): # the batch size in n_fr
batch_size = batch
elif isinstance(batch, float): # the batch ratio wrt available memory
if 1 > batch > 0:
batch_size = min(int(n_frames * (batch * aval_mem) / cube_size),
n_frames)
else:
raise TypeError("`batch` must be an int or float")
if verbose:
msg1 = "Cube size = {:.3f} GB ({} frames)"
print(msg1.format(cube_size / 1e9, n_frames))
msg2 = "Batch size = {} frames ({:.3f} GB)\n"
print(msg2.format(batch_size, cube[:batch_size].nbytes / 1e9))
n_batches = n_frames // batch_size # floor/int division
remaining_frames = n_frames % batch_size
if remaining_frames > 0:
n_batches += 1
# computing the PCA model for each batch
ipca = IncrementalPCA(n_components=ncomp)
for i in range(n_batches):
intini = i * batch_size
intfin = (i + 1) * batch_size
batch = cube[intini:min(n_frames, intfin)]
msg = 'Batch {}/{}\tshape: {}\tsize: {:.1f} MB'
if verbose:
print(msg.format(i+1, n_batches, batch.shape, batch.nbytes / 1e6))
matrix = prepare_matrix(batch, verbose=False)
ipca.partial_fit(matrix)
if verbose:
timing(start_time)
# getting PCs and the mean in order to center each batch
V = ipca.components_
mean = ipca.mean_.reshape(y, x)
if verbose:
print('\nReconstructing and obtaining residuals')
if return_residuals:
cube_residuals = np.empty((n_frames, y, x))
else:
medians = []
for i in range(n_batches):
intini = i * batch_size
intfin = (i + 1) * batch_size
batch = cube[intini:min(n_frames, intfin)] - mean
matrix = prepare_matrix(batch, verbose=False)
reconst = np.dot(np.dot(matrix, V.T), V)
resid = matrix - reconst
resid_reshaped = resid.reshape(batch.shape)
if return_residuals:
cube_residuals[intini:intfin] = resid_reshaped
else:
resid_der = cube_derotate(resid_reshaped, angle_list[intini:intfin],
**rot_options)
medians.append(cube_collapse(resid_der, mode=collapse, w=weights))
del matrix
del batch
if return_residuals:
return cube_residuals
else:
medians = np.array(medians)
frame = np.median(medians, axis=0)
if verbose:
timing(start_time)
if full_output:
pcs = reshape_matrix(V, y, x)
return frame, ipca, pcs, medians
else:
return frame
[docs]
def pca_annulus(cube, angs, ncomp, annulus_width, r_guess, cube_ref=None,
svd_mode='lapack', scaling=None, collapse='median',
weights=None, collapse_ifs='mean', **rot_options):
"""
PCA-ADI or PCA-RDI processed only for an annulus of the cube, with a given
width and at a given radial distance to the frame center. It returns a
processed frame with non-zero values only at the location of the annulus.
Parameters
----------
cube : 3d or 4d numpy ndarray
Input data cube to be processed by PCA.
angs : numpy ndarray or None
The parallactic angles expressed as a numpy.array.
ncomp : int or list/1d numpy array of int
The number of principal component.
annulus_width : float
The annulus width in pixel on which the PCA is performed.
r_guess : float
Radius of the annulus in pixels.
cube_ref : 3d or 4d numpy ndarray, or list of 3d numpy ndarray, optional
Reference library cube for Reference Star Differential Imaging.
svd_mode : {'lapack', 'randsvd', 'eigen', 'arpack'}, str optional
Switch for different ways of computing the SVD and selected PCs.
scaling : {None, "temp-mean", spat-mean", "temp-standard",
"spat-standard"}, None or str optional
Pixel-wise scaling mode using ``sklearn.preprocessing.scale``
function. If set to None, the input matrix is left untouched. Otherwise:
* ``temp-mean``: temporal px-wise mean is subtracted.
* ``spat-mean``: spatial mean is subtracted.
* ``temp-standard``: temporal mean centering plus scaling pixel values
to unit variance (temporally).
* ``spat-standard``: spatial mean centering plus scaling pixel values
to unit variance (spatially).
DISCLAIMER: Using ``temp-mean`` or ``temp-standard`` scaling can improve
the speckle subtraction for ASDI or (A)RDI reductions. Nonetheless, this
involves a sort of c-ADI preprocessing, which (i) can be dangerous for
datasets with low amount of rotation (strong self-subtraction), and (ii)
should probably be referred to as ARDI (i.e. not RDI stricto sensu).
collapse : {'median', 'mean', 'sum', 'wmean'}, str or None, optional
Sets the way of collapsing the residual frames to produce a final image.
If None then the cube of residuals is returned.
weights: 1d numpy array or list, optional
Weights to be applied for a weighted mean. Need to be provided if
collapse mode is 'wmean' for collapse.
collapse_ifs : {'median', 'mean', 'sum', 'wmean', 'absmean'}, str or None, optional
Sets the way of collapsing the spectral frames for producing a final
image (in the case of a 4D input cube).
rot_options: dictionary, optional
Dictionary with optional keyword values for "nproc", "imlib",
"interpolation, "border_mode", "mask_val", "edge_blend",
"interp_zeros", "ker" (see documentation of
``vip_hci.preproc.frame_rotate``)
Returns
-------
pca_res: 2d or 3d ndarray
Depending on ``collapse`` and ``angs`` parameters, either a final
collapsed frame (``collapse`` not None) or the cube of residuals
(derotated if angs is not None, non-derotated otherwises).
Note: for 4d input cube, collapse must be non-None.
"""
def _pca_annulus_3d(cube, angs, ncomp, annulus_width, r_guess, cube_ref,
svd_mode, scaling, collapse, weights, **rot_options):
inrad = int(r_guess - annulus_width / 2.)
outrad = int(r_guess + annulus_width / 2.)
data, ind = prepare_matrix(cube, scaling, mode='annular', verbose=False,
inner_radius=inrad, outer_radius=outrad)
yy, xx = ind
if cube_ref is not None:
data_svd, _ = prepare_matrix(cube_ref, scaling, mode='annular',
verbose=False, inner_radius=inrad,
outer_radius=outrad)
else:
data_svd = data
V = svd_wrapper(data_svd, svd_mode, ncomp, verbose=False)
transformed = np.dot(data, V.T)
reconstructed = np.dot(transformed, V)
residuals = data - reconstructed
cube_zeros = np.zeros_like(cube)
cube_zeros[:, yy, xx] = residuals
if angs is not None:
cube_res_der = cube_derotate(cube_zeros, angs, **rot_options)
if collapse is not None:
pca_frame = cube_collapse(cube_res_der, mode=collapse,
w=weights)
return pca_frame
else:
return cube_res_der
else:
if collapse is not None:
pca_frame = cube_collapse(cube_zeros, mode=collapse, w=weights)
return pca_frame
else:
return cube_zeros
if cube.ndim == 3:
return _pca_annulus_3d(cube, angs, ncomp, annulus_width, r_guess,
cube_ref, svd_mode, scaling, collapse, weights,
**rot_options)
elif cube.ndim == 4:
nch = cube.shape[0]
if cube_ref is not None:
if cube_ref.ndim == 3:
cube_ref = [cube_ref]*nch
if np.isscalar(ncomp):
ncomp = [ncomp]*nch
elif isinstance(ncomp, list) and len(ncomp) != nch:
msg = "If ncomp is a list, in the case of a 4d input cube without "
msg += "input scale_list, it should have the same length as the "
msg += "first dimension of the cube."
raise TypeError(msg)
if collapse is None:
raise ValueError("mode not supported. Provide value for collapse")
ifs_res = np.zeros([nch, cube.shape[2], cube.shape[3]])
for ch in range(nch):
if cube_ref is not None:
if cube_ref[ch].ndim != 3:
msg = "Ref cube has wrong format for 4d input cube"
raise TypeError(msg)
cube_ref_tmp = cube_ref[ch]
else:
cube_ref_tmp = cube_ref
ifs_res[ch] = _pca_annulus_3d(cube[ch], angs, ncomp[ch],
annulus_width, r_guess, cube_ref_tmp,
svd_mode, scaling, collapse, weights,
**rot_options)
return cube_collapse(ifs_res, mode=collapse_ifs)
def _compute_stim_map(cube_der):
"""
Computes the STIM detection map.
Note: this is a duplicate of the STIM map routine in the metrics module
that is necessary to avoid circular imports (used in iterative PCA function)
Parameters
----------
cube_der : 3d numpy ndarray
Input de-rotated cube, e.g. output 'residuals_cube_' from
``vip_hci.pca.pca``.
Returns
-------
detection_map : 2d ndarray
STIM detection map.
"""
t, n, _ = cube_der.shape
mu = np.mean(cube_der, axis=0)
sigma = np.sqrt(np.var(cube_der, axis=0))
detection_map = np.divide(mu, sigma, out=np.zeros_like(mu),
where=sigma != 0)
return get_circle(detection_map, int(np.round(n/2.)))
def _compute_inverse_stim_map(cube, angle_list, **rot_options):
"""
Computes the inverse STIM detection map, i.e. obtained with opposite
derotation angles.
Note: this is a duplicate of the STIM map routine in the metrics module
that is necessary to avoid circular imports (used in iterative PCA function)
Parameters
----------
cube : 3d numpy ndarray
Non de-rotated residuals from reduction algorithm, eg. output residuals
from ``vip_hci.pca.pca``.
angle_list : numpy ndarray, 1d
Corresponding parallactic angle for each frame.
rot_options: dictionary, optional
Dictionary with optional keyword values for "nproc", "imlib",
"interpolation, "border_mode", "mask_val", "edge_blend",
"interp_zeros", "ker" (see documentation of
``vip_hci.preproc.frame_rotate``)
Returns
-------
inverse_stim_map : 2d ndarray
Inverse STIM detection map.
"""
t, n, _ = cube.shape
cube_inv_der = cube_derotate(cube, -angle_list, **rot_options)
inverse_stim_map = _compute_stim_map(cube_inv_der)
return inverse_stim_map