# Source code for sofia_redux.instruments.fifi_ls.spatial_calibrate

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import os

from astropy import log
from astropy.io import fits
import numpy as np
import pandas

from sofia_redux.instruments import fifi_ls
from sofia_redux.toolkit.utilities \
    import (gethdul, goodfile, hdinsert, write_hdul, multitask)


__all__ = ['clear_spatial_cache', 'get_spatial_from_cache',
           'store_spatial_in_cache', 'offset_xy',
           'get_deltavec_coeffs', 'calculate_offsets',
           'spatial_calibrate', 'wrap_spatial_calibrate']

# Module-level cache for spatial calibration data.
# Keys are (file path, obsdate) tuples; values are dicts mapping the
# file modification time (as str) to the loaded calibration data, so
# stale entries can be detected when the file changes on disk.
__spatial_cache = {}


def clear_spatial_cache():
    """
    Empty the spatial cache.

    Rebinds the module-level cache to a fresh dictionary, discarding
    any previously stored calibration data.
    """
    global __spatial_cache
    __spatial_cache = dict()
def get_spatial_from_cache(spatialfile, obsdate):
    """
    Retrieve spatial data from the spatial cache.

    A cached entry is returned only if the file still exists, can be
    read, and has not been modified since it was stored.  If the file
    has become unreadable, its entry is dropped from the cache so that
    it can be re-read later.

    Parameters
    ----------
    spatialfile : str
        File path to the spatial file
    obsdate : int
        Observation date

    Returns
    -------
    arg1, arg2
        arg1 : str or dict
            If reading an offsets file, `arg1` will be the filename.
            If reading a coefficients file, `arg1` will be a dictionary
            of coefficients.
        arg2 : numpy.ndarray or dict
            If reading an offsets file, `arg2` will be an array of
            offsets.  If reading a coefficients file, `arg2` will be a
            dictionary of secondary coefficients.
    """
    global __spatial_cache
    key = spatialfile, int(obsdate)
    if key not in __spatial_cache:
        return None

    if not goodfile(spatialfile, verbose=True):
        # File vanished or became unreadable: discard the stale entry
        __spatial_cache.pop(key, None)
        return None

    # Entries are keyed on modification time; a changed file means
    # the cached data no longer applies.
    mtime = str(os.path.getmtime(spatialfile))
    entry = __spatial_cache.get(key, {})
    if mtime not in entry:
        return None

    log.debug("Retrieving spatial data from cache (%s, date %s)" % key)
    return entry.get(mtime)
def store_spatial_in_cache(spatialfile, obsdate, arg1, arg2):
    """
    Store spatial data in the spatial cache.

    The entry is keyed on (file path, observation date) and stamped
    with the file's current modification time so later reads can
    detect on-disk changes.

    Parameters
    ----------
    spatialfile : str
        File path to the spatial file
    obsdate : int
        Observation date
    arg1 : str or dict
        If reading an offsets file, `arg1` will be the filename.
        If reading a coefficients file, `arg1` will be a dictionary
        of coefficients.
    arg2 : numpy.ndarray or dict
        If reading an offsets file, `arg2` will be an array of offsets.
        If reading a coefficients file, `arg2` will be a dictionary of
        secondary coefficients.
    """
    global __spatial_cache
    key = spatialfile, int(obsdate)
    log.debug("Storing spatial data in cache (%s, date %s)" % key)
    entry = {}
    __spatial_cache[key] = entry
    mtime = str(os.path.getmtime(spatialfile))
    entry[mtime] = arg1, arg2
