Skip to content

Commit

Permalink
feat(interpolate): refactor baseline cuts and optional inpaint based …
Browse files Browse the repository at this point in the history
…on mask
  • Loading branch information
ljgray committed Jan 22, 2025
1 parent f744c5b commit 70506a0
Showing 1 changed file with 142 additions and 39 deletions.
181 changes: 142 additions & 39 deletions draco/analysis/interpolate.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ class DPSSInpaint(task.SingleTask):
of a covariance matrix defined as a sum of `sinc` functions,
which represent top-hats in the spectral inverse of the data.
This class is fully-functional, but only supports applying a
constant cutoff.
Attributes
----------
axis : str
Expand Down Expand Up @@ -49,9 +52,21 @@ class DPSSInpaint(task.SingleTask):
centres = config.Property(proptype=list)
halfwidths = config.Property(proptype=list)
snr_cov = config.Property(proptype=float, default=1.0e-3)
flag_above_cutoff = config.Property(proptype=bool, default=False)
flag_above_cutoff = config.Property(proptype=bool, default=True)
copy = config.Property(proptype=bool, default=True)

def setup(self, mask=None):
"""Use an optional mask dataset.
Parameters
----------
mask : containers.RFIMask, optional
Container used to select samples to inpaint. If
not provided, inpaint samples where the data
weights are zero.
"""
self.mask = mask

def process(self, data):
"""Inpaint visibility data.
Expand Down Expand Up @@ -97,6 +112,11 @@ def inpaint(self, vis, weight, samples):
vobs, vaxind = _flatten_axes(vis, (*self.iter_axes, self.axis))
wobs, waxind = _flatten_axes(weight, (*self.iter_axes, self.axis))

if self.mask is not None:
mobs, _ = _flatten_axes(self.mask.mask, (*self.iter_axes, self.axis))
# Invert the mask to avoid doing it every loop
mobs = ~mobs

# Pre-allocate the full output array
vinp = np.zeros_like(vobs)
winp = np.zeros_like(wobs)
Expand All @@ -112,13 +132,16 @@ def inpaint(self, vis, weight, samples):
for ii in range(vobs.shape[0]):
# Get the correct basis for each slice
A = modes[amap[ii]]
# Write to the preallocated output array
W = wobs[ii] > 0

# Get a selection for data to keep
M = wobs[ii] > 0
W = mobs if self.mask is not None else M

vinp[ii], winp[ii] = dpss.inpaint(vobs[ii], wobs[ii], A, W, self.snr_cov)

# Re-flag gaps above the cutoff width
if self.flag_above_cutoff:
winp[ii] *= dpss.flag_above_cutoff(W, cutoff)
winp[ii] *= dpss.flag_above_cutoff(M, cutoff)

# Reshape and move the interpolation axis back
vinp = _inv_move_front(vinp, vaxind, vis.local_shape)
Expand All @@ -144,44 +167,42 @@ def _get_basis(self, samples):
return [modes], amap


class DPSSInpaintDelay(DPSSInpaint):
"""Inpaint with baseline-dependent delay cut.
class DPSSInpaintBaseline(DPSSInpaint):
"""Inpaint with baseline-dependent cut.
This is a non-functional base class which provides functionality
for selecting the correct baselines and making a set of unique
basis functions.
Users should override the `_get_cuts` method to make baseline-
dependent cuts along the desired axis.
Attributes
----------
axis : str
Name of axis over which to inpaint. `freq` is the only
accepted argument.
za_cut : float
Sine of the maximum zenith angle included in baseline-dependent delay
filtering. Default is 1 which corresponds to the horizon (ie: filters out all
zenith angles). Setting to zero turns off baseline dependent cut.
extra_cut : float
Increase the delay threshold beyond the baseline dependent term.
telescope_orientation : one of ('NS', 'EW', 'none')
Determines if the baseline-dependent delay cut is based on the north-south
component, the east-west component or the full baseline length. For
cylindrical telescopes oriented in the NS direction (like CHIME) use 'NS'.
The default is 'NS'.
"""

axis = config.enum(["freq"], default="freq")
za_cut = config.Property(proptype=float, default=1.0)
extra_cut = config.Property(proptype=float, default=0.0)
telescope_orientation = config.enum(["NS", "EW", "none"], default="NS")

def setup(self, telescope):
def setup(self, telescope, mask=None):
"""Load a telescope object.
This is required to establish baseline-dependent
delay cuts.
Parameters
----------
telescope : TransitTelescope
Telescope object with baseline information.
mask : containers.RFIMask, optional
Container used to select samples to inpaint. If
not provided, inpaint samples where the data
weights are zero.
"""
self.telescope = io.get_telescope(telescope)
# Pass the mask to the parent class
super().setup(mask)

