# Source code for sofia_redux.instruments.flitecam.mkflat

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from astropy.convolution import Gaussian2DKernel, convolve
from astropy.io import fits
from astropy.stats import gaussian_fwhm_to_sigma
import numpy as np
from photutils.segmentation import detect_sources
from photutils.segmentation import detect_threshold

from sofia_redux.instruments.forcast.hdmerge import hdmerge
from sofia_redux.toolkit.image.combine import combine_images
from sofia_redux.toolkit.utilities.fits import hdinsert, set_log_level

# Explicit public API: only `mkflat` is exported via ``from ... import *``.
__all__ = ['mkflat']


def mkflat(infiles, method='median', weighted=True, robust=True,
           sigma=5, maxiters=None, psf_fwhm=6.0, obj_sigma=2.0):
    """
    Make a flat from dithered sky images.

    The process is:

    - Directly combine all images into a draft flat.
    - Divide input images by the draft flat.
    - Use the resulting gain-corrected images to identify and
      mask any sources.
    - Scale all images to the median value of all the masked data.
    - Combine scaled, masked images into a final flat, propagating
      errors.

    Parameters
    ----------
    infiles : list of fits.HDUList
        Input data.  Should have FLUX, ERROR, and BADMASK extensions.
    method : {'mean', 'median', 'sum'}, optional
        Combination function to use.
    weighted : bool, optional
        If True and method is 'mean', the input variance will be used
        to weight the mean combination.  Ignored if variance is
        not provided.
    robust : bool, optional
        If True, the threshold and maxiters parameters will be used
        to reject outliers before combination.  Outlier rejection is
        performed via `astropy.stats.sigma_clip`.
    sigma : float, optional
        The number of standard deviations for clipping; passed to
        sigma_clip.
    maxiters : int or None, optional
        The maximum number of clipping iterations to perform; passed
        to sigma_clip.
    psf_fwhm : float, optional
        Expected FWHM of sources for masking.  Used to smooth the
        image before detecting objects.
    obj_sigma : float, optional
        The number of standard deviations above the background, used
        in detecting sources.

    Returns
    -------
    fits.HDUList
        Normalized flat field correction image, with FLAT, FLAT_ERROR,
        and FLAT_BADMASK extensions.  Array dimensions match input.

    Raises
    ------
    ValueError
        If the combined flat has no valid data (its median is zero or
        non-finite), so that it cannot be normalized.
    """
    shape = infiles[0][0].data.shape
    # Per-pixel count of input images in which a source was detected
    flat_mask = np.zeros(shape, dtype=int)

    data_list, var_list, header_list = list(), list(), list()
    for hdul in infiles:
        data_list.append(hdul['FLUX'].data.copy())
        # ERROR extension holds standard deviation; square to variance
        var_list.append(hdul['ERROR'].data.copy() ** 2)
        header_list.append(hdul[0].header)

    # combine images for a draft flat, with high robust rejection
    if len(data_list) == 1:
        # just directly use the data as a flat; no source masking
        # is possible with a single image
        flat_data, flat_var = data_list[0], var_list[0]
    else:
        draft_method = 'median'
        draft_flat, draft_flat_var = combine_images(
            data_list, variance=var_list, method=draft_method,
            weighted=weighted, robust=robust, sigma=obj_sigma,
            maxiters=maxiters)
        draft_flat /= np.nanmedian(draft_flat)

        # get kernel to smooth image by the beam
        gsigma = psf_fwhm * gaussian_fwhm_to_sigma
        gsize = int(gsigma * 2)
        if gsize % 2 == 0:
            # astropy 2D kernels require odd dimensions; some
            # psf_fwhm values otherwise yield an even size and a
            # ValueError from Gaussian2DKernel
            gsize += 1
        kernel = Gaussian2DKernel(gsigma, x_size=gsize, y_size=gsize)
        kernel.normalize()

        # draft correct and mask sources in all images
        masked_data = []
        masked_var = []
        for i, data in enumerate(data_list):
            # draft gain correction
            with np.errstate(invalid='ignore'):
                gain_corr = data / draft_flat

            # replace any NaNs with nearby non-NaN,
            # for smoother source detection
            # (forward-fill along rows; a NaN in column 0 is its own
            # fill value, so leading NaNs may remain)
            nanval = np.isnan(gain_corr)
            idx = np.where(~nanval, np.arange(nanval.shape[1]), 0)
            np.maximum.accumulate(idx, axis=1, out=idx)
            gain_corr = gain_corr[np.arange(idx.shape[0])[:, None], idx]

            # clipped background threshold image with low
            # detection threshold
            with set_log_level('CRITICAL'):
                threshold = detect_threshold(gain_corr, nsigma=obj_sigma)

                # detect any 5 connected pixels above the threshold
                convolved_data = convolve(gain_corr, kernel)
                segmented = detect_sources(convolved_data, threshold,
                                           npixels=5)
            if segmented is not None:
                obj_mask = (segmented.data != 0)
            else:
                obj_mask = np.full(flat_mask.shape, False)

            # add objects to mask
            if np.any(obj_mask):
                flat_mask[obj_mask] += 1

            # mask the data
            mdata = data_list[i].copy()
            mvar = var_list[i].copy()
            if np.any(obj_mask):
                mdata[obj_mask] = np.nan
                mvar[obj_mask] = np.nan

            masked_data.append(mdata)
            masked_var.append(mvar)

        # scale data to masked median value
        scale_to = np.nanmedian(masked_data)
        for d, v in zip(masked_data, masked_var):
            scale_from = np.nanmedian(d)
            d *= scale_to / scale_from
            # variance scales with the square of the data scale factor
            v *= (scale_to / scale_from) ** 2

        # make flat from masked, scaled data
        flat_data, flat_var = combine_images(
            masked_data, variance=masked_var, method=method,
            weighted=weighted, robust=robust, sigma=sigma,
            maxiters=maxiters)

    # normalize flat
    norm = np.nanmedian(flat_data)
    if np.allclose(norm, 0) or not np.isfinite(norm):
        raise ValueError('No valid flat data')
    flat_data /= norm
    flat_err = np.sqrt(flat_var) / norm

    # merge input headers and record flat metadata
    flat_header = hdmerge(header_list)
    hdinsert(flat_header, 'EXTNAME', 'FLAT', comment='Extension name')
    hdinsert(flat_header, 'PRODTYPE', 'normalized_gain',
             comment='Product type')
    hdinsert(flat_header, 'FLATNORM', norm,
             comment='Flat normalization value')

    flat = fits.HDUList(
        [fits.PrimaryHDU(data=flat_data, header=flat_header),
         fits.ImageHDU(data=flat_err, name='FLAT_ERROR'),
         fits.ImageHDU(data=flat_mask, name='FLAT_BADMASK')])
    return flat