Source code for vip_hci.preproc.skysubtraction

#! /usr/bin/env python

"""
Module with sky subtraction function.

.. [GOM17]
   | Gomez-Gonzalez et al. 2017
   | **VIP: Vortex Image Processing Package for High-contrast Direct Imaging**
   | *The Astronomical Journal, Volume 154, p. 7*
   | `https://arxiv.org/abs/1705.06184
     <https://arxiv.org/abs/1705.06184>`_
     
.. [HUN18]
   | Hunziker et al. 2018
   | **PCA-based approach for subtracting thermal background emission in 
     high-contrast imaging data**
   | *Astronomy & Astrophysics, Volume 611, p. 23*
   | `https://arxiv.org/abs/1706.10069
     <https://arxiv.org/abs/1706.10069>`_
     
"""

__author__ = 'Carlos Alberto Gomez Gonzalez'
__all__ = ['cube_subtract_sky_pca']

import numpy as np
from ..var import prepare_matrix


[docs]def cube_subtract_sky_pca(sci_cube, sky_cube, mask, ref_cube=None, ncomp=2, full_output=False): """ PCA-based sky subtraction as explained in [GOM17]_ and [HUN18]_. Parameters ---------- sci_cube : numpy ndarray 3d array of science frames. sky_cube : numpy ndarray 3d array of sky frames. mask : numpy ndarray Mask indicating the region for the analysis. Can be created with the function vip_hci.var.create_ringed_spider_mask. ref_cube : numpy ndarray or None, opt Reference cube. ncomp : int, opt Sets the number of PCs you want to use in the sky subtraction. full_output: bool, opt Whether to also output pcs, reconstructed cube, residuals cube and derotated residual cube. Returns ------- sci_cube_skysub : numpy ndarray Sky-subtracted science cube ref_cube_skysub : numpy ndarray [If ref_cube is not None] Also returns sky-subtracted reference cube. If full_output is set to True, returns (in the following order): - sky-subtracted science cube, - sky-subtracted reference cube (if any provided), - principal components (non-masked), - principal components (masked), and - reconstructed cube. """ try: from ..psfsub.svd import svd_wrapper except BaseException: from ..pca.svd import svd_wrapper if sci_cube.shape[1] != sky_cube.shape[1] or sci_cube.shape[2] != \ sky_cube.shape[2]: raise TypeError('Science and Sky frames sizes do not match') if ref_cube is not None: if sci_cube.shape[1] != ref_cube.shape[1] or sci_cube.shape[2] != \ ref_cube.shape[2]: raise TypeError('Science and Reference frames sizes do not match') # Getting the EVs from the sky cube Msky = prepare_matrix(sky_cube, scaling=None, verbose=False) sky_pcs = svd_wrapper(Msky, 'lapack', sky_cube.shape[0], False) sky_pcs_cube = sky_pcs.reshape(sky_cube.shape[0], sky_cube.shape[1], sky_cube.shape[2]) # Masking the science cube sci_cube_masked = np.zeros_like(sci_cube) ind_masked = np.where(mask == 0) for i in range(sci_cube.shape[0]): masked_image = np.copy(sci_cube[i]) masked_image[ind_masked] = 0 sci_cube_masked[i] = masked_image Msci_masked = prepare_matrix(sci_cube_masked, scaling=None, verbose=False) # Masking the PCs learned from the skies sky_pcs_cube_masked = np.zeros_like(sky_pcs_cube) for i in range(sky_pcs_cube.shape[0]): masked_image = np.copy(sky_pcs_cube[i]) masked_image[ind_masked] = 0 sky_pcs_cube_masked[i] = masked_image # Project the masked frames onto the sky PCs to get the coefficients transf_sci = np.zeros((sky_cube.shape[0], Msci_masked.shape[0])) for i in range(Msci_masked.shape[0]): transf_sci[:, i] = np.inner(sky_pcs, Msci_masked[i].T) Msky_pcs_masked = prepare_matrix(sky_pcs_cube_masked, scaling=None, verbose=False) mat_inv = np.linalg.inv(np.dot(Msky_pcs_masked, Msky_pcs_masked.T)) transf_sci_scaled = np.dot(mat_inv, transf_sci) # Obtaining the optimized sky and subtraction sci_cube_skysub = np.zeros_like(sci_cube) sky_opt = sci_cube.copy() for i in range(Msci_masked.shape[0]): sky_opt[i] = np.array([np.sum( transf_sci_scaled[j, i] * sky_pcs_cube[j] for j in range(ncomp))]) sci_cube_skysub[i] = sci_cube[i] - sky_opt[i] # Processing the reference cube (if any) if ref_cube is not None: ref_cube_masked = np.zeros_like(ref_cube) for i in range(ref_cube.shape[0]): masked_image = np.copy(ref_cube[i]) masked_image[ind_masked] = 0 ref_cube_masked[i] = masked_image Mref_masked = prepare_matrix(ref_cube_masked, scaling=None, verbose=False) transf_ref = np.zeros((sky_cube.shape[0], Mref_masked.shape[0])) for i in range(Mref_masked.shape[0]): transf_ref[:, i] = np.inner(sky_pcs, Mref_masked[i].T) transf_ref_scaled = np.dot(mat_inv, transf_ref) ref_cube_skysub = np.zeros_like(ref_cube) for i in range(Mref_masked.shape[0]): sky_opt = np.array([np.sum(transf_ref_scaled[j, i] * sky_pcs_cube[j] for j in range(ncomp))]) ref_cube_skysub[i] = ref_cube[i] - sky_opt if full_output: return (sci_cube_skysub, ref_cube_skysub, sky_pcs_cube, sky_pcs_cube_masked, sky_opt) else: return sci_cube_skysub, ref_cube_skysub else: if full_output: return (sci_cube_skysub, sky_pcs_cube, sky_pcs_cube_masked, sky_opt) else: return sci_cube_skysub