Put I,Q,U into single Stokes matrix

This commit is contained in:
2025-08-05 18:56:45 +02:00
parent 8b4c7c38f2
commit 20280e7226
4 changed files with 148 additions and 255 deletions

View File

@@ -46,6 +46,7 @@ import matplotlib.pyplot as plt
import numpy as np
from astropy import log
from astropy.wcs import WCS
from astropy.wcs.utils import add_stokes_axis_to_wcs
from matplotlib.colors import LogNorm
from matplotlib.patches import Rectangle
from scipy.ndimage import rotate as sc_rotate
@@ -1182,15 +1183,8 @@ def compute_Stokes(data_array, error_array, data_mask, headers, FWHM=None, scale
Defaults to True.
----------
Returns:
I_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
total intensity
Q_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
vertical/horizontal linear polarization intensity
U_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
+45/-45deg linear polarization intensity
Stokes : numpy.ndarray
Image (2D floats) containing the Stokes I,Q,U,V flux
Stokes_cov : numpy.ndarray
Covariance matrix of the Stokes parameters I, Q, U.
"""
@@ -1269,28 +1263,26 @@ def compute_Stokes(data_array, error_array, data_mask, headers, FWHM=None, scale
N = (coeff_stokes[0, :] * transmit / 2.0).sum()
coeff_stokes = coeff_stokes / N
coeff_stokes_corr = np.array([cs * t / 2.0 for (cs, t) in zip(coeff_stokes.T, transmit)]).T
I_stokes = np.zeros(pol_array[0].shape)
Q_stokes = np.zeros(pol_array[0].shape)
U_stokes = np.zeros(pol_array[0].shape)
Stokes_cov = np.zeros((3, 3, I_stokes.shape[0], I_stokes.shape[1]))
Stokes = np.zeros((4, pol_array[0].shape[0], pol_array[0].shape[1]))
Stokes_cov = np.zeros((4, 4, Stokes.shape[1], Stokes.shape[2]))
for i in range(I_stokes.shape[0]):
for j in range(I_stokes.shape[1]):
I_stokes[i, j], Q_stokes[i, j], U_stokes[i, j] = np.dot(coeff_stokes, pol_flux[:, i, j]).T
Stokes_cov[:, :, i, j] = np.dot(coeff_stokes, np.dot(pol_cov[:, :, i, j], coeff_stokes.T))
for i in range(Stokes.shape[1]):
for j in range(Stokes.shape[2]):
Stokes[:3, i, j] = np.dot(coeff_stokes, pol_flux[:, i, j]).T
Stokes_cov[:3, :3, i, j] = np.dot(coeff_stokes, np.dot(pol_cov[:, :, i, j], coeff_stokes.T))
if (FWHM is not None) and (smoothing.lower() in ["weighted_gaussian_after", "weight_gauss_after", "gaussian_after", "gauss_after"]):
smoothing = smoothing.lower()[:-6]
Stokes_array = np.array([I_stokes, Q_stokes, U_stokes])
Stokes_array = deepcopy(Stokes[:3])
Stokes_error = np.array([np.sqrt(Stokes_cov[i, i]) for i in range(3)])
Stokes_headers = headers[0:3]
Stokes_array, Stokes_error = smooth_data(Stokes_array, Stokes_error, data_mask, headers=Stokes_headers, FWHM=FWHM, scale=scale, smoothing=smoothing)
I_stokes, Q_stokes, U_stokes = Stokes_array
Stokes[:3] = deepcopy(Stokes_array)
Stokes_cov[0, 0], Stokes_cov[1, 1], Stokes_cov[2, 2] = deepcopy(Stokes_error**2)
sStokes_array = np.array([I_stokes * Q_stokes, I_stokes * U_stokes, Q_stokes * U_stokes])
sStokes_array = np.array([Stokes[0, 1], Stokes[0, 2], Stokes[1, 2]])
sStokes_error = np.array([Stokes_cov[0, 1], Stokes_cov[0, 2], Stokes_cov[1, 2]])
uStokes_error = np.array([Stokes_cov[1, 0], Stokes_cov[2, 0], Stokes_cov[2, 1]])
@@ -1304,14 +1296,14 @@ def compute_Stokes(data_array, error_array, data_mask, headers, FWHM=None, scale
Stokes_cov[0, 1], Stokes_cov[0, 2], Stokes_cov[1, 2] = deepcopy(sStokes_error)
Stokes_cov[1, 0], Stokes_cov[2, 0], Stokes_cov[2, 1] = deepcopy(uStokes_error)
mask = (Q_stokes**2 + U_stokes**2) > I_stokes**2
mask = (Stokes[1] ** 2 + Stokes[2] ** 2) > Stokes[0] ** 2
if mask.any():
print("WARNING : found {0:d} pixels for which I_pol > I_stokes".format(I_stokes[mask].size))
print("WARNING : found {0:d} pixels for which I_pol > I_stokes".format(mask.sum()))
# Statistical error: Poisson noise is assumed
sigma_flux = np.array([np.sqrt(flux / head["exptime"]) for flux, head in zip(pol_flux, pol_headers)])
s_IQU_stat = np.zeros(Stokes_cov.shape)
for i in range(Stokes_cov.shape[0]):
for i in range(3):
s_IQU_stat[i, i] = np.sum([coeff_stokes[i, k] ** 2 * sigma_flux[k] ** 2 for k in range(len(sigma_flux))], axis=0)
for j in [k for k in range(3) if k > i]:
s_IQU_stat[i, j] = np.sum([coeff_stokes[i, k] * coeff_stokes[j, k] * sigma_flux[k] ** 2 for k in range(len(sigma_flux))], axis=0)
@@ -1327,13 +1319,13 @@ def compute_Stokes(data_array, error_array, data_mask, headers, FWHM=None, scale
* pol_eff[j]
/ N
* (
pol_eff[(j + 2) % 3] * np.cos(-2.0 * theta[(j + 2) % 3] + 2.0 * theta[j]) * (pol_flux_corr[(j + 1) % 3] - I_stokes)
- pol_eff[(j + 1) % 3] * np.cos(-2.0 * theta[j] + 2.0 * theta[(j + 1) % 3]) * (pol_flux_corr[(j + 2) % 3] - I_stokes)
+ coeff_stokes_corr[0, j] * (np.sin(2.0 * theta[j]) * Q_stokes - np.cos(2 * theta[j]) * U_stokes)
pol_eff[(j + 2) % 3] * np.cos(-2.0 * theta[(j + 2) % 3] + 2.0 * theta[j]) * (pol_flux_corr[(j + 1) % 3] - Stokes[0])
- pol_eff[(j + 1) % 3] * np.cos(-2.0 * theta[j] + 2.0 * theta[(j + 1) % 3]) * (pol_flux_corr[(j + 2) % 3] - Stokes[0])
+ coeff_stokes_corr[0, j] * (np.sin(2.0 * theta[j]) * Stokes[1] - np.cos(2 * theta[j]) * Stokes[2])
)
)
# Derivative of Q_stokes wrt theta_1, 2, 3
# Derivative of Stokes[1] wrt theta_1, 2, 3
for j in range(3):
dIQU_dtheta[1, j] = (
2.0
@@ -1345,12 +1337,12 @@ def compute_Stokes(data_array, error_array, data_mask, headers, FWHM=None, scale
pol_eff[(j + 2) % 3] * np.cos(-2.0 * theta[(j + 2) % 3] + 2.0 * theta[j])
- pol_eff[(j + 1) % 3] * np.cos(-2.0 * theta[j] + 2.0 * theta[(j + 1) % 3])
)
* Q_stokes
+ coeff_stokes_corr[1, j] * (np.sin(2.0 * theta[j]) * Q_stokes - np.cos(2 * theta[j]) * U_stokes)
* Stokes[1]
+ coeff_stokes_corr[1, j] * (np.sin(2.0 * theta[j]) * Stokes[1] - np.cos(2 * theta[j]) * Stokes[2])
)
)
# Derivative of U_stokes wrt theta_1, 2, 3
# Derivative of Stokes[2] wrt theta_1, 2, 3
for j in range(3):
dIQU_dtheta[2, j] = (
2.0
@@ -1362,14 +1354,14 @@ def compute_Stokes(data_array, error_array, data_mask, headers, FWHM=None, scale
pol_eff[(j + 2) % 3] * np.cos(-2.0 * theta[(j + 2) % 3] + 2.0 * theta[j])
- pol_eff[(j + 1) % 3] * np.cos(-2.0 * theta[j] + 2.0 * theta[(j + 1) % 3])
)
* U_stokes
+ coeff_stokes_corr[2, j] * (np.sin(2.0 * theta[j]) * Q_stokes - np.cos(2 * theta[j]) * U_stokes)
* Stokes[2]
+ coeff_stokes_corr[2, j] * (np.sin(2.0 * theta[j]) * Stokes[1] - np.cos(2 * theta[j]) * Stokes[2])
)
)
# Compute the uncertainty associated with the polarizers' orientation (see Kishimoto 1999)
s_IQU_axis = np.zeros(Stokes_cov.shape)
for i in range(Stokes_cov.shape[0]):
for i in range(3):
s_IQU_axis[i, i] = np.sum([dIQU_dtheta[i, k] ** 2 * globals()["sigma_theta"][k] ** 2 for k in range(len(globals()["sigma_theta"]))], axis=0)
for j in [k for k in range(3) if k > i]:
s_IQU_axis[i, j] = np.sum(
@@ -1386,15 +1378,13 @@ def compute_Stokes(data_array, error_array, data_mask, headers, FWHM=None, scale
header_stokes = pol_headers[0]
else:
all_I_stokes = np.zeros((np.unique(rotate).size, data_array.shape[1], data_array.shape[2]))
all_Q_stokes = np.zeros((np.unique(rotate).size, data_array.shape[1], data_array.shape[2]))
all_U_stokes = np.zeros((np.unique(rotate).size, data_array.shape[1], data_array.shape[2]))
all_Stokes_cov = np.zeros((np.unique(rotate).size, 3, 3, data_array.shape[1], data_array.shape[2]))
all_Stokes = np.zeros((np.unique(rotate).size, 4, data_array.shape[1], data_array.shape[2]))
all_Stokes_cov = np.zeros((np.unique(rotate).size, 4, 4, data_array.shape[1], data_array.shape[2]))
all_header_stokes = [{}] * np.unique(rotate).size
for i, rot in enumerate(np.unique(rotate)):
rot_mask = rotate == rot
all_I_stokes[i], all_Q_stokes[i], all_U_stokes[i], all_Stokes_cov[i], all_header_stokes[i] = compute_Stokes(
all_Stokes[i], all_Stokes_cov[i], all_header_stokes[i] = compute_Stokes(
data_array[rot_mask],
error_array[rot_mask],
data_mask,
@@ -1407,10 +1397,8 @@ def compute_Stokes(data_array, error_array, data_mask, headers, FWHM=None, scale
)
all_exp = np.array([float(h["exptime"]) for h in all_header_stokes])
I_stokes = np.sum([exp * I for exp, I in zip(all_exp, all_I_stokes)], axis=0) / all_exp.sum()
Q_stokes = np.sum([exp * Q for exp, Q in zip(all_exp, all_Q_stokes)], axis=0) / all_exp.sum()
U_stokes = np.sum([exp * U for exp, U in zip(all_exp, all_U_stokes)], axis=0) / all_exp.sum()
Stokes_cov = np.zeros((3, 3, I_stokes.shape[0], I_stokes.shape[1]))
Stokes = np.sum([exp * S for exp, S in zip(all_exp, all_Stokes)], axis=0) / all_exp.sum()
Stokes_cov = np.zeros((4, 4, Stokes.shape[1], Stokes.shape[2]))
for i in range(3):
Stokes_cov[i, i] = np.sum([exp**2 * cov for exp, cov in zip(all_exp, all_Stokes_cov[:, i, i])], axis=0) / all_exp.sum() ** 2
for j in [x for x in range(3) if x != i]:
@@ -1424,19 +1412,19 @@ def compute_Stokes(data_array, error_array, data_mask, headers, FWHM=None, scale
# Nan handling :
fmax = np.finfo(np.float64).max
I_stokes[np.isnan(I_stokes)] = 0.0
Q_stokes[I_stokes == 0.0] = 0.0
U_stokes[I_stokes == 0.0] = 0.0
Q_stokes[np.isnan(Q_stokes)] = 0.0
U_stokes[np.isnan(U_stokes)] = 0.0
Stokes[np.isnan(Stokes)] = 0.0
Stokes[1:][np.broadcast_to(Stokes[0] == 0.0, Stokes[1:].shape)] = 0.0
Stokes_cov[np.isnan(Stokes_cov)] = fmax
wcs_Stokes = add_stokes_axis_to_wcs(WCS(header_stokes), 0)
wcs_Stokes.array_shape = (4, *Stokes.shape[1:])[::-1]
header_stokes["NAXIS1"], header_stokes["NAXIS2"], header_stokes["NAXIS3"] = wcs_Stokes.array_shape[::-1]
for key, val in list(wcs_Stokes.to_header().items()) + list(zip(["PC1_1", "PC1_2", "PC1_3", "PC2_1", "PC3_1", "CUNIT1"], [1, 0, 0, 0, 0, "STOKES"])):
header_stokes[key] = val
if integrate:
# Compute integrated values for P, PA before any rotation
mask = deepcopy(data_mask).astype(bool)
I_diluted = I_stokes[mask].sum()
Q_diluted = Q_stokes[mask].sum()
U_diluted = U_stokes[mask].sum()
I_diluted, Q_diluted, U_diluted = (Stokes[:3] * np.broadcast_to(mask, Stokes[:3].shape)).sum(axis=(1, 2))
I_diluted_err = np.sqrt(np.sum(Stokes_cov[0, 0][mask]))
Q_diluted_err = np.sqrt(np.sum(Stokes_cov[1, 1][mask]))
U_diluted_err = np.sqrt(np.sum(Stokes_cov[2, 2][mask]))
@@ -1462,26 +1450,19 @@ def compute_Stokes(data_array, error_array, data_mask, headers, FWHM=None, scale
header_stokes["PA_int"] = (PA_diluted, "Integrated polarization angle")
header_stokes["sPA_int"] = (np.ceil(PA_diluted_err * 10.0) / 10.0, "Integrated polarization angle error")
return I_stokes, Q_stokes, U_stokes, Stokes_cov, header_stokes, s_IQU_stat
return Stokes, Stokes_cov, header_stokes, s_IQU_stat
def compute_pol(I_stokes, Q_stokes, U_stokes, Stokes_cov, header_stokes, s_IQU_stat=None):
def compute_pol(Stokes, Stokes_cov, header_stokes, s_IQU_stat=None):
"""
Compute the polarization degree (in %) and angle (in deg) and their
respective errors from given Stokes parameters.
----------
Inputs:
I_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
total intensity
Q_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
vertical/horizontal linear polarization intensity
U_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
+45/-45deg linear polarization intensity
Stokes : numpy.ndarray
Image (2D floats) containing the Stokes I,Q,U,V fluxes
Stokes_cov : numpy.ndarray
Covariance matrix of the Stokes parameters I, Q, U.
Covariance matrix of the Stokes parameters I, Q, U, V.
header_stokes : astropy.fits.header.Header
Header file associated with the Stokes fluxes.
----------
@@ -1504,49 +1485,49 @@ def compute_pol(I_stokes, Q_stokes, U_stokes, Stokes_cov, header_stokes, s_IQU_s
polarization angle.
"""
# Polarization degree and angle computation
mask = I_stokes > 0.0
I_pol = np.zeros(I_stokes.shape)
I_pol[mask] = np.sqrt(Q_stokes[mask] ** 2 + U_stokes[mask] ** 2)
P = np.zeros(I_stokes.shape)
P[mask] = I_pol[mask] / I_stokes[mask]
PA = np.zeros(I_stokes.shape)
PA[mask] = (90.0 / np.pi) * np.arctan2(U_stokes[mask], Q_stokes[mask])
mask = Stokes[0] > 0.0
I_pol = np.zeros(Stokes[0].shape)
I_pol[mask] = np.sqrt(Stokes[1][mask] ** 2 + Stokes[2][mask] ** 2)
P = np.zeros(Stokes[0].shape)
P[mask] = I_pol[mask] / Stokes[0][mask]
PA = np.zeros(Stokes[0].shape)
PA[mask] = (90.0 / np.pi) * np.arctan2(Stokes[2][mask], Stokes[1][mask])
if (P > 1).any():
print("WARNING : found {0:d} pixels for which P > 1".format(P[P > 1.0].size))
# Associated errors
fmax = np.finfo(np.float64).max
s_P = np.ones(I_stokes.shape) * fmax
s_PA = np.ones(I_stokes.shape) * fmax
s_P = np.ones(Stokes[0].shape) * fmax
s_PA = np.ones(Stokes[0].shape) * fmax
# Propagate previously computed errors
s_P[mask] = (1 / I_stokes[mask]) * np.sqrt(
s_P[mask] = (1 / Stokes[0][mask]) * np.sqrt(
(
Q_stokes[mask] ** 2 * Stokes_cov[1, 1][mask]
+ U_stokes[mask] ** 2 * Stokes_cov[2, 2][mask]
+ 2.0 * Q_stokes[mask] * U_stokes[mask] * Stokes_cov[1, 2][mask]
Stokes[1][mask] ** 2 * Stokes_cov[1, 1][mask]
+ Stokes[2][mask] ** 2 * Stokes_cov[2, 2][mask]
+ 2.0 * Stokes[1][mask] * Stokes[2][mask] * Stokes_cov[1, 2][mask]
)
/ (Q_stokes[mask] ** 2 + U_stokes[mask] ** 2)
+ ((Q_stokes[mask] / I_stokes[mask]) ** 2 + (U_stokes[mask] / I_stokes[mask]) ** 2) * Stokes_cov[0, 0][mask]
- 2.0 * (Q_stokes[mask] / I_stokes[mask]) * Stokes_cov[0, 1][mask]
- 2.0 * (U_stokes[mask] / I_stokes[mask]) * Stokes_cov[0, 2][mask]
/ (Stokes[1][mask] ** 2 + Stokes[2][mask] ** 2)
+ ((Stokes[1][mask] / Stokes[0][mask]) ** 2 + (Stokes[2][mask] / Stokes[0][mask]) ** 2) * Stokes_cov[0, 0][mask]
- 2.0 * (Stokes[1][mask] / Stokes[0][mask]) * Stokes_cov[0, 1][mask]
- 2.0 * (Stokes[2][mask] / Stokes[0][mask]) * Stokes_cov[0, 2][mask]
)
s_PA[mask] = (90.0 / (np.pi * (Q_stokes[mask] ** 2 + U_stokes[mask] ** 2))) * np.sqrt(
U_stokes[mask] ** 2 * Stokes_cov[1, 1][mask]
+ Q_stokes[mask] ** 2 * Stokes_cov[2, 2][mask]
- 2.0 * Q_stokes[mask] * U_stokes[mask] * Stokes_cov[1, 2][mask]
s_PA[mask] = (90.0 / (np.pi * (Stokes[1][mask] ** 2 + Stokes[2][mask] ** 2))) * np.sqrt(
Stokes[2][mask] ** 2 * Stokes_cov[1, 1][mask]
+ Stokes[1][mask] ** 2 * Stokes_cov[2, 2][mask]
- 2.0 * Stokes[1][mask] * Stokes[2][mask] * Stokes_cov[1, 2][mask]
)
s_P[np.isnan(s_P)] = fmax
s_PA[np.isnan(s_PA)] = fmax
# Compute the total exposure time so that
# I_stokes*exp_tot = N_tot the total number of events
N_obs = I_stokes * float(header_stokes["exptime"])
# Stokes[0]*exp_tot = N_tot the total number of events
N_obs = Stokes[0] * float(header_stokes["exptime"])
# Errors on P, PA supposing Poisson noise
s_P_P = np.ones(I_stokes.shape) * fmax
s_PA_P = np.ones(I_stokes.shape) * fmax
s_P_P = np.ones(Stokes[0].shape) * fmax
s_PA_P = np.ones(Stokes[0].shape) * fmax
maskP = np.logical_and(mask, P > 0.0)
if s_IQU_stat is not None:
# If IQU covariance matrix containing only statistical error is given propagate to P and PA
@@ -1554,25 +1535,25 @@ def compute_pol(I_stokes, Q_stokes, U_stokes, Stokes_cov, header_stokes, s_IQU_s
with warnings.catch_warnings(record=True) as _:
s_P_P[maskP] = (
P[maskP]
/ I_stokes[maskP]
/ Stokes[0][maskP]
* np.sqrt(
s_IQU_stat[0, 0][maskP]
- 2.0 / (I_stokes[maskP] * P[maskP] ** 2) * (Q_stokes[maskP] * s_IQU_stat[0, 1][maskP] + U_stokes[maskP] * s_IQU_stat[0, 2][maskP])
- 2.0 / (Stokes[0][maskP] * P[maskP] ** 2) * (Stokes[1][maskP] * s_IQU_stat[0, 1][maskP] + Stokes[2][maskP] * s_IQU_stat[0, 2][maskP])
+ 1.0
/ (I_stokes[maskP] ** 2 * P[maskP] ** 4)
/ (Stokes[0][maskP] ** 2 * P[maskP] ** 4)
* (
Q_stokes[maskP] ** 2 * s_IQU_stat[1, 1][maskP]
+ U_stokes[maskP] ** 2 * s_IQU_stat[2, 2][maskP] * Q_stokes[maskP] * U_stokes[maskP] * s_IQU_stat[1, 2][maskP]
Stokes[1][maskP] ** 2 * s_IQU_stat[1, 1][maskP]
+ Stokes[2][maskP] ** 2 * s_IQU_stat[2, 2][maskP] * Stokes[1][maskP] * Stokes[2][maskP] * s_IQU_stat[1, 2][maskP]
)
)
)
s_PA_P[maskP] = (
90.0
/ (np.pi * I_stokes[maskP] ** 2 * P[maskP] ** 2)
/ (np.pi * Stokes[0][maskP] ** 2 * P[maskP] ** 2)
* (
Q_stokes[maskP] ** 2 * s_IQU_stat[2, 2][maskP]
+ U_stokes[maskP] * s_IQU_stat[1, 1][maskP]
- 2.0 * Q_stokes[maskP] * U_stokes[maskP] * s_IQU_stat[1, 2][maskP]
Stokes[1][maskP] ** 2 * s_IQU_stat[2, 2][maskP]
+ Stokes[2][maskP] * s_IQU_stat[1, 1][maskP]
- 2.0 * Stokes[1][maskP] * Stokes[2][maskP] * s_IQU_stat[1, 2][maskP]
)
)
else:
@@ -1583,7 +1564,7 @@ def compute_pol(I_stokes, Q_stokes, U_stokes, Stokes_cov, header_stokes, s_IQU_s
# Catch expected "OverflowWarning" as wrong pixel have an overflowing error
with warnings.catch_warnings(record=True) as _:
mask2 = P**2 >= s_P_P**2
debiased_P = np.zeros(I_stokes.shape)
debiased_P = np.zeros(Stokes[0].shape)
debiased_P[mask2] = np.sqrt(P[mask2] ** 2 - s_P_P[mask2] ** 2)
if (debiased_P > 1.0).any():
@@ -1600,24 +1581,17 @@ def compute_pol(I_stokes, Q_stokes, U_stokes, Stokes_cov, header_stokes, s_IQU_s
return P, debiased_P, s_P, s_P_P, PA, s_PA, s_PA_P
def rotate_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, header_stokes, s_IQU_stat=None, SNRi_cut=None):
def rotate_Stokes(Stokes, Stokes_cov, data_mask, header_stokes, s_IQU_stat=None, SNRi_cut=None):
"""
Use scipy.ndimage.rotate to rotate I_stokes to an angle, and a rotation
matrix to rotate Q, U of a given angle in degrees and update header
orientation keyword.
----------
Inputs:
I_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
total intensity
Q_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
vertical/horizontal linear polarization intensity
U_stokes : numpy.ndarray
Image (2D floats) containing the Stokes parameters accounting for
+45/-45deg linear polarization intensity
Stokes : numpy.ndarray
Stokes cube (3D floats) containing the Stokes I, Q, U, V fluxes.
Stokes_cov : numpy.ndarray
Covariance matrix of the Stokes parameters I, Q, U.
Covariance matrix of the Stokes parameters I, Q, U, V.
data_mask : numpy.ndarray
2D boolean array delimiting the data to work on.
header_stokes : astropy.fits.header.Header
@@ -1628,17 +1602,10 @@ def rotate_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, header_st
Defaults to None.
----------
Returns:
new_I_stokes : numpy.ndarray
Rotated mage (2D floats) containing the rotated Stokes parameters
accounting for total intensity
new_Q_stokes : numpy.ndarray
Rotated mage (2D floats) containing the rotated Stokes parameters
accounting for vertical/horizontal linear polarization intensity
new_U_stokes : numpy.ndarray
Rotated image (2D floats) containing the rotated Stokes parameters
accounting for +45/-45deg linear polarization intensity.
Stokes : numpy.ndarray
Rotated Stokes cube (3D floats) containing the rotated Stokes I, Q, U, V fluxes.
new_Stokes_cov : numpy.ndarray
Updated covariance matrix of the Stokes parameters I, Q, U.
Updated covariance matrix of the Stokes parameters I, Q, U, V.
new_header_stokes : astropy.fits.header.Header
Updated Header file associated with the Stokes fluxes accounting
for the new orientation angle.
@@ -1647,51 +1614,38 @@ def rotate_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, header_st
"""
# Apply cuts
if SNRi_cut is not None:
SNRi = I_stokes / np.sqrt(Stokes_cov[0, 0])
SNRi = Stokes[0] / np.sqrt(Stokes_cov[0, 0])
mask = SNRi < SNRi_cut
eps = 1e-5
for i in range(I_stokes.shape[0]):
for j in range(I_stokes.shape[1]):
if mask[i, j]:
I_stokes[i, j] = eps * np.sqrt(Stokes_cov[0, 0][i, j])
Q_stokes[i, j] = eps * np.sqrt(Stokes_cov[1, 1][i, j])
U_stokes[i, j] = eps * np.sqrt(Stokes_cov[2, 2][i, j])
for i in range(4):
Stokes[i][mask] = eps * np.sqrt(Stokes_cov[i, i][mask])
# Rotate I_stokes, Q_stokes, U_stokes using rotation matrix
# Rotate Stokes I, Q, U using rotation matrix
ang = -float(header_stokes["ORIENTAT"])
alpha = np.pi / 180.0 * ang
mrot = np.array([[1.0, 0.0, 0.0], [0.0, np.cos(2.0 * alpha), np.sin(2.0 * alpha)], [0, -np.sin(2.0 * alpha), np.cos(2.0 * alpha)]])
old_center = np.array(I_stokes.shape) / 2
shape = np.fix(np.array(I_stokes.shape) * np.sqrt(2.5)).astype(int)
old_center = np.array(Stokes.shape[1:]) / 2
shape = np.fix(np.array(Stokes.shape[1:]) * np.sqrt(2.5)).astype(int)
new_center = np.array(shape) / 2
I_stokes = zeropad(I_stokes, shape)
Q_stokes = zeropad(Q_stokes, shape)
U_stokes = zeropad(U_stokes, shape)
Stokes = zeropad(Stokes, (*Stokes.shape[:-2], *shape))
data_mask = zeropad(data_mask, shape)
Stokes_cov = zeropad(Stokes_cov, [*Stokes_cov.shape[:-2], *shape])
new_I_stokes = np.zeros(shape)
new_Q_stokes = np.zeros(shape)
new_U_stokes = np.zeros(shape)
Stokes_cov = zeropad(Stokes_cov, (*Stokes_cov.shape[:-2], *shape))
new_Stokes = np.zeros((*Stokes.shape[:-2], *shape))
new_Stokes_cov = np.zeros((*Stokes_cov.shape[:-2], *shape))
# Rotate original images using scipy.ndimage.rotate
new_I_stokes = sc_rotate(I_stokes, ang, order=1, reshape=False, cval=0.0)
new_Q_stokes = sc_rotate(Q_stokes, ang, order=1, reshape=False, cval=0.0)
new_U_stokes = sc_rotate(U_stokes, ang, order=1, reshape=False, cval=0.0)
new_Stokes = sc_rotate(Stokes, ang, axes=(1, 2), order=1, reshape=False, cval=0.0)
new_data_mask = sc_rotate(data_mask.astype(float) * 10.0, ang, order=1, reshape=False, cval=0.0)
new_data_mask[new_data_mask < 1.0] = 0.0
new_data_mask = new_data_mask.astype(bool)
for i in range(3):
for j in range(3):
new_Stokes_cov[i, j] = sc_rotate(Stokes_cov[i, j], ang, order=1, reshape=False, cval=0.0)
new_Stokes_cov[i, i] = np.abs(new_Stokes_cov[i, i])
new_Stokes_cov = np.abs(sc_rotate(Stokes_cov, ang, axes=(2, 3), order=1, reshape=False, cval=0.0))
for i in range(shape[0]):
for j in range(shape[1]):
new_I_stokes[i, j], new_Q_stokes[i, j], new_U_stokes[i, j] = np.dot(mrot, np.array([new_I_stokes[i, j], new_Q_stokes[i, j], new_U_stokes[i, j]])).T
new_Stokes_cov[:, :, i, j] = np.dot(mrot, np.dot(new_Stokes_cov[:, :, i, j], mrot.T))
new_Stokes[:3, i, j] = np.dot(mrot, new_Stokes[:3, i, j])
new_Stokes_cov[:3, :3, i, j] = np.dot(mrot, np.dot(new_Stokes_cov[:3, :3, i, j], mrot.T))
if s_IQU_stat is not None:
s_IQU_stat = zeropad(s_IQU_stat, [*s_IQU_stat.shape[:-2], *shape])
@@ -1702,16 +1656,16 @@ def rotate_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, header_st
new_s_IQU_stat[i, i] = np.abs(new_s_IQU_stat[i, i])
for i in range(shape[0]):
for j in range(shape[1]):
new_s_IQU_stat[:, :, i, j] = np.dot(mrot, np.dot(new_s_IQU_stat[:, :, i, j], mrot.T))
new_s_IQU_stat[:3, :3, i, j] = np.dot(mrot, np.dot(new_s_IQU_stat[:3, :3, i, j], mrot.T))
# Update headers to new angle
mrot = np.array([[np.cos(-alpha), -np.sin(-alpha)], [np.sin(-alpha), np.cos(-alpha)]])
new_header_stokes = deepcopy(header_stokes)
new_wcs = WCS(header_stokes).celestial.deepcopy()
new_wcs = WCS(header_stokes).deepcopy()
new_wcs.wcs.pc = np.dot(mrot, new_wcs.wcs.pc)
new_wcs.wcs.crpix = np.dot(mrot, new_wcs.wcs.crpix - old_center[::-1]) + new_center[::-1]
new_wcs.wcs.pc[1:] = np.dot(mrot, new_wcs.wcs.pc[1:])
new_wcs.wcs.crpix[1:] = np.dot(mrot, new_wcs.wcs.crpix[1:] - old_center[::-1]) + new_center[::-1]
new_wcs.wcs.set()
for key, val in new_wcs.to_header().items():
new_header_stokes.set(key, val)
@@ -1720,18 +1674,13 @@ def rotate_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, header_st
# Nan handling :
fmax = np.finfo(np.float64).max
new_I_stokes[np.isnan(new_I_stokes)] = 0.0
new_Q_stokes[new_I_stokes == 0.0] = 0.0
new_U_stokes[new_I_stokes == 0.0] = 0.0
new_Q_stokes[np.isnan(new_Q_stokes)] = 0.0
new_U_stokes[np.isnan(new_U_stokes)] = 0.0
new_Stokes[np.isnan(new_Stokes)] = 0.0
new_Stokes[1:][np.broadcast_to(new_Stokes[0] == 0.0, Stokes[1:].shape)] = 0.0
new_Stokes_cov[np.isnan(new_Stokes_cov)] = fmax
# Compute updated integrated values for P, PA
mask = deepcopy(new_data_mask).astype(bool)
I_diluted = new_I_stokes[mask].sum()
Q_diluted = new_Q_stokes[mask].sum()
U_diluted = new_U_stokes[mask].sum()
I_diluted, Q_diluted, U_diluted = (new_Stokes[:3] * np.broadcast_to(mask, Stokes[:3].shape)).sum(axis=(1, 2))
I_diluted_err = np.sqrt(np.sum(new_Stokes_cov[0, 0][mask]))
Q_diluted_err = np.sqrt(np.sum(new_Stokes_cov[1, 1][mask]))
U_diluted_err = np.sqrt(np.sum(new_Stokes_cov[2, 2][mask]))
@@ -1758,9 +1707,9 @@ def rotate_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, header_st
new_header_stokes["sPA_int"] = (np.ceil(PA_diluted_err * 10.0) / 10.0, "Integrated polarization angle error")
if s_IQU_stat is not None:
return new_I_stokes, new_Q_stokes, new_U_stokes, new_Stokes_cov, new_data_mask, new_header_stokes, new_s_IQU_stat
return new_Stokes, new_Stokes_cov, new_data_mask, new_header_stokes, new_s_IQU_stat
else:
return new_I_stokes, new_Q_stokes, new_U_stokes, new_Stokes_cov, new_data_mask, new_header_stokes
return new_Stokes, new_Stokes_cov, new_data_mask, new_header_stokes
def rotate_data(data_array, error_array, data_mask, headers):