Source code for vip_hci.preproc.rescaling

#! /usr/bin/env python
"""
Module with frame px resampling/rescaling functions.
"""
__author__ = "Carlos Alberto Gomez Gonzalez, V. Christiaens, R. Farkas"
__all__ = [
    "frame_px_resampling",
    "cube_px_resampling",
    "cube_rescaling_wavelengths",
    "frame_rescaling",
    "cube_rescaling",
    "check_scal_vector",
    "find_scal_vector",
    "scale_fft",
]

import numpy as np
import warnings

try:
    import cv2

    no_opencv = False
except ImportError:
    warnings.warn("Opencv python bindings are missing.", ImportWarning)
    no_opencv = True

from scipy.ndimage import geometric_transform, zoom
from scipy.optimize import minimize
from ..var import frame_center, get_square, cube_filter_highpass
from .subsampling import cube_collapse
from .recentering import frame_shift
from .cosmetics import frame_crop


[docs] def cube_px_resampling( array, scale, imlib="vip-fft", interpolation="lanczos4", keep_center=False, verbose=True, ): """ Resample the frames of a cube with a single scale factor. Can deal with NaN values. Wrapper of ``frame_px_resampling``. Useful when we need to upsample (upscaling) or downsample (pixel binning) a set of frames, e.g. an ADI cube. Parameters ---------- array : 3d numpy ndarray Input cube, 3d array. scale : int, float or tuple Scale factor for upsampling or downsampling the frames in the cube. If a tuple it corresponds to the scale along x and y. imlib : str, optional See the documentation of the ``vip_hci.preproc.frame_px_resampling`` function. interpolation : str, optional See the documentation of the ``vip_hci.preproc.frame_px_resampling`` function. keep_center: bool, opt If input dimensions are even and the star centered (i.e. on dim//2, dim//2), whether to keep the star centered after scaling, i.e. on (new_dim//2, new_dim//2). For a non-centered input cube, better to leave it to False. verbose : bool, optional Whether to print out additional info such as the new cube shape. Returns ------- array_resc : numpy ndarray Output cube with resampled frames. """ if array.ndim != 3: raise TypeError("Input array is not a cube or 3d array.") array_resc = [] for i in range(array.shape[0]): imresc = frame_px_resampling( array[i], scale=scale, imlib=imlib, interpolation=interpolation, keep_center=keep_center, ) array_resc.append(imresc) array_resc = np.array(array_resc) if verbose: print("Cube successfully rescaled") print("New shape: {}".format(array_resc.shape)) return array_resc
[docs] def frame_px_resampling( array, scale, imlib="vip-fft", interpolation="lanczos4", keep_center=False, verbose=False, ): """ Resample the pixels of a frame changing the frame size. Can deal with NaN values. If ``scale`` < 1 then the frame is downsampled and if ``scale`` > 1 then its pixels are upsampled. Warning: if imlib is not 'vip-fft', the input size is even and keep_center set to True, an additional interpolation (shifting by (0.5,0.5)px) may occur after rescaling, to ensure center location stays on (dim//2,dim//2). Parameters ---------- array : numpy ndarray Input frame, 2d array. scale : int, float or tuple Scale factor for upsampling or downsampling the frame. If a tuple it corresponds to the scale along x and y. imlib : {'ndimage', 'opencv', 'vip-fft'}, optional Library used for image transformations. 'vip-fft' corresponds to a FFT-based rescaling algorithm implemented in VIP (``vip_hci.preproc.scale_fft``). interpolation : str, optional For 'ndimage' library: 'nearneig', bilinear', 'biquadratic', 'bicubic', 'biquartic', 'biquintic'. The 'nearneig' interpolation is the fastest and the 'biquintic' the slowest. The 'nearneig' is the worst option for interpolation of noisy astronomical images. For 'opencv' library: 'nearneig', 'bilinear', 'bicubic', 'lanczos4'. The 'nearneig' interpolation is the fastest and the 'lanczos4' the slowest and accurate. keep_center: bool, opt If input dimensions are even and the star centered (i.e. on dim//2, dim//2), whether to keep the star centered after scaling, i.e. on (new_dim//2, new_dim//2). For a non-centered input frame, better to leave it to False. verbose : bool, optional Whether to print out additional info such as the new image shape. Returns ------- array_resc : numpy ndarray Output resampled frame. """ if array.ndim != 2: raise TypeError("Input array is not a frame or 2d array") if isinstance(scale, tuple): scale_x, scale_y = scale elif isinstance(scale, (float, int)): scale_x = scale scale_y = scale else: raise TypeError("`scale` must be float, int or tuple") # Replace any NaN with real values before scaling mask = None nan_mask = np.isnan(array) if np.any(nan_mask): medval = np.nanmedian(array) array[nan_mask] = medval mask = np.zeros_like(array) mask[nan_mask] = 1 if array.shape[0] % 2: odd = True else: odd = False # expected output size out_sz = int(round(array.shape[0] * scale_y)), int(round(array.shape[1] * scale_x)) if not odd and keep_center and imlib != "vip-fft": def _make_odd(img): img_odd = np.zeros([img.shape[0] + 1, img.shape[1] + 1]) img_odd[:-1, :-1] = img.copy() img_odd[-1, :-1] = img[-1].copy() img_odd[:-1, -1] = img[:, -1].copy() img_odd[-1, -1] = np.mean([img[-1, -2], img[-2, -1], img[-2, -2]]) return img_odd array = _make_odd(array) if mask is not None: mask = _make_odd(mask) if imlib == "ndimage": if interpolation == "nearneig": order = 0 elif interpolation == "bilinear": order = 1 elif interpolation == "biquadratic": order = 2 elif interpolation == "bicubic": order = 3 elif interpolation == "biquartic" or interpolation == "lanczos4": order = 4 elif interpolation == "biquintic": order = 5 else: raise TypeError("Scipy.ndimage interpolation method not recognized") if mask is not None: mask = zoom(mask, zoom=(scale_y, scale_x), order=order) array_resc = zoom(array, zoom=(scale_y, scale_x), order=order) # For flux conservation: array_resc /= scale_y * scale_x elif imlib == "opencv": if no_opencv: msg = "Opencv python bindings cannot be imported. Install opencv or" msg += " set imlib to ndimage" raise RuntimeError(msg) if interpolation == "bilinear": intp = cv2.INTER_LINEAR elif interpolation == "bicubic": intp = cv2.INTER_CUBIC elif interpolation == "nearneig": intp = cv2.INTER_NEAREST elif interpolation == "lanczos4": intp = cv2.INTER_LANCZOS4 else: raise TypeError("Opencv interpolation method not recognized") if mask is not None: mask = cv2.resize( mask.astype(np.float32), (0, 0), fx=scale_x, fy=scale_y, interpolation=intp, ) array_resc = cv2.resize( array.astype(np.float32), (0, 0), fx=scale_x, fy=scale_y, interpolation=intp ) # For flux conservation: array_resc /= scale_y * scale_x elif imlib == "vip-fft": if scale_x != scale_y: msg = "FFT scaling only supports identical factors along x and y" raise ValueError(msg) if array.shape[0] != array.shape[1]: msg = "FFT scaling only supports square input arrays" raise ValueError(msg) # make array with even dimensions before FFT-scaling if odd: array_even = np.zeros([array.shape[0] + 1, array.shape[1] + 1]) array_even[1:, 1:] = array array = array_even if mask is not None: if odd: mask_even = np.zeros([mask.shape[0] + 1, mask.shape[1] + 1]) mask_even[1:, 1:] = mask mask = mask_even mask = scale_fft(mask, scale_x) if odd: mask_odd = np.zeros([mask.shape[0] - 1, mask.shape[1] - 1]) mask_odd = mask[1:, 1:] mask = mask_odd array_resc = scale_fft(array, scale_x) if odd: array = np.zeros([array_resc.shape[0] - 1, array_resc.shape[1] - 1]) array = array_resc[1:, 1:] array_resc = array # Note: FFT preserves flux - no need to scale flux separately else: raise ValueError("Image transformation library not recognized") # Place back NaN values in scaled array if mask is not None: array_resc[mask >= 0.5] = np.nan if keep_center and not array_resc.shape[0] % 2 and imlib != "vip-fft": if imlib == "ndimage": imlib_s = "ndimage-interp" else: imlib_s = imlib array_resc = frame_shift(array_resc, 0.5, 0.5, imlib_s, interpolation) if array_resc.shape != out_sz and imlib != "vip-fft": if out_sz[0] == out_sz[1]: if out_sz[0] < array_resc.shape[0]: array_resc = frame_crop( array_resc, out_sz[0], force=True, verbose=False ) else: # crop manually along each axis cy, cx = frame_center(array_resc) wing_y = (out_sz[0] - 1) / 2 y0 = int(cy - wing_y) yN = int(cy + wing_y + 1) wing_x = (out_sz[1] - 1) / 2 x0 = int(cx - wing_x) xN = int(cx + wing_x + 1) array_resc = array_resc[y0:yN, x0:xN] if verbose: print("Image successfully rescaled") print("New shape: {}".format(array_resc.shape)) return array_resc
[docs] def cube_rescaling_wavelengths( cube, scal_list, full_output=True, inverse=False, y_in=None, x_in=None, imlib="vip-fft", interpolation="lanczos4", collapse="median", pad_mode="reflect", ): """ Scale/Descale a cube by scal_list, with padding. Can deal with NaN values. Wrapper to scale or descale a cube by factors given in scal_list, without any loss of information (zero-padding if scaling > 1). Important: in case of IFS data, the scaling factors in scal_list should be >= 1 (ie. provide the scaling factors as for scaling to the longest wavelength channel). Parameters ---------- cube: 3D-array Data cube with frames to be rescaled. scal_list: 1D-array Vector of same dimension as the first dimension of datacube, containing the scaling factor for each frame. full_output: bool, optional Whether to output just the rescaled cube (False) or also its median, the new y and x shapes of the cube, and the new centers cy and cx of the frames (True). inverse: bool, optional Whether to inverse the scaling factors in scal_list before applying them or not; i.e. True is to descale the cube (typically after a first scaling has already been done) y_in, x_in: int Initial y and x sizes, required for ``inverse=True``. In case the cube is descaled, these values will be used to crop back the cubes/frames to their original size. imlib : {'opencv', 'ndimage', 'vip-fft'}, str optional Library used for image transformations. Opencv is faster than ndimage or skimage. 'vip-fft' corresponds to a FFT-based rescaling algorithm implemented in VIP (``vip_hci.preproc.scale_fft``). interpolation : str, optional For 'ndimage' library: 'nearneig', bilinear', 'bicuadratic', 'bicubic', 'biquartic', 'biquintic'. The 'nearneig' interpolation is the fastest and the 'biquintic' the slowest. The 'nearneig' is the poorer option for interpolation of noisy astronomical images. For 'opencv' library: 'nearneig', 'bilinear', 'bicubic', 'lanczos4'. The 'nearneig' interpolation is the fastest and the 'lanczos4' the slowest and accurate. 'lanczos4' is the default. collapse : {'median', 'mean', 'sum', 'trimmean'}, str optional Sets the way of collapsing the frames for producing a final image. pad_mode : str, optional One of the following string values: ``'constant'`` pads with a constant value ``'edge'`` pads with the edge values of array ``'linear_ramp'`` pads with the linear ramp between end_value and the array edge value. ``'maximum'`` pads with the maximum value of all or part of the vector along each axis ``'mean'`` pads with the mean value of all or part of the vector along each axis ``'median'`` pads with the median value of all or part of the vector along each axis ``'minimum'`` pads with the minimum value of all or part of the vector along each axis ``'reflect'`` pads with the reflection of the vector mirrored on the first and last values of the vector along each axis ``'symmetric'`` pads with the reflection of the vector mirrored along the edge of the array ``'wrap'`` pads with the wrap of the vector along the axis. The first values are used to pad the end and the end values are used to pad the beginning Returns ------- frame: 2d array [full_output=False] The median of the rescaled cube. cube : 3d array [full_output=True] Rescaled cube frame : 2d array [full_output=True] Median of the rescaled cube -- note it is in 2nd position if full_output is set to True. y,x,cy,cx : floats [full_output=True] New y and x shapes of the cube, and the new centers cy and cx of the frames """ n, y, x = cube.shape max_sc = np.amax(scal_list) if not inverse and max_sc > 1: new_y = int(np.ceil(max_sc * y)) new_x = int(np.ceil(max_sc * x)) if (new_y - y) % 2 != 0: new_y += 1 if (new_x - x) % 2 != 0: new_x += 1 pad_len_y = (new_y - y) // 2 pad_len_x = (new_x - x) // 2 pad_width = ((0, 0), (pad_len_y, pad_len_y), (pad_len_x, pad_len_x)) big_cube = np.pad(cube, pad_width, pad_mode) else: big_cube = cube.copy() n, y, x = big_cube.shape cy, cx = frame_center(big_cube[0]) if inverse: scal_list = 1.0 / scal_list cy, cx = frame_center(cube[0]) # (de)scale the cube, so that a planet would now move radially cube = cube_rescaling( big_cube, scal_list, ref_xy=(cx, cy), imlib=imlib, interpolation=interpolation ) frame = cube_collapse(cube, collapse) if inverse and max_sc > 1: if y_in is None or x_in is None: raise ValueError("You need to provide y_in and x_in when " "inverse=True!") siz = max(y_in, x_in) if frame.shape[0] > siz: frame = get_square(frame, siz, cy, cx) if full_output and cube.shape[-1] > siz: n_z = cube.shape[0] array_old = cube.copy() cube = np.zeros([n_z, siz, siz]) for zz in range(n_z): cube[zz] = get_square(array_old[zz], siz, cy, cx) if full_output: return cube, frame, y, x, cy, cx else: return frame
def _scale_func(output_coords, ref_xy=0, scaling=1.0, scale_y=None, scale_x=None): """ For each coordinate point in a new scaled image (output_coords), coordinates in the image before the scaling are returned. This scaling function is used within geometric_transform which, for each point in the output image, will compute the (spline) interpolated value at the corresponding frame coordinates before the scaling. """ ref_x, ref_y = ref_xy if scale_y is None: scale_y = scaling if scale_x is None: scale_x = scaling return ( ref_y + (output_coords[0] - ref_y) / scale_y, ref_x + (output_coords[1] - ref_x) / scale_x, )
[docs] def frame_rescaling( array, ref_xy=None, scale=1.0, imlib="vip-fft", interpolation="lanczos4", scale_y=None, scale_x=None, ): """ Rescale a frame by a factor wrt a reference point. The reference point is by default the center of the frame (typically the exact location of the star). However, it keeps the same dimensions. Parameters ---------- array : numpy ndarray Input frame, 2d array. ref_xy : float, optional Coordinates X,Y of the point wrt which the rescaling will be applied. By default the rescaling is done with respect to the center of the frame. scale : float Scaling factor. If > 1, it will upsample the input array equally along y and x by this factor. imlib : {'ndimage', 'opencv', 'vip-fft'}, optional Library used for image transformations. 'vip-fft' corresponds to a FFT-based rescaling algorithm implemented in VIP (``vip_hci.preproc.scale_fft``). interpolation : str, optional For 'ndimage' library: 'nearneig', bilinear', 'biquadratic', 'bicubic', 'biquartic', 'biquintic'. The 'nearneig' interpolation is the fastest and the 'biquintic' the slowest. The 'nearneig' is the worst option for interpolation of noisy astronomical images. For 'opencv' library: 'nearneig', 'bilinear', 'bicubic', 'lanczos4'. The 'nearneig' interpolation is the fastest and the 'lanczos4' the slowest and accurate. scale_y : float Scaling factor only for y axis. If provided, it takes priority on scale parameter. scale_x : float Scaling factor only for x axis. If provided, it takes priority on scale parameter. Returns ------- array_out : numpy ndarray Resulting frame. """ if array.ndim != 2: raise TypeError("Input array is not a frame or 2d array.") if scale_y is None: scale_y = scale if scale_x is None: scale_x = scale outshape = array.shape if ref_xy is None: ref_xy = frame_center(array) else: if imlib == "vip-fft" and ref_xy != frame_center(array): msg = "'vip-fft'imlib does not yet allow for custom center to be " msg += " provided " raise ValueError(msg) # Replace any NaN with real values before scaling mask = None nan_mask = np.isnan(array) if np.any(nan_mask): medval = np.nanmedian(array) array[nan_mask] = medval mask = np.zeros_like(array) mask[nan_mask] = 1 if imlib == "ndimage": if interpolation == "nearneig": order = 0 elif interpolation == "bilinear": order = 1 elif interpolation == "biquadratic": order = 2 elif interpolation == "bicubic": order = 3 elif interpolation == "biquartic" or interpolation == "lanczos4": order = 4 elif interpolation == "biquintic": order = 5 else: raise TypeError("Scipy.ndimage interpolation method not recognized") array_out = geometric_transform( array, _scale_func, order=order, output_shape=outshape, extra_keywords={ "ref_xy": ref_xy, "scaling": scale, "scale_y": scale_y, "scale_x": scale_x, }, ) array_out /= scale_y * scale_x elif imlib == "opencv": if no_opencv: msg = "Opencv python bindings cannot be imported. Install " msg += " opencv or set imlib to skimage" raise RuntimeError(msg) if interpolation == "bilinear": intp = cv2.INTER_LINEAR elif interpolation == "bicubic": intp = cv2.INTER_CUBIC elif interpolation == "nearneig": intp = cv2.INTER_NEAREST elif interpolation == "lanczos4": intp = cv2.INTER_LANCZOS4 else: raise TypeError("Opencv interpolation method not recognized") M = np.array( [ [scale_x, 0, (1.0 - scale_x) * ref_xy[0]], [0, scale_y, (1.0 - scale_y) * ref_xy[1]], ] ) array_out = cv2.warpAffine(array.astype(np.float32), M, outshape, flags=intp) array_out /= scale_y * scale_x elif imlib == "vip-fft": if scale_x != scale_y: msg = "FFT scaling only supports identical factors along x and y" raise ValueError(msg) if array.shape[0] != array.shape[1]: msg = "FFT scaling only supports square input arrays" raise ValueError(msg) # make array with even dimensions before FFT-scaling if array.shape[0] % 2: odd = True array_even = np.zeros([array.shape[0] + 1, array.shape[1] + 1]) array_even[1:, 1:] = array array = array_even else: odd = False if mask is not None: if odd: mask_even = np.zeros([mask.shape[0] + 1, mask.shape[1] + 1]) mask_even[1:, 1:] = mask mask = mask_even mask = scale_fft(mask, scale_x, ori_dim=True) if odd: mask_odd = np.zeros([mask.shape[0] - 1, mask.shape[1] - 1]) mask_odd = mask[1:, 1:] mask = mask_odd array_out = scale_fft(array, scale_x, ori_dim=True) if odd: array = np.zeros([array_out.shape[0] - 1, array_out.shape[1] - 1]) array = array_out[1:, 1:] array_out = array else: raise ValueError("Image transformation library not recognized") # Place back NaN values in scaled array if mask is not None: array_out[mask >= 0.5] = np.nan return array_out
[docs] def cube_rescaling( array, scaling_list, ref_xy=None, imlib="vip-fft", interpolation="lanczos4", scaling_y=None, scaling_x=None, ): """ Rescale a cube by factors from ``scaling_list`` wrt a position. Parameters ---------- array : numpy ndarray Input 3d array, cube. scaling_list : 1D numpy ndarray or list Scale corresponding to each frame in the cube. ref_xy : float, optional Coordinates X,Y of the point with respect to which the rescaling will be performed. By default the rescaling is done with respect to the center of the frames; central pixel if the frames have odd size. imlib : str optional See the documentation of ``vip_hci.preproc.cube_rescaling_wavelengths``. interpolation : str, optional See the documentation of ``vip_hci.preproc.cube_rescaling_wavelengths``. scaling_y : 1D-array or list Scaling factor only for y axis. If provided, it takes priority on scaling_list. scaling_x : 1D-array or list Scaling factor only for x axis. If provided, it takes priority on scaling_list. Returns ------- array_sc : numpy ndarray Resulting cube with rescaled frames. """ if array.ndim != 3: raise TypeError("Input array is not a cube or 3d array") array_sc = [] if scaling_list is None: scaling_list = [None] * array.shape[0] for i in range(array.shape[0]): array_sc.append( frame_rescaling( array[i], ref_xy=ref_xy, scale=scaling_list[i], imlib=imlib, interpolation=interpolation, scale_y=scaling_y, scale_x=scaling_x, ) ) return np.array(array_sc)
[docs] def check_scal_vector(scal_vec): """ Checks that the scaling factor list has the right format (i.e. all factors >= 1). If not, it returns the vector after normalization by the minimum value. Parameters ---------- scal_vec: 1d array or list Vector with the wavelengths. Returns ------- scal_vec: numpy ndarray, 1d Vector containing the scaling factors (after correction to comply with the condition >= 1). """ if not isinstance(scal_vec, (list, np.ndarray)): raise TypeError("`Scal_vec` is neither a list or an np.ndarray") scal_vec = np.array(scal_vec) # checking if min factor is 1: if scal_vec.min() != 1: scal_vec = scal_vec / scal_vec.min() return scal_vec
[docs] def find_scal_vector( cube, lbdas, fluxes, mask=None, nfp=2, fm="stddev", simplex_options=None, debug=False, imlib="vip-fft", interpolation="lanczos4", hpf=False, fwhm_max=5, **kwargs ): """ Find the optimal scaling factor for the channels of an IFS cube (or of dual-band pairs of images). The algorithm finds the optimal scaling factor that minimizes residuals in the rescaled frames. It takes the inverse of the wavelength vector as a first guess, and uses a similar method as the negative fake companion technique, but minimizing residuals in either a mask or the whole field. Parameters ---------- cube: 3D-array Data cube with frames to be rescaled. lbdas: 1d array Vector with the wavelengths, used for first guess on scaling factor. fluxes: 1d array Vector with the (unsaturated) fluxes at the different wavelengths, used for first guess on flux factor. mask: 2D-array, opt Binary mask, with ones where the residual intensities should be evaluated. If None is provided, the whole field is used. nfp: int, opt, {1,2} Number of free parameters: spatial scaling alone or spatial scaling + flux scaling. fm: str, opt, {"sum","stddev"} Figure of merit to use: sum of squared residuals or stddev of residual pixels. options: dict, optional The scipy.optimize.minimize options. hpf: bool, optional Whether to high-pass filter images before searching for optimal scaling factors. Can help to not be biased by a diffuse halo, and just leverage speckle expansion. fwhm_max: float, optional Maximum FWHM of the PSF across all wavelengths, in pixels. Only used if hpf is set to True. **kwargs: optional Optional arguments to the scipy.optimize.minimize function Returns ------- scal_vec: numpy ndarray, 1d Vector containing the scaling factors (after correction to comply with the condition >= 1). flux_vec: numpy ndarray, 1d [only returned if nfp==2] Vector containing the associated flux factors. """ scal_vec_ini = lbdas[-1] / lbdas n_z = len(lbdas) if n_z != len(fluxes) or n_z != cube.shape[0]: msg = "first axis of cube, fluxes and lbda must have same length" raise TypeError(msg) if simplex_options is None: simplex_options = {"xatol": 1e-6, "fatol": 1e-6, "maxiter": 800, "maxfev": 2000} scal_vec = np.ones(n_z) flux_vec = np.ones(n_z) array = cube.copy() if hpf: med_sz = int(5 * fwhm_max) if not med_sz % 2: med_sz += 1 array = cube_filter_highpass(cube, mode="median-subt", median_size=med_sz) for z in range(n_z - 1): flux_scal = fluxes[-1] / fluxes[z] cube_tmp = np.array([array[z], array[-1]]) if nfp == 1: p_ini = (scal_vec_ini[z],) solu = minimize( _chisquare_scal, p_ini, args=(cube_tmp, flux_scal, mask, fm, imlib, interpolation), method="Nelder-Mead", bounds=((1e-1, None),), options=simplex_options, **kwargs ) (scal_fac,) = solu.x flux_fac = flux_scal else: p_ini = (scal_vec_ini[z], flux_scal) solu = minimize( _chisquare_scal_2fp, p_ini, args=(cube_tmp, mask, fm, imlib, interpolation), method="Nelder-Mead", options=simplex_options, bounds=((1e-1, None), (1e-2, None)), **kwargs ) scal_fac, flux_fac = solu.x if debug: print("channel {:.0f}:".format(z), solu.x) scal_vec[z] = scal_fac flux_vec[z] = flux_fac scal_vec = check_scal_vector(scal_vec) return scal_vec, flux_vec
def _find_indices_sdi( scal, dist, index_ref, fwhm, delta_sep=1, nframes=None, debug=False ): """ Find optimal wavelengths which minimize self-subtraction in model PSF subtraction. Parameters ---------- scal : numpy ndarray or list Vector with the scaling factors. dist : float Separation or distance (in pixels) from the center of the array. index_ref : int The spectral channel index for which we are finding the indices of suitable spectral channels for the model PSF. fwhm : float Mean FWHM of all the wavelengths (in pixels). delta_sep : float, optional The threshold separation in terms of the mean FWHM. nframes : None or int, optional Must be an even value. In not None, then between 2 and adjacent ``nframes`` are kept. debug : bool, optional It True it prints out debug information. Returns ------- indices : numpy ndarray List of good indices. """ scal = np.asarray(scal) scal_ref = scal[index_ref] sep_lft = (scal_ref - scal) / scal_ref * ((dist + fwhm * delta_sep) / fwhm) sep_rgt = (scal - scal_ref) / scal_ref * ((dist - fwhm * delta_sep) / fwhm) map_lft = sep_lft >= delta_sep map_rgt = sep_rgt >= delta_sep indices = np.nonzero(map_lft | map_rgt)[0] if debug: print("dist: {}, index_ref: {}".format(dist, index_ref)) print("sep_lft:", " ".join(["{:+.2f}".format(x) for x in sep_lft])) print("sep_rgt:", " ".join(["{:+.2f}".format(x) for x in sep_rgt])) print("indices:", indices) print("indices size: {}".format(indices.size)) if indices.size == 0: raise RuntimeError( "No frames left after radial motion threshold. Try " "decreasing the value of `delta_sep`" ) if nframes is not None: i1 = map_lft.sum() window = nframes // 2 if i1 - window < 0 or i1 + window > indices[-1]: window = nframes ind1 = max(0, i1 - window) ind2 = min(scal.size, i1 + window) indices = indices[ind1:ind2] if indices.size < 2: raise RuntimeError( "No frames left after radial motion threshold. " "Try decreasing the value of `delta_sep` or " "`nframes`" ) if debug: print("indices (nframes):", indices) return indices def _chisquare_scal( modelParameters, cube, flux_fac=1, mask=None, fm="sum", imlib="vip-fft", interpolation="lanczos4", ): r""" Calculate the reduced math:`\chi^2`: .. math:: \chi^2_r = \frac{1}{N-3}\sum_{j=1}^{N} |I_j|, where N is the number of pixels in the image (or mask if provided), and :math:`I_j` the j-th pixel intensity, considering one free parameter: the physical scaling factor between images of the cube, for a given input flux scaling factor. Parameters ---------- modelParameters: tuple The model parameters, typically (scal_fac, flux_fac). cube: numpy.array The cube of fits images expressed as a numpy.array. flux_fac: mask: 2D-array, opt Binary mask, with ones where the residual intensities should be evaluated. If None is provided, the whole field is used. fm: str, opt, {"sum","stddev"} Figure of merit to use: sum of squared residuals or stddev of residual pixels. Returns ------- chi: float The reduced chi squared. """ # rescale in flux and spatially array = cube.copy() # scale_fac, flux_fac = modelParameters (scale_fac,) = modelParameters array[0] *= flux_fac scaling_list = np.array([scale_fac, 1]) array = cube_rescaling( array, scaling_list, imlib=imlib, interpolation=interpolation ) frame = array[1] - array[0] if mask is None: mask = np.ones_like(frame) if fm == "sum": chi = np.sum(np.power(frame[np.where(mask)], 2)) elif fm == "stddev": values = frame[np.where(mask)] values = values[values != 0] chi = np.std(values) else: raise RuntimeError("fm choice not recognized.") return chi def _chisquare_scal_2fp( modelParameters, cube, mask=None, fm="sum", imlib="vip-fft", interpolation="lanczos4", ): r""" Calculate the reduced :math:`\chi^2`: .. math:: \chi^2_r = \frac{1}{N-3}\sum_{j=1}^{N} |I_j|, where N is the number of pixels within a circular aperture centered on the first estimate of the planet position, and :math:`I_j` the j-th pixel intensity. Two free parameters: physical and flux scaling factors. Parameters ---------- modelParameters: tuple The model parameters, typically (scal_fac, flux_fac). cube: numpy.array The cube of fits images expressed as a numpy.array. mask: 2D-array, opt Binary mask, with ones where the residual intensities should be evaluated. If None is provided, the whole field is used. fm: str, opt, {"sum","stddev"} Figure of merit to use: sum of squared residuals or stddev of residual pixels. Returns ------- chi: float The reduced chi squared. """ # rescale in flux and spatially array = cube.copy() scale_fac, flux_fac = modelParameters array[0] *= flux_fac scaling_list = np.array([scale_fac, 1]) array = cube_rescaling( array, scaling_list, imlib=imlib, interpolation=interpolation ) frame = array[1] - array[0] if mask is None: mask = np.ones_like(frame) if fm == "sum": chi = np.sum(np.power(frame[np.where(mask)], 2)) elif fm == "stddev": values = frame[np.where(mask)] values = values[values != 0] chi = np.std(values) else: raise RuntimeError("fm choice not recognized.") return chi
[docs] def scale_fft(array, scale, ori_dim=False): """ Resample the frames of a cube with a single scale factor using a FFT-based method. Parameters ---------- array : 3d numpy ndarray Input cube, 3d array. scale : int or float Scale factor for upsampling or downsampling the frames in the cube. If a tuple it corresponds to the scale along x and y. ori_dim: bool, opt Whether to crop/pad scaled array in order to have the output with the same dimensions as the input array. By default, the x,y dimensions of the output are the closest integer to scale*dim_input, with the same parity as the input. Returns ------- array_resc : numpy ndarray Output cube with resampled frames. """ if scale == 1: return array dim = array.shape[0] # even square dtype = array.dtype.kind kd_array = np.arange(dim / 2 + 1, dtype=int) # scaling factor chosen as *close* as possible to N''/N', where: # N' = N + 2*KD (N': dim after FT) # N" = N + 2*KF (N'': dim after FT-1 of FT image), # => N" = 2*round(N'*sc/2) # => KF = (N"-N)/2 = round(N'*sc/2 - N/2) # = round(N/2*(sc-1) + KD*sc) # We call yy=N/2*(sc-1) +KD*sc yy = dim / 2 * (scale - 1) + kd_array.astype(float) * scale # We minimize the difference between the `ideal' N" and its closest # integer value by minimizing |yy-int(yy)|. kf_array = np.round(yy).astype(int) tmp = np.abs(yy - kf_array) imin = np.nanargmin(tmp) kd_io = kd_array[imin] kf_io = kf_array[imin] # Extract a part of array and place into dim_p array dim_p = int(dim + 2 * kd_io) tmp = np.zeros((dim_p, dim_p), dtype=dtype) tmp[kd_io : kd_io + dim, kd_io : kd_io + dim] = array # Fourier-transform the larger array array_f = np.fft.fftshift(np.fft.fft2(tmp)) # Extract a part of, or expand, the FT to dim_pp pixels dim_pp = int(dim + 2 * kf_io) if dim_pp > dim_p: tmp = np.zeros((dim_pp, dim_pp), dtype=complex) tmp[ (dim_pp - dim_p) // 2 : (dim_pp + dim_p) // 2, (dim_pp - dim_p) // 2 : (dim_pp + dim_p) // 2, ] = array_f else: tmp = array_f[ kd_io - kf_io : kd_io - kf_io + dim_pp, kd_io - kf_io : kd_io - kf_io + dim_pp, ] # inverse Fourier-transform the FT tmp = np.fft.ifft2(np.fft.fftshift(tmp)) array_resc = tmp.real del tmp # Extract a part of or expand the scaled image to desired number of pixels dim_resc = int(round(scale * dim)) if dim_resc > dim and dim_resc % 2 != dim % 2: dim_resc += 1 elif dim_resc < dim and dim_resc % 2 != dim % 2: dim_resc -= 1 # for reversibility if not ori_dim and dim_pp > dim_resc: array_resc = array_resc[ (dim_pp - dim_resc) // 2 : (dim_pp + dim_resc) // 2, (dim_pp - dim_resc) // 2 : (dim_pp + dim_resc) // 2, ] elif not ori_dim and dim_pp <= dim_resc: array = np.zeros((dim_resc, dim_resc)) array[ (dim_resc - dim_pp) // 2 : (dim_resc + dim_pp) // 2, (dim_resc - dim_pp) // 2 : (dim_resc + dim_pp) // 2, ] = array_resc array_resc = array elif dim_pp > dim: array_resc = array_resc[kf_io : kf_io + dim, kf_io : kf_io + dim] elif dim_pp <= dim: scaled = array * 0 scaled[-kf_io : -kf_io + dim_pp, -kf_io : -kf_io + dim_pp] = array_resc array_resc = scaled return array_resc