Source code for vip_hci.invprob.andromeda

"""
Implementation of the ANDROMEDA algorithm from [MUG09]_ / [CAN15]_.

Based on ANDROMEDA v3.1 from 28/06/2018.

.. [MUG09]
   | Mugnier et al, 2009
   | **Optimal method for exoplanet detection by angular differential imaging**
   | *J. Opt. Soc. Am. A, 26(6), 1326-1334*
   | `doi:10.1364/JOSAA.26.001326 <http://doi.org/10.1364/JOSAA.26.001326>`_

.. [CAN15]
   | Cantalloube et al, 2015
   | **Direct exoplanet detection and characterization using the ANDROMEDA
     method: Performance on VLT/NaCo data**
   | *A&A, Volume 582, p. 89*
   | `https://arxiv.org/abs/1508.06406
     <https://arxiv.org/abs/1508.06406>`_

"""

__author__ = "Thomas Bédrine, Ralf Farkas"
__all__ = ["andromeda", "ANDROMEDA_Params"]

import numpy as np
from dataclasses import dataclass
from enum import Enum

from typing import Union, List

from ..config.utils_param import setup_parameters, separate_kwargs_dict
from ..config.utils_conf import pool_map, iterable
from ..config.paramenum import OptMethod, ALGO_KEY
from ..var.filters import frame_filter_highpass, cube_filter_highpass
from ..var import dist_matrix

from .utils_andro import (
    calc_psf_shift_subpix,
    fitaffine,
    idl_round,
    idl_where,
    robust_std,
    subpixel_shift,
)

global CUBE


