#! /usr/bin/env python
"""
Module with sky subtraction function.
.. [GOM17]
| Gomez-Gonzalez et al. 2017
| **VIP: Vortex Image Processing Package for High-contrast Direct Imaging**
| *The Astronomical Journal, Volume 154, p. 7*
| `https://arxiv.org/abs/1705.06184
<https://arxiv.org/abs/1705.06184>`_
.. [HUN18]
| Hunziker et al. 2018
| **PCA-based approach for subtracting thermal background emission in
high-contrast imaging data**
| *Astronomy & Astrophysics, Volume 611, p. 23*
| `https://arxiv.org/abs/1706.10069
<https://arxiv.org/abs/1706.10069>`_
.. [REN23]
| Ren 2023
| **Karhunen-Loève data imputation in high-contrast imaging**
| *Astronomy & Astrophysics, Volume 679, p. 8*
| `https://arxiv.org/abs/2308.16912
<https://arxiv.org/abs/2308.16912>`_
"""
__author__ = 'Sandrine Juillard'
__all__ = ['cube_subtract_sky_pca']
import numpy as np
from ..var import prepare_matrix
[docs]
def cube_subtract_sky_pca(sci_cube, sky_cube, masks, ref_cube=None, ncomp=2,
full_output=False):
"""PCA-based sky subtraction as explained in [REN23]_ (see also\
[GOM17]_ and [HUN18]_).
Parameters
----------
sci_cube : numpy ndarray
3d array of science frames.
sky_cube : numpy ndarray
3d array of sky frames.
masks : tuple of two numpy ndarray or one signle numpy ndarray
Mask indicating the boat and anchor regions for the analysis.
If two masks are provided, they will be assigned to mask_anchor and
mask_boat in that order.
If only one mask is provided, it will be used as the anchor, and the
boat images will not be masked (i.e., full frames used).
ref_cube : numpy ndarray or None, opt
Reference cube.
ncomp : int, opt
Sets the number of PCs you want to use in the sky subtraction.
full_output: bool, opt
Whether to also output pcs, reconstructed cube, residuals cube and
derotated residual cube.
Notes
-----
Masks can be created with the function
``vip_hci.var.create_ringed_spider_mask`` or
``vip_hci.var.get_annulus_segments`` (see Usage Example below).
Returns
-------
sci_cube_skysub : numpy ndarray
Sky-subtracted science cube
ref_cube_skysub : numpy ndarray
[If ref_cube is not None] Also returns sky-subtracted reference cube.
If full_output is set to True, returns (in the following order):
- sky-subtracted science cube,
- sky-subtracted reference cube (if any provided),
- boat principal components,
- anchor principal components, and
- reconstructed cube.
Usage Example
-------------
You can create the masks using `get_annulus_segments` from `vip_hci.var`.
.. code-block:: python
from vip_hci.var import get_annulus_segments
The function must be used as follows, where `ring_out`, `ring_in`, and
`coro` define the radius of the different annulus masks. They must have
the same shape as a frame of the science cube.
.. code-block:: python
ones = np.ones(cube[0].shape)
boat = get_annulus_segments(ones,coro,ring_out-coro, mode="mask")[0]
anchor = get_annulus_segments(ones,ring_in,ring_out-ring_in,
mode="mask")[0]
Masks should be provided as 'mask_rdi' argument when using PCA.
.. code-block:: python
res = pca(cube, angles, ref, mask_rdi=(boat, anchor), ncomp=2)
"""
try:
from ..psfsub.svd import svd_wrapper
except BaseException:
from ..pca.svd import svd_wrapper
if sci_cube.shape[1] != sky_cube.shape[1] or sci_cube.shape[2] != \
sky_cube.shape[2]:
raise TypeError('Science and Sky frames sizes do not match')
if ref_cube is not None:
if sci_cube.shape[1] != ref_cube.shape[1] or sci_cube.shape[2] != \
ref_cube.shape[2]:
raise TypeError('Science and Reference frames sizes do not match')
if type(masks) not in (list, tuple):
# If only one mask is provided, the second mask is generated
mask_anchor = masks
mask_boat = np.ones(masks.shape)
elif len(masks) != 2:
raise TypeError('Science and Reference frames sizes do not match')
else:
mask_anchor, mask_boat = masks
# -- Generate boat and anchor matrices
# Masking the sky cube with anchor
sky_cube_masked = np.zeros_like(sky_cube)
ind_masked = np.where(mask_anchor == 0)
for i in range(sky_cube.shape[0]):
masked_image = np.copy(sky_cube[i])
masked_image[ind_masked] = 0
sky_cube_masked[i] = masked_image
sky_anchor = sky_cube_masked.reshape(sky_cube.shape[0],
sky_cube.shape[1]*sky_cube.shape[2])
# Masking the science cube with anchor
sci_cube_anchor = np.zeros_like(sci_cube)
ind_masked = np.where(mask_anchor == 0)
for i in range(sci_cube.shape[0]):
masked_image = np.copy(sci_cube[i])
masked_image[ind_masked] = 0
sci_cube_anchor[i] = masked_image
Msci_masked_anchor = prepare_matrix(sci_cube_anchor, scaling=None,
verbose=False)
# Masking the science cube with boat
sci_cube_boat = np.zeros_like(sci_cube)
ind_masked = np.where(mask_boat == 0)
for i in range(sci_cube.shape[0]):
masked_image = np.copy(sci_cube[i])
masked_image[ind_masked] = 0
sci_cube_boat[i] = masked_image
Msci_masked = prepare_matrix(sci_cube_boat, scaling=None, verbose=False)
# Masking the sky cube with boat
sky_cube_boat = np.zeros_like(sky_cube)
ind_masked = np.where(mask_boat == 0)
for i in range(sky_cube.shape[0]):
masked_image = np.copy(sky_cube[i])
masked_image[ind_masked] = 0
sky_cube_boat[i] = masked_image
sky_boat = sky_cube_boat.reshape(sky_cube.shape[0],
sky_cube.shape[1]*sky_cube.shape[2])
# -- Generate eigenvectors of R(a)T R(a)
sky_kl = np.dot(sky_anchor, sky_anchor.T)
Msky_kl = prepare_matrix(sky_kl, scaling=None, verbose=False)
sky_pcs = svd_wrapper(Msky_kl, 'lapack', sky_kl.shape[0], False)
sky_pcs_kl = sky_pcs.reshape(sky_kl.shape[0], sky_kl.shape[1])
# -- Generate Kl and Dikl transform
sky_pc_anchor = np.dot(sky_pcs_kl, sky_anchor)
sky_pcs_anchor_cube = sky_pc_anchor.reshape(sky_cube.shape[0],
sky_cube.shape[1],
sky_cube.shape[2])
sky_pcs_boat_cube = np.dot(sky_pcs_kl, sky_boat).reshape(sky_cube.shape[0],
sky_cube.shape[1],
sky_cube.shape[2])
# -- Generate Kl projection to get coeff
transf_sci = np.zeros((sky_cube.shape[0], Msci_masked_anchor.shape[0]))
for i in range(Msci_masked_anchor.shape[0]):
transf_sci[:, i] = np.inner(sky_pc_anchor, Msci_masked_anchor[i].T)
Msky_pcs_anchor = prepare_matrix(sky_pcs_anchor_cube, scaling=None,
verbose=False)
mat_inv = np.linalg.inv(np.dot(Msky_pcs_anchor, Msky_pcs_anchor.T))
transf_sci_scaled = np.dot(mat_inv, transf_sci)
# -- Subtraction Dikl projection using anchor coeff to sci cube
sci_cube_skysub = np.zeros_like(sci_cube)
sky_opt = sci_cube.copy()
for i in range(Msci_masked.shape[0]):
tmp_sky = [np.sum(transf_sci_scaled[j, i]*sky_pcs_boat_cube[j]
for j in range(ncomp))]
sky_opt[i] = np.array(tmp_sky)
sci_cube_skysub[i] = sci_cube_boat[i] - sky_opt[i]
# -- Processing the reference cube (if any)
if ref_cube is not None:
# Masking the ref cube with anchor
ref_cube_anchor = np.zeros_like(sci_cube)
ind_masked = np.where(mask_anchor == 0)
for i in range(sci_cube.shape[0]):
masked_image = np.copy(sci_cube[i])
masked_image[ind_masked] = 0
ref_cube_anchor[i] = masked_image
Mref_masked_anchor = prepare_matrix(ref_cube_anchor, scaling=None,
verbose=False)
# Masking the ref cube with boat
ref_cube_boat = np.zeros_like(sci_cube)
ind_masked = np.where(mask_boat == 0)
for i in range(sci_cube.shape[0]):
masked_image = np.copy(sci_cube[i])
masked_image[ind_masked] = 0
ref_cube_boat[i] = masked_image
Mref_masked = prepare_matrix(ref_cube_boat, scaling=None, verbose=False)
transf_ref = np.zeros((sky_cube.shape[0], Mref_masked.shape[0]))
for i in range(Mref_masked.shape[0]):
transf_ref[:, i] = np.inner(sky_pc_anchor, Mref_masked_anchor[i].T)
transf_ref_scaled = np.dot(mat_inv, transf_ref)
ref_cube_skysub = np.zeros_like(ref_cube)
for i in range(Mref_masked.shape[0]):
tmp_sky = [np.sum(transf_ref_scaled[j, i]*sky_pcs_boat_cube[j]
for j in range(ncomp))]
sky_opt = np.array(tmp_sky)
ref_cube_skysub[i] = ref_cube_boat[i] - sky_opt
if full_output:
return (sci_cube_skysub, ref_cube_skysub, sky_pcs_anchor_cube,
sky_pcs_boat_cube, sky_opt)
else:
return sci_cube_skysub, ref_cube_skysub
else:
if full_output:
return (sci_cube_skysub, sky_pcs_anchor_cube, sky_pcs_boat_cube,
sky_opt)
else:
return sci_cube_skysub