Files
FOC_Reduction/package/lib/reduction.py

1859 lines
85 KiB
Python
Executable File

"""
Library function computing various steps of the reduction pipeline.
prototypes :
- bin_ndarray(ndarray, new_shape, operation) -> ndarray
Bins an ndarray to new_shape.
- crop_array(data_array, error_array, data_mask, step, null_val, inside, display, savename, plots_folder) -> crop_data_array, crop_error_array (, crop_mask), crop_headers
Homogeneously crop out null edges off a data array.
- deconvolve_array(data_array, headers, psf, FWHM, scale, shape, iterations, algo) -> deconvolved_data_array
Homogeneously deconvolve a data array using a chosen deconvolution algorithm.
- get_error(data_array, headers, error_array, data_mask, sub_type, display, savename, plots_folder, return_background) -> data_array, error_array, headers (, background)
Compute the uncertainties from the different error sources on each image of the input array.
- rebin_array(data_array, error_array, headers, pxsize, scale, operation, data_mask) -> rebinned_data, rebinned_error, rebinned_headers, Dxy (, data_mask)
Homegeneously rebin a data array given a target pixel size in scale units.
- align_data(data_array, error_array, upsample_factor, ref_data, ref_center, return_shifts) -> data_array, error_array (, shifts, errors)
Align data_array on ref_data by cross-correlation.
- smooth_data(data_array, error_array, data_mask, headers, FWHM, scale, smoothing) -> smoothed_array, smoothed_error
Smooth data by convoluting with a gaussian or by combining weighted images
- polarizer_avg(data_array, error_array, data_mask, headers, FWHM, scale, smoothing) -> polarizer_array, polarizer_cov
Average images in data_array on each used polarizer filter and compute correlated errors.
- compute_Stokes(data_array, error_array, data_mask, headers, FWHM, scale, smoothing) -> I_stokes, Q_stokes, U_stokes, Stokes_cov
Compute Stokes parameters I, Q and U and their respective correlated errors from data_array.
- compute_pol(I_stokes, Q_stokes, U_stokes, Stokes_cov, headers) -> P, debiased_P, s_P, s_P_P, PA, s_PA, s_PA_P
Compute polarization degree (in %) and angle (in degree) and their respective errors.
- rotate_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, headers, ang, SNRi_cut) -> I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, headers
Rotate I, Q, U given an angle in degrees using scipy functions.
- rotate_data(data_array, error_array, data_mask, headers, ang) -> data_array, error_array, data_mask, headers
Rotate data before reduction given an angle in degrees using scipy functions.
"""
import warnings
from copy import deepcopy
import matplotlib.pyplot as plt
import numpy as np
from astropy import log
from astropy.wcs import WCS
from matplotlib.colors import LogNorm
from matplotlib.patches import Rectangle
from scipy.ndimage import rotate as sc_rotate
from scipy.ndimage import shift as sc_shift
from scipy.signal import fftconvolve
from .background import bkg_fit, bkg_hist, bkg_mini
from .convex_hull import image_hull
from .cross_correlation import phase_cross_correlation
from .deconvolve import deconvolve_im, gaussian2d, gaussian_psf, zeropad
from .plots import plot_obs
from .utils import princ_angle
log.setLevel("ERROR")
# Useful tabulated values
# FOC instrument
globals()["trans2"] = {
"f140w": 0.21,
"f175w": 0.24,
"f220w": 0.39,
"f275w": 0.40,
"f320w": 0.89,
"f342w": 0.81,
"f430w": 0.74,
"f370lp": 0.83,
"f486n": 0.63,
"f501n": 0.68,
"f480lp": 0.82,
"clear2": 1.0,
}
globals()["trans3"] = {
"f120m": 0.10,
"f130m": 0.10,
"f140m": 0.08,
"f152m": 0.08,
"f165w": 0.28,
"f170m": 0.18,
"f195w": 0.42,
"f190m": 0.15,
"f210m": 0.18,
"f231m": 0.18,
"clear3": 1.0,
}
globals()["trans4"] = {
"f253m": 0.18,
"f278m": 0.26,
"f307m": 0.26,
"f130lp": 0.92,
"f346m": 0.58,
"f372m": 0.73,
"f410m": 0.58,
"f437m": 0.71,
"f470m": 0.79,
"f502m": 0.82,
"f550m": 0.77,
"clear4": 1.0,
}
globals()["pol_efficiency"] = {"pol0": 0.92, "pol60": 0.92, "pol120": 0.91}
# POL0 = 0deg, POL60 = 60deg, POL120=120deg
globals()["theta"] = np.array([180.0 * np.pi / 180.0, 60.0 * np.pi / 180.0, 120.0 * np.pi / 180.0])
# Uncertainties on the orientation of the polarizers' axes taken to be 3deg (see Nota et. al 1996, p36; Robinson & Thomson 1995)
globals()["sigma_theta"] = np.array([3.0 * np.pi / 180.0, 3.0 * np.pi / 180.0, 3.0 * np.pi / 180.0])
# Image shift between polarizers as measured by Hodge (1995)
globals()["pol_shift"] = {"pol0": np.array([0.0, 0.0]) * 1.0, "pol60": np.array([3.63, -0.68]) * 1.0, "pol120": np.array([0.65, 0.20]) * 1.0}
globals()["sigma_shift"] = {"pol0": [0.3, 0.3], "pol60": [0.3, 0.3], "pol120": [0.3, 0.3]}
def get_row_compressor(old_dimension, new_dimension, operation="sum"):
"""
Return the matrix that allows to compress an array from an old dimension of
rows to a new dimension of rows, can be done by summing the original
components or averaging them.
----------
Inputs:
old_dimension, new_dimension : int
Number of rows in the original and target matrices.
operation : str, optional
Set the way original components of the matrix are put together
between summing ('sum') and averaging ('average', 'avg', 'mean') them.
Defaults to 'sum'.
----------
Returns:
dim_compressor : numpy.ndarray
2D matrix allowing row compression by matrix multiplication to the left
of the original matrix.
"""
dim_compressor = np.zeros((new_dimension, old_dimension))
bin_size = float(old_dimension) / new_dimension
next_bin_break = bin_size
which_row, which_column = 0, 0
while which_row < dim_compressor.shape[0] and which_column < dim_compressor.shape[1]:
if round(next_bin_break - which_column, 10) >= 1:
dim_compressor[which_row, which_column] = 1
which_column += 1
elif next_bin_break == which_column:
which_row += 1
next_bin_break += bin_size
else:
partial_credit = next_bin_break - which_column
dim_compressor[which_row, which_column] = partial_credit
which_row += 1
dim_compressor[which_row, which_column] = 1 - partial_credit
which_column += 1
next_bin_break += bin_size
if operation.lower() in ["mean", "average", "avg"]:
dim_compressor /= bin_size
return dim_compressor
def get_column_compressor(old_dimension, new_dimension, operation="sum"):
"""
Return the matrix that allows to compress an array from an old dimension of
columns to a new dimension of columns, can be done by summing the original
components or averaging them.
----------
Inputs:
old_dimension, new_dimension : int
Number of columns in the original and target matrices.
operation : str, optional
Set the way original components of the matrix are put together
between summing ('sum') and averaging ('average', 'avg', 'mean') them.
Defaults to 'sum'.
----------
Returns:
dim_compressor : numpy.ndarray
2D matrix allowing columns compression by matrix multiplication to the
right of the original matrix.
"""
return get_row_compressor(old_dimension, new_dimension, operation).transpose()
def bin_ndarray(ndarray, new_shape, operation="sum"):
"""
Bins an ndarray in all axes based on the target shape, by summing or
averaging.
Number of output dimensions must match number of input dimensions.
Example
-------
>>> m = np.arange(0,100,1).reshape((10,10))
>>> n = bin_ndarray(m, new_shape=(5,5), operation='sum')
>>> print(n)
[[ 22 30 38 46 54]
[102 110 118 126 134]
[182 190 198 206 214]
[262 270 278 286 294]
[342 350 358 366 374]]
"""
if operation.lower() not in ["sum", "mean", "average", "avg"]:
raise ValueError("Operation not supported.")
if ndarray.ndim != len(new_shape):
raise ValueError("Shape mismatch: {} -> {}".format(ndarray.shape, new_shape))
if (np.array(ndarray.shape) % np.array(new_shape) == np.array([0.0, 0.0])).all():
compression_pairs = [(d, c // d) for d, c in zip(new_shape, ndarray.shape)]
flattened = [l for p in compression_pairs for l in p]
ndarray = ndarray.reshape(flattened)
for i in range(len(new_shape)):
if operation.lower() == "sum":
ndarray = ndarray.sum(-1 * (i + 1))
elif operation.lower() in ["mean", "average", "avg"]:
ndarray = ndarray.mean(-1 * (i + 1))
else:
row_comp = np.mat(get_row_compressor(ndarray.shape[0], new_shape[0], operation))
col_comp = np.mat(get_column_compressor(ndarray.shape[1], new_shape[1], operation))
ndarray = np.array(row_comp * np.mat(ndarray) * col_comp)
return ndarray
def crop_array(data_array, headers, error_array=None, data_mask=None, step=5, null_val=None, inside=False, display=False, savename=None, plots_folder=""):
"""
Homogeneously crop an array: all contained images will have the same shape.
'inside' parameter will decide how much should be cropped.
----------
Inputs:
data_array : numpy.ndarray
Array containing the observation data (2D float arrays) to
homogeneously crop.
headers : header list
Headers associated with the images in data_array.
error_array : numpy.ndarray, optional
Array of images (2D floats, aligned and of the same shape) containing
the error in each pixel of the observation images in data_array.
If None, will be initialized to zeros.
Defaults to None.
data_mask : numpy.ndarray, optional
2D boolean array delimiting the data to work on.
If None, will be initialized with a full true mask.
Defaults to None.
step : int, optional
For images with straight edges, not all lines and columns need to be
browsed in order to have a good convex hull. Step value determine
how many row/columns can be jumped. With step=2 every other line will
be browsed.
Defaults to 5.
null_val : float or array-like, optional
Pixel value determining the threshold for what is considered 'outside'
the image. All border pixels below this value will be taken out.
If None, will be put to 75% of the mean value of the associated error
array.
Defaults to None.
inside : boolean, optional
If True, the cropped image will be the maximum rectangle included
inside the image. If False, the cropped image will be the minimum
rectangle in which the whole image is included.
Defaults to False.
display : boolean, optional
If True, data_array will be displayed with a rectangle around the
sub-image selected for region of interest.
Defaults to False.
----------
Returns:
cropped_array : numpy.ndarray
Array containing the observationnal data homogeneously cropped.
cropped_error : numpy.ndarray
Array containing the error on the observationnal data homogeneously cropped.
headers : header list
Updated headers associated with the images in data_array.
"""
if error_array is None:
error_array = np.zeros(data_array.shape)
if null_val is None:
null_val = [1.00 * error.mean() for error in error_array]
elif type(null_val) is float:
null_val = [
null_val,
] * error_array.shape[0]
vertex = np.zeros((data_array.shape[0], 4), dtype=int)
for i, image in enumerate(data_array): # Get vertex of the rectangular convex hull of each image
vertex[i] = image_hull(image, step=step, null_val=null_val[i], inside=inside)
v_array = np.zeros(4, dtype=int)
if inside: # Get vertex of the maximum convex hull for all images
v_array[0] = np.max(vertex[:, 0]).astype(int)
v_array[1] = np.min(vertex[:, 1]).astype(int)
v_array[2] = np.max(vertex[:, 2]).astype(int)
v_array[3] = np.min(vertex[:, 3]).astype(int)
else: # Get vertex of the minimum convex hull for all images
v_array[0] = np.min(vertex[:, 0]).astype(int)
v_array[1] = np.max(vertex[:, 1]).astype(int)
v_array[2] = np.min(vertex[:, 2]).astype(int)
v_array[3] = np.max(vertex[:, 3]).astype(int)
new_shape = np.array([v_array[1] - v_array[0], v_array[3] - v_array[2]])
rectangle = [v_array[2], v_array[0], new_shape[1], new_shape[0], 0.0, "b"]
crop_headers = deepcopy(headers)
crop_array = np.zeros((data_array.shape[0], new_shape[0], new_shape[1]))
crop_error_array = np.zeros((data_array.shape[0], new_shape[0], new_shape[1]))
for i, image in enumerate(data_array):
# Put the image data in the cropped array
crop_array[i] = image[v_array[0] : v_array[1], v_array[2] : v_array[3]]
crop_error_array[i] = error_array[i][v_array[0] : v_array[1], v_array[2] : v_array[3]]
# Update CRPIX value in the associated header
curr_wcs = WCS(crop_headers[i]).celestial.deepcopy()
curr_wcs.wcs.crpix[:2] = curr_wcs.wcs.crpix[:2] - np.array([v_array[2], v_array[0]])
crop_headers[i].update(curr_wcs.to_header())
crop_headers[i]["naxis1"], crop_headers[i]["naxis2"] = crop_array[i].shape
if display:
plt.rcParams.update({"font.size": 15})
fig, ax = plt.subplots(figsize=(10, 10), layout="constrained")
convert_flux = headers[0]["photflam"]
data = deepcopy(data_array[0] * convert_flux)
data[data <= data[data > 0.0].min()] = data[data > 0.0].min()
crop = crop_array[0] * convert_flux
instr = headers[0]["instrume"]
rootname = headers[0]["rootname"]
exptime = headers[0]["exptime"]
filt = headers[0]["filtnam1"]
# plots
# im = ax.imshow(data, vmin=data.min(), vmax=data.max(), origin='lower', cmap='gray')
im = ax.imshow(data, norm=LogNorm(crop[crop > 0.0].mean() / 5.0, crop.max()), origin="lower", cmap="gray")
x, y, width, height, angle, color = rectangle
ax.add_patch(Rectangle((x, y), width, height, edgecolor=color, fill=False))
# position of centroid
ax.plot([data.shape[1] / 2, data.shape[1] / 2], [0, data.shape[0] - 1], "--", lw=1, color="grey", alpha=0.3)
ax.plot([0, data.shape[1] - 1], [data.shape[1] / 2, data.shape[1] / 2], "--", lw=1, color="grey", alpha=0.3)
ax.annotate(instr + ":" + rootname, color="white", fontsize=10, xy=(0.02, 0.95), xycoords="axes fraction")
ax.annotate(filt, color="white", fontsize=14, xy=(0.02, 0.02), xycoords="axes fraction")
ax.annotate(str(exptime) + " s", color="white", fontsize=10, xy=(0.80, 0.02), xycoords="axes fraction")
ax.set(title="Location of cropped image.", xlabel="pixel offset", ylabel="pixel offset")
# fig.subplots_adjust(hspace=0, wspace=0, right=0.85)
# cbar_ax = fig.add_axes([0.9, 0.12, 0.02, 0.75])
fig.colorbar(im, ax=ax, label=r"Flux [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]")
if savename is not None:
fig.savefig("/".join([plots_folder, savename + "_" + filt + "_crop_region.pdf"]), bbox_inches="tight", dpi=200)
plot_obs(
data_array,
headers,
vmin=convert_flux * data_array[data_array > 0.0].mean() / 5.0,
vmax=convert_flux * data_array[data_array > 0.0].max(),
rectangle=[
rectangle,
]
* len(headers),
savename=savename + "_crop_region",
plots_folder=plots_folder,
)
plt.show()
if data_mask is not None:
crop_mask = data_mask[v_array[0] : v_array[1], v_array[2] : v_array[3]]
return crop_array, crop_error_array, crop_mask, crop_headers
else:
return crop_array, crop_error_array, crop_headers
def deconvolve_array(data_array, headers, psf="gaussian", FWHM=1.0, scale="px", shape=None, iterations=20, algo="richardson"):
"""
Homogeneously deconvolve a data array using Richardson-Lucy iterative algorithm.
----------
Inputs:
data_array : numpy.ndarray
Array containing the observation data (2D float arrays) to
homogeneously deconvolve.
headers : header list
Headers associated with the images in data_array.
psf : str or numpy.ndarray, optionnal
String designing the type of desired Point Spread Function or array
of dimension 2 corresponding to the weights of a PSF.
Defaults to 'gaussian' type PSF.
FWHM : float, optional
Full Width at Half Maximum for desired PSF in 'scale units. Only used
for relevant values of 'psf' variable.
Defaults to 1.
scale : str, optional
Scale units for the FWHM of the PSF between 'pixel' and 'arcsec'.
Defaults to 'pixel'.
shape : tuple, optional
Shape of the kernel of the PSF. Must be of dimension 2. Only used for
relevant values of 'psf' variable.
Defaults to (9,9).
iterations : int, optional
Number of iterations for iterative deconvolution algorithms. Act as
as a regulation of the process.
Defaults to 20.
algo : str, optional
Name of the deconvolution algorithm that will be used. Implemented
algorithms are the following : 'Wiener', 'Van-Cittert',
'One Step Gradient', 'Conjugate Gradient' and 'Richardson-Lucy'.
Defaults to 'Richardson-Lucy'.
----------
Returns:
deconv_array : numpy.ndarray
Array containing the deconvolved data (2D float arrays) using given
point spread function.
"""
# If chosen FWHM scale is 'arcsec', compute FWHM in pixel scale
if scale.lower() in ["arcsec", "arcseconds"]:
pxsize = np.zeros((data_array.shape[0], 2))
for i, header in enumerate(headers):
# Get current pixel size
w = WCS(header).celestial.deepcopy()
pxsize[i] = np.round(w.wcs.cdelt / 3600.0, 15)
if (pxsize != pxsize[0]).any():
raise ValueError("Not all images in array have same pixel size")
FWHM /= pxsize[0].min()
# Define Point-Spread-Function kernel
if psf.lower() in ["gauss", "gaussian"]:
if shape is None:
shape = np.min(data_array[0].shape) - 2, np.min(data_array[0].shape) - 2
kernel = gaussian_psf(FWHM=FWHM, shape=shape)
elif isinstance(psf, np.ndarray) and (len(psf.shape) == 2):
kernel = psf
else:
raise ValueError("{} is not a valid value for 'psf'".format(psf))
# Deconvolve images in the array using given PSF
deconv_array = np.zeros(data_array.shape)
for i, image in enumerate(data_array):
deconv_array[i] = deconvolve_im(image, kernel, iterations=iterations, clip=True, filter_epsilon=None, algo=algo)
return deconv_array
def get_error(data_array, headers, error_array=None, data_mask=None, sub_type=None, subtract_error=0.5, display=False, savename=None, plots_folder="", return_background=False):
"""
Look for sub-image of shape sub_shape that have the smallest integrated
flux (no source assumption) and define the background on the image by the
standard deviation on this sub-image.
----------
Inputs:
data_array : numpy.ndarray
Array containing the data to study (2D float arrays).
headers : header list
Headers associated with the images in data_array.
error_array : numpy.ndarray, optional
Array of images (2D floats, aligned and of the same shape) containing
the error in each pixel of the observation images in data_array.
If None, will be initialized to zeros.
Defaults to None.
data_mask : numpy.ndarray, optional
2D boolean array delimiting the data to work on.
If None, will be initialized with a full true mask.
Defaults to None.
sub_type : str or int or tuple, optional
If 'auto', look for optimal binning and fit intensity histogram with au gaussian.
If str or None, statistic rule to be used for the number of bins in counts/s.
If int, number of bins for the counts/s histogram.
If tuple, shape of the sub-image to look for. Must be odd.
Defaults to None.
subtract_error : float or bool, optional
If float, factor to which the estimated background should be multiplied
If False the background is not subtracted.
Defaults to True (factor = 1.).
display : boolean, optional
If True, data_array will be displayed with a rectangle around the
sub-image selected for background computation.
Defaults to False.
savename : str, optional
Name of the figure the map should be saved to. If None, the map won't
be saved (only displayed). Only used if display is True.
Defaults to None.
plots_folder : str, optional
Relative (or absolute) filepath to the folder in wich the map will
be saved. Not used if savename is None.
Defaults to current folder.
return_background : boolean, optional
If True, the pixel background value for each image in data_array is
returned.
Defaults to False.
----------
Returns:
data_array : numpy.ndarray
Array containing the data to study minus the background.
error_array : numpy.ndarray
Array containing the background values associated to the images in
data_array.
headers : header list
Updated headers associated with the images in data_array.
background : numpy.ndarray
Array containing the pixel background value for each image in
data_array.
Only returned if return_background is True.
"""
# Crop out any null edges
if error_array is None:
error_array = np.zeros(data_array.shape)
data, error = deepcopy(data_array), deepcopy(error_array)
if data_mask is not None:
mask = deepcopy(data_mask)
else:
data_c, error_c, _ = crop_array(data, headers, error, step=5, null_val=0.0, inside=False)
mask_c = np.ones(data_c[0].shape, dtype=bool)
for i, (data_ci, error_ci) in enumerate(zip(data_c, error_c)):
data[i], error[i] = zeropad(data_ci, data[i].shape), zeropad(error_ci, error[i].shape)
mask = zeropad(mask_c, data[0].shape).astype(bool)
background = np.zeros((data.shape[0]))
# wavelength dependence of the polarizer filters
# estimated to less than 1%
err_wav = data * 0.01
# difference in PSFs through each polarizers
# estimated to less than 3%
err_psf = data * 0.03
# flatfielding uncertainties
# estimated to less than 3%
err_flat = data * 0.03
if sub_type is None:
n_data_array, c_error_bkg, headers, background = bkg_hist(
data, error, mask, headers, subtract_error=subtract_error, display=display, savename=savename, plots_folder=plots_folder
)
sub_type, subtract_error = "histogram ", str(int(subtract_error>0.))
elif isinstance(sub_type, str):
if sub_type.lower() in ["auto"]:
n_data_array, c_error_bkg, headers, background = bkg_fit(
data, error, mask, headers, subtract_error=subtract_error, display=display, savename=savename, plots_folder=plots_folder
)
sub_type, subtract_error = "histogram fit ", "mean+%.1fsigma"%subtract_error if subtract_error != 0. else 0.
else:
n_data_array, c_error_bkg, headers, background = bkg_hist(
data, error, mask, headers, sub_type=sub_type, subtract_error=subtract_error, display=display, savename=savename, plots_folder=plots_folder
)
sub_type, subtract_error = "histogram ", "mean+%.1fsigma"%subtract_error if subtract_error != 0. else 0.
elif isinstance(sub_type, tuple):
n_data_array, c_error_bkg, headers, background = bkg_mini(
data, error, mask, headers, sub_shape=sub_type, subtract_error=subtract_error, display=display, savename=savename, plots_folder=plots_folder
)
sub_type, subtract_error = "minimal flux ", "mean+%.1fsigma"%subtract_error if subtract_error != 0. else 0.
else:
print("Warning: Invalid subtype.")
for header in headers:
header["BKG_TYPE"] = (sub_type,"Bkg estimation method used during reduction")
header["BKG_SUB"] = (subtract_error,"Amount of bkg subtracted from images")
# Quadratically add uncertainties in the "correction factors" (see Kishimoto 1999)
n_error_array = np.sqrt(err_wav**2 + err_psf**2 + err_flat**2 + c_error_bkg**2)
if return_background:
return n_data_array, n_error_array, headers, background
else:
return n_data_array, n_error_array, headers
def rebin_array(data_array, error_array, headers, pxsize=2, scale="px", operation="sum", data_mask=None):
"""
Homogeneously rebin a data array to get a new pixel size equal to pxsize
where pxsize is given in arcsec.
----------
Inputs:
data_array, error_array : numpy.ndarray
Arrays containing the images (2D float arrays) and their associated
errors that will be rebinned.
headers : header list
List of headers corresponding to the images in data_array.
pxsize : float
Physical size of the pixel in arcseconds that should be obtain with
the rebinning.
scale : str, optional
Scale units for the FWHM between 'pixel' and 'arcsec'.
Defaults to 'pixel'.
operation : str, optional
Set the way original components of the matrix are put together
between summing ('sum') and averaging ('average', 'avg', 'mean') them.
Defaults to 'sum'.
data_mask : numpy.ndarray, optional
2D boolean array delimiting the data to work on.
If None, will be initialized with a full true mask.
Defaults to None.
----------
Returns:
rebinned_data, rebinned_error : numpy.ndarray
Rebinned arrays containing the images and associated errors.
rebinned_headers : header list
Updated headers corresponding to the images in rebinned_data.
Dxy : numpy.ndarray
Array containing the rebinning factor in each direction of the image.
data_mask : numpy.ndarray, optional
Updated 2D boolean array delimiting the data to work on.
Only returned if inputed data_mask is not None.
"""
# Check that all images are from the same instrument
ref_header = headers[0]
instr = ref_header["instrume"]
same_instr = np.array([instr == header["instrume"] for header in headers]).all()
if not same_instr:
raise ValueError(
"All images in data_array are not from the same\
instrument, cannot proceed."
)
if instr not in ["FOC"]:
raise ValueError(
"Cannot reduce images from {0:s} instrument\
(yet)".format(instr)
)
rebinned_data, rebinned_error, rebinned_headers = [], [], []
Dxy = np.array([1.0, 1.0])
# Routine for the FOC instrument
Dxy_arr = np.ones((data_array.shape[0], 2))
for i, (image, error, header) in enumerate(list(zip(data_array, error_array, headers))):
# Get current pixel size
w = WCS(header).celestial.deepcopy()
new_header = deepcopy(header)
# Compute binning ratio
if scale.lower() in ["px", "pixel"]:
Dxy_arr[i] = np.array( [ pxsize, ] * 2)
scale = "px"
elif scale.lower() in ["arcsec", "arcseconds"]:
Dxy_arr[i] = np.array(pxsize / np.abs(w.wcs.cdelt) / 3600.0)
scale = "arcsec"
elif scale.lower() in ["full", "integrate"]:
Dxy_arr[i] = image.shape
pxsize, scale = "", "full"
else:
raise ValueError("'{0:s}' invalid scale for binning.".format(scale))
new_shape = np.ceil(min(image.shape / Dxy_arr, key=lambda x: x[0] + x[1])).astype(int)
for i, (image, error, header) in enumerate(list(zip(data_array, error_array, headers))):
# Get current pixel size
w = WCS(header).celestial.deepcopy()
new_header = deepcopy(header)
Dxy = image.shape / new_shape
if (Dxy < 1.0).any():
raise ValueError("Requested pixel size is below resolution.")
# Rebin data
rebin_data = bin_ndarray(image, new_shape=new_shape, operation=operation)
rebinned_data.append(rebin_data)
# Propagate error
rms_image = np.sqrt(bin_ndarray(image**2, new_shape=new_shape, operation="average"))
# sum_image = bin_ndarray(image, new_shape=new_shape, operation="sum")
# mask = sum_image > 0.0
new_error = np.zeros(rms_image.shape)
if operation.lower() in ["mean", "average", "avg"]:
new_error = np.sqrt(bin_ndarray(error**2, new_shape=new_shape, operation="average"))
else:
new_error = np.sqrt(bin_ndarray(error**2, new_shape=new_shape, operation="sum"))
rebinned_error.append(np.sqrt(rms_image**2 + new_error**2))
# Update header
nw = w.deepcopy()
nw.wcs.cdelt *= Dxy
nw.wcs.crpix /= Dxy
nw.array_shape = new_shape
new_header["NAXIS1"], new_header["NAXIS2"] = nw.array_shape
for key, val in nw.to_header().items():
new_header.set(key, val)
new_header["SAMPLING"] = (str(pxsize)+scale, "Resampling performed during reduction")
rebinned_headers.append(new_header)
if data_mask is not None:
data_mask = bin_ndarray(data_mask, new_shape=new_shape, operation="average") > 0.80
rebinned_data = np.array(rebinned_data)
rebinned_error = np.array(rebinned_error)
if data_mask is None:
return rebinned_data, rebinned_error, rebinned_headers, Dxy
else:
return rebinned_data, rebinned_error, rebinned_headers, Dxy, data_mask
def align_data(data_array, headers, error_array=None, data_mask=None, background=None, upsample_factor=1.0, ref_data=None, ref_center=None, return_shifts=False):
"""
Align images in data_array using cross correlation, and rescale them to
wider images able to contain any rotation of the reference image.
All images in data_array must have the same shape.
----------
Inputs:
data_array : numpy.ndarray
Array containing the data to align (2D float arrays).
headers : header list
List of headers corresponding to the images in data_array.
error_array : numpy.ndarray, optional
Array of images (2D floats, aligned and of the same shape) containing
the error in each pixel of the observation images in data_array.
If None, will be initialized to zeros.
Defaults to None.
upsample_factor : float, optional
Oversampling factor for the cross-correlation, will allow sub-
pixel alignement as small as one over the factor of a pixel.
Defaults to one (no over-sampling).
ref_data : numpy.ndarray, optional
Reference image (2D float array) the data_array should be
aligned to. If "None", the ref_data will be the first image
of the data_array.
Defaults to None.
ref_center : numpy.ndarray, optional
Array containing the coordinates of the center of the reference
image or a string in 'max', 'flux', 'maxflux', 'max_flux'. If None,
will fallback to the center of the image.
Defaults to None.
return_shifts : boolean, optional
If False, calling the function will only return the array of
rescaled images. If True, will also return the shifts and
errors.
Defaults to True.
----------
Returns:
rescaled : numpy.ndarray
Array containing the aligned data from data_array, rescaled to wider
image with margins of value 0.
rescaled_error : numpy.ndarray
Array containing the errors on the aligned images in the rescaled array.
headers : header list
List of headers corresponding to the images in data_array.
data_mask : numpy.ndarray
2D boolean array delimiting the data to work on.
shifts : numpy.ndarray
Array containing the pixel shifts on the x and y directions from
the reference image.
Only returned if return_shifts is True.
errors : numpy.ndarray
Array containing the relative error computed on every shift value.
Only returned if return_shifts is True.
"""
if ref_data is None:
# Define the reference to be the first image of the inputed array
# if None have been specified
ref_data = data_array[0]
same = 1
for array in data_array:
# Check if all images have the same shape. If not, cross-correlation
# cannot be computed.
same *= array.shape == ref_data.shape
if not same:
raise ValueError(
"All images in data_array must have same shape as\
ref_data"
)
if (error_array is None) or (background is None):
_, error_array, headers, background = get_error(data_array, headers, return_background=True)
# Crop out any null edges
# (ref_data must be cropped as well)
full_array = np.concatenate((data_array, [ref_data]), axis=0)
full_headers = deepcopy(headers)
full_headers.append(headers[0])
err_array = np.concatenate((error_array, [np.zeros(ref_data.shape)]), axis=0)
if data_mask is None:
full_array, err_array, full_headers = crop_array(full_array, full_headers, err_array, step=5, inside=False, null_val=0.0)
else:
full_array, err_array, data_mask, full_headers = crop_array(full_array, full_headers, err_array, data_mask=data_mask, step=5, inside=False, null_val=0.0)
data_array, ref_data, headers = full_array[:-1], full_array[-1], full_headers[:-1]
error_array = err_array[:-1]
do_shift = True
if ref_center is None:
# Define the center of the reference image to be the center pixel
# if None have been specified
ref_center = (np.array(ref_data.shape) / 2).astype(int)
do_shift = False
elif ref_center.lower() in ["max", "flux", "maxflux", "max_flux"]:
# Define the center of the reference image to be the pixel of max flux.
ref_center = np.unravel_index(np.argmax(ref_data), ref_data.shape)
else:
# Default to image center.
ref_center = (np.array(ref_data.shape) / 2).astype(int)
# Create a rescaled null array that can contain any rotation of the
# original image (and shifted images)
shape = data_array.shape
res_shape = int(np.ceil(np.sqrt(2.0) * np.max(shape[1:])))
rescaled_image = np.zeros((shape[0], res_shape, res_shape))
rescaled_error = np.ones((shape[0], res_shape, res_shape))
rescaled_mask = np.zeros((shape[0], res_shape, res_shape), dtype=bool)
res_center = (np.array(rescaled_image.shape[1:]) / 2).astype(int)
res_shift = res_center - ref_center
res_mask = np.zeros((res_shape, res_shape), dtype=bool)
res_mask[res_shift[0] : res_shift[0] + shape[1], res_shift[1] : res_shift[1] + shape[2]] = True
if data_mask is not None:
res_mask = np.logical_and(res_mask,zeropad(data_mask, (res_shape, res_shape)).astype(bool))
shifts, errors = [], []
for i, image in enumerate(data_array):
# Initialize rescaled images to background values
rescaled_error[i] *= 0.01 * background[i]
# Get shifts and error by cross-correlation to ref_data
if do_shift:
shift, error, _ = phase_cross_correlation(ref_data / ref_data.max(), image / image.max(), upsample_factor=upsample_factor)
else:
shift = globals["pol_shift"][headers[i]["filtnam1"].lower()]
error = globals["sigma_shift"][headers[i]["filtnam1"].lower()]
# Rescale image to requested output
rescaled_image[i, res_shift[0] : res_shift[0] + shape[1], res_shift[1] : res_shift[1] + shape[2]] = deepcopy(image)
rescaled_error[i, res_shift[0] : res_shift[0] + shape[1], res_shift[1] : res_shift[1] + shape[2]] = deepcopy(error_array[i])
# Shift images to align
rescaled_image[i] = sc_shift(rescaled_image[i], shift, order=1, cval=0.0)
rescaled_error[i] = sc_shift(rescaled_error[i], shift, order=1, cval=background[i])
curr_mask = sc_shift(res_mask*10., shift, order=1, cval=0.0)
curr_mask[curr_mask < curr_mask.max()*2./3.] = 0.0
rescaled_mask[i] = curr_mask.astype(bool)
# mask_vertex = clean_ROI(curr_mask)
# rescaled_mask[i, mask_vertex[2] : mask_vertex[3], mask_vertex[0] : mask_vertex[1]] = True
rescaled_image[i][rescaled_image[i] < 0.0] = 0.0
rescaled_image[i][(1 - rescaled_mask[i]).astype(bool)] = 0.0
# Uncertainties from shifting
prec_shift = np.array([1.0, 1.0]) / upsample_factor
shifted_image = sc_shift(rescaled_image[i], prec_shift, cval=0.0)
error_shift = np.abs(rescaled_image[i] - shifted_image) / 2.0
# sum quadratically the errors
rescaled_error[i] = np.sqrt(rescaled_error[i] ** 2 + error_shift**2)
shifts.append(shift)
errors.append(error)
shifts = np.array(shifts)
errors = np.array(errors)
# Update headers CRPIX value
headers_wcs = [WCS(header).celestial.deepcopy() for header in headers]
new_crpix = np.array([wcs.wcs.crpix for wcs in headers_wcs]) + shifts[:, ::-1] + res_shift[::-1]
for i in range(len(headers_wcs)):
headers_wcs[i].wcs.crpix = new_crpix[0]
headers[i].update(headers_wcs[i].to_header())
data_mask = rescaled_mask.all(axis=0)
data_array, error_array, data_mask, headers = crop_array(rescaled_image, headers, rescaled_error, data_mask, null_val=0.01 * background)
if return_shifts:
return data_array, error_array, headers, data_mask, shifts, errors
else:
return data_array, error_array, headers, data_mask
def smooth_data(data_array, error_array, data_mask, headers, FWHM=1.5, scale="pixel", smoothing="weighted_gaussian"):
"""
Smooth a data_array using selected function.
----------
Inputs:
data_array : numpy.ndarray
Array containing the data to smooth (2D float arrays).
error_array : numpy.ndarray
Array of images (2D floats, aligned and of the same shape) containing
the error in each pixel of the observation images in data_array.
data_mask : numpy.ndarray
2D boolean array delimiting the data to work on.
headers : header list
List of headers corresponding to the images in data_array.
FWHM : float, optional
Full Width at Half Maximum for desired smoothing in 'scale' units.
Defaults to 1.
scale : str, optional
Scale units for the FWHM between 'pixel' and 'arcsec'.
Defaults to 'pixel'.
smoothing : str, optional
Smoothing algorithm to be used on the input data array.
-'combine','combining' use the N images combining algorithm with
weighted pixels (inverse square of errors).
-'gauss','gaussian' convolve any input image with a gaussian of
standard deviation stdev = FWHM/(2*sqrt(2*log(2))).
Defaults to 'gaussian'. Won't be used if FWHM is None.
----------
Returns:
smoothed_array : numpy.ndarray
Array containing the smoothed images.
error_array : numpy.ndarray
Array containing the error images corresponding to the images in
smoothed_array.
"""
# If chosen FWHM scale is 'arcsec', compute FWHM in pixel scale
if scale.lower() in ["arcsec", "arcseconds"]:
pxsize = np.zeros((data_array.shape[0], 2))
for i, header in enumerate(headers):
# Get current pixel size
w = WCS(header).celestial.deepcopy()
pxsize[i] = np.round(w.wcs.cdelt * 3600.0, 4)
if (pxsize != pxsize[0]).any():
raise ValueError("Not all images in array have same pixel size")
FWHM_size = str(FWHM)
FWHM_scale = "arcsec"
FWHM /= pxsize[0].min()
else:
FWHM_size = str(FWHM)
FWHM_scale = "px"
# Define gaussian stdev
stdev = FWHM / (2.0 * np.sqrt(2.0 * np.log(2)))
fmax = np.finfo(np.double).max
if smoothing.lower() in ["combine", "combining"]:
smoothing = "combine"
# Smooth using N images combination algorithm
# Weight array
weight = 1.0 / error_array**2
# Prepare pixel distance matrix
xx, yy = np.indices(data_array[0].shape)
# Initialize smoothed image and error arrays
smoothed = np.zeros(data_array[0].shape)
error = np.zeros(data_array[0].shape)
# Combination smoothing algorithm
for r in range(smoothed.shape[0]):
for c in range(smoothed.shape[1]):
# Compute distance from current pixel
dist_rc = np.where(data_mask, np.sqrt((r - xx) ** 2 + (c - yy) ** 2), fmax)
# Catch expected "OverflowWarning" as we overflow values that are not in the image
with warnings.catch_warnings(record=True) as w:
g_rc = np.array(
[
np.exp(-0.5 * (dist_rc / stdev) ** 2) / (2.0 * np.pi * stdev**2),
]
* data_array.shape[0]
)
# Apply weighted combination
smoothed[r, c] = np.where(data_mask[r, c], np.sum(data_array * weight * g_rc) / np.sum(weight * g_rc), data_array.mean(axis=0)[r, c])
error[r, c] = np.where(
data_mask[r, c],
np.sqrt(np.sum(weight * g_rc**2)) / np.sum(weight * g_rc),
(np.sqrt(np.sum(error_array**2, axis=0) / error_array.shape[0]))[r, c],
)
# Nan handling
error[np.logical_or(np.isnan(smoothed * error), 1 - data_mask)] = 0.0
smoothed[np.logical_or(np.isnan(smoothed * error), 1 - data_mask)] = 0.0
elif smoothing.lower() in ["weight_gauss", "weighted_gaussian", "gauss", "gaussian"]:
smoothing = "gaussian"
# Convolution with gaussian function
smoothed = np.zeros(data_array.shape)
error = np.zeros(error_array.shape)
for i, (image, image_error) in enumerate(zip(data_array, error_array)):
x, y = np.meshgrid(np.arange(-image.shape[1] / 2, image.shape[1] / 2), np.arange(-image.shape[0] / 2, image.shape[0] / 2))
weights = np.ones(image_error.shape)
if smoothing.lower()[:6] in ["weight"]:
smoothing = "weighted gaussian"
weights = 1.0 / image_error**2
weights[(1 - np.isfinite(weights)).astype(bool)] = 0.0
weights[(1 - data_mask).astype(bool)] = 0.0
weights /= weights.sum()
kernel = gaussian2d(x, y, stdev)
kernel /= kernel.sum()
smoothed[i] = np.where(data_mask, fftconvolve(image * weights, kernel, "same") / fftconvolve(weights, kernel, "same"), image)
error[i] = np.where(
data_mask, np.sqrt(fftconvolve(image_error**2 * weights**2, kernel**2, "same")) / fftconvolve(weights, kernel, "same"), image_error
)
# Nan handling
error[i][np.logical_or(np.isnan(smoothed[i] * error[i]), 1 - data_mask)] = 0.0
smoothed[i][np.logical_or(np.isnan(smoothed[i] * error[i]), 1 - data_mask)] = 0.0
else:
raise ValueError("{} is not a valid smoothing option".format(smoothing))
for header in headers:
header["SMOOTH"] = (" ".join([smoothing,FWHM_size,FWHM_scale]),"Smoothing method used during reduction")
return smoothed, error
def polarizer_avg(data_array, error_array, data_mask, headers, FWHM=1.5, scale="pixel", smoothing="weighted_gaussian"):
"""
Make the average image from a single polarizer for a given instrument.
-----------
Inputs:
data_array : numpy.ndarray
Array of images (2D floats, aligned and of the same shape) of a
single observation with multiple polarizers of an instrument.
error_array : numpy.ndarray
Array of images (2D floats, aligned and of the same shape) containing
the error in each pixel of the observation images in data_array.
data_mask : numpy.ndarray
2D boolean array delimiting the data to work on.
headers : header list
List of headers corresponding to the images in data_array.
FWHM : float, optional
Full Width at Half Maximum of the detector for smoothing of the
data on each polarizer filter in 'scale' units. If None, no smoothing
is done.
Defaults to None.
scale : str, optional
Scale units for the FWHM between 'pixel' and 'arcsec'.
Defaults to 'pixel'.
smoothing : str, optional
Smoothing algorithm to be used on the input data array.
-'combine','combining' use the N images combining algorithm with
weighted pixels (inverse square of errors).
-'gaussian' convolve any input image with a gaussian of standard
deviation stdev = FWHM/(2*sqrt(2*log(2))).
Defaults to 'gaussian'. Won't be used if FWHM is None.
----------
Returns:
polarizer_array : numpy.ndarray
Array of images averaged on each polarizer filter of the instrument
polarizer_cov : numpy.ndarray
Covariance matrix between the polarizer images in polarizer_array
"""
# Check that all images are from the same instrument
instr = headers[0]["instrume"]
same_instr = np.array([instr == header["instrume"] for header in headers]).all()
if not same_instr:
raise ValueError(
"All images in data_array are not from the same\
instrument, cannot proceed."
)
if instr not in ["FOC"]:
raise ValueError(
"Cannot reduce images from {0:s} instrument\
(yet)".format(instr)
)
# Routine for the FOC instrument
if instr == "FOC":
# Sort images by polarizer filter : can be 0deg, 60deg, 120deg for the FOC
is_pol0 = np.array([header["filtnam1"] == "POL0" for header in headers])
if (1 - is_pol0).all():
print(
"Warning : no image for POL0 of FOC found, averaged data\
will be NAN"
)
is_pol60 = np.array([header["filtnam1"] == "POL60" for header in headers])
if (1 - is_pol60).all():
print(
"Warning : no image for POL60 of FOC found, averaged data\
will be NAN"
)
is_pol120 = np.array([header["filtnam1"] == "POL120" for header in headers])
if (1 - is_pol120).all():
print(
"Warning : no image for POL120 of FOC found, averaged data\
will be NAN"
)
# Put each polarizer images in separate arrays
headers0 = [header for header in headers if header["filtnam1"] == "POL0"]
headers60 = [header for header in headers if header["filtnam1"] == "POL60"]
headers120 = [header for header in headers if header["filtnam1"] == "POL120"]
pol0_array = data_array[is_pol0]
pol60_array = data_array[is_pol60]
pol120_array = data_array[is_pol120]
err0_array = error_array[is_pol0]
err60_array = error_array[is_pol60]
err120_array = error_array[is_pol120]
# For a single observation, combination amount to a weighted gaussian
if np.max([is_pol0.sum(), is_pol60.sum(), is_pol120.sum()]) == 1 and smoothing.lower() in ["combine", "combining"]:
smoothing = "weighted_gaussian"
if (FWHM is not None) and (smoothing.lower() in ["combine", "combining"]):
# Smooth by combining each polarizer images
pol0, err0 = smooth_data(pol0_array, err0_array, data_mask, headers0, FWHM=FWHM, scale=scale, smoothing=smoothing)
pol60, err60 = smooth_data(pol60_array, err60_array, data_mask, headers60, FWHM=FWHM, scale=scale, smoothing=smoothing)
pol120, err120 = smooth_data(pol120_array, err120_array, data_mask, headers120, FWHM=FWHM, scale=scale, smoothing=smoothing)
else:
# Sum on each polarization filter.
pol0_t = np.sum([header["exptime"] for header in headers0])
pol60_t = np.sum([header["exptime"] for header in headers60])
pol120_t = np.sum([header["exptime"] for header in headers120])
for i in range(pol0_array.shape[0]):
pol0_array[i] *= headers0[i]["exptime"]
err0_array[i] *= headers0[i]["exptime"]
for i in range(pol60_array.shape[0]):
pol60_array[i] *= headers60[i]["exptime"]
err60_array[i] *= headers60[i]["exptime"]
for i in range(pol120_array.shape[0]):
pol120_array[i] *= headers120[i]["exptime"]
err120_array[i] *= headers120[i]["exptime"]
pol0 = pol0_array.sum(axis=0) / pol0_t
pol60 = pol60_array.sum(axis=0) / pol60_t
pol120 = pol120_array.sum(axis=0) / pol120_t
pol_array = np.array([pol0, pol60, pol120])
pol_headers = [headers0[0], headers60[0], headers120[0]]
# Propagate uncertainties quadratically
err0 = np.sqrt(np.sum(err0_array**2, axis=0)) / pol0_t
err60 = np.sqrt(np.sum(err60_array**2, axis=0)) / pol60_t
err120 = np.sqrt(np.sum(err120_array**2, axis=0)) / pol120_t
polerr_array = np.array([err0, err60, err120])
if (FWHM is not None) and (smoothing.lower() in ["gaussian", "gauss", "weighted_gaussian", "weight_gauss"]):
# Smooth by convoluting with a gaussian each polX image.
pol_array, polerr_array = smooth_data(pol_array, polerr_array, data_mask, pol_headers, FWHM=FWHM, scale=scale, smoothing=smoothing)
pol0, pol60, pol120 = pol_array
err0, err60, err120 = polerr_array
# Update headers
for header in headers:
if header["filtnam1"] == "POL0":
list_head = headers0
elif header["filtnam1"] == "POL60":
list_head = headers60
elif header["filtnam1"] == "POL120":
list_head = headers120
header["exptime"] = np.sum([head["exptime"] for head in list_head])
pol_headers = [headers0[0], headers60[0], headers120[0]]
# Get image shape
shape = pol0.shape
# Construct the polarizer array
polarizer_array = np.zeros((3, shape[0], shape[1]))
polarizer_array[0] = pol0
polarizer_array[1] = pol60
polarizer_array[2] = pol120
# Define the covariance matrix for the polarizer images
# We assume cross terms are null
polarizer_cov = np.zeros((3, 3, shape[0], shape[1]))
polarizer_cov[0, 0] = err0**2
polarizer_cov[1, 1] = err60**2
polarizer_cov[2, 2] = err120**2
return polarizer_array, polarizer_cov, pol_headers
def compute_Stokes(data_array, error_array, data_mask, headers, FWHM=None, scale="pixel", smoothing="combine", transmitcorr=True, integrate=True):
"""
Compute the Stokes parameters I, Q and U for a given data_set
----------
Inputs:
data_array : numpy.ndarray
Array of images (2D floats, aligned and of the same shape) of a
single observation with multiple polarizers of an instrument.
error_array : numpy.ndarray
Array of images (2D floats, aligned and of the same shape) containing
the error in each pixel of the observation images in data_array.
data_mask : numpy.ndarray
2D boolean array delimiting the data to work on.
headers : header list
List of headers corresponding to the images in data_array.
FWHM : float, optional
Full Width at Half Maximum of the detector for smoothing of the
data on each polarizer filter in scale units. If None, no smoothing
is done.
Defaults to None.
scale : str, optional
Scale units for the FWHM between 'pixel' and 'arcsec'.
Defaults to 'pixel'.
smoothing : str, optional
Smoothing algorithm to be used on the input data array.
-'combine','combining' use the N images combining algorithm with
weighted pixels (inverse square of errors).
-'gaussian' convolve any input image with a gaussian of standard
deviation stdev = FWHM/(2*sqrt(2*log(2))).
-'gaussian_after' convolve output Stokes I/Q/U with a gaussian of
standard deviation stdev = FWHM/(2*sqrt(2*log(2))).
Defaults to 'combine'. Won't be used if FWHM is None.
transmitcorr : bool, optional
Weither the images should be transmittance corrected for each filter
along the line of sight. Latest calibrated data products (.c1f) does
not require such correction.
Defaults to True.
----------
Returns:
I_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
total intensity
Q_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
vertical/horizontal linear polarization intensity
U_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
+45/-45deg linear polarization intensity
Stokes_cov : numpy.ndarray
Covariance matrix of the Stokes parameters I, Q, U.
"""
# Check that all images are from the same instrument
instr = headers[0]["instrume"]
same_instr = np.array([instr == header["instrume"] for header in headers]).all()
if not same_instr:
raise ValueError(
"All images in data_array are not from the same\
instrument, cannot proceed."
)
if instr not in ["FOC"]:
raise ValueError(
"Cannot reduce images from {0:s} instrument\
(yet)".format(instr)
)
rotate = np.zeros(len(headers))
for i,head in enumerate(headers):
try:
rotate[i] = head['ROTATE']
except KeyError:
rotate[i] = 0.
if (np.unique(rotate) == rotate[0]).all():
theta = globals()["theta"] - rotate[0] * np.pi / 180.0
# Get image from each polarizer and covariance matrix
pol_array, pol_cov, pol_headers = polarizer_avg(data_array, error_array, data_mask, headers, FWHM=FWHM, scale=scale, smoothing=smoothing)
pol0, pol60, pol120 = pol_array
if (pol0 < 0.0).any() or (pol60 < 0.0).any() or (pol120 < 0.0).any():
print("WARNING : Negative value in polarizer array.")
# Stokes parameters
# transmittance corrected
transmit = np.ones((3,)) # will be filter dependant
filt2, filt3, filt4 = headers[0]["filtnam2"], headers[0]["filtnam3"], headers[0]["filtnam4"]
same_filt2 = np.array([filt2 == header["filtnam2"] for header in headers]).all()
same_filt3 = np.array([filt3 == header["filtnam3"] for header in headers]).all()
same_filt4 = np.array([filt4 == header["filtnam4"] for header in headers]).all()
if same_filt2 and same_filt3 and same_filt4:
transmit2, transmit3, transmit4 = globals()["trans2"][filt2.lower()], globals()["trans3"][filt3.lower()], globals()["trans4"][filt4.lower()]
else:
print(
"WARNING : All images in data_array are not from the same \
band filter, the limiting transmittance will be taken."
)
transmit2 = np.min([globals()["trans2"][header["filtnam2"].lower()] for header in headers])
transmit3 = np.min([globals()["trans3"][header["filtnam3"].lower()] for header in headers])
transmit4 = np.min([globals()["trans4"][header["filtnam4"].lower()] for header in headers])
if transmitcorr:
transmit *= transmit2 * transmit3 * transmit4
pol_eff = np.array([globals()["pol_efficiency"]["pol0"], globals()["pol_efficiency"]["pol60"], globals()["pol_efficiency"]["pol120"]])
# Calculating correction factor: allows all pol_filt to share same exptime and inverse sensitivity (taken to be the one from POL0)
corr = np.array([1.0 * h["photflam"] / h["exptime"] for h in pol_headers]) * pol_headers[0]["exptime"] / pol_headers[0]["photflam"]
pol_headers[1]['photflam'], pol_headers[1]['exptime'] = pol_headers[0]['photflam'], pol_headers[1]['exptime']
pol_headers[2]['photflam'], pol_headers[2]['exptime'] = pol_headers[0]['photflam'], pol_headers[2]['exptime']
# Orientation and error for each polarizer
# fmax = np.finfo(np.float64).max
pol_flux = np.array([corr[0] * pol0, corr[1] * pol60, corr[2] * pol120])
coeff_stokes = np.zeros((3, 3))
# Coefficients linking each polarizer flux to each Stokes parameter
for i in range(3):
coeff_stokes[0, i] = (
pol_eff[(i + 1) % 3]
* pol_eff[(i + 2) % 3]
* np.sin(-2.0 * theta[(i + 1) % 3] + 2.0 * theta[(i + 2) % 3])
* 2.0
/ transmit[i]
)
coeff_stokes[1, i] = (
(-pol_eff[(i + 1) % 3] * np.sin(2.0 * theta[(i + 1) % 3]) + pol_eff[(i + 2) % 3] * np.sin(2.0 * theta[(i + 2) % 3]))
* 2.0
/ transmit[i]
)
coeff_stokes[2, i] = (
(pol_eff[(i + 1) % 3] * np.cos(2.0 * theta[(i + 1) % 3]) - pol_eff[(i + 2) % 3] * np.cos(2.0 * theta[(i + 2) % 3]))
* 2.0
/ transmit[i]
)
# Normalization parameter for Stokes parameters computation
N = (coeff_stokes[0, :] * transmit / 2.0).sum()
coeff_stokes = coeff_stokes / N
I_stokes = np.zeros(pol_array[0].shape)
Q_stokes = np.zeros(pol_array[0].shape)
U_stokes = np.zeros(pol_array[0].shape)
Stokes_cov = np.zeros((3, 3, I_stokes.shape[0], I_stokes.shape[1]))
for i in range(I_stokes.shape[0]):
for j in range(I_stokes.shape[1]):
I_stokes[i, j], Q_stokes[i, j], U_stokes[i, j] = np.dot(coeff_stokes, pol_flux[:, i, j]).T
Stokes_cov[:, :, i, j] = np.dot(coeff_stokes, np.dot(pol_cov[:, :, i, j], coeff_stokes.T))
if (FWHM is not None) and (smoothing.lower() in ["weighted_gaussian_after", "weight_gauss_after", "gaussian_after", "gauss_after"]):
smoothing = smoothing.lower()[:-6]
Stokes_array = np.array([I_stokes, Q_stokes, U_stokes])
Stokes_error = np.array([np.sqrt(Stokes_cov[i, i]) for i in range(3)])
Stokes_headers = headers[0:3]
Stokes_array, Stokes_error = smooth_data(Stokes_array, Stokes_error, data_mask, headers=Stokes_headers, FWHM=FWHM, scale=scale, smoothing=smoothing)
I_stokes, Q_stokes, U_stokes = Stokes_array
Stokes_cov[0, 0], Stokes_cov[1, 1], Stokes_cov[2, 2] = deepcopy(Stokes_error**2)
sStokes_array = np.array([I_stokes * Q_stokes, I_stokes * U_stokes, Q_stokes * U_stokes])
sStokes_error = np.array([Stokes_cov[0, 1], Stokes_cov[0, 2], Stokes_cov[1, 2]])
uStokes_error = np.array([Stokes_cov[1, 0], Stokes_cov[2, 0], Stokes_cov[2, 1]])
sStokes_array, sStokes_error = smooth_data(
sStokes_array, sStokes_error, data_mask, headers=Stokes_headers, FWHM=FWHM, scale=scale, smoothing=smoothing
)
uStokes_array, uStokes_error = smooth_data(
sStokes_array, uStokes_error, data_mask, headers=Stokes_headers, FWHM=FWHM, scale=scale, smoothing=smoothing
)
Stokes_cov[0, 1], Stokes_cov[0, 2], Stokes_cov[1, 2] = deepcopy(sStokes_error)
Stokes_cov[1, 0], Stokes_cov[2, 0], Stokes_cov[2, 1] = deepcopy(uStokes_error)
mask = (Q_stokes**2 + U_stokes**2) > I_stokes**2
if mask.any():
print("WARNING : found {0:d} pixels for which I_pol > I_stokes".format(I_stokes[mask].size))
# Statistical error: Poisson noise is assumed
sigma_flux = np.array([np.sqrt(flux / head["exptime"]) for flux, head in zip(pol_flux, pol_headers)])
s_I2_stat = np.sum([coeff_stokes[0, i] ** 2 * sigma_flux[i] ** 2 for i in range(len(sigma_flux))], axis=0)
s_Q2_stat = np.sum([coeff_stokes[1, i] ** 2 * sigma_flux[i] ** 2 for i in range(len(sigma_flux))], axis=0)
s_U2_stat = np.sum([coeff_stokes[2, i] ** 2 * sigma_flux[i] ** 2 for i in range(len(sigma_flux))], axis=0)
pol_flux_corr = np.array([pf * 2.0 / t for (pf, t) in zip(pol_flux, transmit)])
coeff_stokes_corr = np.array([cs * t / 2.0 for (cs, t) in zip(coeff_stokes.T, transmit)]).T
# Compute the derivative of each Stokes parameter with respect to the polarizer orientation
dI_dtheta1 = (
2.0
* pol_eff[0]
/ N
* (
pol_eff[2] * np.cos(-2.0 * theta[2] + 2.0 * theta[0]) * (pol_flux_corr[1] - I_stokes)
- pol_eff[1] * np.cos(-2.0 * theta[0] + 2.0 * theta[1]) * (pol_flux_corr[2] - I_stokes)
+ coeff_stokes_corr[0, 0] * (np.sin(2.0 * theta[0]) * Q_stokes - np.cos(2 * theta[0]) * U_stokes)
)
)
dI_dtheta2 = (
2.0
* pol_eff[1]
/ N
* (
pol_eff[0] * np.cos(-2.0 * theta[0] + 2.0 * theta[1]) * (pol_flux_corr[2] - I_stokes)
- pol_eff[2] * np.cos(-2.0 * theta[1] + 2.0 * theta[2]) * (pol_flux_corr[0] - I_stokes)
+ coeff_stokes_corr[0, 1] * (np.sin(2.0 * theta[1]) * Q_stokes - np.cos(2 * theta[1]) * U_stokes)
)
)
dI_dtheta3 = (
2.0
* pol_eff[2]
/ N
* (
pol_eff[1] * np.cos(-2.0 * theta[1] + 2.0 * theta[2]) * (pol_flux_corr[0] - I_stokes)
- pol_eff[0] * np.cos(-2.0 * theta[2] + 2.0 * theta[0]) * (pol_flux_corr[1] - I_stokes)
+ coeff_stokes_corr[0, 2] * (np.sin(2.0 * theta[2]) * Q_stokes - np.cos(2 * theta[2]) * U_stokes)
)
)
dI_dtheta = np.array([dI_dtheta1, dI_dtheta2, dI_dtheta3])
dQ_dtheta1 = (
2.0
* pol_eff[0]
/ N
* (
np.cos(2.0 * theta[0]) * (pol_flux_corr[1] - pol_flux_corr[2])
- (
pol_eff[2] * np.cos(-2.0 * theta[2] + 2.0 * theta[0])
- pol_eff[1] * np.cos(-2.0 * theta[0] + 2.0 * theta[1])
)
* Q_stokes
+ coeff_stokes_corr[1, 0] * (np.sin(2.0 * theta[0]) * Q_stokes - np.cos(2 * theta[0]) * U_stokes)
)
)
dQ_dtheta2 = (
2.0
* pol_eff[1]
/ N
* (
np.cos(2.0 * theta[1]) * (pol_flux_corr[2] - pol_flux_corr[0])
- (
pol_eff[0] * np.cos(-2.0 * theta[0] + 2.0 * theta[1])
- pol_eff[2] * np.cos(-2.0 * theta[1] + 2.0 * theta[2])
)
* Q_stokes
+ coeff_stokes_corr[1, 1] * (np.sin(2.0 * theta[1]) * Q_stokes - np.cos(2 * theta[1]) * U_stokes)
)
)
dQ_dtheta3 = (
2.0
* pol_eff[2]
/ N
* (
np.cos(2.0 * theta[2]) * (pol_flux_corr[0] - pol_flux_corr[1])
- (
pol_eff[1] * np.cos(-2.0 * theta[1] + 2.0 * theta[2])
- pol_eff[0] * np.cos(-2.0 * theta[2] + 2.0 * theta[0])
)
* Q_stokes
+ coeff_stokes_corr[1, 2] * (np.sin(2.0 * theta[2]) * Q_stokes - np.cos(2 * theta[2]) * U_stokes)
)
)
dQ_dtheta = np.array([dQ_dtheta1, dQ_dtheta2, dQ_dtheta3])
dU_dtheta1 = (
2.0
* pol_eff[0]
/ N
* (
np.sin(2.0 * theta[0]) * (pol_flux_corr[1] - pol_flux_corr[2])
- (
pol_eff[2] * np.cos(-2.0 * theta[2] + 2.0 * theta[0])
- pol_eff[1] * np.cos(-2.0 * theta[0] + 2.0 * theta[1])
)
* U_stokes
+ coeff_stokes_corr[2, 0] * (np.sin(2.0 * theta[0]) * Q_stokes - np.cos(2 * theta[0]) * U_stokes)
)
)
dU_dtheta2 = (
2.0
* pol_eff[1]
/ N
* (
np.sin(2.0 * theta[1]) * (pol_flux_corr[2] - pol_flux_corr[0])
- (
pol_eff[0] * np.cos(-2.0 * theta[0] + 2.0 * theta[1])
- pol_eff[2] * np.cos(-2.0 * theta[1] + 2.0 * theta[2])
)
* U_stokes
+ coeff_stokes_corr[2, 1] * (np.sin(2.0 * theta[1]) * Q_stokes - np.cos(2 * theta[1]) * U_stokes)
)
)
dU_dtheta3 = (
2.0
* pol_eff[2]
/ N
* (
np.sin(2.0 * theta[2]) * (pol_flux_corr[0] - pol_flux_corr[1])
- (
pol_eff[1] * np.cos(-2.0 * theta[1] + 2.0 * theta[2])
- pol_eff[0] * np.cos(-2.0 * theta[2] + 2.0 * theta[0])
)
* U_stokes
+ coeff_stokes_corr[2, 2] * (np.sin(2.0 * theta[2]) * Q_stokes - np.cos(2 * theta[2]) * U_stokes)
)
)
dU_dtheta = np.array([dU_dtheta1, dU_dtheta2, dU_dtheta3])
# Compute the uncertainty associated with the polarizers' orientation (see Kishimoto 1999)
s_I2_axis = np.sum([dI_dtheta[i] ** 2 * globals()["sigma_theta"][i] ** 2 for i in range(len(globals()["sigma_theta"]))], axis=0)
s_Q2_axis = np.sum([dQ_dtheta[i] ** 2 * globals()["sigma_theta"][i] ** 2 for i in range(len(globals()["sigma_theta"]))], axis=0)
s_U2_axis = np.sum([dU_dtheta[i] ** 2 * globals()["sigma_theta"][i] ** 2 for i in range(len(globals()["sigma_theta"]))], axis=0)
# np.savetxt("output/sI_dir.txt", np.sqrt(s_I2_axis))
# np.savetxt("output/sQ_dir.txt", np.sqrt(s_Q2_axis))
# np.savetxt("output/sU_dir.txt", np.sqrt(s_U2_axis))
# Add quadratically the uncertainty to the Stokes covariance matrix
Stokes_cov[0, 0] += s_I2_axis + s_I2_stat
Stokes_cov[1, 1] += s_Q2_axis + s_Q2_stat
Stokes_cov[2, 2] += s_U2_axis + s_U2_stat
# Save values to single header
header_stokes = pol_headers[0]
else:
all_I_stokes = np.zeros((np.unique(rotate).size, data_array.shape[1], data_array.shape[2]))
all_Q_stokes = np.zeros((np.unique(rotate).size, data_array.shape[1], data_array.shape[2]))
all_U_stokes = np.zeros((np.unique(rotate).size, data_array.shape[1], data_array.shape[2]))
all_Stokes_cov = np.zeros((np.unique(rotate).size, 3, 3, data_array.shape[1], data_array.shape[2]))
all_header_stokes = [{},]*np.unique(rotate).size
for i,rot in enumerate(np.unique(rotate)):
rot_mask = (rotate == rot)
all_I_stokes[i], all_Q_stokes[i], all_U_stokes[i], all_Stokes_cov[i], all_header_stokes[i] = compute_Stokes(data_array[rot_mask], error_array[rot_mask], data_mask, [headers[i] for i in np.arange(len(headers))[rot_mask]], FWHM=FWHM, scale=scale, smoothing=smoothing, transmitcorr=transmitcorr, integrate=False)
all_exp = np.array([float(h['exptime']) for h in all_header_stokes])
I_stokes = np.sum([exp*I for exp, I in zip(all_exp, all_I_stokes)],axis=0) / all_exp.sum()
Q_stokes = np.sum([exp*Q for exp, Q in zip(all_exp, all_Q_stokes)],axis=0) / all_exp.sum()
U_stokes = np.sum([exp*U for exp, U in zip(all_exp, all_U_stokes)],axis=0) / all_exp.sum()
Stokes_cov = np.zeros((3, 3, I_stokes.shape[0], I_stokes.shape[1]))
for i in range(3):
Stokes_cov[i,i] = np.sum([exp**2*cov for exp, cov in zip(all_exp, all_Stokes_cov[:,i,i])], axis=0) / all_exp.sum()**2
for j in [x for x in range(3) if x!=i]:
Stokes_cov[i,j] = np.sqrt(np.sum([exp**2*cov**2 for exp, cov in zip(all_exp, all_Stokes_cov[:,i,j])], axis=0) / all_exp.sum()**2)
Stokes_cov[j,i] = np.sqrt(np.sum([exp**2*cov**2 for exp, cov in zip(all_exp, all_Stokes_cov[:,j,i])], axis=0) / all_exp.sum()**2)
# Save values to single header
header_stokes = all_header_stokes[0]
header_stokes['exptime'] = all_exp.sum()
# Nan handling :
fmax = np.finfo(np.float64).max
I_stokes[np.isnan(I_stokes)] = 0.0
Q_stokes[I_stokes == 0.0] = 0.0
U_stokes[I_stokes == 0.0] = 0.0
Q_stokes[np.isnan(Q_stokes)] = 0.0
U_stokes[np.isnan(U_stokes)] = 0.0
Stokes_cov[np.isnan(Stokes_cov)] = fmax
if integrate:
# Compute integrated values for P, PA before any rotation
mask = deepcopy(data_mask).astype(bool)
I_diluted = I_stokes[mask].sum()
Q_diluted = Q_stokes[mask].sum()
U_diluted = U_stokes[mask].sum()
I_diluted_err = np.sqrt(np.sum(Stokes_cov[0, 0][mask]))
Q_diluted_err = np.sqrt(np.sum(Stokes_cov[1, 1][mask]))
U_diluted_err = np.sqrt(np.sum(Stokes_cov[2, 2][mask]))
IQ_diluted_err = np.sqrt(np.sum(Stokes_cov[0, 1][mask] ** 2))
IU_diluted_err = np.sqrt(np.sum(Stokes_cov[0, 2][mask] ** 2))
QU_diluted_err = np.sqrt(np.sum(Stokes_cov[1, 2][mask] ** 2))
P_diluted = np.sqrt(Q_diluted**2 + U_diluted**2) / I_diluted
P_diluted_err = np.sqrt(
(Q_diluted**2 * Q_diluted_err**2 + U_diluted**2 * U_diluted_err**2 + 2.0 * Q_diluted * U_diluted * QU_diluted_err) / (Q_diluted**2 + U_diluted**2)
+ ((Q_diluted / I_diluted) ** 2 + (U_diluted / I_diluted) ** 2) * I_diluted_err**2
- 2.0 * (Q_diluted / I_diluted) * IQ_diluted_err
- 2.0 * (U_diluted / I_diluted) * IU_diluted_err
)
PA_diluted = princ_angle((90.0 / np.pi) * np.arctan2(U_diluted, Q_diluted))
PA_diluted_err = (90.0 / (np.pi * (Q_diluted**2 + U_diluted**2))) * np.sqrt(
U_diluted**2 * Q_diluted_err**2 + Q_diluted**2 * U_diluted_err**2 - 2.0 * Q_diluted * U_diluted * QU_diluted_err
)
header_stokes["P_int"] = (P_diluted, "Integrated polarization degree")
header_stokes["sP_int"] = (np.ceil(P_diluted_err * 1000.0) / 1000.0, "Integrated polarization degree error")
header_stokes["PA_int"] = (PA_diluted, "Integrated polarization angle")
header_stokes["sPA_int"] = (np.ceil(PA_diluted_err * 10.0) / 10.0, "Integrated polarization angle error")
return I_stokes, Q_stokes, U_stokes, Stokes_cov, header_stokes
def compute_pol(I_stokes, Q_stokes, U_stokes, Stokes_cov, header_stokes):
"""
Compute the polarization degree (in %) and angle (in deg) and their
respective errors from given Stokes parameters.
----------
Inputs:
I_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
total intensity
Q_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
vertical/horizontal linear polarization intensity
U_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
+45/-45deg linear polarization intensity
Stokes_cov : numpy.ndarray
Covariance matrix of the Stokes parameters I, Q, U.
header_stokes : astropy.fits.header.Header
Header file associated with the Stokes fluxes.
----------
Returns:
P : numpy.ndarray
Image (2D floats) containing the polarization degree (in %).
debiased_P : numpy.ndarray
Image (2D floats) containing the debiased polarization degree (in %).
s_P : numpy.ndarray
Image (2D floats) containing the error on the polarization degree.
s_P_P : numpy.ndarray
Image (2D floats) containing the Poisson noise error on the
polarization degree.
PA : numpy.ndarray
Image (2D floats) containing the polarization angle.
s_PA : numpy.ndarray
Image (2D floats) containing the error on the polarization angle.
s_PA_P : numpy.ndarray
Image (2D floats) containing the Poisson noise error on the
polarization angle.
"""
# Polarization degree and angle computation
mask = I_stokes > 0.0
I_pol = np.zeros(I_stokes.shape)
I_pol[mask] = np.sqrt(Q_stokes[mask] ** 2 + U_stokes[mask] ** 2)
P = np.zeros(I_stokes.shape)
P[mask] = I_pol[mask] / I_stokes[mask]
PA = np.zeros(I_stokes.shape)
PA[mask] = (90.0 / np.pi) * np.arctan2(U_stokes[mask], Q_stokes[mask])
if (P > 1).any():
print("WARNING : found {0:d} pixels for which P > 1".format(P[P > 1.0].size))
# Associated errors
fmax = np.finfo(np.float64).max
s_P = np.ones(I_stokes.shape) * fmax
s_PA = np.ones(I_stokes.shape) * fmax
# Propagate previously computed errors
s_P[mask] = (1 / I_stokes[mask]) * np.sqrt(
(
Q_stokes[mask] ** 2 * Stokes_cov[1, 1][mask]
+ U_stokes[mask] ** 2 * Stokes_cov[2, 2][mask]
+ 2.0 * Q_stokes[mask] * U_stokes[mask] * Stokes_cov[1, 2][mask]
)
/ (Q_stokes[mask] ** 2 + U_stokes[mask] ** 2)
+ ((Q_stokes[mask] / I_stokes[mask]) ** 2 + (U_stokes[mask] / I_stokes[mask]) ** 2) * Stokes_cov[0, 0][mask]
- 2.0 * (Q_stokes[mask] / I_stokes[mask]) * Stokes_cov[0, 1][mask]
- 2.0 * (U_stokes[mask] / I_stokes[mask]) * Stokes_cov[0, 2][mask]
)
s_PA[mask] = (90.0 / (np.pi * (Q_stokes[mask] ** 2 + U_stokes[mask] ** 2))) * np.sqrt(
U_stokes[mask] ** 2 * Stokes_cov[1, 1][mask]
+ Q_stokes[mask] ** 2 * Stokes_cov[2, 2][mask]
- 2.0 * Q_stokes[mask] * U_stokes[mask] * Stokes_cov[1, 2][mask]
)
s_P[np.isnan(s_P)] = fmax
s_PA[np.isnan(s_PA)] = fmax
# Catch expected "OverflowWarning" as wrong pixel have an overflowing error
with warnings.catch_warnings(record=True) as _:
mask2 = P**2 >= s_P**2
debiased_P = np.zeros(I_stokes.shape)
debiased_P[mask2] = np.sqrt(P[mask2] ** 2 - s_P[mask2] ** 2)
if (debiased_P > 1.0).any():
print("WARNING : found {0:d} pixels for which debiased_P > 100%".format(debiased_P[debiased_P > 1.0].size))
# Compute the total exposure time so that
# I_stokes*exp_tot = N_tot the total number of events
exp_tot = header_stokes["exptime"]
# print("Total exposure time : {} sec".format(exp_tot))
N_obs = I_stokes * exp_tot
# Errors on P, PA supposing Poisson noise
s_P_P = np.ones(I_stokes.shape) * fmax
s_P_P[mask] = np.sqrt(2.0) / np.sqrt(N_obs[mask]) * 100.0
s_PA_P = np.ones(I_stokes.shape) * fmax
s_PA_P[mask2] = s_P_P[mask2] / (2.0 * P[mask2]) * 180.0 / np.pi
# Nan handling :
P[np.isnan(P)] = 0.0
s_P[np.isnan(s_P)] = fmax
s_PA[np.isnan(s_PA)] = fmax
debiased_P[np.isnan(debiased_P)] = 0.0
s_P_P[np.isnan(s_P_P)] = fmax
s_PA_P[np.isnan(s_PA_P)] = fmax
return P, debiased_P, s_P, s_P_P, PA, s_PA, s_PA_P
def rotate_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, header_stokes, SNRi_cut=None):
"""
Use scipy.ndimage.rotate to rotate I_stokes to an angle, and a rotation
matrix to rotate Q, U of a given angle in degrees and update header
orientation keyword.
----------
Inputs:
I_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
total intensity
Q_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
vertical/horizontal linear polarization intensity
U_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
+45/-45deg linear polarization intensity
Stokes_cov : numpy.ndarray
Covariance matrix of the Stokes parameters I, Q, U.
data_mask : numpy.ndarray
2D boolean array delimiting the data to work on.
header_stokes : astropy.fits.header.Header
Header file associated with the Stokes fluxes.
SNRi_cut : float, optional
Cut that should be applied to the signal-to-noise ratio on I.
Any SNR < SNRi_cut won't be displayed. If None, cut won't be applied.
Defaults to None.
----------
Returns:
new_I_stokes : numpy.ndarray
Rotated mage (2D floats) containing the rotated Stokes parameters
accounting for total intensity
new_Q_stokes : numpy.ndarray
Rotated mage (2D floats) containing the rotated Stokes parameters
accounting for vertical/horizontal linear polarization intensity
new_U_stokes : numpy.ndarray
Rotated image (2D floats) containing the rotated Stokes parameters
accounting for +45/-45deg linear polarization intensity.
new_Stokes_cov : numpy.ndarray
Updated covariance matrix of the Stokes parameters I, Q, U.
new_header_stokes : astropy.fits.header.Header
Updated Header file associated with the Stokes fluxes accounting
for the new orientation angle.
new_data_mask : numpy.ndarray
Updated 2D boolean array delimiting the data to work on.
"""
# Apply cuts
if SNRi_cut is not None:
SNRi = I_stokes / np.sqrt(Stokes_cov[0, 0])
mask = SNRi < SNRi_cut
eps = 1e-5
for i in range(I_stokes.shape[0]):
for j in range(I_stokes.shape[1]):
if mask[i, j]:
I_stokes[i, j] = eps * np.sqrt(Stokes_cov[0, 0][i, j])
Q_stokes[i, j] = eps * np.sqrt(Stokes_cov[1, 1][i, j])
U_stokes[i, j] = eps * np.sqrt(Stokes_cov[2, 2][i, j])
# Rotate I_stokes, Q_stokes, U_stokes using rotation matrix
# ang = np.zeros((len(headers),))
# for i, head in enumerate(headers):
# pc = WCS(head).celestial.wcs.pc[0,0]
# ang[i] = -np.arccos(WCS(head).celestial.wcs.pc[0,0]) * 180.0 / np.pi if np.abs(pc) < 1. else 0.
pc = WCS(header_stokes).celestial.wcs.pc[0,0]
ang = -np.arccos(WCS(header_stokes).celestial.wcs.pc[0,0]) * 180.0 / np.pi if np.abs(pc) < 1. else 0.
alpha = np.pi / 180.0 * ang
mrot = np.array([[1.0, 0.0, 0.0], [0.0, np.cos(2.0 * alpha), np.sin(2.0 * alpha)], [0, -np.sin(2.0 * alpha), np.cos(2.0 * alpha)]])
old_center = np.array(I_stokes.shape) / 2
shape = np.fix(np.array(I_stokes.shape) * np.sqrt(2.5)).astype(int)
new_center = np.array(shape) / 2
I_stokes = zeropad(I_stokes, shape)
Q_stokes = zeropad(Q_stokes, shape)
U_stokes = zeropad(U_stokes, shape)
data_mask = zeropad(data_mask, shape)
Stokes_cov = zeropad(Stokes_cov, [*Stokes_cov.shape[:-2], *shape])
new_I_stokes = np.zeros(shape)
new_Q_stokes = np.zeros(shape)
new_U_stokes = np.zeros(shape)
new_Stokes_cov = np.zeros((*Stokes_cov.shape[:-2], *shape))
# Rotate original images using scipy.ndimage.rotate
new_I_stokes = sc_rotate(I_stokes, ang, order=1, reshape=False, cval=0.0)
new_Q_stokes = sc_rotate(Q_stokes, ang, order=1, reshape=False, cval=0.0)
new_U_stokes = sc_rotate(U_stokes, ang, order=1, reshape=False, cval=0.0)
new_data_mask = sc_rotate(data_mask.astype(float) * 10.0, ang, order=1, reshape=False, cval=0.0)
new_data_mask[new_data_mask < 2] = 0.0
new_data_mask = new_data_mask.astype(bool)
for i in range(3):
for j in range(3):
new_Stokes_cov[i, j] = sc_rotate(Stokes_cov[i, j], ang, order=1, reshape=False, cval=0.0)
new_Stokes_cov[i, i] = np.abs(new_Stokes_cov[i, i])
for i in range(shape[0]):
for j in range(shape[1]):
new_I_stokes[i, j], new_Q_stokes[i, j], new_U_stokes[i, j] = np.dot(mrot, np.array([new_I_stokes[i, j], new_Q_stokes[i, j], new_U_stokes[i, j]])).T
new_Stokes_cov[:, :, i, j] = np.dot(mrot, np.dot(new_Stokes_cov[:, :, i, j], mrot.T))
# Update headers to new angle
mrot = np.array([[np.cos(-alpha), -np.sin(-alpha)], [np.sin(-alpha), np.cos(-alpha)]])
new_header_stokes = deepcopy(header_stokes)
new_header_stokes["orientat"] = header_stokes["orientat"] + ang
new_wcs = WCS(header_stokes).celestial.deepcopy()
new_wcs.wcs.pc = np.dot(mrot, new_wcs.wcs.pc)
new_wcs.wcs.crpix = np.dot(mrot, new_wcs.wcs.crpix - old_center[::-1]) + new_center[::-1]
new_wcs.wcs.set()
for key, val in new_wcs.to_header().items():
new_header_stokes.set(key, val)
if new_wcs.wcs.pc[0, 0] == 1.0:
new_header_stokes.set("PC1_1", 1.0)
if new_wcs.wcs.pc[1, 1] == 1.0:
new_header_stokes.set("PC2_2", 1.0)
new_header_stokes["orientat"] = header_stokes["orientat"] + ang
# Nan handling :
fmax = np.finfo(np.float64).max
new_I_stokes[np.isnan(new_I_stokes)] = 0.0
new_Q_stokes[new_I_stokes == 0.0] = 0.0
new_U_stokes[new_I_stokes == 0.0] = 0.0
new_Q_stokes[np.isnan(new_Q_stokes)] = 0.0
new_U_stokes[np.isnan(new_U_stokes)] = 0.0
new_Stokes_cov[np.isnan(new_Stokes_cov)] = fmax
# Compute updated integrated values for P, PA
mask = deepcopy(new_data_mask).astype(bool)
I_diluted = new_I_stokes[mask].sum()
Q_diluted = new_Q_stokes[mask].sum()
U_diluted = new_U_stokes[mask].sum()
I_diluted_err = np.sqrt(np.sum(new_Stokes_cov[0, 0][mask]))
Q_diluted_err = np.sqrt(np.sum(new_Stokes_cov[1, 1][mask]))
U_diluted_err = np.sqrt(np.sum(new_Stokes_cov[2, 2][mask]))
IQ_diluted_err = np.sqrt(np.sum(new_Stokes_cov[0, 1][mask] ** 2))
IU_diluted_err = np.sqrt(np.sum(new_Stokes_cov[0, 2][mask] ** 2))
QU_diluted_err = np.sqrt(np.sum(new_Stokes_cov[1, 2][mask] ** 2))
P_diluted = np.sqrt(Q_diluted**2 + U_diluted**2) / I_diluted
P_diluted_err = (1.0 / I_diluted) * np.sqrt(
(Q_diluted**2 * Q_diluted_err**2 + U_diluted**2 * U_diluted_err**2 + 2.0 * Q_diluted * U_diluted * QU_diluted_err) / (Q_diluted**2 + U_diluted**2)
+ ((Q_diluted / I_diluted) ** 2 + (U_diluted / I_diluted) ** 2) * I_diluted_err**2
- 2.0 * (Q_diluted / I_diluted) * IQ_diluted_err
- 2.0 * (U_diluted / I_diluted) * IU_diluted_err
)
PA_diluted = princ_angle((90.0 / np.pi) * np.arctan2(U_diluted, Q_diluted))
PA_diluted_err = (90.0 / (np.pi * (Q_diluted**2 + U_diluted**2))) * np.sqrt(
U_diluted**2 * Q_diluted_err**2 + Q_diluted**2 * U_diluted_err**2 - 2.0 * Q_diluted * U_diluted * QU_diluted_err
)
new_header_stokes["P_int"] = (P_diluted, "Integrated polarization degree")
new_header_stokes["sP_int"] = (np.ceil(P_diluted_err * 1000.0) / 1000.0, "Integrated polarization degree error")
new_header_stokes["PA_int"] = (PA_diluted, "Integrated polarization angle")
new_header_stokes["sPA_int"] = (np.ceil(PA_diluted_err * 10.0) / 10.0, "Integrated polarization angle error")
return new_I_stokes, new_Q_stokes, new_U_stokes, new_Stokes_cov, new_data_mask, new_header_stokes
def rotate_data(data_array, error_array, data_mask, headers):
"""
Use scipy.ndimage.rotate to rotate I_stokes to an angle, and a rotation
matrix to rotate Q, U of a given angle in degrees and update header
orientation keyword.
----------
Inputs:
data_array : numpy.ndarray
Array of images (2D floats) to be rotated by angle ang.
error_array : numpy.ndarray
Array of error associated to images in data_array.
data_mask : numpy.ndarray
2D boolean array delimiting the data to work on.
headers : header list
List of headers corresponding to the reduced images.
----------
Returns:
new_data_array : numpy.ndarray
Updated array containing the rotated images.
new_error_array : numpy.ndarray
Updated array containing the rotated errors.
new_data_mask : numpy.ndarray
Updated 2D boolean array delimiting the data to work on.
new_headers : header list
Updated list of headers corresponding to the reduced images accounting
for the new orientation angle.
"""
# Rotate I_stokes, Q_stokes, U_stokes using rotation matrix
old_center = np.array(data_array[0].shape) / 2
shape = np.fix(np.array(data_array[0].shape) * np.sqrt(2.5)).astype(int)
new_center = np.array(shape) / 2
data_array = zeropad(data_array, [data_array.shape[0], *shape])
error_array = zeropad(error_array, [error_array.shape[0], *shape])
data_mask = zeropad(data_mask, shape)
# Rotate original images using scipy.ndimage.rotate
new_headers = []
new_data_array = []
new_error_array = []
new_data_mask = []
for i,header in zip(range(data_array.shape[0]),headers):
ang = -float(header["ORIENTAT"])
alpha = ang * np.pi / 180.0
new_data_array.append(sc_rotate(data_array[i], ang, order=1, reshape=False, cval=0.0))
new_error_array.append(sc_rotate(error_array[i], ang, order=1, reshape=False, cval=0.0))
new_data_mask.append(sc_rotate(data_mask * 10.0, ang, order=1, reshape=False, cval=0.0))
# Update headers to new angle
mrot = np.array([[np.cos(-alpha), -np.sin(-alpha)], [np.sin(-alpha), np.cos(-alpha)]])
new_header = deepcopy(header)
new_wcs = WCS(header).celestial.deepcopy()
new_wcs.wcs.pc[:2, :2] = np.dot(mrot, new_wcs.wcs.pc[:2, :2])
new_wcs.wcs.crpix[:2] = np.dot(mrot, new_wcs.wcs.crpix[:2] - old_center[::-1]) + new_center[::-1]
new_wcs.wcs.set()
for key, val in new_wcs.to_header().items():
new_header[key] = val
new_header["ORIENTAT"] = np.arccos(new_wcs.celestial.wcs.pc[0,0]) * 180.0 / np.pi
new_header["ROTATE"] = ang
new_headers.append(new_header)
new_data_array = np.array(new_data_array)
new_error_array = np.array(new_error_array)
new_data_mask = np.array(new_data_mask).sum(axis=0)
new_data_mask[new_data_mask < new_data_mask.max()*2./3.] = 0.0
new_data_mask = new_data_mask.astype(bool)
for i in range(new_data_array.shape[0]):
new_data_array[i][new_data_array[i] < 0.0] = 0.0
return new_data_array, new_error_array, new_data_mask, new_headers