def _set_sel(self, data):
"""Set the local baselines."""
Expand All @@ -195,6 +216,56 @@ def _get_basis(self, samples):
Returns a list of bases and a map.
"""
# Get cutoffs for each baseline
cuts = self._get_baseline_cuts()

# Compute covariances for each unique baseline and
# map to each individual baseline.
cuts, amap = np.unique(cuts, return_inverse=True)

modes = []

for ii, cut in enumerate(cuts):
self.log.debug(
f"Making unique covariance {ii+1}/{len(cuts)} with cut={cut}ms."
)
cov = dpss.make_covariance(samples, cut, 0.0)
modes.append(dpss.get_sequence(cov))

return modes, amap

def _get_baseline_cuts(self):
"""Get an array of cutoffs for each baseline."""
raise NotImplementedError()


class DPSSInpaintDelay(DPSSInpaintBaseline):
"""Inpaint with baseline-dependent delay cut.
Attributes
----------
axis : str
Name of axis over which to inpaint. `freq` is the only
accepted argument.
za_cut : float
Sine of the maximum zenith angle included in baseline-dependent delay
filtering. Default is 1 which corresponds to the horizon (ie: filters out all
zenith angles). Setting to zero turns off baseline dependent cut.
extra_cut : float
Increase the delay threshold beyond the baseline dependent term.
telescope_orientation : one of ('NS', 'EW', 'none')
Determines if the baseline-dependent delay cut is based on the north-south
component, the east-west component or the full baseline length. For
cylindrical telescopes oriented in the NS direction (like CHIME) use 'NS'.
The default is 'NS'.
"""

axis = config.enum(["freq"], default="freq")
za_cut = config.Property(proptype=float, default=1.0)
extra_cut = config.Property(proptype=float, default=0.0)

def _get_baseline_cuts(self):
"""Get an array of delay cuts."""
# Calculate delay cuts based on telescope orientation
if self.telescope_orientation == "NS":
blen = abs(self._baselines[:, 1])
Expand All @@ -207,36 +278,66 @@ def _get_basis(self, samples):
# to three decimal places to reduce repeat calculations
delay_cut = self.za_cut * blen / units.c * 1.0e6 + self.extra_cut
delay_cut = np.maximum(delay_cut, self.halfwidths[0])
delay_cut = np.round(delay_cut, decimals=3)

# Compute covariances for each unique baseline and
# map to each individual baseline.
delay_cut, amap = np.unique(delay_cut, return_inverse=True)
return np.round(delay_cut, decimals=3)

modes = []

for ii, cut in enumerate(delay_cut):
self.log.debug(
f"Making unique covariance {ii+1}/{len(delay_cut)} with cut={cut}ms."
)
cov = dpss.make_covariance(samples, cut, 0.0)
modes.append(dpss.get_sequence(cov))
class DPSSInpaintMMode(DPSSInpaintBaseline):
"""Inpaint with a baseline-dependent m cut.
return modes, amap
Attributes
----------
axis : str
Name of axis over which to inpaint. `freq` is the only
accepted argument.
"""

axis = config.enum(["ra"], default="ra")

class DPSSInpaintDelayStokesI(DPSSInpaintDelay):
"""Inpaint with baseline-dependent delay cut.
def _get_baseline_cuts(self):
"""Make the DPSS basis for each unique m cut.
Returns a list of bases and a map.
"""
# Calculate cuts based on telescope orientation.
# Note that this is opposite from the baseline
# component used for delay, since we care
# about the direction of fringing here
if self.telescope_orientation == "NS":
blen = abs(self._baselines[:, 0])
elif self.telescope_orientation == "EW":
blen = abs(self._baselines[:, 1])
else:
blen = np.linalg.norm(self._baselines, axis=1)

# Get highest frequency in MHz
freq = self.telescope.freq_start
dec = np.deg2rad(self.telescope.latitude)
# Cut at the maximum `m` expected for each baseline.
# Compensate for the fact the ra samples is in degrees
mcut = (np.pi / 180) * freq * 1e6 * blen / (units.c * np.cos(dec))
mcut = np.maximum(mcut, self.halfwidths[0])

return np.round(mcut, decimals=2)

The input container must contain Stokes I visibilities.
"""

class StokesIMixin:
"""Change baseline selection assuming Stokes I only."""

def _set_sel(self, data):
"""Set the local baselines."""
# Baseline lengths extracted from the stack axis
self._baselines = data.stack[data.vis[:].local_bounds]


class DPSSInpaintDelayStokesI(StokesIMixin, DPSSInpaintDelay):
"""Inpaint Stokes I with baseline-dependent delay cut."""


class DPSSInpaintMModeStokesI(StokesIMixin, DPSSInpaintMMode):
"""Inpaint Stokes I with baseline-dependent m-mode cut."""


def _flatten_axes(data, axes):
"""Move the specified axes to the front of a dataset.
Expand All @@ -253,7 +354,9 @@ def _flatten_axes(data, axes):
f"but axes {axes} were requested."
)

return _move_front(data[:].local_array, axind, data.local_shape), axind
ds = data[:].view(np.ndarray)

return _move_front(ds, axind, ds.shape), axind


def _move_front(arr: np.ndarray, axis: int | list, shape: tuple) -> np.ndarray:
Expand Down

0 comments on commit 70506a0

Please sign in to comment.