package-style architecture

2024-05-24 12:15:06 +02:00
parent 4e17f40534
commit fb4849df25
18 changed files with 26 additions and 15 deletions

package/FOC_reduction.py Executable file (240 lines added)

@@ -0,0 +1,240 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
"""
Main script in which the steps of the FOC pipeline reduction are progressively assembled.
"""
# Project libraries
import numpy as np
from copy import deepcopy
from os import system
from os.path import exists as path_exists
import lib.fits as proj_fits # Functions to handle fits files
import lib.reduction as proj_red # Functions used in reduction pipeline
import lib.plots as proj_plots # Functions for plotting data
from lib.utils import sci_not, princ_angle
from matplotlib.colors import LogNorm
def main(target=None, proposal_id=None, infiles=None, output_dir="./data", crop=False, interactive=False):
# Reduction parameters
# Deconvolution
deconvolve = False
if deconvolve:
# from lib.deconvolve import from_file_psf
psf = 'gaussian' # Can be user-defined as well
# psf = from_file_psf(data_folder+psf_file)
psf_FWHM = 3.1
psf_scale = 'px'
psf_shape = None # (151, 151)
iterations = 1
algo = "conjgrad"
# Initial crop
display_crop = False
# Background estimation
error_sub_type = 'freedman-diaconis' # sqrt, sturges, rice, scott, freedman-diaconis (default) or shape (example (51, 51))
subtract_error = 0.50
display_bkg = True
# Data binning
rebin = True
pxsize = 0.10
px_scale = 'arcsec' # pixel, arcsec or full
rebin_operation = 'sum' # sum or average
# Alignment
align_center = 'center' # If None will not align the images
display_align = False
display_data = False
# Smoothing
smoothing_function = 'combine' # gaussian_after, weighted_gaussian_after, gaussian, weighted_gaussian or combine
smoothing_FWHM = 0.150 # If None, no smoothing is done
smoothing_scale = 'arcsec' # pixel or arcsec
# Rotation
rotate_data = False # rotation to North convention can give erroneous results
rotate_stokes = True
# Polarization map output
SNRp_cut = 3.  # P measurements with SNR > 3
SNRi_cut = 3.  # I measurements with SNR > 3; a cut at SNR > 30 would imply an uncertainty in P of 4.7%
flux_lim = None # lowest and highest flux displayed on plot, defaults to bkg and maximum in cut if None
vec_scale = 5
step_vec = 1  # plot every vector in the array; if step_vec = 2, every other vector is plotted; if step_vec = 0, all vectors are displayed at full length
# Pipeline start
# Step 1:
# Get data from fits files and translate to flux in erg/cm²/s/Angstrom.
if infiles is not None:
prod = np.array([["/".join(filepath.split('/')[:-1]), filepath.split('/')[-1]] for filepath in infiles], dtype=str)
obs_dir = "/".join(infiles[0].split("/")[:-1])
if not path_exists(obs_dir):
system("mkdir -p {0:s} {1:s}".format(obs_dir, obs_dir.replace("data", "plots")))
if target is None:
target = input("Target name:\n>")
else:
from lib.query import retrieve_products
target, products = retrieve_products(target, proposal_id, output_dir=output_dir)
prod = products.pop()
for prods in products:
main(target=target, infiles=["/".join(pr) for pr in prods], output_dir=output_dir, crop=crop, interactive=interactive)
data_folder = prod[0][0]
try:
plots_folder = data_folder.replace("data", "plots")
except ValueError:
plots_folder = "."
if not path_exists(plots_folder):
system("mkdir -p {0:s} ".format(plots_folder))
infiles = [p[1] for p in prod]
data_array, headers = proj_fits.get_obs_data(infiles, data_folder=data_folder, compute_flux=True)
figname = "_".join([target, "FOC"])
figtype = ""
if rebin:
if px_scale not in ['full']:
figtype = "".join(["b", "{0:.2f}".format(pxsize), px_scale]) # additionnal informations
else:
figtype = "full"
if smoothing_FWHM is not None:
figtype += "_"+"".join(["".join([s[0] for s in smoothing_function.split("_")]),
"{0:.2f}".format(smoothing_FWHM), smoothing_scale]) # additionnal informations
if deconvolve:
figtype += "_deconv"
if align_center is None:
figtype += "_not_aligned"
# Crop data to remove outside blank margins.
data_array, error_array, headers = proj_red.crop_array(data_array, headers, step=5, null_val=0.,
inside=True, display=display_crop, savename=figname, plots_folder=plots_folder)
data_mask = np.ones(data_array[0].shape, dtype=bool)
# Deconvolve data using Richardson-Lucy iterative algorithm with a gaussian PSF of given FWHM.
if deconvolve:
data_array = proj_red.deconvolve_array(data_array, headers, psf=psf, FWHM=psf_FWHM, scale=psf_scale, shape=psf_shape, iterations=iterations, algo=algo)
# Estimate error from data background, estimated from sub-image of desired sub_shape.
background = None
data_array, error_array, headers, background = proj_red.get_error(data_array, headers, error_array, data_mask=data_mask, sub_type=error_sub_type, subtract_error=subtract_error, display=display_bkg, savename="_".join([figname, "errors"]), plots_folder=plots_folder, return_background=True)
# Align and rescale images with oversampling.
data_array, error_array, headers, data_mask = proj_red.align_data(
data_array, headers, error_array=error_array, background=background, upsample_factor=10, ref_center=align_center, return_shifts=False)
if display_align:
proj_plots.plot_obs(data_array, headers, savename="_".join([figname, str(align_center)]), plots_folder=plots_folder, norm=LogNorm(
vmin=data_array[data_array > 0.].min()*headers[0]['photflam'], vmax=data_array[data_array > 0.].max()*headers[0]['photflam']))
# Rebin data to desired pixel size.
if rebin:
data_array, error_array, headers, Dxy, data_mask = proj_red.rebin_array(
data_array, error_array, headers, pxsize=pxsize, scale=px_scale, operation=rebin_operation, data_mask=data_mask)
# Rotate data to have North up
if rotate_data:
data_mask = np.ones(data_array.shape[1:]).astype(bool)
alpha = headers[0]['orientat']
data_array, error_array, data_mask, headers = proj_red.rotate_data(data_array, error_array, data_mask, headers, -alpha)
# Plot array for checking output
if display_data and px_scale.lower() not in ['full', 'integrate']:
proj_plots.plot_obs(data_array, headers, savename="_".join([figname, "rebin"]), plots_folder=plots_folder, norm=LogNorm(
vmin=data_array[data_array > 0.].min()*headers[0]['photflam'], vmax=data_array[data_array > 0.].max()*headers[0]['photflam']))
background = np.array([np.array(bkg).reshape(1, 1) for bkg in background])
background_error = np.array([np.array(np.sqrt((bkg-background[np.array([h['filtnam1'] == head['filtnam1'] for h in headers], dtype=bool)].mean())
** 2/np.sum([h['filtnam1'] == head['filtnam1'] for h in headers]))).reshape(1, 1) for bkg, head in zip(background, headers)])
# Step 2:
# Compute Stokes I, Q, U with smoothed polarized images
# SMOOTHING DISCUSSION:
# The FWHM of the FOC PSF has been estimated at about 0.03" across the 1500-5000 Angstrom band,
# which is about 2 detector pixels wide.
# See Jedrzejewski, R.; Nota, A.; Hack, W. J., "A Comparison Between FOC and WFPC2",
# Bibcode: 1995chst.conf...10J
I_stokes, Q_stokes, U_stokes, Stokes_cov = proj_red.compute_Stokes(
data_array, error_array, data_mask, headers, FWHM=smoothing_FWHM, scale=smoothing_scale, smoothing=smoothing_function, transmitcorr=False)
I_bkg, Q_bkg, U_bkg, S_cov_bkg = proj_red.compute_Stokes(background, background_error, np.array(True).reshape(
1, 1), headers, FWHM=None, scale=smoothing_scale, smoothing=smoothing_function, transmitcorr=False)
# Step 3:
# Rotate images to have North up
if rotate_stokes:
I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, headers = proj_red.rotate_Stokes(
I_stokes, Q_stokes, U_stokes, Stokes_cov, data_mask, headers, SNRi_cut=None)
I_bkg, Q_bkg, U_bkg, S_cov_bkg, _, _ = proj_red.rotate_Stokes(I_bkg, Q_bkg, U_bkg, S_cov_bkg, np.array(True).reshape(1, 1), headers, SNRi_cut=None)
# Compute polarimetric parameters (polarisation degree and angle).
P, debiased_P, s_P, s_P_P, PA, s_PA, s_PA_P = proj_red.compute_pol(I_stokes, Q_stokes, U_stokes, Stokes_cov, headers)
P_bkg, debiased_P_bkg, s_P_bkg, s_P_P_bkg, PA_bkg, s_PA_bkg, s_PA_P_bkg = proj_red.compute_pol(I_bkg, Q_bkg, U_bkg, S_cov_bkg, headers)
# Step 4:
# Save image to FITS.
figname = "_".join([figname, figtype]) if figtype != "" else figname
Stokes_test = proj_fits.save_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, P, debiased_P, s_P, s_P_P, PA, s_PA, s_PA_P,
headers, data_mask, figname, data_folder=data_folder, return_hdul=True)
# Step 5:
# crop to desired region of interest (roi)
if crop:
figname += "_crop"
stokescrop = proj_plots.crop_Stokes(deepcopy(Stokes_test), norm=LogNorm())
stokescrop.crop()
stokescrop.write_to("/".join([data_folder, figname+".fits"]))
Stokes_test, headers = stokescrop.hdul_crop, [dataset.header for dataset in stokescrop.hdul_crop]
data_mask = Stokes_test['data_mask'].data.astype(bool)
print("F_int({0:.0f} Angs) = ({1} ± {2})e{3} ergs.cm^-2.s^-1.Angs^-1".format(headers[0]['photplam'], *sci_not(
Stokes_test[0].data[data_mask].sum()*headers[0]['photflam'], np.sqrt(Stokes_test[3].data[0, 0][data_mask].sum())*headers[0]['photflam'], 2, out=int)))
print("P_int = {0:.1f} ± {1:.1f} %".format(headers[0]['p_int']*100., np.ceil(headers[0]['p_int_err']*1000.)/10.))
print("PA_int = {0:.1f} ± {1:.1f} °".format(princ_angle(headers[0]['pa_int']), princ_angle(np.ceil(headers[0]['pa_int_err']*10.)/10.)))
# Background values
print("F_bkg({0:.0f} Angs) = ({1} ± {2})e{3} ergs.cm^-2.s^-1.Angs^-1".format(headers[0]['photplam'], *sci_not(
I_bkg[0, 0]*headers[0]['photflam'], np.sqrt(S_cov_bkg[0, 0][0, 0])*headers[0]['photflam'], 2, out=int)))
print("P_bkg = {0:.1f} ± {1:.1f} %".format(debiased_P_bkg[0, 0]*100., np.ceil(s_P_bkg[0, 0]*1000.)/10.))
print("PA_bkg = {0:.1f} ± {1:.1f} °".format(princ_angle(PA_bkg[0, 0]), princ_angle(np.ceil(s_PA_bkg[0, 0]*10.)/10.)))
# Plot polarisation map (Background is either total Flux, Polarization degree or Polarization degree error).
if px_scale.lower() not in ['full', 'integrate'] and not interactive:
proj_plots.polarisation_map(deepcopy(Stokes_test), data_mask, SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, flux_lim=flux_lim,
step_vec=step_vec, vec_scale=vec_scale, savename="_".join([figname]), plots_folder=plots_folder)
proj_plots.polarisation_map(deepcopy(Stokes_test), data_mask, SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, flux_lim=flux_lim, step_vec=step_vec,
vec_scale=vec_scale, savename="_".join([figname, "I"]), plots_folder=plots_folder, display='Intensity')
proj_plots.polarisation_map(deepcopy(Stokes_test), data_mask, SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, flux_lim=flux_lim, step_vec=step_vec,
vec_scale=vec_scale, savename="_".join([figname, "P_flux"]), plots_folder=plots_folder, display='Pol_Flux')
proj_plots.polarisation_map(deepcopy(Stokes_test), data_mask, SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, flux_lim=flux_lim, step_vec=step_vec,
vec_scale=vec_scale, savename="_".join([figname, "P"]), plots_folder=plots_folder, display='Pol_deg')
proj_plots.polarisation_map(deepcopy(Stokes_test), data_mask, SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, flux_lim=flux_lim, step_vec=step_vec,
vec_scale=vec_scale, savename="_".join([figname, "PA"]), plots_folder=plots_folder, display='Pol_ang')
proj_plots.polarisation_map(deepcopy(Stokes_test), data_mask, SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, flux_lim=flux_lim, step_vec=step_vec,
vec_scale=vec_scale, savename="_".join([figname, "I_err"]), plots_folder=plots_folder, display='I_err')
proj_plots.polarisation_map(deepcopy(Stokes_test), data_mask, SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, flux_lim=flux_lim, step_vec=step_vec,
vec_scale=vec_scale, savename="_".join([figname, "P_err"]), plots_folder=plots_folder, display='Pol_deg_err')
proj_plots.polarisation_map(deepcopy(Stokes_test), data_mask, SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, flux_lim=flux_lim, step_vec=step_vec,
vec_scale=vec_scale, savename="_".join([figname, "SNRi"]), plots_folder=plots_folder, display='SNRi')
proj_plots.polarisation_map(deepcopy(Stokes_test), data_mask, SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, flux_lim=flux_lim, step_vec=step_vec,
vec_scale=vec_scale, savename="_".join([figname, "SNRp"]), plots_folder=plots_folder, display='SNRp')
elif not interactive:
proj_plots.polarisation_map(deepcopy(Stokes_test), data_mask, SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut,
savename=figname, plots_folder=plots_folder, display='integrate')
elif px_scale.lower() not in ['full', 'integrate']:
proj_plots.pol_map(Stokes_test, SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, flux_lim=flux_lim)
return 0
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='FOC pipeline reduction for local files or MAST target products')
parser.add_argument('-t', '--target', metavar='targetname', required=False, help='the name of the target', type=str, default=None)
parser.add_argument('-p', '--proposal_id', metavar='proposal_id', required=False, help='the proposal id of the data products', type=int, default=None)
parser.add_argument('-f', '--files', metavar='path', required=False, nargs='*', help='the full or relative path to the data products', default=None)
parser.add_argument('-o', '--output_dir', metavar='directory_path', required=False,
help='output directory path for the data products', type=str, default="./data")
parser.add_argument('-c', '--crop', action='store_true', required=False, help='whether to crop the analysis region')
parser.add_argument('-i', '--interactive', action='store_true', required=False, help='whether to output to the interactive analysis tool')
args = parser.parse_args()
exitcode = main(target=args.target, proposal_id=args.proposal_id, infiles=args.files,
output_dir=args.output_dir, crop=args.crop, interactive=args.interactive)
print("Finished with ExitCode: ", exitcode)

package/__init__.py Normal file (2 lines added)

@@ -0,0 +1,2 @@
from . import lib
from . import src

package/lib/__init__.py Normal file (9 lines added)

@@ -0,0 +1,9 @@
from . import background
from . import convex_hull
from . import cross_correlation
from . import deconvolve
from . import fits
from . import plots
from . import query
from . import reduction
from . import utils

package/lib/background.py Executable file (455 lines added)

@@ -0,0 +1,455 @@
"""
Library function for background estimation steps of the reduction pipeline.
prototypes :
- display_bkg(data, background, std_bkg, headers, histograms, binning, rectangle, savename, plots_folder)
Display and save how the background noise is computed.
- bkg_hist(data, error, mask, headers, sub_shape, display, savename, plots_folder) -> n_data_array, n_error_array, headers, background)
Compute the error (noise) of the input array by computing the base mode of each image.
- bkg_mini(data, error, mask, headers, sub_shape, display, savename, plots_folder) -> n_data_array, n_error_array, headers, background)
Compute the error (noise) of the input array by looking at the sub-region of minimal flux in every image and of shape sub_shape.
"""
from os.path import join as path_join
from copy import deepcopy
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.colors import LogNorm
from matplotlib.patches import Rectangle
from datetime import datetime
from .plots import plot_obs
from scipy.optimize import curve_fit
def gauss(x, *p):
N, mu, sigma = p
return N*np.exp(-(x-mu)**2/(2.*sigma**2))
def gausspol(x, *p):
N, mu, sigma, a, b, c, d = p
return N*np.exp(-(x-mu)**2/(2.*sigma**2)) + a*np.log(x) + b/x + c*x + d
def bin_centers(edges):
return (edges[1:]+edges[:-1])/2.
def display_bkg(data, background, std_bkg, headers, histograms=None, binning=None, coeff=None, rectangle=None, savename=None, plots_folder="./"):
plt.rcParams.update({'font.size': 15})
convert_flux = np.array([head['photflam'] for head in headers])
date_time = np.array([headers[i]['date-obs']+';'+headers[i]['time-obs']
for i in range(len(headers))])
date_time = np.array([datetime.strptime(d, '%Y-%m-%d;%H:%M:%S')
for d in date_time])
filt = np.array([headers[i]['filtnam1'] for i in range(len(headers))])
dict_filt = {"POL0": 'r', "POL60": 'g', "POL120": 'b'}
c_filt = np.array([dict_filt[f] for f in filt])
fig, ax = plt.subplots(figsize=(10, 6), constrained_layout=True)
for f in np.unique(filt):
mask = [fil == f for fil in filt]
ax.scatter(date_time[mask], background[mask]*convert_flux[mask],
color=dict_filt[f], label="{0:s}".format(f))
ax.errorbar(date_time, background*convert_flux,
yerr=std_bkg*convert_flux, fmt='+k',
markersize=0, ecolor=c_filt)
# Date handling
locator = mdates.AutoDateLocator()
formatter = mdates.ConciseDateFormatter(locator)
ax.xaxis.set_major_locator(locator)
ax.xaxis.set_major_formatter(formatter)
ax.set_ylim(bottom=0.)
ax.set_xlabel("Observation date and time")
ax.set_ylabel(r"Flux [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]")
plt.legend()
if not (savename is None):
this_savename = deepcopy(savename)
if not savename[-4:] in ['.png', '.jpg', '.pdf']:
this_savename += '_background_flux.pdf'
else:
this_savename = savename[:-4]+"_background_flux"+savename[-4:]
fig.savefig(path_join(plots_folder, this_savename), bbox_inches='tight')
if not (histograms is None):
filt_obs = {"POL0": 0, "POL60": 0, "POL120": 0}
fig_h, ax_h = plt.subplots(figsize=(10, 6), constrained_layout=True)
for i, (hist, bins) in enumerate(zip(histograms, binning)):
filt_obs[headers[i]['filtnam1']] += 1
ax_h.plot(bins*convert_flux[i], hist, '+', color="C{0:d}".format(i), alpha=0.8,
label=headers[i]['filtnam1']+' (Obs '+str(filt_obs[headers[i]['filtnam1']])+')')
ax_h.plot([background[i]*convert_flux[i], background[i]*convert_flux[i]], [hist.min(), hist.max()], 'x--', color="C{0:d}".format(i), alpha=0.8)
if not (coeff is None):
# ax_h.plot(bins*convert_flux[i], gausspol(bins, *coeff[i]), '--', color="C{0:d}".format(i), alpha=0.8)
ax_h.plot(bins*convert_flux[i], gauss(bins, *coeff[i]), '--', color="C{0:d}".format(i), alpha=0.8)
ax_h.set_xscale('log')
ax_h.set_ylim([0., np.max([hist.max() for hist in histograms])])
ax_h.set_xlim([np.min(background*convert_flux)*1e-2, np.max(background*convert_flux)*1e2])
ax_h.set_xlabel(r"Flux [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]")
ax_h.set_ylabel(r"Number of pixels in bin")
ax_h.set_title("Histogram for each observation")
plt.legend()
if not (savename is None):
this_savename = deepcopy(savename)
if not savename[-4:] in ['.png', '.jpg', '.pdf']:
this_savename += '_histograms.pdf'
else:
this_savename = savename[:-4]+"_histograms"+savename[-4:]
fig_h.savefig(path_join(plots_folder, this_savename), bbox_inches='tight')
fig2, ax2 = plt.subplots(figsize=(10, 10))
data0 = data[0]*convert_flux[0]
bkg_data0 = data0 <= background[0]*convert_flux[0]
instr = headers[0]['instrume']
rootname = headers[0]['rootname']
exptime = headers[0]['exptime']
filt = headers[0]['filtnam1']
# plots
im2 = ax2.imshow(data0, norm=LogNorm(data0[data0 > 0.].mean()/10., data0.max()), origin='lower', cmap='gray')
ax2.imshow(bkg_data0, origin='lower', cmap='Reds', alpha=0.5)
if not (rectangle is None):
x, y, width, height, angle, color = rectangle[0]
ax2.add_patch(Rectangle((x, y), width, height, edgecolor=color, fill=False, lw=2))
ax2.annotate(instr+":"+rootname, color='white', fontsize=10, xy=(0.01, 1.00), xycoords='axes fraction', verticalalignment='top', horizontalalignment='left')
ax2.annotate(filt, color='white', fontsize=14, xy=(0.01, 0.01), xycoords='axes fraction', verticalalignment='bottom', horizontalalignment='left')
ax2.annotate(str(exptime)+" s", color='white', fontsize=10, xy=(1.00, 0.01),
xycoords='axes fraction', verticalalignment='bottom', horizontalalignment='right')
ax2.set(xlabel='pixel offset', ylabel='pixel offset', aspect='equal')
fig2.subplots_adjust(hspace=0, wspace=0, right=1.0)
fig2.colorbar(im2, ax=ax2, location='right', aspect=50, pad=0.025, label=r"Flux [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]")
if not (savename is None):
this_savename = deepcopy(savename)
if not savename[-4:] in ['.png', '.jpg', '.pdf']:
this_savename += '_'+filt+'_background_location.pdf'
else:
this_savename = savename[:-4]+'_'+filt+'_background_location'+savename[-4:]
fig2.savefig(path_join(plots_folder, this_savename), bbox_inches='tight')
if not (rectangle is None):
plot_obs(data, headers, vmin=data[data > 0.].min()*convert_flux.mean(), vmax=data[data > 0.].max()*convert_flux.mean(), rectangle=rectangle,
savename=savename+"_background_location", plots_folder=plots_folder)
elif not (rectangle is None):
plot_obs(data, headers, vmin=data[data > 0.].min(), vmax=data[data > 0.].max(), rectangle=rectangle)
plt.show()
def sky_part(img):
rand_ind = np.unique((np.random.rand(np.floor(img.size/4).astype(int))*2*img.size).astype(int) % img.size)
rand_pix = img.flatten()[rand_ind]
# Intensity range
sky_med = np.median(rand_pix)
sig = np.min([img[img < sky_med].std(), img[img > sky_med].std()])
sky_range = [sky_med-2.*sig, np.max([sky_med+sig, 7e-4])] # Detector background average FOC Data Handbook Sec. 7.6
sky = img[np.logical_and(img >= sky_range[0], img <= sky_range[1])]
return sky, sky_range
def bkg_estimate(img, bins=None, chi2=None, coeff=None):
if bins is None or chi2 is None or coeff is None:
bins, chi2, coeff = [8], [], []
else:
try:
bins.append(int(3./2.*bins[-1]))
except IndexError:
bins, chi2, coeff = [8], [], []
hist, bin_edges = np.histogram(img[img > 0], bins=bins[-1])
binning = bin_centers(bin_edges)
peak = binning[np.argmax(hist)]
bins_stdev = binning[hist > hist.max()/2.]
stdev = bins_stdev[-1]-bins_stdev[0]
# p0 = [hist.max(), peak, stdev, 1e-3, 1e-3, 1e-3, 1e-3]
p0 = [hist.max(), peak, stdev]
try:
# popt, pcov = curve_fit(gausspol, binning, hist, p0=p0)
popt, pcov = curve_fit(gauss, binning, hist, p0=p0)
except RuntimeError:
popt = p0
# chi2.append(np.sum((hist - gausspol(binning, *popt))**2)/hist.size)
chi2.append(np.sum((hist - gauss(binning, *popt))**2)/hist.size)
coeff.append(popt)
return bins, chi2, coeff
def bkg_fit(data, error, mask, headers, subtract_error=True, display=False, savename=None, plots_folder=""):
"""
----------
Inputs:
data : numpy.ndarray
Array containing the data to study (2D float arrays).
error : numpy.ndarray
Array of images (2D floats, aligned and of the same shape) containing
the error in each pixel of the observation images in data_array.
mask : numpy.ndarray
2D boolean array delimiting the data to work on.
headers : header list
Headers associated with the images in data_array.
subtract_error : float or bool, optional
If float, factor to which the estimated background should be multiplied
If False the background is not subtracted.
Defaults to True (factor = 1.).
display : boolean, optional
If True, data_array will be displayed with a rectangle around the
sub-image selected for background computation.
Defaults to False.
savename : str, optional
Name of the figure the map should be saved to. If None, the map won't
be saved (only displayed). Only used if display is True.
Defaults to None.
plots_folder : str, optional
Relative (or absolute) filepath to the folder in which the map will
be saved. Not used if savename is None.
Defaults to current folder.
----------
Returns:
data_array : numpy.ndarray
Array containing the data to study minus the background.
headers : header list
Updated headers associated with the images in data_array.
error_array : numpy.ndarray
Array containing the background values associated to the images in
data_array.
background : numpy.ndarray
Array containing the pixel background value for each image in
data_array.
"""
n_data_array, n_error_array = deepcopy(data), deepcopy(error)
error_bkg = np.ones(n_data_array.shape)
std_bkg = np.zeros((data.shape[0]))
background = np.zeros((data.shape[0]))
histograms, binning = [], []
for i, image in enumerate(data):
# Compute the Count-rate histogram for the image
sky, sky_range = sky_part(image[image > 0.])
bins, chi2, coeff = bkg_estimate(sky)
while bins[-1] < 256:
bins, chi2, coeff = bkg_estimate(sky, bins, chi2, coeff)
hist, bin_edges = np.histogram(sky, bins=bins[-1])
histograms.append(hist)
binning.append(bin_centers(bin_edges))
chi2, coeff = np.array(chi2), np.array(coeff)
weights = 1/chi2**2
weights /= weights.sum()
bkg = np.sum(weights*(coeff[:, 1]+np.abs(coeff[:, 2])*subtract_error))
error_bkg[i] *= bkg
n_error_array[i] = np.sqrt(n_error_array[i]**2 + error_bkg[i]**2)
# Subtract background
if subtract_error > 0:
n_data_array[i][mask] = n_data_array[i][mask] - bkg
n_data_array[i][np.logical_and(mask, n_data_array[i] <= 1e-3*bkg)] = 1e-3*bkg
std_bkg[i] = image[np.abs(image-bkg)/bkg < 1.].std()
background[i] = bkg
if display:
display_bkg(data, background, std_bkg, headers, histograms=histograms, binning=binning, coeff=coeff, savename=savename, plots_folder=plots_folder)
return n_data_array, n_error_array, headers, background
def bkg_hist(data, error, mask, headers, sub_type=None, subtract_error=True, display=False, savename=None, plots_folder=""):
"""
----------
Inputs:
data : numpy.ndarray
Array containing the data to study (2D float arrays).
error : numpy.ndarray
Array of images (2D floats, aligned and of the same shape) containing
the error in each pixel of the observation images in data_array.
mask : numpy.ndarray
2D boolean array delimiting the data to work on.
headers : header list
Headers associated with the images in data_array.
sub_type : str or int, optional
If str, statistic rule to be used for the number of bins in counts/s.
If int, number of bins for the counts/s histogram.
Defaults to "Freedman-Diaconis".
subtract_error : float or bool, optional
If float, factor to which the estimated background should be multiplied
If False the background is not subtracted.
Defaults to True (factor = 1.).
display : boolean, optional
If True, data_array will be displayed with a rectangle around the
sub-image selected for background computation.
Defaults to False.
savename : str, optional
Name of the figure the map should be saved to. If None, the map won't
be saved (only displayed). Only used if display is True.
Defaults to None.
plots_folder : str, optional
Relative (or absolute) filepath to the folder in which the map will
be saved. Not used if savename is None.
Defaults to current folder.
----------
Returns:
data_array : numpy.ndarray
Array containing the data to study minus the background.
headers : header list
Updated headers associated with the images in data_array.
error_array : numpy.ndarray
Array containing the background values associated to the images in
data_array.
background : numpy.ndarray
Array containing the pixel background value for each image in
data_array.
"""
n_data_array, n_error_array = deepcopy(data), deepcopy(error)
error_bkg = np.ones(n_data_array.shape)
std_bkg = np.zeros((data.shape[0]))
background = np.zeros((data.shape[0]))
histograms, binning, coeff = [], [], []
for i, image in enumerate(data):
# Compute the Count-rate histogram for the image
n_mask = np.logical_and(mask, image > 0.)
if not (sub_type is None):
if isinstance(sub_type, int):
n_bins = sub_type
elif sub_type.lower() in ['sqrt']:
n_bins = np.fix(np.sqrt(image[n_mask].size)).astype(int) # Square-root
elif sub_type.lower() in ['sturges']:
n_bins = np.ceil(np.log2(image[n_mask].size)).astype(int)+1 # Sturges
elif sub_type.lower() in ['rice']:
n_bins = 2*np.fix(np.power(image[n_mask].size, 1/3)).astype(int) # Rice
elif sub_type.lower() in ['scott']:
n_bins = np.fix((image[n_mask].max()-image[n_mask].min())/(3.5*image[n_mask].std()/np.power(image[n_mask].size, 1/3))).astype(int) # Scott
else:
n_bins = np.fix((image[n_mask].max()-image[n_mask].min())/(2*np.subtract(*np.percentile(image[n_mask], [75, 25])) /
np.power(image[n_mask].size, 1/3))).astype(int) # Freedman-Diaconis
else:
n_bins = np.fix((image[n_mask].max()-image[n_mask].min())/(2*np.subtract(*np.percentile(image[n_mask], [75, 25])) /
np.power(image[n_mask].size, 1/3))).astype(int) # Freedman-Diaconis
hist, bin_edges = np.histogram(np.log(image[n_mask]), bins=n_bins)
histograms.append(hist)
binning.append(np.exp(bin_centers(bin_edges)))
# Fit a gaussian to the log-intensity histogram
bins_stdev = binning[-1][hist > hist.max()/2.]
stdev = bins_stdev[-1]-bins_stdev[0]
# p0 = [hist.max(), binning[-1][np.argmax(hist)], stdev, 1e-3, 1e-3, 1e-3, 1e-3]
p0 = [hist.max(), binning[-1][np.argmax(hist)], stdev]
# popt, pcov = curve_fit(gausspol, binning[-1], hist, p0=p0)
popt, pcov = curve_fit(gauss, binning[-1], hist, p0=p0)
coeff.append(popt)
bkg = popt[1]+np.abs(popt[2])*subtract_error
error_bkg[i] *= bkg
n_error_array[i] = np.sqrt(n_error_array[i]**2 + error_bkg[i]**2)
# Subtract background
if subtract_error > 0:
n_data_array[i][mask] = n_data_array[i][mask] - bkg
n_data_array[i][np.logical_and(mask, n_data_array[i] <= 1e-3*bkg)] = 1e-3*bkg
std_bkg[i] = image[np.abs(image-bkg)/bkg < 1.].std()
background[i] = bkg
if display:
display_bkg(data, background, std_bkg, headers, histograms=histograms, binning=binning, coeff=coeff, savename=savename, plots_folder=plots_folder)
return n_data_array, n_error_array, headers, background
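# Usage sketch (assumes data_array, error_array, mask and headers as produced by the
# pipeline; the 0.5 factor mirrors the subtract_error default used in FOC_reduction.py):
# data_array, error_array, headers, background = bkg_hist(
#     data_array, error_array, mask, headers, sub_type='freedman-diaconis', subtract_error=0.5)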
def bkg_mini(data, error, mask, headers, sub_shape=(15, 15), subtract_error=True, display=False, savename=None, plots_folder=""):
"""
Look for the sub-image of shape sub_shape with the smallest integrated
flux (assuming it contains no source) and estimate the background from the
root mean square of this sub-image.
----------
Inputs:
data : numpy.ndarray
Array containing the data to study (2D float arrays).
error : numpy.ndarray
Array of images (2D floats, aligned and of the same shape) containing
the error in each pixel of the observation images in data_array.
mask : numpy.ndarray
2D boolean array delimiting the data to work on.
headers : header list
Headers associated with the images in data_array.
sub_shape : tuple, optional
Shape of the sub-image to look for. Must be odd.
Defaults to (15, 15).
subtract_error : float or bool, optional
If float, factor to which the estimated background should be multiplied
If False the background is not subtracted.
Defaults to True (factor = 1.).
display : boolean, optional
If True, data_array will be displayed with a rectangle around the
sub-image selected for background computation.
Defaults to False.
savename : str, optional
Name of the figure the map should be saved to. If None, the map won't
be saved (only displayed). Only used if display is True.
Defaults to None.
plots_folder : str, optional
Relative (or absolute) filepath to the folder in which the map will
be saved. Not used if savename is None.
Defaults to current folder.
----------
Returns:
data_array : numpy.ndarray
Array containing the data to study minus the background.
headers : header list
Updated headers associated with the images in data_array.
error_array : numpy.ndarray
Array containing the background values associated to the images in
data_array.
background : numpy.ndarray
Array containing the pixel background value for each image in
data_array.
"""
sub_shape = np.array(sub_shape)
# Make sub_shape of odd values
if not (np.all(sub_shape % 2)):
sub_shape += 1-sub_shape % 2
shape = np.array(data.shape)
diff = (sub_shape-1).astype(int)
temp = np.zeros((shape[0], shape[1]-diff[0], shape[2]-diff[1]))
n_data_array, n_error_array = deepcopy(data), deepcopy(error)
error_bkg = np.ones(n_data_array.shape)
std_bkg = np.zeros((data.shape[0]))
background = np.zeros((data.shape[0]))
rectangle = []
for i, image in enumerate(data):
# Find the sub-image of smallest integrated flux (suppose no source)
# sub-image dominated by background
fmax = np.finfo(np.double).max
img = deepcopy(image)
img[~mask] = fmax/(diff[0]*diff[1])
for r in range(temp.shape[1]):
for c in range(temp.shape[2]):
temp[i][r, c] = np.where(mask[r, c], img[r:r+diff[0], c:c+diff[1]].sum(), fmax/(diff[0]*diff[1]))
minima = np.unravel_index(np.argmin(temp.sum(axis=0)), temp.shape[1:])
for i, image in enumerate(data):
rectangle.append([minima[1], minima[0], sub_shape[1], sub_shape[0], 0., 'r'])
# Compute error : root mean square of the background
sub_image = image[minima[0]:minima[0]+sub_shape[0], minima[1]:minima[1]+sub_shape[1]]
# bkg = np.std(sub_image) # Previously computed using standard deviation over the background
bkg = np.sqrt(np.sum(sub_image**2)/sub_image.size)*subtract_error if subtract_error > 0 else np.sqrt(np.sum(sub_image**2)/sub_image.size)
error_bkg[i] *= bkg
n_error_array[i] = np.sqrt(n_error_array[i]**2 + error_bkg[i]**2)
# Subtract background
if subtract_error > 0.:
n_data_array[i][mask] = n_data_array[i][mask] - bkg
n_data_array[i][np.logical_and(mask, n_data_array[i] <= 1e-3*bkg)] = 1e-3*bkg
std_bkg[i] = image[np.abs(image-bkg)/bkg < 1.].std()
background[i] = bkg
if display:
display_bkg(data, background, std_bkg, headers, rectangle=rectangle, savename=savename, plots_folder=plots_folder)
return n_data_array, n_error_array, headers, background

package/lib/convex_hull.py Executable file (368 lines added)

@@ -0,0 +1,368 @@
"""
Library functions for graham algorithm implementation (find the convex hull of a given list of points).
"""
from copy import deepcopy
import numpy as np
def clean_ROI(image):
"""
Remove instrument borders from an observation.
"""
H, J = [], []
shape = np.array(image.shape)
row, col = np.indices(shape)
for i in range(0, shape[0]):
r = row[i, :][image[i, :] > 0.]
c = col[i, :][image[i, :] > 0.]
if len(r) > 1 and len(c) > 1:
H.append((r[0], c[0]))
H.append((r[-1], c[-1]))
H = np.array(H)
for j in range(0, shape[1]):
r = row[:, j][image[:, j] > 0.]
c = col[:, j][image[:, j] > 0.]
if len(r) > 1 and len(c) > 1:
J.append((r[0], c[0]))
J.append((r[-1], c[-1]))
J = np.array(J)
xmin = np.min([H[:, 1].min(), J[:, 1].min()])
xmax = np.max([H[:, 1].max(), J[:, 1].max()])+1
ymin = np.min([H[:, 0].min(), J[:, 0].min()])
ymax = np.max([H[:, 0].max(), J[:, 0].max()])+1
return np.array([xmin, xmax, ymin, ymax])
# Define angle and vectors operations
def vector(A, B):
"""
Define a vector from the input points A, B.
"""
return (B[0] - A[0], B[1] - A[1])
def determinant(u, v):
"""
Compute the determinant of 2 vectors.
--------
Inputs:
u, v : vectors (list of length 2)
Vectors for which the determinant must be computed.
----------
Returns:
determinant : real
Determinant of the input vectors.
"""
return u[0] * v[1] - u[1] * v[0]
def pseudo_angle(A, B, C):
"""
Define a pseudo-angle from the input points A, B, C.
"""
u = vector(A, B)
v = vector(A, C)
return determinant(u, v)
def distance(A, B):
"""
Compute the Euclidean distance between 2 points of the plane.
----------
Inputs:
A, B : points (list of length 2)
Points between which the distance must be computed.
----------
Returns:
distance : real
Euclidean distance between A, B.
"""
x, y = vector(A, B)
return np.sqrt(x ** 2 + y ** 2)
# Define lexicographic and composition order
def lexico(A, B):
"""
Define the lexicographic order between points A, B.
A <= B if:
- y_a < y_b
or - y_a = y_b and x_a < x_b
----------
Inputs:
A, B : points (list of length 2)
Points compared for the lexicographic order
----------
Returns:
lexico : boolean
True if A<=B, False otherwise.
"""
return (A[1] < B[1]) or (A[1] == B[1] and A[0] <= B[0])
def min_lexico(s):
"""
Return the minimum of a list of points for the lexicographic order.
----------
Inputs:
s : list of points
List of points in which we look for the minimum for the lexicographic
order
----------
Returns:
m : point (list of length 2)
Minimum for the lexicographic order in the input list s.
"""
m = s[0]
for x in s:
if lexico(x, m):
m = x
return m
def comp(Omega, A, B):
"""
Test the order relation between points such that A <= B:
This is True if:
- (Omega-A, Omega-B) is a right-hand basis of the plane.
or - (Omega-A, Omega-B) are linearly dependent.
----------
Inputs:
Omega, A, B : points (list of length 2)
Points compared for the composition order
----------
Returns:
comp : boolean
True if A <= B, False otherwise.
"""
t = pseudo_angle(Omega, A, B)
if t > 0:
return True
elif t < 0:
return False
else:
return distance(Omega, A) <= distance(Omega, B)
# Implement quicksort
def partition(s, left, right, order):
"""
Take the last element of a list 's' between indexes 'left' and 'right' and
place it at its sorted position using relation order 'order'. Return the
index at which it was placed.
----------
Inputs:
s : list
List of elements to be ordered.
left, right : int
Index of the first and last elements to be considered.
order : func: A, B -> bool
Relation order between 2 elements A, B that returns True if A<=B,
False otherwise.
----------
Returns:
index : int
Index at which have been placed the element chosen by the function.
"""
i = left - 1
for j in range(left, right):
if order(s[j], s[right]):
i = i + 1
temp = deepcopy(s[i])
s[i] = deepcopy(s[j])
s[j] = deepcopy(temp)
temp = deepcopy(s[i+1])
s[i+1] = deepcopy(s[right])
s[right] = deepcopy(temp)
return i + 1
def sort_aux(s, left, right, order):
"""
Sort a list 's' between indexes 'left', 'right' using relation order 'order' by
dividing it in 2 sub-lists and sorting these.
"""
if left <= right:
# Call partition function that gives an index on which the list will be divided
q = partition(s, left, right, order)
sort_aux(s, left, q - 1, order)
sort_aux(s, q + 1, right, order)
def quicksort(s, order):
"""
Implement the quicksort algorithm on the list 's' using the relation order
'order'.
"""
# Call auxiliary sort on whole list.
sort_aux(s, 0, len(s) - 1, order)
def sort_angles_distances(Omega, s):
"""
Sort the list of points 's' for the composition order given reference point
Omega.
"""
def order(A, B): return comp(Omega, A, B)
quicksort(s, order)
# Define functions for stacks (using Python lists with stack operations).
def empty_stack(): return []
def stack(S, A): S.append(A)
def unstack(S): S.pop()
def stack_top(S): return S[-1]
def stack_sub_top(S): return S[-2]
# Alignment handling
def del_aux(s, k, Omega):
"""
Return the index of the first element in 's' after index 'k' that is
not linearly dependent on (Omega-s[k]), i.e. the first element not aligned
with Omega and s[k].
----------
Inputs:
s : list
List of points
k : int
Index from which to look for aligned points in s.
Omega : point (list of length 2)
Reference point in 's' to test alignment.
----------
Returns:
index : int
Index of the first element not aligned to Omega and s[k].
"""
p = s[k]
j = k + 1
while (j < len(s)) and (pseudo_angle(Omega, p, s[j]) == 0):
j = j + 1
return j
def del_aligned(s, Omega):
"""
Remove from a list of points 's' all intermediate points that are aligned,
using Omega as the reference point.
----------
Inputs:
s : list
List of points
Omega : point (list of length 2)
Point of reference to test alignment.
-----------
Returns:
s1 : list
List of points where no points are aligned using reference point Omega.
"""
s1 = [s[0]]
n = len(s)
k = 1
while k < n:
# Get the index of the last point aligned with (Omega-s[k])
k = del_aux(s, k, Omega)
s1.append(s[k - 1])
return s1
# Implement Graham algorithm
def convex_hull(H):
"""
Implement Graham algorithm to find the convex hull of a given list of
points, handling aligned points.
----------
Inputs:
A : list
List of points for which the the convex hull must be returned.
----------
S : list
List of points defining the convex hull of the input list A.
"""
S = empty_stack()
Omega = min_lexico(H)
sort_angles_distances(Omega, H)
A = del_aligned(H, Omega)
if len(A) < 3:
return H
else:
stack(S, A[0])
stack(S, A[1])
stack(S, A[2])
for i in range(3, len(A)):
while pseudo_angle(stack_sub_top(S), stack_top(S), A[i]) <= 0:
unstack(S)
stack(S, A[i])
return S
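# Minimal example (points are illustrative): the interior point (1, 1) and the
# aligned point (1, 0) are dropped, leaving the four corners.
# >>> convex_hull([(0, 0), (2, 0), (1, 1), (2, 2), (0, 2), (1, 0)])
# [(0, 0), (2, 0), (2, 2), (0, 2)]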
def image_hull(image, step=5, null_val=0., inside=True):
"""
Compute the convex hull of a 2D image and return the 4 relevant coordinates
of the maximum included rectangle (i.e. crop the image to the maximum rectangle).
----------
Inputs:
image : numpy.ndarray
2D array of floats corresponding to an image in intensity that need to
be cropped down.
step : int, optional
For images with straight edges, not all lines and columns need to be
browsed in order to get a good convex hull. The step value determines
how many rows/columns can be skipped; with step=2 every other line will
be browsed.
Defaults to 5.
null_val : float, optional
Pixel value determining the threshold for what is considered 'outside'
the image. All border pixels below this value will be taken out.
Defaults to 0.
inside : boolean, optional
If True, the cropped image will be the maximum rectangle included
inside the image. If False, the cropped image will be the minimum
rectangle in which the whole image is included.
Defaults to True.
----------
Returns:
vertex : numpy.ndarray
Array containing the vertices of the maximum included rectangle as
(row_min, row_max, col_min, col_max).
"""
H = []
shape = np.array(image.shape)
row, col = np.indices(shape)
for i in range(0, int(min(shape)/2), step):
r1, r2 = row[i, :][image[i, :] > null_val], row[-i, :][image[-i, :] > null_val]
c1, c2 = col[i, :][image[i, :] > null_val], col[-i, :][image[-i, :] > null_val]
if r1.shape[0] > 1:
H.append((r1[0], c1[0]))
H.append((r1[-1], c1[-1]))
if r2.shape[0] > 1:
H.append((r2[0], c2[0]))
H.append((r2[-1], c2[-1]))
S = np.array(convex_hull(H))
x_min, y_min = S[:, 0] < S[:, 0].mean(), S[:, 1] < S[:, 1].mean()
x_max, y_max = S[:, 0] > S[:, 0].mean(), S[:, 1] > S[:, 1].mean()
# Get the 4 extrema
# S0 = S[x_min*y_min][np.argmin(S[x_min*y_min][:, 0])]
# S1 = S[x_min*y_max][np.argmax(S[x_min*y_max][:, 1])]
# S2 = S[x_max*y_min][np.argmin(S[x_max*y_min][:, 1])]
# S3 = S[x_max*y_max][np.argmax(S[x_max*y_max][:, 0])]
S0 = S[x_min*y_min][np.abs(0-S[x_min*y_min].sum(axis=1)).min() == np.abs(0-S[x_min*y_min].sum(axis=1))][0]
S1 = S[x_min*y_max][np.abs(shape[1]-S[x_min*y_max].sum(axis=1)).min() == np.abs(shape[1]-S[x_min*y_max].sum(axis=1))][0]
S2 = S[x_max*y_min][np.abs(shape[0]-S[x_max*y_min].sum(axis=1)).min() == np.abs(shape[0]-S[x_max*y_min].sum(axis=1))][0]
S3 = S[x_max*y_max][np.abs(shape.sum()-S[x_max*y_max].sum(axis=1)).min() == np.abs(shape.sum()-S[x_max*y_max].sum(axis=1))][0]
# Get the vertex of the biggest included rectangle
if inside:
f0 = np.max([S0[0], S1[0]])
f1 = np.min([S2[0], S3[0]])
f2 = np.max([S0[1], S2[1]])
f3 = np.min([S1[1], S3[1]])
else:
f0 = np.min([S0[0], S1[0]])
f1 = np.max([S2[0], S3[0]])
f2 = np.min([S0[1], S2[1]])
f3 = np.max([S1[1], S3[1]])
return np.array([f0, f1, f2, f3]).astype(int)
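# Usage sketch (assumes `image` is a 2D numpy array with blank borders at or below 0.;
# the returned bounds are (row_min, row_max, col_min, col_max)):
# rmin, rmax, cmin, cmax = image_hull(image, step=5, null_val=0., inside=True)
# cropped = image[rmin:rmax, cmin:cmax]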

package/lib/cross_correlation.py Executable file (249 lines added)

@@ -0,0 +1,249 @@
"""
Library functions for phase cross-correlation computation.
"""
# Prefer FFTs via the new scipy.fft module when available (SciPy 1.4+)
# Otherwise fall back to numpy.fft.
# Like NumPy 1.15+, SciPy 1.3+ also uses pocketfft, but a newer
# C++/pybind11 version called pypocketfft
try:
import scipy.fft as fft
except ImportError:
import numpy.fft as fft
import numpy as np
def _upsampled_dft(data, upsampled_region_size, upsample_factor=1,
axis_offsets=None):
"""
Upsampled DFT by matrix multiplication.
This code is intended to provide the same result as if the following
operations were performed:
- Embed the array "data" in an array that is ``upsample_factor`` times
larger in each dimension. ifftshift to bring the center of the
image to (1,1).
- Take the FFT of the larger array.
- Extract an ``[upsampled_region_size]`` region of the result, starting
with the ``[axis_offsets+1]`` element.
It achieves this result by computing the DFT in the output array without
the need to zeropad. Much faster and memory efficient than the zero-padded
FFT approach if ``upsampled_region_size`` is much smaller than
``data.size * upsample_factor``.
----------
Inputs:
data : array
The input data array (DFT of original data) to upsample.
upsampled_region_size : integer or tuple of integers, optional
The size of the region to be sampled. If one integer is provided, it
is duplicated up to the dimensionality of ``data``.
upsample_factor : integer, optional
The upsampling factor. Defaults to 1.
axis_offsets : tuple of integers, optional
The offsets of the region to be sampled. Defaults to None (uses
image center)
----------
Returns:
output : ndarray
The upsampled DFT of the specified region.
"""
# if people pass in an integer, expand it to a list of equal-sized sections
if not hasattr(upsampled_region_size, "__iter__"):
upsampled_region_size = [upsampled_region_size, ] * data.ndim
else:
if len(upsampled_region_size) != data.ndim:
raise ValueError("shape of upsampled region sizes must be equal "
"to input data's number of dimensions.")
if axis_offsets is None:
axis_offsets = [0, ] * data.ndim
else:
if len(axis_offsets) != data.ndim:
raise ValueError("number of axis offsets must be equal to input "
"data's number of dimensions.")
im2pi = 1j * 2 * np.pi
dim_properties = list(zip(data.shape, upsampled_region_size, axis_offsets))
for (n_items, ups_size, ax_offset) in dim_properties[::-1]:
kernel = ((np.arange(ups_size) - ax_offset)[:, None]
* fft.fftfreq(n_items, upsample_factor))
kernel = np.exp(-im2pi * kernel)
# Equivalent to:
# data[i, j, k] = kernel[i, :] @ data[j, k].T
data = np.tensordot(kernel, data, axes=(1, -1))
return data
def _compute_phasediff(cross_correlation_max):
"""
Compute global phase difference between the two images (should be
zero if images are non-negative).
----------
Inputs:
cross_correlation_max : complex
The complex value of the cross correlation at its maximum point.
"""
return np.arctan2(cross_correlation_max.imag, cross_correlation_max.real)
def _compute_error(cross_correlation_max, src_amp, target_amp):
"""
Compute RMS error metric between ``src_image`` and ``target_image``.
----------
Inputs:
cross_correlation_max : complex
The complex value of the cross correlation at its maximum point.
src_amp : float
The normalized average image intensity of the source image
target_amp : float
The normalized average image intensity of the target image
"""
error = 1.0 - cross_correlation_max * cross_correlation_max.conj() /\
(src_amp * target_amp)
return np.sqrt(np.abs(error))
def phase_cross_correlation(reference_image, moving_image, *,
upsample_factor=1, space="real",
return_error=True, overlap_ratio=0.3):
"""
Efficient subpixel image translation registration by cross-correlation.
This code gives the same precision as the FFT upsampled cross-correlation
in a fraction of the computation time and with reduced memory requirements.
It obtains an initial estimate of the cross-correlation peak by an FFT and
then refines the shift estimation by upsampling the DFT only in a small
neighborhood of that estimate by means of a matrix-multiply DFT.
----------
Inputs:
reference_image : array
Reference image.
moving_image : array
Image to register. Must be same dimensionality as
``reference_image``.
upsample_factor : int, optional
Upsampling factor. Images will be registered to within
``1 / upsample_factor`` of a pixel. For example
``upsample_factor == 20`` means the images will be registered
within 1/20th of a pixel. Default is 1 (no upsampling).
space : string, one of "real" or "fourier", optional
Defines how the algorithm interprets input data. "real" means
data will be FFT'd to compute the correlation, while "fourier"
data will bypass FFT of input data. Case insensitive.
return_error : bool, optional
Returns error and phase difference if on, otherwise only
shifts are returned.
overlap_ratio : float, optional
Minimum allowed overlap ratio between images. The correlation for
translations corresponding with an overlap ratio lower than this
threshold will be ignored. A lower `overlap_ratio` leads to smaller
maximum translation, while a higher `overlap_ratio` leads to greater
robustness against spurious matches due to small overlap between
masked images.
----------
Returns:
shifts : ndarray
Shift vector (in pixels) required to register ``moving_image``
with ``reference_image``. Axis ordering is consistent with
numpy (e.g. Z, Y, X)
error : float
Translation invariant normalized RMS error between
``reference_image`` and ``moving_image``.
phasediff : float
Global phase difference between the two images (should be
zero if images are non-negative).
----------
References:
[1] Manuel Guizar-Sicairos, Samuel T. Thurman, and James R. Fienup,
"Efficient subpixel image registration algorithms,"
Optics Letters 33, 156-158 (2008). :DOI:`10.1364/OL.33.000156`
[2] James R. Fienup, "Invariant error metrics for image reconstruction"
Applied Optics 36, 8352-8357 (1997). :DOI:`10.1364/AO.36.008352`
[3] Dirk Padfield. Masked Object Registration in the Fourier Domain.
IEEE Transactions on Image Processing, vol. 21(5),
pp. 2706-2718 (2012). :DOI:`10.1109/TIP.2011.2181402`
[4] D. Padfield. "Masked FFT registration". In Proc. Computer Vision and
Pattern Recognition, pp. 2918-2925 (2010).
:DOI:`10.1109/CVPR.2010.5540032`
"""
# images must be the same shape
if reference_image.shape != moving_image.shape:
raise ValueError("images must be same shape")
# assume complex data is already in Fourier space
if space.lower() == 'fourier':
src_freq = reference_image
target_freq = moving_image
# real data needs to be fft'd.
elif space.lower() == 'real':
src_freq = fft.fftn(reference_image)
target_freq = fft.fftn(moving_image)
else:
raise ValueError('space argument must be "real" or "fourier"')
# Whole-pixel shift - Compute cross-correlation by an IFFT
shape = src_freq.shape
image_product = src_freq * target_freq.conj()
cross_correlation = fft.ifftn(image_product)
# Locate maximum
maxima = np.unravel_index(np.argmax(np.abs(cross_correlation)),
cross_correlation.shape)
midpoints = np.array([np.fix(axis_size / 2) for axis_size in shape])
shifts = np.stack(maxima).astype(np.float64)
shifts[shifts > midpoints] -= np.array(shape)[shifts > midpoints]
if upsample_factor == 1:
if return_error:
src_amp = np.sum(np.real(src_freq * src_freq.conj()))
src_amp /= src_freq.size
target_amp = np.sum(np.real(target_freq * target_freq.conj()))
target_amp /= target_freq.size
CCmax = cross_correlation[maxima]
# If upsampling > 1, then refine estimate with matrix multiply DFT
else:
# Initial shift estimate in upsampled grid
shifts = np.round(shifts * upsample_factor) / upsample_factor
upsampled_region_size = np.ceil(upsample_factor * 1.5)
# Center of output array at dftshift + 1
dftshift = np.fix(upsampled_region_size / 2.0)
upsample_factor = np.array(upsample_factor, dtype=np.float64)
# Matrix multiply DFT around the current shift estimate
sample_region_offset = dftshift - shifts*upsample_factor
cross_correlation = _upsampled_dft(image_product.conj(),
upsampled_region_size,
upsample_factor,
sample_region_offset).conj()
# Locate maximum and map back to original pixel grid
maxima = np.unravel_index(np.argmax(np.abs(cross_correlation)),
cross_correlation.shape)
CCmax = cross_correlation[maxima]
maxima = np.stack(maxima).astype(np.float64) - dftshift
shifts = shifts + maxima / upsample_factor
if return_error:
src_amp = np.sum(np.real(src_freq * src_freq.conj()))
target_amp = np.sum(np.real(target_freq * target_freq.conj()))
# If its only one row or column the shift along that dimension has no
# effect. We set to zero.
for dim in range(src_freq.ndim):
if shape[dim] == 1:
shifts[dim] = 0
if return_error:
# Redirect user to masked_phase_cross_correlation if NaNs are observed
if np.isnan(CCmax) or np.isnan(src_amp) or np.isnan(target_amp):
raise ValueError(
"NaN values found, please remove NaNs from your input data")
return shifts, _compute_error(CCmax, src_amp, target_amp), \
_compute_phasediff(CCmax)
else:
return shifts
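# Minimal usage sketch: recover a known integer shift applied with np.roll
# (the 64x64 random image and the (3, -5) shift are illustrative).
# >>> ref = np.random.rand(64, 64)
# >>> moving = np.roll(ref, (3, -5), axis=(0, 1))
# >>> shifts, error, phasediff = phase_cross_correlation(ref, moving, upsample_factor=10)
# >>> shifts   # shift to apply to `moving` to register it onto `ref`
# array([-3.,  5.])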

package/lib/deconvolve.py Executable file (563 lines added)

@@ -0,0 +1,563 @@
"""
Library functions for the implementation of various deconvolution algorithms.
prototypes :
- gaussian_psf(FWHM, shape) -> kernel
Return the normalized gaussian point spread function over some kernel shape.
- from_file_psf(filename) -> kernel
Get the point spread function from an external FITS file.
- wiener(image, psf, alpha, clip) -> im_deconv
Implement the simplified Wiener filtering.
- van_cittert(image, psf, alpha, iterations, clip, filter_epsilon) -> im_deconv
Implement Van-Cittert iterative algorithm.
- richardson_lucy(image, psf, iterations, clip, filter_epsilon) -> im_deconv
Implement Richardson-Lucy iterative algorithm.
- one_step_gradient(image, psf, iterations, clip, filter_epsilon) -> im_deconv
Implement One-step gradient iterative algorithm.
- conjgrad(image, psf, alpha, error, iterations) -> im_deconv
Implement the Conjugate Gradient algorithm.
- deconvolve_im(image, psf, alpha, error, iterations, clip, filter_epsilon, algo) -> im_deconv
Prepare data for deconvolution using specified algorithm.
"""
import numpy as np
from scipy.signal import convolve
from astropy.io import fits
def abs2(x):
"""Returns the squared absolute value of its agument."""
if np.iscomplexobj(x):
x_re = x.real
x_im = x.imag
return x_re*x_re + x_im*x_im
else:
return x*x
def zeropad(arr, shape):
    """
    Zero-pad array ARR to given shape.
    The contents of ARR is approximately centered in the result.
    """
    if len(shape) != arr.ndim:
        raise ValueError("bad number of dimensions")
    diff = np.asarray(shape) - np.asarray(arr.shape)
    if diff.min() < 0:
        raise ValueError("output dimensions must be larger or equal input dimensions")
    offset = diff//2
    z = np.zeros(shape, dtype=arr.dtype)
    # Build one slice per axis so any rank is handled without per-rank branches
    z[tuple(slice(o, o + n) for o, n in zip(offset, arr.shape))] = arr
    return z
def gaussian2d(x, y, sigma):
return np.exp(-(x**2+y**2)/(2*sigma**2))/(2*np.pi*sigma**2)
def gaussian_psf(FWHM=1., shape=(5, 5)):
"""
Define the gaussian Point-Spread-Function of chosen shape and FWHM.
----------
Inputs:
FWHM : float, optional
The Full Width at Half Maximum of the desired gaussian function for the
PSF in pixel increments.
Defaults to 1.
shape : tuple, optional
The shape of the PSF kernel. Must be of dimension 2.
Defaults to (5,5).
----------
Returns:
kernel : numpy.ndarray
Kernel containing the weights of the desired gaussian PSF.
"""
# Compute standard deviation from FWHM
stdev = FWHM/(2.*np.sqrt(2.*np.log(2.)))
# Create kernel of desired shape
x, y = np.meshgrid(np.arange(-shape[0]/2, shape[0]/2), np.arange(-shape[1]/2, shape[1]/2))
kernel = gaussian2d(x, y, stdev)
return kernel/kernel.sum()
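# Example (a sketch; the 15x15 shape is illustrative, the FWHM mirrors the
# pipeline's default psf_FWHM): the kernel is normalized to unit sum.
# >>> kernel = gaussian_psf(FWHM=3.1, shape=(15, 15))
# >>> bool(np.isclose(kernel.sum(), 1.0))
# True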
def from_file_psf(filename):
"""
Get the Point-Spread-Function from an external FITS file.
Such PSF can be generated using the TinyTim standalone program by STSCI.
See:
[1] https://www.stsci.edu/hst/instrumentation/focus-and-pointing/focus/tiny-tim-hst-psf-modeling
[2] https://doi.org/10.1117/12.892762
----------
Inputs:
filename : str
----------
Returns:
kernel : numpy.ndarray
Kernel containing the weights of the PSF read from the file.
"""
with fits.open(filename) as f:
psf = f[0].data
if not isinstance(psf, np.ndarray) or psf.ndim != 2:
    raise ValueError("Invalid PSF image in PrimaryHDU at {0:s}".format(filename))
# Return the normalized Point Spread Function
kernel = psf/psf.max()
return kernel
def wiener(image, psf, alpha=0.1, clip=True):
"""
Implement the simplified Wiener filtering.
----------
Inputs:
image : numpy.ndarray
Input degraded image (can be N dimensional) of floats.
psf : numpy.ndarray
The kernel of the point spread function. Must have shape less or equal to
the image shape. If less, it will be zeropadded.
alpha : float, optional
Regularisation weight of the simplified Wiener filter.
Defaults to 0.1
clip : boolean, optional
If true, pixel value of the result above 1 or under -1 are thresholded
for skimage pipeline compatibility.
Defaults to True.
----------
Returns:
im_deconv : ndarray
The deconvolved image.
"""
float_type = np.promote_types(image.dtype, np.float32)
image = image.astype(float_type, copy=False)
psf = zeropad(psf.astype(float_type, copy=False), image.shape)
psf /= psf.sum()
im_deconv = image.copy()
ft_y = np.fft.fftn(im_deconv)
ft_h = np.fft.fftn(np.fft.ifftshift(psf))
ft_x = ft_h.conj()*ft_y / (abs2(ft_h) + alpha)
im_deconv = np.fft.ifftn(ft_x).real
if clip:
im_deconv[im_deconv > 1] = 1
im_deconv[im_deconv < -1] = -1
return im_deconv/im_deconv.max()
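# Usage sketch: alpha is the regularisation weight in H*.Y/(|H|^2 + alpha), matching
# the ft_x computation above; larger values damp noise amplification at the cost of
# sharpness (the values below are illustrative, `blurred` is a hypothetical 2D image).
# >>> restored = wiener(blurred, gaussian_psf(FWHM=3.1, shape=(15, 15)), alpha=0.1, clip=False)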
def van_cittert(image, psf, alpha=0.1, iterations=20, clip=True, filter_epsilon=None):
"""
Van-Cittert deconvolution algorithm.
----------
Inputs:
image : numpy.ndarray
Input degraded image (can be N dimensional) of floats between 0 and 1.
psf : numpy.ndarray
The point spread function.
alpha : float, optional
A weight parameter for the deconvolution step.
iterations : int, optional
Number of iterations. This parameter plays the role of
regularisation.
clip : boolean, optional
If True, pixel values of the result above 1 or below -1 are clipped
for skimage pipeline compatibility. Defaults to True.
filter_epsilon: float, optional
Value below which intermediate results become 0 to avoid division
by small numbers.
----------
Returns:
im_deconv : ndarray
The deconvolved image.
"""
float_type = np.promote_types(image.dtype, np.float32)
image = image.astype(float_type, copy=False)
psf = psf.astype(float_type, copy=False)
psf /= psf.sum()
im_deconv = image.copy()
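# Van Cittert additive update: x <- x + alpha*(y - psf*x), where * denotes
# convolution; iterated `iterations` times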
for _ in range(iterations):
conv = convolve(im_deconv, psf, mode='same')
if filter_epsilon:
relative_blur = np.where(conv < filter_epsilon, 0, image - conv)
else:
relative_blur = image - conv
im_deconv += alpha*relative_blur
if clip:
im_deconv[im_deconv > 1] = 1
im_deconv[im_deconv < -1] = -1
return im_deconv
def richardson_lucy(image, psf, iterations=20, clip=True, filter_epsilon=None):
"""
Richardson-Lucy deconvolution algorithm.
----------
Inputs:
image : numpy.ndarray
Input degraded image (can be N dimensional) of floats between 0 and 1.
psf : numpy.ndarray
The point spread function.
iterations : int, optional
Number of iterations. This parameter plays the role of
regularisation.
clip : boolean, optional
If True, pixel values of the result above 1 or below -1 are clipped
for skimage pipeline compatibility. Defaults to True.
filter_epsilon: float, optional
Value below which intermediate results become 0 to avoid division
by small numbers.
----------
Returns:
im_deconv : ndarray
The deconvolved image.
----------
References:
[1] https://doi.org/10.1364/JOSA.62.000055
[2] https://en.wikipedia.org/wiki/Richardson%E2%80%93Lucy_deconvolution
"""
float_type = np.promote_types(image.dtype, np.float32)
image = image.astype(float_type, copy=False)
psf = psf.astype(float_type, copy=False)
psf /= psf.sum()
im_deconv = image.copy()
psf_mirror = np.flip(psf)
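# Richardson-Lucy multiplicative update: x <- x * [psf_mirror (*) (y / (psf (*) x))],
# where (*) denotes convolution; the flipped PSF applies the transpose of the blur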
for _ in range(iterations):
conv = convolve(im_deconv, psf, mode='same')
if filter_epsilon:
relative_blur = np.where(conv < filter_epsilon, 0, image / conv)
else:
relative_blur = image / conv
im_deconv *= convolve(relative_blur, psf_mirror, mode='same')
if clip:
im_deconv[im_deconv > 1] = 1
im_deconv[im_deconv < -1] = -1
return im_deconv
def one_step_gradient(image, psf, iterations=20, clip=True, filter_epsilon=None):
"""
One-step gradient deconvolution algorithm.
----------
Inputs:
image : numpy.ndarray
Input degraded image (can be N dimensional) of floats between 0 and 1.
psf : numpy.ndarray
The point spread function.
iterations : int, optional
Number of iterations. This parameter plays the role of
regularisation.
clip : boolean, optional
If True, pixel values of the result above 1 or below -1 are clipped
for skimage pipeline compatibility. Defaults to True.
filter_epsilon: float, optional
Value below which intermediate results become 0 to avoid division
by small numbers.
----------
Returns:
im_deconv : ndarray
The deconvolved image.
"""
float_type = np.promote_types(image.dtype, np.float32)
image = image.astype(float_type, copy=False)
psf = psf.astype(float_type, copy=False)
psf /= psf.sum()
im_deconv = image.copy()
psf_mirror = np.flip(psf)
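# One-step gradient (Landweber-type) update: x <- x + psf_mirror (*) (y - psf (*) x),
# i.e. a gradient step on the least-squares data-fidelity term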
for _ in range(iterations):
conv = convolve(im_deconv, psf, mode='same')
if filter_epsilon:
relative_blur = np.where(conv < filter_epsilon, 0, image - conv)
else:
relative_blur = image - conv
im_deconv += convolve(relative_blur, psf_mirror, mode='same')
if clip:
im_deconv[im_deconv > 1] = 1
im_deconv[im_deconv < -1] = -1
return im_deconv
def conjgrad(image, psf, alpha=0.1, error=None, iterations=20):
"""
Implement the Conjugate Gradient algorithm.
----------
Inputs:
image : numpy.ndarray
Input degraded image (can be N dimensional) of floats.
psf : numpy.ndarray
The kernel of the point spread function. Must have a shape smaller than or
equal to the image shape; if smaller, it is zero-padded.
alpha : float, optional
A weight parameter for the regularisation matrix.
Defaults to 0.1
error : numpy.ndarray, optional
Known background noise on the input image, used for weighted
deconvolution. If None, all weights are set to 1.
Defaults to None.
iterations : int, optional
Number of iterations. This parameter plays the role of
regularisation.
Defaults to 20.
----------
Returns:
im_deconv : ndarray
The deconvolved image.
"""
float_type = np.promote_types(image.dtype, np.float32)
image = image.astype(float_type, copy=False)
psf = psf.astype(float_type, copy=False)
psf /= psf.sum()
# Solve A.x = b with A = Ht.W.H + alpha*Dt.D and b = Ht.W.y
# Define ft_h : the zeropadded and shifted Fourier transform of the PSF
ft_h = np.fft.fftn(np.fft.ifftshift(zeropad(psf, image.shape)))
# Define weights as normalized signal to noise ratio
if error is None:
wgt = np.ones(image.shape)
else:
wgt = image/error
wgt /= wgt.max()
def W(x):
"""Define W operator : apply weights"""
return wgt*x
def H(x):
"""Define H operator : convolution with PSF"""
return np.fft.ifftn(ft_h*np.fft.fftn(x)).real
def Ht(x):
"""Define Ht operator : transpose of H"""
return np.fft.ifftn(ft_h.conj()*np.fft.fftn(x)).real
def DtD(x):
"""Returns the result of D'.D.x where D is a (multi-dimensional)
finite difference operator and D' is its transpose."""
dims = x.shape
r = np.zeros(dims, dtype=x.dtype) # to store the result
rank = x.ndim # number of dimensions
if rank == 0:
return r
if dims[0] >= 2:
dx = x[1:-1, ...] - x[0:-2, ...]
r[1:-1, ...] += dx
r[0:-2, ...] -= dx
if rank == 1:
return r
if dims[1] >= 2:
dx = x[:, 1:-1, ...] - x[:, 0:-2, ...]
r[:, 1:-1, ...] += dx
r[:, 0:-2, ...] -= dx
if rank == 2:
return r
if dims[2] >= 2:
dx = x[:, :, 1:-1, ...] - x[:, :, 0:-2, ...]
r[:, :, 1:-1, ...] += dx
r[:, :, 0:-2, ...] -= dx
if rank == 3:
return r
if dims[3] >= 2:
dx = x[:, :, :, 1:-1, ...] - x[:, :, :, 0:-2, ...]
r[:, :, :, 1:-1, ...] += dx
r[:, :, :, 0:-2, ...] -= dx
if rank == 4:
return r
if dims[4] >= 2:
dx = x[:, :, :, :, 1:-1, ...] - x[:, :, :, :, 0:-2, ...]
r[:, :, :, :, 1:-1, ...] += dx
r[:, :, :, :, 0:-2, ...] -= dx
if rank == 5:
return r
raise ValueError("too many dimensions")
def A(x):
"""Define symetric positive semi definite operator A"""
return Ht(W(H(x)))+alpha*DtD(x)
# Define obtained vector A.x = b
b = Ht(W(image))
def inner(x, y):
"""Compute inner product of X and Y regardless their shapes
(their number of elements must however match)."""
return np.inner(x.ravel(), y.ravel())
# Compute initial residuals.
r = np.copy(b)
x = np.zeros(b.shape, dtype=b.dtype)
rho = inner(r, r)
epsilon = np.max([0., 1e-5*np.sqrt(rho)])
# Conjugate gradient iterations.
beta = 0.0
k = 0
while np.sqrt(rho) > epsilon:
k += 1
if k > iterations:
print("Didn't converge before maximum iteration.")
break
# Next search direction (copy r on the first step so that updating r
# in place does not also modify p)
if beta == 0.0:
p = r.copy()
else:
p = r + beta*p
# Make optimal step along search direction; the step size gets its own
# name so that the regularisation weight `alpha` captured by A is preserved
q = A(p)
gamma = inner(p, q)
if gamma <= 0.0:
raise ValueError("Operator A is not positive definite")
step = rho/gamma
x += step*p
r -= step*q
rho_prev, rho = rho, inner(r, r)
beta = rho/rho_prev
else:
print("Converged before maximum iteration.")
# Return normalized solution
im_deconv = x/x.max()
return im_deconv
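# Illustrative usage (a sketch; with error=None all weights are 1 and the
# solution is renormalized to unit peak):
# >>> blurred = np.random.rand(64, 64)
# >>> deconv = conjgrad(blurred, gaussian_psf(FWHM=2., shape=(9, 9)), alpha=0.1, iterations=50)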
def deconvolve_im(image, psf, alpha=0.1, error=None, iterations=20, clip=True,
filter_epsilon=None, algo='richardson'):
"""
Prepare an image for deconvolution using a chosen algorithm and return
results.
----------
Inputs:
image : numpy.ndarray
Input degraded image (can be N dimensional) of floats.
psf : numpy.ndarray
The kernel of the point spread function. Must have a shape smaller than or
equal to the image shape; if smaller, it is zero-padded.
alpha : float, optional
A parameter value passed to the deconvolution algorithms that use it.
Defaults to 0.1.
error : numpy.ndarray, optional
Known background noise on the input image, used for weighted
deconvolution. If None, all weights are set to 1.
Defaults to None.
iterations : int, optional
Number of iterations. This parameter plays the role of
regularisation.
Defaults to 20.
clip : boolean, optional
If True, pixel values of the result above 1 or below -1 are clipped
for skimage pipeline compatibility.
Defaults to True.
filter_epsilon: float, optional
Value below which intermediate results become 0 to avoid division
by small numbers.
Defaults to None.
algo : str, optional
Name of the deconvolution algorithm that will be used. Implemented
algorithms are the following : 'Wiener', 'Van-Cittert', 'One Step Gradient',
'Conjugate Gradient' and 'Richardson-Lucy'.
Defaults to 'Richardson-Lucy'.
----------
Returns:
im_deconv : ndarray
The deconvolved image.
"""
# Normalize image to highest pixel value
pxmax = image[np.isfinite(image)].max()
if pxmax == 0.:
raise ValueError("Invalid image")
norm_image = image/pxmax
# Deconvolve normalized image
if algo.lower() in ['wiener', 'wiener simple']:
norm_deconv = wiener(image=norm_image, psf=psf, alpha=alpha, clip=clip)
elif algo.lower() in ['van-cittert', 'vancittert', 'cittert']:
norm_deconv = van_cittert(image=norm_image, psf=psf, alpha=alpha,
iterations=iterations, clip=clip, filter_epsilon=filter_epsilon)
elif algo.lower() in ['1grad', 'one_step_grad', 'one step grad']:
norm_deconv = one_step_gradient(image=norm_image, psf=psf,
iterations=iterations, clip=clip, filter_epsilon=filter_epsilon)
elif algo.lower() in ['conjgrad', 'conj_grad', 'conjugate gradient']:
norm_deconv = conjgrad(image=norm_image, psf=psf, alpha=alpha,
error=error, iterations=iterations)
else: # Defaults to Richardson-Lucy
norm_deconv = richardson_lucy(image=norm_image, psf=psf,
iterations=iterations, clip=clip, filter_epsilon=filter_epsilon)
# Output deconvolved image with original pxmax value
im_deconv = pxmax*norm_deconv
return im_deconv
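# Illustrative usage (a sketch mirroring the defaults in FOC_reduction.py;
# `image` is a hypothetical 2D array and algorithm names are matched case-insensitively):
# >>> psf = gaussian_psf(FWHM=3.1, shape=(151, 151))
# >>> deconvolved = deconvolve_im(image, psf, alpha=0.1, iterations=1, algo='conjgrad')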

206
package/lib/fits.py Executable file

@@ -0,0 +1,206 @@
"""
Library functions for simplified FITS handling.
prototypes :
- get_obs_data(infiles, data_folder) -> data_array, headers
Extract the observational data from FITS files
- save_Stokes(I, Q, U, Stokes_cov, P, debiased_P, s_P, s_P_P, PA, s_PA, s_PA_P, headers, data_mask, filename, data_folder, return_hdul) -> ( HDUL_data )
Save computed polarimetry parameters to a single FITS file (and return the HDUList)
"""
import numpy as np
from os.path import join as path_join
from astropy.io import fits
from astropy.wcs import WCS
from .convex_hull import clean_ROI
def get_obs_data(infiles, data_folder="", compute_flux=False):
"""
Extract the observational data from the given FITS files.
----------
Inputs:
infiles : list of str
List of the FITS file names to be added to the observation set.
data_folder : str, optional
Relative or absolute path to the folder containing the data.
compute_flux : boolean, optional
If True, the returned data_array will contain flux information, assuming
raw data are counts and headers have the keywords EXPTIME and PHOTFLAM.
Defaults to False.
----------
Returns:
data_array : numpy.ndarray
Array of images (2D floats) containing the observation data.
headers : header list
List of headers objects corresponding to each image in data_array.
"""
data_array, headers = [], []
for i in range(len(infiles)):
with fits.open(path_join(data_folder, infiles[i])) as f:
headers.append(f[0].header)
data_array.append(f[0].data)
data_array = np.array(data_array, dtype=np.double)
# Prevent negative count value in imported data
for i in range(len(data_array)):
data_array[i][data_array[i] < 0.] = 0.
# force WCS to convention PCi_ja unitary, cdelt in deg
for header in headers:
new_wcs = WCS(header).celestial.deepcopy()
if new_wcs.wcs.has_cd() or (new_wcs.wcs.cdelt[:2] == np.array([1., 1.])).all():
# Update WCS with relevant information
if new_wcs.wcs.has_cd():
old_cd = new_wcs.wcs.cd
del new_wcs.wcs.cd
keys = list(new_wcs.to_header().keys())+['CD1_1', 'CD1_2', 'CD1_3', 'CD2_1', 'CD2_2', 'CD2_3', 'CD3_1', 'CD3_2', 'CD3_3']
for key in keys:
header.remove(key, ignore_missing=True)
new_cdelt = np.linalg.eig(old_cd)[0]
elif (new_wcs.wcs.cdelt == np.array([1., 1.])).all() and \
(new_wcs.array_shape in [(512, 512), (1024, 512), (512, 1024), (1024, 1024)]):
old_cd = new_wcs.wcs.pc
# Pixel scales are given by the eigenvalues of the transformation matrix
new_cdelt = np.linalg.eig(old_cd)[0]
new_wcs.wcs.pc = np.dot(old_cd, np.diag(1./new_cdelt))
new_wcs.wcs.cdelt = new_cdelt
for key, val in new_wcs.to_header().items():
header[key] = val
# header['orientat'] = princ_angle(float(header['orientat']))
# force WCS for POL60 to have same pixel size as POL0 and POL120
is_pol60 = np.array([head['filtnam1'].lower() == 'pol60' for head in headers], dtype=bool)
cdelt = np.round(np.array([WCS(head).wcs.cdelt[:2] for head in headers]), 14)
if np.unique(cdelt[np.logical_not(is_pol60)], axis=0).size != 2:
print(np.unique(cdelt[np.logical_not(is_pol60)], axis=0))
raise ValueError("Not all images have same pixel size")
else:
for i in np.arange(len(headers))[is_pol60]:
headers[i]['cdelt1'], headers[i]['cdelt2'] = np.unique(cdelt[np.logical_not(is_pol60)], axis=0)[0]
if compute_flux:
for i in range(len(infiles)):
# Compute the flux in counts/sec
data_array[i] /= headers[i]['EXPTIME']
return data_array, headers
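# Illustrative usage (a sketch; the file name is hypothetical):
# >>> data_array, headers = get_obs_data(["x3l801p5t_c0f.fits"], data_folder="./data/NGC1068/5144", compute_flux=True)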
def save_Stokes(I_stokes, Q_stokes, U_stokes, Stokes_cov, P, debiased_P, s_P,
s_P_P, PA, s_PA, s_PA_P, headers, data_mask, filename, data_folder="",
return_hdul=False):
"""
Save computed polarimetry parameters to a single FITS file,
updating the header accordingly.
----------
Inputs:
I_stokes, Q_stokes, U_stokes, P, debiased_P, s_P, s_P_P, PA, s_PA, s_PA_P : numpy.ndarray
Images (2D float arrays) containing the computed polarimetric data :
Stokes parameters I, Q, U, the polarization degree (raw and debiased),
its error (propagated and assuming Poisson noise), the polarization angle,
its error (propagated and assuming Poisson noise).
Stokes_cov : numpy.ndarray
Covariance matrix of the Stokes parameters I, Q, U.
headers : header list
Reference headers from which some keywords are copied (CRVAL, CDELT,
INSTRUME, PROPOSID, TARGNAME, ORIENTAT, EXPTOT).
data_mask : numpy.ndarray
2D boolean array delimiting the data to work on.
filename : str
Name that will be given to the file on writing (will appear in header).
data_folder : str, optional
Relative or absolute path to the folder the fits file will be saved to.
Defaults to current folder.
return_hdul : boolean, optional
If True, the function will return the created HDUList from the
input arrays.
Defaults to False.
----------
Returns:
hdul : astropy.io.fits.hdu.hdulist.HDUList
HDUList containing I_stokes in the PrimaryHDU, then Q_stokes, U_stokes,
P, s_P, PA, s_PA in this order. Headers are updated with the relevant
information (WCS, orientation, data_type).
Only returned if return_hdul is True.
"""
# Create new WCS object given the modified images
ref_header = headers[0]
exp_tot = np.array([header['exptime'] for header in headers]).sum()
new_wcs = WCS(ref_header).deepcopy()
if data_mask.shape != (1, 1):
vertex = clean_ROI(data_mask)
# vertex is assumed to be (x_min, x_max, y_min, y_max), as in the cropping below
shape = vertex[1::2]-vertex[0::2]
new_wcs.array_shape = shape[::-1]
# Shift the reference pixel by the (x_min, y_min) corner of the cropped region
new_wcs.wcs.crpix = np.array(new_wcs.wcs.crpix) - vertex[0::2]
header = new_wcs.to_header()
header['telescop'] = (ref_header['telescop'] if 'TELESCOP' in list(ref_header.keys()) else 'HST', 'telescope used to acquire data')
header['instrume'] = (ref_header['instrume'] if 'INSTRUME' in list(ref_header.keys()) else 'FOC', 'identifier for instrument used to acquire data')
header['photplam'] = (ref_header['photplam'], 'Pivot Wavelength')
header['photflam'] = (ref_header['photflam'], 'Inverse Sensitivity in DN/sec/cm**2/Angst')
header['exptot'] = (exp_tot, 'Total exposure time in sec')
header['proposid'] = (ref_header['proposid'], 'PEP proposal identifier for observation')
header['targname'] = (ref_header['targname'], 'Target name')
header['orientat'] = (ref_header['orientat'], 'Angle between North and the y-axis of the image')
header['filename'] = (filename, 'Original filename')
header['P_int'] = (ref_header['P_int'], 'Integrated polarisation degree')
header['P_int_err'] = (ref_header['P_int_err'], 'Integrated polarisation degree error')
header['PA_int'] = (ref_header['PA_int'], 'Integrated polarisation angle')
header['PA_int_err'] = (ref_header['PA_int_err'], 'Integrated polarisation angle error')
# Crop Data to mask
if data_mask.shape != (1, 1):
I_stokes = I_stokes[vertex[2]:vertex[3], vertex[0]:vertex[1]]
Q_stokes = Q_stokes[vertex[2]:vertex[3], vertex[0]:vertex[1]]
U_stokes = U_stokes[vertex[2]:vertex[3], vertex[0]:vertex[1]]
P = P[vertex[2]:vertex[3], vertex[0]:vertex[1]]
debiased_P = debiased_P[vertex[2]:vertex[3], vertex[0]:vertex[1]]
s_P = s_P[vertex[2]:vertex[3], vertex[0]:vertex[1]]
s_P_P = s_P_P[vertex[2]:vertex[3], vertex[0]:vertex[1]]
PA = PA[vertex[2]:vertex[3], vertex[0]:vertex[1]]
s_PA = s_PA[vertex[2]:vertex[3], vertex[0]:vertex[1]]
s_PA_P = s_PA_P[vertex[2]:vertex[3], vertex[0]:vertex[1]]
new_Stokes_cov = np.zeros((*Stokes_cov.shape[:-2], *shape[::-1]))
for i in range(3):
for j in range(3):
Stokes_cov[i, j][(1-data_mask).astype(bool)] = 0.
new_Stokes_cov[i, j] = Stokes_cov[i, j][vertex[2]:vertex[3], vertex[0]:vertex[1]]
Stokes_cov = new_Stokes_cov
data_mask = data_mask[vertex[2]:vertex[3], vertex[0]:vertex[1]]
data_mask = data_mask.astype(float, copy=False)
# Create HDUList object
hdul = fits.HDUList([])
# Add I_stokes as PrimaryHDU
header['datatype'] = ('I_stokes', 'type of data stored in the HDU')
I_stokes[(1-data_mask).astype(bool)] = 0.
primary_hdu = fits.PrimaryHDU(data=I_stokes, header=header)
primary_hdu.name = 'I_stokes'
hdul.append(primary_hdu)
# Add Q, U, Stokes_cov, P, s_P, PA, s_PA to the HDUList
for data, name in [[Q_stokes, 'Q_stokes'], [U_stokes, 'U_stokes'],
[Stokes_cov, 'IQU_cov_matrix'], [P, 'Pol_deg'],
[debiased_P, 'Pol_deg_debiased'], [s_P, 'Pol_deg_err'],
[s_P_P, 'Pol_deg_err_Poisson_noise'], [PA, 'Pol_ang'],
[s_PA, 'Pol_ang_err'], [s_PA_P, 'Pol_ang_err_Poisson_noise'],
[data_mask, 'Data_mask']]:
hdu_header = header.copy()
hdu_header['datatype'] = name
if not name == 'IQU_cov_matrix':
data[(1-data_mask).astype(bool)] = 0.
hdu = fits.ImageHDU(data=data, header=hdu_header)
hdu.name = name
hdul.append(hdu)
# Save fits file to designated filepath
hdul.writeto(path_join(data_folder, filename+".fits"), overwrite=True)
if return_hdul:
return hdul
else:
return 0
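# The saved file can be inspected with astropy (illustrative; the path is hypothetical):
# >>> with fits.open("./data/target_FOC.fits") as hdul:
# ...     hdul.info()  # I_stokes, Q_stokes, U_stokes, IQU_cov_matrix, Pol_deg, ...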

2487
package/lib/plots.py Executable file

File diff suppressed because it is too large

195
package/lib/query.py Executable file

@@ -0,0 +1,195 @@
#!/usr/bin/python3
# -*- coding:utf-8 -*-
"""
Library function to query and download datasets from the MAST API.
"""
from os import system
from os.path import join as path_join, exists as path_exists
from astroquery.mast import MastMissions, Observations
from astropy.table import unique, Column
from astropy.time import Time, TimeDelta
import astropy.units as u
import numpy as np
def divide_proposal(products):
"""
Divide observations into proposals by time or filter
"""
for pid in np.unique(products['Proposal ID']):
obs = products[products['Proposal ID'] == pid].copy()
same_filt = np.unique(np.array(np.sum([obs['Filters'][:, 1:] == filt[1:] for filt in obs['Filters']], axis=2) < 3, dtype=bool), axis=0)
if len(same_filt) > 1:
for filt in same_filt:
products['Proposal ID'][np.any([products['Dataset'] == dataset for dataset in obs['Dataset'][filt]], axis=0)] = "_".join(
[obs['Proposal ID'][filt][0], "_".join([fi for fi in obs['Filters'][filt][0][1:] if fi[:-1] != "CLEAR"])])
for pid in np.unique(products['Proposal ID']):
obs = products[products['Proposal ID'] == pid].copy()
close_date = np.unique([[np.abs(TimeDelta(obs['Start'][i].unix-date.unix, format='sec'))
< 7.*u.d for i in range(len(obs))] for date in obs['Start']], axis=0)
if len(close_date) > 1:
for date in close_date:
products['Proposal ID'][np.any([products['Dataset'] == dataset for dataset in obs['Dataset'][date]], axis=0)
] = "_".join([obs['Proposal ID'][date][0], str(obs['Start'][date][0])[:10]])
return products
def get_product_list(target=None, proposal_id=None):
"""
Retrieve the product list for a given target from the MAST archive
"""
mission = MastMissions(mission='hst')
radius = '3'
select_cols = [
'sci_data_set_name',
'sci_spec_1234',
'sci_actual_duration',
'sci_start_time',
'sci_stop_time',
'sci_central_wavelength',
'sci_instrume',
'sci_aper_1234',
'sci_targname',
'sci_pep_id',
'sci_pi_last_name']
cols = [
'Dataset',
'Filters',
'Exptime',
'Start',
'Stop',
'Central wavelength',
'Instrument',
'Size',
'Target name',
'Proposal ID',
'PI last name']
if target is None:
target = input("Target name:\n>")
# Use query_object method to resolve the object name into coordinates
results = mission.query_object(
target,
radius=radius,
select_cols=select_cols,
sci_spec_1234='POL*',
sci_obs_type='image',
sci_aec='S',
sci_instrume='foc')
for c, n_c in zip(select_cols, cols):
results.rename_column(c, n_c)
results['Proposal ID'] = Column(results['Proposal ID'], dtype='U35')
results['Filters'] = Column(np.array([filt.split(";") for filt in results['Filters']], dtype=str))
results['Start'] = Column(Time(results['Start']))
results['Stop'] = Column(Time(results['Stop']))
results = divide_proposal(results)
obs = results.copy()
# Remove single observations that use the F1ND neutral-density filter
to_remove = []
for i in range(len(obs)):
if "F1ND" in obs[i]['Filters']:
to_remove.append(i)
obs.remove_rows(to_remove)
# Remove observations for which a polarization filter is missing
polfilt = {"POL0": 0, "POL60": 1, "POL120": 2}
for pid in np.unique(obs['Proposal ID']):
used_pol = np.zeros(3)
for dataset in obs[obs['Proposal ID'] == pid]:
used_pol[polfilt[dataset['Filters'][0]]] += 1
if np.any(used_pol < 1):
obs.remove_rows(np.arange(len(obs))[obs['Proposal ID'] == pid])
tab = unique(obs, ['Target name', 'Proposal ID'])
obs["Obs"] = [np.argmax(np.logical_and(tab['Proposal ID'] == data['Proposal ID'], tab['Target name'] == data['Target name']))+1 for data in obs]
try:
n_obs = unique(obs[["Obs", "Filters", "Start", "Central wavelength", "Instrument", "Size", "Target name", "Proposal ID", "PI last name"]], 'Obs')
except IndexError:
raise ValueError(
"There is no observation with POL0, POL60 and POL120 for {0:s} in HST/FOC Legacy Archive".format(target))
b = np.zeros(len(results), dtype=bool)
if proposal_id is not None and str(proposal_id) in obs['Proposal ID']:
b[results['Proposal ID'] == str(proposal_id)] = True
else:
n_obs.pprint(len(n_obs)+2)
a = [np.array(i.split(":"), dtype=str)
for i in input("select observations to be downloaded ('1,3,4,5' or '1,3:5' or 'all','*' default to 1)\n>").split(',')]
if a[0][0] == '':
a = [[1]]
if a[0][0] in ['a', 'all', '*']:
b = np.ones(len(results), dtype=bool)
else:
a = [np.array(i, dtype=int) for i in a]
for i in a:
if len(i) > 1:
for j in range(i[0], i[1]+1):
b[np.array([dataset in obs['Dataset'][obs["Obs"] == j] for dataset in results['Dataset']])] = True
else:
b[np.array([dataset in obs['Dataset'][obs['Obs'] == i[0]] for dataset in results['Dataset']])] = True
observations = Observations.query_criteria(obs_id=list(results['Dataset'][b]))
products = Observations.filter_products(Observations.get_product_list(observations),
productType=['SCIENCE'],
dataproduct_type=['image'],
calib_level=[2],
description="DADS C0F file - Calibrated exposure WFPC/WFPC2/FOC/FOS/GHRS/HSP")
products['proposal_id'] = Column(products['proposal_id'], dtype='U35')
products['target_name'] = Column(observations['target_name'])
for prod in products:
prod['proposal_id'] = results['Proposal ID'][results['Dataset'] == prod['productFilename'][:len(results['Dataset'][0])].upper()][0]
for prod in products:
prod['target_name'] = observations['target_name'][observations['obsid'] == prod['obsID']][0]
tab = unique(products, ['target_name', 'proposal_id'])
products["Obs"] = [np.argmax(np.logical_and(tab['proposal_id'] == data['proposal_id'], tab['target_name'] == data['target_name']))+1 for data in products]
return target, products
def retrieve_products(target=None, proposal_id=None, output_dir='./data'):
"""
Given a target name and a proposal_id, create the local directories and retrieve the FITS files from the MAST Archive
"""
target, products = get_product_list(target=target, proposal_id=proposal_id)
prodpaths = []
# data_dir = path_join(output_dir, target)
out = ""
for obs in unique(products, 'Obs'):
filepaths = []
# obs_dir = path_join(data_dir, obs['prodposal_id'])
# if obs['target_name']!=target:
obs_dir = path_join(path_join(output_dir, target), obs['proposal_id'])
if not path_exists(obs_dir):
system("mkdir -p {0:s} {1:s}".format(obs_dir, obs_dir.replace("data", "plots")))
for file in products['productFilename'][products['Obs'] == obs['Obs']]:
fpath = path_join(obs_dir, file)
if not path_exists(fpath):
out += "{0:s} : {1:s}\n".format(file, Observations.download_file(
products['dataURI'][products['productFilename'] == file][0], local_path=fpath)[0])
else:
out += "{0:s} : Exists\n".format(file)
filepaths.append([obs_dir, file])
prodpaths.append(np.array(filepaths, dtype=str))
return target, prodpaths
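# Illustrative usage (a sketch; requires network access to the MAST archive):
# >>> target, prodpaths = retrieve_products("NGC1068", proposal_id=5144, output_dir="./data")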
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Query MAST for target products')
parser.add_argument('-t', '--target', metavar='targetname', required=False,
help='the name of the target', type=str, default=None)
parser.add_argument('-p', '--proposal_id', metavar='proposal_id', required=False,
help='the proposal id of the data products', type=int, default=None)
parser.add_argument('-o', '--output_dir', metavar='directory_path', required=False,
help='output directory path for the data products', type=str, default="./data")
args = parser.parse_args()
target, prodpaths = retrieve_products(target=args.target, proposal_id=args.proposal_id, output_dir=args.output_dir)
print(prodpaths)

1581
package/lib/reduction.py Executable file

File diff suppressed because it is too large

46
package/lib/utils.py Executable file

@@ -0,0 +1,46 @@
import numpy as np
def rot2D(ang):
"""
Return the 2D rotation matrix of given angle in degrees
"""
alpha = np.pi*ang/180
return np.array([[np.cos(alpha), np.sin(alpha)], [-np.sin(alpha), np.cos(alpha)]])
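# Illustrative check:
# >>> np.allclose(rot2D(90.), [[0., 1.], [-1., 0.]])
# True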
def princ_angle(ang):
"""
Return the principal angle in the [0°, 180°) range, as the polarization
angle is only defined modulo 180°.
"""
if not isinstance(ang, np.ndarray):
A = np.array([ang])
else:
A = np.array(ang)
while np.any(A < 0.):
A[A < 0.] = A[A < 0.]+360.
while np.any(A >= 180.):
A[A >= 180.] = A[A >= 180.]-180.
if type(ang) is type(A):
return A
else:
return A[0]
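# Illustrative check:
# >>> princ_angle(-30.)
# 150.0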
def sci_not(v, err, rnd=1, out=str):
"""
Return the value and its error in scientific notation as a string.
"""
power = - int(('%E' % v)[-3:])+1
output = [r"({0}".format(round(v*10**power, rnd)), round(v*10**power, rnd)]
if isinstance(err, list):
for error in err:
output[0] += r" $\pm$ {0}".format(round(error*10**power, rnd))
output.append(round(error*10**power, rnd))
else:
output[0] += r" $\pm$ {0}".format(round(err*10**power, rnd))
output.append(round(err*10**power, rnd))
if out == str:
return output[0]+r")e{0}".format(-power)
else:
return *output[1:], -power
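# Illustrative check (the mantissa is scaled so that two digits sit before the decimal point):
# >>> sci_not(0.0123, 0.0045, rnd=1)
# '(12.3 $\\pm$ 4.5)e-3'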

44
package/overplot_IC5063.py Executable file

@@ -0,0 +1,44 @@
#!/usr/bin/python3
from astropy.io import fits
import numpy as np
from lib.plots import overplot_radio, overplot_pol
from matplotlib.colors import LogNorm
Stokes_UV = fits.open("./data/IC5063/5918/IC5063_FOC_b0.10arcsec_c0.20arcsec.fits")
Stokes_18GHz = fits.open("./data/IC5063/radio/IC5063_18GHz.fits")
Stokes_24GHz = fits.open("./data/IC5063/radio/IC5063_24GHz.fits")
Stokes_103GHz = fits.open("./data/IC5063/radio/IC5063_103GHz.fits")
Stokes_229GHz = fits.open("./data/IC5063/radio/IC5063_229GHz.fits")
Stokes_357GHz = fits.open("./data/IC5063/radio/IC5063_357GHz.fits")
# Stokes_S2 = fits.open("./data/IC5063/POLARIZATION_COMPARISON/S2_rot_crop.fits")
Stokes_IR = fits.open("./data/IC5063/IR/u2e65g01t_c0f_rot.fits")
# levelsMorganti = np.array([1.,2.,3.,8.,16.,32.,64.,128.])
levelsMorganti = np.logspace(-0.1249, 1.97, 7)/100.
levels18GHz = levelsMorganti*Stokes_18GHz[0].data.max()
A = overplot_radio(Stokes_UV, Stokes_18GHz)
A.plot(levels=levels18GHz, SNRp_cut=2.0, SNRi_cut=10.0, savename='./plots/IC5063/18GHz_overplot.pdf', vec_scale=None)
levels24GHz = levelsMorganti*Stokes_24GHz[0].data.max()
B = overplot_radio(Stokes_UV, Stokes_24GHz)
B.plot(levels=levels24GHz, SNRp_cut=2.0, SNRi_cut=10.0, savename='./plots/IC5063/24GHz_overplot.pdf', vec_scale=None)
levels103GHz = levelsMorganti*Stokes_103GHz[0].data.max()
C = overplot_radio(Stokes_UV, Stokes_103GHz)
C.plot(levels=levels103GHz, SNRp_cut=2.0, SNRi_cut=10.0, savename='./plots/IC5063/103GHz_overplot.pdf', vec_scale=None)
levels229GHz = levelsMorganti*Stokes_229GHz[0].data.max()
D = overplot_radio(Stokes_UV, Stokes_229GHz)
D.plot(levels=levels229GHz, SNRp_cut=2.0, SNRi_cut=10.0, savename='./plots/IC5063/229GHz_overplot.pdf', vec_scale=None)
levels357GHz = levelsMorganti*Stokes_357GHz[0].data.max()
E = overplot_radio(Stokes_UV, Stokes_357GHz)
E.plot(levels=levels357GHz, SNRp_cut=2.0, SNRi_cut=10.0, savename='./plots/IC5063/357GHz_overplot.pdf', vec_scale=None)
# F = overplot_pol(Stokes_UV, Stokes_S2)
# F.plot(SNRp_cut=3.0, SNRi_cut=80.0, savename='./plots/IC5063/S2_overplot.pdf', norm=LogNorm(vmin=5e-20,vmax=5e-18))
G = overplot_pol(Stokes_UV, Stokes_IR, cmap='inferno')
G.plot(SNRp_cut=2.0, SNRi_cut=10.0, savename='./plots/IC5063/IR_overplot.pdf', vec_scale=None,
norm=LogNorm(Stokes_IR[0].data.max()*Stokes_IR[0].header['photflam']/1e3, Stokes_IR[0].data.max()*Stokes_IR[0].header['photflam']), cmap='inferno_r')

26
package/overplot_MRK463E.py Executable file

@@ -0,0 +1,26 @@
#!/usr/bin/python3
from astropy.io import fits
import numpy as np
from lib.plots import overplot_chandra, overplot_pol
from matplotlib.colors import LogNorm
Stokes_UV = fits.open("./data/MRK463E/5960/MRK463E_FOC_b0.05arcsec_c0.10arcsec.fits")
Stokes_IR = fits.open("./data/MRK463E/WFPC2/IR_rot_crop.fits")
Stokes_Xr = fits.open("./data/MRK463E/Chandra/X_ray_crop.fits")
levels = np.geomspace(1., 99., 7)
# A = overplot_chandra(Stokes_UV, Stokes_Xr)
# A.plot(levels=levels, SNRp_cut=3.0, SNRi_cut=20.0, zoom=1, savename='./plots/MRK463E/Chandra_overplot.pdf')
B = overplot_chandra(Stokes_UV, Stokes_Xr, norm=LogNorm())
B.plot(levels=levels, SNRp_cut=3.0, SNRi_cut=3.0, vec_scale=5, zoom=1, savename='./plots/MRK463E/Chandra_overplot_forced.pdf')
B.write_to(path1="./data/MRK463E/FOC_data_Chandra.fits", path2="./data/MRK463E/Chandra_data.fits", suffix="aligned")
# C = overplot_pol(Stokes_UV, Stokes_IR)
# C.plot(SNRp_cut=3.0, SNRi_cut=20.0, savename='./plots/MRK463E/IR_overplot.pdf')
levels = np.array([0.8, 2, 5, 10, 20, 50])/100.*Stokes_UV[0].header['photflam']
D = overplot_pol(Stokes_UV, Stokes_IR, norm=LogNorm())
D.plot(levels=levels, SNRp_cut=3.0, SNRi_cut=3.0, vec_scale=5, norm=LogNorm(8.5e-18, 2.5e-15), savename='./plots/MRK463E/IR_overplot_forced.pdf')
D.write_to(path1="./data/MRK463E/FOC_data_WFPC.fits", path2="./data/MRK463E/WFPC_data.fits", suffix="aligned")

0
package/src/__init__.py Normal file

39
package/src/analysis.py Executable file

@@ -0,0 +1,39 @@
#!/usr/bin/python
from getopt import getopt, error as get_error
from sys import argv
arglist = argv[1:]
options = "hf:p:i:l:"
long_options = ["help", "fits=", "snrp=", "snri=", "lim="]
fits_path = None
SNRp_cut, SNRi_cut = 3, 3
flux_lim = None
out_txt = None
try:
arg, val = getopt(arglist, options, long_options)
for curr_arg, curr_val in arg:
if curr_arg in ("-h", "--help"):
print("python3 analysis.py -f <path_to_reduced_fits> -p <SNRp_cut> -i <SNRi_cut> -l <flux_lim>")
elif curr_arg in ("-f", "--fits"):
fits_path = str(curr_val)
elif curr_arg in ("-p", "--snrp"):
SNRp_cut = int(curr_val)
elif curr_arg in ("-i", "--snri"):
SNRi_cut = int(curr_val)
elif curr_arg in ("-l", "--lim"):
flux_lim = list("".join(curr_val).split(','))
except get_error as err:
print(str(err))
if fits_path is not None:
from astropy.io import fits
from lib.plots import pol_map
Stokes_UV = fits.open(fits_path)
p = pol_map(Stokes_UV, SNRp_cut=SNRp_cut, SNRi_cut=SNRi_cut, flux_lim=flux_lim)
else:
print("python3 analysis.py -f <path_to_reduced_fits> -p <SNRp_cut> -i <SNRi_cut> -l <flux_lim>")


@@ -0,0 +1,214 @@
#!/usr/bin/python
from src.lib.background import gauss, bin_centers
from src.lib.deconvolve import zeropad
from src.lib.reduction import align_data
from src.lib.plots import princ_angle
from matplotlib.colors import LogNorm
from os.path import join as path_join
from astropy.io import fits
from astropy.wcs import WCS
from scipy.ndimage import shift
from scipy.optimize import curve_fit
import numpy as np
import matplotlib.pyplot as plt
root_dir = path_join('/home/t.barnouin/Documents/Thesis/HST')
root_dir_K = path_join(root_dir, 'Kishimoto', 'output')
root_dir_S = path_join(root_dir, 'FOC_Reduction', 'output')
root_dir_data_S = path_join(root_dir, 'FOC_Reduction', 'data', 'NGC1068', '5144')
root_dir_plot_S = path_join(root_dir, 'FOC_Reduction', 'plots', 'NGC1068', '5144', 'notaligned')
filename_S = "NGC1068_FOC_b10.00pixel_not_aligned.fits"
plt.rcParams.update({'font.size': 15})
SNRi_cut = 30.
SNRp_cut = 3.
data_K = {}
data_S = {}
for d, i in zip(['I', 'Q', 'U', 'P', 'PA', 'sI', 'sQ', 'sU', 'sP', 'sPA'], [0, 1, 2, 5, 8, (3, 0, 0), (3, 1, 1), (3, 2, 2), 6, 9]):
data_K[d] = np.loadtxt(path_join(root_dir_K, d+'.txt'))
with fits.open(path_join(root_dir_data_S, filename_S)) as f:
if not type(i) is int:
data_S[d] = np.sqrt(f[i[0]].data[i[1], i[2]])
else:
data_S[d] = f[i].data
if i == 0:
header = f[i].header
wcs = WCS(header)
convert_flux = header['photflam']
bkg_S = np.median(data_S['I'])/3
bkg_K = np.median(data_K['I'])/3
# zeropad data to get same size of array
shape = data_S['I'].shape
for d in data_K:
data_K[d] = zeropad(data_K[d], shape)
# shift array to get same information in same pixel
data_arr, error_ar, heads, data_msk, shifts, shifts_err = align_data(np.array([data_S['I'], data_K['I']]), [header, header], error_array=np.array(
[data_S['sI'], data_K['sI']]), background=np.array([bkg_S, bkg_K]), upsample_factor=10., ref_center='center', return_shifts=True)
for d in data_K:
data_K[d] = shift(data_K[d], shifts[1], order=1, cval=0.)
# compute pol components from shifted array
for d in [data_S, data_K]:
for i in d:
d[i][np.isnan(d[i])] = 0.
d['P'] = np.where(np.logical_and(np.isfinite(d['I']), d['I'] > 0.), np.sqrt(d['Q']**2+d['U']**2)/d['I'], 0.)
d['sP'] = np.where(np.logical_and(np.isfinite(d['I']), d['I'] > 0.), np.sqrt((d['Q']**2*d['sQ']**2+d['U']**2*d['sU']**2) /
(d['Q']**2+d['U']**2)+((d['Q']/d['I'])**2+(d['U']/d['I'])**2)*d['sI']**2)/d['I'], 0.)
d['d_P'] = np.where(np.logical_and(np.isfinite(d['P']), np.isfinite(d['sP'])), np.sqrt(d['P']**2-d['sP']**2), 0.)
d['PA'] = 0.5*np.arctan2(d['U'], d['Q'])+np.pi
d['SNRp'] = np.zeros(d['d_P'].shape)
d['SNRp'][d['sP'] > 0.] = d['d_P'][d['sP'] > 0.]/d['sP'][d['sP'] > 0.]
d['SNRi'] = np.zeros(d['I'].shape)
d['SNRi'][d['sI'] > 0.] = d['I'][d['sI'] > 0.]/d['sI'][d['sI'] > 0.]
d['mask'] = np.logical_and(d['SNRi'] > SNRi_cut, d['SNRp'] > SNRp_cut)
data_S['mask'], data_K['mask'] = np.logical_and(data_S['mask'], data_K['mask']), np.logical_and(data_S['mask'], data_K['mask'])
#
# Compute histogram of measured polarization in cut
#
bins = int(data_S['mask'].sum()/5)
bin_size = 1./bins
mod_p = np.linspace(0., 1., 300)
for d in [data_S, data_K]:
d['hist'], d['bin_edges'] = np.histogram(d['d_P'][d['mask']], bins=bins, range=(0., 1.))
d['binning'] = bin_centers(d['bin_edges'])
peak, bins_fwhm = d['binning'][np.argmax(d['hist'])], d['binning'][d['hist'] > d['hist'].max()/2.]
fwhm = bins_fwhm[-1]-bins_fwhm[0]
p0 = [d['hist'].max(), peak, fwhm]
try:
popt, pcov = curve_fit(gauss, d['binning'], d['hist'], p0=p0)
except RuntimeError:
popt = p0
d['hist_chi2'] = np.sum((d['hist']-gauss(d['binning'], *popt))**2)/d['hist'].size
d['hist_popt'] = popt
fig_p, ax_p = plt.subplots(num="Polarization degree histogram", figsize=(10, 6), constrained_layout=True)
ax_p.errorbar(data_S['binning'], data_S['hist'], xerr=bin_size/2., fmt='b.', ecolor='b', label='P through this pipeline')
ax_p.plot(mod_p, gauss(mod_p, *data_S['hist_popt']), 'b--', label='mean = {1:.2f}, stdev = {2:.2f}'.format(*data_S['hist_popt']))
ax_p.errorbar(data_K['binning'], data_K['hist'], xerr=bin_size/2., fmt='r.', ecolor='r', label="P through Kishimoto's pipeline")
ax_p.plot(mod_p, gauss(mod_p, *data_K['hist_popt']), 'r--', label='mean = {1:.2f}, stdev = {2:.2f}'.format(*data_K['hist_popt']))
ax_p.set(xlabel="Polarization degree", ylabel="Counts", title="Histogram of polarization degree computed in the cut for both pipelines.")
ax_p.legend()
fig_p.savefig(path_join(root_dir_plot_S, "NGC1068_K_pol_deg.png"), bbox_inches="tight", dpi=300)
#
# Compute angular difference between the maps in cut
#
# tan(2*dtheta) = sin(2PA_S-2PA_K)/cos(2PA_S-2PA_K), expanded with the angle-difference identities
dtheta = np.where(data_S['mask'], 0.5*np.arctan((np.sin(2*data_S['PA'])*np.cos(2*data_K['PA'])-np.cos(2*data_S['PA']) *
np.sin(2*data_K['PA']))/(np.cos(2*data_S['PA'])*np.cos(2*data_K['PA'])+np.sin(2*data_S['PA'])*np.sin(2*data_K['PA']))), np.nan)
fig_pa = plt.figure(num="Polarization angle alignment")
ax_pa = fig_pa.add_subplot(111, projection=wcs)
cbar_ax_pa = fig_pa.add_axes([0.88, 0.12, 0.01, 0.75])
ax_pa.set_title(r"Degree of alignment $\zeta$ of the polarization angles from the 2 pipelines in the cut")
im_pa = ax_pa.imshow(np.cos(2*dtheta), vmin=-1., vmax=1., origin='lower', cmap='bwr', label=r"$\zeta$ between this pipeline and Kishimoto's")
cbar_pa = plt.colorbar(im_pa, cax=cbar_ax_pa, label=r"$\zeta = \cos\left( 2 \cdot \delta\theta_P \right)$")
ax_pa.coords[0].set_axislabel('Right Ascension (J2000)')
ax_pa.coords[1].set_axislabel('Declination (J2000)')
fig_pa.savefig(path_join(root_dir_plot_S, "NGC1068_K_pol_ang.png"), bbox_inches="tight", dpi=300)
#
# Compute the normalized polarization degree difference between the maps in the cut
#
# Factor 2 in the numerator matches the colorbar label below
eta = np.where(data_S['mask'], 2.*np.abs(data_K['d_P']-data_S['d_P'])/np.sqrt(data_S['sP']**2+data_K['sP']**2), np.nan)
fig_dif_p = plt.figure(num="Polarization power difference ratio")
ax_dif_p = fig_dif_p.add_subplot(111, projection=wcs)
cbar_ax_dif_p = fig_dif_p.add_axes([0.88, 0.12, 0.01, 0.75])
ax_dif_p.set_title(r"Degree of difference $\eta$ of the polarization from the 2 pipelines in the cut")
im_dif_p = ax_dif_p.imshow(eta, vmin=0., vmax=2., origin='lower', cmap='bwr_r', label=r"$\eta$ between this pipeline and Kishimoto's")
cbar_dif_p = plt.colorbar(im_dif_p, cax=cbar_ax_dif_p, label=r"$\eta = \frac{2 \left|P^K-P^S\right|}{\sqrt{{\sigma^K_P}^2+{\sigma^S_P}^2}}$")
ax_dif_p.coords[0].set_axislabel('Right Ascension (J2000)')
ax_dif_p.coords[1].set_axislabel('Declination (J2000)')
fig_dif_p.savefig(path_join(root_dir_plot_S, "NGC1068_K_pol_diff.png"), bbox_inches="tight", dpi=300)
#
# Compute angle uncertainty difference between the maps in cut
#
eta = np.where(data_S['mask'], np.abs(data_K['PA']-data_S['PA'])/np.sqrt(data_S['sPA']**2+data_K['sPA']**2)/2., np.nan)
fig_dif_pa = plt.figure(num="Polarization angle difference ratio")
ax_dif_pa = fig_dif_pa.add_subplot(111, projection=wcs)
cbar_ax_dif_pa = fig_dif_pa.add_axes([0.88, 0.12, 0.01, 0.75])
ax_dif_pa.set_title(r"Degree of difference $\eta$ of the polarization angle from the 2 pipelines in the cut")
im_dif_pa = ax_dif_pa.imshow(eta, vmin=0., vmax=2., origin='lower', cmap='bwr_r', label=r"$\eta$ between this pipeline and Kishimoto's")
cbar_dif_pa = plt.colorbar(im_dif_pa, cax=cbar_ax_dif_pa,
label=r"$\eta = \frac{2 \left|\theta_P^K-\theta_P^S\right|}{\sqrt{{\sigma^K_{\theta_P}}^2+{\sigma^S_{\theta_P}}^2}}$")
ax_dif_pa.coords[0].set_axislabel('Right Ascension (J2000)')
ax_dif_pa.coords[1].set_axislabel('Declination (J2000)')
fig_dif_pa.savefig(path_join(root_dir_plot_S, "NGC1068_K_polang_diff.png"), bbox_inches="tight", dpi=300)
# display both polarization maps to check consistency
# plt.rcParams.update({'font.size': 15})
fig = plt.figure(num="Polarization maps comparison", figsize=(10, 10))
ax = fig.add_subplot(111, projection=wcs)
fig.subplots_adjust(right=0.85)
cbar_ax = fig.add_axes([0.88, 0.12, 0.01, 0.75])
for d in [data_S, data_K]:
d['X'], d['Y'] = np.meshgrid(np.arange(d['I'].shape[1]), np.arange(d['I'].shape[0]))
d['xy_U'], d['xy_V'] = np.where(d['mask'], d['d_P']*np.cos(np.pi/2.+d['PA']), np.nan), np.where(d['mask'], d['d_P']*np.sin(np.pi/2.+d['PA']), np.nan)
im0 = ax.imshow(data_S['I']*convert_flux, norm=LogNorm(data_S['I'][data_S['I'] > 0].min()*convert_flux, data_S['I']
[data_S['I'] > 0].max()*convert_flux), origin='lower', cmap='gray', label=r"$I_{STOKES}$ through this pipeline")
quiv0 = ax.quiver(data_S['X'], data_S['Y'], data_S['xy_U'], data_S['xy_V'], units='xy', angles='uv', scale=0.5, scale_units='xy',
pivot='mid', headwidth=0., headlength=0., headaxislength=0., width=0.2, color='b', alpha=0.75, label="PA through this pipeline")
quiv1 = ax.quiver(data_K['X'], data_K['Y'], data_K['xy_U'], data_K['xy_V'], units='xy', angles='uv', scale=0.5, scale_units='xy',
pivot='mid', headwidth=0., headlength=0., headaxislength=0., width=0.1, color='r', alpha=0.75, label="PA through Kishimoto's pipeline")
ax.set_title(r"$SNR_P \geq$ "+str(SNRi_cut)+r"$\; & \; SNR_I \geq $"+str(SNRp_cut))
# ax.coords.grid(True, color='white', ls='dotted', alpha=0.5)
ax.coords[0].set_axislabel('Right Ascension (J2000)')
ax.coords[0].set_axislabel_position('b')
ax.coords[0].set_ticklabel_position('b')
ax.coords[1].set_axislabel('Declination (J2000)')
ax.coords[1].set_axislabel_position('l')
ax.coords[1].set_ticklabel_position('l')
# ax.axis('equal')
cbar = plt.colorbar(im0, cax=cbar_ax, label=r"$F_{\lambda}$ [$ergs \cdot cm^{-2} \cdot s^{-1} \cdot \AA^{-1}$]")
ax.legend(loc='upper right')
fig.savefig(path_join(root_dir_plot_S, "NGC1068_K_comparison.png"), bbox_inches="tight", dpi=300)
# compute integrated polarization parameters on a specific cut
for d in [data_S, data_K]:
d['I_dil'] = np.sum(d['I'][d['mask']])
d['sI_dil'] = np.sqrt(np.sum(d['sI'][d['mask']]**2))
d['Q_dil'] = np.sum(d['Q'][d['mask']])
d['sQ_dil'] = np.sqrt(np.sum(d['sQ'][d['mask']]**2))
d['U_dil'] = np.sum(d['U'][d['mask']])
d['sU_dil'] = np.sqrt(np.sum(d['sU'][d['mask']]**2))
d['P_dil'] = np.sqrt(d['Q_dil']**2+d['U_dil']**2)/d['I_dil']
d['sP_dil'] = np.sqrt((d['Q_dil']**2*d['sQ_dil']**2+d['U_dil']**2*d['sU_dil']**2)/(d['Q_dil']**2+d['U_dil']**2) +
((d['Q_dil']/d['I_dil'])**2+(d['U_dil']/d['I_dil'])**2)*d['sI_dil']**2)/d['I_dil']
d['d_P_dil'] = np.sqrt(d['P_dil']**2-d['sP_dil']**2)
d['PA_dil'] = princ_angle((90./np.pi)*np.arctan2(d['U_dil'], d['Q_dil']))
d['sPA_dil'] = princ_angle((90./(np.pi*(d['Q_dil']**2+d['U_dil']**2)))*np.sqrt(d['Q_dil']**2*d['sU_dil']**2+d['U_dil']**2*d['sQ_dil']**2))
print('From this pipeline :\n', "P = {0:.2f} ± {1:.2f} %\n".format(
data_S['d_P_dil']*100., data_S['sP_dil']*100.), "PA = {0:.2f} ± {1:.2f} °".format(data_S['PA_dil'], data_S['sPA_dil']))
print("From Kishimoto's pipeline :\n", "P = {0:.2f} ± {1:.2f} %\n".format(
data_K['d_P_dil']*100., data_K['sP_dil']*100.), "PA = {0:.2f} ± {1:.2f} °".format(data_K['PA_dil'], data_K['sPA_dil']))
# compare different types of error
print("This pipeline : average sI/I={0:.2f} ; sQ/Q={1:.2f} ; sU/U={2:.2f} ; sP/P={3:.2f}".format(np.mean(data_S['sI'][data_S['mask']]/data_S['I'][data_S['mask']]), np.mean(
data_S['sQ'][data_S['mask']]/data_S['Q'][data_S['mask']]), np.mean(data_S['sU'][data_S['mask']]/data_S['U'][data_S['mask']]), np.mean(data_S['sP'][data_S['mask']]/data_S['P'][data_S['mask']])))
print("Kishimoto's pipeline : average sI/I={0:.2f} ; sQ/Q={1:.2f} ; sU/U={2:.2f} ; sP/P={3:.2f}".format(np.mean(data_K['sI'][data_S['mask']]/data_K['I'][data_S['mask']]), np.mean(
data_K['sQ'][data_S['mask']]/data_K['Q'][data_S['mask']]), np.mean(data_K['sU'][data_S['mask']]/data_K['U'][data_S['mask']]), np.mean(data_K['sP'][data_S['mask']]/data_K['P'][data_S['mask']])))
for d, i in zip(['I', 'Q', 'U', 'P', 'PA', 'sI', 'sQ', 'sU', 'sP', 'sPA'], [0, 1, 2, 5, 8, (3, 0, 0), (3, 1, 1), (3, 2, 2), 6, 9]):
data_K[d] = np.loadtxt(path_join(root_dir_K, d+'.txt'))
with fits.open(path_join(root_dir_data_S, filename_S)) as f:
if not type(i) is int:
data_S[d] = np.sqrt(f[i[0]].data[i[1], i[2]])
else:
data_S[d] = f[i].data
if i == 0:
header = f[i].header
# from Kishimoto's pipeline : IQU_dir, IQU_shift, IQU_stat, IQU_trans
# from my pipeline : raw_bg, raw_flat, raw_psf, raw_shift, raw_wav, IQU_dir
# but errors from my pipeline are propagated all along; how should the two be compared then?
plt.show()

47
package/src/get_cdelt.py Executable file

@@ -0,0 +1,47 @@
#!/usr/bin/python
def main(infiles=None):
"""
Retrieve the native spatial resolution of the given observations.
"""
from os.path import join as path_join
from warnings import catch_warnings, filterwarnings
from astropy.io.fits import getheader
from astropy.wcs import WCS, FITSFixedWarning
from numpy.linalg import eig
if infiles is None:
print("Usage: \"python get_cdelt.py -f infiles\"")
return 1
prod = [["/".join(filepath.split('/')[:-1]), filepath.split('/')[-1]] for filepath in infiles]
data_folder = prod[0][0]
infiles = [p[1] for p in prod]
cdelt = {}
size = {}
for currfile in infiles:
with catch_warnings():
filterwarnings('ignore', message="'datfix' made the change", category=FITSFixedWarning)
wcs = WCS(getheader(path_join(data_folder, currfile))).celestial
key = currfile[:-5]
size[key] = wcs.array_shape
if wcs.wcs.has_cd():
cdelt[key] = eig(wcs.wcs.cd)[0]*3600.
else:
cdelt[key] = wcs.wcs.cdelt*3600.
print("Image name, native resolution in arcsec and shape")
for currfile in infiles:
key = currfile[:-5]
print(key, cdelt[key], size[key])
return 0
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Retrieve the native spatial resolution of FITS data products')
parser.add_argument('-f', '--files', metavar='path', required=False, nargs='*', help='the full or relative path to the data products', default=None)
args = parser.parse_args()
exitcode = main(infiles=args.files)