modify files to comply with pep8 format
This commit is contained in:
@@ -15,9 +15,8 @@ import numpy as np
|
||||
from os.path import join as path_join
|
||||
from astropy.io import fits
|
||||
from astropy.wcs import WCS
|
||||
from lib.convex_hull import image_hull, clean_ROI
|
||||
from lib.convex_hull import clean_ROI
|
||||
from lib.plots import princ_angle
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
|
||||
def get_obs_data(infiles, data_folder="", compute_flux=False):
|
||||
@@ -42,29 +41,29 @@ def get_obs_data(infiles, data_folder="", compute_flux=False):
|
||||
"""
|
||||
data_array, headers = [], []
|
||||
for i in range(len(infiles)):
|
||||
with fits.open(path_join(data_folder,infiles[i])) as f:
|
||||
with fits.open(path_join(data_folder, infiles[i])) as f:
|
||||
headers.append(f[0].header)
|
||||
data_array.append(f[0].data)
|
||||
data_array = np.array(data_array,dtype=np.double)
|
||||
data_array = np.array(data_array, dtype=np.double)
|
||||
|
||||
# Prevent negative count value in imported data
|
||||
for i in range(len(data_array)):
|
||||
data_array[i][data_array[i] < 0.] = 0.
|
||||
|
||||
|
||||
# force WCS to convention PCi_ja unitary, cdelt in deg
|
||||
for header in headers:
|
||||
new_wcs = WCS(header).deepcopy()
|
||||
if new_wcs.wcs.has_cd() or (new_wcs.wcs.cdelt[:2] == np.array([1., 1.])).all():
|
||||
# Update WCS with relevant information
|
||||
if new_wcs.wcs.has_cd():
|
||||
old_cd = new_wcs.wcs.cd[:2,:2]
|
||||
old_cd = new_wcs.wcs.cd[:2, :2]
|
||||
del new_wcs.wcs.cd
|
||||
keys = list(new_wcs.to_header().keys())+['CD1_1','CD1_2','CD2_1','CD2_2']
|
||||
keys = list(new_wcs.to_header().keys())+['CD1_1', 'CD1_2', 'CD2_1', 'CD2_2']
|
||||
for key in keys:
|
||||
header.remove(key, ignore_missing=True)
|
||||
new_cdelt = np.linalg.eig(old_cd)[0]
|
||||
elif (new_wcs.wcs.cdelt == np.array([1., 1.])).all() and \
|
||||
(new_wcs.array_shape in [(512, 512),(1024,512),(512,1024),(1024,1024)]):
|
||||
(new_wcs.array_shape in [(512, 512), (1024, 512), (512, 1024), (1024, 1024)]):
|
||||
old_cd = new_wcs.wcs.pc
|
||||
new_wcs.wcs.pc = np.dot(old_cd, np.diag(1./new_cdelt))
|
||||
new_wcs.wcs.cdelt = new_cdelt
|
||||
@@ -73,14 +72,14 @@ def get_obs_data(infiles, data_folder="", compute_flux=False):
|
||||
header['orientat'] = princ_angle(float(header['orientat']))
|
||||
|
||||
# force WCS for POL60 to have same pixel size as POL0 and POL120
|
||||
is_pol60 = np.array([head['filtnam1'].lower()=='pol60' for head in headers],dtype=bool)
|
||||
cdelt = np.round(np.array([WCS(head).wcs.cdelt for head in headers]),14)
|
||||
if np.unique(cdelt[np.logical_not(is_pol60)],axis=0).size!=2:
|
||||
print(np.unique(cdelt[np.logical_not(is_pol60)],axis=0))
|
||||
is_pol60 = np.array([head['filtnam1'].lower() == 'pol60' for head in headers], dtype=bool)
|
||||
cdelt = np.round(np.array([WCS(head).wcs.cdelt for head in headers]), 14)
|
||||
if np.unique(cdelt[np.logical_not(is_pol60)], axis=0).size != 2:
|
||||
print(np.unique(cdelt[np.logical_not(is_pol60)], axis=0))
|
||||
raise ValueError("Not all images have same pixel size")
|
||||
else:
|
||||
for i in np.arange(len(headers))[is_pol60]:
|
||||
headers[i]['cdelt1'],headers[i]['cdelt2'] = np.unique(cdelt[np.logical_not(is_pol60)],axis=0)[0]
|
||||
headers[i]['cdelt1'], headers[i]['cdelt2'] = np.unique(cdelt[np.logical_not(is_pol60)], axis=0)[0]
|
||||
|
||||
if compute_flux:
|
||||
for i in range(len(infiles)):
|
||||
@@ -91,8 +90,8 @@ def get_obs_data(infiles, data_folder="", compute_flux=False):
|
||||
|
||||
|
||||
def save_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, P, debiased_P, s_P,
|
||||
s_P_P, PA, s_PA, s_PA_P, headers, data_mask, filename, data_folder="",
|
||||
return_hdul=False):
|
||||
s_P_P, PA, s_PA, s_PA_P, headers, data_mask, filename, data_folder="",
|
||||
return_hdul=False):
|
||||
"""
|
||||
Save computed polarimetry parameters to a single fits file,
|
||||
updating header accordingly.
|
||||
@@ -127,12 +126,12 @@ def save_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, P, debiased_P, s_P,
|
||||
informations (WCS, orientation, data_type).
|
||||
Only returned if return_hdul is True.
|
||||
"""
|
||||
#Create new WCS object given the modified images
|
||||
# Create new WCS object given the modified images
|
||||
ref_header = headers[0]
|
||||
exp_tot = np.array([header['exptime'] for header in headers]).sum()
|
||||
new_wcs = WCS(ref_header).deepcopy()
|
||||
|
||||
if data_mask.shape != (1,1):
|
||||
|
||||
if data_mask.shape != (1, 1):
|
||||
vertex = clean_ROI(data_mask)
|
||||
shape = vertex[1::2]-vertex[0::2]
|
||||
new_wcs.array_shape = shape
|
||||
@@ -153,56 +152,56 @@ def save_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, P, debiased_P, s_P,
|
||||
header['PA_int'] = (ref_header['PA_int'], 'Integrated polarisation angle')
|
||||
header['PA_int_err'] = (ref_header['PA_int_err'], 'Integrated polarisation angle error')
|
||||
|
||||
#Crop Data to mask
|
||||
if data_mask.shape != (1,1):
|
||||
I_stokes = I_stokes[vertex[2]:vertex[3],vertex[0]:vertex[1]]
|
||||
Q_stokes = Q_stokes[vertex[2]:vertex[3],vertex[0]:vertex[1]]
|
||||
U_stokes = U_stokes[vertex[2]:vertex[3],vertex[0]:vertex[1]]
|
||||
P = P[vertex[2]:vertex[3],vertex[0]:vertex[1]]
|
||||
debiased_P = debiased_P[vertex[2]:vertex[3],vertex[0]:vertex[1]]
|
||||
s_P = s_P[vertex[2]:vertex[3],vertex[0]:vertex[1]]
|
||||
s_P_P = s_P_P[vertex[2]:vertex[3],vertex[0]:vertex[1]]
|
||||
PA = PA[vertex[2]:vertex[3],vertex[0]:vertex[1]]
|
||||
s_PA = s_PA[vertex[2]:vertex[3],vertex[0]:vertex[1]]
|
||||
s_PA_P = s_PA_P[vertex[2]:vertex[3],vertex[0]:vertex[1]]
|
||||
|
||||
new_Stokes_cov = np.zeros((*Stokes_cov.shape[:-2],*shape[::-1]))
|
||||
# Crop Data to mask
|
||||
if data_mask.shape != (1, 1):
|
||||
I_stokes = I_stokes[vertex[2]:vertex[3], vertex[0]:vertex[1]]
|
||||
Q_stokes = Q_stokes[vertex[2]:vertex[3], vertex[0]:vertex[1]]
|
||||
U_stokes = U_stokes[vertex[2]:vertex[3], vertex[0]:vertex[1]]
|
||||
P = P[vertex[2]:vertex[3], vertex[0]:vertex[1]]
|
||||
debiased_P = debiased_P[vertex[2]:vertex[3], vertex[0]:vertex[1]]
|
||||
s_P = s_P[vertex[2]:vertex[3], vertex[0]:vertex[1]]
|
||||
s_P_P = s_P_P[vertex[2]:vertex[3], vertex[0]:vertex[1]]
|
||||
PA = PA[vertex[2]:vertex[3], vertex[0]:vertex[1]]
|
||||
s_PA = s_PA[vertex[2]:vertex[3], vertex[0]:vertex[1]]
|
||||
s_PA_P = s_PA_P[vertex[2]:vertex[3], vertex[0]:vertex[1]]
|
||||
|
||||
new_Stokes_cov = np.zeros((*Stokes_cov.shape[:-2], *shape[::-1]))
|
||||
for i in range(3):
|
||||
for j in range(3):
|
||||
Stokes_cov[i,j][(1-data_mask).astype(bool)] = 0.
|
||||
new_Stokes_cov[i,j] = Stokes_cov[i,j][vertex[2]:vertex[3],vertex[0]:vertex[1]]
|
||||
Stokes_cov[i, j][(1-data_mask).astype(bool)] = 0.
|
||||
new_Stokes_cov[i, j] = Stokes_cov[i, j][vertex[2]:vertex[3], vertex[0]:vertex[1]]
|
||||
Stokes_cov = new_Stokes_cov
|
||||
|
||||
data_mask = data_mask[vertex[2]:vertex[3],vertex[0]:vertex[1]]
|
||||
|
||||
data_mask = data_mask[vertex[2]:vertex[3], vertex[0]:vertex[1]]
|
||||
data_mask = data_mask.astype(float, copy=False)
|
||||
|
||||
#Create HDUList object
|
||||
# Create HDUList object
|
||||
hdul = fits.HDUList([])
|
||||
|
||||
#Add I_stokes as PrimaryHDU
|
||||
# Add I_stokes as PrimaryHDU
|
||||
header['datatype'] = ('I_stokes', 'type of data stored in the HDU')
|
||||
I_stokes[(1-data_mask).astype(bool)] = 0.
|
||||
primary_hdu = fits.PrimaryHDU(data=I_stokes, header=header)
|
||||
primary_hdu.name = 'I_stokes'
|
||||
hdul.append(primary_hdu)
|
||||
|
||||
#Add Q, U, Stokes_cov, P, s_P, PA, s_PA to the HDUList
|
||||
for data, name in [[Q_stokes,'Q_stokes'],[U_stokes,'U_stokes'],
|
||||
[Stokes_cov,'IQU_cov_matrix'],[P, 'Pol_deg'],
|
||||
[debiased_P, 'Pol_deg_debiased'],[s_P, 'Pol_deg_err'],
|
||||
[s_P_P, 'Pol_deg_err_Poisson_noise'],[PA, 'Pol_ang'],
|
||||
[s_PA, 'Pol_ang_err'],[s_PA_P, 'Pol_ang_err_Poisson_noise'],
|
||||
[data_mask, 'Data_mask']]:
|
||||
# Add Q, U, Stokes_cov, P, s_P, PA, s_PA to the HDUList
|
||||
for data, name in [[Q_stokes, 'Q_stokes'], [U_stokes, 'U_stokes'],
|
||||
[Stokes_cov, 'IQU_cov_matrix'], [P, 'Pol_deg'],
|
||||
[debiased_P, 'Pol_deg_debiased'], [s_P, 'Pol_deg_err'],
|
||||
[s_P_P, 'Pol_deg_err_Poisson_noise'], [PA, 'Pol_ang'],
|
||||
[s_PA, 'Pol_ang_err'], [s_PA_P, 'Pol_ang_err_Poisson_noise'],
|
||||
[data_mask, 'Data_mask']]:
|
||||
hdu_header = header.copy()
|
||||
hdu_header['datatype'] = name
|
||||
if not name == 'IQU_cov_matrix':
|
||||
data[(1-data_mask).astype(bool)] = 0.
|
||||
hdu = fits.ImageHDU(data=data,header=hdu_header)
|
||||
hdu = fits.ImageHDU(data=data, header=hdu_header)
|
||||
hdu.name = name
|
||||
hdul.append(hdu)
|
||||
|
||||
#Save fits file to designated filepath
|
||||
hdul.writeto(path_join(data_folder,filename+".fits"), overwrite=True)
|
||||
# Save fits file to designated filepath
|
||||
hdul.writeto(path_join(data_folder, filename+".fits"), overwrite=True)
|
||||
|
||||
if return_hdul:
|
||||
return hdul
|
||||
|
||||
Reference in New Issue
Block a user