Source code for vip_hci.metrics.contrcurve

#! /usr/bin/env python
"""
Module with contrast curve generation function.

"""

__author__ = "C. Gomez, O. Absil @ ULg"
__all__ = ["contrast_curve", "noise_per_annulus", "throughput", "aperture_flux"]

import numpy as np
import pandas as pd
from inspect import getfullargspec
try:
    from photutils.aperture import aperture_photometry, CircularAperture
except:
    from photutils import aperture_photometry, CircularAperture
from scipy.interpolate import InterpolatedUnivariateSpline
from scipy import stats
from scipy.signal import savgol_filter
from skimage.draw import disk
from matplotlib import pyplot as plt
from ..fm import cube_inject_companions, frame_inject_companion, normalize_psf
from ..config import time_ini, timing
from ..config.utils_conf import vip_figsize, vip_figdpi
from ..var import frame_center, dist


# TODO: Include algo_class modifications in any tutorial using this function
[docs] def contrast_curve( cube, angle_list, psf_template, fwhm, pxscale, starphot, algo, sigma=5, nbranch=1, theta=0, inner_rad=1, fc_rad_sep=3, noise_sep=1, wedge=(0, 360), fc_snr=100, student=True, transmission=None, smooth=True, interp_order=2, plot=True, dpi=vip_figdpi, debug=False, verbose=True, full_output=False, save_plot=None, object_name=None, frame_size=None, fix_y_lim=(), figsize=vip_figsize, algo_class=None, **algo_dict, ): """Computes the contrast curve at a given confidence (``sigma``) level for an ADI cube or ADI+IFS cube. The contrast is calculated as sigma*noise/throughput. This implementation takes into account the small sample statistics correction proposed in [MAW14]_. Parameters ---------- cube : 3d or 4d numpy ndarray The input cube, 3d (ADI data) or 4d array (IFS data), without fake companions. angle_list : 1d numpy ndarray Vector with parallactic angles. psf_template : 2d or 3d numpy ndarray Frame with the psf template for the fake companion(s). PSF must be centered in array. Normalization is done internally. fwhm: int or float or 1d array, optional The the Full Width Half Maximum in pixels. It can handle a different FWHM value for different wavelengths (IFS data). pxscale : float Plate scale or pixel scale of the instrument (only used for plots) starphot : int or float or 1d array If int or float it corresponds to the aperture photometry of the non-coronagraphic PSF which we use to scale the contrast. If a vector is given it must contain the photometry correction for each frame. algo : callable or function The post-processing algorithm, e.g. vip_hci.pca.pca. sigma : int Sigma level for contrast calculation. Note this is a "Gaussian sigma" regardless of whether Student t correction is performed (set by the 'student' parameter). E.g. setting sigma to 5 will yield the contrast curve corresponding to a false alarm probability of 3e-7. nbranch : int, optional Number of branches on which to inject fakes companions. Each branch is tested individually. theta : float, optional Angle in degrees for rotating the position of the first branch that by default is located at zero degrees. Theta counts counterclockwise from the positive x axis. When working on a wedge, make sure that theta is located inside of it. inner_rad : int, optional Innermost radial distance to be considered in terms of FWHM. Should be >= 1. fc_rad_sep : int optional Radial separation between the injected companions (in each of the patterns) in FWHM. Must be large enough to avoid overlapping. With the maximum possible value, a single fake companion will be injected per cube and algorithm post-processing (which greatly affects computation time). noise_sep: int or None, optional Radial sampling of the noise level. By default it is performed with a radial step of 1 pixel. If set to None, it will be automatically set to be sampled every fwhm pixels radially. wedge : tuple of floats, optional Initial and Final angles for using a wedge. For example (-90,90) only considers the right side of an image. fc_snr: float optional Signal to noise ratio of injected fake companions (w.r.t a Gaussian distribution). student : bool, optional If True uses Student t correction to inject fake companion. transmission: numpy array, optional Radial transmission of the coronagraph, if any. Array with either 2 x n_rad or 1+n_ch x n_rad columns. The first column should contain the radial separation in pixels, while the other column(s) are the corresponding off-axis transmission (between 0 and 1), for either all, or each spectral channel (only relevant for a 4D input cube). smooth : bool, optional If True the radial noise curve is smoothed with a Savitzky-Golay filter of order 2. interp_order : int or None, optional If an integer is provided, the throughput vector is interpolated with a spline of order ``interp_order``. Takes values from 1 to 5. If None, then the throughput is not interpolated. plot : bool, optional Whether to plot the final contrast curve or not. True by default. dpi : int optional Dots per inch for the plots. 100 by default. 300 for printing quality. imlib : {'opencv', 'ndimage-fourier', 'ndimage-interp', 'vip-fft'}, str opt Library or method used for image operations (rotations). Opencv is the default for being the fastest. See description of `vip_hci.preproc.frame_rotate`. interpolation: str, opt See description of ``vip_hci.preproc.frame_rotate`` function debug : bool, optional Whether to print and plot additional info such as the noise, throughput, the contrast curve with different X axis and the delta magnitude instead of contrast. verbose : {True, False, 0, 1, 2}, optional If True or 1 the function prints to stdout intermediate info and timing, if set to 2 more output will be shown. full_output : bool, optional If True returns intermediate arrays. save_plot: string or None, optional If provided, the contrast curve will be saved to this path. object_name: string or None, optional Target name, used in the plot title. frame_size: int, optional Frame size used for generating the contrast curve, used in the plot title. fix_y_lim: tuple, optional If provided, the y axis limits will be fixed, for easier comparison between plots. fig_size: tuple, optional Figure size **algo_dict Any other valid parameter of the post-processing algorithms can be passed here, including e.g. imlib and interpolation. Returns ------- datafr : pandas dataframe Dataframe containing the sensitivity (Gaussian and Student corrected if Student parameter is True), the interpolated throughput, the distance in pixels, the noise and the sigma corrected (if Student is True). If full_output is True then the function returns: datafr, cube_fc_all, frame_fc_all, frame_nofc and fc_map_all. frame_fc_all : numpy ndarray 3d array with the 3 frames of the 3 (patterns) processed cubes with companions. frame_nofc : numpy ndarray 2d array, PCA processed frame without companions. fc_map_all : numpy ndarray 3d array with 3 frames containing the position of the companions in the 3 patterns. """ if cube.ndim != 3 and cube.ndim != 4: raise TypeError("The input array is not a 3d or 4d cube") if cube.ndim == 3 and (cube.shape[0] != angle_list.shape[0]): raise TypeError("Input parallactic angles vector has wrong length") if cube.ndim == 4 and (cube.shape[1] != angle_list.shape[0]): raise TypeError("Input parallactic angles vector has wrong length") if cube.ndim == 3 and psf_template.ndim != 2: raise TypeError("Template PSF is not a frame (for ADI case)") if cube.ndim == 4 and psf_template.ndim != 3: raise TypeError("Template PSF is not a cube (for ADI+IFS case)") if transmission is not None: if len(transmission) != 2 and len(transmission) != cube.shape[0] + 1: msg = "Wrong shape for transmission should be 2xn_rad or (nch+1) " msg += "x n_rad, instead of {}".format(transmission.shape) raise TypeError(msg) if isinstance(fwhm, (np.ndarray, list)): fwhm_med = np.median(fwhm) else: fwhm_med = fwhm if verbose: start_time = time_ini() if isinstance(starphot, float) or isinstance(starphot, int): msg0 = "ALGO : {}, FWHM = {}, # BRANCHES = {}, SIGMA = {}," msg0 += " STARPHOT = {}" print(msg0.format(algo.__name__, fwhm_med, nbranch, sigma, starphot)) else: msg0 = "ALGO : {}, FWHM = {}, # BRANCHES = {}, SIGMA = {}" print(msg0.format(algo.__name__, fwhm_med, nbranch, sigma)) # throughput verbose_thru = False if verbose == 2: verbose_thru = True res_throug = throughput( cube, angle_list, psf_template, fwhm, algo=algo, nbranch=nbranch, theta=theta, inner_rad=inner_rad, fc_rad_sep=fc_rad_sep, wedge=wedge, fc_snr=fc_snr, noise_sep=noise_sep, full_output=True, verbose=verbose_thru, algo_class=algo_class, **algo_dict, ) vector_radd = res_throug[3] if res_throug[0].shape[0] > 1: thruput_mean = np.nanmean(res_throug[0], axis=0) else: thruput_mean = res_throug[0][0] frame_fc_all = res_throug[4] frame_nofc = res_throug[5] fc_map_all = res_throug[6] if verbose: print("Finished the throughput calculation") timing(start_time) if transmission is not None: t_nz = transmission.shape[0] if transmission.ndim != 2: raise ValueError("transmission should be a 2D ndarray") elif t_nz != 2 and t_nz != 1 + cube.shape[0]: msg = "transmission dimensions should be (2,N) or (n_wave+1,N)" raise ValueError(msg) # if transmission doesn't have right format for interpolation, adapt it diag = np.sqrt(2) * cube.shape[-1] if transmission[0, 0] != 0 or transmission[0, -1] < diag: trans_rad_list = transmission[0].tolist() for j in range(t_nz - 1): trans_list = transmission[j + 1].tolist() # should have a zero point if transmission[0, 0] != 0: if j == 0: trans_rad_list = [0] + trans_rad_list trans_list = [0] + trans_list # last point should be max possible distance between fc and star if transmission[0, -1] < np.sqrt(2) * cube.shape[-1] / 2.0: if j == 0: trans_rad_list = trans_rad_list + [diag] trans_list = trans_list + [1] if j == 0: ntransmission = np.zeros([t_nz, len(trans_rad_list)]) ntransmission[0] = trans_rad_list ntransmission[j + 1] = trans_list transmission = ntransmission.copy() if t_nz > 2: # take the mean transmission over all wavelengths ntransmission = np.zeros([2, len(trans_rad_list)]) ntransmission[0] = transmission[0] ntransmission[1] = np.mean(transmission[1:], axis=0) transmission = ntransmission.copy() if interp_order is not None or noise_sep is not None: # noise measured in the empty frame with nosie_sep sampling if noise_sep is None: rad_samp = vector_radd noise_samp = res_throug[1] res_lev_samp = res_throug[2] else: # starting from 1*FWHM noise_samp, res_lev_samp, rad_samp = noise_per_annulus( frame_nofc, separation=noise_sep, fwhm=fwhm_med, init_rad=fwhm_med, wedge=wedge, ) radmin = vector_radd.astype(int).min() cutin1 = np.where(rad_samp.astype(int) == radmin)[0][0] noise_samp = noise_samp[cutin1:] res_lev_samp = res_lev_samp[cutin1:] rad_samp = rad_samp[cutin1:] # define max radius, ensuring it is > fwhm/2 from the outer image edge radmax_fwhm = ((cube.shape[-1]-1)//2)-fwhm_med/2 radmax = min(vector_radd.astype(int).max(), radmax_fwhm) cutin2 = np.where(rad_samp.astype(int) == radmax)[0][0] noise_samp = noise_samp[: cutin2 + 1] res_lev_samp = res_lev_samp[: cutin2 + 1] rad_samp = rad_samp[: cutin2 + 1] if interp_order is not None: # interpolating the throughput vector, spline order 2 f = InterpolatedUnivariateSpline( vector_radd, thruput_mean, k=interp_order) thruput_interp = f(rad_samp) else: thruput_interp = thruput_mean.copy() # interpolating the transmission vector, spline order 1 if transmission is not None: trans = transmission[1] radvec_trans = transmission[0] f2 = InterpolatedUnivariateSpline(radvec_trans, trans, k=1) trans_interp = f2(rad_samp) thruput_interp *= trans_interp else: rad_samp = vector_radd noise_samp = res_throug[1] res_lev_samp = res_throug[2] thruput_interp = thruput_mean if transmission is not None: if not transmission[1].shape == thruput_interp.shape[0]: msg = "Transmiss. and throughput vectors have different length" raise ValueError(msg) thruput_interp *= transmission[1] rad_samp_arcsec = rad_samp * pxscale # take abs value of the mean residual fluxes otherwise the more # oversubtraction (negative res_lev_samp), the better the contrast!! # res_lev_samp = np.abs(res_lev_samp) # correction (noticed in cases of oversubtraction) - to be confirmed: res_lev_samp = np.zeros_like(res_lev_samp) if smooth: # smoothing the noise vector using a Savitzky-Golay filter win = min(noise_samp.shape[0] - 2, int(2 * fwhm_med)) if win % 2 == 0: win += 1 noise_samp_sm = savgol_filter( noise_samp, polyorder=2, mode="nearest", window_length=win ) res_lev_samp_sm = savgol_filter( res_lev_samp, polyorder=2, mode="nearest", window_length=win ) else: noise_samp_sm = noise_samp res_lev_samp_sm = res_lev_samp # calculating the contrast if isinstance(starphot, float) or isinstance(starphot, int): cont_curve_samp = ( (sigma * noise_samp_sm + res_lev_samp_sm) / thruput_interp ) / starphot else: cont_curve_samp = ( (sigma * noise_samp_sm + res_lev_samp_sm) / thruput_interp ) / np.median(starphot) cont_curve_samp[np.where(cont_curve_samp < 0)] = 1 cont_curve_samp[np.where(cont_curve_samp > 1)] = 1 # calculating the Student corrected contrast if student: n_res_els = np.floor(rad_samp / fwhm_med * 2 * np.pi) ss_corr = np.sqrt(1 + 1 / n_res_els) sigma_corr = stats.t.ppf(stats.norm.cdf(sigma), n_res_els - 1) * ss_corr if isinstance(starphot, float) or isinstance(starphot, int): cont_curve_samp_corr = ( (sigma_corr * noise_samp_sm + res_lev_samp_sm) / thruput_interp ) / starphot else: cont_curve_samp_corr = ( (sigma_corr * noise_samp_sm + res_lev_samp_sm) / thruput_interp ) / np.median(starphot) cont_curve_samp_corr[np.where(cont_curve_samp_corr < 0)] = 1 cont_curve_samp_corr[np.where(cont_curve_samp_corr > 1)] = 1 if debug: plt.rc("savefig", dpi=dpi) plt.figure(figsize=figsize, dpi=dpi) # throughput plt.plot(vector_radd * pxscale, thruput_mean, ".", label="computed", alpha=0.6) plt.plot( rad_samp_arcsec, thruput_interp, ",-", label="interpolated", lw=2, alpha=0.5 ) plt.grid("on", which="both", alpha=0.2, linestyle="solid") plt.xlabel("Angular separation [arcsec]") plt.ylabel("Throughput") plt.legend(loc="best") plt.xlim(0, np.max(rad_samp * pxscale)) # noise plt.figure(figsize=figsize, dpi=dpi) plt.plot(rad_samp_arcsec, noise_samp, ".", label="computed", alpha=0.6) if smooth: plt.plot( rad_samp_arcsec, noise_samp_sm, ",-", label="noise smoothed", lw=2, alpha=0.5, ) plt.grid("on", alpha=0.2, linestyle="solid") plt.xlabel("Angular separation [arcsec]") plt.ylabel("Noise") plt.legend(loc="best") plt.xlim(0, np.max(rad_samp_arcsec)) # mean residual level plt.figure(figsize=figsize, dpi=dpi) plt.plot( rad_samp_arcsec, res_lev_samp, ".", label="computed residual level", alpha=0.6, ) if smooth: plt.plot( rad_samp_arcsec, res_lev_samp_sm, ",-", label="smoothed residual level", lw=2, alpha=0.5, ) plt.grid("on", alpha=0.2, linestyle="solid") plt.xlabel("Angular separation [arcsec]") plt.ylabel("Mean residual level") plt.legend(loc="best") plt.xlim(0, np.max(rad_samp_arcsec)) # plotting if plot or debug: if student: label = ["Sensitivity (Gaussian)", "Sensitivity (Student-t correction)"] else: label = ["Sensitivity (Gaussian)"] plt.rc("savefig", dpi=dpi) fig = plt.figure(figsize=figsize, dpi=dpi) ax1 = fig.add_subplot(111) (con1,) = ax1.plot( rad_samp_arcsec, cont_curve_samp, "-", alpha=0.2, lw=2, color="green" ) (con2,) = ax1.plot( rad_samp_arcsec, cont_curve_samp, ".", alpha=0.2, color="green" ) if student: (con3,) = ax1.plot( rad_samp_arcsec, cont_curve_samp_corr, "-", alpha=0.4, lw=2, color="blue", ) (con4,) = ax1.plot( rad_samp_arcsec, cont_curve_samp_corr, ".", alpha=0.4, color="blue" ) lege = [(con1, con2), (con3, con4)] else: lege = [(con1, con2)] plt.legend(lege, label, fancybox=True, fontsize="medium") plt.xlabel("Angular separation [arcsec]") plt.ylabel(str(sigma) + " sigma contrast") plt.grid("on", which="both", alpha=0.2, linestyle="solid") ax1.set_yscale("log") ax1.set_xlim(0, np.max(rad_samp_arcsec)) # Give a title to the contrast curve plot if object_name is not None and frame_size is not None: # Retrieve ncomp and pca_type info to use in title ncomp = algo_dict["ncomp"] if algo_dict["cube_ref"] is None: pca_type = "ADI" else: pca_type = "RDI" title = "{} {} {}pc {} + {}".format( pca_type, object_name, ncomp, frame_size, inner_rad ) plt.title(title, fontsize=14) # Option to fix the y-limit if len(fix_y_lim) == 2: min_y_lim = min(fix_y_lim[0], fix_y_lim[1]) max_y_lim = max(fix_y_lim[0], fix_y_lim[1]) ax1.set_ylim(min_y_lim, max_y_lim) # Optionally, save the figure to a path if save_plot is not None: fig.savefig(save_plot, dpi=dpi) if debug: fig2 = plt.figure(figsize=figsize, dpi=dpi) ax3 = fig2.add_subplot(111) cc_mags = -2.5 * np.log10(cont_curve_samp) (con4,) = ax3.plot( rad_samp_arcsec, cc_mags, "-", alpha=0.2, lw=2, color="green" ) (con5,) = ax3.plot(rad_samp_arcsec, cc_mags, ".", alpha=0.2, color="green") if student: cc_mags_corr = -2.5 * np.log10(cont_curve_samp_corr) (con6,) = ax3.plot( rad_samp_arcsec, cc_mags_corr, "-", alpha=0.4, lw=2, color="blue" ) (con7,) = ax3.plot( rad_samp_arcsec, cc_mags_corr, ".", alpha=0.4, color="blue" ) lege = [(con4, con5), (con6, con7)] else: lege = [(con4, con5)] plt.legend(lege, label, fancybox=True, fontsize="medium") plt.xlabel("Angular separation [arcsec]") plt.ylabel("Delta magnitude") plt.gca().invert_yaxis() plt.grid("on", which="both", alpha=0.2, linestyle="solid") ax3.set_xlim(0, np.max(rad_samp * pxscale)) ax4 = ax3.twiny() ax4.set_xlabel("Distance [pixels]") ax4.plot(rad_samp, cc_mags, "", alpha=0.0) ax4.set_xlim(0, np.max(rad_samp)) if student: datafr = pd.DataFrame( { "sensitivity_gaussian": cont_curve_samp, "sensitivity_student": cont_curve_samp_corr, "throughput": thruput_interp, "distance": rad_samp, "distance_arcsec": rad_samp_arcsec, "noise": noise_samp_sm, "residual_level": res_lev_samp_sm, "sigma corr": sigma_corr, } ) else: datafr = pd.DataFrame( { "sensitivity_gaussian": cont_curve_samp, "throughput": thruput_interp, "distance": rad_samp, "distance_arcsec": rad_samp_arcsec, "noise": noise_samp_sm, "residual_level": res_lev_samp_sm, } ) if full_output: return datafr, frame_fc_all, frame_nofc, fc_map_all else: return datafr
# TODO: Include algo_class modifications in any tutorial using this function
[docs] def throughput( cube, angle_list, psf_template, fwhm, algo, nbranch=1, theta=0, inner_rad=1, fc_rad_sep=3, wedge=(0, 360), fc_snr=100, noise_sep=1, full_output=False, verbose=True, algo_class=None, **algo_dict, ): """Measure the throughput for chosen algorithm and input dataset (ADI or\ ADI+mSDI). The final throughput is the average of the same procedure measured in ``nbranch`` azimutally equidistant branches. Parameters ---------- cube : 3d or 4d numpy ndarray The input cube, 3d (ADI data) or 4d array (IFS data), without fake companions. angle_list : 1d numpy ndarray Vector with parallactic angles. psf_template : 2d or 3d numpy ndarray Frame with the psf template for the fake companion(s). PSF must be centered in array. Normalization is done internally. fwhm: int or float or 1d array, optional The the Full Width Half Maximum in pixels. It can handle a different FWHM value for different wavelengths (IFS data). algo : callable or function The post-processing algorithm, e.g. vip_hci.pca.pca. Third party Python algorithms can be plugged here. They must have the parameters: 'cube', 'angle_list' and 'verbose'. Optionally a wrapper function can be used. nbranch : int optional Number of branches on which to inject fakes companions. Each branch is tested individually. theta : float, optional Angle in degrees for rotating the position of the first branch that by default is located at zero degrees. Theta counts counterclockwise from the positive x axis. inner_rad : int, optional Innermost radial distance to be considered in terms of FWHM. Should be >= 1. fc_rad_sep : int optional Radial separation between the injected companions (in each of the patterns) in FWHM. Must be large enough to avoid overlapping. With the maximum possible value, a single fake companion will be injected per cube and algorithm post-processing (which greatly affects computation time). wedge : tuple of floats, optional Initial and Final angles for using a wedge. For example (-90,90) only considers the right side of an image. fc_snr: float optional Signal to noise ratio of injected fake companions (w.r.t a Gaussian distribution). noise_sep: int or None, optional Radial sampling of the noise level. By default it is performed with a radial step of 1 pixel. If set to None, it will be automatically set to be sampled every fwhm pixels radially. full_output : bool, optional If True returns intermediate arrays. verbose : bool, optional If True prints out timing and information. **algo_dict Parameters of the post-processing algorithms can be passed here, including e.g. ``imlib``, ``interpolation`` or ``nproc``. Returns ------- thruput_arr : numpy ndarray 2d array whose rows are the annulus-wise throughput values for each branch. vector_radd : numpy ndarray 1d array with the distances in FWHM (the positions of the annuli). If full_output is True then the function returns: thruput_arr, noise, vector_radd, cube_fc_all, frame_fc_all, frame_nofc and fc_map_all. noise : numpy ndarray 1d array with the noise per annulus. frame_fc_all : numpy ndarray 3d array with the 3 frames of the 3 (patterns) processed cubes with companions. frame_nofc : numpy ndarray 2d array, PCA processed frame without companions. fc_map_all : numpy ndarray 3d array with 3 frames containing the position of the companions in the 3 patterns. """ array = cube parangles = angle_list nproc = algo_dict.get("nproc", 1) imlib = algo_dict.get("imlib", "vip-fft") interpolation = algo_dict.get("interpolation", "lanczos4") scaling = algo_dict.get("scaling", None) if array.ndim != 3 and array.ndim != 4: raise TypeError("The input array is not a 3d or 4d cube") else: if array.ndim == 3: if array.shape[0] != parangles.shape[0]: msg = "Input parallactic angles vector has wrong length" raise TypeError(msg) if psf_template.ndim != 2: raise TypeError("Template PSF is not a frame or 2d array") maxfcsep = int((array.shape[1] / 2.0) / fwhm) - 1 if fc_rad_sep < 3 or fc_rad_sep > maxfcsep: msg = "Too large separation between companions in the radial " msg += "patterns. Should lie between 3 and {}" raise ValueError(msg.format(maxfcsep)) elif array.ndim == 4: if array.shape[1] != parangles.shape[0]: msg = "Input vector or parallactic angles has wrong length" raise TypeError(msg) if psf_template.ndim != 3: raise TypeError("Template PSF is not a frame, 3d array") if "scale_list" not in algo_dict: raise ValueError("Vector of wavelength not found") else: if algo_dict["scale_list"].shape[0] != array.shape[0]: raise TypeError("Input wavelength vector has wrong length") if isinstance(fwhm, float) or isinstance(fwhm, int): maxfcsep = int((array.shape[2] / 2.0) / fwhm) - 1 else: maxfcsep = int((array.shape[2] / 2.0) / np.amin(fwhm)) - 1 if fc_rad_sep < 3 or fc_rad_sep > maxfcsep: msg = "Too large separation between companions in the " msg += "radial patterns. Should lie between 3 and {}" raise ValueError(msg.format(maxfcsep)) if psf_template.shape[1] % 2 == 0: raise ValueError("Only odd-sized PSF is accepted") if not hasattr(algo, "__call__"): raise TypeError("Parameter `algo` must be a callable function") if not isinstance(inner_rad, int): raise TypeError("inner_rad must be an integer") angular_range = wedge[1] - wedge[0] if nbranch > 1 and angular_range < 360: msg = "Only a single branch is allowed when working on a wedge" raise RuntimeError(msg) if isinstance(fwhm, (np.ndarray, list)): fwhm_med = np.median(fwhm) else: fwhm_med = fwhm if verbose: start_time = time_ini() # *************************************************************************** # Compute noise in concentric annuli on the "empty frame" # TODO: Clean below? # Consider 3 cases depending on whether algo is (i) defined externally, # (ii) a VIP postproc algorithm; (iii) ineligible for contrast curves argl = getfullargspec(algo).args if "cube" in argl and "angle_list" in argl and "verbose" in argl: # (i) external algorithm with appropriate parameters [OK] pass else: algo_name = algo.__name__ idx = algo.__module__.index('.', algo.__module__.index('.') + 1) mod = algo.__module__[:idx] tmp = __import__(mod, fromlist=[algo_name.upper()+'_Params']) algo_params = getattr(tmp, algo_name.upper()+'_Params') argl = [attr for attr in dir(algo_params)] if "cube" in argl and "angle_list" in argl and "verbose" in argl: # (ii) a VIP postproc algorithm [OK] pass else: # (iii) ineligible routine for contrast curves [Raise error] msg = "Ineligible algo for contrast curve function. algo should " msg += "have parameters 'cube', 'angle_list' and 'verbose'" raise TypeError(msg) if "cube" in argl and "angle_list" in argl and "verbose" in argl: if "fwhm" in argl: frame_nofc = algo(cube=array, angle_list=parangles, fwhm=fwhm_med, verbose=False, **algo_dict) if algo_dict.pop("scaling", None): new_algo_dict = algo_dict.copy() new_algo_dict["scaling"] = None frame_nofc_noscal = algo(cube=array, angle_list=parangles, fwhm=fwhm_med, verbose=False, **new_algo_dict) else: frame_nofc_noscal = frame_nofc else: frame_nofc = algo(cube=array, angle_list=parangles, verbose=False, **algo_dict) if algo_dict.pop("scaling", None): new_algo_dict = algo_dict.copy() new_algo_dict["scaling"] = None frame_nofc_noscal = algo(cube=array, angle_list=parangles, verbose=False, **new_algo_dict) else: frame_nofc_noscal = frame_nofc else: msg = "The algorithm used should take input parameters named 'cube', " msg += "'angle_list' and 'verbose'." raise ValueError(msg) if verbose: msg1 = "Cube without fake companions processed with {}" print(msg1.format(algo.__name__)) timing(start_time) if noise_sep is None: sep = fwhm_med else: sep = noise_sep noise, res_level, vector_radd = noise_per_annulus(frame_nofc, separation=sep, fwhm=fwhm_med, wedge=wedge) if scaling is not None: noise_noscal, _, _ = noise_per_annulus(frame_nofc_noscal, separation=sep, fwhm=fwhm_med, wedge=wedge) else: noise_noscal = noise.copy() vector_radd = vector_radd[inner_rad - 1:] noise = noise[inner_rad - 1:] res_level = res_level[inner_rad - 1:] noise_noscal = noise_noscal[inner_rad - 1:] if verbose: print("Measured annulus-wise noise in resulting frame") timing(start_time) # We crop the PSF and check if PSF has been normalized (so that flux in # 1*FWHM aperture = 1) and fix if needed new_psf_size = int(round(3 * fwhm_med)) if new_psf_size % 2 == 0: new_psf_size += 1 if cube.ndim == 3: n, y, x = array.shape psf_template = normalize_psf( psf_template, fwhm=fwhm, verbose=verbose, size=min(new_psf_size, psf_template.shape[1]), ) # Initialize the fake companions angle_branch = angular_range / nbranch thruput_arr = np.zeros((nbranch, noise.shape[0])) fc_map_all = np.zeros((nbranch * fc_rad_sep, y, x)) frame_fc_all = np.zeros((nbranch * fc_rad_sep, y, x)) cy, cx = frame_center(array[0]) # each branch is computed separately for br in range(nbranch): # each pattern is computed separately. For each one the companions # are separated by "fc_rad_sep * fwhm", interleaving the injections for irad in range(fc_rad_sep): radvec = vector_radd[irad::fc_rad_sep] cube_fc = array.copy() # filling map with small numbers fc_map = np.ones_like(array[0]) * 1e-6 fcy = [] fcx = [] for i in range(radvec.shape[0]): flux = fc_snr * noise_noscal[irad + i * fc_rad_sep] cube_fc = cube_inject_companions( cube_fc, psf_template, parangles, flux, rad_dists=[radvec[i]], theta=br * angle_branch + theta, nproc=nproc, imlib=imlib, interpolation=interpolation, verbose=False, ) y = cy + radvec[i] * \ np.sin(np.deg2rad(br * angle_branch + theta)) x = cx + radvec[i] * \ np.cos(np.deg2rad(br * angle_branch + theta)) fc_map = frame_inject_companion( fc_map, psf_template, y, x, flux, imlib, interpolation ) fcy.append(y) fcx.append(x) if verbose: msg2 = "Fake companions injected in branch {} " msg2 += "(pattern {}/{})" print(msg2.format(br + 1, irad + 1, fc_rad_sep)) timing(start_time) # *************************************************************** # TODO: Clean below? # Consider 3 cases depending on whether algo is # (i) defined externally, # (ii) a VIP postproc algorithm; # (iii) ineligible for contrast curves ar = getfullargspec(algo).args if "cube" in ar and "angle_list" in ar and "verbose" in ar: # (i) external algorithm with appropriate parameters [OK] pass else: algo_name = algo.__name__ idx = algo.__module__.index( '.', algo.__module__.index('.') + 1) mod = algo.__module__[:idx] tmp = __import__( mod, fromlist=[algo_name.upper()+'_Params']) algo_params = getattr(tmp, algo_name.upper()+'_Params') ar = [attr for attr in dir(algo_params)] if "cube" in ar and "angle_list" in ar and "verbose" in ar: # (ii) a VIP postproc algorithm [OK] pass else: # (iii) ineligible routine [Raise error] msg = "Ineligible algo for contrast curve function. " msg += "algo should have parameters 'cube', " msg += "'angle_list' and 'verbose'" raise TypeError(msg) if "fwhm" in ar: frame_fc = algo( cube=cube_fc, angle_list=parangles, fwhm=fwhm_med, verbose=False, **algo_dict, ) else: frame_fc = algo( cube=cube_fc, angle_list=parangles, verbose=False, **algo_dict, ) if verbose: msg3 = "Cube with fake companions processed with {}" msg3 += "\nMeasuring its annulus-wise throughput" print(msg3.format(algo.__name__)) timing(start_time) # ************************************************************** injected_flux = aperture_flux(fc_map, fcy, fcx, fwhm_med) recovered_flux = aperture_flux( (frame_fc - frame_nofc), fcy, fcx, fwhm_med ) thruput = recovered_flux / injected_flux thruput[np.where(thruput < 0)] = 0 thruput_arr[br, irad::fc_rad_sep] = thruput fc_map_all[br * fc_rad_sep + irad, :, :] = fc_map frame_fc_all[br * fc_rad_sep + irad, :, :] = frame_fc elif cube.ndim == 4: w, n, y, x = array.shape if isinstance(fwhm, (int, float)): fwhm = [fwhm] * w psf_template = normalize_psf( psf_template, fwhm=fwhm, verbose=verbose, size=min(new_psf_size, psf_template.shape[1]), ) # Initialize the fake companions angle_branch = angular_range / nbranch thruput_arr = np.zeros((nbranch, noise.shape[0])) fc_map_all = np.zeros((nbranch * fc_rad_sep, w, y, x)) frame_fc_all = np.zeros((nbranch * fc_rad_sep, y, x)) cy, cx = frame_center(array[0, 0]) # each branch is computed separately for br in range(nbranch): # each pattern is computed separately. For each pattern the # companions are separated by "fc_rad_sep * fwhm" # radius = vector_radd[irad::fc_rad_sep] for irad in range(fc_rad_sep): radvec = vector_radd[irad::fc_rad_sep] thetavec = range(int(theta), int(theta) + 360, 360 // len(radvec)) cube_fc = array.copy() # filling map with small numbers fc_map = np.ones_like(array[:, 0]) * 1e-6 fcy = [] fcx = [] for i in range(radvec.shape[0]): flux = fc_snr * noise_noscal[irad + i * fc_rad_sep] cube_fc = cube_inject_companions( cube_fc, psf_template, parangles, flux, rad_dists=[radvec[i]], theta=thetavec[i], verbose=False, imlib=imlib, interpolation=interpolation, ) y = cy + radvec[i] * np.sin( np.deg2rad(br * angle_branch + thetavec[i]) ) x = cx + radvec[i] * np.cos( np.deg2rad(br * angle_branch + thetavec[i]) ) fc_map = frame_inject_companion( fc_map, psf_template, y, x, flux) fcy.append(y) fcx.append(x) if verbose: msg2 = "Fake companions injected in branch {} " msg2 += "(pattern {}/{})" print(msg2.format(br + 1, irad + 1, fc_rad_sep)) timing(start_time) # ************************************************************** # TODO: Clean below? # Consider 3 cases depending on whether algo is # (i) defined externally, # (ii) a VIP postproc algorithm; # (iii) ineligible for contrast curves ar = getfullargspec(algo).args if "cube" in ar and "angle_list" in ar and "verbose" in ar: # (i) external algorithm with appropriate parameters [OK] pass else: algo_name = algo.__name__ idx = algo.__module__.index( '.', algo.__module__.index('.') + 1) mod = algo.__module__[:idx] tmp = __import__( mod, fromlist=[algo_name.upper()+'_Params']) algo_params = getattr(tmp, algo_name.upper()+'_Params') ar = [attr for attr in dir(algo_params)] if "cube" in ar and "angle_list" in ar and "verbose" in ar: # (ii) a VIP postproc algorithm [OK] pass else: # (iii) ineligible routine [Raise error] msg = "Ineligible algo for contrast curve function. " msg += "algo should have parameters 'cube', " msg += "'angle_list' and 'verbose'" raise TypeError(msg) if "fwhm" in ar: frame_fc = algo( cube=cube_fc, angle_list=parangles, fwhm=fwhm_med, verbose=False, **algo_dict, ) else: frame_fc = algo( cube=cube_fc, angle_list=parangles, verbose=False, **algo_dict, ) if verbose: msg3 = "Cube with fake companions processed with {}" msg3 += "\nMeasuring its annulus-wise throughput" print(msg3.format(algo.__name__)) timing(start_time) # ************************************************************* injected_flux = [ aperture_flux(fc_map[i], fcy, fcx, fwhm[i]) for i in range(array.shape[0]) ] injected_flux = np.mean(injected_flux, axis=0) recovered_flux = aperture_flux( (frame_fc - frame_nofc), fcy, fcx, fwhm_med ) thruput = recovered_flux / injected_flux thruput[np.where(thruput < 0)] = 0 thruput_arr[br, irad::fc_rad_sep] = thruput fc_map_all[br * fc_rad_sep + irad, :, :] = fc_map frame_fc_all[br * fc_rad_sep + irad, :, :] = frame_fc if verbose: msg = "Finished measuring the throughput in {} branches" print(msg.format(nbranch)) timing(start_time) if full_output: return ( thruput_arr, noise, res_level, vector_radd, frame_fc_all, frame_nofc, fc_map_all, ) else: return thruput_arr, vector_radd
[docs] def noise_per_annulus( array, separation, fwhm, init_rad=None, wedge=(0, 360), verbose=False, debug=False ): """Measure the noise and mean residual level as the standard deviation\ and mean, respectively, of apertures defined in each annulus with a given\ separation. The annuli start at init_rad (= fwhm by default if not provided) and stop 2*separation before the edge of the frame. Parameters ---------- array : numpy ndarray Input frame. separation : float Separation in pixels of the centers of the annuli measured from the center of the frame. fwhm : float FWHM in pixels. init_rad : float Initial radial distance to be used. If None then the init_rad = FWHM. wedge : tuple of floats, optional Initial and Final angles for using a wedge. For example (-90,90) only considers the right side of an image. Be careful when using small wedges, this leads to computing a standard deviation of very small samples (<10 values). verbose : bool, optional If True prints information. debug : bool, optional If True plots the positioning of the apertures. Returns ------- noise : numpy ndarray Vector with the noise value per annulus. res_level : numpy ndarray Vector with the mean residual level per annulus. vector_radd : numpy ndarray Vector with the radial distances values. """ def find_coords(rad, sep, init_angle, fin_angle): angular_range = fin_angle - init_angle npoints = (np.deg2rad(angular_range) * rad) / sep # (2*np.pi*rad)/sep ang_step = angular_range / npoints # 360/npoints x = [] y = [] for i in range(int(npoints)): newx = rad * np.cos(np.deg2rad(ang_step * i + init_angle)) newy = rad * np.sin(np.deg2rad(ang_step * i + init_angle)) x.append(newx) y.append(newy) return np.array(y), np.array(x) ### if array.ndim != 2: raise TypeError("Input array is not a frame or 2d array") if not isinstance(wedge, tuple): raise TypeError( "Wedge must be a tuple with the initial and final " "angles") if init_rad is None: init_rad = fwhm init_angle, fin_angle = wedge centery, centerx = frame_center(array) n_annuli = int(np.floor((centery - init_rad) / separation)) - 1 noise = [] res_level = [] vector_radd = [] if verbose: print("{} annuli".format(n_annuli)) if debug: _, ax = plt.subplots(figsize=(6, 6)) ax.imshow( array, origin="lower", interpolation="nearest", alpha=0.5, cmap="gray" ) for i in range(n_annuli): y = centery + init_rad + separation * i rad = dist(centery, centerx, y, centerx) yy, xx = find_coords(rad, fwhm, init_angle, fin_angle) yy += centery xx += centerx apertures = CircularAperture(np.array((xx, yy)).T, fwhm / 2) fluxes = aperture_photometry(array, apertures) fluxes = np.array(fluxes["aperture_sum"]) noise_ann = np.std(fluxes) mean_ann = np.mean(fluxes) noise.append(noise_ann) res_level.append(mean_ann) vector_radd.append(rad) if debug: for j in range(xx.shape[0]): # Circle takes coordinates as (X,Y) aper = plt.Circle( (xx[j], yy[j]), radius=fwhm / 2, color="r", fill=False, alpha=0.8 ) ax.add_patch(aper) cent = plt.Circle( (xx[j], yy[j]), radius=0.8, color="r", fill=True, alpha=0.5 ) ax.add_patch(cent) if verbose: print("Radius(px) = {}, Noise = {:.3f} ".format(rad, noise_ann)) return np.array(noise), np.array(res_level), np.array(vector_radd)
[docs] def aperture_flux(array, yc, xc, fwhm, ap_factor=1, mean=False, verbose=False): """Return the sum of pixel values in a circular aperture centered on the\ input coordinates. The radius of the aperture is set as (ap_factor*fwhm)/2. Parameters ---------- array : numpy ndarray Input frame. yc, xc : list or 1d arrays List of y and x coordinates of sources. fwhm : float FWHM in pixels. ap_factor : int, optional Diameter of aperture in terms of the FWHM. Returns ------- flux : list of floats List of fluxes. Note ---- From Photutils documentation, the aperture photometry defines the aperture using one of 3 methods: 'center': A pixel is considered to be entirely in or out of the aperture depending on whether its center is in or out of the aperture. 'subpixel': A pixel is divided into subpixels and the center of each subpixel is tested (as above). 'exact': (default) The exact overlap between the aperture and each pixel is calculated. """ n_obj = len(yc) flux = np.zeros((n_obj)) for i, (y, x) in enumerate(zip(yc, xc)): if mean: ind = disk((y, x), (ap_factor * fwhm) / 2) values = array[ind] obj_flux = np.mean(values) else: aper = CircularAperture((x, y), (ap_factor * fwhm) / 2) obj_flux = aperture_photometry(array, aper, method="exact") obj_flux = np.array(obj_flux["aperture_sum"]) flux[i] = obj_flux if verbose: print("Coordinates of object {} : ({},{})".format(i, y, x)) print("Object Flux = {:.2f}".format(flux[i])) return flux