def offset_xy(date, blue=False):
    """
    Calculate X and Y offsets for each spaxel.

    The procedure is:

        1. Identify calibration file by date and channel
        2. Read offsets from file and reform into array
        3. Return array

    Parameters
    ----------
    date : array_like of int
        Date of observation as a [YYYY, M, D] vector (e.g. [2009, 3, 23])
    blue : bool, optional
        If True, BLUE channel is assumed, otherwise, RED

    Returns
    -------
    str, numpy.ndarray
        calibration file used, (25, 2) X and Y offsets of each
        spaxel (in mm)
    """
    fifi_datapath = os.path.join(os.path.dirname(fifi_ls.__file__), 'data')
    calfile_default = os.path.join(
        fifi_datapath, 'spatial_cal', 'poscal_default.txt')
    if not goodfile(calfile_default, verbose=True):
        return
    if not hasattr(date, '__len__') or len(date) != 3 or date[0] < 1e3:
        log.error("DATE must be a 3-element array [yyyy, m, d]")
        return
    obsdate = int(''.join([str(x).zfill(2) for x in date]))

    df = pandas.read_csv(
        calfile_default, comment='#', names=['date', 'chan', 'file'],
        sep=r'\s+', dtype={'date': int})
    df = df.sort_values('date')

    # BUGFIX: Series.apply returns a new Series and does not modify the
    # frame in place, so the lowered channel values must be assigned
    # back before comparing against the lowercase channel code below.
    df['chan'] = df['chan'].apply(str.lower)
    channel = 'b' if blue else 'r'
    df = df[(df['chan'] == channel) & (df['date'] >= obsdate)]
    if len(df) == 0:
        log.error("No spatial calibration file found for "
                  "channel %s" % channel)
        return
    # First row after the date sort is the earliest applicable file
    calfile = os.path.join(fifi_datapath, df['file'].values[0])

    # Return cached offsets if the file is unchanged since last read
    offset_data = get_spatial_from_cache(calfile, obsdate)
    if offset_data is not None:
        return offset_data

    if not goodfile(calfile, verbose=True):
        return
    offsets = pandas.read_csv(
        calfile, names=['x', 'y'], dtype=float, sep=r'\s+')
    # Reform into a (25, 2) array of x, y positions in mm
    offsets = np.array([offsets['x'], offsets['y']]).T

    store_spatial_in_cache(calfile, obsdate, calfile, offsets)
    return calfile, offsets
def get_deltavec_coeffs(header, obsdate, telsim2det=0.842):
    """
    Read the correct delta vector spatial coefficients.

    Parameters
    ----------
    header : fits.Header
        Header providing PRIMARAY, CHANNEL, and DICHROIC keywords.
    obsdate : array_like of int
        Date of observation as a [YYYY, M, D] vector (e.g. [2009,3,23]
    telsim2det : float, optional
        Conversion factor from telsim to detector units

    Returns
    -------
    dict, dict
        Primary-array coefficients and secondary-array coefficients.
        The second dict carries a 'required' flag indicating whether
        secondary-array offsets apply.
    """
    prime_array = header.get('PRIMARAY')
    channel = header.get('CHANNEL')
    dichroic = header.get('DICHROIC')
    coeff_file = os.path.join(
        os.path.dirname(fifi_ls.__file__), 'data', 'spatial_cal',
        'FIFI_LS_DeltaVector_Coeffs.txt')
    longdate = int(''.join([str(x).zfill(2) for x in obsdate]))

    # Return cached coefficients if the file is unchanged
    coefficients = get_spatial_from_cache(coeff_file, longdate)
    if coefficients is not None:
        return coefficients

    if not goodfile(coeff_file, verbose=True):
        return
    df = pandas.read_csv(
        coeff_file, comment='#', sep=r'\s+',
        names=['dt', 'ch', 'dch', 'bx', 'ax', 'rx', 'by', 'ay', 'ry'])

    # The offsets from instrument boresight to telescope boresight.
    # TypeError occurs if PRIMARAY was missing (prime_array is None).
    try:
        rows = df[(df['dt'] >= longdate)
                  & (df['ch'] == prime_array[0].lower())
                  & (df['dch'] == dichroic)].sort_values('dt')
    except TypeError:
        rows = []

    # Default (zero) coefficients; c2 is only 'required' when the
    # observed channel is not the primary array.
    c1 = {'ax': 0.0, 'bx': 0.0, 'rx': 0.0,
          'ay': 0.0, 'by': 0.0, 'ry': 0.0}
    c2 = c1.copy()
    c2['required'] = False

    if len(rows) == 0:
        log.error("No boresight offsets found for %s" % longdate)
        return None
    else:
        # Earliest applicable entry after the date sort
        c1 = rows.iloc[0].to_dict()

    if (channel == 'RED' and prime_array == 'BLUE') or \
            (channel == 'BLUE' and prime_array == 'RED'):
        # The offsets between primary and secondary arrays
        rows2 = df[(df['dt'] >= longdate)
                   & (df['ch'] == channel[0].lower())
                   & (df['dch'] == dichroic)].sort_values('dt').reset_index()
        # NOTE(review): assumes a matching secondary-array row exists;
        # rows2.loc[0] raises KeyError if rows2 is empty — confirm the
        # coefficients file always covers both channels.
        c2 = rows2.loc[0].to_dict()
        c2['required'] = True

    # Convert linear (a*, b*) terms from telsim to detector units;
    # rotation offsets (r*) are left as-is.
    for k in c1.keys():
        if k[0] in ['a', 'b']:
            c1[k] *= telsim2det
            c2[k] *= telsim2det

    store_spatial_in_cache(coeff_file, longdate, c1, c2)
    return c1, c2
