#! /usr/bin/env python
"""
Module containing the Local Low-rank plus Sparse plus Gaussian-noise
decomposition algorithm for ADI data.
.. [GOM16]
| Gomez-Gonzalez et al. 2016
| **Low-rank plus sparse decomposition for exoplanet detection in
direct-imaging ADI sequences. The LLSG algorithm**
| *Astronomy & Astrophysics, Volume 589, Issue 1, p. 54*
| `https://arxiv.org/abs/1602.08381
<https://arxiv.org/abs/1602.08381>`_
"""
__author__ = "Carlos Alberto Gomez Gonzalez, Thomas Bédrine"
__all__ = ["llsg", "thresholding", "LLSG_Params"]
import numpy as np
from scipy.linalg import qr
from multiprocessing import cpu_count
from astropy.stats import median_absolute_deviation
from dataclasses import dataclass
from typing import List
from enum import Enum
from .svd import svd_wrapper, get_eigenvectors
from ..config import time_ini, timing
from ..config.paramenum import Collapse, LowRankMode, AutoRankMode, ThreshMode, ALGO_KEY
from ..config.utils_conf import pool_map, iterable
from ..config.utils_param import setup_parameters, separate_kwargs_dict
from ..preproc import cube_derotate, cube_collapse
from ..var import get_annulus_segments, cube_filter_highpass
@dataclass
class LLSG_Params:
    """
    Set of parameters for the LLSG algorithm.
    See function `llsg` below for the documentation.
    """
    cube: np.ndarray = None  # input ADI cube, 3d (n_frames, y, x)
    angle_list: np.ndarray = None  # parallactic angle for each frame, 1d
    fwhm: float = None  # FWHM size in pixels
    rank: int = 10  # expected rank of the L component (None -> auto rank)
    thresh: float = 1  # factor scaling the thresholding step
    max_iter: int = 10  # number of L+S iterations
    low_rank_ref: bool = False  # estimate L from the other segments of the annulus
    low_rank_mode: Enum = LowRankMode.SVD  # method for the L update (SVD or BRP)
    auto_rank_mode: Enum = AutoRankMode.NOISE  # rank selection strategy when rank is None
    residuals_tol: float = 1e-1  # noise decay tolerance for auto rank ('noise' mode)
    cevr: float = 0.9  # cumulative explained variance ratio for auto rank ('cevr' mode)
    thresh_mode: Enum = ThreshMode.SOFT  # thresholding type (see `thresholding`)
    nproc: int = 1  # number of processes; None -> cpu_count()//2
    asize: int = None  # annulus width in px; None -> ceil(2*fwhm)
    n_segments: int = 4  # segments per annulus (int, list or "auto")
    azimuth_overlap: int = None  # azimuthal averaging step in degrees; None/0 -> single pass
    radius_int: int = None  # inner radius of first annulus; None -> 0
    random_seed: int = None  # seed for the pseudo random number generator
    high_pass: int = None  # odd kernel size for optional high-pass pre-filtering
    collapse: Enum = Collapse.MEDIAN  # frame-combination mode for the final image
    full_output: bool = False  # also return intermediate L/S/G arrays
    verbose: bool = True  # print progress/timing to stdout
    debug: bool = False  # print extra intermediate information
