Source code for vip_hci.var.filters

#! /usr/bin/env python
"""
Module with frame/cube filtering functionalities.

.. [DAB15]
   | Dabbech et al. 2015
   | **MORESANE: MOdel REconstruction by Synthesis-ANalysis Estimators. A sparse
     deconvolution algorithm for radio interferometric imaging**
   | *The Astrophysical Journal, Volume 641, Issue 1, pp. 556-564*
   | `https://arxiv.org/abs/1412.5387
     <https://arxiv.org/abs/1412.5387>`_

.. [KEN15]
   | J. S. Kenyon 2015
   | **PyMORESANE**
   | *GitHub repository*
   | `https://github.com/ratt-ru/PyMORESANE
     <https://github.com/ratt-ru/PyMORESANE>`_

.. [LUC74]
   | L. Lucy 1974
   | **An iterative technique for the rectification of observed distributions**
   | *The Astronomical Journal, Volume 79, p. 745*
   | `https://ui.adsabs.harvard.edu/abs/1974AJ.....79..745L/abstract
     <https://ui.adsabs.harvard.edu/abs/1974AJ.....79..745L/abstract>`_

.. [RIC72]
   | W. H. Richardson 1972
   | **Bayesian-Based Iterative Method of Image Restoration**
   | *J. Opt. Soc. Am., Volume 27, pp. 1593-1607*
   | `https://opg.optica.org/josa/abstract.cfm?uri=josa-62-1-55
     <https://opg.optica.org/josa/abstract.cfm?uri=josa-62-1-55>`_

"""


__author__ = 'Carlos Alberto Gomez Gonzalez, Valentin Christiaens'
__all__ = ['frame_filter_highpass',
           'frame_filter_lowpass',
           'frame_deconvolution',
           'cube_filter_highpass',
           'cube_filter_lowpass',
           'cube_filter_iuwt']

import warnings
try:
    import cv2
    no_opencv = False
except ImportError:
    msg = "Opencv python bindings are missing."
    warnings.warn(msg, ImportWarning)
    no_opencv = True
import numpy as np
from scipy.ndimage import median_filter
from skimage.restoration import richardson_lucy
from astropy.convolution import (convolve_fft, convolve, Gaussian2DKernel)
from astropy.convolution import interpolate_replace_nans as interp_nan
from astropy.stats import gaussian_fwhm_to_sigma
from .iuwt import iuwt_decomposition
from ..config import Progressbar


