Initial commit
140
src/FOC_reduction.py
Executable file
@@ -0,0 +1,140 @@
#!/usr/bin/python
#-*- coding:utf-8 -*-
"""
Main script where the steps of the FOC reduction pipeline are progressively added.
"""

#Project libraries
import sys
import numpy as np
import copy
import lib.fits as proj_fits        #Functions to handle fits files
import lib.reduction as proj_red    #Functions used in reduction pipeline
import lib.plots as proj_plots      #Functions for plotting data


def main():
    ##### User inputs
    ## Input and output locations
    globals()['data_folder'] = "../data/NGC1068_x274020/"
    infiles = ['x274020at.c0f.fits','x274020bt.c0f.fits','x274020ct.c0f.fits',
               'x274020dt.c0f.fits','x274020et.c0f.fits','x274020ft.c0f.fits',
               'x274020gt.c0f.fits','x274020ht.c0f.fits','x274020it.c0f.fits']
    globals()['plots_folder'] = "../plots/NGC1068_x274020/"

    # globals()['data_folder'] = "../data/NGC1068_x14w010/"
    # infiles = ['x14w0101t_c0f.fits','x14w0102t_c0f.fits','x14w0103t_c0f.fits',
    #            'x14w0104t_c0f.fits','x14w0105p_c0f.fits','x14w0106t_c0f.fits']
    # infiles = ['x14w0101t_c1f.fits','x14w0102t_c1f.fits','x14w0103t_c1f.fits',
    #            'x14w0104t_c1f.fits','x14w0105p_c1f.fits','x14w0106t_c1f.fits']
    # globals()['plots_folder'] = "../plots/NGC1068_x14w010/"

    # globals()['data_folder'] = "../data/3C405_x136060/"
    # infiles = ['x1360601t_c0f.fits','x1360602t_c0f.fits','x1360603t_c0f.fits']
    # infiles = ['x1360601t_c1f.fits','x1360602t_c1f.fits','x1360603t_c1f.fits']
    # globals()['plots_folder'] = "../plots/3C405_x136060/"

    # globals()['data_folder'] = "../data/CygnusA_x43w0/"
    # infiles = ['x43w0101r_c0f.fits', 'x43w0104r_c0f.fits', 'x43w0107r_c0f.fits',
    #            'x43w0201r_c0f.fits', 'x43w0204r_c0f.fits', 'x43w0102r_c0f.fits',
    #            'x43w0105r_c0f.fits', 'x43w0108r_c0f.fits', 'x43w0202r_c0f.fits',
    #            'x43w0205r_c0f.fits', 'x43w0103r_c0f.fits', 'x43w0106r_c0f.fits',
    #            'x43w0109r_c0f.fits', 'x43w0203r_c0f.fits', 'x43w0206r_c0f.fits']
    # globals()['plots_folder'] = "../plots/CygnusA_x43w0/"

    ## Reduction parameters
    # Deconvolution
    deconvolve = True
    if deconvolve:
        psf = 'gaussian'        #Can be user-defined as well
        psf_FWHM = 0.10
        psf_scale = 'arcsec'
        psf_shape = (9,9)
        iterations = 10
    # Error estimation
    error_sub_shape = (50,50)
    display_error = False
    # Data binning
    rebin = True
    if rebin:
        pxsize = 0.10
        px_scale = 'arcsec'             #pixel or arcsec
        rebin_operation = 'sum'         #sum or average
    # Alignment
    align_center = 'maxflux'    #If None, images will be aligned to the image center
    display_data = True
    # Smoothing
    smoothing_function = 'combine'      #gaussian or combine
    smoothing_FWHM = None               #If None, no smoothing is done
    smoothing_scale = 'pixel'           #pixel or arcsec
    # Rotation
    rotate = False              #rotation to the North convention can give erroneous results
    rotate_library = 'scipy'    #scipy or pillow
    # Polarization map output
    figname = 'NGC1068_FOC'     #target/instrument name
    figtype = ''                #additional information
    SNRp_cut = 3    #Keep P measurements with SNR > 3
    SNRi_cut = 30   #Keep I measurements with SNR > 30, which implies an uncertainty in P of about 4.7%.
    step_vec = 1    #Plot all vectors in the array; with step_vec = 2, every other vector is plotted

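For reference, the 4.7% figure quoted above follows from first-order error propagation: with P = sqrt(Q² + U²)/I and assuming σ_Q ≈ σ_U ≈ σ_I (roughly true for photon-noise-dominated data), σ_P ≈ √2/SNR_I, and √2/30 ≈ 0.047.
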
    ##### Pipeline start
    ## Step 1:
    # Get data from fits files and translate to flux in erg/cm²/s/Angstrom.
    data_array, headers = proj_fits.get_obs_data(infiles, data_folder=data_folder, compute_flux=True)
    # Crop data to remove outside blank margins.
    data_array, error_array = proj_red.crop_array(data_array, step=5, null_val=0., inside=True)
    # Deconvolve data using the Richardson-Lucy iterative algorithm with a gaussian PSF of given FWHM.
    headers2 = copy.deepcopy(headers)
    if deconvolve:
        data_array2 = proj_red.deconvolve_array(data_array, headers2, psf=psf, FWHM=psf_FWHM, scale=psf_scale, shape=psf_shape, iterations=iterations)
    else:
        # Keep an identical copy so the comparison steps below still run when deconvolution is disabled.
        data_array2 = copy.deepcopy(data_array)
    # Estimate the error from the data background, estimated from a sub-image of the desired sub_shape.
    data_array, error_array = proj_red.get_error(data_array, sub_shape=error_sub_shape, display=display_error, headers=headers, savename=figname+"_errors", plots_folder=plots_folder)
    data_array2, error_array2 = proj_red.get_error(data_array2, sub_shape=error_sub_shape, display=display_error, headers=headers2, savename=figname+"_errors", plots_folder=plots_folder)
    # Rebin data to the desired pixel size.
    if rebin:
        data_array, error_array, headers, Dxy = proj_red.rebin_array(data_array, error_array, headers, pxsize=pxsize, scale=px_scale, operation=rebin_operation)
        data_array2, error_array2, headers2, Dxy = proj_red.rebin_array(data_array2, error_array2, headers2, pxsize=pxsize, scale=px_scale, operation=rebin_operation)
    # Align and rescale images with oversampling.
    data_array, error_array = proj_red.align_data(data_array, error_array, upsample_factor=np.min(Dxy).astype(int), ref_center=align_center, return_shifts=False)
    data_array2, error_array2 = proj_red.align_data(data_array2, error_array2, upsample_factor=np.min(Dxy).astype(int), ref_center=align_center, return_shifts=False)

    # Plot the arrays to check the output.
    if display_data:
        proj_plots.plot_obs(data_array, headers, vmin=data_array.min(), vmax=data_array.max(), savename=figname+"_center_"+align_center, plots_folder=plots_folder)
        proj_plots.plot_obs(data_array2, headers, vmin=data_array.min(), vmax=data_array.max(), savename=figname+"_deconv_center_"+align_center, plots_folder=plots_folder)
        proj_plots.plot_obs(data_array/data_array2, headers, vmin=0., vmax=10., savename=figname+"_ratio_deconv_center_"+align_center, plots_folder=plots_folder)

    ## Step 2:
    # Compute Stokes I, Q, U from the smoothed polarized images.
    # SMOOTHING DISCUSSION:
    # The FWHM of the FOC PSF has been estimated at about 0.03" across the 1500-5000 Angstrom band,
    # which is about 2 detector pixels wide.
    # See Jedrzejewski, R.; Nota, A.; Hack, W. J., "A Comparison Between FOC and WFPC2",
    # Bibcode: 1995chst.conf...10J
    I_stokes, Q_stokes, U_stokes, Stokes_cov = proj_red.compute_Stokes(data_array, error_array, headers, FWHM=smoothing_FWHM, scale=smoothing_scale, smoothing=smoothing_function)

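compute_Stokes itself lives in lib/reduction.py, whose diff is suppressed at the end of this commit. As background (not that file's verbatim code): for ideal polarizers at 0°, 60° and 120° — the FOC POL0/POL60/POL120 set — each polarizer measures F_θ = (I + Q·cos 2θ + U·sin 2θ)/2, which inverts to

    I = (2/3)·(F0 + F60 + F120)
    Q = (2/3)·(2·F0 − F60 − F120)
    U = (2/√3)·(F60 − F120)
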
    ## Step 3:
    # Rotate images to have North up.
    if rotate:
        ref_header = copy.deepcopy(headers[0])
        if rotate_library.lower() in ['scipy','scipy.ndimage']:
            I_stokes, Q_stokes, U_stokes, Stokes_cov, headers = proj_red.rotate_sc_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, headers, -ref_header['orientat'])
        if rotate_library.lower() in ['pillow','pil']:
            I_stokes, Q_stokes, U_stokes, Stokes_cov, headers = proj_red.rotate_PIL_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, headers, -ref_header['orientat'])
    # Compute polarimetric parameters (polarization degree and angle).
    P, debiased_P, s_P, s_P_P, PA, s_PA, s_PA_P = proj_red.compute_pol(I_stokes, Q_stokes, U_stokes, Stokes_cov, headers)

    ## Step 4:
    # Save the image to FITS.
    Stokes_test = proj_fits.save_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, P, debiased_P, s_P, s_P_P, PA, s_PA, s_PA_P, headers[0], figname+figtype, data_folder=data_folder, return_hdul=True)

    ## Step 5:
    # Plot the polarization map (background is either total flux, polarization degree or polarization degree error).
    proj_plots.polarization_map(copy.deepcopy(Stokes_test), SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, step_vec=step_vec, savename=figname+figtype, plots_folder=plots_folder, display=None)
    proj_plots.polarization_map(copy.deepcopy(Stokes_test), SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, step_vec=step_vec, savename=figname+figtype+"_P", plots_folder=plots_folder, display='Pol_deg')
    proj_plots.polarization_map(copy.deepcopy(Stokes_test), SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, step_vec=step_vec, savename=figname+figtype+"_P_err", plots_folder=plots_folder, display='Pol_deg_err')

    return 0


if __name__ == "__main__":
    sys.exit(main())
BIN
src/lib/__pycache__/convex_hull.cpython-39.pyc
Normal file (binary file not shown)
BIN
src/lib/__pycache__/cross_correlation.cpython-39.pyc
Normal file (binary file not shown)
BIN
src/lib/__pycache__/deconvolve.cpython-39.pyc
Normal file (binary file not shown)
BIN
src/lib/__pycache__/fits.cpython-39.pyc
Normal file (binary file not shown)
BIN
src/lib/__pycache__/plots.cpython-39.pyc
Normal file (binary file not shown)
BIN
src/lib/__pycache__/reduction.cpython-39.pyc
Normal file (binary file not shown)
337
src/lib/convex_hull.py
Executable file
@@ -0,0 +1,337 @@
"""
Library functions for a Graham-scan implementation (find the convex hull
of a given list of points).
"""

import copy
import numpy as np


# Define angle and vector operations
def vector(A, B):
    """
    Define a vector from the input points A, B.
    """
    return (B[0] - A[0], B[1] - A[1])


def determinant(u, v):
    """
    Compute the determinant of 2 vectors.
    ----------
    Inputs:
    u, v : vectors (list of length 2)
        Vectors for which the determinant must be computed.
    ----------
    Returns:
    determinant : real
        Determinant of the input vectors.
    """
    return u[0] * v[1] - u[1] * v[0]


def pseudo_angle(A, B, C):
    """
    Define a pseudo-angle from the input points A, B, C.
    """
    u = vector(A, B)
    v = vector(A, C)
    return determinant(u, v)


def distance(A, B):
    """
    Compute the Euclidean distance between 2 points of the plane.
    ----------
    Inputs:
    A, B : points (list of length 2)
        Points between which the distance must be computed.
    ----------
    Returns:
    distance : real
        Euclidean distance between A, B.
    """
    x, y = vector(A, B)
    return np.sqrt(x ** 2 + y ** 2)


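The sign of pseudo_angle encodes the turn direction, which is what the Graham scan below relies on. A minimal check on hypothetical points:

    # pseudo_angle(A, B, C) > 0  <=>  A -> B -> C is a counter-clockwise (left) turn
    print(pseudo_angle((0, 0), (1, 0), (1, 1)))   # 1  -> left turn
    print(pseudo_angle((0, 0), (1, 0), (2, 0)))   # 0  -> collinear
    print(pseudo_angle((0, 0), (1, 0), (1, -1)))  # -1 -> right turn
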
# Define the lexicographic and composition orders
def lexico(A, B):
    """
    Define the lexicographic order between points A, B.
    A <= B if:
       - y_a < y_b
    or - y_a = y_b and x_a < x_b
    ----------
    Inputs:
    A, B : points (list of length 2)
        Points compared for the lexicographic order.
    ----------
    Returns:
    lexico : boolean
        True if A <= B, False otherwise.
    """
    return (A[1] < B[1]) or (A[1] == B[1] and A[0] <= B[0])


def min_lexico(s):
    """
    Return the minimum of a list of points for the lexicographic order.
    ----------
    Inputs:
    s : list of points
        List of points in which we look for the minimum for the lexicographic
        order.
    ----------
    Returns:
    m : point (list of length 2)
        Minimum for the lexicographic order in the input list s.
    """
    m = s[0]
    for x in s:
        if lexico(x, m): m = x
    return m


def comp(Omega, A, B):
    """
    Test the order relation between points such that A <= B.
    This is True if:
       - (Omega-A, Omega-B) is a right-handed basis of the plane,
    or - (Omega-A, Omega-B) are linearly dependent.
    ----------
    Inputs:
    Omega, A, B : points (list of length 2)
        Points compared for the composition order.
    ----------
    Returns:
    comp : boolean
        True if A <= B, False otherwise.
    """
    t = pseudo_angle(Omega, A, B)
    if t > 0:
        return True
    elif t < 0:
        return False
    else:
        return distance(Omega, A) <= distance(Omega, B)


# Implement quicksort
def partition(s, l, r, order):
    """
    Take an element of a list 's' between indexes 'l', 'r' and place it at
    its right spot using the relation order 'order'. Return the index at
    which it was placed.
    ----------
    Inputs:
    s : list
        List of elements to be ordered.
    l, r : int
        Indexes of the first and last elements to be considered.
    order : func: A, B -> bool
        Relation order between 2 elements A, B that returns True if A <= B,
        False otherwise.
    ----------
    Returns:
    index : int
        Index at which the element chosen by the function has been placed.
    """
    i = l - 1
    for j in range(l, r):
        if order(s[j], s[r]):
            i = i + 1
            temp = copy.deepcopy(s[i])
            s[i] = copy.deepcopy(s[j])
            s[j] = copy.deepcopy(temp)
    temp = copy.deepcopy(s[i+1])
    s[i+1] = copy.deepcopy(s[r])
    s[r] = copy.deepcopy(temp)
    return i + 1


def sort_aux(s, l, r, order):
    """
    Sort a list 's' between indexes 'l', 'r' using the relation order 'order'
    by dividing it into 2 sub-lists and sorting these.
    """
    if l <= r:
        # Call the partition function that gives the index on which the list
        # will be divided
        q = partition(s, l, r, order)
        sort_aux(s, l, q - 1, order)
        sort_aux(s, q + 1, r, order)


def quicksort(s, order):
    """
    Implement the quicksort algorithm on the list 's' using the relation
    order 'order'.
    """
    # Call the auxiliary sort on the whole list.
    sort_aux(s, 0, len(s) - 1, order)


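A quick usage sketch of the sorting machinery on hypothetical data:

    pts = [(2, 1), (0, 0), (1, 3)]
    quicksort(pts, lexico)   # sorts in place by the lexicographic order above
    print(pts)               # [(0, 0), (2, 1), (1, 3)]
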
def sort_angles_distances(Omega, s):
    """
    Sort the list of points 's' for the composition order given the reference
    point Omega.
    """
    order = lambda A, B: comp(Omega, A, B)
    quicksort(s, order)


# Define functions for stacks (python lists with stack operations are used here).
def empty_stack(): return []
def stack(S, A): S.append(A)
def unstack(S): S.pop()
def stack_top(S): return S[-1]
def stack_sub_top(S): return S[-2]


# Alignment handling
def del_aux(s, k, Omega):
    """
    Return the index of the first element in 's' after the index 'k' that is
    not linearly dependent on (Omega-s[k]), i.e. the first element not
    aligned with Omega and s[k].
    ----------
    Inputs:
    s : list
        List of points.
    k : int
        Index from which to look for aligned points in s.
    Omega : point (list of length 2)
        Reference point in 's' to test alignment.
    ----------
    Returns:
    index : int
        Index of the first element not aligned with Omega and s[k].
    """
    p = s[k]
    j = k + 1
    while (j < len(s)) and (pseudo_angle(Omega, p, s[j]) == 0):
        j = j + 1
    return j


def del_aligned(s, Omega):
    """
    Strip a list of points 's' of all intermediary points that are aligned,
    using Omega as a reference point.
    ----------
    Inputs:
    s : list
        List of points.
    Omega : point (list of length 2)
        Reference point to test alignment.
    ----------
    Returns:
    s1 : list
        List of points in which no points are aligned with the reference
        point Omega.
    """
    s1 = [s[0]]
    n = len(s)
    k = 1
    while k < n:
        # Get the index of the last point aligned with (Omega-s[k])
        k = del_aux(s, k, Omega)
        s1.append(s[k - 1])
    return s1


# Implement the Graham algorithm
def convex_hull(A):
    """
    Implement the Graham algorithm to find the convex hull of a given list
    of points, handling aligned points.
    ----------
    Inputs:
    A : list
        List of points for which the convex hull must be returned.
    ----------
    Returns:
    S : list
        List of points defining the convex hull of the input list A.
    """
    S = empty_stack()
    Omega = min_lexico(A)
    sort_angles_distances(Omega, A)
    A = del_aligned(A, Omega)
    stack(S, A[0])
    stack(S, A[1])
    stack(S, A[2])
    for i in range(3, len(A)):
        while pseudo_angle(stack_sub_top(S), stack_top(S), A[i]) <= 0:
            unstack(S)
        stack(S, A[i])
    return S


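A minimal usage sketch on hypothetical points; the interior point (1, 1) is discarded:

    square = [(0, 0), (2, 0), (2, 2), (0, 2), (1, 1)]
    print(convex_hull(square))   # [(0, 0), (2, 0), (2, 2), (0, 2)]
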
def image_hull(image, step=5, null_val=0., inside=True):
    """
    Compute the convex hull of a 2D image and return the 4 relevant
    coordinates of the maximum included rectangle (i.e. crop the image to
    the maximum rectangle).
    ----------
    Inputs:
    image : numpy.ndarray
        2D array of floats corresponding to an image in intensity that needs
        to be cropped down.
    step : int, optional
        For images with straight edges, not all lines and columns need to be
        browsed in order to get a good convex hull. The step value determines
        how many rows/columns can be skipped. With step=2 every other line
        will be browsed.
        Defaults to 5.
    null_val : float, optional
        Pixel value determining the threshold for what is considered
        'outside' the image. All border pixels below this value will be
        taken out.
        Defaults to 0.
    inside : boolean, optional
        If True, the cropped image will be the maximum rectangle included
        inside the image. If False, the cropped image will be the minimum
        rectangle in which the whole image is included.
        Defaults to True.
    ----------
    Returns:
    vertex : numpy.ndarray
        Array containing the vertices of the maximum included rectangle.
    """
    H = []
    shape = np.array(image.shape)
    row, col = np.indices(shape)
    for i in range(0, shape[0], step):
        r = row[i,:][image[i,:] > null_val]
        c = col[i,:][image[i,:] > null_val]
        if len(r) > 1 and len(c) > 1:
            H.append((r[0], c[0]))
            H.append((r[-1], c[-1]))
    for j in range(0, shape[1], step):
        r = row[:,j][image[:,j] > null_val]
        c = col[:,j][image[:,j] > null_val]
        if len(r) > 1 and len(c) > 1:
            if not((r[0], c[0]) in H):
                H.append((r[0], c[0]))
            if not((r[-1], c[-1]) in H):
                H.append((r[-1], c[-1]))
    S = np.array(convex_hull(H))

    x_min, y_min = S[:,0] < S[:,0].mean(), S[:,1] < S[:,1].mean()
    x_max, y_max = S[:,0] > S[:,0].mean(), S[:,1] > S[:,1].mean()
    # Get the 4 extrema
    S0 = S[x_min*y_min][np.abs(0-S[x_min*y_min].sum(axis=1)).min() == np.abs(0-S[x_min*y_min].sum(axis=1))][0]
    S1 = S[x_min*y_max][np.abs(shape[1]-S[x_min*y_max].sum(axis=1)).min() == np.abs(shape[1]-S[x_min*y_max].sum(axis=1))][0]
    S2 = S[x_max*y_min][np.abs(shape[0]-S[x_max*y_min].sum(axis=1)).min() == np.abs(shape[0]-S[x_max*y_min].sum(axis=1))][0]
    S3 = S[x_max*y_max][np.abs(shape.sum()-S[x_max*y_max].sum(axis=1)).min() == np.abs(shape.sum()-S[x_max*y_max].sum(axis=1))][0]
    # Get the vertices of the biggest included rectangle
    if inside:
        f0 = np.max([S0[0], S1[0]])
        f1 = np.min([S2[0], S3[0]])
        f2 = np.max([S0[1], S2[1]])
        f3 = np.min([S1[1], S3[1]])
    else:
        f0 = np.min([S0[0], S1[0]])
        f1 = np.max([S2[0], S3[0]])
        f2 = np.min([S0[1], S2[1]])
        f3 = np.max([S1[1], S3[1]])

    return np.array([f0, f1, f2, f3]).astype(int)
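A sketch of the intended use on a padded frame (hypothetical array; crop_array in lib/reduction.py presumably wraps this):

    img = np.zeros((100, 100))
    img[20:80, 30:90] = 1.                  # bright region surrounded by blank margins
    f0, f1, f2, f3 = image_hull(img, step=5, null_val=0., inside=True)
    cropped = img[f0:f1, f2:f3]             # maximum rectangle included in the bright region
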
249
src/lib/cross_correlation.py
Executable file
@@ -0,0 +1,249 @@
"""
Library functions for phase cross-correlation computation.
"""
## Prefer FFTs via the new scipy.fft module when available (SciPy 1.4+),
# otherwise fall back to numpy.fft.
# Like numpy 1.15+, scipy 1.3+ also uses pocketfft, but a newer
# C++/pybind11 version called pypocketfft.
try:
    import scipy.fft as fft
except ImportError:
    import numpy.fft as fft

import numpy as np


def _upsampled_dft(data, upsampled_region_size,
                   upsample_factor=1, axis_offsets=None):
    """
    Upsampled DFT by matrix multiplication.
    This code is intended to provide the same result as if the following
    operations were performed:
        - Embed the array "data" in an array that is ``upsample_factor``
          times larger in each dimension. ifftshift to bring the center of
          the image to (1,1).
        - Take the FFT of the larger array.
        - Extract an ``[upsampled_region_size]`` region of the result,
          starting with the ``[axis_offsets+1]`` element.
    It achieves this result by computing the DFT in the output array without
    the need to zero-pad. It is much faster and more memory-efficient than
    the zero-padded FFT approach if ``upsampled_region_size`` is much smaller
    than ``data.size * upsample_factor``.
    ----------
    Inputs:
    data : array
        The input data array (DFT of original data) to upsample.
    upsampled_region_size : integer or tuple of integers, optional
        The size of the region to be sampled. If one integer is provided, it
        is duplicated up to the dimensionality of ``data``.
    upsample_factor : integer, optional
        The upsampling factor. Defaults to 1.
    axis_offsets : tuple of integers, optional
        The offsets of the region to be sampled. Defaults to None (uses
        image center).
    ----------
    Returns:
    output : ndarray
        The upsampled DFT of the specified region.
    """
    # If an integer is passed in, expand it to a list of equal-sized sections
    if not hasattr(upsampled_region_size, "__iter__"):
        upsampled_region_size = [upsampled_region_size, ] * data.ndim
    else:
        if len(upsampled_region_size) != data.ndim:
            raise ValueError("shape of upsampled region sizes must be equal "
                             "to input data's number of dimensions.")

    if axis_offsets is None:
        axis_offsets = [0, ] * data.ndim
    else:
        if len(axis_offsets) != data.ndim:
            raise ValueError("number of axis offsets must be equal to input "
                             "data's number of dimensions.")

    im2pi = 1j * 2 * np.pi

    dim_properties = list(zip(data.shape, upsampled_region_size, axis_offsets))

    for (n_items, ups_size, ax_offset) in dim_properties[::-1]:
        kernel = ((np.arange(ups_size) - ax_offset)[:, None]
                  * fft.fftfreq(n_items, upsample_factor))
        kernel = np.exp(-im2pi * kernel)

        # Equivalent to:
        #   data[i, j, k] = kernel[i, :] @ data[j, k].T
        data = np.tensordot(kernel, data, axes=(1, -1))
    return data


def _compute_phasediff(cross_correlation_max):
    """
    Compute the global phase difference between the two images (should be
    zero if the images are non-negative).
    ----------
    Inputs:
    cross_correlation_max : complex
        The complex value of the cross correlation at its maximum point.
    """
    return np.arctan2(cross_correlation_max.imag, cross_correlation_max.real)


def _compute_error(cross_correlation_max, src_amp, target_amp):
    """
    Compute the RMS error metric between ``src_image`` and ``target_image``.
    ----------
    Inputs:
    cross_correlation_max : complex
        The complex value of the cross correlation at its maximum point.
    src_amp : float
        The normalized average image intensity of the source image.
    target_amp : float
        The normalized average image intensity of the target image.
    """
    error = 1.0 - cross_correlation_max * cross_correlation_max.conj() /\
        (src_amp * target_amp)
    return np.sqrt(np.abs(error))


def phase_cross_correlation(reference_image, moving_image, *,
                            upsample_factor=1, space="real",
                            return_error=True, overlap_ratio=0.3):
    """
    Efficient subpixel image translation registration by cross-correlation.
    This code gives the same precision as the FFT upsampled cross-correlation
    in a fraction of the computation time and with reduced memory
    requirements. It obtains an initial estimate of the cross-correlation
    peak by an FFT and then refines the shift estimation by upsampling the
    DFT only in a small neighborhood of that estimate by means of a
    matrix-multiply DFT.
    ----------
    Inputs:
    reference_image : array
        Reference image.
    moving_image : array
        Image to register. Must be of the same dimensionality as
        ``reference_image``.
    upsample_factor : int, optional
        Upsampling factor. Images will be registered to within
        ``1 / upsample_factor`` of a pixel. For example
        ``upsample_factor == 20`` means the images will be registered
        within 1/20th of a pixel. Default is 1 (no upsampling).
    space : string, one of "real" or "fourier", optional
        Defines how the algorithm interprets input data. "real" means
        data will be FFT'd to compute the correlation, while "fourier"
        data will bypass the FFT of input data. Case insensitive.
    return_error : bool, optional
        Returns the error and phase difference if on, otherwise only the
        shifts are returned.
    overlap_ratio : float, optional
        Minimum allowed overlap ratio between images. The correlation for
        translations corresponding with an overlap ratio lower than this
        threshold will be ignored. A lower `overlap_ratio` leads to a
        smaller maximum translation, while a higher `overlap_ratio` leads to
        greater robustness against spurious matches due to small overlap
        between masked images.
    ----------
    Returns:
    shifts : ndarray
        Shift vector (in pixels) required to register ``moving_image``
        with ``reference_image``. Axis ordering is consistent with
        numpy (e.g. Z, Y, X).
    error : float
        Translation-invariant normalized RMS error between
        ``reference_image`` and ``moving_image``.
    phasediff : float
        Global phase difference between the two images (should be
        zero if the images are non-negative).
    ----------
    References:
    [1] Manuel Guizar-Sicairos, Samuel T. Thurman, and James R. Fienup,
        "Efficient subpixel image registration algorithms,"
        Optics Letters 33, 156-158 (2008). :DOI:`10.1364/OL.33.000156`
    [2] James R. Fienup, "Invariant error metrics for image reconstruction,"
        Applied Optics 36, 8352-8357 (1997). :DOI:`10.1364/AO.36.008352`
    [3] Dirk Padfield. Masked Object Registration in the Fourier Domain.
        IEEE Transactions on Image Processing, vol. 21(5),
        pp. 2706-2718 (2012). :DOI:`10.1109/TIP.2011.2181402`
    [4] D. Padfield. "Masked FFT registration". In Proc. Computer Vision and
        Pattern Recognition, pp. 2918-2925 (2010).
        :DOI:`10.1109/CVPR.2010.5540032`
    """

    # Images must be the same shape
    if reference_image.shape != moving_image.shape:
        raise ValueError("images must be same shape")

    # Assume complex data is already in Fourier space
    if space.lower() == 'fourier':
        src_freq = reference_image
        target_freq = moving_image
    # Real data needs to be fft'd.
    elif space.lower() == 'real':
        src_freq = fft.fftn(reference_image)
        target_freq = fft.fftn(moving_image)
    else:
        raise ValueError('space argument must be "real" or "fourier"')

    # Whole-pixel shift - Compute the cross-correlation by an IFFT
    shape = src_freq.shape
    image_product = src_freq * target_freq.conj()
    cross_correlation = fft.ifftn(image_product)

    # Locate the maximum
    maxima = np.unravel_index(np.argmax(np.abs(cross_correlation)),
                              cross_correlation.shape)
    midpoints = np.array([np.fix(axis_size / 2) for axis_size in shape])

    shifts = np.stack(maxima).astype(np.float64)
    shifts[shifts > midpoints] -= np.array(shape)[shifts > midpoints]

    if upsample_factor == 1:
        if return_error:
            src_amp = np.sum(np.real(src_freq * src_freq.conj()))
            src_amp /= src_freq.size
            target_amp = np.sum(np.real(target_freq * target_freq.conj()))
            target_amp /= target_freq.size
        CCmax = cross_correlation[maxima]
    # If upsampling > 1, then refine the estimate with a matrix-multiply DFT
    else:
        # Initial shift estimate in the upsampled grid
        shifts = np.round(shifts * upsample_factor) / upsample_factor
        upsampled_region_size = np.ceil(upsample_factor * 1.5)
        # Center of the output array at dftshift + 1
        dftshift = np.fix(upsampled_region_size / 2.0)
        upsample_factor = np.array(upsample_factor, dtype=np.float64)
        # Matrix-multiply DFT around the current shift estimate
        sample_region_offset = dftshift - shifts*upsample_factor
        cross_correlation = _upsampled_dft(image_product.conj(),
                                           upsampled_region_size,
                                           upsample_factor,
                                           sample_region_offset).conj()
        # Locate the maximum and map back to the original pixel grid
        maxima = np.unravel_index(np.argmax(np.abs(cross_correlation)),
                                  cross_correlation.shape)
        CCmax = cross_correlation[maxima]

        maxima = np.stack(maxima).astype(np.float64) - dftshift

        shifts = shifts + maxima / upsample_factor

        if return_error:
            src_amp = np.sum(np.real(src_freq * src_freq.conj()))
            target_amp = np.sum(np.real(target_freq * target_freq.conj()))

    # If it is only one row or column, the shift along that dimension has no
    # effect: set it to zero.
    for dim in range(src_freq.ndim):
        if shape[dim] == 1:
            shifts[dim] = 0

    if return_error:
        # Redirect the user to masked_phase_cross_correlation if NaNs are
        # observed
        if np.isnan(CCmax) or np.isnan(src_amp) or np.isnan(target_amp):
            raise ValueError(
                "NaN values found, please remove NaNs from your input data")

        return shifts, _compute_error(CCmax, src_amp, target_amp),\
            _compute_phasediff(CCmax)
    else:
        return shifts
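A registration sketch on synthetic data with a known, hypothetical shift:

    rng = np.random.default_rng(0)
    reference = rng.random((64, 64))
    moving = np.roll(reference, (3, -5), axis=(0, 1))    # shift content by +3 rows, -5 columns
    shifts, error, phasediff = phase_cross_correlation(reference, moving,
                                                       upsample_factor=10)
    print(shifts)   # ~ [-3., 5.]: the shift to apply to moving to register it onto reference
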
122
src/lib/deconvolve.py
Executable file
@@ -0,0 +1,122 @@
"""
Library functions for the implementation of the Richardson-Lucy
deconvolution algorithm.
"""

import numpy as np
from scipy.signal import convolve


def richardson_lucy(image, psf, iterations=20, clip=True, filter_epsilon=None):
    """
    Richardson-Lucy deconvolution algorithm.
    ----------
    Inputs:
    image : numpy.ndarray
        Input degraded image (can be N-dimensional) of floats between 0 and 1.
    psf : numpy.ndarray
        The point spread function.
    iterations : int, optional
        Number of iterations. This parameter plays the role of
        regularisation.
    clip : boolean, optional
        True by default. If True, pixel values of the result above 1 or
        under -1 are thresholded for skimage pipeline compatibility.
    filter_epsilon : float, optional
        Value below which intermediate results become 0 to avoid division
        by small numbers.
    ----------
    Returns:
    im_deconv : ndarray
        The deconvolved image.
    ----------
    References:
    [1] https://doi.org/10.1364/JOSA.62.000055
    [2] https://en.wikipedia.org/wiki/Richardson%E2%80%93Lucy_deconvolution
    """
    float_type = np.promote_types(image.dtype, np.float32)
    image = image.astype(float_type, copy=False)
    psf = psf.astype(float_type, copy=False)
    # Start from a flat estimate and refine it iteratively
    im_deconv = np.full(image.shape, 0.5, dtype=float_type)
    psf_mirror = np.flip(psf)

    for _ in range(iterations):
        conv = convolve(im_deconv, psf, mode='same')
        if filter_epsilon:
            relative_blur = np.where(conv < filter_epsilon, 0, image / conv)
        else:
            relative_blur = image / conv
        im_deconv *= convolve(relative_blur, psf_mirror, mode='same')

    if clip:
        im_deconv[im_deconv > 1] = 1
        im_deconv[im_deconv < -1] = -1

    return im_deconv


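A round-trip sketch on hypothetical data, normalized to [0, 1] as the docstring expects:

    rng = np.random.default_rng(0)
    truth = rng.random((32, 32))
    psf = np.ones((5, 5)) / 25.               # flat 5x5 kernel, sums to 1
    blurred = convolve(truth, psf, mode='same')
    restored = richardson_lucy(blurred, psf, iterations=30, filter_epsilon=1e-12)
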
def deconvolve_im(image, psf, iterations=20, clip=True, filter_epsilon=None):
    """
    Prepare an image for deconvolution using the Richardson-Lucy algorithm
    and return the result.
    ----------
    Inputs:
    image : numpy.ndarray
        Input degraded image (can be N-dimensional) of floats between 0 and 1.
    psf : numpy.ndarray
        The point spread function.
    iterations : int, optional
        Number of iterations. This parameter plays the role of
        regularisation.
    clip : boolean, optional
        True by default. If True, pixel values of the result above 1 or
        under -1 are thresholded for skimage pipeline compatibility.
    filter_epsilon : float, optional
        Value below which intermediate results become 0 to avoid division
        by small numbers.
    ----------
    Returns:
    im_deconv : ndarray
        The deconvolved image.
    """
    # Normalize the image to its highest pixel value
    pxmax = image[np.isfinite(image)].max()
    if pxmax == 0.:
        raise ValueError("Invalid image")
    norm_image = image/pxmax

    # Deconvolve the normalized image
    norm_deconv = richardson_lucy(image=norm_image, psf=psf, iterations=iterations,
                                  clip=clip, filter_epsilon=filter_epsilon)

    # Output the deconvolved image with the original pxmax value
    im_deconv = pxmax*norm_deconv

    return im_deconv


def gaussian_psf(FWHM=1., shape=(5,5)):
    """
    Define the gaussian Point Spread Function of chosen shape and FWHM.
    ----------
    Inputs:
    FWHM : float, optional
        The Full Width at Half Maximum of the desired gaussian function for
        the PSF, in pixel increments.
        Defaults to 1.
    shape : tuple, optional
        The shape of the PSF kernel. Must be of dimension 2.
        Defaults to (5,5).
    ----------
    Returns:
    kernel : numpy.ndarray
        Kernel containing the weights of the desired gaussian PSF.
    """
    # Compute the standard deviation from the FWHM
    stdev = FWHM/(2.*np.sqrt(2.*np.log(2.)))

    # Create a kernel of the desired shape, centered on its middle pixel
    xx, yy = np.indices(shape)
    x0, y0 = (np.array(shape)-1.)/2.
    kernel = np.exp(-0.5*((xx-x0)**2+(yy-y0)**2)/stdev**2)
    # NB: the kernel is not normalized here; callers that need flux
    # conservation may want to divide by kernel.sum().
    return kernel
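deconvolve_array in lib/reduction.py (diff suppressed) presumably wires these two together; a direct sketch on a hypothetical image:

    rng = np.random.default_rng(0)
    blurred_image = rng.random((64, 64))      # stand-in for a FOC exposure
    psf = gaussian_psf(FWHM=2., shape=(9, 9))
    psf /= psf.sum()                          # normalize so deconvolution conserves flux
    deconvolved = deconvolve_im(blurred_image, psf, iterations=10,
                                clip=False, filter_epsilon=1e-12)
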
139
src/lib/fits.py
Executable file
@@ -0,0 +1,139 @@
"""
Library functions for simplified fits handling.

prototypes :
    - get_obs_data(infiles, data_folder) -> data_array, headers
        Extract the observational data from fits files.

    - save_Stokes(I, Q, U, Stokes_cov, P, debiased_P, s_P, s_P_P, PA, s_PA, s_PA_P, ref_header, filename, data_folder, return_hdul) -> ( HDUL_data )
        Save computed polarimetry parameters to a single fits file (and return the HDUList).
"""

import numpy as np
from astropy.io import fits
from astropy import wcs


def get_obs_data(infiles, data_folder="", compute_flux=False):
    """
    Extract the observational data from the given fits files.
    ----------
    Inputs:
    infiles : strlist
        List of the fits file names to be added to the observation set.
    data_folder : str, optional
        Relative or absolute path to the folder containing the data.
    compute_flux : boolean, optional
        If True, the returned data_array will contain flux information,
        assuming the raw data are counts and the headers have the keywords
        EXPTIME and PHOTFLAM.
        Defaults to False.
    ----------
    Returns:
    data_array : numpy.ndarray
        Array of images (2D floats) containing the observation data.
    headers : header list
        List of header objects corresponding to each image in data_array.
    """
    data_array, headers = [], []
    for i in range(len(infiles)):
        with fits.open(data_folder+infiles[i]) as f:
            headers.append(f[0].header)
            data_array.append(f[0].data)
    data_array = np.array(data_array)

    if compute_flux:
        for i in range(len(infiles)):
            # Compute the flux in erg/cm²/s/Angstrom
            data_array[i] *= headers[i]['PHOTFLAM']/headers[i]['EXPTIME']

    return data_array, headers


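Usage mirrors Step 1 of src/FOC_reduction.py:

    data_array, headers = get_obs_data(['x274020at.c0f.fits'],
                                       data_folder="../data/NGC1068_x274020/",
                                       compute_flux=True)
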
def save_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, P, debiased_P, s_P,
                s_P_P, PA, s_PA, s_PA_P, ref_header, filename, data_folder="",
                return_hdul=False):
    """
    Save computed polarimetry parameters to a single fits file,
    updating the header accordingly.
    ----------
    Inputs:
    I_stokes, Q_stokes, U_stokes, P, debiased_P, s_P, s_P_P, PA, s_PA, s_PA_P : numpy.ndarray
        Images (2D float arrays) containing the computed polarimetric data:
        Stokes parameters I, Q, U, the polarization degree and its debiased
        value, its error both propagated and assuming Poisson noise, the
        polarization angle, and its error both propagated and assuming
        Poisson noise.
    Stokes_cov : numpy.ndarray
        Covariance matrix of the Stokes parameters I, Q, U.
    ref_header : astropy.io.fits.header.Header
        Reference header from which some keywords will be copied (CRVAL,
        CDELT, INSTRUME, PROPOSID, TARGNAME, ORIENTAT).
    filename : str
        Name that will be given to the file on writing (will appear in the
        header).
    data_folder : str, optional
        Relative or absolute path to the folder the fits file will be saved
        to.
        Defaults to the current folder.
    return_hdul : boolean, optional
        If True, the function will return the HDUList created from the
        input arrays.
        Defaults to False.
    ----------
    Returns:
    hdul : astropy.io.fits.hdu.hdulist.HDUList
        HDUList containing I_stokes in the PrimaryHDU, then Q_stokes,
        U_stokes, P, s_P, PA, s_PA in this order. Headers have been updated
        with the relevant information (WCS, orientation, data type).
        Only returned if return_hdul is True.
    """
    # Create a new WCS object given the modified images
    new_wcs = wcs.WCS(ref_header).deepcopy()
    if new_wcs.wcs.has_cd():
        del new_wcs.wcs.cd
        keys = list(new_wcs.to_header().keys())+['CD1_1','CD1_2','CD2_1','CD2_2']
        for key in keys:
            ref_header.remove(key, ignore_missing=True)
        new_wcs.wcs.cdelt = 3600.*np.sqrt(np.sum(new_wcs.wcs.get_pc()**2, axis=1))
    if (new_wcs.wcs.cdelt == np.array([1., 1.])).all() and \
            (new_wcs.array_shape in [(512, 512),(1024,512),(512,1024),(1024,1024)]):
        # Update the WCS with relevant information
        HST_aper = 2400.                # HST aperture in mm
        f_ratio = ref_header['f_ratio']
        px_dim = np.array([25., 25.])   # Pixel dimension in µm
        if ref_header['pxformt'].lower() == 'zoom':
            px_dim[0] = 50.
        new_wcs.wcs.cdelt = 206.3*px_dim/(f_ratio*HST_aper)
        new_wcs.wcs.crpix = [I_stokes.shape[0]/2, I_stokes.shape[1]/2]

    header = new_wcs.to_header()
    header['instrume'] = (ref_header['instrume'], 'Instrument from which data is reduced')
    header['photplam'] = (ref_header['photplam'], 'Pivot Wavelength')
    header['proposid'] = (ref_header['proposid'], 'PEP proposal identifier for observation')
    header['targname'] = (ref_header['targname'], 'Target name')
    header['orientat'] = (ref_header['orientat'], 'Angle between North and the y-axis of the image')
    header['filename'] = (filename, 'Original filename')

    # Create the HDUList object
    hdul = fits.HDUList([])

    # Add I_stokes as the PrimaryHDU
    header['datatype'] = ('I_stokes', 'type of data stored in the HDU')
    primary_hdu = fits.PrimaryHDU(data=I_stokes, header=header)
    hdul.append(primary_hdu)

    # Add Q, U, Stokes_cov, P, s_P, PA, s_PA to the HDUList
    for data, name in [[Q_stokes,'Q_stokes'],[U_stokes,'U_stokes'],
                       [Stokes_cov,'IQU_cov_matrix'],[P, 'Pol_deg'],
                       [debiased_P, 'Pol_deg_debiased'],[s_P, 'Pol_deg_err'],
                       [s_P_P, 'Pol_deg_err_Poisson_noise'],[PA, 'Pol_ang'],
                       [s_PA, 'Pol_ang_err'],[s_PA_P, 'Pol_ang_err_Poisson_noise']]:
        hdu_header = header.copy()
        hdu_header['datatype'] = name
        hdu = fits.ImageHDU(data=data, header=hdu_header)
        hdul.append(hdu)

    # Save the fits file to the designated filepath
    hdul.writeto(data_folder+filename+".fits", overwrite=True)

    if return_hdul:
        return hdul
    else:
        return 0
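Since every extension carries a 'datatype' keyword, a saved file can be re-read without relying on HDU order; a sketch, assuming the NGC1068 file written by the main script:

    from astropy.io import fits
    with fits.open("../data/NGC1068_x274020/NGC1068_FOC.fits") as hdul:
        pol_deg = [hdu.data for hdu in hdul
                   if hdu.header['datatype'] == 'Pol_deg'][0]
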
225
src/lib/plots.py
Executable file
@@ -0,0 +1,225 @@
"""
Library functions for displaying information using matplotlib.

prototypes :
    - plot_obs(data_array, headers, shape, vmin, vmax, savename, plots_folder)
        Plots the whole observation raw data in a given display shape.

    - polarization_map(Stokes_hdul, SNRp_cut, SNRi_cut, step_vec, savename, plots_folder, display)
        Plots a polarization map of the polarimetric parameters saved in an HDUList.
"""

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from mpl_toolkits.axes_grid1.anchored_artists import AnchoredSizeBar
from astropy.wcs import WCS


def plot_obs(data_array, headers, shape=None, vmin=0., vmax=6., rectangle=None,
             savename=None, plots_folder=""):
    """
    Plots raw observation imagery with some information on the instrument
    and filters.
    ----------
    Inputs:
    data_array : numpy.ndarray
        Array of images (2D floats, aligned and of the same shape) of a
        single observation with multiple polarizers of an instrument.
    headers : header list
        List of headers corresponding to the images in data_array.
    shape : array-like of length 2, optional
        Shape of the display, with shape = [#rows, #columns]. If None,
        defaults to the optimal square.
        Defaults to None.
    vmin : float, optional
        Minimum pixel value that should be displayed.
        Defaults to 0.
    vmax : float, optional
        Maximum pixel value that should be displayed.
        Defaults to 6.
    rectangle : numpy.ndarray, optional
        Array of parameters for matplotlib.patches.Rectangle objects that
        will be displayed on each output image. If None, no rectangle is
        displayed.
        Defaults to None.
    savename : str, optional
        Name of the figure the map should be saved to. If None, the map
        won't be saved (only displayed).
        Defaults to None.
    plots_folder : str, optional
        Relative (or absolute) filepath to the folder in which the map will
        be saved. Not used if savename is None.
        Defaults to the current folder.
    """
    if shape is None:
        shape = np.array([np.ceil(np.sqrt(data_array.shape[0])).astype(int),]*2)
    fig, ax = plt.subplots(shape[0], shape[1], figsize=(10,10), dpi=200,
                           sharex=True, sharey=True)

    for i, enum in enumerate(list(zip(ax.flatten(), data_array))):
        ax = enum[0]
        data = enum[1]
        instr = headers[i]['instrume']
        rootname = headers[i]['rootname']
        exptime = headers[i]['exptime']
        filt = headers[i]['filtnam1']
        # Plots
        im = ax.imshow(data, vmin=vmin, vmax=vmax, origin='lower')
        if not(rectangle is None):
            x, y, width, height = rectangle[i]
            ax.add_patch(Rectangle((x, y), width, height, edgecolor='r', fill=False))
        # Crosshair at the image center
        ax.plot([data.shape[1]/2, data.shape[1]/2], [0, data.shape[0]-1], lw=1,
                color='black')
        ax.plot([0, data.shape[1]-1], [data.shape[0]/2, data.shape[0]/2], lw=1,
                color='black')
        ax.annotate(instr+":"+rootname, color='white', fontsize=5, xy=(0.02, 0.95), xycoords='axes fraction')
        ax.annotate(filt, color='white', fontsize=10, xy=(0.02, 0.02), xycoords='axes fraction')
        ax.annotate(exptime, color='white', fontsize=5, xy=(0.80, 0.02), xycoords='axes fraction')

    fig.subplots_adjust(hspace=0, wspace=0, right=0.85)
    cbar_ax = fig.add_axes([0.9, 0.12, 0.02, 0.75])
    fig.colorbar(im, cax=cbar_ax)

    if not (savename is None):
        fig.suptitle(savename)
        fig.savefig(plots_folder+savename+".png", bbox_inches='tight')
    plt.show()
    return 0

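Beyond the calls shown in the main script, the rectangle parameter can outline regions of interest, e.g. the hypothetical 50x50 background sub-images used for error estimation:

    rects = np.array([[0, 0, 50, 50]] * len(data_array))   # (x, y, width, height) per image
    plot_obs(data_array, headers, vmin=0., vmax=6., rectangle=rects,
             savename="NGC1068_FOC_errors", plots_folder="../plots/NGC1068_x274020/")
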
def polarization_map(Stokes, SNRp_cut=3., SNRi_cut=30., step_vec=1,
                     savename=None, plots_folder="", display=None):
    """
    Plots a polarization map from a Stokes HDUList.
    ----------
    Inputs:
    Stokes : astropy.io.fits.hdu.hdulist.HDUList
        HDUList containing I, Q, U, P, s_P, PA, s_PA (in this particular
        order) for one observation.
    SNRp_cut : float, optional
        Cut that should be applied to the signal-to-noise ratio on P.
        Any SNR < SNRp_cut won't be displayed.
        Defaults to 3.
    SNRi_cut : float, optional
        Cut that should be applied to the signal-to-noise ratio on I.
        Any SNR < SNRi_cut won't be displayed.
        Defaults to 30. This value implies an uncertainty in P of 4.7%.
    step_vec : int, optional
        Number of steps between each displayed polarization vector.
        If step_vec = 2, every other vector will be displayed.
        Defaults to 1.
    savename : str, optional
        Name of the figure the map should be saved to. If None, the map
        won't be saved (only displayed).
        Defaults to None.
    plots_folder : str, optional
        Relative (or absolute) filepath to the folder in which the map will
        be saved. Not used if savename is None.
        Defaults to the current folder.
    display : str, optional
        Choose the map to display between intensity (default), polarization
        degree ('p','pol','pol_deg') or polarization degree error ('s_p',
        'pol_err','pol_deg_err').
        Defaults to None (intensity).
    """
    # Get data
    stkI = Stokes[np.argmax([Stokes[i].header['datatype']=='I_stokes' for i in range(len(Stokes))])]
    stkQ = Stokes[np.argmax([Stokes[i].header['datatype']=='Q_stokes' for i in range(len(Stokes))])]
    stkU = Stokes[np.argmax([Stokes[i].header['datatype']=='U_stokes' for i in range(len(Stokes))])]
    stk_cov = Stokes[np.argmax([Stokes[i].header['datatype']=='IQU_cov_matrix' for i in range(len(Stokes))])]
    pol = Stokes[np.argmax([Stokes[i].header['datatype']=='Pol_deg' for i in range(len(Stokes))])]
    pol_err = Stokes[np.argmax([Stokes[i].header['datatype']=='Pol_deg_err' for i in range(len(Stokes))])]
    pang = Stokes[np.argmax([Stokes[i].header['datatype']=='Pol_ang' for i in range(len(Stokes))])]
    pang_err = Stokes[np.argmax([Stokes[i].header['datatype']=='Pol_ang_err' for i in range(len(Stokes))])]

    pivot_wav = Stokes[0].header['photplam']
    wcs = WCS(Stokes[0]).deepcopy()

    # Compute the SNR and apply cuts
    pol.data[pol.data == 0.] = np.nan
    SNRp = pol.data/pol_err.data
    pol.data[SNRp < SNRp_cut] = np.nan
    SNRi = stkI.data/np.sqrt(stk_cov.data[0,0])
    pol.data[SNRi < SNRi_cut] = np.nan

    # NB: this mask selects the pixels *below* both SNR cuts; if the
    # integrated quantities below are meant to cover the displayed pixels,
    # the complement may be intended.
    mask = (SNRp < SNRp_cut) * (SNRi < SNRi_cut)

    # Look for the pixel of maximum polarization
    if np.isfinite(pol.data).any():
        p_max = np.max(pol.data[np.isfinite(pol.data)])
        x_max, y_max = np.unravel_index(np.argmax(pol.data==p_max), pol.data.shape)
    else:
        print("No pixel with polarization information above requested SNR.")

    # Plot the map
    fig = plt.figure(figsize=(15,15))
    ax = fig.add_subplot(111, projection=wcs)
    ax.set_facecolor('k')
    fig.subplots_adjust(hspace=0, wspace=0, right=0.95)
    cbar_ax = fig.add_axes([0.98, 0.12, 0.01, 0.75])

    if display is None:
        # If no display is selected, show the intensity map
        vmin, vmax = 0., np.max(stkI.data[stkI.data > 0.])
        im = ax.imshow(stkI.data, extent=[-stkI.data.shape[1]/2.,stkI.data.shape[1]/2.,-stkI.data.shape[0]/2.,stkI.data.shape[0]/2.], vmin=vmin, vmax=vmax, aspect='auto', cmap='inferno', alpha=1.)
        cbar = plt.colorbar(im, cax=cbar_ax, label=r"$F_{\lambda}$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]")
    elif display.lower() in ['p','pol','pol_deg']:
        # Display the polarization degree map
        vmin, vmax = 0., 100.
        im = ax.imshow(pol.data, extent=[-pol.data.shape[1]/2.,pol.data.shape[1]/2.,-pol.data.shape[0]/2.,pol.data.shape[0]/2.], vmin=vmin, vmax=vmax, aspect='auto', cmap='inferno', alpha=1.)
        cbar = plt.colorbar(im, cax=cbar_ax, label=r"$P$ [%]")
    elif display.lower() in ['s_p','pol_err','pol_deg_err']:
        # Display the polarization degree error map
        vmin, vmax = 0., 5.
        im = ax.imshow(pol_err.data, extent=[-pol_err.data.shape[1]/2.,pol_err.data.shape[1]/2.,-pol_err.data.shape[0]/2.,pol_err.data.shape[0]/2.], vmin=vmin, vmax=vmax, aspect='auto', cmap='inferno', alpha=1.)
        cbar = plt.colorbar(im, cax=cbar_ax, label=r"$\sigma_P$ [%]")
    else:
        # Default to the intensity map
        vmin, vmax = 0., np.max(stkI.data[stkI.data > 0.])
        im = ax.imshow(stkI.data, extent=[-stkI.data.shape[1]/2.,stkI.data.shape[1]/2.,-stkI.data.shape[0]/2.,stkI.data.shape[0]/2.], vmin=vmin, vmax=vmax, aspect='auto', cmap='inferno', alpha=1.)
        cbar = plt.colorbar(im, cax=cbar_ax, label=r"$F_{\lambda}$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]")

    px_size = wcs.wcs.get_cdelt()[0]
    px_sc = AnchoredSizeBar(ax.transData, 1./px_size, '1 arcsec', 3, pad=0.5, sep=5, borderpad=0.5, frameon=False, size_vertical=0.005, color='w')
    ax.add_artist(px_sc)

    X, Y = np.meshgrid(np.linspace(-stkI.data.shape[0]/2.,stkI.data.shape[0]/2.,stkI.data.shape[0]), np.linspace(-stkI.data.shape[1]/2.,stkI.data.shape[1]/2.,stkI.data.shape[1]))
    U, V = pol.data*np.cos(np.pi/2.+pang.data*np.pi/180.), pol.data*np.sin(np.pi/2.+pang.data*np.pi/180.)
    Q = ax.quiver(X[::step_vec,::step_vec], Y[::step_vec,::step_vec], U[::step_vec,::step_vec], V[::step_vec,::step_vec], units='xy', angles='uv', scale=50., scale_units='xy', pivot='mid', headwidth=0., headlength=0., headaxislength=0., width=0.1, color='w')
    pol_sc = AnchoredSizeBar(ax.transData, 2., r"$P$= 100 %", 4, pad=0.5, sep=5, borderpad=0.5, frameon=False, size_vertical=0.005, color='w')
    ax.add_artist(pol_sc)

    # Compute integrated parameters and associated errors
    I_int = stkI.data[mask].sum()
    Q_int = stkQ.data[mask].sum()
    U_int = stkU.data[mask].sum()
    I_int_err = np.sqrt(np.sum(stk_cov.data[0,0][mask]))
    Q_int_err = np.sqrt(np.sum(stk_cov.data[1,1][mask]))
    U_int_err = np.sqrt(np.sum(stk_cov.data[2,2][mask]))
    IQ_int_err = np.sqrt(np.sum(stk_cov.data[0,1][mask]**2))
    IU_int_err = np.sqrt(np.sum(stk_cov.data[0,2][mask]**2))
    QU_int_err = np.sqrt(np.sum(stk_cov.data[1,2][mask]**2))

    P_int = np.sqrt(Q_int**2+U_int**2)/I_int*100.
    P_int_err = (100./I_int)*np.sqrt((Q_int**2*Q_int_err**2 + U_int**2*U_int_err**2 + 2.*Q_int*U_int*QU_int_err)/(Q_int**2 + U_int**2) + ((Q_int/I_int)**2 + (U_int/I_int)**2)*I_int_err**2 - 2.*(Q_int/I_int)*IQ_int_err - 2.*(U_int/I_int)*IU_int_err)

    PA_int = (90./np.pi)*np.arctan2(U_int,Q_int)+90.
    PA_int_err = (90./(np.pi*(Q_int**2 + U_int**2)))*np.sqrt(U_int**2*Q_int_err**2 + Q_int**2*U_int_err**2 - 2.*Q_int*U_int*QU_int_err)

    ax.annotate(r"$F_{{\lambda}}^{{int}}$({0:.0f} $\AA$) = {1:.1e} $\pm$ {2:.1e} $ergs \cdot cm^{{-2}} \cdot s^{{-1}} \cdot \AA^{{-1}}$".format(pivot_wav,I_int,I_int_err)+"\n"+r"$P^{{int}}$ = {0:.2f} $\pm$ {1:.2f} %".format(P_int,P_int_err)+"\n"+r"$\theta_{{P}}^{{int}}$ = {0:.2f} $\pm$ {1:.2f} °".format(PA_int,PA_int_err), color='white', fontsize=11, xy=(0.01, 0.94), xycoords='axes fraction')

    ax.coords.grid(True, color='white', ls='dotted', alpha=0.5)
    ax.coords[0].set_axislabel('Right Ascension (J2000)')
    ax.coords[0].set_axislabel_position('t')
    ax.coords[0].set_ticklabel_position('t')
    ax.coords[1].set_axislabel('Declination (J2000)')
    ax.coords[1].set_axislabel_position('l')
    ax.coords[1].set_ticklabel_position('l')
    ax.axis('equal')

    if not savename is None:
        fig.suptitle(savename)
        fig.savefig(plots_folder+savename+".png", bbox_inches='tight', dpi=200)

    plt.show()
    return 0
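The map can also be regenerated later from the saved FITS file, without re-running the pipeline; a sketch assuming the file written by save_Stokes:

    from astropy.io import fits
    with fits.open("../data/NGC1068_x274020/NGC1068_FOC.fits") as hdul:
        polarization_map(hdul, SNRp_cut=3., SNRi_cut=30., step_vec=2,
                         display='Pol_deg')
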
1501
src/lib/reduction.py
Executable file
File diff suppressed because it is too large