Source code for vip_hci.fm.negfd_fmerit

#! /usr/bin/env python
"""Module with function of merit definitions for the NEGFD optimization."""

__author__ = "Valentin Christiaens, O. Wertz, Carlos Alberto Gomez Gonzalez"
__all__ = ["chisquare_fd"]

import numpy as np
from .negfd_interp import interpolate_model
from .utils_negfd import cube_disk_free
from ..psfsub import pca


[docs] def chisquare_fd( modelParameters, cube, angs, disk_model, mask_fm, initialState, force_params=None, grid_param_list=None, fmerit="sum", mu_sigma=None, psfn=None, algo=pca, algo_options={}, interp_order=-1, imlib="skimage", interpolation="biquintic", transmission=None, weights=None, debug=False, rot_options={}, ): r""" Calculate the figure of merit to minimze residuals after disk subtraction. The reduced :math:`\chi^2` is defined as:: .. math:: \chi^2_r = \frac{1}{N-Npar}\sum_{j=1}^{N} \frac{(I_j-\mu)^2}{\sigma^2} (mu_sigma is a tuple) or: .. math:: \chi^2_r = \frac{1}{N-Npar}\sum_{j=1}^{N} |I_j| (mu_sigma=None), where N is the number of pixels within the binary mask mask_fm, Npar the number of parameters to be fitted (4 for a 3D input cube, 3+n_ch for a 4D input cube), and :math:`I_j` the j-th pixel intensity. Parameters ---------- modelParameters: tuple The free model parameters. E.g. (x, y, theta, scal, flux) for a 3D input cube (if force_params=None) or (x, y, theta, scal, f1, ..., fN) for a 4D cube with N spectral channels (if force_params=None). cube: 3d or 4d numpy ndarray Input ADI or ADI+IFS cube. angs: numpy.array The parallactic angle fits image expressed as a numpy.array. psfs_norm: numpy.array The scaled psf expressed as a numpy.array. fwhm : float The FHWM in pixels. annulus_width: int, optional The width in pixels of the annulus on which the PCA is done. aperture_radius: int, optional The radius of the circular aperture in terms of the FWHM. initialState: numpy.array All initial parameters (including fixed ones) applied to the disk model image. force_params: None or list/tuple of bool, optional If not None, list/tuple of bool corresponding to parameters to fix. grid_params_list: list of lists/1d nd arrays, or None If input disk_model is a grid of either images (for 3D input cube) or spectral cubes (for a 4D input cube), this should be provided. It should be a list of either lists or 1d nd arrays corresponding to the parameter values sampled by the input disk model grid, with their lengths matching the respective first dimensions of disk_model. ncomp: int or None The number of principal components for PCA-based algorithms. mu_sigma: tuple of 2 floats or None, opt If set to None: not used, and falls back to original version of the algorithm, using fmerit. If set to anything but None: will compute the mean and standard deviation of pixel intensities in an annulus centered on the location of the companion, excluding the area directly adjacent to the companion. fmerit : {'sum', 'stddev'}, string optional Chooses the figure of merit to be used. stddev works better for close in companions sitting on top of speckle noise. collapse : {'median', 'mean', 'sum', 'trimmean', None}, str or None, opt Sets the way of collapsing the frames for producing a final image. If None then the cube of residuals is used when measuring the function of merit (instead of a single final frame). algo: python routine, opt {pca_annulus, pca_annular, pca, custom} Routine to be used to model and subtract the stellar PSF. From an input cube, derotation angles, and optional arguments, it should return a post-processed frame. algo_options: dict, opt Dictionary with additional parameters related to the algorithm (e.g. tol, min_frames_lib, max_frames_lib). If 'algo' is not a vip routine, this dict should contain all necessary arguments apart from the cube and derotation angles. Note: arguments such as ncomp, svd_mode, scaling, imlib, interpolation or collapse can also be included in this dict (the latter are also kept as function arguments for consistency with older versions of vip). interp_order: int or tuple of int, optional, {-1,0,1} [only used if grid_params_list is not None] Interpolation mode for model interpolation. If a tuple of integers, the length should match the number of grid dimensions and will trigger a different interpolation mode for the different parameters. - -1: Order 1 spline interpolation in logspace for the parameter - 0: nearest neighbour model - 1: Order 1 spline interpolation imlib : str, optional See the documentation of the ``vip_hci.preproc.frame_shift`` function. interpolation : str, optional See the documentation of the ``vip_hci.preproc.frame_shift`` function. transmission: numpy array, optional Array with 2 columns. First column is the radial separation in pixels. Second column is the off-axis transmission (between 0 and 1) at the radial separation given in column 1. weights : 1d array, optional If provided, the negative fake companion fluxes will be scaled according to these weights before injection in the cube. Can reflect changes in the observing conditions throughout the sequence. debug: bool, opt Whether to debug and plot the post-processed frame after injection of the negative fake companion. Returns ------- out: float The reduced chi squared. """ grid_ndim = disk_model.ndim-cube.ndim+1 if cube.ndim == 3: multispectral = False if force_params is not None: grid_params = [] df_params = [] c_free = 0 c_forced = 0 for i in range(len(force_params)): if force_params[i]: if i < grid_ndim: grid_params.append(initialState[c_forced]) else: df_params.append(initialState[c_forced]) c_forced += 1 else: if i < grid_ndim: grid_params.append(modelParameters[c_free]) else: df_params.append(modelParameters[c_free]) c_free += 1 x, y, theta, scal = tuple(df_params[:4]) flux_tmp = df_params[-1] else: try: if grid_ndim > 0: grid_params = modelParameters[:grid_ndim] x, y, theta, scal = modelParameters[grid_ndim:grid_ndim+4] flux_tmp = modelParameters[grid_ndim+4:] except TypeError: msg = "modelParameters must be a tuple, {} was given" print(msg.format(type(modelParameters))) else: multispectral = True if force_params is not None: flux_fix = force_params[grid_ndim+4] for j in range(len(force_params) - (5+grid_ndim)): if force_params[j+5+grid_ndim] != flux_fix: msg = "All fluxes need to be either free or fixed" raise ValueError(msg) grid_params = [] df_params = [] c_free = 0 c_forced = 0 for i in range(len(force_params[:4+grid_ndim])): if force_params[i]: if i < grid_ndim: grid_params.append(initialState[c_forced]) else: df_params.append(initialState[c_forced]) c_forced += 1 else: if i < grid_ndim: grid_params.append(modelParameters[c_free]) else: df_params.append(modelParameters[c_free]) c_free += 1 if flux_fix: flux_tmp = initialState[c_forced:] else: flux_tmp = modelParameters[c_free:] x, y, theta, scal = tuple(df_params) else: try: if grid_ndim > 0: grid_params = modelParameters[:grid_ndim] x = modelParameters[grid_ndim+0] y = modelParameters[grid_ndim+1] theta = modelParameters[grid_ndim+2] flux_tmp = np.array(modelParameters[grid_ndim+3:]) except TypeError: msg = "modelParameters must be a tuple, {} was given" print(msg.format(type(modelParameters))) # apply temporal weights, if any if weights is None: flux = flux_tmp elif np.isscalar(flux_tmp): flux = flux_tmp * weights else: flux = np.outer(flux_tmp, weights) df_params = x, y, theta, scal, flux # interpolate in the model grid, if any if grid_ndim > 0: grid_params = tuple(grid_params) # Return infinity if requested grid params outside original bounds. for p in range(len(grid_param_list)): if grid_params[p] < grid_param_list[p][0]: return np.inf elif grid_params[p] > grid_param_list[p][-1]: return np.inf # Otherwise Interpolate disk_img from the input grid. disk_img = interpolate_model(grid_params, grid_param_list, disk_model, multispectral=multispectral, interp_order=interp_order) else: disk_img = disk_model.copy() # set imlib for rotation and shift if imlib == "opencv": imlib_sh = imlib imlib_rot = imlib elif imlib == "skimage" or imlib == "ndimage-interp": imlib_sh = "ndimage-interp" imlib_rot = "skimage" elif imlib == "vip-fft" or imlib == "ndimage-fourier": imlib_sh = "ndimage-fourier" imlib_rot = "vip-fft" else: raise TypeError("Interpolation not recognized.") # Create the cube with the negative fake companion injected cube_negfd = cube_disk_free( df_params, cube, angs, disk_img, psfn=None, imlib=imlib_rot, interpolation=interpolation, imlib_sh=imlib_sh, interpolation_sh=interpolation, transmission=transmission, weights=weights, **rot_options ) # post-process the empty cube res = algo(cube=cube_negfd, angle_list=angs, **algo_options) values = res[np.where(mask_fm)] # Function of merit # in case algo is run on part of the field (e.g. annulus), discard: values = values[values != 0] ddf = values.size - len(modelParameters) if ddf < 1: msg = "Not enough pixels at the intersection of input binary mask and " msg += "area where the algorithm is run. Check mask_fm and algo_params." raise ValueError(msg) elif values.size < 10: msg = "WARNING: less than 10 pixels in the optimization area (" msg += "intersection of input binary mask and where the algorithm is " msg += "run). You may want to double-check mask_fm and algo_params." print(msg) if mu_sigma is None: # old version - delete? if fmerit == "sum": chi = np.sum(np.abs(values)) / (values.size - len(modelParameters)) elif fmerit == "stddev": chi = np.std(values) * values.size / ddf else: raise RuntimeError("fmerit choice not recognized.") else: # true expression of a gaussian log probability mu = mu_sigma[0] sigma = mu_sigma[1] # check format if isinstance(mu, np.ndarray): if mu.shape == cube.shape[-2:]: mu = mu[np.where(mask_fm)] mu = mu[values != 0] else: msg = "If input mu is an array, it should have same shape as " msg += "cube frames" raise TypeError(msg) if isinstance(sigma, np.ndarray): if sigma.shape == cube.shape[-2:]: sigma = sigma[np.where(mask_fm)] sigma = sigma[values != 0] else: msg = "If input sigma is an array, it should have same shape as" msg += " cube frames" raise TypeError(msg) chi = np.sum(np.power((mu - values)/sigma, 2)) / ddf return chi