Source code for sofia_redux.instruments.exes.cirrus

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from astropy import log
import numpy as np

from sofia_redux.instruments.exes import utils

__all__ = ['cirrus']


[docs] def cirrus(data, header, abeams, bbeams, flat): """ Correct nod-off-slit data for residual sky noise. Remove sky noise by subtracting (a + by) * B + (c + dy) / flat from A, where B is the sky nod array, A is the source array, and a, b, c, d are chosen to minimize the sum of (A - B)^2. Note that sky noise depends on y because clouds can vary during an array readout. This algorithm removes the average continuum along the slit. If this is not desired, background subtraction should be done during spectral extraction instead. Parameters ---------- data : numpy.ndarray Data cube of shape (nframe, nspec, nspat) or image (nspec, nspat). header : fits.Header Header associated with the input data. abeams : `list` of int List of frame indices corresponding to A frames in the data cube. bbeams : `list` of int List of frame indices corresponding to B frames in the data cube. flat : numpy.ndarray Flat image of shape (nspec, nspat). Returns ------- data : numpy.ndarray The corrected data. """ nx = header['NSPAT'] ny = header['NSPEC'] try: nz = utils.check_data_dimensions(data=data, nx=nx, ny=ny) except RuntimeError: log.error(f'Data has wrong dimensions {data.shape}. ' f'Not correcting background.') return data try: _check_beams(abeams, bbeams, nz) except RuntimeError: log.error('A and B beams must be specified and number ' 'must match. Not correcting background.') return data log.info('Subtracting sky fluctuations in cirrus') log.warning('This algorithm removes average continuum along slit. ' 'Use background subtraction during spectral extraction ' 'if this is not desired.') # Make index array to get distance from center y, x = np.mgrid[0:ny, 0:nx] xny2 = (ny + 1) / 2 dy = (y - xny2) / ny # Initialize alpha = np.zeros((4, 4)) beta = np.zeros(4) corrected = np.zeros((nz, ny, nx)) xpar = np.zeros((4, ny, nx)) # Get correctable points from flat z = flat > 0 if np.sum(z) == 0: log.error('No good points in flat.') return data for k in range(int(nz / 2)): # Get A and B data adata = data[abeams[k]] bdata = data[bbeams[k]] # Find sky noise parameters # Set to zero where flat is bad (so no contribution to sum) xpar[0][z] = bdata[z] xpar[1][z] = bdata[z] * dy[z] xpar[2][z] = 1 / flat[z] xpar[3][z] = dy[z] / flat[z] for i in range(4): for j in range(4): alpha[i, j] = np.nansum(xpar[i] * xpar[j]) beta[i] = np.nansum((adata - bdata) * xpar[i]) try: inv_alpha = np.linalg.inv(alpha) except np.linalg.LinAlgError: log.warning('Could not find sky parameters. ' 'Not correcting background.') return data alpha = inv_alpha a = np.zeros(4) for i in range(4): a[i] = np.nansum(beta * alpha[i]) log.info(f'Sky noise parameters for pair {k+1}: ') log.info(a) # Correct A beam adata[z] = (adata[z] - (a[0] + a[1] * dy[z]) * bdata[z] - (a[2] + a[3] * dy[z]) / flat[z]) corrected[abeams[k]] = adata corrected[bbeams[k]] = bdata return corrected
def _check_beams(abeams, bbeams, nz): if (len(abeams) == 0 or len(bbeams) == 0 or len(abeams) != len(bbeams) or len(abeams) != nz // 2): raise RuntimeError