Source code for sofia_redux.instruments.fifi_ls.combine_nods

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import os

import numba as nb
import numpy as np
from astropy import log
from astropy.io import fits
from astropy.time import Time
from pandas import DataFrame
from scipy.interpolate import interp1d
from scipy.optimize import curve_fit

from sofia_redux.instruments.fifi_ls.apply_static_flat import (calculate_flat,
                                                               get_flat)
from sofia_redux.instruments.fifi_ls.get_atran import get_atran_interpolated
from sofia_redux.instruments.fifi_ls.get_resolution import get_resolution
from sofia_redux.instruments.fifi_ls.lambda_calibrate import wave
from sofia_redux.instruments.fifi_ls.make_header import make_header
from sofia_redux.toolkit.interpolate import \
    interp_1d_point_with_error as interp
from sofia_redux.toolkit.utilities import gethdul, hdinsert, write_hdul

__all__ = ['classify_files', 'combine_extensions', 'combine_nods']


def _mjd(dateobs):
    """Get the MJD from a DATE-OBS."""
    try:
        mean_time = Time(dateobs).mjd
    except (ValueError, AttributeError):
        mean_time = 0
    return mean_time


def _unix(dateobs):
    """Get the Unix Time from a DATE-OBS."""
    try:
        mean_time = Time(dateobs).unix
    except (ValueError, AttributeError):
        mean_time = 0
    return mean_time


def _read_exthdrs(hdul, key, default=0):
    """
    Read FIFI-LS extension headers.

    Parameters
    ----------
    hdul : astropy.io.fits.HDUList
        The HDU list from which to read the headers.
    key : str
        The keyword to look up in the headers.
    default : int, optional
        The default value to return if the key is not found in the headers.
        Defaults to 0.

    Returns
    -------
    numpy.ndarray
        Adds one flux header per grating position
    """
    result = []
    if len(hdul) <= 1:
        return result
    ngrating = hdul[0].header.get('NGRATING', 1)
    for idx in range(ngrating):
        name = f'FLUX_G{idx}'
        header = hdul[name].header
        result.append(header.get(key, default))
    return np.array(result)


def _from_hdul(hdul, key):
    """Read a header keyword from the PHU."""
    return hdul[0].header[key.upper().strip()]

def _em_func(e, a, c):
    return a +c*e

def _em_func_b(em_spax):
    def wow(lam ,a, b, c):
        m = a + b*lam + c*em_spax
        return m
    return wow


def _apply_flat_for_telluric(hdul, flatdata, wave, skip_err=True):
    # flat data from get_flat:
    flatfile, spatdata, specdata, specwave, specerr = flatdata

    data = np.asarray(hdul[1].data, dtype=float) # Flux
    var = np.asarray(hdul[2].data,               # Stddev
                        dtype=float) ** 2
    if data.ndim < 3:
        data = data.reshape((1, *data.shape))
        var = var.reshape((1, *var.shape))

    hdu_result = calculate_flat(wave, data, var, spatdata, specdata,
                                specwave, specerr, skip_err)
    # calculate_flat return has many values, only need actual data
    return hdu_result[0][0]

def _atransmission(hdul,row,hdr0, hdul0):
    # gets transmission from current A file

    # data.shape (numramp, 16, 25) -- OTF
    # data.shape (16, 25) -- non OTF, not covered here
    numramp, numspexel, numspaxel = hdul.data.shape
    dimspexel = 1
    # Values from main header for wavelength calibration
    dichroic = hdr0['DICHROIC']
    channel=hdr0['CHANNEL']
    obsdate = hdr0['DATE-OBS']
    # Obsdate needs special format for lambda_calibrate.py
    try:
        obsdate = [int(x) for x in obsdate[:10].split('-')]
    except (ValueError, TypeError, IndexError):
        log.error('Invalid DATE-OBS')
        return
    channel = hdr0['CHANNEL']
    b_order = int(hdr0['G_ORD_B', -1])
    if channel == 'BLUE':
        if b_order in [1, 2]:
            blue = 'B%i' % b_order
        else:
            log.error("Invalid Blue grating order")
            return
    else:
        blue = None

    # Values from extention header for wavelength calibration
    ind = row['indpos']

    # Get spectral resolution (RESFILE keyword is added to primehead)
    resolution = get_resolution(hdr0)
    if resolution is None:
        log.error("Unable to determine spectral resolution")
        return

    # Get ATRAN data from input file or default on disk, smoothed to
    # current resolution
    atran_data = get_atran_interpolated(
        hdr0, resolution=resolution,
        atran_dir=None, use_wv=True,
        get_unsmoothed=True
    )

    if atran_data is None or atran_data[0] is None:
        log.error("Unable to get ATRAN data")
        return
    atran, unsmoothed = atran_data
    # Split in wavelength (x) and transmission factor (y)
    w_atran=atran[0]
    t_atran=atran[1]


    # Get calibration from lambda_calibrate.py
    # calibration = {'wavelength': w, 'width': p, 'wavefile': wavefile}
    # wavecal is optional to hand over, DataFrame containing wave calibration
    # data. May be supplied in order to remove the overhead of reading the file
    # every iteration of this function.

    calibration = wave(ind, obsdate, dichroic, blue=blue, wavecal=None)
    w_cal = calibration['wavelength']

    # Initialize the arrays to return for each spaxel
    t =[np.empty(numspexel) * np.nan for _ in range(numspaxel)]

    for spaxel in range(numspaxel):

        w_cal_loop=w_cal[:,spaxel]
        lambda_min = np.min(w_cal_loop)
        lambda_max = np.max(w_cal_loop)

        # Limit the ATRAN wavelengths to lamnda_min and lambda_max
        mask = (w_atran >= lambda_min) & (w_atran <= lambda_max)
        w_atran_masked = w_atran[mask]
        t_atran_masked = t_atran[mask]

        # Find spexel indices where data is not NaN
        valid_spexel = np.where(~np.isnan(hdul.data[:, :, spaxel]))[dimspexel]
        if len(valid_spexel) > 0:  # Proceed only if spaxel has valid data

            # Create a function for nearest neighbor interpolation
            interp_func = interp1d(w_atran_masked, t_atran_masked,
                                   kind='nearest', fill_value='extrapolate')

            # Interpolate the y values at low-resolution x values
            t[spaxel] = interp_func(w_cal_loop[valid_spexel])

            # Bring back emission and result to original size 16
            # and refill NaNs at originall positions. Can be done
            # on original data, no change in NaNs Inizialize empty
            # array with size 16
            t_full = np.empty(hdul.data.shape[dimspexel])
            t_full[:] = np.nan
            # Initialize array with ones at non-NaN indices
            nanarray = np.zeros(hdul.data.shape[dimspexel])
            nanarray[valid_spexel] = 1
            # Create two indices for the arrays of different
            # lengths, then only increment the index of the longer
            # array (original data).
            original_data_index = 0
            optimized_data_index = 0

            for value in nanarray:
                if value == 1:
                    if original_data_index < hdul.data.shape[dimspexel]:
                        t_full[original_data_index] = \
                            t[spaxel][optimized_data_index]
                        original_data_index += 1
                        optimized_data_index += 1
                elif value == 0:
                    # Only increment index of original data
                    original_data_index += 1

            # Write back into return array
            t[spaxel] = t_full
    t = np.transpose(np.array(t))

    return t


