Source code for sofia_redux.instruments.hawc.steps.basehawc

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Utility functions that may be used by multiple pipeline steps."""

from astropy import log
import numpy as np
import numba as nb

nb.config.THREADING_LAYER = 'threadsafe'

__all__ = ['calchilo', 'readchop', 'readnod', 'readhwp', 'clipped_mean']


[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def calchilo(chop, nsamp, choptol, good): # pragma: no cover """ Tag data as high, low, or bad. Parameters ---------- chop : array-like of float Chop or nod offset values. nsamp : int Number of samples in the chop array. choptol : float Tolerance for deviation from high or low value. good : array-like of int Flag value for each sample. Must match the dimension of `chop`. Updated in place. Returns ------- chophi : float The high chop value. choplo : float The low chop value. chopeff : float The effective chop value. """ # average middle value hi = 0.0 nnan = 0 for i in range(nsamp): if chop[i] == chop[i]: hi += chop[i] else: nnan += 1 himean = hi / (nsamp - nnan) lomean = himean # initial values for hi / lomin / max himax = 1.0e30 himin = himean lomin = -1.0e30 lomax = himean # sigma clip for 10 iterations nhi = 0 nlo = 0 for i in range(10): hi = 0.0 lo = 0.0 hi2 = 0.0 lo2 = 0.0 nhi = 0 nlo = 0 for j in range(nsamp): if (chop[j] >= himin) and (chop[j] <= himax): hi += chop[j] hi2 += chop[j] * chop[j] nhi += 1 if (chop[j] >= lomin) and (chop[j] <= lomax): lo += chop[j] lo2 += chop[j] * chop[j] nlo += 1 himean = hi / nhi lomean = lo / nlo histd = np.sqrt((hi2 - hi * hi / nhi) / (nhi - 1)) lostd = np.sqrt((lo2 - lo * lo / nlo) / (nlo - 1)) if histd < choptol: histd = choptol if lostd < choptol: lostd = choptol himax = himean + histd himin = himean - histd lomax = lomean + lostd lomin = lomean - lostd chophi = himean choplo = lomean chopeff = (nhi + nlo) / (1.0 * nsamp) for i in range(nsamp): if np.abs(chop[i] - chophi) <= choptol: good[i] = 1 elif np.abs(chop[i] - choplo) <= choptol: good[i] = -1 else: good[i] = 0 return chophi, choplo, chopeff
[docs] def readchop(step, nsamp, chop_tol, chopflag=True): """ Read chop state into high, low, or not used values. Parameters ---------- step : StepParent The calling pipe step, containing data loaded into step.praw. nsamp: int The number of samples in the input data. chop_tol : float Chopper tolerance in arcseconds. chopflag : bool, optional If not set, it is assumed that no chopping was performed. Returns ------- array-like of int The chop state for each sample. 1 indicates high state, -1 indicates low, 0 indicates not used. """ if chopflag: log.debug('Reading Chop signal to find high/low chop values') chopstate = np.zeros(nsamp, dtype=np.int32) chophi, choplo, chopeff = calchilo( step.praw['Chop Offset'], nsamp, chop_tol, chopstate) log.info("Chopper high,low,eff = %lf, %lf, %lf" % (chophi, choplo, chopeff)) else: log.warning('Assuming no chopping has occurred') chopstate = np.ones(nsamp, dtype=np.int32) return chopstate
[docs] def readhwp(step, nsamp, hwp_tol, sampfreq): """ Determine HWP state for all samples. This function determines HWP angles and moving intervals. It also makes sure all HWP intervals have roughly the same length (short intervals are removed). The `hwp_tol` parameter is used to determine when the HWP is moving. If the HWP angle changes by more than this amount between two samples, then the code counts this as a division between HWP angles, and starts a new HWP position. Furthermore, a division will only be accounted for if the HWP angle stays in its new position for more than 2 seconds, in order to avoid considering noisy peaks that might occur as a real change in the HWP angle. Set `hwp_tol` to a number greater than 360 to force all the data to be in the same HWP angle The recommended value is between 0.2 and 0.3 for discrete HWP and 400 for continuous HWP movement. The step.praw['HWP Index'] column is updated in place by this function: it is filled with the angle index found (starting at 0). Parameters ---------- step : StepParent The calling pipe step, containing data loaded into step.praw. nsamp: int The number of samples in the input data. hwp_tol : float HWP angle tolerance in degrees. sampfreq : float The sampling frequency in Hz. Returns ------- hwpstate : array-like The HWP state array contains values 1 (good) and 0 (bad). nhwp : int The number of HWP angles found. """ # Setup Loop and run program hwpangle = step.praw['HWP Angle'] hwpstate = np.zeros(nsamp, dtype=np.int32) step.praw['HWP Index'][:] = -1 # sample indices for HWP position start posbeg = [] # sample indices for HWP position end posend = [] # lower limit for duration of a HWP position mintime = 1.0 minsamp = int(round(mintime * sampfreq)) # Temporary variable hwpangval = 0.0 # Look for regions with hwpangle within 2*hwptol of the first value # current sample index sampi = 0 while sampi < len(hwpangle): # Check if we're in or out of a HWP position if len(posbeg) == len(posend): # I.e. looking for next positon if sampi + minsamp >= len(hwpangle): # stop loop if a new HWP position would have too few samples break # Check if sample minsamp ahead has hwpangle within hwptol if abs(hwpangle[sampi] - hwpangle[sampi + minsamp]) < 1.0 * hwp_tol: # Found a valid HWP position start posbeg.append(sampi) hwpangval = hwpangle[sampi] sampi += minsamp else: # No start found, increase sampi sampi += 1 else: # I.e. looking for end of current HWP position # Check if current sample is within 2*hwptol of start sample if abs(hwpangval - hwpangle[sampi]) > 2.0 * hwp_tol: posend.append(sampi) sampi += 1 # Finish posend list if necessary if len(posbeg) > len(posend): posend.append(len(hwpangle)) # Trim points to make sure all is within hwptol of median # Loop over HWP positions for posi in range(len(posbeg)): hwpangval = np.median(hwpangle[posbeg[posi]:posend[posi]]) # Trim at start while abs(hwpangle[posbeg[posi]] - hwpangval) > hwp_tol: posbeg[posi] += 1 # Trim at end while abs(hwpangle[posend[posi] - 1] - hwpangval) > hwp_tol: posend[posi] -= 1 # Trim points at both ends to make sure all are within hwptol/2 # of median of the first/last minsamp samples. This lowers the # number of bad samples at both ends. for posi in range(len(posbeg)): # Trim at start hwpangval = np.median(hwpangle[posbeg[posi]:posbeg[posi] + minsamp]) while abs(hwpangle[posbeg[posi]] - hwpangval) > hwp_tol / 2.0: posbeg[posi] += 1 # Trim at end hwpangval = np.median(hwpangle[posend[posi] - minsamp:posend[posi]]) while abs(hwpangle[posend[posi] - 1] - hwpangval) > hwp_tol / 2.0: posend[posi] -= 1 # Remove HWP angle positions that last less than half the median duration poslen = [posend[i] - posbeg[i] for i in range(len(posbeg))] lenmed = np.median(poslen) # countdown to avoid index messup for posi in range(len(posbeg) - 1, -1, -1): if poslen[posi] < lenmed / 2: del posbeg[posi] del posend[posi] # Set hwpstate for posi in range(len(posbeg)): hwpstate[posbeg[posi]:posend[posi]] = 1 step.praw['HWP Index'][posbeg[posi]:posend[posi]] = posi # Return nhwp = len(posbeg) s = "Found %d HWP angles:" % nhwp if nhwp < 10: for posi in range(len(posbeg)): s += ' %.1f' % np.median(hwpangle[posbeg[posi]:posend[posi]]) log.info(s) return hwpstate, nhwp
[docs] def readnod(step, nsamp, nod_tol, nodflag=True): """ Read nod state into high, low, or not used values. Parameters ---------- step : StepParent The calling pipe step, containing data loaded into step.praw. nsamp: int The number of samples in the input data. nod_tol : float Nodding tolerance in arcseconds. nodflag : bool, optional If not set, it is assumed that no nodding was performed. Returns ------- array-like of int The nod state for each sample. 1 indicates high state, -1 indicates low, 0 indicates not used. """ if nodflag is True: log.debug('Reading Nod signal to find high/low nod values') nodstate = np.zeros(nsamp, dtype=np.int32) nodhi, nodlo, nodeff = calchilo( step.praw['Nod Offset'], nsamp, nod_tol, nodstate) log.info("Nod high,low,eff = %lf, %lf, %lf" % (nodhi, nodlo, nodeff)) else: log.warning('Assuming no nodding has occurred') nodstate = np.ones(nsamp, dtype=np.int32) return nodstate
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def clipped_mean(data, mask, sigma=5.0): # pragma: no cover """ Compute a sigma-clipped mean of the input data. Mean is computed along the zeroth axis. Data is assumed to have three dimensions. Parameters ---------- data : array-like of float Input data. Must have three dimensions. mask : array-like of int Mask for bad values in input data. Must match data dimensions. 0 indicates good value, 1 indicates bad. Updated in place. sigma : float, optional The sigma-clipping threshold. Returns ------- datamean : array-like The mean array. datastd : array-like The standard deviation array. """ nframe, nrow, ncol = data.shape datamean = np.zeros((nrow, ncol), dtype=np.float64) datastd = np.zeros((nrow, ncol), dtype=np.float64) for i in range(nrow): for j in range(ncol): sumx = 0. sumx2 = 0. sumn = 0 for k in range(nframe): if mask[k, i, j] == 0: sumx += data[k, i, j] sumx2 += data[k, i, j] * data[k, i, j] sumn += 1 total = sumn + 1 mean = 0 std = 0 while sumn < total: if sumn > 1: mean = sumx / sumn std = np.sqrt((sumx2 - sumx * sumx / (1.0 * sumn)) / (1.0 * sumn - 1)) elif sumn <= 1: break total = sumn sumx = 0. sumx2 = 0. sumn = 0 for k in range(nframe): if np.abs(data[k, i, j] - mean) <= sigma * std: sumx += data[k, i, j] sumx2 += data[k, i, j] * data[k, i, j] sumn += 1 else: mask[k, i, j] = 1 datamean[i, j] = mean datastd[i, j] = std / np.sqrt(total) return datamean, datastd