# Licensed under a 3-clause BSD style license - see LICENSE.rst
from astropy import log
from astropy.io import fits
import numpy as np
from sofia_redux.toolkit.utilities.fits import add_history_wrap, kref, hdinsert
from sofia_redux.toolkit.image.adjust import rotate, register_image
from sofia_redux.instruments.forcast.getpar import getpar
from sofia_redux.instruments.forcast.imgshift_header import imgshift_header
from sofia_redux.instruments.forcast.peakfind import peakfind
from sofia_redux.instruments.forcast.shift import shift
addhist = add_history_wrap('Register')
__all__ = ['coadd_centroid', 'coadd_correlation', 'coadd_header',
'coadd_user', 'register']
def headererror(header, msg):
addhist(header, msg)
log.error(msg)
[docs]
def coadd_centroid(data, reference, header=None, variance=None,
crpix=None, border=0, rot_angle=None, missing=np.nan,
shift_order=None, rotation_order=1, get_offsets=False):
"""
Shift an image for coadding using a centroid algorithm
Steps are:
1. Rotate data and variance wrt reference (if rot_angle
is provided)
2. Run peakfind on data wrt reference on 1 peak
3. Return rotated data (if it was rotated) should peakfind
fail.
4. Shift image by offsets calculated wrt reference using
peakfind.
5. Record the shifts in the header with the COADX0, COADY0
keywords.
Parameters
----------
data : numpy.ndarray
input image array (nrow, ncol)
reference : numpy.ndarray
input reference array to compare data to (nrow, ncol)
header : astropy.fits.header.Header
FITS header of input data
border : int
exclude `border` pixels from the outside edges when
attempting to find peaks in the image and reference
shift_order : int, optional
Order of interpolation for the shift. The shift order must be
between 0 and 5, with a default of 3
rotation_order: int, optional
Order of interpolaton for the rotation. The rotation order
must be between 0 and 5, with a default of 3
variance : numpy.ndarray
propagate provided variance if set (nrow, ncol)
rot_angle : float
Indicates that the data image is rotated wrt the reference
image by this amount. The data image (and variance if supplied)
will be rotated clockwise by `rot_angle` degrees
crpix : list, optional
If provided, will be updated to the new [CRPIX1, CRPIX2]
values following the shift
get_offsets : bool, optional
Do not shift the image. Only return the (x, y) offset
missing : float, optional
Value to represent missing data during shift
Returns
-------
2-tuple
numpy.ndarray : The shifted image (nrow, ncol)
numpy.ndarray : The shifted variance (nrow, ncol)
"""
if not isinstance(header, fits.header.Header):
header = fits.header.Header()
addhist(header, 'Created header')
var = variance.copy() if isinstance(variance, np.ndarray) else None
if not isinstance(data, np.ndarray) or len(data.shape) != 2:
headererror(header, "Coadd not applied (invalid data)")
return
dovar = isinstance(var, np.ndarray) and var.shape == data.shape
var = None if not dovar else var
if variance is not None and not dovar:
addhist(header, 'Not propagating variance (Invalid variance)')
log.warning("invalid variance")
if not isinstance(reference, np.ndarray) or reference.shape != data.shape:
headererror(header, "Coadd not applied (invalid reference)")
return
image = data.copy()
ref = reference.copy()
if border > 0:
if border * 2 >= data.shape[0] or border * 2 >= data.shape[1]:
headererror(header, 'Coadd not applied (invalid border)')
return
image = image[border: -border, border: -border]
ref = ref[border: -border, border: -border]
rot_data = data.copy()
rot_var = var.copy() if dovar else None
if rot_angle is not None and abs(rot_angle % 360) != 0:
image = rotate(image, rot_angle, order=rotation_order)
rot_data = rotate(data, rot_angle, order=rotation_order)
if dovar:
rot_var = rotate(var, rot_angle, order=rotation_order)
fwhm = getpar(header, 'MFWHM', dtype=float, default=4.5,
comment='fwhm for peakfind algorithm')
shift_coords = peakfind(
ref, image, fwhm=fwhm, npeaks=1, positive=True, refine=True,
coordinates=True, silent=True)
if len(shift_coords) == 0:
headererror(header, 'Coadd not applied (peakfind failed)')
return
offset = shift_coords[0] # there should only be one peak
if get_offsets:
return offset
addhist(header, 'Used centroid registration to determine shifts')
addhist(header, 'X, Y shift of image is (%f, %f)' %
(offset[0], offset[1]))
log.debug("calculated X,Y shift: (%s, %s)" % (offset[0], offset[1]))
hdinsert(header, 'COADX0', offset[0], refkey=kref,
comment='X Shift during coadd process')
hdinsert(header, 'COADY0', offset[1], refkey=kref,
comment='Y shift during coadd process')
return shift(rot_data, offset, variance=rot_var, missing=missing,
crpix=crpix, order=shift_order)
[docs]
def coadd_correlation(data, reference, header=None, variance=None,
border=0, rot_angle=None, xydither=None, crpix=None,
shift_order=None, rotation_order=1, upsample=100,
get_offsets=False, missing=np.nan):
"""
Shift an image for coaddition using a correlation algorithm
Parameters
----------
data : numpy.ndarray
Data to be shifted (nrow, ncol)
reference : numpy.ndarray
Data to be compared with (nrow, ncol)
header : The fits header of the new input data file, optional
variance : None or numpy.ndarray, optional
Propagate the provided variance (nrow, ncol)
border : int, optional
Remove `border` pixels from the edge of the image before
correlating
shift_order : int, optional
Order of interpolation for the shift. The shift order must be
between 0 and 5, with a default of 3
rotation_order: int, optional
Order of interpolaton for the rotation. The rotation order
must be between 0 and 5, with a default of 3
xydither : array_like, optional
Initial x,y shift estimates. If not set, default is 0, 0
crpix : array-like, optional
If provided, will be updated to match image shift_image [x, y]
upsample : int, optional
Data will be registered to within 1 / `upsample` of a pixel
rot_angle : float, optional
Indicates that the data image is rotated wrt the reference
image by this amount. The data image (and variance if supplied)
will be rotated clockwise by `rot_angle` degrees
get_offsets : bool, optional
Only return the (x, y) shift. Do not shift the data.
missing : float, optional
Value to represent missing data during shift
Returns
-------
numpy.ndarray, numpy.ndarray
The shifted image (nrow, ncol)
The shifted variance (nrow, ncol)
"""
if not isinstance(header, fits.header.Header):
header = fits.header.Header()
addhist(header, 'Created header')
var = variance.copy() if isinstance(variance, np.ndarray) else None
if not isinstance(data, np.ndarray) or len(data.shape) != 2:
headererror(header, "Coadd not applied (invalid data)")
return
dovar = isinstance(var, np.ndarray) and var.shape == data.shape
var = None if not dovar else var
if variance is not None and not dovar:
addhist(header, 'Not propagating variance (Invalid variance)')
log.warning("Variance must match data: %s" % type(variance))
if not isinstance(reference, np.ndarray) or reference.shape != data.shape:
headererror(header, "Coadd not applied (invalid reference)")
return
image = data.copy()
ref = reference.copy()
if border > 0:
if border * 2 >= data.shape[0] or border * 2 >= data.shape[1]:
headererror(header, 'Coadd not applied (invalid border)')
return
image = image[border: -border, border: -border]
ref = ref[border: -border, border: -border]
rot_data = data.copy()
rot_var = var.copy() if dovar else None
if rot_angle is not None and abs(rot_angle % 360) != 0:
image = rotate(image, rot_angle, order=rotation_order)
rot_data = rotate(data, rot_angle, order=rotation_order)
if dovar:
rot_var = rotate(var, rot_angle, order=rotation_order)
xyshift = abs(getpar(header, 'XYSHIFT', dtype=float, default=40.0,
comment='maximum shift to be applied +/-'))
offset = register_image(
image, ref, upsample=upsample, maxshift=xyshift, shift0=xydither)
log.debug("calculated X,Y shift: (%s, %s)" % (offset[0], offset[1]))
if np.isnan(offset).any() or np.array(abs(offset) > xyshift).any():
headererror(header, "Coadd not applied (exceeded maxshift)")
return
if get_offsets:
return offset
addhist(header, 'Used correlation registration to determine shifts')
addhist(header, 'X, Y shift of image is (%f, %f)' % (offset[0], offset[1]))
hdinsert(header, 'COADX0', offset[0], refkey=kref,
comment='X shift during coadd process')
hdinsert(header, 'COADY0', offset[1], refkey=kref,
comment='Y shift during coadd process')
return shift(rot_data, offset, variance=rot_var, missing=missing,
crpix=crpix, order=shift_order)
[docs]
def coadd_user(data, reference, position, header=None, variance=None,
crpix=None, get_offsets=False, shift_order=None,
missing=np.nan):
"""
Shift an image for coaddition using header information
Parameters
----------
data : numpy.ndarray
Data to be shifted (nrow, ncol)
reference : array_like of float
2-element (x, y) reference position in pixels
position : array_like of float
2-element (x, y) user position in pixels
header : The fits header of the new input data file, optional
variance : None or numpy.ndarray, optional
Propagate the provided variance (nrow, ncol)
shift_order : int, optional
Order of interpolation for the shift. The shift order must be
between 0 and 5, with a default of 3
crpix : array-like, optional
If provided, will be updated to match image shift_image [x, y]
get_offsets : bool, optional
Only return the (x, y) shift. Do not shift the data.
missing : float, optional
Value to represent missing data during shift
Returns
-------
2-tuple
The shifted image (nrow, ncol)
The shifted variance (nrow, ncol)
"""
if not hasattr(reference, '__len__') or len(reference) != 2:
headererror(header, 'Invalid user reference position')
return
elif not hasattr(position, '__len__') or len(position) != 2:
headererror(header, 'Invalid user position')
return
offset = np.array(reference) - np.array(position)
maxshift = getpar(header, 'MAXREGSH', dtype=int, default=2000,
update_header=False)
absoff = np.abs(offset)
if np.array(absoff > maxshift).any():
log.warning('Shift (%f, %f) larger than %i. No shift applied' %
(offset[0], offset[1], maxshift))
offset *= 0
offset[absoff < 1e-4] = 0
if get_offsets:
return offset
if header is not None:
addhist(header, 'Applied user shifts')
addhist(header, 'X, Y shift of image is (%f, %f)' %
(offset[0], offset[1]))
hdinsert(header, 'COADX0', offset[0], refkey=kref,
comment='X shift during coadd process')
hdinsert(header, 'COADY0', offset[1], refkey=kref,
comment='Y shift during coadd process')
return shift(data, offset, variance=variance, missing=missing,
crpix=crpix, order=shift_order)
[docs]
def register(data, header, reference=None, variance=None, crpix=None,
position=None, get_offsets=False, missing=np.nan,
algorithm=None):
"""
Use dither data to shift_image input image to a reference image.
This function shifts dithered observations of the same object into
the same pixel coordinate system, in preparation for coaddition
of the images. No rotation or scaling is performed. The method
for determining the shift_image is read from the configuration file.
In the configuration file, if CORCOADD is set to to CENTROID, then a
centroiding algorithm is used to determine the shift_image. If CORCOADD is
XCOR, a cross-correlation algorithm is used. If CORCOADD is HEADER,
ir no reference data is provided, then dither information from the
FITS header is used to determine the shift_image. If CORCOADD is NOSHIFT,
then no shift_image is performed. If the centroiding algorithm is selected
and it fails for any reason, then a header shift_image algorithm is used
instead. If a reference source position and new source position are
provided in <parameter: reference> and <parameter: position>, they will
be used to determine the shift_image, regardless of the value of CORCOADD.
The order of the interpolation used to shift_image the data is determined
by the SHIFTORD keyword in the configuration file. SHIFTORD=1 or 3 will
interpolate to perform sub-pixel shifts; SHIFTORD=0 will shift_image by
integer pixels (no interpolation).
Parameters
----------
data : numpy.ndarray
The image to register (nrow, ncol)
header : astropy.io.fits.header.Header
The FITS header of the input data file
reference : numpy.ndarray or 2-tuple, optional
The reference image or data to register to. May be an image
array (for centroiding or cross-correlation) or a source position
(x, y). If a reference source position is provided in REFDATA,
a new source position must be provided in <parameter: position>.
If <parameter: reference> is not specified, and the algorithm is not
NOSHIFT, then header data will be used to register the image.
variance : numpy.ndarray, optional
Variance array (nrow, ncol) to update in parallel with the
data array.
crpix : array_like, optional
2-element (x, y) array to update
position : 2-tuple, optional
Position (x, y) to register to a reference position provided
in <parameter: reference>, used with the interactive USER registration
algorithm.
get_offsets : bool, optional
If True, only return the (x, y) shift. Headers will still be updated
missing : float, optional
Value to represent missing data during shift
algorithm : str, optional
Algorithm to use when registering images. Default is to read from
the configuration.
Returns
-------
2-tuple
numpy.ndarray : Registered image array (nrow, ncol)
numpy.ndarray : Registered variance array (nrow, ncol)
"""
if not isinstance(header, fits.header.Header):
if header is None:
header = fits.header.Header()
addhist(header, "Created header")
else:
log.error("Coadd failed (invalid header)")
return
if not isinstance(data, np.ndarray) or len(data.shape) != 2:
headererror(header, "Coadd failed (invalid data)")
return
var = variance.copy() if isinstance(variance, np.ndarray) else None
dovar = var is not None and var.shape == data.shape
if variance is not None and not dovar:
headererror(header, "Variance not propagated (invalid variance)")
if algorithm is None:
algorithm = getpar(header, 'CORCOADD', default='HEADER')
if position is not None:
algorithm = 'USER'
elif reference is None and algorithm != 'NOSHIFT':
algorithm = 'HEADER'
if algorithm == 'XCOR':
s = imgshift_header(header, chop=False, nod=False)
result = coadd_correlation(
data, reference, header=header, variance=var, missing=missing,
xydither=(s['ditherx'], s['dithery']), get_offsets=get_offsets,
crpix=crpix)
elif algorithm == 'CENTROID':
result = coadd_centroid(
data, reference, header=header, variance=var, missing=missing,
get_offsets=get_offsets)
elif algorithm == 'HEADER':
log.debug('Using header shift to register frame')
result = coadd_header(data, header, variance=var, missing=missing,
get_offsets=get_offsets, crpix=crpix)
elif algorithm == 'USER':
result = coadd_user(
data, reference, position, header=header, missing=missing,
variance=var, get_offsets=get_offsets, crpix=crpix)
elif algorithm == 'NOSHIFT':
log.debug("No shift applied")
addhist(header, "No shift applied")
if get_offsets:
result = (0., 0.)
else:
result = data.copy(), var
else:
headererror(header, 'Unknown registration algorithm: %s' %
repr(algorithm))
result = None
if result is None:
headererror(header, 'Coadd registration failed')
return result