Skip to content

Commit

Permalink
feat(dayenu): optionally apply dayenu filter in loop from single or m…
Browse files Browse the repository at this point in the history
…ultiple datasets
  • Loading branch information
ljgray committed Dec 16, 2024
1 parent 0b35171 commit d3604b3
Showing 1 changed file with 46 additions and 20 deletions.
66 changes: 46 additions & 20 deletions draco/analysis/dayenu.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,55 +570,47 @@ class ApplyDelayFilterHybridVis(task.SingleTask):

atten_threshold = config.Property(proptype=float, default=0.0)

def setup(self, ss):
"""Set the DAYENU filter to be applied to hybrid beamformed data.
Parameters
----------
ss: containers.HybridVisStream
The filter of HybridVisStream to be applied.
"""
self.sstream = ss

def process(self, hv):
def process(self, hv, source):
"""Apply the DAYENU filter to a HybridVisStream.
Parameters
----------
hv: containers.HybridVisStream
The data the filter will be applied to.
The data the filter will be applied to.
source: containers.HybridVisStream
The filter of HybridVisStream to be applied.
Returns
-------
hv_filt: containers.HybridVisStream
The filtered dataset.
The filtered dataset.
"""
# Distribute over products
hv.redistribute(["ra", "time"])
self.sstream.redistribute(["ra", "time"])
source.redistribute(["ra", "time"])

# Validate that both hybrid beamformed visibilites match
if not np.array_equal(self.sstream.freq, hv.freq):
if not np.array_equal(source.freq, hv.freq):
raise ValueError("Frequencies do not match for hybrid visibilities.")

if not np.array_equal(self.sstream.index_map["el"], hv.index_map["el"]):
if not np.array_equal(source.index_map["el"], hv.index_map["el"]):
raise ValueError("Elevations do not match for hybrid visibilities.")

if not np.array_equal(self.sstream.index_map["ew"], hv.index_map["ew"]):
if not np.array_equal(source.index_map["ew"], hv.index_map["ew"]):
raise ValueError("EW baselines do not match for hybrid visibilities.")

if not np.array_equal(self.sstream.index_map["pol"], hv.index_map["pol"]):
if not np.array_equal(source.index_map["pol"], hv.index_map["pol"]):
raise ValueError("Polarisations do not match for hybrid visibilities.")

if not np.array_equal(self.sstream.ra, hv.ra):
if not np.array_equal(source.ra, hv.ra):
raise ValueError("Right Ascension do not match for hybrid visibilities.")

npol, nfreq, new, nel, ntime = hv.vis.local_shape

# Dereference the required datasets
vis = hv.vis[:].local_array
weight = hv.weight[:].local_array
filt = self.sstream.filter[:].local_array
filt = source.filter[:].local_array

# loop over products
for tt in range(ntime):
Expand Down Expand Up @@ -655,6 +647,40 @@ def process(self, hv):
return hv


class ApplyDelayFilterHybridVisSingleSource(ApplyDelayFilterHybridVis):
"""Apply a previously saved filter to the hybrid beamformed visibilities.
This task differs from `ApplyDelayFilterHybridVis` in that it applies
a _single_ filter to multiple possible datasets.
"""

def setup(self, source):
"""Set the DAYENU filter to be applied to hybrid beamformed data.
Parameters
----------
source: containers.HybridVisStream
The filter of HybridVisStream to be applied.
"""
self.source = source

def process(self, hv):
"""Apply the DAYENU filter to a HybridVisStream.
Parameters
----------
hv: containers.HybridVisStream
The data the filter will be applied to.
Returns
-------
hv_filt: containers.HybridVisStream
The filtered dataset.
"""
# Apply the previously saved filter to this dataset
return super().process(hv, self.source)


class DayenuDelayFilterMap(task.SingleTask):
"""Apply a DAYENU high-pass delay filter to ringmap data.
Expand Down

0 comments on commit d3604b3

Please sign in to comment.