Source code for vip_hci.invprob.fmmf

#! /usr/bin/env python
"""
Forward model matched filter relying on either KLIP [SOU12]_ / [PUE16]_ or LOCI
[LAF07]_ for the PSF reference approximation. The original concept of matched
filter applied to KLIP has been first proposed in [RUF17]_ and then adapted in
[DAH21a]_ to use the LOCI framework. For both PSF-subtraction techniques, a
forward model of the PSF is computed for each pixel contained in the field of
view and each frame to account for the over-subtraction and self-subtraction
of potential planetary signal due to the reference PSF subtraction. The
obtained model is then compared to the pixels intensities within each frame of
the residual cube. The SNR associated to each pixel contained in the field of
view, as well as its estimated contrast is then obtained via a Gaussian maximum
likelihood approach.

.. [DAH21a]
   | Dahlqvist et al. 2021a
   | **Improving the RSM map exoplanet detection algorithm. PSF forward
     modelling and optimal selection of PSF subtraction techniques**
   | *The Astrophysical Journal Letters, Volume 646, p. 49*
   | `https://arxiv.org/abs/2012.05094
     <https://arxiv.org/abs/2012.05094>`_

.. [LAF07]
   | Lafreniere et al. 2007
   | **A New Algorithm for Point-Spread Function Subtraction in High-Contrast
     Imaging: A Demonstration with Angular Differential Imaging**
   | *The Astrophysical Journal, Volume 660, Issue 4, pp. 770-780*
   | `https://arxiv.org/abs/astro-ph/0702697
     <https://arxiv.org/abs/astro-ph/0702697>`_

.. [PUE16]
   | Pueyo 2016
   | **Detection and Characterization of Exoplanets using Projections on
     Karhunen Loeve Eigenimages: Forward Modeling**
   | *The Astrophysical Journal, Volume 824, Issue 2, p. 117*
   | `https://arxiv.org/abs/1604.06097
     <https://arxiv.org/abs/1604.06097>`_

.. [RUF17]
   | Ruffio et al. 2017
   | **Improving and Assessing Planet Sensitivity of the GPI Exoplanet Survey
     with a Forward Model Matched Filter**
   | *The Astrophysical Journal, Volume 842, p. 14*
   | `https://arxiv.org/abs/1705.05477
     <https://arxiv.org/abs/1705.05477>`_

.. [SOU12]
   | Soummer et al. 2012
   | **Detection and Characterization of Exoplanets and Disks Using Projections
     on Karhunen-Loève Eigenimages**
   | *The Astrophysical Journal Letters, Volume 755, Issue 2, p. 28*
   | `https://arxiv.org/abs/1207.4197
     <https://arxiv.org/abs/1207.4197>`_

"""

__author__ = "Carl-Henrik Dahlqvist, Thomas Bédrine"
__all__ = ["fmmf", "FMMF_Params"]

from multiprocessing import cpu_count
from dataclasses import dataclass, field
import numpy as np
import numpy.linalg as la
from enum import Enum
from skimage.draw import disk
from ..config.utils_param import setup_parameters, separate_kwargs_dict
from ..config.utils_conf import pool_map, iterable
from ..config import time_ini, timing
from ..config.paramenum import VarEstim, Imlib, Interpolation, ALGO_KEY
from ..fm import cube_inject_companions
from ..preproc.derotation import _find_indices_adi
from ..preproc import frame_crop, cube_crop_frames, cube_derotate
from ..var import get_annulus_segments, frame_center


