Source code for sofia_redux.instruments.fifi_ls.get_badpix

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from datetime import datetime
import os

from astropy import log
from astropy.io import fits
from astropy.time import Time
import numpy as np
import pandas

from sofia_redux.instruments import fifi_ls
from sofia_redux.toolkit.utilities import hdinsert, goodfile

__all__ = ['clear_badpix_cache', 'get_badpix_from_cache',
           'store_badpix_in_cache', 'read_defaults_table',
           'get_badpix']

__badpix_cache = {}


[docs] def clear_badpix_cache(): """ Clear all data from the badpix cache. """ global __badpix_cache __badpix_cache = {}
[docs] def get_badpix_from_cache(badpixfile): """ Retrieves bad pixel masks or default file from the badpix cache. Checks to see if the file still exists, can be read, and has not been altered since last time. If the file has changed, it will be deleted from the cache so that it can be re-read. Parameters ---------- badpixfile : str File path to the badpix file Returns ------- badpix : numpy.ndarray or pandas.DataFrame Bad pixel mask or defaults table """ global __badpix_cache if badpixfile not in __badpix_cache: return if not goodfile(badpixfile): try: del __badpix_cache[badpixfile] except KeyError: # pragma: no cover # could happen in race conditions in parallel processing pass return modtime = str(os.path.getmtime(badpixfile)) if modtime not in __badpix_cache.get(badpixfile, {}): return log.debug("Retrieving data from badpix file (%s) in cache" % badpixfile) return __badpix_cache.get(badpixfile, {}).get(modtime)
[docs] def store_badpix_in_cache(badpixfile, badpix): """ Store badpix data in the badpix cache. Parameters ---------- badpixfile : str File path to the badpix file badpix : numpy.ndarray or pandas.DataFrame Bad pixel mask or defaults table """ global __badpix_cache log.debug("Storing data from badpix (%s) in cache" % badpixfile) __badpix_cache[badpixfile] = {} modtime = str(os.path.getmtime(badpixfile)) __badpix_cache[badpixfile][modtime] = badpix
[docs] def read_defaults_table(): """ Read the badpix defaults table. """ datapath = os.path.join(os.path.dirname(fifi_ls.__file__), 'data') default_file = os.path.join( datapath, 'badpix_files', 'badpix_default.txt') defaults_table = get_badpix_from_cache(default_file) if defaults_table is not None: return defaults_table log.debug("Reading badpix defaults file") if not goodfile(default_file, verbose=True): msg = "Could not read badpix default file: {}".format(default_file) log.error(msg) raise ValueError(msg) table = pandas.read_csv( default_file, comment='#', sep=r'\s+', names=['date', 'channel', 'filename'], dtype={'date': int}, converters={'filename': lambda x: os.path.join(datapath, x), 'channel': lambda x: str(x).upper().strip()}) store_badpix_in_cache(default_file, table) return table
[docs] def get_badpix(header, filename=None): """ Retrieve bad pixel data. Badpix files are identified by a configuration file called badpix_default.txt in the data/badpix_files directory. This file must contain 3 columns: end date (YYYYMMDD), channel (b1, b2, or r), and filepath (relative to the data directory). The procedure is: 1. Identify badpix file, unless override is provided 2. Read badpix data from file 3. Return badpix array Parameters ---------- header : fits.Header FITS header from which to determine badpix file. Will be updated with BDPXFILE keyword. filename : str, optional Direct path to a badpix file (overrides header logic) Returns ------- numpy.ndarray (n_values, (spexel number, spaxel_number)) """ if isinstance(filename, str) and goodfile(filename, verbose=True): badpix_file = filename[:] else: if not isinstance(header, fits.Header): log.error("Invalid header") return dateobs = header.get( 'DATE-OBS', Time(datetime.utcnow(), format='datetime').isot) try: yyyymmdd = int(dateobs.split('T')[0].replace('-', '')) except (ValueError, IndexError): log.warning( "Could not determine DATE-OBS - using most recent file") yyyymmdd = 88888888 channel = header.get('CHANNEL', 'UNKNOWN').strip().upper() channel = 'B' if channel == 'BLUE' else 'R' table = read_defaults_table() badpix_file = table.loc[table[ (table['channel'] == channel) & (table['date'] >= yyyymmdd)]['date'].idxmin()].filename log.debug("Using badpix file %s" % badpix_file) data = get_badpix_from_cache(badpix_file) if data is not None: if isinstance(header, fits.Header): hdinsert(header, 'BDPXFILE', os.path.basename(badpix_file)) return data log.debug("Loading badpix file %s" % badpix_file) if isinstance(header, fits.Header): hdinsert(header, 'BDPXFILE', os.path.basename(badpix_file)) try: data = pandas.read_csv( badpix_file, names=['spaxel', 'spexel'], dtype={'spaxel': int, 'spexel': int}, comment='#', sep=r'\s+').values except ValueError: msg = "Invalid badpix file: {}".format(badpix_file) log.error(msg) return if isinstance(data, np.ndarray) and len(data.shape) == 2: # Convert coordinates to 0-based array coordinates data -= 1 # flipping the axis here as it's used for indexing a numpy array # (y, x) ordering, unlike file format data = np.flip(data, axis=1) store_badpix_in_cache(badpix_file, data) return data