Source code for vip_hci.preproc.badframes

#! /usr/bin/env python
"""
Module with functions for outlier frame detection.
"""

__author__ = 'Carlos Alberto Gomez Gonzalez'
__all__ = ['cube_detect_badfr_pxstats',
           'cube_detect_badfr_ellipticity',
           'cube_detect_badfr_correlation']

import numpy as np
import pandas as pn
from matplotlib import pyplot as plt
try:
    from photutils.detection import DAOStarFinder
except:
    from photutils import DAOStarFinder
from astropy.stats import sigma_clip
from ..var import get_annulus_segments
from ..config import time_ini, timing, check_array
from ..config.utils_conf import vip_figsize
from ..stats import cube_basic_stats, cube_distance


[docs] def cube_detect_badfr_pxstats(array, mode='annulus', in_radius=10, width=10, top_sigma=1.0, low_sigma=1.0, window=None, plot=True, verbose=True): """ Returns the list of bad frames from a cube using the px statistics in a centered annulus or circular aperture. Frames that are more than a few standard deviations discrepant are rejected. Should be applied on a recentered cube. Parameters ---------- array : numpy ndarray Input 3d array, cube. mode : {'annulus', 'circle'}, string optional Whether to take the statistics from a circle or an annulus. in_radius : int optional If mode is 'annulus' then 'in_radius' is the inner radius of the annular region. If mode is 'circle' then 'in_radius' is the radius of the aperture. width : int optional Size of the annulus. Ignored if mode is 'circle'. top_sigma : int, optional Top boundary for rejection. low_sigma : int, optional Lower boundary for rejection. window : int, optional Window for smoothing the mean and getting the rejection statistic. If None, it is defined as ``n_frames//3``. plot : bool, optional If true it plots the mean fluctuation as a function of the frames and the boundaries. verbose : bool, optional Whether to print to stdout or not. Returns ------- good_index_list : numpy ndarray 1d array of good indices. bad_index_list : numpy ndarray 1d array of bad frames indices. """ check_array(array, 3, msg='array') if mode == 'annulus': if in_radius + width > array[0].shape[0] / 2: msgve = 'Inner radius and annulus size are too big (out of boundaries)' raise ValueError(msgve) elif mode == 'circle': if in_radius > array[0].shape[0] / 2: msgve = 'Radius size is too big (out of boundaries)' raise ValueError(msgve) if verbose: start_time = time_ini() n = array.shape[0] mean_values = cube_basic_stats(array, mode, radius=in_radius, inner_radius=in_radius, size=width) if window is None: window = n//3 mean_smooth = pn.Series(mean_values).rolling(window, center=True).mean() mean_smooth = mean_smooth.bfill() # fillna(method='backfill') mean_smooth = mean_smooth.ffill() # fillna(method='ffill') sigma = np.std(mean_values) bad_index_list = [] good_index_list = [] top_boundary = np.empty([n]) bot_boundary = np.empty([n]) for i in range(n): if mode == 'annulus': i_mean_value = get_annulus_segments(array[i], width=width, inner_radius=in_radius, mode="val")[0].mean() elif mode == 'circle': i_mean_value = mean_values[i] top_boundary[i] = mean_smooth[i] + top_sigma*sigma bot_boundary[i] = mean_smooth[i] - low_sigma*sigma if i_mean_value > top_boundary[i] or i_mean_value < bot_boundary[i]: bad_index_list.append(i) else: good_index_list.append(i) if verbose: bad = len(bad_index_list) percent_bad_frames = (bad * 100) / n msg1 = "Done detecting bad frames from cube: {} out of {} ({:.3}%)" print(msg1.format(bad, n, percent_bad_frames)) if plot: plt.figure(figsize=vip_figsize) plt.plot(mean_values, 'o', alpha=0.6) plt.plot(mean_smooth, label='smoothed mean fluctuation', lw=2, ls='-', alpha=0.5) plt.plot(top_boundary, label='upper threshold', lw=1.4, ls='-', color='#9467bd', alpha=0.8) plt.plot(bot_boundary, label='lower threshold', lw=1.4, ls='-', color='#9467bd', alpha=0.8) plt.legend(fancybox=True, framealpha=0.5, loc='best') plt.grid('on', alpha=0.2) plt.ylabel('Mean value in '+mode) plt.xlabel('Frame number') if verbose: timing(start_time) good_index_list = np.array(good_index_list) bad_index_list = np.array(bad_index_list) return good_index_list, bad_index_list
[docs] def cube_detect_badfr_ellipticity(array, fwhm, crop_size=30, roundlo=-0.2, roundhi=0.2, plot=True, verbose=True): """ Returns the list of bad frames from a cube by measuring the PSF ellipticity of the central source. Should be applied on a recentered cube. Parameters ---------- array : numpy ndarray Input 3d array, cube. fwhm : float FWHM size in pixels. crop_size : int, optional Size in pixels of the square subframe to be analyzed. roundlo, roundhi : float, optional Lower and higher bounds for the ellipticity. See ``Notes`` below for details. plot : bool, optional If true it plots the central PSF roundness for each frame. verbose : bool, optional Whether to print to stdout or not. Returns ------- good_index_list : numpy ndarray 1d array of good indices. bad_index_list : numpy ndarray 1d array of bad frames indices. Note ---- From photutils.DAOStarFinder documentation: DAOFIND calculates the object roundness using two methods. The 'roundlo' and 'roundhi' bounds are applied to both measures of roundness. The first method ('roundness1'; called 'SROUND' in DAOFIND) is based on the source symmetry and is the ratio of a measure of the object's bilateral (2-fold) to four-fold symmetry. The second roundness statistic ('roundness2'; called 'GROUND' in DAOFIND) measures the ratio of the difference in the height of the best fitting Gaussian function in x minus the best fitting Gaussian function in y, divided by the average of the best fitting Gaussian functions in x and y. A circular source will have a zero roundness. A source extended in x or y will have a negative or positive roundness, respectively. """ from .cosmetics import cube_crop_frames check_array(array, 3, msg='array') if verbose: start_time = time_ini() array = cube_crop_frames(array, crop_size, verbose=False) n = array.shape[0] goodfr = [] badfr = [] roundness1 = [] roundness2 = [] for i in range(n): ff_clipped = sigma_clip(array[i], sigma=3, maxiters=None) thr = ff_clipped.max() DAOFIND = DAOStarFinder(threshold=thr, fwhm=fwhm) tbl = DAOFIND.find_stars(array[i]) table_mask = (tbl['peak'] == tbl['peak'].max()) tbl = tbl[table_mask] roun1 = tbl['roundness1'][0] roun2 = tbl['roundness2'][0] roundness1.append(roun1) roundness2.append(roun2) # we check the roundness if roundhi > roun1 > roundlo and roundhi > roun2 > roundlo: goodfr.append(i) else: badfr.append(i) bad_index_list = np.array(badfr) good_index_list = np.array(goodfr) if plot: _, ax = plt.subplots(figsize=vip_figsize) x = range(len(roundness1)) if n > 5000: marker = ',' else: marker = 'o' ax.plot(x, roundness1, '-', alpha=0.6, color='#1f77b4', label='roundness1') ax.plot(x, roundness1, marker=marker, alpha=0.4, color='#1f77b4') ax.plot(x, roundness2, '-', alpha=0.6, color='#9467bd', label='roundness2') ax.plot(x, roundness2, marker=marker, alpha=0.4, color='#9467bd') ax.hlines(roundlo, xmin=-1, xmax=n + 1, lw=2, colors='#ff7f0e', linestyles='dashed', label='roundlo', alpha=0.6) ax.hlines(roundhi, xmin=-1, xmax=n + 1, lw=2, colors='#ff7f0e', linestyles='dashdot', label='roundhi', alpha=0.6) plt.xlabel('Frame number') plt.ylabel('Roundness') plt.xlim(xmin=-1, xmax=n + 1) plt.legend(fancybox=True, framealpha=0.5, loc='best') plt.grid('on', alpha=0.2) if verbose: bad = len(bad_index_list) percent_bad_frames = (bad*100)/n msg1 = "Done detecting bad frames from cube: {} out of {} ({:.3}%)" print(msg1.format(bad, n, percent_bad_frames)) timing(start_time) return good_index_list, bad_index_list
[docs] def cube_detect_badfr_correlation(array, frame_ref, crop_size=30, dist='pearson', percentile=20, threshold=None, mode='full', inradius=None, width=None, plot=True, verbose=True, full_output=False): """ Returns the list of bad frames from a cube by measuring the distance (similarity) or correlation of the frames (cropped to a 30x30 subframe) wrt a reference frame from the same cube. Then the distance/correlation level is thresholded (percentile parameter) to find the outliers. Should be applied on a recentered cube. Parameters ---------- array : numpy ndarray Input 3d array, cube. frame_ref : int or 2d array Index of the frame that will be used as a reference or 2d reference array. crop_size : int, optional Size in pixels of the square subframe to be analyzed. dist : {'sad','euclidean','mse','pearson','spearman','ssim'}, str optional One of the similarity or dissimilarity measures from function ``vip_hci.stats.distances.cube_distance``. percentile : int, optional The percentage of frames that will be discarded, if threshold is not provided. threshold: None or float, optional If provided, corresponds to the threshold 'distance' value above/below which (depending on index of similarity/dissimilarity resp.) will be discarded. If not None, supersedes 'percentile'. mode : {'full','annulus'}, string optional Whether to use the full frames or a centered annulus. inradius : None or int, optional The inner radius when mode is 'annulus'. width : None or int, optional The width when mode is 'annulus'. plot : bool, optional If true it plots the mean fluctuation as a function of the frames and the boundaries. verbose : bool, optional Whether to print to stdout or not. full_output: bool, optional Whether to also return the array of distances. Returns ------- good_index_list : numpy ndarray 1d array of good indices. bad_index_list : numpy ndarray 1d array of bad frames indices. distances : numpy ndarray [if full_output=True] 1d array with the measured distances """ from .cosmetics import cube_crop_frames, frame_crop check_array(array, 3, msg='array') if verbose: start_time = time_ini() n = array.shape[0] # the cube is cropped to the central area subarray = cube_crop_frames(array, crop_size, verbose=False) if isinstance(frame_ref, np.ndarray): frame_ref = frame_crop(frame_ref, crop_size, verbose=False) distances = cube_distance(subarray, frame_ref, mode, dist, inradius=inradius, width=width, plot=False) if dist == 'pearson' or dist == 'spearman' or dist == 'ssim': # measures of correlation or similarity minval = np.min(distances[~np.isnan(distances)]) distances = np.nan_to_num(distances) distances[np.where(distances == 0)] = minval if threshold is None: threshold = np.percentile(distances, percentile) indbad = np.where(distances <= threshold) indgood = np.where(distances > threshold) else: # measures of dissimilarity if threshold is None: threshold = np.percentile(distances, 100-percentile) indbad = np.where(distances >= threshold) indgood = np.where(distances < threshold) bad_index_list = indbad[0] good_index_list = indgood[0] if verbose: bad = len(bad_index_list) percent_bad_frames = (bad*100)/n msg1 = "Done detecting bad frames from cube: {} out of {} ({:.3}%)" print(msg1.format(bad, n, percent_bad_frames)) if plot: lista = distances _, ax = plt.subplots(figsize=vip_figsize) x = range(len(lista)) ax.plot(x, lista, '-', alpha=0.6, color='#1f77b4') if n > 5000: marker = ',' else: marker = 'o' ax.plot(x, lista, marker=marker, alpha=0.4, color='#1f77b4') if isinstance(frame_ref, int): ax.vlines(frame_ref, ymin=np.nanmin(lista), ymax=np.nanmax(lista), colors='green', linestyles='dashed', lw=2, alpha=0.6, label='Reference frame '+str(frame_ref)) ax.hlines(threshold, xmin=-1, xmax=n+1, lw=2, colors='#ff7f0e', linestyles='dashed', label='Threshold', alpha=0.6) plt.xlabel('Frame number') if dist == 'sad': plt.ylabel('SAD - Manhattan distance') elif dist == 'euclidean': plt.ylabel('Euclidean distance') elif dist == 'pearson': plt.ylabel('Pearson correlation coefficient') elif dist == 'spearman': plt.ylabel('Spearman correlation coefficient') elif dist == 'mse': plt.ylabel('Mean squared error') elif dist == 'ssim': plt.ylabel('Structural Similarity Index') plt.xlim(xmin=-1, xmax=n+1) plt.legend(fancybox=True, framealpha=0.5, loc='best') plt.grid('on', alpha=0.2) if verbose: timing(start_time) if full_output: return good_index_list, bad_index_list, distances else: return good_index_list, bad_index_list