Skip to content

Commit

Permalink
feat(flagging): mask rfi based on delay filtered visibilities
Browse files Browse the repository at this point in the history
Adds three new tasks:
 - analysis.dayenu.DayenuDelayFilterFixedCutoff enables
   delay filtering of timeseries with rapidly changing frequency masks.
 - analysis.transform.ReduceChisq calculates a chi2-per-dof test
   statistic by comparing the delay filtered visibilities to the
   inverse noise variance stored in the weights.
 - analysis.flagging.RFIMaskChisqHighDelay generates a mask from this
   chi2-per-dof test statistic.
  • Loading branch information
Seth Siegel committed May 14, 2024
1 parent 9cf3e8b commit d95d81f
Show file tree
Hide file tree
Showing 3 changed files with 423 additions and 4 deletions.
138 changes: 138 additions & 0 deletions draco/analysis/dayenu.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,144 @@ def _get_cut(self, prod):
return baseline_delay_cut + self.tauw


class DayenuDelayFilterFixedCutoff(task.SingleTask):
"""Apply a DAYENU high-pass delay filter to visibility data.
This task loops over time instead of baseline. It can be used
to filter timeseries that have a rapidly changing frequency mask,
with the caveat that one must use the same delay cutoff for all baselines.
Attributes
----------
epsilon : float
The stop-band rejection of the filter. Default is 1e-12.
tauw : float
Delay cutoff in micro-seconds. Default is 0.4 micro-seconds.
single_mask : bool
Apply a single frequency mask for all baselines. Only includes
frequencies where the weights are nonzero for all baselines.
Otherwise will construct a filter for all unique single-time
frequency masks (can be significantly slower). Default is True.
atten_threshold : float
Mask any frequency where the diagonal element of the filter
is less than this fraction of the median value over all
unmasked frequencies. Default is 0.0 (i.e., do not mask
frequencies with low attenuation).
"""

epsilon = config.Property(proptype=float, default=1e-12)
tauw = config.Property(proptype=float, default=0.400)
single_mask = config.Property(proptype=bool, default=True)
atten_threshold = config.Property(proptype=float, default=0.0)

def process(self, stream):
"""Filter delays below some cutoff.
Modifies container in place.
Parameters
----------
stream : TimeStream or SiderealStream
Returns
-------
stream_filt : TimeStream or SiderealStream
"""
# Distribute over products
stream.redistribute(["ra", "time"])

# Extract the required axes
freq = stream.freq[:]

ntime = stream.vis.local_shape[2]

# Dereference the required datasets
vis = stream.vis[:].local_array
weight = stream.weight[:].local_array

# Identify baselines that are always flagged
temp = np.any(weight > 0.0, axis=(0, 2))
baseline_flag = np.zeros_like(temp)
self.comm.Allreduce(temp, baseline_flag, op=MPI.LOR)
baseline_mask = ~baseline_flag

if np.all(baseline_mask):
self.log.error("All data flaged as bad.")
return
elif np.any(baseline_mask) and not self.single_mask:
valid = np.flatnonzero(baseline_flag)
else:
valid = slice(None)

# Loop over products
for tt in range(ntime):

t0 = time.time()

# Flag frequencies and times with zero weight
if self.single_mask:
flag = (weight[:, :, tt] > 0.0) | baseline_mask
flag = np.all(flag, axis=-1, keepdims=True)
weight[:, :, tt] *= flag.astype(weight.dtype)
else:
flag = weight[:, valid, tt] > 0.0

if not np.any(flag):
continue

tvis = np.ascontiguousarray(vis[:, valid, tt])
tvar = tools.invert_no_zero(weight[:, valid, tt])

self.log.debug(f"Filter time {tt} of {ntime}. [{self.tauw:0.3f} microsec]")

# Construct the filter
try:
NF, index = highpass_delay_filter(
freq, self.tauw, flag, epsilon=self.epsilon
)

except np.linalg.LinAlgError as exc:
self.log.error("Failed to converge while processing time {tt}: {exc}")
weight[:, :, tt] = 0.0
continue

# Apply the filter
if self.single_mask:
vis[:, :, tt] = np.matmul(NF[0], tvis)
weight[:, :, tt] = tools.invert_no_zero(np.matmul(NF[0] ** 2, tvar))

if self.atten_threshold > 0.0:
diag = np.diag(NF[0])
med_diag = np.median(diag[diag > 0.0])

flag_low = diag > (self.atten_threshold * med_diag)

weight[:, :, tt] *= flag_low[:, np.newaxis].astype(weight.dtype)

else:
self.log.debug(f"There are {len(index)} unique masks/filters.")
for ii, ind in enumerate(index):

vis[:, valid[ind], tt] = np.matmul(NF[ii], tvis[:, ind])
weight[:, valid[ind], tt] = tools.invert_no_zero(
np.matmul(NF[ii] ** 2, tvar[:, ind])
)

if self.atten_threshold > 0.0:
diag = np.diag(NF[ii])
med_diag = np.median(diag[diag > 0.0])

flag_low = diag > (self.atten_threshold * med_diag)

weight[:, valid[ind], tt] *= flag_low[:, np.newaxis].astype(
weight.dtype
)

self.log.debug(f"Took {time.time() - t0:0.3f} seconds in total.")

return stream


class DayenuDelayFilterMap(task.SingleTask):
"""Apply a DAYENU high-pass delay filter to ringmap data.
Expand Down
Loading

0 comments on commit d95d81f

Please sign in to comment.