diff --git a/draco/analysis/delay.py b/draco/analysis/delay.py
index a3908c3d8..6873264e4 100644
--- a/draco/analysis/delay.py
+++ b/draco/analysis/delay.py
@@ -12,6 +12,7 @@
 from ..core import containers, io, task
 from ..util import random, tools
+from .delayopt import delay_power_spectrum_maxpost
 
 
 class DelayFilter(task.SingleTask):
@@ -362,6 +363,22 @@ class DelayTransformBase(task.SingleTask):
         assume the noise power in the data is `weight_boost` times lower, which is
         useful if you want the "true" noise to not be downweighted by the Wiener
         filter, or have it included in the Gibbs sampler. Default: 1.0.
+    freq_frac
+        The threshold on the fraction of unmasked time samples required to retain a
+        frequency channel. The fraction must be strictly greater than this value, so
+        the default value of 0 retains any channel with at least one unmasked sample.
+        A value of 0.01 would retain any frequency with > 1% of time samples unmasked.
+    time_frac
+        The threshold on the fraction of unmasked frequency samples required to
+        retain a time sample. The fraction must be strictly greater than this value.
+        The default value (-1) means that all time samples are kept. A value of 0.01
+        would keep any time sample with > 1% of frequencies unmasked.
+    remove_mean
+        Subtract the mean in time of each frequency channel. This is done after time
+        samples are pruned by the `time_frac` threshold.
+    scale_freq
+        Scale each frequency by its standard deviation to flatten the fluctuations
+        across the band. Applied before any apodisation is done.
     """
 
     freq_zero = config.Property(proptype=float, default=None)
@@ -385,6 +402,12 @@ class DelayTransformBase(task.SingleTask):
     complex_timedomain = config.Property(proptype=bool, default=False)
     weight_boost = config.Property(proptype=float, default=1.0)
 
+    freq_frac = config.Property(proptype=float, default=0.0)
+    time_frac = config.Property(proptype=float, default=-1.0)
+
+    remove_mean = config.Property(proptype=bool, default=False)
+    scale_freq = config.Property(proptype=bool, default=False)
+
     def process(self, ss):
         """Estimate the delay spectrum or power spectrum.
 
@@ -542,8 +565,10 @@ def _calculate_delays(
     # NOTE: this not obviously the right level for this, but it's the only baseclass in
     # common to where it's used
     def _cut_data(
-        self, data: np.ndarray, weight: np.ndarray, channel_ind: np.ndarray
-    ) -> Optional[tuple[np.ndarray, np.ndarray, np.ndarray]]:
+        self,
+        data: np.ndarray,
+        weight: np.ndarray,
+    ) -> Optional[tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]:
         """Apply cuts on the data and weights and returned modified versions.
 
         Parameters
         ----------
@@ -553,8 +578,6 @@ def _calculate_delays(
             the second last.
         weight
             A n-d array of the weights. Axes the same as the data.
-        channel_ind
-            The indices of the frequency channels.
 
         Returns
         -------
@@ -563,35 +586,51 @@ def _calculate_delays(
         new_weight
             The new weights with cuts applied and averaged over the `average_axis`
             (i.e second last).
-        new_channel_ind
-            The indices of the remaining channels after cuts.
+        non_zero_freq
+            The selection of frequencies retained. A boolean array of shape N_freq that
+            is true at indices of frequencies retained after applying the freq_frac cut.
+        non_zero_time
+            The selection of times retained. A boolean array of shape N_time that is
+            true at indices of time samples retained after applying the time_frac cut.
""" - # Mask out data with completely zero'd weights and generate time - # averaged weights - weight_cut = ( - 1e-4 * weight.mean() - ) # Use approx threshold to ignore small weights - data = data * (weight > weight_cut) - weight = np.mean(weight, axis=-2) + ntime, nfreq = data.shape[-2:] + non_zero_time = (weight > 0).mean(axis=-1).reshape(-1, ntime).mean( + axis=0 + ) > self.time_frac + non_zero_freq = (weight > 0).mean(axis=-2).reshape(-1, nfreq).mean( + axis=0 + ) > self.freq_frac - if (data == 0.0).all(): + # If there are no non-zero weighted entries skip + if not non_zero_freq.any(): return None - # If there are no non-zero weighted entries skip - non_zero = (weight > 0).reshape(-1, weight.shape[-1]).all(axis=0) - if not non_zero.any(): + data = data[..., non_zero_time, :][..., non_zero_freq] + weight = weight[..., non_zero_time, :][..., non_zero_freq] + + # Remove the mean from the data before estimating the spectrum + if self.remove_mean: + # Do not apply this in place to make sure we don't modify + # the input data + data = data - data.mean(axis=0, keepdims=True) + + # If there are no non-zero data entries skip + if (data == 0.0).all(): return None - # Remove any frequency channel which is entirely zero, this is just to - # reduce the computational cost, it should make no difference to the result - data = data[..., non_zero] - weight = weight[..., non_zero] - non_zero_channel = channel_ind[non_zero] + # Scale the frequencies by the typical fluctuation size, with a scaling to + # obtain constant total power + if self.scale_freq: + dscl = ( + data.std(axis=-2)[..., np.newaxis, :] + / data.std(axis=(-1, -2))[..., np.newaxis, np.newaxis] + ) + data = data * tools.invert_no_zero(dscl) - # Increase the weights by a specified amount + weight = np.mean(weight, axis=-2) weight *= self.weight_boost - return data, weight, non_zero_channel + return data, weight, non_zero_freq, non_zero_time class DelayGibbsSamplerBase(DelayTransformBase, random.RandomTask): @@ -603,22 +642,34 @@ class DelayGibbsSamplerBase(DelayTransformBase, random.RandomTask): Attributes ---------- nsamp : int, optional - The number of Gibbs samples to draw. + If maxpost=False, the number of Gibbs samples to draw. If maxpost=True, + the number of iterations allowed in the call to scipy.optimize.minimize + in the maximum-likelihood estimator. initial_amplitude : float, optional The Gibbs sampler will be initialized with a flat power spectrum with - this amplitude. Default: 10. + this amplitude. Unused if maxpost=True (flat spectrum is a bad initial + guess for the max-likelihood estimator). Default: 10. save_samples : bool, optional. The entire chain of samples will be saved rather than just the final result. Default: False initial_sample_path : str, optional File path to load an initial power spectrum sample. If no file is given, - start with a flat power spectrum. Default: None + start with a flat power spectrum (Gibbs) or inverse FFT (max-likelihood). + Default: None + maxpost : bool, optional + The NRML maximum-likelihood delay spectrum estimator will be used instead + of the Gibbs sampler. + maxpost_tol : float, optional + Only used if maxpost=True. The convergence tolerance used by + scipy.optimize.minimize in the maximum likelihood estimator. 
""" nsamp = config.Property(proptype=int, default=20) initial_amplitude = config.Property(proptype=float, default=10.0) save_samples = config.Property(proptype=bool, default=False) initial_sample_path = config.Property(proptype=str, default=None) + maxpost = config.Property(proptype=bool, default=False) + maxpost_tol = config.Property(proptype=float, default=1e-3) def _create_output( self, @@ -663,6 +714,12 @@ def _create_output( if self.save_samples: delay_spec.add_dataset("spectrum_samples") + # If estimating delay spectrum w/ max-likelihood, initialize a mask dataset + # to record the baselines for which the estimator did/didn't converge. + if self.maxpost: + delay_spec.add_dataset("spectrum_mask") + delay_spec.datasets["spectrum_mask"][:] = 0 + # Save the frequency axis of the input data as an attribute in the output # container delay_spec.attrs["freq"] = ss.freq @@ -690,8 +747,16 @@ def _get_initial_S(self, nbase, ndelay, dtype): initial_S = cont.spectrum[:].local_array bl_ax = cont.spectrum.attrs["axis"].tolist().index("baseline") initial_S = np.moveaxis(initial_S, bl_ax, 0) - else: + # Gibbs case. + elif not self.maxpost: initial_S = np.ones((nbase, ndelay), dtype=dtype) * self.initial_amplitude + # Max-likelihood case. + else: + # Flat spectrum is a bad initial guess for max-likelihood. + # Passing None as the initial guess to the max-likelihood + # estimator will cause it to use an inverse FFT as the + # initial guess, which works well in practice. + initial_S = np.full(nbase, None) return initial_S @@ -734,30 +799,64 @@ def _evaluate(self, data_view, weight_view, out_cont, delays, channel_ind): weight = weight_view.local_array[lbi] # Apply the cuts to the data - t = self._cut_data(data, weight, channel_ind) + t = self._cut_data(data, weight) if t is None: continue - data, weight, non_zero_channel = t + data, weight, nzf, _ = t + + if self.maxpost: + spec, success = delay_power_spectrum_maxpost( + data, + ndelay, + weight, + initial_S[lbi], + window=self.window if self.apply_window else None, + fsel=channel_ind[nzf], + maxiter=self.nsamp, + tol=self.maxpost_tol, + ) - spec = delay_power_spectrum_gibbs( - data, - ndelay, - weight, - initial_S[lbi], - window=self.window if self.apply_window else None, - fsel=non_zero_channel, - niter=self.nsamp, - rng=rng, - complex_timedomain=self.complex_timedomain, - ) + # If max-likelihood didn't converge in allowed number of iters, reflect this in the mask. + if not success: + # Indexing into a MemDatasetDistributed object with the + # global index bi actually ends up (under the hood) + # indexing the underlying MPIArray with the local index. 
+                    out_cont.datasets["spectrum_mask"][bi] = 1
 
-            # Take an average over the last half of the delay spectrum samples
-            # (presuming that removes the burn-in)
-            spec_av = np.median(spec[-(self.nsamp // 2) :], axis=0)
-            out_cont.spectrum[bi] = np.fft.fftshift(spec_av)
+                out_cont.spectrum[bi] = np.fft.fftshift(spec[-1])
 
-            if self.save_samples:
-                out_cont.datasets["spectrum_samples"][:, bi] = spec
+                if self.save_samples:
+                    nsamp = len(spec)
+                    out_cont.datasets["spectrum_samples"][:, bi] = 0.0
+                    out_cont.datasets["spectrum_samples"][-nsamp:, bi] = np.array(spec)
+
+            else:
+                spec = delay_power_spectrum_gibbs(
+                    data,
+                    ndelay,
+                    weight,
+                    initial_S[lbi],
+                    window=self.window if self.apply_window else None,
+                    fsel=channel_ind[nzf],
+                    niter=self.nsamp,
+                    rng=rng,
+                    complex_timedomain=self.complex_timedomain,
+                )
+
+                # Take an average over the last half of the delay spectrum samples
+                # (presuming that removes the burn-in)
+                spec_av = np.median(spec[-(self.nsamp // 2) :], axis=0)
+                out_cont.spectrum[bi] = np.fft.fftshift(spec_av)
+
+                if self.save_samples:
+                    out_cont.datasets["spectrum_samples"][:, bi] = spec
+
+        if self.maxpost:
+            # Record number of converged baselines for debugging info.
+            n_conv = nbase - out_cont.datasets["spectrum_mask"][:].allgather().sum()
+            self.log.debug(
+                f"{n_conv}/{nbase} baselines converged in maximum-likelihood estimate of delay power spectrum."
+            )
 
         return out_cont
 
@@ -968,10 +1067,10 @@ def _evaluate(self, data_view, weight_view, out_cont, delays, channel_ind):
             weight = weight_view.local_array[lbi]
 
             # Apply the cuts to the data
-            t = self._cut_data(data, weight, channel_ind)
+            t = self._cut_data(data, weight)
             if t is None:
                 continue
-            data, weight, non_zero_channel = t
+            data, weight, nzf, _ = t
 
             # Pass the delay power spectrum and frequency spectrum for each "baseline"
             # to the Wiener filtering routine.The delay power spectrum has been
@@ -983,7 +1082,7 @@ def _evaluate(self, data_view, weight_view, out_cont, delays, channel_ind):
                 ndelay,
                 weight,
                 window=self.window if self.apply_window else None,
-                fsel=non_zero_channel,
+                fsel=channel_ind[nzf],
                 complex_timedomain=self.complex_timedomain,
             )
             # FFT-shift along the last axis
@@ -1114,10 +1213,10 @@ def _evaluate(self, data_view, weight_view, out_cont, delays, channel_ind):
             weight = np.array([w.local_array[lbi] for w in weight_view])
 
             # Apply the cuts to the data
-            t = self._cut_data(data, weight, channel_ind)
+            t = self._cut_data(data, weight)
             if t is None:
                 continue
-            data, weight, non_zero_channel = t
+            data, weight, nzf, _ = t
 
             spec = delay_spectrum_gibbs_cross(
                 data,
@@ -1125,7 +1224,7 @@ def _evaluate(self, data_view, weight_view, out_cont, delays, channel_ind):
                 weight,
                 initial_S[lbi],
                 window=self.window if self.apply_window else None,
-                fsel=non_zero_channel,
+                fsel=channel_ind[nzf],
                 niter=self.nsamp,
                 rng=rng,
             )
@@ -1512,7 +1611,7 @@ def _draw_signal_sample_f(S):
         # power spectrum equal to 0.5 times the power spectrum of the complex
         # delay spectrum, if the statistics are circularly symmetric
         S = 0.5 * np.repeat(S, 2)
-        Si = 1.0 / S
+        Si = 1.0 * tools.invert_no_zero(S)
         Ci = np.diag(Si) + FTNiF
 
         # Draw random vectors that form the perturbations
diff --git a/draco/analysis/delayopt.py b/draco/analysis/delayopt.py
new file mode 100644
index 000000000..0731d6088
--- /dev/null
+++ b/draco/analysis/delayopt.py
@@ -0,0 +1,592 @@
+"""Helper functions for the delay PS estimation via ML/MAP optimisation."""
+
+from typing import Protocol
+
+import numpy as np
+import scipy.linalg as la
+from scipy.optimize import minimize
+
+from ..util import tools
+
+
+class OptFunc(Protocol):
+    """A protocol for a function and its gradients to optimise.
+
+    The function should be defined such that it can be minimised.
+    """
+
+    def value(self, x: np.ndarray) -> float:
+        """Compute the value of the function.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        The value of the function being optimized.
+        """
+        raise NotImplementedError()
+
+    def gradient(self, x: np.ndarray) -> np.ndarray:
+        """Compute the gradient of the function with respect to the parameters.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        The gradient of the function being optimized.
+        """
+        raise NotImplementedError()
+
+    def hessian(self, x: np.ndarray) -> np.ndarray:
+        """Compute the Hessian (matrix of second derivatives) of the function.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        The Hessian of the function being optimized.
+        """
+        raise NotImplementedError()
+
+
+class LogLikePS(OptFunc):
+    """Compute the likelihood, gradient, and hessian for delay PS estimation.
+
+    This class efficiently computes the *negative* log-likelihood, as well as its
+    gradient and hessian. It precomputes and caches the relevant quantities so that
+    all of these can be calculated at each iteration without recomputing. It is
+    designed to be used with `scipy.optimize.minimize`.
+
+    The parameters used for this are the log of the delay power spectrum samples.
+
+    Parameters
+    ----------
+    X
+        The covariance matrix of the data.
+    MF
+        The masked Fourier matrix which maps from delay space to frequency space and
+        applies zero to any masked channels.
+    N
+        The noise covariance matrix.
+    nsamp
+        The number of samples used to calculate the covariance.
+    fsel
+        An optional array selection to apply to limit the frequencies used in the
+        estimation. If not supplied, any frequencies entirely masked within `MF` are
+        skipped.
+    exact_hessian
+        If set, use the exact Hessian for the calculation. Otherwise use the Fisher
+        matrix in the same way as the original NRML methods.
+    """
+
+    def __init__(
+        self,
+        X: np.ndarray,
+        MF: np.ndarray,
+        N: np.ndarray,
+        nsamp: int,
+        fsel: np.ndarray | slice | list | None = None,
+        exact_hessian: bool = True,
+    ) -> None:
+        if fsel is None:
+            fsel = (MF != 0).any(axis=1)
+
+        self.X = X[fsel][:, fsel]
+        self.MF = MF[fsel]
+        self.N = N[fsel][:, fsel]
+
+        self.nsamp = nsamp
+        self.exact_hessian = exact_hessian
+
+    # Store the location we are calculating for.
+    _s_a: np.ndarray | None = None
+
+    def _precompute(self, x: np.ndarray) -> bool:
+        """Pre-compute useful matrices for the given value.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        True if a pre-computation was done, otherwise False.
+        """
+        if np.all(x == self._s_a):
+            return False
+
+        self._s_a = x
+
+        S = np.exp(x)
+        dS = S
+
+        self._C = (self.MF * S[np.newaxis, :]) @ self.MF.T.conj() + self.N
+        self._XC = self.X - self._C
+
+        self._Ch = la.cholesky(self._C, lower=False)
+
+        self._U = dS[np.newaxis, :] ** 0.5 * self.MF
+        self._Ut = la.cho_solve((self._Ch, False), self._U)
+        self._XC_Ut = self._XC @ self._Ut
+
+        self._W = self._U
+        self._Wt = self._Ut
+        self._XC_Wt = self._XC_Ut
+
+        return True
+
+    def value(self, x: np.ndarray) -> float:
+        """Calculate the negative log-likelihood.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        Value of negative log-likelihood for the given set of params.
+        """
+        self._precompute(x)
+
+        CiX = la.cho_solve((self._Ch, False), self.X)
+
+        lndet = 2 * np.log(np.diagonal(self._Ch)).sum().real
+
+        ll = lndet + np.diagonal(CiX).sum().real
+
+        return self.nsamp * ll
+
+    def gradient(self, x: np.ndarray) -> np.ndarray:
+        """Calculate the gradient of the negative log-likelihood.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        Gradient of negative log-likelihood for the given set of params.
+        """
+        self._precompute(x)
+
+        g = -(self._Ut.conj() * self._XC_Ut).sum(axis=0).real
+
+        return self.nsamp * g
+
+    def hessian(self, x: np.ndarray) -> np.ndarray:
+        """Calculate the Hessian of the negative log-likelihood.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        Hessian of negative log-likelihood for the given set of params.
+        """
+        self._precompute(x)
+
+        Ua_Utb = self._U.T.conj() @ self._Ut
+        Fab = Ua_Utb * Ua_Utb.T.conj()
+        H = Fab.real
+
+        if self.exact_hessian:
+            Uta_dX_Utb = self._Ut.T.conj() @ self._XC_Ut
+            H += (2 * Uta_dX_Utb * Ua_Utb.T).real
+            t = -(self._Wt.conj() * self._XC_Wt).sum(axis=0).real
+            H += np.diag(t.real)
+
+        return self.nsamp * H
+
+
+class SmoothnessRegulariser(OptFunc):
+    """A smoothness prior on the values at given locations.
+
+    This calculates the average in a window of `width` points, and then applies a
+    Gaussian with precision `alpha` and this average as the mean for each point. For
+    a `width` of 3 this is effectively a constraint on the second derivative.
+
+    Parameters
+    ----------
+    N
+        The number of points in the function we are inferring. The sample locations
+        are implicitly assumed to be at `np.arange(N)`.
+    alpha
+        The smoothness precision.
+    width
+        The width over which the smoothness is calculated in samples.
+    periodic
+        Assume that the function is periodic and wrap around.
+    """
+
+    def __init__(
+        self, N: int, alpha: float, *, width: int = 5, periodic: bool = True
+    ) -> None:
+
+        # Calculate the matrix for the moving average
+        W = np.zeros((N, N))
+        for i in range(N):
+
+            ll, ul = i - (width - 1) // 2, i + (width + 1) // 2
+            if not periodic:
+                ll, ul = max(0, ll), min(ul, N)
+            # Wrap the indices so the periodic case stays in bounds
+            v = np.arange(ll, ul) % N
+
+            W[i][v] = 1.0 / len(v)
+
+        IW = np.identity(N) - W
+        self.Ci = alpha * (IW.T @ IW)
+
+    def value(self, x: np.ndarray) -> float:
+        """Calculate the value of the smoothness regularizer.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        Value of the smoothness regularizer for the given set of params.
+        """
+        return 0.5 * float(x @ self.Ci @ x)
+
+    def gradient(self, x: np.ndarray) -> np.ndarray:
+        """Calculate the gradient of the smoothness regularizer.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        Gradient of the smoothness regularizer for the given set of params.
+        """
+        return self.Ci @ x
+
+    def hessian(self, x: np.ndarray) -> np.ndarray:
+        """Calculate the hessian of the smoothness regularizer.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        Hessian of the smoothness regularizer for the given set of params.
+        """
+        return self.Ci
+
+
+class GaussianProcessPrior(OptFunc):
+    """A Gaussian process prior on the inputs.
+
+    The kernel has a standard deviation of `width`, and the variance is `alpha**2`.
+
+    Parameters
+    ----------
+    N
+        The number of points in the function we are inferring. The sample locations
+        are implicitly assumed to be at `np.arange(N)`.
+    alpha
+        The scale of the distribution.
+    width
+        The width of the kernel used in the covariance.
+    kernel
+        The name of the kernel to use. Either 'gaussian' or 'rational'.
+    a
+        Parameter for the rational quadratic kernel.
+    periodic
+        Assume that the function is periodic and wrap around.
+    reg
+        Add a small diagonal entry of size `alpha**2 * reg` for numerical stability.
+    """
+
+    def __init__(
+        self,
+        N: int,
+        alpha: float,
+        width: int = 5,
+        *,
+        kernel: str = "gaussian",
+        a: float = 1.0,
+        periodic: bool = True,
+        reg: float = 1e-6,
+    ) -> None:
+
+        ii = np.arange(N)
+        d = ii[:, np.newaxis] - ii[np.newaxis, :]
+
+        if periodic:
+            d = ((d + N / 2) % N) - N / 2
+
+        d2 = (d / width) ** 2
+
+        if kernel == "gaussian":
+            C = np.exp(-0.5 * d2)
+        elif kernel == "rational":
+            C = (1 + d2 / (2 * a)) ** -a
+        else:
+            raise ValueError(f"Unknown kernel type '{kernel}'")
+
+        self.Ci: np.ndarray = la.inv(C + np.identity(N) * reg) / alpha**2
+
+    def value(self, x: np.ndarray) -> float:
+        """Calculate the value of the Gaussian process prior.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        Value of the Gaussian process prior for the given set of params.
+        """
+        return 0.5 * float(x @ self.Ci @ x)
+
+    def gradient(self, x: np.ndarray) -> np.ndarray:
+        """Calculate the gradient of the Gaussian process prior.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        Gradient of the Gaussian process prior for the given set of params.
+        """
+        return self.Ci @ x
+
+    def hessian(self, x: np.ndarray) -> np.ndarray:
+        """Calculate the hessian of the Gaussian process prior.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        Hessian of the Gaussian process prior for the given set of params.
+        """
+        return self.Ci
+
+
+class AddFunctions(OptFunc):
+    """Optimise the sum of several functions.
+
+    The values, gradients and hessians of each function are added together.
+
+    Parameters
+    ----------
+    functions
+        A list of functions to optimise. The individual functions must all
+        take the same set of parameters.
+    """
+
+    def __init__(self, functions: list[OptFunc]) -> None:
+        if len(functions) <= 0:
+            raise ValueError("At least one function must be supplied.")
+        self.functions = functions
+
+    def value(self, x: np.ndarray) -> float:
+        """Calculate the sum of the individual function values.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        Sum of individual function values for the given set of params.
+        """
+        return sum(f.value(x) for f in self.functions)
+
+    def gradient(self, x: np.ndarray) -> np.ndarray:
+        """Calculate the sum of the individual function gradients.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        Sum of individual function gradients for the given set of params.
+        """
+        g = self.functions[0].gradient(x)
+        for f in self.functions[1:]:
+            g += f.gradient(x)
+        return g
+
+    def hessian(self, x: np.ndarray) -> np.ndarray:
+        """Calculate the sum of the individual function hessians.
+
+        Parameters
+        ----------
+        x
+            The array of parameters in the optimization.
+
+        Returns
+        -------
+        Sum of individual function hessians for the given set of params.
+        """
+        h = self.functions[0].hessian(x)
+        for f in self.functions[1:]:
+            h += f.hessian(x)
+        return h
+
+
+def delay_power_spectrum_maxpost(
+    data,
+    N,
+    Ni,
+    initial_S: np.ndarray | None = None,
+    window: str = "nuttall",
+    fsel: np.ndarray | None = None,
+    maxiter: int = 30,
+    tol: float = 1e-3,
+):
+    """Estimate the delay power spectrum with a maximum-likelihood estimator.
+
+    This routine uses `scipy.optimize.minimize` to maximise the posterior: the
+    likelihood combined with a Gaussian process smoothness prior on the log delay
+    power spectrum.
+
+    Parameters
+    ----------
+    data : np.ndarray[:, freq]
+        Data to estimate the delay spectrum of.
+    N : int
+        The length of the output delay spectrum. There are assumed to be `N/2 + 1`
+        total frequency channels if the delay spectrum is assumed real, or `N`
+        channels if it is complex.
+    Ni : np.ndarray[freq]
+        Inverse noise variance.
+    initial_S : np.ndarray[delay]
+        The initial delay power spectrum guess.
+    window : one of {'nuttall', 'blackman_nuttall', 'blackman_harris', None}, optional
+        Apply an apodisation function. Default: 'nuttall'.
+    fsel : np.ndarray[freq], optional
+        Indices of channels that we have data at. By default assume all channels.
+    maxiter : int, optional
+        Maximum number of iterations to run of the solver.
+    tol : float, optional
+        The convergence tolerance for the optimization that is passed to
+        scipy.optimize.minimize.
+
+    Returns
+    -------
+    spec : list
+        List of spectrum samples.
+    success : bool
+        True if the solver successfully converged, False otherwise.
+    """
+    from .delay import fourier_matrix
+
+    nsamp, Nf = data.shape
+
+    if fsel is None:
+        fsel = np.arange(Nf)
+    elif len(fsel) != Nf:
+        raise ValueError(
+            "Length of frequency selection must match frequencies passed. "
+            f"{len(fsel)} != {data.shape[-1]}"
+        )
+
+    # Construct the Fourier matrix
+    F = fourier_matrix(N, fsel)
+
+    # Compute the covariance matrix of the data
+    # Window the frequency data if requested
+    if window is not None:
+        # Construct the window function
+        x = fsel * 1.0 / N
+        w = tools.window_generalised(x, window=window)
+
+        # Apply to the projection matrix and the data
+        F *= w[:, np.newaxis]
+        data = data * w[np.newaxis, :]
+
+    X = data.T @ data.conj()
+    X /= nsamp
+
+    # Construct the noise matrix from the diagonal of its inverse
+    Nm = np.diag(tools.invert_no_zero(Ni))
+
+    # Mask out any completely missing frequencies
+    F[Ni == 0] = 0.0
+
+    # Use the pseudo-inverse to give a starting point for the optimiser
+    if initial_S is None:
+        lsi = np.log((data @ la.pinv(F.T, rtol=1e-3)).var(axis=0))
+    else:
+        lsi = np.log(initial_S)
+
+    optfunc = AddFunctions(
+        [
+            LogLikePS(X, F, Nm, nsamp, exact_hessian=True),
+            GaussianProcessPrior(
+                N, alpha=5, width=3.0, kernel="gaussian", a=5.0, reg=1e-8
+            ),
+        ]
+    )
+
+    samples = []
+
+    # This callback is for getting the intermediate samples such that we can
+    # assess convergence of the solution
+    def _get_intermediate(xk):
+        samples.append(np.exp(xk))
+
+    try:
+        res = minimize(
+            optfunc.value,
+            x0=lsi,
+            jac=optfunc.gradient,
+            hess=optfunc.hessian,
+            method="Newton-CG",
+            options={"maxiter": maxiter, "xtol": tol},
+            callback=_get_intermediate,
+        )
+        success = res.success
+
+    # LinAlgError gets thrown for certain baselines in _precompute during a Cholesky
+    # decomposition of the covariance matrix (used in the likelihood computation)
+    # when the covariance matrix isn't positive definite. This appears to happen
+    # after one of the minimization parameters blows up, likely from a numerical
+    # instability somewhere in the gradient/hessian computation. This has only been
+    # observed to affect a small number of (almost entirely masked) baselines that
+    # have very low weights.
+    # ValueError also occasionally gets thrown from (almost certainly) the same
+    # numerical instability: one of the minimization parameters blows up and causes
+    # an overflow when taking np.exp() of the minimization parameter to get the
+    # delay spectrum. The ValueError is then raised in the Cholesky decomposition
+    # when numpy runs a check_finite on the covariance (which at that point
+    # contains infs/nans).
+    except (la.LinAlgError, ValueError):
+        success = False
+
+    # In rare cases a LinAlgError can be thrown before the _get_intermediate
+    # callback is called in `minimize`. In this scenario an empty `samples` list is
+    # returned, which causes errors in the calling function. Add the initial guess
+    # to the samples list in this case and ensure success is set to False.
+    if len(samples) == 0:
+        samples.append(np.exp(lsi))
+        success = False
+
+    # NOTE: the final sample in samples is already the final result
+    return samples, success
diff --git a/draco/analysis/flagging.py b/draco/analysis/flagging.py
index f648295d7..682ac937f 100644
--- a/draco/analysis/flagging.py
+++ b/draco/analysis/flagging.py
@@ -84,13 +84,11 @@ def process(self, sstream):
 
         if self.remove_average:
             # Estimate the mean level from unmasked data
-            import scipy.stats
-
             nanvis = (
                 sstream.vis[:]
                 * np.where(mask_bool, 1.0, np.nan)[np.newaxis, np.newaxis, :]
             )
-            average = scipy.stats.nanmedian(nanvis, axis=-1)[:, :, np.newaxis]
+            average = np.nanmedian(nanvis, axis=-1)[:, :, np.newaxis]
             sstream.vis[:] -= average
 
         # Apply the mask to the data
diff --git a/draco/core/containers.py b/draco/core/containers.py
index 7104cc893..643ea1fba 100644
--- a/draco/core/containers.py
+++ b/draco/core/containers.py
@@ -2348,6 +2348,13 @@ class DelaySpectrum(DelayContainer):
             "distributed": True,
             "distributed_axis": "baseline",
         },
+        "spectrum_mask": {
+            "axes": ["baseline"],
+            "dtype": bool,
+            "initialise": False,
+            "distributed": True,
+            "distributed_axis": "baseline",
+        },
     }
 
     def __init__(self, *args, weight_boost=1.0, sample=1, **kwargs):
diff --git a/requirements.txt b/requirements.txt
index eae27fdfa..761004a1f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,8 +3,8 @@ caput[compression] @ git+https://github.com/radiocosmology/caput.git
 cora @ git+https://github.com/radiocosmology/cora.git
 Cython>0.18
 mpi4py
-numpy>=1.17
+numpy>=1.17, <2
 scipy>=0.10
 scikit-image
 skyfield
-pywavelets
\ No newline at end of file
+pywavelets
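---

Notes (not part of the patch): the sketches below are illustrative only, and all sizes, seeds, and values in them are invented.

The new `freq_frac`/`time_frac` cuts in `_cut_data` threshold on the *fraction* of unmasked samples along each axis, rather than only dropping channels that are entirely zero-weighted. A minimal standalone run of the same selection logic on a toy `(time, freq)` weight array:

```python
import numpy as np

ntime, nfreq = 4, 5
weight = np.zeros((ntime, nfreq))
weight[0, 0] = 1.0  # channel 0: unmasked in 25% of time samples
weight[:, 1] = 1.0  # channel 1: fully unmasked

freq_frac, time_frac = 0.0, -1.0  # the new defaults

# Fraction of unmasked times per frequency (and vice versa), compared
# strictly against the thresholds, exactly as in _cut_data
non_zero_time = (weight > 0).mean(axis=-1).reshape(-1, ntime).mean(axis=0) > time_frac
non_zero_freq = (weight > 0).mean(axis=-2).reshape(-1, nfreq).mean(axis=0) > freq_frac

print(non_zero_freq)  # [ True  True False False False]
print(non_zero_time)  # all True: time_frac = -1 keeps every time sample
```

With `freq_frac = 0.3` instead, channel 0 (25% unmasked) would also be dropped.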
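A hedged usage sketch of the new `delay_power_spectrum_maxpost` on synthetic white-noise data. The default Nuttall window is applied internally, and with `initial_S=None` the pseudo-inverse starting point is used; the printed result is only indicative, since convergence depends on the data:

```python
import numpy as np

from draco.analysis.delayopt import delay_power_spectrum_maxpost

rng = np.random.default_rng(1)

ndelay = 32  # length of the output delay spectrum
nfreq = ndelay // 2 + 1  # real-timestream case: N/2 + 1 channels
nsamp = 200  # number of samples used to build the covariance

# Complex frequency-domain data with a flat underlying delay power spectrum
data = (
    rng.standard_normal((nsamp, nfreq)) + 1j * rng.standard_normal((nsamp, nfreq))
) / 2**0.5
Ni = np.ones(nfreq)  # inverse noise variance per channel

# `spec` is the list of per-iteration samples; the final entry is the estimate
spec, success = delay_power_spectrum_maxpost(data, ndelay, Ni, maxiter=60)
print(success, spec[-1].shape)  # e.g. True (32,)
```

As in `DelayGibbsSamplerBase._evaluate`, a non-converged solve must be handled explicitly by the caller via the returned `success` flag (the task records it in the new `spectrum_mask` dataset).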
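The `OptFunc` protocol is what makes the objective pluggable: `AddFunctions` simply sums values, gradients, and Hessians, which is how `delay_power_spectrum_maxpost` combines the likelihood with the Gaussian process prior. A small sketch (with arbitrary parameters) checking the analytic gradient of such a composite against a central finite difference:

```python
import numpy as np

from draco.analysis.delayopt import (
    AddFunctions,
    GaussianProcessPrior,
    SmoothnessRegulariser,
)

N = 16
f = AddFunctions(
    [
        GaussianProcessPrior(N, alpha=5.0, width=3, kernel="gaussian"),
        SmoothnessRegulariser(N, alpha=2.0, width=3, periodic=False),
    ]
)

rng = np.random.default_rng(0)
x = rng.standard_normal(N)

# Central finite difference of one component of the gradient
i, eps = 5, 1e-6
dx = np.zeros(N)
dx[i] = eps
fd = (f.value(x + dx) - f.value(x - dx)) / (2 * eps)

print(np.isclose(fd, f.gradient(x)[i]))  # expect True
```

Both priors are quadratic forms, so the finite difference should agree with the analytic gradient to float precision; the same pattern can be used to sanity-check `LogLikePS` on a small covariance.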