Source code for vip_hci.fm.fakecomp

#! /usr/bin/env python
"""Module with fake companion injection functions."""

__author__ = 'Carlos Alberto Gomez Gonzalez, Valentin Christiaens'
__all__ = ['collapse_psf_cube',
           'normalize_psf',
           'cube_inject_companions',
           'generate_cube_copies_with_injections',
           'frame_inject_companion']

from multiprocessing import cpu_count
import numpy as np
from scipy import stats
from scipy.interpolate import interp1d
from packaging import version
import photutils
try:
    from photutils.aperture import aperture_photometry, CircularAperture
except:
    from photutils import aperture_photometry, CircularAperture
if version.parse(photutils.__version__) >= version.parse('0.3'):
    # for photutils version >= '0.3' use photutils.centroids.centroid_com
    from photutils.centroids import centroid_com as cen_com
else:
    # for photutils version < '0.3' use photutils.centroid_com
    import photutils.centroid_com as cen_com
from ..preproc import (cube_crop_frames, frame_shift, frame_crop, cube_shift,
                       frame_rotate)
from ..var import (frame_center, fit_2dgaussian, fit_2dairydisk, fit_2dmoffat,
                   get_circle, get_annulus_segments, dist_matrix)
from ..config.utils_conf import print_precision, check_array, pool_map, iterable