def llsg(*all_args: List, **all_kwargs: dict):
    """Local Low-rank plus Sparse plus Gaussian-noise decomposition (LLSG) as
    described in [GOM16]_. This first version of our algorithm aims at
    decomposing ADI cubes into three terms L+S+G (low-rank, sparse and Gaussian
    noise). Separating the noise from the S component (where the moving planet
    should stay) allows us to increase the SNR of potential planets.

    The three tunable parameters are the *rank* or expected rank of the L
    component, the ``thresh`` or threshold for encouraging sparsity in the S
    component and ``max_iter`` which sets the number of iterations. The rest of
    parameters can be tuned at the user's own risk (do it if you know what
    you're doing).

    Parameters
    ----------
    all_args: list, optional
        Positional arguments for the LLSG algorithm. Full list of parameters
        below.
    all_kwargs: dictionary, optional
        Mix of keyword arguments that can initialize a LLSGParams and the optional
        'rot_options' dictionary, with keyword values for "border_mode", "mask_val",
        "edge_blend", "interp_zeros", "ker" (see documentation of
        ``vip_hci.preproc.frame_rotate``). Can also contain a LLSGParams named as
        `algo_params`.

    LLSG parameters
    ----------
    cube : numpy ndarray, 3d
        Input ADI cube.
    angle_list : numpy ndarray, 1d
        Corresponding parallactic angle for each frame.
    fwhm : float
        Known size of the FWHM in pixels to be used.
    rank : int, optional
        Expected rank of the L component.
    thresh : float, optional
        Factor that scales the thresholding step in the algorithm.
    max_iter : int, optional
        Sets the number of iterations.
    low_rank_ref :
        If True the first estimation of the L component is obtained from the
        remaining segments in the same annulus.
    low_rank_mode : Enum, see `vip_hci.config.paramenum.LowRankMode`
        Sets the method of solving the L update.
    auto_rank_mode : Enum, see `vip_hci.config.paramenum.AutoRankMode`
        If ``rank`` is None, then ``auto_rank_mode`` sets the way that the
        ``rank`` is determined: the noise minimization or the cumulative
        explained variance ratio (when 'svd' is used).
    residuals_tol : float, optional
        The value of the noise decay to be used when ``rank`` is None and
        ``auto_rank_mode`` is set to ``noise``.
    cevr : float, optional
        Float value in the range [0,1] for selecting the cumulative explained
        variance ratio to choose the rank automatically (if ``rank`` is None).
    thresh_mode : Enum, see `vip_hci.config.paramenum.ThreshMode`
        Sets the type of thresholding.
    nproc : None or int, optional
        Number of processes for parallel computing. If None the number of
        processes will be set to cpu_count()/2. By default the algorithm works
        in single-process mode.
    asize : int or None, optional
        If ``asize`` is None then each annulus will have a width of
        ``ceil(2*fwhm)`` pixels (as in [GOM16]_). If an integer then it is the
        width in pixels of each annulus.
    n_segments : int or list of ints, optional
        The number of segments for each annulus. When a single integer is given
        it is used for all annuli.
    azimuth_overlap : int or None, optional
        Sets the amount of azimuthal averaging.
    radius_int : int, optional
        The radius of the innermost annulus. By default is 0, if >0 then the
        central circular area is discarded.
    random_seed : int or None, optional
        Controls the seed for the Pseudo Random Number generator.
    high_pass : odd int or None, optional
        If set to an odd integer <=7, a high-pass filter is applied to the
        frames. The ``vip_hci.var.frame_filter_highpass`` is applied twice,
        first with the mode ``median-subt`` and a large window, and then with
        ``laplacian-conv`` and a kernel size equal to ``high_pass``. 5 is an
        optimal value when ``fwhm`` is ~4.
    collapse : Enum, see `vip_hci.config.paramenum.Collapse`
        Sets the way of collapsing the frames for producing a final image.
    full_output: bool, optional
        Whether to return the final median combined image only or with other
        intermediate arrays.
    verbose : bool, optional
        If True prints to stdout intermediate info.
    debug : bool, optional
        Whether to output some intermediate information.

    Returns
    -------
    frame_s : numpy ndarray, 2d
        Final frame (from the S component) after rotation and median-combination.
    If ``full_output`` is True, the following intermediate arrays are returned:
    list_l_array_der, list_s_array_der, list_g_array_der, frame_l, frame_s,
    frame_g
    """
    # Separating the parameters of the ParamsObject from the optional
    # rot_options
    class_params, rot_options = separate_kwargs_dict(
        initial_kwargs=all_kwargs, parent_class=LLSG_Params
    )

    # Extracting the object of parameters (if any)
    algo_params = None
    if ALGO_KEY in rot_options.keys():
        algo_params = rot_options[ALGO_KEY]
        del rot_options[ALGO_KEY]

    if algo_params is None:
        algo_params = LLSG_Params(*all_args, **class_params)

    # Sanity checks on the input cube and angle vector
    if algo_params.cube.ndim != 3:
        raise TypeError("Input array is not a cube (3d array)")
    if not algo_params.cube.shape[0] == algo_params.angle_list.shape[0]:
        msg = "Angle list vector has wrong length. It must equal the number"
        msg += " of frames in the cube"
        raise TypeError(msg)

    # BRP cannot auto-determine the rank nor use a reference library
    if algo_params.low_rank_mode == LowRankMode.BRP:
        if algo_params.rank is None:
            msg = "Auto rank only works with SVD low_rank_mode."
            msg += " Set a value for the rank parameter"
            raise ValueError(msg)
        if algo_params.low_rank_ref:
            msg = "Low_rank_ref only works with SVD low_rank_mode"
            raise ValueError(msg)

    # ``cube_init`` is a module-level global read by ``_decompose_patch``
    # (needed so worker processes can access the, possibly filtered, cube)
    global cube_init
    if algo_params.high_pass is not None:
        cube_init = cube_filter_highpass(
            algo_params.cube, "median-subt", median_size=19, verbose=False
        )
        cube_init = cube_filter_highpass(
            cube_init,
            "laplacian-conv",
            kernel_size=algo_params.high_pass,
            verbose=False,
        )
    else:
        cube_init = algo_params.cube

    if algo_params.verbose:
        start_time = time_ini()
    n, y, x = algo_params.cube.shape

    if algo_params.azimuth_overlap == 0:
        algo_params.azimuth_overlap = None

    if algo_params.radius_int is None:
        algo_params.radius_int = 0

    if algo_params.nproc is None:
        algo_params.nproc = cpu_count() // 2  # Hyper-threading doubles the # of cores

    # Same number of pixels per annulus
    if algo_params.asize is None:
        annulus_width = int(np.ceil(2 * algo_params.fwhm))  # as in the paper
    elif isinstance(algo_params.asize, int):
        annulus_width = algo_params.asize
    else:
        # Fix: previously a non-``int`` asize (e.g. a float or numpy integer)
        # left ``annulus_width`` unbound and raised a NameError below.
        annulus_width = int(np.ceil(algo_params.asize))
    n_annuli = int((y / 2 - algo_params.radius_int) / annulus_width)
    # TODO: asize in pxs to be consistent with other functions

    if algo_params.n_segments is None:
        algo_params.n_segments = [4 for _ in range(n_annuli)]  # as in the paper
    elif isinstance(algo_params.n_segments, int):
        algo_params.n_segments = [algo_params.n_segments] * n_annuli
    elif algo_params.n_segments == "auto":
        # Grow the number of segments with the radius so that each segment
        # spans roughly the same arc length
        algo_params.n_segments = []
        algo_params.n_segments.append(2)  # for first annulus
        algo_params.n_segments.append(3)  # for second annulus
        ld = 2 * np.tan(360 / 4 / 2) * annulus_width
        for i in range(2, n_annuli):  # rest of annuli
            radius = i * annulus_width
            ang = np.rad2deg(2 * np.arctan(ld / (2 * radius)))
            algo_params.n_segments.append(int(np.ceil(360 / ang)))

    if algo_params.verbose:
        print("Annuli = {}".format(n_annuli))

    # Azimuthal averaging of residuals
    if algo_params.azimuth_overlap is None:
        algo_params.azimuth_overlap = 360  # no overlapping, single config of segments
    n_rots = int(360 / algo_params.azimuth_overlap)

    matrix_s = np.zeros((n_rots, n, y, x))
    if algo_params.full_output:
        matrix_l = np.zeros((n_rots, n, y, x))
        matrix_g = np.zeros((n_rots, n, y, x))

    # Loop through the annuli
    if algo_params.verbose:
        print("Processing annulus: ")
    for ann in range(n_annuli):
        inner_radius = algo_params.radius_int + ann * annulus_width
        n_segments_ann = algo_params.n_segments[ann]
        if algo_params.verbose:
            print(f"{ann+1} : in_rad={inner_radius}, n_segm={n_segments_ann}")

        # TODO: pool_map as in xloci function: build first a list
        for i in range(n_rots):
            theta_init = i * algo_params.azimuth_overlap
            indices = get_annulus_segments(
                algo_params.cube[0],
                inner_radius,
                annulus_width,
                n_segments_ann,
                theta_init,
            )

            add_params = {
                "indices": indices,
                "i_patch": iterable(range(n_segments_ann)),
                "n_segments_ann": n_segments_ann,
                "verbose": False,
            }

            func_params = setup_parameters(
                params_obj=algo_params, fkt=_decompose_patch, as_list=True, **add_params
            )

            patches = pool_map(
                algo_params.nproc,
                _decompose_patch,
                *func_params,
            )

            for j in range(n_segments_ann):
                yy = indices[j][0]
                xx = indices[j][1]

                if algo_params.full_output:
                    matrix_l[i, :, yy, xx] = patches[j][0]
                    matrix_s[i, :, yy, xx] = patches[j][1]
                    matrix_g[i, :, yy, xx] = patches[j][2]
                else:
                    matrix_s[i, :, yy, xx] = patches[j]

    if algo_params.full_output:
        list_s_array_der = [
            cube_derotate(
                matrix_s[k],
                algo_params.angle_list,
                nproc=algo_params.nproc,
                **rot_options,
            )
            for k in range(n_rots)
        ]
        list_frame_s = [
            cube_collapse(list_s_array_der[k], mode=algo_params.collapse)
            for k in range(n_rots)
        ]
        frame_s = cube_collapse(np.array(list_frame_s),
                                mode=algo_params.collapse)

        list_l_array_der = [
            cube_derotate(
                matrix_l[k],
                algo_params.angle_list,
                nproc=algo_params.nproc,
                **rot_options,
            )
            for k in range(n_rots)
        ]
        list_frame_l = [
            cube_collapse(list_l_array_der[k], mode=algo_params.collapse)
            for k in range(n_rots)
        ]
        frame_l = cube_collapse(np.array(list_frame_l),
                                mode=algo_params.collapse)

        list_g_array_der = [
            cube_derotate(
                matrix_g[k],
                algo_params.angle_list,
                nproc=algo_params.nproc,
                **rot_options,
            )
            for k in range(n_rots)
        ]
        list_frame_g = [
            cube_collapse(list_g_array_der[k], mode=algo_params.collapse)
            for k in range(n_rots)
        ]
        frame_g = cube_collapse(np.array(list_frame_g),
                                mode=algo_params.collapse)

    else:
        list_s_array_der = [
            cube_derotate(
                matrix_s[k],
                algo_params.angle_list,
                nproc=algo_params.nproc,
                **rot_options,
            )
            for k in range(n_rots)
        ]
        list_frame_s = [
            cube_collapse(list_s_array_der[k], mode=algo_params.collapse)
            for k in range(n_rots)
        ]
        frame_s = cube_collapse(np.array(list_frame_s),
                                mode=algo_params.collapse)

    if algo_params.verbose:
        print("")
        timing(start_time)

    if algo_params.full_output:
        return (
            list_l_array_der,
            list_s_array_der,
            list_g_array_der,
            frame_l,
            frame_s,
            frame_g,
        )
    else:
        return frame_s