def calculate_offsets(hdul, obsdate=None, flipsign=None, rotate=False):
    """
    Calculate X and Y spatial offsets.

    Parameters
    ----------
    hdul : fits.HDUList
        Input HDU list containing FLUX_Gn, STDDEV_Gn, LAMBDA_Gn and
        (optionally) SCANPOS_Gn extensions.
    obsdate : array_like of int, optional
        Date of observation as [YYYY, M, D]; if None, it is read from
        the DATE-OBS keyword in the primary header.
    flipsign : bool, optional
        If True, DLAM_MAP/DBET_MAP signs are flipped; if None, the
        convention is chosen by observation date.
    rotate : bool, optional
        If True, rotate by detector angle to set N up

    Returns
    -------
    fits.HDUList
        HDUList updated with 'XS', 'YS' spatial offsets, and 'RA',
        'DEC' equatorial coordinates.
    """
    result = fits.HDUList(fits.PrimaryHDU(header=hdul[0].header))
    header = result[0].header
    outname = os.path.basename(header.get('FILENAME', 'UNKNOWN'))
    outname = outname.replace('WAV', 'XYC')
    header['HISTORY'] = 'XY offsets added'
    hdinsert(header, 'PRODTYPE', 'spatial_calibrated')
    hdinsert(header, 'FILENAME', outname)
    channel = header.get('CHANNEL')
    # Detector angle in radians, used to rotate map offsets
    angle = np.radians(header.get('DET_ANGL', 0))
    if obsdate is None:
        # Parse [YYYY, M, D] from the DATE-OBS header keyword
        obsdate = header.get('DATE-OBS')
        if not isinstance(obsdate, str) or '-' not in obsdate:
            log.error("DATE-OBS not found in file header")
            return
        try:
            obsdate = [int(x) for x in obsdate[:10].split('-')]
        except (ValueError, IndexError, TypeError):
            log.error("Bad DATE-OBS found in file header")
            return
    caloffsets = offset_xy(obsdate, blue=channel == 'BLUE')
    if caloffsets is None:
        return
    calfile, xy = caloffsets
    # One (x, y) offset per spaxel is expected
    if len(xy) != 25:
        log.error("Invalid number of XY offsets")
        return
    hdinsert(header, 'SPATFILE', os.path.basename(calfile),
             comment='Spatial calibration file')
    # TELSIM targets are lab telescope-simulator observations and get
    # a simplified offset calculation below.
    # NOTE(review): assumes OBJ_NAME and OBJECT keywords are present;
    # .get() returning None would raise on .strip() — confirm upstream.
    telsim = 'TELSIM' in [header.get('OBJ_NAME').strip().upper(),
                          header.get('OBJECT').strip().upper()]
    rotation = 0.0 if (telsim or rotate) else angle
    hdinsert(header, 'SKY_ANGL', np.rad2deg(rotation),
             comment='Sky angle after calibration (deg)')
    # plate scale in arcsec/mm
    plate_scale = header.get('PLATSCAL', 0)
    # Map offset in arcsec
    dlam_map = header.get('DLAM_MAP', 0)
    dbet_map = header.get('DBET_MAP', 0)
    if telsim:
        # x-y stage, mm
        dx = header.get('DLAM_MAP')
        dy = header.get('DBET_MAP')
        if None in [dx, dy]:
            # Fall back to OBS coordinates scaled by 10
            dx = header.get('OBSDEC', 0) / 10  # OBSRA
            dy = header.get('OBSRA', 0) / 10  # OBSDEC
        xs = dx - xy[:, 1]
        ys = dy - xy[:, 0]
        delta_coeffs = None, None
    else:
        # Real target, not TELSIM
        # flip sign on DBET and DLAM if needed (mostly just older data)
        if flipsign is None:
            longdate = int(''.join([str(x).zfill(2) for x in obsdate]))
            flipsign = longdate < 20150501  # May, 2015
        log.debug("DLam/DBet sign convention: %s"
                  % ('-' if flipsign else '+'))
        if flipsign:
            dbet_map *= -1
            dlam_map *= -1
        delta_coeffs = get_deltavec_coeffs(header, obsdate)
        if delta_coeffs is None:
            log.error('Problem in deltavec coefficients')
            return
        # Computed per grating scan in the loop below
        xs, ys = None, None
    ngrating = hdul[0].header.get('NGRATING', 1)
    for idx in range(ngrating):
        name = f'FLUX_G{idx}'
        # if scanpos present, then use array of dlam/dbet instead
        # of header keyword
        pname = name.replace('FLUX', 'SCANPOS')
        if pname in hdul:
            posdata = hdul[pname].data
            # broadcast to nramp x nspaxel
            nramp = len(posdata)
            dlam_map = np.broadcast_to(posdata['DLAM_MAP'], (25, nramp)).T
            dbet_map = np.broadcast_to(posdata['DBET_MAP'], (25, nramp)).T
            if flipsign:
                dlam_map = dlam_map * -1.0
                dbet_map = dbet_map * -1.0
        else:
            nramp = 1
        exthdr = hdul[name].header
        if not telsim:
            c1, c2 = delta_coeffs
            indpos_p = exthdr['INDPOS_P']
            # Calculate offset from instrument boresight to telescope
            # boresight
            dx = c1['bx'] + c1['ax'] * indpos_p - c1['rx']
            dy = -c1['by'] - c1['ay'] * indpos_p - c1['ry']
            if c2['required']:
                # Calculate spatial offset between primary and
                # secondary array
                indpos = exthdr['INDPOS']
                dx += (c2['bx'] - c1['bx']) \
                    + (c2['ax'] * indpos - c1['ax'] * indpos_p)
                dy -= (c2['by'] - c1['by']) \
                    + (c2['ay'] * indpos - c1['ay'] * indpos_p)
            # Calculate offsets with detangl
            xs = (plate_scale * (dx + xy[:, 0])
                  + np.cos(angle) * dlam_map
                  - np.sin(angle) * dbet_map)
            ys = -(plate_scale * (dy - xy[:, 1])
                   + np.sin(angle) * dlam_map
                   + np.cos(angle) * dbet_map)
            # Rotate by 180 for changed convention in KOSMA translator
            if not flipsign:
                xsr = xs * np.cos(-np.pi) - ys * np.sin(-np.pi)
                ysr = xs * np.sin(-np.pi) + ys * np.cos(-np.pi)
                xs = xsr
                ys = ysr
        # Rotate coords by detangl to set N up if desired
        if rotate:
            xsr = xs * np.cos(angle) - ys * np.sin(angle)
            ysr = xs * np.sin(angle) + ys * np.cos(angle)
            xs = xsr
            ys = ysr
        ra, dec = detector_coordinates_to_ra_dec(xs, ys, header)
        result.append(hdul[name].copy())
        result.append(hdul[name.replace('FLUX', 'STDDEV')].copy())
        result.append(hdul[name.replace('FLUX', 'LAMBDA')].copy())
        # reshape xs and ys data if necessary
        exthdr['BUNIT'] = 'arcsec'
        if xs.ndim > 1:
            # OTF mode: one offset per ramp sample, shape (nramp, 1, 25)
            result.append(fits.ImageHDU(xs.reshape((nramp, 1, 25)),
                                        header=exthdr,
                                        name=name.replace('FLUX', 'XS')))
            result.append(fits.ImageHDU(ys.reshape((nramp, 1, 25)),
                                        header=exthdr,
                                        name=name.replace('FLUX', 'YS')))
            exthdr['BUNIT'] = 'hourangle'
            result.append(fits.ImageHDU(ra.reshape((nramp, 1, 25)),
                                        header=exthdr,
                                        name=name.replace('FLUX', 'RA')))
            exthdr['BUNIT'] = 'degree'
            result.append(fits.ImageHDU(dec.reshape((nramp, 1, 25)),
                                        header=exthdr,
                                        name=name.replace('FLUX', 'DEC')))
        else:
            # Non-OTF mode: 25-element arrays, one value per spaxel
            result.append(fits.ImageHDU(xs, header=exthdr,
                                        name=name.replace('FLUX', 'XS')))
            result.append(fits.ImageHDU(ys, header=exthdr,
                                        name=name.replace('FLUX', 'YS')))
            exthdr['BUNIT'] = 'hourangle'
            result.append(fits.ImageHDU(ra, header=exthdr,
                                        name=name.replace('FLUX', 'RA')))
            exthdr['BUNIT'] = 'degree'
            result.append(fits.ImageHDU(dec, header=exthdr,
                                        name=name.replace('FLUX', 'DEC')))
    return result
