# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Data storage class for FITS images and tables."""
from datetime import datetime
import os
import time
import gc
from astropy import log
from astropy.io import fits
import numpy as np
from sofia_redux.instruments.hawc.dataparent import DataParent
from sofia_redux.toolkit.utilities.fits import hdinsert
__all__ = ['DataFits']
[docs]
class DataFits(DataParent):
"""
Pipeline data FITS object.
The data is stored as a list of (multi-dimensional) images and a
similar list of tables. FITS file name and all headers are
stored as well.
"""
# File Name Fit: Regexp expression that fits valid filenames
# will fit .fits or .fts at end of name
filenamefit = r'\.(fits|fts)\Z'
"""str : Regular expression that matches valid FITS file names."""
def __init__(self, filename='', config=None):
"""
Initialize data object and variables.
The parent constructor is not called by this function, since
this class requires different attribute handling than its parent.
Parameters
----------
filename : str, optional
Path to a text file. If specified, data will be loaded
immediately from the file.
config : `configobj.ConfigObj`, dict, str, or list of str, optional
If specified, the configuration will be loaded.
"""
# set up internal variables
self.filename = ''
self.rawname = ''
self.loaded = False
# retrieve config directory location from current file path
# assumes config is in sofia_redux/instruments/hawc/data/config
# and this file is in sofia_redux/instruments/hawc
pipe_path = os.path.dirname(os.path.abspath(__file__))
self.data_path = os.path.join(pipe_path, 'data', '')
self.config_path = os.path.join(pipe_path, 'data', 'config', '')
# set the configuration
self.mode = None
self.config = None
self.config_files = []
self.setconfig(config)
# Image Variables:
# At all times len(imgdata) == len(imgnames) == len(imgheads)
# - If only loadhead() was called or first HDU has no data:
# imghead[0]=header, imgdata[0]=None, imgnames[0]='Primary Header'
# List with image data - data is in array of int/float
self.imgdata = []
# List with image names (all uppercase)
self.imgnames = []
# List with image headers
self.imgheads = []
# Table Variables:
# At all times len(tabdata) == len(tabnames) == len(tabheads)
# List with tables each item is a record array
self.tabdata = []
# List with table names (all uppercase)
self.tabnames = []
# List with table headers
self.tabheads = []
# if file exists, load it
if os.path.exists(filename):
self.load(filename)
elif filename != '':
# user specified a non-existent filename
msg = 'No such file %s' % filename
log.error(msg)
raise IOError(msg)
def __getattr__(self, name):
"""
Get attribute.
Allows access to these attributes:
- header: header of primary HDU
- image: the first image
- table: the first table
From DataParent.__getattr__:
- filenamebegin: filename start
- filenameend: filename end
Parameters
----------
name : str
The attribute to retrieve.
"""
# return data
if name == 'data':
if self.imgdata and self.imgdata[0] is not None:
# return image if available
return self.imgdata[0]
elif self.tabdata:
# else return a table
return self.tabdata[0]
else:
msg = "'%s' object has no data" % type(self).__name__
log.warning(msg)
return None
# return image if it's requested - raise error if no images present
if name == 'image':
if len(self.imgdata) == 0:
msg = "'%s' object has no image data" % type(self).__name__
log.warning(msg)
return None
elif self.imgdata[0] is None:
msg = "'%s' object has no image data" % type(self).__name__
log.warning(msg)
return self.imgdata[0]
# return table if it's requested - raise error if no tables present
if name == 'table':
if len(self.tabdata) == 0:
msg = "'%s' object has no table data" % type(self).__name__
log.warning(msg)
return None
return self.tabdata[0]
# return header if it's requested - raise error if no images present
if name == 'header':
if len(self.imgheads) == 0:
msg = "'%s' object has no header data" % type(self).__name__
log.warning(msg)
return None
return self.imgheads[0]
# run parent function (filenamebegin and filenameend)
return super().__getattr__(name)
def __setattr__(self, name, value):
"""
Set attribute.
Allows setting of these attributes:
- header: header of primary HDU
- image: the first image
- table: the first table
Parameters
----------
name : str
The attribute to set.
value : object
The value to set.
"""
# set the data to image
if name == 'data':
# if it's a table put it in table
if issubclass(value.__class__, np.recarray):
self.table = value
# else assume it's an image
else:
self.image = value
elif name == 'image':
# set the image if it's requested
if len(self.imgdata) > 0:
self.imgdata[0] = value
else:
self.imgdata = [value]
self.imgnames = ['PRIMARY IMAGE']
self.imgheads = [fits.Header()]
self.loaded = True
elif name == 'table':
# set the table if it's requested
if len(self.tabdata) > 0:
self.tabdata[0] = value
else:
self.tabdata = [value]
self.tabnames = ['PRIMARY TABLE']
self.tabheads = [fits.Header()]
self.loaded = True
elif name == 'header':
# set the header if it's requested
if len(self.imgdata) > 0:
self.imgheads[0] = value
else:
self.imgdata = [None]
self.imgnames = ['PRIMARY HEADER']
self.imgheads = [value]
else:
# else pass the command to the parent function
super().__setattr__(name, value)
[docs]
def loadhead(self, filename='', dataname='', hdul=None):
"""
Load and return the primary header of the FITS file given.
This also checks for file existence and type. Only the header
is loaded into the DataFits object; the data is not loaded.
Parameters
----------
filename : str, optional
The name of the file to load. If omitted,
self.filename is used.
dataname : str or int, optional
The EXTNAME value or extension number of the header to be loaded
If such a header is not found, or dataname=='', the first header
is loaded. This option should be used if the main file
information is not in the primary header.
hdul : fits.HDUList, optional
An in-memory HDUList to load, in place of an on-disk file.
"""
# check for file existence, type and get primary header
# set self.filename and filename
if len(filename) > 0:
self.filename = filename
self.rawname = filename
else:
filename = self.filename
# read fits header, checks for existing valid fits file
if hdul is None:
try:
hdus = fits.open(filename)
except (IOError, OSError, IndexError,
TypeError, ValueError) as error:
log.error('LoadHead: FITS read error ' + filename)
raise error
else:
hdus = hdul
# No dataname -> return primary header
if dataname == '':
header = [hdus[0].header]
elif dataname == 'all':
# Retrieve headers from all extensions
header = []
for ext in hdus:
header.append(ext.header)
else:
# Look for correct dataname
try:
header = [hdus[dataname].header]
except (KeyError, ValueError, IndexError):
msg = "loadhead: HDU with EXTNAME=%s not found" % dataname
log.error(msg)
raise ValueError(msg)
# Fill in the header, if necessary fill in data
self.imgheads = header
self.imgdata = [None] * len(header)
try:
self.imgnames = [h['EXTNAME'].upper() for h in header]
except KeyError:
self.imgnames = ['PRIMARY HEADER'] * len(header)
# Update the config for the current file if possible
try:
date = self.getheadval('DATE-OBS', errmsg=False)
date = datetime.strptime(
date.split('.')[0], "%Y-%m-%dT%H:%M:%S")
self.mergeconfig(date=date)
except (KeyError, ValueError, AttributeError):
log.debug('No date found for %s' % filename)
# Set the default pipeline mode from the config
self.mode = self.get_pipe_mode()
if self.mode is not None:
self.mergeconfig(mode=self.mode)
# Fill in the filename
self.filename = filename
if hdul is None:
hdus.close()
[docs]
def load(self, filename='', hdul=None):
"""
Load a FITS file into the data object.
Loads headers, images, and tables from the file on disk.
After loading, self.loaded is set to True.
Parameters
----------
filename : str, optional
The name of the file to load. If omitted, self.filename is used.
hdul : fits.HDUList, optional
An in-memory HDUList to load, in place of an on-disk file.
"""
# Clear file data
self.imgdata, self.imgheads, self.imgnames = [], [], []
self.tabdata, self.tabheads, self.tabnames = [], [], []
# Get filename and file checks
# set self.filename and filename
if len(filename) > 0:
self.filename = filename
self.rawname = filename
else:
filename = self.filename
if hdul is not None:
# directly load provided HDUL
self.loadhead(hdul=hdul)
hdus = hdul
else:
# check for file existence, type and get primary header
# fills first entry into imgdata, imgnames
self.loadhead(filename)
# Read File
hdus = fits.open(filename, memmap=False)
# Collect images / Load them
# Search for ImageHDUs (does not include PrimaryHDU)
imgind = [i for i in range(len(hdus))
if isinstance(hdus[i], fits.ImageHDU)]
# store number of images
imgn = len(imgind)
# get naxis and naxis1 from primary header, check if keywords exist
naxis = 0
naxis1 = 0
try:
naxis = int(self.getheadval('NAXIS'))
if naxis > 0:
naxis1 = self.getheadval('NAXIS1', errmsg=False)
except (KeyError, ValueError, AttributeError):
# KeyError: keyword not found (no keywords available)
log.warning('Load: missing naxis keywords in fits file '
+ filename)
# Check if file has no image (i.e. if naxis==1 and naxis1==0)
# -> No change, primary image stays as it was from loadhead()
if naxis * naxis1 == 0:
log.debug('Load: No image data in first HDU')
else:
# Load first HDU data if there is image data in it
imgn += 1
self.imgdata[0] = hdus[0].data
# Set image name
if 'EXTNAME' in self.imgheads[0]:
self.imgnames[0] = self.imgheads[0]['EXTNAME'].upper()
else:
self.imgnames[0] = 'PRIMARY IMAGE'
# Message with number of images
log.debug('Load: %d image(s) in file %s' % (imgn, filename))
# Load subsequent images
for ind in imgind:
# get name
if 'EXTNAME' in hdus[ind].header.keys():
self.imgnames.append(hdus[ind].header['EXTNAME'].upper())
else:
self.imgnames.append('SECONDARY IMAGE %d' % ind)
# get data and header
self.imgdata.append(hdus[ind].data)
self.imgheads.append(hdus[ind].header)
# Search for BinTableHDUs
tabind = [i for i in range(len(hdus))
if isinstance(hdus[i], fits.BinTableHDU)]
tabn = len(tabind)
# Messages on number of tables
log.debug('Load: %d table(s) in file %s' % (tabn, filename))
# Load all tables
for ind in tabind:
# if table is empty -> warning and skip it
try:
if hdus[ind].data is None or len(hdus[ind].data) == 0:
msg = 'Load: Table in HDU number %d has no data' % ind
msg += ' -> Ignoring this HDU'
log.warning(msg)
continue
except (IndexError, AttributeError, TypeError):
msg = 'Load: Problem loading table in HDU number %d' % ind
msg += ' -> Ignoring this HDU'
log.warning(msg)
continue
# get name
if 'EXTNAME' in hdus[ind].header:
self.tabnames.append(hdus[ind].header['EXTNAME'].upper())
else:
# tables can't be in primary extension, so start secondary
# after index 1
if ind > 1:
self.tabnames.append('SECONDARY TABLE %d' % (ind - 1))
else:
self.tabnames.append('PRIMARY TABLE')
# get data, header
self.tabdata.append(np.rec.array(hdus[ind].data))
self.tabheads.append(hdus[ind].header)
# Update the config for the current file if possible
try:
date = self.getheadval('DATE-OBS', errmsg=False)
date = datetime.strptime(
date.split('.')[0], "%Y-%m-%dT%H:%M:%S")
self.mergeconfig(date=date)
except (KeyError, ValueError, AttributeError):
log.debug('No date found for %s' % filename)
# Set the default pipeline mode from the config
self.mode = self.get_pipe_mode()
if self.mode is not None:
self.mergeconfig(mode=self.mode)
# Close the file
if hdul is None:
hdus.close()
gc.collect()
self.loaded = True
log.debug('Load: loaded fits file')
[docs]
def to_hdulist(self, filename=None, save_tables=True):
"""
Return an astropy HDUList equivalent to the current data object.
Also sets pipeline keywords in headers: PIPEVERS, FILENAME, DATE.
The pipeline version for PIPEVERS is read from self.pipever,
usually set in the DataParent class.
Parameters
----------
filename : str, optional
The file name to store in the FILENAME keyword. If not
provided, self.filename will be used.
save_tables : bool, optional
If not set, binary tables stored in the current object
will not be passed to the output HDUList.
Returns
-------
fits.HDUList
The HDUList containing all requested data.
"""
if filename is None:
filename = self.filename
# update pipeline keywords
self.setheadval('PIPEVERS', DataParent.pipever.replace('.', '_'),
'Pipeline Version')
self.setheadval('FILENAME', os.path.split(filename)[-1])
self.setheadval('DATE', time.strftime('%Y-%m-%dT%H:%M:%S'))
# make the data primary HDU -> List
hdul = []
for i in range(len(self.imgnames)):
if i == 0:
hdui = fits.PrimaryHDU(self.imgdata[i], self.imgheads[i])
else:
hdui = fits.ImageHDU(self.imgdata[i], self.imgheads[i])
hdui.header['EXTNAME'] = (self.imgnames[i].upper(),
'ID of the HDU')
hdul.append(hdui)
# make hdus for tables
if save_tables:
for i in range(len(self.tabnames)):
hdut = fits.BinTableHDU(self.tabdata[i], self.tabheads[i])
hdut.header['EXTNAME'] = (self.tabnames[i].upper(),
'ID of the HDU')
hdul.append(hdut)
# make an HDU list
hdulist = fits.HDUList(hdul)
return hdulist
[docs]
def save(self, filename=None):
"""
Save the data in the object to the specified file.
Existing files are overwritten.
Parameters
----------
filename : str, optional
The file name to store in the FILENAME keyword. If not
provided, self.filename will be used.
"""
# get file name
if filename is None:
filename = self.filename
# make an HDU list
hdulist = self.to_hdulist(filename=filename)
# save the file (produce errors if not successful)
try:
hdulist.writeto(filename, output_verify='fix', overwrite=True)
except (IOError, TypeError) as error:
log.error('Save: Failed to write fits file to ' + filename)
raise error
log.debug('Save: wrote FITS file %s' % filename)
[docs]
def copy(self):
"""
Return a copy of the current object.
Returns
-------
DataFits
"""
# create new object
out = DataFits(config=self.config)
# copy all images
out.imgnames = self.imgnames[:]
out.imgdata = []
out.imgheads = []
for imgi in range(len(self.imgdata)):
if self.imgdata[imgi] is not None:
out.imgdata.append(self.imgdata[imgi].copy())
else:
out.imgdata.append(None)
out.imgheads.append(self.imgheads[imgi].copy())
# copy tables
out.tabnames = self.tabnames[:]
out.tabdata = []
out.tabheads = []
for tabi in range(len(self.tabdata)):
if self.tabdata[tabi] is not None:
out.tabdata.append(self.tabdata[tabi].copy())
else:
out.tabdata.append(None)
out.tabheads.append(self.tabheads[tabi].copy())
# copy filename
out.filename = self.filename
out.rawname = self.rawname
out.loaded = self.loaded
# return new object
return out
[docs]
def mergehead(self, other):
"""
Merge the header of another data object to the existing header.
The merge is between primary headers only.
Header keywords are merged according to the configuration
values specified for the 'headmerge' section. Options are:
FIRST, LAST, MIN, MAX, SUM, OR, AND, CONCATENATE, DEFAULT.
Parameters
----------
other : DataFits
The DataFits to merge from.
"""
# get cards
otherheader = other.header
othercards = otherheader.cards
selfcards = self.header.cards
# add history keywords (no duplicates)
otherhist = [card.value for card in othercards
if card.keyword == 'HISTORY']
selfhist = [card.value for card in selfcards
if card.keyword == 'HISTORY']
for hist in otherhist:
if hist not in selfhist:
hdinsert(self.header, 'HISTORY', hist, after=True)
# Go through keywords listed in headmerge: assume self is first
headmerge = self.config['headmerge']
for key in headmerge.keys():
if key in self.header and key in otherheader:
selfval = self.header[key]
otherval = otherheader[key]
operation = headmerge[key].upper()
if operation == 'LAST':
selfval = otherval
elif operation == 'MIN':
selfval = min(selfval, otherval)
elif operation == 'MAX':
selfval = max(selfval, otherval)
elif operation == 'SUM':
selfval += otherval
elif operation == 'OR':
selfval = selfval | otherval
elif operation == 'AND':
selfval = selfval & otherval
elif operation == 'CONCATENATE':
if ',' in str(selfval):
vlist = str(selfval).split(',')
else:
vlist = [str(selfval)]
if ',' in str(otherval):
olist = str(otherval).split(',')
else:
olist = [str(otherval)]
for oval in olist:
if oval not in vlist:
vlist.append(oval)
selfval = ','.join(sorted(vlist))
elif operation == 'DEFAULT':
if type(selfval) is str:
selfval = 'UNKNOWN'
elif type(selfval) is int:
selfval = -9999
elif type(selfval) is float:
selfval = -9999.0
self.header[key] = selfval
[docs]
def copyhead(self, other, name=None, overwrite=True):
"""
Copy a header into the current object.
This function copies all header keywords, comments,
and history from other. Will overwrite existing cards, unless
overwrite flag is set to False. Exceptions: HISTORY and COMMENT
cards are always appended to the end of the list of such keywords
present in self.
If name is None, the header from the first HDU (self.header,
other.header) will be used.
Parameters
----------
other : DataFits
The object to copy from.
name : str, optional
The name of the extension header to copy.
overwrite : bool, optional
If not set, keywords already present in the current header
will not be overwritten.
"""
if name is None:
head1 = other.header
head2 = self.header
else:
head1 = other.getheader(name)
head2 = self.getheader(name)
try:
nhist = len(head1['history'])
except KeyError:
nhist = 0
try:
ncomm = len(head1['comment'])
except KeyError:
ncomm = 0
for k in head1.keys():
if k in ('COMMENT', 'HISTORY'):
# handle later
pass
elif k in head2.keys():
if overwrite:
head2[k] = (head1[k], head1.comments[k])
else:
head2[k] = (head1[k], head1.comments[k])
for i in range(nhist):
head2.add_history(head1['history'][i])
for i in range(ncomm):
head2.add_comment(head1['comment'][i])
[docs]
def copydata(self, other, dataname):
"""
Copy data into the current object.
Copies data (image or table) from another DataFits object to the
current object. If an object of that name already exists it is
overwritten. Both the data and the header are copied.
Parameters
----------
other : DataFits
The object to copy from
dataname : str
The name of the extension header to copy.
"""
# Check type and presence of data in other object
isimg = False
if dataname.upper() in other.imgnames:
isimg = True
othind = other.imageindex(dataname)
elif dataname.upper() in other.tabnames:
othind = other.tableindex(dataname)
else:
log.error('CopyData: dataname not found')
raise ValueError('dataname not found')
# Check if name exists in local data, delete if it's different type
if dataname.upper() in self.imgnames:
if not isimg:
self.imagedel(dataname)
elif dataname.upper() in self.tabnames:
if isimg:
self.tabledel(dataname)
# Copy / Overwrite object
if isimg:
self.imageset(other.imgdata[othind].copy(), dataname,
other.imgheads[othind].copy())
else:
self.tableset(other.tabdata[othind].copy(), dataname,
other.tabheads[othind].copy())
[docs]
def imageindex(self, imagename=None):
"""
Return the index of an image in the current object.
Given this index, the associated image can be accessed
via self.imgdata[index].
Parameters
----------
imagename: str, optional
The name of the requested image. If not provided,
the first image will be returned.
Returns
-------
int
The image index.
Raises
------
ValueError
If the image is not found.
"""
# empty name -> return first image
if imagename is None:
return 0
# check for valid name
try:
ind = self.imgnames.index(imagename.upper())
except ValueError as error:
msg = 'invalid image name (%s)' % imagename
log.error('Imageindex: ' + msg)
raise error
# return index
return ind
[docs]
def imageget(self, imagename=None):
"""
Return an image.
Parameters
----------
imagename : str, optional
The name of the requested image. The first image is returned
if not provided.
Returns
-------
array-like
The requested image.
"""
# get index
ind = self.imageindex(imagename)
# return image
return self.imgdata[ind]
[docs]
def imageset(self, imagedata, imagename=None, imageheader=None, index=-1):
"""
Set an image.
This should be used to add or replace an image in the current
object. The index flag determines the position of the image
in the image list.
Parameters
----------
imagedata : array-like or None
A multi dimensional array containing the image data
imagename : str or None, optional
The name of the image to set (None for first image)
imageheader : fits.Header
FITS header for the image
index : int, optional
Indicates the position of the image in the image list
Ignored if == -1.
"""
totalindex = len(self.imgdata)
# If index is not set, get a valid index
if index < 0:
if imagename is None:
index = 0
if totalindex > 0:
imagename = self.imgnames[0]
else:
imagename = 'PRIMARY'
elif imagename.upper() in self.imgnames:
index = self.imgnames.index(imagename.upper())
else:
# name given, but not an existing name
if totalindex > 0 and self.imgdata[0] is None:
# add to primary HDU, if primary HDU empty
index = 0
else:
# add to end otherwise
index = totalindex
# re-use header, if none specified
if imageheader is None:
if index < totalindex:
imageheader = self.imgheads[index]
# Set image
if index == 0:
hdu = fits.PrimaryHDU(imagedata, header=imageheader)
else:
hdu = fits.ImageHDU(imagedata, header=imageheader)
if index < totalindex:
# overwriting an existing image
self.imgnames[index] = imagename.upper()
self.imgdata[index] = imagedata
self.imgheads[index] = hdu.header
else:
self.imgnames.append(imagename.upper())
self.imgdata.append(imagedata)
self.imgheads.append(hdu.header)
self.loaded = True
[docs]
def imagedel(self, imagename=None):
"""
Remove an image.
Parameters
----------
imagename : str, optional
The name of the image to delete. The first image will be
deleted if not specified.
"""
# get index
ind = self.imageindex(imagename)
# delete image
del self.imgnames[ind]
del self.imgdata[ind]
del self.imgheads[ind]
[docs]
def tableindex(self, tablename=None):
"""
Return the index of an table in the current object.
Given this index, the associated table can be accessed
via self.tabdata[index].
Parameters
----------
tablename: str, optional
The name of the requested table. If not provided,
the first table will be returned.
Returns
-------
int
The table index.
Raises
------
RuntimeError
If there are no tables present.
ValueError
If the specified table is not found.
"""
# Check if tables are present
if len(self.tabnames) == 0:
msg = 'no tables in data'
log.error('Tableindex: ' + msg)
raise RuntimeError(msg)
# empty name -> return first table
if tablename is None:
return 0
# check for valid name
try:
ind = self.tabnames.index(tablename.upper())
except ValueError as error:
msg = 'invalid table name (%s)' % tablename
log.error('Tableindex: ' + msg)
raise error
# return index
return ind
[docs]
def tableget(self, tablename=None):
"""
Return a table.
Parameters
----------
tablename : str, optional
The name of the requested table. The first table is returned
if not provided.
Returns
-------
array-like
The requested table.
"""
# get index
ind = self.tableindex(tablename)
# return table
return self.tabdata[ind]
[docs]
def tableset(self, tabledata, tablename=None, tableheader=None, index=-1):
"""
Set a table.
This should be used to add or replace a table in the current
object. The index flag determines the position of the table
in the table list.
Parameters
----------
tabledata : array-like or None
A multi dimensional array containing the table data
tablename : str or None, optional
The name of the table to set (None for first table)
tableheader : fits.Header
FITS header for the table
index : int, optional
Indicates the position of the table in the table list
Ignored if == -1.
"""
# If index is not set, get valid index
totalindex = len(self.tabdata)
if index < 0:
if tablename is None:
index = 0
if totalindex > 0:
tablename = self.tabnames[0]
else:
tablename = 'PRIMARY TABLE'
# if table exists - replace
elif tablename.upper() in self.tabnames:
index = self.tabnames.index(tablename.upper())
else:
# add to end otherwise
index = totalindex
# Set table
if index < totalindex:
# overwrite
self.tabnames[index] = tablename.upper()
self.tabdata[index] = tabledata
if tableheader is not None:
self.tabheads[index] = tableheader
else:
self.tabnames.append(tablename.upper())
self.tabdata.append(tabledata)
# make sure tableheader is valid
if tableheader is None:
tableheader = fits.Header()
self.tabheads.append(tableheader)
self.loaded = True
[docs]
def tabledel(self, tablename=None):
"""
Remove a table.
Parameters
----------
tablename : str, optional
The name of the table to delete. The first table will be
deleted if not specified.
"""
# get index
ind = self.tableindex(tablename)
# delete table
del self.tabnames[ind]
del self.tabdata[ind]
del self.tabheads[ind]
[docs]
def tableaddcol(self, colname, array, tablename=None, dtype=None):
"""
Add a column to the table.
If the table under tablename doesn't exist, it is created.
This is intended to be used with single-dimension columns only.
Parameters
----------
colname : str
The new column name.
array : array-like
Values for the new column
tablename : str
The name of the table to add to (None for first table)
dtype : type, optional
Data type for the new column
"""
# make a basic array
array = np.asarray(array)
# set correct data type
if dtype is None:
dtype = array.dtype
# get additional dimension
if len(array.shape) > 1:
newtype = (colname, dtype, array.shape[1])
else:
newtype = (colname, dtype)
# Get table index (-1 if table needs to be created)
tabind = -1
if tablename is None:
if len(self.tabnames) > 0:
tabind = 0
else:
tablename = "Table"
else:
if tablename.upper() in self.tabnames:
tabind = self.tableindex(tablename)
# Make new table if necessary
if tabind < 0:
# If there is no existing table:
# get new table data type
newdtype = np.dtype([newtype])
# Make new table
newtable = np.empty(array.shape[0], dtype=newdtype)
# Add to list of tables
tabind = len(self.tabnames)
self.tabdata.append(newtable)
self.tabnames.append(tablename.upper())
self.tabheads.append(fits.Header())
else:
# table exists: base new table on old table
table = self.tabdata[tabind]
# check if dimension of new value array is correct
if len(table) != len(array):
msg = 'column array len (%d) != table len (%d)' % \
(len(array), len(table))
log.error('TableAddCol: ' + msg)
raise ValueError(msg)
# get new data type for table
newdtype = np.dtype(table.dtype.descr + [newtype])
# make new table
newtable = np.empty(table.shape, dtype=newdtype)
# fill old table values
for field in table.dtype.fields:
newtable[field] = table[field]
# fill new table values
newtable[colname] = array
# copy new table to self.table
self.tabdata[tabind] = newtable
[docs]
def tableaddrow(self, tablerow, tablename=None):
"""
Add a row to a data table.
Parameters
----------
tablerow : array-like
The elements of the row to be added
tablename : str
The name of the table to add to (None for first table)
"""
# Get table
tabind = self.tableindex(tablename)
table = self.tabdata[tabind]
# check if tablerow format is correct (by converting it)
if len(tablerow) != len(table[0]):
log.error('TableAddRow: table row has invalid length')
raise ValueError('table row has invalid length')
try:
tablerow = np.rec.array(tablerow, dtype=table.dtype)
except (ValueError, TypeError):
log.error('TableAddRow: table row has invalid element type')
raise ValueError('table row has invalid element type')
# Add to table
self.tabdata[tabind] = np.insert(table, len(table), tablerow)
[docs]
def tabledelcol(self, colname, tablename=None):
"""
Delete a column of the data table.
Parameters
----------
colname: str or list of str
The name(s) of the column(s) to delete
tablename: str, optional
The name of the table to delete from (None for first table)
"""
# Get table
tabind = self.tableindex(tablename)
table = self.tabdata[tabind]
# Check if colname is valid, make sure it's a list
olddt = table.dtype
if isinstance(colname, str):
colname = [colname]
if isinstance(colname, (list, tuple)):
for c in colname:
if c not in olddt.names:
msg = 'Invalid column name %s' % c
log.error('TableDelCol: ' + msg)
raise ValueError(msg)
else:
msg = "Invalid colname '%s'. Must be string " \
"or list/tuple" % colname
log.error('TableDelCol: ' + msg)
raise ValueError(msg)
# if all columns deleted, clear table
if len(olddt) - len(colname) <= 0:
del self.tabdata[tabind]
del self.tabnames[tabind]
del self.tabheads[tabind]
else:
# remove column(s) from table
# if it's a pyfits.FITS_rec -> Treat specially
# (this was pyfits.fitsrec.FITS_rec)
# test for FITS_rec. ndarray and recarray don't
# support names method
try:
names = table.names
formats = table.columns.formats
dims = table.columns.dims
units = table.columns.units
cols = []
for n, f, d, u in zip(names, formats, dims, units):
if n not in colname:
cols.append(fits.Column(name=n, format=f, dim=d,
unit=u, array=table.field(n)))
tbhdu = fits.BinTableHDU.from_columns(fits.ColDefs(cols))
self.tabdata[tabind] = tbhdu.data
self.tabheads[tabind] = tbhdu.header
except AttributeError:
# assume table is a regular record array
newnames = [n for n in olddt.names if n not in colname]
newtable = table[newnames]
self.tabdata[tabind] = newtable
[docs]
def tabledelrow(self, index, tablename=None):
"""
Delete a row of the data table.
Parameters
----------
index: int
The index of the row to delete.
tablename: str, optional
The name of the table to delete from (None for first table)
"""
# Get table
tabind = self.tableindex(tablename)
table = self.tabdata[tabind]
# Check if index is valid
if index >= len(table):
log.error('TableDelRow: Invalid row index %d' % index)
raise ValueError('Invalid row index %d' % index)
# if table has one row -> clear table
if len(table) < 2:
del self.tabdata[tabind]
del self.tabnames[tabind]
del self.tabheads[tabind]
else:
# remove row from table
self.tabdata[tabind] = np.delete(table, index).copy()
[docs]
def tablemergerows(self, rows):
"""
Merge several table rows into a single row.
Each column is merged according to the rules defined in the
[table] section of the configuration file. Options are:
FIRST, LAST, MIN, MAX, MED, AVG, SUM, WTAVG.
Parameters
----------
rows : record
NumPy record or FITS table rows to merge.
Returns
-------
record
The merged row.
Raises
------
AttributeError
If input rows have incorrect data type.
"""
# check if rows has same format as table
try:
rows.dtype
except AttributeError as error:
msg = 'input rows are incorrect data type'
log.error('TableMergeRows: %s' % msg)
raise error
# if _newdtype != table.dtype :
# msg = 'input rows have different format than table'
# log.error('TableMergeRows: %s' % msg)
# raise TypeError(msg)
# make output table row (copy from first row)
outrow = (rows[0:1].copy())[0]
# run through columns and merge values
for colname in rows.dtype.names:
# get merge function
try:
funct = self.config['table'][colname.lower()].lower()
except (IndexError, ValueError, KeyError):
# if keyword is not available
log.warning('TableMergeRows: Missing table '
'merge entry for '
'column -%s- returning first '
'row value' % colname)
funct = 'first'
# Try to run the function
# Comment: float() is necessary, otherwise values get messed up
try:
if funct == 'first':
pass
elif funct == 'last':
outrow[colname] = rows[-1][colname]
elif funct == 'min':
outrow[colname] = float(np.nanmin(rows[colname]))
elif funct == 'max':
outrow[colname] = float(np.nanmax(rows[colname]))
elif funct == 'med':
outrow[colname] = float(np.nanmedian(rows[colname]))
elif funct == 'avg':
outrow[colname] = float(np.nanmean(rows[colname]))
elif funct == 'sum':
outrow[colname] = float(np.nansum(rows[colname]))
elif funct == 'wtavg':
tmp = float(np.nansum(rows[colname] * rows['Samples']))
tmpsum = float(np.nansum(rows['Samples']))
if tmpsum > 0:
outrow[colname] = tmp / tmpsum
else:
outrow[colname] = tmp * 0.0
else:
log.warning('TableMergeRows: Unknown operation -'
+ funct + '- for column -' + colname
+ '- returning first row value')
except (NameError, TypeError, ValueError, KeyError):
# if unsuccessful return error
log.warning('TableMergeRows: Error in %s( %s ) - '
'returning first row value' %
(funct, colname))
return outrow
[docs]
def tablemergetables(self, tables):
"""
Return a new table containing data merged from the input table(s).
Columns are merged according to the rules defined in the [table]
section of the configuration file. Options are:
FIRST, LAST, MIN, MAX, MED, AVG, SUM, WTAVG.
Note that each table is assumed to have a single
row of data. If you need to merge rows of data, use
tablemergerows().
Parameters
----------
tables : list of BinTableHDU data
Tables should be NumPy records or FITS tables.
Returns
-------
BinTableHDU
The merged table.
"""
ntable = len(tables)
try:
# FITS tables
names = tables[0].names
formats = tables[0].columns.formats
dims = tables[0].columns.dims
units = tables[0].columns.units
except AttributeError:
msg = 'TableMergeTables is only available for FITS tables'
log.error(msg)
raise ValueError(msg)
if ntable == 1:
msg = 'TableMergeTables: Only 1 table passed'
log.debug(msg)
tbhdu = fits.BinTableHDU(tables[0])
return tbhdu
# loop through all tables, make sure names, formats, dims, and units
# are the same among them.
for i in range(1, ntable):
if (names != tables[i].names
or formats != tables[i].columns.formats
or dims != tables[i].columns.dims
or units != tables[i].columns.units):
log.error('TableMergeTables: columns differ for merging')
raise ValueError
cols = []
for n, f, d, u in zip(names, formats, dims, units):
# get merge function
try:
funct = self.config['table'][n.lower()].lower()
except (KeyError, ValueError):
# if keyword is not available
log.warning('TableMergeTables: Missing table '
'merge entry for column -%s- '
'returning first row value' % n)
funct = 'first'
try:
if funct == 'first':
tmp = tables[0][n]
elif funct == 'last':
tmp = tables[-1][n]
elif funct == 'min':
tmp = np.nanmin([a[n] for a in tables], axis=0)
elif funct == 'max':
tmp = np.nanmax([a[n] for a in tables], axis=0)
elif funct == 'med':
tmp = np.nanmedian([a[n] for a in tables], axis=0)
elif funct == 'avg':
tmp = np.nanmean([a[n] for a in tables], axis=0)
elif funct == 'sum':
tmp = np.nansum([a[n] for a in tables], axis=0)
elif funct == 'wtavg':
tmp = np.nansum([a[n] * a['Samples']
for a in tables], axis=0)
tmpsum = np.nansum([a['Samples']
for a in tables], axis=0)
tmp = tmp / tmpsum
else:
log.warning('TableMergeTables: Unknown '
'operation -' + funct + '- for column -'
+ n + '- returning first row value')
tmp = tables[0][n]
except (NameError, TypeError, ValueError, KeyError):
# if unsuccessful return error
log.warning('TableMergeRows: Error in %s( %s ) - '
'returning first row value' %
(funct, n))
tmp = tables[0][n]
cols.append(fits.Column(name=n, format=f, dim=d,
unit=u, array=tmp))
tbhdu = fits.BinTableHDU.from_columns(fits.ColDefs(cols))
return tbhdu
[docs]
def getheadval(self, key, dataname='', errmsg=True):
"""
Get a header value.
Returns the value of the requested key from
the header. If the keyword is present in the [Header] section
of the configuration that value is returned instead. In case that
value from the configuration file is itself a header key, the value
stored under that key is returned. If the key can not be found an
KeyError is produced and a warning is issued.
Parameters
----------
key : str
The header key to retrieve.
dataname : str, optional
The header extension to retrieve from. First, if not specified.
errmsg : bool, optional
If set, an error message will be raised if the keyword could
not be found.
Returns
-------
str, int, float, or bool
The header value.
Raises
------
KeyError
If the key is not found, and errmsg = True.
"""
val = None
# Look in the config
try:
# get the value
val = self.config['header'][key.upper()]
# Check if it's optional header replacement i.e. starts with '?_'
if val[:2] in ['?_', '? ', '?-']:
# if key is not in the header -> use key name
# under value instead
if key not in self.header:
log.info('Getheadval: Using %s keyword for %s' %
(val[2:].upper(), key))
key = val[2:].upper()
val = None
elif val[0].isupper() and val[:2] not in ['T ', 'F ']:
# it's a Header replacement (but not T/F)
log.info('Getheadval: Using %s value for %s' %
(val.upper(), key))
key = val.upper()
val = None
else:
# make it a pyfits.Card then get value and comment
card = fits.Card()
card = card.fromstring(key.upper() + ' = ' + val)
log.info('Getheadval: Setting %s to %s' % (key, val))
val = card.value
# update value in header
self.setheadval(key, val, card.comment,
dataname=dataname)
except KeyError:
# if key is not in config - continue
pass
except TypeError:
# if config is not yet loaded - return error
log.warning('GetHeadVal: Missing Configuration')
# Look in the header
if val is None:
# get the header from dataname
header = self.getheader(dataname)
# get value from header
try:
val = header[key]
except KeyError:
# if keyword is not found
msg = 'Missing %s keyword in header %s' % (key, dataname)
if errmsg:
log.error('GetHeadVal: %s' % msg)
raise KeyError(msg)
# if Value is a pyfits.core.Undefined i.e. no keyword
if isinstance(val, fits.Undefined):
msg = 'Missing value for key = %s - returning empty string' % key
log.warning('GetHeadVal: %s' % msg)
val = ''
return val
[docs]
def setheadval(self, key, value, comment=None, dataname=''):
"""
Set a FITS header keyword.
If no header exists, it will be created and added as the
primary header.
Parameters
----------
key : str
The header key to set.
value : str, int, float, or bool
The keyword value to set.
comment : str, optional
The comment to set for the keyword.
dataname : str, optional
The header extension to retrieve from. First, if not specified.
"""
# If no header exists, make a first empty image
if len(self.imgheads) == 0:
self.imgheads = [fits.PrimaryHDU().header]
self.imgdata = [None]
self.imgnames = ['PRIMARY']
# Set value into hdr
hdr = self.getheader(dataname=dataname)
if key == 'HISTORY':
hdinsert(hdr, key, value, after=True)
else:
hdinsert(hdr, key, value, comment=comment)
[docs]
def delheadval(self, key, dataname=''):
"""
Delete one or more FITS keywords.
Parameters
----------
key : str or list of str
The header key to delete.
dataname : str, optional
The header extension to retrieve from. First, if not specified.
Raises
------
ValueError
If the key is badly specified.
"""
if isinstance(key, (list, tuple)):
for k in key:
self.delheadval(k, dataname)
elif isinstance(key, str):
header = self.getheader(dataname=dataname)
if key in header:
del header[key]
else:
msg = 'Invalid key (%s). Must be a str, ' \
'list, or tuple' % repr(key)
log.error('DelHeadVal: ' + msg)
raise ValueError(msg)