Source code for sofia_redux.instruments.forcast.peakfind

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from warnings import simplefilter, catch_warnings

from astropy import log
from astropy.convolution import Gaussian2DKernel, convolve_fft
from astropy.modeling import models, fitting
from astropy.stats import gaussian_fwhm_to_sigma
from astropy.table import Table
from astropy.utils.exceptions import AstropyWarning
import numpy as np
from pandas import DataFrame
from photutils.detection import DAOStarFinder
from scipy.signal import correlate2d

__all__ = ['peakfind', 'PeakFinder']


[docs] class PeakFinder(object): """Configure and run peak finding algorithm.""" def __init__(self, image, reference=None, npeaks=4, fwhm=4.5, sharplo=0.2, sharphi=1.0, roundlo=-0.75, roundhi=0.75, silent=False, maxiter=1000, epsilon=5, eps=1e-7, ncut=30, chopnoddist=None, refine=True, positive=False, smooth=True): self.image = image self.reference = reference self.npeaks = npeaks self.fwhm = fwhm self.gfit_image = None self.gfit_reference = None self.sharplo = sharplo self.sharphi = sharphi self.roundlo = roundlo self.roundhi = roundhi self.iteration = 0 self.maxiter = maxiter self.epsilon = epsilon self.eps = eps self.ncut = ncut self.positive = positive self.smooth = smooth self.refine = refine self.silent = silent self.image_table = Table() self.reference_table = Table() self.current = None self.frame = DataFrame() self.threshold = None try: self.chopdist = chopnoddist[0] self.noddist = chopnoddist[1] except TypeError: self.chopdist = None self.noddist = None @property def sigma(self): return self.fwhm * gaussian_fwhm_to_sigma
[docs] def print(self, message): if not self.silent: log.info(message)
[docs] def chopnod_sort(self, table): """ Select peaks that follow a particular pattern for merge The table passed in will be cut in place. Parameters ---------- table : astropy.table.Table """ if not isinstance(table, Table): return elif None in [self.chopdist, self.noddist]: return elif 'xcentroid' not in table.columns or \ 'ycentroid' not in table.columns: return dist = np.sqrt((self.chopdist ** 2) + (self.noddist ** 2)) x0, y0 = table['xcentroid'], table['ycentroid'] valid = [False] * len(table) for idx, row in enumerate(table): dx = x0 - row['xcentroid'] dy = y0 - row['ycentroid'] dr = np.sqrt((dx ** 2) + (dy ** 2)) dchop = abs(dr - self.chopdist) dnod = abs(dr - self.noddist) dchopnod = abs(dr - dist) ok = (np.array([dchop, dnod, dchopnod]) < self.epsilon) if ok.astype(int).sum() >= 2: valid[idx] = True table = table[valid]
[docs] def search_peaks(self, image): """Initial search for peaks in the image Parameters ---------- image : numpy.ndarray Returns ------- astropy.table.Table """ table = Table() if not isinstance(image, np.ndarray): return table search_image = image if self.smooth: with catch_warnings(): simplefilter('ignore') search_image = convolve_fft( search_image, Gaussian2DKernel(self.sigma), normalize_kernel=True, preserve_nan=True) # always replace NaNs with median level: DAOStarFinder # won't find sources with NaN on top nval = np.isnan(search_image) search_image[nval] = np.median(search_image[~nval]) # always take absolute value for fitting purposes search_image = abs(search_image) threshold = np.array([np.nanmin(search_image), np.nanmax(search_image)]) threshold *= [0.9, 1.1] if threshold[0] < 0: # pragma: no cover threshold[0] = 0. self.iteration = 0 while self.iteration < self.maxiter: self.iteration += 1 with catch_warnings(): simplefilter('ignore', AstropyWarning) self.threshold = threshold.mean() finder = DAOStarFinder( self.threshold, self.fwhm, sharplo=self.sharplo, sharphi=self.sharphi, roundlo=self.roundlo, roundhi=self.roundhi) table = finder.find_stars(search_image) self.chopnod_sort(table) if self.refine and self.positive: self.refine_table(image, table) if not table: nfound = 0 else: nfound = len(table) if abs(threshold[0] - threshold[1]) < self.eps and ( nfound != self.npeaks): self.print('Min/max interval is null, breaking loop at ' 'iteration #%s' % self.iteration) return table elif nfound < self.npeaks: threshold[1] = self.threshold elif nfound > self.npeaks: threshold[0] = self.threshold else: return table else: return table
[docs] def refine_table(self, image, table): """ Refine table contents with a fit on the image The table is modified in place. Parameters ---------- image : np.ndarray table : astropy.table.Table """ if table is None or len(table) == 0 or \ not isinstance(image, np.ndarray): return x0, y0 = np.mgrid[-self.ncut:self.ncut, -self.ncut:self.ncut] model_g = models.Gaussian2D( 1.0, self.ncut, self.ncut, x_stddev=self.sigma, y_stddev=self.sigma) psf = np.zeros(x0.shape) psf = model_g.render(psf) ymax = image.shape[0] - self.ncut xmax = image.shape[1] - self.ncut remove_rows = [] for idx, row in enumerate(table): x, y = row['xcentroid'], row['ycentroid'] if (self.ncut < x < xmax) and (self.ncut < y < ymax): bxmin = int(np.floor(x - self.ncut)) bxmax = int(np.ceil(x + self.ncut)) bymin = int(np.floor(y - self.ncut)) bymax = int(np.ceil(y + self.ncut)) box = image[bymin:bymax, bxmin:bxmax].copy() g_init = models.Gaussian2D(row['flux'], x, y, x_stddev=self.sigma, y_stddev=self.sigma) flip = np.nanmean(correlate2d(box, psf)) < 0 box *= -1 if flip else 1 fitter = fitting.LevMarLSQFitter() by, bx = np.mgrid[bymin:bymax, bxmin:bxmax] nn = ~np.isnan(box) g = fitter(g_init, bx[nn], by[nn], box[nn]) # catch fit failures - just skip refinement in this case try: failure = (g.fit_info['ierr'] not in [1, 2, 3, 4]) except (AttributeError, KeyError): # pragma: no cover failure = False bad = not np.all( np.isfinite([g.x_mean.value, g.y_mean.value, g.amplitude.value])) if failure or bad: # pragma: no cover # ignore bad fits continue if flip: g.amplitude *= -1 row['xcentroid'] = g.x_mean.value row['ycentroid'] = g.y_mean.value row['flux'] = g.amplitude.value if self.positive: if row['flux'] < 0 or row['peak'] < 0: remove_rows.append(idx) for idx in sorted(remove_rows, reverse=True): table.remove_row(idx)
[docs] def findpeaks(self): columns = ['id', 'xcentroid', 'ycentroid', 'sharpness', 'roundness1', 'roundness2', 'npix', 'sky', 'peak', 'flux', 'mag'] self.frame = DataFrame(index=range(self.npeaks), columns=columns) self.frame['id'] = self.frame['id'].fillna(-1) self.frame = self.frame.fillna(np.nan) tables = [] for attr in ['reference', 'image']: image = getattr(self, attr) table_name = attr + '_table' table = Table() setattr(self, table_name, table) if not isinstance(image, np.ndarray) or (np.isnan(image)).all(): continue table = self.search_peaks(image) if not isinstance(table, Table) or len(table) == 0: self.print("Could not create %s table" % attr) return if self.refine and not self.positive: self.refine_table(image, table) tables.append(table) self.print('Number of loops run: %s/%s' % (self.iteration, self.maxiter)) self.print('Peaks found: %s/%s' % (len(table), self.npeaks)) self.print("Threshold used: %s" % self.threshold) if len(tables) == 0: log.error("No peaks could be found") return if len(tables) == 2: if len(tables[0]) != len(tables[1]): # pragma: no cover # edge case of peak finding failure log.error("Matching peaks could not be found") return dtable = tables[1].copy() dtable['xcentroid'] -= tables[0]['xcentroid'] dtable['ycentroid'] -= tables[0]['ycentroid'] else: dtable = tables[0].copy() dtable_frame = dtable.to_pandas() self.frame.update(dtable_frame) # try to keep the dtypes from the table common_columns = self.frame.columns.intersection(dtable_frame.columns) fluxcol = 'fit_amplitude' if self.refine else 'flux' self.frame = (self.frame .astype(dtable_frame[common_columns].dtypes, errors='ignore') .rename(columns={ 'xcentroid': 'x', 'ycentroid': 'y', 'flux': fluxcol})) cols = ['x', 'y', 'sharpness', 'roundness2', 'peak', fluxcol] self.print('\n' + self.frame[cols].to_string( header=[c.upper() for c in cols]))
[docs] def peakfind(coadded, newimage=None, refine=True, npeaks=4, fwhm=4.5, sharplo=0.2, sharphi=1.0, roundlo=-0.75, roundhi=0.75, silent=False, maxiter=1000, epsilon=5, eps=1e-7, ncut=30, chopnoddist=None, positive=False, smooth=True, return_object=False, coordinates=False): """ Find peaks (stars) in FORCAST images Identifies the desired number of peaks in the image using the DAO search algorithm followed by optional (default=True) Levenberg- Marquardt least squares fitting of a gaussian psf to refine the fit. Parameters ---------- coadded : numpy.ndarray Reference image to find peak positions newimage : numpy.ndarray, optional Secondary image to compare to coadded. If provided, the return value will be the shifts required to move the sources in newimage onto the sources in coadded. If not provided, the return value is the positions of the peaks in coadded refine : bool, optional If set, the x, y coordinates are fine tuned by fitting a Gaussian to the profile of each found center npeaks : int, optional Number of peaks to find; usually determined from the chop/nod mode fwhm : float, optional Expected FWHM in pixels roundlo : float, optional The lower bound on roundness for object detection roundhi : float, optional The upper bound on roundness for object detection sharplo : float, optional The lower bound on sharpness for object detection sharphi : float, optional The upper bound on sharpness for object detection silent : bool, optional If set, output will be suppressed maxiter : int, optional Maximum number of iterations for iteratefind epsilon : int or float, optional Maximum nod/chop deviation for exclusion in chopnod_sort eps : float, optional Precision to terminate iteration ncut : int, optional Positive integer cutout region used for Gaussian fitting (pixels) chopnoddist : (list or tuple) of float, optional If provided, will be used to identify the expected positions of the peaks and prioritize those that are nearest to where they should be. Format is [chop distance, nod distance] positive : bool, optional If set, only positive peaks will be returned. smooth : bool, optional If True, the data will be smoothed before searching for peaks. return_object : bool, optional If set, return the PeakFinder object rather than the DataFrame coordinates : bool, optional If set, return tuples of (x, y) coordinates as a list Returns ------- pandas.DataFrame or PeakFinder or list The default DataFrame represents either shift_image distances to move newimage on top of coadded, or positions of the peaks in coadded if newimage was not supplied as an input parameter. The PeakFinder object contains lots of useful little attributes such as the fits, tables and models of coadded and newimage. If coordinates=True, a list of (x, y) coordinates will be returned. """ # If newimage is input then we calculate the peaks for that image. # If not, we set x and y to 0. If newimage is input then we will # return the shift_image, otherwise we return the peak positions. findpeak = PeakFinder(coadded, reference=newimage, npeaks=npeaks, fwhm=fwhm, sharplo=sharplo, sharphi=sharphi, roundlo=roundlo, roundhi=roundhi, silent=silent, maxiter=maxiter, epsilon=epsilon, eps=eps, ncut=ncut, chopnoddist=chopnoddist, refine=refine, positive=positive, smooth=smooth) findpeak.findpeaks() if return_object: return findpeak else: if coordinates: if isinstance(findpeak.frame, DataFrame): result = [] if 'x' not in findpeak.frame: return result for _, row in findpeak.frame.iterrows(): result.append((row['x'], row['y'])) return result else: return findpeak.frame