Source code for sofia_redux.spectroscopy.extspec

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from astropy import log
import numpy as np
from sofia_redux.toolkit.fitting.polynomial import poly1d, polyfitnd
from sofia_redux.toolkit.utilities.func import nantrim
from sofia_redux.toolkit.stats import meancomb
import warnings

__all__ = ['col_subbg', 'extspec']


[docs] def col_subbg(col_arc, col_image, col_var, col_apmask, col_mask, bgorder, robust=4.0, **kwargs): """ Fit background to a single column. Parameters ---------- col_arc : numpy.ndarray of float (nrow,) spatial coordinates col_image : numpy.ndarray of float (nrow,) column image values col_var : numpy.ndarray of float (nrow,) variance of image column col_apmask : numpy.ndarray of float (nrow,) background mask where 0 indicates a background pixel col_mask : numpy.ndarray of bool (nrow,) True indicates a valid image pixel, False indicates a bad pixel. bgorder : int Order of polynomial to fit to the background robust : float Robust threshold. kwargs : dict Additional parameters to pass to polyfitnd. Returns ------- corrected_image, corrected_var, coefficients : 3-tuple of numpy.ndarray The column with background subtracted (nrow,), the corrected variance (nrow,) and the polynomial coefficients fit to the background (bgorder + 1,). """ region = np.array(np.isnan(col_apmask)) if region.sum() < (bgorder + 2): log.error("Not enough background points found.") return if bgorder == 0: # do simple robust mean for bgorder=0 data = col_image[region][col_mask[region]] var = col_var[region][col_mask[region]] bgfit, bgvar = meancomb(data, variance=var, robust=robust) bgcoeff = (bgfit,) else: # otherwise fit a polynomial bgmodel = polyfitnd(col_arc[region], col_image[region], bgorder, error=np.sqrt(col_var[region]), robust=robust, mask=col_mask[region], covar=True, model=True, **kwargs) if not bgmodel.success: log.error("Polynomial fit failed.") return bgfit, bgvar = bgmodel(col_arc, dovar=True) bgcoeff = bgmodel.coefficients return col_image - bgfit, col_var + bgvar, bgcoeff
def col_fixdata(fix, col_image, col_var, col_bits, col_mask, model_profile, scale_coeffs, threshold=5.0): """ Replace bad pixels in a single column. Parameters ---------- fix : numpy.ndarray of int Indices of data in the column to fix. col_image : numpy.ndarray of float (nrow,) column image values col_var : numpy.ndarray of float (nrow,) variance of image column col_bits : numpy.ndarray of int (nrow,) bit mask for image column col_mask : numpy.ndarray of bool (nrow,) boolean bad pixel mask for image column model_profile : numpy.ndarray of float (nrow,) spatial model for the image column scale_coeffs : array-like of float Coefficients for scaling the model to the flux. threshold : float Robust threshold for scaling the model to the variance image. """ # Replace bad pixels with the scaled model values col_image[fix] = poly1d( model_profile[fix], scale_coeffs) # Scale the profile to the variance and replace absprof = np.abs(model_profile) var_model = polyfitnd(absprof, col_var, 1, model=True, robust=threshold) col_var[fix] = var_model.stats.fit[fix] # set 2 in the bitmask col_bits[fix] += 2 # set False in the bool mask col_mask[fix] = False
[docs] def extspec(rectimg, profile=None, spatial_map=None, optimal=False, sub_background=True, fix_bad=False, bgorder=2, threshold=5.0, sum_function=None, error_function=None, verbose=False): """ Extracts spectra from a rectified spectral image. For each column (wavelength) in each order: a. (optional) Remove background using spatial regions identified in the aperture mask. b. (optional) Use the spatial model in spatial_map or profile to remove bad pixels. c. Extract the spectral value for that column using either the standard or optimal extraction algorithm. Standard Extraction: If bad pixels were identified via the spatial model, they are replaced by the value of the spatial model at that wavelength. The spectral value for a given aperture is then taken as the sum of all pixels lying on the aperture. Optimal Extraction: The spectral value for each aperture is taken by taking a mean of the column weighted by the spatial model, for pixels identified as the aperture radius in the aperture mask. Bad pixels are ignored. Parameters ---------- rectimg : dict Rectified image data with aperture definitions. The aperture mask is as produced by the `sofia_redux.spectroscopy.mkapmask` function. Structure is: order (int) -> dict image -> numpy.ndarray (ns, nw) Rectified order image. variance -> numpy.ndarray (ns, nw) Rectified variance for image. wave -> numpy.ndarray (nw,) Wavelength coordinates. spatial -> numpy.ndarray (ns,) Spatial coordinates. mask -> numpy.ndarray (ns, nw) Boolean bad pixel mask (True = good). bitmask -> numpy.ndarray (ns, nw) Bit-set mask. 1=nonlinear pixel. apmask -> numpy.ndarray (ns, nw) Aperture mask. apsign -> list of {1, -1}, optional Aperture signs for each aperture. Must match the number of apertures in the aperture mask if provided. All aperture signs are assumed positive if not provided. profile : dict, optional Median spatial profile for each order. Structure is: order (int) -> numpy.ndarray (ns,) spatial_map : dict, optional 2D spatial map for each order. If provided, `profile` is ignored. Structure is: order (int) -> numpy.ndarray (ns, nw) optimal : bool, optional If set, optimal extraction is used. sub_background : bool, optional If set, background regions identified in the aperture mask are fit and subtracted from each column. If set, `profile` or `spatial_map` must be provided. fix_bad : bool, optional If set, bad pixels will be fixed in the 2D spectral image. For standard extraction, the fixed pixels will be used to calculate the extracted flux. For optimal extraction, the fixed pixels will be ignored. bgorder : int, optional Background fitting order. threshold : float, optional Robust threshold for background fits, in number of sigma. sum_function : function, optional Function to use as sum over aperture, when optimal=False. Default is np.nansum(flux * weights). The provided function should accept one two array arguments (flux and weights). error_function : function, optional Function to use for error propagation, given variance input, when optimal=False. Default is np.sqrt(np.nansum(flux * weights)). The provided function should accept two array arguments (variance and weights). verbose : bool, optional If set, columns that fail background fit or optimal extraction will generate warnings for the log. Returns ------- spectra : dict order (int) -> dict spectra : numpy.ndarray of float (n_apertures, 4, n_spec) Each order contains a single array where: array[aperture, 0] = wavelength array[aperture, 1] = flux array[aperture, 2] = error array[aperture, 3] = bit-set mask """ # todo -- add checks for input structure use_model = False use_profile = False if optimal or fix_bad: if spatial_map is None: if profile is None: log.error("Optimal or fix_bad requires " "spatial map or median profile.") return else: use_profile = True use_model = True orders = np.unique(list(rectimg.keys())).astype(int) if use_profile: log.debug('Using median profile for extraction.') elif use_model: log.debug('Using spatial map for extraction.') else: log.debug('No profile or spatial map used for extraction.') result = {} # Loop through each order for orderi, order in enumerate(orders): image = rectimg[order]['image'] variance = rectimg[order]['variance'] wave = rectimg[order]['wave'] space = rectimg[order]['spatial'] nwave = wave.size nspace = space.size if 'mask' in rectimg[order]: mask = rectimg[order]['mask'] else: mask = np.full((nspace, nwave), True) if 'bitmask' in rectimg[order]: bitmask = rectimg[order]['bitmask'] else: bitmask = np.zeros((nspace, nwave), dtype=int) # get aperture mask, as generated by mkapmask apmask = rectimg[order]['apmask'] nap = int(np.nanmax(np.abs(apmask))) # assign aperture signs apsign = rectimg[order].get('apsign', None) if apsign is None or len(apsign) == 0: apsign = [1.0] * nap elif len(apsign) != nap: log.error('Mismatch between aperture signs and aperture mask.') return # check for background regions if sub_background and not np.any(np.isnan(apmask)): log.warning('No background regions found; ' 'not subtracting background.') sub_background = False oflux = np.full((nap, nwave), np.nan) oerr = np.full((nap, nwave), np.nan) obits = np.full((nap, nwave), 0) # initialize necessary arrays and variables scale_coeffs, model_profile, goodpix = None, None, None for wavei in range(nwave): col_image = image[:, wavei] col_mask = mask[:, wavei] col_var = variance[:, wavei] col_bits = bitmask[:, wavei] col_apmask = apmask[:, wavei].copy() if sub_background: bgresult = col_subbg( space, col_image, col_var, col_apmask, col_mask, bgorder) if bgresult is None and verbose: msg = "Background fit failed at order=%i" % order msg += " column=%i" % wavei msg += " wavelength=%f" % wave[wavei] log.warning(msg) else: col_image, col_var, c = bgresult if use_model: # Scale the profile model and identify bad pixels if use_profile: model_profile = profile[order] else: model_profile = spatial_map[order][:, wavei] scale_model = polyfitnd(model_profile, col_image, 1, model=True, robust=threshold, mask=col_mask) goodpix = scale_model.mask scale_coeffs = scale_model.coefficients # fix bad pixels in image if desired if fix_bad: fix = ~goodpix if fix.any(): col_fixdata(fix, col_image, col_var, col_bits, col_mask, model_profile, scale_coeffs, threshold) # get aperture masks in a more useful form, # avoiding float and NaN equality comparisons col_apmask[np.isnan(col_apmask)] = 0 psfmask = np.abs(col_apmask) inner_mask = np.zeros_like(col_apmask, dtype=int) inner_idx = col_apmask < 0 inner_mask[inner_idx] = np.abs(col_apmask[inner_idx]).astype(int) # Begin extraction for api in range(nap): zap = (psfmask > api) & (psfmask <= (api + 1)) sign = apsign[api] if optimal: # Optimal extraction # Enforce profile positivity pos_profile = np.clip(model_profile.copy() * sign, 0, np.inf) # Sum over PSF, accounting for pixels partially on aperture weight = psfmask[zap] - api with warnings.catch_warnings(): warnings.simplefilter('ignore', RuntimeWarning) apsum = np.nansum(pos_profile[zap] * weight) if apsum == 0: pos_profile *= np.nan else: pos_profile /= apsum # Scale data values and calculate mean over # aperture radius only, ignoring fractional and bad pixels zinner = (inner_mask == (api + 1)) idx = zinner & (pos_profile != 0) & goodpix if idx.any(): scale_image = col_image[idx] / pos_profile[idx] scale_var = col_var[idx] / (pos_profile[idx] ** 2) mean, mvar = meancomb(scale_image, variance=scale_var) oflux[api, wavei] = sign * mean oerr[api, wavei] = np.sqrt(mvar) else: oflux[api, wavei] = np.nan oerr[api, wavei] = np.nan obits[api, wavei] += 8 if verbose: msg = "Optimal extraction failed at " msg += "order=%i aperture=%i " % (order, api) msg += "column=%i wavelength=%f" % (wavei, wave[wavei]) log.warning(msg) else: # Sum accounting for pixels partially on aperture weight = psfmask[zap] - api with warnings.catch_warnings(): warnings.simplefilter('ignore', RuntimeWarning) if sum_function is None: oflux[api, wavei] = \ sign * np.nansum(col_image[zap] * weight) else: oflux[api, wavei] = \ sign * sum_function(col_image[zap], weight) if error_function is None: oerr[api, wavei] = \ np.sqrt(np.nansum(col_var[zap] * (weight ** 2))) else: oerr[api, wavei] = error_function(col_var[zap], weight) # set bitmask for any fixed pixels if np.any(col_bits[zap] > 1): obits[api, wavei] += 2 # Set bitmask for linearity if 1 in col_bits: obits[api, wavei] += 1 # Correct sign in image col_image[zap] *= sign # store modified image and variance back to input image[:, wavei] = col_image variance[:, wavei] = col_var # Store results nanstrip = nantrim(wave, 2) order_array = np.zeros((nap, 4, nanstrip.sum())) for api in range(nap): order_array[api, 0] = wave[nanstrip] order_array[api, 1] = oflux[api, nanstrip] order_array[api, 2] = oerr[api, nanstrip] order_array[api, 3] = np.mod(obits[api, nanstrip], 256) result[order] = order_array return result