def _telluric_scaling(hdul, brow, hdr0, hdul0, sig_rel):

    numspexel, numspaxel = hdul.data.shape
    dimspexel = 0
    stddev = hdul0[2].data

    # Values from main header for wavelength calibration
    dichroic = hdr0['DICHROIC']
    channel=hdr0['CHANNEL']
    obsdate = hdr0['DATE-OBS']
    # Obsdate needs special format for lambda_calibrate.py
    try:
        obsdate = [int(x) for x in obsdate[:10].split('-')]
    except (ValueError, TypeError, IndexError):
        log.error('Invalid DATE-OBS')
        return
    channel = hdr0['CHANNEL']
    b_order = int(hdr0['G_ORD_B', -1])
    if channel == 'BLUE':
        if b_order in [1, 2]:
            blue = 'B%i' % b_order
        else:
            log.error("Invalid Blue grating order")
            return
    else:
        blue = None

    # Values from extention header for wavelength calibration
    ind = brow['indpos']

    # Get spectral resolution (RESFILE keyword is added to primehead)
    resolution = get_resolution(hdr0)
    if resolution is None:
        log.error("Unable to determine spectral resolution")
        return

    # Get ATRAN data from input file or default on disk, smoothed to
    # current resolution
    atran_data = get_atran_interpolated(
        hdr0, resolution=resolution,
        atran_dir=None, use_wv=True,
        get_unsmoothed=True
    )

    if atran_data is None or atran_data[0] is None:
        log.error("Unable to get ATRAN data")
        return
    atran, unsmoothed = atran_data
    # Split in wavelength (x) and transmission factor (y)
    w_atran=atran[0]
    t_atran=atran[1]


    # Get calibration from lambda_calibrate.py.
    # Calibration format: {'wavelength': w, 'width': p, 'wavefile': wavefile}.
    # The `wavecal` parameter is optional and may be supplied as a DataFrame
    # containing wave calibration data. This avoids the overhead of reading the
    # file in every iteration of this function.


    calibration = wave(ind, obsdate, dichroic, blue=blue, wavecal=None)
    w_cal = calibration['wavelength']

    # Perform flat fielding as in apply_static_flat.py to remove noise. This is
    # already performed per grating position here. The calculation occurs in a
    # for loop per grating position. The flat-fielded data is only used to
    # obtain the 'a' and 'c' values from the curve fit. Un-flatted data will be
    # used in subsequent pipeline steps as before.

    flatdata = get_flat(hdr0)
    flatval = _apply_flat_for_telluric(hdul0, flatdata, w_cal, skip_err=True)


    # Bounds in the physical range for all wavelengths, c must be positive as
    # there are no negative emissions
    param_bounds = ([0, 0], [np.inf, np.inf])
    param_bounds_b = ([-np.inf,-np.inf, 0], [ np.inf, np.inf ,np.inf])

    # Initialize the arrays to return for each spaxel
    em_opt =[np.empty(numspexel) * np.nan for _ in range(numspaxel)]
    em_opt_b =[np.empty(numspexel) * np.nan for _ in range(numspaxel)]
    em_opt_sig =[np.empty(numspexel) * np.nan for _ in range(numspaxel)]
    em_opt_b_sig =[np.empty(numspexel) * np.nan for _ in range(numspaxel)]
    t =[np.empty(numspexel) * np.nan for _ in range(numspaxel)]
    popt_b =[np.empty(3) * np.nan for _ in range(numspaxel)]  # a, b, c
    popt =[np.empty(2) * np.nan for _ in range(numspaxel)]  # a, c
    popt_b_sig =[np.empty(3) * np.nan for _ in range(numspaxel)]  # a, b, c
    popt_sig =[np.empty(2) * np.nan for _ in range(numspaxel)]  # a, c
    pr = 0
    for spaxel in range(numspaxel):
        pr +=1

        w_cal_loop=w_cal[:,spaxel]
        lambda_min = np.min(w_cal_loop)
        lambda_max = np.max(w_cal_loop)

        mask = (w_atran >= lambda_min) & (w_atran <= lambda_max)
        w_atran_masked = w_atran[mask]
        t_atran_masked = t_atran[mask]

        # Find spexel indices where data is not NaN
        valid_spexel = np.where(~np.isnan(hdul.data[:, spaxel]))[dimspexel]
        if len(valid_spexel) > 0:  # Proceed only if spaxel has valid data

            # Create a function for nearest neighbor interpolation
            interp_func = interp1d(
                w_atran_masked, t_atran_masked,
                kind='nearest', fill_value='extrapolate'
            )

            # Interpolate the y values at low-resolution x values
            t[spaxel] = interp_func(w_cal_loop[valid_spexel])

            # Emission = (1 − T ransmission)*1/(lambda^5*e^(h*c/k*lambda*T)-1)
            # Simplified: Emission = 1-Transmission
            # EmissionModel = a + b ∗ λ + c ∗ Emission(λ, T)
            # em = a + b*λ + c(1-T)
            e = 1-t[spaxel]
            #  # data.shape (16, 25)
            # numspexel, numspaxel = hdul.data.shape

            # Perform optimized linear fit
            if sig_rel:
                e_mean = np.mean(e)

                e_kehr = 1/e
                e_mean_kehr= 1/e_mean
                e_kehr_rel = e_kehr/e_mean_kehr
                sigma_mean = np.mean(stddev[valid_spexel,spaxel])
                sigma_rel = stddev[valid_spexel,spaxel]/sigma_mean
                sigma_used = np.sqrt(np.square(sigma_rel)+np.square(e_kehr_rel))

            else:
                sigma_used = stddev[valid_spexel,spaxel]
            popt_sig[spaxel], pcov = curve_fit(
                _em_func,
                e,
                np.array(flatval)[valid_spexel, spaxel],
                sigma = sigma_used, bounds=param_bounds)
            popt_b_sig[spaxel], pcov = curve_fit(
                _em_func_b(e),
                w_cal_loop[valid_spexel],
                np.array(flatval)[valid_spexel, spaxel],
                sigma = sigma_used, bounds=param_bounds_b)

            popt[spaxel], pcov = curve_fit(
                _em_func,
                e,
                np.array(flatval)[valid_spexel, spaxel],
                bounds=param_bounds)
            popt_b[spaxel], pcov = curve_fit(
                _em_func_b(e),
                w_cal_loop[valid_spexel],
                np.array(flatval)[valid_spexel,spaxel],
                bounds=param_bounds_b)

            a, c = popt[spaxel]
            em_opt[spaxel] = a + c*e
            a_sig, c_sig = popt_sig[spaxel]
            em_opt_sig[spaxel] = a_sig + c_sig*e

            a_b,b_b, c_b = popt_b[spaxel]
            em_opt_b[spaxel] = a_b + b_b*w_cal_loop[valid_spexel]+c_b*e
            a_b_sig,b_b_sig, c_b_sig = popt_b_sig[spaxel]
            em_opt_b_sig[spaxel] = (a_b_sig + b_b_sig*w_cal_loop[valid_spexel]
                                    + c_b_sig*e)



            # Bring back emission and result to original size 16 and refill NaNs
            # at original positions. Can be done on original data, no change in
            # NaNs. Inizialize empty array with size 16
            restored_em_opt = np.empty(hdul.data.shape[dimspexel])
            restored_em_opt_b = np.empty(hdul.data.shape[dimspexel])
            restored_em_opt_sig = np.empty(hdul.data.shape[dimspexel])
            restored_em_opt_b_sig = np.empty(hdul.data.shape[dimspexel])
            t_full = np.empty(hdul.data.shape[dimspexel])
            restored_em_opt[:] = np.nan
            restored_em_opt_b[:] = np.nan
            restored_em_opt_sig[:] = np.nan
            restored_em_opt_b_sig[:] = np.nan
            t_full[:] = np.nan
            # Initialize array with ones at non-NaN indices
            nanarray = np.zeros(hdul.data.shape[dimspexel])
            nanarray[valid_spexel] = 1
            # Create two indices for the arrays of different lengths, then only
            # increment the index of the longer array (original data).
            original_data_index = 0
            optimized_data_index = 0

            for value in nanarray:
                if value == 1:
                    if original_data_index < hdul.data.shape[dimspexel]:
                        restored_em_opt[original_data_index] = \
                            em_opt[spaxel][optimized_data_index]
                        restored_em_opt_b[original_data_index] = \
                            em_opt_b[spaxel][optimized_data_index]
                        restored_em_opt_sig[original_data_index] = \
                            em_opt_sig[spaxel][optimized_data_index]
                        restored_em_opt_b_sig[original_data_index] = \
                            em_opt_b_sig[spaxel][optimized_data_index]
                        t_full[original_data_index] = \
                            t[spaxel][optimized_data_index]
                        original_data_index += 1
                        optimized_data_index += 1
                elif value == 0:
                    # Only increment index of original data
                    original_data_index += 1

            # Write back into return array
            em_opt[spaxel] = restored_em_opt
            em_opt_b[spaxel] = restored_em_opt_b
            em_opt_sig[spaxel] = restored_em_opt_sig
            em_opt_b_sig[spaxel] = restored_em_opt_b_sig
            t[spaxel] = t_full


    em_opt = np.transpose(np.array(em_opt))
    em_opt_b = np.transpose(np.array(em_opt_b))
    popt = np.array(popt)
    popt_b = np.array(popt_b)
    popt_sig = np.array(popt_sig)
    popt_b_sig = np.array(popt_b_sig)
    t = np.transpose(np.array(t))
    return popt, t, flatval, popt_b, em_opt_b, popt_sig, popt_b_sig, em_opt, w_cal


