#! /usr/bin/env python
"""
Module containing functions for cubes frame registration.
.. [GUI08]
| Guizar-Sicairos et al. 2008
| **Efficient subpixel image registration algorithms**
| *Optics Letters, Volume 33, Issue 2, p. 156*
| `https://ui.adsabs.harvard.edu/abs/2008OptL...33..156G
<https://ui.adsabs.harvard.edu/abs/2008OptL...33..156G>`_
.. [PUE15]
| Pueyo et al. 2015
| **Reconnaissance of the HR 8799 Exosolar System. II. Astrometry and Orbital
Motion**
| *The Astrophysical Journal, Volume 803, Issue 1, p. 31*
| `https://arxiv.org/abs/1409.6388
<https://arxiv.org/abs/1409.6388>`_
"""
__author__ = 'C. A. Gomez Gonzalez, V. Christiaens, G. Ruane, R. Farkas'
__all__ = ['frame_shift',
'cube_shift',
'frame_center_radon',
'frame_center_satspots',
'cube_recenter_satspots',
'cube_recenter_radon',
'cube_recenter_dft_upsampling',
'cube_recenter_2dfit',
'cube_recenter_via_speckles']
import warnings
import numpy as np
try:
import cv2
no_opencv = False
except ImportError:
msg = "Opencv python bindings are missing."
warnings.warn(msg, ImportWarning)
no_opencv = True
from hciplot import plot_frames
from scipy.ndimage import fourier_shift
from scipy.ndimage import shift
from skimage.transform import radon
from skimage.registration import phase_cross_correlation
from multiprocessing import cpu_count
from matplotlib import pyplot as plt
from ..config import time_ini, timing, Progressbar
from ..config.utils_conf import vip_figsize, check_array
from ..config.utils_conf import pool_map, iterable
from ..stats import frame_basic_stats
from ..var import (get_square, frame_center, get_annulus_segments,
fit_2dmoffat, fit_2dgaussian, fit_2dairydisk,
fit_2d2gaussian, cube_filter_lowpass, cube_filter_highpass,
frame_filter_highpass, frame_filter_lowpass)
from .cosmetics import cube_crop_frames, frame_crop
from .subsampling import cube_collapse
[docs]
def frame_shift(array, shift_y, shift_x, imlib='vip-fft',
interpolation='lanczos4', border_mode='reflect'):
"""Shift a 2D array by shift_y, shift_x.
Parameters
----------
array : numpy ndarray
Input 2d array.
shift_y, shift_x: float
Shifts in y and x directions.
imlib : {'opencv', 'ndimage-fourier', 'ndimage-interp', 'vip-fft'}, str opt
Library or method used for performing the image shift.
'ndimage-fourier' or 'vip-fft': does a fourier shift operation and
preserves better the pixel values - therefore the flux and photometry
(wrapper of scipy.ndimage.fourier_shift). Interpolation-based shift
('opencv' and 'ndimage-interp') is faster but less accurate than the
fourier shift. 'opencv' is recommended when speed is critical.
interpolation : str, optional
Only used in case of imlib is set to 'opencv' or 'ndimage-interp'
(Scipy.ndimage), where the images are shifted via interpolation.
For Scipy.ndimage the options are: 'nearneig', bilinear', 'biquadratic',
'bicubic', 'biquartic' or 'biquintic'. The 'nearneig' interpolation is
the fastest and the 'biquintic' the slowest. The 'nearneig' is the
poorer option for interpolation of noisy astronomical images.
For Opencv the options are: 'nearneig', 'bilinear', 'bicubic' or
'lanczos4'. The 'nearneig' interpolation is the fastest and the
'lanczos4' the slowest and accurate. 'lanczos4' is the default for
Opencv and 'biquartic' for Scipy.ndimage.
border_mode : {'reflect', 'nearest', 'constant', 'mirror', 'wrap'}
For 'opencv' and 'ndimage-interp', points outside the boundaries of the
input are filled according to the value of this parameter.
With 'reflect', the input is extended by reflecting about the edge of
the last pixel. With 'nearest', the input is extended by replicating the
last pixel. With 'constant', the input is extended by filling all values
beyond the edge with zeros. With 'mirror', the input is extended by
reflecting about the center of the last pixel. With 'wrap', the input is
extended by wrapping around to the opposite edge. Default is 'reflect'.
Note: for 'ndimage-fourier' default is 'wrap' (impossible to change),
while border_mode is 'constant' (zeros) for 'vip-fft'.
Returns
-------
array_shifted : numpy ndarray
Shifted 2d array.
"""
check_array(array, dim=2)
image = array.copy()
if imlib == 'ndimage-fourier':
# Warning: default border mode is 'wrap' (cannot be changed)
shift_val = (shift_y, shift_x)
array_shifted = fourier_shift(np.fft.fftn(image), shift_val)
array_shifted = np.fft.ifftn(array_shifted)
array_shifted = array_shifted.real
elif imlib == 'vip-fft':
ny_ori, nx_ori = image.shape
# First pad to avoid 'wrapping' values at the edges
npad = int(np.ceil(np.amax(np.abs([shift_y, shift_x]))))
cy_ori, cx_ori = frame_center(array)
new_y = int(ny_ori+2*npad)
new_x = int(nx_ori+2*npad)
new_image = np.zeros([new_y, new_x], dtype=array.dtype)
cy, cx = frame_center(new_image)
y0 = int(cy-cy_ori)
y1 = int(cy+cy_ori)
if new_y % 2:
y1 += 1
x0 = int(cx-cx_ori)
x1 = int(cx+cx_ori)
if new_x % 2:
x1 += 1
new_image[y0:y1, x0:x1] = array.copy()
p_y0 = npad
p_x0 = npad
npix = new_y
# If non-square, add extra pad to make it square
if new_y != new_x:
if new_y > new_x:
npix = new_y
image = np.zeros([npix, npix])
x0 = int(cy-cx)
x1 = x0+new_x
image[:, x0:x1] = new_image.copy()
p_x0 += x0
else:
npix = new_x
image = np.zeros([npix, npix])
y0 = int(cx-cy)
y1 = y0+new_y
image[y0:y1] = new_image.copy()
p_y0 += y0
new_image = image.copy()
# If odd, add an extra pad layer to make it even
if npix % 2:
npix += 1
image = np.zeros([npix, npix])
if shift_x > 0:
x0 = 0
else:
x0 = 1
p_x0 += 1
if shift_y > 0:
y0 = 0
else:
y0 = 1
p_y0 += 1
image[y0:y0+npix-1, x0:x0+npix-1] = new_image.copy()
new_image = image.copy()
# actual FT-based shift
ramp = np.outer(np.ones(npix), np.arange(npix) - npix/2)
tilt = (-2*np.pi / npix) * (shift_x*ramp + shift_y*ramp.T)
fact = np.fft.fftshift(np.cos(tilt) + 1j*np.sin(tilt))
image_ft = np.fft.fft2(new_image) # no np.fft.fftshift applied!
array_shifted = np.fft.ifft2(image_ft * fact).real
# final crop to compensate padding
array_shifted = array_shifted[p_y0:p_y0+ny_ori, p_x0:p_x0+nx_ori]
elif imlib == 'ndimage-interp':
if interpolation == 'nearneig':
order = 0
elif interpolation == 'bilinear':
order = 1
elif interpolation == 'biquadratic':
order = 2
elif interpolation == 'bicubic':
order = 3
elif interpolation == 'biquartic' or interpolation == 'lanczos4':
order = 4
elif interpolation == 'biquintic':
order = 5
else:
raise ValueError('Scipy.ndimage interpolation method not '
'recognized')
if border_mode not in ['reflect', 'nearest', 'constant', 'mirror',
'wrap']:
raise ValueError('`border_mode` not recognized')
array_shifted = shift(image, (shift_y, shift_x), order=order,
mode=border_mode)
elif imlib == 'opencv':
if no_opencv:
msg = 'Opencv python bindings cannot be imported. Install opencv or'
msg += ' set imlib to ndimage-fourier or ndimage-interp'
raise RuntimeError(msg)
if interpolation == 'bilinear':
intp = cv2.INTER_LINEAR
elif interpolation == 'bicubic':
intp = cv2.INTER_CUBIC
elif interpolation == 'nearneig':
intp = cv2.INTER_NEAREST
elif interpolation == 'lanczos4':
intp = cv2.INTER_LANCZOS4
else:
raise ValueError('Opencv interpolation method not recognized')
if border_mode == 'mirror':
bormo = cv2.BORDER_REFLECT_101 # gfedcb|abcdefgh|gfedcba
elif border_mode == 'reflect':
bormo = cv2.BORDER_REFLECT # fedcba|abcdefgh|hgfedcb
elif border_mode == 'wrap':
bormo = cv2.BORDER_WRAP # cdefgh|abcdefgh|abcdefg
elif border_mode == 'constant':
bormo = cv2.BORDER_CONSTANT # iiiiii|abcdefgh|iiiiiii
elif border_mode == 'nearest':
bormo = cv2.BORDER_REPLICATE # aaaaaa|abcdefgh|hhhhhhh
else:
raise ValueError('`border_mode` not recognized')
image = np.float32(image)
y, x = image.shape
M = np.float32([[1, 0, shift_x], [0, 1, shift_y]])
array_shifted = cv2.warpAffine(image, M, (x, y), flags=intp,
borderMode=bormo)
else:
raise ValueError('Image transformation library not recognized')
return array_shifted
[docs]
def cube_shift(cube, shift_y, shift_x, imlib='vip-fft',
interpolation='lanczos4', border_mode='reflect', nproc=None):
"""Shift the X-Y coordinates of a cube or 3D array by x and y values.
Parameters
----------
cube : numpy ndarray, 3d
Input cube.
shift_y, shift_x: float, list of floats or np.ndarray of floats
Shifts in y and x directions for each frame. If the a single value is
given then all the frames will be shifted by the same amount.
imlib : str, optional
See the documentation of the ``vip_hci.preproc.frame_shift`` function.
interpolation : str, optional
See the documentation of the ``vip_hci.preproc.frame_shift`` function.
border_mode : str, optional
See the documentation of the ``vip_hci.preproc.frame_shift`` function.
nproc: int or None, optional
Number of CPUs to use for multiprocessing. If None, will be
automatically set to half the number of available CPUs.
Returns
-------
cube_out : numpy ndarray, 3d
Cube with shifted frames.
"""
check_array(cube, dim=3)
nfr = cube.shape[0]
if np.isscalar(shift_x):
shift_x = np.ones([nfr]) * shift_x
if np.isscalar(shift_y):
shift_y = np.ones([nfr]) * shift_y
if nproc is None:
nproc = cpu_count()//2
if nproc == 1:
cube_out = np.zeros_like(cube)
for i in range(cube.shape[0]):
cube_out[i] = frame_shift(cube[i], shift_y[i], shift_x[i], imlib,
interpolation, border_mode)
elif nproc > 1:
res = pool_map(nproc, frame_shift, iterable(cube), iterable(shift_y),
iterable(shift_x), imlib, interpolation, border_mode)
cube_out = np.array(res)
return cube_out
[docs]
def frame_center_satspots(array, xy, subi_size=19, sigfactor=6, shift=False,
imlib='vip-fft', interpolation='lanczos4',
fit_type='moff', filter_freq=(0, 0),
border_mode='reflect', debug=False, verbose=True):
"""Find the center of a frame with satellite spots (relevant e.g. for\
VLT/SPHERE data).
The method used to determine the center is by centroiding the 4 spots via a
2D Gaussian fit and finding the intersection of the lines they create (see
Notes). This method is very sensitive to the SNR of the satellite spots,
therefore thresholding of the background pixels is performed. If the results
are too extreme, the debug parameter will allow to see in depth what is
going on with the fit (you may have to adjust the `sigfactor` for the
background pixels thresholding).
Parameters
----------
array : numpy ndarray, 2d
Image or frame.
xy : tuple of 4 tuples of 2 elements
Tuple with coordinates X,Y of the 4 satellite spots. When the spots are
in an X configuration, the order is the following: top-left, top-right,
bottom-left and bottom-right. When the spots are in an + (cross-like)
configuration, the order is the following: top, right, left, bottom.
subi_size : int, optional
Size of subimage where the fitting is done.
sigfactor : int, optional
The background pixels will be thresholded before fitting a 2d Gaussian
to the data using sigma clipped statistics. All values smaller than
(MEDIAN + sigfactor*STDDEV) will be replaced by small random Gaussian
noise.
shift : bool, optional
If True the image is shifted.
imlib : str, optional
See the documentation of the ``vip_hci.preproc.frame_shift`` function.
interpolation : str, optional
See the documentation of the ``vip_hci.preproc.frame_shift`` function.
fit_type: str, optional {'gaus','moff'}
Type of 2d fit to infer the centroid of the satellite spots.
filter_freq: tuple of 2 floats, optional
If the first (resp. second) element of the tuple is larger than 0,
a high-pass (resp. low-pass) filter is applied to the image,
before fitting the satellite spots. The elements should correspond to
the fwhm_size of the frame_filter_highpass and frame_filter_lowpass
functions, respectively. If both elements are non-zero, both high-pass
and low-pass filter of the image are applied, in that order. This can be
useful to better isolate the signal from the satellite spots.
border_mode : {'reflect', 'nearest', 'constant', 'mirror', 'wrap'}
Points outside the boundaries of the input are filled accordingly.
With 'reflect', the input is extended by reflecting about the edge of
the last pixel. With 'nearest', the input is extended by replicating the
last pixel. With 'constant', the input is extended by filling all values
beyond the edge with zeros. With 'mirror', the input is extended by
reflecting about the center of the last pixel. With 'wrap', the input is
extended by wrapping around to the opposite edge. Default is 'reflect'.
debug : bool, optional
If True debug information is printed and plotted.
verbose : bool, optional
If True the intersection and shifts information is printed out.
Returns
-------
array_rec : 2d numpy array
Shifted images. *Only returned if ``shift=True``.*
shifty, shiftx : floats
Shift Y,X to get to the true center.
ceny, cenx : floats
Center Y,X coordinates of the true center. *Only returned if
``shift=True``.*
Note
----
We are solving a linear system:
.. code-block:: python
A1 * x + B1 * y = C1
A2 * x + B2 * y = C2
Cramer's rule - solution can be found in determinants:
.. code-block:: python
x = Dx/D
y = Dy/D
where D is main determinant of the system:
.. code-block:: python
A1 B1
A2 B2
and Dx and Dy can be found from matrices:
.. code-block:: python
C1 B1
C2 B2
and
.. code-block:: python
A1 C1
A2 C2
C column consequently substitutes the coef. columns of x and y
L stores our coefs A, B, C of the line equations.
.. code-block:: python
For D: L1[0] L1[1] for Dx: L1[2] L1[1] for Dy: L1[0] L1[2]
L2[0] L2[1] L2[2] L2[1] L2[0] L2[2]
"""
def line(p1, p2):
"""Calculate coefs A, B, C of line equation by 2 points."""
A = (p1[1] - p2[1])
B = (p2[0] - p1[0])
C = (p1[0] * p2[1] - p2[0] * p1[1])
return A, B, -C
def intersection(L1, L2):
"""Find intersection point (if any) of 2 lines provided by coefs."""
D = L1[0] * L2[1] - L1[1] * L2[0]
Dx = L1[2] * L2[1] - L1[1] * L2[2]
Dy = L1[0] * L2[2] - L1[2] * L2[0]
if D != 0:
x = Dx / D
y = Dy / D
return x, y
else:
return None
# --------------------------------------------------------------------------
check_array(array, dim=2)
if fit_type not in ['gaus', 'moff']:
raise TypeError('fit_type is not recognized')
if not isinstance(xy, (tuple, list)) or len(xy) != 4:
raise TypeError('Input waffle spot coordinates in wrong format (must '
'be a tuple of 4 tuples')
cy, cx = frame_center(array)
centx = []
centy = []
subims = []
if filter_freq[0] > 0:
array = frame_filter_highpass(array, mode='gauss-subt',
fwhm_size=filter_freq[0])
if filter_freq[1] > 0:
array = frame_filter_lowpass(array, fwhm_size=filter_freq[1])
for i in range(len(xy)):
sim, y, x = get_square(array, subi_size, xy[i][1], xy[i][0],
position=True, verbose=False)
if fit_type == 'gaus':
cent2dgy, cent2dgx = fit_2dgaussian(sim, crop=False, threshold=True,
sigfactor=sigfactor,
debug=debug, full_output=False)
else:
cent2dgy, cent2dgx = fit_2dmoffat(sim, crop=False, threshold=True,
sigfactor=sigfactor, debug=debug,
full_output=False)
centx.append(cent2dgx + x)
centy.append(cent2dgy + y)
subims.append(sim)
cent2dgx_1, cent2dgx_2, cent2dgx_3, cent2dgx_4 = centx
cent2dgy_1, cent2dgy_2, cent2dgy_3, cent2dgy_4 = centy
si1, si2, si3, si4 = subims
if debug:
plot_frames((si1, si2, si3, si4), colorbar=True)
print('Centroids X,Y:')
print(cent2dgx_1, cent2dgy_1)
print(cent2dgx_2, cent2dgy_2)
print(cent2dgx_3, cent2dgy_3)
print(cent2dgx_4, cent2dgy_4)
L1 = line([cent2dgx_1, cent2dgy_1], [cent2dgx_4, cent2dgy_4])
L2 = line([cent2dgx_2, cent2dgy_2], [cent2dgx_3, cent2dgy_3])
R = intersection(L1, L2)
msgerr = "Check that the order of the tuples in `xy` is correct and"
msgerr += " the satellite spots have good S/N"
if R is not None:
shiftx = cx - R[0]
shifty = cy - R[1]
if np.abs(shiftx) < cx * 2 and np.abs(shifty) < cy * 2:
if debug or verbose:
print('Intersection coordinates (X,Y):', R[0], R[1], '\n')
print('Shifts (X,Y): {:.3f}, {:.3f}'.format(shiftx, shifty))
if shift:
array_rec = frame_shift(array, shifty, shiftx, imlib=imlib,
interpolation=interpolation,
border_mode=border_mode)
return array_rec, shifty, shiftx, centy, centx
else:
return shifty, shiftx
else:
raise RuntimeError("Too large shifts. " + msgerr)
else:
raise RuntimeError("Something went wrong, no intersection found. " +
msgerr)
[docs]
def cube_recenter_satspots(array, xy, subi_size=19, sigfactor=6, plot=True,
fit_type='moff', lbda=None, filter_freq=(0, 0),
border_mode='constant', debug=False, verbose=True,
full_output=False):
"""Recenter an image cube based on satellite spots (more details in `.
The function relies on ``frame_center_satspots`` to align each image of the
cube individually (details in ``vip_hci.preproc.frame_center_satspots``).
The function returns the recentered image cube, abd can also plot the
histogram of the shifts, and calculate its statistics. The latter can help
to assess the dispersion of the star center by using waffle/satellite spots
(like those in VLT/SPHERE images) and evaluate the uncertainty of the
position of the center.
Parameters
----------
array : numpy ndarray, 3d
Input cube.
xy : tuple of 4 tuples of 2 elements
Tuple with coordinates X,Y of the 4 satellite spots. When the spots are
in an X configuration, the order is the following: top-left, top-right,
bottom-left and bottom-right. When the spots are in an + (plus-like)
configuration, the order is the following: top, right, left, bottom.
If wavelength vector is not provided, assumes all sat spots of the cube
are at a similar location. If wavelength is provided, only coordinates
of the sat spots in the first channel should be provided. The boxes
location in other channels will be scaled accordingly.
subi_size : int, optional
Size of subimage where the fitting is done.
sigfactor : int, optional
The background pixels will be thresholded before fitting a 2d Gaussian
to the data using sigma clipped statistics. All values smaller than
(MEDIAN + sigfactor*STDDEV) will be replaced by small random Gaussian
noise.
plot : bool, optional
Whether to plot the shifts.
fit_type: str, optional {'gaus','moff'}
Type of 2d fit to infer the centroid of the satellite spots.
lbda: 1d array or list, opt
Wavelength vector. If provided, the subimages will be scaled accordingly
to follow the motion of the satellite spots.
filter_freq: tuple of 2 floats, optional
If the first (resp. second) element of the tuple is larger than 0,
a high-pass (resp. low-pass) filter is applied to the image,
before fitting the satellite spots. The elements should correspond to
the fwhm_size of the frame_filter_highpass and frame_filter_lowpass
functions, respectively. If both elements are non-zero, both high-pass
and low-pass filter of the image are applied, in that order.
This can be useful to isolate the signal from the satellite spots.
border_mode : {'reflect', 'nearest', 'constant', 'mirror', 'wrap'}
Points outside the boundaries of the input are filled accordingly.
With 'reflect', the input is extended by reflecting about the edge of
the last pixel. With 'nearest', the input is extended by replicating the
last pixel. With 'constant', the input is extended by filling all values
beyond the edge with zeros. With 'mirror', the input is extended by
reflecting about the center of the last pixel. With 'wrap', the input is
extended by wrapping around to the opposite edge. Default is 'reflect'.
debug : bool, optional
If True debug information is printed and plotted (fit and residuals,
intersections and shifts). This has to be used carefully as it can
produce too much output and plots.
verbose : bool, optional
Whether to print to stdout the timing and additional info.
full_output : bool, optional
Whether to return 2 1d arrays of shifts along with the recentered cube
or not.
Returns
-------
array_rec
The shifted cube.
shift_y, shift_x
[full_output==True] Shifts Y,X to get to the true center for each image.
sat_y, sat_x
[full_output==True] Y,X positions of the satellite spots in each image.
Order: top-left, top-right, bottom-left and bottom-right.
"""
check_array(array, dim=3)
if verbose:
start_time = time_ini()
n_frames = array.shape[0]
shift_x = np.zeros((n_frames))
shift_y = np.zeros((n_frames))
sat_y = np.zeros([n_frames, 4])
sat_x = np.zeros([n_frames, 4])
array_rec = []
if lbda is not None:
cy, cx = frame_center(array[0])
final_xy = []
rescal = lbda/lbda[0]
for i in range(n_frames):
xy_new = []
for s in range(4):
xy_new.append(
(cx+rescal[i]*(xy[s][0]-cx), cy+rescal[i]*(xy[s][1]-cy)))
xy_new = tuple(xy_new)
final_xy.append(xy_new)
else:
final_xy = [xy for i in range(n_frames)]
if verbose:
print("Final xy positions for sat spots:", final_xy)
print('Looping through the frames, fitting the intersections:')
for i in Progressbar(range(n_frames), verbose=verbose):
res = frame_center_satspots(array[i], final_xy[i], debug=debug,
shift=True, subi_size=subi_size,
sigfactor=sigfactor, fit_type=fit_type,
filter_freq=filter_freq,
verbose=False, border_mode=border_mode)
array_rec.append(res[0])
shift_y[i] = res[1]
shift_x[i] = res[2]
sat_y[i] = res[3]
sat_x[i] = res[4]
if verbose:
timing(start_time)
if plot:
plt.figure(figsize=vip_figsize)
plt.plot(shift_x, 'o-', label='Shifts in x', alpha=0.5)
plt.plot(shift_y, 'o-', label='Shifts in y', alpha=0.5)
plt.legend(loc='best')
plt.grid('on', alpha=0.2)
plt.ylabel('Pixels')
plt.xlabel('Frame number')
plt.figure(figsize=vip_figsize)
b = int(np.sqrt(n_frames))
la = 'Histogram'
_ = plt.hist(shift_x, bins=b, alpha=0.5, label=la + ' shifts X')
_ = plt.hist(shift_y, bins=b, alpha=0.5, label=la + ' shifts Y')
plt.legend(loc='best')
plt.ylabel('Bin counts')
plt.xlabel('Pixels')
if verbose:
msg1 = 'MEAN X,Y: {:.3f}, {:.3f}'
print(msg1.format(np.mean(shift_x), np.mean(shift_y)))
msg2 = 'MEDIAN X,Y: {:.3f}, {:.3f}'
print(msg2.format(np.median(shift_x), np.median(shift_y)))
msg3 = 'STDDEV X,Y: {:.3f}, {:.3f}'
print(msg3.format(np.std(shift_x), np.std(shift_y)))
array_rec = np.array(array_rec)
if full_output:
return array_rec, shift_y, shift_x, sat_y, sat_x
else:
return array_rec
[docs]
def frame_center_radon(array, cropsize=None, hsize_ini=1., step_ini=0.1,
n_iter=5, tol=0.1, mask_center=None, nproc=None,
satspots_cfg=None, theta_0=0, delta_theta=5,
gauss_fit=True, hpf=True, filter_fwhm=8, imlib='vip-fft',
interpolation='lanczos4', full_output=False,
verbose=True, plot=True, debug=False):
"""Find the center of a broadband (co-added) frame with speckles and\
satellite spots elongated towards the star (center).
The function uses the Radon transform implementation from scikit-image, and
follow the algorithm presented in [PUE15]_.
Parameters
----------
array : numpy ndarray
Input 2d array or image.
cropsize : None or odd int, optional
Size in pixels of the cropped central area of the input array that will
be used. It should be large enough to contain the bright elongated
speckle or satellite spots.
hsize_ini : float, optional
Size of the box for the grid search for first centering iteration. The
frame is shifted to each direction from the center in a hsize length
with a given step.
step_ini : float, optional
The step of the coordinates change in the first step. Note: should not
be too fine for efficiency as it is automatically refined at each step.
n_iter : int, optional
Number of iterations for finer recentering. At each step, a finer
step is considered based on the amplitude of the shifts found in the
previous step. Iterations are particularly relevant when mask_center is
not None, as the masked area will change from one iteration to the next.
tol : float, optional
Absolute tolerance on relative shift from one iteration to the next to
consider convergence. If the absolute value of the shift is found to be
less than tol, the iterative algorithm is stopped.
mask_center : None or int, optional
If None the central area of the frame is kept. If int a centered zero
mask will be applied to the frame. By default the center isn't masked.
nproc : int, optional
Number of processes for parallel computing. If None the number of
processes will be set to cpu_count()/2.
satspots_cfg: None or str ('x', '+' or 'custom'), opt
If satellite spots are present, provide a string corresponding to the
configuration of the satellite spots: as a cross ('x'), as a
plus sign ('+') or 'custom' (provide theta_0). Leave to None if no
satellite spots present. Note: setting satspots_cfg to non-None value
leads to varying performance depending on dataset.
theta_0: float between [0,90[, optional
Azimuth of the first satellite spot. Only considered if satspots_cfg is
set to 'custom'.
delta_theta: float, optional
Azimuthal half-width in degrees of the slices considered along a '+' or
'x' pattern to calculate the Radon transform. E.g. if set to 5 for 'x'
configuration, it will consider slices from 40 to 50 deg in each
quadrant.
hpf: bool, optional
Whether to high-pass filter the images
filter_fwhm: float, optional
In case of high-pass filtering, this is the FWHM of the low-pass filter
used for subtraction to the original image to get the high-pass
filtered image (i.e. should be >~ 2 x FWHM).
imlib : str, optional
See the documentation of the ``vip_hci.preproc.frame_shift`` function.
interpolation : str, optional
See the documentation of the ``vip_hci.preproc.frame_shift`` function.
full_output: bool, optional
Whether to also return the cost map, and uncertainty on centering.
verbose : bool optional
Whether to print to stdout some messages and info.
plot : bool, optional
Whether to plot the radon cost function.
debug : bool, optional
Whether to print and plot intermediate info.
Returns
-------
optimy, optimx : floats
Values of the Y, X coordinates of the center of the frame based on the
radon optimization. (always returned)
dxy : float
[full_output=True] Uncertainty on center in pixels.
cost_bound : 2d numpy array
[full_output=True] Radon cost function surface.
"""
if array.ndim != 2:
raise TypeError('Input array is not a frame or 2d array')
if verbose:
start_time = time_ini()
def _center_radon(array, cropsize=None, hsize=1., step=0.1,
mask_center=None, nproc=None, satspots_cfg=None,
theta_0=0, d_theta=5, gauss_fit=False,
imlib='vip-fft', interpolation='lanczos4',
verbose=True, plot=True, debug=False):
frame = array.copy()
ori_cent_y, ori_cent_x = frame_center(frame)
if cropsize is not None:
if not cropsize % 2:
raise TypeError("If not None, cropsize should be odd integer")
frame = frame_crop(frame, cropsize, verbose=False)
listyx = np.linspace(start=-hsize, stop=hsize, num=int(2*hsize/step)+1,
endpoint=True)
if not mask_center:
radint = 0
else:
if not isinstance(mask_center, int):
raise TypeError
radint = mask_center
coords = [(y, x) for y in listyx for x in listyx]
# coords = [(x, y) for y in listyx for x in listyx]
cent, _ = frame_center(frame)
frame = get_annulus_segments(frame, radint, cent-radint, mode="mask")[0]
if debug:
if satspots_cfg is not None:
samples = 10
if satspots_cfg == 'x':
theta = np.hstack((np.linspace(start=45-d_theta,
stop=45+d_theta,
num=samples,
endpoint=False),
np.linspace(start=135-d_theta,
stop=135+d_theta,
num=samples,
endpoint=False),
np.linspace(start=225-d_theta,
stop=225+d_theta,
num=samples,
endpoint=False),
np.linspace(start=315-d_theta,
stop=315+d_theta,
num=samples,
endpoint=False)))
elif satspots_cfg == '+':
theta = np.hstack((np.linspace(start=-d_theta,
stop=d_theta,
num=samples,
endpoint=False),
np.linspace(start=90-d_theta,
stop=90+d_theta,
num=samples,
endpoint=False),
np.linspace(start=180-d_theta,
stop=180+d_theta,
num=samples,
endpoint=False),
np.linspace(start=270-d_theta,
stop=270+d_theta,
num=samples,
endpoint=False)))
elif satspots_cfg == 'custom':
theta = np.hstack((np.linspace(start=90-theta_0-d_theta,
stop=90-theta_0+d_theta,
num=samples, endpoint=False),
np.linspace(start=180-theta_0-d_theta,
stop=180-theta_0+d_theta,
num=samples, endpoint=False),
np.linspace(start=270-theta_0-d_theta,
stop=270-theta_0+d_theta,
num=samples, endpoint=False),
np.linspace(start=360-theta_0-d_theta,
stop=360-theta_0+d_theta,
num=samples,
endpoint=False)))
else:
msg = "If not None, satspots_cfg can only be 'x' or '+'."
raise ValueError(msg)
sinogram = radon(frame, theta=theta, circle=True)
plot_frames((frame, sinogram))
# print(np.sum(np.abs(sinogram[int(cent), :])))
else:
theta = np.linspace(start=0, stop=360, num=int(cent*2),
endpoint=False)
sinogram = radon(frame, theta=theta, circle=True)
plot_frames((frame, sinogram))
# print(np.sum(np.abs(sinogram[int(cent), :])))
if nproc is None:
nproc = cpu_count() // 2 # Hyper-threading doubles the # of cores
if nproc == 1:
costf = []
for coord in coords:
res = _radon_costf(frame, cent, radint, coord, satspots_cfg,
theta_0, d_theta, imlib, interpolation)
costf.append(res)
costf = np.array(costf)
elif nproc > 1:
res = pool_map(nproc, _radon_costf, frame, cent, radint,
iterable(coords), satspots_cfg, theta_0, d_theta,
imlib, interpolation)
costf = np.array(res)
if verbose:
msg = 'Done {} radon transform calls distributed in {} processes'
print(msg.format(len(coords), nproc))
cost_bound = costf.reshape(listyx.shape[0], listyx.shape[0])
if plot:
plt.contour(cost_bound, cmap='CMRmap', origin='lower')
plt.imshow(cost_bound, cmap='CMRmap', origin='lower',
interpolation='nearest')
plt.colorbar()
plt.grid('off')
plt.show()
if gauss_fit: # or full_output:
# fit a 2d gaussian to the surface
fit_res = fit_2dgaussian(cost_bound-np.amin(cost_bound), crop=False,
threshold=False, sigfactor=3, debug=debug,
full_output=True)
# optimal shift -> optimal position
opt_yind = float(fit_res['centroid_y'].iloc[0])
opt_xind = float(fit_res['centroid_x'].iloc[0])
opt_yshift = -hsize + opt_yind*step
opt_xshift = -hsize + opt_xind*step
optimy = ori_cent_y - opt_yshift
optimx = ori_cent_x - opt_xshift
# find uncertainty on centering
unc_y = float(fit_res['fwhm_y'].iloc[0])*step
unc_x = float(fit_res['fwhm_x'].iloc[0])*step
dyx = (unc_y, unc_x) # np.sqrt(unc_y**2 + unc_x**2)
# Replace the position found by Gaussian fit
if not gauss_fit:
# OLD CODE:
argm = np.argmax(costf) # index of 1st max in 1d cost function
opt_yshift, opt_xshift = coords[argm]
# maxima in the 2d cost function surface
# num_max = np.where(cost_bound == cost_bound.max())[0].shape[0]
# ind_maxy, ind_maxx = np.where(cost_bound == cost_bound.max())
# argmy = ind_maxy[int(np.ceil(num_max/2)) - 1]
# argmx = ind_maxx[int(np.ceil(num_max/2)) - 1]
# y_grid = np.array(coords)[:, 0].reshape(listyx.shape[0],
# listyx.shape[0])
# x_grid = np.array(coords)[:, 1].reshape(listyx.shape[0],
# listyx.shape[0])
# optimy = ori_cent_y-y_grid[argmy, 0] # subtract optimal shift
# optimx = ori_cent_x-x_grid[0, argmx] # subtract optimal shift
optimy = ori_cent_y - opt_yshift
optimx = ori_cent_x - opt_xshift
dyx = (step, step)
if verbose:
print('Cost function max: {}'.format(costf.max()))
# print('Cost function # maxima: {}'.format(num_max))
m = 'Finished grid search radon optimization: dy={:.3f}, dx={:.3f}'
print(m.format(opt_yshift, opt_xshift))
timing(start_time)
return optimy, optimx, opt_yshift, opt_xshift, dyx, cost_bound
# high-pass filtering if requested
if hpf:
array = frame_filter_highpass(array, mode='gauss-subt',
fwhm_size=filter_fwhm)
ori_cent_y, ori_cent_x = frame_center(array)
hsize = hsize_ini
step = step_ini
opt_yshift = 0
opt_xshift = 0
for i in range(n_iter):
if verbose:
print("*** Iteration {}/{} ***".format(i+1, n_iter))
res = _center_radon(array, cropsize=cropsize, hsize=hsize, step=step,
mask_center=mask_center, nproc=nproc,
satspots_cfg=satspots_cfg, theta_0=theta_0,
d_theta=delta_theta, gauss_fit=gauss_fit,
imlib=imlib, interpolation=interpolation,
verbose=verbose, plot=plot, debug=debug)
_, _, y_shift, x_shift, dyx, cost_bound = res
array = frame_shift(array, y_shift, x_shift, imlib=imlib,
interpolation=interpolation)
opt_yshift += y_shift
opt_xshift += x_shift
abs_shift = np.sqrt(y_shift**2 + x_shift**2)
if abs_shift < tol:
if i == 0:
msg = "Null shifts found at first iteration for step = {}. Try"
msg += " with a finer step."
raise ValueError(msg.format(step))
else:
msg = "Convergence found after {} iterations (final step = {})."
print(msg.format(i+1, step))
break
# refine box - OR NOT?!
# max_sh = np.amax(np.abs(np.array([y_shift, x_shift])))
hsize *= 0.75 # *max_sh
step *= 0.75
optimy = ori_cent_y+opt_yshift # ORI: -
optimx = ori_cent_x+opt_xshift # ORI: -
if verbose:
print("Star (x,y) location: {:.2f}, {:.2f}".format(optimx, optimy))
print("Final (x,y) shifts: {:.2f}, {:.2f}".format(opt_xshift,
opt_yshift))
if full_output:
return optimy, optimx, dyx, cost_bound
else:
return optimy, optimx
def _radon_costf(frame, cent, radint, coords, satspots_cfg=None, theta_0=0,
delta_theta=5, imlib='vip-fft', interpolation='lanczos4'):
"""Calculate Radon cost function used in frame_center_radon()."""
frame_shifted = frame_shift(frame, coords[0], coords[1], imlib=imlib,
interpolation=interpolation)
frame_shifted_ann = get_annulus_segments(frame_shifted, radint,
cent-radint, mode="mask")[0]
if satspots_cfg is None:
theta = np.linspace(start=0, stop=360, num=frame_shifted_ann.shape[0],
endpoint=False)
elif satspots_cfg == '+':
samples = 10
theta = np.hstack((np.linspace(start=-delta_theta, stop=delta_theta,
num=samples, endpoint=False),
np.linspace(start=90-delta_theta,
stop=90+delta_theta, num=samples,
endpoint=False),
np.linspace(start=180-delta_theta,
stop=180+delta_theta, num=samples,
endpoint=False),
np.linspace(start=270-delta_theta,
stop=270+delta_theta, num=samples,
endpoint=False)))
elif satspots_cfg == 'x':
samples = 10
theta = np.hstack((np.linspace(start=45-delta_theta,
stop=45+delta_theta, num=samples,
endpoint=False),
np.linspace(start=135-delta_theta,
stop=135+delta_theta, num=samples,
endpoint=False),
np.linspace(start=225-delta_theta,
stop=225+delta_theta, num=samples,
endpoint=False),
np.linspace(start=315-delta_theta,
stop=315+delta_theta, num=samples,
endpoint=False)))
elif satspots_cfg == 'custom':
samples = 10
theta = np.hstack((np.linspace(start=theta_0-delta_theta,
stop=theta_0+delta_theta,
num=samples, endpoint=False),
np.linspace(start=theta_0+90-delta_theta,
stop=theta_0+90+delta_theta,
num=samples, endpoint=False),
np.linspace(start=theta_0+180-delta_theta,
stop=theta_0+180+delta_theta,
num=samples, endpoint=False),
np.linspace(start=theta_0+270-delta_theta,
stop=theta_0+270+delta_theta,
num=samples, endpoint=False)))
sinogram = radon(frame_shifted_ann, theta=theta, circle=True)
# costf = np.sum(np.abs(sinogram[int(cent), :])) # ORI DEF
# rather consider the sum of the 4 top values?
qstep = len(theta)//4
sort_sin = []
for i in range(4):
sort_sin.append(np.nanmax(sinogram[int(cent), i*qstep:(i+1)*qstep]))
costf = np.nansum(sort_sin)
return costf
[docs]
def cube_recenter_radon(array, full_output=False, verbose=True, imlib='vip-fft',
interpolation='lanczos4', border_mode='reflect',
nproc=None, **kwargs):
"""Recenter a cube using the Radon transform, as in [PUE15]_.
The function loops through its frames, relying on the ``frame_center_radon``
function for the recentering.
Parameters
----------
array : numpy ndarray
Input 3d array or cube.
full_output : {False, True}, bool optional
If True the recentered cube is returned along with the y and x shifts.
verbose : {True, False}, bool optional
Whether to print timing and intermediate information to stdout.
imlib : str, optional
See the documentation of the ``vip_hci.preproc.frame_shift`` function.
interpolation : str, optional
See the documentation of the ``vip_hci.preproc.frame_shift`` function.
border_mode : {'reflect', 'nearest', 'constant', 'mirror', 'wrap'}
Points outside the boundaries of the input are filled accordingly.
With 'reflect', the input is extended by reflecting about the edge of
the last pixel. With 'nearest', the input is extended by replicating the
last pixel. With 'constant', the input is extended by filling all values
beyond the edge with zeros. With 'mirror', the input is extended by
reflecting about the center of the last pixel. With 'wrap', the input is
extended by wrapping around to the opposite edge. Default is 'reflect'.
nproc : int, optional
Number of processes for parallel computing. If None the number of
processes will be set to cpu_count()/2.
kwargs:
Other optional parameters for ``vip_hci.preproc.frame_center_radon``
function, such as cropsize, hsize, step, satspots_cfg, mask_center,
hpf, filter_fwhm, nproc or debug.
Returns
-------
array_rec : 3d ndarray
Recentered cube.
y, x : 1d arrays of floats
[full_output] Shifts in y and x.
dyx: 1d array of floats
[full_output] Array of uncertainty on center in pixels.
"""
check_array(array, dim=3)
if nproc is None:
nproc = int(cpu_count() / 2)
if verbose:
start_time = time_ini()
n_frames = array.shape[0]
x = np.zeros((n_frames))
y = np.zeros((n_frames))
dyx = np.zeros((n_frames, 2))
cy, cx = frame_center(array[0])
array_rec = array.copy()
for i in Progressbar(range(n_frames), desc="Recentering frames...",
verbose=verbose):
res = frame_center_radon(array[i], verbose=False, plot=False,
imlib=imlib, interpolation=interpolation,
full_output=True, nproc=nproc, **kwargs)
y[i] = res[0]
x[i] = res[1]
dyx[i] = res[2]
array_rec[i] = frame_shift(array[i], cy-y[i], cx-x[i], imlib=imlib,
interpolation=interpolation,
border_mode=border_mode)
if verbose:
timing(start_time)
if full_output:
return array_rec, y-cy, x-cx, dyx
else:
return array_rec
[docs]
def cube_recenter_dft_upsampling(array, upsample_factor=100, subi_size=None,
center_fr1=None, negative=False, fwhm=4,
imlib='vip-fft', interpolation='lanczos4',
mask=None, border_mode='reflect', log=False,
collapse='median', full_output=False,
verbose=True, nproc=None, save_shifts=False,
debug=False, plot=True, **collapse_args):
"""Recenter a cube of frames using the DFT upsampling method as proposed\
in [GUI08]_ and implemented in the ``phase_cross_correlation`` function\
from scikit-image.
The algorithm (DFT upsampling) obtains an initial estimate of the
cross-correlation peak by an FFT and then refines the shift estimation by
upsampling the DFT only in a small neighborhood of that estimate by means
of a matrix-multiply DFT.
Optionally, after alignment of all images to the first one, a 2D Gaussian
fit can be made to the mean image to recenter them based on the location of
the Gaussian centroid. This second stage is performed if subi_size is not
None.
Parameters
----------
array : numpy ndarray
Input cube.
upsample_factor : int, optional
Upsampling factor (default 100). Images will be registered to within
1/upsample_factor of a pixel. The larger the slower the algorithm.
subi_size : int or None, optional
Size of the square subimage sides in pixels, used to find the centroid
of the mean aligned cube image (i.e. after DFT-based registration). If
subi_size is None then the first frame is assumed to be
centered already.
center_fr1 = (cy_1, cx_1) : Tuple, optional
[subi_size != None] Coordinates of the center of the subimage for
fitting a 2d Gaussian. Since the first part of the function of the
algorithm aligns all subsequent frames to the first one. This tuple
should be the rough coordinates of the centroid in the first frame. If
not provided, the function considers the center of the images.
negative : bool, optional
[subi_size != None] If True the final centroiding is done with a
negative 2D Gaussian fit, instead of a positive one.
fwhm : float, optional
[subi_size != None] First guess of the FWHM in pixels for the Gaussian
fit.
nproc : int or None, optional
Number of processes (>1) for parallel computing. If 1 then it runs in
serial. If None the number of processes will be set to (cpu_count()/2).
imlib : str, optional
See the documentation of the ``vip_hci.preproc.frame_shift`` function.
interpolation : str, optional
See the documentation of the ``vip_hci.preproc.frame_shift`` function.
mask: 2D np.ndarray, optional
Binary mask indicating where the cross-correlation should be calculated
in the images. If provided, should be the same size as array frames.
[Note: requires skimage >= 0.18.0]
border_mode : {'reflect', 'nearest', 'constant', 'mirror', 'wrap'}
Points outside the boundaries of the input are filled accordingly.
With 'reflect', the input is extended by reflecting about the edge of
the last pixel. With 'nearest', the input is extended by replicating the
last pixel. With 'constant', the input is extended by filling all values
beyond the edge with zeros. With 'mirror', the input is extended by
reflecting about the center of the last pixel. With 'wrap', the input is
extended by wrapping around to the opposite edge. Default is 'reflect'.
log : bool, optional
Whether to run the cross-correlation algorithm on images converted in
log scale. This can be useful to leverage the whole extent of the PSF
and be less dominated by the brightest central pixels.
collapse : {'median', 'mean', 'sum', 'max', 'trimmean', 'absmean', 'wmean'}
Method used to collapse the aligned cube before 2D Gaussian fit. Should
be an argument accepted by the ``vip_hci.preproc.cube_collapse``
function.
full_output : bool, optional
Whether to return 2 1d arrays of shifts along with the recentered cube
or not.
verbose : bool, optional
Whether to print to stdout the timing or not.
save_shifts : bool, optional
Whether to save the shifts to a file in disk.
debug : bool, optional
Whether to print to stdout the shifts or not.
plot : bool, optional
If True, the shifts are plotted.
collapse_args: opt
Additional options passed to the ``vip_hci.preproc.cube_collapse``
function.
Returns
-------
array_recentered : numpy ndarray
The recentered cube.
y : numpy ndarray
[full_output=True] 1d array with the shifts in y.
x : numpy ndarray
[full_output=True] 1d array with the shifts in x.
Note
----
This function uses the implementation from scikit-image of the algorithm
described in [GUI08]_. This algorithm registers two images (2-D rigid
translation) within a fraction of a pixel specified by the user. Instead of
computing a zero-padded FFT (fast Fourier transform), this code
uses selective upsampling by a matrix-multiply DFT (discrete FT) to
dramatically reduce computation time and memory without sacrificing
accuracy. With this procedure all the image points are used to compute the
upsampled cross-correlation in a very small neighborhood around its peak.
"""
if verbose:
start_time = time_ini()
check_array(array, dim=3)
if mask is not None:
if mask.shape != array.shape[-2:]:
msg = "If provided, mask should have same shape as frames"
raise TypeError(msg)
n_frames, sizey, sizex = array.shape
if subi_size is not None:
if center_fr1 is None:
print("`center_fr1` not provided")
print("Using the coordinates of the 1st frame center for "
"the Gaussian 2d fit")
cy_1, cx_1 = frame_center(array[0])
else:
cy_1, cx_1 = center_fr1
if not isinstance(subi_size, int):
raise ValueError('subi_size must be an integer or None')
if subi_size < fwhm:
raise ValueError('`subi_size` (value in pixels) is too small')
if sizey % 2 == 0:
if subi_size % 2 != 0:
subi_size += 1
print('`subi_size` is odd (while frame size is even)')
print('Setting `subi_size` to {} pixels'.format(subi_size))
else:
if subi_size % 2 == 0:
subi_size += 1
print('`subi_size` is even (while frame size is odd)')
print('Setting `subi_size` to {} pixels'.format(subi_size))
n_frames = array.shape[0]
x = np.zeros((n_frames))
y = np.zeros((n_frames))
array_rec = array.copy()
cy, cx = frame_center(array[0])
# convert to log scale
if log:
array_rec -= (np.nanmin(array_rec)-1)
array_rec = np.log(array_rec)
# Finding the shifts with DFT upsampling of each frame wrt the first
if nproc is None:
nproc = cpu_count() // 2 # Hyper-threading doubles the # of cores
if nproc == 1:
for i in Progressbar(range(1, n_frames),
desc="frames", verbose=verbose):
y[i], x[i], array_rec[i] = _shift_dft(array_rec, array_rec, i,
upsample_factor, mask,
interpolation, imlib,
border_mode)
elif nproc > 1:
res = pool_map(nproc, _shift_dft, array_rec, array_rec,
iterable(range(1, n_frames)), upsample_factor, mask,
interpolation, imlib, border_mode)
res = np.array(res, dtype=object)
y[1:] = res[:, 0]
x[1:] = res[:, 1]
array_rec[1:] = [frames for frames in res[:, 2]]
else:
raise ValueError("nproc should be an int > 0.")
if debug:
print("\nShifts in X and Y")
for i in range(n_frames):
print(x[i], y[i])
# Centroiding mean frame with 2d gaussian and shifting (only necessary if
# first frame was not well-centered)
msg0 = "The rest of the frames will be shifted by cross-correlation wrt "
msg0 += "the 1st"
if subi_size is not None:
# before 2D gaussian fit, take non-log images
if log:
array_rec = cube_shift(array, shift_y=y, shift_x=x, imlib=imlib,
interpolation=interpolation, nproc=nproc)
marray_al = cube_collapse(array_rec, mode=collapse, **collapse_args)
y1, x1 = _centroid_2dg_frame([marray_al], 0, subi_size,
cy_1, cx_1, negative, debug, fwhm)
x[:] += cx - x1
y[:] += cy - y1
if verbose:
msg = "Shift for first frame X,Y=({:.3f}, {:.3f})"
print(msg.format(x[0], y[0]))
print(msg0)
if debug:
titd = "original / shifted 1st frame subimage"
plot_frames((frame_crop(array[0], subi_size, verbose=False),
frame_crop(array_rec[0], subi_size, verbose=False)),
grid=True, title=titd)
else:
if verbose:
print("The first frame is assumed to be well centered wrt the"
"center of the array")
print(msg0)
array_rec = cube_shift(array, shift_y=y, shift_x=x, imlib=imlib,
interpolation=interpolation, nproc=nproc)
if verbose:
timing(start_time)
if plot:
plt.figure(figsize=vip_figsize)
plt.plot(x, 'o-', label='shifts in x', alpha=0.5)
plt.plot(y, 'o-', label='shifts in y', alpha=0.5)
plt.legend(loc='best')
plt.grid('on', alpha=0.2)
plt.ylabel('Pixels')
plt.xlabel('Frame number')
plt.figure(figsize=vip_figsize)
b = int(np.sqrt(n_frames))
la = 'Histogram'
_ = plt.hist(x, bins=b, alpha=0.5, label=la + ' shifts X')
_ = plt.hist(y, bins=b, alpha=0.5, label=la + ' shifts Y')
plt.legend(loc='best')
plt.ylabel('Bin counts')
plt.xlabel('Pixels')
if save_shifts:
np.savetxt('recent_dft_shifts.txt', np.transpose([y, x]), fmt='%f')
if full_output:
return array_rec, y, x
else:
return array_rec
def _shift_dft(array_rec, array, frnum, upsample_factor, mask, interpolation,
imlib, border_mode):
"""Align images using a DFT-based cross-correlation algorithm."""
shift_yx = phase_cross_correlation(array_rec[0], array[frnum],
upsample_factor=upsample_factor,
reference_mask=mask)
y_i, x_i = shift_yx[0]
array_rec_i = frame_shift(array[frnum], shift_y=y_i, shift_x=x_i,
imlib=imlib, interpolation=interpolation,
border_mode=border_mode)
return y_i, x_i, array_rec_i
[docs]
def cube_recenter_2dfit(array, xy=None, fwhm=4, subi_size=5, model='gauss',
nproc=1, imlib='vip-fft', interpolation='lanczos4',
offset=None, negative=False, threshold=False,
sigfactor=2, fix_neg=False, params_2g=None,
border_mode='reflect', save_shifts=False,
full_output=False, verbose=True, debug=False,
plot=True):
"""Recenter the frames of a cube such that the centroid of a fitted 2D\
model is placed at the center of the frames.
The shifts are found by fitting a 2D Gaussian, Moffat, Airy or double
Gaussian (positive+negative), as set by ``model``, to a subimage centered at
``xy``. This assumes the frames don't have too large shifts (<5px). The
frames are then shifted using the function frame_shift().
Parameters
----------
array : numpy ndarray
Input cube.
xy : tuple of integers or floats
Integer coordinates of the center of the subimage. For the double
Gaussian fit with fixed negative gaussian (``fix_neg=True``), this
should correspond to the exact location of the center of the negative
Gaussian (e.g. the center of the coronagraph mask). In that case a tuple
of floats is accepted.
fwhm : float or numpy ndarray
FWHM size in pixels, either one value (float) that will be the same for
the whole cube, or an array of floats with the same dimension as the
0th dim of array, containing the fwhm for each channel (e.g. in the case
of an IFS cube, where the FWHM varies with wavelength).
subi_size : int, optional
Size of the square subimage sides in pixels.
model : str, optional
Sets the type of fit to be used. 'gauss' for a 2d Gaussian fit,
'moff' for a 2d Moffat fit, 'airy' for a 2d Airy disk fit, and
'2gauss' for a 2d double Gaussian (positive+negative) fit.
nproc : int or None, optional
Number of processes (>1) for parallel computing. If 1 then it runs in
serial. If None the number of processes will be set to (cpu_count()/2).
imlib : str, optional
See the documentation of the ``vip_hci.preproc.frame_shift`` function.
interpolation : str, optional
See the documentation of the ``vip_hci.preproc.frame_shift`` function.
offset : tuple of floats, optional
If None the region of the frames used for the 2d Gaussian/Moffat fit is
shifted to the center of the images (2d arrays). If a tuple is given it
serves as the offset of the fitted area wrt the center of the 2d arrays.
negative : bool, optional
If True a negative 2d Gaussian/Moffat fit is performed.
fix_neg: bool, optional
In case of a double gaussian fit, whether to fix the parameters of the
megative gaussian. If True, they should be provided in params_2g.
params_2g: None or dictionary, optional
In case of a double gaussian fit, dictionary with either fixed or first
guess parameters for the double gaussian. E.g.:
params_2g = {'fwhm_neg': 3.5, 'fwhm_pos': (3.5,4.2), 'theta_neg': 48.,
'theta_pos':145., 'neg_amp': 0.5}
- fwhm_neg: float or tuple with fwhm of neg gaussian
- fwhm_pos: can be a tuple for x and y axes of pos gaussian (replaces
fwhm)
- theta_neg: trigonometric angle of the x axis of the neg gaussian (deg)
- theta_pos: trigonometric angle of the x axis of the pos gaussian (deg)
- neg_amp: amplitude of the neg gaussian wrt the amp of the positive one
Note: it is always recommended to provide theta_pos and theta_neg for a
better fit.
threshold : bool, optional
If True the background pixels (estimated using sigma clipped statistics)
will be replaced by small random Gaussian noise (recommended for 2g).
sigfactor: float, optional
If thresholding is performed, set the the threshold in terms of
gaussian sigma in the subimage (will depend on your cropping size).
border_mode : {'reflect', 'nearest', 'constant', 'mirror', 'wrap'}
Points outside the boundaries of the input are filled accordingly.
With 'reflect', the input is extended by reflecting about the edge of
the last pixel. With 'nearest', the input is extended by replicating the
last pixel. With 'constant', the input is extended by filling all values
beyond the edge with zeros. With 'mirror', the input is extended by
reflecting about the center of the last pixel. With 'wrap', the input is
extended by wrapping around to the opposite edge. Default is 'reflect'.
save_shifts : bool, optional
Whether to save the shifts to a file in disk.
full_output : bool, optional
Whether to return 2 1d arrays of shifts along with the recentered cube
or not.
verbose : bool, optional
Whether to print to stdout the timing or not.
debug : bool, optional
If True the details of the fitting are shown. Won't work when the cube
contains >20 frames (as it might produce an extremely long output).
plot : bool, optional
If True, the shifts are plotted.
Returns
-------
array_rec: numpy ndarray
The recentered cube.
y : numpy ndarray
[full_output=True] 1d array with the shifts in y.
x : numpy ndarray
[full_output=True] 1d array with the shifts in x.
"""
if verbose:
start_time = time_ini()
check_array(array, dim=3)
n_frames, sizey, sizex = array.shape
if not isinstance(subi_size, int):
raise ValueError('`subi_size` must be an integer')
if sizey % 2 == 0:
if subi_size % 2 != 0:
subi_size += 1
print('`subi_size` is odd (while frame size is even)')
print('Setting `subi_size` to {} pixels'.format(subi_size))
else:
if subi_size % 2 == 0:
subi_size += 1
print('`subi_size` is even (while frame size is odd)')
print('Setting `subi_size` to {} pixels'.format(subi_size))
if isinstance(fwhm, (float, int, np.float32, np.float64)):
fwhm = np.ones(n_frames) * fwhm
if debug and array.shape[0] > 20:
msg = 'Debug with a big array will produce a very long output. '
msg += 'Try with less than 20 frames in debug mode'
raise RuntimeWarning(msg)
if xy is not None:
pos_x, pos_y = xy
cond = model != '2gauss'
if (not isinstance(pos_x, int) or not isinstance(pos_y, int)) and cond:
raise TypeError('`xy` must be a tuple of integers')
else:
pos_y, pos_x = frame_center(array[0])
cy, cx = frame_center(array[0])
array_rec = np.empty_like(array)
if model == 'gauss':
func = _centroid_2dg_frame
elif model == 'moff':
func = _centroid_2dm_frame
elif model == 'airy':
func = _centroid_2da_frame
elif model == '2gauss':
func = _centroid_2d2g_frame
else:
raise ValueError('model not recognized')
if nproc is None:
nproc = cpu_count() // 2 # Hyper-threading doubles the # of cores
if nproc == 1:
res = []
if verbose:
print('2d {}-fitting'.format(model))
for i in Progressbar(range(n_frames), desc="frames", verbose=verbose):
if model == "2gauss":
args = [array, i, subi_size, pos_y, pos_x, debug, fwhm[i],
fix_neg, params_2g, threshold, sigfactor]
else:
args = [array, i, subi_size, pos_y, pos_x, negative, debug,
fwhm[i], threshold, sigfactor]
res.append(func(*args))
res = np.array(res, dtype=object)
elif nproc > 1:
if model == "2gauss":
args = [array, iterable(range(n_frames)), subi_size, pos_y, pos_x,
debug, iterable(fwhm), fix_neg, params_2g, threshold,
sigfactor]
else:
args = [array, iterable(range(n_frames)), subi_size, pos_y, pos_x,
negative, debug, iterable(fwhm), threshold, sigfactor]
res = pool_map(nproc, func, *args)
res = np.array(res, dtype=object)
y = cy - res[:, 0]
x = cx - res[:, 1]
if model == "2gauss" and not fix_neg:
y_neg = res[:, 2]
x_neg = res[:, 3]
fwhm_x = res[:, 4]
fwhm_y = res[:, 5]
fwhm_neg_x = res[:, 6]
fwhm_neg_y = res[:, 7]
theta = res[:, 8]
theta_neg = res[:, 9]
amp_pos = res[:, 10]
amp_neg = res[:, 11]
if offset is not None:
offx, offy = offset
y -= offy
x -= offx
for i in Progressbar(range(n_frames), desc="Shifting", verbose=verbose):
if debug:
print("\nShifts in X and Y")
print(x[i], y[i])
array_rec[i] = frame_shift(array[i], y[i], x[i], imlib=imlib,
interpolation=interpolation,
border_mode=border_mode)
if verbose:
timing(start_time)
if plot:
if nproc > 1:
print("Warning: plots may not show well if nproc is set to a value")
print(" different than 1.")
plt.figure(figsize=vip_figsize)
b = int(np.sqrt(n_frames))
la = 'Histogram'
_ = plt.hist(x, bins=b, alpha=0.5, label=la + ' shifts X')
_ = plt.hist(y, bins=b, alpha=0.5, label=la + ' shifts Y')
if model == "2gauss" and not fix_neg:
_ = plt.hist(cx-x_neg, bins=b, alpha=0.5,
label=la + ' shifts X (neg gaussian)')
_ = plt.hist(cy-y_neg, bins=b, alpha=0.5,
label=la + ' shifts Y (neg gaussian)')
plt.legend(loc='best')
plt.ylabel('Bin counts')
plt.xlabel('Pixels')
plt.figure(figsize=vip_figsize)
plt.plot(y, 'o-', label='shifts in y', alpha=0.5)
plt.plot(x, 'o-', label='shifts in x', alpha=0.5)
plt.legend(loc='best')
plt.grid('on', alpha=0.2)
plt.ylabel('Pixels')
plt.xlabel('Frame number')
if save_shifts:
np.savetxt('recent_gauss_shifts.txt', np.transpose([y, x]), fmt='%f')
if full_output:
if model == "2gauss" and not fix_neg:
return (array_rec, y, x, y_neg, x_neg, fwhm_x, fwhm_y, fwhm_neg_x,
fwhm_neg_y, theta, theta_neg, amp_pos, amp_neg)
return array_rec, y, x
else:
return array_rec
def _centroid_2dg_frame(cube, frnum, size, pos_y, pos_x, negative, debug,
fwhm, threshold=False, sigfactor=1):
"""Find the centroid with a 2d Gaussian fit in a frame of the cube."""
sub_image, y1, x1 = get_square(cube[frnum], size=size, y=pos_y, x=pos_x,
position=True)
# negative gaussian fit
if negative:
sub_image = -sub_image + np.abs(np.min(-sub_image))
y_i, x_i = fit_2dgaussian(sub_image, crop=False, fwhmx=fwhm, fwhmy=fwhm,
threshold=threshold, sigfactor=sigfactor,
debug=debug,
full_output=False)
y_i = y1 + y_i
x_i = x1 + x_i
return y_i, x_i
def _centroid_2dm_frame(cube, frnum, size, pos_y, pos_x, negative, debug,
fwhm, threshold=False, sigfactor=1):
"""Find the centroid with a 2d Moffat fit in a frame of the cube."""
sub_image, y1, x1 = get_square(cube[frnum], size=size, y=pos_y, x=pos_x,
position=True)
# negative fit
if negative:
sub_image = -sub_image + np.abs(np.min(-sub_image))
y_i, x_i = fit_2dmoffat(sub_image, crop=False, fwhm=fwhm, debug=debug,
threshold=threshold, sigfactor=sigfactor,
full_output=False)
y_i = y1 + y_i
x_i = x1 + x_i
return y_i, x_i
def _centroid_2da_frame(cube, frnum, size, pos_y, pos_x, negative, debug,
fwhm, threshold=False, sigfactor=1):
"""Find the centroid with a 2d Airy fit in a frame of the cube."""
sub_image, y1, x1 = get_square(cube[frnum], size=size, y=pos_y, x=pos_x,
position=True)
# negative fit
if negative:
sub_image = -sub_image + np.abs(np.min(-sub_image))
y_i, x_i = fit_2dairydisk(sub_image, crop=False, fwhm=fwhm,
threshold=threshold, sigfactor=sigfactor,
full_output=False, debug=debug)
y_i = y1 + y_i
x_i = x1 + x_i
return y_i, x_i
def _centroid_2d2g_frame(cube, frnum, size, pos_y, pos_x, debug=False, fwhm=4,
fix_neg=True, params_2g=None, threshold=False,
sigfactor=1):
"""Find the centroid with a 2d double Gaussian fit in a cube frame."""
size = min(cube[frnum].shape[0], cube[frnum].shape[1], size)
if isinstance(params_2g, dict):
fwhm_neg = params_2g.get('fwhm_neg', 0.8*fwhm)
fwhm_pos = params_2g.get('fwhm_pos', 2*fwhm)
theta_neg = params_2g.get('theta_neg', 0.)
theta_pos = params_2g.get('theta_pos', 0.)
neg_amp = params_2g.get('neg_amp', 1)
res_DF = fit_2d2gaussian(cube[frnum], crop=True, cent=(pos_x, pos_y),
cropsize=size, fwhm_neg=fwhm_neg,
fwhm_pos=fwhm_pos, neg_amp=neg_amp,
fix_neg=fix_neg, theta_neg=theta_neg,
theta_pos=theta_pos, threshold=threshold,
sigfactor=sigfactor, full_output=True, debug=debug)
y_i = res_DF['centroid_y']
x_i = res_DF['centroid_x']
if not fix_neg:
y_neg = res_DF['centroid_y_neg']
x_neg = res_DF['centroid_x_neg']
fwhm_x = res_DF['fwhm_x']
fwhm_y = res_DF['fwhm_y']
fwhm_neg_x = res_DF['fwhm_x_neg']
fwhm_neg_y = res_DF['fwhm_y_neg']
theta = res_DF['theta']
theta_neg = res_DF['theta_neg']
amp_pos = res_DF['amplitude']
amp_neg = res_DF['amplitude_neg']
return (y_i, x_i, y_neg, x_neg, fwhm_x, fwhm_y, fwhm_neg_x, fwhm_neg_y,
theta, theta_neg, amp_pos, amp_neg)
return y_i, x_i
[docs]
def cube_recenter_via_speckles(cube_sci, cube_ref=None, alignment_iter=5,
gammaval=1, min_spat_freq=0.5, max_spat_freq=3,
fwhm=4, upsample_factor=100, debug=False,
recenter_median=False, fit_type='gaus',
negative=True, crop=True, subframesize=25,
mask=None, ann_rad=0.5, ann_rad_search=False,
ann_width=0.5, collapse='median',
imlib='vip-fft', interpolation='lanczos4',
border_mode='reflect', log=True, plot=True,
full_output=False, nproc=None, **collapse_args):
"""Register frames based on the median speckle pattern.
The function also optionally centers images based on the position of the
vortex null in the median frame (through a negative Gaussian fit or an
annulus fit). By default, images are filtered to isolate speckle spatial
frequencies, and converted to log-scale before the cross-correlation based
alignment. The image cube should already be centered within ~2px accuracy
before being passed to this function (e.g. through an eyeball crop using
``vip_hci.preproc.cube_crop_frames``).
Parameters
----------
cube_sci : numpy ndarray
Science cube.
cube_ref : numpy ndarray
Reference cube (e.g. for NIRC2 data in RDI mode).
alignment_iter : int, optional
Number of alignment iterations (recomputes median after each iteration).
gammaval : int, optional
Applies a gamma correction to emphasize speckles (useful for faint
stars).
min_spat_freq : float, optional
Spatial frequency for low pass filter.
max_spat_freq : float, optional
Spatial frequency for high pass filter.
fwhm : float, optional
Full width at half maximum.
upsample_factor: int, optional
Upsampling factor (default 100). Images will be registered to within
1/upsample_factor of a pixel. The larger the slower the algorithm.
debug : bool, optional
Outputs extra info.
recenter_median : bool, optional
Recenter the frames at each iteration based on a 2d fit.
fit_type : str, optional
If recenter_median is True, this is the model to which the image is
fitted to for recentering. 'gaus' works well for NIRC2_AGPM data.
'ann' works better for NACO+AGPM data.
negative : bool, optional
If True, uses a negative gaussian fit to determine the center of the
median frame.
crop: bool, optional
Whether to calculate the recentering on a cropped version of the cube
that is speckle-dominated (recommended).
subframesize : int, optional
Sub-frame window size used. Should cover the region where speckles are
the dominant noise source.
mask: 2D np.ndarray, optional
Binary mask indicating where the cross-correlation should be calculated
in the images. If provided, should be the same size as array frames.
ann_rad: float, optional
[if fit_type='ann'] The expected inner radius of the annulus in FWHM.
ann_rad_search: bool
[if fit_type='ann'] Whether to also search for optimal radius.
ann_width: float, optional
[if fit_type='ann'] The expected radial width of the annulus in FWHM.
collapse : {'median', 'mean', 'sum', 'max', 'trimmean', 'absmean', 'wmean'}
Method used to collapse the aligned cube before 2D Gaussian fit. Should
be an argument accepted by the ``vip_hci.preproc.cube_collapse``
function.
imlib : str, optional
Image processing library to use.
interpolation : str, optional
Interpolation method to use.
border_mode : {'reflect', 'nearest', 'constant', 'mirror', 'wrap'}
Points outside the boundaries of the input are filled accordingly.
With 'reflect', the input is extended by reflecting about the edge of
the last pixel. With 'nearest', the input is extended by replicating the
last pixel. With 'constant', the input is extended by filling all values
beyond the edge with zeros. With 'mirror', the input is extended by
reflecting about the center of the last pixel. With 'wrap', the input is
extended by wrapping around to the opposite edge. Default is 'reflect'.
log : bool
Whether to run the cross-correlation algorithm on images converted in
log scale. This can be useful to leverage the whole extent of the PSF
and be less dominated by the brightest central pixels.
plot : bool, optional
If True, the shifts are plotted.
full_output: bool, optional
Whether to return more variables, useful for debugging.
**collapse_args:
Additional arguments passed to the ``vip_hci.preproc.cube_collapse``
function.
Returns
-------
cube_reg_sci : numpy 3d ndarray
Registered science cube
cube_reg_ref : numpy 3d ndarray
[cube_ref!=None] Cube registered to science frames
cube_sci_lpf : numpy 3d ndarray
[full_output=True] Low+high-pass filtered science cube
cube_stret : numpy 3d ndarray
[full_output=True] cube_stret with stretched values used for cross-corr
cum_x_shifts_sci: numpy 1d array
[full_output=True] Vector of x shifts for science frames
cum_y_shifts_sci: numpy 1d array
[full_output=True] Vector of y shifts for science frames
cum_x_shifts_ref: numpy 1d array
[full_output=True & cube_ref!=None] Vector of x shifts for ref frames
cum_y_shifts_ref: numpy 1d array
[full_output=True & cube_ref!=None] Vector of y shifts for ref frames
"""
n, y, x = cube_sci.shape
check_array(cube_sci, dim=3)
gam = gammaval
if nproc is None:
nproc = cpu_count()//2
if recenter_median and fit_type not in {'gaus', 'ann'}:
raise TypeError("fit type not recognized. Should be 'ann' or 'gaus'")
if crop and not subframesize < y:
raise ValueError('`Subframesize` is too large')
if cube_ref is not None:
ref_star = True
nref = cube_ref.shape[0]
else:
ref_star = False
if crop:
cube_sci_subframe = cube_crop_frames(cube_sci, subframesize, force=True,
verbose=False)
if ref_star:
cube_ref_subframe = cube_crop_frames(cube_ref, subframesize,
force=True, verbose=False)
else:
subframesize = cube_sci.shape[-1]
cube_sci_subframe = np.copy(cube_sci)
if ref_star:
cube_ref_subframe = np.copy(cube_ref)
ceny, cenx = frame_center(cube_sci_subframe[0])
print('Sub frame shape: {}'.format(cube_sci_subframe.shape))
print('Center pixel: ({}, {})'.format(ceny, cenx))
# Filtering cubes. Will be used for alignment purposes
cube_sci_lpf = np.copy(cube_sci_subframe)
if ref_star:
cube_ref_lpf = np.copy(cube_ref_subframe)
cube_sci_lpf = cube_sci_lpf - np.min(cube_sci_lpf)
if ref_star:
cube_ref_lpf = cube_ref_lpf - np.min(cube_ref_lpf)
if max_spat_freq > 0:
median_size = int(fwhm * max_spat_freq)
# Remove spatial frequencies <0.5 lam/D and >3lam/D to isolate speckles
cube_sci_hpf = cube_filter_highpass(cube_sci_lpf, 'median-subt',
median_size=median_size,
verbose=False)
else:
cube_sci_hpf = cube_sci_lpf
if min_spat_freq > 0:
cube_sci_lpf = cube_filter_lowpass(cube_sci_hpf, 'gauss',
fwhm_size=min_spat_freq * fwhm,
verbose=False)
else:
cube_sci_lpf = np.copy(cube_sci_hpf)
if ref_star:
if max_spat_freq > 0:
cube_ref_hpf = cube_filter_highpass(cube_ref_lpf, 'median-subt',
median_size=median_size,
verbose=False)
else:
cube_ref_hpf = cube_ref_lpf
if min_spat_freq > 0:
cube_ref_lpf = cube_filter_lowpass(cube_ref_hpf, 'gauss',
fwhm_size=min_spat_freq * fwhm,
verbose=False)
else:
cube_ref_lpf = np.copy(cube_ref_hpf)
if ref_star:
align_cube = np.zeros((1 + n + nref, subframesize, subframesize))
align_cube[1:(n + 1), :, :] = cube_sci_lpf
align_cube[(n + 1):(n + 2 + nref), :, :] = cube_ref_lpf
else:
align_cube = np.zeros((1 + n, subframesize, subframesize))
align_cube[1:(n + 1), :, :] = cube_sci_lpf
n_frames = align_cube.shape[0] # 1+n or 1+n+nref
cum_y_shifts = 0
cum_x_shifts = 0
# no iteration => just align with respect to first frame => converges
if alignment_iter == 1:
align_cube[0] = cube_sci_lpf[0]
# center the cube with stretched values
if log:
cube_stret = np.log10((align_cube-np.min(align_cube)+1)**gam)
else:
cube_stret = align_cube.copy()
if mask is not None and crop:
mask_tmp = frame_crop(mask, subframesize)
else:
mask_tmp = mask
res = cube_recenter_dft_upsampling(cube_stret, center_fr1=(ceny, cenx),
upsample_factor=upsample_factor,
fwhm=fwhm, subi_size=None,
full_output=True,
verbose=debug, plot=plot,
mask=mask_tmp, imlib=imlib,
interpolation=interpolation,
nproc=nproc)
cube_stret, y_shift, x_shift = res
sqsum_shifts = np.sum(np.sqrt(y_shift ** 2 + x_shift ** 2))
print('Square sum of shift vecs: ' + str(sqsum_shifts))
for j in range(1, n_frames):
align_cube[j] = frame_shift(align_cube[j], y_shift[j], x_shift[j],
imlib=imlib,
interpolation=interpolation,
border_mode=border_mode)
cum_y_shifts += y_shift
cum_x_shifts += x_shift
if recenter_median:
align_cube[0] = cube_collapse(align_cube[1:(n + 1)], mode=collapse,
**collapse_args)
# Recenter the median frame using a 2d fit
if fit_type == 'gaus' and negative:
crop_sz = int(fwhm)
elif fit_type == 'gaus':
crop_sz = int(3*fwhm)
else:
crop_sz = int(6*fwhm)
if not crop_sz % 2:
# size should be odd and small, between 5 and 7
if crop_sz > 7:
crop_sz -= 1
else:
crop_sz += 1
sub_image, y1, x1 = get_square(align_cube[0], size=crop_sz,
y=ceny, x=cenx, position=True)
if fit_type == 'gaus':
if negative:
sub_image = -sub_image + np.abs(np.min(-sub_image))
y_i, x_i = fit_2dgaussian(sub_image, crop=False,
threshold=False, sigfactor=1,
debug=debug, full_output=False)
elif fit_type == 'ann':
if upsample_factor > 20:
print("WARNING: the annulus centering may be slow for ")
print("upsample_factor larger than 20")
sampl_cen = 1./upsample_factor
if ann_rad_search:
sampl_rad = fwhm*ann_rad/10 # 1/10 of estimated radius
else:
sampl_rad = None
y_i, x_i, rad = _fit_2dannulus(sub_image, fwhm=fwhm, crop=False,
ann_rad=ann_rad,
sampl_cen=sampl_cen,
sampl_rad=sampl_rad,
ann_width=ann_width,
unc_in=2.)
yshift = ceny - (y1 + y_i)
xshift = cenx - (x1 + x_i)
cum_y_shifts += yshift
cum_x_shifts += xshift
else:
for i in range(alignment_iter):
align_cube[0] = cube_collapse(align_cube[1:(n + 1)], mode=collapse,
**collapse_args)
if recenter_median:
# Recenter the median frame using a 2d fit
if fit_type == 'gaus' and negative:
crop_sz = int(fwhm)
elif fit_type == 'gaus':
crop_sz = int(3*fwhm)
else:
crop_sz = int(6*fwhm)
if not crop_sz % 2:
# size should be odd and small
if crop_sz > 7:
crop_sz -= 1
else:
crop_sz += 1
sub_image, y1, x1 = get_square(align_cube[0], size=crop_sz,
y=ceny, x=cenx, position=True)
if fit_type == 'gaus':
if negative:
sub_image = -sub_image + np.abs(np.min(-sub_image))
y_i, x_i = fit_2dgaussian(sub_image, crop=False,
threshold=False, sigfactor=1,
debug=debug, full_output=False)
elif fit_type == 'ann':
sampl_cen = 1./upsample_factor
if ann_rad_search:
sampl_rad = fwhm*ann_rad/10 # 1/10 of estimated radius
else:
sampl_rad = None
y_i, x_i, rad = _fit_2dannulus(sub_image, fwhm=fwhm,
crop=False, ann_rad=ann_rad,
sampl_cen=sampl_cen,
sampl_rad=sampl_rad,
ann_width=ann_width,
unc_in=2.)
yshift = ceny - (y1 + y_i)
xshift = cenx - (x1 + x_i)
align_cube[0] = frame_shift(align_cube[0, :, :], yshift, xshift,
imlib=imlib,
interpolation=interpolation,
border_mode=border_mode)
# center the cube with stretched values
if log:
cube_stret = np.log10((align_cube-np.min(align_cube)+1)**gam)
else:
cube_stret = align_cube.copy()
if mask is not None and crop:
mask_tmp = frame_crop(mask, subframesize)
else:
mask_tmp = mask
res = cube_recenter_dft_upsampling(cube_stret, subi_size=None,
upsample_factor=upsample_factor,
center_fr1=(ceny, cenx),
fwhm=fwhm, full_output=True,
verbose=False, plot=False,
mask=mask_tmp, imlib=imlib,
interpolation=interpolation,
nproc=nproc)
_, y_shift, x_shift = res
sqsum_shifts = np.sum(np.sqrt(y_shift ** 2 + x_shift ** 2))
print('Square sum of shift vecs: ' + str(sqsum_shifts))
for j in range(1, n_frames):
align_cube[j] = frame_shift(align_cube[j], y_shift[j],
x_shift[j], imlib=imlib,
interpolation=interpolation,
border_mode=border_mode)
cum_y_shifts += y_shift
cum_x_shifts += x_shift
cube_reg_sci = np.copy(cube_sci)
cum_y_shifts_sci = cum_y_shifts[1:(n + 1)]
cum_x_shifts_sci = cum_x_shifts[1:(n + 1)]
cube_reg_sci = cube_shift(cube_sci, cum_y_shifts_sci, cum_x_shifts_sci,
imlib=imlib, interpolation=interpolation,
border_mode=border_mode)
if plot:
plt.figure(figsize=vip_figsize)
plt.plot(cum_x_shifts_sci, 'o-', label='Shifts in x', alpha=0.5)
plt.plot(cum_y_shifts_sci, 'o-', label='Shifts in y', alpha=0.5)
plt.legend(loc='best')
plt.grid('on', alpha=0.2)
plt.ylabel('Pixels')
plt.xlabel('Frame number')
plt.figure(figsize=vip_figsize)
b = int(np.sqrt(n))
la = 'Histogram'
_ = plt.hist(cum_x_shifts_sci, bins=b, alpha=0.5, label=la+' shifts X')
_ = plt.hist(cum_y_shifts_sci, bins=b, alpha=0.5, label=la+' shifts Y')
plt.legend(loc='best')
plt.ylabel('Bin counts')
plt.xlabel('Pixels')
if ref_star:
cube_reg_ref = np.copy(cube_ref)
cum_y_shifts_ref = cum_y_shifts[(n + 1):]
cum_x_shifts_ref = cum_x_shifts[(n + 1):]
cube_reg_ref = cube_shift(cube_ref, cum_y_shifts_ref, cum_x_shifts_ref,
imlib=imlib, interpolation=interpolation,
border_mode=border_mode)
if ref_star:
if full_output:
return (cube_reg_sci, cube_reg_ref, cube_sci_lpf, cube_stret,
cum_x_shifts_sci, cum_y_shifts_sci, cum_x_shifts_ref,
cum_y_shifts_ref)
else:
return (cube_reg_sci, cube_reg_ref)
else:
if full_output:
return (cube_reg_sci, cube_sci_lpf, cube_stret,
cum_x_shifts_sci, cum_y_shifts_sci)
else:
return cube_reg_sci
def _fit_2dannulus(array, fwhm=4, crop=False, cent=None, cropsize=15,
ann_rad=0.5, ann_width=0.5, sampl_cen=0.1, sampl_rad=None,
unc_in=2.):
"""Find the center of a donut-shape signal (e.g. a coronagraphic PSF) with\
an annulus fit.
The function uses a grid of positions for the center and radius of the
annulus. The best fit is found by maximizing the mean flux measured in the
annular mask. This requires the image to be already roughly centered
(by an uncertainty provided by unc_in).
Parameters
----------
array : array_like
Image with a single donut-like source, already approximately at the
center of the frame.
fwhm : float
Gaussian PSF full width half maximum from fitting (in pixels).
ann_rad: float, opt
First estimate of the hole radius (in terms of fwhm). The grid search
on the radius of the optimal annulus goes from 0.5 to 2 times hole_rad.
Note: for the AGPM PSF of VLT/NACO, the optimal hole_rad ~ 0.5FWHM.
ann_width: float, opt
Width of the annulus in FWHM; default is 0.5 FWHM.
sampl_cen: float, opt
Precision of the grid sampling to find the center of the annulus (in
pixels)
sampl_rad: float, opt or None.
Precision of the grid sampling to find the optimal radius of the
annulus (in pixels). If set to None, there is no grid search for the
optimal radius of the annulus, the value given by hole_rad is used.
unc_in: float, opt
Initial uncertainty on the center location (with respect to center of
input subframe) in pixels; this will set the grid width.
Returns
-------
mean_y : float
Source centroid y position on the full image from fitting.
mean_x : float
Source centroid x position on the full image from fitting.
final_hole_rad : float
[if sampl_rad != None] Best fit radius of the annulus, in pixels.
[if sampl_rad = None] Input radius of the annulus, in pixels.
"""
if cent is None:
ceny, cenx = frame_center(array)
else:
cenx, ceny = cent
if crop:
x_sub_px = cenx % 1
y_sub_px = ceny % 1
imside = array.shape[0]
psf_subimage, suby, subx = get_square(array, min(cropsize, imside),
int(ceny), int(cenx),
position=True)
ceny, cenx = frame_center(psf_subimage)
ceny += y_sub_px
cenx += x_sub_px
ann_sz = ann_width*fwhm
grid_sh_x = np.arange(-unc_in, unc_in, sampl_cen)
grid_sh_y = np.arange(-unc_in, unc_in, sampl_cen)
if sampl_rad is None:
rads = [ann_rad*fwhm]
else:
rads = np.arange(0.5*ann_rad*fwhm, 2*ann_rad*fwhm, sampl_rad)
flux_ann = np.zeros([grid_sh_x.shape[0], grid_sh_y.shape[0]])
best_rad = np.zeros([grid_sh_x.shape[0], grid_sh_y.shape[0]])
for ii, xx in enumerate(grid_sh_x):
for jj, yy in enumerate(grid_sh_y):
tmp_tmp = frame_shift(array, yy, xx)
for rr, rad in enumerate(rads):
# mean flux in the annulus
tmp = frame_basic_stats(tmp_tmp, 'annulus', inner_radius=rad,
size=ann_sz, plot=False)
if tmp > flux_ann[ii, jj]:
flux_ann[ii, jj] = tmp
best_rad[ii, jj] = rad
i_max, j_max = np.unravel_index(np.argmax(flux_ann), flux_ann.shape)
mean_x = cenx - grid_sh_x[i_max]
mean_y = ceny - grid_sh_y[j_max]
if sampl_rad is None:
return mean_y, mean_x, ann_rad*fwhm
else:
final_hole_rad = best_rad[i_max, j_max]/fwhm
return mean_y, mean_x, final_hole_rad