[docs] def cube_inject_companions(array, psf_template, angle_list, flevel, rad_dists, plsc=None, n_branches=1, theta=0, imlib='vip-fft', interpolation='lanczos4', transmission=None, radial_gradient=False, full_output=False, verbose=False, nproc=1): """Inject fake companions in branches and given radial distances in an ADI\ cube. Parameters ---------- array : 3d/4d numpy ndarray Input cube. This is copied before the injections take place, so ``array`` is never modified. psf_template : 2d/3d numpy ndarray [for a 3D input array] 2d array with the normalized PSF template, with an odd or even shape. The PSF image must be centered wrt to the array. Therefore, it is recommended to run the function ``normalize_psf`` to generate a centered and flux-normalized PSF template. It can also be a 3D array, but length should match that of ADI cube. [for a 4D input array] In the ADI+mSDI case, it must be a 3d array (matching spectral dimensions). angle_list : 1d numpy ndarray List of parallactic angles, in degrees. flevel : float or 1d array or 2d array Factor for controlling the brightness of the fake companions. If a float, the same flux is used for all injections. [3D input cube]: if a list/1d array is provided, it should have same length as number of frames in the 3D cube (can be used to take into account varying observing conditions or airmass). [4D (ADI+mSDI) input cube]: if a list/1d array should have the same length as the number of spectral channels (i.e. provide a spectrum). If a 2d array, it should be n_wavelength x n_frames (can e.g. be used to inject a spectrum in varying conditions). rad_dists : float, list or array 1d Vector of radial distances of fake companions in pixels. plsc : float or None Value of the plsc in arcsec/px. Only used for printing debug output when ``verbose=True``. n_branches : int, optional Number of azimutal branches. theta : float, optional Angle in degrees for rotating the position of the first branch that by default is located at zero degrees. Theta counts counterclockwise from the positive x axis. imlib : str, optional See the documentation of the ``vip_hci.preproc.frame_shift`` function. interpolation : str, optional See the documentation of the ``vip_hci.preproc.frame_shift`` function. transmission: numpy array, optional Radial transmission of the coronagraph, if any. Array with either 2 x n_rad, 1+n_ch x n_rad columns. The first column should contain the radial separation in pixels, while the other column(s) are the corresponding off-axis transmission (between 0 and 1), for either all, or each spectral channel (only relevant for a 4D input cube). radial_gradient: bool, optional Whether to apply a radial gradient to the psf image at the moment of injection. By default False, i.e. the flux of the psf image is scaled only considering the value of tramnsmission at the exact radius the companion is injected. Setting it to False may better represent the transmission at the very edge of a physical mask though. full_output : bool, optional Returns the ``x`` and ``y`` coordinates of the injections, additionally to the new array. verbose : bool, optional If True prints out additional information. nproc: int or None, optional Number of CPUs to use for multiprocessing. If None, will be automatically set to half the number of available CPUs. Returns ------- array_out : numpy ndarray Output array with the injected fake companions. positions : list of tuple(y, x) [full_output] Coordinates of the injections in the first frame (and first wavelength for 4D cubes). psf_trans: numpy ndarray [full_output & transmission != None] Array with injected psf affected by transmission (serves to check radial transmission) """ if nproc is None: nproc = cpu_count()//2 def _cube_inject_adi(array, psf_template, angle_list, flevel, plsc, rad_dists, n_branches=1, theta=0, imlib='vip-fft', interpolation='lanczos4', transmission=None, radial_gradient=False, verbose=False): if np.isscalar(flevel): flevel = np.ones_like(angle_list)*flevel if transmission is not None: # last radial separation should be beyond the edge of frame interp_trans = interp1d(transmission[0], transmission[1]) ceny, cenx = frame_center(array[0]) nframes = array.shape[-3] size_fc = psf_template.shape[-1] positions = [] # fake companion cube fc_fr = np.zeros([nframes, size_fc, size_fc]) if psf_template.ndim == 2: for fr in range(nframes): fc_fr[fr] = psf_template else: for fr in range(nframes): fc_fr[fr] = psf_template[fr] psf_trans = None array_out = array.copy() for branch in range(n_branches): ang = (branch * 2 * np.pi / n_branches) + np.deg2rad(theta) if verbose: print('Branch {}:'.format(branch+1)) for rad in rad_dists: fc_fr_rad = fc_fr.copy() if transmission is not None: if radial_gradient: y_star = pceny x_star = pcenx - rad d = dist_matrix(size_fc, x_star, y_star) for i in range(d.shape[0]): fc_fr_rad[:, i] = interp_trans(d[i])*fc_fr[:, i] # check the effect of transmission on a single PSF tmp psf_trans = frame_rotate(fc_fr_rad[0], -(ang*180/np.pi-angle_list[0]), imlib=imlib_rot, interpolation=interpolation) else: fc_fr_rad = interp_trans(rad)*fc_fr if nproc == 1: for fr in range(nframes): array_out[fr] += _frame_shift_fcp(fc_fr_rad[fr], array[fr], rad, ang, angle_list[fr], flevel[fr], size_fc, imlib_sh, imlib_rot, interpolation, transmission, radial_gradient) else: res = pool_map(nproc, _frame_shift_fcp, iterable(fc_fr_rad), iterable(array), rad, ang, iterable(angle_list), iterable(flevel), size_fc, imlib_sh, imlib_rot, interpolation, transmission, radial_gradient) array_out += np.array(res) pos_y = rad * np.sin(ang) + ceny pos_x = rad * np.cos(ang) + cenx positions.append((pos_y, pos_x)) if verbose: rad_arcs = rad * plsc print('\t(X,Y)=({:.2f}, {:.2f}) at {:.2f} arcsec ' '({:.2f} pxs from center)'.format(pos_x, pos_y, rad_arcs, rad)) return array_out, positions, psf_trans check_array(array, dim=(3, 4), msg="array") check_array(psf_template, dim=(2, 3), msg="psf_template") nframes = array.shape[-3] pceny, pcenx = frame_center(psf_template) if array.ndim == 4 and psf_template.ndim != 3: raise ValueError('`psf_template` must be a 3d array') if verbose and not np.isscalar(plsc): raise TypeError("`plsc` must be a scalar") if not np.isscalar(flevel): if len(flevel) != array.shape[0]: msg = "if not scalar `flevel` must have same length as array" raise TypeError(msg) # set imlib for rotation & shift (rotation used if transmission!=None) if imlib == 'opencv': imlib_sh = imlib imlib_rot = imlib elif imlib == 'skimage' or imlib == 'ndimage-interp': imlib_sh = 'ndimage-interp' imlib_rot = 'skimage' elif imlib == 'vip-fft' or imlib == 'ndimage-fourier': imlib_sh = imlib imlib_rot = 'vip-fft' else: raise TypeError("Interpolation not recognized.") rad_dists = np.asarray(rad_dists).reshape(-1) # forces ndim=1 if not rad_dists[-1] < array.shape[-1] / 2: raise ValueError('rad_dists last location is at the border (or ' 'outside) of the field') if transmission is not None: t_nz = transmission.shape[0] if transmission.ndim != 2: raise ValueError("transmission should be a 2D ndarray") elif t_nz != 2 and t_nz != 1+array.shape[0]: msg = "transmission dimensions should be (2,N) or (n_wave+1, N)" raise ValueError(msg) # if transmission doesn't have right format for interpolation, adapt it diag = np.sqrt(2)*array.shape[-1] if transmission[0, 0] != 0 or transmission[0, -1] < diag: trans_rad_list = transmission[0].tolist() for j in range(t_nz-1): trans_list = transmission[j+1].tolist() # should have a zero point if transmission[0, 0] != 0: if j == 0: trans_rad_list = [0]+trans_rad_list trans_list = [0]+trans_list # last point should be max possible distance between fc and star if transmission[0, -1] < np.sqrt(2)*array.shape[-1]/2: if j == 0: trans_rad_list = trans_rad_list+[diag] trans_list = trans_list+[1] if j == 0: ntransmission = np.zeros([t_nz, len(trans_rad_list)]) ntransmission[0] = trans_rad_list ntransmission[j+1] = trans_list transmission = ntransmission.copy() # ADI case if array.ndim == 3: res = _cube_inject_adi(array, psf_template, angle_list, flevel, plsc, rad_dists, n_branches, theta, imlib, interpolation, transmission, radial_gradient, verbose) array_out, positions, psf_trans = res # ADI+mSDI (IFS) case else: nframes_wav = array.shape[0] array_out = array.copy() if np.isscalar(flevel): flevel_all = np.ones([nframes_wav, nframes])*flevel elif flevel.ndim == 1: flevel_all = np.zeros([nframes_wav, nframes]) for i in range(nframes_wav): flevel_all[i, :] = flevel[i] else: flevel_all = flevel for i in range(nframes_wav): if verbose: msg = "*** Processing spectral channel {}/{} ***" print(msg.format(i+1, nframes_wav)) if transmission is None: trans = None elif transmission.shape[0] == 2: trans = transmission elif transmission.shape[0] == nframes_wav+1: trans = np.array([transmission[0], transmission[i+1]]) else: msg = "transmission shape ({}, {}) is not valid" raise TypeError(msg.format(transmission.shape[0], transmission.shape[1])) res = _cube_inject_adi(array[i], psf_template[i], angle_list, flevel_all[i], plsc, rad_dists, n_branches, theta, imlib, interpolation, trans, radial_gradient, verbose=(i == 0 & verbose is True)) array_out[i], positions, psf_trans = res if full_output: if transmission is not None: return array_out, positions, psf_trans else: return array_out, positions else: return array_out
def _frame_shift_fcp(fc_fr_rad, array, rad, ang, derot_ang, flevel, size_fc, imlib_sh, imlib_rot, interpolation, transmission, radial_gradient): """Specific cube shift algorithm to inject fake companions.""" ceny, cenx = frame_center(array) sizey = array.shape[-2] sizex = array.shape[-1] array_sh = np.zeros_like(array) w = int(np.ceil(size_fc/2)) if size_fc % 2: # new convention w -= 1 sty = int(ceny) - w stx = int(cenx) - w shift_y = rad * np.sin(ang - np.deg2rad(derot_ang)) shift_x = rad * np.cos(ang - np.deg2rad(derot_ang)) if transmission is not None and radial_gradient: fc_fr_ang = frame_rotate(fc_fr_rad, -(ang*180/np.pi - derot_ang), imlib=imlib_rot, interpolation=interpolation) else: fc_fr_ang = fc_fr_rad.copy() # sub-px shift (within PSF template frame) dsy = shift_y-int(shift_y) dsx = shift_x-int(shift_x) fc_fr_ang = frame_shift(fc_fr_ang, dsy, dsx, imlib_sh, interpolation, border_mode='constant') # integer shift (in final cube) y0 = sty+int(shift_y) x0 = stx+int(shift_x) yN = y0+size_fc xN = x0+size_fc p_y0 = 0 p_x0 = 0 p_yN = size_fc p_xN = size_fc if y0 < 0: p_y0 = -y0 y0 = 0 if x0 < 0: p_x0 = -x0 x0 = 0 if yN > sizey: p_yN -= yN-sizey yN = sizey if xN > sizex: p_xN -= xN-sizex xN = sizex array_sh[y0:yN, x0:xN] = flevel*fc_fr_ang[p_y0:p_yN, p_x0:p_xN] return array_sh
[docs] def generate_cube_copies_with_injections(array, psf_template, angle_list, plsc, n_copies=100, inrad=8, outrad=12, dist_flux=("uniform", 2, 500)): """ Create multiple copies of ``array`` with different random injections. This is a wrapper around ``metrics.cube_inject_companions``, which deals with multiple copies of the original data cube and generates random parameters. Parameters ---------- array : 3d/4d numpy ndarray Original input cube. psf_template : 2d/3d numpy ndarray Array with the normalized psf template. It should have an odd shape. It's recommended to run the function ``normalize_psf`` to get a proper PSF template. In the ADI+mSDI case it must be a 3d array. angle_list : 1d numpy ndarray List of parallactic angles, in degrees. plsc : float Value of the plsc in arcsec/px. Only used for printing debug output when ``verbose=True``. n_copies : int This is the number of 'cube copies' returned. inrad,outrad : float Inner and outer radius of the injections. The actual injection position is chosen randomly. dist_flux : tuple('method', params) Tuple describing the flux selection. Method can be a function, the ``*params`` are passed to it. Method can also be a string, for a pre-defined random function: ``('skewnormal', skew, mean, var)`` uses scipy.stats.skewnorm.rvs ``('uniform', low, high)`` uses np.random.uniform ``('normal', loc, scale)`` uses np.random.normal Returns ------- fake_data : dict Represents a copy of the original ``array``, with fake injections. The dictionary keys are: ``cube`` Array shaped like the input ``array``, with the fake injections. ``position`` : list of tuples(y,x) List containing the positions of the injected companions, as (y,x) tuples. ``dist`` : float The distance of the injected companions, which was passed to ``cube_inject_companions``. ``theta`` : float, degrees The initial angle, as passed to ``cube_inject_companions``. ``flux`` : float The flux passed to ``cube_inject_companions``. """ # TODO: 'mask' parameter for known companions? width = outrad - inrad yy, xx = get_annulus_segments(array[0], inrad, width)[0] num_patches = yy.shape[0] # Defining Fluxes according to chosen distribution dist_fkt = dict(skewnormal=stats.skewnorm.rvs, normal=np.random.normal, uniform=np.random.uniform).get(dist_flux[0], dist_flux[0]) fluxes = sorted(dist_fkt(*dist_flux[1:], size=n_copies)) inds_inj = np.random.randint(0, num_patches, size=n_copies) # Injections for n in range(n_copies): injx = xx[inds_inj[n]] - frame_center(array[0])[1] injy = yy[inds_inj[n]] - frame_center(array[0])[0] dist = np.sqrt(injx**2 + injy**2) theta = np.mod(np.arctan2(injy, injx) / np.pi * 180, 360) fake_cube, positions = cube_inject_companions( array, psf_template, angle_list, plsc=plsc, flevel=fluxes[n], theta=theta, rad_dists=dist, n_branches=1, # TODO: multiple injections? full_output=True, verbose=False ) yield dict( positions=positions, dist=dist, theta=theta, flux=fluxes[n], cube=fake_cube )
[docs] def frame_inject_companion(array, array_fc, pos_y, pos_x, flux, imlib='vip-fft', interpolation='lanczos4'): """ Inject a fake companion in a single frame (can be a single multi-wavelength\ frame) at given coordinates, or in a cube (at the same coordinates, flux\ and with same fake companion image throughout the cube). Parameters ---------- array : numpy ndarray, 2d or 3d Input frame or cube. array_fc : numpy ndarray, 2d Fake companion image to be injected. If even-dimensions, the center should be placed at coordinates [dim//2, dim//2] (0-based indexing), as per VIP's convention. pos_y, pos_x: float Y and X coordinates where the companion should be injected flux : int Flux at which the fake companion should be injected (i.e. scaling factor for the injected image) imlib : str, optional See documentation of the ``vip_hci.preproc.frame_shift`` function. interpolation : str, optional See documentation of the ``vip_hci.preproc.frame_shift`` function. Returns ------- array_out : numpy ndarray Frame or cube with the companion injected """ if not (array.ndim == 2 or array.ndim == 3): raise TypeError('Array is not a 2d or 3d array.') if array.ndim == 2: size_fc = array_fc.shape[0] ceny, cenx = frame_center(array) ceny = int(ceny) cenx = int(cenx) fc_fr = np.zeros_like(array) w = int(np.floor(size_fc/2.)) odd = size_fc % 2 # fake companion in the center of a zeros frame fc_fr[ceny-w:ceny+w+odd, cenx-w:cenx+w+odd] = array_fc array_out = array + frame_shift(fc_fr, pos_y-ceny, pos_x-cenx, imlib, interpolation) * flux if array.ndim == 3: size_fc = array_fc.shape[1] ceny, cenx = frame_center(array[0]) ceny = int(ceny) cenx = int(cenx) fc_fr = np.zeros_like(array) w = int(np.floor(size_fc/2.)) odd = size_fc % 2 # fake companion in the center of a zeros frame fc_fr[:, ceny-w:ceny+w+odd, cenx-w:cenx+w+odd] = array_fc array_out = array + cube_shift(fc_fr, pos_y - ceny, pos_x - cenx, imlib, interpolation) * flux return array_out
[docs] def collapse_psf_cube(array, size, fwhm=4, verbose=True, collapse='mean'): """Create a 2d PSF template from a cube of non-saturated off-axis frames\ of the star by taking the mean and normalizing the PSF flux. Parameters ---------- array : numpy ndarray, 3d Input cube. size : int Size of the squared subimage. fwhm: float, optional The size of the Full Width Half Maximum in pixel. verbose : {True,False}, bool optional Whether to print to stdout information about file opening, cropping and completion of the psf template. collapse : {'mean','median'}, string optional Defines the way the frames are collapsed. Returns ------- psf_normd : numpy ndarray Normalized PSF. """ if array.ndim != 3 and array.ndim != 4: raise TypeError('Array is not a cube, 3d or 4d array.') n = array.shape[0] psf = cube_crop_frames(array, size=size, verbose=verbose) if collapse == 'mean': psf = np.mean(psf, axis=0) elif collapse == 'median': psf = np.median(psf, axis=0) else: raise TypeError('Collapse mode not recognized.') psf_normd = normalize_psf(psf, size=size, fwhm=fwhm) if verbose: print("Done scaled PSF template from the average of", n, "frames.") return psf_normd
[docs] def normalize_psf(array, fwhm='fit', size=None, threshold=None, mask_core=None, model='gauss', imlib='vip-fft', interpolation='lanczos4', force_odd=True, correct_outliers=True, full_output=False, verbose=True, debug=False): """Normalize a PSF (2d or 3d array), to have the flux in a 1xFWHM aperture\ equal to one. It also allows cropping of the array. Automatic recentering of the PSF is done internally - as long as it is already roughly centered within ~2px. Parameters ---------- array: numpy ndarray The PSF, 2d (ADI data) or 3d array (IFS data). fwhm: int, float, 1d array or str, optional The Full Width Half Maximum in pixels. It can handle a different FWHM value for different wavelengths (IFS data). If set to 'fit' then a ``model`` (assuming the PSF is centered in the array) is fitted to estimate the FWHM in 2D or 3D PSF arrays. size : int or None, optional If int it will correspond to the size of the centered sub-image to be cropped form the PSF array. The PSF is assumed to be roughly centered with respect to the array. threshold : None or float, optional Sets to zero values smaller than threshold (in the normalized image). This can be used to only leave the core of the PSF. mask_core : None or float, optional Sets the radius of a circular aperture for the core of the PSF, everything else will be set to zero. model : {'gauss', 'moff', 'airy'}, str optional The assumed model used to fit the PSF: either a Gaussian, a Moffat or an Airy 2d distribution. imlib : str, optional See the documentation of the ``vip_hci.preproc.frame_shift`` function. interpolation : str, optional See the documentation of the ``vip_hci.preproc.frame_shift`` function. force_odd : bool, optional If True the resulting array will have odd size (and the PSF will be placed at its center). If False, and the frame size is even, then the PSF will be put at the center of an even-sized frame. correct_outliers: bool, optional For an input 3D cube (IFS) of PSFs, if the 2D fit fails for one of the channels, whether to interpolate FWHM value from surrounding channels, and recalculate flux and normalization. full_output : bool, optional If True the flux in a FWHM aperture is returned along with the normalized PSF. verbose : bool, optional If True intermediate results are printed out. debug : bool, optional If True the fitting will output additional information and a diagnostic plot will be shown (this might cause a long output if ``array`` is 3d and has many slices). Returns ------- psf_norm : numpy ndarray The normalized PSF (2d or 3d array). fwhm_flux : numpy ndarray [full_output=True] The flux in a FWHM aperture (it can be a single value or a vector). fwhm : numpy ndarray [full_output=True] The FWHM size. If ``fwhm`` is set to 'fit' then it is the fitted FWHM value according to the assumed ``model`` (the mean in X and Y is returned when ``model`` is set to 'gauss'). """ def psf_norm_2d(psf, fwhm, threshold, mask_core, full_output, verbose): """Normalize PSF in the 2d case.""" # we check if the psf is centered and fix it if needed cy, cx = frame_center(psf, verbose=False) xcom, ycom = cen_com(psf) if not (np.allclose(cy, ycom, atol=1e-2) or np.allclose(cx, xcom, atol=1e-2)): # first we find the centroid and put it in the center of the array centry, centrx = fit_2d(psf, full_output=False, debug=False) if not np.isnan(centry) and not np.isnan(centrx): shiftx, shifty = centrx - cx, centry - cy psf = frame_shift(psf, -shifty, -shiftx, imlib=imlib, interpolation=interpolation) for _ in range(2): centry, centrx = fit_2d(psf, full_output=False, debug=False) if np.isnan(centry) or np.isnan(centrx): break cy, cx = frame_center(psf, verbose=False) shiftx, shifty = centrx - cx, centry - cy psf = frame_shift(psf, -shifty, -shiftx, imlib=imlib, interpolation=interpolation) # we check whether the flux is normalized and fix it if needed fwhm_aper = CircularAperture((cx, cy), fwhm/2) fwhm_aper_phot = aperture_photometry(psf, fwhm_aper, method='exact') fwhm_flux = np.array(fwhm_aper_phot['aperture_sum']) if fwhm_flux > 1.1 or fwhm_flux < 0.9: psf_norm_array = psf / np.array(fwhm_aper_phot['aperture_sum']) else: psf_norm_array = psf if threshold is not None: psf_norm_array[np.where(psf_norm_array < threshold)] = 0 if mask_core is not None: psf_norm_array = get_circle(psf_norm_array, radius=mask_core) if verbose: print("Flux in 1xFWHM aperture: {:.3f}".format(fwhm_flux[0])) if full_output: return psf_norm_array, fwhm_flux, fwhm else: return psf_norm_array ########################################################################### if model == 'gauss': fit_2d = fit_2dgaussian elif model == 'moff': fit_2d = fit_2dmoffat elif model == 'airy': fit_2d = fit_2dairydisk else: raise ValueError('`Model` not recognized') if array.ndim == 2: y, x = array.shape if size is not None: if force_odd and size % 2 == 0: size += 1 msg = "`Force_odd` is True therefore `size` was set to {}" print(msg.format(size)) else: if force_odd and y % 2 == 0: size = y - 1 msg = "`Force_odd` is True and frame size is even, therefore " msg += "new frame size was set to {}" print(msg.format(size)) if size is not None: if size < array.shape[0]: array = frame_crop(array, size, force=True, verbose=False) else: array = array.copy() else: array = array.copy() if not np.isscalar(fwhm): msg = "For a 2d input array, fwhm should be a scalar or string." raise ValueError(msg) elif fwhm == 'fit': fit = fit_2d(array, full_output=True, debug=debug) if model == 'gauss': fwhm = np.mean((fit['fwhm_x'], fit['fwhm_y'])) if verbose: print("\nMean FWHM: {:.3f}".format(fwhm)) elif model == 'moff' or model == 'airy': fwhm = fit.fwhm.at[0] if verbose: print("FWHM: {:.3f}".format(fwhm)) res = psf_norm_2d(array, fwhm, threshold, mask_core, full_output, verbose) return res elif array.ndim == 3: n, y, x = array.shape if size is not None: if force_odd and size % 2 == 0: size += 1 msg = "`Force_odd` is True therefore `size` was set to {}" print(msg.format(size)) else: if force_odd and y % 2 == 0: size = y - 1 msg = "`Force_odd` is True and frame size is even, therefore " msg += "new frame size was set to {}" print(msg.format(size)) if size is not None: if size < array.shape[1]: array = cube_crop_frames(array, size, force=True, verbose=False) else: array = array.copy() if np.isscalar(fwhm): if fwhm != 'fit': fwhm = [fwhm] * array.shape[0] else: fits_vect = [fit_2d(array[i], full_output=True, debug=debug) for i in range(n)] if model == 'gauss': fwhmx = [fits_vect[i]['fwhm_x'] for i in range(n)] fwhmy = [fits_vect[i]['fwhm_y'] for i in range(n)] fwhm_vect = [np.mean((fwhmx[i], fwhmy[i])) for i in range(n)] fwhm = np.array(fwhm_vect) if verbose: print("Mean FWHM per channel: ") print_precision(fwhm) elif model == 'moff' or model == 'airy': fwhm_vect = [fits_vect[i]['fwhm'] for i in range(n)] fwhm = np.array(fwhm_vect) fwhm = fwhm.flatten() if verbose: print("FWHM per channel:") print_precision(fwhm) # Replace outliers if needed if correct_outliers: if np.sum(np.isnan(fwhm)) > 0: for f in range(n): if np.isnan(fwhm[f]) and f != 0 and f != n-1: fwhm[f] = np.nanmean(np.array([fwhm[f-1], fwhm[f+1]])) elif np.isnan(fwhm[f]): msg = "2D fit failed for first or last channel." msg += " Try other parameters?" raise ValueError(msg) elif len(fwhm) != array.shape[0]: msg = "If fwhm is a list/1darray it should have a length of {}" raise ValueError(msg.format(array.shape[0])) array_out = [] fwhm_flux = np.zeros(n) for fr in range(array.shape[0]): restemp = psf_norm_2d(array[fr], fwhm[fr], threshold, mask_core, True, False) array_out.append(restemp[0]) fwhm_flux[fr] = restemp[1] array_out = np.array(array_out) if verbose: print("Flux in 1xFWHM aperture: ") print_precision(fwhm_flux) if full_output: return array_out, fwhm_flux, fwhm else: return array_out else: msg = "Input psf should be 2D or 3D. If of higher dimension, either use" msg += " ``vip_hci.preproc.cube_collapse`` first, or loop on the " msg += "temporal axis." raise ValueError(msg.format(array.shape[0]))