add batch binning in FOSspecpol in readfos scr

This commit is contained in:
2024-12-19 16:46:01 +01:00
parent b218279fef
commit 729720c5bb

View File

@@ -98,43 +98,53 @@ class specpol(object):
@property
def QN(self):
np.seterr(divide="ignore", invalid="ignore")
return self.Q / np.where(self.I > 0, self.I, np.nan)
@property
def QN_err(self):
np.seterr(divide="ignore", invalid="ignore")
return self.Q_err / np.where(self.I > 0, self.I, np.nan)
@property
def UN(self):
np.seterr(divide="ignore", invalid="ignore")
return self.U / np.where(self.I > 0, self.I, np.nan)
@property
def UN_err(self):
np.seterr(divide="ignore", invalid="ignore")
return self.U_err / np.where(self.I > 0, self.I, np.nan)
@property
def VN(self):
np.seterr(divide="ignore", invalid="ignore")
return self.V / np.where(self.I > 0, self.I, np.nan)
@property
def VN_err(self):
np.seterr(divide="ignore", invalid="ignore")
return self.V_err / np.where(self.I > 0, self.I, np.nan)
@property
def PF(self):
np.seterr(divide="ignore", invalid="ignore")
return np.sqrt(self.Q**2 + self.U**2 + self.V**2)
@property
def PF_err(self):
np.seterr(divide="ignore", invalid="ignore")
return np.sqrt(self.Q**2 * self.Q_err**2 + self.U**2 * self.U_err**2 + self.V**2 * self.V_err**2) / np.where(self.PF > 0, self.PF, np.nan)
@property
def P(self):
np.seterr(divide="ignore", invalid="ignore")
return self.PF / np.where(self.I > 0, self.I, np.nan)
@property
def P_err(self):
return np.sqrt(self.PF_err**2 + (self.PF / self.I) ** 2 * self.I_err**2) / np.where(self.I > 0, self.I, np.nan)
np.seterr(divide="ignore", invalid="ignore")
return np.where(self.I > 0, np.sqrt(self.PF_err**2 + (self.PF / self.I) ** 2 * self.I_err**2) / self.I, np.nan)
@property
def PA(self):
@@ -202,34 +212,33 @@ class specpol(object):
else:
self.fig = fig
self.ax = ax
if isinstance(ax, np.ndarray):
ax1, ax2 = self.ax[:2]
else:
ax1 = self.ax
self.ax[0].set_xlabel(r"Wavelength [$\AA$]")
self.ax[1].set_xlabel(r"Wavelength [$\AA$]")
# Display flux and polarized flux on first ax
ax1.set_xlabel(r"Wavelength [$\AA$]")
ax1.errorbar(self.wav, self.I, xerr=self.wav_err.T, yerr=self.I_err, color="k", fmt=".", label="I")
ax1.errorbar(self.wav, self.PF, xerr=self.wav_err.T, yerr=self.PF_err, color="b", fmt=".", label="PF")
ax1.set_ylabel(r"F$_\lambda$ [erg s$^{-1}$ cm$^{-2} \AA^{-1}$]")
ax1.legend(ncols=2, loc=1)
self.ax[0].errorbar(self.wav, self.I, xerr=self.wav_err.T, yerr=self.I_err, color="k", fmt=".", label="I")
self.ax[0].errorbar(self.wav, self.PF, xerr=self.wav_err.T, yerr=self.PF_err, color="b", fmt=".", label="PF")
self.ax[0].set_ylabel(r"F$_\lambda$ [erg s$^{-1}$ cm$^{-2} \AA^{-1}$]")
self.ax[0].legend()
# ax1 = self.ax[0].twinx()
# ax1.errorbar(self.wav, self.QN, xerr=self.wav_err.T, yerr=self.QN_err, fmt=".", label="QN")
# ax1.errorbar(self.wav, self.UN, xerr=self.wav_err.T, yerr=self.UN_err, fmt=".", label="UN")
# ax1.errorbar(self.wav, self.VN, xerr=self.wav_err.T, yerr=self.VN_err, fmt=".", label="VN")
# ax1.set_ylim([-1.0, 1.0])
# ax1.set_ylabel(r"Normalised stokes flux", color="g")
# ax1.tick_params(axis="y", color="g", labelcolor="g")
# h0, l0 = self.ax[0].get_legend_handles_labels()
# h1, l1 = ax1.get_legend_handles_labels()
# self.ax[0].legend(h0 + h1, l0 + l1, ncols=5)
self.ax[1].errorbar(self.wav, self.P, xerr=self.wav_err.T, yerr=self.P_err, color="b", fmt=".", label="P")
self.ax[1].set_ylim([0.0, 1.0])
self.ax[1].set_ylabel(r"P", color="b")
self.ax[1].tick_params(axis="y", color="b", labelcolor="b")
ax2 = self.ax[1].twinx()
ax2.errorbar(self.wav, self.PA, xerr=self.wav_err.T, yerr=self.PA_err, color="r", fmt=".", label="PA [°]")
ax2.set_ylim([0.0, 180.0])
ax2.set_ylabel(r"PA", color="r")
ax2.tick_params(axis="y", color="r", labelcolor="r")
if isinstance(ax, np.ndarray):
# When given 2 axes, display P and PA on second
ax2.set_xlabel(r"Wavelength [$\AA$]")
ax2.errorbar(self.wav, self.P, xerr=self.wav_err.T, yerr=self.P_err, color="b", fmt=".", label="P")
ax2.set_ylim([0.0, 1.0])
ax2.set_ylabel(r"P", color="b")
ax2.tick_params(axis="y", color="b", labelcolor="b")
ax22 = ax2.twinx()
ax22.errorbar(self.wav, self.PA, xerr=self.wav_err.T, yerr=self.PA_err, color="r", fmt=".", label="PA [°]")
ax22.set_ylim([0.0, 180.0])
ax22.set_ylabel(r"PA", color="r")
ax22.tick_params(axis="y", color="r", labelcolor="r")
h2, l2 = ax2.get_legend_handles_labels()
h22, l22 = ax22.get_legend_handles_labels()
ax2.legend(h2 + h22, l2 + l22, ncols=2, loc=1)
if hasattr(self, "fig") and savename is not None:
self.fig.savefig(join_path(plots_folder, savename + ".pdf"), dpi=300, bbox_inches="tight")
@@ -299,18 +308,18 @@ class FOSspecpol(specpol):
if hasattr(self, "fig"):
del self.fig
def dump_txt(self, filename, output_dir=""):
def dump_txt(self, filename, spec_list=None, output_dir=""):
"""
Dump current spectra to a text file.
"""
outfiles = []
for i in range(min(self.wav.shape[0], 4)):
data_dump = np.array([self.wav[i], self.I[i], self.Q[i], self.U[i], self.V[i], self.P[i], self.PA[i]]).T
np.savetxt(join_path(output_dir, filename + "_%d.txt" % i), data_dump)
outfiles.append(join_path(output_dir, filename + "_%d" % i))
if spec_list is None:
spec_list = self.subspec
for i, name in enumerate(["PASS1", "PASS2", "PASS12", "PASS12corr"]):
outfiles.append(spec_list[name].dump_txt(filename="_".join([filename, name]), output_dir=output_dir))
return outfiles
def plot(self, savename=None, plots_folder=""):
def plot(self, spec_list=None, savename=None, plots_folder="", fos=False):
"""
Display current spectra in single figure.
"""
@@ -319,42 +328,31 @@ class FOSspecpol(specpol):
del self.ax
if hasattr(self, "fig"):
del self.fig
self.fig, self.ax = plt.subplots(min(self.wav.shape[0], 4), 2, sharex=True, figsize=(20, 10), layout="constrained")
self.ax[-1][0].set_xlabel(r"Wavelength [$\AA$]", size="large")
self.ax[-1][1].set_xlabel(r"Wavelength [$\AA$]", size="large")
for i, name in enumerate(["Pass Direction 1", "Pass Direction 2", "Pass Direction 1&2", "Pass Direction 1&2 corrected"]):
self.ax[i][0].set_title(name)
self.ax[i][0].errorbar(self.wav[i], self.I[i], xerr=self.wav_err[i].T, yerr=self.I_err[i], color="k", fmt=".", label="I")
self.ax[i][0].errorbar(
self.wav[i],
self.P_fos[i] * self.I[i],
xerr=self.wav_err[i].T,
yerr=(self.P_fos_err[i] * self.I[i] + self.P_fos_err[i] * self.I_err[i]),
color="b",
fmt=".",
label="PF_fos",
)
self.ax[i][0].set_ylabel(r"F$_\lambda$ [erg s$^{-1}$ cm$^{-2} \AA^{-1}$]")
# ax1 = self.ax[i][0].twinx()
# ax1.errorbar(self.wav[i], self.QN[i], xerr=self.wav_err[i].T, yerr=self.QN_err[i], fmt=".", label="QN")
# ax1.errorbar(self.wav[i], self.UN[i], xerr=self.wav_err[i].T, yerr=self.UN_err[i], fmt=".", label="UN")
# ax1.errorbar(self.wav[i], self.VN[i], xerr=self.wav_err[i].T, yerr=self.VN_err[i], fmt=".", label="VN")
# ax1.set_ylabel(r"Normalised stokes flux", color="g")
# ax1.tick_params(axis="y", color="g", labelcolor="g")
# ax1.set_ylim([-1.0, 1.0])
# h0, l0 = self.ax[i][0].get_legend_handles_labels()
# h1, l1 = ax1.get_legend_handles_labels()
# self.ax[i][0].legend(h0 + h1, l0 + l1, ncols=5)
self.ax[i][0].legend()
self.ax[i][1].errorbar(self.wav[i], self.P_fos[i], xerr=self.wav_err[i].T, yerr=self.P_fos_err[i], color="b", fmt=".", label="P_fos")
self.ax[i][1].set(ylabel=r"P_fos", ylim=[0.0, 1.0])
ax2 = self.ax[i][1].twinx()
ax2.errorbar(self.wav[i], self.PA_fos[i], xerr=self.wav_err[i].T, yerr=self.PA_fos_err[i], color="r", fmt=".", label="P_fos")
ax2.set_ylabel(r"PA_fos", color="r")
ax2.tick_params(axis="y", color="r", labelcolor="r")
ax2.set_ylim([0.0, 180.0])
if spec_list is None:
spec_list = self.subspec
self.fig, self.ax = plt.subplots(4, 2, sharex=True, sharey="col", figsize=(20, 10), layout="constrained")
for i, (name, title) in enumerate(
[("PASS1", "Pass Direction 1"), ("PASS2", "Pass Direction 2"), ("PASS12", "Pass Direction 1&2"), ("PASS12corr", "Pass Direction 1&2 corrected")]
):
self.ax[i][0].set_title(title)
if fos:
self.ax[i][0] = spec_list[name].plot(ax=self.ax[i][0])
self.ax[i][1].set_xlabel(r"Wavelength [$\AA$]")
self.ax[i][1].errorbar(self.wav[i], self.P_fos[i], xerr=self.wav_err[i].T, yerr=self.P_fos_err[i], color="b", fmt=".", label="P_fos")
self.ax[i][1].set_ylim([0.0, 1.0])
self.ax[i][1].set_ylabel(r"P", color="b")
self.ax[i][1].tick_params(axis="y", color="b", labelcolor="b")
ax22 = self.ax[i][1].twinx()
ax22.errorbar(self.wav[i], self.PA_fos[i], xerr=self.wav_err[i].T, yerr=self.PA_fos_err[i], color="r", fmt=".", label="PA_fos [°]")
ax22.set_ylim([0.0, 180.0])
ax22.set_ylabel(r"PA", color="r")
ax22.tick_params(axis="y", color="r", labelcolor="r")
h2, l2 = self.ax[i][1].get_legend_handles_labels()
h22, l22 = ax22.get_legend_handles_labels()
self.ax[i][1].legend(h2 + h22, l2 + l22, ncols=2, loc=1)
else:
self.ax[i] = spec_list[name].plot(ax=self.ax[i])
self.ax[0][0].set_ylim(ymin=0.0)
self.fig.suptitle("_".join([self.hd["TARGNAME"], str(self.hd["PROPOSID"]), self.hd["FILENAME"], self.hd["APER_ID"]]))
if savename is not None:
@@ -366,13 +364,15 @@ class FOSspecpol(specpol):
"""
Rebin spectra to selected bin size in Angstrom.
"""
self.bin_edges = np.arange(np.floor(self.wav_fos.min()), np.ceil(self.wav_fos.max()), size)
self.in_bin = np.zeros(self.wav_fos.shape)
for i in range(self.wav_fos.shape[0]):
self.in_bin[i] = np.digitize(self.wav_fos[i], self.bin_edges) - 1
key = "{0:.2f}bin".format(size)
if key not in self.subspec.keys():
self.subspec[key] = dict([])
for name in ["PASS1", "PASS2", "PASS12", "PASS12corr"]:
self.subspec[key][name] = self.subspec[name].bin_size(size)
return self.subspec[key]
def main(infiles, output_dir=None):
def main(infiles, bin_size=None, output_dir=None):
outfiles = []
if infiles is not None:
prod = np.array([["/".join(filepath.split("/")[:-1]), filepath.split("/")[-1]] for filepath in infiles], dtype=str)
@@ -398,8 +398,14 @@ def main(infiles, output_dir=None):
print(file_root)
spec = FOSspecpol(file_root, data_folder)
filename = "_".join([spec.hd["TARGNAME"], "FOS", str(spec.hd["PROPOSID"]), spec.file_root, spec.hd["APER_ID"]])
outfiles += spec.dump_txt(filename, output_dir)
outfiles += spec.plot(filename, plots_folder)
if bin_size is not None:
key = "{0:.2f}bin".format(bin_size)
spec.bin_size(bin_size)
outfiles += spec.dump_txt("_".join([filename, key]), spec_list=spec.subspec[key], output_dir=output_dir)
outfiles += spec.plot(savename="_".join([filename, key]), spec_list=spec.subspec[key], plots_folder=plots_folder)
else:
outfiles += spec.dump_txt(filename, output_dir=output_dir)
outfiles += spec.plot(savename=filename, plots_folder=plots_folder)
del spec
plt.show()
@@ -411,9 +417,10 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Display and dump FOS Spectropolarimetry")
parser.add_argument("-f", "--files", metavar="path", required=False, nargs="*", help="the full or relative path to the data products", default=None)
parser.add_argument("-b", "--bin", metavar="bin_size", required=False, help="The bin size to resample spectra", type=float, default=None)
parser.add_argument(
"-o", "--output_dir", metavar="directory_path", required=False, help="output directory path for the data products", type=str, default=None
)
args = parser.parse_args()
exitcode = main(infiles=args.files, output_dir=args.output_dir)
exitcode = main(infiles=args.files, bin_size=args.bin, output_dir=args.output_dir)
print("Written to: ", exitcode)