1590 lines
79 KiB
Python
Executable File
1590 lines
79 KiB
Python
Executable File
"""
|
|
Library function computing various steps of the reduction pipeline.
|
|
|
|
prototypes :
|
|
- bin_ndarray(ndarray, new_shape, operation) -> ndarray
|
|
Bins an ndarray to new_shape.
|
|
|
|
- crop_array(data_array, error_array, data_mask, step, null_val, inside, display, savename, plots_folder) -> crop_data_array, crop_error_array (, crop_mask), crop_headers
|
|
Homogeneously crop out null edges off a data array.
|
|
|
|
- deconvolve_array(data_array, headers, psf, FWHM, scale, shape, iterations, algo) -> deconvolved_data_array
|
|
Homogeneously deconvolve a data array using a chosen deconvolution algorithm.
|
|
|
|
- get_error(data_array, headers, error_array, data_mask, sub_type, display, savename, plots_folder, return_background) -> data_array, error_array, headers (, background)
|
|
Compute the uncertainties from the different error sources on each image of the input array.
|
|
|
|
- rebin_array(data_array, error_array, headers, pxsize, scale, operation, data_mask) -> rebinned_data, rebinned_error, rebinned_headers, Dxy (, data_mask)
|
|
Homegeneously rebin a data array given a target pixel size in scale units.
|
|
|
|
- align_data(data_array, error_array, upsample_factor, ref_data, ref_center, return_shifts) -> data_array, error_array (, shifts, errors)
|
|
Align data_array on ref_data by cross-correlation.
|
|
|
|
- smooth_data(data_array, error_array, data_mask, headers, FWHM, scale, smoothing) -> smoothed_array, smoothed_error
|
|
Smooth data by convoluting with a gaussian or by combining weighted images
|
|
|
|
- polarizer_avg(data_array, error_array, data_mask, headers, FWHM, scale, smoothing) -> polarizer_array, polarizer_cov
|
|
Average images in data_array on each used polarizer filter and compute correlated errors.
|
|
|
|
- compute_Stokes(data_array, error_array, data_mask, headers, FWHM, scale, smoothing) -> I_stokes, Q_stokes, U_stokes, Stokes_cov
|
|
Compute Stokes parameters I, Q and U and their respective correlated errors from data_array.
|
|
|
|
- compute_pol(I_stokes, Q_stokes, U_stokes, Stokes_cov, headers) -> P, debiased_P, s_P, s_P_P, PA, s_PA, s_PA_P
|
|
Compute polarization degree (in %) and angle (in degree) and their respective errors.
|
|
|
|
- rotate_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, headers, ang, SNRi_cut) -> I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, headers
|
|
Rotate I, Q, U given an angle in degrees using scipy functions.
|
|
|
|
- rotate_data(data_array, error_array, data_mask, headers, ang) -> data_array, error_array, data_mask, headers
|
|
Rotate data before reduction given an angle in degrees using scipy functions.
|
|
"""
|
|
|
|
from copy import deepcopy
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
from matplotlib.patches import Rectangle
|
|
from matplotlib.colors import LogNorm
|
|
from scipy.ndimage import rotate as sc_rotate, shift as sc_shift
|
|
from scipy.signal import fftconvolve
|
|
from astropy.wcs import WCS
|
|
from astropy import log
|
|
import warnings
|
|
from .deconvolve import deconvolve_im, gaussian_psf, gaussian2d, zeropad
|
|
from .convex_hull import image_hull, clean_ROI
|
|
from .background import bkg_fit, bkg_hist, bkg_mini
|
|
from .plots import plot_obs
|
|
from .utils import princ_angle
|
|
from .cross_correlation import phase_cross_correlation
|
|
log.setLevel('ERROR')
|
|
|
|
|
|
# Useful tabulated values
|
|
# FOC instrument
|
|
globals()['trans2'] = {'f140w': 0.21, 'f175w': 0.24, 'f220w': 0.39, 'f275w': 0.40, 'f320w': 0.89, 'f342w': 0.81,
|
|
'f430w': 0.74, 'f370lp': 0.83, 'f486n': 0.63, 'f501n': 0.68, 'f480lp': 0.82, 'clear2': 1.0}
|
|
globals()['trans3'] = {'f120m': 0.10, 'f130m': 0.10, 'f140m': 0.08, 'f152m': 0.08, 'f165w': 0.28,
|
|
'f170m': 0.18, 'f195w': 0.42, 'f190m': 0.15, 'f210m': 0.18, 'f231m': 0.18, 'clear3': 1.0}
|
|
globals()['trans4'] = {'f253m': 0.18, 'f278m': 0.26, 'f307m': 0.26, 'f130lp': 0.92, 'f346m': 0.58,
|
|
'f372m': 0.73, 'f410m': 0.58, 'f437m': 0.71, 'f470m': 0.79, 'f502m': 0.82, 'f550m': 0.77, 'clear4': 1.0}
|
|
globals()['pol_efficiency'] = {'pol0': 0.92, 'pol60': 0.92, 'pol120': 0.91}
|
|
# POL0 = 0deg, POL60 = 60deg, POL120=120deg
|
|
globals()['theta'] = np.array([180.*np.pi/180., 60.*np.pi/180., 120.*np.pi/180.])
|
|
# Uncertainties on the orientation of the polarizers' axes taken to be 3deg (see Nota et. al 1996, p36; Robinson & Thomson 1995)
|
|
globals()['sigma_theta'] = np.array([3.*np.pi/180., 3.*np.pi/180., 3.*np.pi/180.])
|
|
# Image shift between polarizers as measured by Hodge (1995)
|
|
globals()['pol_shift'] = {'pol0': np.array([0., 0.])*1., 'pol60': np.array([3.63, -0.68])*1., 'pol120': np.array([0.65, 0.20])*1.}
|
|
globals()['sigma_shift'] = {'pol0': [0.3, 0.3], 'pol60': [0.3, 0.3], 'pol120': [0.3, 0.3]}
|
|
|
|
|
|
def get_row_compressor(old_dimension, new_dimension, operation='sum'):
|
|
"""
|
|
Return the matrix that allows to compress an array from an old dimension of
|
|
rows to a new dimension of rows, can be done by summing the original
|
|
components or averaging them.
|
|
----------
|
|
Inputs:
|
|
old_dimension, new_dimension : int
|
|
Number of rows in the original and target matrices.
|
|
operation : str, optional
|
|
Set the way original components of the matrix are put together
|
|
between summing ('sum') and averaging ('average', 'avg', 'mean') them.
|
|
Defaults to 'sum'.
|
|
----------
|
|
Returns:
|
|
dim_compressor : numpy.ndarray
|
|
2D matrix allowing row compression by matrix multiplication to the left
|
|
of the original matrix.
|
|
"""
|
|
dim_compressor = np.zeros((new_dimension, old_dimension))
|
|
bin_size = float(old_dimension) / new_dimension
|
|
next_bin_break = bin_size
|
|
which_row, which_column = 0, 0
|
|
|
|
while which_row < dim_compressor.shape[0] and which_column < dim_compressor.shape[1]:
|
|
if round(next_bin_break - which_column, 10) >= 1:
|
|
dim_compressor[which_row, which_column] = 1
|
|
which_column += 1
|
|
elif next_bin_break == which_column:
|
|
|
|
which_row += 1
|
|
next_bin_break += bin_size
|
|
else:
|
|
partial_credit = next_bin_break - which_column
|
|
dim_compressor[which_row, which_column] = partial_credit
|
|
which_row += 1
|
|
dim_compressor[which_row, which_column] = 1 - partial_credit
|
|
which_column += 1
|
|
next_bin_break += bin_size
|
|
|
|
if operation.lower() in ["mean", "average", "avg"]:
|
|
dim_compressor /= bin_size
|
|
|
|
return dim_compressor
|
|
|
|
|
|
def get_column_compressor(old_dimension, new_dimension, operation='sum'):
|
|
"""
|
|
Return the matrix that allows to compress an array from an old dimension of
|
|
columns to a new dimension of columns, can be done by summing the original
|
|
components or averaging them.
|
|
----------
|
|
Inputs:
|
|
old_dimension, new_dimension : int
|
|
Number of columns in the original and target matrices.
|
|
operation : str, optional
|
|
Set the way original components of the matrix are put together
|
|
between summing ('sum') and averaging ('average', 'avg', 'mean') them.
|
|
Defaults to 'sum'.
|
|
----------
|
|
Returns:
|
|
dim_compressor : numpy.ndarray
|
|
2D matrix allowing columns compression by matrix multiplication to the
|
|
right of the original matrix.
|
|
"""
|
|
return get_row_compressor(old_dimension, new_dimension, operation).transpose()
|
|
|
|
|
|
def bin_ndarray(ndarray, new_shape, operation='sum'):
|
|
"""
|
|
Bins an ndarray in all axes based on the target shape, by summing or
|
|
averaging.
|
|
|
|
Number of output dimensions must match number of input dimensions.
|
|
|
|
Example
|
|
-------
|
|
>>> m = np.arange(0,100,1).reshape((10,10))
|
|
>>> n = bin_ndarray(m, new_shape=(5,5), operation='sum')
|
|
>>> print(n)
|
|
|
|
[[ 22 30 38 46 54]
|
|
[102 110 118 126 134]
|
|
[182 190 198 206 214]
|
|
[262 270 278 286 294]
|
|
[342 350 358 366 374]]
|
|
|
|
"""
|
|
if operation.lower() not in ['sum', 'mean', 'average', 'avg']:
|
|
raise ValueError("Operation not supported.")
|
|
if ndarray.ndim != len(new_shape):
|
|
raise ValueError("Shape mismatch: {} -> {}".format(ndarray.shape,
|
|
new_shape))
|
|
if (np.array(ndarray.shape) % np.array(new_shape) == np.array([0., 0.])).all():
|
|
compression_pairs = [(d, c//d) for d, c in zip(new_shape, ndarray.shape)]
|
|
flattened = [l for p in compression_pairs for l in p]
|
|
ndarray = ndarray.reshape(flattened)
|
|
|
|
for i in range(len(new_shape)):
|
|
if operation.lower() == "sum":
|
|
ndarray = ndarray.sum(-1*(i+1))
|
|
elif operation.lower() in ["mean", "average", "avg"]:
|
|
ndarray = ndarray.mean(-1*(i+1))
|
|
else:
|
|
row_comp = np.mat(get_row_compressor(ndarray.shape[0], new_shape[0], operation))
|
|
col_comp = np.mat(get_column_compressor(ndarray.shape[1], new_shape[1], operation))
|
|
ndarray = np.array(row_comp * np.mat(ndarray) * col_comp)
|
|
|
|
return ndarray
|
|
|
|
|
|
def crop_array(data_array, headers, error_array=None, data_mask=None, step=5, null_val=None, inside=False, display=False, savename=None, plots_folder=""):
|
|
"""
|
|
Homogeneously crop an array: all contained images will have the same shape.
|
|
'inside' parameter will decide how much should be cropped.
|
|
----------
|
|
Inputs:
|
|
data_array : numpy.ndarray
|
|
Array containing the observation data (2D float arrays) to
|
|
homogeneously crop.
|
|
headers : header list
|
|
Headers associated with the images in data_array.
|
|
error_array : numpy.ndarray, optional
|
|
Array of images (2D floats, aligned and of the same shape) containing
|
|
the error in each pixel of the observation images in data_array.
|
|
If None, will be initialized to zeros.
|
|
Defaults to None.
|
|
data_mask : numpy.ndarray, optional
|
|
2D boolean array delimiting the data to work on.
|
|
If None, will be initialized with a full true mask.
|
|
Defaults to None.
|
|
step : int, optional
|
|
For images with straight edges, not all lines and columns need to be
|
|
browsed in order to have a good convex hull. Step value determine
|
|
how many row/columns can be jumped. With step=2 every other line will
|
|
be browsed.
|
|
Defaults to 5.
|
|
null_val : float or array-like, optional
|
|
Pixel value determining the threshold for what is considered 'outside'
|
|
the image. All border pixels below this value will be taken out.
|
|
If None, will be put to 75% of the mean value of the associated error
|
|
array.
|
|
Defaults to None.
|
|
inside : boolean, optional
|
|
If True, the cropped image will be the maximum rectangle included
|
|
inside the image. If False, the cropped image will be the minimum
|
|
rectangle in which the whole image is included.
|
|
Defaults to False.
|
|
display : boolean, optional
|
|
If True, data_array will be displayed with a rectangle around the
|
|
sub-image selected for region of interest.
|
|
Defaults to False.
|
|
----------
|
|
Returns:
|
|
cropped_array : numpy.ndarray
|
|
Array containing the observationnal data homogeneously cropped.
|
|
cropped_error : numpy.ndarray
|
|
Array containing the error on the observationnal data homogeneously cropped.
|
|
headers : header list
|
|
Updated headers associated with the images in data_array.
|
|
"""
|
|
if error_array is None:
|
|
error_array = np.zeros(data_array.shape)
|
|
if null_val is None:
|
|
null_val = [1.00*error.mean() for error in error_array]
|
|
elif type(null_val) is float:
|
|
null_val = [null_val,]*error_array.shape[0]
|
|
|
|
vertex = np.zeros((data_array.shape[0], 4), dtype=int)
|
|
for i, image in enumerate(data_array): # Get vertex of the rectangular convex hull of each image
|
|
vertex[i] = image_hull(image, step=step, null_val=null_val[i], inside=inside)
|
|
v_array = np.zeros(4, dtype=int)
|
|
if inside: # Get vertex of the maximum convex hull for all images
|
|
v_array[0] = np.max(vertex[:, 0]).astype(int)
|
|
v_array[1] = np.min(vertex[:, 1]).astype(int)
|
|
v_array[2] = np.max(vertex[:, 2]).astype(int)
|
|
v_array[3] = np.min(vertex[:, 3]).astype(int)
|
|
else: # Get vertex of the minimum convex hull for all images
|
|
v_array[0] = np.min(vertex[:, 0]).astype(int)
|
|
v_array[1] = np.max(vertex[:, 1]).astype(int)
|
|
v_array[2] = np.min(vertex[:, 2]).astype(int)
|
|
v_array[3] = np.max(vertex[:, 3]).astype(int)
|
|
|
|
new_shape = np.array([v_array[1]-v_array[0], v_array[3]-v_array[2]])
|
|
rectangle = [v_array[2], v_array[0], new_shape[1], new_shape[0], 0., 'b']
|
|
crop_headers = deepcopy(headers)
|
|
crop_array = np.zeros((data_array.shape[0], new_shape[0], new_shape[1]))
|
|
crop_error_array = np.zeros((data_array.shape[0], new_shape[0], new_shape[1]))
|
|
for i, image in enumerate(data_array):
|
|
# Put the image data in the cropped array
|
|
crop_array[i] = image[v_array[0]:v_array[1], v_array[2]:v_array[3]]
|
|
crop_error_array[i] = error_array[i][v_array[0]:v_array[1], v_array[2]:v_array[3]]
|
|
# Update CRPIX value in the associated header
|
|
curr_wcs = WCS(crop_headers[i]).celestial.deepcopy()
|
|
curr_wcs.wcs.crpix[:2] = curr_wcs.wcs.crpix[:2] - np.array([v_array[2], v_array[0]])
|
|
crop_headers[i].update(curr_wcs.to_header())
|
|
crop_headers[i]['naxis1'], crop_headers[i]['naxis2'] = crop_array[i].shape
|
|
|
|
if display:
|
|
plt.rcParams.update({'font.size': 15})
|
|
fig, ax = plt.subplots(figsize=(10, 10), layout='constrained')
|
|
convert_flux = headers[0]['photflam']
|
|
data = deepcopy(data_array[0]*convert_flux)
|
|
data[data <= data[data > 0.].min()] = data[data > 0.].min()
|
|
crop = crop_array[0]*convert_flux
|
|
instr = headers[0]['instrume']
|
|
rootname = headers[0]['rootname']
|
|
exptime = headers[0]['exptime']
|
|
filt = headers[0]['filtnam1']
|
|
# plots
|
|
# im = ax.imshow(data, vmin=data.min(), vmax=data.max(), origin='lower', cmap='gray')
|
|
im = ax.imshow(data, norm=LogNorm(crop[crop > 0.].mean()/5., crop.max()), origin='lower', cmap='gray')
|
|
x, y, width, height, angle, color = rectangle
|
|
ax.add_patch(Rectangle((x, y), width, height, edgecolor=color, fill=False))
|
|
# position of centroid
|
|
ax.plot([data.shape[1]/2, data.shape[1]/2], [0, data.shape[0]-1], '--', lw=1,
|
|
color='grey', alpha=0.3)
|
|
ax.plot([0, data.shape[1]-1], [data.shape[1]/2, data.shape[1]/2], '--', lw=1,
|
|
color='grey', alpha=0.3)
|
|
ax.annotate(instr+":"+rootname, color='white', fontsize=10,
|
|
xy=(0.02, 0.95), xycoords='axes fraction')
|
|
ax.annotate(filt, color='white', fontsize=14, xy=(0.02, 0.02),
|
|
xycoords='axes fraction')
|
|
ax.annotate(str(exptime)+" s", color='white', fontsize=10, xy=(0.80, 0.02),
|
|
xycoords='axes fraction')
|
|
ax.set(title="Location of cropped image.", xlabel='pixel offset', ylabel='pixel offset')
|
|
|
|
# fig.subplots_adjust(hspace=0, wspace=0, right=0.85)
|
|
# cbar_ax = fig.add_axes([0.9, 0.12, 0.02, 0.75])
|
|
fig.colorbar(im, ax=ax, label=r"Flux [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]")
|
|
|
|
if savename is not None:
|
|
fig.savefig("/".join([plots_folder, savename+'_'+filt+'_crop_region.pdf']),
|
|
bbox_inches='tight', dpi=200)
|
|
plot_obs(data_array, headers, vmin=convert_flux*data_array[data_array > 0.].mean()/5.,
|
|
vmax=convert_flux*data_array[data_array > 0.].max(), rectangle=[rectangle,]*len(headers),
|
|
savename=savename+'_crop_region', plots_folder=plots_folder)
|
|
plt.show()
|
|
|
|
if data_mask is not None:
|
|
crop_mask = data_mask[v_array[0]:v_array[1], v_array[2]:v_array[3]]
|
|
return crop_array, crop_error_array, crop_mask, crop_headers
|
|
else:
|
|
return crop_array, crop_error_array, crop_headers
|
|
|
|
|
|
def deconvolve_array(data_array, headers, psf='gaussian', FWHM=1., scale='px',
|
|
shape=None, iterations=20, algo='richardson'):
|
|
"""
|
|
Homogeneously deconvolve a data array using Richardson-Lucy iterative algorithm.
|
|
----------
|
|
Inputs:
|
|
data_array : numpy.ndarray
|
|
Array containing the observation data (2D float arrays) to
|
|
homogeneously deconvolve.
|
|
headers : header list
|
|
Headers associated with the images in data_array.
|
|
psf : str or numpy.ndarray, optionnal
|
|
String designing the type of desired Point Spread Function or array
|
|
of dimension 2 corresponding to the weights of a PSF.
|
|
Defaults to 'gaussian' type PSF.
|
|
FWHM : float, optional
|
|
Full Width at Half Maximum for desired PSF in 'scale units. Only used
|
|
for relevant values of 'psf' variable.
|
|
Defaults to 1.
|
|
scale : str, optional
|
|
Scale units for the FWHM of the PSF between 'pixel' and 'arcsec'.
|
|
Defaults to 'pixel'.
|
|
shape : tuple, optional
|
|
Shape of the kernel of the PSF. Must be of dimension 2. Only used for
|
|
relevant values of 'psf' variable.
|
|
Defaults to (9,9).
|
|
iterations : int, optional
|
|
Number of iterations for iterative deconvolution algorithms. Act as
|
|
as a regulation of the process.
|
|
Defaults to 20.
|
|
algo : str, optional
|
|
Name of the deconvolution algorithm that will be used. Implemented
|
|
algorithms are the following : 'Wiener', 'Van-Cittert',
|
|
'One Step Gradient', 'Conjugate Gradient' and 'Richardson-Lucy'.
|
|
Defaults to 'Richardson-Lucy'.
|
|
----------
|
|
Returns:
|
|
deconv_array : numpy.ndarray
|
|
Array containing the deconvolved data (2D float arrays) using given
|
|
point spread function.
|
|
"""
|
|
# If chosen FWHM scale is 'arcsec', compute FWHM in pixel scale
|
|
if scale.lower() in ['arcsec', 'arcseconds']:
|
|
pxsize = np.zeros((data_array.shape[0], 2))
|
|
for i, header in enumerate(headers):
|
|
# Get current pixel size
|
|
w = WCS(header).celestial.deepcopy()
|
|
pxsize[i] = np.round(w.wcs.cdelt/3600., 15)
|
|
if (pxsize != pxsize[0]).any():
|
|
raise ValueError("Not all images in array have same pixel size")
|
|
FWHM /= pxsize[0].min()
|
|
|
|
# Define Point-Spread-Function kernel
|
|
if psf.lower() in ['gauss', 'gaussian']:
|
|
if shape is None:
|
|
shape = np.min(data_array[0].shape)-2, np.min(data_array[0].shape)-2
|
|
kernel = gaussian_psf(FWHM=FWHM, shape=shape)
|
|
elif isinstance(psf, np.ndarray) and (len(psf.shape) == 2):
|
|
kernel = psf
|
|
else:
|
|
raise ValueError("{} is not a valid value for 'psf'".format(psf))
|
|
|
|
# Deconvolve images in the array using given PSF
|
|
deconv_array = np.zeros(data_array.shape)
|
|
for i, image in enumerate(data_array):
|
|
deconv_array[i] = deconvolve_im(image, kernel, iterations=iterations, clip=True, filter_epsilon=None, algo=algo)
|
|
|
|
return deconv_array
|
|
|
|
|
|
def get_error(data_array, headers, error_array=None, data_mask=None, sub_type=None, subtract_error=True, display=False, savename=None, plots_folder="", return_background=False):
|
|
"""
|
|
Look for sub-image of shape sub_shape that have the smallest integrated
|
|
flux (no source assumption) and define the background on the image by the
|
|
standard deviation on this sub-image.
|
|
----------
|
|
Inputs:
|
|
data_array : numpy.ndarray
|
|
Array containing the data to study (2D float arrays).
|
|
headers : header list
|
|
Headers associated with the images in data_array.
|
|
error_array : numpy.ndarray, optional
|
|
Array of images (2D floats, aligned and of the same shape) containing
|
|
the error in each pixel of the observation images in data_array.
|
|
If None, will be initialized to zeros.
|
|
Defaults to None.
|
|
data_mask : numpy.ndarray, optional
|
|
2D boolean array delimiting the data to work on.
|
|
If None, will be initialized with a full true mask.
|
|
Defaults to None.
|
|
sub_type : str or int or tuple, optional
|
|
If 'auto', look for optimal binning and fit intensity histogram with au gaussian.
|
|
If str or None, statistic rule to be used for the number of bins in counts/s.
|
|
If int, number of bins for the counts/s histogram.
|
|
If tuple, shape of the sub-image to look for. Must be odd.
|
|
Defaults to None.
|
|
subtract_error : float or bool, optional
|
|
If float, factor to which the estimated background should be multiplied
|
|
If False the background is not subtracted.
|
|
Defaults to True (factor = 1.).
|
|
display : boolean, optional
|
|
If True, data_array will be displayed with a rectangle around the
|
|
sub-image selected for background computation.
|
|
Defaults to False.
|
|
savename : str, optional
|
|
Name of the figure the map should be saved to. If None, the map won't
|
|
be saved (only displayed). Only used if display is True.
|
|
Defaults to None.
|
|
plots_folder : str, optional
|
|
Relative (or absolute) filepath to the folder in wich the map will
|
|
be saved. Not used if savename is None.
|
|
Defaults to current folder.
|
|
return_background : boolean, optional
|
|
If True, the pixel background value for each image in data_array is
|
|
returned.
|
|
Defaults to False.
|
|
----------
|
|
Returns:
|
|
data_array : numpy.ndarray
|
|
Array containing the data to study minus the background.
|
|
error_array : numpy.ndarray
|
|
Array containing the background values associated to the images in
|
|
data_array.
|
|
headers : header list
|
|
Updated headers associated with the images in data_array.
|
|
background : numpy.ndarray
|
|
Array containing the pixel background value for each image in
|
|
data_array.
|
|
Only returned if return_background is True.
|
|
"""
|
|
# Crop out any null edges
|
|
if error_array is None:
|
|
error_array = np.zeros(data_array.shape)
|
|
data, error = deepcopy(data_array), deepcopy(error_array)
|
|
if data_mask is not None:
|
|
mask = deepcopy(data_mask)
|
|
else:
|
|
data_c, error_c, _ = crop_array(data, headers, error, step=5, null_val=0., inside=False)
|
|
mask_c = np.ones(data_c[0].shape, dtype=bool)
|
|
for i, (data_ci, error_ci) in enumerate(zip(data_c, error_c)):
|
|
data[i], error[i] = zeropad(data_ci, data[i].shape), zeropad(error_ci, error[i].shape)
|
|
mask = zeropad(mask_c, data[0].shape).astype(bool)
|
|
background = np.zeros((data.shape[0]))
|
|
|
|
# wavelength dependence of the polarizer filters
|
|
# estimated to less than 1%
|
|
err_wav = data*0.01
|
|
# difference in PSFs through each polarizers
|
|
# estimated to less than 3%
|
|
err_psf = data*0.03
|
|
# flatfielding uncertainties
|
|
# estimated to less than 3%
|
|
err_flat = data*0.03
|
|
|
|
if (sub_type is None):
|
|
n_data_array, c_error_bkg, headers, background = bkg_hist(
|
|
data, error, mask, headers, subtract_error=subtract_error, display=display, savename=savename, plots_folder=plots_folder)
|
|
elif isinstance(sub_type, str):
|
|
if sub_type.lower() in ['auto']:
|
|
n_data_array, c_error_bkg, headers, background = bkg_fit(
|
|
data, error, mask, headers, subtract_error=subtract_error, display=display, savename=savename, plots_folder=plots_folder)
|
|
else:
|
|
n_data_array, c_error_bkg, headers, background = bkg_hist(
|
|
data, error, mask, headers, sub_type=sub_type, subtract_error=subtract_error, display=display, savename=savename, plots_folder=plots_folder)
|
|
elif isinstance(sub_type, tuple):
|
|
n_data_array, c_error_bkg, headers, background = bkg_mini(
|
|
data, error, mask, headers, sub_shape=sub_type, subtract_error=subtract_error, display=display, savename=savename, plots_folder=plots_folder)
|
|
else:
|
|
print("Warning: Invalid subtype.")
|
|
|
|
# Quadratically add uncertainties in the "correction factors" (see Kishimoto 1999)
|
|
n_error_array = np.sqrt(err_wav**2+err_psf**2+err_flat**2+c_error_bkg**2)
|
|
|
|
if return_background:
|
|
return n_data_array, n_error_array, headers, background
|
|
else:
|
|
return n_data_array, n_error_array, headers
|
|
|
|
|
|
def rebin_array(data_array, error_array, headers, pxsize, scale, operation='sum', data_mask=None):
|
|
"""
|
|
Homogeneously rebin a data array to get a new pixel size equal to pxsize
|
|
where pxsize is given in arcsec.
|
|
----------
|
|
Inputs:
|
|
data_array, error_array : numpy.ndarray
|
|
Arrays containing the images (2D float arrays) and their associated
|
|
errors that will be rebinned.
|
|
headers : header list
|
|
List of headers corresponding to the images in data_array.
|
|
pxsize : float
|
|
Physical size of the pixel in arcseconds that should be obtain with
|
|
the rebinning.
|
|
scale : str, optional
|
|
Scale units for the FWHM between 'pixel' and 'arcsec'.
|
|
Defaults to 'pixel'.
|
|
operation : str, optional
|
|
Set the way original components of the matrix are put together
|
|
between summing ('sum') and averaging ('average', 'avg', 'mean') them.
|
|
Defaults to 'sum'.
|
|
data_mask : numpy.ndarray, optional
|
|
2D boolean array delimiting the data to work on.
|
|
If None, will be initialized with a full true mask.
|
|
Defaults to None.
|
|
----------
|
|
Returns:
|
|
rebinned_data, rebinned_error : numpy.ndarray
|
|
Rebinned arrays containing the images and associated errors.
|
|
rebinned_headers : header list
|
|
Updated headers corresponding to the images in rebinned_data.
|
|
Dxy : numpy.ndarray
|
|
Array containing the rebinning factor in each direction of the image.
|
|
data_mask : numpy.ndarray, optional
|
|
Updated 2D boolean array delimiting the data to work on.
|
|
Only returned if inputed data_mask is not None.
|
|
"""
|
|
# Check that all images are from the same instrument
|
|
ref_header = headers[0]
|
|
instr = ref_header['instrume']
|
|
same_instr = np.array([instr == header['instrume'] for header in headers]).all()
|
|
if not same_instr:
|
|
raise ValueError("All images in data_array are not from the same\
|
|
instrument, cannot proceed.")
|
|
if instr not in ['FOC']:
|
|
raise ValueError("Cannot reduce images from {0:s} instrument\
|
|
(yet)".format(instr))
|
|
|
|
rebinned_data, rebinned_error, rebinned_headers = [], [], []
|
|
Dxy = np.array([1., 1.])
|
|
|
|
# Routine for the FOC instrument
|
|
if instr == 'FOC':
|
|
HST_aper = 2400. # HST aperture in mm
|
|
Dxy_arr = np.ones((data_array.shape[0], 2))
|
|
for i, (image, error, header) in enumerate(list(zip(data_array, error_array, headers))):
|
|
# Get current pixel size
|
|
w = WCS(header).celestial.deepcopy()
|
|
new_header = deepcopy(header)
|
|
|
|
# Compute binning ratio
|
|
if scale.lower() in ['px', 'pixel']:
|
|
Dxy_arr[i] = np.array([pxsize,]*2)
|
|
elif scale.lower() in ['arcsec', 'arcseconds']:
|
|
Dxy_arr[i] = np.array(pxsize/np.abs(w.wcs.cdelt)/3600.)
|
|
elif scale.lower() in ['full', 'integrate']:
|
|
Dxy_arr[i] = image.shape
|
|
else:
|
|
raise ValueError("'{0:s}' invalid scale for binning.".format(scale))
|
|
new_shape = np.ceil(min(image.shape/Dxy_arr, key=lambda x: x[0]+x[1])).astype(int)
|
|
|
|
for i, (image, error, header) in enumerate(list(zip(data_array, error_array, headers))):
|
|
# Get current pixel size
|
|
w = WCS(header).celestial.deepcopy()
|
|
new_header = deepcopy(header)
|
|
|
|
Dxy = image.shape/new_shape
|
|
if (Dxy < 1.).any():
|
|
raise ValueError("Requested pixel size is below resolution.")
|
|
|
|
# Rebin data
|
|
rebin_data = bin_ndarray(image, new_shape=new_shape, operation=operation)
|
|
rebinned_data.append(rebin_data)
|
|
|
|
# Propagate error
|
|
rms_image = np.sqrt(bin_ndarray(image**2, new_shape=new_shape, operation='average'))
|
|
sum_image = bin_ndarray(image, new_shape=new_shape, operation='sum')
|
|
mask = sum_image > 0.
|
|
new_error = np.zeros(rms_image.shape)
|
|
if operation.lower() in ["mean", "average", "avg"]:
|
|
new_error = np.sqrt(bin_ndarray(error**2, new_shape=new_shape, operation='average'))
|
|
else:
|
|
new_error = np.sqrt(bin_ndarray(error**2, new_shape=new_shape, operation='sum'))
|
|
rebinned_error.append(np.sqrt(rms_image**2 + new_error**2))
|
|
|
|
# Update header
|
|
nw = w.deepcopy()
|
|
nw.wcs.cdelt *= Dxy
|
|
nw.wcs.crpix /= Dxy
|
|
nw.array_shape = new_shape
|
|
new_header['NAXIS1'], new_header['NAXIS2'] = nw.array_shape
|
|
for key, val in nw.to_header().items():
|
|
new_header.set(key, val)
|
|
rebinned_headers.append(new_header)
|
|
if data_mask is not None:
|
|
data_mask = bin_ndarray(data_mask, new_shape=new_shape, operation='average') > 0.80
|
|
|
|
rebinned_data = np.array(rebinned_data)
|
|
rebinned_error = np.array(rebinned_error)
|
|
|
|
if data_mask is None:
|
|
return rebinned_data, rebinned_error, rebinned_headers, Dxy
|
|
else:
|
|
return rebinned_data, rebinned_error, rebinned_headers, Dxy, data_mask
|
|
|
|
|
|
def align_data(data_array, headers, error_array=None, background=None, upsample_factor=1., ref_data=None, ref_center=None, return_shifts=False):
|
|
"""
|
|
Align images in data_array using cross correlation, and rescale them to
|
|
wider images able to contain any rotation of the reference image.
|
|
All images in data_array must have the same shape.
|
|
----------
|
|
Inputs:
|
|
data_array : numpy.ndarray
|
|
Array containing the data to align (2D float arrays).
|
|
headers : header list
|
|
List of headers corresponding to the images in data_array.
|
|
error_array : numpy.ndarray, optional
|
|
Array of images (2D floats, aligned and of the same shape) containing
|
|
the error in each pixel of the observation images in data_array.
|
|
If None, will be initialized to zeros.
|
|
Defaults to None.
|
|
upsample_factor : float, optional
|
|
Oversampling factor for the cross-correlation, will allow sub-
|
|
pixel alignement as small as one over the factor of a pixel.
|
|
Defaults to one (no over-sampling).
|
|
ref_data : numpy.ndarray, optional
|
|
Reference image (2D float array) the data_array should be
|
|
aligned to. If "None", the ref_data will be the first image
|
|
of the data_array.
|
|
Defaults to None.
|
|
ref_center : numpy.ndarray, optional
|
|
Array containing the coordinates of the center of the reference
|
|
image or a string in 'max', 'flux', 'maxflux', 'max_flux'. If None,
|
|
will fallback to the center of the image.
|
|
Defaults to None.
|
|
return_shifts : boolean, optional
|
|
If False, calling the function will only return the array of
|
|
rescaled images. If True, will also return the shifts and
|
|
errors.
|
|
Defaults to True.
|
|
----------
|
|
Returns:
|
|
rescaled : numpy.ndarray
|
|
Array containing the aligned data from data_array, rescaled to wider
|
|
image with margins of value 0.
|
|
rescaled_error : numpy.ndarray
|
|
Array containing the errors on the aligned images in the rescaled array.
|
|
data_mask : numpy.ndarray
|
|
2D boolean array delimiting the data to work on.
|
|
shifts : numpy.ndarray
|
|
Array containing the pixel shifts on the x and y directions from
|
|
the reference image.
|
|
Only returned if return_shifts is True.
|
|
errors : numpy.ndarray
|
|
Array containing the relative error computed on every shift value.
|
|
Only returned if return_shifts is True.
|
|
"""
|
|
if ref_data is None:
|
|
# Define the reference to be the first image of the inputed array
|
|
# if None have been specified
|
|
ref_data = data_array[0]
|
|
same = 1
|
|
for array in data_array:
|
|
# Check if all images have the same shape. If not, cross-correlation
|
|
# cannot be computed.
|
|
same *= (array.shape == ref_data.shape)
|
|
if not same:
|
|
raise ValueError("All images in data_array must have same shape as\
|
|
ref_data")
|
|
if (error_array is None) or (background is None):
|
|
_, error_array, headers, background = get_error(data_array, headers, return_background=True)
|
|
|
|
# Crop out any null edges
|
|
# (ref_data must be cropped as well)
|
|
full_array = np.concatenate((data_array, [ref_data]), axis=0)
|
|
full_headers = deepcopy(headers)
|
|
full_headers.append(headers[0])
|
|
err_array = np.concatenate((error_array, [np.zeros(ref_data.shape)]), axis=0)
|
|
|
|
full_array, err_array, full_headers = crop_array(full_array, full_headers, err_array, step=5, inside=False, null_val=0.)
|
|
|
|
data_array, ref_data, headers = full_array[:-1], full_array[-1], full_headers[:-1]
|
|
error_array = err_array[:-1]
|
|
do_shift = True
|
|
if ref_center is None:
|
|
# Define the center of the reference image to be the center pixel
|
|
# if None have been specified
|
|
ref_center = (np.array(ref_data.shape)/2).astype(int)
|
|
do_shift = False
|
|
elif ref_center.lower() in ['max', 'flux', 'maxflux', 'max_flux']:
|
|
# Define the center of the reference image to be the pixel of max flux.
|
|
ref_center = np.unravel_index(np.argmax(ref_data), ref_data.shape)
|
|
else:
|
|
# Default to image center.
|
|
ref_center = (np.array(ref_data.shape)/2).astype(int)
|
|
|
|
# Create a rescaled null array that can contain any rotation of the
|
|
# original image (and shifted images)
|
|
shape = data_array.shape
|
|
res_shape = int(np.ceil(np.sqrt(2.)*np.max(shape[1:])))
|
|
rescaled_image = np.zeros((shape[0], res_shape, res_shape))
|
|
rescaled_error = np.ones((shape[0], res_shape, res_shape))
|
|
rescaled_mask = np.zeros((shape[0], res_shape, res_shape), dtype=bool)
|
|
res_center = (np.array(rescaled_image.shape[1:])/2).astype(int)
|
|
res_shift = res_center-ref_center
|
|
res_mask = np.zeros((res_shape, res_shape), dtype=bool)
|
|
res_mask[res_shift[0]:res_shift[0]+shape[1], res_shift[1]:res_shift[1]+shape[2]] = True
|
|
|
|
shifts, errors = [], []
|
|
for i, image in enumerate(data_array):
|
|
# Initialize rescaled images to background values
|
|
rescaled_error[i] *= 0.01*background[i]
|
|
# Get shifts and error by cross-correlation to ref_data
|
|
if do_shift:
|
|
shift, error, _ = phase_cross_correlation(ref_data/ref_data.max(), image/image.max(), upsample_factor=upsample_factor)
|
|
else:
|
|
shift = globals["pol_shift"][headers[i]['filtnam1'].lower()]
|
|
error = globals["sigma_shift"][headers[i]['filtnam1'].lower()]
|
|
# Rescale image to requested output
|
|
rescaled_image[i, res_shift[0]:res_shift[0]+shape[1], res_shift[1]:res_shift[1]+shape[2]] = deepcopy(image)
|
|
rescaled_error[i, res_shift[0]:res_shift[0]+shape[1], res_shift[1]:res_shift[1]+shape[2]] = deepcopy(error_array[i])
|
|
# Shift images to align
|
|
rescaled_image[i] = sc_shift(rescaled_image[i], shift, order=1, cval=0.)
|
|
rescaled_error[i] = sc_shift(rescaled_error[i], shift, order=1, cval=background[i])
|
|
|
|
curr_mask = sc_shift(res_mask, shift, order=1, cval=False)
|
|
mask_vertex = clean_ROI(curr_mask)
|
|
rescaled_mask[i, mask_vertex[2]:mask_vertex[3], mask_vertex[0]:mask_vertex[1]] = True
|
|
|
|
rescaled_image[i][rescaled_image[i] < 0.] = 0.
|
|
rescaled_image[i][(1-rescaled_mask[i]).astype(bool)] = 0.
|
|
|
|
# Uncertainties from shifting
|
|
prec_shift = np.array([1., 1.])/upsample_factor
|
|
shifted_image = sc_shift(rescaled_image[i], prec_shift, cval=0.)
|
|
error_shift = np.abs(rescaled_image[i] - shifted_image)/2.
|
|
# sum quadratically the errors
|
|
rescaled_error[i] = np.sqrt(rescaled_error[i]**2 + error_shift**2)
|
|
|
|
shifts.append(shift)
|
|
errors.append(error)
|
|
|
|
shifts = np.array(shifts)
|
|
errors = np.array(errors)
|
|
|
|
# Update headers CRPIX value
|
|
headers_wcs = [WCS(header).celestial.deepcopy() for header in headers]
|
|
new_crpix = np.array([wcs.wcs.crpix for wcs in headers_wcs]) + shifts[:, ::-1] + res_shift[::-1]
|
|
for i in range(len(headers_wcs)):
|
|
headers_wcs[i].wcs.crpix = new_crpix[0]
|
|
headers[i].update(headers_wcs[i].to_header())
|
|
|
|
data_mask = rescaled_mask.all(axis=0)
|
|
data_array, error_array, data_mask, headers = crop_array(rescaled_image, headers, rescaled_error, data_mask, null_val=0.01*background)
|
|
|
|
if return_shifts:
|
|
return data_array, error_array, headers, data_mask, shifts, errors
|
|
else:
|
|
return data_array, error_array, headers, data_mask
|
|
|
|
|
|
def smooth_data(data_array, error_array, data_mask, headers, FWHM=1., scale='pixel', smoothing='gaussian'):
|
|
"""
|
|
Smooth a data_array using selected function.
|
|
----------
|
|
Inputs:
|
|
data_array : numpy.ndarray
|
|
Array containing the data to smooth (2D float arrays).
|
|
error_array : numpy.ndarray
|
|
Array of images (2D floats, aligned and of the same shape) containing
|
|
the error in each pixel of the observation images in data_array.
|
|
data_mask : numpy.ndarray
|
|
2D boolean array delimiting the data to work on.
|
|
headers : header list
|
|
List of headers corresponding to the images in data_array.
|
|
FWHM : float, optional
|
|
Full Width at Half Maximum for desired smoothing in 'scale' units.
|
|
Defaults to 1.
|
|
scale : str, optional
|
|
Scale units for the FWHM between 'pixel' and 'arcsec'.
|
|
Defaults to 'pixel'.
|
|
smoothing : str, optional
|
|
Smoothing algorithm to be used on the input data array.
|
|
-'combine','combining' use the N images combining algorithm with
|
|
weighted pixels (inverse square of errors).
|
|
-'gauss','gaussian' convolve any input image with a gaussian of
|
|
standard deviation stdev = FWHM/(2*sqrt(2*log(2))).
|
|
Defaults to 'gaussian'. Won't be used if FWHM is None.
|
|
----------
|
|
Returns:
|
|
smoothed_array : numpy.ndarray
|
|
Array containing the smoothed images.
|
|
error_array : numpy.ndarray
|
|
Array containing the error images corresponding to the images in
|
|
smoothed_array.
|
|
"""
|
|
# If chosen FWHM scale is 'arcsec', compute FWHM in pixel scale
|
|
if scale.lower() in ['arcsec', 'arcseconds']:
|
|
pxsize = np.zeros((data_array.shape[0], 2))
|
|
for i, header in enumerate(headers):
|
|
# Get current pixel size
|
|
w = WCS(header).celestial.deepcopy()
|
|
pxsize[i] = np.round(w.wcs.cdelt*3600., 4)
|
|
if (pxsize != pxsize[0]).any():
|
|
raise ValueError("Not all images in array have same pixel size")
|
|
FWHM /= pxsize[0].min()
|
|
|
|
# Define gaussian stdev
|
|
stdev = FWHM/(2.*np.sqrt(2.*np.log(2)))
|
|
fmax = np.finfo(np.double).max
|
|
|
|
if smoothing.lower() in ['combine', 'combining']:
|
|
# Smooth using N images combination algorithm
|
|
# Weight array
|
|
weight = 1./error_array**2
|
|
# Prepare pixel distance matrix
|
|
xx, yy = np.indices(data_array[0].shape)
|
|
# Initialize smoothed image and error arrays
|
|
smoothed = np.zeros(data_array[0].shape)
|
|
error = np.zeros(data_array[0].shape)
|
|
|
|
# Combination smoothing algorithm
|
|
for r in range(smoothed.shape[0]):
|
|
for c in range(smoothed.shape[1]):
|
|
# Compute distance from current pixel
|
|
dist_rc = np.where(data_mask, np.sqrt((r-xx)**2+(c-yy)**2), fmax)
|
|
# Catch expected "OverflowWarning" as we overflow values that are not in the image
|
|
with warnings.catch_warnings(record=True) as w:
|
|
g_rc = np.array([np.exp(-0.5*(dist_rc/stdev)**2)/(2.*np.pi*stdev**2),]*data_array.shape[0])
|
|
# Apply weighted combination
|
|
smoothed[r, c] = np.where(data_mask[r, c], np.sum(data_array*weight*g_rc)/np.sum(weight*g_rc), data_array.mean(axis=0)[r, c])
|
|
error[r, c] = np.where(data_mask[r, c], np.sqrt(np.sum(weight*g_rc**2))/np.sum(weight*g_rc),
|
|
(np.sqrt(np.sum(error_array**2, axis=0)/error_array.shape[0]))[r, c])
|
|
|
|
# Nan handling
|
|
error[np.logical_or(np.isnan(smoothed*error), 1-data_mask)] = 0.
|
|
smoothed[np.logical_or(np.isnan(smoothed*error), 1-data_mask)] = 0.
|
|
|
|
elif smoothing.lower() in ['weight_gauss', 'weighted_gaussian', 'gauss', 'gaussian']:
|
|
# Convolution with gaussian function
|
|
smoothed = np.zeros(data_array.shape)
|
|
error = np.zeros(error_array.shape)
|
|
for i, (image, image_error) in enumerate(zip(data_array, error_array)):
|
|
x, y = np.meshgrid(np.arange(-image.shape[1]/2, image.shape[1]/2), np.arange(-image.shape[0]/2, image.shape[0]/2))
|
|
weights = np.ones(image_error.shape)
|
|
if smoothing.lower()[:6] in ['weight']:
|
|
weights = 1./image_error**2
|
|
weights[(1-np.isfinite(weights)).astype(bool)] = 0.
|
|
weights[(1-data_mask).astype(bool)] = 0.
|
|
weights /= weights.sum()
|
|
kernel = gaussian2d(x, y, stdev)
|
|
kernel /= kernel.sum()
|
|
smoothed[i] = np.where(data_mask, fftconvolve(image*weights, kernel, 'same')/fftconvolve(weights, kernel, 'same'), image)
|
|
error[i] = np.where(data_mask, np.sqrt(fftconvolve(image_error**2*weights**2, kernel**2, 'same'))/fftconvolve(weights, kernel, 'same'), image_error)
|
|
|
|
# Nan handling
|
|
error[i][np.logical_or(np.isnan(smoothed[i]*error[i]), 1-data_mask)] = 0.
|
|
smoothed[i][np.logical_or(np.isnan(smoothed[i]*error[i]), 1-data_mask)] = 0.
|
|
|
|
else:
|
|
raise ValueError("{} is not a valid smoothing option".format(smoothing))
|
|
|
|
return smoothed, error
|
|
|
|
|
|
def polarizer_avg(data_array, error_array, data_mask, headers, FWHM=None, scale='pixel', smoothing='gaussian'):
|
|
"""
|
|
Make the average image from a single polarizer for a given instrument.
|
|
-----------
|
|
Inputs:
|
|
data_array : numpy.ndarray
|
|
Array of images (2D floats, aligned and of the same shape) of a
|
|
single observation with multiple polarizers of an instrument.
|
|
error_array : numpy.ndarray
|
|
Array of images (2D floats, aligned and of the same shape) containing
|
|
the error in each pixel of the observation images in data_array.
|
|
data_mask : numpy.ndarray
|
|
2D boolean array delimiting the data to work on.
|
|
headers : header list
|
|
List of headers corresponding to the images in data_array.
|
|
FWHM : float, optional
|
|
Full Width at Half Maximum of the detector for smoothing of the
|
|
data on each polarizer filter in 'scale' units. If None, no smoothing
|
|
is done.
|
|
Defaults to None.
|
|
scale : str, optional
|
|
Scale units for the FWHM between 'pixel' and 'arcsec'.
|
|
Defaults to 'pixel'.
|
|
smoothing : str, optional
|
|
Smoothing algorithm to be used on the input data array.
|
|
-'combine','combining' use the N images combining algorithm with
|
|
weighted pixels (inverse square of errors).
|
|
-'gaussian' convolve any input image with a gaussian of standard
|
|
deviation stdev = FWHM/(2*sqrt(2*log(2))).
|
|
Defaults to 'gaussian'. Won't be used if FWHM is None.
|
|
----------
|
|
Returns:
|
|
polarizer_array : numpy.ndarray
|
|
Array of images averaged on each polarizer filter of the instrument
|
|
polarizer_cov : numpy.ndarray
|
|
Covariance matrix between the polarizer images in polarizer_array
|
|
"""
|
|
# Check that all images are from the same instrument
|
|
instr = headers[0]['instrume']
|
|
same_instr = np.array([instr == header['instrume'] for header in headers]).all()
|
|
if not same_instr:
|
|
raise ValueError("All images in data_array are not from the same\
|
|
instrument, cannot proceed.")
|
|
if instr not in ['FOC']:
|
|
raise ValueError("Cannot reduce images from {0:s} instrument\
|
|
(yet)".format(instr))
|
|
|
|
# Routine for the FOC instrument
|
|
if instr == 'FOC':
|
|
# Sort images by polarizer filter : can be 0deg, 60deg, 120deg for the FOC
|
|
is_pol0 = np.array([header['filtnam1'] == 'POL0' for header in headers])
|
|
if (1-is_pol0).all():
|
|
print("Warning : no image for POL0 of FOC found, averaged data\
|
|
will be NAN")
|
|
is_pol60 = np.array([header['filtnam1'] == 'POL60' for header in headers])
|
|
if (1-is_pol60).all():
|
|
print("Warning : no image for POL60 of FOC found, averaged data\
|
|
will be NAN")
|
|
is_pol120 = np.array([header['filtnam1'] == 'POL120' for header in headers])
|
|
if (1-is_pol120).all():
|
|
print("Warning : no image for POL120 of FOC found, averaged data\
|
|
will be NAN")
|
|
# Put each polarizer images in separate arrays
|
|
headers0 = [header for header in headers if header['filtnam1'] == 'POL0']
|
|
headers60 = [header for header in headers if header['filtnam1'] == 'POL60']
|
|
headers120 = [header for header in headers if header['filtnam1'] == 'POL120']
|
|
|
|
pol0_array = data_array[is_pol0]
|
|
pol60_array = data_array[is_pol60]
|
|
pol120_array = data_array[is_pol120]
|
|
|
|
err0_array = error_array[is_pol0]
|
|
err60_array = error_array[is_pol60]
|
|
err120_array = error_array[is_pol120]
|
|
|
|
# For a single observation, combination amount to a weighted gaussian
|
|
if np.max([is_pol0.sum(), is_pol60.sum(), is_pol120.sum()]) == 1 and smoothing.lower() in ['combine', 'combining']:
|
|
smoothing = 'weighted_gaussian'
|
|
|
|
if (FWHM is not None) and (smoothing.lower() in ['combine', 'combining']):
|
|
# Smooth by combining each polarizer images
|
|
pol0, err0 = smooth_data(pol0_array, err0_array, data_mask, headers0, FWHM=FWHM, scale=scale, smoothing=smoothing)
|
|
pol60, err60 = smooth_data(pol60_array, err60_array, data_mask, headers60, FWHM=FWHM, scale=scale, smoothing=smoothing)
|
|
pol120, err120 = smooth_data(pol120_array, err120_array, data_mask, headers120, FWHM=FWHM, scale=scale, smoothing=smoothing)
|
|
|
|
else:
|
|
# Sum on each polarization filter.
|
|
pol0_t = np.sum([header['exptime'] for header in headers0])
|
|
pol60_t = np.sum([header['exptime'] for header in headers60])
|
|
pol120_t = np.sum([header['exptime'] for header in headers120])
|
|
|
|
for i in range(pol0_array.shape[0]):
|
|
pol0_array[i] *= headers0[i]['exptime']
|
|
err0_array[i] *= headers0[i]['exptime']
|
|
for i in range(pol60_array.shape[0]):
|
|
pol60_array[i] *= headers60[i]['exptime']
|
|
err60_array[i] *= headers60[i]['exptime']
|
|
for i in range(pol120_array.shape[0]):
|
|
pol120_array[i] *= headers120[i]['exptime']
|
|
err120_array[i] *= headers120[i]['exptime']
|
|
|
|
pol0 = pol0_array.sum(axis=0)/pol0_t
|
|
pol60 = pol60_array.sum(axis=0)/pol60_t
|
|
pol120 = pol120_array.sum(axis=0)/pol120_t
|
|
pol_array = np.array([pol0, pol60, pol120])
|
|
pol_headers = [headers0[0], headers60[0], headers120[0]]
|
|
|
|
# Propagate uncertainties quadratically
|
|
err0 = np.sqrt(np.sum(err0_array**2, axis=0))/pol0_t
|
|
err60 = np.sqrt(np.sum(err60_array**2, axis=0))/pol60_t
|
|
err120 = np.sqrt(np.sum(err120_array**2, axis=0))/pol120_t
|
|
polerr_array = np.array([err0, err60, err120])
|
|
|
|
if not (FWHM is None) and (smoothing.lower() in ['gaussian', 'gauss', 'weighted_gaussian', 'weight_gauss']):
|
|
# Smooth by convoluting with a gaussian each polX image.
|
|
pol_array, polerr_array = smooth_data(pol_array, polerr_array, data_mask, pol_headers, FWHM=FWHM, scale=scale, smoothing=smoothing)
|
|
pol0, pol60, pol120 = pol_array
|
|
err0, err60, err120 = polerr_array
|
|
|
|
# Update headers
|
|
for header in headers:
|
|
if header['filtnam1'] == 'POL0':
|
|
list_head = headers0
|
|
elif header['filtnam1'] == 'POL60':
|
|
list_head = headers60
|
|
elif header['filtnam1'] == 'POL120':
|
|
list_head = headers120
|
|
header['exptime'] = np.sum([head['exptime'] for head in list_head])
|
|
pol_headers = [headers0[0], headers60[0], headers120[0]]
|
|
|
|
# Get image shape
|
|
shape = pol0.shape
|
|
|
|
# Construct the polarizer array
|
|
polarizer_array = np.zeros((3, shape[0], shape[1]))
|
|
polarizer_array[0] = pol0
|
|
polarizer_array[1] = pol60
|
|
polarizer_array[2] = pol120
|
|
|
|
# Define the covariance matrix for the polarizer images
|
|
# We assume cross terms are null
|
|
polarizer_cov = np.zeros((3, 3, shape[0], shape[1]))
|
|
polarizer_cov[0, 0] = err0**2
|
|
polarizer_cov[1, 1] = err60**2
|
|
polarizer_cov[2, 2] = err120**2
|
|
|
|
return polarizer_array, polarizer_cov, pol_headers
|
|
|
|
|
|
def compute_Stokes(data_array, error_array, data_mask, headers, FWHM=None, scale='pixel', smoothing='combine', transmitcorr=True):
|
|
"""
|
|
Compute the Stokes parameters I, Q and U for a given data_set
|
|
----------
|
|
Inputs:
|
|
data_array : numpy.ndarray
|
|
Array of images (2D floats, aligned and of the same shape) of a
|
|
single observation with multiple polarizers of an instrument.
|
|
error_array : numpy.ndarray
|
|
Array of images (2D floats, aligned and of the same shape) containing
|
|
the error in each pixel of the observation images in data_array.
|
|
data_mask : numpy.ndarray
|
|
2D boolean array delimiting the data to work on.
|
|
headers : header list
|
|
List of headers corresponding to the images in data_array.
|
|
FWHM : float, optional
|
|
Full Width at Half Maximum of the detector for smoothing of the
|
|
data on each polarizer filter in scale units. If None, no smoothing
|
|
is done.
|
|
Defaults to None.
|
|
scale : str, optional
|
|
Scale units for the FWHM between 'pixel' and 'arcsec'.
|
|
Defaults to 'pixel'.
|
|
smoothing : str, optional
|
|
Smoothing algorithm to be used on the input data array.
|
|
-'combine','combining' use the N images combining algorithm with
|
|
weighted pixels (inverse square of errors).
|
|
-'gaussian' convolve any input image with a gaussian of standard
|
|
deviation stdev = FWHM/(2*sqrt(2*log(2))).
|
|
-'gaussian_after' convolve output Stokes I/Q/U with a gaussian of
|
|
standard deviation stdev = FWHM/(2*sqrt(2*log(2))).
|
|
Defaults to 'combine'. Won't be used if FWHM is None.
|
|
transmitcorr : bool, optional
|
|
Weither the images should be transmittance corrected for each filter
|
|
along the line of sight. Latest calibrated data products (.c1f) does
|
|
not require such correction.
|
|
Defaults to True.
|
|
----------
|
|
Returns:
|
|
I_stokes : numpy.ndarray
|
|
Image (2D floats) containing the Stokes parameters accounting for
|
|
total intensity
|
|
Q_stokes : numpy.ndarray
|
|
Image (2D floats) containing the Stokes parameters accounting for
|
|
vertical/horizontal linear polarization intensity
|
|
U_stokes : numpy.ndarray
|
|
Image (2D floats) containing the Stokes parameters accounting for
|
|
+45/-45deg linear polarization intensity
|
|
Stokes_cov : numpy.ndarray
|
|
Covariance matrix of the Stokes parameters I, Q, U.
|
|
"""
|
|
# Check that all images are from the same instrument
|
|
instr = headers[0]['instrume']
|
|
same_instr = np.array([instr == header['instrume'] for header in headers]).all()
|
|
if not same_instr:
|
|
raise ValueError("All images in data_array are not from the same\
|
|
instrument, cannot proceed.")
|
|
if instr not in ['FOC']:
|
|
raise ValueError("Cannot reduce images from {0:s} instrument\
|
|
(yet)".format(instr))
|
|
|
|
# Routine for the FOC instrument
|
|
if instr == 'FOC':
|
|
# Get image from each polarizer and covariance matrix
|
|
pol_array, pol_cov, pol_headers = polarizer_avg(data_array, error_array, data_mask, headers, FWHM=FWHM, scale=scale, smoothing=smoothing)
|
|
pol0, pol60, pol120 = pol_array
|
|
|
|
if (pol0 < 0.).any() or (pol60 < 0.).any() or (pol120 < 0.).any():
|
|
print("WARNING : Negative value in polarizer array.")
|
|
|
|
# Stokes parameters
|
|
# transmittance corrected
|
|
transmit = np.ones((3,)) # will be filter dependant
|
|
filt2, filt3, filt4 = headers[0]['filtnam2'], headers[0]['filtnam3'], headers[0]['filtnam4']
|
|
same_filt2 = np.array([filt2 == header['filtnam2'] for header in headers]).all()
|
|
same_filt3 = np.array([filt3 == header['filtnam3'] for header in headers]).all()
|
|
same_filt4 = np.array([filt4 == header['filtnam4'] for header in headers]).all()
|
|
if (same_filt2 and same_filt3 and same_filt4):
|
|
transmit2, transmit3, transmit4 = globals()["trans2"][filt2.lower()], globals()["trans3"][filt3.lower()], globals()["trans4"][filt4.lower()]
|
|
else:
|
|
print("WARNING : All images in data_array are not from the same \
|
|
band filter, the limiting transmittance will be taken.")
|
|
transmit2 = np.min([globals()["trans2"][header['filtnam2'].lower()] for header in headers])
|
|
transmit3 = np.min([globals()["trans3"][header['filtnam3'].lower()] for header in headers])
|
|
transmit4 = np.min([globals()["trans4"][header['filtnam4'].lower()] for header in headers])
|
|
if transmitcorr:
|
|
transmit *= transmit2*transmit3*transmit4
|
|
pol_eff = np.array([globals()["pol_efficiency"]['pol0'], globals()["pol_efficiency"]['pol60'], globals()["pol_efficiency"]['pol120']])
|
|
|
|
# Calculating correction factor
|
|
corr = np.array([1.0*h['photflam']/h['exptime'] for h in pol_headers])*pol_headers[0]['exptime']/pol_headers[0]['photflam']
|
|
|
|
# Orientation and error for each polarizer
|
|
fmax = np.finfo(np.float64).max
|
|
pol_flux = np.array([corr[0]*pol0, corr[1]*pol60, corr[2]*pol120])
|
|
|
|
coeff_stokes = np.zeros((3, 3))
|
|
# Coefficients linking each polarizer flux to each Stokes parameter
|
|
for i in range(3):
|
|
coeff_stokes[0, i] = pol_eff[(i+1) % 3]*pol_eff[(i+2) % 3]*np.sin(-2.*globals()["theta"][(i+1) % 3]+2.*globals()["theta"][(i+2) % 3])*2./transmit[i]
|
|
coeff_stokes[1, i] = (-pol_eff[(i+1) % 3]*np.sin(2.*globals()["theta"][(i+1) % 3]) +
|
|
pol_eff[(i+2) % 3]*np.sin(2.*globals()["theta"][(i+2) % 3]))*2./transmit[i]
|
|
coeff_stokes[2, i] = (pol_eff[(i+1) % 3]*np.cos(2.*globals()["theta"][(i+1) % 3]) -
|
|
pol_eff[(i+2) % 3]*np.cos(2.*globals()["theta"][(i+2) % 3]))*2./transmit[i]
|
|
|
|
# Normalization parameter for Stokes parameters computation
|
|
N = (coeff_stokes[0, :]*transmit/2.).sum()
|
|
coeff_stokes = coeff_stokes/N
|
|
I_stokes = np.zeros(pol_array[0].shape)
|
|
Q_stokes = np.zeros(pol_array[0].shape)
|
|
U_stokes = np.zeros(pol_array[0].shape)
|
|
Stokes_cov = np.zeros((3, 3, I_stokes.shape[0], I_stokes.shape[1]))
|
|
|
|
for i in range(I_stokes.shape[0]):
|
|
for j in range(I_stokes.shape[1]):
|
|
I_stokes[i, j], Q_stokes[i, j], U_stokes[i, j] = np.dot(coeff_stokes, pol_flux[:, i, j]).T
|
|
Stokes_cov[:, :, i, j] = np.dot(coeff_stokes, np.dot(pol_cov[:, :, i, j], coeff_stokes.T))
|
|
|
|
if not (FWHM is None) and (smoothing.lower() in ['weighted_gaussian_after', 'weight_gauss_after', 'gaussian_after', 'gauss_after']):
|
|
smoothing = smoothing.lower()[:-6]
|
|
Stokes_array = np.array([I_stokes, Q_stokes, U_stokes])
|
|
Stokes_error = np.array([np.sqrt(Stokes_cov[i, i]) for i in range(3)])
|
|
Stokes_headers = headers[0:3]
|
|
|
|
Stokes_array, Stokes_error = smooth_data(Stokes_array, Stokes_error, data_mask, headers=Stokes_headers, FWHM=FWHM, scale=scale, smoothing=smoothing)
|
|
|
|
I_stokes, Q_stokes, U_stokes = Stokes_array
|
|
Stokes_cov[0, 0], Stokes_cov[1, 1], Stokes_cov[2, 2] = deepcopy(Stokes_error**2)
|
|
|
|
sStokes_array = np.array([I_stokes*Q_stokes, I_stokes*U_stokes, Q_stokes*U_stokes])
|
|
sStokes_error = np.array([Stokes_cov[0, 1], Stokes_cov[0, 2], Stokes_cov[1, 2]])
|
|
uStokes_error = np.array([Stokes_cov[1, 0], Stokes_cov[2, 0], Stokes_cov[2, 1]])
|
|
|
|
sStokes_array, sStokes_error = smooth_data(sStokes_array, sStokes_error, data_mask,
|
|
headers=Stokes_headers, FWHM=FWHM, scale=scale, smoothing=smoothing)
|
|
uStokes_array, uStokes_error = smooth_data(sStokes_array, uStokes_error, data_mask,
|
|
headers=Stokes_headers, FWHM=FWHM, scale=scale, smoothing=smoothing)
|
|
|
|
Stokes_cov[0, 1], Stokes_cov[0, 2], Stokes_cov[1, 2] = deepcopy(sStokes_error)
|
|
Stokes_cov[1, 0], Stokes_cov[2, 0], Stokes_cov[2, 1] = deepcopy(uStokes_error)
|
|
|
|
mask = (Q_stokes**2 + U_stokes**2) > I_stokes**2
|
|
if mask.any():
|
|
print("WARNING : found {0:d} pixels for which I_pol > I_stokes".format(I_stokes[mask].size))
|
|
|
|
# Statistical error: Poisson noise is assumed
|
|
sigma_flux = np.array([np.sqrt(flux/head['exptime']) for flux, head in zip(pol_flux, pol_headers)])
|
|
s_I2_stat = np.sum([coeff_stokes[0, i]**2*sigma_flux[i]**2 for i in range(len(sigma_flux))], axis=0)
|
|
s_Q2_stat = np.sum([coeff_stokes[1, i]**2*sigma_flux[i]**2 for i in range(len(sigma_flux))], axis=0)
|
|
s_U2_stat = np.sum([coeff_stokes[2, i]**2*sigma_flux[i]**2 for i in range(len(sigma_flux))], axis=0)
|
|
|
|
pol_flux_corr = np.array([pf*2./t for (pf, t) in zip(pol_flux, transmit)])
|
|
coeff_stokes_corr = np.array([cs*t/2. for (cs, t) in zip(coeff_stokes.T, transmit)]).T
|
|
# Compute the derivative of each Stokes parameter with respect to the polarizer orientation
|
|
dI_dtheta1 = 2.*pol_eff[0]/N*(pol_eff[2]*np.cos(-2.*globals()["theta"][2]+2.*globals()["theta"][0])*(pol_flux_corr[1]-I_stokes) -
|
|
pol_eff[1]*np.cos(-2.*globals()["theta"][0]+2.*globals()["theta"][1])*(pol_flux_corr[2]-I_stokes) +
|
|
coeff_stokes_corr[0, 0]*(np.sin(2.*globals()["theta"][0])*Q_stokes-np.cos(2*globals()["theta"][0])*U_stokes))
|
|
dI_dtheta2 = 2.*pol_eff[1]/N*(pol_eff[0]*np.cos(-2.*globals()["theta"][0]+2.*globals()["theta"][1])*(pol_flux_corr[2]-I_stokes) -
|
|
pol_eff[2]*np.cos(-2.*globals()["theta"][1]+2.*globals()["theta"][2])*(pol_flux_corr[0]-I_stokes) +
|
|
coeff_stokes_corr[0, 1]*(np.sin(2.*globals()["theta"][1])*Q_stokes-np.cos(2*globals()["theta"][1])*U_stokes))
|
|
dI_dtheta3 = 2.*pol_eff[2]/N*(pol_eff[1]*np.cos(-2.*globals()["theta"][1]+2.*globals()["theta"][2])*(pol_flux_corr[0]-I_stokes) -
|
|
pol_eff[0]*np.cos(-2.*globals()["theta"][2]+2.*globals()["theta"][0])*(pol_flux_corr[1]-I_stokes) +
|
|
coeff_stokes_corr[0, 2]*(np.sin(2.*globals()["theta"][2])*Q_stokes-np.cos(2*globals()["theta"][2])*U_stokes))
|
|
dI_dtheta = np.array([dI_dtheta1, dI_dtheta2, dI_dtheta3])
|
|
|
|
dQ_dtheta1 = 2.*pol_eff[0]/N*(np.cos(2.*globals()["theta"][0])*(pol_flux_corr[1]-pol_flux_corr[2]) - (pol_eff[2]*np.cos(-2.*globals()
|
|
["theta"][2]+2.*globals()["theta"][0]) - pol_eff[1]*np.cos(-2.*globals()["theta"][0]+2.*globals()["theta"][1]))*Q_stokes +
|
|
coeff_stokes_corr[1, 0]*(np.sin(2.*globals()["theta"][0])*Q_stokes-np.cos(2*globals()["theta"][0])*U_stokes))
|
|
dQ_dtheta2 = 2.*pol_eff[1]/N*(np.cos(2.*globals()["theta"][1])*(pol_flux_corr[2]-pol_flux_corr[0]) - (pol_eff[0]*np.cos(-2.*globals()
|
|
["theta"][0]+2.*globals()["theta"][1]) - pol_eff[2]*np.cos(-2.*globals()["theta"][1]+2.*globals()["theta"][2]))*Q_stokes +
|
|
coeff_stokes_corr[1, 1]*(np.sin(2.*globals()["theta"][1])*Q_stokes-np.cos(2*globals()["theta"][1])*U_stokes))
|
|
dQ_dtheta3 = 2.*pol_eff[2]/N*(np.cos(2.*globals()["theta"][2])*(pol_flux_corr[0]-pol_flux_corr[1]) - (pol_eff[1]*np.cos(-2.*globals()
|
|
["theta"][1]+2.*globals()["theta"][2]) - pol_eff[0]*np.cos(-2.*globals()["theta"][2]+2.*globals()["theta"][0]))*Q_stokes +
|
|
coeff_stokes_corr[1, 2]*(np.sin(2.*globals()["theta"][2])*Q_stokes-np.cos(2*globals()["theta"][2])*U_stokes))
|
|
dQ_dtheta = np.array([dQ_dtheta1, dQ_dtheta2, dQ_dtheta3])
|
|
|
|
dU_dtheta1 = 2.*pol_eff[0]/N*(np.sin(2.*globals()["theta"][0])*(pol_flux_corr[1]-pol_flux_corr[2]) - (pol_eff[2]*np.cos(-2.*globals()
|
|
["theta"][2]+2.*globals()["theta"][0]) - pol_eff[1]*np.cos(-2.*globals()["theta"][0]+2.*globals()["theta"][1]))*U_stokes +
|
|
coeff_stokes_corr[2, 0]*(np.sin(2.*globals()["theta"][0])*Q_stokes-np.cos(2*globals()["theta"][0])*U_stokes))
|
|
dU_dtheta2 = 2.*pol_eff[1]/N*(np.sin(2.*globals()["theta"][1])*(pol_flux_corr[2]-pol_flux_corr[0]) - (pol_eff[0]*np.cos(-2.*globals()
|
|
["theta"][0]+2.*globals()["theta"][1]) - pol_eff[2]*np.cos(-2.*globals()["theta"][1]+2.*globals()["theta"][2]))*U_stokes +
|
|
coeff_stokes_corr[2, 1]*(np.sin(2.*globals()["theta"][1])*Q_stokes-np.cos(2*globals()["theta"][1])*U_stokes))
|
|
dU_dtheta3 = 2.*pol_eff[2]/N*(np.sin(2.*globals()["theta"][2])*(pol_flux_corr[0]-pol_flux_corr[1]) - (pol_eff[1]*np.cos(-2.*globals()
|
|
["theta"][1]+2.*globals()["theta"][2]) - pol_eff[0]*np.cos(-2.*globals()["theta"][2]+2.*globals()["theta"][0]))*U_stokes +
|
|
coeff_stokes_corr[2, 2]*(np.sin(2.*globals()["theta"][2])*Q_stokes-np.cos(2*globals()["theta"][2])*U_stokes))
|
|
dU_dtheta = np.array([dU_dtheta1, dU_dtheta2, dU_dtheta3])
|
|
|
|
# Compute the uncertainty associated with the polarizers' orientation (see Kishimoto 1999)
|
|
s_I2_axis = np.sum([dI_dtheta[i]**2 * globals()["sigma_theta"][i]**2 for i in range(len(globals()["sigma_theta"]))], axis=0)
|
|
s_Q2_axis = np.sum([dQ_dtheta[i]**2 * globals()["sigma_theta"][i]**2 for i in range(len(globals()["sigma_theta"]))], axis=0)
|
|
s_U2_axis = np.sum([dU_dtheta[i]**2 * globals()["sigma_theta"][i]**2 for i in range(len(globals()["sigma_theta"]))], axis=0)
|
|
# np.savetxt("output/sI_dir.txt", np.sqrt(s_I2_axis))
|
|
# np.savetxt("output/sQ_dir.txt", np.sqrt(s_Q2_axis))
|
|
# np.savetxt("output/sU_dir.txt", np.sqrt(s_U2_axis))
|
|
|
|
# Add quadratically the uncertainty to the Stokes covariance matrix
|
|
Stokes_cov[0, 0] += s_I2_axis + s_I2_stat
|
|
Stokes_cov[1, 1] += s_Q2_axis + s_Q2_stat
|
|
Stokes_cov[2, 2] += s_U2_axis + s_U2_stat
|
|
|
|
# Compute integrated values for P, PA before any rotation
|
|
mask = np.logical_and(data_mask.astype(bool), (I_stokes > 0.))
|
|
I_diluted = I_stokes[mask].sum()
|
|
Q_diluted = Q_stokes[mask].sum()
|
|
U_diluted = U_stokes[mask].sum()
|
|
I_diluted_err = np.sqrt(np.sum(Stokes_cov[0, 0][mask]))
|
|
Q_diluted_err = np.sqrt(np.sum(Stokes_cov[1, 1][mask]))
|
|
U_diluted_err = np.sqrt(np.sum(Stokes_cov[2, 2][mask]))
|
|
IQ_diluted_err = np.sqrt(np.sum(Stokes_cov[0, 1][mask]**2))
|
|
IU_diluted_err = np.sqrt(np.sum(Stokes_cov[0, 2][mask]**2))
|
|
QU_diluted_err = np.sqrt(np.sum(Stokes_cov[1, 2][mask]**2))
|
|
|
|
P_diluted = np.sqrt(Q_diluted**2+U_diluted**2)/I_diluted
|
|
P_diluted_err = (1./I_diluted)*np.sqrt((Q_diluted**2*Q_diluted_err**2 + U_diluted**2*U_diluted_err**2 + 2.*Q_diluted*U_diluted*QU_diluted_err)/(Q_diluted**2 + U_diluted**2) + ((Q_diluted/I_diluted)**2 + (U_diluted/I_diluted)**2)*I_diluted_err**2 - 2.*(Q_diluted/I_diluted)*IQ_diluted_err - 2.*(U_diluted/I_diluted)*IU_diluted_err)
|
|
|
|
PA_diluted = princ_angle((90./np.pi)*np.arctan2(U_diluted, Q_diluted))
|
|
PA_diluted_err = (90./(np.pi*(Q_diluted**2 + U_diluted**2)))*np.sqrt(U_diluted**2*Q_diluted_err**2 + Q_diluted**2*U_diluted_err**2 - 2.*Q_diluted*U_diluted*QU_diluted_err)
|
|
|
|
for header in headers:
|
|
header['P_int'] = (P_diluted, 'Integrated polarization degree')
|
|
header['P_int_err'] = (np.ceil(P_diluted_err*1000.)/1000., 'Integrated polarization degree error')
|
|
header['PA_int'] = (PA_diluted, 'Integrated polarization angle')
|
|
header['PA_int_err'] = (np.ceil(PA_diluted_err*10.)/10., 'Integrated polarization angle error')
|
|
|
|
return I_stokes, Q_stokes, U_stokes, Stokes_cov
|
|
|
|
|
|
def compute_pol(I_stokes, Q_stokes, U_stokes, Stokes_cov, headers):
|
|
"""
|
|
Compute the polarization degree (in %) and angle (in deg) and their
|
|
respective errors from given Stokes parameters.
|
|
----------
|
|
Inputs:
|
|
I_stokes : numpy.ndarray
|
|
Image (2D floats) containing the Stokes parameters accounting for
|
|
total intensity
|
|
Q_stokes : numpy.ndarray
|
|
Image (2D floats) containing the Stokes parameters accounting for
|
|
vertical/horizontal linear polarization intensity
|
|
U_stokes : numpy.ndarray
|
|
Image (2D floats) containing the Stokes parameters accounting for
|
|
+45/-45deg linear polarization intensity
|
|
Stokes_cov : numpy.ndarray
|
|
Covariance matrix of the Stokes parameters I, Q, U.
|
|
headers : header list
|
|
List of headers corresponding to the images in data_array.
|
|
----------
|
|
Returns:
|
|
P : numpy.ndarray
|
|
Image (2D floats) containing the polarization degree (in %).
|
|
debiased_P : numpy.ndarray
|
|
Image (2D floats) containing the debiased polarization degree (in %).
|
|
s_P : numpy.ndarray
|
|
Image (2D floats) containing the error on the polarization degree.
|
|
s_P_P : numpy.ndarray
|
|
Image (2D floats) containing the Poisson noise error on the
|
|
polarization degree.
|
|
PA : numpy.ndarray
|
|
Image (2D floats) containing the polarization angle.
|
|
s_PA : numpy.ndarray
|
|
Image (2D floats) containing the error on the polarization angle.
|
|
s_PA_P : numpy.ndarray
|
|
Image (2D floats) containing the Poisson noise error on the
|
|
polarization angle.
|
|
new_headers : header list
|
|
Updated list of headers corresponding to the reduced images accounting
|
|
for the new orientation angle.
|
|
"""
|
|
# Polarization degree and angle computation
|
|
mask = I_stokes > 0.
|
|
I_pol = np.zeros(I_stokes.shape)
|
|
I_pol[mask] = np.sqrt(Q_stokes[mask]**2 + U_stokes[mask]**2)
|
|
P = np.zeros(I_stokes.shape)
|
|
P[mask] = I_pol[mask]/I_stokes[mask]
|
|
PA = np.zeros(I_stokes.shape)
|
|
PA[mask] = (90./np.pi)*np.arctan2(U_stokes[mask], Q_stokes[mask])
|
|
|
|
if (P > 1).any():
|
|
print("WARNING : found {0:d} pixels for which P > 1".format(P[P > 1.].size))
|
|
|
|
# Associated errors
|
|
fmax = np.finfo(np.float64).max
|
|
s_P = np.ones(I_stokes.shape)*fmax
|
|
s_PA = np.ones(I_stokes.shape)*fmax
|
|
|
|
# Propagate previously computed errors
|
|
s_P[mask] = (1/I_stokes[mask])*np.sqrt((Q_stokes[mask]**2*Stokes_cov[1, 1][mask] + U_stokes[mask]**2*Stokes_cov[2, 2][mask] + 2.*Q_stokes[mask]*U_stokes[mask]*Stokes_cov[1, 2][mask])/(Q_stokes[mask]**2 + U_stokes[mask]**2) +
|
|
((Q_stokes[mask]/I_stokes[mask])**2 + (U_stokes[mask]/I_stokes[mask])**2)*Stokes_cov[0, 0][mask] - 2.*(Q_stokes[mask]/I_stokes[mask])*Stokes_cov[0, 1][mask] - 2.*(U_stokes[mask]/I_stokes[mask])*Stokes_cov[0, 2][mask])
|
|
s_PA[mask] = (90./(np.pi*(Q_stokes[mask]**2 + U_stokes[mask]**2)))*np.sqrt(U_stokes[mask]**2*Stokes_cov[1, 1][mask] +
|
|
Q_stokes[mask]**2*Stokes_cov[2, 2][mask] - 2.*Q_stokes[mask]*U_stokes[mask]*Stokes_cov[1, 2][mask])
|
|
s_P[np.isnan(s_P)] = fmax
|
|
s_PA[np.isnan(s_PA)] = fmax
|
|
|
|
# Catch expected "OverflowWarning" as wrong pixel have an overflowing error
|
|
with warnings.catch_warnings(record=True) as _:
|
|
mask2 = P**2 >= s_P**2
|
|
debiased_P = np.zeros(I_stokes.shape)
|
|
debiased_P[mask2] = np.sqrt(P[mask2]**2 - s_P[mask2]**2)
|
|
|
|
if (debiased_P > 1.).any():
|
|
print("WARNING : found {0:d} pixels for which debiased_P > 100%".format(debiased_P[debiased_P > 1.].size))
|
|
|
|
# Compute the total exposure time so that
|
|
# I_stokes*exp_tot = N_tot the total number of events
|
|
exp_tot = np.array([header['exptime'] for header in headers]).sum()
|
|
# print("Total exposure time : {} sec".format(exp_tot))
|
|
N_obs = I_stokes*exp_tot
|
|
|
|
# Errors on P, PA supposing Poisson noise
|
|
s_P_P = np.ones(I_stokes.shape)*fmax
|
|
s_P_P[mask] = np.sqrt(2.)/np.sqrt(N_obs[mask])*100.
|
|
s_PA_P = np.ones(I_stokes.shape)*fmax
|
|
s_PA_P[mask2] = s_P_P[mask2]/(2.*P[mask2])*180./np.pi
|
|
|
|
# Nan handling :
|
|
P[np.isnan(P)] = 0.
|
|
s_P[np.isnan(s_P)] = fmax
|
|
s_PA[np.isnan(s_PA)] = fmax
|
|
debiased_P[np.isnan(debiased_P)] = 0.
|
|
s_P_P[np.isnan(s_P_P)] = fmax
|
|
s_PA_P[np.isnan(s_PA_P)] = fmax
|
|
|
|
return P, debiased_P, s_P, s_P_P, PA, s_PA, s_PA_P
|
|
|
|
|
|
def rotate_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, headers, ang=None, SNRi_cut=None):
|
|
"""
|
|
Use scipy.ndimage.rotate to rotate I_stokes to an angle, and a rotation
|
|
matrix to rotate Q, U of a given angle in degrees and update header
|
|
orientation keyword.
|
|
----------
|
|
Inputs:
|
|
I_stokes : numpy.ndarray
|
|
Image (2D floats) containing the Stokes parameters accounting for
|
|
total intensity
|
|
Q_stokes : numpy.ndarray
|
|
Image (2D floats) containing the Stokes parameters accounting for
|
|
vertical/horizontal linear polarization intensity
|
|
U_stokes : numpy.ndarray
|
|
Image (2D floats) containing the Stokes parameters accounting for
|
|
+45/-45deg linear polarization intensity
|
|
Stokes_cov : numpy.ndarray
|
|
Covariance matrix of the Stokes parameters I, Q, U.
|
|
data_mask : numpy.ndarray
|
|
2D boolean array delimiting the data to work on.
|
|
headers : header list
|
|
List of headers corresponding to the reduced images.
|
|
ang : float, optional
|
|
Rotation angle (in degrees) that should be applied to the Stokes
|
|
parameters. If None, will rotate to have North up.
|
|
Defaults to None.
|
|
SNRi_cut : float, optional
|
|
Cut that should be applied to the signal-to-noise ratio on I.
|
|
Any SNR < SNRi_cut won't be displayed. If None, cut won't be applied.
|
|
Defaults to None.
|
|
----------
|
|
Returns:
|
|
new_I_stokes : numpy.ndarray
|
|
Rotated mage (2D floats) containing the rotated Stokes parameters
|
|
accounting for total intensity
|
|
new_Q_stokes : numpy.ndarray
|
|
Rotated mage (2D floats) containing the rotated Stokes parameters
|
|
accounting for vertical/horizontal linear polarization intensity
|
|
new_U_stokes : numpy.ndarray
|
|
Rotated image (2D floats) containing the rotated Stokes parameters
|
|
accounting for +45/-45deg linear polarization intensity.
|
|
new_Stokes_cov : numpy.ndarray
|
|
Updated covariance matrix of the Stokes parameters I, Q, U.
|
|
new_headers : header list
|
|
Updated list of headers corresponding to the reduced images accounting
|
|
for the new orientation angle.
|
|
new_data_mask : numpy.ndarray
|
|
Updated 2D boolean array delimiting the data to work on.
|
|
"""
|
|
# Apply cuts
|
|
if SNRi_cut is not None:
|
|
SNRi = I_stokes/np.sqrt(Stokes_cov[0, 0])
|
|
mask = SNRi < SNRi_cut
|
|
eps = 1e-5
|
|
for i in range(I_stokes.shape[0]):
|
|
for j in range(I_stokes.shape[1]):
|
|
if mask[i, j]:
|
|
I_stokes[i, j] = eps*np.sqrt(Stokes_cov[0, 0][i, j])
|
|
Q_stokes[i, j] = eps*np.sqrt(Stokes_cov[1, 1][i, j])
|
|
U_stokes[i, j] = eps*np.sqrt(Stokes_cov[2, 2][i, j])
|
|
|
|
# Rotate I_stokes, Q_stokes, U_stokes using rotation matrix
|
|
if ang is None:
|
|
ang = np.zeros((len(headers),))
|
|
for i, head in enumerate(headers):
|
|
ang[i] = -head['orientat']
|
|
ang = ang.mean()
|
|
alpha = np.pi/180.*ang
|
|
mrot = np.array([[1., 0., 0.],
|
|
[0., np.cos(2.*alpha), np.sin(2.*alpha)],
|
|
[0, -np.sin(2.*alpha), np.cos(2.*alpha)]])
|
|
|
|
old_center = np.array(I_stokes.shape)/2
|
|
shape = np.fix(np.array(I_stokes.shape)*np.sqrt(2.5)).astype(int)
|
|
new_center = np.array(shape)/2
|
|
|
|
I_stokes = zeropad(I_stokes, shape)
|
|
Q_stokes = zeropad(Q_stokes, shape)
|
|
U_stokes = zeropad(U_stokes, shape)
|
|
data_mask = zeropad(data_mask, shape)
|
|
Stokes_cov = zeropad(Stokes_cov, [*Stokes_cov.shape[:-2], *shape])
|
|
new_I_stokes = np.zeros(shape)
|
|
new_Q_stokes = np.zeros(shape)
|
|
new_U_stokes = np.zeros(shape)
|
|
new_Stokes_cov = np.zeros((*Stokes_cov.shape[:-2], *shape))
|
|
|
|
# Rotate original images using scipy.ndimage.rotate
|
|
new_I_stokes = sc_rotate(I_stokes, ang, order=1, reshape=False, cval=0.)
|
|
new_Q_stokes = sc_rotate(Q_stokes, ang, order=1, reshape=False, cval=0.)
|
|
new_U_stokes = sc_rotate(U_stokes, ang, order=1, reshape=False, cval=0.)
|
|
new_data_mask = sc_rotate(data_mask.astype(float)*10., ang, order=1, reshape=False, cval=0.)
|
|
new_data_mask[new_data_mask < 2] = 0.
|
|
new_data_mask = new_data_mask.astype(bool)
|
|
for i in range(3):
|
|
for j in range(3):
|
|
new_Stokes_cov[i, j] = sc_rotate(Stokes_cov[i, j], ang, order=1, reshape=False, cval=0.)
|
|
new_Stokes_cov[i, i] = np.abs(new_Stokes_cov[i, i])
|
|
|
|
for i in range(shape[0]):
|
|
for j in range(shape[1]):
|
|
new_I_stokes[i, j], new_Q_stokes[i, j], new_U_stokes[i, j] = np.dot(mrot, np.array([new_I_stokes[i, j], new_Q_stokes[i, j], new_U_stokes[i, j]])).T
|
|
new_Stokes_cov[:, :, i, j] = np.dot(mrot, np.dot(new_Stokes_cov[:, :, i, j], mrot.T))
|
|
|
|
# Update headers to new angle
|
|
new_headers = []
|
|
mrot = np.array([[np.cos(-alpha), -np.sin(-alpha)],
|
|
[np.sin(-alpha), np.cos(-alpha)]])
|
|
for header in headers:
|
|
new_header = deepcopy(header)
|
|
new_header['orientat'] = header['orientat'] + ang
|
|
new_wcs = WCS(header).celestial.deepcopy()
|
|
|
|
new_wcs.wcs.pc = np.dot(mrot, new_wcs.wcs.pc)
|
|
new_wcs.wcs.crpix = np.dot(mrot, new_wcs.wcs.crpix - old_center[::-1]) + new_center[::-1]
|
|
new_wcs.wcs.set()
|
|
for key, val in new_wcs.to_header().items():
|
|
new_header.set(key, val)
|
|
if new_wcs.wcs.pc[0, 0] == 1.:
|
|
new_header.set('PC1_1', 1.)
|
|
if new_wcs.wcs.pc[1, 1] == 1.:
|
|
new_header.set('PC2_2', 1.)
|
|
|
|
new_headers.append(new_header)
|
|
|
|
# Nan handling :
|
|
fmax = np.finfo(np.float64).max
|
|
|
|
new_I_stokes[np.isnan(new_I_stokes)] = 0.
|
|
new_Q_stokes[new_I_stokes == 0.] = 0.
|
|
new_U_stokes[new_I_stokes == 0.] = 0.
|
|
new_Q_stokes[np.isnan(new_Q_stokes)] = 0.
|
|
new_U_stokes[np.isnan(new_U_stokes)] = 0.
|
|
new_Stokes_cov[np.isnan(new_Stokes_cov)] = fmax
|
|
|
|
# Compute updated integrated values for P, PA
|
|
mask = deepcopy(new_data_mask).astype(bool)
|
|
I_diluted = new_I_stokes[mask].sum()
|
|
Q_diluted = new_Q_stokes[mask].sum()
|
|
U_diluted = new_U_stokes[mask].sum()
|
|
I_diluted_err = np.sqrt(np.sum(new_Stokes_cov[0, 0][mask]))
|
|
Q_diluted_err = np.sqrt(np.sum(new_Stokes_cov[1, 1][mask]))
|
|
U_diluted_err = np.sqrt(np.sum(new_Stokes_cov[2, 2][mask]))
|
|
IQ_diluted_err = np.sqrt(np.sum(new_Stokes_cov[0, 1][mask]**2))
|
|
IU_diluted_err = np.sqrt(np.sum(new_Stokes_cov[0, 2][mask]**2))
|
|
QU_diluted_err = np.sqrt(np.sum(new_Stokes_cov[1, 2][mask]**2))
|
|
|
|
P_diluted = np.sqrt(Q_diluted**2+U_diluted**2)/I_diluted
|
|
P_diluted_err = (1./I_diluted)*np.sqrt((Q_diluted**2*Q_diluted_err**2 + U_diluted**2*U_diluted_err**2 + 2.*Q_diluted*U_diluted*QU_diluted_err)/(Q_diluted**2 + U_diluted **
|
|
2) + ((Q_diluted/I_diluted)**2 + (U_diluted/I_diluted)**2)*I_diluted_err**2 - 2.*(Q_diluted/I_diluted)*IQ_diluted_err - 2.*(U_diluted/I_diluted)*IU_diluted_err)
|
|
|
|
PA_diluted = princ_angle((90./np.pi)*np.arctan2(U_diluted, Q_diluted))
|
|
PA_diluted_err = (90./(np.pi*(Q_diluted**2 + U_diluted**2)))*np.sqrt(U_diluted**2*Q_diluted_err **
|
|
2 + Q_diluted**2*U_diluted_err**2 - 2.*Q_diluted*U_diluted*QU_diluted_err)
|
|
|
|
for header in new_headers:
|
|
header['P_int'] = (P_diluted, 'Integrated polarization degree')
|
|
header['P_int_err'] = (np.ceil(P_diluted_err*1000.)/1000., 'Integrated polarization degree error')
|
|
header['PA_int'] = (PA_diluted, 'Integrated polarization angle')
|
|
header['PA_int_err'] = (np.ceil(PA_diluted_err*10.)/10., 'Integrated polarization angle error')
|
|
|
|
return new_I_stokes, new_Q_stokes, new_U_stokes, new_Stokes_cov, new_data_mask, new_headers
|
|
|
|
|
|
def rotate_data(data_array, error_array, data_mask, headers, ang):
|
|
"""
|
|
Use scipy.ndimage.rotate to rotate I_stokes to an angle, and a rotation
|
|
matrix to rotate Q, U of a given angle in degrees and update header
|
|
orientation keyword.
|
|
----------
|
|
Inputs:
|
|
data_array : numpy.ndarray
|
|
Array of images (2D floats) to be rotated by angle ang.
|
|
error_array : numpy.ndarray
|
|
Array of error associated to images in data_array.
|
|
data_mask : numpy.ndarray
|
|
2D boolean array delimiting the data to work on.
|
|
headers : header list
|
|
List of headers corresponding to the reduced images.
|
|
ang : float
|
|
Rotation angle (in degrees) that should be applied to the Stokes
|
|
parameters
|
|
----------
|
|
Returns:
|
|
new_data_array : numpy.ndarray
|
|
Updated array containing the rotated images.
|
|
new_error_array : numpy.ndarray
|
|
Updated array containing the rotated errors.
|
|
new_data_mask : numpy.ndarray
|
|
Updated 2D boolean array delimiting the data to work on.
|
|
new_headers : header list
|
|
Updated list of headers corresponding to the reduced images accounting
|
|
for the new orientation angle.
|
|
"""
|
|
# Rotate I_stokes, Q_stokes, U_stokes using rotation matrix
|
|
alpha = ang*np.pi/180.
|
|
|
|
old_center = np.array(data_array[0].shape)/2
|
|
shape = np.fix(np.array(data_array[0].shape)*np.sqrt(2.5)).astype(int)
|
|
new_center = np.array(shape)/2
|
|
|
|
data_array = zeropad(data_array, [data_array.shape[0], *shape])
|
|
error_array = zeropad(error_array, [error_array.shape[0], *shape])
|
|
data_mask = zeropad(data_mask, shape)
|
|
# Rotate original images using scipy.ndimage.rotate
|
|
new_data_array = []
|
|
new_error_array = []
|
|
for i in range(data_array.shape[0]):
|
|
new_data_array.append(sc_rotate(data_array[i], ang, order=1, reshape=False, cval=0.))
|
|
new_error_array.append(sc_rotate(error_array[i], ang, order=1, reshape=False, cval=0.))
|
|
new_data_array = np.array(new_data_array)
|
|
new_error_array = np.array(new_error_array)
|
|
new_data_mask = sc_rotate(data_mask*10., ang, order=1, reshape=False, cval=0.)
|
|
new_data_mask[new_data_mask < 2] = 0.
|
|
new_data_mask = new_data_mask.astype(bool)
|
|
|
|
for i in range(new_data_array.shape[0]):
|
|
new_data_array[i][new_data_array[i] < 0.] = 0.
|
|
|
|
# Update headers to new angle
|
|
new_headers = []
|
|
mrot = np.array([[np.cos(-alpha), -np.sin(-alpha)], [np.sin(-alpha), np.cos(-alpha)]])
|
|
for header in headers:
|
|
new_header = deepcopy(header)
|
|
new_header['orientat'] = header['orientat'] + ang
|
|
|
|
new_wcs = WCS(header).celestial.deepcopy()
|
|
|
|
new_wcs.wcs.pc[:2, :2] = np.dot(mrot, new_wcs.wcs.pc[:2, :2])
|
|
new_wcs.wcs.crpix[:2] = np.dot(mrot, new_wcs.wcs.crpix[:2] - old_center[::-1]) + new_center[::-1]
|
|
new_wcs.wcs.set()
|
|
for key, val in new_wcs.to_header().items():
|
|
new_header[key] = val
|
|
|
|
new_headers.append(new_header)
|
|
globals()['theta'] = globals()["theta"] - alpha
|
|
|
|
return new_data_array, new_error_array, new_data_mask, new_headers |