def detector_coordinates_to_ra_dec(xs, ys, header):
    """
    Convert native XS/YS detector coordinates to RA/DEC.

    Parameters
    ----------
    xs : numpy.ndarray
        The equatorial X offsets (not reverse longitude).
    ys : numpy.ndarray
        The equatorial Y offsets.
    header : fits.Header
        The FITS header associated with the coordinates.

    Returns
    -------
    ra, dec : numpy.ndarray, numpy.ndarray
        The output detector position equatorial coordinates.
        RA is returned in hourangle units, Dec in degrees.
    """
    center_ra = header.get('OBSLAM', 0)
    center_dec = header.get('OBSBET', 0)
    sky_angle = header.get('SKY_ANGL', 0)

    if sky_angle != 0:
        # Rotate the offsets by the sky angle before projection
        theta = np.radians(sky_angle)
        cos_t = np.cos(theta)
        sin_t = np.sin(theta)
        x = (xs * cos_t) - (ys * sin_t)
        y = (xs * sin_t) + (ys * cos_t)
    else:
        x, y = xs, ys

    # Offsets are in arcseconds; the RA offset is scaled by cos(dec).
    # xs is in native RA offsets, applied with a positive sign here.
    ra = center_ra - (x / (3600 * np.cos(np.radians(center_dec))))
    dec = center_dec + (y / 3600)

    # convert RA back to hourangle
    return ra / 15, dec
