Skip to content

Commit

Permalink
ss/daily filter (#291)
Browse files Browse the repository at this point in the history
* feat(dayenu): add a task that delay filters hybrid beamformed visibilities

* feat(dayenu): add option to save filter that was applied

* feat(dayenu): add option for multiple stopbands in delay filter

* feat(sidereal): support for rebinning HybridVisStream containers

* feat(sidereal): capability to stack multiple HybridVisStream

* feat(transform): task that iterates over sub-bands of an input container

* feat(ringmapmaker): new windows and option to ignore ns dirty beam

This provides two new windows for north-south beamforming,
triangular and tukey.  The triangular window approximates
natural weighting for chime, but can be scaled to ensure
achromatic synthesized beam response.  The tukey window
tapers the north and south ends of the cylinder, with the
fraction of cylinder tapered being an adjustable parameter.
Also, adds a flag to BeamformNS that if set does forces the
task to not save the dirty_beam to the container.

* feat(ringmapmaker): do not require dirty_beam in BeamformEW task

* fix(ringmapmaker): add missing factors in weight propagation for BeamformEW

* feat(ringmapmaker): option to flag arbitary ew baselines in BeamformEW

* feat(container): allow copy of non-filtered datasets

Currently when using the copy_datasets_filter method of a container,
datasets that do not contain an axis that is being filtered are
not copied over.  This gives the option to copy over those datasets
as well (and in full since they have no filter applied).  This updates
the SelectFreq task to do this by default.

* feat(beamform): beamform to a catalog of sources from HybridVisStream

* feat(beamform): fit beamformed visibilities versus hour angle

Linear fit to a model consisting of an additive offset and the primary beam.
Also only add redshift dataset to FormedBeam containers if the catalog
has redshifts.

* feat(flagging): update masking tasks to handle other types of containers

* feat(flagging): update RFIMaskChisqHighDelay to apply sumthreshold

* feat(flagging): polarisation dependent masking

Capability to generate a polarisation dependent mask in
RFIMaskChisqHighDelay.  Capability to apply a polarisation
dependent mask with the ApplyTimeFreqMask task.

* feat(rfi): option to flag only positive excursions in sumthreshold
  • Loading branch information
ssiegelx authored Oct 16, 2024
1 parent 556ca5f commit 38f7dc6
Show file tree
Hide file tree
Showing 9 changed files with 1,530 additions and 186 deletions.
406 changes: 391 additions & 15 deletions draco/analysis/beamform.py

Large diffs are not rendered by default.

220 changes: 207 additions & 13 deletions draco/analysis/dayenu.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,137 @@ def process(self, stream):
return out


class DayenuDelayFilterHybridVis(task.SingleTask):
"""Apply a DAYENU high-pass delay filter to hybrid beamformed visibilities.
Attributes
----------
tauw : float or np.ndarray[nstopband,]
The half width of the stop-band region in micro-seconds.
tauc : float or np.ndarray[nstopband,]
The centre of the stop-band region in micro-seconds.
Defaults to 0.
epsilon : float or np.ndarray[nstopband,]
The stop-band rejection. Defaults to 1e-12.
atten_threshold : float
Mask any frequency where the diagonal element of the filter
is less than this fraction of the median value over all
unmasked frequencies. Default is 0.0 (i.e., do not mask
frequencies with low attenuation).
save_filter : bool
Save the filter that was applied to the output container.
"""

tauw = config.Property(proptype=np.atleast_1d, default=0.4)
tauc = config.Property(proptype=np.atleast_1d, default=0.0)
epsilon = config.Property(proptype=np.atleast_1d, default=1e-12)

atten_threshold = config.Property(proptype=float, default=0.0)
save_filter = config.Property(proptype=bool, default=False)

def process(self, stream):
"""Filter out delays from a SiderealStream or TimeStream.
Parameters
----------
stream : HybridVisStream
Data to filter.
Returns
-------
stream_filt : HybridVisStream
Filtered dataset.
"""
# Create a filter dataset
if self.save_filter:
if np.any(np.abs(self.tauc) > 0.0):
stream.add_dataset("complex_filter")
else:
stream.add_dataset("filter")
stream.filter[:] = 0.0

# Distribute over products
stream.redistribute(["ra", "time"])

# Extract the required axes
freq = stream.freq[:]

npol, nfreq, new, nel, ntime = stream.vis.local_shape

# Dereference the required datasets
vis = stream.vis[:].local_array
weight = stream.weight[:].local_array
if self.save_filter:
filt = stream.filter[:].local_array

# Loop over products
for tt in range(ntime):

t0 = time.time()

flag = weight[..., tt] > 0.0
flag = np.all(flag, axis=0, keepdims=True)
weight[..., tt] *= flag.astype(weight.dtype)

for xx in range(new):

self.log.debug(f"Filter time {tt} of {ntime}, baseline {xx} of {new}.")

flagx = flag[0, :, xx, np.newaxis]
if not np.any(flagx):
continue

# Construct the filter
try:
NF, index = delay_filter(
freq,
flagx,
tau_width=self.tauw,
tau_centre=self.tauc,
epsilon=self.epsilon,
)

except np.linalg.LinAlgError as exc:
self.log.error(
f"Failed to converge while processing time {tt}: {exc}"
)
weight[:, :, xx, tt] = 0.0
continue

# Apply the filter
for pp in range(npol):

# Save the filter to the container
if self.save_filter:
filt[pp, :, :, xx, tt] = NF[0]

# Grab datasets for this pol and ew baseline
tvis = np.ascontiguousarray(vis[pp, :, xx, :, tt])
tvar = tools.invert_no_zero(weight[pp, :, xx, tt])

# Apply filter
vis[pp, :, xx, :, tt] = np.matmul(NF[0], tvis)
weight[pp, :, xx, tt] = tools.invert_no_zero(
np.matmul(np.abs(NF[0]) ** 2, tvar)
)

# Flag frequencies with large attenuation
if self.atten_threshold > 0.0:
diag = np.abs(np.diag(NF[0]))
med_diag = np.median(diag[diag > 0.0])

flag_low = diag > (self.atten_threshold * med_diag)

weight[pp, :, xx, tt] *= flag_low.astype(weight.dtype)

self.log.debug(f"Took {time.time() - t0:0.3f} seconds in total.")

# Problems saving to disk when distributed over last axis
stream.redistribute("freq")

return stream


class DayenuDelayFilterMap(task.SingleTask):
"""Apply a DAYENU high-pass delay filter to ringmap data.
Expand Down Expand Up @@ -753,54 +884,117 @@ def _get_cut(self, freq, xsep):
)


def highpass_delay_filter(freq, tau_cut, flag, epsilon=1e-12):
"""Construct a high-pass delay filter.
def delay_filter(freq, flag, tau_width, tau_centre=0.0, epsilon=1e-12):
"""Construct a delay filter.
The stop band will range from [-tau_cut, tau_cut].
The filter will attenuate signals with delays ranging from
[tau_centre - tau_width, tau_centre + tau_width]. If more
than one value of tau_centre and tau_width are provided,
then the filter will have multiple stop bands.
Parameters
----------
freq : np.ndarray[nfreq,]
Frequency in MHz.
tau_cut : float
The half width of the stop band in micro-seconds.
flag : np.ndarray[nfreq, ntime]
Boolean flag that indicates what frequencies are valid
as a function of time.
epsilon : float
The stop-band rejection of the filter. Defaults to 1e-12.
tau_width : float or np.ndarray[nstopband,]
The half width of the stop-band region in micro-seconds.
tau_centre : float or np.ndarray[nstopband,]
The centre of the stop-band region in micro-seconds.
Defaults to 0.
epsilon : float or np.ndarray[nstopband,]
The stop-band rejection. Defaults to 1e-12.
Returns
-------
pinv : np.ndarray[ntime_uniq, nfreq, nfreq]
High pass delay filter for each set of unique frequency flags.
Delay filter for each set of unique frequency flags.
index : list of length ntime_uniq
Maps the first axis of pinv to the original time axis.
Apply pinv[i] to the time samples at index[i].
"""

# Ensure consistent size for parameter values
def _ensure_consistent(param, nstopband):
if np.isscalar(param):
return [param] * nstopband
if len(param) == 1:
return [param[0]] * nstopband
assert len(param) == nstopband
return param

args = [tau_width, tau_centre, epsilon]
nstopband = np.max([np.atleast_1d(param).size for param in args])
args = [np.array(_ensure_consistent(param, nstopband)) for param in args]

# Determine datatype
dtype = np.complex128 if np.any(np.abs(args[1]) > 0.0) else np.float64

# Make sure the flag array is properly sized
ishp = flag.shape
nfreq = freq.size
assert ishp[0] == nfreq
assert len(ishp) == 2

cov = np.eye(nfreq, dtype=np.float64)
cov += (
np.sinc(2.0 * tau_cut * (freq[:, np.newaxis] - freq[np.newaxis, :])) / epsilon
)
# Construct the covariance matrix
dfreq = freq[:, np.newaxis] - freq[np.newaxis, :]

cov = np.eye(nfreq, dtype=dtype)
for tw, tc, eps in zip(*args):

term = np.sinc(2.0 * tw * dfreq) / eps
if np.abs(tc) > 0.0:
term = term * np.exp(-2.0j * np.pi * tc * dfreq)

cov += term

# Identify unique sets of flags versus frequency
uflag, uindex = np.unique(flag.reshape(nfreq, -1), return_inverse=True, axis=-1)
uflag = uflag.T
uflag = uflag[:, np.newaxis, :] & uflag[:, :, np.newaxis]
uflag = uflag.astype(np.float64)

# Create a separate covariance matrix for each set of unique flags
ucov = uflag * cov[np.newaxis, :, :]

# Peform a pseudo inverse of the masked covariance matrices
pinv = np.linalg.pinv(ucov, hermitian=True) * uflag
index = [np.flatnonzero(uindex == uu) for uu in range(pinv.shape[0])]

return pinv, index


def highpass_delay_filter(freq, tau_cut, flag, epsilon=1e-12):
"""Construct a high-pass delay filter.
The stop band will range from [-tau_cut, tau_cut].
This function is maintained for backwards compatability,
use delay_filter instead.
Parameters
----------
freq : np.ndarray[nfreq,]
Frequency in MHz.
tau_cut : float
The half width of the stop band in micro-seconds.
flag : np.ndarray[nfreq, ntime]
Boolean flag that indicates what frequencies are valid
as a function of time.
epsilon : float
The stop-band rejection of the filter. Defaults to 1e-12.
Returns
-------
pinv : np.ndarray[ntime_uniq, nfreq, nfreq]
High pass delay filter for each set of unique frequency flags.
index : list of length ntime_uniq
Maps the first axis of pinv to the original time axis.
Apply pinv[i] to the time samples at index[i].
"""
return delay_filter(freq, flag, tau_cut, 0.0, epsilon)


def bandpass_mmode_filter(ra, m_center, m_cut, flag, epsilon=1e-10):
"""Construct a bandpass m-mode filter.
Expand Down
Loading

0 comments on commit 38f7dc6

Please sign in to comment.