#! /usr/bin/env python
"""
Module with the HCI<post-processing algorithms> classes.
"""
__author__ = 'Carlos Alberto Gomez Gonzalez, Ralf Farkas'
__all__ = ['HCIMedianSub',
'HCIPca',
'HCILoci',
'HCILLSG',
'HCIAndromeda']
import pickle
import numpy as np
from sklearn.base import BaseEstimator
from .hci_dataset import Dataset
from .metrics import snrmap
from .invprob import andromeda
from .psfsub import pca, llsg, median_sub, xloci
from .config.utils_conf import algo_calculates_decorator as calculates
class HCIPostProcAlgo(BaseEstimator):
"""
Base HCI post-processing algorithm class.
"""
def __init__(self, locals_dict, *skip):
"""
Set up the algorithm parameters.
This does multiple things:
- verify that ``dataset`` is a HCIDataset object or ``None`` (it could
also be provided to ``run``)
- store all the keywords (from ``locals_dict``) as object attributes, so
they can be accessed e.g. in the ``run()`` method
- print out the full algorithm settings (user provided parameters +
default ones) if ``verbose=True``
Parameters
----------
locals_dict : dict
This should be ``locals()``. ``locals()`` contains *all* the
variables defined in the local scope. Passed to
``self._store_args``.
*skip : list of strings
Passed on to ``self._store_args``. Refer to its documentation.
Examples
--------
.. code:: python
# when subclassing HCIPostProcAlgo, make sure you call super()
# with locals()! This means:
class MySuperAlgo(HCIPostProcAlgo):
def __init__(self, algo_param_1=42, cool=True):
super(MySuperAlgo, self).__init__(locals())
@calculates("frame")
def run(self, dataset=None):
self.frame = 2 * self.algo_param_1
"""
dataset = locals_dict.get("dataset", None)
if not isinstance(dataset, (Dataset, type(None))):
raise ValueError('`dataset` must be a HCIDataset object or None')
self._store_args(locals_dict, *skip)
verbose = locals_dict.get("verbose", True)
if verbose:
self._print_parameters()
def _print_parameters(self):
""" Printing out the parameters of the algorithm.
"""
dicpar = self.get_params()
for key in dicpar.keys():
print("{}: {}".format(key, dicpar[key]))
def _store_args(self, locals_dict, *skip):
# TODO: this could be integrated with sklearn's BaseEstimator methods
for k in locals_dict:
if k == "self" or k in skip:
continue
setattr(self, k, locals_dict[k])
def _get_dataset(self, dataset=None, verbose=True):
"""
Handle a dataset passed to ``run()``.
It is possible to specify a dataset using the constructor, or using the
``run()`` function. This helper function checks that there is a dataset
to work with.
Parameters
----------
dataset : HCIDataset or None, optional
verbose : bool, optional
If ``True``, a message is printed out when a previous dataset was
overwritten.
Returns
-------
dataset : Dataset
"""
if dataset is None:
dataset = self.dataset
if self.dataset is None:
raise ValueError("no dataset specified!")
else:
if self.dataset is not None and verbose:
print("a new dataset was provided to run(), all previous "
"results were cleared.")
self.dataset = dataset
self._reset_results()
return dataset
def _get_calculations(self):
"""
Get a list of all attributes which are *calculated*.
This iterates over all the elements in an object and finds the functions
which were decorated with ``@calculates`` (which are identified by the
function attribute ``_calculates``). It then stores the calculated
attributes, together with the corresponding method, and returns it.
Returns
-------
calculations : dict
Dictionary mapping a single "calculated attribute" to the method
which calculates it.
"""
calculations = {}
for e in dir(self):
try:
for k in getattr(getattr(self, e), "_calculates"):
calculations[k] = e
except AttributeError:
pass
return calculations
def _reset_results(self):
"""
Remove all calculated results from the object.
By design, the HCIPostPRocAlgo's can be initialized without a dataset,
so the dataset can be provided to the ``run`` method. This makes it
possible to run the same algorithm on multiple datasets. In order not to
keep results from an older ``run`` call when working on a new dataset,
the stored results are reset using this function every time the ``run``
method is called.
"""
for attr in self._get_calculations():
try:
delattr(self, attr)
except AttributeError:
pass # attribute/result was not calculated yet. Skip.
def __getattr__(self, a):
"""
``__getattr__`` is only called when an attribute does *not* exist.
Catching this event allows us to output proper error messages when an
attribute was not calculated yet.
"""
calculations = self._get_calculations()
if a in calculations:
raise AttributeError("The '{}' was not calculated yet. Call '{}' "
"first.".format(a, calculations[a]))
else:
# this raises a regular AttributeError:
return self.__getattribute__(a)
def _show_attribute_help(self, function_name):
"""
Print information about the attributes a method calculated.
This is called *automatically* when a method is decorated with
``@calculates``.
Parameters
----------
function_name : string
The name of the method.
"""
calculations = self._get_calculations()
print("These attributes were just calculated:")
for a, f in calculations.items():
if hasattr(self, a) and function_name == f:
print("\t{}".format(a))
not_calculated_yet = [(a, f) for a, f in calculations.items()
if (f not in self._called_calculators
and not hasattr(self, a))]
if len(not_calculated_yet) > 0:
print("The following attributes can be calculated now:")
for a, f in not_calculated_yet:
print("\t{}\twith .{}()".format(a, f))
@calculates("snr_map", "detection_map")
def make_snrmap(self, approximated=False, plot=False, known_sources=None,
nproc=None, verbose=False):
"""
Calculate a S/N map from ``self.frame_final``.
Parameters
----------
approximated : bool, optional
If True, a proxy to the S/N calculation will be used. If False, the
Mawet et al. 2014 definition is used.
plot : bool, optional
If True plots the S/N map. True by default.
known_sources : None, tuple or tuple of tuples, optional
To take into account existing sources. It should be a tuple of
float/int or a tuple of tuples (of float/int) with the coordinate(s)
of the known sources.
nproc : int or None
Number of processes for parallel computing.
verbose: bool, optional
Whether to print timing or not.
Note
----
This is needed for "classic" algorithms that produce a final residual
image in their ``.run()`` method. To obtain a "detection map", which can
be used for counting true/false positives, a SNR map has to be created.
For other algorithms (like ANDROMEDA) which directly create a SNR or a
probability map, this method should be overwritten and thus disabled.
"""
if self.dataset.cube.ndim == 4:
fwhm = np.mean(self.dataset.fwhm)
else:
fwhm = self.dataset.fwhm
self.snr_map = snrmap(self.frame_final, fwhm, approximated, plot=plot,
known_sources=known_sources, nproc=nproc,
verbose=verbose)
self.detection_map = self.snr_map
def save(self, filename):
"""
Pickle the algo object and save it to disk.
Note that this also saves the associated ``self.dataset``, in a
non-optimal way.
"""
pickle.dump(self, open(filename, "wb"))
@calculates("frame_final")
def run(self, dataset=None, nproc=1, verbose=True):
"""
Run the algorithm. Should at least set `` self.frame_final``.
Note
----
This is the required signature of the ``run`` call. Child classes can
add their own keyword arguments if needed.
"""
raise NotImplementedError
[docs]class HCIPca(HCIPostProcAlgo):
""" HCI PCA algorithm.
Parameters
----------
dataset : HCIDataset object, optional
An HCIDataset object to be processed. Can also be passed to ``.run()``.
ncomp : int, optional
How many PCs are used as a lower-dimensional subspace to project the
target frames. For an ADI cube, ``ncomp`` is the number of PCs extracted
from ``cube``. For the RDI case, when ``cube`` and ``cube_ref`` are
provided, ``ncomp`` is the number of PCs obtained from ``cube_ref``.
For an ADI+mSDI cube (e.g. SPHERE/IFS), if ``adimsdi`` is ``double``
then ``ncomp`` is the number of PCs obtained from each multi-spectral
frame (if ``ncomp`` is None then this stage will be skipped and the
spectral channels will be combined without subtraction). If ``adimsdi``
is ``single``, then ``ncomp`` is the number of PCs obtained from the
whole set of frames (n_channels * n_adiframes).
ncomp2 : int, optional
Only used for ADI+mSDI cubes, when ``adimsdi`` is set to ``double``.
``ncomp2`` sets the number of PCs used in the second PCA stage (ADI
fashion, using the residuals of the first stage). If None then the
second PCA stage is skipped and the residuals are de-rotated and
combined.
svd_mode : {'lapack', 'arpack', 'eigen', 'randsvd', 'cupy', 'eigencupy', 'randcupy'}, str
Switch for the SVD method/library to be used. ``lapack`` uses the LAPACK
linear algebra library through Numpy and it is the most conventional way
of computing the SVD (deterministic result computed on CPU). ``arpack``
uses the ARPACK Fortran libraries accessible through Scipy (computation
on CPU). ``eigen`` computes the singular vectors through the
eigendecomposition of the covariance M.M' (computation on CPU).
``randsvd`` uses the randomized_svd algorithm implemented in Sklearn
(computation on CPU). ``cupy`` uses the Cupy library for GPU computation
of the SVD as in the LAPACK version. ``eigencupy`` offers the same
method as with the ``eigen`` option but on GPU (through Cupy).
``randcupy`` is an adaptation of the randomized_svd algorithm, where all
the computations are done on a GPU.
scaling : {None, 'temp-mean', 'spat-mean', 'temp-standard', 'spat-standard'}
With None, no scaling is performed on the input data before SVD. With
"temp-mean" then temporal px-wise mean subtraction is done, with
"spat-mean" then the spatial mean is subtracted, with "temp-standard"
temporal mean centering plus scaling to unit variance is done and with
"spat-standard" spatial mean centering plus scaling to unit variance is
performed.
adimsdi : {'double', 'single'}, str optional
In the case ``cube`` is a 4d array, ``adimsdi`` determines whether a
single or double pass PCA is going to be computed. In the ``single``
case, the multi-spectral frames are rescaled wrt the largest wavelength
to align the speckles and all the frames are processed with a single
PCA low-rank approximation. In the ``double`` case, a firt stage is run
on the rescaled spectral frames, and a second PCA frame is run on the
residuals in an ADI fashion.
mask_center_px : None or int
If None, no masking is done. If an integer > 1 then this value is the
radius of the circular mask.
source_xy : tuple of int, optional
For ADI PCA, this triggers a frame rejection in the PCA library.
source_xy are the coordinates X,Y of the center of the annulus where the
PA criterion will be used to reject frames from the library.
delta_rot : int, optional
Factor for tunning the parallactic angle threshold, expressed in FWHM.
Default is 1 (excludes 1xFHWM on each side of the considered frame).
imlib : {'opencv', 'skimage', 'vip-fft'}, str optional
Library used for image transformations. Opencv is faster than ndimage or
skimage.
interpolation : str, optional
For 'skimage' library: 'nearneig', bilinear', 'bicuadratic',
'bicubic', 'biquartic', 'biquintic'. The 'nearneig' interpolation
is the fastest and the 'biquintic' the slowest. The 'nearneig' is
the poorer option for interpolation of noisy astronomical images.
For 'opencv' library: 'nearneig', 'bilinear', 'bicubic', 'lanczos4'.
The 'nearneig' interpolation is the fastest and the 'lanczos4' the
slowest and accurate. 'lanczos4' is the default.
collapse : {'median', 'mean', 'sum', 'trimmean'}, str optional
Sets the way of collapsing the frames for producing a final image.
check_mem : bool, optional
If True, it check that the input cube(s) are smaller than the available
system memory.
"""
def __init__(self, dataset=None, ncomp=1, ncomp2=1, svd_mode='lapack', scaling=None,
adimsdi='double', mask_center_px=None, source_xy=None,
delta_rot=1, imlib='vip-fft', interpolation='lanczos4',
collapse='median', check_mem=True, crop_ifs=True, verbose=True):
super(HCIPca, self).__init__(locals())
# TODO: order/names of parameters are not consistent with ``pca`` core
# function
[docs] @calculates("frame_final",
"cube_reconstructed", "cube_residuals", "cube_residuals_der",
"pcs",
"cube_residuals_per_channel", "cube_residuals_per_channel_der",
"cube_residuals_resc")
def run(self, dataset=None, nproc=1, verbose=True, debug=False):
"""
Run the HCI PCA algorithm for model PSF subtraction.
Note
----
creates/sets the ``self.frame_final`` attribute, and depending on the
input parameters:
3D case:
cube_reconstructed
cube_residuals
cube_residuals_der
3D case, source_xy is None:
cube_residuals
pcs
4D case, adimsdi="double":
cube_residuals_per_channel
cube_residuals_per_channel_der
4D case, adimsdi="single":
cube_residuals
cube_residuals_resc
Parameters
----------
dataset : HCIDataset, optional
Dataset to process. If not provided, ``self.dataset`` is used (as
set when initializing this object).
nproc : int, optional
(not used) Note that ``HCIPca`` always works in single-processing
mode.
verbose : bool, optional
Show more output.
debug : bool, optional
Whether to print debug information or not.
"""
dataset = self._get_dataset(dataset, verbose)
if self.source_xy is not None and dataset.fwhm is None:
raise ValueError('`fwhm` has not been set')
res = pca(dataset.cube, dataset.angles, dataset.cuberef,
dataset.wavelengths, self.ncomp, self.ncomp2, self.svd_mode,
self.scaling, self.adimsdi, self.mask_center_px,
self.source_xy, self.delta_rot, dataset.fwhm, self.imlib,
self.interpolation, self.collapse, self.check_mem,
self.crop_ifs, nproc, full_output=True, verbose=verbose,
debug=debug)
if dataset.cube.ndim == 3:
if self.source_xy is not None:
cuberecon, cuberes, cuberesder, frame = res
self.cube_reconstructed = cuberecon
self.cube_residuals = cuberes
self.cube_residuals_der = cuberesder
self.frame_final = frame
else:
pcs, cuberecon, cuberes, cuberesder, frame = res
self.pcs = pcs
self.cube_reconstructed = cuberecon
self.cube_residuals = cuberes
self.cube_residuals_der = cuberesder
self.frame_final = frame
elif dataset.cube.ndim == 4:
if self.adimsdi == 'double':
cubereschan, cubereschander, frame = res
self.cube_residuals_per_channel = cubereschan
self.cube_residuals_per_channel_der = cubereschander
self.frame_final = frame
elif self.adimsdi == 'single':
cuberes, cuberesresc, frame = res
self.cube_residuals = cuberes
self.cube_residuals_resc = cuberesresc
self.frame_final = frame
[docs]class HCILoci(HCIPostProcAlgo):
"""
HCI LOCI algorithm.
"""
def __init__(self, dataset=None, scale_list=None, metric="manhattan",
dist_threshold=90, delta_rot=0.5, delta_sep=(0.1, 1),
radius_int=0, asize=4, n_segments=4, solver="lstsq", tol=1e-3,
optim_scale_fact=1, adimsdi="skipadi", imlib='vip-fft',
interpolation="lanczos4", collapse="median", verbose=True):
super(HCILoci, self).__init__(locals())
[docs] @calculates("frame_final", "cube_res", "cube_der")
def run(self, dataset=None, nproc=1, verbose=True):
"""
Run the HCI LOCI algorithm for model PSF subtraction.
"""
dataset = self._get_dataset(dataset, verbose)
res = xloci(dataset.cube, dataset.angles, self.scale_list, dataset.fwhm,
self.metric, self.dist_threshold, self.delta_rot,
self.delta_sep, self.radius_int, self.asize,
self.n_segments, nproc, self.solver, self.tol,
self.optim_scale_fact, self.adimsdi, self.imlib,
self.interpolation, self.collapse, verbose,
full_output=True)
self.cube_res, self.cube_der, self.frame_final = res
[docs]class HCILLSG(HCIPostProcAlgo):
"""
HCI LLSG algorithm.
"""
def __init__(self, dataset=None, rank=10, thresh=1, max_iter=10,
low_rank_ref=False, low_rank_mode='svd', auto_rank_mode='noise',
residuals_tol=1e-1, cevr=0.9, thresh_mode='soft', nproc=1,
asize=None, n_segments=4, azimuth_overlap=None, radius_int=None,
random_seed=None, imlib='vip-fft', interpolation='lanczos4',
high_pass=None, collapse='median', verbose=True):
super(HCILLSG, self).__init__(locals())
[docs] @calculates("frame_final", "frame_l", "frame_s", "frame_g")
def run(self, dataset=None, nproc=1, verbose=True):
"""
Run the HCI LLSG algorithm for model PSF subtraction.
"""
dataset = self._get_dataset(dataset, verbose)
res = llsg(
dataset.cube, dataset.angles, dataset.fwhm,
rank=self.rank, thresh=self.thresh, max_iter=self.max_iter,
low_rank_ref=self.low_rank_ref, low_rank_mode=self.low_rank_mode,
auto_rank_mode=self.auto_rank_mode,
residuals_tol=self.residuals_tol, cevr=self.cevr,
thresh_mode=self.thresh_mode, nproc=nproc, asize=self.asize,
n_segments=self.n_segments, azimuth_overlap=self.azimuth_overlap,
radius_int=self.radius_int, random_seed=self.random_seed,
imlib=self.imlib, interpolation=self.interpolation,
high_pass=self.high_pass, collapse=self.collapse, full_output=True,
verbose=verbose, debug=False
)
self.list_l_array_der = res[0]
self.list_s_array_der = res[1]
self.list_g_array_der = res[2]
self.frame_l = res[3]
self.frame_s = res[4]
self.frame_g = res[5]
self.frame_final = self.frame_s
[docs]class HCIAndromeda(HCIPostProcAlgo):
"""
HCI ANDROMEDA algorithm.
Parameters
----------
dataset : HCIDataset object, optional
An HCIDataset object to be processed. Can also be passed to ``.run()``.
oversampling_fact : float, optional
Oversampling factor for the wavelength corresponding to the filter used
for obtaining ``cube`` (defined as the ratio between the wavelength of
the filter and the Shannon wavelength).
filtering_fraction : float, optional
Strength of the high-pass filter. If set to ``1``, no high-pass filter
is used.
min_sep : float, optional
Angular separation is assured to be above ``min_sep*lambda/D``.
annuli_width : float, optional
Annuli width on which the subtraction are performed. The same for all
annuli.
roa : float, optional
Ratio of the optimization area. The optimization annulus area is defined
by ``roa * annuli_width``.
opt_method : {'no', 'total', 'lsq', 'robust'}, optional
Method used to balance for the flux difference that exists between the
two subtracted annuli in an optimal way during ADI.
nsmooth_snr : int, optional
Number of pixels over which the radial robust standard deviation profile
of the SNR map is smoothed to provide a global trend for the SNR map
normalization. For ``nsmooth_snr=0`` the SNR map normalization is
disabled, and the positivity constraint is applied when calculating the
flux.
iwa : float, optional
Inner working angle / inner radius of the first annulus taken into
account, expressed in $\\lambda/D$.
precision : int, optional
Number of shifts applied to the PSF. Passed to
``calc_psf_shift_subpix`` , which then creates a 4D cube with shape
(precision+1, precision+1, N, N).
homogeneous_variance : bool, optional
If set, variance is treated as homogeneous and is calculated as a mean
of variance in each position through time.
multiply_gamma : bool, optional
Use gamma for signature computation too.
verbose : bool, optional
Print some parameter values for control.
"""
def __init__(self, dataset=None, oversampling_fact=0.5,
filtering_fraction=0.25, min_sep=0.5, annuli_width=1., roa=2.,
opt_method='lsq', nsmooth_snr=18, iwa=None, owa=None, precision=50,
fast=False,
homogeneous_variance=True, ditimg=1.0, ditpsf=None, tnd=1.0,
total=False, multiply_gamma=True,
verbose=True):
super(HCIAndromeda, self).__init__(locals())
[docs] @calculates("frame_final", "contrast_map", "likelihood_map", "snr_map",
"stdcontrast_map", "snr_map_notnorm", "stdcontrast_map_notnorm",
"ext_radius", "detection_map")
def run(self, dataset=None, nproc=1, verbose=True):
"""
Run the ANDROMEDA algorithm for model PSF subtraction.
Parameters
----------
dataset : HCIDataset, optional
Dataset to process. If not provided, ``self.dataset`` is used (as
set when initializing this object).
nproc : int, optional
Number of processes to use.
verbose : bool, optional
Print some parameter values for control.
"""
dataset = self._get_dataset(dataset, verbose)
res = andromeda(cube=dataset.cube,
oversampling_fact=self.oversampling_fact,
angles=dataset.angles, psf=dataset.psf,
filtering_fraction=self.filtering_fraction,
min_sep=self.min_sep, annuli_width=self.annuli_width,
roa=self.roa, opt_method=self.opt_method,
nsmooth_snr=self.nsmooth_snr, iwa=self.iwa,
owa=self.owa,
precision=self.precision, fast=self.fast,
homogeneous_variance=self.homogeneous_variance,
ditimg=self.ditimg, ditpsf=self.ditpsf, tnd=self.tnd,
total=self.total,
multiply_gamma=self.multiply_gamma, nproc=nproc,
verbose=verbose)
self.contrast_map = res[0]
self.likelihood_map = res[5]
self.ext_radius = res[6]
# normalized/not normalized depending on nsmooth:
self.snr_map = res[2]
self.stdcontrast_map = res[4]
if self.nsmooth_snr != 0:
self.snr_map_notnorm = res[1]
self.stdcontrast_map_notnorm = res[3]
# general attributes:
self.frame_final = self.contrast_map
self.detection_map = self.snr_map
[docs] def make_snr_map(self, *args, **kwargs):
"""
Does nothing. For Andromeda, ``snr_map`` is calculated by ``run()``.
Note
----
The ``@calculates`` decorator is not present in this function
definition, so ``self._get_calculations`` does not mark ``snr_map`` as
created by this function.
"""
pass