Baseline-dependent rebinner #269

Merged: 8 commits, May 27, 2024
10 changes: 8 additions & 2 deletions draco/analysis/flagging.py
@@ -2298,10 +2298,16 @@ def process(self, data):
The modified data. This is the same object as the input, and it has been
modified in place.
"""
if type(self.data_stack) != type(data):
if "effective_ra" in data.datasets:
raise TypeError(
"Blending uncorrected rebinned data not supported. "
"Please apply a correction such as `sidereal.RebinGradientCorrection."
)

if not isinstance(data, type(self.data_stack)):
raise TypeError(
f"type(data) (={type(data)}) must match"
f"type(data_stack) (={type(self.type)}"
f"type(data_stack) (={type(self.data_stack)}"
)

# Try and get both the stack and the incoming data to have the same
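Beyond the new `effective_ra` guard, this hunk replaces the strict `type()` comparison with `isinstance`, which also accepts subclasses of the stack's container type. A minimal sketch with toy classes (not draco's containers) illustrating the difference:

# Toy classes showing why the isinstance form is preferred: it accepts
# subclasses of the stack's type, while a strict type comparison does not.
class Base:
    pass

class Derived(Base):
    pass

stack, data = Base(), Derived()
assert type(data) != type(stack)      # the old check would reject this pair
assert isinstance(data, type(stack))  # the new check accepts the subclass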
287 changes: 260 additions & 27 deletions draco/analysis/sidereal.py
@@ -15,7 +15,7 @@
from cora.util import units

from ..core import containers, io, task
from ..util import tools
from ..util import regrid, tools
from .transform import Regridder


@@ -477,6 +477,204 @@ def _regrid(self, vis, weight, lsd):
return interp_grid, interp_vis, interp_weight


class SiderealRebinner(SiderealRegridder):
"""Regrid a sidereal day of data using a binning method.

Assign a fraction of each time sample to the nearest RA bins based
on the proportion of the time bin that overlaps each RA bin.

Tracks the weighted effective centre of each RA bin so that a centring
correction can be applied afterwards. A correction option is
implemented in `RebinGradientCorrection`.
"""

def process(self, data):
"""Rebin the sidereal day.

Parameters
----------
data : containers.TimeStream
Timestream data for the day (must have an `LSD` attribute).

Returns
-------
sdata : containers.SiderealStream
The regularly gridded sidereal timestream.
"""
import scipy.sparse as ss

self.log.info(f"Rebinning LSD:{data.attrs['lsd']:.0f}")

# Redistribute if needed too
data.redistribute("freq")

# Convert data timestamps into LSDs
timestamp_lsd = self.observer.unix_to_lsd(data.time)

# Fetch which LSD this is to set bounds
self.start = data.attrs["lsd"]
self.end = self.start + 1

# Create the output container
sdata = containers.SiderealStream(axes_from=data, ra=self.samples)
# Initialize the `effective_ra` dataset
sdata.add_dataset("effective_ra")
sdata.redistribute("freq")
sdata.attrs["lsd"] = self.start
sdata.attrs["tag"] = f"lsd_{self.start:.0f}"

# Get view of data
weight = data.weight[:].local_array
vis_data = data.vis[:].local_array

# Get the median time sample width
width_t = np.median(np.abs(np.diff(timestamp_lsd)))
# Create the regular grid of RA samples
target_lsd = np.linspace(self.start, self.end, self.samples, endpoint=False)
# Create the rebinning matrix
R = regrid.rebin_matrix(timestamp_lsd, target_lsd, width_t=width_t)
Rt = ss.csr_array(R.T)
# The square is used to calculate rebinned weights
Rtsq = Rt.power(2)

# dereference arrays before loop
sera = sdata.effective_ra[:].local_array
ssv = sdata.vis[:].local_array
ssw = sdata.weight[:].local_array

# Create a repeating view of the gridded ra axis. This is
# used to fill the effective ra of masked samples
grid_ra = np.broadcast_to(sdata.ra, (*sera.shape[1:],))

for fi in range(vis_data.shape[0]):
# Normalisation for rebinned datasets
norm = tools.invert_no_zero(weight[fi] @ Rt)

# Weighted rebin of the visibilities
ssv[fi] = norm * ((vis_data[fi] * weight[fi]) @ Rt)

# Weighted rebin of the effective RA
effective_lsd = norm * ((timestamp_lsd[np.newaxis] * weight[fi]) @ Rt)
sera[fi] = 360 * (effective_lsd - self.start)

# Rebin the weights
rvar = weight[fi] @ Rtsq
ssw[fi] = tools.invert_no_zero(norm**2 * rvar)

# Correct the effective ra where weights are zero. This
# is required to avoid discontinuities
mask = ssw[fi] == 0.0
sera[fi][mask] = grid_ra[mask]

return sdata
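The loop above amounts to a sparse weighted rebin. A minimal self-contained numpy sketch of the same algebra, using a hypothetical hand-built overlap matrix in place of `regrid.rebin_matrix`, so the orientation and values here are illustrative only:

import numpy as np

rng = np.random.default_rng(0)

# Toy overlap matrix: R[i, j] is the fraction of time sample i that falls
# in RA bin j, so each row sums to one.
R = np.array(
    [
        [1.0, 0.0, 0.0],
        [0.7, 0.3, 0.0],
        [0.0, 1.0, 0.0],
        [0.0, 0.6, 0.4],
        [0.0, 0.0, 1.0],
        [0.0, 0.0, 1.0],
    ]
)

vis = rng.standard_normal(6)       # toy visibility samples
weight = rng.uniform(0.5, 2.0, 6)  # toy inverse-variance weights
t = np.linspace(0.0, 1.0, 6)       # toy sample times (fractional LSD)

norm = 1.0 / (weight @ R)                 # inverse sum of coefficients per bin
vis_binned = norm * ((vis * weight) @ R)  # weighted rebin of the visibilities
t_eff = norm * ((t * weight) @ R)         # weighted effective bin centres

# Propagating variances through the weighted sum gives
# Var_j = sum_i w_i R_ij**2 / (sum_i w_i R_ij)**2, hence:
weight_binned = 1.0 / (norm**2 * (weight @ R**2))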


class RebinGradientCorrection(task.SingleTask):
"""Apply a linear gradient correction to shift RA samples to bin centres.

Requires a sidereal day with full sidereal coverage to calculate a local
gradient for each RA bin. The dataset value at the RA bin centre is
interpolated based on the local gradient and the difference between the
bin centre and the effective bin centre.

If the rebinned dataset has full sidereal coverage, it can be used to
create the gradient.
"""

def setup(self, sstream_ref: containers.SiderealStream) -> None:
"""Provide the dataset to use in the gradient calculation.

This dataset must have complete sidereal coverage.

Parameters
----------
sstream_ref
Reference SiderealStream
"""
self.sstream_ref = sstream_ref

def process(self, sstream: containers.SiderealStream) -> containers.SiderealStream:
"""Apply the gradient correction to the input dataset.

Parameters
----------
sstream
Sidereal day to apply a correction to.

Returns
-------
sstream
Input sidereal day with gradient correction applied, if needed.
"""
self.sstream_ref.redistribute("freq")
sstream.redistribute("freq")

# Allows a normal sidereal stream to pass through this task.
# Helpful for creating generic configs
try:
era = sstream.effective_ra[:].local_array
except KeyError:
self.log.info(
f"Dataset of type ({type(sstream)}) does not have an effective "
"ra dataset. No correction will be applied."
)
return sstream

try:
# If the reference dataset has an effective ra dataset, use this
# when calculating the gradient. This could be true if the reference
# and target datasets are the same
ref_ra = self.sstream_ref.effective_ra[:].local_array
except KeyError:
# Use fixed ra, which should be regularly sampled
ref_ra = self.sstream_ref.ra

vis = sstream.vis[:].local_array
weight = sstream.weight[:].local_array

ref_vis = self.sstream_ref.vis[:].local_array
ref_weight = self.sstream_ref.weight[:].local_array

# Iterate over frequencies and baselines for memory
for fi in range(vis.shape[0]):
# Skip if entirely masked already
if not np.any(weight[fi]):
continue

for vi in range(vis.shape[1]):
# Skip if entire baseline is masked
if not np.any(weight[fi, vi]):
continue

# Depends on whether the effective ra has baseline dependence
rra = ref_ra[fi, vi] if ref_ra.ndim > 1 else ref_ra
# Calculate the vis gradient at the reference RA points
mask = ref_weight[fi, vi] == 0.0
grad, mask = regrid.grad_1d(ref_vis[fi, vi], rra, mask, period=360.0)

# Apply the correction to estimate the sample value at the
# RA bin centre
vis[fi, vi] -= grad * (era[fi, vi] - sstream.ra)
# Zero any weights that could not be corrected
weight[fi, vi] *= (~mask).astype(weight.dtype)

# Copy to a new container without the effective_ra dataset
# This sort of functionality should probably be properly
# implemented somewhere else
out = containers.SiderealStream(attrs_from=sstream, axes_from=sstream)
# Iterate over datasets
for name, dset in sstream.datasets.items():
if name == "effective_ra":
continue

if name not in out.datasets:
out.add_dataset(name)

out.datasets[name][:] = dset[:]

return out
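A toy check of the first-order correction applied in the loop above, using `np.gradient` as a stand-in for `regrid.grad_1d` (values are hypothetical): shifting samples from their effective centres back to the nominal bin centres is exact for a linear signal.

import numpy as np

rng = np.random.default_rng(1)

ra = np.linspace(0.0, 360.0, 8, endpoint=False)  # nominal RA bin centres
ra_eff = ra + rng.uniform(-0.5, 0.5, 8)          # effective (weighted) centres

vis = 0.2 * ra_eff + 3.0         # toy signal sampled at the effective centres
grad = np.gradient(vis, ra_eff)  # local gradient estimate at each sample

# First-order shift to the bin centre: v(ra) ~ v(ra_eff) - dv/dra * (ra_eff - ra)
vis_centred = vis - grad * (ra_eff - ra)
assert np.allclose(vis_centred, 0.2 * ra + 3.0)  # exact for a linear signal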


class SiderealStacker(task.SingleTask):
"""Take in a set of sidereal days, and stack them up.

@@ -512,6 +710,13 @@ def process(self, sdata):
sdata : containers.SiderealStream
Individual sidereal day to add to stack.
"""
# Check that the input container is of the correct type
if (self.stack is not None) and not isinstance(sdata, type(self.stack)):
raise TypeError(
f"type(sdata) (={type(sdata)}) does not match "
f"type(stack) (={type(self.stack)})."
)

sdata.redistribute("freq")

# Get the LSD (or CSD) label out of the input's attributes.
@@ -550,48 +755,45 @@ def process(self, sdata):
# Keep track of the sum of squared weights
# to perform Bessel's correction at the end.
if self.with_sample_variance:
self.sum_coeff_sq = np.zeros_like(self.stack.weight[:].view(np.ndarray))
self.sum_coeff_sq = mpiarray.zeros(
self.stack.weight[:].local_shape,
axis=0,
comm=self.stack.comm,
dtype=np.float32,
)

# Accumulate
self.log.info(f"Adding to stack LSD(s): {input_lsd!s}")

self.lsd_list += input_lsd

# Extract weight dataset
weight = sdata.weight[:]

# Determine if the input sidereal stream is itself a stack over days
if "nsample" in sdata.datasets:
# The input sidereal stream is already a stack
# over multiple sidereal days. Use the nsample
# dataset as the weight for the uniform case.
count = sdata.nsample[:]
else:
# The input sidereal stream contains a single
# sidereal day. Use a boolean array that
# indicates a non-zero weight dataset as
# the weight for the uniform case.
dtype = self.stack.nsample.dtype
count = (sdata.weight[:] > 0.0).astype(dtype)

# Determine the weights to be used in the average.
if self.weight == "uniform":
coeff = count.astype(np.float32)
# Accumulate the variances in the stack.weight dataset.
self.stack.weight[:] += (coeff**2) * tools.invert_no_zero(sdata.weight[:])
else:
coeff = sdata.weight[:]
# We are using inverse variance weights. In this case,
# we accumulate the inverse variances in the stack.weight
# dataset. Do that directly to avoid an unnecessary
# division in the more general expression above.
self.stack.weight[:] += sdata.weight[:]
count = (weight > 0.0).astype(self.stack.nsample.dtype)

# Accumulate the total number of samples.
self.stack.nsample[:] += count

# Below we will need to normalize by the current sum of coefficients.
# Can be found in the stack.nsample dataset for uniform case or
# the stack.weight dataset for inverse variance case.
if self.weight == "uniform":
sum_coeff = self.stack.nsample[:].astype(np.float32)
coeff = count.astype(np.float32)
self.stack.weight[:] += (coeff**2) * tools.invert_no_zero(weight)
sum_coeff = self.stack.nsample[:]

else:
coeff = weight
self.stack.weight[:] += weight
sum_coeff = self.stack.weight[:]

# Calculate weighted difference between the new data and the current mean.
@@ -600,6 +802,12 @@ def process_finish(self):
# Update the mean.
self.stack.vis[:] += delta_before * tools.invert_no_zero(sum_coeff)

if "effective_ra" in self.stack.datasets:
# Accumulate the weighted average effective RA
delta_ra = coeff * (sdata.effective_ra[:] - self.stack.effective_ra[:])
# Update the mean
self.stack.effective_ra[:] += delta_ra * tools.invert_no_zero(sum_coeff)
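The two mean updates above follow a streaming weighted mean, so the stack never holds every day in memory at once. A standalone sketch of the recurrence with toy data:

import numpy as np

rng = np.random.default_rng(2)
days = rng.standard_normal((5, 4))      # 5 toy sidereal days, 4 RA bins each
coeffs = rng.uniform(0.5, 2.0, (5, 4))  # per-sample weighting coefficients

mean = np.zeros(4)
sum_coeff = np.zeros(4)
for x, c in zip(days, coeffs):
    sum_coeff += c
    mean += c * (x - mean) / sum_coeff  # delta * invert_no_zero(sum_coeff)

# Identical to the one-shot weighted average.
assert np.allclose(mean, (coeffs * days).sum(axis=0) / coeffs.sum(axis=0))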

# The calculations below are only required if the sample variance was requested
if self.with_sample_variance:
# Accumulate the sum of squared coefficients.
Expand All @@ -624,17 +832,23 @@ def process_finish(self):
self.stack.attrs["tag"] = self.tag
self.stack.attrs["lsd"] = np.array(self.lsd_list)

if self.weight == "uniform":
# For uniform weighting, normalize the accumulated variances by the total
# number of samples squared and then invert to finalize stack.weight.
norm = self.stack.nsample[:].astype(np.float32)
self.stack.weight[:] = tools.invert_no_zero(self.stack.weight[:]) * norm**2

else:
# For inverse variance weighting without rebinning,
# additional normalization is not required.
norm = None
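A toy sanity check on the uniform branch, assuming unit coefficients (single unflagged days): `stack.weight` has accumulated the sum of per-day variances, and the variance of a uniform mean over N samples is that sum over N squared, so inverting and scaling by `nsample**2` yields the inverse variance of the mean.

import numpy as np

var_per_day = np.array([0.5, 1.0, 2.0])  # toy per-day noise variances
accumulated = var_per_day.sum()          # stack.weight before finalising
n = len(var_per_day)                     # stack.nsample with unit coefficients

final_weight = (1.0 / accumulated) * n**2  # invert, then scale by nsample**2
var_of_mean = var_per_day.sum() / n**2     # Var of a uniform average of n samples
assert np.isclose(final_weight, 1.0 / var_of_mean)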

# We need to normalize the sample variance by the sum of coefficients.
# Can be found in the stack.nsample dataset for uniform case
# or the stack.weight dataset for inverse variance case.
if self.with_sample_variance:
if self.weight != "uniform":

if norm is None:
norm = self.stack.weight[:]

# Perform Bessel's correction. In the case of
@@ -684,15 +898,24 @@ def process(self, sdata):
sdata : containers.SiderealStream
Individual sidereal day to stack up.
"""
# Check that the input container is of the correct type
if (self.stack is not None) and not isinstance(sdata, type(self.stack)):
raise TypeError(
f"type(sdata) (={type(sdata)}) does not match "
f"type(stack) (={type(self.stack)})."
)

sdata.redistribute("freq")

if self.stack is None:
self.log.info("Starting new stack.")

self.stack = containers.empty_like(sdata)
self.stack.redistribute("freq")
self.stack.vis[:] = 0.0
self.stack.weight[:] = 0.0

# Initialise all datasets to zero
for ds in self.stack.datasets.values():
ds[:] = 0

self.count = 0
self.Ni_s = mpiarray.zeros(
@@ -741,6 +964,16 @@ def process(self, sdata):
# We need to keep the projection vector until the end
self.Vm.append(v)

if "effective_ra" in self.stack.datasets:
# Track the effective ra bin centres. We're using the averaged
# weights here, so this generally isn't optimal
delta = Ni_d * (sdata.effective_ra[:] - self.stack.effective_ra[:])
# Update the mean effective ra using the mean of the normalized weights
sum_weight = tools.invert_no_zero(self.stack.weight[:]) * self.Ni_s**2
self.stack.effective_ra[:] += delta * tools.invert_no_zero(
sum_weight.mean(axis=1)
)

# Get the LSD label out of the data (resort to using a CSD if it's
# present). If there's no label just use a place holder and stack
# anyway.