Source code for vip_hci.preproc.subsampling

#! /usr/bin/env python
"""
Module with pixel and frame subsampling functions.

.. [BRA13]
   | Brandt et al. 2013
   | **New Techniques for High-contrast Imaging with ADI: The ACORNS-ADI SEEDS
     Data Reduction Pipeline**
   | *The Astrophysical Journal, Volume 764, Issue 2, p. 183*
   | `https://arxiv.org/abs/1209.3014
     <https://arxiv.org/abs/1209.3014>`_

"""

__author__ = 'Carlos Alberto Gomez Gonzalez, Valentin Christiaens'
__all__ = ['cube_collapse',
           'cube_subsample',
           'cube_subsample_trimmean']

import numpy as np


[docs] def cube_collapse(cube, mode='median', n=50, w=None): """Collapse a 3D or 4D cube into a 2D frame or 3D cube, respectively. The ``mode`` parameter determines how the collapse should be done. It is possible to perform a trimmed mean combination of the frames, as in [BRA13]_. In case of a 4D input cube, it is assumed to be an IFS dataset with the zero-th axis being the spectral dimension, and the first axis the temporal dimension. Parameters ---------- cube : numpy ndarray Cube. mode : {'median', 'mean', 'sum', 'max', 'trimmean', 'absmean', 'wmean'} Sets the way of collapsing the images in the cube. 'wmean' stands for weighted mean and requires weights w to be provided. 'absmean' stands for the mean of absolute values (potentially useful for negfc). n : int, optional Sets the discarded values at high and low ends. When n = N is the same as taking the mean, when n = 1 is like taking the median. w: 1d numpy array or list, optional Weights to be applied for a weighted mean. Need to be provided if collapse mode is 'wmean'. Returns ------- frame : numpy ndarray Output array, cube combined. """ arr = cube if arr.ndim == 3: ax = 0 elif arr.ndim == 4: nch = arr.shape[0] ax = 1 else: raise TypeError('The input array is not a cube or 3d array.') if mode == 'wmean': if w is None: raise ValueError( "Weights have to be provided for weighted mean mode") if len(w) != cube.shape[0]: raise TypeError("Weights need same length as cube") if isinstance(w, list): w = np.array(w) if mode == 'mean': frame = np.nanmean(arr, axis=ax) elif mode == 'median': frame = np.nanmedian(arr, axis=ax) elif mode == 'sum': frame = np.nansum(arr, axis=ax) elif mode == 'max': frame = np.nanmax(arr, axis=ax) elif mode == 'trimmean': N = arr.shape[ax] k = (N - n)//2 if N % 2 != n % 2: n += 1 if ax == 0: frame = np.empty_like(arr[0]) for index, _ in np.ndenumerate(arr[0]): sort = np.sort(arr[:, index[0], index[1]]) frame[index] = np.nanmean(sort[k:k+n]) else: frame = np.empty_like(arr[:, 0]) for j in range(nch): for index, _ in np.ndenumerate(arr[:, 0]): sort = np.sort(arr[j, :, index[0], index[1]]) frame[j][index] = np.nanmean(sort[k:k+n]) elif mode == 'wmean': arr[np.where(np.isnan(arr))] = 0 # to avoid product with nan if ax == 0: frame = np.inner(w, np.moveaxis(arr, 0, -1)) else: frame = np.empty_like(arr[:, 0]) for j in range(nch): frame[j] = np.inner(w, np.moveaxis(arr[j], 0, -1)) elif mode == 'absmean': frame = np.nanmean(np.abs(arr), axis=ax) else: raise TypeError("mode not recognized") return frame
[docs] def cube_subsample(array, n, mode="mean", w=None, parallactic=None, verbose=True): """Mean/Median combines frames in 3d or 4d cube with window ``n``. Parameters ---------- array : numpy ndarray Input 3d array, cube. n : int Window for mean/median. mode : {'mean','median', 'wmean'}, optional Switch for choosing mean, median or weighted average. parallactic : numpy ndarray, optional List of corresponding parallactic angles. verbose : bool optional Returns ------- arr_view : numpy ndarray Resulting array. If ``parallactic`` is provided the the new cube and angles are returned. """ if array.ndim not in [3, 4]: raise TypeError('The input array is not a cube or 3d or 4d array') kwargs = {} if mode == 'median': func = np.median elif mode == 'mean': func = np.mean elif mode == 'wmean': func = np.average w = w/np.sum(w) kwargs = {'weights': w} else: raise ValueError('`Mode` should be either Mean or Median') if array.ndim == 3: m = int(array.shape[0] / n) resid = array.shape[0] % n y = array.shape[1] x = array.shape[2] arr = np.empty([m, y, x]) if parallactic is not None: angles = np.zeros(m) for i in range(m): arr[i, :, :] = func(array[n * i:n * i + n, :, :], axis=0, **kwargs) if parallactic is not None: angles[i] = func(parallactic[n * i:n * i + n], **kwargs) elif array.ndim == 4: m = int(array.shape[1] / n) resid = array.shape[1] % n w = array.shape[0] y = array.shape[2] x = array.shape[3] arr = np.empty([w, m, y, x]) if parallactic is not None: angles = np.zeros(m) for j in range(w): for i in range(m): arr[j, i, :, :] = func(array[j, n * i:n * i + n, :, :], axis=0, **kwargs) if parallactic is not None: angles[i] = func(parallactic[n * i:n * i + n], **kwargs) if verbose: msg = "Cube temporally subsampled by taking the {} of every {} frames" print(msg.format(mode, n)) if resid > 0: print("Initial # of frames and window are not multiples ({} " "frames were dropped)".format(resid)) print("New shape: {}".format(arr.shape)) if parallactic is not None: return arr, angles else: return arr
[docs] def cube_subsample_trimmean(arr, n, m): """Perform a trimmed mean combination every m frames in a cube. Details in [BRA13]_. Parameters ---------- arr : numpy ndarray Cube. n : int Sets the discarded values at high and low ends. When n = N is the same as taking the mean, when n = 1 is like taking the median. m : int Window from the trimmed mean. Returns ------- arr_view : numpy ndarray Output array, cube combined. """ if arr.ndim != 3: raise TypeError('The input array is not a cube or 3d array') num = int(arr.shape[0]/m) res = int(arr.shape[0] % m) y = arr.shape[1] x = arr.shape[2] arr2 = np.empty([num+2, y, x]) for i in range(num): arr2[0] = cube_collapse(arr[:m, :, :], 'trimmean', n) if i > 0: arr2[i] = cube_collapse(arr[m*i:m*i+m, :, :], 'trimmean', n) arr2[num] = cube_collapse(arr[-res:, :, :], 'trimmean', n) arr_view = arr2[:num+1] # slicing until m+1 - last index not included msg = "Cube temporally subsampled by taking the trimmed mean of every {} " msg += "frames" print(msg.format(m)) return arr_view