def _decompose_patch(
    indices,
    i_patch,
    n_segments_ann,
    rank,
    low_rank_ref,
    low_rank_mode,
    thresh,
    thresh_mode,
    max_iter,
    auto_rank_mode,
    cevr,
    residuals_tol,
    random_seed,
    debug=False,
    full_output=False,
):
    """Decompose one annular patch of the module-level ``cube_init``.

    Extracts the pixels of segment ``i_patch`` from the (possibly
    high-pass-filtered) global cube and runs the L+S(+G) decomposition on
    them. When ``low_rank_ref`` is True, the remaining segments of the same
    annulus are stacked into a reference library for the L estimation.
    """
    seg_y = indices[i_patch][0]
    seg_x = indices[i_patch][1]
    data_segm = cube_init[:, seg_y, seg_x]

    if low_rank_ref:
        # Reference library: pixels of every other segment in the annulus.
        other_segments = [s for s in range(n_segments_ann) if s != i_patch]
        ref_y = np.hstack([indices[s][0] for s in other_segments])
        ref_x = np.hstack([indices[s][1] for s in other_segments])
        data_ref = cube_init[:, ref_y, ref_x]
    else:
        data_ref = data_segm

    return _patch_rlrps(
        data_segm,
        data_ref,
        rank,
        low_rank_ref,
        low_rank_mode,
        thresh,
        thresh_mode,
        max_iter,
        auto_rank_mode,
        cevr,
        residuals_tol,
        random_seed,
        debug=debug,
        full_output=full_output,
    )
