Create analysis tool

This commit is contained in:
Thibault Barnouin
2022-04-06 15:45:14 +02:00
parent c4311bbb4b
commit f6df92a1c2
15 changed files with 652 additions and 184 deletions

View File

@@ -118,10 +118,11 @@ def save_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, P, debiased_P, s_P,
exp_tot = np.array([header['exptime'] for header in headers]).sum()
new_wcs = wcs.WCS(ref_header).deepcopy()
vertex = clean_ROI(data_mask)
shape = vertex[1::2]-vertex[0::2]
new_wcs.array_shape = shape
new_wcs.wcs.crpix = np.array(new_wcs.wcs.crpix) - vertex[0::-2]
if data_mask.shape != (1,1):
vertex = clean_ROI(data_mask)
shape = vertex[1::2]-vertex[0::2]
new_wcs.array_shape = shape
new_wcs.wcs.crpix = np.array(new_wcs.wcs.crpix) - vertex[0::-2]
header = new_wcs.to_header()
header['instrume'] = (ref_header['instrume'], 'Instrument from which data is reduced')
@@ -138,25 +139,27 @@ def save_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, P, debiased_P, s_P,
header['PA_int_err'] = (ref_header['PA_int_err'], 'Integrated polarization angle error')
#Crop Data to mask
I_stokes = I_stokes[vertex[0]:vertex[1],vertex[2]:vertex[3]]
Q_stokes = Q_stokes[vertex[0]:vertex[1],vertex[2]:vertex[3]]
U_stokes = U_stokes[vertex[0]:vertex[1],vertex[2]:vertex[3]]
P = P[vertex[0]:vertex[1],vertex[2]:vertex[3]]
debiased_P = debiased_P[vertex[0]:vertex[1],vertex[2]:vertex[3]]
s_P = s_P[vertex[0]:vertex[1],vertex[2]:vertex[3]]
s_P_P = s_P_P[vertex[0]:vertex[1],vertex[2]:vertex[3]]
PA = PA[vertex[0]:vertex[1],vertex[2]:vertex[3]]
s_PA = s_PA[vertex[0]:vertex[1],vertex[2]:vertex[3]]
s_PA_P = s_PA_P[vertex[0]:vertex[1],vertex[2]:vertex[3]]
new_Stokes_cov = np.zeros((3,3,shape[0],shape[1]))
for i in range(3):
for j in range(3):
Stokes_cov[i,j][(1-data_mask).astype(bool)] = 0.
new_Stokes_cov[i,j] = Stokes_cov[i,j][vertex[0]:vertex[1],vertex[2]:vertex[3]]
Stokes_cov = new_Stokes_cov
data_mask = data_mask[vertex[0]:vertex[1],vertex[2]:vertex[3]].astype(float, copy=False)
if data_mask.shape != (1,1):
I_stokes = I_stokes[vertex[0]:vertex[1],vertex[2]:vertex[3]]
Q_stokes = Q_stokes[vertex[0]:vertex[1],vertex[2]:vertex[3]]
U_stokes = U_stokes[vertex[0]:vertex[1],vertex[2]:vertex[3]]
P = P[vertex[0]:vertex[1],vertex[2]:vertex[3]]
debiased_P = debiased_P[vertex[0]:vertex[1],vertex[2]:vertex[3]]
s_P = s_P[vertex[0]:vertex[1],vertex[2]:vertex[3]]
s_P_P = s_P_P[vertex[0]:vertex[1],vertex[2]:vertex[3]]
PA = PA[vertex[0]:vertex[1],vertex[2]:vertex[3]]
s_PA = s_PA[vertex[0]:vertex[1],vertex[2]:vertex[3]]
s_PA_P = s_PA_P[vertex[0]:vertex[1],vertex[2]:vertex[3]]
new_Stokes_cov = np.zeros((3,3,shape[0],shape[1]))
for i in range(3):
for j in range(3):
Stokes_cov[i,j][(1-data_mask).astype(bool)] = 0.
new_Stokes_cov[i,j] = Stokes_cov[i,j][vertex[0]:vertex[1],vertex[2]:vertex[3]]
Stokes_cov = new_Stokes_cov
data_mask = data_mask[vertex[0]:vertex[1],vertex[2]:vertex[3]]
data_mask = data_mask.astype(float, copy=False)
#Create HDUList object
hdul = fits.HDUList([])

View File

