update readfos for addition and copy, query for fos

This commit is contained in:
2025-01-07 17:50:43 +01:00
parent 729720c5bb
commit f33cb2935d
2 changed files with 329 additions and 90 deletions

View File

@@ -25,11 +25,11 @@ def divide_proposal(products):
"""
for pid in np.unique(products["Proposal ID"]):
obs = products[products["Proposal ID"] == pid].copy()
same_filt = np.unique(np.array(np.sum([obs["Filters"][:, 1:] == filt[1:] for filt in obs["Filters"]], axis=2) < 3, dtype=bool), axis=0)
same_filt = np.unique(np.array(np.sum([obs["Filters"] == filt for filt in obs["Filters"]], axis=2) >= len(obs["Filters"][0]), dtype=bool), axis=0)
if len(same_filt) > 1:
for filt in same_filt:
products["Proposal ID"][np.any([products["Dataset"] == dataset for dataset in obs["Dataset"][filt]], axis=0)] = "_".join(
[obs["Proposal ID"][filt][0], "_".join([fi for fi in obs["Filters"][filt][0][1:] if fi[:-1] != "CLEAR"])]
[obs["Proposal ID"][filt][0], "_".join([fi for fi in obs["Filters"][filt][0] if fi[:-1] != "CLEAR"])]
)
for pid in np.unique(products["Proposal ID"]):
obs = products[products["Proposal ID"] == pid].copy()
@@ -44,38 +44,69 @@ def divide_proposal(products):
return products
def get_product_list(target=None, proposal_id=None):
def get_product_list(target=None, proposal_id=None, instrument="foc"):
"""
Retrieve products list for a given target from the MAST archive
"""
mission = MastMissions(mission="hst")
radius = "3"
select_cols = [
"sci_data_set_name",
"sci_spec_1234",
"sci_actual_duration",
"sci_start_time",
"sci_stop_time",
"sci_central_wavelength",
"sci_instrume",
"sci_aper_1234",
"sci_targname",
"sci_pep_id",
"sci_pi_last_name",
"sci_targname",
"sci_aper_1234",
"sci_spec_1234",
"sci_central_wavelength",
"sci_actual_duration",
"sci_instrume",
"sci_operating_mode",
"sci_data_set_name",
"sci_start_time",
"sci_stop_time",
"sci_refnum",
]
cols = ["Dataset", "Filters", "Exptime", "Start", "Stop", "Central wavelength", "Instrument", "Size", "Target name", "Proposal ID", "PI last name"]
cols = [
"Proposal ID",
"PI last name",
"Target name",
"Aperture",
"Filters",
"Central wavelength",
"Exptime",
"Instrument",
"Operating Mode",
"Dataset",
"Start",
"Stop",
"References",
]
if target is None:
target = input("Target name:\n>")
# Use query_object method to resolve the object name into coordinates
results = mission.query_object(target, radius=radius, select_cols=select_cols, sci_spec_1234="POL*", sci_obs_type="image", sci_aec="S", sci_instrume="foc")
if instrument == "foc":
results = mission.query_object(
target, radius=radius, select_cols=select_cols, sci_spec_1234="POL*", sci_obs_type="image", sci_aec="S", sci_instrume="foc"
)
dataproduct_type = "image"
description = "DADS C0F file - Calibrated exposure WFPC/WFPC2/FOC/FOS/GHRS/HSP"
elif instrument == "fos":
results = mission.query_object(
target, radius=radius, select_cols=select_cols, sci_operating_mode="SPECTROPOLARIMETRY", sci_obs_type="spectrum", sci_aec="S", sci_instrume="fos"
)
dataproduct_type = "spectrum"
description = ["DADS C0F file - Calibrated exposure WFPC/WFPC2/FOC/FOS/GHRS/HSP", "DADS C3F file - Calibrated exposure GHRS/FOS/HSP"]
for c, n_c in zip(select_cols, cols):
results.rename_column(c, n_c)
results["Proposal ID"] = Column(results["Proposal ID"], dtype="U35")
results["Filters"] = Column(np.array([filt.split(";") for filt in results["Filters"]], dtype=str))
if instrument == "foc":
results["POLFilters"] = Column(np.array([filt.split(";")[0] for filt in results["Filters"]], dtype=str))
results["Filters"] = Column(np.array([filt.split(";")[1:] for filt in results["Filters"]], dtype=str))
else:
results["Filters"] = Column(np.array([filt.split(";") for filt in results["Filters"]], dtype=str))
results["Start"] = Column(Time(results["Start"]))
results["Stop"] = Column(Time(results["Stop"]))
@@ -89,20 +120,34 @@ def get_product_list(target=None, proposal_id=None):
to_remove.append(i)
obs.remove_rows(to_remove)
# Remove observations for which a polarization filter is missing
polfilt = {"POL0": 0, "POL60": 1, "POL120": 2}
for pid in np.unique(obs["Proposal ID"]):
used_pol = np.zeros(3)
for dataset in obs[obs["Proposal ID"] == pid]:
used_pol[polfilt[dataset["Filters"][0]]] += 1
if np.any(used_pol < 1):
obs.remove_rows(np.arange(len(obs))[obs["Proposal ID"] == pid])
if instrument == "foc":
polfilt = {"POL0": 0, "POL60": 1, "POL120": 2}
for pid in np.unique(obs["Proposal ID"]):
used_pol = np.zeros(3)
for dataset in obs[obs["Proposal ID"] == pid]:
used_pol[polfilt[dataset["POLFilters"]]] += 1
if np.any(used_pol < 1):
obs.remove_rows(np.arange(len(obs))[obs["Proposal ID"] == pid])
# Remove observations for which a spectropolarization has not been reduced
if instrument == "fos":
for pid in np.unique(obs["Proposal ID"]):
observations = Observations.query_criteria(proposal_id=pid.split("_")[0])
c3prod = Observations.filter_products(
Observations.get_product_list(observations),
productType=["SCIENCE"],
dataproduct_type=dataproduct_type,
calib_level=[2],
description=description[1],
)
if len(c3prod) < 1:
obs.remove_rows(np.arange(len(obs))[obs["Proposal ID"] == pid])
tab = unique(obs, ["Target name", "Proposal ID"])
obs["Obs"] = [np.argmax(np.logical_and(tab["Proposal ID"] == data["Proposal ID"], tab["Target name"] == data["Target name"])) + 1 for data in obs]
try:
n_obs = unique(obs[["Obs", "Filters", "Start", "Central wavelength", "Instrument", "Size", "Target name", "Proposal ID", "PI last name"]], "Obs")
n_obs = unique(obs[["Obs", "Filters", "Start", "Central wavelength", "Instrument", "Aperture", "Target name", "Proposal ID", "PI last name"]], "Obs")
except IndexError:
raise ValueError("There is no observation with POL0, POL60 and POL120 for {0:s} in HST/FOC Legacy Archive".format(target))
raise ValueError("There is no observation with polarimetry for {0:s} in HST/{1:s} Legacy Archive".format(target, instrument.upper()))
b = np.zeros(len(results), dtype=bool)
if proposal_id is not None and str(proposal_id) in obs["Proposal ID"]:
@@ -130,29 +175,27 @@ def get_product_list(target=None, proposal_id=None):
products = Observations.filter_products(
Observations.get_product_list(observations),
productType=["SCIENCE"],
dataproduct_type=["image"],
dataproduct_type=dataproduct_type,
calib_level=[2],
description="DADS C0F file - Calibrated exposure WFPC/WFPC2/FOC/FOS/GHRS/HSP",
description=description,
)
products["proposal_id"] = Column(products["proposal_id"], dtype="U35")
products["target_name"] = Column(observations["target_name"])
for prod in products:
prod["proposal_id"] = results["Proposal ID"][results["Dataset"] == prod["productFilename"][: len(results["Dataset"][0])].upper()][0]
for prod in products:
prod["target_name"] = observations["target_name"][observations["obsid"] == prod["obsID"]][0]
tab = unique(products, ["target_name", "proposal_id"])
tab = unique(products, "proposal_id")
products["Obs"] = [np.argmax(np.logical_and(tab["proposal_id"] == data["proposal_id"], tab["target_name"] == data["target_name"])) + 1 for data in products]
products["Obs"] = [np.argmax(tab["proposal_id"] == data["proposal_id"]) + 1 for data in products]
return target, products
def retrieve_products(target=None, proposal_id=None, output_dir="./data"):
def retrieve_products(target=None, proposal_id=None, instrument="foc", output_dir="./data"):
"""
Given a target name and a proposal_id, create the local directories and retrieve the fits files from the MAST Archive
"""
target, products = get_product_list(target=target, proposal_id=proposal_id)
target, products = get_product_list(target=target, proposal_id=proposal_id, instrument=instrument)
prodpaths = []
# data_dir = path_join(output_dir, target)
out = ""
@@ -183,10 +226,11 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Query MAST for target products")
parser.add_argument("-t", "--target", metavar="targetname", required=False, help="the name of the target", type=str, default=None)
parser.add_argument("-p", "--proposal_id", metavar="proposal_id", required=False, help="the proposal id of the data products", type=int, default=None)
parser.add_argument("-i", "--instrum", metavar="instrum", required=False, help="the instrument used for observation", type=str, default="foc")
parser.add_argument(
"-o", "--output_dir", metavar="directory_path", required=False, help="output directory path for the data products", type=str, default="./data"
)
args = parser.parse_args()
print(args.target)
prodpaths = retrieve_products(target=args.target, proposal_id=args.proposal_id)
prodpaths = retrieve_products(target=args.target, proposal_id=args.proposal_id, instrument=args.instrum.lower(), output_dir=args.output_dir)
print(prodpaths)

View File

@@ -51,15 +51,16 @@ class specpol(object):
"""
def __init__(self, other=None):
if isinstance(other, specpol):
if isinstance(other, __class__):
# Copy constructor
self.hd = deepcopy(other.hd)
self.wav = deepcopy(other.wav)
self.wav_err = deepcopy(other.wav_err)
self.I = deepcopy(other.I)
self.Q = deepcopy(other.Q)
self.U = deepcopy(other.U)
self.V = deepcopy(other.V)
self.IQU_cov = deepcopy(other.IQU_cov)
self.IQUV_cov = deepcopy(other.IQUV_cov)
else:
# Initialise to zero
if isinstance(other, int):
@@ -72,29 +73,52 @@ class specpol(object):
"""
Set all values to zero.
"""
self.hd = dict([])
self.wav = np.zeros(n)
self.wav_err = np.zeros((n, 2))
self.I = np.zeros(n)
self.Q = np.zeros(n)
self.U = np.zeros(n)
self.V = np.zeros(n)
self.IQU_cov = np.zeros((4, 4, n))
self.IQUV_cov = np.zeros((4, 4, n))
@property
def wav_rest(self, z=None):
if z is None and "REDSHIFT" not in self.hd.keys():
from astroquery.ipac.ned import Ned
z = Ned.query_object(self.hd["TARGNAME"])["Redshift"][0]
self.hd["REDSHIFT"] = z
elif z is None:
z = self.hd["REDSHIFT"]
return self.wav / (z + 1)
@property
def wav_rest_err(self, z=None):
if z is None and "REDSHIFT" not in self.hd.keys():
from astroquery.ipac.ned import Ned
z = Ned.query_object(self.hd["TARGNAME"])["Redshift"][0]
self.hd["REDSHIFT"] = z
elif z is None:
z = self.hd["REDSHIFT"]
return self.wav_err / (z + 1)
@property
def I_err(self):
return np.sqrt(self.IQU_cov[0][0])
return np.sqrt(self.IQUV_cov[0][0])
@property
def Q_err(self):
return np.sqrt(self.IQU_cov[1][1])
return np.sqrt(self.IQUV_cov[1][1])
@property
def U_err(self):
return np.sqrt(self.IQU_cov[2][2])
return np.sqrt(self.IQUV_cov[2][2])
@property
def V_err(self):
return np.sqrt(self.IQU_cov[3][3])
return np.sqrt(self.IQUV_cov[3][3])
@property
def QN(self):
@@ -165,13 +189,12 @@ class specpol(object):
]
)
self.I, self.Q, self.U, self.V = np.dot(mrot, np.array([self.I, self.Q, self.U, self.V]))
self.IQU_cov = np.dot(mrot, np.dot(self.IQU_cov.T, mrot.T).T)
self.IQUV_cov = np.dot(mrot, np.dot(self.IQUV_cov.T, mrot.T).T)
def bin_size(self, size):
def bin(self, bin_edges):
"""
Rebin spectra to selected bin size in Angstrom.
Rebin spectra to given list of bin edges.
"""
bin_edges = np.arange(np.floor(self.wav.min()), np.ceil(self.wav.max()), size)
in_bin = np.digitize(self.wav, bin_edges) - 1
spec_b = specpol(bin_edges.shape[0] - 1)
for i in range(bin_edges.shape[0] - 1):
@@ -183,11 +206,22 @@ class specpol(object):
spec_b.U[i] = np.sum(self.U[in_bin == i])
spec_b.V[i] = np.sum(self.V[in_bin == i])
for m in range(4):
spec_b.IQU_cov[m][m][i] = np.sum(self.IQU_cov[m][m][in_bin == i])
spec_b.IQUV_cov[m][m][i] = np.sum(self.IQUV_cov[m][m][in_bin == i])
for n in [k for k in range(4) if k != m]:
spec_b.IQU_cov[m][n][i] = np.sqrt(np.sum(self.IQU_cov[m][n][in_bin == i] ** 2))
spec_b.IQUV_cov[m][n][i] = np.sqrt(np.sum(self.IQUV_cov[m][n][in_bin == i] ** 2))
spec_b.hd = deepcopy(self.hd)
spec_b.hd["NAXIS1"] = bin_edges.shape[0] - 1
spec_b.hd["DATAMIN"], spec_b.hd["DATAMAX"] = spec_b.I.min(), spec_b.I.max()
spec_b.hd["MINWAV"], spec_b.hd["MAXWAV"] = spec_b.wav.min(), spec_b.wav.max()
return spec_b
def bin_size(self, size):
"""
Rebin spectra to selected bin size in Angstrom.
"""
bin_edges = np.arange(np.floor(self.wav.min()), np.ceil(self.wav.max()), size)
return self.bin(bin_edges)
def dump_txt(self, filename, output_dir=""):
"""
Dump current spectra to a text file.
@@ -196,7 +230,7 @@ class specpol(object):
np.savetxt(join_path(output_dir, filename + ".txt"), data_dump)
return join_path(output_dir, filename)
def plot(self, fig=None, ax=None, savename=None, plots_folder=""):
def plot(self, fig=None, ax=None, rest=True, savename=None, plots_folder=""):
"""
Display current spectra.
"""
@@ -212,27 +246,33 @@ class specpol(object):
else:
self.fig = fig
self.ax = ax
if isinstance(ax, np.ndarray):
if isinstance(self.ax, np.ndarray):
ax1, ax2 = self.ax[:2]
else:
ax1 = self.ax
if rest:
wav, wav_err = self.wav_rest, self.wav_rest_err
rest_str = "Rest "
else:
wav, wav_err = self.wav, self.wav_rest
rest_str = ""
# Display flux and polarized flux on first ax
ax1.set_xlabel(r"Wavelength [$\AA$]")
ax1.errorbar(self.wav, self.I, xerr=self.wav_err.T, yerr=self.I_err, color="k", fmt=".", label="I")
ax1.errorbar(self.wav, self.PF, xerr=self.wav_err.T, yerr=self.PF_err, color="b", fmt=".", label="PF")
ax1.set_xlabel(rest_str + r"Wavelength [$\AA$]")
ax1.errorbar(wav, self.I, xerr=wav_err.T, yerr=self.I_err, color="k", fmt=".", label="I")
ax1.errorbar(wav, self.PF, xerr=wav_err.T, yerr=self.PF_err, color="b", fmt=".", label="PF")
ax1.set_ylabel(r"F$_\lambda$ [erg s$^{-1}$ cm$^{-2} \AA^{-1}$]")
ax1.legend(ncols=2, loc=1)
if isinstance(ax, np.ndarray):
if isinstance(self.ax, np.ndarray):
# When given 2 axes, display P and PA on second
ax2.set_xlabel(r"Wavelength [$\AA$]")
ax2.errorbar(self.wav, self.P, xerr=self.wav_err.T, yerr=self.P_err, color="b", fmt=".", label="P")
ax2.set_xlabel(rest_str + r"Wavelength [$\AA$]")
ax2.errorbar(wav, self.P, xerr=wav_err.T, yerr=self.P_err, color="b", fmt=".", label="P")
ax2.set_ylim([0.0, 1.0])
ax2.set_ylabel(r"P", color="b")
ax2.tick_params(axis="y", color="b", labelcolor="b")
ax22 = ax2.twinx()
ax22.errorbar(self.wav, self.PA, xerr=self.wav_err.T, yerr=self.PA_err, color="r", fmt=".", label="PA [°]")
ax22.errorbar(wav, self.PA, xerr=wav_err.T, yerr=self.PA_err, color="r", fmt=".", label="PA [°]")
ax22.set_ylim([0.0, 180.0])
ax22.set_ylabel(r"PA", color="r")
ax22.tick_params(axis="y", color="r", labelcolor="r")
@@ -248,6 +288,44 @@ class specpol(object):
else:
return self.ax
def __add__(self, other):
"""
Spectra addition, if not same binning default to self bins.
"""
spec_a = specpol(self)
if np.all(self.wav == other.wav):
spec_b = other
else:
bin_edges = np.zeros(spec_a.wav.shape[0] + 1)
bin_edges[:-1], bin_edges[-1] = spec_a.wav - spec_a.wav_err[:, 0], spec_a.wav[-1] + spec_a.wav_err[-1:1]
spec_b = other.bin(bin_edges=bin_edges)
spec_a.I += deepcopy(spec_b.I)
spec_a.Q += deepcopy(spec_b.Q)
spec_a.U += deepcopy(spec_b.U)
spec_a.V += deepcopy(spec_b.V)
spec_a.IQUV_cov += deepcopy(spec_b.IQUV_cov)
spec_a.hd["DATAMIN"], spec_a.hd["DATAMAX"] = spec_a.I.min(), spec_a.I.max()
spec_a.hd["EXPTIME"] += spec_b.hd["EXPTIME"]
spec_a.hd["ROOTNAME"] += "+" + spec_b.hd["ROOTNAME"]
return spec_a
def __deepcopy__(self, memo={}):
spec = specpol(self.wav.shape[0])
spec.__dict__.update(self.__dict__)
spec.hd = deepcopy(self.hd, memo)
spec.wav = deepcopy(self.wav, memo)
spec.wav_err = deepcopy(self.wav_err, memo)
spec.I = deepcopy(self.I, memo)
spec.Q = deepcopy(self.Q, memo)
spec.U = deepcopy(self.U, memo)
spec.V = deepcopy(self.V, memo)
spec.IQUV_cov = deepcopy(self.IQUV_cov, memo)
return spec
class FOSspecpol(specpol):
"""
@@ -256,41 +334,99 @@ class FOSspecpol(specpol):
def __init__(self, stokes, data_folder=""):
"""
Initialise object from fits filename or hdulist.
Initialise object from fits filename, fits hdulist or copy.
"""
if isinstance(stokes, __class__):
# Copy constructor
self.rootname = deepcopy(stokes.rootname)
self.hd = deepcopy(stokes.hd)
self.wav = deepcopy(stokes.wav)
self.wav_err = deepcopy(stokes.wav_err)
self.I = deepcopy(stokes.I)
self.Q = deepcopy(stokes.Q)
self.U = deepcopy(stokes.U)
self.V = deepcopy(stokes.V)
self.IQUV_cov = deepcopy(stokes.IQUV_cov)
self.P_fos = deepcopy(stokes.P_fos)
self.P_fos_err = deepcopy(stokes.P_fos_err)
self.PC_fos = deepcopy(stokes.PC_fos)
self.PC_fos_err = deepcopy(stokes.PC_fos_err)
self.PA_fos = deepcopy(stokes.PA_fos)
self.PA_fos_err = deepcopy(stokes.PA_fos_err)
self.subspec = {}
for name in ["PASS1", "PASS2", "PASS12", "PASS12corr"]:
spec = deepcopy(stokes.subspec[name])
self.subspec[name] = spec
elif stokes is None or isinstance(stokes, int):
self.zero(n=stokes)
else:
self.from_file(stokes, data_folder=data_folder)
@classmethod
def zero(self, n=1):
"""
Set all values to zero.
"""
self.rootname = ""
self.hd = dict([])
self.wav = np.zeros((4, n))
self.wav_err = np.zeros((4, n, 2))
self.I = np.zeros((4, n))
self.Q = np.zeros((4, n))
self.U = np.zeros((4, n))
self.V = np.zeros((4, n))
self.IQUV_cov = np.zeros((4, 4, 4, n))
self.subspec = {}
for i, name in enumerate(["PASS1", "PASS2", "PASS12", "PASS12corr"]):
spec = specpol(n)
spec.hd, spec.wav, spec.wav_err, spec.I, spec.Q, spec.U, spec.V = self.hd, self.wav[i], self.wav_err[i], self.I[i], self.Q[i], self.U[i], self.V[i]
spec.IQUV_cov = self.IQUV_cov[:, :, i, :]
self.subspec[name] = spec
self.P_fos = np.zeros(self.I.shape)
self.P_fos_err = np.zeros(self.I.shape)
self.PC_fos = np.zeros(self.I.shape)
self.PC_fos_err = np.zeros(self.I.shape)
self.PA_fos = np.zeros(self.I.shape)
self.PA_fos_err = np.zeros(self.I.shape)
def from_file(self, stokes, data_folder=""):
"""
Initialise object from fits file or hdulist.
"""
if isinstance(stokes, str):
self.file_root = stokes.split("_")[0]
self.hd = getheader(join_path(data_folder, self.file_root + "_c0f.fits"))
wav = getdata(join_path(data_folder, self.file_root + "_c0f.fits"))
stokes = getdata(join_path(data_folder, self.file_root + "_c3f.fits"))
self.rootname = stokes.split("_")[0]
self.hd = dict(getheader(join_path(data_folder, self.rootname + "_c0f.fits")))
wav = getdata(join_path(data_folder, self.rootname + "_c0f.fits"))
stokes = getdata(join_path(data_folder, self.rootname + "_c3f.fits"))
elif isinstance(stokes, hdu.hdulist.HDUList):
self.hd = stokes.header
self.file_root = self.hd["FILENAME"].split("_")[0]
wav = getdata(join_path(data_folder, self.file_root + "_c0f"))
self.hd = dict(stokes.header)
self.rootname = self.hd["FILENAME"].split("_")[0]
wav = getdata(join_path(data_folder, self.rootname + "_c0f"))
stokes = stokes.data
else:
raise ValueError("Input must be a path to a fits file or an HDUlist")
self.wav = np.concat((wav[0:2, :], wav[0].reshape(1, wav.shape[1]), wav[0].reshape(1, wav.shape[1])), axis=0)
self.wav_err = np.zeros((self.wav.shape[0], self.wav.shape[1], 2))
self.IQU_cov = np.zeros((4, 4, self.wav.shape[0], self.wav.shape[1]))
self.IQUV_cov = np.zeros((4, 4, self.wav.shape[0], self.wav.shape[1]))
self.I = stokes[0::14]
self.IQU_cov[0, 0] = stokes[4::14] ** 2
self.IQUV_cov[0, 0] = stokes[4::14] ** 2
self.Q = stokes[1::14]
self.IQU_cov[1, 1] = stokes[5::14] ** 2
self.IQUV_cov[1, 1] = stokes[5::14] ** 2
self.U = stokes[2::14]
self.IQU_cov[2, 2] = stokes[6::14] ** 2
self.IQUV_cov[2, 2] = stokes[6::14] ** 2
self.V = stokes[3::14]
self.IQU_cov[3, 3] = stokes[7::14] ** 2
self.IQUV_cov[3, 3] = stokes[7::14] ** 2
self.subspec = {}
for i, name in enumerate(["PASS1", "PASS2", "PASS12", "PASS12corr"]):
spec = specpol(self.wav[i].shape[0])
spec.wav, spec.wav_err, spec.I, spec.Q, spec.U, spec.V = self.wav[i], self.wav_err[i], self.I[i], self.Q[i], self.U[i], self.V[i]
spec.IQU_cov = self.IQU_cov[:, :, i, :]
spec.rotate((i != 3) * self.hd["PA_APER"])
spec.hd, spec.wav, spec.wav_err, spec.I, spec.Q, spec.U, spec.V = self.hd, self.wav[i], self.wav_err[i], self.I[i], self.Q[i], self.U[i], self.V[i]
spec.IQUV_cov = self.IQUV_cov[:, :, i, :]
spec.rotate(-(name[-4:] != "corr") * spec.hd["PA_APER"])
self.subspec[name] = spec
self.P_fos = stokes[8::14]
@@ -302,12 +438,6 @@ class FOSspecpol(specpol):
)
self.PA_fos_err = princ_angle(np.degrees(stokes[13::14]))
def __del__(self):
if hasattr(self, "ax"):
del self.ax
if hasattr(self, "fig"):
del self.fig
def dump_txt(self, filename, spec_list=None, output_dir=""):
"""
Dump current spectra to a text file.
@@ -319,7 +449,7 @@ class FOSspecpol(specpol):
outfiles.append(spec_list[name].dump_txt(filename="_".join([filename, name]), output_dir=output_dir))
return outfiles
def plot(self, spec_list=None, savename=None, plots_folder="", fos=False):
def plot(self, spec_list=None, rest=True, savename=None, plots_folder="", fos=False):
"""
Display current spectra in single figure.
"""
@@ -336,14 +466,20 @@ class FOSspecpol(specpol):
):
self.ax[i][0].set_title(title)
if fos:
self.ax[i][0] = spec_list[name].plot(ax=self.ax[i][0])
self.ax[i][1].set_xlabel(r"Wavelength [$\AA$]")
self.ax[i][1].errorbar(self.wav[i], self.P_fos[i], xerr=self.wav_err[i].T, yerr=self.P_fos_err[i], color="b", fmt=".", label="P_fos")
if rest:
wav, wav_err = self.wav_rest, self.wav_rest_err
rest_str = "Rest "
else:
wav, wav_err = self.wav, self.wav_rest
rest_str = ""
self.ax[i][0] = spec_list[name].plot(ax=self.ax[i][0], rest=rest)
self.ax[i][1].set_xlabel(rest_str + r"Wavelength [$\AA$]")
self.ax[i][1].errorbar(wav[i], self.P_fos[i], xerr=wav_err[i].T, yerr=self.P_fos_err[i], color="b", fmt=".", label="P_fos")
self.ax[i][1].set_ylim([0.0, 1.0])
self.ax[i][1].set_ylabel(r"P", color="b")
self.ax[i][1].tick_params(axis="y", color="b", labelcolor="b")
ax22 = self.ax[i][1].twinx()
ax22.errorbar(self.wav[i], self.PA_fos[i], xerr=self.wav_err[i].T, yerr=self.PA_fos_err[i], color="r", fmt=".", label="PA_fos [°]")
ax22.errorbar(wav[i], self.PA_fos[i], xerr=wav_err[i].T, yerr=self.PA_fos_err[i], color="r", fmt=".", label="PA_fos [°]")
ax22.set_ylim([0.0, 180.0])
ax22.set_ylabel(r"PA", color="r")
ax22.tick_params(axis="y", color="r", labelcolor="r")
@@ -354,7 +490,7 @@ class FOSspecpol(specpol):
self.ax[i] = spec_list[name].plot(ax=self.ax[i])
self.ax[0][0].set_ylim(ymin=0.0)
self.fig.suptitle("_".join([self.hd["TARGNAME"], str(self.hd["PROPOSID"]), self.hd["FILENAME"], self.hd["APER_ID"]]))
self.fig.suptitle("_".join([self.hd["TARGNAME"], str(self.hd["PROPOSID"]), self.hd["ROOTNAME"], self.hd["APER_ID"]]))
if savename is not None:
self.fig.savefig(join_path(plots_folder, savename + ".pdf"), dpi=300, bbox_inches="tight")
outfiles.append(join_path(plots_folder, savename + ".pdf"))
@@ -371,6 +507,49 @@ class FOSspecpol(specpol):
self.subspec[key][name] = self.subspec[name].bin_size(size)
return self.subspec[key]
def __add__(self, other):
"""
Spectra addition, if not same binning default to self bins.
"""
spec_a = FOSspecpol(self)
if np.all(self.wav == other.wav):
spec_b = other
else:
bin_edges = np.zeros(spec_a.wav.shape[0] + 1)
bin_edges[:-1], bin_edges[-1] = spec_a.wav - spec_a.wav_err[:, 0], spec_a.wav[-1] + spec_a.wav_err[-1:1]
spec_b = other.bin(bin_edges=bin_edges)
spec_a.I += deepcopy(spec_b.I)
spec_a.Q += deepcopy(spec_b.Q)
spec_a.U += deepcopy(spec_b.U)
spec_a.V += deepcopy(spec_b.V)
spec_a.IQUV_cov += deepcopy(spec_b.IQUV_cov)
for name in ["PASS1", "PASS2", "PASS12", "PASS12corr"]:
spec_a.subspec[name] += deepcopy(spec_b.subspec[name])
spec_a.subspec[name].hd["DATAMIN"], spec_a.subspec[name].hd["DATAMAX"] = spec_a.subspec[name].I.min(), spec_a.subspec[name].I.max()
spec_a.subspec[name].hd["EXPTIME"] += spec_b.subspec[name].hd["EXPTIME"]
spec_a.subspec[name].hd["ROOTNAME"] += "+" + spec_b.subspec[name].hd["ROOTNAME"]
spec_a.hd["DATAMIN"], spec_a.hd["DATAMAX"] = spec_a.I.min(), spec_a.I.max()
spec_a.hd["EXPTIME"] += spec_b.hd["EXPTIME"]
spec_a.hd["ROOTNAME"] += "+" + spec_b.hd["ROOTNAME"]
return spec_a
def __deepcopy__(self, memo):
spec = FOSspecpol(self.wav.shape[0])
spec.__dict__.update(self.__dict__)
for key in self.subspec.keys():
spec.subspec[key] = deepcopy(self.subspec[key])
return spec
def __del__(self):
if hasattr(self, "ax"):
del self.ax
if hasattr(self, "fig"):
del self.fig
def main(infiles, bin_size=None, output_dir=None):
outfiles = []
@@ -394,19 +573,35 @@ def main(infiles, bin_size=None, output_dir=None):
infiles = [p[1] for p in prod]
roots = np.unique([file.split("_")[0] for file in infiles])
for file_root in roots:
print(file_root)
spec = FOSspecpol(file_root, data_folder)
filename = "_".join([spec.hd["TARGNAME"], "FOS", str(spec.hd["PROPOSID"]), spec.file_root, spec.hd["APER_ID"]])
aper = dict([])
for rootname in roots:
print(rootname)
spec = FOSspecpol(rootname, data_folder)
filename = "_".join([spec.hd["TARGNAME"], "FOS", str(spec.hd["PROPOSID"]), spec.rootname, spec.hd["APER_ID"]])
if bin_size is not None:
key = "{0:.2f}bin".format(bin_size)
spec.bin_size(bin_size)
outfiles += spec.dump_txt("_".join([filename, key]), spec_list=spec.subspec[key], output_dir=output_dir)
outfiles += spec.plot(savename="_".join([filename, key]), spec_list=spec.subspec[key], plots_folder=plots_folder)
if hasattr(aper, spec.hd["APER_ID"]):
aper[spec.hd["APER_ID"]].append(spec.subspec[key]["PASS12corr"])
else:
aper[spec.hd["APER_ID"]] = [spec.subspec[key]["PASS12corr"]]
else:
outfiles += spec.dump_txt(filename, output_dir=output_dir)
outfiles += spec.plot(savename=filename, plots_folder=plots_folder)
del spec
if hasattr(aper, spec.hd["APER_ID"]):
aper[spec.hd["APER_ID"]].append(spec.subspec["PASS12corr"])
else:
aper[spec.hd["APER_ID"]] = [spec.subspec["PASS12corr"]]
plt.close("all")
for key in aper.keys():
filename = "_".join([spec.hd["TARGNAME"], "FOS", str(spec.hd["PROPOSID"]), "SUM"])
if bin_size is not None:
filename += "_{0:.2f}bin".format(bin_size)
spec = np.sum(aper[key])
outfiles.append(spec.dump_txt("_".join([filename, key]), output_dir=output_dir))
outfiles.append(spec.plot(savename="_".join([filename, key]), plots_folder=plots_folder)[2])
plt.show()
return outfiles