#! /usr/bin/env python
"""
Module with completeness curve and map generation function.
.. [DAH21b]
| Dahlqvist et al. 2021b
| **Auto-RSM: An automated parameter-selection algorithm for the RSM map
exoplanet detection algorithm**
| *Astronomy & Astrophysics, Volume 656, Issue 2, p. 54*
| `https://arxiv.org/abs/2109.14318
<https://arxiv.org/abs/2109.14318>`_
.. [JEN18]
| Jensen-Clem et al. 2018
| **A New Standard for Assessing the Performance of High Contrast Imaging
Systems**
| *The Astrophysical Journal, Volume 155, Issue 1, p. 19*
| `https://arxiv.org/abs/1711.01215
<https://arxiv.org/abs/1711.01215>`_
.. [MAW14]
| Mawet et al. 2014
| **Fundamental Limitations of High Contrast Imaging Set by Small Sample
Statistics**
| *The Astrophysical Journal, Volume 792, Issue 1, p. 97*
| `https://arxiv.org/abs/1407.2247
<https://arxiv.org/abs/1407.2247>`_
"""
__author__ = "C.H. Dahlqvist, V. Christiaens, T. Bédrine"
__all__ = ["completeness_curve", "completeness_map"]
from math import gcd
from inspect import getfullargspec
from multiprocessing import cpu_count
import numpy as np
from astropy.convolution import convolve, Tophat2DKernel
from matplotlib import pyplot as plt
from skimage.draw import disk
from .contrcurve import contrast_curve
from .snr_source import snrmap, _snr_approx, snr
from ..config.utils_conf import pool_map, iterable, vip_figsize, vip_figdpi
from ..fm import cube_inject_companions, normalize_psf
from ..fm.utils_negfc import find_nearest
from ..preproc import cube_crop_frames
from ..var import get_annulus_segments, frame_center
def _estimate_snr_fc(
a,
b,
level,
n_fc,
cube,
psf,
angle_list,
fwhm,
algo,
algo_dict,
snrmap_empty,
starphot=1,
approximated=True,
):
cubefc = cube_inject_companions(
cube,
psf,
angle_list,
flevel=level * starphot,
plsc=0.1,
rad_dists=a,
theta=b / n_fc * 360,
n_branches=1,
verbose=False,
)
if isinstance(fwhm, (np.ndarray, list)):
fwhm_med = np.median(fwhm)
else:
fwhm_med = fwhm
if cube.ndim == 4:
cy, cx = frame_center(cube[0, 0, :, :])
else:
cy, cx = frame_center(cube[0])
# TODO: Clean below?
# Consider 3 cases depending on whether algo is (i) defined externally,
# (ii) a VIP postproc algorithm; (iii) ineligible for contrast curves
argl = getfullargspec(algo).args
if "cube" in argl and "angle_list" in argl and "verbose" in argl:
# (i) external algorithm with appropriate parameters [OK]
pass
else:
algo_name = algo.__name__
idx = algo.__module__.index(
'.', algo.__module__.index('.') + 1)
mod = algo.__module__[:idx]
tmp = __import__(
mod, fromlist=[algo_name.upper()+'_Params'])
algo_params = getattr(tmp, algo_name.upper()+'_Params')
argl = [attr for attr in vars(algo_params)]
if "cube" in argl and "angle_list" in argl and "verbose" in argl:
# (ii) a VIP postproc algorithm [OK]
pass
else:
# (iii) ineligible routine for contrast curves [Raise error]
msg = "Ineligible algo for contrast curve function. algo should "
msg += "have parameters 'cube', 'angle_list' and 'verbose'"
raise TypeError(msg)
# algo_params = signature(algo).parameters
# param_name = next(iter(algo_params))
# class_name = algo_params[param_name].annotation
# argl = [attr for attr in vars(class_name)]
if "verbose" in argl:
algo_dict["verbose"] = False
if "fwhm" in argl:
algo_dict["fwhm"] = fwhm_med
if "radius_int" in argl:
if algo_dict.get("asize") is None:
annulus_width = int(np.ceil(fwhm))
elif isinstance(algo_dict.get("asize"), (int, float)):
annulus_width = algo_dict.get("asize")
if a > 2 * annulus_width:
n_annuli = 5
radius_int = (a // annulus_width - 2) * annulus_width
else:
n_annuli = 4
radius_int = (a // annulus_width - 1) * annulus_width
if 2 * (radius_int + n_annuli * annulus_width) < cube.shape[-1]:
cubefc_crop = cube_crop_frames(
cubefc,
int(2 * (radius_int + n_annuli * annulus_width)),
xy=(cx, cy),
verbose=False,
)
else:
cubefc_crop = cubefc
frame_temp = algo(
cube=cubefc_crop, angle_list=angle_list, radius_int=radius_int, **algo_dict
)
frame_fin = np.zeros((cube.shape[-2], cube.shape[-1]))
indices = get_annulus_segments(
frame_fin, 0, radius_int + n_annuli * annulus_width, 1
)
sub = (frame_fin.shape[0] - frame_temp.shape[0]) // 2
frame_fin[indices[0][0], indices[0][1]] = frame_temp[
indices[0][0] - sub, indices[0][1] - sub
]
else:
frame_fin = algo(cube=cubefc, angle_list=angle_list, **algo_dict)
snrmap_temp = np.zeros_like(frame_fin)
cy, cx = frame_center(frame_fin)
if "radius_int" in argl:
mask = get_annulus_segments(
frame_fin, a - (fwhm_med // 2), fwhm_med + 1, mode="mask"
)[0]
else:
width = min(frame_fin.shape) / 2 - 1.5 * fwhm_med
mask = get_annulus_segments(frame_fin, (fwhm_med / 2) + 2, width, mode="mask")[
0
]
bmask = np.ma.make_mask(mask)
yy, xx = np.where(bmask)
if approximated:
coords = [(int(x), int(y)) for (x, y) in zip(xx, yy)]
tophat_kernel = Tophat2DKernel(fwhm / 2)
frame_fin = convolve(frame_fin, tophat_kernel)
res = pool_map(1, _snr_approx, frame_fin,
iterable(coords), fwhm_med, cy, cx)
res = np.array(res, dtype=object)
yy = res[:, 0]
xx = res[:, 1]
snr_value = res[:, 2]
snrmap_temp[yy.astype(int), xx.astype(int)] = snr_value
else:
coords = zip(xx, yy)
res = pool_map(
1, snr, frame_fin, iterable(
coords), fwhm_med, True, None, False, True
)
res = np.array(res, dtype=object)
yy = res[:, 0]
xx = res[:, 1]
snr_value = res[:, -1]
snrmap_temp[yy.astype("int"), xx.astype("int")] = snr_value
snrmap_fin = np.where(
abs(np.nan_to_num(snrmap_temp)) > 0.000001, 0, snrmap_empty
) + np.nan_to_num(snrmap_temp)
y, x = frame_fin.shape
twopi = 2 * np.pi
sigposy = int(y / 2 + np.sin(b / n_fc * twopi) * a)
sigposx = int(x / 2 + np.cos(b / n_fc * twopi) * a)
indc = disk((sigposy, sigposx), 4)
max_target = np.nan_to_num(snrmap_fin[indc[0], indc[1]]).max()
snrmap_fin[indc[0], indc[1]] = 0
max_map = np.nan_to_num(snrmap_fin).max()
if b == 2 and max_target - max_map < 0:
from hciplot import plot_frames
plot_frames((snrmap_empty, snrmap_temp, snrmap_fin))
return max_target - max_map, b
# TODO: Include algo_class modifications in any tutorial using this function
[docs]
def completeness_curve(
cube,
angle_list,
psf,
fwhm,
algo,
an_dist=None,
ini_contrast=None,
starphot=1,
pxscale=0.1,
n_fc=20,
completeness=0.95,
snr_approximation=True,
max_iter=50,
nproc=1,
algo_dict={},
verbose=True,
plot=True,
dpi=vip_figdpi,
save_plot=None,
object_name=None,
fix_y_lim=(),
figsize=vip_figsize,
algo_class=None,
):
"""
Function allowing the computation of completeness-based contrast curves with
any of the psf-subtraction algorithms provided by VIP. The code relies on
the approach proposed in [DAH21b]_, itself inspired by the framework
developed in [JEN18]_. It relies on the computation of the contrast
associated to a completeness level achieved at a level defined as the first
false positive in the original SNR map (brightest speckle observed in the
empty map) instead of the computation o the local noise and throughput (see
the ``vip_hci.metrics.contrast_curve`` function). The computation of the
completeness level associated to a contrast is done via the sequential
injection of multiple fake companions. The algorithm uses multiple
interpolations to find the contrast associated to the selected completeness
level (0.95 by default). More information about the algorithm can be found
in [DAH21b]_.
Parameters
----------
cube : 3d or 4d numpy ndarray
The input cube, 3d (ADI data) or 4d array (IFS data), without fake
companions.
angle_list : 1d numpy ndarray
Vector with parallactic angles.
psf : 2d or 3d numpy ndarray
Frame with the psf template for the fake companion(s).
PSF must be centered in array. Normalization is done internally.
fwhm: int or float or 1d array, optional
The the Full Width Half Maximum in pixels. It can handle a different
FWHM value for different wavelengths (IFS data).
algo : callable or function
The post-processing algorithm, e.g. ``vip_hci.pca.pca``.
an_dist: list or ndarray, optional
List of angular separations for which a contrast has to be estimated.
Default is None which corresponds to a range spanning 2 FWHM to half
the size of the provided cube - PSF size //2 with a step of 5 pixels
ini_contrast: list, 1d ndarray or None, optional
Initial contrast for the range of angular separations included in
`an_dist`. The number of initial contrasts should be equivalent to the
number of angular separations. Default is None which corresponds to the
5-sigma contrast_curve obtained with ``vip_hci.metrics.contrast_curve``.
starphot : int or float or 1d array, optional
If int or float it corresponds to the aperture photometry of the
non-coronagraphic PSF which we use to scale the contrast.
Default is 1, which corresponds to an output contrast expressed in ADU.
pxscale : float, optional
Plate scale or pixel scale of the instrument. Only used for plots.
n_fc: int, optional
Number of azimuths considered for the computation of the True
positive rate/completeness,(number of fake companions injected
sequentially). The number of azimuths is defined such that the
selected completeness is reachable (e.g. 95% of completeness
requires at least 20 fake companion injections). Default 20.
completeness: float, optional
The completeness level to be achieved when computing the contrasts,
i.e. the True positive rate reached at the threshold associated to
the first false positive (the first false positive is defined as
the brightest speckle present in the entire detection map).
Default 95.
snr_approximation : bool, optional
If True, an approximated S/N map is generated. If False the
approach of [MAW14]_ is used. Default is True, for speed.
max_iter: int, optional
Maximum number of iterations to consider in the search for the contrast
level achieving desired completeness before considering it unachievable.
nproc : int or None, optional
Number of processes for parallel computing.
algo_dict: dictionary, optional
Any other valid parameter of the post-processing algorithms can be
passed here, including e.g. imlib and interpolation.
verbose : bool, optional
Whether to print more info while running the algorithm. Default: True.
plot : bool, optional
Whether to plot the final contrast curve or not. True by default.
dpi : int optional
Dots per inch for the plots. 100 by default. 300 for printing quality.
save_plot: string or None, optional
If provided, the contrast curve will be saved to this path.
object_name: string or None, optional
Target name, used in the plot title.
fix_y_lim: tuple, optional
If provided, the y axis limits will be fixed, for easier comparison
between plots.
fig_size: tuple, optional
Figure size
Returns
-------
an_dist: 1d numpy ndarray
Radial distances where the contrasts are calculated
cont_curve: 1d numpy ndarray
Contrasts for the considered radial distances and selected completeness
level.
"""
if (100 * completeness) % (100 / n_fc) > 0:
n_fc = int(100 / gcd(int(100 * completeness), 100))
if cube.ndim != 3 and cube.ndim != 4:
raise TypeError("The input array is not a 3d or 4d cube")
if cube.ndim == 3 and (cube.shape[0] != angle_list.shape[0]):
raise TypeError("Input parallactic angles vector has wrong length")
if cube.ndim == 4 and (cube.shape[1] != angle_list.shape[0]):
raise TypeError("Input parallactic angles vector has wrong length")
if cube.ndim == 3 and psf.ndim != 2:
raise TypeError("Template PSF is not a frame (for ADI case)")
if cube.ndim == 4 and psf.ndim != 3:
raise TypeError("Template PSF is not a cube (for ADI+IFS case)")
if nproc is None:
nproc = cpu_count() // 2
if isinstance(fwhm, (np.ndarray, list)):
fwhm_med = np.median(fwhm)
else:
fwhm_med = fwhm
if an_dist is None:
an_dist = np.array(
range(2 * round(fwhm_med),
int(cube.shape[-1] // 2 - 2 * fwhm_med), 5)
)
print("an_dist not provided, the following list will be used:", an_dist)
elif an_dist[-1] > cube.shape[-1] // 2 - 2 * fwhm_med:
raise TypeError("Please decrease the maximum annular distance")
if ini_contrast is None:
print("Contrast curve not provided => will be computed first...")
ini_cc = contrast_curve(
cube,
angle_list,
psf,
fwhm_med,
pxscale,
starphot,
algo,
sigma=3,
nbranch=1,
theta=0,
inner_rad=1,
wedge=(0, 360),
fc_snr=100,
plot=False,
algo_class=algo_class,
**algo_dict,
)
ini_rads = np.array(ini_cc["distance"])
ini_cc = np.array(ini_cc["sensitivity_student"])
if np.amax(an_dist) > np.amax(ini_rads):
msg = "Max requested annular distance larger than covered by "
msg += "contrast curve. Please decrease the maximum annular distance"
raise ValueError(msg)
# find closest contrast values to requested radii
ini_contrast = []
for aa, ad in enumerate(an_dist):
idx = find_nearest(ini_rads, ad)
ini_contrast.append(ini_cc[idx])
if verbose:
print("Calculating initial SNR map with no injected companion...")
# TODO: Clean below?
# Consider 3 cases depending on whether algo is (i) defined externally,
# (ii) a VIP postproc algorithm; (iii) ineligible for contrast curves
argl = getfullargspec(algo).args
if "cube" in argl and "angle_list" in argl and "verbose" in argl:
# (i) external algorithm with appropriate parameters [OK]
pass
else:
algo_name = algo.__name__
idx = algo.__module__.index(
'.', algo.__module__.index('.') + 1)
mod = algo.__module__[:idx]
tmp = __import__(
mod, fromlist=[algo_name.upper()+'_Params'])
algo_params = getattr(tmp, algo_name.upper()+'_Params')
argl = [attr for attr in vars(algo_params)]
if "cube" in argl and "angle_list" in argl and "verbose" in argl:
# (ii) a VIP postproc algorithm [OK]
pass
else:
# (iii) ineligible routine for contrast curves [Raise error]
msg = "Ineligible algo for contrast curve function. algo should "
msg += "have parameters 'cube', 'angle_list' and 'verbose'"
raise TypeError(msg)
if "cube" in argl and "angle_list" in argl:
if "fwhm" in argl:
frame_fin = algo(
cube=cube,
angle_list=angle_list,
fwhm=fwhm_med,
verbose=False,
**algo_dict,
)
else:
frame_fin = algo(
cube=cube, angle_list=angle_list, verbose=False, **algo_dict
)
else:
raise ValueError("'cube' and 'angle_list' must be arguments of algo")
snrmap_empty = snrmap(
frame_fin,
fwhm,
approximated=snr_approximation,
plot=False,
known_sources=None,
nproc=nproc,
array2=None,
use2alone=False,
exclude_negative_lobes=False,
verbose=False,
)
cont_curve = np.zeros((len(an_dist)))
# We crop the PSF and check if PSF has been normalized (so that flux in
# 1*FWHM aperture = 1) and fix if needed
new_psf_size = int(round(3 * fwhm_med))
if new_psf_size % 2 == 0:
new_psf_size += 1
# Normalize psf
psf = normalize_psf(
psf, fwhm=fwhm, verbose=False, size=min(new_psf_size, psf.shape[1])
)
for k in range(len(an_dist)):
a = an_dist[k]
level = ini_contrast[k]
pos_detect = []
if verbose:
print("*** Calculating contrast at r = {} ***".format(a))
detect_bound = [None, None]
level_bound = [None, None]
ii = 0
err_msg = "Could not converge on a contrast level matching required "
err_msg += "completeness within {} iterations. Tested level: {}. "
err_msg += "Is there too much self-subtraction? Consider decreasing "
err_msg += "ncomp if using PCA, or increasing minimum requested radius."
while len(pos_detect) == 0 and ii < max_iter:
pos_detect = []
pos_non_detect = []
val_detect = []
val_non_detect = []
res = pool_map(
nproc,
_estimate_snr_fc,
a,
iterable(range(0, n_fc)),
level,
n_fc,
cube,
psf,
angle_list,
fwhm,
algo,
algo_dict,
snrmap_empty,
starphot,
approximated=snr_approximation,
)
for res_i in res:
if res_i[0] > 0:
pos_detect.append(res_i[1])
val_detect.append(res_i[0])
else:
pos_non_detect.append(res_i[1])
val_non_detect.append(res_i[0])
if len(pos_detect) == 0:
level = level * 1.5
ii += 1
if verbose:
msg = "Found contrast level for first TP detection: {}"
print(msg.format(level))
if ii == max_iter:
raise ValueError(err_msg.format(max_iter, level))
if len(pos_detect) > round(completeness * n_fc):
detect_bound[1] = len(pos_detect)
level_bound[1] = level
elif len(pos_detect) < round(completeness * n_fc):
detect_bound[0] = len(pos_detect)
level_bound[0] = level
pos_non_detect_temp = pos_non_detect.copy()
val_non_detect_temp = val_non_detect.copy()
pos_detect_temp = pos_detect.copy()
val_detect_temp = val_detect.copy()
cond1 = detect_bound[0] is None or detect_bound[1] is None
cond2 = len(pos_detect) != round(completeness * n_fc)
ii = 0
while cond1 and cond2 and ii < max_iter:
if detect_bound[0] is None:
level = level * 0.5
pos_detect = []
pos_non_detect = []
val_detect = []
val_non_detect = []
res = pool_map(
nproc,
_estimate_snr_fc,
a,
iterable(range(0, n_fc)),
level,
n_fc,
cube,
psf,
angle_list,
fwhm,
algo,
algo_dict,
snrmap_empty,
starphot,
approximated=snr_approximation,
)
for res_i in res:
if res_i[0] > 0:
pos_detect.append(res_i[1])
val_detect.append(res_i[0])
else:
pos_non_detect.append(res_i[1])
val_non_detect.append(res_i[0])
comp_temp = round(completeness * n_fc)
if len(pos_detect) > comp_temp and level_bound[1] > level:
detect_bound[1] = len(pos_detect)
level_bound[1] = level
elif len(pos_detect) < comp_temp:
detect_bound[0] = len(pos_detect)
level_bound[0] = level
pos_non_detect_temp = pos_non_detect.copy()
val_non_detect_temp = val_non_detect.copy()
pos_detect_temp = pos_detect.copy()
val_detect_temp = val_detect.copy()
elif detect_bound[1] is None:
level = level * 1.5
res = pool_map(
nproc,
_estimate_snr_fc,
a,
iterable(-np.sort(-np.array(pos_non_detect))),
level,
n_fc,
cube,
psf,
angle_list,
fwhm,
algo,
algo_dict,
snrmap_empty,
starphot,
approximated=snr_approximation,
)
it = len(pos_non_detect) - 1
for res_i in res:
if res_i[0] > 0:
pos_detect.append(res_i[1])
val_detect.append(res_i[0])
del pos_non_detect[it]
del val_non_detect[it]
it -= 1
comp_temp = round(completeness * n_fc)
if len(pos_detect) > comp_temp:
detect_bound[1] = len(pos_detect)
level_bound[1] = level
elif len(pos_detect) < comp_temp and level_bound[0] < level:
detect_bound[0] = len(pos_detect)
level_bound[0] = level
pos_non_detect_temp = pos_non_detect.copy()
val_non_detect_temp = val_non_detect.copy()
pos_detect_temp = pos_detect.copy()
val_detect_temp = val_detect.copy()
cond1 = detect_bound[0] is None or detect_bound[1] is None
cond2 = len(pos_detect) != comp_temp
ii += 1
if verbose:
msg = "Found lower and upper bounds of sought contrast: {}"
print(msg.format(level_bound))
if ii == max_iter:
raise ValueError(err_msg.format(max_iter, level))
if len(pos_detect) != round(completeness * n_fc):
pos_non_detect = pos_non_detect_temp.copy()
val_non_detect = val_non_detect_temp.copy()
pos_detect = pos_detect_temp.copy()
val_detect = val_detect_temp.copy()
ii = 0
while len(pos_detect) != round(completeness * n_fc) and ii < max_iter:
fact = (level_bound[1] - level_bound[0]) / (
detect_bound[1] - detect_bound[0]
)
level = level_bound[0] + fact * \
(completeness * n_fc - detect_bound[0])
res = pool_map(
nproc,
_estimate_snr_fc,
a,
iterable(-np.sort(-np.array(pos_non_detect))),
level,
n_fc,
cube,
psf,
angle_list,
fwhm,
algo,
algo_dict,
snrmap_empty,
starphot,
approximated=snr_approximation,
)
it = len(pos_non_detect) - 1
for res_i in res:
if res_i[0] > 0:
pos_detect.append(res_i[1])
val_detect.append(res_i[0])
del pos_non_detect[it]
del val_non_detect[it]
it -= 1
comp_temp = round(completeness * n_fc)
if len(pos_detect) > comp_temp:
detect_bound[1] = len(pos_detect)
level_bound[1] = level
elif len(pos_detect) < comp_temp and level_bound[0] < level:
detect_bound[0] = len(pos_detect)
level_bound[0] = level
pos_non_detect_temp = pos_non_detect.copy()
val_non_detect_temp = val_non_detect.copy()
pos_detect_temp = pos_detect.copy()
val_detect_temp = val_detect.copy()
if len(pos_detect) != comp_temp:
pos_non_detect = pos_non_detect_temp.copy()
val_non_detect = val_non_detect_temp.copy()
pos_detect = pos_detect_temp.copy()
val_detect = val_detect_temp.copy()
ii += 1
if ii == max_iter:
raise ValueError(err_msg.format(max_iter, level))
if verbose:
msg = "=> found final contrast for {}% completeness: {}"
print(msg.format(completeness * 100, level))
cont_curve[k] = level
# plotting
if plot:
an_dist_arcsec = np.asarray(an_dist) * pxscale
label = ["Sensitivity"]
plt.rc("savefig", dpi=dpi)
fig = plt.figure(figsize=figsize, dpi=dpi)
ax1 = fig.add_subplot(111)
(con1,) = ax1.plot(
an_dist_arcsec, cont_curve, "-", alpha=0.2, lw=2, color="green"
)
(con2,) = ax1.plot(an_dist_arcsec, cont_curve, ".", alpha=0.2, color="green")
lege = [(con1, con2)]
plt.legend(lege, label, fancybox=True, fontsize="medium")
plt.xlabel("Angular separation [arcsec]")
plt.ylabel(str(int(completeness * 100)) + "% completeness contrast")
plt.grid("on", which="both", alpha=0.2, linestyle="solid")
ax1.set_yscale("log")
ax1.set_xlim(0, 1.1 * np.max(an_dist_arcsec))
# Give a title to the contrast curve plot
if object_name is not None:
# Retrieve ncomp and pca_type info to use in title
ncomp = algo_dict["ncomp"]
if algo_dict["cube_ref"] is None:
pca_type = "ADI"
else:
pca_type = "RDI"
title = "{} {} {}pc".format(pca_type, object_name, ncomp)
plt.title(title, fontsize=14)
# Option to fix the y-limit
if len(fix_y_lim) == 2:
min_y_lim = min(fix_y_lim[0], fix_y_lim[1])
max_y_lim = max(fix_y_lim[0], fix_y_lim[1])
ax1.set_ylim(min_y_lim, max_y_lim)
# Optionally, save the figure to a path
if save_plot is not None:
fig.savefig(save_plot, dpi=dpi)
return an_dist, cont_curve
# TODO: Include the algo_class in the metrics tutorial !!
[docs]
def completeness_map(
cube,
angle_list,
psf,
fwhm,
algo,
an_dist,
ini_contrast,
starphot=1,
n_fc=20,
snr_approximation=True,
nproc=1,
algo_dict={},
verbose=True,
algo_class=None,
):
"""
Function allowing the computation of two dimensional (radius and
completeness) contrast curves with any psf-subtraction algorithm provided by
VIP. The code relies on the approach proposed by [DAH21b]_, itself inspired
by the framework developped in [JEN18]_. It relies on the computation of
the contrast associated to a completeness level achieved at a level defined
as the first false positive in the original SNR map (brightest speckle
observed in the empty map). The computation of the completeness level
associated to a contrast is done via the sequential injection of multiple
fake companions. The algorithm uses multiple interpolations to find the
contrast associated to the selected completeness level (0.95 by default).
The function allows the computation of three dimensional completeness maps,
with contrasts computed for multiple completeness level, allowing the
reconstruction of the contrast/completeness distribution for every
considered angular separations. For more details see [DAH21b]_.
Parameters
----------
cube : 3d or 4d numpy ndarray
The input cube, 3d (ADI data) or 4d array (IFS data), without fake
companions.
angle_list : 1d numpy ndarray
Vector with parallactic angles.
psf : 2d or 3d numpy ndarray
Frame with the psf template for the fake companion(s).
PSF must be centered in array. Normalization is done internally.
fwhm: int or float or 1d array, optional
The the Full Width Half Maximum in pixels. It can handle a different
FWHM value for different wavelengths (IFS data).
algo : callable or function
The post-processing algorithm, e.g. vip_hci.pca.pca.
an_dist: list or ndarray
List of angular separations for which a contrast has to be estimated.
Default is None which corresponds to a range of spanning between 2
FWHM and half the size of the provided cube - PSF size //2 with a
step of 5 pixels
ini_contrast: list, 1d ndarray or None, optional
Initial contrast for the range of angular separations included in
`an_dist`. The number of initial contrasts should be equivalent to the
number of angular separations. Default is None which corresponds to the
5-sigma contrast_curve obtained with ``vip_hci.metrics.contrast_curve``.
starphot : int or float or 1d array
If int or float it corresponds to the aperture photometry of the
non-coronagraphic PSF which we use to scale the contrast.
Default is 1 which corresponds to the contrast expressed in ADU.
n_fc: int, optional
Number of azimuths considered for the computation of the True
positive rate/completeness, (number of fake companions injected
separately). The range of achievable completeness depends on the
number of considered azimuths (the minimum completeness is defined
as 1/n_fc and the maximum is 1-1/n_fc). Default is 20.
snr_approximated : bool, optional
If True, an approximated S/N map is generated. If False the
approach of [MAW14]_ is used. Default is True
nproc : int or None
Number of processes for parallel computing.
algo_dict
Any other valid parameter of the post-processing algorithms can be
passed here, including e.g. imlib and interpolation.
verbose : Boolean, optional
If True the function prints intermediate info about the comptation of
the completeness map. Default is True.
Returns
-------
an_dist: 1d numpy ndarray
Radial distances where the contrasts are calculated
comp_levels: 1d numpy ndarray
Completeness levels for which the contrasts are calculated
cont_curve: 2d numpy ndarray
Contrast matrix, with the first axis associated to the radial distances
and the second axis associated to the completeness level, calculated
from 1/n_fc to (n_fc-1)/n_fc.
"""
if cube.ndim != 3 and cube.ndim != 4:
raise TypeError("The input array is not a 3d or 4d cube")
if cube.ndim == 3 and (cube.shape[0] != angle_list.shape[0]):
raise TypeError("Input parallactic angles vector has wrong length")
if cube.ndim == 4 and (cube.shape[1] != angle_list.shape[0]):
raise TypeError("Input parallactic angles vector has wrong length")
if cube.ndim == 3 and psf.ndim != 2:
raise TypeError("Template PSF is not a frame (for ADI case)")
if cube.ndim == 4 and psf.ndim != 3:
raise TypeError("Template PSF is not a cube (for ADI+IFS case)")
if nproc is None:
nproc = cpu_count() // 2
if isinstance(fwhm, (np.ndarray, list)):
fwhm_med = np.median(fwhm)
else:
fwhm_med = fwhm
if an_dist is None:
an_dist = np.array(
range(2 * round(fwhm), cube.shape[-1] // 2 - 2 * fwhm_med, 5)
)
elif an_dist[-1] > cube.shape[-1] // 2 - 2 * fwhm_med:
raise TypeError("Please decrease the maximum annular distance")
if ini_contrast is None:
print("Contrast curve not provided => will be computed first...")
# pxscale unused if plot=False
ini_cc = contrast_curve(
cube,
angle_list,
psf,
fwhm_med,
pxscale=0.1,
starphot=starphot,
algo=algo,
sigma=3,
plot=False,
**algo_dict,
)
ini_rads = np.array(ini_cc["distance"])
ini_cc = np.array(ini_cc["sensitivity_student"])
if np.amax(an_dist) > np.amax(ini_rads):
msg = "Max requested annular distance larger than covered by "
msg += "contrast curve. Please decrease the maximum annular distance"
raise ValueError(msg)
# find closest contrast values to requested radii
ini_contrast = []
for aa, ad in enumerate(an_dist):
idx = find_nearest(ini_rads, ad)
ini_contrast.append(ini_cc[idx])
# argl = getfullargspec(algo).args # does not work with recent object support
"""Because all psfsub algorithms now take more vague parameters, looking for
specific named arguments can be a puzzle. We have to first identify the parameters
tied to the algorithm by looking at its object of parameters."""
# TODO: Clean below?
# Consider 3 cases depending on whether algo is (i) defined externally,
# (ii) a VIP postproc algorithm; (iii) ineligible for contrast curves
argl = getfullargspec(algo).args
if "cube" in argl and "angle_list" in argl and "verbose" in argl:
# (i) external algorithm with appropriate parameters [OK]
pass
else:
algo_name = algo.__name__
idx = algo.__module__.index(
'.', algo.__module__.index('.') + 1)
mod = algo.__module__[:idx]
tmp = __import__(
mod, fromlist=[algo_name.upper()+'_Params'])
algo_params = getattr(tmp, algo_name.upper()+'_Params')
argl = [attr for attr in vars(algo_params)]
if "cube" in argl and "angle_list" in argl and "verbose" in argl:
# (ii) a VIP postproc algorithm [OK]
pass
else:
# (iii) ineligible routine for contrast curves [Raise error]
msg = "Ineligible algo for contrast curve function. algo should "
msg += "have parameters 'cube', 'angle_list' and 'verbose'"
raise TypeError(msg)
if "cube" in argl and "angle_list" in argl and "verbose" in argl:
if "fwhm" in argl:
frame_fin = algo(
cube=cube,
angle_list=angle_list,
fwhm=fwhm_med,
verbose=False,
**algo_dict,
)
else:
frame_fin = algo(
cube=cube, angle_list=angle_list, verbose=False, **algo_dict
)
snrmap_empty = snrmap(
frame_fin,
fwhm_med,
approximated=snr_approximation,
plot=False,
known_sources=None,
nproc=nproc,
array2=None,
use2alone=False,
exclude_negative_lobes=False,
verbose=False,
)
contrast_matrix = np.zeros((len(an_dist), n_fc + 1))
detect_pos_matrix = [[]] * (n_fc + 1)
for k in range(0, len(an_dist)):
a = an_dist[k]
level = ini_contrast[k]
pos_detect = []
det_bound = [None, None]
lvl_bound = [None, None]
print("Starting annulus " + "{}".format(a))
while len(pos_detect) == 0:
pos_detect = []
pos_non_detect = []
res = pool_map(
nproc,
_estimate_snr_fc,
a,
iterable(range(0, n_fc)),
level,
n_fc,
cube,
psf,
angle_list,
fwhm,
algo,
algo_dict,
snrmap_empty,
starphot,
approximated=snr_approximation,
)
for res_i in res:
if res_i[0] > 0:
pos_detect.append(res_i[1])
else:
pos_non_detect.append(res_i[1])
contrast_matrix[k, len(pos_detect)] = level
detect_pos_matrix[len(pos_detect)] = [
list(pos_detect.copy()),
list(pos_non_detect.copy()),
]
if len(pos_detect) == 0:
level = level * 1.5
while contrast_matrix[k, 0] == 0:
level = level * 0.75
res = pool_map(
nproc,
_estimate_snr_fc,
a,
iterable(-np.sort(-np.array(pos_detect))),
level,
n_fc,
cube,
psf,
angle_list,
fwhm,
algo,
algo_dict,
snrmap_empty,
starphot,
approximated=snr_approximation,
)
it = len(pos_detect) - 1
for res_i in res:
if res_i[0] < 0:
pos_non_detect.append(res_i[1])
del pos_detect[it]
it -= 1
contrast_matrix[k, len(pos_detect)] = level
detect_pos_matrix[len(pos_detect)] = [
list(pos_detect.copy()),
list(pos_non_detect.copy()),
]
if verbose:
print("Lower bound ({:.0f}%) found: {}".format(100 / n_fc, level))
level = contrast_matrix[k, np.where(contrast_matrix[k, :] > 0)[0][-1]]
pos_detect = []
pos_non_detect = list(np.arange(0, n_fc))
while contrast_matrix[k, n_fc] == 0:
level = level * 1.25
res = pool_map(
nproc,
_estimate_snr_fc,
a,
iterable(-np.sort(-np.array(pos_non_detect))),
level,
n_fc,
cube,
psf,
angle_list,
fwhm,
algo,
algo_dict,
snrmap_empty,
starphot,
approximated=snr_approximation,
)
it = len(pos_non_detect) - 1
for res_i in res:
if res_i[0] > 0:
pos_detect.append(res_i[1])
del pos_non_detect[it]
it -= 1
contrast_matrix[k, len(pos_detect)] = level
detect_pos_matrix[len(pos_detect)] = [
list(pos_detect.copy()),
list(pos_non_detect.copy()),
]
if verbose:
print(
"Upper bound ({:.0f}%) found: {}".format(
100 * (n_fc - 1) / n_fc, level)
)
missing = np.where(contrast_matrix[k, :] == 0)[0]
computed = np.where(contrast_matrix[k, :] > 0)[0]
while len(missing) > 0:
pos_temp = np.argmax((computed - missing[0])[computed < missing[0]])
det_bound[0] = computed[pos_temp]
lvl_bound[0] = contrast_matrix[k, det_bound[0]]
sort_temp = np.sort((missing[0] - computed))
sort_temp = sort_temp[np.sort((missing[0] - computed)) < 0]
det_bound[1] = -np.sort(-computed)[np.argmax(sort_temp)]
lvl_bound[1] = contrast_matrix[k, det_bound[1]]
it = 0
while len(pos_detect) != missing[0]:
if (
np.argmin(
[
len(detect_pos_matrix[det_bound[1]][0]),
len(detect_pos_matrix[det_bound[0]][1]),
]
)
== 0
):
pos_detect = np.sort(detect_pos_matrix[det_bound[1]][0])
pos_non_detect = np.sort(detect_pos_matrix[det_bound[1]][1])
pos_detect = list(pos_detect)
pos_non_detect = list(pos_non_detect)
num = lvl_bound[1] - lvl_bound[0]
denom = det_bound[1] - det_bound[0]
level = lvl_bound[1] + num * \
(missing[0] - det_bound[1]) / denom
res = pool_map(
nproc,
_estimate_snr_fc,
a,
iterable(-np.sort(-np.array(pos_detect))),
level,
n_fc,
cube,
psf,
angle_list,
fwhm,
algo,
algo_dict,
snrmap_empty,
starphot,
approximated=snr_approximation,
)
it = len(pos_detect) - 1
for res_i in res:
if res_i[0] < 0:
pos_non_detect.append(res_i[1])
del pos_detect[it]
it -= 1
else:
pos_detect = np.sort(detect_pos_matrix[det_bound[0]][0])
pos_non_detect = np.sort(detect_pos_matrix[det_bound[0]][1])
pos_detect = list(pos_detect)
pos_non_detect = list(pos_non_detect)
num = lvl_bound[1] - lvl_bound[0]
denom = det_bound[1] - det_bound[0]
level = lvl_bound[0] + num * \
(missing[0] - det_bound[0]) / denom
res = pool_map(
nproc,
_estimate_snr_fc,
a,
iterable(-np.sort(-np.array(pos_non_detect))),
level,
n_fc,
cube,
psf,
angle_list,
fwhm,
algo,
algo_dict,
snrmap_empty,
starphot,
approximated=snr_approximation,
)
it = len(pos_non_detect) - 1
for res_i in res:
if res_i[0] > 0:
pos_detect.append(res_i[1])
del pos_non_detect[it]
it -= 1
if len(pos_detect) > missing[0]:
det_bound[1] = len(pos_detect)
lvl_bound[1] = level
elif len(pos_detect) < missing[0] and lvl_bound[0] < level:
det_bound[0] = len(pos_detect)
lvl_bound[0] = level
contrast_matrix[k, len(pos_detect)] = level
detect_pos_matrix[len(pos_detect)] = [
list(pos_detect.copy()),
list(pos_non_detect.copy()),
]
if len(pos_detect) == missing[0]:
if verbose:
print(
"Data point "
+ "{}".format(len(pos_detect) / n_fc)
+ " found. Still "
+ "{}".format(len(missing) - it - 1)
+ " data point(s) missing"
)
computed = np.where(contrast_matrix[k, :] > 0)[0]
missing = np.where(contrast_matrix[k, :] == 0)[0]
comp_levels = np.linspace(1 / n_fc, 1 - 1 / n_fc, n_fc - 1, endpoint=True)
return an_dist, comp_levels, contrast_matrix[:, 1:-1]