@@ -13,8 +13,8 @@ from copy import deepcopy
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from matplotlib.widgets import RectangleSelector, Button, Slider
from matplotlib.transforms import Bbox
from matplotlib.path import Path
from matplotlib.widgets import RectangleSelector, Button, Slider, TextBox, LassoSelector
import matplotlib.font_manager as fm
from mpl_toolkits.axes_grid1.anchored_artists import AnchoredSizeBar, AnchoredDirectionArrows
from astropy.wcs import WCS
@@ -682,44 +682,72 @@ class crop_map(object):
"""
Class to interactively crop a map to desired Region of Interest
"""
def __init__(self, hdul):
def __init__(self, hdul, fig=None, ax=None):
#Get data
self.cropped=False
self.hdul = hdul
self.header = deepcopy(self.hdul[0].header)
self.wcs = WCS(self.header).deepcopy()
self.data = deepcopy(self.hdul[0].data)
try:
convert_flux = self.header['photflam']
self.convert_flux = self.header['photflam']
except KeyError:
convert_flux = 1.
self.convert_flux = 1.
#Plot the map
plt.rcParams.update({'font.size': 16})
self.fig = plt.figure(figsize=(15,15))
self.ax = self.fig.add_subplot(111, projection=self.wcs)
self.ax.set_facecolor('k')
self.fig.subplots_adjust(hspace=0, wspace=0, right=0.9)
cbar_ax = self.fig.add_axes([0.95, 0.12, 0.01, 0.75])
plt.rcParams.update({'font.size': 12})
plt.ioff()
if fig is None:
self.fig = plt.figure(figsize=(15,15))
self.fig.suptitle("Click and drag to crop to desired Region of Interest.")
else:
self.fig = fig
if ax is None:
self.ax = self.fig.add_subplot(111, projection=self.wcs)
self.mask_alpha=1.
#Selection button
self.axapply = self.fig.add_axes([0.80, 0.01, 0.1, 0.04])
self.bapply = Button(self.axapply, 'Apply')
self.axreset = self.fig.add_axes([0.60, 0.01, 0.1, 0.04])
self.breset = Button(self.axreset, 'Reset')
self.embedded = False
else:
self.ax = ax
self.mask_alpha = 0.75
self.rect_selector = RectangleSelector(self.ax, self.onselect_crop,
drawtype='box', button=[1], interactive=True)
self.embedded = True
self.display()
plt.ion()
self.ax.plot(*self.wcs.wcs.crpix, 'r+')
self.extent = np.array([0.,self.data.shape[0],0., self.data.shape[1]])
self.center = np.array(self.data.shape)/2
self.RSextent = deepcopy(self.extent)
self.RScenter = deepcopy(self.center)
vmin, vmax = 0., np.max(self.data[self.data > 0.]*convert_flux)
im = self.ax.imshow(self.data*convert_flux, vmin=vmin, vmax=vmax, aspect='auto', cmap='inferno', alpha=1., origin='lower')
cbar = plt.colorbar(im, cax=cbar_ax, label=r"$F_{\lambda}$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]")
plt.show()
#Selection button
self.axapply = self.fig.add_axes([0.80, 0.01, 0.1, 0.04])
self.bapply = Button(self.axapply, 'Apply crop')
self.axreset = self.fig.add_axes([0.60, 0.01, 0.1, 0.04])
self.breset = Button(self.axreset, 'Reset')
def display(self, data=None, wcs=None, convert_flux=None):
if data is None:
data = self.data
if wcs is None:
wcs = self.wcs
if convert_flux is None:
convert_flux = self.convert_flux
vmin, vmax = 0., np.max(data[data > 0.]*convert_flux)
if hasattr(self, 'im'):
self.im.remove()
self.im = self.ax.imshow(data*convert_flux, vmin=vmin, vmax=vmax, aspect='auto', cmap='inferno', alpha=self.mask_alpha, origin='lower')
if hasattr(self, 'cr'):
self.cr[0].set_data(*wcs.wcs.crpix)
else:
self.cr = self.ax.plot(*wcs.wcs.crpix, 'r+')
self.fig.canvas.draw_idle()
return self.im
self.ax.set_title("Click and drag to crop to desired Region of Interest.")
@property
def crpix_in_RS(self):
crpix = self.wcs.wcs.crpix
x_lim, y_lim = self.RSextent[:2], self.RSextent[2:]
@@ -729,57 +757,85 @@ class crop_map(object):
return False
def reset_crop(self, event):
self.ax.reset_wcs(self.wcs)
if hasattr(self, 'hdul_crop'):
del self.hdul_crop, self.data_crop
self.display()
if self.fig.canvas.manager.toolbar.mode == '':
self.rect_selector = RectangleSelector(self.ax, self.onselect_crop,
drawtype='box', button=[1], interactive=True)
self.RSextent = deepcopy(self.extent)
self.RScenter = deepcopy(self.center)
self.ax.set_xlim(*self.extent[:2])
self.ax.set_ylim(*self.extent[2:])
self.rect_selector.clear()
self.fig.canvas.draw_idle()
def onselect_crop(self, eclick, erelease) -> None:
# Obtain (xmin, xmax, ymin, ymax) values
self.RSextent = np.array(self.rect_selector.extents)
self.RScenter = np.array(self.rect_selector.center)
if self.embedded:
self.apply_crop(erelease)
def apply_crop(self, event):
if hasattr(self, 'hdul_crop'):
header = self.header_crop
data = self.data_crop
wcs = self.wcs_crop
else:
header = self.header
data = self.data
wcs = self.wcs
vertex = self.RSextent.astype(int)
shape = vertex[1::2] - vertex[0::2]
#Update WCS and header in new cropped image
crpix = np.array(self.wcs.wcs.crpix)
self.wcs_crop = self.wcs.deepcopy()
self.wcs_crop.array_shape = shape
if self.crpix_in_RS():
self.wcs_crop.wcs.crpix = np.array(self.wcs_crop.wcs.crpix) - self.RSextent[::2]
else:
self.wcs_crop.wcs.crval = self.wcs.wcs_pix2world([self.RScenter],1)[0]
self.wcs_crop.wcs.crpix = self.RScenter-self.RSextent[::2]
# Crop dataset
self.data_crop = self.data[vertex[2]:vertex[3], vertex[0]:vertex[1]]
extent = np.array(self.im.get_extent())
shape_im = extent[1::2] - extent[0::2]
if (shape_im.astype(int) != shape).any() and (self.RSextent != self.extent).any():
#Update WCS and header in new cropped image
crpix = np.array(wcs.wcs.crpix)
self.wcs_crop = wcs.deepcopy()
self.wcs_crop.array_shape = shape
if self.crpix_in_RS:
self.wcs_crop.wcs.crpix = np.array(self.wcs_crop.wcs.crpix) - self.RSextent[::2]
else:
self.wcs_crop.wcs.crval = wcs.wcs_pix2world([self.RScenter],1)[0]
self.wcs_crop.wcs.crpix = self.RScenter-self.RSextent[::2]
# Crop dataset
self.data_crop = deepcopy(data[vertex[2]:vertex[3], vertex[0]:vertex[1]])
#Write cropped map to new HDUList
self.header_crop = deepcopy(self.header)
self.header_crop.update(self.wcs_crop.to_header())
self.hdul_crop = fits.HDUList([fits.PrimaryHDU(self.data_crop,self.header_crop)])
#Write cropped map to new HDUList
self.header_crop = deepcopy(header)
self.header_crop.update(self.wcs_crop.to_header())
self.hdul_crop = fits.HDUList([fits.PrimaryHDU(self.data_crop,self.header_crop)])
try:
convert_flux = self.header_crop['photflam']
except KeyError:
convert_flux = 1.
try:
convert_flux = self.header_crop['photflam']
except KeyError:
convert_flux = 1.
self.fig.clear()
self.ax = self.fig.add_subplot(111,projection=self.wcs_crop)
vmin, vmax = 0., np.max(self.data_crop[self.data_crop > 0.]*convert_flux)
im = self.ax.imshow(self.data_crop*convert_flux, vmin=vmin, vmax=vmax, aspect='auto', cmap='inferno', alpha=1., origin='lower')
self.ax.plot(*self.wcs_crop.wcs.crpix, 'r+')
cbar_ax = self.fig.add_axes([0.95, 0.12, 0.01, 0.75])
cbar = plt.colorbar(im, cax=cbar_ax, label=r"$F_{\lambda}$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]")
xlim, ylim = self.RSextent[1::2]-self.RSextent[0::2]
self.ax.set_xlim(0,xlim)
self.ax.set_ylim(0,ylim)
self.rect_selector.clear()
self.rect_selector.clear()
self.ax.reset_wcs(self.wcs_crop)
self.display(data=self.data_crop, wcs=self.wcs_crop, convert_flux=convert_flux)
xlim, ylim = self.RSextent[1::2]-self.RSextent[0::2]
self.ax.set_xlim(0,xlim)
self.ax.set_ylim(0,ylim)
if self.fig.canvas.manager.toolbar.mode == '':
self.rect_selector = RectangleSelector(self.ax, self.onselect_crop,
drawtype='box', button=[1], interactive=True)
self.fig.canvas.draw_idle()
def on_close(self, event) -> None:
if not hasattr(self, 'hdul_crop'):
self.hdul_crop = self.hdul
self.rect_selector.disconnect_events()
self.cropped = True
def crop(self) -> None:
if self.fig.canvas.manager.toolbar.mode == '':
@@ -787,6 +843,7 @@ class crop_map(object):
drawtype='box', button=[1], interactive=True)
self.bapply.on_clicked(self.apply_crop)
self.breset.on_clicked(self.reset_crop)
self.fig.canvas.mpl_connect('close_event', self.on_close)
plt.show()
def writeto(self, filename):
@@ -802,146 +859,384 @@ class crop_Stokes(crop_map):
"""
Redefine apply_crop method for the Stokes HDUList.
"""
self.hdul_crop = deepcopy(self.hdul)
if hasattr(self, 'hdul_crop'):
hdul = self.hdul_crop
data = self.data_crop
wcs = self.wcs_crop
else:
hdul = self.hdul
data = self.data
wcs = self.wcs
vertex = self.RSextent.astype(int)
shape = vertex[1::2] - vertex[0::2]
#Update WCS and header in new cropped image
crpix = np.array(self.wcs.wcs.crpix)
self.wcs_crop = self.wcs.deepcopy()
self.wcs_crop.array_shape = shape
if self.crpix_in_RS():
self.wcs_crop.wcs.crpix = np.array(self.wcs_crop.wcs.crpix) - self.RSextent[::2]
else:
self.wcs_crop.wcs.crval = self.wcs.wcs_pix2world([self.RScenter],1)[0]
self.wcs_crop.wcs.crpix = self.RScenter-self.RSextent[::2]
# Crop dataset
for dataset in self.hdul_crop:
if dataset.header['datatype']=='IQU_cov_matrix':
stokes_cov = np.zeros((3,3,shape[1],shape[0]))
for i in range(3):
for j in range(3):
stokes_cov[i,j] = dataset.data[i,j][vertex[2]:vertex[3], vertex[0]:vertex[1]]
dataset.data = stokes_cov
extent = np.array(self.im.get_extent())
shape_im = extent[1::2] - extent[0::2]
if (shape_im.astype(int) != shape).any() and (self.RSextent != self.extent).any():
#Update WCS and header in new cropped image
self.hdul_crop = deepcopy(hdul)
crpix = np.array(wcs.wcs.crpix)
self.wcs_crop = wcs.deepcopy()
self.wcs_crop.array_shape = shape
if self.crpix_in_RS:
self.wcs_crop.wcs.crpix = np.array(self.wcs_crop.wcs.crpix) - self.RSextent[::2]
else:
dataset.data = dataset.data[vertex[2]:vertex[3], vertex[0]:vertex[1]]
dataset.header.update(self.wcs_crop.to_header())
self.wcs_crop.wcs.crval = wcs.wcs_pix2world([self.RScenter],1)[0]
self.wcs_crop.wcs.crpix = self.RScenter-self.RSextent[::2]
# Crop dataset
for dataset in self.hdul_crop:
if dataset.header['datatype']=='IQU_cov_matrix':
stokes_cov = np.zeros((3,3,shape[1],shape[0]))
for i in range(3):
for j in range(3):
stokes_cov[i,j] = deepcopy(dataset.data[i,j][vertex[2]:vertex[3], vertex[0]:vertex[1]])
dataset.data = stokes_cov
else:
dataset.data = deepcopy(dataset.data[vertex[2]:vertex[3], vertex[0]:vertex[1]])
dataset.header.update(self.wcs_crop.to_header())
try:
convert_flux = self.hdul_crop[0].header['photflam']
except KeyError:
convert_flux = 1.
try:
convert_flux = self.hdul_crop[0].header['photflam']
except KeyError:
convert_flux = 1.
data_crop = self.hdul_crop[0].data
self.fig.clear()
self.ax = self.fig.add_subplot(111,projection=self.wcs_crop)
vmin, vmax = 0., np.max(data_crop[data_crop > 0.]*convert_flux)
im = self.ax.imshow(data_crop*convert_flux, vmin=vmin, vmax=vmax, aspect='auto', cmap='inferno', alpha=1., origin='lower')
self.ax.plot(*self.wcs_crop.wcs.crpix, 'r+')
cbar_ax = self.fig.add_axes([0.95, 0.12, 0.01, 0.75])
cbar = plt.colorbar(im, cax=cbar_ax, label=r"$F_{\lambda}$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]")
xlim, ylim = self.RSextent[1::2]-self.RSextent[0::2]
self.ax.set_xlim(0,xlim)
self.ax.set_ylim(0,ylim)
self.rect_selector.clear()
self.data_crop = self.hdul_crop[0].data
self.rect_selector.clear()
if not self.embedded:
self.ax.reset_wcs(self.wcs_crop)
self.display(data=self.data_crop, wcs=self.wcs_crop, convert_flux=convert_flux)
xlim, ylim = self.RSextent[1::2]-self.RSextent[0::2]
self.ax.set_xlim(0,xlim)
self.ax.set_ylim(0,ylim)
else:
self.on_close(event)
if self.fig.canvas.manager.toolbar.mode == '':
self.rect_selector = RectangleSelector(self.ax, self.onselect_crop,
drawtype='box', button=[1], interactive=True)
self.fig.canvas.draw_idle()
@property
def data_mask(self):
return self.hdul_crop[-1].data
class image_lasso_selector:
def __init__(self, img, fig=None, ax=None):
"""
img must have shape (X, Y)
"""
self.selected = False
self.img = img
self.vmin, self.vmax = 0., np.max(self.img[self.img>0.])
plt.ioff() # see https://github.com/matplotlib/matplotlib/issues/17013
if fig is None:
self.fig = plt.figure(figsize=(15,15))
else:
self.fig = fig
if ax is None:
self.ax = self.fig.gca()
self.mask_alpha = 1.
self.embedded = False
else:
self.ax = ax
self.mask_alpha = 0.1
self.embedded = True
self.displayed = self.ax.imshow(self.img, vmin=self.vmin, vmax=self.vmax, aspect='auto', cmap='inferno',alpha=self.mask_alpha)
plt.ion()
lineprops = {'color': 'grey', 'linewidth': 1, 'alpha': 0.8}
self.lasso = LassoSelector(self.ax, self.onselect,props=lineprops, useblit=False)
self.lasso.set_visible(True)
pix_x = np.arange(self.img.shape[0])
pix_y = np.arange(self.img.shape[1])
xv, yv = np.meshgrid(pix_y,pix_x)
self.pix = np.vstack( (xv.flatten(), yv.flatten()) ).T
self.fig.canvas.mpl_connect('close_event', self.on_close)
plt.show()
def on_close(self, event=None) -> None:
if not hasattr(self, 'mask'):
self.mask = np.zeros(self.img.shape[:2],dtype=bool)
self.lasso.disconnect_events()
self.selected = True
def onselect(self, verts):
self.verts = verts
p = Path(verts)
self.indices = p.contains_points(self.pix, radius=0).reshape(self.img.shape[:2])
self.update_mask()
def update_mask(self):
self.displayed.remove()
self.displayed = self.ax.imshow(self.img, vmin=self.vmin, vmax=self.vmax, aspect='auto', cmap='inferno',alpha=self.mask_alpha)
array = self.displayed.get_array().data
self.mask = np.zeros(self.img.shape[:2],dtype=bool)
self.mask[self.indices] = True
if hasattr(self, 'cont'):
for coll in self.cont.collections:
coll.remove()
self.cont = self.ax.contour(self.mask.astype(float),levels=[0.5], colors='white', linewidths=1)
if not self.embedded:
self.displayed.set_data(array)
self.fig.canvas.draw_idle()
else:
self.on_close()
class pol_map(object):
"""
Class to interactively study polarization maps.
"""
def __init__(self,Stokes, SNRp_cut=3., SNRi_cut=30.):
def __init__(self,Stokes, SNRp_cut=3., SNRi_cut=30., selection=None):
self.Stokes = Stokes
self.wcs = deepcopy(WCS(Stokes[0].header))
self.Stokes = deepcopy(Stokes)
self.SNRp_cut = SNRp_cut
self.SNRi_cut = SNRi_cut
self.SNRi = deepcopy(self.SNRi_cut)
self.SNRp = deepcopy(self.SNRp_cut)
self.region = None
self.data = None
self.display_selection = selection
#Get data
self.targ = self.Stokes[0].header['targname']
self.pivot_wav = self.Stokes[0].header['photplam']
self.convert_flux = self.Stokes[0].header['photflam']
self.I = self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='I_stokes' for i in range(len(Stokes))])].data
self.Q = self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='Q_stokes' for i in range(len(Stokes))])].data
self.U = self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='U_stokes' for i in range(len(Stokes))])].data
self.IQU_cov = self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='IQU_cov_matrix' for i in range(len(Stokes))])].data
self.P = self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='Pol_deg_debiased' for i in range(len(Stokes))])].data
self.s_P = self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='Pol_deg_err' for i in range(len(Stokes))])].data
self.PA = self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='Pol_ang' for i in range(len(Stokes))])].data
#Create figure
fontprops = fm.FontProperties(size=16)
plt.rcParams.update({'font.size': 10})
self.fig = plt.figure(figsize=(15,15))
self.fig.subplots_adjust(hspace=0, wspace=0, right=0.9)
self.fig.subplots_adjust(hspace=0, wspace=0, right=0.88)
self.ax = self.fig.add_subplot(111,projection=self.wcs)
self.ax.set_facecolor('black')
self.ax.coords.grid(True, color='white', ls='dotted', alpha=0.5)
self.ax.coords[0].set_axislabel('Right Ascension (J2000)')
self.ax.coords[0].set_axislabel_position('t')
self.ax.coords[0].set_ticklabel_position('t')
self.ax.coords[1].set_axislabel('Declination (J2000)')
self.ax.coords[1].set_axislabel_position('l')
self.ax.coords[1].set_ticklabel_position('l')
self.ax.axis('equal')
self.ax_cosmetics()
self.cbar_ax = self.fig.add_axes([0.925, 0.12, 0.01, 0.75])
#Display total flux
im = self.ax.imshow(self.I*self.convert_flux, vmin=0., vmax=self.I[self.I > 0.].max()*self.convert_flux, aspect='auto', cmap='inferno')
cbar_ax = self.fig.add_axes([0.95, 0.12, 0.01, 0.75])
cbar = plt.colorbar(im, cax=cbar_ax, label=r"$F_{\lambda}$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]")
#Display selected data (Default to total flux)
self.display()
#Display polarization vectors in SNR_cut
self.pol_vector()
#Display scales and orientation
px_size = self.wcs.wcs.cdelt[0]*3600.
px_sc = AnchoredSizeBar(self.ax.transData, 1./px_size, '1 arcsec', 3, pad=0.5, sep=5, borderpad=0.5, frameon=False, size_vertical=0.005, color='white', fontproperties=fontprops)
self.ax.add_artist(px_sc)
pol_sc = AnchoredSizeBar(self.ax.transData, 2., r"$P$= 100%", 4, pad=0.5, sep=5, borderpad=0.5, frameon=False, size_vertical=0.005, color='white', fontproperties=fontprops)
self.ax.add_artist(pol_sc)
north_dir = AnchoredDirectionArrows(self.ax.transAxes, "E", "N", length=-0.08, fontsize=0.025, loc=1, aspect_ratio=-1, sep_y=0.01, sep_x=0.01, back_length=0., head_length=10., head_width=10., angle=-self.Stokes[0].header['orientat'], color='white', text_props={'ec': None, 'fc': 'w', 'alpha': 1, 'lw': 0.4}, arrow_props={'ec': None,'fc':'w','alpha': 1,'lw': 1})
self.ax.add_artist(north_dir)
#Display integrated values in ROI
self.pol_int
self.pol_int()
#Set axes for sliders (SNRp_cut, SNRi_cut)
ax_I_cut = self.fig.add_axes([0.125, 0.080, 0.35, 0.01])
ax_P_cut = self.fig.add_axes([0.125, 0.055, 0.35, 0.01])
ax_reset = self.fig.add_axes([0.125, 0.020, 0.05, 0.02])
ax_snr_reset = self.fig.add_axes([0.125, 0.020, 0.05, 0.02])
SNRi_max = np.max(self.I[self.IQU_cov[0,0]>0.]/np.sqrt(self.IQU_cov[0,0][self.IQU_cov[0,0]>0.]))
SNRp_max = np.max(self.P[self.s_P>0.]/self.s_P[self.s_P > 0.])
s_I_cut = Slider(ax_I_cut,r"$SNR^{I}_{cut}$",1.,SNRi_max,valstep=1,valinit=self.SNRi_cut)
s_P_cut = Slider(ax_P_cut,r"$SNR^{P}_{cut}$",1.,SNRp_max,valstep=1,valinit=self.SNRp_cut)
b_reset = Button(ax_reset,"Reset")
s_I_cut = Slider(ax_I_cut,r"$SNR^{I}_{cut}$",1.,int(SNRi_max*0.95),valstep=1,valinit=self.SNRi_cut)
s_P_cut = Slider(ax_P_cut,r"$SNR^{P}_{cut}$",1.,int(SNRp_max*0.95),valstep=1,valinit=self.SNRp_cut)
b_snr_reset = Button(ax_snr_reset,"Reset")
def update_snri(val):
self.SNRi = val
self.quiver.remove()
self.pol_vector()
self.fig.canvas.draw_idle()
def update_snrp(val):
self.SNRp = val
self.quiver.remove()
self.pol_vector()
self.fig.canvas.draw_idle()
def reset(event):
def reset_snr(event):
s_I_cut.reset()
s_P_cut.reset()
s_I_cut.on_changed(update_snri)
s_P_cut.on_changed(update_snrp)
b_reset.on_clicked(reset)
b_snr_reset.on_clicked(reset_snr)
plt.show(block=True)
#Set axe for ROI selection
ax_select = self.fig.add_axes([0.55, 0.070, 0.05, 0.02])
ax_roi_reset = self.fig.add_axes([0.605, 0.070, 0.05, 0.02])
b_select = Button(ax_select,"Select")
self.selected = False
b_roi_reset = Button(ax_roi_reset,"Reset")
def select_roi(event):
if self.data is None:
self.data = self.Stokes[0].data
if self.selected:
self.selected = False
self.region = deepcopy(self.select_instance.mask.astype(bool))
self.select_instance.displayed.remove()
for coll in self.select_instance.cont.collections:
coll.remove()
self.select_instance.lasso.set_active(False)
self.set_data_mask(deepcopy(self.region))
self.pol_int()
else:
self.selected = True
self.region = None
self.select_instance = image_lasso_selector(self.data, fig=self.fig, ax=self.ax)
self.select_instance.lasso.set_active(True)
k = 0
while not self.select_instance.selected and k<60:
self.fig.canvas.start_event_loop(timeout=1)
k+=1
select_roi(event)
self.fig.canvas.draw_idle()
def reset_roi(event):
self.region = None
self.pol_int()
self.fig.canvas.draw_idle()
b_select.on_clicked(select_roi)
b_roi_reset.on_clicked(reset_roi)
#Set axe for crop Stokes
ax_crop = self.fig.add_axes([0.70, 0.070, 0.05, 0.02])
ax_crop_reset = self.fig.add_axes([0.755, 0.070, 0.05, 0.02])
b_crop = Button(ax_crop,"Crop")
self.cropped = False
b_crop_reset = Button(ax_crop_reset,"Reset")
def crop(event):
if self.cropped:
self.cropped = False
self.crop_instance.im.remove()
self.crop_instance.cr.pop(0).remove()
self.crop_instance.rect_selector.set_active(False)
self.Stokes = self.crop_instance.hdul_crop
self.region = deepcopy(self.data_mask.astype(bool))
self.pol_int()
self.ax.reset_wcs(self.wcs)
self.ax_cosmetics()
self.display()
self.pol_vector()
else:
self.cropped = True
self.crop_instance = crop_Stokes(self.Stokes, fig=self.fig, ax=self.ax)
self.crop_instance.rect_selector.set_active(True)
k = 0
while not self.crop_instance.cropped and k<60:
self.fig.canvas.start_event_loop(timeout=1)
k+=1
crop(event)
self.fig.canvas.draw_idle()
def reset_crop(event):
self.Stokes = deepcopy(Stokes)
self.region = None
self.pol_int()
self.ax.reset_wcs(self.wcs)
self.ax_cosmetics()
self.display()
self.pol_vector()
b_crop.on_clicked(crop)
b_crop_reset.on_clicked(reset_crop)
#Set axe for saving plot
ax_save = self.fig.add_axes([0.850, 0.070, 0.05, 0.02])
b_save = Button(ax_save, "Save")
ax_text_save = self.fig.add_axes([0.3, 0.020, 0.5, 0.025],visible=False)
text_box = TextBox(ax_text_save, "Save to:", initial='')
def saveplot(event):
ax_text_save.set(visible=True)
ax_snr_reset.set(visible=False)
ax_save.set(visible=False)
self.fig.canvas.draw_idle()
b_save.on_clicked(saveplot)
def submit(expression):
ax_text_save.set(visible=False)
if expression != '':
save_fig = plt.figure(figsize=(15,15))
save_ax = save_fig.add_subplot(111, projection=self.wcs)
self.ax_cosmetics(ax=save_ax)
self.display(fig=save_fig,ax=save_ax)
self.pol_vector(fig=save_fig,ax=save_ax)
self.pol_int(fig=save_fig,ax=save_ax)
save_fig.suptitle(r"{0:s} with $SNR_{{p}} \geq$ {1:d} and $SNR_{{I}} \geq$ {2:d}".format(self.targ, int(self.SNRp), int(self.SNRi)))
if not expression[-4:] in ['.png', '.jpg']:
expression += '.png'
save_fig.savefig(expression, bbox_inches='tight', dpi=200)
plt.close(save_fig)
text_box.set_val('')
ax_snr_reset.set(visible=True)
ax_save.set(visible=True)
self.fig.canvas.draw_idle()
text_box.on_submit(submit)
#Set axes for display buttons
ax_tf = self.fig.add_axes([0.925, 0.085, 0.05, 0.02])
ax_pf = self.fig.add_axes([0.925, 0.065, 0.05, 0.02])
ax_p = self.fig.add_axes([0.925, 0.045, 0.05, 0.02])
ax_snri = self.fig.add_axes([0.925, 0.025, 0.05, 0.02])
ax_snrp = self.fig.add_axes([0.925, 0.005, 0.05, 0.02])
b_tf = Button(ax_tf,r"$F_{\lambda}$")
b_pf = Button(ax_pf,r"$F_{\lambda} \cdot P$")
b_p = Button(ax_p,r"$P$")
b_snri = Button(ax_snri,r"$I / \sigma_{I}$")
b_snrp = Button(ax_snrp,r"$P / \sigma_{P}$")
def d_tf(event):
self.display_selection = 'total_flux'
self.display()
b_tf.on_clicked(d_tf)
def d_pf(event):
self.display_selection = 'pol_flux'
self.display()
b_pf.on_clicked(d_pf)
def d_p(event):
self.display_selection = 'pol_deg'
self.display()
b_p.on_clicked(d_p)
def d_snri(event):
self.display_selection = 'snri'
self.display()
b_snri.on_clicked(d_snri)
def d_snrp(event):
self.display_selection = 'snrp'
self.display()
b_snrp.on_clicked(d_snrp)
plt.show()
@property
def wcs(self):
return deepcopy(WCS(self.Stokes[0].header))
@property
def I(self):
return self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='I_stokes' for i in range(len(self.Stokes))])].data
@property
def Q(self):
return self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='Q_stokes' for i in range(len(self.Stokes))])].data
@property
def U(self):
return self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='U_stokes' for i in range(len(self.Stokes))])].data
@property
def IQU_cov(self):
return self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='IQU_cov_matrix' for i in range(len(self.Stokes))])].data
@property
def P(self):
return self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='Pol_deg_debiased' for i in range(len(self.Stokes))])].data
@property
def s_P(self):
return self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='Pol_deg_err' for i in range(len(self.Stokes))])].data
@property
def PA(self):
return self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='Pol_ang' for i in range(len(self.Stokes))])].data
@property
def data_mask(self):
return self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='Data_mask' for i in range(len(self.Stokes))])].data
def set_data_mask(self, mask):
self.Stokes[np.argmax([self.Stokes[i].header['datatype']=='Data_mask' for i in range(len(self.Stokes))])].data = mask.astype(float)
@property
def cut(self):
@@ -950,18 +1245,106 @@ class pol_map(object):
SNRp_mask[self.s_P > 0.] = self.P[self.s_P > 0.] / self.s_P[self.s_P > 0.] > self.SNRp
SNRi_mask[s_I > 0.] = self.I[s_I > 0.] / s_I[s_I > 0.] > self.SNRi
return np.logical_and(SNRi_mask,SNRp_mask)
def pol_vector(self):
def ax_cosmetics(self, ax=None):
if ax is None:
ax = self.ax
ax.set_facecolor('black')
ax.coords.grid(True, color='white', ls='dotted', alpha=0.5)
ax.coords[0].set_axislabel('Right Ascension (J2000)')
ax.coords[0].set_axislabel_position('t')
ax.coords[0].set_ticklabel_position('t')
ax.coords[1].set_axislabel('Declination (J2000)')
ax.coords[1].set_axislabel_position('l')
ax.coords[1].set_ticklabel_position('l')
ax.axis('equal')
#Display scales and orientation
fontprops = fm.FontProperties(size=14)
px_size = self.wcs.wcs.cdelt[0]*3600.
if hasattr(self,'px_sc'):
self.px_sc.remove()
self.px_sc = AnchoredSizeBar(ax.transData, 1./px_size, '1 arcsec', 3, pad=0.5, sep=5, borderpad=0.5, frameon=False, size_vertical=0.005, color='white', fontproperties=fontprops)
ax.add_artist(self.px_sc)
if hasattr(self,'pol_sc'):
self.pol_sc.remove()
self.pol_sc = AnchoredSizeBar(ax.transData, 2., r"$P$= 100%", 4, pad=0.5, sep=5, borderpad=0.5, frameon=False, size_vertical=0.005, color='white', fontproperties=fontprops)
ax.add_artist(self.pol_sc)
if hasattr(self,'north_dir'):
self.north_dir.remove()
self.north_dir = AnchoredDirectionArrows(ax.transAxes, "E", "N", length=-0.08, fontsize=0.025, loc=1, aspect_ratio=-1, sep_y=0.01, sep_x=0.01, back_length=0., head_length=10., head_width=10., angle=-self.Stokes[0].header['orientat'], color='white', text_props={'ec': None, 'fc': 'w', 'alpha': 1, 'lw': 0.4}, arrow_props={'ec': None,'fc':'w','alpha': 1,'lw': 1})
ax.add_artist(self.north_dir)
def display(self, fig=None, ax=None):
if self.display_selection is None:
self.display_selection = "total_flux"
if self.display_selection.lower() in ['total_flux']:
self.data = self.I*self.convert_flux
vmin, vmax = 0., np.max(self.data[self.data > 0.])
label = r"$F_{\lambda}$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]"
elif self.display_selection.lower() in ['pol_flux']:
self.data = self.I*self.convert_flux*self.P
vmin, vmax = 0., np.max(self.data[self.data > 0.])
label = r"$F_{\lambda} \cdot P$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]"
elif self.display_selection.lower() in ['pol_deg']:
self.data = self.P*100.
vmin, vmax = 0., np.max(self.data[self.data > 0.])
label = r"$P$ [%]"
elif self.display_selection.lower() in ['snri']:
s_I = np.sqrt(self.IQU_cov[0,0])
SNRi = np.zeros(self.I.shape)
SNRi[s_I > 0.] = self.I[s_I > 0.]/s_I[s_I > 0.]
self.data = SNRi
vmin, vmax = 0., np.max(self.data[self.data > 0.])
label = r"$I_{Stokes}/\sigma_{I}$"
elif self.display_selection.lower() in ['snrp']:
SNRp = np.zeros(self.P.shape)
SNRp[self.s_P > 0.] = self.P[self.s_P > 0.]/self.s_P[self.s_P > 0.]
self.data = SNRp
vmin, vmax = 0., np.max(self.data[self.data > 0.])
label = r"$P/\sigma_{P}$"
if fig is None:
fig = self.fig
if ax is None:
ax = self.ax
if hasattr(self, 'im'):
self.im.remove()
self.im = ax.imshow(self.data, vmin=vmin, vmax=vmax, aspect='auto', cmap='inferno')
ax.set_xlim(0,self.data.shape[1])
ax.set_ylim(0,self.data.shape[0])
self.cbar = plt.colorbar(self.im, cax=self.cbar_ax, label=label)
fig.canvas.draw_idle()
return self.im
else:
im = ax.imshow(self.data, vmin=vmin, vmax=vmax, aspect='auto', cmap='inferno')
ax.set_xlim(0,self.data.shape[1])
ax.set_ylim(0,self.data.shape[0])
plt.colorbar(im, pad=0.025, aspect=80, label=label)
fig.canvas.draw_idle()
def pol_vector(self, fig=None, ax=None):
P_cut = np.ones(self.P.shape)*np.nan
P_cut[self.cut] = self.P[self.cut]
X, Y = np.meshgrid(np.arange(self.I.shape[1]),np.arange(self.I.shape[0]))
XY_U, XY_V = P_cut*np.cos(np.pi/2. + self.PA*np.pi/180.), P_cut*np.sin(np.pi/2. + self.PA*np.pi/180.)
self.quiver = self.ax.quiver(X, Y, XY_U, XY_V, units='xy', scale=0.5, scale_units='xy', pivot='mid', headwidth=0., headlength=0., headaxislength=0., width=0.1, color='white')
return self.quiver
if fig is None:
fig = self.fig
if ax is None:
ax = self.ax
if hasattr(self, 'quiver'):
self.quiver.remove()
self.quiver = ax.quiver(X, Y, XY_U, XY_V, units='xy', scale=0.5, scale_units='xy', pivot='mid', headwidth=0., headlength=0., headaxislength=0., width=0.1, color='white')
fig.canvas.draw_idle()
return self.quiver
else:
ax.quiver(X, Y, XY_U, XY_V, units='xy', scale=0.5, scale_units='xy', pivot='mid', headwidth=0., headlength=0., headaxislength=0., width=0.1, color='white')
fig.canvas.draw_idle()
@property
def pol_int(self):
def pol_int(self, fig=None, ax=None):
if self.region is None:
n_pix = self.I.size
s_I = np.sqrt(self.IQU_cov[0,0])
@@ -996,4 +1379,24 @@ class pol_map(object):
PA_reg = princ_angle((90./np.pi)*np.arctan2(U_reg,Q_reg))
PA_reg_err = (90./(np.pi*(Q_reg**2+U_reg**2)))*np.sqrt(U_reg**2*Q_reg_err**2 + Q_reg**2*U_reg_err**2 - 2.*Q_reg*U_reg*QU_reg_err)
return self.ax.annotate(r"$F_{{\lambda}}^{{int}}$({0:.0f} $\AA$) = {1} $ergs \cdot cm^{{-2}} \cdot s^{{-1}} \cdot \AA^{{-1}}$".format(self.pivot_wav,sci_not(I_reg*self.convert_flux,I_reg_err*self.convert_flux,2))+"\n"+r"$P^{{int}}$ = {0:.1f} $\pm$ {1:.1f} %".format(P_reg*100.,P_reg_err*100.)+"\n"+r"$\theta_{{P}}^{{int}}$ = {0:.1f} $\pm$ {1:.1f} °".format(PA_reg,PA_reg_err), color='white', fontsize=12, xy=(0.01, 0.90), xycoords='axes fraction')
if hasattr(self, 'cont'):
for coll in self.cont.collections:
coll.remove()
del self.cont
if fig is None:
fig = self.fig
if ax is None:
ax = self.ax
if hasattr(self, 'an_int'):
self.an_int.remove()
self.an_int = ax.annotate(r"$F_{{\lambda}}^{{int}}$({0:.0f} $\AA$) = {1} $ergs \cdot cm^{{-2}} \cdot s^{{-1}} \cdot \AA^{{-1}}$".format(self.pivot_wav,sci_not(I_reg*self.convert_flux,I_reg_err*self.convert_flux,2))+"\n"+r"$P^{{int}}$ = {0:.1f} $\pm$ {1:.1f} %".format(P_reg*100.,P_reg_err*100.)+"\n"+r"$\theta_{{P}}^{{int}}$ = {0:.1f} $\pm$ {1:.1f} °".format(PA_reg,PA_reg_err), color='white', fontsize=12, xy=(0.01, 0.90), xycoords='axes fraction')
if not self.region is None:
self.cont = ax.contour(self.region.astype(float),levels=[0.5], colors='white', linewidths=0.8)
fig.canvas.draw_idle()
return self.an_int
else:
ax.annotate(r"$F_{{\lambda}}^{{int}}$({0:.0f} $\AA$) = {1} $ergs \cdot cm^{{-2}} \cdot s^{{-1}} \cdot \AA^{{-1}}$".format(self.pivot_wav,sci_not(I_reg*self.convert_flux,I_reg_err*self.convert_flux,2))+"\n"+r"$P^{{int}}$ = {0:.1f} $\pm$ {1:.1f} %".format(P_reg*100.,P_reg_err*100.)+"\n"+r"$\theta_{{P}}^{{int}}$ = {0:.1f} $\pm$ {1:.1f} °".format(PA_reg,PA_reg_err), color='white', fontsize=12, xy=(0.01, 0.94), xycoords='axes fraction')
if not self.region is None:
ax.contour(self.region.astype(float),levels=[0.5], colors='white', linewidths=0.8)
fig.canvas.draw_idle()

