#! /usr/bin/env python
"""
Module with cosmetics procedures. Contains the function for bad pixel fixing.
Also functions for cropping cubes.
"""
__author__ = "Carlos Alberto Gomez Gonzalez, V. Christiaens"
__all__ = [
"cube_crop_frames",
"cube_drop_frames",
"frame_crop",
"frame_pad",
"cube_correct_nan",
"approx_stellar_position",
]
from multiprocessing import cpu_count
import numpy as np
from astropy.stats import sigma_clipped_stats
from ..config.utils_conf import pool_map, iterable
from ..stats import sigma_filter
from ..var import frame_center, get_square
from multiprocessing import Process
import multiprocessing
from multiprocessing import set_start_method
shared_mem = True
try:
from multiprocessing import shared_memory
except ImportError:
print("Failed to import shared_memory from multiprocessing")
try:
print("Trying to import shared_memory directly(for python 3.7)")
import shared_memory
except ModuleNotFoundError:
shared_mem = False
print("WARNING: multiprocessing unavailable for bad pixel correction.")
print("Either pip install shared-memory38, or upgrade to python>=3.8")
[docs]
def cube_crop_frames(
array, size, xy=None, force=False, verbose=True, full_output=False
):
"""Crops frames in a cube (3d or 4d array).
Parameters
----------
array : numpy ndarray
Input 3d or 4d array.
size : int
Size of the desired central sub-array in each frame, in pixels.
xy : tuple of ints
X, Y coordinates of new frame center. If you are getting the
coordinates from ds9 subtract 1, python has 0-based indexing.
force : bool, optional
``size`` and the original size of the frames must be both even or odd.
With ``force`` set to True this condition can be avoided.
verbose : bool optional
If True message of completion is showed.
full_output: bool optional
If true, returns cenx and ceny in addition to array_view.
Returns
-------
array_out : numpy ndarray
Cube with cropped frames.
"""
if array.ndim == 3:
temp_fr = array[0]
elif array.ndim == 4:
temp_fr = array[0, 0]
else:
raise TypeError("`Array` is not a cube (3d or 4d numpy.ndarray)")
if xy is not None:
cenx, ceny = xy
else:
ceny, cenx = frame_center(temp_fr)
_, y0, x0 = get_square(
temp_fr, size, y=ceny, x=cenx, position=True, force=force, verbose=verbose
)
if not force:
if temp_fr.shape[0] % 2 == 0:
if size % 2 != 0:
size += 1
else:
if size % 2 == 0:
size += 1
y1 = int(y0 + size)
x1 = int(x0 + size)
if array.ndim == 3:
array_out = array[:, y0:y1, x0:x1]
elif array.ndim == 4:
array_out = array[:, :, y0:y1, x0:x1]
if verbose:
print("New shape: {}".format(array_out.shape))
if full_output:
return array_out, cenx, ceny
else:
return array_out
[docs]
def frame_crop(array, size, cenxy=None, force=False, verbose=True):
"""Crops a square frame (2d array). Uses the ``get_square`` function.
Parameters
----------
array : numpy ndarray
Input frame.
size : int, odd
Size of the subframe.
cenxy : tuple, optional
Coordinates of the center of the subframe.
force : bool, optional
Size and the size of the 2d array must be both even or odd. With
``force`` set to False, the requested size is flexible (i.e. +1 can be
applied to requested crop size for its parity to match the input size).
If ``force`` set to True, the requested crop size is enforced, even if
parities do not match (warnings are raised!).
verbose : bool optional
If True, a message of completion is shown.
Returns
-------
array_view : numpy ndarray
Sub array.
"""
if array.ndim != 2:
raise TypeError("`Array` is not a frame or 2d array")
if not cenxy:
ceny, cenx = frame_center(array)
else:
cenx, ceny = cenxy
array_view = get_square(array, size, ceny, cenx, force=force, verbose=verbose)
if verbose:
print("New shape: {}".format(array_view.shape))
return array_view
[docs]
def frame_pad(
array, fac, fillwith=0, loc=0, scale=1, keep_parity=True, full_output=False
):
"""Pads a frame (2d array) equally on each sides, where the final frame
size is set by a multiplicative factor applied to the original size. The
padding is set by fillwith, which can be either a fixed value or white
noise, characterized by (loc, scale).
Uses the ``get_square`` function.
Parameters
----------
array : 2D numpy ndarray
Input frame.
fac : float > 1 or tuple of 2 floats > 1.
Ratio of the size between padded and input frame. If a tuple,
corresponds to padding factors along the y and x dimensions resp.
fillwith : float or str, optional
If a float or np.nan: value used for padding.
If str, must be 'noise', which will inject white noise, using loc and
scale parameters.
loc : float, optional
If padding noise, mean of the white noise.
scale : float, optional
If padding noise, standard deviation of the white noise.
keep_parity : bool, optional
Whether keep parity of dimensions after padding.
full_output : bool, optional
Whether to also return the indices of input frame within the padded
frame (in addition to padded frame).
Returns
-------
array_out : numpy ndarray
Padded array.
ori_indices: tuple
[returned if full_output=True] Indices of the bottom left and top
right vertices of the original image within the padded array:
(y0, yN, x0, xN).
"""
if not array.ndim == 2:
raise TypeError("The input array must be 2d")
if np.isscalar(fac):
if fac < 1:
raise ValueError("fac should be larger than 1")
else:
if fac[0] < 1 or fac[-1] < 1:
raise ValueError("fac elements should be larger than 1")
y, x = array.shape
cy_ori, cx_ori = frame_center(array)
if np.isscalar(fac):
fac = [fac, fac]
new_y = int(round(y * fac[0]))
new_x = int(round(x * fac[1]))
if new_y % 2 != y % 2 and keep_parity:
new_y -= 1
if new_x % 2 != x % 2 and keep_parity:
new_x -= 1
if fillwith == "noise":
array_out = np.random.normal(loc=loc, scale=scale, size=(new_y, new_x))
else:
array_out = np.zeros([new_y, new_x], dtype=array.dtype)
array_out[:] = fillwith
cy, cx = frame_center(array_out)
y0 = int(cy - cy_ori)
y1 = int(cy + cy_ori)
if y1 - y0 < y:
y1 += 1
elif y1 - y0 > y:
y1 -= 1
x0 = int(cx - cx_ori)
x1 = int(cx + cx_ori)
if x1 - x0 < x:
x1 += 1
elif x1 - x0 > x:
x1 -= 1
array_out[y0:y1, x0:x1] = array.copy()
ori_indices = (y0, y1, x0, x1)
if full_output:
return array_out, ori_indices
else:
return array_out
[docs]
def cube_drop_frames(array, n, m, parallactic=None, verbose=True):
"""
Slice the cube so that all frames between ``n`` and ``m`` are kept.
Operates on axis 0 for 3D cubes, and on axis 1 for 4D cubes. This returns a
modified *copy* of ``array``. The indices ``n`` and ``m`` are included and
1-based.
Parameters
----------
array : numpy ndarray
Input cube, 3d or 4d.
n : int
1-based index of the first frame to be kept. Frames before this one are
dropped.
m : int
1-based index of the last frame to be kept. Frames after this one are
dropped.
parallactic : 1d ndarray, optional
parallactic angles vector. If provided, a modified copy of
``parallactic`` is returned additionally.
Returns
-------
array_view : numpy ndarray
Cube with new size.
parallactic : 1d numpy ndarray
[parallactic != None] New parallactic angles.
"""
if m > array.shape[0]:
raise TypeError("End index must be smaller than the # of frames")
if array.ndim == 4:
array_view = array[:, n - 1 : m, :, :].copy()
elif array.ndim == 3:
array_view = array[n - 1 : m, :, :].copy()
else:
raise ValueError("only 3D and 4D cubes are supported!")
if parallactic is not None:
if not parallactic.ndim == 1:
raise ValueError("Parallactic angles vector has wrong shape")
parallactic = parallactic[n - 1 : m]
if verbose:
print("Cube successfully sliced")
print("New cube shape: {}".format(array_view.shape))
if parallactic is not None:
msg = "New parallactic angles vector shape: {}"
print(msg.format(parallactic.shape))
if parallactic is not None:
return array_view, parallactic
else:
return array_view
def frame_remove_stripes(array):
"""Removes unwanted stripe artifact in frames with non-perfect bias or sky
subtraction. Encountered this case on an LBT data cube.
"""
lines = array[:50]
lines = np.vstack((lines, array[-50:]))
mean = lines.mean(axis=0)
for i in range(array.shape[1]):
array[:, i] = array[:, i] - mean[i]
return array
[docs]
def cube_correct_nan(
cube, neighbor_box=3, min_neighbors=3, verbose=False, half_res_y=False, nproc=1
):
"""Sigma filtering of nan pixels in a whole frame or cube. Tested on
SINFONI data.
Parameters
----------
cube : cube_like
Input 3d or 2d array.
neighbor_box : int, optional
The side of the square window around each pixel where the sigma and
median are calculated for the nan pixel correction.
min_neighbors : int, optional
Minimum number of good neighboring pixels to be able to correct the
bad/nan pixels.
verbose: bool, optional
Whether to print more information or not during processing
half_res_y: bool, optional
Whether the input data have every couple of 2 rows identical, i.e. there
is twice less angular resolution vertically than horizontally (e.g.
SINFONI data). The algorithm goes twice faster if this option is
rightfully set to True.
nproc: None or int, optional
Number of CPUs for multiprocessing (only used for 3D input). If None,
will automatically set it to half the available number of CPUs.
Returns
-------
obj_tmp : numpy ndarray
Output cube with corrected nan pixels in each frame
"""
obj_tmp = cube.copy()
ndims = obj_tmp.ndim
if ndims != 2 and ndims != 3:
raise TypeError("Input object is not two or three dimensional")
if neighbor_box < 3 or neighbor_box % 2 == 0:
raise ValueError("neighbor_box should be an odd value greater than 3")
max_neigh = sum(range(3, neighbor_box + 2, 2))
if min_neighbors > max_neigh:
min_neighbors = max_neigh
msg = "Warning! min_neighbors was reduced to {} to avoid bugs."
print(msg.format(max_neigh))
if ndims == 2:
obj_tmp, nnanpix = nan_corr_2d(
obj_tmp, neighbor_box, min_neighbors, half_res_y, verbose, True
)
if verbose:
print("{} NaN pixels were corrected".format(nnanpix))
elif ndims == 3:
if nproc is None:
nproc = cpu_count() // 2
n_z = obj_tmp.shape[0]
if nproc == 1 or not shared_mem:
for zz in range(n_z):
obj_tmp[zz], nnanpix = nan_corr_2d(
obj_tmp[zz], neighbor_box, min_neighbors, half_res_y, verbose, True
)
if verbose:
msg = "In channel {}, {} NaN pixels were corrected"
print(msg.format(zz, nnanpix))
else:
# dummy calling the function to prevent compiling of the function by individual workers
# This should save some time.
dummy_obj = nan_corr_2d(
obj_tmp[0],
neighbor_box,
min_neighbors,
half_res_y,
verbose,
full_output=False,
)
if verbose:
msg = "Correcting NaNs in multiprocessing using ADACS' approach..."
print(msg)
# creation of shared memory blob
shm = shared_memory.SharedMemory(create=True, size=obj_tmp.nbytes)
# creating an array object similar to obj_tmp and occupying shared memory.
obj_tmp_shared = np.ndarray(
obj_tmp.shape, dtype=obj_tmp.dtype, buffer=shm.buf
)
# nan_corr_2d_mp function passes the frame and other details to nan_corr_2d for processing.
def nan_corr_2d_mp(
i, obj, neighbor_box, min_neighbors, half_res_y, verbose
):
obj_tmp_shared[i] = nan_corr_2d(
obj,
neighbor_box,
min_neighbors,
half_res_y,
verbose,
full_output=False,
)
# _nan_corr_2d unfolds the arguments
global _nan_corr_2d_mp
def _nan_corr_2d_mp(args):
nan_corr_2d_mp(*args)
context = multiprocessing.get_context("fork")
# processes argumnet is not provided to pool as no. of processes is
# inferred from os.cpu_count()
pool = context.Pool(processes=nproc, maxtasksperchild=1)
# creating a list of arguments
args = []
for j in range(n_z):
args.append(
[j, obj_tmp[j], neighbor_box, min_neighbors, half_res_y, verbose]
)
try:
pool.map_async(_nan_corr_2d_mp, args, chunksize=1).get(
timeout=10_000_000
)
finally:
pool.close()
pool.join()
obj_tmp[:] = obj_tmp_shared[:]
shm.close()
shm.unlink()
# Original Multiprocessing code commented out below and ADACS approach is activated.
# res = pool_map(nproc, nan_corr_2d, iterable(obj_tmp), neighbor_box,
# min_neighbors, half_res_y, verbose, False)
# obj_tmp = np.array(res, dtype=object)
if verbose:
print("All nan pixels are corrected.")
return obj_tmp
def nan_corr_2d(
obj_tmp, neighbor_box, min_neighbors, half_res_y, verbose, full_output=True
):
n_x = obj_tmp.shape[1]
n_y = obj_tmp.shape[0]
if half_res_y:
if n_y % 2 != 0:
raise ValueError(
"The input frames do not have an even number "
"of rows. Hence, you should probably not be "
"using the option half_res_y = True."
)
n_y = int(n_y / 2)
frame = obj_tmp
obj_tmp = np.zeros([n_y, n_x])
for yy in range(n_y):
obj_tmp[yy] = frame[2 * yy]
# tuple with the 2D indices of each nan value of the frame
nan_indices = np.where(np.isnan(obj_tmp))
nan_map = np.zeros_like(obj_tmp)
nan_map[nan_indices] = 1
nnanpix = int(np.sum(nan_map))
# Correct nan with iterative sigma filter
obj_tmp = sigma_filter(
obj_tmp,
nan_map,
neighbor_box=neighbor_box,
min_neighbors=min_neighbors,
verbose=verbose,
half_res_y=half_res_y,
)
if half_res_y:
frame = obj_tmp
n_y = 2 * n_y
obj_tmp = np.zeros([n_y, n_x])
for yy in range(n_y):
obj_tmp[yy] = frame[int(yy / 2)]
if full_output:
return obj_tmp, nnanpix
else:
return obj_tmp
[docs]
def approx_stellar_position(cube, fwhm, return_test=False, verbose=False):
"""Finds the approximate coordinates of the star, assuming it is the
brightest signal in the images. The algorithm can handle images dominated
by noise, since outliers are corrected based on the position of ths star in
other channels.
Parameters
----------
obj_tmp : numpy ndarray
Input 3d cube
fwhm : float or array 1D
Input full width half maximum value of the PSF for each channel.
This will be used as the standard deviation for Gaussian kernel
of the Gaussian filtering.
If float, it is assumed the same for all channels.
return_test: bool, optional
Whether the test result vector (a bool vector with whether the star
centroid could be find in the corresponding channel) should be returned
as well, along with the approx stellar coordinates.
verbose: bool, optional
Chooses whether to print some additional information.
Returns
-------
star_approx_idx: 2d numpy array
Array of y and x approximate coordinates of the star in each channel of
the cube. Dimensions are nz x 2.
test_result: 1d numpy array
[return_test=True] It also returns the test result vector.
"""
from ..metrics import peak_coordinates
obj_tmp = cube.copy()
n_z = obj_tmp.shape[0]
if isinstance(fwhm, float) or isinstance(fwhm, int):
fwhm_scal = fwhm
fwhm = np.zeros((n_z))
fwhm[:] = fwhm_scal
# 1/ Write a 2-columns array with indices of all max pixel values in the
# cube
star_tmp_idx = np.zeros([n_z, 2])
star_approx_idx = np.zeros([n_z, 2])
test_result = np.ones(n_z)
for zz in range(n_z):
star_tmp_idx[zz] = peak_coordinates(obj_tmp[zz], fwhm[zz])
# 2/ Detect the outliers in each column
_, med_y, stddev_y = sigma_clipped_stats(star_tmp_idx[:, 0], sigma=2.5)
_, med_x, stddev_x = sigma_clipped_stats(star_tmp_idx[:, 1], sigma=2.5)
lim_inf_y = med_y - 3 * stddev_y
lim_sup_y = med_y + 3 * stddev_y
lim_inf_x = med_x - 3 * stddev_x
lim_sup_x = med_x + 3 * stddev_x
if verbose:
print("median y of star - 3sigma = ", lim_inf_y)
print("median y of star + 3sigma = ", lim_sup_y)
print("median x of star - 3sigma = ", lim_inf_x)
print("median x of star + 3sigma = ", lim_sup_x)
for zz in range(n_z):
if (
(star_tmp_idx[zz, 0] < lim_inf_y)
or (star_tmp_idx[zz, 0] > lim_sup_y)
or (star_tmp_idx[zz, 1] < lim_inf_x)
or (star_tmp_idx[zz, 1] > lim_sup_x)
):
test_result[zz] = 0
# 3/ Replace by the median of neighbouring good coordinates if need be
for zz in range(n_z):
if test_result[zz] == 0:
ii = 1
inf_neigh = max(0, zz - ii)
sup_neigh = min(n_z - 1, zz + ii)
while test_result[inf_neigh] == 0 and test_result[sup_neigh] == 0:
ii = ii + 1
inf_neigh = max(0, zz - ii)
sup_neigh = min(n_z - 1, zz + ii)
if test_result[inf_neigh] == 1 and test_result[sup_neigh] == 1:
star_approx_idx[zz] = np.floor(
(star_tmp_idx[sup_neigh] + star_tmp_idx[inf_neigh]) / 2
)
elif test_result[inf_neigh] == 1:
star_approx_idx[zz] = star_tmp_idx[inf_neigh]
else:
star_approx_idx[zz] = star_tmp_idx[sup_neigh]
else:
star_approx_idx[zz] = star_tmp_idx[zz]
if return_test:
return star_approx_idx, test_result.astype(bool)
else:
return star_approx_idx