# Licensed under a 3-clause BSD style license - see LICENSE.rst
import os
from astropy import log
from astropy.io import fits
import numpy as np
import pandas
from sofia_redux.toolkit.utilities.fits import hdinsert, kref
import sofia_redux.instruments.forcast as drip
import sofia_redux.instruments.forcast.configuration as dripconfig
from sofia_redux.instruments.forcast.getpar import getpar
__all__ = ['pinhole_defaults', 'read_pinhole_file',
'pinhole_model', 'view_model', 'distcorr_model']
[docs]
def pinhole_defaults():
# get pinhole filename with getpar
# (will load config if not already done)
filename = getpar(fits.header.Header(),
'fpinhole', default='undefined')
# may have subdirectories specified with /
# replace them OS independently.
filename = os.path.join(*filename.split('/'))
datadir = os.path.join(os.path.dirname(drip.__file__), 'data', 'pinhole')
fpinhole = os.path.join(datadir, filename)
# retrieve the rest of the values directly
# from the config
config = dripconfig.configuration
order = int(config.get('order', 4))
return {
'fpinhole': fpinhole, 'order': order
}
[docs]
def read_pinhole_file(pinhole_file):
"""
Read the pinhole file and return a dataframe
Note that 1 is subtracted from the xpos and ypos pixel
positions for consistency with Python/IDL indexing.
Parameters
----------
pinhole_file : str, optional
Path to the pinhole file. If None is supplied, will read
the default pinhole file
Returns
-------
pandas.DataFrame
contents of the pinhole file as a dataframe
"""
if not isinstance(pinhole_file, str) or not os.path.isfile(pinhole_file):
log.error("File %s does not exist" % repr(pinhole_file))
return
log.info("Reading pinhole file %s" % pinhole_file)
table = pandas.read_csv(pinhole_file, comment='#',
sep=r'\s+')
return table
[docs]
def pinhole_model(xpos, ypos, xid, yid):
"""
Generate the pinhole model from paramters.
Parameters
----------
xpos : array-like
ypos : array-like
xid : array-like
yid : array-like
Returns
-------
dict
The model parameters. Keys are 'avgdx', 'avgdy',
'angle', 'xmodel', 'ymodel'.
"""
# the FORCAST array is 256 x 256
nximg = 256
nyimg = 256
# number of pinhole points in table
nxpts = np.max(xid) + 1
nypts = np.max(yid) + 1
# Check that positions make sense (greater than 0 and lower than
# image size)
if (xpos[~np.isnan(xpos)] < 0).any() or \
(xpos[~np.isnan(xpos)] >= nximg).any():
log.error("X positions are lower than 0 or greater than %s" % nximg)
return
elif (ypos[~np.isnan(ypos)] < 0).any() or \
(ypos[~np.isnan(ypos)] >= nyimg).any():
log.error("Y positions are lower than 0 or greater than %s" % nyimg)
return
results = {}
# Rearrange pinholes into arrays
xpos_arr = np.full((nypts, nxpts), np.nan)
ypos_arr = np.full((nypts, nxpts), np.nan)
xpos_arr[yid, xid] = xpos
ypos_arr[yid, xid] = ypos
# Diff adjacent pins to get dx and dy, for pin spacing
# NaNs are propagated
dxpos = xpos_arr[:, 1:] - xpos_arr[:, :-1]
dypos = ypos_arr[1:, :] - ypos_arr[:-1, :]
# Also get the change in x position up a column, for angle
# estimation
dxlpos = xpos_arr[1:, :] - xpos_arr[:-1, :]
theta = np.arctan2(dxlpos[:, :-1], dxpos[:-1, :])
results['angle'] = float(np.nanmean(theta))
# If angle is small, it's better to just set it
# to zero
log.debug('Estimated angle: {}'.format(np.rad2deg(results['angle'])))
if np.abs(np.rad2deg(results['angle'])) < 0.9:
log.debug('Estimated angle is close to zero: '
'ignoring angle correction.')
results['angle'] = 0.0
# Average x and y separation of pin holes,
# corrected by the angle
results['avgdx'] = float(np.nanmean(dxpos)) / abs(np.cos(results['angle']))
results['avgdy'] = float(np.nanmean(dypos)) / abs(np.cos(results['angle']))
# Create model x,y coordinates
mody, modx = np.mgrid[:nypts, :nxpts].astype('float')
modx = modx * results['avgdx'] + xpos_arr[0, 0]
mody = mody * results['avgdy'] + ypos_arr[0, 0]
# Rotate model about the center of the array
xcen = nximg / 2.0 + 0.5
ycen = nyimg / 2.0 + 0.5
ang = results['angle']
xp = ((modx - xcen) * np.cos(ang)) + (mody - ycen) * np.sin(ang) + xcen
yp = ((modx - xcen) * -np.sin(ang)) + (mody - ycen) * np.cos(ang) + ycen
# Offset to match a central pinhole
distsq = (xpos_arr - xcen)**2 + (ypos_arr - ycen)**2
minpin = np.unravel_index(np.nanargmin(distsq), distsq.shape)
xp = xp - xp[minpin] + xpos_arr[minpin]
yp = yp - yp[minpin] + ypos_arr[minpin]
# Flatten arrays and remove NaNs
idx = np.logical_and(~np.isnan(xpos_arr), ~np.isnan(ypos_arr))
results['xpos'] = xpos_arr[idx].flatten()
results['ypos'] = ypos_arr[idx].flatten()
results['xmodel'] = xp[idx].flatten()
results['ymodel'] = yp[idx].flatten()
return results
[docs]
def view_model(x, y, fwhm=1.5, amplitude=3000,
write_file=None, force=False, show=False):
"""
View the pinhole model and optionally write to FITS file.
Image will effectively be convolved with a gaussian for
visibility
Parameters
----------
x : array-like
x-pixel pinhole locations
y : array-like
y-pixel pinhole locations
fwhm : float, optional
fwhm of gaussian convolution kernel (pixels)
amplitude : float, optional
Amplitude of gaussian convolution kernel
force : bool, optional
Force overwrite of `write_file` if it already exists
write_file : str, optional
Write the image in FITS format to this location
show : bool, optional
Show plot with matplotlib.
Returns
-------
None
"""
from astropy.modeling.models import Gaussian2D
from astropy.stats import gaussian_sigma_to_fwhm
nx, ny = 256, 256
yy, xx = np.mgrid[:ny, :nx]
fwhm = gaussian_sigma_to_fwhm * fwhm
gmodel = Gaussian2D(amplitude=amplitude, x_stddev=fwhm, y_stddev=fwhm)
image = np.zeros((ny, nx), dtype=float)
for gy, gx in zip(y, x):
gmodel.x_mean = gx
gmodel.y_mean = gy
image += gmodel(xx, yy)
if show: # pragma: no cover
import matplotlib.pyplot as plt
plt.ion()
plt.figure()
plt.imshow(image, origin='lower')
plt.title('Pinhole Model')
plt.show()
plt.pause(0.001)
if isinstance(write_file, str):
if os.path.isfile(write_file):
if not force:
log.warning("will not overwrite: %s" % write_file)
return
else:
os.remove(write_file)
fits.writeto(write_file, image)
[docs]
def distcorr_model(pinhole=None,
viewpin=False, basehead=None, order=None):
"""
Generate model array of pin holes base on input file
Read the positions of the pinholes and average the distance
between positions; creates a model where the pinholes are
regularly spaced. The model is then shifted and rotated in
order to match the observed pinhole mask.
Parameters
----------
pinhole : str or pandas.dataframe
File path to a text file containing a list of the positions
of the pinholes or dataframe object. Columns are
xid, yid, xpos, ypos.
viewpin : bool or str, optional
If set to True, will display the pin model.
If a string is provided, it will be interpreted
as a filename, and the model will be written to the
file instead of shown.
basehead : astropy.io.fits.header.Header, optional
Header array to update with pin model array
(PIN_MOD=[dx,dy,angle,order]).
order : int, optional
Order of distortion model. If not provided, the default
value of 4 will be used.
Notes
-----
A value of None for pinhole or order will cause default
values specified in the drip configuration file to be used.
Returns
-------
dict
model -> numpy.ndarray
N(x, y) model reference positions of shape (N, 2)
pins -> numpy.ndarray
N(x, y) pin positions of shape (N, 2)
nx, ny -> int
define number of pixels for both pins and image in x and y
dx, dy -> float
model x, y spacing
angle -> float
clockwise rotation of the model about the center
in degrees.
order -> int
order to be used if using the "polynomial" method in
sofia_redux.instruments.forcast.undistort based upon this model.
"""
defaults = pinhole_defaults()
if pinhole is None:
pinhole = defaults.get('fpinhole')
if order is None:
order = defaults.get('order')
if isinstance(pinhole, pandas.DataFrame):
table = pinhole.copy()
else:
table = read_pinhole_file(pinhole)
if not isinstance(table, pandas.DataFrame):
log.error("Invalid pinhole data")
return
xpos = table.xpos.values.copy()
ypos = table.ypos.values.copy()
xid = table.xid.values.copy()
yid = table.yid.values.copy()
positions = pinhole_model(xpos, ypos, xid, yid)
log.info("avgdx=%f, avgdy=%f, angle=%f" %
(positions['avgdx'],
positions['avgdy'],
np.rad2deg(positions['angle'])))
if viewpin:
if isinstance(viewpin, str):
fout = viewpin
show = False
else: # pragma: no cover
fout = None
show = True
view_model(positions['xmodel'], positions['ymodel'],
write_file=fout, show=show)
# Update header
if isinstance(basehead, fits.header.Header):
value = '[%f,%f,%f,%s]' % (
positions['avgdx'], positions['avgdy'],
np.rad2deg(positions['angle']), int(order))
comment = "pinhole model coeffs"
hdinsert(basehead, 'PIN_MOD', value, comment=comment, refkey=kref)
return {
'model': np.stack((positions['xmodel'], positions['ymodel']), axis=1),
'pins': np.stack((positions['xpos'], positions['ypos']), axis=1),
'dx': positions['avgdx'],
'dy': positions['avgdy'],
'nx': 256,
'ny': 256,
'order': int(order),
'angle': np.rad2deg(positions['angle'])
}