def spatial_calibrate(filename, obsdate=None, flipsign=None,
                      rotate=False, outdir=None, write=False):
    """
    Apply spatial calibration (x and y offsets).

    Each data extension will be updated with xs and ys fields,
    containing spatial coordinates for each pixel in the data array.
    The input file should have been created by
    fifi_ls.lambda_calibrate.  This procedure handles only ERF
    coordinates; SIRF mapping is neither detected nor handled properly.

    The procedure is, for each grating scan extension:

        1. Calculate spatial positions in mm (from fifi_ls.offset_xy)
        2. Convert positions to arcseconds, using PLATSCAL, dither
           offsets (DLAM_MAP and DBET_MAP, possibly with a sign-flip
           on each), and offsets of the secondary array from the
           primary.
        3. Convert arcsecond offsets to RA and Dec coordinates.
        4. Create FITS file and (optionally) write results to disk.

    For OTF mode data, per-ramp DLAM_MAP/DBET_MAP values from the
    SCANPOS table are used, producing XS and YS extensions of
    dimension 25 x 1 x n_ramp; otherwise header keywords are used and
    the extensions are 25-element arrays.  RA and DEC extensions are
    also attached, in hours and degrees respectively.

    Parameters
    ----------
    filename : str
        File to be spatial-calibrated
    obsdate : array_like of int, optional
        Date of observation, as [YYYY,MM,DD].  Intended for files that
        do not have the DATE-OBS keyword in the FITS primary header.
    flipsign : bool, optional
        If True, DLAM_MAP and DBET_MAP will be multiplied by -1.
        If False, they will be used as is.  If None (default), the
        observation date determines the sign convention.
    rotate : bool, optional
        If True, rotate by the detector angle to set N up.
    outdir : str, optional
        Directory path to write output.  If None, output files will
        be written to the same directory as the input files.
    write : bool, optional
        If True, write to disk and return the path to the output file
        (suffix 'WAV' replaced with 'XYC').  If False, return the
        HDUList.

    Returns
    -------
    fits.HDUList or str
        Either the HDUList (if write is False) or the filename of the
        output file (if write is True).
    """
    if isinstance(outdir, str) and not os.path.isdir(outdir):
        log.error("Output directory %s does not exist" % outdir)
        return

    hdul = gethdul(filename, verbose=True)
    if hdul is None:
        return

    # For HDUList input, recover the filename from the primary header
    if not isinstance(filename, str):
        filename = hdul[0].header['FILENAME']
    if not isinstance(outdir, str):
        outdir = os.path.dirname(filename)

    result = calculate_offsets(hdul, obsdate=obsdate,
                               flipsign=flipsign, rotate=rotate)
    if result is None:
        log.error('Offset calculation failed')
        return

    if write:
        return write_hdul(result, outdir=outdir, overwrite=True)
    return result
