#! /usr/bin/env python
"""
Module with S/N calculation functions. We strongly recommend users to read
[MAW14]_ before using routines of this module.
"""
__author__ = 'Carlos Alberto Gomez Gonzalez, O. Absil @ ULg, V. Christiaens'
__all__ = ['snr',
'snrmap',
'indep_ap_centers',
'significance',
'frame_report']
import numpy as np
try:
from photutils.aperture import aperture_photometry, CircularAperture
except:
from photutils import aperture_photometry, CircularAperture
from scipy.stats import norm, t
from hciplot import plot_frames
from skimage.draw import disk, circle_perimeter
from matplotlib import pyplot as plt
from astropy.convolution import convolve, Tophat2DKernel
from astropy.stats import median_absolute_deviation as mad
from multiprocessing import cpu_count
from ..config.utils_conf import pool_map, iterable, sep
from ..config import time_ini, timing, check_array
from ..var import get_annulus_segments, frame_center, dist
[docs]
def snrmap(array, fwhm, approximated=False, plot=False, known_sources=None,
nproc=None, array2=None, use2alone=False,
exclude_negative_lobes=False, verbose=True, **kwargs):
"""Parallel implementation of the S/N map generation function. Applies the
S/N function (small samples penalty) at each pixel.
The S/N is computed as in [MAW14]_ for each radial separation.
**DISCLAIMER**:
Signal-to-noise ratio is not significance! For a conversion from SNR to
n-sigma (i.e. the equivalent confidence level of a Gaussian n-sigma), use
the ``significance`` function.
Parameters
----------
array : numpy ndarray
Input frame (2d array).
fwhm : float
Size in pixels of the FWHM.
approximated : bool, optional
If True, an approximated S/N map is generated.
plot : bool, optional
If True plots the S/N map. False by default.
known_sources : None, tuple or tuple of tuples, optional
To take into account existing sources. It should be a tuple of float/int
or a tuple of tuples (of float/int) with the coordinate(s) of the known
sources.
nproc : int or None
Number of processes for parallel computing.
array2 : numpy ndarray, optional
Additional image (e.g. processed image with negative derotation angles)
enabling to have more noise samples. Should have the
same dimensions as array.
use2alone: bool, optional
Whether to use array2 alone to estimate the noise (might be useful to
estimate the snr of extended disk features).
verbose: bool, optional
Whether to print timing or not.
**kwargs : dictionary, optional
Arguments to be passed to ``plot_frames`` to customize the plot (and to
save it to disk).
Returns
-------
snrmap : 2d numpy ndarray
Frame with the same size as the input frame with each pixel.
"""
if verbose:
start_time = time_ini()
check_array(array, dim=2, msg='array')
sizey, sizex = array.shape
snrmap_array = np.zeros_like(array)
width = min(sizey, sizex) / 2 - 1.5*fwhm
mask = get_annulus_segments(array, (fwhm / 2) + 1, width, mode="mask")[0]
mask = np.ma.make_mask(mask)
# by making a bool mask *after* applying the mask to the array, we also mask
# out zero values from the array. This logic cannot be simplified by using
# mode="ind"!
yy, xx = np.where(mask)
coords = zip(xx, yy)
if nproc is None:
nproc = cpu_count() // 2 # Hyper-threading doubles the # of cores
if known_sources is None:
# proxy to S/N calculation
if approximated:
cy, cx = frame_center(array)
tophat_kernel = Tophat2DKernel(fwhm / 2)
array = convolve(array, tophat_kernel)
width = min(sizey, sizex) / 2 - 1.5 * fwhm
mask = get_annulus_segments(array, (fwhm / 2) + 1, width - 1,
mode="mask")[0]
mask = np.ma.make_mask(mask)
yy, xx = np.where(mask)
coords = [(int(x), int(y)) for (x, y) in zip(xx, yy)]
res = pool_map(nproc, _snr_approx, array, iterable(coords), fwhm,
cy, cx)
res = np.array(res, dtype=object)
yy = res[:, 0]
xx = res[:, 1]
snr_value = res[:, 2]
snrmap_array[yy.astype(int), xx.astype(int)] = snr_value
# computing s/n map with Mawet+14 definition
else:
res = pool_map(nproc, snr, array, iterable(coords), fwhm, True,
array2, use2alone, exclude_negative_lobes)
res = np.array(res, dtype=object)
yy = res[:, 0]
xx = res[:, 1]
snr_value = res[:, -1]
snrmap_array[yy.astype('int'), xx.astype('int')] = snr_value
# masking known sources
else:
if not isinstance(known_sources, tuple):
raise TypeError("`known_sources` must be a tuple or tuple of "
"tuples")
else:
source_mask = np.zeros_like(array)
if isinstance(known_sources[0], tuple):
for coor in known_sources:
source_mask[coor[::-1]] = 1
elif isinstance(known_sources[0], int):
source_mask[known_sources[1], known_sources[0]] = 1
else:
raise TypeError("`known_sources` seems to have wrong type. It "
"must be a tuple of ints or tuple of tuples "
"(of ints)")
# checking the mask with the sources
if source_mask[source_mask == 1].shape[0] > 50:
msg = 'Input source mask is too crowded (check its validity)'
raise RuntimeError(msg)
soury, sourx = np.where(source_mask == 1)
sources = []
coor_ann = []
arr_masked_sources = array.copy()
centery, centerx = frame_center(array)
for y, x in zip(soury, sourx):
radd = dist(centery, centerx, int(y), int(x))
if int(radd) < centery - np.ceil(fwhm):
sources.append((y, x))
for source in sources:
y, x = source
radd = dist(centery, centerx, int(y), int(x))
anny, annx = get_annulus_segments(array, int(radd-fwhm),
int(np.round(3 * fwhm)))[0]
ciry, cirx = disk((y, x), int(np.ceil(fwhm)))
# masking the sources positions (using the MAD of pixels in annulus)
arr_masked_sources[ciry, cirx] = mad(array[anny, annx])
# S/Ns of annulus without the sources
coor_ann = [(x, y) for (x, y) in zip(annx, anny) if (x, y) not in
zip(cirx, ciry)]
res = pool_map(nproc, snr, arr_masked_sources, iterable(coor_ann),
fwhm, True, array2, use2alone,
exclude_negative_lobes)
res = np.array(res, dtype=object)
yy_res = res[:, 0]
xx_res = res[:, 1]
snr_value = res[:, 4]
snrmap_array[yy_res.astype('int'), xx_res.astype('int')] = snr_value
coor_ann += coor_ann
# S/Ns of the rest of the frame without the annulus
coor_rest = [(x, y) for (x, y) in zip(xx, yy) if (x, y) not in coor_ann]
res = pool_map(nproc, snr, array, iterable(coor_rest), fwhm, True,
array2, use2alone, exclude_negative_lobes)
res = np.array(res, dtype=object)
yy_res = res[:, 0]
xx_res = res[:, 1]
snr_value = res[:, 4]
snrmap_array[yy_res.astype('int'), xx_res.astype('int')] = snr_value
if plot:
plot_frames(snrmap_array, colorbar=True, title='S/N map', **kwargs)
if verbose:
print("S/N map created using {} processes".format(nproc))
timing(start_time)
return snrmap_array
def _snr_approx(array, source_xy, fwhm, centery, centerx):
"""
array - frame convolved with top hat kernel
"""
sourcex, sourcey = source_xy
rad = dist(centery, centerx, sourcey, sourcex)
ind_aper = disk((sourcey, sourcex), fwhm/2.)
# noise : STDDEV in convolved array of 1px wide annulus (while
# masking the flux aperture) * correction of # of resolution elements
ind_ann = circle_perimeter(int(centery), int(centerx), int(rad))
array2 = array.copy()
array2[ind_aper] = mad(array[ind_ann]) # mask
n2 = (2 * np.pi * rad) / fwhm - 1
noise = array2[ind_ann].std(ddof=1) * np.sqrt(1+(1/n2))
# signal : central px minus the mean of the pxs (masked) in 1px annulus
signal = array[sourcey, sourcex] - array2[ind_ann].mean()
snr_value = signal / noise
return sourcey, sourcex, snr_value
[docs]
def indep_ap_centers(array, source_xy, fwhm, exclude_negative_lobes=False,
exclude_theta_range=None, no_gap=False):
"""
Define independent aperture centers at a given radial separation, starting
from a test location provided with source_xy.
Parameters
----------
array : numpy ndarray, 2d
Frame in which the apertures will be defined (its dimensions are used).
source_xy : tuple of floats
X and Y coordinates of the planet or test speckle.
fwhm : float
Size in pixels of the FWHM, corresponding to the diameter of the
non-overlapping apertures.
exclude_negative_lobes : bool, opt
Whether to include the adjacent aperture lobes to the tested location
or not. Can be set to True if the image shows significant neg lobes.
exclude_theta_range : tuple of 2 floats or None, opt
If provided, range of trigonometric angles in deg (measured from
positive x axis), to be avoided for apertures used for noise estimation.
WARNING: this is to be used wisely, e.g. only if a known authentic
circumstellar signal is biasing the SNR estimate.
no_gap: bool, opt
Whether an overlapping aperture is defined between the first and last
non-overlapping aperture (at the end of making a full circle), in order
to leave no gap. False by default.
Returns
-------
(yy, xx) : tuple of 2 numpy ndarray
Tuple containing y and x coordinates of the apertures
"""
sourcex, sourcey = source_xy
centery, centerx = frame_center(array)
sep = dist(centery, centerx, float(sourcey), float(sourcex))
theta_0 = np.rad2deg(np.arctan2(sourcey - centery, sourcex - centerx))
if exclude_theta_range is not None:
exc_theta_range = list(exclude_theta_range)
if not sep > (fwhm / 2) + 1:
raise RuntimeError('`source_xy` is too close to the frame center')
# sens = 'clock' # counterclock
# assumes clockwise rotation when building test apertures
# change sign and conditions if counterclockwise
sign = -1
if exclude_theta_range is not None:
if theta_0 > exc_theta_range[0] and theta_0 < exc_theta_range[1]:
exc_theta_range[0] += 360
while theta_0 < exc_theta_range[1]:
theta_0 += 360
theta = theta_0
angle = np.arcsin(fwhm / 2. / sep) * 2
number_apertures = int(np.floor(2 * np.pi / angle))
if no_gap:
# if requested, add an (overlapping) aperture to avoid a gap
number_apertures += 1
yy = []
xx = []
yy_all = np.zeros(number_apertures)
xx_all = np.zeros(number_apertures)
cosangle = np.cos(angle)
sinangle = np.sin(angle)
xx.append(sourcex - centerx)
yy.append(sourcey - centery)
xx_all[0] = sourcex - centerx
yy_all[0] = sourcey - centery
for i in range(number_apertures - 1):
xx_all[i + 1] = cosangle * xx_all[i] - sign * sinangle * yy_all[i]
yy_all[i + 1] = cosangle * yy_all[i] + sign * sinangle * xx_all[i]
theta += sign * np.rad2deg(angle)
if exclude_negative_lobes and (i == 0 or i == number_apertures - 2):
continue
if exclude_theta_range is None:
xx.append(cosangle * xx_all[i] - sign * sinangle * yy_all[i])
yy.append(cosangle * yy_all[i] + sign * sinangle * xx_all[i])
else:
if theta < exc_theta_range[0] or theta > exc_theta_range[1]:
xx.append(cosangle * xx_all[i] - sign * sinangle * yy_all[i])
yy.append(cosangle * yy_all[i] + sign * sinangle * xx_all[i])
xx = np.array(xx)
yy = np.array(yy)
xx += centerx
yy += centery
return yy, xx
[docs]
def snr(array, source_xy, fwhm, full_output=False, array2=None, use2alone=False,
exclude_negative_lobes=False, exclude_theta_range=None, plot=False,
verbose=False):
"""
Calculate the S/N (signal to noise ratio) of a test resolution element
in a residual frame (e.g. post-processed with LOCI, PCA, etc). Implements
the approach described in [MAW14]_ on small sample statistics,
where a student t-test (eq. 9) can be used to determine S/N (and contrast)
in high contrast imaging. 3 extra possibilities compared to [MAW14]_:
* possibility to provide a second array (e.g. obtained with opposite \
derotation angles) to have more apertures for noise estimation;
* possibility to exclude negative ADI lobes directly adjacent to the \
tested xy location, to not bias the noise estimate;
* possibility to use only the second array for the noise estimation \
(useful for images containing a lot of disk/extended signals).
*** DISCLAIMER ***
Signal-to-noise ratio is not significance! For a conversion from snr to
n-sigma (i.e. the equivalent confidence level of a Gaussian n-sigma), use
the significance() function.
Parameters
----------
array : numpy ndarray, 2d
Post-processed frame where we want to measure S/N.
source_xy : tuple of floats
X and Y coordinates of the planet or test speckle.
fwhm : float
Size in pixels of the FWHM.
full_output : bool, optional
If True returns back the S/N value, the y, x input coordinates, noise
and flux.
array2 : None or numpy ndarray, 2d, optional
Additional image (e.g. processed image with negative derotation angles)
enabling to have more apertures for noise estimation at each radial
separation. Should have the same dimensions as array.
use2alone : bool, opt
Whether to use array2 alone to estimate the noise (can be useful to
estimate the S/N of extended disk features)
exclude_negative_lobes : bool, opt
Whether to include the adjacent aperture lobes to the tested location
or not. Can be set to True if the image shows significant neg lobes.
exclude_theta_range : tuple of 2 floats or None, opt
If provided, range of trigonometric angles in deg (measured from
positive x axis), to be avoided for apertures used for noise estimation.
WARNING: this is to be used wisely, e.g. only if a known authentic
circumstellar signal is biasing the SNR estimate.
plot : bool, optional
Plots the frame and the apertures considered for clarity.
verbose: bool, optional
Chooses whether to print some output or not.
Returns
-------
[if full_output=True:]
sourcey : numpy ndarray
[full_output=True] Input coordinates (``source_xy``) in Y.
sourcex : numpy ndarray
[full_output=True] Input coordinates (``source_xy``) in X.
f_source : float
[full_output=True] Flux in test elemnt.
fluxes : numpy ndarray
[full_output=True] Background apertures fluxes.
[always:]
snr_vale : float
Value of the S/N for the given test resolution element.
"""
check_array(array, dim=2, msg='array')
if not isinstance(source_xy, tuple):
raise TypeError("`source_xy` must be a tuple of floats")
if array2 is not None:
if not array2.shape == array.shape:
raise TypeError('`array2` has not the same shape as input array')
sourcex, sourcey = source_xy
yy, xx = indep_ap_centers(array, source_xy, fwhm, exclude_negative_lobes,
exclude_theta_range)
rad = fwhm/2.
apertures = CircularAperture(zip(xx, yy), r=rad) # Coordinates (X,Y)
fluxes = aperture_photometry(array, apertures, method='exact')
fluxes = np.array(fluxes['aperture_sum'])
if array2 is not None:
fluxes2 = aperture_photometry(array2, apertures, method='exact')
fluxes2 = np.array(fluxes2['aperture_sum'])
if use2alone:
fluxes = np.concatenate(([fluxes[0]], fluxes2[:]))
else:
fluxes = np.concatenate((fluxes, fluxes2))
f_source = fluxes[0].copy()
fluxes = fluxes[1:]
n2 = fluxes.shape[0]
backgr_apertures_std = fluxes.std(ddof=1)
snr_vale = (f_source - fluxes.mean())/(backgr_apertures_std *
np.sqrt(1+(1/n2)))
if verbose:
msg1 = 'S/N for the given pixel = {:.3f}'
msg2 = 'Integrated flux in FWHM test aperture = {:.3f}'
msg3 = 'Mean of background apertures integrated fluxes = {:.3f}'
msg4 = 'Std-dev of background apertures integrated fluxes = {:.3f}'
print(msg1.format(snr_vale))
print(msg2.format(f_source))
print(msg3.format(fluxes.mean()))
print(msg4.format(backgr_apertures_std))
if plot:
_, ax = plt.subplots(figsize=(6, 6))
ax.imshow(array, origin='lower', interpolation='nearest', alpha=0.5,
cmap='gray')
for i in range(xx.shape[0]):
# Circle takes coordinates as (X,Y)
aper = plt.Circle((xx[i], yy[i]), radius=fwhm/2., color='r',
fill=False, alpha=0.8)
ax.add_patch(aper)
cent = plt.Circle((xx[i], yy[i]), radius=0.8, color='r', fill=True,
alpha=0.5)
ax.add_patch(cent)
aper_source = plt.Circle((sourcex, sourcey), radius=0.7,
color='b', fill=True, alpha=0.5)
ax.add_patch(aper_source)
ax.grid(False)
plt.show()
if full_output:
return sourcey, sourcex, f_source, fluxes, snr_vale
else:
return snr_vale
[docs]
def significance(snr, rad, fwhm, student_to_gauss=True, verbose=True):
"""Convert a S/N ratio (measured as in [MAW14]_) into a Gaussian\
significance (n-sigma) with equivalent false alarm probability for a\
point-source detection measured at a given separation, or the opposite.
Parameters
----------
snr : float or numpy array
SNR value(s)
rad : float or numpy array
Radial separation(s) from the star in pixels. If an array, it should be
the same shape as snr and provide the radial separation corresponding
to each snr measurement.
fwhm : float
Full Width Half Maximum of the PSF.
student_to_gauss : bool, optional
Whether the conversion is from Student SNR to Gaussian significance
(True), or the opposite (False).
Returns
-------
sig : float
Equivalent Gaussian significance [student_to_gauss=True] or equivalent
Student S/N ratio [student_to_gauss=False].
"""
if student_to_gauss:
sig = norm.ppf(t.cdf(snr, (rad/fwhm)*2*np.pi-2))
if t.cdf(snr, (rad/fwhm)*2*np.pi-2) == 1.0:
print("Warning high S/N! cdf>0.9999999999999999 is rounded to 1")
msg = "Returning 8.2 sigma, but quote significance > 8.2 sigma."
print(msg)
return 8.2
if verbose:
msg = r"At a separation of {:.1f} px ({:.1f} FWHM), S/N = {:.1f} "
msg += r"corresponds to a {:.1f}-sigma detection in terms of "
msg += r"Gaussian false alarm probability."
print(msg.format(rad, rad/fwhm, snr, sig))
else:
sig = t.ppf(norm.cdf(snr), (rad/fwhm)*2*np.pi-2)
if verbose:
msg = r"At a separation of {:.1f} px ({:.1f} FWHM), a {:.1f}-sigma "
msg += r"detection in terms of Gaussian false alarm probability "
msg += r"translates into a Student S/N = {:.1f}."
print(msg.format(rad, rad/fwhm, snr, sig))
return sig
[docs]
def frame_report(array, fwhm, source_xy=None, verbose=True, **snr_arguments):
"""Provide info about candidate companions in a given post-processed frame.
Either a list of source positions is passed, or the position with the
highest is automatically considered. Integrated flux in aperture, S/N of\
central pixel (either ``source_xy`` or at max S/N value), mean S/N in
aperture at those/that location.
Parameters
----------
array : numpy ndarray
2d array or input frame.
fwhm : float
Size of the FWHM in pixels.
source_xy : tuple of floats or list (of tuples of floats)
X and Y coordinates of the center(s) of the source(s).
verbose : bool, optional
If True prints to stdout the frame info.
snr_arguments: dictionary, optional
Optional parameters for the ``vip_hci.metrics.snrmap`` function.
Returns
-------
source_xy : tuple of floats or list (of tuples of floats)
X and Y coordinates of the center(s) of the source(s).
obj_flux : list of floats
Integrated flux in aperture.
snr_centpx : list of floats
S/N of the ``source_xy`` pixels.
meansnr_pixels : list of floats
Mean S/N of pixels in 1xFWHM apertures centered on ``source_xy``.
"""
if array.ndim != 2:
raise TypeError('Array is not 2d.')
obj_flux = []
meansnr_pixels = []
snr_centpx = []
if source_xy is not None:
if isinstance(source_xy, (list, tuple)):
if not isinstance(source_xy[0], tuple):
source_xy = [source_xy]
else:
raise TypeError("`source_xy` must be a tuple of floats or tuple "
"of tuples")
for xy in source_xy:
x, y = xy
if verbose:
print(sep)
print('Coords of chosen px (X,Y) = {:.1f}, {:.1f}'.format(x, y))
# we get integrated flux on aperture with diameter=1FWHM
aper = CircularAperture((x, y), r=fwhm / 2.)
obj_flux_i = aperture_photometry(array, aper, method='exact')
obj_flux_i = obj_flux_i['aperture_sum'][0]
# we get the mean and stddev of SNRs on aperture
yy, xx = disk((y, x), fwhm / 2)
snr_pixels_i = [snr(array, (x_, y_), fwhm, plot=False,
verbose=False) for y_, x_ in zip(yy, xx)]
meansnr_i = np.mean(snr_pixels_i)
stdsnr_i = np.std(snr_pixels_i, ddof=1)
pxsnr_i = snr(array, (x, y), fwhm, plot=False, verbose=False)
obj_flux.append(obj_flux_i)
meansnr_pixels.append(meansnr_i)
snr_centpx.append(pxsnr_i)
if verbose:
msg0 = 'Flux in a centered 1xFWHM circular aperture = {:.3f}'
print(msg0.format(obj_flux_i))
print('Central pixel S/N = {:.3f}'.format(pxsnr_i))
print(sep)
print('Inside a centered 1xFWHM circular aperture:')
msg1 = 'Mean S/N (shifting the aperture center) = {:.3f}'
print(msg1.format(meansnr_i))
msg2 = 'Max S/N (shifting the aperture center) = {:.3f}'
print(msg2.format(np.max(snr_pixels_i)))
msg3 = 'stddev S/N (shifting the aperture center) = {:.3f}'
print(msg3.format(stdsnr_i))
print('')
else:
snr_map = snrmap(array, fwhm, **snr_arguments)
y, x = np.where(snr_map == np.nanmax(snr_map))
y, x = y[0], x[0] # assuming there is only one max, taking 1st if clump
source_xy = (x, y)
if verbose:
print(sep)
print('Coords of Max px (X,Y) = {:.1f}, {:.1f}'.format(x, y))
# we get integrated flux on aperture with diameter=1FWHM
aper = CircularAperture((x, y), r=fwhm / 2.)
obj_flux_i = aperture_photometry(array, aper, method='exact')
obj_flux_i = obj_flux_i['aperture_sum'][0]
# we get the mean and stddev of SNRs on aperture
yy, xx = disk((y, x), fwhm / 2.)
snr_pixels_i = [snr(array, (x_, y_), fwhm, plot=False,
verbose=False) for y_, x_ in zip(yy, xx)]
meansnr_pixels = np.mean(snr_pixels_i)
stdsnr_i = np.std(snr_pixels_i, ddof=1)
pxsnr_i = snr(array, (x, y), fwhm, plot=False, verbose=False)
obj_flux.append(obj_flux_i)
snr_centpx.append(pxsnr_i)
if verbose:
msg0 = 'Flux in a centered 1xFWHM circular aperture = {:.3f}'
print(msg0.format(obj_flux_i))
print('Central pixel S/N = {:.3f}'.format(pxsnr_i))
print(sep)
print('Inside a centered 1xFWHM circular aperture:')
msg1 = 'Mean S/N (shifting the aperture center) = {:.3f}'
print(msg1.format(meansnr_pixels))
msg2 = 'Max S/N (shifting the aperture center) = {:.3f}'
print(msg2.format(np.max(snr_pixels_i)))
msg3 = 'stddev S/N (shifting the aperture center) = {:.3f}'
print(msg3.format(stdsnr_i))
print(sep)
return source_xy, obj_flux, snr_centpx, meansnr_pixels