Files
FOC_Reduction/package/lib/utils.py

325 lines
12 KiB
Python
Executable File

import numpy as np
from matplotlib.transforms import Bbox, BboxTransform
def rot2D(ang):
"""
Return the 2D rotation matrix of given angle in degrees
----------
Inputs:
ang : float
Angle in degrees
----------
Returns:
rot_mat : numpy.ndarray
2D matrix of shape (2,2) to perform vector rotation at angle "ang".
"""
alpha = np.pi * ang / 180
return np.array([[np.cos(alpha), np.sin(alpha)], [-np.sin(alpha), np.cos(alpha)]])
def princ_angle(ang):
"""
Return the principal angle in the 0° to 180° quadrant as PA is always
defined at p/m 180°.
----------
Inputs:
ang : float, numpy.ndarray
Angle in degrees. Can be an array of angles.
----------
Returns:
princ_ang : float, numpy.ndarray
Principal angle in the 0°-180° quadrant in the same shape as input.
"""
if not isinstance(ang, np.ndarray):
A = np.array([ang])
else:
A = np.array(ang)
while np.any(A < 0.0):
A[A < 0.0] = A[A < 0.0] + 360.0
while np.any(A >= 180.0):
A[A >= 180.0] = A[A >= 180.0] - 180.0
if type(ang) is type(A):
return A
else:
return A[0]
def PCconf(QN, UN, QN_ERR, UN_ERR):
"""
Compute the confidence level for 2 parameters polarisation degree and
polarisation angle from the PCUBE analysis.
----------
Inputs:
QN : float, numpy.ndarray
Normalized Q Stokes flux.
UN : float, numpy.ndarray
Normalized U Stokes flux.
QN_ERR : float, numpy.ndarray
Normalized error on Q Stokes flux.
UN_ERR : float, numpy.ndarray
Normalized error on U Stokes flux.
----------
Returns:
conf : numpy.ndarray
2D matrix of same shape as input containing the confidence on the polarization
computation between 0 and 1 for 2 parameters of interest (Q and U Stokes fluxes).
"""
mask = np.logical_and(QN_ERR > 0.0, UN_ERR > 0.0)
conf = np.full(QN.shape, -1.0)
chi2 = QN**2 / QN_ERR**2 + UN**2 / UN_ERR**2
conf[mask] = 1.0 - np.exp(-0.5 * chi2[mask])
return conf
def CenterConf(mask, PA, sPA):
"""
Compute the confidence map for the position of the center of emission.
----------
Inputs:
mask : bool, numpy.ndarray
Mask of the polarization vectors from which the center of emission should be drawn.
PA : float, numpy.ndarray
2D matrix containing the computed polarization angle.
sPA : float, numpy.ndarray
2D matrix containing the total uncertainties on the polarization angle.
----------
Returns:
conf : numpy.ndarray
2D matrix of same shape as input containing the confidence on the polarization
computation between 0 and 1 for 2 parameters of interest (Q and U Stokes fluxes).
"""
chi2 = np.full(PA.shape, np.nan, dtype=np.float64)
conf = np.full(PA.shape, -1.0, dtype=np.float64)
yy, xx = np.indices(PA.shape)
Nobs = np.sum(mask)
def ideal(c):
itheta = np.full(PA.shape, np.nan)
itheta[(xx + 0.5) != c[0]] = np.degrees(np.arctan((yy[(xx + 0.5) != c[0]] + 0.5 - c[1]) / (xx[(xx + 0.5) != c[0]] + 0.5 - c[0])))
itheta[(xx + 0.5) == c[0]] = PA[(xx + 0.5) == c[0]]
return princ_angle(itheta)
def chisq(c):
return np.sum((princ_angle(PA[mask]) - ideal((c[0], c[1]))[mask]) ** 2 / sPA[mask] ** 2) / (Nobs - 2)
for x, y in zip(xx[np.isfinite(PA)], yy[np.isfinite(PA)]):
chi2[y, x] = chisq((x, y))
from scipy.optimize import minimize
conf[np.isfinite(PA)] = 0.5 * np.exp(-0.5 * chi2[np.isfinite(PA)])
c0 = np.unravel_index(np.argmax(conf), conf.shape)[::-1]
result = minimize(chisq, c0, bounds=[(0, PA.shape[1]), (0.0, PA.shape[0])], method="trust-constr")
if result.success:
print("Center of emission found: reduced chi_squared {0:.2f}/{1:d}={2:.2f}".format(chisq(result.x) * (Nobs - 2), Nobs - 2, chisq(result.x)))
else:
print("Center of emission not found", result)
return conf, result.x
def sci_not(v, err, rnd=1, out=str):
"""
Return the scientific error notation as a string.
----------
Inputs:
v : float
Value to be transformed into scientific notation.
err : float
Error on the value to be transformed into scientific notation.
rnd : int
Number of significant numbers for the scientific notation.
Default to 1.
out : str or other
Format in which the notation should be returned. "str" means the notation
is returned as a single string, "other" means it is returned as a list of "str".
Default to str.
----------
Returns:
conf : numpy.ndarray
2D matrix of same shape as input containing the confidence on the polarization
computation between 0 and 1 for 2 parameters of interest (Q and U Stokes fluxes).
"""
power = -int(("%E" % v)[-3:]) + 1
output = [r"({0}".format(round(v * 10**power, rnd)), round(v * 10**power, rnd)]
if isinstance(err, list):
for error in err:
output[0] += r" $\pm$ {0}".format(round(error * 10**power, rnd))
output.append(round(error * 10**power, rnd))
else:
output[0] += r" $\pm$ {0}".format(round(err * 10**power, rnd))
output.append(round(err * 10**power, rnd))
if out is str:
return output[0] + r")e{0}".format(-power)
else:
return *output[1:], -power
class cursor_data:
"""
Object to overwrite data getter and formatter in interactive plots.
"""
def __init__(self, im, error=None, fmt=None) -> None:
self.im = im
self.data = im.get_array()
self.fmt = "{:.2f}" if fmt is None else fmt
self.err = error
def set_err(self, err) -> None:
if self.data.shape != err.shape:
raise ValueError("Error and Data don't have the same shape")
else:
self.err = err
def set_fmt(self, fmt) -> None:
self.fmt = fmt
def get(self, event):
xmin, xmax, ymin, ymax = self.im.get_extent()
if self.im.origin == "upper":
ymin, ymax = ymax, ymin
data_extent = Bbox([[xmin, ymin], [xmax, ymax]])
array_extent = Bbox([[0, 0], [self.data.shape[1], self.data.shape[0]]])
trans = self.im.get_transform().inverted()
trans += BboxTransform(boxin=data_extent, boxout=array_extent)
point = trans.transform([event.x, event.y])
if any(np.isnan(point)):
return None
j, i = point.astype(int)
# Clip the coordinates at array bounds
if not (0 <= i < self.data.shape[0]) or not (0 <= j < self.data.shape[1]):
return None
elif self.err is not None:
return self.data[i, j], self.err[i, j]
else:
return self.data
def format(self, y) -> str:
return self.fmt.format(*y)
def wcs_CD_to_PC(CD):
"""
Return the position angle in degrees to the North direction of a wcs
from the values of coefficient of its transformation matrix.
----------
Inputs:
CD : np.ndarray
Value of the WCS matrix CD
----------
Returns:
cdelt : (float, float)
Scaling factor in arcsec between pixel in X and Y directions.
orient : float
Angle in degrees between the North direction and the Up direction of the WCS.
"""
det = CD[0, 0] * CD[1, 1] - CD[0, 1] * CD[1, 0]
sgn = -1.0 if det < 0 else 1.0
cdelt = np.array([sgn, 1.0]) * np.sqrt(np.sum(CD**2, axis=1))
rot = np.arctan2(-CD[1, 0], sgn * CD[0, 0])
rot2 = np.arctan2(sgn * CD[0, 1], CD[1, 1])
orient = 0.5 * (rot + rot2) * 180.0 / np.pi
return cdelt, orient
def wcs_PA(PC, cdelt):
"""
Return the position angle in degrees to the North direction of a wcs
from the values of coefficient of its transformation matrix.
----------
Inputs:
PC : np.ndarray
Value of the WCS matrix PC
cdelt : (float, float)
Scaling factor in arcsec between pixel in X and Y directions.
----------
Returns:
orient : float
Angle in degrees between the North direction and the Up direction of the WCS.
"""
rot = np.pi / 2.0 - np.arctan2(-cdelt[1] * PC[1, 0], abs(cdelt[0]) * PC[0, 0])
rot2 = np.pi / 2.0 - np.arctan2(abs(cdelt[0]) * PC[0, 1], cdelt[1] * PC[1, 1])
orient = 0.5 * (rot + rot2) * 180.0 / np.pi
return orient
def add_stokes_axis_to_header(header, ind=0):
"""
Add a new Stokes axis to the WCS cards in the header.
----------
Inputs:
header : astropy.io.fits.header.Header
The header in which the WCS to work on is saved.
ind : int, optional
Index of the WCS to insert the new Stokes axis in front of.
To add at the end, do add_before_ind = wcs.wcs.naxis
The beginning is at position 0.
Default to 0.
----------
Returns:
new_head : astropy.io.fits.header.Header
A new Header instance with an additional Stokes axis
"""
from astropy.wcs import WCS
from astropy.wcs.utils import add_stokes_axis_to_wcs
wcs = WCS(header).deepcopy()
wcs_Stokes = add_stokes_axis_to_wcs(wcs, ind).deepcopy()
wcs_Stokes.array_shape = (*wcs.array_shape[ind:], 4, *wcs.array_shape[:ind]) if ind < wcs.wcs.naxis else (4, *wcs.array_shape)
new_head = header.copy()
new_head["NAXIS"] = wcs_Stokes.wcs.naxis
for key in wcs.to_header().keys():
if key not in wcs_Stokes.to_header().keys():
del new_head[key]
for key, val in (
list(wcs_Stokes.to_header().items())
+ [("NAXIS%d" % (i + 1), k) for i, k in enumerate(wcs_Stokes.array_shape[::-1])]
+ [("CUNIT%d" % (ind + 1), "STOKES")]
):
if key not in header.keys() and key[:-1] + str(wcs.wcs.naxis) in header.keys():
new_head.insert(key[:-1] + str(wcs.wcs.naxis), (key, val), after=int(key[-1]) < wcs.wcs.naxis)
elif key not in header.keys() and key[:2] + str(wcs.wcs.naxis) + key[2:-1] + str(wcs.wcs.naxis) in header.keys():
new_head.insert(key[:2] + str(wcs.wcs.naxis) + key[2:-1] + str(wcs.wcs.naxis), (key, val), after=int(key[-1]) < wcs.wcs.naxis)
else:
new_head[key] = val
return new_head
def remove_stokes_axis_from_header(header):
"""
Remove a Stokes axis to the WCS cards in the header.
----------
Inputs:
header : astropy.io.fits.header.Header
The header in which the WCS to work on is saved.
----------
Returns:
new_head : astropy.io.fits.header.Header
A new Header instance with only a celestial WCS.
"""
from astropy.wcs import WCS
wcs = WCS(header).deepcopy()
new_wcs = WCS(header).celestial.deepcopy()
new_head = header.copy()
if "NAXIS%d" % (new_wcs.wcs.naxis + 1) in new_head.keys():
del new_head["NAXIS%d" % (new_wcs.wcs.naxis + 1)]
new_head["NAXIS"] = new_wcs.wcs.naxis
for i, k in enumerate(new_wcs.array_shape[::-1]):
new_head["NAXIS%d" % (i + 1)] = k
for key in list(WCS(header).to_header().keys()) + list(
np.unique([["PC%d_%d" % (i + 1, j + 1) for i in range(wcs.wcs.naxis)] for j in range(wcs.wcs.naxis)])
):
if key in new_head.keys() and key not in new_wcs.to_header().keys():
del new_head[key]
for key, val in new_wcs.to_header().items():
if key not in new_head.keys() and key[:-1] + str(wcs.wcs.naxis) in new_head.keys():
new_head.insert(key[:-1] + str(wcs.wcs.naxis), (key, val), after=True)
elif key not in new_head.keys() and key[:2] + str(wcs.wcs.naxis) + key[2:-1] + str(wcs.wcs.naxis) in new_head.keys():
new_head.insert(key[:2] + str(wcs.wcs.naxis) + key[2:-1] + str(wcs.wcs.naxis), (key, val), after=True)
else:
new_head[key] = val
return new_head