Source code for vip_hci.fm.utils_negfd

#! /usr/bin/env python
"""
Module with post-processing related functions called from within the NEGFD
algorithm.
"""

__author__ = "Valentin Christiaens, Carlos Alberto Gomez Gonzalez"
__all__ = ["cube_disk_free"]

import numpy as np
from ..preproc import (
    cube_crop_frames,
    frame_pad,
    cube_shift,
    frame_shift,
    cube_derotate,
    frame_rotate,
    cube_rescaling,
    frame_rescaling,
)
from .fakedisk import cube_inject_fakedisk


[docs] def cube_disk_free( disk_parameter, cube, derot_angs, disk_img, psfn=None, imlib="vip-fft", interpolation="lanczos4", imlib_sh="vip-fft", interpolation_sh="lanczos4", imlib_sc="vip-fft", interpolation_sc="lanczos4", transmission=None, weights=None, **rot_options ): """ Return a cube in which we have injected a negative fake disk model image \ with xy shifts, spatial scaling, rotation and flux scaling given by \ disk_parameter. Parameters ---------- disk_parameter: numpy.array or list or tuple The (delta_x, delta_y, theta, scal, flux) for the model disk image. For a 4d cube, disk_parameter should be either (i) a numpy array with shape (5,n_ch), (ii) a 5-element list/tuple where delta_x, delta_y, theta and scal can be floats or 1d-arrays, but flux mandatorily as a 1d array with length equal to cube.shape[0]. cube: numpy.array The cube of fits images expressed as a numpy.array. angs: numpy.array The parallactic angle fits image expressed as a numpy.array. disk_img: 2d or 3d numpy ndarray The disk image to be injected, as a 2d ndarray (for either 3D or 4D input cubes) or a 3d numpy array (for a 4D spectral+ADI input cube). In the latter case, the images should correspond to different wavelengths, and the zeroth shape of disk_model and cube should match. psfn: 2d or 3d numpy ndarray The normalized psf expressed as a numpy ndarray. Can be 3d for a 4d (spectral+ADI) input cube. This would only be used to convolve disk_img. Leave to None if the disk_img is already convolved. imlib : str, optional See the documentation of the ``vip_hci.preproc.frame_rotate`` function. interpolation : str, optional See the documentation of the ``vip_hci.preproc.frame_rotate`` function. imlib_sh : str, optional See the documentation of the ``vip_hci.preproc.frame_shift`` function. interpolation_sh : str, optional See the documentation of the ``vip_hci.preproc.frame_shift`` function. imlib_sc : str, optional See the documentation of the ``vip_hci.preproc.frame_rescaling`` function. interpolation_sc : str, optional See the documentation of the ``vip_hci.preproc.frame_rescaling`` function. transmission: numpy array, optional Radial transmission of the coronagraph, if any. Array with either 2 x n_rad, 1+n_ch x n_rad columns. The first column should contain the radial separation in pixels, while the other column(s) are the corresponding off-axis transmission (between 0 and 1), for either all, or each spectral channel (only relevant for a 4D input cube). rot_options: dictionary, optional Dictionary with optional keyword values for "nproc", "cxy", "imlib", "interpolation, "border_mode", "mask_val", "edge_blend", "interp_zeros", "ker" (see documentation of ``vip_hci.preproc.frame_rotate``) Returns ------- cdf : numpy.array The cube with negative disk model injected at the position given in disk_parameter. """ cdf = np.zeros_like(cube) disk_model_tmp = disk_img.copy() # unify planet_parameter format if isinstance(disk_parameter, np.ndarray): cond1 = cube.ndim == 3 and disk_parameter.ndim < 2 cond2 = cube.ndim == 4 and disk_parameter.ndim < 3 if not cond1 or not cond2: raise TypeError("Wrong dimensions for disk_parameter") elif len(disk_parameter) != 5: raise TypeError("Wrong length for disk_parameter") if cube.ndim == 4: if len(disk_parameter[-1]) != cube.shape[0]: msg = "Length of flux scaling parameter should match cube axis 0" raise ValueError(msg) if not isinstance(disk_parameter, np.ndarray): cond1 = np.isscalar(disk_parameter[0]) cond2 = np.isscalar(disk_parameter[1]) cond3 = np.isscalar(disk_parameter[2]) if cond1 or cond2 or cond3: ndisk_parameter = np.zeros([5, cube.shape[0]]) for j in range(5): ndisk_parameter[j, :] = disk_parameter[j] disk_parameter = ndisk_parameter.copy() else: disk_parameter = np.array(disk_parameter) if disk_parameter.shape[-1] != cube.shape[0]: raise TypeError("Input disk parameter has wrong dimensions") if disk_model_tmp.ndim == 2: disk_model_tmp = np.array([disk_model_tmp] * cube.shape[0]) if psfn is None: psfn = [None] * cube.shape[0] delta_x = disk_parameter[0, :] delta_y = disk_parameter[1, :] theta = disk_parameter[2, :] disk_model_tmp = cube_shift( disk_model_tmp, delta_y, delta_x, imlib=imlib_sh, interpolation=interpolation_sh, ) angs = np.array([theta] * disk_model_tmp.shape[0]) disk_model_tmp = cube_derotate( disk_model_tmp, -angs, imlib=imlib, interpolation=interpolation, **rot_options ) for j in range(cube.shape[0]): sc = disk_parameter[3, j] disk_model_tmp[j] = frame_rescaling( disk_model_tmp[j], scale=sc, imlib=imlib_sc, interpolation=interpolation_sc, ) flevel = disk_parameter[4, j] disk_rot = cube_inject_fakedisk( flevel * disk_model_tmp[j], derot_angs, psf=psfn[j], transmission=transmission, **rot_options ) if weights is not None: if len(weights) != cube.shape[1]: raise TypeError("weights length should match cube axis 1") else: disk_rot *= weights if disk_rot.shape[-1] < cube.shape[-1]: disk_rot_pad = np.zeros_like(cube[j]) pad_fac = cube.shape[-1] / disk_rot.shape[-1] for i in range(cube.shape[1]): disk_rot_pad[i] = frame_pad( disk_rot[i], pad_fac, fillwith=0, keep_parity=False, full_output=False, ) disk_rot = disk_rot_pad elif disk_rot.shape[-1] > cube.shape[-1]: disk_rot = cube_crop_frames(disk_rot, cube.shape[-1], force=True) cdf[j] = cube[j] - disk_rot else: delta_x = disk_parameter[0] delta_y = disk_parameter[1] theta = disk_parameter[2] sc = disk_parameter[3] flevel = disk_parameter[4] disk_model_tmp = frame_shift( disk_model_tmp, delta_y, delta_x, imlib=imlib_sh, interpolation=interpolation_sh, ) disk_model_tmp = frame_rotate( disk_model_tmp, theta, imlib=imlib, interpolation=interpolation, **rot_options ) disk_model_tmp = frame_rescaling( disk_model_tmp, scale=sc, imlib=imlib_sc, interpolation=interpolation_sc ) disk_rot = cube_inject_fakedisk( flevel * disk_model_tmp, derot_angs, psf=psfn, transmission=transmission, **rot_options ) if weights is not None: if len(weights) != cube.shape[0]: raise TypeError("weights length should match cube axis 0") else: disk_rot *= weights if disk_rot.shape[-1] < cube.shape[-1]: disk_rot_pad = np.zeros_like(cube) pad_fac = cube.shape[-1] / disk_rot.shape[-1] for i in range(cube.shape[0]): disk_rot_pad[i] = frame_pad( disk_rot[i], pad_fac, fillwith=0, keep_parity=False, full_output=False, ) disk_rot = disk_rot_pad elif disk_rot.shape[-1] > cube.shape[-1]: disk_rot = cube_crop_frames(disk_rot, cube.shape[-1], force=True) cdf = cube - disk_rot return cdf