[docs] @dataclass class ANDROMEDA_Params: """ Set of parameters for the ANDROMEDA algorithm. See function `andromeda` below for the documentation. """ cube: np.ndarray = None oversampling_fact: float = None angle_list: np.ndarray = None psf: np.ndarray = None filtering_fraction: float = 0.25 min_sep: float = 0.5 annuli_width: float = 1.0 roa: float = 2 opt_method: Enum = OptMethod.LSQ nsmooth_snr: int = 18 iwa: float = None owa: float = None precision: int = 50 fast: Union[float, bool] = False homogeneous_variance: bool = True ditimg: float = 1.0 ditpsf: float = None tnd: float = 1.0 total: bool = False multiply_gamma: bool = True nproc: int = 1 verbose: bool = False
[docs] def andromeda(*all_args: List, **all_kwargs: dict): r""" Exoplanet detection in ADI sequences by maximum-likelihood approach. This is as implemented in [CAN15]_, itself inspired by the framework presented in [MUG09]_. Parameters ---------- all_args: list, optional Positionnal arguments for the andromeda algorithm. Full list of parameters below. all_kwargs: dictionary, optional Mix of keyword arguments that can initialize a AndroParams or a AndroParams itself. Andromeda parameters ---------- cube : 3d numpy ndarray Input cube. IDL parameter: ``IMAGES_1_INPUT`` oversampling_fact : float Oversampling factor for the wavelength corresponding to the filter used to obtain ``cube`` (defined as the ratio between the wavelength of the filter and the Shannon wavelength). Usually above 1 and below 3. Note that in ANDROMEDA everything is coded in lambda/D unit so this is an important parameter. See Note for example calculation. IDL parameter: ``OVERSAMPLING_1_INPUT`` angle_list : numpy ndarray List of parallactic angles associated with each frame in ``cube``. Note that, compared to the IDL version, the PA convention is different: If you would pass ``[1,2,3]`` to the IDL version, you should pass ``[-1, -2, -3]`` to this function to obtain the same results. IDL parameter: ``- ANGLES_INPUT`` psf : 2d numpy ndarray The experimental PSF used to model the planet signature in the subtracted images. This PSF is usually a non-coronographic or saturated observation of the target star. IDL parameter: ``PSF_PLANET_INPUT`` filtering_fraction : float, optional Strength of the high-pass filter. If set to ``1``, no high-pass filter is used. IDL parameter: ``FILTERING_FRACTION_INPUT`` min_sep : float, optional Angular separation is assured to be above ``min_sep*lambda/D``. IDL parameter: ``MINIMUM_SEPARATION_INPUT`` annuli_width : float, optional Annuli width on which the subtraction are performed. The same for all annuli. IDL parameter: ``ANNULI_WIDTH_INPUT`` roa : float, optional Ratio of the optimization area. The optimization annulus area is defined by ``roa * annuli_width``. ``roa`` is forced to ``1`` when ``opt_method="no"`` is chosen. IDL parameter: ``RATIO_OPT_AREA_INPUT`` opt_method : {'no', 'total', 'lsq', 'robust'}, optional Method used to balance for the flux difference that exists between the two subtracted annuli in an optimal way during ADI. IDL parameter: ``OPT_METHOD_ANG_INPUT`` nsmooth_snr : int, optional Number of pixels over which the radial robust standard deviation profile of the SNR map is smoothed to provide a global trend for the SNR map normalization. For ``nsmooth_snr=0`` the SNR map normalization is disabled. IDL parameter: ``NSMOOTH_SNR_INPUT`` iwa : float or None, optional Inner working angle / inner radius of the first annulus taken into account, expressed in ``lambda/D``. If ``None``, it is chosen automatically between the values ``0.5``, ``4`` or ``0.25``. IDL parameter: ``IWA_INPUT`` owa : float, optional Outer working angle / **inner** radius of the last annulus, expressed in ``lambda/D``. If ``None``, the value is automatically chosen based on the frame size. IDL parameter: ``OWA_INPUT`` precision : int, optional Number of shifts applied to the PSF. Passed to ``calc_psf_shift_subpix`` , which then creates a 4D cube with shape (precision+1, precision+1, N, N). IDL parameter: ``PRECISION_INPUT`` fast : float or bool, optional Size of the annuli from which the speckle noise should not be dominant anymore, in multiples of ``lambda/D``. If ``True``, a value of ``20 lambda/D`` is used, ``False`` (the default) disables the fast mode entirely. Above this threshold, the annuli width is set to ``4*annuli_width``. IDL parameter: ``FAST`` homogeneous_variance : bool, optional If set, variance is treated as homogeneous and is calculated as a mean of variance in each position through time. IDL parameter: ``HOMOGENEOUS_VARIANCE_INPUT`` ditimg : float, optional DIT for images (in sec) IDL Parameter: ``DITIMG_INPUT`` ditpsf : float or None, optional DIT for PSF (in sec) IDL Parameter: ``DITPSF_INPUT`` If set to ``None``, the value of ``ditimg`` is used. tnd : float, optional Neutral Density Transmission. IDL parameter: ``TND_INPUT`` total : bool, optional ``total=True`` is the old behaviour (normalizing the PSF to its sum). IDL parameter: ``TOTAL`` (was ``MAX`` in previous releases). multiply_gamma : bool, optional Use gamma for signature computation too. IDL parameter: ``MULTIPLY_GAMMA_INPUT`` nproc : int, optional Number of processes to use. verbose : bool, optional Print some parameter values for control. IDL parameter: ``VERBOSE`` Returns ------- contrast : 2d ndarray Calculated contrast map. (IDL return value) snr : 2d ndarray Signal to noise ratio map (defined as the estimated contrast divided by the estimated standard deviation of the contrast). IDL parameter: ``SNR_OUTPUT`` snr_norm : 2d ndarray IDL parameter: ``SNR_NORM_OUTPUT`` stdcontrast : 2d ndarray Map of the estimated standard deviation of the contrast. IDL parameter: `STDDEVCONTRAST_OUTPUT`` (previously ``STDEVFLUX_OUTPUT``) stdcontrast_norm : 2d ndarray Map of the estimated normalized standard deviation of the contrast. IDL parameter: ``STDDEVCONTRAST_NORM_OUTPUT`` likelihood : 2d ndarray likelihood IDL parameter: ``LIKELIHOOD_OUTPUT`` ext_radius : float Edge of the SNR map. Slightly decreased due to the normalization procedure. Useful to a posteriori reject potential companions that are too close to the edge to be analyzed. IDL parameter: ``EXT_RADIUS_OUTPUT`` Notes ----- IDL outputs: - SNR_OUTPUT - SNR_NORM_OUTPUT - LIKELIHOOD_OUTPUT - STDDEVCONTRAST_OUTPUT (was STDEVFLUX_OUTPUT) - STDDEVCONTRAST_NORM_OUTPUT The following IDL parameters were not implemented: - SDI-related parameters - IMAGES_2_INPUT - OVERSAMPLING_2_INPUT - OPT_METHOD_SPEC_INPUT - ROTOFF_INPUT - recentering (should be done in VIP before): - COORD_CENTRE_1_INPUT - COORD_CENTRE_2_INPUT - debug/expert testing testing - INDEX_NEG_INPUT - INDEX_POS_INPUT - ANNULI_LIMITS_INPUT - other - DISPLAY - VERSION - HELP - return parameters - IMAGES_1_CENTRED_OUTPUT - IMAGES_2_RESCALED_OUTPUT - VARIANCE_1_CENTRED_OUTPUT - VARIANCE_2_RESCALED_OUTPUT - GAMMA_INFO_OUTPUT - variances (VARIANCE_1_INPUT, VARIANCE_2_INPUT) Note ---- The oversampling factor can be computed as:\ :math:`oversampling = plsc_{NYQ} / plsc` , where:\ :math:`plsc = 12.25` [mas/px] for SPHERE/IRDIS\ :math:`plsc_{NYQ} = (0.5*lambda/diam_tel)/pi*180*3600*1e3` [mas/px]\ lambda = 3.8e-6 : Imaging wavelength [m]\ diam_tel = 8.0 : Telescope diameter [m] """ class_params, other_options = separate_kwargs_dict( initial_kwargs=all_kwargs, parent_class=ANDROMEDA_Params ) # Extracting the object of parameters (if any) algo_params = None if ALGO_KEY in other_options.keys(): algo_params = other_options[ALGO_KEY] del other_options[ALGO_KEY] if algo_params is None: algo_params = ANDROMEDA_Params(*all_args, **class_params) def info(msg, *fmt, **kwfmt): if algo_params.verbose: print(msg.format(*fmt, **kwfmt)) def info2(msg, *fmt, **kwfmt): if algo_params.verbose == 2: print(msg.format(*fmt, **kwfmt)) global CUBE # assigned after high-pass filter # ===== verify input # the andromeda algorithm handles PAs differently from the other algos in # VIP. This normalizes the API: algo_params.angle_list = -algo_params.angle_list andro_cube = np.zeros_like(algo_params.cube) if andro_cube.shape[-1] % 2 == 1: # shift and crop for idx, img in enumerate(algo_params.cube): andro_cube[idx] = subpixel_shift(img, 0.5, 0.5) andro_cube = andro_cube[:, 1:, 1:] else: # shifting due to new VIP convention for even-sized images for idx, img in enumerate(algo_params.cube): andro_cube[idx] = subpixel_shift(img, -0.5, -0.5) if algo_params.psf.shape[0] % 2 == 1: # shift and crop algo_params.psf = subpixel_shift(algo_params.psf, 0.5, 0.5) algo_params.psf = algo_params.psf[1:, 1:] else: # shifting due to new VIP convention for even-sized images algo_params.psf = subpixel_shift(algo_params.psf, -0.5, -0.5) if algo_params.filtering_fraction > 1 or algo_params.filtering_fraction < 0: raise ValueError("``filtering_fraction`` must be between 0 and 1") frames, npix, _ = andro_cube.shape npixpsf, _ = algo_params.psf.shape # ===== set default parameters: if algo_params.opt_method != "no": if algo_params.roa < 1: raise ValueError( "The optimization to subtraction area ``roa`` " "must be >= 1" ) else: algo_params.roa = 1 if algo_params.iwa is None: for test_iwa in [0.5, 4, 0.25]: # keep first IWA which produces frame pairs test_ang = 2 * np.arcsin(algo_params.min_sep / (2 * test_iwa)) * 180 / np.pi test_id, _, _ = create_indices( algo_params.angle_list, angmin=test_ang) if test_id is not None: # pairs found break algo_params.iwa = test_iwa info("iwa automatically set to {}*lambda/D", algo_params.iwa) if algo_params.owa is None: algo_params.owa = (npix / 2 - npixpsf / 2) / \ (2 * algo_params.oversampling_fact) info("owa automatically set to {} (based on frame size)", algo_params.owa) else: # radius of the last annulus taken into account for process [lambda/D]: algo_params.owa -= (npixpsf / 2) / (2 * algo_params.oversampling_fact) if algo_params.owa <= algo_params.iwa - algo_params.annuli_width: raise ValueError("You must increase `owa` or decrease `iwa`") if algo_params.fast is False: pass elif algo_params.fast is True: # IDL: IF fast EQ 1.0 algo_params.fast = 20 # [lambda/D] if algo_params.owa > algo_params.fast: dmean = algo_params.fast else: algo_params.fast = 0 if algo_params.iwa > algo_params.fast: dmean = algo_params.owa else: if algo_params.owa > algo_params.fast: dmean = algo_params.fast else: algo_params.fast = 0 if algo_params.fast: info( "annuli_width is set to {} from {} lambda/D", 4 * algo_params.annuli_width, dmean, ) # contrast maps: if algo_params.ditpsf is None: algo_params.ditpsf = algo_params.ditimg if np.asarray(algo_params.tnd).ndim == 0: # int or float info2("Throughput map: Homogeneous transmission: {}%", algo_params.tnd * 100) else: # TODO: test if really 2d map? info2("Throughput map: Inhomogeneous 2D throughput map given.") if algo_params.nsmooth_snr != 0 and algo_params.nsmooth_snr < 2: raise ValueError("`nsmooth_snr` must be >= 2") # ===== info output if algo_params.filtering_fraction == 1: info("No high-pass pre-filtering of the images!") # ===== initialize output flux = np.zeros_like(andro_cube[0]) snr = np.zeros_like(andro_cube[0]) likelihood = np.zeros_like(andro_cube[0]) stdflux = np.zeros_like(andro_cube[0]) # ===== pre-processing # normalization... if algo_params.total: psf_scale_factor = np.sum(algo_params.psf) else: psf_scale_factor = np.max(algo_params.psf) # creates new array in memory (prevent overwriting of input parameters) algo_params.psf = algo_params.psf / psf_scale_factor # ...and spatial filterin on the PSF: if algo_params.filtering_fraction != 1: algo_params.psf = frame_filter_highpass( algo_params.psf, "hann", hann_cutoff=algo_params.filtering_fraction ) # library of all different PSF positions psf_cube = calc_psf_shift_subpix( algo_params.psf, precision=algo_params.precision) # spatial filtering of the preprocessed image-cubes: if algo_params.filtering_fraction != 1: if algo_params.verbose: print( "Pre-processing filtering of the images and the PSF: " "done! F={}".format(algo_params.filtering_fraction) ) andro_cube = cube_filter_highpass( andro_cube, mode="hann", hann_cutoff=algo_params.filtering_fraction, verbose=algo_params.verbose, ) CUBE = andro_cube # definition of the width of each annuli (to perform ADI) dmin = algo_params.iwa # size of the lowest annuli, in lambda/D dmax = algo_params.owa # size of the greatest annuli, in lambda/D if algo_params.fast: first_distarray = ( dmin + np.arange( int(np.round(np.abs(dmean - dmin - 1)) / algo_params.annuli_width + 1), dtype=float, ) * algo_params.annuli_width ) second_distarray = ( dmean + dmin - 1 + np.arange( int(np.round(dmax - dmean) / (4 * algo_params.annuli_width) + 1), dtype=float, ) * 4 * algo_params.annuli_width ) distarray_lambdaonD = np.hstack([first_distarray, second_distarray]) if algo_params.iwa > algo_params.fast: distarray_lambdaonD = first_distarray if distarray_lambdaonD[-1] > dmax: distarray_lambdaonD[-1] = dmax annuli_limits = ( algo_params.oversampling_fact * 2 * distarray_lambdaonD ) # in pixels else: distarray_lambdaonD = ( dmin + np.arange( int(np.round(dmax - dmin) / algo_params.annuli_width + 1), dtype=float ) * algo_params.annuli_width ) distarray_lambdaonD[-1] = dmax annuli_limits = np.floor( algo_params.oversampling_fact * 2 * distarray_lambdaonD ).astype(int) while dmax * (2 * algo_params.oversampling_fact) < annuli_limits[-1]: # remove last element: annuli_limits = annuli_limits[:-1] # view, not a copy! annuli_number = len(annuli_limits) - 1 infomsg = "Using these user parameters, {} annuli will be processed, from a " infomsg += "separation of {} to {} pixels." info(infomsg, annuli_number, annuli_limits[0], annuli_limits[-1]) # ===== main loop add_params = { "i": iterable(range(annuli_number)[::-1]), "annuli_limits": annuli_limits, "psf_cube": psf_cube, } func_params = setup_parameters( params_obj=algo_params, fkt=_process_annulus, as_list=True, show_params=False, **add_params, ) res_all = pool_map( algo_params.nproc, _process_annulus, # start with outer annuli, they take longer: *func_params, msg="annulus", leave=False, verbose=False, ) for res in res_all: if res is None: continue flux += res[0] snr += res[1] likelihood += res[2] stdflux += res[3] # translating into contrast: # flux_factor: float or 2d array, depending on tnd factor = 1 / psf_scale_factor flux_factor = factor * algo_params.tnd * \ (algo_params.ditpsf / algo_params.ditimg) if algo_params.verbose: print("", "psf_scale_factor:", psf_scale_factor, "") print("", "tnd:", algo_params.tnd, "") print("", "ditpsf:", algo_params.ditpsf, "") print("", "ditimg:", algo_params.ditimg, "") print("", "flux_factor:", flux_factor, "") # post-processing of the output: if algo_params.nsmooth_snr != 0: if algo_params.verbose: print("Normalizing SNR...") # normalize snr map by its radial robust std: snr_norm, snr_std = normalize_snr( snr, nsmooth_snr=algo_params.nsmooth_snr, fast=algo_params.fast ) # normalization of the std of the flux (same way): stdflux_norm = np.zeros((npix, npix)) zone = snr_std != 0 stdflux_norm[zone] = stdflux[zone] * snr_std[zone] ext_radius = annuli_limits[annuli_number - 1] / ( 2 * algo_params.oversampling_fact ) # TODO: return value handling should be improved. return ( flux * flux_factor, # IDL RETURN snr, # snr_output snr_norm, # snr_norm_output stdflux * flux_factor, # IDL stddevcontrast_output stdflux_norm * flux_factor, # IDL stddevcontrast_norm_output likelihood, # IDL likelihood_output ext_radius, ) # IDL ext_radius_output, [lambda/D] # previous return values: # return flux, snr_norm, likelihood, stdflux_norm, ext_radius else: ext_radius = np.floor(annuli_limits[annuli_number]) / ( 2 * algo_params.oversampling_fact ) return ( flux * flux_factor, # IDL RETURN snr, # snr_output snr, # snr_norm_output stdflux * flux_factor, # IDL stddevcontrast_output stdflux * flux_factor, # IDL stddevcontrast_norm_output likelihood, # IDL likelihood_output ext_radius, ) # IDL ext_radius_output [lambda/D]
def _process_annulus( i, annuli_limits, roa, min_sep, oversampling_fact, angle_list, opt_method, multiply_gamma, psf_cube, homogeneous_variance, verbose=False, ): """ Process one single annulus, with diff_images and andromeda_core. Parameters ---------- i : int Number of the annulus **kwargs Returns ------- res : tuple The result of ``andromeda_core``, on the specific annulus. """ global CUBE rhomin = annuli_limits[i] rhomax = annuli_limits[i + 1] rhomax_opt = np.sqrt(roa * rhomax**2 - (roa - 1) * rhomin**2) # compute indices from min_sep if verbose: print(" Pairing frames...") min_sep_pix = min_sep * oversampling_fact * 2 angmin = 2 * np.arcsin(min_sep_pix / (2 * rhomin)) * 180 / np.pi index_neg, index_pos, indices_not_used = create_indices(angle_list, angmin) if len(indices_not_used) != 0: if verbose: print( " WARNING: {} frame(s) cannot be used because it wasn't " "possible to find any other frame to couple with them. " "Their indices are: {}".format( len(indices_not_used), indices_not_used) ) max_sep_pix = ( 2 * rhomin * np.sin(np.deg2rad((max(angle_list) - min(angle_list)) / 4)) ) max_sep_ld = max_sep_pix / (2 * oversampling_fact) if verbose: msg = " For all frames to be used in this annulus, the minimum" msg += " separation must be set at most to {} *lambda/D " msg += "(corresponding to {} pixels)." print(msg.format(max_sep_ld, max_sep_pix)) if index_neg is None: if verbose: msg = " Warning: No couples found for this distance. " msg += "Skipping annulus..." print(msg) return None # ===== angular differences if verbose: print(" Performing angular difference...") res = diff_images( cube_pos=CUBE[index_pos], cube_neg=CUBE[index_neg], rint=rhomin, rext=rhomax_opt, opt_method=opt_method, ) cube_diff, gamma, gamma_prime = res if not multiply_gamma: # reset gamma & gamma_prime to 1 (they were returned by diff_images) gamma = np.ones_like(gamma) gamma_prime = np.ones_like(gamma_prime) # TODO: gamma_info_output etc not implemented # ;Gamma_affine: # gamma_info_output[0,0,i] = min(gamma_output_ang[*,0]) # gamma_info_output[1,0,i] = max(gamma_output_ang[*,0]) # gamma_info_output[2,0,i] = mean(gamma_output_ang[*,0]) # gamma_info_output[3,0,i] = median(gamma_output_ang[*,0]) # gamma_info_output[4,0,i] = variance(gamma_output_ang[*,0]) # ;Gamma_prime: # gamma_info_output[0,1,i] = min(gamma_output_ang[*,1]) # gamma_info_output[1,1,i] = max(gamma_output_ang[*,1]) # gamma_info_output[2,1,i] = mean(gamma_output_ang[*,1]) # gamma_info_output[3,1,i] = median(gamma_output_ang[*,1]) # gamma_info_output[4,1,i] = variance(gamma_output_ang[*,1]) # # # -> they are returned, no further modification from here on. # launch andromeda core (:859) if verbose: print(" Matching...") res = andromeda_core( diffcube=cube_diff, index_neg=index_neg, index_pos=index_pos, angle_list=angle_list, psf_cube=psf_cube, homogeneous_variance=homogeneous_variance, rhomin=rhomin, rhomax=rhomax, gamma=gamma, verbose=verbose, ) # TODO: ANDROMEDA v3.1r2 calls `ANDROMEDA_CORE` with `/WITHOUT_GAMMA_INPUT`. return res # (flux, snr, likelihood, stdflux) def andromeda_core( diffcube, index_neg, index_pos, angle_list, psf_cube, rhomin, rhomax, gamma=None, homogeneous_variance=True, verbose=False, ): """ Core engine of ANDROMEDA. Estimates the flux distribution in the observation field from differential images built from different field rotation angles. Parameters ---------- diffcube : 3d ndarray Differential image cube, set of ``npairs`` differential images. Shape ``(npairs, npix, npix)``. IDL parameter: ``DIFF_IMAGES_INPUT`` index_neg : 1d ndarray index_pos : 1d ndarray angle_list : 1d ndarray IDL parameter: ``ANGLES_INPUT`` psf_cube : 4d ndarray IDL parameter: ``PSFCUBE_INPUT`` rhomin : float IDL parameter: ``RHOMIN_INPUT`` rhomax : float is ceiled for the pixel-for-loop. IDL parameter: ``RHOMAX_INPUT`` gamma IDL parameter: ``GAMMA_INPUT[*, 0]`` homogeneous_variance: bool, optional IDL parameter: ``HOMOGENEOUS_VARIANCE_INPUT`` verbose : bool, optional print more. Returns ------- flux : 2d ndarray IDL return value snr : 2d ndarray IDL output parameter: ``SNR_OUTPUT`` likelihood : 2d ndarray IDL output parameter: ``LIKELIHOOD_OUTPUT`` stdflux : 2d ndarray IDL output parameter: ``STDEVFLUX_OUTPUT`` Notes ----- - IDL 15/05/2018: add a check if there is only one couple and hence weights_diff_2D = 1. Differences from IDL implementation ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Upper case parameters/functions refer to the IDL ANDROMEDA implementation. - IDL ANDROMEDA accepts ``WITHOUT_GAMMA_INPUT`` (boolean, for test) and ``GAMMA_INPUT`` ("tuple" of ``gamma`` and ``gamma_prime``). The ``gamma_prime`` part of ``GAMMA_INPUT`` is never used inside ``ANDROMEDA_CORE``. Instead of these parameters, the python implementation accepts one single ``gamma`` parameter. - IDL's ``kmax`` was renamed to ``npairs``. - **not implemented parameters**: - The ``POSITIVITY`` parameter is not used any more in ANDROMEDA, and maybe removed in the future. It was removed in the python implementation. - ``GOOD_PIXELS_INPUT`` - This is a mask, applied to IDL's ``weight_cut`` and ``weighted_diff_images``. It is functional in ``ANDROMEDA_CORE``, but not exposed through the ``ANDROMEDA`` function. - ``MASK_INPUT`` - similar to ``GOOD_PIXELS_INPUT``, but applied to IDL's ``select_pixels`` (which controlls which pixels are processed). It is not exposed to ``ANDROMEDA``. - ``WEIGHTS_DIFF_INPUT`` - "(optional input) cube of inverse-of-variance maps. If it is not given the variance is treated as constant in time and computed empirically for each spatial position." - in the python implementation, the variance is **always** treated as constant in time. - note: ``WEIGHTS_DIFF_INPUT`` is obtained as ``WEIGHTS_OUTPUT`` from ``DIFF_IMAGES``. - ``PATTERN_OUTPUT`` - this is just an empty ``DBLARR(npix, npix, kmax)`` """ npairs, npix, _ = diffcube.shape npixpsf = psf_cube.shape[2] # shape: (p+1, p+1, x, y) precision = psf_cube.shape[0] - 1 # ===== verify + sanitize input if npix % 2 == 1: raise ValueError("size of the cube is odd!") if npixpsf % 2 == 1: raise ValueError("PSF has odd pixel size!") if gamma is None: if verbose: msg = " ANDROMEDA_CORE: The scaling factor is not taken into " msg += "account to build the model!" print(msg) # calculate variance if npairs == 1: variance_diff_2d = 1 else: variance_diff_2d = (diffcube**2).sum(0) / npairs - ( diffcube.sum(0) / npairs ) ** 2 # calculate weights from variance if homogeneous_variance: varmean = np.mean(variance_diff_2d) # idlwrap.mean weights_diff_2d = np.zeros((npix, npix)) + 1 / varmean if verbose: msg = " ANDROMEDA_CORE: Variance is considered homogeneous, mean" msg += " {:.3f}".format(varmean) print(msg) else: weights_diff_2d = variance_diff_2d > 0 weights_diff_2d /= variance_diff_2d + (variance_diff_2d == 0) if verbose: msg = " ANDROMEDA_CORE: Variance is taken equal to the " msg += "empirical variance in each pixel (inhomogeneous, but " msg += "constant in time)" print(msg) wd_images = diffcube * weights_diff_2d # create annuli d = dist_matrix(npix) select_pixels = (d > rhomin) & (d < rhomax) if verbose: msg = " ANDROMEDA_CORE: working with {} differential images, radius " msg += "{} to {}".format(npairs, rhomin, rhomax) print(msg) # definition of the expected pattern (if a planet is present) numerator = np.zeros((npix, npix)) denominator = np.ones((npix, npix)) parang = np.array( [angle_list[index_neg], angle_list[index_pos]]) * np.pi / 180 # shape (2,npairs) -> array([[1, 2, 3], # [4, 5, 6]]) (for npairs=3) # IDL: dimension = SIZE = _, npairs,2, _, _ for j in range( npix // 2 - np.ceil(rhomax).astype(int), npix // 2 + np.ceil(rhomax).astype(int) ): for i in range( npix // 2 - np.ceil(rhomax).astype(int), npix // 2 + np.ceil(rhomax).astype(int), ): # same ranges! # IDL: scans in different direction! if select_pixels[j, i]: # distance to center of rotation, in x x0 = i - (npix / 2 - 0.5) # distance to center of rotation, in y y0 = j - (npix / 2 - 0.5) decalx = x0 * np.cos(parang) - y0 * np.sin(parang) # (2,npairs) decaly = y0 * np.cos(parang) + x0 * np.sin(parang) # (2,npairs) tbr = decalx - np.floor(decalx).astype(int) subp_x = (idl_round(tbr) * precision).astype(int) # (2,npairs) tbr = decaly - np.floor(decaly).astype(int) subp_y = (idl_round(tbr) * precision).astype(int) # (2,npairs) # compute, for each k and for both positive and negative indices # the coordinates of the squares in which the psf will be placed # lef, bot, ... have shape (2,npairs) lef = npix // 2 + np.floor(decalx).astype(int) - npixpsf // 2 bot = npix // 2 + np.floor(decaly).astype(int) - npixpsf // 2 rig = npix // 2 + \ np.floor(decalx).astype(int) + npixpsf // 2 - 1 top = npix // 2 + \ np.floor(decaly).astype(int) + npixpsf // 2 - 1 # now select the minimum of the two, to compute the area to be # cut (the smallest rectangle which contains both psf's) px_xmin = np.minimum(lef[0], lef[1]) px_xmax = np.maximum(rig[0], rig[1]) px_ymin = np.minimum(bot[0], bot[1]) px_ymax = np.maximum(top[0], top[1]) # computation of planet patterns num_part = 0 den_part = 0 for k in range(npairs): # this is the innermost loop, performed MANY times patt_pos = np.zeros( (px_ymax[k] - px_ymin[k] + 1, px_xmax[k] - px_xmin[k] + 1) ) patt_neg = np.zeros( (px_ymax[k] - px_ymin[k] + 1, px_xmax[k] - px_xmin[k] + 1) ) # put the positive psf in the right place y0 = bot[1, k] - px_ymin[k] yN = bot[1, k] - px_ymin[k] + npixpsf x0 = lef[1, k] - px_xmin[k] xN = lef[1, k] - px_xmin[k] + npixpsf patt_pos[y0:yN, x0:xN] = psf_cube[subp_y[1, k], subp_x[1, k]] # TODO: should add a +1 somewhere?? # same for the negative psf, with a multiplication by gamma! y0 = bot[0, k] - px_ymin[k] yN = bot[0, k] - px_ymin[k] + npixpsf x0 = lef[0, k] - px_xmin[k] xN = lef[0, k] - px_xmin[k] + npixpsf patt_neg[y0:yN, x0:xN] = psf_cube[subp_y[0, k], subp_x[0, k]] # TODO: should add a +1 somewhere?? # subtraction between the two if gamma is None: pc = patt_pos - patt_neg else: pc = patt_pos - patt_neg * gamma[k] # compare current (2D) map of small rectangle of weights: if npairs == 1: weight_cut = weights_diff_2d else: weight_cut = weights_diff_2d[ px_ymin[k]: px_ymax[k] + 1, px_xmin[k]: px_xmax[k] + 1 ] num_part += np.sum( pc * wd_images[ k, px_ymin[k]: px_ymax[k] + 1, px_xmin[k]: px_xmax[k] + 1 ] ) den_part += np.sum(pc**2 * weight_cut) numerator[j, i] = num_part denominator[j, i] = den_part # computation of estimated flux for current assumed planet position: flux = numerator / denominator # computation of snr map: snr = numerator / np.sqrt(denominator) # computation of likelihood map: likelihood = 0.5 * snr**2 # computation of the standard deviation on the estimated flux stdflux = flux / (snr + (snr == 0)) # TODO: 0 values are replaced by 1, but # small values like 0.1 are kept. Is this # the right approach? return flux, snr, likelihood, stdflux def create_indices(angle_list, angmin, verbose=True): """ Compute the couples of indices to satisfy the minimum separation ``angmin``. Given a monotonic array of ``angle_list``, this function computes and returns the couples of indices of the array for which the separation is the closest to the value ``angmin``, by using the highest possible number of angles, all if possible. Parameters ---------- angle_list : 1d numpy ndarray ndarray containing the angles associated to each image. The array should be monotonic angmin : float The minimum acceptable difference between two angles of a couple. verbose : bool, optional Show warning if no couples can be found. Returns ------- indices_neg, indices_pos : ndarrays or None The couples of indices, so that ``index_pos[0]`` should be paired with ``index_neg[0]`` and so on. Set to None if no couples can be found. indices_not_used : list The list of the frames which were not used. This list should preferably be empty. Notes ----- - ``WASTE`` flag removed, instead this function returns ``indices_not_used`` """ # make array monotonic -> increasing if angle_list[-1] < angle_list[0]: angle_list = -angle_list good_angles = idl_where(angle_list - angle_list[0] >= angmin) if len(good_angles) == 0: if verbose: print( "Impossible to find any couple of angles! Try to " "reduce the IWA first, else you need to reduce the " "minimum separation." ) return None, None, [] indices_neg = [0] indices_pos = [good_angles[0]] indices_not_used = [] for i in range(1, len(angle_list)): good_angles = idl_where((angle_list - angle_list[i] >= angmin)) if len(good_angles) > 0: indices_neg.append(i) indices_pos.append(good_angles[0]) else: # search in other direction if i not in indices_pos: good_angles_back = idl_where( (angle_list[i] - angle_list >= angmin)) if len(good_angles_back) > 0: indices_neg.append(i) indices_pos.append(good_angles_back[-1]) else: # no new couple found indices_not_used.append(i) return np.array(indices_neg), np.array(indices_pos), indices_not_used def diff_images( cube_pos, cube_neg, rint, rext, opt_method="lsq", variance_pos=None, variance_neg=None, verbose=False, ): """ Compute the optimized difference between two cubes of images. Parameters ---------- cube_pos : 3d ndarray stack of square images (nimg x N x N) cube_neg : 3d ndarray stack of square images (nimg x N x N) rint : float inner radius of the optimization annulus (in pixels) rext : float outer radius of the optimization annulus (in pixels) opt_method : {'no', 'total', 'lsq', 'l1'}, optional Optimization for the immage difference. Numeric values kept for compatibility with the IDL version (e.g. calling both functions with the same parameters) ``"no"`` / ``1`` corresponds to ``diff_images = i1 - gamma*i2`` and ``gamma = gamma_prime = 0`` ``"total"`` / ``2`` total ratio optimization. ``diff_images = i1 - gamma*i2`` and ``gamma = sum(i1*i2 / sum(i2**2))``, ``gamma_prime = 0`` ``"lsq"`` / ``3`` least-squares optimization. ``diff_images = i1 - gamma*i2``, ``gamma = sum(i1*i2)/sum(i2**2)``, ``gamma_prime = 0`` ``"l1"`` / ``4`` L1-affine optimization, using ``fitaffine`` function. ``diff_images = i1 - gamma * i2 - gamma_prime`` verbose : bool, optional Prints some parameters, most notably the values of gamma for each difference Returns ------- cube_diff cube with differences, shape (nimg x N x N) gamma, gamma_prime arrays containing the optimization coefficient gamma and gamma'. To be used to compute the correct planet signatures used by the ANDROMEDA algorithm. Note ---- - ``GN_NO`` and ``GAIN`` keywords were never used in the IDL version, so they were not implemented. - VARIANCE_POS_INPUT, VARIANCE_NEG_INPUT, VARIANCE_TOT_OUTPUT, WEIGHTS_OUTPUT were removed - The numeric ``opt_method`` from the IDL version (``1`` for ``"no"``, etc.) are also accepted, but discouraged. Use the strings instead. """ nimg, npix, _ = cube_pos.shape # initialize cube_diff = np.zeros_like(cube_pos) gamma = np.zeros(nimg) # linear factor, per frame gamma_prime = np.zeros(nimg) # affine factor. Only !=0 for 'l1' affine fit distarray = dist_matrix(npix) annulus = (distarray > rint) & (distarray <= rext) # 2d True/False map if verbose: print("number of elements in annulus:", annulus.sum()) # compute normalization factors if opt_method in ["no", 1]: # no renormalization msg = " DIFF_IMAGES: no optimisation is being performed. Note that " msg += "keywords rint and rext will be ignored." print(msg) gamma += 1 else: if verbose: msg = " DIFF_IMAGES: optimization annulus limits: {:.1f} -> {:.1f}" print(msg.format(rint, rext)) for i in range(nimg): if opt_method in ["total", 2]: sum1 = np.sum(cube_pos[i][annulus]) sum2 = np.sum(cube_neg[i][annulus]) gamma[i] = sum1 / sum2 elif opt_method in ["lsq", 3]: sum1 = np.sum(cube_pos[i][annulus] * cube_neg[i][annulus]) sum2 = np.sum(cube_neg[i][annulus] ** 2) gamma[i] = sum1 / sum2 if verbose: msg = "DIFF_IMAGES: Factor gamma_ls for difference #{}:{}" print(msg.format(i + 1, gamma[i])) elif opt_method in ["l1", 4]: # L1-affine optimization ann_pos = cube_pos[i][annulus] ann_neg = cube_neg[i][annulus] gamma[i], gamma_prime[i] = fitaffine(y=ann_pos, x=ann_neg) if verbose: msg = " DIFF_IMAGES: Factor gamma and gamma_prime for " msg += "difference #{}/{}: {}, {}" print(msg.format(i + 1, nimg, gamma[i], gamma_prime[i])) else: raise ValueError("opt_method '{}' unknown".format(opt_method)) if verbose: msg = " DIFF_IMAGES: median gamma={:.3f}, median gamma_prime={:.3f}" print(msg.format(np.median(gamma), np.median(gamma_prime))) # compute image differences for i in range(nimg): cube_diff[i] = cube_pos[i] - cube_neg[i] * gamma[i] - gamma_prime[i] return cube_diff, gamma, gamma_prime def normalize_snr( snr, nsmooth_snr=1, iwa=None, owa=None, oversampling=None, fast=None, fit=False, show=False, ): """ Normalize each pixels of the SNR map by the robust std of its annulus. The aim is to get rid of the decreasing trend from the center of the image to its edge in order to obtain a SNR map of mean 0 and of variance 1 as expected by the algorithm if the noise model (white) was right. Thanks to this operation, a constant threshold can be applied on the SNR map to perform automatic detection. Parameters ---------- snr : 2d ndarray Square image/SNR-map to be normalized by its own radial robust standard deviation. nsmooth_snr : int [pixels], optional Number of pixel(s) over which the robust std radial profile is smoothed in the outer direction. (e.g. if ``nsmooth_snr=8``, the regarded annulus is smoothed w.r.t the 8 following adjacent pixel-annulus (at larger separation). iwa : float, optional Inner working angle in lambda/D. Radius of the smallest annulus processed by ANDROMEDA. owa : float, optional Outer working angle in lambda/D. Radius of the widest annulus processed by ANDROMEDA. oversampling : float or None, optional fast : bool (Can also be a non-zero int, as used inside ``andromeda``.) fit : bool, optional Use a 4D polynomial fit. show : bool, optional NOT IMPLEMENTED Returns ------- snr_norm Normalized SNR map of mean 0 and variance 1. snr_std In order to calculate once for all the 2D map of the SNR radial robust standard deviation, this variable records it. Note ---- - in IDL ANDROMEDA, ``/FIT`` is disabled by default, so it was not (yet) implemented. """ # ===== initialization nsnr = snr.shape[1] xcen = ycen = (nsnr - 1) / 2 # floats prof_snr = couronne_img(image=snr, xcen=xcen, ycen=ycen, verbose=False) # couronne_img, image_input=snr_input, xcen_input=xcen , ycen_input=ycen, $ # intenmoy_output=prof_snr, /SILENT it_nosmoo = np.zeros(nsnr // 2) # TODO: check even/odd frames it_robust = np.zeros(nsnr // 2) imaz_robust = np.zeros_like(snr) # ===== defaults if owa is None or oversampling is None: # If no OWA input then just take the last non-zero value dmax = nsnr // 2 else: dmax = np.ceil(owa * 2 * oversampling).astype(int) if dmax > nsnr / 2: dmax = nsnr // 2 if iwa is None or oversampling is None: # If no IWA input then just take the first non-zero value for dm in range(nsnr // 2): # TODO: floor/ceil? dmin = dm if snr[int(xcen + dm), int(ycen)] != 0: break else: dmin = np.round(iwa * 2 * oversampling).astype(int) # ===== build annulus tempo = dist_matrix(nsnr, xcen, ycen) # 2D ndarray # IDL: DIST_CIRCLE, tempo, nsnr, xcen, ycen # ===== main calculations j = 0 for i in range(dmin, dmax): if prof_snr[i] != 0: id = (tempo >= i) & (tempo <= i + nsmooth_snr) id2 = (tempo >= i - 0.5) & (tempo <= i + 0.5) id3 = (tempo >= i) & (tempo <= i + 1) it_nosmoo[i] = robust_std(snr[id3]) it_robust[i] = robust_std(snr[id]) if nsmooth_snr == 0: # IDL: IF nn EQ 1.0 imaz_robust[id3] = it_nosmoo[i] else: imaz_robust[id2] = it_robust[i] else: j = i break # IDL: `GOTO, farzone` # IDL `farzone:` dfast = 450 # [px] for SPHERE-IRDIS data # TODO: add as function argument? dnozero = snr[int(ycen), int(xcen):].nonzero()[0][-1].item() if dnozero == dmax: id5 = (tempo >= (dnozero - nsmooth_snr - 1)) & (tempo <= nsnr / 2 - 1) for i in range(dnozero - nsmooth_snr - 1, nsnr // 2): it_robust[i] = robust_std(snr[id5]) imaz_robust[id5] = it_robust[i] else: if fast and (dnozero >= dfast): # IDL: IF KEYWORD_SET(fast) # TODO: can `fast` be 0? What would happen then? for i in range(dfast - nsmooth_snr - 1, nsnr // 2): id3 = (tempo >= i) & (tempo <= i + 1) it_robust[i] = it_robust[dnozero - nsmooth_snr - 1] imaz_robust[id3] = it_robust[dnozero - nsmooth_snr - 1] else: # find the first non-zero value: k = None for i in range(j - nsmooth_snr, dnozero): if prof_snr[i] != 0: k = i if k is None: # error handling not present in IDL version. import pdb pdb.set_trace() raise RuntimeError("prof_snr is zero!") for i in range(j - nsmooth_snr, k): id = (tempo >= i) & (tempo <= dnozero) id2 = (tempo >= i - 0.5) & (tempo <= i + 0.5) id3 = (tempo >= i) & (tempo <= i + 1) id4 = (tempo >= i) & (tempo <= k) if id3.sum() > 0: # condition different from IDL version. it_nosmoo[i] = robust_std(snr[id3]) if id4.sum() > 0: it_robust[i] = robust_std(snr[id4]) if nsmooth_snr == 0: # IDL: IF nn EQ 1.0 imaz_robust[id3] = it_nosmoo[i] else: imaz_robust[id2] = it_robust[i] # using polynomial fit (4th order): # offset = 0 if fit: raise NotImplementedError("`fit` parameter is not implemented!") # xfit = np.arange(int(j - dmin + offset)) + dmin + offset # y_nosmoo = it_nosmoo[int(dmin + offset): j-1] # TODO: check ranges # ... # preview if asked: if show: raise NotImplementedError("`show` parameter is not implemented!") # xpix = np.arange(nsnr//2) # ... # normalize the SNR by its radial std: snr_norm = np.zeros((nsnr, nsnr)) # because imaz_robust has zero value, select a zone: zone = imaz_robust != 0 snr_norm[zone] = snr[zone] / imaz_robust[zone] snr_std = imaz_robust return snr_norm, snr_std def couronne_img(image, xcen, ycen=None, lieu=None, step=0.5, rmax=None, verbose=False): """ Provide intensity radial profiles of 2D images. Parameters ---------- image : 2d ndarray Input image. xcen : float Center coordinates along the horizontal direction. ycen : float, optional Center coordinates along the vertical direction. Defaults to ``xcen`` if not provided. lieu : bool mask, optional Locations of the pixels to be removed (``False``) or kepts (``True``). step : float, optional Width of the regarded annulus. rmax : int, optional Maximal radius from the image center on which calculus are performed. Defaults to half of the ``image`` size (floored). verbose : bool, optional Show more output. Returns ------- intenmoy : 1d ndarray Mean intensity per annulus. The only parameter needed for ``normalize_snr``. Note ---- **Differences from the IDL version** - All output variables except ``intenmoy_output`` are not implemented, as they are not needed for ``normalize_snr``: - inten{site,med,min,max,var,rob,cumulee}_output - imaz_{med,var,stddev,robust}_output - ``xcen`` was made a required positional argument. """ # ===== verify input if image.shape[0] != image.shape[1]: raise ValueError("`image` should be square") # ===== default values: if ycen is None: ycen = xcen if rmax is None: rmax = image.shape[0] // 2 if lieu is None: lieu = np.ones_like(image, dtype=bool) # `True` bool mask if verbose: msg = "Computation of azimuthal values from center to " "rmax={}" print(msg.format(rmax)) intenmoy = np.zeros(rmax + 1) intenmoy[0] = image[int(ycen), int(xcen)] # order? tempo = dist_matrix(image.shape[0], xcen, ycen) for i in range(1, rmax + 1): # boolean mask for annulus: mask = np.abs(tempo - i) <= step mask &= lieu if mask.sum() > 0: # check if we have matches. If `id` is full of `False`, we get a # RuntimeWarning: Mean of empty slice local = image[mask] # 1D array intenmoy[i] = np.mean(local) return intenmoy