def spatial_calibrate_wrap_helper(_, kwargs, filename):
    """
    Run `spatial_calibrate` on a single file, for use with `multitask`.

    The first (iteration index) argument is ignored; `kwargs` is the
    shared keyword dictionary passed through to `spatial_calibrate`.
    """
    return spatial_calibrate(filename, **kwargs)
def wrap_spatial_calibrate(files, outdir=None, obsdate=None,
                           flipsign=None, rotate=True,
                           allow_errors=False, write=False, jobs=None):
    """
    Wrapper for spatial_calibrate over multiple files.

    See `spatial_calibrate` for full description of reduction
    on a single file.

    Parameters
    ----------
    files : array_like of str
        paths to files to be spatially calibrated
    outdir : str, optional
        Directory path to write output.  If None, output files will
        be written to the same directory as the input files.
    obsdate : array_like of int, optional
        Date of observation.  Intended for files that do not have
        the DATE-OBS keyword (and value) in the FITS primary
        header (early files do not).  Format is [YYYY,MM,DD].
    flipsign : bool, optional
        If True, DLAM_MAP and DBET_MAP will be multiplied by -1.
        If False, DLAM_MAP and DBET_MAP will be used as is.
        If None (default), the observation date will be used to
        determine what the sign convention should be.
    rotate : bool, optional
        If True, rotate by the detector angle to set N up.
    allow_errors : bool, optional
        If True, return all created files on error.  Otherwise,
        return None
    write : bool, optional
        If True, write the output to disk and return the filename
        instead of the HDU.
    jobs : int, optional
        Specifies the maximum number of concurrently running jobs.
        Values of 0 or 1 will result in serial processing.  A
        negative value sets jobs to `n_cpus + 1 + jobs` such that
        -1 would use all cpus, and -2 would use all but one cpu.

    Returns
    -------
    tuple of str
        output filenames written to disk
    """
    if isinstance(files, str):
        files = [files]
    if not hasattr(files, '__len__'):
        log.error("Invalid input files type (%s)" % repr(files))
        return

    # Start from a clean cache so stale calibration data is not reused
    clear_spatial_cache()

    kwargs = {'outdir': outdir, 'obsdate': obsdate, 'write': write,
              'flipsign': flipsign, 'rotate': rotate}
    output = multitask(spatial_calibrate_wrap_helper, files, None,
                       kwargs, jobs=jobs)

    # A None entry marks a file that failed to calibrate
    failure = False
    result = []
    for x in output:
        if x is None:
            failure = True
        else:
            result.append(x)
    if failure:
        if len(result) > 0:
            if not allow_errors:
                # NOTE(review): on partial failure with allow_errors
                # False, the created files are reported and None is
                # returned — confirm this matches caller expectations.
                log.error("Errors were encountered but the following "
                          "files were created:\n%s" % '\n'.join(result))
                return

    clear_spatial_cache()
    return tuple(result)