save raw flux in fits file and display

fix rebase display on main
This commit is contained in:
2025-03-19 16:38:36 +01:00
parent e3a3abdb0d
commit fb63b389e1
4 changed files with 204 additions and 110 deletions

View File

@@ -133,6 +133,12 @@ def plot_obs(data_array, headers, rectangle=None, shifts=None, savename=None, pl
ax_curr.arrow(x, y, dx, dy, length_includes_head=True, width=0.1, head_width=0.3, color="g")
ax_curr.plot([x, x], [0, data.shape[0] - 1], "--", lw=2, color="g", alpha=0.85)
ax_curr.plot([0, data.shape[1] - 1], [y, y], "--", lw=2, color="g", alpha=0.85)
# position of centroid
ax_curr.plot([data.shape[1] / 2, data.shape[1] / 2], [0, data.shape[0] - 1], "--", lw=2, color="b", alpha=0.85)
ax_curr.plot([0, data.shape[1] - 1], [data.shape[0] / 2, data.shape[0] / 2], "--", lw=2, color="b", alpha=0.85)
cr_x, cr_y = head["CRPIX1"], head["CRPIX2"]
# Plot WCS reference point
ax_curr.plot([cr_x], [cr_y], "+", lw=2, color="r", alpha=0.85)
if rectangle is not None:
x, y, width, height, angle, color = rectangle[i]
ax_curr.add_patch(Rectangle((x, y), width, height, angle=angle, edgecolor=color, fill=False))
@@ -189,7 +195,7 @@ def plot_Stokes(Stokes, savename=None, plots_folder=""):
for dataset in [stkI, stkQ, stkU]:
dataset[np.logical_not(data_mask)] = np.nan
wcs = WCS(Stokes[0]).deepcopy()
wcs = WCS(Stokes["I_STOKES"]).deepcopy()
# Plot figure
plt.rcParams.update({"font.size": 14})
@@ -288,6 +294,9 @@ def polarization_map(
The figure and ax created for interactive contour maps.
"""
# Get data
flux = Stokes[0].data[0].copy() * Stokes[0].header["PHOTFLAM"]
flux_error = Stokes[0].data[1].copy() * Stokes[0].header["PHOTFLAM"]
flux_mask = Stokes[0].data[2].astype(bool).copy()
stkI = Stokes["I_stokes"].data.copy()
stkQ = Stokes["Q_stokes"].data.copy()
stkU = Stokes["U_stokes"].data.copy()
@@ -302,6 +311,20 @@ def polarization_map(
data_mask = np.zeros(stkI.shape).astype(bool)
data_mask[stkI > 0.0] = True
wcs = WCS(Stokes["I_STOKES"]).deepcopy()
pivot_wav = Stokes["I_STOKES"].header["photplam"]
convert_flux = Stokes["I_STOKES"].header["photflam"]
# Get integrated flux values from sum
I_diluted = stkI[data_mask].sum() * convert_flux
I_diluted_err = np.sqrt(np.sum(stk_cov[0, 0][data_mask])) * convert_flux
# Get integrated polarization values from header
P_diluted = Stokes["I_STOKES"].header["P_int"]
P_diluted_err = Stokes["I_STOKES"].header["sP_int"]
PA_diluted = Stokes["I_STOKES"].header["PA_int"]
PA_diluted_err = Stokes["I_STOKES"].header["sPA_int"]
# Compute confidence level map
QN, UN, QN_ERR, UN_ERR = np.full((4, stkI.shape[0], stkI.shape[1]), np.nan)
for nflux, sflux in zip([QN, UN, QN_ERR, UN_ERR], [stkQ, stkU, np.sqrt(stk_cov[1, 1]), np.sqrt(stk_cov[2, 2])]):
@@ -314,12 +337,8 @@ def polarization_map(
for j in range(3):
stk_cov[i][j][np.logical_not(data_mask)] = np.nan
wcs = WCS(Stokes[0]).deepcopy()
pivot_wav = Stokes[0].header["photplam"]
convert_flux = Stokes[0].header["photflam"]
# Plot Stokes parameters map
if display is None or display.lower() in ["default"]:
if display is None or display.lower() in ["pol", "polarization", "polarisation", "pol_deg", "p"]:
plot_Stokes(Stokes, savename=savename, plots_folder=plots_folder)
# Compute SNR and apply cuts
@@ -363,7 +382,7 @@ def polarization_map(
fig = plt.figure(figsize=(7 * ratiox, 7 * ratioy), layout="constrained")
if ax is None:
ax = fig.add_subplot(111, projection=wcs)
ax.set(aspect="equal", fc="k")
ax.set(aspect="equal", fc="k", xlim=(0, stkI.shape[1]), ylim=(0, stkI.shape[0]))
# fig.subplots_adjust(hspace=0, wspace=0, left=0.102, right=1.02)
# ax.coords.grid(True, color='white', ls='dotted', alpha=0.5)
@@ -409,7 +428,25 @@ def polarization_map(
ax.set_facecolor("white")
font_color = "black"
if display.lower() in ["i", "intensity"]:
if display.lower() in ["f", "flux", "fluxdensity"]:
# If no display selected, show intensity map
display = "f"
if flux_lim is not None:
vmin, vmax = flux_lim
else:
vmin, vmax = np.max(flux[flux > 0.0]) / 2e3, np.max(flux[flux > 0.0])
imflux, cr = flux.copy(), WCS(Stokes[0].header).wcs.crpix.astype(int)
imflux[cr[1] - 1 : cr[1] + 2, cr[0] - 1 : cr[0] + 2] = np.nan
im = ax.imshow(
imflux, transform=ax.get_transform(WCS(Stokes[0].header).celestial), norm=LogNorm(vmin, vmax), aspect="equal", cmap=kwargs["cmap"], alpha=1.0
)
fig.colorbar(im, ax=ax, aspect=50, shrink=0.60, pad=0.025, label=r"$F_{\lambda}$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA$]")
levelsF = np.array([0.8, 2.0, 5.0, 10.0, 20.0, 50.0]) / 100.0 * vmax
print("Flux density contour levels : ", levelsF)
ax.contour(flux, levels=levelsF, transform=ax.get_transform(WCS(Stokes[0].header).celestial), colors="grey", linewidths=0.5)
ax.plot(*WCS(Stokes[1]).wcs.crpix, "g+")
I_diluted, I_diluted_err = np.sum(flux[flux_mask]), np.sqrt(np.sum(flux_error[flux_mask] ** 2))
elif display.lower() in ["i", "intensity"]:
# If no display selected, show intensity map
display = "i"
if flux_lim is not None:
@@ -420,9 +457,12 @@ def polarization_map(
vmin, vmax = 1.0 / 2.0 * np.median(np.sqrt(stk_cov[0, 0][stkI > 0.0]) * convert_flux), np.max(stkI[stkI > 0.0] * convert_flux)
im = ax.imshow(stkI * convert_flux, norm=LogNorm(vmin, vmax), aspect="equal", cmap=kwargs["cmap"], alpha=1.0)
fig.colorbar(im, ax=ax, aspect=50, shrink=0.60, pad=0.025, label=r"$F_{\lambda}$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]")
levelsI = np.array([0.8, 2.0, 5.0, 10.0, 20.0, 50.0]) / 100.0 * vmax
print("Flux density contour levels : ", levelsI)
ax.contour(stkI * convert_flux, levels=levelsI, colors="grey", linewidths=0.5)
# levelsI = np.array([0.8, 2.0, 5.0, 10.0, 20.0, 50.0]) / 100.0 * vmax
# print("Stokes I contour levels : ", levelsI)
# ax.contour(stkI * convert_flux, levels=levelsI, colors="grey", linewidths=0.5)
levelsF = np.array([0.8, 2.0, 5.0, 10.0, 20.0, 50.0]) / 100.0 * np.max(flux[flux > 0.0])
print("Flux density contour levels : ", levelsF)
ax.contour(flux, levels=levelsF, transform=ax.get_transform(WCS(Stokes[0].header).celestial), colors="grey", linewidths=0.5)
elif display.lower() in ["pf", "pol_flux"]:
# Display polarization flux
display = "pf"
@@ -512,22 +552,18 @@ def polarization_map(
# Defaults to intensity map
if flux_lim is not None:
vmin, vmax = flux_lim
elif mask.sum() > 0.0:
vmin, vmax = 1.0 * np.mean(np.sqrt(stk_cov[0, 0][mask]) * convert_flux), np.max(stkI[stkI > 0.0] * convert_flux)
else:
vmin, vmax = 1.0 * np.mean(np.sqrt(stk_cov[0, 0][stkI > 0.0]) * convert_flux), np.max(stkI[stkI > 0.0] * convert_flux)
im = ax.imshow(stkI * convert_flux, norm=LogNorm(vmin, vmax), aspect="equal", cmap=kwargs["cmap"], alpha=1.0)
vmin, vmax = np.max(flux[flux > 0.0] * convert_flux) / 2e3, np.max(flux[flux > 0.0] * convert_flux)
im = ax.imshow(
flux * Stokes[0].header["PHOTFLAM"],
transform=ax.get_transform(WCS(Stokes[0].header).celestial),
norm=LogNorm(vmin, vmax),
aspect="equal",
cmap=kwargs["cmap"],
alpha=1.0,
)
fig.colorbar(im, ax=ax, aspect=50, shrink=0.60, pad=0.025, label=r"$F_{\lambda}$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA$]")
# Get integrated flux values from sum
I_diluted = stkI[data_mask].sum()
I_diluted_err = np.sqrt(np.sum(stk_cov[0, 0][data_mask]))
# Get integrated polarization values from header
P_diluted = Stokes[0].header["P_int"]
P_diluted_err = Stokes[0].header["sP_int"]
PA_diluted = Stokes[0].header["PA_int"]
PA_diluted_err = Stokes[0].header["sPA_int"]
I_diluted, I_diluted_err = np.sum(flux[flux_mask]), np.sqrt(np.sum(flux_error[flux_mask] ** 2))
plt.rcParams.update({"font.size": 11})
px_size = wcs.wcs.get_cdelt()[0] * 3600.0
@@ -545,12 +581,12 @@ def polarization_map(
back_length=0.0,
head_length=7.0,
head_width=7.0,
angle=-Stokes[0].header["orientat"],
angle=-Stokes["I_STOKES"].header["orientat"],
text_props={"ec": "k", "fc": font_color, "alpha": 1, "lw": 0.5},
arrow_props={"ec": "k", "fc": font_color, "alpha": 1, "lw": 1},
)
if display.lower() in ["i", "s_i", "snri", "pf", "p", "pa", "s_p", "snrp", "confp"] and step_vec != 0:
if display.lower() in ["f", "i", "s_i", "snri", "pf", "p", "pa", "s_p", "snrp", "confp"] and step_vec != 0:
if scale_vec == -1:
poldata[np.isfinite(poldata)] = 1.0 / 2.0
step_vec = 1
@@ -1174,6 +1210,8 @@ class overplot_chandra(align_maps):
other_data = deepcopy(self.other_data)
other_wcs = self.other_wcs.deepcopy()
if zoom != 1:
from scipy.ndimage import zoom as sc_zoom
other_data = sc_zoom(other_data, zoom)
other_wcs.wcs.crpix *= zoom
other_wcs.wcs.cdelt /= zoom
@@ -2439,9 +2477,9 @@ class pol_map(object):
self.conf = PCconf(self.QN, self.UN, self.QN_ERR, self.UN_ERR)
# Get data
self.targ = self.Stokes[0].header["targname"]
self.pivot_wav = self.Stokes[0].header["photplam"]
self.map_convert = self.Stokes[0].header["photflam"]
self.targ = self.Stokes["I_STOKES"].header["targname"]
self.pivot_wav = self.Stokes["I_STOKES"].header["photplam"]
self.map_convert = self.Stokes["I_STOKES"].header["photflam"]
# Create figure
plt.rcParams.update({"font.size": 10})
@@ -2535,7 +2573,7 @@ class pol_map(object):
def select_roi(event):
if self.data is None:
self.data = self.Stokes[0].data
self.data = self.Stokes["I_STOKES"].data
if self.selected:
self.selected = False
self.region = deepcopy(self.select_instance.mask.astype(bool))
@@ -2577,7 +2615,7 @@ class pol_map(object):
def select_aperture(event):
if self.data is None:
self.data = self.Stokes[0].data
self.data = self.Stokes["I_STOKES"].data
if self.selected:
self.selected = False
self.select_instance.update_mask()
@@ -2634,7 +2672,7 @@ class pol_map(object):
def select_slit(event):
if self.data is None:
self.data = self.Stokes[0].data
self.data = self.Stokes["I_STOKES"].data
if self.selected:
self.selected = False
self.select_instance.update_mask()
@@ -2911,7 +2949,19 @@ class pol_map(object):
@property
def wcs(self):
return WCS(self.Stokes[0].header).celestial.deepcopy()
return WCS(self.Stokes["I_STOKES"].header).celestial.deepcopy()
@property
def Flux(self):
return self.Stokes[0].data[0] * self.Stokes[0].header["PHOTFLAM"]
@property
def Flux_err(self):
return self.Stokes[0].data[1] * self.Stokes[0].header["PHOTFLAM"]
@property
def Flux_mask(self):
return self.Stokes[0].data[2].astype(bool)
@property
def I(self):
@@ -2975,7 +3025,7 @@ class pol_map(object):
@property
def data_mask(self):
return self.Stokes["DATA_MASK"].data
return self.Stokes["DATA_MASK"].data.astype(bool)
def set_data_mask(self, mask):
self.Stokes[np.argmax([self.Stokes[i].header["datatype"] == "Data_mask" for i in range(len(self.Stokes))])].data = mask.astype(float)
@@ -3046,7 +3096,7 @@ class pol_map(object):
back_length=0.0,
head_length=10.0,
head_width=10.0,
angle=-self.Stokes[0].header["orientat"],
angle=-self.Stokes["I_STOKES"].header["orientat"],
color="white",
text_props={"ec": None, "fc": "w", "alpha": 1, "lw": 0.4},
arrow_props={"ec": None, "fc": "w", "alpha": 1, "lw": 1},
@@ -3054,18 +3104,20 @@ class pol_map(object):
ax.add_artist(self.north_dir)
def display(self, fig=None, ax=None, flux_lim=None):
norm = None
if self.display_selection is None:
self.display_selection = "total_flux"
kwargs = dict([])
if flux_lim is None:
flux_lim = self.flux_lim
if self.display_selection.lower() in ["total_flux"]:
self.data = self.I * self.map_convert
if self.display_selection is None or self.display_selection.lower() in ["total_flux"]:
self.data = self.Flux # self.I * self.map_convert
if flux_lim is None:
vmin, vmax = (1.0 / 2.0 * np.median(self.data[self.data > 0.0]), np.max(self.data[self.data > 0.0]))
else:
vmin, vmax = flux_lim
norm = LogNorm(vmin, vmax)
kwargs["norm"] = LogNorm(vmin, vmax)
if ax is None:
kwargs["transform"] = self.ax.get_transform(WCS(self.Stokes[0].header).celestial)
else:
kwargs["transform"] = ax.get_transform(WCS(self.Stokes[0].header).celestial)
label = r"$F_{\lambda}$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]"
elif self.display_selection.lower() in ["pol_flux"]:
self.data = self.I * self.map_convert * self.P
@@ -3073,28 +3125,28 @@ class pol_map(object):
vmin, vmax = (1.0 / 2.0 * np.median(self.I[self.I > 0.0] * self.map_convert), np.max(self.I[self.I > 0.0] * self.map_convert))
else:
vmin, vmax = flux_lim
norm = LogNorm(vmin, vmax)
kwargs["norm"] = LogNorm(vmin, vmax)
label = r"$P \cdot F_{\lambda}$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]"
elif self.display_selection.lower() in ["pol_deg"]:
self.data = self.P * 100.0
vmin, vmax = 0.0, np.max(self.data[self.P > self.s_P])
kwargs["vmin"], kwargs["vmax"] = 0.0, np.max(self.data[self.P > self.s_P])
label = r"$P$ [%]"
elif self.display_selection.lower() in ["pol_ang"]:
self.data = princ_angle(self.PA)
vmin, vmax = 0, 180.0
kwargs["vmin"], kwargs["vmax"] = 0, 180.0
label = r"$\theta_{P}$ [°]"
elif self.display_selection.lower() in ["snri"]:
s_I = np.sqrt(self.IQU_cov[0, 0])
SNRi = np.zeros(self.I.shape)
SNRi[s_I > 0.0] = self.I[s_I > 0.0] / s_I[s_I > 0.0]
self.data = SNRi
vmin, vmax = 0.0, np.max(self.data[self.data > 0.0])
kwargs["vmin"], kwargs["vmax"] = 0.0, np.max(self.data[self.data > 0.0])
label = r"$I_{Stokes}/\sigma_{I}$"
elif self.display_selection.lower() in ["snrp"]:
SNRp = np.zeros(self.P.shape)
SNRp[self.s_P > 0.0] = self.P[self.s_P > 0.0] / self.s_P[self.s_P > 0.0]
self.data = SNRp
vmin, vmax = 0.0, np.max(self.data[self.data > 0.0])
kwargs["vmin"], kwargs["vmax"] = 0.0, np.max(self.data[self.data > 0.0])
label = r"$P/\sigma_{P}$"
if fig is None:
@@ -3105,22 +3157,17 @@ class pol_map(object):
self.cbar.remove()
if hasattr(self, "im"):
self.im.remove()
if norm is not None:
self.im = ax.imshow(self.data, norm=norm, aspect="equal", cmap="inferno")
else:
self.im = ax.imshow(self.data, vmin=vmin, vmax=vmax, aspect="equal", cmap="inferno")
self.im = ax.imshow(self.data, aspect="equal", cmap="inferno", **kwargs)
ax.set(xlim=(0, self.I.shape[1]), ylim=(0, self.I.shape[0]))
plt.rcParams.update({"font.size": 14})
self.cbar = fig.colorbar(self.im, ax=ax, aspect=50, shrink=0.75, pad=0.025, label=label)
plt.rcParams.update({"font.size": 10})
fig.canvas.draw_idle()
return self.im
else:
if norm is not None:
im = ax.imshow(self.data, norm=norm, aspect="equal", cmap="inferno")
else:
im = ax.imshow(self.data, vmin=vmin, vmax=vmax, aspect="equal", cmap="inferno")
ax.set_xlim(0, self.data.shape[1])
ax.set_ylim(0, self.data.shape[0])
im = ax.imshow(self.data, aspect="equal", cmap="inferno", **kwargs)
ax.set_xlim(0, self.I.shape[1])
ax.set_ylim(0, self.I.shape[0])
plt.rcParams.update({"font.size": 14})
fig.colorbar(im, ax=ax, aspect=50, shrink=0.75, pad=0.025, label=label)
plt.rcParams.update({"font.size": 10})
@@ -3144,24 +3191,24 @@ class pol_map(object):
ax = self.ax
if hasattr(self, "quiver"):
self.quiver.remove()
self.quiver = ax.quiver(
X[:: self.step_vec, :: self.step_vec],
Y[:: self.step_vec, :: self.step_vec],
XY_U[:: self.step_vec, :: self.step_vec],
XY_V[:: self.step_vec, :: self.step_vec],
units="xy",
scale=1.0 / scale_vec,
scale_units="xy",
pivot="mid",
headwidth=0.0,
headlength=0.0,
headaxislength=0.0,
width=0.3,
linewidth=0.6,
color="white",
edgecolor="black",
)
if self.pa_err:
self.quiver = ax.quiver(
X[:: self.step_vec, :: self.step_vec],
Y[:: self.step_vec, :: self.step_vec],
XY_U[:: self.step_vec, :: self.step_vec],
XY_V[:: self.step_vec, :: self.step_vec],
units="xy",
scale=1.0 / scale_vec,
scale_units="xy",
pivot="mid",
headwidth=0.0,
headlength=0.0,
headaxislength=0.0,
width=0.1,
# linewidth=0.6,
color="black",
edgecolor="black",
)
XY_U_err1, XY_V_err1 = (
P_cut * np.cos(np.pi / 2.0 + (self.PA + 3.0 * self.s_PA) * np.pi / 180.0),
P_cut * np.sin(np.pi / 2.0 + (self.PA + 3.0 * self.s_PA) * np.pi / 180.0),
@@ -3210,6 +3257,25 @@ class pol_map(object):
edgecolor="black",
ls="dashed",
)
else:
self.quiver = ax.quiver(
X[:: self.step_vec, :: self.step_vec],
Y[:: self.step_vec, :: self.step_vec],
XY_U[:: self.step_vec, :: self.step_vec],
XY_V[:: self.step_vec, :: self.step_vec],
units="xy",
scale=1.0 / scale_vec,
scale_units="xy",
pivot="mid",
headwidth=0.0,
headlength=0.0,
headaxislength=0.0,
width=0.3,
linewidth=0.6,
color="white",
edgecolor="black",
)
fig.canvas.draw_idle()
return self.quiver
else:
@@ -3281,12 +3347,20 @@ class pol_map(object):
str_conf = ""
if self.region is None:
s_I = np.sqrt(self.IQU_cov[0, 0])
I_reg = self.I.sum()
I_reg_err = np.sqrt(np.sum(s_I**2))
P_reg = self.Stokes[0].header["P_int"]
P_reg_err = self.Stokes[0].header["sP_int"]
PA_reg = self.Stokes[0].header["PA_int"]
PA_reg_err = self.Stokes[0].header["sPA_int"]
I_reg = (
np.sum(self.Flux[self.Flux_mask]) / self.map_convert
if self.display_selection is None or self.display_selection.lower() in ["total_flux"]
else np.sum(self.I[self.data_mask])
)
I_reg_err = (
np.sqrt(np.sum(self.Flux_err[self.Flux_mask] ** 2)) / self.map_convert
if self.display_selection is None or self.display_selection.lower() in ["total_flux"]
else np.sqrt(np.sum(s_I[self.data_mask] ** 2))
)
P_reg = self.Stokes["I_STOKES"].header["P_int"]
P_reg_err = self.Stokes["I_STOKES"].header["sP_int"]
PA_reg = self.Stokes["I_STOKES"].header["PA_int"]
PA_reg_err = self.Stokes["I_STOKES"].header["sPA_int"]
s_I = np.sqrt(self.IQU_cov[0, 0])
s_Q = np.sqrt(self.IQU_cov[1, 1])