"""
Implementation of the PACO algorithm for VIP, based on [FLA18]_.
Variable naming is based on the notation of [FLA18]_, see table 1 in the paper
for a description.
Last updated 2022-05-09 by Evert Nasedkin (nasedkinevert@gmail.com).
.. [FLA18]
| Flasseur et al. 2018
| **Exoplanet detection in angular differential imaging by statistical
learning of the nonstationary patch covariances. The PACO algorithm**
| *Astronomy & Astrophysics, Volume 618, p. 138*
| `https://ui.adsabs.harvard.edu/abs/2018A%26A...618A.138F/abstract
<https://ui.adsabs.harvard.edu/abs/2018A%26A...618A.138F/abstract>`_
"""
import sys
#import os
from abc import abstractmethod
# Required so numpy parallelization doesn't conflict with multiprocessing
# os.environ["MKL_NUM_THREADS"] = "1"
# os.environ["NUMEXPR_NUM_THREADS"] = "1"
# os.environ["OMP_NUM_THREADS"] = "1"
#from multiprocessing import Pool
from typing import Tuple, Union, Optional, Callable
import numpy as np
from scipy import ndimage
from scipy.ndimage import filters
from ..config.utils_conf import pool_map, iterable
from ..preproc.rescaling import frame_px_resampling, cube_px_resampling, frame_shift
from ..var.coords import cart_to_pol, pol_to_cart
from ..metrics.detection import detection
from ..fm import normalize_psf
__author__ = "Evert Nasedkin"
__all__ = ['FastPACO',
'FullPACO']
class PACO:
"""
This class implements the bulk of the PACO algorithm as described in
[FLA18]_. In general, the idea is to take in an ADI stack of images and
statistically determine if there is a signal above the background in each
'patch' of the image. This is done by tracing the ark of the hypothesized
planet through the stack, and comparing this set of patches to a set
consisting of background only. This is done for each pixel (or sub-pixel)
location in the image. The output is a signal-to-noise and/or a flux map
over the field of view. The user can choose to use FullPACO or FastPACO,
which are described by algorithms 1 and 2 of [FLA18]_. FastPACO has been
parallelized, and is the recommended usage.
This output can then be used to compute an unbiased estimate of the flux
of point sources detected in the image above some user-supplied detection
threshold.
Parameters
----------
cube : numpy.ndarray
3D science frames taken in pupil tracking/ADI mode.
Dimensions should be (time, x, y), and units should be detector units
(ie output of SPHERE or GPI reduction pipelines). The data should be
centered, and have pre-processing already applied (e.g. bad pixel
correction).
angles : numpy.ndarray
List of parallactic angles for each frame in degrees. Length of this
array should be the same as the time axis of the science cube.
psf : numpy.ndarray
Unsaturated PSF image. If a cube is provided, the median of the cube
will be used.
dit_psf : float, optional
Integration time of the unsaturated PSF in seconds. The PSF is
normalised to dit_science/dit_psf/nd_transmission.
dit_science : float, optional
Integration time of the science frames in seconds. The PSF is normalised
to dit_science/dit_psf/nd_transmission.
nd_transmission : float, optional
Transmission of an ND filter used to aquire the unsaturated PSF. The PSF
is normalised to dit_science/dit_psf/nd_transmission.
fwhm : float, optional
FWHM of PSF in arcseconds. Default values give 4px radius.
pixscale : float, optional
Detector pixel scale in arcseconds per pixel. Default values give 4px
radius.
rescaling_factor : float, optional
Scaling for sub/super pixel resolution for PACO. Will rescale both the
science cube and the PSF.
verbose : bool, optional
Sets level of printed outputs.
"""
def __init__(self,
cube: np.ndarray,
angles: np.ndarray,
psf: np.ndarray,
dit_psf: Optional[float] = 1.0,
dit_science: Optional[float] = 1.0,
nd_transmission: Optional[float] = 1.0,
fwhm: Optional[float] = 4.0,
pixscale: Optional[float] = 1.0,
rescaling_factor: Optional[float] = 1.0,
verbose: Optional[bool] = False) -> None:
# Science image setup
try:
self.cube = cube
except BaseException:
raise ValueError("You must provide a 3D cube of science data!")
self.num_frames = self.cube.shape[0]
self.width = self.cube.shape[2]
self.height = self.cube.shape[1]
# Parallactic angles
try:
self.angles = angles
except BaseException:
raise ValueError("You must provide an array of parallactic angles!")
# Pixel scaling
self.pixscale = pixscale
self.rescaling_factor = rescaling_factor
# PSF setup
self.fwhm = int(fwhm/pixscale)
try:
# How do we want to deal with stacks of psfs? Median? Just take the first one?
# Ideally if nPSFs = nImages, use each for each. Need to update!
if len(psf.shape) > 2:
psf = np.nanmedian(psf, axis=0)
self.psf = psf * dit_science/dit_psf/nd_transmission
self.dit_science = dit_science
self.dit_psf = dit_psf
mask = create_boolean_circular_mask(self.cube[0].shape,
radius=self.fwhm)
self.patch_area_pixels = self.cube[0][mask].ravel().shape[0]
except BaseException:
raise ValueError("You must provide an unsaturated PSF image!")
self.patch_width = 2*int(self.fwhm) + 3
self.verbose = verbose
# These are what we're calculating
self.snr = None
self.flux = None
self.std = None
# Diagnostics
if self.verbose:
print("---------------------- ")
print("Summary of PACO setup: \n")
print(f"Image Cube shape = {self.cube.shape}")
print(f"PIXSCALE = {self.pixscale:06}")
print("PSF | Area | Rad | Width | ")
print(f" | {self.patch_area_pixels:01} |"
+ f" {self.fwhm:02} | {self.psf.shape[0]:03} | ")
print(f"Patch width: {self.patch_width}")
print("---------------------- \n")
sys.stdout.flush()
@abstractmethod
def PACOCalc(self,
phi0s: np.ndarray,
use_subpixel_psf_astrometry: Optional[bool] = True,
cpu: Optional[int] = 1) -> None:
"""
This function is algorithm dependant, and sets up the actual
calculation process.
Parameters
----------
phi0s : numpy.ndarray
Array of pixel coordinates to try to search for the planet signal.
Typically a grid created using numpy.meshgrid.
use_subpixel_psf_astrometry : bool
If true, the PSF model for each patch is shifted to the correct
location as predicted by the starting location and the parallactic
angles, before being resampled for the patch. If false, the PSF
model is simply located at the center of each patch. Significantly
improves performance if set to False, but the SNR is reduced.
cpu : int, optional
Number of cpus to use for parallelization.
Returns
-------
a : numpy.ndarray
a_l from Equation 15 of [FLA18]_
b : numpy.ndarray
b_l from Equation 16 of [FLA18]_
"""
def run(self,
cpu: Optional[int] = 1,
imlib: Optional[str] = 'vip-fft',
interpolation: Optional[str] = 'lanczos4',
keep_center: Optional[bool] = True,
use_subpixel_psf_astrometry: Optional[bool] = True) -> Tuple[np.ndarray, np.ndarray]:
"""
Run method of the PACO class. This function wraps up the PACO
calculation steps, and returns the snr and flux estimates as
outputs. The image library arguments are used if a rescaling of the
data is desired: upsampling the images will result in a better SNR
detection, but will be slower.
Parameters
----------
cpu : int, optional
Number of processers to use
imlib : str, optional
See the documentation of the ``vip_hci.preproc.frame_px_resampling``
function.
interpolation : str, optional
See the documentation of the ``vip_hci.preproc.frame_px_resampling``
function.
keep_center: bool, opt
If input dimensions are even and the star centered (i.e. on
dim//2, dim//2), whether to keep the star centered after scaling, i.e.
on (new_dim//2, new_dim//2). For a non-centered input cube, better to
leave it to False.
use_subpixel_psf_astrometry : bool
If true, the PSF model for each patch is shifted to the correct
location as predicted by the starting location and the parallactic
angles, before being resampled for the patch. If false, the PSF
model is simply located at the center of each patch. Significantly
improves performance if set to False, but the SNR is reduced.
Returns
-------
snr : numpy.ndarray
2D map of the signal-to-noise estimate as computed by PACO.
This is b/sqrt(a), as in eqn 24 of [FLA18]_.
flux : numpy.ndarray
2D map of the flux estimate as computed by PACO
This is b/a, as in eqn 21 of [FLA18]_.
"""
if self.rescaling_factor != 1:
self.rescale_cube_and_psf(imlib=imlib,
interpolation=interpolation,
keep_center=keep_center,
verbose=self.verbose)
if self.verbose:
print("---------------------- ")
print(f"Using {cpu} processor(s).")
print(f"Rescaled Image Cube shape: {self.cube.shape}")
print("Rescaled PSF:")
print("PSF | Area | Rad | Width | ")
print(f" | {self.patch_area_pixels:01} |"
+ f" {self.fwhm:02} | {self.psf.shape[0]:03} | ")
print("---------------------- \n")
# Setup pixel coordinates
x, y = np.meshgrid(np.arange(0, self.height),
np.arange(0, self.width))
phi0s = np.column_stack((x.flatten(), y.flatten()))
# Compute a,b
a, b = self.PACOCalc(np.array(phi0s), cpu=cpu)
# Reshape into a 2D image, with the same dimensions as the input images
a = np.reshape(a, (self.height, self.width))
b = np.reshape(b, (self.height, self.width))
# Output arrays
snr = b/np.sqrt(a)
flux = b/a
self.snr = snr
self.flux = flux
self.std = 1/np.sqrt(a)
return snr, flux
"""
Utility Functions
"""
# Set the image stack to be processed
def set_cube(self, cube: np.ndarray) -> None:
"""
Provide a 3D image array to process. This updates
the science cube, and the associated dimensions.
Parameters
----------
cube : numpy.ndarray
3D science frames taken in pupil tracking/ADI mode.
Dimensions should be (time, x, y), and units should be detector
units (ie output of SPHERE or GPI reduction pipelines). The data
should be centered, and have pre-processing already applied (e.g.
bad pixel correction).
"""
self.cube = np.array(cube)
self.num_frames = self.cube.shape[0]
self.width = self.cube.shape[2]
self.height = self.cube.shape[1]
# Set the template PSF
def set_psf(self, psf: np.ndarray) -> None:
"""
Read in the PSF template
Parameters
----------
psf: numpy.ndarray
An unsaturated psf to use as the template.
"""
self.psf = psf
# Set parallactic angles
def set_angles(self, angles: np.ndarray) -> None:
"""
Set the rotation angle for each frame
Parameters
----------
angles: numpy.ndarray
A list of the parallactic angles for each frame of the science data.
"""
self.angles = angles
def get_patch(self,
px: Tuple[int, int],
width: Optional[int] = None,
mask: Optional[np.ndarray] = None) -> np.ndarray:
"""
Gets patch at given pixel px with size k for the current img sequenc
Parameters
----------
px : Tuple[int, int]
Pixel coordinates for center of patch
width : int
width of a square patch to be masked
Returns
-------
patch : numpy.ndarray
A PACO "patch". This is a column through the time dimension of the
unrotated frames, used to build the background statistics at the
location of a given pixel.
"""
if width is None:
width = self.patch_width
if mask is None:
mask = create_boolean_circular_mask(self.cube[0].shape,
radius=self.fwhm,
center=px)
k = int(width/2)
if width % 2 != 0:
k2 = k+1
else:
k2 = k
nx, ny = np.shape(self.cube[0])[:2]
if px[0]+k2 > nx or px[0]-k < 0 or px[1]+k2 > ny or px[1]-k < 0:
return np.ones((self.num_frames, self.patch_area_pixels))*np.nan
patch = self.cube[np.broadcast_to(mask, self.cube.shape)]\
.reshape(self.num_frames, self.patch_area_pixels)
return patch
def set_scale(self, scale: float) -> None:
"""
Set subpixel scaling factor
Parameters
----------
scale : float
Scaling factor. Greater than one will result in an upsampled image,
less than one will result in a downsampled image.
"""
self.rescaling_factor = scale
def rescale_cube_and_psf(self,
imlib: Optional[str] = 'vip-fft',
interpolation: Optional[str] = 'lanczos4',
keep_center: Optional[bool] = True) -> None:
"""
Rescale each image in the stack by the class level scaling factor
set during initialization or with set_scale. A scale factor of greater
than one will upsample the image, a factor of less than one will downsample
the image. This function wraps the VIP scaling function, and uses the same
arguments to choose libraries and interpolation methods.
Parameters
----------
imlib : str, optional
See the documentation of the ``vip_hci.preproc.frame_px_resampling``
function.
interpolation : str, optional
See the documentation of the ``vip_hci.preproc.frame_px_resampling``
function.
keep_center: bool, opt
If input dimensions are even and the star centered (i.e. on
dim//2, dim//2), whether to keep the star centered after scaling, i.e.
on (new_dim//2, new_dim//2). For a non-centered input cube, better to
leave it to False.
"""
if self.rescaling_factor == 1:
if self.verbose:
print("Scale is 1, no scaling applied.")
return
# Resample the science cube
cube_px_resampling(self.cube,
self.rescaling_factor,
imlib=imlib,
interpolation=interpolation,
keep_center=keep_center,
verbose=False)
self.pixscale = self.pixscale/self.rescaling_factor
self.fwhm = int(self.fwhm*self.rescaling_factor)
# Resample the PSF
if self.psf is not None:
self.psf = frame_px_resampling(self.psf,
self.rescaling_factor,
imlib=imlib,
interpolation=interpolation,
keep_center=keep_center,
verbose=False)
mask = create_boolean_circular_mask(self.psf.shape, self.fwhm)
self.patch_area_pixels = self.psf[mask].shape[0]
self.patch_width = 2*int(self.fwhm) + 3
"""
Math Functions
"""
def psf_model_function(self, mean: float, model: Callable,
params: dict) -> np.ndarray:
"""
This function is deprecated in favour of directly supplying
a PSF. In principle, an analytic model (ie a gaussian or moffat PSF)
can be used in place of a measured unsaturated PSF.
Parameters
----------
mean : float
If using the psfTemplateModel function, the mean
model : dnc
numpy statistical model (need to import numpy module for this)
**kwargs: dict
additional arguments for model
Returns
-------
self.psf : numpy.ndarray
Returns the PSF template used by PACO
"""
if self.psf:
return self.psf
if model is None:
print("Please input either a 2D PSF or a model function.")
sys.exit(1)
else:
if model.__name__ == "psfTemplateModel":
try:
self.psf = model(mean, params)
return self.psf
except ValueError:
print("Fix template size")
self.psf = model(mean, params)
return self.psf
def al(self,
hfl: Union[list, np.ndarray],
Cfl_inv: Union[list, np.ndarray],
method: Optional[str] = "") -> np.ndarray:
"""
a_l
The sum of a_l is the inverse of the variance of the background at the given pixel.
Einsum can get slow with large tensors, and may not actually be faster.
If einsum is used, arguments must be numpy arrays, otherwise lists.
Parameters
----------
hfl : list
This is a list of flattened psf templates
Cfl_inv : list
This is a list of inverse covariance matrices
method: string
Can be empty or "einsum". This determines the method
used to do the matrix operations. "einsum" is slower for large arrays.
Returns
-------
a : numpy.ndarray
a_l from equation 15 of [FLA18]_.
"""
if method == "einsum":
d1 = np.einsum('ijk,gj', Cfl_inv, hfl)
return np.einsum('ml,ml', hfl, np.diagonal(d1).T)
a = np.sum(np.array([np.dot(hfl[i], np.dot(Cfl_inv[i], hfl[i]).T)
for i in range(len(hfl))]), axis=0)
return a
def bl(self, hfl: Union[list, np.ndarray],
Cfl_inv: Union[list, np.ndarray],
r_fl: Union[list, np.ndarray],
m_fl: Union[list, np.ndarray],
method: Optional[str] = "") -> np.ndarray:
"""
b_l
The sum of b_l is the flux estimate at the given pixel.
Einsum can get slow with large tensors, and may not actually be faster.
If einsum is used, arguments must be numpy arrays, otherwise lists.
Parameters
----------
hfl : numpy.ndarray
This is an array of flattened psf templates.
Cfl_inv : numpy.ndarray
This is an array of inverse covariance matrices.
r_fl : numpy.ndarray
This is an array of flux measurements following the predicted path.
m_fl : numpy.ndarray
This is an array of mean background statistics for each location in the path.
method: string
Can be empty or "einsum". This determines the method
used to do the matrix operations. "einsum" is slower for large arrays.
Returns
-------
b : numpy.ndarray
b_l from equation 16 of [FLA18]_.
"""
if method == "einsum":
d1 = np.einsum('ijk,gj', Cfl_inv, r_fl-m_fl)
return np.einsum('ml,ml', hfl, np.diagonal(d1).T)
b = np.sum(np.array([np.dot(np.dot(Cfl_inv[i], hfl[i]).T, (r_fl[i]-m_fl[i]))
for i in range(len(hfl))]), axis=0)
return b
"""
FluxPACO
"""
def flux_estimate(self,
phi0s: np.ndarray,
eps: Optional[float] = 0.1,
initial_est: Optional[list] = [0.0]) -> list:
"""
Unbiased estimate of the flux of a source located at p0
The estimate of the flux is given by ahat * h, where h is the PSF template.
This implements algorithm 3 from [FLA18]_.
TODO: Further testing to ensure that the extracted contrast is actually unbiased.
Don't trust this estimate without checking!
Parameters
----------
phi0s : numpy.ndarray
List of locations of sources to compute unbiased flux estimate in pixel units.
Origin is at the bottom left. Should be a list of (x,y) tuples, or a 2D numpy
array.
eps : float
Precision requirement for iteration (0,1)
initial_est : float
Initial estimate of the flux at p0 in contrast units
Returns
-------
ests : list
List of a-hat values for each detected source in the SNR map. This is the unbiased
estimate of the flux at that location. Practically, this is similar to negative PSF
injection. If the PSF is correctly normalized, this should be in contrast units.
stds : list
List of the estimated standard deviation on the flux estimates in contrast units.
norm : float
np.nanmax(psf) - Scaling factor for flux estimate and standard deviations.
"""
print("Computing unbiased flux estimate...")
if self.verbose:
print("Initial guesses:")
print("Positions: ", phi0s)
print("Contrasts: ", initial_est)
dim = self.width/2
# Create arrays needed for storage
# Store for each image pixel, for each temporal frame an image
# for patches: for each time, we need to store a column of patches
normalised_psf, norm, fwhm = normalize_psf(self.psf,
fwhm='fit',
size=None,
threshold=None,
mask_core=None,
model='airy',
imlib='vip-fft',
interpolation='lanczos4',
force_odd=False,
full_output=True,
verbose=self.verbose,
debug=False)
psf_mask = create_boolean_circular_mask(
normalised_psf.shape, radius=self.fwhm)
# The off axis PSF at each point
hoff = np.zeros(
(self.num_frames, self.num_frames, self.patch_area_pixels))
# Create arrays needed for storage
# Store for each image pixel, for each temporal frame an image
# for patches: for each time, we need to store a column of patches
# 2d selection of pixels around a given point
x, y = np.meshgrid(np.arange(-dim, dim), np.arange(-dim, dim))
ests = []
stds = []
for i, p0 in enumerate(phi0s):
p0 = (p0[1], p0[0])
angles_px = np.array(
get_rotated_pixel_coords(x, y, p0, self.angles))
hon = []
for l, ang in enumerate(angles_px):
# Get the column of patches at this point
offax = frame_shift(normalised_psf,
ang[1]-int(ang[1]),
ang[0]-int(ang[0]),
imlib='vip-fft',
interpolation='lanczos4',
border_mode='reflect')[psf_mask]
hoff[l, l] = offax
hon.append(offax)
Cinv, m, patches = self.compute_statistics(
np.array(angles_px).astype(int))
# Get Angles
# Ensure within image bounds
# Extract relevant patches and statistics
Cinlst = []
mlst = []
patch = []
for l, ang in enumerate(angles_px):
Cinlst.append(Cinv[int(ang[0]), int(ang[1])])
mlst.append(m[int(ang[0]), int(ang[1])])
patch.append(patches[int(ang[0]), int(ang[1]), l])
a = self.al(hon, Cinlst)
b = self.bl(hon, Cinlst, patch, mlst)
print(b/a)
# Fill patches and signal template
# Unbiased flux estimation
ahat = initial_est[i]
aprev = 1e10 # Arbitrary large value so that the loop will run
while np.abs(ahat - aprev) > np.abs(ahat * eps):
a = 0.0
b = 0.0
# the mean of a temporal column of patches at each pixel
m = np.zeros((self.num_frames, self.patch_area_pixels))
# the inverse covariance matrix at each point
Cinv = np.zeros(
(self.num_frames, self.patch_area_pixels, self.patch_area_pixels))
# Patches here are columns in time
for l, ang in enumerate(angles_px):
apatch = self.get_patch(ang.astype(int))
m[l], Cinv[l] = self.iterate_flux_calc(
ahat, apatch, hoff[l])
# Patches here are where the planet is expected to be
a = self.al(hon, Cinv)
b = self.bl(hon, Cinv, patch, m)
aprev = ahat
ahat = b/a
if self.verbose:
print(f"Flux estimate: {ahat/norm}")
ests.append(np.abs(ahat/norm))
stds.append(1/np.sqrt(a)/norm)
print("Extracted contrasts")
print("-------------------")
for i in range(len(phi0s)):
print(
f"x: {phi0s[i][0]}, y: {phi0s[i][1]}, flux: {ests[i]}±{stds[i]}")
return ests, stds, norm
def iterate_flux_calc(self, est: float, patch: np.ndarray,
model: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Compute the iterative estimates for the mean and inverse covariance.
Parameters
----------
est : float
Current estimate for the magnitude of the flux
patch : numpy.ndarray
Column of patches about p0
model : numpy.ndarray
Template for PSF
Returns
-------
m : numpy.ndarray
Mean of background patches.
Cinv : numpy.ndarray
List of inverse covariance matrices between the patches.
"""
if patch is None:
return None, None
unbiased = np.array([apatch - est*model[l]
for l, apatch in enumerate(patch)])
m, Cinv = compute_statistics_at_pixel(unbiased)
return m, Cinv
def subpixel_threshold_detect(self, snr_map: np.ndarray,
threshold: float,
mode: Optional[str] = 'lpeaks',
bkg_sigma: Optional[float] = 5.0,
matched_filter: Optional[bool] = False,
mask: Optional[bool] = True,
full_output: Optional[bool] = False,
cpu: Optional[int] = 1) -> np.ndarray:
""" Wraps VIP.metrics.detection.detection, see that function for further
documentation. Note that the output convention here is different - this
function returns xx,yy.
Finds blobs in a 2d array. The algorithm is designed for automatically
finding planets in post-processed high contrast final frames. Blob can
be defined as a region of an image in which some properties are constant
or vary within a prescribed range of values. See ``Notes`` below to read
about the algorithm details.
Parameters
----------
snr_map : numpy ndarray, 2d
Input frame.
threshold : float
S/N threshold for deciding whether the blob is a detection or not. Used
to threshold the S/N map when ``mode`` is set to 'snrmap' or 'snrmapf'.
mode : {'lpeaks', 'log', 'dog', 'snrmap', 'snrmapf'}, optional
Sets with algorithm to use. Each algorithm yields different results. See
notes for the details of each method.
bkg_sigma : int or float, optional
The number standard deviations above the clipped median for setting the
background level. Used when ``mode`` is either 'lpeaks', 'dog' or 'log'.
matched_filter : bool, optional
Whether to correlate with the psf of not. Used when ``mode`` is either
'lpeaks', 'dog' or 'log'.
mask : bool, optional
If True the central region (circular aperture of 2*FWHM radius) of the
image will be masked out.
full_output : bool, optional
Whether to output just the coordinates of blobs that fulfill the SNR
constraint or a table with all the blobs and the peak pixels and SNR.
cpu : None or int, optional
The number of processes for running the ``snrmap`` function.
verbose : bool, optional
Whether to print to stdout information about found blobs.
Returns
-------
peaks : np.ndarray
xx,yy values of the centers of local maxima above the provided
threshold
"""
peaks = detection(snr_map,
fwhm=self.fwhm,
psf=self.psf/np.nanmax(self.psf),
mode=mode,
bkg_sigma=bkg_sigma,
matched_filter=matched_filter,
mask=mask,
snr_thresh=threshold,
nproc=cpu,
plot=False,
debug=False,
full_output=full_output,
verbose=self.verbose)
if full_output:
return peaks.T
else:
return peaks
def pixel_threshold_detection(
self, snr_map: np.ndarray, threshold: float) -> np.ndarray:
"""
Returns a list of the pixel coordinates of center of signals above a given threshold
Parameters
----------
snr_map : numpy.ndarray
SNR map, b/sqrt(a) as computed by run()
threshold: float
Threshold for detection in sigma
Returns
-------
locs : numpy.array
Array of (x,y) pixel location estimates for the location of point sources
above the provided threshold.
"""
data_max = filters.maximum_filter(snr_map, size=self.fwhm)
maxima = (snr_map == data_max)
diff = (data_max > threshold)
maxima[diff == 0] = 0
labeled, _ = ndimage.label(maxima)
slices = ndimage.find_objects(labeled)
x, y = [], []
for dy, dx in slices:
x_center = (dx.start + dx.stop - 1)/2
x.append(x_center)
y_center = (dy.start + dy.stop - 1)/2
y.append(y_center)
return np.array(list(zip(x, y)))
def compute_statistics(self, phi0s: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
This function computes the mean and inverse covariance matrix for
each patch in the image stack in Serial. Used by FastPACO and flux
estimation.
Parameters
----------
phi0s : numpy.ndarray
Array of pixel locations to estimate companion position
Returns
-------
Cinv : numpy.ndarray
Inverse covariance matrix between the the mean of each of the patches.
The patches are a column through the time axis of the unrotated
science images. Together with the mean this provides an empirical
estimate of the background statistics. An inv covariance matrix is provided
for each pixel location in ph0s.
m : numpy.ndarray
Mean of each of the background patches along the time axis, for each pixel location
in phi0s.
patch : numpy.ndarray
The background column for each test pixel location in phi0s.
"""
if self.verbose:
print("Precomputing Statistics...")
# Store for each image pixel, for each temporal frame an image
# for patches: for each time, we need to store a column of patches
patch = np.zeros(
(self.width, self.height, self.num_frames, self.patch_area_pixels))
# the mean of a temporal column of patches centered at each pixel
m = np.zeros((self.height, self.width, self.patch_area_pixels))
# the inverse covariance matrix at each point
Cinv = np.zeros(
(self.height, self.width, self.patch_area_pixels, self.patch_area_pixels))
# *** SERIAL ***
# Loop over all pixels
# i is the same as theta_k in the PACO paper
for p0 in phi0s:
apatch = self.get_patch(p0)
# For some black magic reason this needs to be inverted here.
m[p0[1]][p0[0]], Cinv[p0[1]][p0[0]
] = compute_statistics_at_pixel(apatch)
patch[p0[1]][p0[0]] = apatch
return Cinv, m, patch
"""
**************************************************
* *
* Fast PACO *
* *
**************************************************
"""
[docs]
class FastPACO(PACO):
"""
This class implements Algorithm 2 from [FLA18]_.
"""
[docs]
def PACOCalc(self,
phi0s: np.ndarray,
use_subpixel_psf_astrometry: Optional[bool] = True,
cpu: Optional[int] = 1) -> None:
"""
PACOCalc
This function iterates of a list of test points (phi0) and a list
of angles between frames to produce 'a' and b', which can be used to
generate a signal to noise map where SNR = b/sqrt(a) at each pixel.
Parameters
----------
phi0s : numpy.ndarray
Array of (x,y) pixel locations to estimate companion position
use_subpixel_psf_astrometry : bool
If true, the PSF model for each patch is shifted to the correct
location as predicted by the starting location and the parallactic
angles, before being resampled for the patch. If false, the PSF
model is simply located at the center of each patch. Significantly
improves performance if set to False, but the SNR is reduced.
cpu : int
Number of cores to use for parallel processing
Returns
-------
a : numpy.ndarray
a_l from Equation 15 of [FLA18]_
b : numpy.ndarray
b_l from Equation 16 of [FLA18]_
"""
npx = len(phi0s) # Number of pixels in an image
dim = self.width/2
a = np.zeros(npx) # Setup output arrays
b = np.zeros(npx)
phi0s = np.array([phi0s[:, 1], phi0s[:, 0]]).T
if cpu == 1:
Cinv, m, patches = self.compute_statistics(phi0s)
else:
Cinv, m, patches = self.compute_statistics_parallel(phi0s, cpu=cpu)
normalised_psf = normalize_psf(self.psf,
fwhm='fit',
size=None,
threshold=None,
mask_core=None,
model='airy',
imlib='vip-fft',
interpolation='lanczos4',
force_odd=False,
full_output=False,
verbose=self.verbose,
debug=False)
psf_mask = create_boolean_circular_mask(
normalised_psf.shape, radius=self.fwhm)
# Create arrays needed for storage
# Store for each image pixel, for each temporal frame an image
# for patches: for each time, we need to store a column of patches
# Currently forcing integer grid, but meshgrid takes floats as
# arguments...
x, y = np.meshgrid(np.arange(-dim, dim), np.arange(-dim, dim))
if self.verbose:
print("Running Fast PACO...")
# Loop over all pixels
# i is the same as theta_k in the PACO paper
for i, p0 in enumerate(phi0s):
# Get Angles
angles_px = get_rotated_pixel_coords(x, y, p0, self.angles)
# Ensure within image bounds
if(int(np.max(angles_px.flatten())) >= self.width or
int(np.min(angles_px.flatten())) < 0):
a[i] = np.nan
b[i] = np.nan
continue
# Extract relevant patches and statistics
Cinlst = []
mlst = []
hlst = []
patch = []
for l, ang in enumerate(angles_px):
Cinlst.append(Cinv[int(ang[0]), int(ang[1])])
mlst.append(m[int(ang[0]), int(ang[1])])
if use_subpixel_psf_astrometry:
offax = frame_shift(normalised_psf,
ang[1]-int(ang[1]),
ang[0]-int(ang[0]),
imlib='vip-fft',
interpolation='lanczos4',
border_mode='reflect')[psf_mask]
else:
offax = normalised_psf[psf_mask]
hlst.append(offax)
patch.append(patches[int(ang[0]), int(ang[1]), l])
# Calculate a and b, matrices
a[i] = self.al(hlst, Cinlst)
b[i] = self.bl(hlst, Cinlst, patch, mlst)
if self.verbose:
print("Done")
return a, b
[docs]
def compute_statistics_parallel(self,
phi0s: np.ndarray,
cpu: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
This function computes the mean and inverse covariance matrix for
each patch in the image stack in parallel.
Parameters
----------
phi0s : int numpy.ndarray
Array of pixel locations to estimate companion position
cpu : int
Number of processors to use
Returns
-------
Cinv : numpy.ndarray
Inverse covariance matrix between the the mean of each of the patches.
The patches are a column through the time axis of the unrotated
science images. Together with the mean this provides an empirical
estimate of the background statistics. An inv covariance matrix is provided
for each pixel location in ph0s.
m : numpy.ndarray
Mean of each of the background patches along the time axis, for each pixel location
in phi0s.
patch : numpy.ndarray
The background column for each test pixel location in phi0s.
"""
if self.verbose:
print("Precomputing Statistics using %d Processes..." % cpu)
# the mean of a temporal column of patches at each pixel
m = np.zeros((self.height*self.width*self.patch_area_pixels))
# the inverse covariance matrix at each point
Cinv = np.zeros((self.height *
self.width *
self.patch_area_pixels *
self.patch_area_pixels))
# *** Parallel Processing ***
p_data = pool_map(cpu, self.get_patch, iterable(phi0s))
patches = [p for p in p_data]
data = pool_map(cpu, compute_statistics_at_pixel, iterable(patches))
# p_pool = Pool(processes=cpu)
#p_data = p_pool.map(self.get_patch, phi0s, chunksize=int(npx/cpu))
# p_pool.close()
# p_pool.join()
# patches = [p for p in p_data]
# p = Pool(processes=cpu)
# data = p.map(compute_statistics_at_pixel, patches, chunksize=int(npx/cpu))
# p.close()
# p.join()
ms, cs = [], []
for d in data:
if d[0] is None or d[1] is None:
ms.append(np.full(self.patch_area_pixels, np.nan))
cs.append(np.full((self.patch_area_pixels,
self.patch_area_pixels), np.nan))
else:
ms.append(d[0])
cs.append(d[1])
ms = np.array(ms)
cs = np.array(cs)
patches = np.array(patches)
# Reshape outputs
patches = patches.reshape((self.width,
self.height,
self.num_frames,
self.patch_area_pixels))
m = ms.reshape((self.height,
self.width,
self.patch_area_pixels))
Cinv = cs.reshape((self.height,
self.width,
self.patch_area_pixels,
self.patch_area_pixels))
patches = np.swapaxes(patches, 0, 1)
m = np.swapaxes(m, 0, 1)
Cinv = np.swapaxes(Cinv, 0, 1)
return Cinv, m, patches
"""
**************************************************
* *
* Full PACO *
* *
**************************************************
"""
[docs]
class FullPACO(PACO):
"""
Implementation of Algorithm 1 from [FLA18]_
"""
[docs]
def PACOCalc(self,
phi0s: np.ndarray,
use_subpixel_psf_astrometry: Optional[bool] = True,
cpu: Optional[int] = 1) -> None:
"""
PACOCalc
This function iterates of a list of test points (phi0) and a list
of angles between frames to produce 'a' and b', which can be used to
generate a signal to noise map where SNR = b/sqrt(a) at each pixel.
Parameters
----------
phi0s : numpy.ndarray
Array of (x,y) pixel locations to estimate companion position
use_subpixel_psf_astrometry : bool
If true, the PSF model for each patch is shifted to the correct
location as predicted by the starting location and the parallactic
angles, before being resampled for the patch. If false, the PSF
model is simply located at the center of each patch. Significantly
improves performance if set to False, but the SNR is reduced.
cpu : int
Number of cores to use for parallel processing. TODO: Not yet implemented.
Returns
-------
a : numpy.ndarray
a_l from Equation 15 of [FLA18]_
b : numpy.ndarray
b_l from Equation 16 of [FLA18]_
"""
npx = len(phi0s) # Number of pixels in an image
dim = self.width/2
a = np.zeros(npx) # Setup output arrays
b = np.zeros(npx)
normalised_psf = normalize_psf(self.psf,
fwhm='fit',
size=None,
threshold=None,
mask_core=None,
model='airy',
imlib='vip-fft',
interpolation='lanczos4',
force_odd=False,
full_output=False,
verbose=self.verbose,
debug=False)
psf_mask = create_boolean_circular_mask(
normalised_psf.shape, radius=self.fwhm)
if self.verbose:
print("Running Full PACO...")
# Set up coordinates so 0 is at the center of the image
x, y = np.meshgrid(np.arange(-dim, dim), np.arange(-dim, dim))
if cpu > 1:
print("Multiprocessing for full PACO is not yet implemented!")
# Store intermediate results
patch = np.zeros(
(self.width, self.height, self.num_frames, self.patch_area_pixels))
# the mean of a temporal column of patches centered at each pixel
m = np.zeros((self.height, self.width, self.patch_area_pixels))
# the inverse covariance matrix at each point
Cinv = np.zeros(
(self.height, self.width, self.patch_area_pixels, self.patch_area_pixels))
# Loop over all pixels
# i is the same as theta_k in the PACO paper
for i, p0 in enumerate(phi0s):
# Get list of pixels for each rotation angle
angles_px = get_rotated_pixel_coords(
x, y, (p0[1], p0[0]), self.angles)
# Ensure within image bounds
if(int(np.max(angles_px.flatten())) >= self.width or
int(np.min(angles_px.flatten())) < 0):
a[i] = np.nan
b[i] = np.nan
continue
# Iterate over each temporal frame/each angle
# Same as iterating over phi_l
current_patch = []
mlst = []
h = []
clst = []
for l, ang in enumerate(angles_px):
# Get the column of patches at this point
if np.max(patch[int(ang[0]), int(ang[1])]) == 0:
apatch = self.get_patch((int(ang[1]), int(ang[0])))
patch[int(ang[0]), int(ang[1])] = apatch
m[int(ang[0]), int(ang[1])], Cinv[int(ang[0]), int(
ang[1])] = compute_statistics_at_pixel(apatch)
else:
apatch = patch[int(ang[0]), int(ang[1])]
if apatch is None:
continue
mlst.append(m[int(ang[0]), int(ang[1])])
clst.append(Cinv[int(ang[0]), int(ang[1])])
current_patch.append(apatch)
if use_subpixel_psf_astrometry:
offax = frame_shift(normalised_psf,
ang[1]-int(ang[1]),
ang[0]-int(ang[0]),
imlib='vip-fft',
interpolation='lanczos4',
border_mode='reflect')[psf_mask]
else:
offax = normalised_psf[psf_mask]
h.append(offax)
current_patch = np.array(current_patch)
patches = np.array([current_patch[l, l]
for l in range(len(angles_px))])
h = np.array(h)
mlst = np.array(mlst)
clst = np.array(clst)
# Calculate a and b, matrices
a[i] = self.al(h, clst)
b[i] = self.bl(h, clst, patches, mlst)
if self.verbose:
print("Done")
return a, b
"""
Math functions for computing patch covariance
"""
def compute_statistics_at_pixel(
patch: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Calculate the mean and inverse covariance within a patch
Reimplemented in PACO class, can probably be deleted
Parameters
----------
patch : numpy.ndarray
Array of circular (flattened) patches centered on the same physical
pixel vertically throughout the image stack
"""
if patch is None:
return None, None
T = patch.shape[0]
#size = patch.shape[1]
# Calculate the mean of the column
m = np.mean(patch, axis=0)
# Calculate the covariance matrix
S = sample_covariance(patch, m, T)
rho = shrinkage_factor(S, T)
F = diagsample_covariance(S)
C = covariance(rho, S, F)
Cinv = np.linalg.inv(C)
return m, Cinv
def covariance(rho: np.ndarray, S: np.ndarray, F: np.ndarray) -> np.ndarray:
"""
Ĉ: Shrinkage covariance matrix
Parameters
----------
rho : float
Shrinkage factor weight
S : numpy.ndarray
Sample covariance matrix
F : numpy.ndarray
Diagonal of sample covariance matrix
Returns
-------
m : numpy.ndarray
Mean of each of the background patches along the time axis.
Cinv : numpy.ndarray
Inverse covariance matrix between the the mean of each of the patches.
The patches are a column through the time axis of the unrotated
science images. Together with the mean this provides an empirical
estimate of the background statistics.
"""
C = (1.0-rho)*S + rho*F
return C
def sample_covariance(r: np.ndarray, m: np.ndarray,
T: np.ndarray) -> np.ndarray:
"""
Ŝ: Sample covariance matrix
Parameters
----------
r : numpy.ndarray
Observed intensity at position θk and time tl
m : numpy.ndarray
Mean of all background patches at position θk
T : int
Number of temporal frames
Returns
-------
S : numpy.ndarray
Sample covariance
"""
#S = (1.0/T)*np.sum([np.outer((p-m).ravel(),(p-m).ravel().T) for p in r], axis=0)
S = (1.0/T)*np.sum([np.cov(np.stack((p, m)),
rowvar=False, bias=False) for p in r], axis=0)
return S
def diagsample_covariance(S: np.ndarray) -> np.ndarray:
"""
F: Diagonal elements of the sample covariance matrix
Parameters
----------
S : arr
Sample covariance matrix
Returns
-------
F : numpy.ndarray
Diagonal elements of the sample covariance matrix
"""
return np.diag(np.diag(S))
def shrinkage_factor(S: np.ndarray, T: np.ndarray) -> float:
"""
ρ: Shrinkage factor to regularize covariant matrix
Parameters
----------
S : numpy.ndarray
Sample covariance matrix
T : int
Number of temporal frames
Returns
-------
ρ : float
Shrinkage factor to regularize covariant matrix
"""
top = (np.trace(np.dot(S, S)) + np.trace(S)**2 -
2.0*np.sum(S**2.0))
bot = ((T+1.0)*(np.trace(np.dot(S, S)) -
np.sum(np.diag(S)**2.0)))
p = top/bot
return max(min(p, 1.0), 0.0)
def get_rotated_pixel_coords(x: np.ndarray,
y: np.ndarray,
p0: Tuple[int, int],
angles: np.ndarray,
astro_convention: Optional[bool] = False) -> np.ndarray:
"""
For a given pixel, find the new pixel location after a rotation for each angle in angles
Parameters
----------
x : numpy.ndarrayr
Grid of x components of pixel coordinates
y : numpy.ndarrayr
Grid of y components of pixel coordinates
p0 : (int,int)
Initial pixel location
angles : numpy.ndarrayr
List of angles for which to compute the new pixel location
Returns
-------
nx : numpy.ndarray
New array of x pixels coordinates following the rotation
ny : numpy.ndarray
New array of y pixels coordinates following the rotation
"""
# Current pixel
phi0 = np.array([x[int(p0[0]), int(p0[1])], y[int(p0[0]), int(p0[1])]])
# Convert to polar coordinates
rad, theta = cart_to_pol(
phi0[0], phi0[1], astro_convention=astro_convention)
# Rotate by parallactic angles
angles_rad = -1*angles + theta
# Rotate the polar coordinates by each frame angle
angles_pol = np.array([rad*np.ones_like(angles_rad), angles_rad])
# Find the new pixel coordinates after rotation
nx, ny = pol_to_cart(
angles_pol[0], angles_pol[1], astro_convention=astro_convention)
# Shift to center coordinates (central pixel is 0)
# TODO - use vip cx, cy arguments rather than shifting after?
nx += +int(x.shape[0]/2)
ny += +int(x.shape[0]/2)
return np.array([nx, ny]).T
def create_boolean_circular_mask(shape: np.ndarray,
radius: Optional[int] = 4,
center: Optional[Tuple[int, int]] = None) -> np.ndarray:
"""
Returns a 2D boolean mask given some radius and location
Parameters
----------
shape : numpy.ndarray
Shape of a 2D numpy array
radius : int
Radius of the mask in pixels
center : (int,int)
Pixel coordinates denoting the center of the mask,
None defaults to center of shape
Returns
-------
mask : numpy.ndarray
A boolean mask of the the same shape as the science
input data (provided by shape argument). The mask is 0
outside of a circular region located at center, with a specified radius.
"""
w = shape[0]
h = shape[1]
if center is None:
center = [int(w/2), int(h/2)]
if radius is None:
radius = min(center[0], center[1], w-center[0], h-center[1])
X, Y = np.ogrid[:w, :h]
dist2 = (X - center[0])**2 + (Y-center[1])**2
mask = dist2 <= radius**2
return mask