def _patch_rlrps(
    array,
    array_ref,
    rank,
    low_rank_ref,
    low_rank_mode,
    thresh,
    thresh_mode,
    max_iter,
    auto_rank_mode="noise",
    cevr=0.9,
    residuals_tol=1e-2,
    random_seed=None,
    debug=False,
    full_output=False,
):
    """Patch decomposition based on GoDec/SSGoDec (Zhou & Tao 2011).

    Iteratively splits ``array`` into L (low-rank) + S (sparse) + G
    (residual), alternating a low-rank update of L (via bilateral random
    projections or truncated SVD) with a thresholding update of S.

    Returns the transposed S component, or ``(L, S, G)`` all transposed
    when ``full_output`` is True.
    """
    ############################################################################
    # Initializing L and S
    ############################################################################
    L = array
    if low_rank_ref:
        # Reference library transposed so that PC projections below operate
        # on the reference pixels instead of the patch itself
        L_ref = array_ref.T
    else:
        L_ref = None
    S = np.zeros_like(L)
    random_state = np.random.RandomState(random_seed)
    itr = 0
    power = 0  # number of extra power iterations for the BRP branch
    svdlib = "lapack"
    while itr <= max_iter:
        ########################################################################
        # Updating L
        ########################################################################
        if low_rank_mode == "brp":
            # Bilateral random projections: project L onto a random basis,
            # optionally refine with power iterations, then orthonormalize
            # with an economic QR and project L onto that subspace
            Y2 = random_state.randn(L.shape[1], rank)
            for _ in range(power + 1):
                Y1 = np.dot(L, Y2)
                Y2 = np.dot(L.T, Y1)
            Q, _ = qr(Y2, mode="economic")
            Lnew = np.dot(np.dot(L, Q), Q.T)
        elif low_rank_mode == "svd":
            if itr == 0:
                # First iteration: compute the principal components (possibly
                # auto-selecting the rank via 'noise' or 'cevr' criteria)
                PC = get_eigenvectors(
                    rank,
                    L,
                    svdlib,
                    mode=auto_rank_mode,
                    cevr=cevr,
                    noise_error=residuals_tol,
                    data_ref=L_ref,
                    debug=debug,
                    collapse=True,
                    scaling="temp-standard",
                )
                rank = PC.shape[0]  # so we can use the optimized rank
                if low_rank_ref:
                    Lnew = np.dot(np.dot(PC, L).T, PC).T
                else:
                    Lnew = np.dot(np.dot(L, PC.T), PC)
            else:
                # Subsequent iterations: re-fit the PCs on the current L,
                # capping the rank at the smaller matrix dimension
                rank_i = min(rank, min(L.shape[0], L.shape[1]))
                PC = svd_wrapper(L, svdlib, rank_i, False,
                                 random_state=random_state)
                Lnew = np.dot(np.dot(L, PC.T), PC)
        else:
            raise RuntimeError("Low Rank estimation mode not recognized.")
        ########################################################################
        # Updating S
        ########################################################################
        # Residual of the L update plus the current sparse term; the sparse
        # component is whatever survives thresholding of this residual
        T = L - Lnew + S
        threshold = np.sqrt(median_absolute_deviation(T.ravel())) * thresh
        # threshold = np.sqrt(median_absolute_deviation(T, axis=0)) * thresh
        # threshmat = np.zeros_like(T)
        # for i in range(threshmat.shape[0]):
        #     threshmat[i] = threshold
        # threshold = threshmat
        if debug:
            print("threshold = {:.3f}".format(threshold))
        S = thresholding(T, threshold, thresh_mode)
        T -= S
        L = Lnew + T  # fold the sub-threshold residual back into L
        itr += 1
    G = array - L - S  # whatever L and S do not explain is Gaussian noise
    L = L.T
    S = S.T
    G = G.T
    if full_output:
        return L, S, G
    else:
        return S
def thresholding(array, threshold, mode):
    """Apply a thresholding strategy to a copy of ``array``.

    Supported modes: "soft" (shrink values above the threshold towards
    zero), "hard" (zero values below), "nng" (non-negative garrote),
    "greater"/"less" (zero values below/above the threshold). ``threshold``
    may be a float or an array matching ``array``'s shape (soft mode).
    """
    out = array.copy()
    if mode == "soft":
        out[np.abs(out) <= threshold] = 0
        above = np.abs(out) > threshold
        if isinstance(threshold, float):
            out[above] -= np.sign(out[above]) * threshold
        else:
            out[above] -= np.sign(out[above]) * threshold[above]
    elif mode == "hard":
        out[np.abs(out) < threshold] = 0
    elif mode == "nng":
        out[np.abs(out) <= threshold] = 0
        above = np.abs(out) > threshold
        out[above] -= threshold**2 / out[above]
    elif mode == "greater":
        out[out < threshold] = 0
    elif mode == "less":
        out[out > threshold] = 0
    else:
        raise RuntimeError("Thresholding mode not recognized")
    return out