Source code for vip_hci.preproc.derotation

#! /usr/bin/env python
"""
Module with frame de-rotation routine for ADI.

.. [LAR97]
   | Larkin et al. 1997
   | **Fast Fourier method for the accurate rotation of sampled images**
   | *Optics Communications, Volume 154, Issue 1-3, pp. 99-106*
   | `https://ui.adsabs.harvard.edu/abs/1997OptCo.139...99L
     <https://ui.adsabs.harvard.edu/abs/1997OptCo.139...99L>`_

"""
from multiprocessing import cpu_count

from skimage.transform import rotate

from ..config.utils_conf import iterable
from ..config.utils_conf import pool_map
from ..var import frame_center
from ..var import frame_filter_lowpass
from .cosmetics import frame_pad
__author__ = 'Carlos Alberto Gomez Gonzalez, Valentin Christiaens'
__all__ = ['cube_derotate',
           'frame_rotate',
           'rotate_fft']

from astropy.stats import sigma_clipped_stats
import numpy as np
from numpy.fft import fft, ifft, fftshift, fftfreq
import warnings
from astropy.utils.exceptions import AstropyWarning
# intentionally ignore NaN warnings from astropy - won't ignore other warnings
warnings.simplefilter('ignore', category=AstropyWarning)
try:
    import cv2
    no_opencv = False
except ImportError:
    msg = "Opencv python bindings are missing."
    warnings.warn(msg, ImportWarning)
    no_opencv = True


