Source code for sofia_redux.instruments.exes.readraw

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import re

from astropy import log
import bottleneck as bn
import numpy as np

from sofia_redux.instruments.exes.lincor import lincor
from sofia_redux.instruments.exes.get_badpix import get_badpix
from sofia_redux.instruments.exes.utils import get_reset_dark

__all__ = ['readraw']


[docs] def readraw(data, header, do_lincor=False, algorithm=None, toss_nint=0, copy_int=True): """ Correct for nonlinearity combine individual readouts. First corrects nonlinearity for each readout frame (`exes.lincor`), then determines the readout pattern from the OTPAT keyword. For readout coadding methods, this step currently has support for Fowler mode, simple destructive read, and equally spaced sample-up-the-ramp patterns. Frames are combined and variance is calculated based on the readout pattern, using algorithms from the following paper: Nonlinearity Corrections and Statistical Uncertainties Associated with Near-Infrared Arrays, William D. Vacca, Michael C. Cushing and John T. Rayner (2004, PASP 116, 352). The readout algorithm may also be directly specified by the `algorithm` parameter, which takes the following possible integer values: - 0 : Use the last destructive frame only - 1 : Simple destructive mode - 2 : Use the first and last frames only - 3 : Use the second and penultimate frames only - 4 : Fowler mode - 5 : Sample-up-the-ramp mode After readout coadd, multiple frames taken at the same nod position (as indicated by the NINT keyword) are averaged. Extra frames that do not match the OTPAT or NINT pattern are dropped. Parameters ---------- data : numpy.ndarray [nframe, nspec, nspat] array of float values. header : fits.Header FITS header produced by `exes.readhdr`. Note that the header will be updated in-place. do_lincor : bool, optional If True, do the nonlinear correction. If False, no check is performed on data quality: the output mask will be all False (no bad pixels). algorithm : int, optional Used to select processing mode. If None, defaults to that determined by `check_readout_pattern`. toss_nint : int, optional If provided and greater than 0 but less than the number of integrations present in the data, `toss_nint` integrations will be discarded from the beginning of the data array. If the number equals the number of integrations, the first nod is a B nod, and there is another B nod in the data array, the first integrations are instead replaced with a copy of the second B nod. copy_int : bool, optional If `toss_nint` is greater than 0 and `copy_int` is True, replacement integrations are always copied from the next B nod, regardless of the number of integrations available. Returns ------- coadd_data, variance, mask : 3-tuple of numpy.ndarray The coadded data, variance, and good data mask. The coadd_data and variance have shape (nframes, ny, nx). The mask has shape (ny, nx) and Boolean data type, where True indicates a good pixel; False indicates a bad pixel. """ _check_header(header) data = _check_data(data) data, mask = _get_data_subarray(data, header) data, readmode = _check_readout_pattern(data, header) if isinstance(algorithm, int): touse_lookup = { 0: 'last destructive', 1: 'destructive', 2: 'first/last nd', 3: 'second/penultimate nd', 4: 'fowler', 5: 'sample-up-the-ramp'} touse = touse_lookup.get(algorithm) if touse is None: raise ValueError(f'Invalid algorithm: {touse}') else: touse = readmode['mode'] if do_lincor: log.info("Applying linear correction") lindata, linmask = lincor(data, header) # "And" the output mask to get all nonlinear detector pixels mask &= np.all(linmask, axis=0) else: log.info("Linear correction not applied: do_lincor=False") lindata = data.copy() log.info('') log.info(f'Using read mode algorithm: {touse}') log.info('') if touse == 'destructive' or touse == 'last destructive': coadd_data, variance = _process_destructive(data, header, readmode) elif touse == 'first/last nd': coadd_data, variance = _process_nondestructive1( lindata, header, readmode) elif touse == 'second/penultimate nd': coadd_data, variance = _process_nondestructive2( lindata, header, readmode) elif touse == 'fowler': coadd_data, variance = _process_fowler(lindata, header, readmode) elif touse == 'sample-up-the-ramp': coadd_data, variance = _process_sample_up_the_ramp( data, header, readmode) else: # pragma: no cover # shouldn't be reachable raise ValueError(f'Unrecognized readmode: {readmode["mode"]}') coadd_data, variance = _combine_nods(coadd_data, variance, header, readmode, toss_nint, copy_int) return coadd_data, variance, mask
def _get_data_subarray(data, header): ectpat = str(header.get('ECTPAT', 'UNKNOWN')).split() if len(ectpat) == 6: ectpat = np.asarray(ectpat, dtype=int) ystart = ectpat[2] * 2 ystop = ystart + (ectpat[3] * 2) xstart = ectpat[4] xstop = xstart + (ectpat[5]) else: xstart = 0 xstop = data.shape[2] ystart = 0 ystop = data.shape[1] nframes, ny, nx = data.shape bpm = get_badpix(header) if bpm is not None: if bpm.shape[0] < ny or bpm.shape[1] < nx: raise ValueError( "Bad pixel mask is too small %s; data shape is %s" % (repr(bpm.shape), repr(data.shape[1:]))) # Reference columns are marked as 2 in the badpix mask refidx = bpm == 2 if refidx.any(): xrange = np.where(~refidx)[1] data = data[:, :, xrange.min():xrange.max() + 1] bpm = bpm[:, xrange.min():xrange.max() + 1] nx = data.shape[2] xstart = 0 xstop = nx # Take subarray if needed bpm = bpm[ystart:ystop, xstart:xstop] mask = np.full((ny, nx), True) if bpm is not None: mask[bpm != 1] = False # Store data size header['DETSEC'] = '[%i:%i,%i:%i]' % ( xstart + 1, xstop, ystart + 1, ystop) header['NSPAT'] = nx header['NSPEC'] = ny # Correct frametime for subarray size as needed header['FRAMETIM'] *= ny / 1024. return data, mask def _check_header(header): """Ensure header values are of the correct type""" header['OTPAT'] = str(header['OTPAT']).upper().strip() header['NINT'] = int(header['NINT']) header['FRAMETIM'] = float(header['FRAMETIM']) header['READNOIS'] = float(header['READNOIS']) header['DARKVAL'] = float(header['DARKVAL']) try: header['PAGAIN'] = float(header['PAGAIN']) except (KeyError, ValueError): header['PAGAIN'] = 1.0 try: header['EPERADU'] = float(header['EPERADU']) except (KeyError, ValueError): header['EPERADU'] = 1.0 def _check_data(data): data = np.asarray(data, dtype=float) if data.ndim == 2: data = data[None] if data.ndim != 3: raise ValueError("Data must be a 3-D cube (nframe, nspec, nspat)") return data def _check_readout_pattern(data, header): regex = re.compile('[STNDC][0-9]+') if regex.match(header['OTPAT']) is None: raise ValueError("Unreadable OT pattern. OTPAT=%s" % header['OTPAT']) patterns = regex.findall(header['OTPAT']) spin = trash = nondest = dest = coadd = nread = npass = 0 for pattern in patterns: optype, n_op = pattern[0], int(pattern[1:]) + 1 if optype == 'S': spin += n_op npass += n_op elif optype == 'T': trash += n_op elif optype == 'D': dest += n_op npass += n_op elif optype == 'C': coadd += n_op dest += n_op npass += n_op elif optype == 'N': nread += 1 nondest += n_op npass += n_op nframes = coadd if coadd > 0 else (nondest + dest) npattern = data.shape[0] // nframes if npattern == 0: raise ValueError(f"Data does not match OTPAT={header['OTPAT']}." " A full pattern is not present; aborting.") elif (npattern * nframes) != data.shape[0]: log.warning(f"Data does not match OTPAT={header['OTPAT']}." " Dropping extra frames.") data = data[:npattern * nframes] if nread == 2 or (nread == 1 and nondest == 1): mode = 'fowler' elif nondest == 0 and dest == 1 and nframes == 1: mode = 'destructive' elif nondest != 0 and dest == 1: mode = 'sample-up-the-ramp' else: raise ValueError("Unrecognized readmode") log.info('') log.info(f"Readout pattern: {header['OTPAT']}") log.info(f"Frame(s) per pattern: {nframes}") log.info(f"Total frames: {data.shape[0]}") log.info(f"Recommended readout mode: {mode}") readmode = {'spin': spin, 'trash': trash, 'nondest': nondest, 'dest': dest, 'coadd': coadd, 'nread': nread, 'npass': npass, 'nframes': nframes, 'npattern': npattern, 'mode': mode} return data, readmode def _process_destructive(data, header, readmode): # Using D only log.info('Last destructive read') frametime = float(header['FRAMETIM']) interval = (readmode['dest'] + readmode['nondest'] + readmode['spin']) * frametime log.info(f'Interval = {interval}') read_noise = float(header['READNOIS']) gain = float(header['PAGAIN']) eperadu = float(header['EPERADU']) zeroval = float(header['DARKVAL']) / (frametime * gain) gain_factor = interval * gain v_gain_factor = interval * eperadu shape = readmode['npattern'], data.shape[1], data.shape[2] coadd_data = np.empty(shape, dtype=float) variance = np.empty(shape, dtype=float) dark1s = get_reset_dark(header) for i in range(readmode['npattern']): if readmode['coadd'] == 1: # pragma: no cover # The Fowler coadd/subtraction should have been done in hardware # (this OTPAT was never used for EXES) coadd_data[i] = zeroval - data[i] / gain_factor else: pattern_start = i * readmode['nframes'] pattern_end = pattern_start + readmode['nframes'] signal = data[pattern_end - 1] coadd_data[i] = zeroval - (signal - dark1s[None]) / gain_factor variance[i] = (np.abs(coadd_data[i]) / v_gain_factor + (read_noise / v_gain_factor) ** 2) header['NFRAME'] = 1 header['BEAMTIME'] = interval return coadd_data, variance def _process_nondestructive1(lindata, header, readmode): if readmode['nondest'] + readmode['dest'] < 2: raise ValueError('OTPAT is not suitable for First/Last ND mode') log.info("First/Last ND mode") frametime = float(header['FRAMETIM']) nread = (readmode['nondest'] + readmode['dest']) // 2 interval = (readmode['dest'] + readmode['nondest'] + readmode['spin'] - 1) * frametime log.info(f'Interval = {interval}') gain = float(header['PAGAIN']) gainfac = gain * interval vgainfac = float(header['EPERADU']) * interval readnoise = float(header['READNOIS']) darkval = float(header['DARKVAL']) zeroval = darkval / (frametime * gain) nframes = readmode['nframes'] shape = readmode['npattern'], lindata.shape[1], lindata.shape[2] coadd_data = np.empty(shape, dtype=float) variance = np.empty(shape, dtype=float) for i in range(shape[0]): if readmode['coadd'] == 1: # pragma: no cover # (this OTPAT was never used for EXES) coadd_data[i] = zeroval - lindata[i] / gainfac else: pattern_start = i * nframes # 1st frame pattern_end = pattern_start + nframes # last frame pattern_data = lindata[pattern_start:pattern_end] pedestal = pattern_data[0] signal = pattern_data[pattern_end - pattern_start - 1] coadd_data[i] = zeroval - ((signal - pedestal) / gainfac) variance[i] = ((np.abs(coadd_data[i]) / vgainfac) * (1 - (frametime * (nread ** 2 - 1)) / (3 * interval * nread)) + 2 * (readnoise ** 2) / ((vgainfac ** 2) * nread)) header['NFRAME'] = nread header['BEAMTIME'] = interval return coadd_data, variance def _process_nondestructive2(lindata, header, readmode): if readmode['nondest'] + readmode['dest'] < 4: raise ValueError('OTPAT is not suitable for ' 'Second/Penultimate ND mode') log.info("Second/Penultimate ND mode") frametime = float(header['FRAMETIM']) nread = (readmode['nondest'] - 1) // 2 interval = (readmode['nondest'] + readmode['spin'] - 2) * frametime log.info(f'Interval = {interval}') gain = float(header['PAGAIN']) gainfac = gain * interval vgainfac = float(header['EPERADU']) * interval readnoise = float(header['READNOIS']) darkval = float(header['DARKVAL']) zeroval = darkval / (frametime * gain) nframes = readmode['nframes'] shape = readmode['npattern'], lindata.shape[1], lindata.shape[2] coadd_data = np.empty(shape, dtype=float) variance = np.empty(shape, dtype=float) for i in range(shape[0]): if readmode['coadd'] == 1: # pragma: no cover # (this OTPAT was never used for EXES) coadd_data[i] = zeroval - lindata[i] / gainfac else: pattern_start = i * nframes + 1 # 2nd frame pattern_end = pattern_start + nframes - 2 # penultimate frame pattern_data = lindata[pattern_start:pattern_end] pedestal = pattern_data[0] signal = pattern_data[pattern_end - pattern_start - 1] coadd_data[i] = zeroval - ((signal - pedestal) / gainfac) variance[i] = ((np.abs(coadd_data[i]) / vgainfac) * (1 - (frametime * (nread ** 2 - 1)) / (3 * interval * nread)) + 2 * (readnoise ** 2) / ((vgainfac ** 2) * nread)) header['NFRAME'] = nread header['BEAMTIME'] = interval return coadd_data, variance def _process_fowler(lindata, header, readmode): log.info("Fowler mode") frametime = float(header['FRAMETIM']) gain = float(header['PAGAIN']) eperadu = float(header['EPERADU']) readnoise = float(header['READNOIS']) darkval = float(header['DARKVAL']) zeroval = darkval / (frametime * gain) nread = (readmode['nondest'] + readmode['dest']) // 2 interval = (readmode['npass'] - nread) * frametime gainfac = interval * gain vgainfac = interval * eperadu log.info(f'Interval = {interval}') shape = readmode['npattern'], lindata.shape[1], lindata.shape[2] coadd_data = np.empty(shape, dtype=float) variance = np.empty(shape, dtype=float) for i in range(shape[0]): if readmode['coadd'] == 1: # pragma: no cover # The Fowler coadd/subtraction should have been done in hardware # (this OTPAT was never used for EXES) coadd_data[i] = zeroval - lindata[i] / gainfac else: pattern_start = i * readmode['nframes'] pattern_end = pattern_start + readmode['nframes'] pattern_data = lindata[pattern_start:pattern_end] if nread > 1: # Add initial reads for pedestal level pedestal = bn.nansum(pattern_data[:nread], axis=0) # Add final reads for signal level signal = bn.nansum( pattern_data[nread:readmode['nframes']], axis=0) else: pedestal = pattern_data[0] signal = pattern_data[1] coadd_data[i] = zeroval - ((signal - pedestal) / (nread * gainfac)) variance[i] = ((np.abs(coadd_data[i]) / vgainfac) * (1 - (frametime * (nread ** 2 - 1)) / (3 * interval * nread)) + 2 * (readnoise ** 2) / ((vgainfac ** 2) * nread)) header['NFRAME'] = nread header['BEAMTIME'] = interval return coadd_data, variance def _process_sample_up_the_ramp(data, header, readmode): if readmode['nondest'] + readmode['dest'] < 3: raise ValueError('OTPAT is not suitable for ' 'Second/Penultimate ND mode') log.info("Sample-up-the-ramp mode") frametime = float(header['FRAMETIM']) gain = float(header['PAGAIN']) eperadu = float(header['EPERADU']) darkval = float(header['DARKVAL']) zeroval = darkval / (frametime * gain) readnoise = float(header['READNOIS']) nread = readmode['nframes'] interval = (readmode['npass'] - 1) * frametime alpha = nread * (nread + 1) // 12 gainfac = interval * gain vgainfac = interval * eperadu log.info(f'Interval = {interval}') shape = readmode['npattern'], data.shape[1], data.shape[2] coadd_data = np.empty(shape, dtype=float) variance = np.empty(shape, dtype=float) for i in range(readmode['npattern']): if readmode['coadd'] == 1: # pragma: no cover # The SUTR fit should have been done in hardware # (this OTPAT was never used for EXES) coadd_data[i] = zeroval - (data[i] / gainfac) else: nframes = readmode['nframes'] pattern_start = i * nframes pattern_end = pattern_start + nframes pattern_data = data[pattern_start:pattern_end] fac = ((np.arange(nread) + 1) - ((nread + 1) / 2)) / alpha s = bn.nansum(pattern_data[:nread] * fac[:, None, None], axis=0) coadd_data[i] = zeroval - (s / gainfac) variance[i] = (6 * np.abs(coadd_data[i]) * ((nread ** 2) + 1)) variance[i] /= (5 * vgainfac * nread * (nread + 1)) variance[i] += ((12 * (readnoise ** 2) * (nread - 1)) / ((vgainfac ** 2) * nread * (nread + 1))) header['NFRAME'] = nread header['BEAMTIME'] = interval return coadd_data, variance def _combine_nods(coadd_data, variance, header, readmode, toss_nint, copy_int): # Check for multiple frames at the same nod position nint = int(header['NINT']) if nint <= 1: return coadd_data, variance npatt = readmode['npattern'] nout = npatt // nint itot = header['BEAMTIME'] * nint * nout header['INTTIME'] = itot if 'OFF_SLIT' in str(header.get('INSTMODE', 'UNKNOWN')).upper(): header['EXPTIME'] = itot / 2. else: header['EXPTIME'] = itot header['NEXP'] = nint * nout log.info(f"Combining {nint} frames per nod position") if npatt < nint: raise ValueError(f"Data does not match NINT={nint}") elif npatt % nint != 0: log.warning(f"Data does not match NINT={nint}; " f"ignoring extra frames.") shape = nout, coadd_data.shape[1], coadd_data.shape[2] mcoadd = np.empty(shape, dtype=float) mvar = np.empty(shape, dtype=float) nodn = header.get('NODN', 1) for i in range(nout): if i == 0: if toss_nint > 0: if toss_nint < nint and (not copy_int or nodn < 2): log.info(f'Dropping the first {toss_nint} frames') start = toss_nint else: if toss_nint <= nint and nodn >= 2: # allow copying the sky data from the next nod # if available log.warning('Copying the first ' 'integration(s) from the next B nod') cstart = 2 * nint cstop = 2 * nint + nint coadd_data[0:nint] = coadd_data[cstart:cstop] variance[0:nint] = variance[cstart:cstop] else: # otherwise, just ignore the toss for this file log.warning('TOSS_NINT is set higher than ' 'NINT; ignoring.') # in either case, use all frames start = 0 else: start = 0 else: start = i * nint stop = i * nint + nint ncomb = stop - start mcoadd[i] = bn.nansum(coadd_data[start:stop], axis=0) / ncomb mvar[i] = bn.nansum(variance[start:stop], axis=0) / (ncomb ** 2) return mcoadd, mvar