View File

@@ -1,36 +0,0 @@
#!/usr/bin/python3
from astropy.io import fits
import numpy as np
from copy import deepcopy
from plots import overplot_radio
Stokes_UV = fits.open("../../data/IC5063_x3nl030/IC5063_FOC_combine_FWHM020.fits")
Stokes_18GHz = fits.open("../../data/IC5063_x3nl030/radio/IC5063.18GHz.fits")
Stokes_24GHz = fits.open("../../data/IC5063_x3nl030/radio/IC5063.24GHz.fits")
Stokes_103GHz = fits.open("../../data/IC5063_x3nl030/radio/I5063_103GHz.fits")
Stokes_229GHz = fits.open("../../data/IC5063_x3nl030/radio/I5063_229GHz.fits")
Stokes_357GHz = fits.open("../../data/IC5063_x3nl030/radio/I5063_357GHz.fits")
levelsMorganti = np.array([1.,2.,3.,8.,16.,32.,64.,128.])
#levels18GHz = np.array([0.6, 1.5, 3, 6, 12, 24, 48, 96])/100.*Stokes_18GHz[0].data.max()
levels18GHz = levelsMorganti*0.28*1e-3
A = overplot_radio(Stokes_UV, Stokes_18GHz)
A.plot(levels=levels18GHz, SNRp_cut=10.0, SNRi_cut=100.0, savename='../../plots/IC5063_x3nl030/18GHz_overplot_forced.png')
#levels24GHz = np.array([1.,1.5, 3, 6, 12, 24, 48, 96])/100.*Stokes_24GHz[0].data.max()
levels24GHz = levelsMorganti*0.46*1e-3
B = overplot_radio(Stokes_UV, Stokes_24GHz)
B.plot(levels=levels24GHz, SNRp_cut=10.0, SNRi_cut=100.0, savename='../../plots/IC5063_x3nl030/24GHz_overplot_forced.png')
levels103GHz = np.linspace(1,99,11)/100.*np.max(deepcopy(Stokes_103GHz[0].data[Stokes_103GHz[0].data > 0.]))
C = overplot_radio(Stokes_UV, Stokes_103GHz)
C.plot(levels=levels103GHz, SNRp_cut=10.0, SNRi_cut=100.0, savename='../../plots/IC5063_x3nl030/103GHz_overplot_forced.png')
levels229GHz = np.linspace(1,99,11)/100.*np.max(deepcopy(Stokes_229GHz[0].data[Stokes_229GHz[0].data > 0.]))
D = overplot_radio(Stokes_UV, Stokes_229GHz)
D.plot(levels=levels229GHz, SNRp_cut=10.0, SNRi_cut=100.0, savename='../../plots/IC5063_x3nl030/229GHz_overplot_forced.png')
levels357GHz = np.linspace(1,99,11)/100.*np.max(deepcopy(Stokes_357GHz[0].data[Stokes_357GHz[0].data > 0.]))
E = overplot_radio(Stokes_UV, Stokes_357GHz)
E.plot(levels=levels357GHz, SNRp_cut=10.0, SNRi_cut=100.0, savename='../../plots/IC5063_x3nl030/357GHz_overplot_forced.png')