Source code for sofia_redux.toolkit.utilities.fits

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from contextlib import contextmanager
import os

from astropy import log
from astropy.io import fits
import configobj
import numpy as np

from sofia_redux.toolkit.utilities.func import goodfile, date2seconds

__all__ = ['hdinsert', 'add_history', 'add_history_wrap',
           'robust_read', 'getheader', 'getdata', 'header_to_chararray',
           'chararray_to_header', 'gethdul', 'write_hdul',
           'get_key_value', 'set_log_level',
           'order_headers', 'merge_headers']

kref = 'AAAAAAAA'  # Keywords reference key marker (via redux)
href = 'BBBBBBBB'  # HISTORY reference key marker (via redux)


[docs] def hdinsert(header, key, value, comment=None, refkey='HISTORY', after=False): """ Insert or replace a keyword and value in the header Note that the insert method of astropy.io.fits.header.Header appears to be broken in cases where a value is to be inserted after a certain keyword and before HISTORY. In these cases header.insert() inserts the new keyword after the first history card... which is bad. Therefore, we do the insertion using a more manual indexing. In cases where both `refkey` and `after` are supplied, the `after` parameter will be evaluated first. Parameters ---------- header : astropy.io.fits.header.Header key : str keyword name to insert or replace value The value for the keyword comment : str, optional If provided a new comment will be used. Otherwise, the previous comment will be used when a keyword value is being replaced. refkey : str or int or None or 2-tuple, optional The keyword before which the keyword should be inserted in cases where the new keyword does not yet exist in the header. By default new keywords will be placed before the first HISTORY card. If None, use the end of the header. after : bool If True, insert after `refkey` rather than before. Returns ------- None The header will be modified in place """ if not isinstance(header, fits.header.Header): log.error("Invalid header") return if key in header and comment is None: comment = header.comments[key] if refkey not in header or key in header: header[key] = value, comment else: header.insert(refkey, (key, value, comment), after=after)
[docs] def add_history(header, msg, prefix=None, refkey=href): """ Add HISTORY message to a FITS header before the pipeline. Parameters ---------- header : astropy.io.fits.header.Header FITS header used to write the HISTORY message msg : str the HISTORY message prefix : str, optional Prefix message with this string. refkey : str, optional A reference key to insert the message after. """ if header is not None: s = '' if prefix is None else str(prefix) + ': ' if refkey not in header: refkey = 'HISTORY' hdinsert(header, 'HISTORY', s + msg, refkey=refkey)
[docs] def add_history_wrap(prefix): """ Make a function to add HISTORY messages to a header, prefixed with a string. Parameters ---------- prefix : str The message to prefix. Returns ------- function """ def wrapper(header, message): add_history(header, message, prefix=prefix) return wrapper
[docs] def robust_read(filename, data_hdu=0, header_hdu=0, extension=None, verbose=True): """ Retrieve the data and header from a FITS file Does as many checks as possible to fix a potentially broken FITS file. Feel free to add other stuff. Parameters ---------- filename : str path to a FITS file header_hdu : int, optional Header Data Unit to retrieve header. Default is 0 (Primary) data_hdu : int, optional Header Data Unit to retrieve data. Default is 0 (Primary) extension : int, optional If supplied, overrides both `header_hdu` and `data_hdu` verbose : bool, optional If True, output log messages on error Returns ------- numpy.ndarray, astropy.io.fits.header.Header The data array and header of the FITS file as a 2-tuple """ hdul, dataout, headout = None, None, None if not goodfile(filename, verbose=verbose): return dataout, headout if isinstance(extension, int): data_hdu = extension header_hdu = extension try: hdul = fits.open( filename, mode='readonly', ignore_missing_end=True) hdul.verify('silentfix') for hidx in [data_hdu, header_hdu]: if not isinstance(hidx, int) or (hidx < 0) or (hidx >= len(hdul)): log.warning("HDU %s does not exist (data)" % data_hdu) return dataout, headout dataout = hdul[data_hdu].data headout = hdul[header_hdu].header except ValueError as err: if 'ASCII' in str(err): header = hdul[header_hdu].header try: for key in header.keys(): try: _ = header[key] except fits.verify.VerifyError: header.remove(key) header = header.copy() header[key] = 'UNKNOWN' headout = header except (Exception, ValueError): pass except (Exception, AttributeError): pass finally: try: hdul.close() except (AttributeError, Exception): pass return dataout, headout
[docs] def getheader(filename, hdu=0, verbose=True): """ Returns the header of a FITS file Uses robust_read to extract the header. Parameters ---------- filename : str path to a FITS file hdu : int, optional Header Data Unit. Primary HDU is 0 (default) verbose : bool, optional If True, output log messages on error Returns ------- astropy.io.fits.header.Header """ if not goodfile(filename, verbose=verbose): return _, header = robust_read(filename, header_hdu=hdu) if not isinstance(header, fits.header.Header): log.error("Could not read FITS header: %s" % filename) return return header
[docs] def getdata(filename, hdu=0, verbose=True): """ Returns the data from a FITS file Uses robust_read to extract the data. Parameters ---------- filename : str path to a FITS file hdu : int, optional Header Data Unit. Primary HDU is 0 (default) verbose : bool, optional If True, output log messages on error Returns ------- numpy.ndarray """ if not goodfile(filename, verbose=verbose): return data, _ = robust_read(filename, data_hdu=hdu) if not isinstance(data, (fits.fitsrec.FITS_rec, np.ndarray)): log.error("Could not read FITS data: %s" % filename) return return data
[docs] def header_to_chararray(header): """ Convert a FITS header to an array of strings For the weirdness of FIFI-LS Parameters ---------- header : astropy.io.fits.header.Header Returns ------- np.ndarray """ if not isinstance(header, fits.header.Header): log.error("Invalid header") return c = repr(header).split('\n') c = [x.ljust(80)[:80] for x in c] return np.char.array([c], itemsize=80, unicode=True)
[docs] def chararray_to_header(chararray): """ Convert an array of strings to a FITS header For the weirdness of FIFI-LS Parameters ---------- chararray : np.ndarray Returns ------- astropy.io.fits.header.Header """ if not isinstance(chararray, np.ndarray): log.error("Invalid chararray") return if chararray.ndim not in [1, 2]: log.error("Invalid chararray features") return try: c = chararray[0] if chararray.ndim == 2 else chararray c = ''.join([x.ljust(80)[:80] for x in c]) h = fits.header.Header.fromstring(c) return h except (ValueError, TypeError, AttributeError) as err: log.error(str(err)) return
[docs] def gethdul(filename, verbose=True): """ Returns the HDUList from a FITS file Performs a few additional sanity checks Parameters ---------- filename : str or HDUList or array of HDU Path to a FITS file verbose : bool, optional If True, output log messages on error. Returns ------- astropy.io.fits.HDUList """ if isinstance(filename, fits.HDUList): return filename if (isinstance(filename, list) and len(filename) > 0 and isinstance(filename[0], fits.PrimaryHDU)): return fits.HDUList(filename) if not goodfile(filename, verbose=verbose): return try: hdul = fits.open(filename, ignore_missing_end=True) hdul.verify('silentfix') except (AttributeError, Exception, fits.VerifyError) as err: if verbose: log.error("Unable to read FITS file %s: %s" % (str(err), filename)) return return hdul
[docs] def write_hdul(hdul, outdir=None, overwrite=True): """ Write a HDULists to disk. Output filename is extracted from the primary header (extension 0) of each HDUList from the FILENAME keyword. Parameters ---------- hdul : astropy.io.fits.HDUList outdir : str, optional Name of the output path. Default is the current working directory. overwrite : bool, optional If False, will fail if the output filename already exists. Returns ------- str Filename written to disk """ outfile = hdul[0].header.get('FILENAME') if outfile is None: log.error("Missing FILENAME in HDUList primary header") return if outdir is not None: if isinstance(outdir, str): outfile = os.path.join(outdir, outfile) else: raise ValueError('Invalid output directory type.') if os.path.isfile(outfile): if os.path.exists(outfile): if not overwrite: log.error("%s already exists - will not overwrite" % outfile) return try: os.remove(outfile) except (IOError, OSError): # pragma: no cover log.error("Unable to remove file %s" % outfile) return try: hdul.writeto(outfile, overwrite=True, output_verify='silentfix') except (fits.VerifyError, OSError, Exception) as err: log.error("Unable to write to %s: %s" % (outfile, str(err))) return finally: hdul.close() return outfile
[docs] def get_key_value(header, key, default='UNKNOWN'): """ Get a key value from a header. Parameters ---------- header : `astropy.io.fits.Header` FITS header. key : str Key to retrieve. default : Value to return if not retrievable from the header. Returns ------- str, int, float, or bool FITS keyword value; default if not found. """ try: value = header[key] if isinstance(value, str): value = str(value).strip().upper() except (KeyError, TypeError): value = default return value
[docs] @contextmanager def set_log_level(level): """ Context manager to temporarily set the log level. Parameters ---------- level : str or int Logging level as defined in the `logging` module. """ orig_level = log.level log.setLevel(level) try: yield finally: log.setLevel(orig_level)
[docs] def order_headers(headers): """ Order headers based on contents. Return the earliest header and the header list sorted by date. B nods are never returned as the earliest header. Parameters ---------- headers : array_like of fits.Header Returns ------- 2-tuple fits.Header : earliest header list of fits.Header : ordered headers """ nhead = len(headers) if nhead == 1: return headers[0].copy(), [headers[0]] dateobs, nodbeam = [], [] for header in headers: dateobs.append( date2seconds( str(header.get('DATE-OBS', default='3000-01-01T00:00:00')))) nodbeam.append(str(header.get('NODBEAM', 'UNKNOWN')).upper()) # get the earliest A header as the basehead # Otherwise, just use the earliest header idx = np.argsort(dateobs) if 'A' in nodbeam: a_test = np.where(np.array(nodbeam)[idx] == 'A')[0] earliest_a = int(a_test[0]) else: earliest_a = 0 earliest_idx = idx[earliest_a] basehead = headers[earliest_idx].copy() # sort all headers by date-obs, including the basehead # This is used to get 'last' values, whether # in A or B nod sorted_headers = [headers[i] for i in idx] return basehead, sorted_headers
[docs] def merge_headers(headers, keyword_configuration, reference_header=None): """ Merge input headers. Keyword handling is defined in the input configuration file, in ConfigObj format, as a list of keyword = value pairs. Allowed values are: - LAST : take the value from the latest file, by DATE-OBS - CONCATENATE : join unique string values with a ',' - DEFAULT : set to 'UNKNOWN' for strings, -9999 for numbers - OR : logical-or boolean values - AND : logical-and boolean values - MEAN : mean of all values - SUM : sum of all values Parameters ---------- headers : list of astropy.io.fits.Header Headers to check. keyword_configuration : str, dict, or ConfigObj Configuration file name or object. reference_header : astropy.io.fits.Header, optional If provided, is used as the base header to be updated. If not provided, the first header (by DATE-OBS) is used as the reference. Returns ------- astropy.io.fits.Header Combined header. """ try: merge_conf = configobj.ConfigObj(keyword_configuration) except configobj.ConfigObjError as error: msg = 'Error while loading header merge configuration file.' log.error(msg) raise error basehead, all_headers = order_headers(headers) if reference_header is not None: basehead = reference_header.copy() for key, operation in merge_conf.items(): values = [h[key] for h in all_headers if key in h] if len(values) == 0: continue value = values[0] if operation == 'LAST': value = values[-1] elif operation == 'SUM': try: value = np.sum(values) except TypeError: log.warning(f'Key merge SUM operation is invalid for {key}') continue elif operation == 'MEAN': try: value = np.mean(values) except TypeError: log.warning(f'Key merge MEAN operation is invalid for {key}') continue elif operation == 'AND': try: if not np.all([type(v) is bool for v in values]): raise TypeError value = np.all(values) except TypeError: log.warning(f'Key merge AND operation is invalid for {key}') continue elif operation == 'OR': try: if not np.all([type(v) is bool for v in values]): raise TypeError value = np.any(values) except TypeError: log.warning(f'Key merge OR operation is invalid for {key}') continue elif operation == 'CONCATENATE': split_list = list() for v in values: split_list.extend(str(v).split(',')) unique = set(split_list) value = ','.join(sorted(unique)) elif operation == 'DEFAULT': test_val = values[0] if type(test_val) is str: value = 'UNKNOWN' elif type(test_val) is int: value = -9999 elif type(test_val) is float: value = -9999.0 elif operation == 'FIRST': value = values[0] else: log.warning(f'Invalid key merge operation {operation}') continue # comments are not handled -- it is assumed these keys # are already in the base header hdinsert(basehead, key, value) return basehead