Skip to content

Commit

Permalink
feat(tools): add least squares baseline fit and mask taper functions.
Browse files Browse the repository at this point in the history
arPLS - 1D penalized least squares fit. Better than weighted median.

taper_mask: apply a hanning taper along the second axis of a 2D mask
  • Loading branch information
ljgray committed May 13, 2024
1 parent 8346e3c commit 7c051b1
Showing 1 changed file with 169 additions and 1 deletion.
170 changes: 169 additions & 1 deletion draco/util/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
Miscellaneous tasks should be placed in :py:mod:`draco.core.misc`.
"""

import warnings

import numpy as np

# Keep this here for compatibility
from caput.tools import invert_no_zero # noqa: F401
from caput.tools import invert_no_zero
from numpy.lib.recfunctions import structured_to_unstructured
from scipy import linalg as la
from scipy.signal import oaconvolve
from scipy.sparse import dia_array

from ._fast_tools import _calc_redundancy

Expand Down Expand Up @@ -534,3 +539,166 @@ def window_generalised(x, window="nuttall"):

w = (a[:, np.newaxis] * np.cos(t)).sum(axis=0)
return np.where((x >= 0) & (x <= 1), w, 0)


def arPLS_1d(y, mask=None, lam=1e2, end_frac=1e-2, max_iter=1000):
r"""Use arPLS to estimate a signal baseline.
1D implementation of symmetrically reweighted penalized least squares.
Solves for a signal baseline in the presence of high power outliers by
heavily weighting values below the signal and minimizing the weights
of values above the signal.
Notes
-----
arPLS solves the following linear system given signal :math: `\mathtt{y}`
.. math:: (W + \lambdaD_{d}^{T}D_{d})z = Wy
where the weighting function is given by
.. math:: w_{i} = (1 + \exp(2\sigma^{-1}(r_{i} - (2\sigma - \mu))))^{-1}
where :math:`\mathtt{r_{i}}` is the difference :math:`\mathtt{y_{i} - z_{i}}`
and :math:`\mathtt{\mu}` and :math:`\mathtt{\sigma}` are the mean and standard
deviation of the negative values in :math:`\mathbf{r}`.
The solver runs until `max_iter` iterations, or until the fractional mean
change in weights is less than `end_frac`.
Reference:
https://www.sciencedirect.com/science/article/pii/S1090780706002266
Properties
----------
y : np.ndarray
1D signal array
mask : np.ndarray, optional
1D boolean array of same length as `y`. Default is None.
lam : float, optional
Scaling parameter used to control importance of smoothness vs.
fit. High value prioritizes smoothness of the baseline estimate.
Default is 1e2.
end_frac : float, optional
Convergeance ratio `norm(delta_w) / norm(w)`.
max_iter : int, optional
Maximum number of iterations to run, even if the convergance
criteria is not met.
Returns
-------
z : np.ndarray
Baseline estimate of the same shape as `y`
"""
y = np.squeeze(y)
if y.ndim != 1:
raise ValueError(f"Expected 1D data array - got array with shape {y.shape}")

N = y.shape[0]

if mask is None:
mask = np.zeros(N, dtype=bool)
elif np.all(mask):
warnings.warn("Entire dataset is masked.")

return np.zeros_like(y)

mask = np.squeeze(mask)

if mask.ndim != 1:
raise ValueError(f"Expected 1D mask array - got array with shape {mask.shape}")

# Construct second-order difference matrix
D = np.array([[1, -2, 1]]).T.repeat(N - 1, axis=1)
D = dia_array((D, [-2, -1, 0]), shape=(N, N - 2))
Hp = lam * D @ D.T

# Create the banded smoothness matrix and weights matrix
H = np.ones((3, N), dtype=np.float64)
W = np.zeros_like(H)

# Fill the lower banded matrix
for i in range(H.shape[0]):
H[i, : N - i] = Hp.diagonal(i)

# Initialize weights to one
W[0] = 1.0

# Get the maximum exponential to avoid runtime warnings
maxpwr = np.log(np.finfo(y.dtype).max)

for _ in range(max_iter):
# Ignore masked values
W[:, mask] = 0.0
# Extract the actual weights. W is a 3xN matrix to match
# the banded shape of H, all off-diagonal elements are zero
w = W[0]

z = la.solveh_banded(H + W, w * y, lower=True)

# Get the difference between the signal and the baseline estimate,
# and compute the mean and std where unmasked data is less than zero
d = y - z
dn = d[(d < 0) & ~mask]
m = np.mean(dn)
s = np.std(dn)

# Adjust weights based on the criteria discussed in the paper
pwr = 2 * (d - ((2 * s) - m)) * invert_no_zero(s)
pwr = np.clip(pwr, -maxpwr, maxpwr)
wt = invert_no_zero(1 + np.exp(pwr))

# Check for convergeance
if la.norm(w - wt) / la.norm(w) < end_frac:
break

# Update the weights
W[0] = wt

return z


def taper_mask(mask, nwidth, outer=False):
"""Taper a 2d mask along the last axis.
Parameters
----------
mask : np.ndarray
Mask to taper
nwidth : int
Number of samples on either side of the mask to taper
outer : bool, optional
If True, expand the mask outwards (wider). Otherwise,
expand the mask inwards (narrower). Default is False
Returns
-------
tapered_mask : np.ndarray[np.float64]
Mask convolved with taper window
"""
mask = np.atleast_2d(mask)

width = 2 * nwidth - 1

taper = np.hanning(width)[np.newaxis]
taper /= np.sum(taper)

tapered_mask = np.zeros(
(mask.shape[0], mask.shape[-1] + 2 * width), dtype=np.float64
)
tapered_mask[:, width:-width] = mask.astype(np.float64)
# Extend the edges
tapered_mask[:, :width] = tapered_mask[:, width][:, np.newaxis]
tapered_mask[:, -width:] = tapered_mask[:, -width - 1][:, np.newaxis]

if outer:
tapered_mask = 1.0 - tapered_mask

# First convolution. This creates a copy of the original boolean
# mask with the edges extended by `nwidth`. The second convolution
# applies the taper to this extended boolean mask.
tapered_mask = np.isclose(
oaconvolve(tapered_mask, taper, axes=-1, mode="same"), 1.0
).astype(np.float64)
tapered_mask = oaconvolve(tapered_mask, taper, axes=-1, mode="same")

if outer:
tapered_mask = 1.0 - tapered_mask

return tapered_mask[:, width:-width]

0 comments on commit 7c051b1

Please sign in to comment.