[docs] def frame_rotate(array, angle, imlib='vip-fft', interpolation='lanczos4', cxy=None, border_mode='constant', mask_val=np.nan, edge_blend=None, interp_zeros=False, ker=1): """Rotate a frame or 2D array. Parameters ---------- array : numpy ndarray Input image, 2d array. angle : float Rotation angle. imlib : {'opencv', 'skimage', 'vip-fft'}, str optional Library used for image transformations. Opencv is faster than skimage or 'vip-fft', but vip-fft slightly better preserves the flux in the image (followed by skimage with a biquintic interpolation). 'vip-fft' corresponds to the FFT-based rotation method described in [LAR97]_, and implemented in this module. Best results are obtained with images without any sharp intensity change (i.e. no numerical mask). Edge-blending and/or zero-interpolation may help if sharp transitions are unavoidable. interpolation : str, optional [Only used for imlib='opencv' or imlib='skimage'] For Skimage the options are: 'nearneig', bilinear', 'biquadratic', 'bicubic', 'biquartic' or 'biquintic'. The 'nearneig' interpolation is the fastest and the 'biquintic' the slowest. The 'nearneig' is the poorer option for interpolation of noisy astronomical images. For Opencv the options are: 'nearneig', 'bilinear', 'bicubic' or 'lanczos4'. The 'nearneig' interpolation is the fastest and the 'lanczos4' the slowest and more accurate. 'lanczos4' is the default for Opencv and 'biquartic' for Skimage. cxy : float, optional Coordinates X,Y of the point with respect to which the rotation will be performed. By default the rotation is done with respect to the center of the frame. border_mode : {'constant', 'edge', 'symmetric', 'reflect', 'wrap'}, str opt Pixel extrapolation method for handling the borders. 'constant' for padding with zeros. 'edge' for padding with the edge values of the image. 'symmetric' for padding with the reflection of the vector mirrored along the edge of the array. 'reflect' for padding with the reflection of the vector mirrored on the first and last values of the vector along each axis. 'wrap' for padding with the wrap of the vector along the axis (the first values are used to pad the end and the end values are used to pad the beginning). Default is 'constant'. mask_val: flt, opt If any numerical mask in the image to be rotated, what are its values? Will only be used if a strategy to mitigate Gibbs effects is adopted - see below. edge_blend: str, opt {None,'noise','interp','noise+interp'} Whether to blend the edges, by padding nans then inter/extrapolate them with a gaussian filter. Slower but can significantly reduce ringing artefacts from Gibbs phenomenon, in particular if several consecutive rotations are involved in your image processing. - 'noise': pad with small amplitude noise inferred from neighbours - 'interp': interpolated from neighbouring pixels using Gaussian kernel. - 'noise+interp': sum both components above at masked locations. Original mask will be placed back after rotation. interp_zeros: bool, opt [only used if edge_blend is not None] Whether to interpolate zeros in the frame before (de)rotation. Not dealing with them can induce a Gibbs phenomenon near their location. However, this flag should be false if rotating a binary mask. ker: float, opt Size of the Gaussian kernel used for interpolation. Returns ------- array_out : numpy ndarray Resulting frame. """ if array.ndim != 2: raise TypeError('Input array is not a frame or 2d array') if edge_blend is None: edge_blend = '' if edge_blend != '' or imlib == 'vip-fft': # fill with nans cy_ori, cx_ori = frame_center(array) y_ori, x_ori = array.shape if np.isnan(mask_val): mask_ori = np.where(np.isnan(array)) else: mask_ori = np.where(array == mask_val) array_nan = array.copy() array_zeros = array.copy() if interp_zeros == 1 or mask_val != 0: # set to nans for interpolation array_nan[np.where(array == mask_val)] = np.nan else: array_zeros[np.where(np.isnan(array))] = 0 if 'noise' in edge_blend: # evaluate std and med far from the star, avoiding nans _, med, stddev = sigma_clipped_stats(array_nan, sigma=1.5, cenfunc=np.nanmedian, stdfunc=np.nanstd) # pad and interpolate, about 1.2x original size if imlib == 'vip-fft': fac = 1.5 else: fac = 1.1 new_y = int(y_ori*fac) new_x = int(x_ori*fac) if y_ori % 2 != new_y % 2: new_y += 1 if x_ori % 2 != new_x % 2: new_x += 1 array_prep = np.empty([new_y, new_x]) array_prep1 = np.zeros([new_y, new_x]) array_prep[:] = np.nan if 'interp' in edge_blend: array_prep2 = array_prep.copy() med = 0 # local level will be added with Gaussian kernel if 'noise' in edge_blend: array_prep = np.random.normal(loc=med, scale=stddev, size=(new_y, new_x)) cy, cx = frame_center(array_prep) y0_p = int(cy-cy_ori) y1_p = int(cy+cy_ori) if new_y % 2: y1_p += 1 x0_p = int(cx-cx_ori) x1_p = int(cx+cx_ori) if new_x % 2: x1_p += 1 if interp_zeros: array_prep[y0_p:y1_p, x0_p:x1_p] = array_nan.copy() array_prep1[y0_p:y1_p, x0_p:x1_p] = array_nan.copy() else: array_prep[y0_p:y1_p, x0_p:x1_p] = array_zeros.copy() # interpolate nans with a Gaussian filter if 'interp' in edge_blend: array_prep2[y0_p:y1_p, x0_p:x1_p] = array_nan.copy() cond1 = array_prep1 == 0 cond2 = np.isnan(array_prep2) new_nan = np.where(cond1 & cond2) mask_nan = np.where(np.isnan(array_prep2)) if not ker: ker = array_nan.shape[0]/5 ker2 = 1 array_prep_corr1 = frame_filter_lowpass(array_prep2, mode='gauss', fwhm_size=ker) if 'noise' in edge_blend: array_prep_corr2 = frame_filter_lowpass(array_prep2, mode='gauss', fwhm_size=ker2) ori_nan = np.where(np.isnan(array_prep1)) array_prep[ori_nan] = array_prep_corr2[ori_nan] array_prep[new_nan] += array_prep_corr1[new_nan] else: array_prep[mask_nan] = array_prep_corr1[mask_nan] # finally pad zeros for 4x larger images before FFT if imlib == 'vip-fft': array_prep, new_idx = frame_pad(array_prep, fac=4/fac, fillwith=0, full_output=True) y0 = new_idx[0]+y0_p y1 = new_idx[0]+y1_p x0 = new_idx[2]+x0_p x1 = new_idx[2]+x1_p else: y0 = y0_p y1 = y1_p x0 = x0_p x1 = x1_p else: array_prep = array.copy() # residual (non-interp) nans should be set to 0 to avoid bug in rotation array_prep[np.where(np.isnan(array_prep))] = 0 y, x = array_prep.shape if cxy is None: cy, cx = frame_center(array_prep) else: cx, cy = cxy if imlib == 'vip-fft' and (cy, cx) != frame_center(array_prep): msg = "'vip-fft'imlib does not yet allow for custom center to be " msg += " provided " raise ValueError(msg) if imlib == 'vip-fft': array_out = rotate_fft(array_prep, angle) elif imlib == 'skimage': if interpolation == 'nearneig': order = 0 elif interpolation == 'bilinear': order = 1 elif interpolation == 'biquadratic': order = 2 elif interpolation == 'bicubic': order = 3 elif interpolation == 'biquartic' or interpolation == 'lanczos4': order = 4 elif interpolation == 'biquintic': order = 5 else: raise ValueError('Skimage interpolation method not recognized') if border_mode not in ['constant', 'edge', 'symmetric', 'reflect', 'wrap']: raise ValueError('Skimage `border_mode` not recognized.') # for a non-constant image, normalize manually min_val = np.nanmin(array_prep) max_val = np.nanmax(array_prep) if min_val != max_val: norm = True im_temp = array_prep - min_val max_val = np.nanmax(im_temp) im_temp /= max_val else: norm = False im_temp = array_prep.copy() array_out = rotate(im_temp, angle, order=order, center=(cx, cy), cval=np.nan, mode=border_mode) if norm: array_out *= max_val array_out += min_val array_out = np.nan_to_num(array_out) elif imlib == 'opencv': if no_opencv: msg = 'Opencv python bindings cannot be imported. Install opencv or' msg += ' set imlib to skimage' raise RuntimeError(msg) if interpolation == 'bilinear': intp = cv2.INTER_LINEAR elif interpolation == 'bicubic': intp = cv2.INTER_CUBIC elif interpolation == 'nearneig': intp = cv2.INTER_NEAREST elif interpolation == 'lanczos4': intp = cv2.INTER_LANCZOS4 else: raise ValueError('Opencv interpolation method not recognized') if border_mode == 'constant': bormo = cv2.BORDER_CONSTANT # iiiiii|abcdefgh|iiiiiii elif border_mode == 'edge': bormo = cv2.BORDER_REPLICATE # aaaaaa|abcdefgh|hhhhhhh elif border_mode == 'symmetric': bormo = cv2.BORDER_REFLECT # fedcba|abcdefgh|hgfedcb elif border_mode == 'reflect': bormo = cv2.BORDER_REFLECT_101 # gfedcb|abcdefgh|gfedcba elif border_mode == 'wrap': bormo = cv2.BORDER_WRAP # cdefgh|abcdefgh|abcdefg else: raise ValueError('Opencv `border_mode` not recognized.') M = cv2.getRotationMatrix2D((cx, cy), angle, 1) array_out = cv2.warpAffine(array_prep.astype(np.float32), M, (x, y), flags=intp, borderMode=bormo) else: raise ValueError('Image transformation library not recognized') if edge_blend != '' or imlib == 'vip-fft': array_out = array_out[y0:y1, x0:x1] # remove padding array_out[mask_ori] = mask_val # mask again original masked values return array_out
[docs] def cube_derotate(array, angle_list, imlib='vip-fft', interpolation='lanczos4', cxy=None, nproc=1, border_mode='constant', mask_val=np.nan, edge_blend=None, interp_zeros=False, ker=1): """Rotate a cube (3d array or image sequence) providing a vector or\ corresponding angles. Serves for rotating an ADI sequence to a common north given a vector with the corresponding parallactic angles for each frame. Parameters ---------- array : numpy ndarray Input 3d array, cube. angle_list : list Vector containing the parallactic angles. imlib : str, optional See the documentation of the ``vip_hci.preproc.frame_rotate`` function. interpolation : str, optional See the documentation of the ``vip_hci.preproc.frame_rotate`` function. cxy : tuple of int, optional Coordinates X,Y of the point with respect to which the rotation will be performed. By default the rotation is done with respect to the center of the frames, as it is returned by the function vip_hci.var.frame_center. nproc : int, optional Whether to rotate the frames in the sequence in a multi-processing fashion. Only useful if the cube is significantly large (frame size and number of frames). border_mode : str, optional See the documentation of the ``vip_hci.preproc.frame_rotate`` function. mask_val: flt, opt See the documentation of the ``vip_hci.preproc.frame_rotate`` function. edge_blend : str, optional See the documentation of the ``vip_hci.preproc.frame_rotate`` function. interp_zeros : str, optional See the documentation of the ``vip_hci.preproc.frame_rotate`` function. ker: int, optional See the documentation of the ``vip_hci.preproc.frame_rotate`` function. Returns ------- array_der : numpy ndarray Resulting cube with de-rotated frames. """ if array.ndim != 3: raise TypeError('Input array is not a cube or 3d array.') n_frames = array.shape[0] if nproc is None: nproc = cpu_count() // 2 # Hyper-threading doubles the # of cores if nproc == 1: array_der = np.zeros_like(array) for i in range(n_frames): array_der[i] = frame_rotate(array[i], -angle_list[i], imlib=imlib, interpolation=interpolation, cxy=cxy, border_mode=border_mode, mask_val=mask_val, edge_blend=edge_blend, interp_zeros=interp_zeros, ker=ker) elif nproc > 1: res = pool_map(nproc, _frame_rotate_mp, iterable(array), iterable(-angle_list), imlib, interpolation, cxy, border_mode, mask_val, edge_blend, interp_zeros, ker) array_der = np.array(res) return array_der
def _frame_rotate_mp(frame, angle, imlib, interpolation, cxy, border_mode, mask_val, edge_blend, interp_zeros, ker): framerot = frame_rotate(frame, angle, imlib, interpolation, cxy, border_mode, mask_val, edge_blend, interp_zeros, ker) return framerot def _find_indices_adi(angle_list, frame, thr, nframes=None, out_closest=False, truncate=False, max_frames=200): """Return the indices to be left in frames library for annular ADI median\ subtraction, LOCI or annular PCA. Parameters ---------- angle_list : numpy ndarray, 1d Vector of parallactic angle (PA) for each frame. frame : int Index of the current frame for which we are applying the PA threshold. thr : float PA threshold. nframes : int or None, optional Exact number of indices to be left. For annular median-ADI subtraction, where we keep the closest frames (after the PA threshold). If None then all the indices are returned (after the PA threshold). out_closest : bool, optional If True then the function returns the indices of the 2 closest frames. truncate : bool, optional Useful for annular PCA, when we want to discard too far away frames and avoid increasing the computational cost. max_frames : int, optional Max number of indices to be left. To be provided if ``truncate`` is True (used e.g. in pca_annular). Returns ------- indices : numpy ndarray, 1d Vector with the indices left. If ``out_closest`` is True then the function returns instead: index_prev, index_foll """ n = angle_list.shape[0] index_prev = 0 index_foll = frame for i in range(0, frame): if np.abs(angle_list[frame] - angle_list[i]) < thr: index_prev = i break else: index_prev += 1 for k in range(frame, n): if np.abs(angle_list[k] - angle_list[frame]) > thr: index_foll = k break else: index_foll += 1 if out_closest: return index_prev, index_foll - 1 else: if nframes is not None: # For annular ADI median subtraction, returning n_frames closest # indices (after PA thresholding) window = nframes // 2 ind1 = index_prev - window ind1 = max(ind1, 0) ind2 = index_prev ind3 = index_foll ind4 = index_foll + window ind4 = min(ind4, n) indices = np.array(list(range(ind1, ind2)) + list(range(ind3, ind4)), dtype='int32') else: # For annular PCA, returning all indices (after PA thresholding) half1 = range(0, index_prev) half2 = range(index_foll, n) indices = np.array(list(half1) + list(half2), dtype='int32') # The goal is to keep min(num_frames/2, ntrunc) in the library after # discarding those based on the PA threshold if truncate: thr = min(n-1, max_frames) all_indices = np.array(list(half1)+list(half2)) if len(all_indices) > thr: # then truncate and update indices # first sort by dPA dPA = np.abs(angle_list[all_indices]-angle_list[frame]) sort_indices = all_indices[np.argsort(dPA)] # keep the ntrunc first ones good_indices = sort_indices[:thr] # sort again, this time by increasing indices indices = np.sort(good_indices) return indices def _compute_pa_thresh(ann_center, fwhm, delta_rot=1): """Compute the parallactic angle threshold [degrees]. Replacing approximation: delta_rot * (fwhm/ann_center) / np.pi * 180 """ return np.rad2deg(2 * np.arctan(delta_rot * fwhm / (2 * ann_center))) def _define_annuli(angle_list, ann, n_annuli, fwhm, radius_int, annulus_width, delta_rot, n_segments, verbose, strict=False): """Define and return the requested annuli geometry: parallactic angle\ threshold, inner radius and annulus center for each annulus.""" if ann == n_annuli - 1: inner_radius = radius_int + (ann * annulus_width - 1) else: inner_radius = radius_int + ann * annulus_width ann_center = inner_radius + (annulus_width / 2) pa_threshold = _compute_pa_thresh(ann_center, fwhm, delta_rot) mid_range = np.abs(np.amax(angle_list) - np.amin(angle_list)) / 2 if pa_threshold >= mid_range - mid_range * 0.1: new_pa_th = float(mid_range - mid_range * 0.1) msg = 'WARNING: PA threshold {:.2f} is too big, recommended ' msg += ' value for annulus {:.0f}: {:.2f}' if strict: print(msg.format(pa_threshold, ann, new_pa_th)) else: print('PA threshold {:.2f} is likely too big, will be set to ' '{:.2f}'.format(pa_threshold, new_pa_th)) pa_threshold = new_pa_th if verbose: if pa_threshold > 0: print('Ann {} PA thresh: {:5.2f} Ann center: ' '{:3.0f} N segments: {} '.format(ann + 1, pa_threshold, ann_center, n_segments)) else: print('Ann {} Ann center: {:3.0f} N segments: ' '{} '.format(ann + 1, ann_center, n_segments)) return pa_threshold, inner_radius, ann_center
[docs] def rotate_fft(array, angle): """Rotate a frame or 2D array using Fourier transforms. Rotation is equivalent to 3 consecutive linear shears, or 3 consecutive 1D FFT phase shifts. See details in [LAR97]_. Parameters ---------- array : numpy ndarray Input image, 2d array. angle : float Rotation angle. Returns ------- array_out : numpy ndarray Resulting frame. Note ---- This method is slower than interpolation methods (e.g. opencv/lanczos4 or ndimage), but preserves the flux better (by construction it preserves the total power). It is more prone to large-scale Gibbs artefacts, so make sure no sharp edge nor bad pixels are present in the image to be rotated. Note ---- Warning: if input frame has even dimensions, the center of rotation will NOT be between the 4 central pixels, instead it will be on the top right of those 4 pixels. Make sure your images are centered with respect to that pixel before rotation. """ y_ori, x_ori = array.shape while angle < 0: angle += 360 while angle > 360: angle -= 360 # first convert to odd size before multiple 90deg rotations if not y_ori % 2 or not x_ori % 2: array_in = np.zeros([array.shape[0]+1, array.shape[1]+1]) array_in[:-1, :-1] = array.copy() else: array_in = array.copy() if angle > 45: dangle = angle % 90 if dangle > 45: dangle = -(90-dangle) nangle = np.rint(angle/90) array_in = np.rot90(array_in, nangle) else: dangle = angle # remove last row and column to make it even size before FFT array_in = array_in[:-1, :-1] a = np.tan(np.deg2rad(dangle)/2) b = -np.sin(np.deg2rad(dangle)) ori_y, ori_x = array_in.shape cy, cx = frame_center(array) arr_xy = np.mgrid[0:ori_y, 0:ori_x] arr_y = arr_xy[0]-cy arr_x = arr_xy[1]-cx # TODO: make FFT padding work for other option than '0'. s_x = _fft_shear(array_in, arr_x, a, ax=1, pad=0) s_xy = _fft_shear(s_x, arr_y, b, ax=0, pad=0) s_xyx = _fft_shear(s_xy, arr_x, a, ax=1, pad=0) if y_ori % 2 or x_ori % 2: # set it back to original dimensions array_out = np.zeros([s_xyx.shape[0]+1, s_xyx.shape[1]+1]) array_out[:-1, :-1] = np.real(s_xyx) else: array_out = np.real(s_xyx) return array_out
def _fft_shear(arr, arr_ori, c, ax, pad=0, shift_ini=True): ax2 = 1-ax % 2 freqs = fftfreq(arr_ori.shape[ax2]) sh_freqs = fftshift(freqs) arr_u = np.tile(sh_freqs, (arr_ori.shape[ax], 1)) if ax == 1: arr_u = arr_u.T s_x = fftshift(arr) s_x = fft(s_x, axis=ax) s_x = fftshift(s_x) s_x = np.exp(-2j*np.pi*c*arr_u*arr_ori)*s_x s_x = fftshift(s_x) s_x = ifft(s_x, axis=ax) s_x = fftshift(s_x) return s_x