add readfos.specpol.from_txt and more to dump_txt

This commit is contained in:
2025-01-23 15:22:09 +01:00
parent bdfed1190f
commit fb6e821dc1

View File

@@ -67,6 +67,8 @@ class specpol(object):
self.I_r_err = deepcopy(other.I_r_err)
self.wav_r = deepcopy(other.wav_r)
self.wav_r_err = deepcopy(other.wav_r_err)
elif isinstance(other, str):
self.from_txt(other)
else:
# Initialise to zero
if isinstance(other, int):
@@ -80,8 +82,9 @@ class specpol(object):
Set all values to zero.
"""
self.hd = dict([])
self.hd["TARGNAME"], self.hd["PROPOSID"], self.hd["ROOTNAME"], self.hd["APER_ID"] = "", 0, "", ""
self.hd["DENSITY"] = False
self.hd["XUNIT"], self.hd["YUNIT"] = r"Wavelength [m]", r"F [count s$^{-1}$]"
self.hd["XUNIT"], self.hd["YUNIT"] = r"Wavelength [m]", r"{0:s}F [$10^{{{1:d}}}$ count s$^{{-1}}$]"
self.bin_edges = np.zeros(n + 1)
self.wav = np.zeros(n)
self.wav_err = np.zeros((n, 2))
@@ -92,7 +95,9 @@ class specpol(object):
self.IQUV_cov = np.zeros((4, 4, n))
def rest(self, wav=None, z=None):
if z is None and "REDSHIFT" not in self.hd.keys():
if z is None and self.hd["TARGNAME"] == "":
z = 0
elif z is None and "REDSHIFT" not in self.hd.keys():
from astroquery.ipac.ned import Ned
z = Ned.query_object(self.hd["TARGNAME"])["Redshift"][0]
@@ -104,7 +109,9 @@ class specpol(object):
return wav / (z + 1)
def unrest(self, wav=None, z=None):
if z is None and "REDSHIFT" not in self.hd.keys():
if z is None and self.hd["TARGNAME"] == "":
z = 0
elif z is None and "REDSHIFT" not in self.hd.keys():
from astroquery.ipac.ned import Ned
z = Ned.query_object(self.hd["TARGNAME"])["Redshift"][0]
@@ -258,12 +265,60 @@ class specpol(object):
bin_edges = np.arange(self.bin_edges.min(), self.bin_edges.max() + size, size, dtype=np.float32)
return self.bin(bin_edges)
def from_txt(self, filename, data_dir=""):
"""
Fill current spectra from a text file.
"""
data_dump = np.loadtxt(join_path(data_dir, filename), skiprows=1).T
self.zero(data_dump.shape[1])
(
self.wav,
self.wav_err[:, 0],
self.I,
self.IQUV_cov[0, 0],
self.Q,
self.IQUV_cov[1, 1],
self.U,
self.IQUV_cov[2, 2],
self.V,
self.IQUV_cov[3, 3],
) = data_dump[:10]
self.wav_err[:, 1] = deepcopy(self.wav_err[:, 0])
self.bin_edges[:-1], self.bin_edges[-1] = deepcopy(self.wav - self.wav_err[:, 0]), deepcopy(self.wav[-1] + self.wav_err[-1, 1])
for i in range(4):
self.IQUV_cov[i][i] = deepcopy(self.IQUV_cov[i][i]) ** 2
with open(join_path(data_dir, filename)) as f:
self.hd["TARGNAME"], self.hd["PROPOSID"], self.hd["ROOTNAME"], self.hd["APER_ID"], self.hd["XUNIT"], self.hd["YUNIT"] = f.readline()[2:].split(";")
def dump_txt(self, filename, output_dir=""):
"""
Dump current spectra to a text file.
"""
data_dump = np.array([self.wav, self.I, self.Q, self.U, self.V, self.P, self.PA]).T
np.savetxt(join_path(output_dir, filename + ".txt"), data_dump)
header = ";".join([self.hd["TARGNAME"], str(self.hd["PROPOSID"]), self.hd["ROOTNAME"], self.hd["APER_ID"], self.hd["XUNIT"], self.hd["YUNIT"]])
header += "\nwav\t wav_err\t I\t I_err\t Q\t Q_err\t U\t U_err\t V\t V_err\t P\t P_err\t PA\t PA_err"
data_dump = np.array(
[
self.wav,
self.wav_err.mean(axis=1),
self.I,
self.I_err,
self.Q,
self.Q_err,
self.U,
self.U_err,
self.V,
self.V_err,
self.P,
self.P_err,
self.PA,
self.PA_err,
]
).T
np.savetxt(
join_path(output_dir, filename + ".txt"),
data_dump,
header=header,
)
return join_path(output_dir, filename)
def plot(self, fig=None, ax=None, savename=None, plots_folder=""):
@@ -271,8 +326,9 @@ class specpol(object):
Display current spectra.
"""
if fig is None:
plt.rcParams.update({"font.size": 15})
if ax is None:
self.fig, self.ax = plt.subplots(3, 1, sharex=True, figsize=(10, 15))
self.fig, self.ax = plt.subplots(3, 1, sharex=True, figsize=(20, 15))
self.fig.subplots_adjust(hspace=0)
self.fig.suptitle("_".join([self.hd["TARGNAME"], str(self.hd["PROPOSID"]), self.hd["ROOTNAME"], self.hd["APER_ID"]]))
else:
@@ -297,22 +353,44 @@ class specpol(object):
ax1 = self.ax
# Display flux and polarized flux on first ax
ax1.set_xlabel(self.hd["XUNIT"])
ax1.set_ylabel(self.hd["YUNIT"])
if hasattr(self, "I_r"):
# If available, display "raw" total flux
ax1.errorbar(self.wav_r, self.I_r, xerr=self.wav_r_err.T, yerr=self.I_r_err, color="k", fmt=".", label="I")
ymin, ymax = np.min((self.I_r - 1.5 * self.I_r_err)[self.I_r - 1.5 * self.I_r_err > 0.0]), np.max(self.I_r + self.I_r_err * 1.5)
yoffset = np.floor(np.log10(self.I_r[self.I_r > 0.0].min())).astype(int)
yoff = 10.0**yoffset
ymin, ymax = (
np.min((self.I_r - 1.5 * self.I_r_err)[self.I_r > 1.5 * self.I_r_err]) / yoff,
np.max((self.I_r + self.I_r_err * 1.5)[self.I_r > 1.5 * self.I_r_err]) / yoff,
)
xmin, xmax = np.min(self.wav_r - self.wav_r_err[:, 0]), np.max(self.wav_r + self.wav_r_err[:, 1])
ax1.errorbar(self.wav_r, self.I_r / yoff, xerr=self.wav_r_err.T, yerr=self.I_r_err / yoff, color="k", fmt=".", label="I")
else:
ax1.errorbar(self.wav, self.I, xerr=self.wav_err.T, yerr=self.I_err, color="k", fmt=".", label="I")
ymin, ymax = np.min((self.I - 1.5 * self.I_err)[self.I - 1.5 * self.I_err > 0.0]), np.max(self.I + self.I_err * 1.5)
yoffset = np.floor(np.log10(self.I[self.I > 0.0].min())).astype(int)
yoff = 10.0**yoffset
ymin, ymax = (
np.min((self.I - 1.5 * self.I_err)[self.I > 1.5 * self.I_err]) / yoff,
np.max((self.I + self.I_err * 1.5)[self.I > 1.5 * self.I_err]) / yoff,
)
xmin, xmax = np.min(self.wav - self.wav_err[:, 0]), np.max(self.wav + self.wav_err[:, 1])
# ax1.errorbar(self.wav, self.PF, xerr=self.wav_err.T, yerr=self.PF_err, color="b", fmt=".", label="PF")
# ax1.set_ylim([np.min(self.PF[self.PF > 0.0] - self.PF_err[self.PF > 0.0] * 1.5), ymax])
# ax1.set_ylim([ymin, ymax])
ax1.errorbar(self.wav, self.I / yoff, xerr=self.wav_err.T, yerr=self.I_err / yoff, color="k", fmt=".", label="I")
ax1.set_xlim([np.min([xmin, self.bin_edges.min()]), np.max([xmax, self.bin_edges.max()])])
# ax1.set_yscale("log")
ax1.set_xlabel(self.hd["XUNIT"])
ax1.set_ylim([ymin, ymax])
ax1.set_ylabel(self.hd["YUNIT"].format("", yoffset))
# ax11 = ax1.twinx()
# pfoffset = np.floor(np.log10(self.PF[self.PF > 0.0].min())).astype(int)
# pfoff = 10.0**pfoffset
# ax11.errorbar(self.wav, self.PF / pfoff, xerr=self.wav_err.T, yerr=self.PF_err / pfoff, color="b", fmt=".", label="PF")
# ax11.set_ylim(
# [
# ymin * yoff * self.P[np.logical_and(self.P > 0.0, np.isfinite(self.P))].min() / pfoff,
# ymax * yoff * self.P[np.logical_and(self.P > 0.0, np.isfinite(self.P))].max() / pfoff,
# ]
# )
# ax11.set_ylabel(self.hd["YUNIT"].format("Px", pfoffset), color="b")
# ax11.tick_params(axis="y", color="b", labelcolor="b")
# ax1.legend(ncols=2, loc=1)
if isinstance(self.ax, np.ndarray):
@@ -488,7 +566,7 @@ class FOSspecpol(specpol):
self.hd = dict([])
self.hd["DENSITY"] = True
self.hd["TARGNAME"] = "Undefined"
self.hd["XUNIT"], self.hd["YUNIT"] = r"Wavelength [$\AA$]", r"F$_\lambda$ [erg s$^{-1}$ cm$^{-2} \AA^{-1}$]"
self.hd["XUNIT"], self.hd["YUNIT"] = r"Wavelength [$\AA$]", r"{0:s}F$_\lambda$ [$10^{{{1:d}}}$ erg s$^{{-1}}$ cm$^{{-2}} \AA^{{-1}}$]"
self.bin_edges = np.zeros((4, n + 1))
self.wav = np.zeros((4, n))
self.wav_err = np.zeros((4, n, 2))
@@ -530,7 +608,7 @@ class FOSspecpol(specpol):
raise ValueError("Input must be a path to a fits file or an HDUlist")
# FOS spectra are given in flux density with respect to angstrom wavelength
self.hd["DENSITY"] = True
self.hd["XUNIT"], self.hd["YUNIT"] = r"Wavelength [$\AA$]", r"F$_\lambda$ [erg s$^{-1}$ cm$^{-2} \AA^{-1}$]"
self.hd["XUNIT"], self.hd["YUNIT"] = r"Wavelength [$\AA$]", r"{0:s}F$_\lambda$ [$10^{{{1:d}}}$ erg s$^{{-1}}$ cm$^{{-2}} \AA^{{-1}}$]"
# We set the error to be half the distance to the next mesure
self.wav = np.concat((wav[0:2, :], wav[0].reshape(1, wav.shape[1]), wav[0].reshape(1, wav.shape[1])), axis=0)