[docs] def classify_files(filenames, offbeam=False): """ Extract various properties of all files for subsequent combination. Parameters ---------- filenames : array_like of str File paths to FITS files offbeam : bool, optional If True, swap 'A' nods with 'B' nods and the following associated keywords: DLAM_MAP <-> DLAM_OFF, DBET_MAP <-> DBET_OFF. Returns ------- pandas.DataFrame DataFrame containing extracted properties of the input files. """ hduls = [] fname_list = [] for fname in filenames: hdul = gethdul(fname) if hdul is None: log.error("Invalid HDUList: %s" % fname) continue hduls.append(hdul) if not isinstance(fname, str): fname_list.append(_from_hdul(hdul, 'FILENAME')) else: fname_list.append(fname) filenames = fname_list n = len(filenames) if n == 0: log.error("No good files found.") return None keywords = ['nodstyle', 'detchan', 'channel', 'nodbeam', 'dlam_map', 'dbet_map', 'dlam_off', 'dbet_off', 'date-obs'] init = dict((key, [_from_hdul(hdul, key) for hdul in hduls]) for key in keywords) init['mjd'] = [_mjd(dateobs) for dateobs in init['date-obs']] init['indpos'] = [ _read_exthdrs(hdul, 'indpos', default=0) for hdul in hduls ] init['bglevl'] = [ _read_exthdrs(hdul, 'bglevl_a', default=0) for hdul in hduls ] init['asymmetric'] = [ x in ['ASYMMETRIC', 'C2NC2'] for x in init['nodstyle'] ] init['tsort'] = [0.0] * n init['sky'] = [False] * n # calculate later init['hdul'] = hduls init['chdul'] = [None] * n init['combined'] = [np.full(len(x), False) for x in init['indpos']] init['outfile'] = [''] * n init['mstddev'] = [ _read_exthdrs(hdul, 'mstddev', default=0) for hdul in hduls ] df = DataFrame(init, index=filenames) # If any files are asymmetric, treat them all as asymmetric if df['asymmetric'].any() and not df['asymmetric'].all(): log.warning("Mismatched NODSTYLE. Will attempt to combine anyway.") # Drop any bad dates bad_dates = df[df['mjd'] == 0] if len(bad_dates) > 0: for name, row in bad_dates.iterrows(): log.error('DATE-OBS in header is %s for %s' % (row['date-obs'], name)) df = df.drop(bad_dates.index) # If there's a good detchan value, use it in place of channel, # then set channel to either 1 (BLUE) or 0 (RED) valid_detchan = (df['detchan'] != 0) & (df['detchan'] != '0') df['channel'] = np.where(valid_detchan, df['detchan'], df['channel']) df['channel'] = df['channel'].apply(lambda x: 1 if x == 'BLUE' else 0) # update headers if offbeam is True if offbeam: # Switch A and B beams df['nodbeam'] = np.where(df['nodbeam'] != 'A', 'A', 'B') df = df.rename(index=str, columns={ 'dlam_map': 'dlam_off', 'dbet_map': 'dbet_off', 'dlam_off': 'dlam_map', 'dbet_off': 'dbet_map'}) for key in ['dlam_map', 'dlam_off', 'dbet_map', 'dbet_off', 'nodbeam']: df.apply(lambda x: hdinsert( x.hdul[0].header, key.upper(), x[key]), axis=1) # set on-source exptime to 0 for asym B beams and track as 'sky' files df['sky'] = df['asymmetric'] & (df['nodbeam'] != 'A') for hdul in df[df['sky']]['hdul'].values: if isinstance(hdul, fits.HDUList): hdul[0].header['EXPTIME'] = 0.0 hdul[0].header['NEXP'] = 0 return df
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def interp_b_nods(atime, btime, bdata, berr): # pragma: no cover """ Interpolate two B nods to the A time. Parameters ---------- atime : array-like of float The UNIX time for each A nod sample. btime : array-like float Before and after time for the B nods. Expected to have two elements; all `atime` values should fall between the first and second values. bdata : array-like of float 2 x nw x ns B nod data to interpolate. berr : array-like of float 2 x nw x ns B nod errors to interpolate. Returns ------- bflux, bvar : array-like of float nw x ns interpolated B nod flux and variance. """ nt = atime.size nn, nw, ns = bdata.shape bflux = np.empty((nt, nw, ns), dtype=nb.float64) bvar = np.empty((nt, nw, ns), dtype=nb.float64) for t in range(nt): for i in range(nw): for j in range(ns): # flux and error at this pixel bf = bdata[:, i, j] be = berr[:, i, j] # Interpolate B flux and error onto A time if np.any(np.isnan(bf)) or np.any(np.isnan(be)): bflux[t, i, j] = np.nan bvar[t, i, j] = np.nan else: # f, e = interp(btime, bf, be, atime[t]) # Get rid of day changes etc. in data if atime[t] - btime[0] > 180: bf[0] = bf[1] be[0] = be[1] elif btime[-1] - atime[t] > 180: bf[-1] = bf[0] be[-1] = be[0] f, z = interp(btime, bf, be, atime[t]) # interpolate the error like the flux without squared adding e, _ = interp(btime, be, bf, atime[t]) bflux[t, i, j] = f bvar[t, i, j] = e * e return bflux, bvar
[docs] def combine_extensions(df, b_nod_method='nearest', bg_scaling=False, telluric_scaling_on=False): """ Find a B nod for each A nod. For asymmetric data, DLAM and DBET do not need to match, B data can be used more than once, and the B needs to be subtracted, rather than added (symmetric B nods are multiplied by -1 in chop_subtract) For the 'interpolate' option for B nod combination for most data, the time of interpolation is taken to be the middle of the observation, as determined by the FIFISTRT and EXPTIME keywords in the primary header. For OTF data, the time is interpolated between RAMPSTRT and RAMPEND times in the extension header, for each ramp. Parameters ---------- df : pandas.DataFrame b_nod_method : {'nearest', 'average', 'interpolate'}, optional Determines the method of combining the two nearest before and after B nods. bg_scaling : float, optional A scaling factor to apply to the background data for B nod combination. telluric_scaling_on : bool, optional If True, applies telluric scaling during B nod combination. Returns ------- list of fits.HDUList A list of HDUList objects containing the combined B nod data for each A nod. Raises ------ ValueError If an invalid B nod method is specified, or if the data is not compatible with the selected method.its.HDUList """ # check B method parameter if b_nod_method not in ['nearest', 'average', 'interpolate']: raise ValueError("Bad b_nod_method: should be 'nearest', " "'average', or 'interpolate'.") get_two = b_nod_method != 'nearest' df.sort_values('mjd', inplace=True) blist = df[df['nodbeam'] == 'B'] alist = df[df['nodbeam'] == 'A'] # skip if no pairs available if len(blist) == 0: log.warning('No B nods found') return df elif len(alist) == 0: log.error('No A nods found') return df for afile, arow in alist.iterrows(): asymmetric = arow['asymmetric'] bselect = blist[(blist['channel'] == arow['channel']) & (blist['asymmetric'] == asymmetric)] if not asymmetric: bselect = bselect[(bselect['dlam_map'] == arow['dlam_map']) & (bselect['dbet_map'] == arow['dbet_map'])] # find closest matching B image in time if get_two and asymmetric: bselect['tsort'] = bselect['mjd'] - arow['mjd'] after = (bselect[bselect['tsort'] > 0]).sort_values('tsort') bselect = (bselect[bselect['tsort'] <= 0]).sort_values( 'tsort', ascending=False) else: bselect['tsort'] = abs(bselect['mjd'] - arow['mjd']) bselect = bselect.sort_values('tsort') after = None primehead, combined_hdul = None, None for aidx, apos in enumerate(arow['indpos']): bidx, bidx2 = [], [] bfile, bfile2 = None, None brow, brow2 = None, None for bfile, brow in bselect.iterrows(): bidx = brow['indpos'] == apos if not asymmetric: bidx &= ~brow['combined'] if np.any(bidx): break if after is not None: for bfile2, brow2 in after.iterrows(): # always asymmetric bidx2 = brow2['indpos'] == apos if np.any(bidx2): break if not np.any(bidx) and np.any(bidx2): bidx = bidx2 brow = brow2 bfile = bfile2 bidx2 = [] describe_a = f"A {os.path.basename(arow.name)} at ext{aidx + 1} " \ f"channel {arow['channel']} indpos {apos} " \ f"dlam {arow['dlam_map']} dbet {arow['dbet_map']}" if np.any(bidx): arow['combined'][aidx] = True a_fname = f'FLUX_G{aidx}' a_sname = f'STDDEV_G{aidx}' a_hdr = arow['hdul'][0].header bgidx = np.nonzero(bidx)[0][0] brow['combined'][bgidx] = True b_background = brow['bglevl'][bgidx] b_fname = f'FLUX_G{bgidx}' b_sname = f'STDDEV_G{bgidx}' b_flux = brow['hdul'][b_fname].data b_var = brow['hdul'][b_sname].data ** 2 b_hdr = brow['hdul'][0].header # check for offbeam with OTF mode: B nods # can't have an extra dimension if b_flux.ndim > 2: msg = 'Offbeam option is not available for OTF mode' log.error(msg) raise ValueError(msg) combine_headers = [a_hdr, b_hdr] # check for a second B nod: if not found, will do # 'nearest' for this A file if np.any(bidx2): # add in header for combination b2_hdr = brow2['hdul'][0].header combine_headers.append(b2_hdr) # get A and B times try: # read number of chop cycles per grating position from # header of current file, might change over # observations. Might be overkill, but still correct if arow['detchan'] == 'BLUE': a_chpg = a_hdr['C_CYC_B'] b_chpg1 = b_hdr['C_CYC_B'] b_chpg2 = b2_hdr['C_CYC_B'] else: a_chpg = a_hdr['C_CYC_R'] b_chpg1 = b_hdr['C_CYC_R'] b_chpg2 = b2_hdr['C_CYC_R'] # unix time at middle of grating position, each time # looking from A file # --> 3x adix as base as current A nod is reference atime = _unix(a_hdr['DATE-OBS']) \ + (aidx + 0.5)*((a_hdr['C_CHOPLN']*2/250)*a_chpg) btime1 = _unix(b_hdr['DATE-OBS']) \ + (aidx + 0.5)*((b_hdr['C_CHOPLN']*2/250)*b_chpg1) btime2 = _unix(b2_hdr['DATE-OBS']) \ + (aidx + 0.5)*((b2_hdr['C_CHOPLN']*2/250)*b_chpg2) except KeyError: raise ValueError('Missing DATE-OBS, C_CHOPLN or C_CYC ' 'keys in headers.') # get index for second B row bgidx2 = np.nonzero(bidx2)[0][0] brow2['combined'][bgidx2] = True b_fname = f'FLUX_G{bgidx2}' b_sname = f'STDDEV_G{bgidx2}' # Get header and data shape of current A file a_hdu_hdr = arow['hdul'][a_fname].header a_shape = arow['hdul'][a_fname].data.shape # Perform telluric scaling med = False # True: Takes median of factors. False: Takes curve fit params for each spaxel sig = True # True: Sigma into curve fit. False: No sigma into curve fit sig_rel = False # True: Normed Sigma. False: STDDEV ac = False # True: only a and c fit, False: a, b and c fit if telluric_scaling_on: if len(a_shape) == 3: popt1, t1 , b1_flat, popt1_b, b1_fitted_b, popt1_sig, popt1_b_sig, b1_fitted, lambda1 = \ _telluric_scaling(brow['hdul'][b_fname],brow, brow['hdul'][0].header, brow['hdul'], sig_rel) popt2, t2, b2_flat, popt2_b, b2_fitted_b, popt2_sig, popt2_b_sig, b2_fitted, lambda2 = \ _telluric_scaling(brow2['hdul'][b_fname],brow2, brow2['hdul'][0].header, brow2['hdul'], sig_rel) ta = _atransmission(arow['hdul'][a_fname],arow, arow['hdul'][0].header, arow['hdul']) # reshape into data.shape (16, 25) numspexel, numspaxel = brow['hdul'][b_fname].data.shape if ac: if sig: # Only a and c curve fit parameters a1 =popt1_sig[:, 0] # Extracts the first column (a values) c1= popt1_sig[:, 1] # Extracts the 2nd column (c values if no b values) a2 =popt2_sig[:, 0] # Extracts the first column (a values) c2= popt2_sig[:, 1] # Extracts the 2nd column (c values if no b values) else: a1 =popt1[:, 0] # Extracts the first column (a values) c1= popt1[:, 1] # Extracts the 2nd column (c values if no b values) a2 =popt2[:, 0] # Extracts the first column (a values) c2= popt2[:, 1] # Extracts the 2nd column (c values if no b values) if med: a1 = np.nanmedian(a1) a2 = np.nanmedian(a2) c1 = np.nanmedian(c1) c2 = np.nanmedian(c2) # write array of size 16, 25 with one value a1_full= np.full((numspexel, numspaxel), a1) c1_full= np.full((numspexel, numspaxel), c1) a2_full= np.full((numspexel, numspaxel), a2) c2_full= np.full((numspexel, numspaxel), c2) else: # Reshape into a 2D array (16, 25) a1_full= np.tile(a1, (numspexel, 1)) c1_full= np.tile(c1, (numspexel, 1)) a2_full= np.tile(a2, (numspexel, 1)) c2_full= np.tile(c2, (numspexel, 1)) b1_fitted = a1_full + np.multiply(c1_full,(1-t1)) b2_fitted = a2_full + np.multiply(c2_full,(1-t2)) telfac1 = 1 + np.divide(c1_full,b1_fitted)*(t1-ta) telfac2 = 1 + np.divide(c2_full,b2_fitted)*(t2-ta) b_flux = np.multiply(b_flux,telfac1) b_flux2 = np.multiply(brow2['hdul'][b_fname].data,telfac2) bdata = np.array([b_flux, b_flux2]) berr = np.array([np.multiply(np.sqrt(b_var),telfac1), np.multiply(brow2['hdul'][b_sname].data,telfac2)]) else: # a ,b and c curve fit parameters if sig: a1_b = popt1_b_sig[:, 0] b1_b = popt1_b_sig[:, 1] c1_b = popt1_b_sig[:, 2] a2_b = popt2_b_sig[:, 0] b2_b = popt2_b_sig[:, 1] c2_b = popt2_b_sig[:, 2] else: a1_b = popt1_b[:, 0] b1_b = popt1_b[:, 1] c1_b = popt1_b[:, 2] a2_b = popt2_b[:, 0] b2_b = popt2_b[:, 1] c2_b = popt2_b[:, 2] if med: a1_b = np.nanmedian(a1_b) b1_b = np.nanmedian(b1_b) c1_b = np.nanmedian(c1_b) a2_b = np.nanmedian(a2_b) b2_b = np.nanmedian(b2_b) c2_b = np.nanmedian(c2_b) a1_b_full= np.full((numspexel, numspaxel), a1_b) b1_b_full= np.full((numspexel, numspaxel), b1_b) c1_b_full= np.full((numspexel, numspaxel), c1_b) a2_b_full= np.full((numspexel, numspaxel), a2_b) b2_b_full= np.full((numspexel, numspaxel), b2_b) c2_b_full= np.full((numspexel, numspaxel), c2_b) else: # Reshape into a 2D array (16, 25) a1_b_full= np.tile(a1_b, (numspexel, 1)) b1_b_full= np.tile(b1_b, (numspexel, 1)) c1_b_full= np.tile(c1_b, (numspexel, 1)) a2_b_full= np.tile(a2_b, (numspexel, 1)) b2_b_full= np.tile(b2_b, (numspexel, 1)) c2_b_full= np.tile(c2_b, (numspexel, 1)) b1_fitted_b = a1_b_full + np.multiply(b1_b_full, lambda1) + np.multiply(c1_b_full,(1-t1)) b2_fitted_b = a2_b_full + np.multiply(b2_b_full, lambda2) + np.multiply(c2_b_full,(1-t2)) telfac1_b = 1 + np.divide(c1_b_full,b1_fitted_b)*(t1-ta) telfac2_b = 1 + np.divide(c2_b_full,b2_fitted_b)*(t2-ta) b_flux = np.multiply(b_flux,telfac1_b) b_flux2 = np.multiply(brow2['hdul'][b_fname].data,telfac2_b) bdata = np.array([b_flux, b_flux2]) berr = np.array([np.multiply(np.sqrt(b_var),telfac1_b), np.multiply(brow2['hdul'][b_sname].data,telfac2_b)]) else: log.warning("Telluric scaling not available for pointed data. \n" "Skipping telluric scaling") b_flux2 = brow2['hdul'][b_fname].data bdata = np.array([b_flux, b_flux2]) berr = np.array([np.sqrt(b_var), brow2['hdul'][b_sname].data]) else: b_flux2 = brow2['hdul'][b_fname].data bdata = np.array([b_flux, b_flux2]) berr = np.array([np.sqrt(b_var), brow2['hdul'][b_sname].data]) if b_nod_method == 'interpolate': # debug message msg = f'Interpolating B {bfile} at {btime1} ' \ f'and {bfile2} at {btime2} ' \ f'to A time {atime} and subbing from ' # UNIX time is a range of values for OTF data: # retrieve from RAMPSTRT and RAMPEND key if len(a_shape) == 3 \ and 'RAMPSTRT' in a_hdu_hdr \ and 'RAMPEND' in a_hdu_hdr: rampstart = a_hdu_hdr['RAMPSTRT'] rampend = a_hdu_hdr['RAMPEND'] nramp = a_shape[0] ramp_incr = (rampend - rampstart) / (nramp - 1) atime = np.full(nramp, rampstart) atime += np.arange(nramp, dtype=float) * ramp_incr else: atime = np.array([atime]) btime = np.array([btime1, btime2]) b_flux, b_var = \ interp_b_nods(atime, btime, bdata, berr) # reshape if there was only one atime if atime.size == 1: b_flux = b_flux[0] b_var = b_var[0] # # average over two background files b_background += brow2['bglevl'][bgidx2] b_background /= 2. # # interpolate background to header atime # b_background = \ # np.interp(atime, [btime1, btime2], # [b_background, brow2['bglevl'][bgidx2]]) # average background else: # debug message msg = f'Averaging B {bfile} and {bfile2} ' \ f'and subbing from ' # average flux b_flux += b_flux2 b_flux /= 2. # propagate variance b_var += brow2['hdul'][b_sname].data ** 2 b_var /= 4. # average background b_background += brow2['bglevl'][bgidx2] b_background /= 2. else: if asymmetric: msg = f'Subbing B {os.path.basename(brow.name)} from ' else: msg = f'Adding B {os.path.basename(brow.name)} to ' log.debug(msg + describe_a) # Note: in the OTF case, A data is a 3D cube with # ramps x spexels x spaxels, and B data is a # 2D array of spexels x spaxels. The B data is # subtracted at each ramp. # For other modes, A and B are both spexels x spaxels. flux = arow['hdul'][a_fname].data stddev = arow['hdul'][a_sname].data ** 2 + b_var if telluric_scaling_on and b_nod_method=='nearest': log.warning("Telluric Scaling only for interpolated data") if asymmetric: # Optional background scaling for unchopped observations if bg_scaling and a_hdr['C_AMP']==0: a_background = arow['bglevl'][aidx] flux -= b_flux*a_background/b_background else: if telluric_scaling_on: flux_diff = np.nanmedian(flux) \ - np.nanmedian(b_flux) flux_diff_full = np.full(flux.shape, flux_diff) flux -= b_flux + flux_diff_full else: flux -= b_flux else: # b_flux from source is negative for symmetric chops # as result of subtract chops flux += b_flux # divide by two for doubled source flux /= 2 stddev /= 4 stddev = np.sqrt(stddev) if combined_hdul is None: primehead = make_header(combine_headers) primehead['HISTORY'] = 'Nods combined' hdinsert(primehead, 'PRODTYPE', 'nod_combined') outfile, _ = os.path.splitext(os.path.basename(afile)) outfile = '_'.join(outfile.split('_')[:-2]) outfile += '_NCM_%s.fits' % primehead.get('FILENUM') df.loc[afile, 'outfile'] = outfile hdinsert(primehead, 'FILENAME', outfile) combined_hdul = fits.HDUList( fits.PrimaryHDU(header=primehead)) exthead = arow['hdul'][a_fname].header hdinsert(exthead, 'BGLEVL_B', b_background, comment='BG level nod B (ADU/s)') combined_hdul.append(fits.ImageHDU(flux, header=exthead, name=a_fname)) combined_hdul.append(fits.ImageHDU(stddev, header=exthead, name=a_sname)) # add in scanpos table from A nod if present a_pname = f'SCANPOS_G{aidx}' if a_pname in arow['hdul']: combined_hdul.append(arow['hdul'][a_pname].copy()) else: msg = "No matching B found for " log.debug(msg + describe_a) if combined_hdul is not None: df.at[afile, 'chdul'] = combined_hdul return df
[docs] def combine_nods(filenames, offbeam=False, b_nod_method='nearest', outdir=None, write=False, bg_scaling=False, telluric_scaling_on=False): """ Combine nods of ramp-fitted, chop-subtracted data. Writes a single FITS file to disk for each A nod found. Each HDU list contains n_san binary table extensions, each containing DATA and STDDEV data cubes, each 5x5x18. The output filename is created from the input filename, with the suffix 'CSB', 'RP0' or 'RP1' replaced with 'NCM', and with input file numbers numbers concatenated. Unless specified, the output directory is the same as the input files. Input files should have been generated by `subtract_chops`, or `fit_ramps` (for total power mode, which has no chops). The procedure is: 1. Read header information from each extension in each of the input files, making lists of A data and B data, with relevant metadata (dither) position, date/time observed (DATE-OBS), inductosyn position, channel, nod style). 2. Loop though all A data to find matching B data a. asymmetric nod style: find closest B nod in time with the same channel and inductosyn position. Dither position does not have to match, B data can be used more than once, and data must be subtracted rather than added. b. symmetric nod style: find closest B nod in time with the same channel, inductosyn position, and dither position. Each B nod can only be used once, since it contains a source observation, and data must be added rather than subtracted. 3. After addition or subtraction, create a FITS file and write results to disk. Parameters ---------- filenames : array_like of str File paths to the data to be combined offbeam : bool, optional If True, swap 'A' nods with 'B' nods and the following associated keywords: DLAM_MAP <-> DLAM_OFF, DBET_MAP <-> DBET_OFF. This option cannot be used with OTF-mode A nods. b_nod_method : {'nearest', 'average', 'interpolate'}, optional For asymmetric, data this option controls how the nearest B nods are combined. The 'nearest' option takes only the nearest B nod in time. The 'average' option averages the nearest before and after B nods. The 'interpolate' option linearly interpolates the nearest before and after B nods to the time of the A data. outdir : str, optional Directory path to write output. If None, output files will be written to the same directory as the input files. write : bool, optional If True, write to disk and return the path to the output file. If False, return the HDUL. telluric_scaling_on : bool, optional If True, apply telluric scaling to the data. bg_scaling : bool, optional If True, apply background scaling to the data. Returns ------- pandas.DataFrame The output pandas dataframe contains a huge variety of information indexed by original filename. The combined A-B FITS data are located in the 'chdul' column. Note that only A nod files contain combined data in this 'chdul' column. For example, in order to extract combined FITS data, one could issue the command:: df = combine_nods(filenames) combined_hduls = df[df['nodbeam'] == 'A']['chdul'] In order to extract rows from the dataframe that were not combined issue the command:: not_combined = df[(df['nodbeam'] == 'A') & (df['chdul'] == None)] files are considered 'combined' if at least one A extension was combined for an A-nod file. A true signifier of whether an extension was combined (both A and B nod files) can be found in the 'combined' column as a list of bools, one for each extension. """ if isinstance(filenames, str): filenames = [filenames] if not hasattr(filenames, '__len__'): log.error("Invalid input files type (%s)" % repr(filenames)) return if isinstance(outdir, str): if not os.path.isdir(outdir): log.error("Output directory %s does not exist" % outdir) return df = classify_files(filenames, offbeam=offbeam) if df is None: log.error("Problem in file classification") return df = combine_extensions(df, b_nod_method=b_nod_method, bg_scaling=bg_scaling, telluric_scaling_on=telluric_scaling_on) for filename, row in df[df['nodbeam'] == 'A'].iterrows(): if outdir is not None: outdir = str(outdir) else: outdir = os.path.dirname(filename) if write and row['chdul'] is not None: write_hdul(row['chdul'], outdir=outdir, overwrite=True) if row['outfile'] is not None: df.at[filename, 'outfile'] = os.path.join( outdir, os.path.basename(row['outfile'])) return df