Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hfb.analysis): Add on-source moving average #262

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 67 additions & 23 deletions ch_pipeline/hfb/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from . import containers
from .io import BeamSelectionMixin
from .pfb import DeconvolvePFB
import gc


class HFBAverage(task.SingleTask):
Expand Down Expand Up @@ -443,13 +444,16 @@ class HFBOnOffDifference(task.SingleTask):
----------
offset : int
Number of samples on either side of the on-source sample to be ignored.
nsamples : int
nsamples_off : int
Number of off-source samples on either side of the on-source sample to
be averaged.
be averaged over RA axis.
nsamples_on : int
Number of on-source samples to be averaged along RA axis.
"""

offset = config.Property(proptype=int, default=5)
nsamples = config.Property(proptype=int, default=5)
nsamples_off = config.Property(proptype=int, default=5)
nsamples_on = config.Property(proptype=int, default=2)

def process(self, stream):
"""Takes the average of off-source data and subtract it from on-source data.
Expand Down Expand Up @@ -479,63 +483,92 @@ def process(self, stream):
ra = stream.ra[:]
nra = len(ra)
# Create a 1D kernel to select off-source data.
kernel = np.zeros_like(ra)
off_kernel = np.zeros_like(ra)
# Fill desired elements with one. nsample elements on either sides
left = np.arange(self.offset + 1, self.offset + self.nsamples + 1)
right = np.arange(nra - self.offset - self.nsamples, nra - self.offset)
kernel[left] = 1
kernel[right] = 1
left = np.arange(self.offset + 1, self.offset + self.nsamples_off + 1)
right = np.arange(nra - self.offset - self.nsamples_off, nra - self.offset)
off_kernel[left] = 1
off_kernel[right] = 1
# We want the weighted average of off-source data, which is
# sum(off-source data * weights) / sum(off-source weights)

# Take the weighted sum of off-source data (numerator of the
# weighted average). To do that, convolve the kernel above
# with the data
ker_fft = np.fft.fft(kernel)
ker_fft = np.fft.fft(off_kernel)
dat_fft = np.fft.fft(data * weight, axis=2)
sum_data = np.fft.ifft(
sum_data_off = np.fft.ifft(
dat_fft * ker_fft[np.newaxis, np.newaxis, :, np.newaxis], axis=2
).real

# Then, convolve the kernel above with the weights to find the sum
# of off-source weights (denominator of the weighted average)
weight_fft = np.fft.fft(weight, axis=2)

# n is the sum of off-source weights over ra axis
sum_weight = np.fft.ifft(
# Sum of off-source weights over ra axis
sum_weight_off = np.fft.ifft(
weight_fft * ker_fft[np.newaxis, np.newaxis, :, np.newaxis], axis=2
).real

# If the weight is zero, ifft above returns very small, but non-zero
# value for n. This number goes to the denominator of weighted mean
# and makes the weighted mean very large. To avoid such numerical
# error (10^-16), zero n whereever weight is zero.
sum_weight[weight == 0] = 0
sum_weight_off[weight == 0] = 0

# Weighted average of off-source data
off = sum_data * tools.invert_no_zero(sum_weight)
off = sum_data_off * tools.invert_no_zero(sum_weight_off)

del sum_data_off, ker_fft, off_kernel

self.log.debug(
"The following intermediate arrays are removed: sum_data_off, ker_fft, off_kernel"
)

# Computing weighted average of on-source data
on_kernel = np.zeros(nra)
on_samples = np.arange(self.nsamples_on)
on_kernel[on_samples] = 1
ker_fft = np.fft.fft(on_kernel)
# Weighted sum of on-source data
sum_data_on = np.fft.ifft(dat_fft * ker_fft[None, None, :, None], axis=2).real
# Sum over on-source weights
sum_weight_on = np.fft.ifft(
weight_fft * ker_fft[None, None, :, None], axis=2
).real # sum over weights
sum_weight_on[weight == 0] = 0
# Weighted average of on-source data
on = data * weight * tools.invert_no_zero(weight)
on = sum_data_on * tools.invert_no_zero(sum_weight_on)

del sum_data_on, ker_fft, on_kernel
self.log.debug(
"The following intermediate arrays are removed: sum_data_on, ker_fft, on_kernel"
)

# And on-off difference is:
on_off = on - off

del on, off
self.log.debug("The following intermediate arrays are removed: on, off")

# Now, evaluate the weight of on-off differenced data

# Note that this only returns the weights corresponding to
# inverse variance weighted data
# TODO: Compute the weights corresponding to uniform average of data
# On-source variance
var = tools.invert_no_zero(weight)

# Weight of average of off-source data is the sum over weights
# That sum if given by n. So variance is 1/n
var_off = tools.invert_no_zero(sum_weight)
# On-source and off-source variances
var_on = tools.invert_no_zero(sum_weight_on)
var_off = tools.invert_no_zero(sum_weight_off)

# variance of on-off data
var_diff = var + var_off
var_diff[var == 0] = 0
var_diff = var_on + var_off
var_diff[var_on == 0] = 0

del var_on, var_off
self.log.debug("The following intermediate arrays are removed: var_on, var_off")

# Garbage collection
gc.collect()

# weight of on-off data
weight_diff = tools.invert_no_zero(var_diff)
Expand Down Expand Up @@ -1419,9 +1452,20 @@ def process(self, stream):
# Calculate weighted median (to exclude flagged data) along beam axis:
median = weighted_median.weighted_median(data_s, binary_weight)

del binary_weight, data_s
self.log.debug(
"The following intermediate arrays are removed: binary_weight, data_s"
)

# Subtract weighted median along all beams from the data
diff = data - median[:, :, np.newaxis, :]

del median
self.log.debug("The following intermediate array is removed: median")

# Garbage collection
gc.collect()

# Create container to hold output
out = containers.HFBData(stream)

Expand Down
Loading