diff --git a/draco/util/tools.py b/draco/util/tools.py index 688edd1b2..d1f708f26 100644 --- a/draco/util/tools.py +++ b/draco/util/tools.py @@ -3,11 +3,16 @@ Miscellaneous tasks should be placed in :py:mod:`draco.core.misc`. """ +import warnings + import numpy as np # Keep this here for compatibility -from caput.tools import invert_no_zero # noqa: F401 +from caput.tools import invert_no_zero from numpy.lib.recfunctions import structured_to_unstructured +from scipy import linalg as la +from scipy.signal import oaconvolve +from scipy.sparse import dia_array from ._fast_tools import _calc_redundancy @@ -534,3 +539,166 @@ def window_generalised(x, window="nuttall"): w = (a[:, np.newaxis] * np.cos(t)).sum(axis=0) return np.where((x >= 0) & (x <= 1), w, 0) + + +def arPLS_1d(y, mask=None, lam=1e2, end_frac=1e-2, max_iter=1000): + r"""Use arPLS to estimate a signal baseline. + + 1D implementation of symmetrically reweighted penalized least squares. + Solves for a signal baseline in the presence of high power outliers by + heavily weighting values below the signal and minimizing the weights + of values above the signal. + + Notes + ----- + arPLS solves the following linear system given signal :math: `\mathtt{y}` + .. math:: (W + \lambdaD_{d}^{T}D_{d})z = Wy + where the weighting function is given by + .. math:: w_{i} = (1 + \exp(2\sigma^{-1}(r_{i} - (2\sigma - \mu))))^{-1} + where :math:`\mathtt{r_{i}}` is the difference :math:`\mathtt{y_{i} - z_{i}}` + and :math:`\mathtt{\mu}` and :math:`\mathtt{\sigma}` are the mean and standard + deviation of the negative values in :math:`\mathbf{r}`. + The solver runs until `max_iter` iterations, or until the fractional mean + change in weights is less than `end_frac`. + + Reference: + https://www.sciencedirect.com/science/article/pii/S1090780706002266 + + + Properties + ---------- + y : np.ndarray + 1D signal array + mask : np.ndarray, optional + 1D boolean array of same length as `y`. Default is None. + lam : float, optional + Scaling parameter used to control importance of smoothness vs. + fit. High value prioritizes smoothness of the baseline estimate. + Default is 1e2. + end_frac : float, optional + Convergeance ratio `norm(delta_w) / norm(w)`. + max_iter : int, optional + Maximum number of iterations to run, even if the convergance + criteria is not met. + + Returns + ------- + z : np.ndarray + Baseline estimate of the same shape as `y` + """ + y = np.squeeze(y) + if y.ndim != 1: + raise ValueError(f"Expected 1D data array - got array with shape {y.shape}") + + N = y.shape[0] + + if mask is None: + mask = np.zeros(N, dtype=bool) + elif np.all(mask): + warnings.warn("Entire dataset is masked.") + + return np.zeros_like(y) + + mask = np.squeeze(mask) + + if mask.ndim != 1: + raise ValueError(f"Expected 1D mask array - got array with shape {mask.shape}") + + # Construct second-order difference matrix + D = np.array([[1, -2, 1]]).T.repeat(N - 1, axis=1) + D = dia_array((D, [-2, -1, 0]), shape=(N, N - 2)) + Hp = lam * D @ D.T + + # Create the banded smoothness matrix and weights matrix + H = np.ones((3, N), dtype=np.float64) + W = np.zeros_like(H) + + # Fill the lower banded matrix + for i in range(H.shape[0]): + H[i, : N - i] = Hp.diagonal(i) + + # Initialize weights to one + W[0] = 1.0 + + # Get the maximum exponential to avoid runtime warnings + maxpwr = np.log(np.finfo(y.dtype).max) + + for _ in range(max_iter): + # Ignore masked values + W[:, mask] = 0.0 + # Extract the actual weights. W is a 3xN matrix to match + # the banded shape of H, all off-diagonal elements are zero + w = W[0] + + z = la.solveh_banded(H + W, w * y, lower=True) + + # Get the difference between the signal and the baseline estimate, + # and compute the mean and std where unmasked data is less than zero + d = y - z + dn = d[(d < 0) & ~mask] + m = np.mean(dn) + s = np.std(dn) + + # Adjust weights based on the criteria discussed in the paper + pwr = 2 * (d - ((2 * s) - m)) * invert_no_zero(s) + pwr = np.clip(pwr, -maxpwr, maxpwr) + wt = invert_no_zero(1 + np.exp(pwr)) + + # Check for convergeance + if la.norm(w - wt) / la.norm(w) < end_frac: + break + + # Update the weights + W[0] = wt + + return z + + +def taper_mask(mask, nwidth, outer=False): + """Taper a 2d mask along the last axis. + + Parameters + ---------- + mask : np.ndarray + Mask to taper + nwidth : int + Number of samples on either side of the mask to taper + outer : bool, optional + If True, expand the mask outwards (wider). Otherwise, + expand the mask inwards (narrower). Default is False + + Returns + ------- + tapered_mask : np.ndarray[np.float64] + Mask convolved with taper window + """ + mask = np.atleast_2d(mask) + + width = 2 * nwidth - 1 + + taper = np.hanning(width)[np.newaxis] + taper /= np.sum(taper) + + tapered_mask = np.zeros( + (mask.shape[0], mask.shape[-1] + 2 * width), dtype=np.float64 + ) + tapered_mask[:, width:-width] = mask.astype(np.float64) + # Extend the edges + tapered_mask[:, :width] = tapered_mask[:, width][:, np.newaxis] + tapered_mask[:, -width:] = tapered_mask[:, -width - 1][:, np.newaxis] + + if outer: + tapered_mask = 1.0 - tapered_mask + + # First convolution. This creates a copy of the original boolean + # mask with the edges extended by `nwidth`. The second convolution + # applies the taper to this extended boolean mask. + tapered_mask = np.isclose( + oaconvolve(tapered_mask, taper, axes=-1, mode="same"), 1.0 + ).astype(np.float64) + tapered_mask = oaconvolve(tapered_mask, taper, axes=-1, mode="same") + + if outer: + tapered_mask = 1.0 - tapered_mask + + return tapered_mask[:, width:-width]