#! /usr/bin/env python
"""
Module with NMF algorithm in concentric annuli for ADI/RDI.
"""
__author__ = "Valentin Christiaens, Thomas Bédrine"
__all__ = ["nmf_annular", "NMF_ANNULAR_Params"]
import numpy as np
from multiprocessing import cpu_count
from sklearn.decomposition import NMF
from dataclasses import dataclass, field
from typing import Tuple, List, Union
from enum import Enum
from ..preproc import cube_derotate, cube_collapse, check_pa_vector
from ..preproc.derotation import _find_indices_adi, _define_annuli
from ..var import get_annulus_segments, matrix_scaling
from ..config.utils_param import setup_parameters, separate_kwargs_dict
from ..config.paramenum import Initsvd, Imlib, Interpolation, HandleNeg, Collapse, ALGO_KEY
from ..config import timing, time_ini
from ..config.utils_conf import pool_map, iterable
[docs]
@dataclass
class NMF_ANNULAR_Params:
"""
Set of parameters for the NMF annular algorithm.
See function `nmf_annular` below for the documentation.
"""
cube: np.ndarray = None
angle_list: np.ndarray = None
cube_ref: np.ndarray = None
radius_int: int = 0
fwhm: float = 4
asize: int = 4
n_segments: int = 1
delta_rot: Union[float, Tuple[float]] = (0.1, 1)
ncomp: int = 1
init_svd: Enum = Initsvd.NNDSVD
nproc: int = 1
min_frames_lib: int = 2
max_frames_lib: int = 200
scaling: Enum = None
imlib: Enum = Imlib.VIPFFT
interpolation: Enum = Interpolation.LANCZOS4
collapse: Enum = Collapse.MEDIAN
full_output: bool = False
verbose: bool = True
theta_init: float = 0
weights: List = None
cube_sig: np.ndarray = None
handle_neg: Enum = HandleNeg.MASK
max_iter: int = 1000
random_state: int = None
nmf_args: dict = field(default_factory=lambda: {})
# TODO: update the doc of the params (some are missing)
[docs]
def nmf_annular(*all_args: List, **all_kwargs: dict):
"""Non Negative Matrix Factorization in concentric annuli, for ADI/RDI
sequences. Alternative to the annular ADI-PCA processing that does not rely
on SVD or ED for obtaining a low-rank approximation of the datacube.
This function embeds the scikit-learn NMF algorithm solved through either
the coordinate descent or the multiplicative update method.
Parameters
----------
all_args: list, optional
Positionnal arguments for the NMF annular algorithm. Full list of parameters
below.
all_kwargs: dictionary, optional
Mix of keyword arguments that can initialize a NMFAnnParams and the optional
'rot_options' dictionnary, with keyword values for "border_mode", "mask_val",
"edge_blend", "interp_zeros", "ker" (see documentation of
``vip_hci.preproc.frame_rotate``). Can also contain a NMFAnnParams named as
`algo_params`..
NMF annular parameters
----------
cube : numpy ndarray, 3d
Input cube.
angle_list : numpy ndarray, 1d
Corresponding parallactic angle for each frame.
cube_ref : numpy ndarray, 3d, optional
Reference library cube. For Reference Star Differential Imaging.
radius_int : int, optional
The radius of the innermost annulus. By default is 0, if >0 then the
central circular region is discarded.
fwhm : float, optional
Size of the FWHM in pixels. Default is 4.
asize : float, optional
The size of the annuli, in pixels.
n_segments : int or list of ints or 'auto', optional
The number of segments for each annulus. When a single integer is given
it is used for all annuli. When set to 'auto', the number of segments is
automatically determined for every annulus, based on the annulus width.
delta_rot : float or tuple of floats, optional
Factor for adjusting the parallactic angle threshold, expressed in
FWHM. Default is 1 (excludes 1 FWHM on each side of the considered
frame). If a tuple of two floats is provided, they are used as the lower
and upper intervals for the threshold (grows linearly as a function of
the separation). !!! Important: this is used even if a reference cube
is provided for RDI. This is to allow ARDI (PCA library built from both
science and reference cubes). If you want to do pure RDI, set delta_rot
to an arbitrarily high value such that the condition is never fulfilled
for science frames to make it in the PCA library.
ncomp : int, optional
How many components are used as for low-rank approximation of the
datacube.
scaling : {None, "temp-mean", spat-mean", "temp-standard",
"spat-standard"}, None or str optional
Pixel-wise scaling mode using ``sklearn.preprocessing.scale``
function. If set to None, the input matrix is left untouched. Otherwise:
* ``temp-mean``: temporal px-wise mean is subtracted.
* ``spat-mean``: spatial mean is subtracted.
* ``temp-standard``: temporal mean centering plus scaling pixel values
to unit variance (temporally).
* ``spat-standard``: spatial mean centering plus scaling pixel values
to unit variance (spatially).
DISCLAIMER: Using ``temp-mean`` or ``temp-standard`` scaling can improve
the speckle subtraction for ASDI or (A)RDI reductions. Nonetheless, this
involves a sort of c-ADI preprocessing, which (i) can be dangerous for
datasets with low amount of rotation (strong self-subtraction), and (ii)
should probably be referred to as ARDI (i.e. not RDI stricto sensu).
max_iter : int optional
The number of iterations for the coordinate descent solver.
random_state : int or None, optional
Controls the seed for the Pseudo Random Number generator.
mask_center_px : None or int
If None, no masking is done. If an integer > 1 then this value is the
radius of the circular mask.
source_xy : tuple of int, optional
For ADI-PCA, this triggers a frame rejection in the PCA library, with
``source_xy`` as the coordinates X,Y of the center of the annulus where
the PA criterion is estimated. When ``ncomp`` is a tuple, a PCA grid is
computed and the S/Ns (mean value in a 1xFWHM circular aperture) of the
given (X,Y) coordinates are computed.
delta_rot : int, optional
Factor for tunning the parallactic angle threshold, expressed in FWHM.
Default is 1 (excludes 1xFWHM on each side of the considered frame).
init_svd: str, optional {'nnsvd','nnsvda','random'}
Method used to initialize the iterative procedure to find H and W.
'nndsvd': non-negative double SVD recommended for sparseness
'nndsvda': NNDSVD where zeros are filled with the average of cube;
recommended when sparsity is not desired
'random': random initial non-negative matrix
collapse : {'median', 'mean', 'sum', 'trimmean'}, str optional
Sets the way of collapsing the frames for producing a final image.
full_output: boolean, optional
Whether to return the final median combined image only or with other
intermediate arrays.
verbose : {True, False}, bool optional
If True prints intermediate info and timing.
nmf_args: dictionary, optional
Additional arguments for scikit-learn NMF algorithm. See:
https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.NMF.html
Returns
-------
If full_output is False the final frame is returned. If True the algorithm
returns the reshaped NMF components, the reconstructed cube, the residuals,
the residuals derotated and the final frame.
"""
# Separating the parameters of the ParamsObject from the optionnal rot_options
class_params, rot_options = separate_kwargs_dict(initial_kwargs=all_kwargs,
parent_class=NMF_ANNULAR_Params
)
# Extracting the object of parameters (if any)
algo_params = None
if ALGO_KEY in rot_options.keys():
algo_params = rot_options[ALGO_KEY]
del rot_options[ALGO_KEY]
if algo_params is None:
algo_params = NMF_ANNULAR_Params(*all_args, **class_params)
if algo_params.verbose:
global start_time
start_time = time_ini()
array = algo_params.cube
if array.ndim != 3:
raise TypeError("Input array is not a cube or 3d array")
if array.shape[0] != algo_params.angle_list.shape[0]:
raise TypeError("Input vector or parallactic angles has wrong length")
n, y, _ = array.shape
algo_params.angle_list = check_pa_vector(algo_params.angle_list)
n_annuli = int((y / 2 - algo_params.radius_int) / algo_params.asize)
if isinstance(algo_params.delta_rot, tuple):
algo_params.delta_rot = np.linspace(
algo_params.delta_rot[0], algo_params.delta_rot[1], num=n_annuli
)
elif isinstance(algo_params.delta_rot, (int, float)):
algo_params.delta_rot = [algo_params.delta_rot] * n_annuli
if isinstance(algo_params.n_segments, int):
algo_params.n_segments = [
algo_params.n_segments for _ in range(n_annuli)]
elif algo_params.n_segments == "auto":
algo_params.n_segments = list()
algo_params.n_segments.append(2) # for first annulus
algo_params.n_segments.append(3) # for second annulus
ld = 2 * np.tan(360 / 4 / 2) * algo_params.asize
for i in range(2, n_annuli): # rest of annuli
radius = i * algo_params.asize
ang = np.rad2deg(2 * np.arctan(ld / (2 * radius)))
algo_params.n_segments.append(int(np.ceil(360 / ang)))
if algo_params.verbose:
msg = "N annuli = {}, FWHM = {:.3f}"
print(msg.format(n_annuli, algo_params.fwhm))
print("NMF per annulus (or annular sectors):")
if (
algo_params.nproc is None
): # Hyper-threading "duplicates" the cores -> cpu_count/2
algo_params.nproc = cpu_count() // 2
# how to handle negative values
if algo_params.handle_neg == HandleNeg.NULL:
array[np.where(array < 0)] = 0
elif algo_params.handle_neg == HandleNeg.SUBTR_MIN:
array -= np.amin(array)
elif not algo_params.handle_neg == HandleNeg.MASK:
raise ValueError("Mode to handle neg. pixels not recognized")
# The annuli are built, and the corresponding PA thresholds for frame
# rejection are calculated (at the center of the annulus)
cube_out = np.zeros_like(array)
cube_recon = np.zeros_like(array)
H_comps = np.zeros([algo_params.ncomp, array.shape[1], array.shape[2]])
if algo_params.cube_ref is None:
strict = False
else:
strict = True
for ann in range(n_annuli):
if isinstance(algo_params.ncomp, tuple) or isinstance(
algo_params.ncomp, np.ndarray
):
if len(algo_params.ncomp) == n_annuli:
ncompann = algo_params.ncomp[ann]
else:
raise TypeError(
"If `ncomp` is a tuple, it must match the " "number of annuli"
)
else:
ncompann = algo_params.ncomp
n_segments_ann = algo_params.n_segments[ann]
add_params = {
"ann": ann,
"n_annuli": n_annuli,
"annulus_width": algo_params.asize,
"n_segments": n_segments_ann,
"delta_rot": algo_params.delta_rot[ann],
"strict": strict,
}
func_params = setup_parameters(
params_obj=algo_params, fkt=_define_annuli, show_params=False, **add_params
)
res_ann_par = _define_annuli(**func_params)
pa_thr, inner_radius, ann_center = res_ann_par
indices = get_annulus_segments(
array[0],
inner_radius,
algo_params.asize,
n_segments_ann,
algo_params.theta_init,
)
# Library matrix is created for each segment and scaled if needed
for j in range(n_segments_ann):
yy = indices[j][0]
xx = indices[j][1]
if algo_params.handle_neg == "mask":
npts = range(len(yy))
if algo_params.cube_sig is not None:
yp = [
yy[i]
for i in npts
if np.amin(
array[:, yy[i], xx[i]]
- np.abs(algo_params.cube_sig[:, yy[i], xx[i]])
)
> 0
]
xp = [
xx[i]
for i in npts
if np.amin(
array[:, yy[i], xx[i]]
- np.abs(algo_params.cube_sig[:, yy[i], xx[i]])
)
> 0
]
else:
yp = [yy[i]
for i in npts if np.amin(array[:, yy[i], xx[i]]) > 0]
xp = [xx[i]
for i in npts if np.amin(array[:, yy[i], xx[i]]) > 0]
yy = tuple(yp)
xx = tuple(xp)
matrix_segm = array[:, yy, xx] # shape [nframes x npx_segment]
matrix_segm = matrix_scaling(matrix_segm, algo_params.scaling)
if algo_params.cube_ref is not None:
matrix_segm_ref = algo_params.cube_ref[:, yy, xx]
matrix_segm_ref = matrix_scaling(
matrix_segm_ref, algo_params.scaling)
else:
matrix_segm_ref = None
if algo_params.cube_sig is not None:
matrix_sig_segm = algo_params.cube_sig[:, yy, xx]
else:
matrix_sig_segm = None
add_params = {
"matrix": matrix_segm,
"frame": iterable(range(n)),
"pa_threshold": pa_thr,
"ann_center": ann_center,
"ncomp": ncompann,
"matrix_ref": matrix_segm_ref,
"matrix_sig_segm": matrix_sig_segm,
}
func_params = setup_parameters(
params_obj=algo_params,
fkt=do_nmf_patch,
as_list=True,
show_params=False,
**add_params,
)
res = pool_map(
algo_params.nproc,
do_nmf_patch,
*func_params,
)
res = np.array(res, dtype=object)
residuals = np.array(res[:, 0])
# ncomps = res[:, 1]
# nfrslib = res[:, 2]
recon = np.array(res[:, 1])
H = np.array(res[:, 2])
for fr in range(n):
cube_out[fr][yy, xx] = residuals[fr]
cube_recon[fr][yy, xx] = recon[fr]
for pp in range(algo_params.ncomp):
H_comps[pp][yy, xx] = H[0][pp] # just save H inferred for fr=0
if algo_params.verbose:
timing(start_time)
# Cube is derotated according to the parallactic angle and collapsed
cube_der = cube_derotate(
cube_out, algo_params.angle_list, nproc=algo_params.nproc, **rot_options
)
frame = cube_collapse(
cube_der, mode=algo_params.collapse, w=algo_params.weights)
if algo_params.verbose:
print("Done derotating and combining.")
timing(start_time)
if algo_params.full_output:
return cube_out, cube_der, cube_recon, H_comps, frame
else:
return frame
def do_nmf_patch(
matrix,
frame,
angle_list,
fwhm,
pa_threshold,
ann_center,
ncomp,
max_iter,
random_state,
init_svd,
min_frames_lib,
max_frames_lib,
matrix_ref,
matrix_sig_segm,
handle_neg,
**kwargs,
):
"""Solves the NMF for each frame patch (small matrix). For each frame we
find the frames to be rejected depending on the amount of rotation. The
library is also truncated on the other end (frames too far or which have
rotated more) which are more decorrelated to keep the computational cost
lower. This truncation is done on the annuli after 10*FWHM and the goal is
to keep min(num_frames/2, 200) in the library.
"""
# Note: blocks below allow the possibility of ARDI
if pa_threshold != 0:
indices_left = _find_indices_adi(
angle_list, frame, pa_threshold, truncate=True, max_frames=max_frames_lib
)
msg = "Too few frames left in the PCA library. "
msg += "Accepted indices length ({:.0f}) less than {:.0f}. "
msg += "Try decreasing either delta_rot or min_frames_lib."
try:
if matrix_sig_segm is not None:
data_ref = matrix[indices_left] - matrix_sig_segm[indices_left]
else:
data_ref = matrix[indices_left]
except IndexError:
if matrix_ref is None:
raise RuntimeError(msg.format(0, min_frames_lib))
data_ref = None
if data_ref.shape[0] < min_frames_lib and matrix_ref is None:
raise RuntimeError(msg.format(data_ref.shape[0], min_frames_lib))
elif pa_threshold == 0:
if matrix_sig_segm is not None:
data_ref = matrix - matrix_sig_segm
else:
data_ref = matrix
if matrix_ref is not None:
if data_ref is not None:
data_ref = np.vstack((matrix_ref, data_ref))
else:
data_ref = matrix_ref
# to avoid bug, just consider positive values
if np.median(data_ref) < 0:
raise ValueError("Mostly negative values in the cube")
else:
# how to handle negative values
if handle_neg == "null":
data_ref[np.where(data_ref < 0)] = 0
elif handle_neg == "subtr_min":
data_ref -= np.amin(data_ref)
else: # 'mask'
zp = np.nonzero(np.amin(data_ref, axis=0) > 0)
solver = "mu"
# if init_svd == 'nndsvda':
# solver = 'mu'
# else:
# solver = 'cd'
mod = NMF(
n_components=ncomp,
solver=solver,
init=init_svd,
max_iter=max_iter,
random_state=random_state,
**kwargs,
)
curr_frame = matrix[frame] # current frame
if matrix_sig_segm is not None:
curr_frame_emp = matrix[frame] - matrix_sig_segm[frame]
else:
curr_frame_emp = curr_frame.copy()
# how to handle negative values
if handle_neg == "null":
curr_frame_emp[np.where(curr_frame_emp < 0)] = 0
elif handle_neg == "subtr_min":
curr_frame_emp -= np.amin(curr_frame_emp)
else: # 'mask'
zzp = np.nonzero(curr_frame_emp > 0)
pos_p = np.intersect1d(zp[0], zzp[0])
curr_frame_emp = curr_frame_emp[pos_p]
data_ref = data_ref[:, pos_p]
H = mod.fit(data_ref).components_
W = mod.transform(curr_frame_emp[np.newaxis, ...])
reconstructed = np.dot(W, H)
# if masked neg values, reshape
if handle_neg == "mask": # 'mask'
recon = np.zeros(matrix.shape[1])
recon[pos_p] = reconstructed
reconstructed = recon.copy()
H_tmp = np.zeros([ncomp, matrix.shape[1]])
for pp in range(ncomp):
H_tmp[pp, pos_p] = H[pp]
H = H_tmp.copy()
residuals = curr_frame - reconstructed
return residuals, reconstructed, H
# return residuals, V.shape[0], data_ref.shape[0]