[docs] def cube_filter_iuwt(cube, coeff=5, rel_coeff=1, full_output=False): """ Isotropic Undecimated Wavelet Transform filtering, as implemented in [KEN15]_ and detailed in [DAB15]_. Parameters ---------- cube : numpy ndarray Input cube. coeff : int, optional Number of wavelet scales to be used in the decomposition. rel_coeff : int, optional Number of relevant coefficients. In other words how many wavelet scales will represent in a better way our data. One or two scales are enough for filtering our images. full_output : bool, optional If True, an additional cube with the multiscale decomposition of each frame will be returned. Returns ------- cubeout : numpy ndarray Output cube with the filtered frames. If full_output is True the filtered cube is returned together with the a 4d cube containing the multiscale decomposition of each frame. """ cubeout = np.zeros_like(cube) cube_coeff = np.zeros((cube.shape[0], coeff, cube.shape[1], cube.shape[2])) n_frames = cube.shape[0] print('Decomposing frames with the Isotropic Undecimated Wavelet Transform') for i in Progressbar(range(n_frames)): res = iuwt_decomposition(cube[i], coeff, store_smoothed=False) cube_coeff[i] = res for j in range(rel_coeff): cubeout[i] += cube_coeff[i][j] if full_output: return cubeout, cube_coeff else: return cubeout
[docs] def cube_filter_highpass(array, mode='laplacian', verbose=True, **kwargs): """ Apply ``frame_filter_highpass`` to the frames of a 3d or 4d cube. Parameters ---------- array : numpy ndarray Input cube, 3d or 4d. mode : str, optional ``mode`` parameter to the ``frame_filter_highpass`` function. Defaults to a Laplacian high-pass filter. verbose : bool, optional If ``True`` timing and progress bar are shown. **kwargs : dict Passed through to the ``frame_filter_highpass`` function. Returns ------- filtered : numpy ndarray High-pass filtered cube. """ array_out = np.empty_like(array) if array.ndim == 3: for i in Progressbar(range(array.shape[0]), verbose=verbose): array_out[i] = frame_filter_highpass(array[i], mode=mode, **kwargs) elif array.ndim == 4: for i in Progressbar(range(array.shape[1]), verbose=verbose): for lam in range(array.shape[0]): array_out[lam][i] = frame_filter_highpass(array[lam][i], mode=mode, **kwargs) else: raise TypeError('Input array is not a 3d or 4d cube') return array_out
def fft(array): """ Perform the 2d discrete Fourier transform, using numpy's fft2 function. This produces a new representation of the image in which each pixel represents a spatial frequency and orientation, rather than an xy coordinate. When Fourier-transformed images are plotted graphically, the low frequencies are found at the centre; this is not what fft2 actually produces, so we need to also apply numpy's fftshift (centering low frequencies). """ fft_array = np.fft.fftshift(np.fft.fft2(array)) return fft_array def ifft(array): """ Get the inverse Fourier transform on the image. This produces an array of complex numbers whose real values correspond to the image in the original space (decentering). Note ---- A real function corresponds to a symmetric function in fourier space. As long as the operations we apply in the fourier space do not break this symmetry, the data returned by ``ifft`` should not containy any imaginary part. """ new_array = np.fft.ifft2(np.fft.ifftshift(array)).real return new_array
[docs] def frame_filter_highpass(array, mode, median_size=5, kernel_size=5, fwhm_size=5, btw_cutoff=0.2, btw_order=2, hann_cutoff=5, psf=None, conv_mode='conv', mask=None): """ High-pass filtering of input frame depending on parameter ``mode``. The filtered image properties will depend on the ``mode`` and the relevant parameters. Parameters ---------- array : numpy ndarray Input array, 2d frame. mode : str Type of High-pass filtering. ``laplacian`` applies a Laplacian filter with kernel size defined by ``kernel_size`` using the Opencv library. ``laplacian-conv`` applies a Laplacian high-pass filter by defining a kernel (with ``kernel_size``) and using the ``convolve_fft`` Astropy function. ``median-subt`` subtracts a median low-pass filtered version of the image. ``gauss-subt`` subtracts a Gaussian low-pass filtered version of the image. ``fourier-butter`` applies a high-pass 2D Butterworth filter in Fourier domain. ``hann`` uses a Hann window. median_size : int, optional Size of the median box for the ``median-subt`` filter. kernel_size : int, optional Size of the Laplacian kernel used in ``laplacian`` mode. It must be an positive odd integer value. fwhm_size : float, optional Size of the Gaussian kernel used in ``gauss-subt`` mode. btw_cutoff : float, optional Frequency cutoff for low-pass 2d Butterworth filter used in ``fourier-butter`` mode. btw_order : int, optional Order of low-pass 2d Butterworth filter used in ``fourier-butter`` mode. hann_cutoff : float Frequency cutoff for the ``hann`` mode. psf: numpy ndarray, optional Input normalised and centered psf, 2d frame. Should be provided if mode is set to 'psf'. conv_mode : {'conv', 'convfft'}, str optional 'conv' uses the multidimensional gaussian filter from scipy.ndimage and 'convfft' uses the fft convolution with a 2d Gaussian kernel. mask: numpy ndarray, optional Binary mask indicating where the low-pass filtered image should be interpolated with astropy.convolution. This otion can be useful if the low-pass filtered image is aimed to capture low-spatial frequency sky signal, while avoiding a stellar halo (set to one in the binary mask). Note: only works with Gaussian kernel or PSF convolution. Returns ------- filtered : numpy ndarray High-pass filtered image. """ def butter2d_lp(size, cutoff, n=3): """ Create low-pass 2D Butterworth filter. Function from PsychoPy library, credits to Jonathan Peirce, 2010 Parameters ---------- size : tuple size of the filter cutoff : float relative cutoff frequency of the filter (0 - 1.0) n : int, optional order of the filter, the higher n is the sharper the transition is. Returns ------- numpy ndarray filter kernel in 2D centered """ if not 0 < cutoff <= 1: raise ValueError('Cutoff frequency must be between 0 and 1.') if not isinstance(n, int): raise ValueError('n must be an integer >= 1.') rows, cols = size x = np.linspace(-0.5, 0.5, cols) * cols y = np.linspace(-0.5, 0.5, rows) * rows # An array with every pixel = radius relative to center radius = np.sqrt((x**2)[np.newaxis] + (y**2)[:, np.newaxis]) # The filter f = 1 / (1 + (radius / cutoff)**(2*n)) return f def round_away(x): """ Round to the *nearest* integer, half-away-from-zero. Parameters ---------- x : array-like Returns ------- r_rounded : array-like (float) Note ---- IDL ``ROUND`` rounds to the *nearest* integer (commercial rounding), unlike numpy's round/rint, which round to the nearest *even* value (half-to-even, financial rounding) as defined in IEEE-754 standard. """ return np.trunc(x + np.copysign(0.5, x)) # -------------------------------------------------------------------------- if array.ndim != 2: raise TypeError("Input array is not a frame or 2d array.") if mask is not None and (mode != 'psf-subt' and mode != 'gauss-subt'): msg = "Masking option only available for gauss-subt and psf-subt modes" raise TypeError(msg) if mode == 'laplacian': # Applying a Laplacian high-pass kernel if kernel_size % 2 == 0 or kernel_size < 0: raise ValueError("Kernel size must be an odd and positive value.") if not no_opencv: msg = "Opencv bindings are missing. Trying a convolution with a " msg += "Laplacian kernel instead." filtered = cv2.Laplacian(-array, cv2.CV_32F, ksize=kernel_size) elif mode == 'laplacian-conv': # Applying a Laplacian high-pass kernel defining a kernel and using # the convolve_fft Astropy function kernel3 = np.array([[-1, -1, -1], [-1, 8, -1], [-1, -1, -1]]) kernel5 = np.array([[-4, -1, 0, -1, -4], [-1, 2, 3, 2, -1], [0, 3, 4, 3, 0], [-1, 2, 3, 2, -1], [-4, -1, 0, -1, -4]]) kernel7 = np.array([[-10, -5, -2, -1, -2, -5, -10], [-5, 0, 3, 4, 3, 0, -5], [-2, 3, 6, 7, 6, 3, -2], [-1, 4, 7, 8, 7, 4, -1], [-2, 3, 6, 7, 6, 3, -2], [-5, 0, 3, 4, 3, 0, -5], [-10, -5, -2, -1, -2, -5, -10]]) if kernel_size == 3: kernel = kernel3 elif kernel_size == 5: kernel = kernel5 elif kernel_size == 7: kernel = kernel7 else: raise ValueError('Kernel size must be either 3, 5 or 7.') filtered = convolve_fft(array, kernel, normalize_kernel=False, nan_treatment='fill') elif mode == 'median-subt': # Subtracting the low_pass filtered (median) image from the image itself medianed = frame_filter_lowpass(array, 'median', median_size=median_size) filtered = array - medianed elif mode == 'gauss-subt': # Subtracting the low_pass filtered (median) image from the image itself gaussed = frame_filter_lowpass(array, 'gauss', fwhm_size=fwhm_size, conv_mode=conv_mode, mask=mask) filtered = array - gaussed elif mode == 'psf-subt': if psf is None: raise TypeError("psf should be provided for psf-subt mode") # Subtracting the low_pass filtered (median) image from the image itself psfed = frame_filter_lowpass(array, 'psf', psf=psf, mask=mask) filtered = array - psfed elif mode == 'fourier-butter': # Designs an n-th order high-pass 2D Butterworth filter filt = 1 - butter2d_lp(array.shape, cutoff=btw_cutoff, n=btw_order) array_fft = fft(array) fft_new = array_fft * filt filtered = ifft(fft_new) elif mode == 'hann': # TODO: this code could be shortened using np.convolve # see http://scipy-cookbook.readthedocs.io/items/SignalSmooth.html # create a Hanning profile window cut at the chosen frequency: npix = array.shape[0] cutoff = npix/2 * hann_cutoff cutoff_inside = round_away(np.minimum(cutoff, (npix/2 - 1))).astype(int) winsize = 2*cutoff_inside + 1 win1d = np.hanning(winsize) win = 1 - np.outer(win1d, win1d) array_fft = fft(array) # remove high spatial frequency along the Hann profile: array_fft[npix//2 - cutoff_inside: npix//2 + cutoff_inside + 1, npix//2 - cutoff_inside: npix//2 + cutoff_inside + 1] *= win filtered = ifft(array_fft) else: raise TypeError('Mode not recognized.') return filtered
[docs] def frame_filter_lowpass(array, mode='gauss', median_size=5, fwhm_size=5, conv_mode='convfft', kernel_sz=None, psf=None, mask=None, iterate=True, half_res_y=False, **kwargs): """ Low-pass filtering of input frame depending on parameter ``mode``. Parameters ---------- array : numpy ndarray Input array, 2d frame. mode : {'median', 'gauss', 'psf'}, str optional Type of low-pass filtering. median_size : int, optional Size of the median box for filtering the low-pass median filter. fwhm_size : float or tuple of 2 floats, optional Size of the Gaussian kernel for the low-pass Gaussian filter. If a tuple is provided, it should correspond to y and x kernel sizes, respectively. conv_mode : {'conv', 'convfft'}, str optional 'conv' uses the multidimensional gaussian filter from scipy.ndimage and 'convfft' uses the fft convolution with a 2d Gaussian kernel. kernel_sz: int or None, optional Size of the kernel in pixels for 2D Gaussian and Moffat convolutions. If None, astropy.convolution will automatically consider 8*radius kernel sizes. psf: numpy ndarray, optional Input normalised and centered psf, 2d frame. Should be provided if mode is set to 'psf'. mask: numpy ndarray, optional Binary mask indicating where the low-pass filtered image should be interpolated with astropy.convolution. This option can be useful if the low-pass filtered image is used to capture low-spatial frequency sky signal, while avoiding a stellar halo (set to one in the binary mask). Note: only works with Gaussian kernel or PSF convolution. iterate: bool, opt If the first convolution leaves nans, whether to continue replacing nans by interpolation until they are all replaced. half_res_y: bool, {True,False}, optional Whether the input data has only half the angular resolution vertically compared to horizontally (e.g. the case for some IFUs); in other words there are always 2 rows of pixels with exactly the same values. If so, the kernel will also be squashed vertically by a factor 2. Only used if mode is 'gauss' **kwargs : dict Passed through to the astropy.convolution.convolve or convolve_fft function. Returns ------- filtered : numpy ndarray Low-pass filtered image. """ if array.ndim != 2: raise TypeError('Input array is not a frame or 2d array.') if not isinstance(median_size, int): raise ValueError('`Median_size` must be integer') if mask is not None: if mode == 'median': msg = "Masking not available for median filter" if mask.shape != array.shape: msg = "Mask dimensions should be the same as array" raise TypeError(msg) if mode == 'median': # creating the low_pass filtered (median) image filtered = median_filter(array, median_size, mode='nearest') elif mode == 'gauss': # 2d Gaussian filter kernel_sz_y = kernel_sz if np.isscalar(fwhm_size): sigma = fwhm_size * gaussian_fwhm_to_sigma sigma_y = sigma else: if len(fwhm_size) != 2: msg = "If not a scalar, fwhm_size must be of length 2" raise TypeError(msg) sigma_y = fwhm_size[0] * gaussian_fwhm_to_sigma sigma = fwhm_size[1] * gaussian_fwhm_to_sigma if kernel_sz is not None: kernel_sz_y = int(kernel_sz*fwhm_size[0]/fwhm_size[1]) if kernel_sz_y % 2 != kernel_sz % 2: kernel_sz_y += 1 if half_res_y: sigma_y = max(1, sigma_y//2) if kernel_sz_y is not None: kernel_sz_y = kernel_sz_y//2 if kernel_sz_y % 2 != kernel_sz % 2: kernel_sz_y += 1 if conv_mode == 'conv': filtered = convolve(array, Gaussian2DKernel(x_stddev=sigma, y_stddev=sigma_y, x_size=kernel_sz, y_size=kernel_sz_y), mask=mask, **kwargs) if iterate and np.sum(np.isnan(filtered)) > 0: filtered = interp_nan(filtered, Gaussian2DKernel(x_stddev=sigma, y_stddev=sigma_y, x_size=kernel_sz, y_size=kernel_sz_y), convolve=convolve) elif conv_mode == 'convfft': # FFT Convolution with a 2d gaussian kernel created with Astropy. filtered = convolve_fft(array, Gaussian2DKernel(x_stddev=sigma, y_stddev=sigma_y, x_size=kernel_sz, y_size=kernel_sz_y), mask=mask, **kwargs) if iterate and np.sum(np.isnan(filtered)) > 0: filtered = interp_nan(filtered, Gaussian2DKernel(x_stddev=sigma, y_stddev=sigma_y, x_size=kernel_sz, y_size=kernel_sz_y), convolve=convolve_fft, **kwargs) else: raise TypeError('2d Gaussian filter mode not recognized') elif mode == 'psf': if psf is None: raise TypeError('psf should be provided for convolution') elif psf.ndim != 2: raise TypeError('Input psf is not a frame or 2d array.') if psf.shape[-1] > array.shape[-1]: raise TypeError('Input psf is larger than input array. Crop.') # psf convolution if conv_mode == 'conv': filtered = convolve(array, psf, mask=mask, **kwargs) if iterate and np.sum(np.isnan(filtered)) > 0: filtered = interp_nan(filtered, psf, convolve=convolve, **kwargs) elif conv_mode == 'convfft': filtered = convolve_fft(array, psf, mask=mask, **kwargs) if iterate and np.sum(np.isnan(filtered)) > 0: filtered = interp_nan(filtered, psf, convolve=convolve_fft, **kwargs) else: raise TypeError('Low-pass filter mode not recognized') return filtered
[docs] def cube_filter_lowpass(array, mode='gauss', median_size=5, fwhm_size=5, conv_mode='conv', kernel_sz=None, verbose=True, psf=None, mask=None, iterate=True, **kwargs): """ Apply ``frame_filter_lowpass`` to the frames of a 3d or 4d cube. Parameters ---------- array : numpy ndarray Input cube, 3d or 4d. mode : str, optional See the documentation of the ``frame_filter_lowpass`` function. median_size : int, optional See the documentation of the ``frame_filter_lowpass`` function. fwhm_size : float, optional See the documentation of the ``frame_filter_lowpass`` function. conv_mode : str, optional See the documentation of the ``frame_filter_lowpass`` function. kernel_sz: int, optional See the documentation of the ``frame_filter_lowpass`` function. verbose : bool, optional If True timing and progress bar are shown. psf: numpy ndarray, optional Input normalised and centered psf, 2d frame. Should be provided if mode is set to 'psf'. mask: numpy ndarray, optional Binary mask indicating where the low-pass filtered image should be interpolated with astropy.convolution. This otion can be useful if the low-pass filtered image is aimed to capture low-spatial frequency sky signal, while avoiding a stellar halo (set to one in the binary mask). Note: only works with Gaussian kernel or PSF convolution. iterate: bool, opt If the first convolution leaves nans, whether to continue replacing nans by interpolation until they are all replaced. **kwargs : dict Passed through to the astropy.convolution.convolve or convolve_fft function. Returns ------- filtered : numpy ndarray Low-pass filtered cube. """ array_out = np.empty_like(array) if array.ndim == 3: for i in Progressbar(range(array.shape[0]), verbose=verbose): array_out[i] = frame_filter_lowpass(array[i], mode, median_size, fwhm_size, conv_mode, kernel_sz, psf, mask, iterate, **kwargs) elif array.ndim == 4: for i in Progressbar(range(array.shape[1]), verbose=verbose): for lam in range(array.shape[0]): array_out[lam][i] = frame_filter_lowpass(array[lam][i], mode, median_size, fwhm_size, conv_mode, kernel_sz, psf, mask, iterate, **kwargs) else: raise TypeError('Input array is not a 3d or 4d cube') return array_out
[docs] def frame_deconvolution(array, psf, n_it=30): """ Iterative image deconvolution following the scikit-image implementation of the Richardson-Lucy algorithm, described in [RIC72]_ and [LUC74]_. Considering an image that has been convolved by the point spread function of an instrument, the algorithm will sharpen the blurred image through a user-defined number of iterations, which changes the regularisation. Parameters ---------- array : numpy ndarray Input image, 2d frame. psf : numpy ndarray Input psf, 2d frame. n_it : int, optional Number of iterations. Returns ------- deconv : numpy ndarray Deconvolved image. """ if array.ndim != 2: raise TypeError('Input array is not a frame or 2d array.') if psf.ndim != 2: raise TypeError('Input psf is not a frame or 2d array.') max_I = np.amax(array) min_I = np.amin(array) drange = max_I-min_I deconv = richardson_lucy((array-min_I)/drange, psf, iterations=n_it) deconv *= drange deconv += min_I return deconv