[docs] @dataclass class FMMF_Params: """ Set of parameters for the FMMF algorithm. See function `fmmf` below for the documentation. """ cube: np.ndarray = None angle_list: np.ndarray = None psf: np.ndarray = None fwhm: float = None min_r: int = None max_r: int = None model: str = "KLIP" var: Enum = VarEstim.FR param: dict = field( default_factory=lambda: {"ncomp": 20, "tolerance": 5e-3, "delta_rot": 0.5} ) crop: int = 5 imlib: Enum = Imlib.VIPFFT interpolation: Enum = Interpolation.LANCZOS4 nproc: int = 1 verbose: bool = True
[docs] def fmmf(*all_args, **all_kwargs: dict): """ Forward model matched filter generating SNR map and contrast map, using either KLIP or LOCI as PSF subtraction techniques, as implemented in [RUF17]_ and [DAH21a]_. Parameters ---------- all_args: list, optional Positionnal arguments for the FMMF algorithm. Full list of parameters below. all_kwargs: dictionary, optional Mix of keyword arguments that can initialize a FMMFParams or a FMMFParams itself. Parameters ---------- cube : numpy ndarray, 3d Input cube (ADI sequences), Dim 1 = temporal axis, Dim 2-3 = spatial axis angle_list : numpy ndarray, 1d Parallactic angles for each frame of the ADI sequences. psf : numpy ndarray 2d 2d array with the normalized PSF template, with an odd shape. The PSF image must be centered wrt to the array! Therefore, it is recommended to run the function ``normalize_psf`` to generate a centered and flux-normalized PSF template. fwhm: int Full width at half maximum for the instrument PSF min_r : int,optional Center radius of the first annulus considered in the FMMF detection map estimation. The radius should be larger than half the value of the 'crop' parameter . Default is None which corresponds to one FWHM. max_r : int Center radius of the last annulus considered in the FMMF detection map estimation. The radius should be smaller or equal to half the size of the image minus half the value of the 'crop' parameter. Default is None which corresponds to half the size of the image minus half the value of the 'crop' parameter. model: string, optional Selected PSF-subtraction technique for the computation of the FMMF detection map. FMMF work either with KLIP or LOCI. Default is 'KLIP'. var: Enum, see `vip_hci.config.paramenum.VarEstim` Model used for the residual noise variance estimation used in the matched filtering (maximum likelihood estimation of the flux and SNR). param: dict, optional Dictionnary regrouping the parameters used by the KLIP (ncomp and delta_rot) or LOCI (tolerance and delta_rot) PSF-subtraction technique: * ncomp : int, optional. Number of components used for the low-rank approximation of the speckle field. Default is 20. * tolerance: float, optional. Tolerance level for the approximation of the speckle field via a linear combination of the reference images in the LOCI algorithm. Default is 5e-3. * delta_rot : float, optional. Factor for tunning the parallactic angle threshold, expressed in FWHM. Default is 0.5 (excludes 0.5xFHWM on each side of the considered frame). crop: int, optional Part of the PSF template considered in the estimation of the FMMF detection map. Default is 5. imlib : Enum, see `vip_hci.config.paramenum.Imlib` Parameter used for the derotation of the residual cube. See the documentation of the ``vip_hci.preproc.frame_rotate`` function. interpolation : Enum, see `vip_hci.config.paramenum.Interpolation` Parameter used for the derotation of the residual cube. See the documentation of the ``vip_hci.preproc.frame_rotate`` function. nproc : int or None, optional Number of processes for parallel computing. By default ('nproc=1') the algorithm works in single-process mode. If set to None, nproc is automatically set to half the number of available CPUs. verbose: bool, optional If True provide a message each time an annulus has been treated. Default True. Returns ------- flux_matrix : 2d ndarray Maximum likelihood estimate of the contrast for each pixel in the field of view snr_matrix : 2d ndarray Signal to noise ratio map (defined as the estimated contrast divided by the estimated standard deviation of the contrast). """ class_params, other_options = separate_kwargs_dict( initial_kwargs=all_kwargs, parent_class=FMMF_Params ) # Extracting the object of parameters (if any) algo_params = None if ALGO_KEY in other_options.keys(): algo_params = other_options[ALGO_KEY] del other_options[ALGO_KEY] if algo_params is None: algo_params = FMMF_Params(*all_args, **class_params) start_time = time_ini(algo_params.verbose) if algo_params.crop >= 2 * round(algo_params.fwhm) + 1: raise ValueError( "Maximum cropsize should be lower or equal to two" + " FWHM,please change accordingly the value of 'crop'" ) if algo_params.min_r is None: algo_params.min_r = int(round(algo_params.fwhm)) if algo_params.max_r is None: algo_params.max_r = algo_params.cube.shape[-1] // 2 - ( algo_params.crop // 2 + 1 ) if algo_params.nproc is None: algo_params.nproc = cpu_count() // 2 add_params = {"ann_center": iterable( range(algo_params.min_r, algo_params.max_r))} func_params = setup_parameters( params_obj=algo_params, fkt=_snr_contrast_esti, as_list=True, show_params=False, **add_params, ) res_full = pool_map( algo_params.nproc, _snr_contrast_esti, *func_params, ) flux_matrix = np.zeros( (algo_params.cube.shape[1], algo_params.cube.shape[2])) snr_matrix = np.zeros( (algo_params.cube.shape[1], algo_params.cube.shape[2])) for res_temp in res_full: indices = get_annulus_segments(algo_params.cube[0], res_temp[2], 1) flux_matrix[indices[0][0], indices[0][1]] = res_temp[0] snr_matrix[indices[0][0], indices[0][1]] = res_temp[1] if algo_params.verbose: timing(start_time) return flux_matrix, snr_matrix
def _snr_contrast_esti( ann_center, cube, angle_list, psf, fwhm, model, var, param, crop, imlib, interpolation, verbose, ): """ Computation of the SNR and contrast associated to the pixels contained in a given annulus via the foward model matched filter """ n, y, x = cube.shape evals_matrix = [] evecs_matrix = [] KL_basis_matrix = [] refs_mean_sub_matrix = [] sci_mean_sub_matrix = [] resicube_klip = None ind_ref_list = None coef_list = None ncomp = param["ncomp"] tolerance = param["tolerance"] delta_rot = param["delta_rot"] # Computation of the reference PSF, and the matrices # required for the computation of the PSF forward models pa_threshold = np.rad2deg( 2 * np.arctan(delta_rot * fwhm / (2 * (ann_center)))) mid_range = np.abs(np.amax(angle_list) - np.amin(angle_list)) / 2 if pa_threshold >= mid_range - mid_range * 0.1: pa_threshold = float(mid_range - mid_range * 0.1) if model == "KLIP": resicube_klip = np.zeros_like(cube) indices = get_annulus_segments( cube[0], ann_center - int(round(fwhm) / 2), int(round(fwhm)), 1 ) for k in range(0, cube.shape[0]): res_temp = KLIP_patch( k, cube[:, indices[0][0], indices[0][1]], ncomp, angle_list, int(round(fwhm)), pa_threshold, ann_center, ) evals_temp = res_temp[0] evecs_temp = res_temp[1] KL_basis_temp = res_temp[2] sub_img_rows_temp = res_temp[3] refs_mean_sub_temp = res_temp[4] sci_mean_sub_temp = res_temp[5] resicube_klip[k, indices[0][0], indices[0][1]] = sub_img_rows_temp evals_matrix.append(evals_temp) evecs_matrix.append(evecs_temp) KL_basis_matrix.append(KL_basis_temp) refs_mean_sub_matrix.append(refs_mean_sub_temp) sci_mean_sub_matrix.append(sci_mean_sub_temp) mcube = cube_derotate( resicube_klip, angle_list, imlib=imlib, interpolation=interpolation ) elif model == "LOCI": resicube, ind_ref_list, coef_list = LOCI_FM( cube, psf, ann_center, angle_list, int(round(fwhm)), fwhm, tolerance, delta_rot, pa_threshold, ) mcube = cube_derotate( resicube, angle_list, imlib=imlib, interpolation=interpolation ) ceny, cenx = frame_center(cube[0]) indices = get_annulus_segments(mcube[0], ann_center, 1, 1) indicesy = indices[0][0] indicesx = indices[0][1] flux_esti = np.zeros_like(indicesy) prob_esti = np.zeros_like(indicesy) var_f = _var_esti(mcube, angle_list, var, crop, ann_center) for i in range(0, len(indicesy)): psfm_temp = None poscenty = indicesy[i] poscentx = indicesx[i] indices = get_annulus_segments( cube[0], ann_center - int(round(fwhm) / 2), int(round(fwhm)), 1 ) an_dist = np.sqrt((poscenty - ceny) ** 2 + (poscentx - cenx) ** 2) theta = np.degrees(np.arctan2(poscenty - ceny, poscentx - cenx)) model_matrix = cube_inject_companions( np.zeros_like(cube), psf, angle_list, flevel=1, rad_dists=an_dist, theta=theta, n_branches=1, verbose=False, imlib=imlib, interpolation=interpolation, ) # PSF forward model computation for KLIP if model == "KLIP": psf_map = np.zeros_like(model_matrix) for b in range(0, n): psf_map_temp = _perturb( b, model_matrix[:, indices[0][0], indices[0][1]], ncomp, evals_matrix, evecs_matrix, KL_basis_matrix, sci_mean_sub_matrix, refs_mean_sub_matrix, angle_list, fwhm, pa_threshold, ann_center, ) psf_map[b, indices[0][0], indices[0][1]] = psf_map_temp psf_map[b, indices[0][0], indices[0] [1]] -= np.mean(psf_map_temp) psf_map_der = cube_derotate( psf_map, angle_list, imlib=imlib, interpolation=interpolation ) psfm_temp = cube_crop_frames( psf_map_der, int(2 * round(fwhm) + 1), xy=(poscentx, poscenty), verbose=False, ) # PSF forward model computation for LOCI if model == "LOCI": values_fc = model_matrix[:, indices[0][0], indices[0][1]] cube_res_fc = np.zeros_like(model_matrix) matrix_res_fc = np.zeros( (values_fc.shape[0], indices[0][0].shape[0])) for e in range(values_fc.shape[0]): recon_fc = np.dot(coef_list[e], values_fc[ind_ref_list[e]]) matrix_res_fc[e] = values_fc[e] - recon_fc cube_res_fc[:, indices[0][0], indices[0][1]] = matrix_res_fc cube_der_fc = cube_derotate( cube_res_fc - np.mean(cube_res_fc), angle_list, imlib=imlib, interpolation=interpolation, ) psfm_temp = cube_crop_frames( cube_der_fc, int(2 * round(fwhm) + 1), xy=(poscentx, poscenty), verbose=False, ) num = [] denom = [] # Matched Filter for j in range(n): if var == "FR": svar = var_f[j] elif var == "FM": svar = var_f[i, j] elif var == "TE": svar = var_f[i, j] if psfm_temp.shape[1] == crop: psfm = psfm_temp[j] else: psfm = frame_crop( psfm_temp[j], crop, cenxy=[int(psfm_temp.shape[-1] / 2), int(psfm_temp.shape[-1] / 2)], verbose=False, ) num.append( np.multiply( frame_crop( mcube[j], crop, cenxy=[ poscentx, poscenty], verbose=False ), psfm, ).sum() / svar ) denom.append(np.multiply(psfm, psfm).sum() / svar) flux_esti[i] = sum(num) / np.sqrt(sum(denom)) prob_esti[i] = sum(num) / sum(denom) if verbose == True: print("Radial distance " + "{}".format(ann_center) + " done!") return prob_esti, flux_esti, ann_center def _var_esti(mcube, angle_list, var, crop, ann_center): """ Computation of the residual noise variance """ n, y, x = mcube.shape if var == "FR": var_f = np.zeros(n) indices = get_annulus_segments( mcube[0], ann_center - int(crop / 2), crop, 1) poscentx = indices[0][1] poscenty = indices[0][0] for a in range(n): var_f[a] = np.var(mcube[a, poscenty, poscentx]) elif var == "FM": indices = get_annulus_segments(mcube[0], ann_center, 1, 1) indicesy = indices[0][0] indicesx = indices[0][1] var_f = np.zeros((len(indicesy), n)) indices = get_annulus_segments( mcube[0], ann_center - int(crop / 2), crop, 1) for a in range(len(indicesy)): indc = disk((indicesy[a], indicesx[a]), 3) positionx = [] positiony = [] for k in range(0, len(indices[0][1])): cond1 = set(np.where(indices[0][1][k] == indc[1])[0]) cond2 = set(np.where(indices[0][0][k] == indc[0])[0]) if len(cond1 & cond2) == 0: positionx.append(indices[0][1][k]) positiony.append(indices[0][0][k]) for b in range((n)): var_f[a, b] = np.var(mcube[b, positiony, positionx]) elif var == "TE": indices = get_annulus_segments(mcube[0], ann_center, 1, 1) indicesy = indices[0][0] indicesx = indices[0][1] var_f = np.zeros((len(indicesy), n)) mcube_derot = cube_derotate(mcube, -angle_list) for a in range(0, len(indicesy)): radist = np.sqrt( (indicesx[a] - int(x / 2)) ** 2 + (indicesy[a] - int(y / 2)) ** 2 ) if (indicesy[a] - int(y / 2)) >= 0: ang_s = np.arccos( (indicesx[a] - int(x / 2)) / radist) / np.pi * 180 else: ang_s = ( 360 - np.arccos((indicesx[a] - int(x / 2)) / radist) / np.pi * 180 ) for b in range(n): twopi = 2 * np.pi sigposy = int( y / 2 + np.sin((ang_s - angle_list[b]) / 360 * twopi) * radist ) sigposx = int( x / 2 + np.cos((ang_s - angle_list[b]) / 360 * twopi) * radist ) y0 = int(sigposy - int(crop / 2)) y1 = int(sigposy + int(crop / 2) + 1) # +1 cause endpoint is # excluded when slicing x0 = int(sigposx - int(crop / 2)) x1 = int(sigposx + int(crop / 2) + 1) mask = np.ones(mcube_derot.shape[0], dtype=bool) mask[b] = False mcube_sel = mcube_derot[mask, y0:y1, x0:x1] var_f[a, b] = np.var(np.asarray(mcube_sel)) return var_f def _perturb( frame, model_matrix, numbasis, evals_matrix, evecs_matrix, KL_basis_matrix, sci_mean_sub_matrix, refs_mean_sub_matrix, angle_list, fwhm, pa_threshold, ann_center, ): """ Function allowing the estimation of the PSF forward model when relying on KLIP for the computation of the speckle field. The code is based on the PyKLIP library considering only the ADI case with a single number of principal components considered. For more details about the code, consider the PyKLIP library or the original articles (Pueyo, L. 2016, ApJ, 824, 117 or Ruffio, J.-B., Macintosh, B., Wang, J. J., & Pueyo, L. 2017, ApJ, 842) """ # Selection of the reference library based on the given parallactic angle # threshold if pa_threshold != 0: indices_left = _find_indices_adi( angle_list, frame, pa_threshold, truncate=False ) models_ref = model_matrix[indices_left] else: models_ref = model_matrix # Computation of the self-subtraction and over-subtraction for the current # frame model_sci = model_matrix[frame] KL_basis = KL_basis_matrix[frame] sci_mean_sub = sci_mean_sub_matrix[frame] refs_mean_sub = refs_mean_sub_matrix[frame] evals = evals_matrix[frame] evecs = evecs_matrix[frame] max_basis = KL_basis.shape[0] N_pix = KL_basis.shape[1] models_msub = models_ref - np.nanmean(models_ref, axis=1)[:, None] models_msub[np.where(np.isnan(models_msub))] = 0 model_sci_msub = model_sci - np.nanmean(model_sci) model_sci_msub[np.where(np.isnan(model_sci_msub))] = 0 model_sci_msub_rows = np.reshape(model_sci_msub, (1, N_pix)) sci_mean_sub_rows = np.reshape(sci_mean_sub, (1, N_pix)) delta_KL = np.zeros([max_basis, N_pix]) proj_models_T = models_msub.dot(refs_mean_sub.transpose()) for k in range(max_basis): Zk = np.reshape(KL_basis[k, :], (1, KL_basis[k, :].size)) Vk = (evecs[:, k])[:, None] diagVk_T = (Vk.T).dot(proj_models_T) proj_models_Vk = proj_models_T.dot(Vk) fac = -(1 / (2 * np.sqrt(evals[k]))) term1 = (diagVk_T.dot(Vk) + ((Vk.T).dot(proj_models_Vk))).dot(Zk) term2 = (Vk.T).dot(models_msub) DeltaZk = fac * term1 + term2 for j in range(k): Zj = KL_basis[j, :][None, :] Vj = evecs[:, j][:, None] fac = np.sqrt(evals[j]) / (evals[k] - evals[j]) t1 = diagVk_T.dot(Vj) t2 = (Vj.T).dot(proj_models_Vk) DeltaZk += fac * (t1 + t2).dot(Zj) for j in range(k + 1, max_basis): Zj = KL_basis[j, :][None, :] Vj = evecs[:, j][:, None] fac = np.sqrt(evals[j]) / (evals[k] - evals[j]) t1 = diagVk_T.dot(Vj) t2 = (Vj.T).dot(proj_models_Vk) DeltaZk += fac * (t1 + t2).dot(Zj) delta_KL[k] = DeltaZk / np.sqrt(evals[k]) oversubtraction_inner_products = np.dot(model_sci_msub_rows, KL_basis.T) selfsubtraction_1_inner_products = np.dot(sci_mean_sub_rows, delta_KL.T) selfsubtraction_2_inner_products = np.dot(sci_mean_sub_rows, KL_basis.T) oversubtraction_inner_products[max_basis::] = 0 klipped_oversub = np.dot(oversubtraction_inner_products, KL_basis) selfsubtraction_1_inner_products[0, max_basis::] = 0 selfsubtraction_2_inner_products[0, max_basis::] = 0 klipped_selfsub = np.dot(selfsubtraction_1_inner_products, KL_basis) + np.dot( selfsubtraction_2_inner_products, delta_KL ) return model_sci[None, :] - klipped_oversub - klipped_selfsub def KLIP_patch(frame, matrix, numbasis, angle_list, fwhm, pa_threshold, ann_center, nframes=None): """ Function allowing the computation of the reference PSF via KLIP for a given sub-region of the original ADI sequence. Code inspired by the PyKLIP librabry """ max_frames_lib = 200 if pa_threshold != 0: if ann_center > fwhm * 20: indices_left = _find_indices_adi( angle_list, frame, pa_threshold, truncate=True, max_frames=max_frames_lib, ) else: indices_left = _find_indices_adi( angle_list, frame, pa_threshold, truncate=False, nframes=nframes ) refs = matrix[indices_left] else: refs = matrix sci = matrix[frame] sci_mean_sub = sci - np.nanmean(sci) # sci_mean_sub[np.where(np.isnan(sci_mean_sub))] = 0 refs_mean_sub = refs - np.nanmean(refs, axis=1)[:, None] # refs_mean_sub[np.where(np.isnan(refs_mean_sub))] = 0 # Covariance matrix definition covar_psfs = np.cov(refs_mean_sub) covar_psfs *= np.size(sci) - 1 tot_basis = covar_psfs.shape[0] numbasis = np.clip(numbasis - 1, 0, tot_basis - 1) max_basis = np.max(numbasis) + 1 # Computation of the eigenvectors/values of the covariance matrix evals, evecs = la.eigh(covar_psfs) evals = np.copy(evals[int(tot_basis - max_basis): int(tot_basis)]) evecs = np.copy(evecs[:, int(tot_basis - max_basis): int(tot_basis)]) evals = np.copy(evals[::-1]) evecs = np.copy(evecs[:, ::-1]) # Computation of the principal components KL_basis = np.dot(refs_mean_sub.T, evecs) KL_basis = KL_basis * (1.0 / np.sqrt(evals))[None, :] KL_basis = KL_basis.T N_pix = np.size(sci_mean_sub) sci_rows = np.reshape(sci_mean_sub, (1, N_pix)) inner_products = np.dot(sci_rows, KL_basis.T) inner_products[0, int(max_basis)::] = 0 # Projection of the science image on the selected prinicpal component # to generate the speckle field model klip_reconstruction = np.dot(inner_products, KL_basis) # Subtraction of the speckle field model from the riginal science image # to obtain the residual frame sub_img_rows = sci_rows - klip_reconstruction return ( evals, evecs, KL_basis, np.reshape(sub_img_rows, (N_pix)), refs_mean_sub, sci_mean_sub, ) def LOCI_FM(cube, psf, ann_center, angle_list, asize, fwhm, Tol, delta_rot, pa_threshold): """ Computation of the optimal factors weigthing the linear combination of reference frames used to obtain the modeled speckle field for each frame and allowing the determination of the forward modeled PSF. Estimation of the cube of residuals based on the modeled speckle field. """ cube_res = np.zeros_like(cube) ceny, cenx = frame_center(cube[0]) radius_int = ann_center - int(1.5 * asize) if radius_int <= 0: radius_int = 1 for ann in range(3): n_segments_ann = 1 inner_radius_ann = radius_int + ann * asize indices = get_annulus_segments( cube[0], inner_radius=inner_radius_ann, width=asize, nsegm=n_segments_ann ) ind_opt = get_annulus_segments( cube[0], inner_radius=inner_radius_ann, width=asize, nsegm=n_segments_ann, optim_scale_fact=2, ) ayxyx = [ inner_radius_ann, pa_threshold, indices[0][0], indices[0][1], ind_opt[0][0], ind_opt[0][1], ] matrix_res, ind_ref, coef, yy, xx = _leastsq_patch_fm( ayxyx, angle_list, fwhm, cube, 100, Tol, psf=psf ) if ann == 1: ind_ref_list = ind_ref coef_list = coef cube_res[:, yy, xx] = matrix_res return cube_res, ind_ref_list, coef_list def _leastsq_patch_fm(ayxyx, angle_list, fwhm, cube, dist_threshold, tol, psf=None): """ Function allowing th estimation of the optimal factors for the modeled speckle field estimation via the LOCI framework. The code has been developped based on the VIP python function _leastsq_patch, but return additionnaly the set of coefficients used for the speckle field computation. """ ann_center, pa_threshold, yy, xx, yy_opti, xx_opti = ayxyx ind_ref_list = [] coef_list = [] yy_opt = [] xx_opt = [] for j in range(0, len(yy_opti)): if not any( x in np.where(yy == yy_opti[j])[0] for x in np.where(xx == xx_opti[j])[0] ): xx_opt.append(xx_opti[j]) yy_opt.append(yy_opti[j]) values = cube[:, yy, xx] matrix_res = np.zeros((values.shape[0], yy.shape[0])) values_opt = cube[:, yy_opti, xx_opti] n_frames = cube.shape[0] for i in range(n_frames): ind_fr_i = _find_indices_adi( angle_list, i, pa_threshold, truncate=False) if len(ind_fr_i) > 0: A = values_opt[ind_fr_i] b = values_opt[i] coef = np.linalg.lstsq(A.T, b, rcond=tol)[0] # SVD method else: msg = "No frames left in the reference set. Try increasing " msg += "`dist_threshold` or decreasing `delta_rot`." raise RuntimeError(msg) ind_ref_list.append(ind_fr_i) coef_list.append(coef) recon = np.dot(coef, values[ind_fr_i]) matrix_res[i] = values[i] - recon return matrix_res, ind_ref_list, coef_list, yy, xx