feat(galt_auto_cont): Add container and task for saving raw Galt autos. #109

Open · wants to merge 4 commits into base: master · Changes from 3 commits
3 changes: 3 additions & 0 deletions ch_pipeline/analysis/beam.py
@@ -328,6 +328,8 @@ def process(self, data):
# Convert input times to hour angle
lha = unwrap_lha(self.observer.unix_to_lsa(data.time), ra)

self.log.warning(f"The starting and ending hour angles are {lha.min():.2f} and {lha.max():.2f}.")
Contributor: I think that unless something is wrong with the starting and ending hour angles, this should use self.log.info instead of self.log.warning.


# perform regridding
success = 1
try:
@@ -1302,6 +1304,7 @@ def unwrap_lha(lsa, src_ra):
Hour angle.
"""
# ensure monotonic
lsa[lsa > 180] -= 360.0
start_lsa = lsa[0]
lsa -= start_lsa
lsa[lsa < 0] += 360.0
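The new line in unwrap_lha folds angles above 180 degrees to the equivalent negative angle, so a sequence that wraps through 360 degrees becomes monotonic before the start value is subtracted. A minimal numpy illustration of the effect on toy data (not the actual pipeline call):

```python
import numpy as np

# Toy LSA sequence that wraps through 360 degrees.
lsa = np.array([350.0, 355.0, 0.0, 5.0])

# New line in this PR: fold angles above 180 deg to the equivalent
# negative angle, making the sequence monotonic before the start value
# is subtracted and negative residuals are wrapped back by 360.
lsa[lsa > 180] -= 360.0

assert np.all(np.diff(lsa) > 0)  # now strictly increasing
```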
63 changes: 63 additions & 0 deletions ch_pipeline/analysis/calibration.py
@@ -16,6 +16,7 @@
from ch_util import fluxcat
from ch_util import finder
from ch_util import rfi
from ch_util import layout

from draco.core import task
from draco.util import _fast_tools
@@ -2364,3 +2365,65 @@ def _calculate_uv(freq, prod, inputmap):
uv = dist[:, np.newaxis, :] / lmbda[np.newaxis, :, np.newaxis]

return uv

class ExtractGaltAutoCorrelation(task.SingleTask):
"""Extract the autocorrelations of the Galt telescope from a holography acquisition."""

def process(self, data):
"""Extract the Galt autocorrelations and write them to disk.
Contributor: Add blank lines between the summary and the Parameters section, and between the Parameters and Returns sections.

Parameters
----------
data: TimeStream
A TimeStream container holding a raw holography acquisition.
Returns
-------
autocorrelation: containers.GaltAutocorrelation
A GaltAutocorrelation container holding the extracted Galt autos
as a function of frequency, polarization product, and time.
"""
# Redistribute over freq
data.redistribute("freq")

# Locate the holographic indices
layout_graph = layout.graph.from_db(data.time[0])

galt_inputs = get_holographic_index(
Contributor: I don't see the get_holographic_index method defined in this module. Does it need to be imported from somewhere?

Alternatively, you could use the core.dataquery.QueryInputs task to query the layout database for the input_map; this will most likely already have been done as part of the holography pipeline. You would then provide that input_map as a parameter, with def process(self, data, input_map):, and use ch_util.tools.is_holographic to extract the Galt inputs:

galt_inputs = np.flatnonzero([tools.is_holographic(inp) for inp in input_map])

I think this is probably preferable, because QueryInputs does a bunch of other useful things: it ensures that only a single node queries the layout database at a time, and it can cache the input_map from a previous query, since the input_map rarely changes and the query itself is slow and prone to connection errors.
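The reviewer's suggestion can be sketched with stand-in objects; is_holographic and the input_map entries are mocked here, since the real ones come from ch_util and the layout database:

```python
import numpy as np
from types import SimpleNamespace

# Stand-in for ch_util.tools.is_holographic; the real input_map would
# come from the core.dataquery.QueryInputs task.
def is_holographic(inp):
    return getattr(inp, "holographic", False)

input_map = [
    SimpleNamespace(holographic=False, pol="X"),  # CHIME input (mock)
    SimpleNamespace(holographic=True, pol="Y"),   # Galt Y input (mock)
    SimpleNamespace(holographic=True, pol="X"),   # Galt X input (mock)
    SimpleNamespace(holographic=False, pol="Y"),  # CHIME input (mock)
]

# Reviewer's suggested extraction of the Galt input indices.
galt_inputs = np.flatnonzero([is_holographic(inp) for inp in input_map])
```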

get_correlator_inputs(layout_graph, correlator="chime")
)

# Get the product map and inputs
prodmap = data.prod
ina, inb = prodmap["input_a"], prodmap["input_b"]

# Locate the Galt autocorrelations and cross-pol correlation
flag_YY = np.where((ina == galt_inputs[0]) & (inb == galt_inputs[0]), 1, 0)
flag_YX = np.where((ina == galt_inputs[0]) & (inb == galt_inputs[1]), 1, 0)
flag_XX = np.where((ina == galt_inputs[1]) & (inb == galt_inputs[1]), 1, 0)

auto_flag = (flag_YY + flag_YX + flag_XX).astype(bool)
Contributor: As written, there is no guarantee that the products picked out by auto_flag will have the polarisation order you define when creating the output container (although in practice it may work out, given the way the CHIME holography acquisitions are ordered). I would probably construct the polarisation axis as follows:

pol = np.array([input_map[aa].pol + input_map[bb].pol for aa, bb in prodmap[auto_flag]])
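With a mocked input_map, the suggested construction ties the pol axis to whatever products auto_flag actually selected:

```python
import numpy as np
from types import SimpleNamespace

# Mocked input_map; real entries come from the layout database via ch_util.
input_map = [SimpleNamespace(pol="Y"), SimpleNamespace(pol="X")]

# Structured product map restricted to the three Galt products, as in the diff.
prodmap = np.array(
    [(0, 0), (0, 1), (1, 1)],
    dtype=[("input_a", np.int64), ("input_b", np.int64)],
)
auto_flag = np.array([True, True, True])

# Reviewer's suggestion: derive the polarisation labels from the selected
# products instead of assuming a fixed ("YY", "YX", "XX") ordering.
pol = np.array(
    [input_map[aa].pol + input_map[bb].pol for aa, bb in prodmap[auto_flag]]
)
```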


# Dereference beam and weight datasets
beam = data.vis[:].local_array
weight = data.weight[:].local_array

# Load only the data corresponding to the Galt inputs
galt_auto = beam[:, auto_flag, :]
galt_weight = weight[:, auto_flag, :]

# Initialize the auto container
autocorrelation = containers.GaltAutocorrelation(
pol=np.array([b"YY", b"YX", b"XX"]),
attrs_from=data,
freq=data.freq,
time=data.index_map["time"],
comm=data.comm,
distributed=data.distributed,
)
Contributor: I think you want to use freq=data.index_map["freq"] here to retain both the centre and the width of each frequency channel.

More generally, you can pass axes_from=data as a keyword and then only specify the axes that are not present in data or that you would like to overwrite (which in this case would be the pol axis).

Contributor: I'm wondering whether, instead of creating a separate GaltAutocorrelation container, we should use the existing draco.core.containers.TimeStream container with only 2 inputs. I think it has everything we need except a way to specify the polarisations. The polarisations of the two inputs could be saved as an attribute, and later tasks that use the autocorrelation data could then derive the polarisation products as follows:

autocorrelation.attrs["pol"] = np.array([input_map[ii].pol for ii in galt_inputs])
pol_prod = [autocorrelation.attrs["pol"][aa] + autocorrelation.attrs["pol"][bb] for aa, bb in autocorrelation.prod]

The benefit of doing it this way is that we reduce the overall number of containers and we can apply other pipeline tasks that have been developed for the CHIME timestreams to the holography autocorrelation data without any modifications to those tasks.

# Redistribute output container over frequency
autocorrelation.redistribute("freq")

autocorrelation.auto[:].local_array[:] = galt_auto
autocorrelation.weight[:].local_array[:] = galt_weight

return autocorrelation
33 changes: 33 additions & 0 deletions ch_pipeline/core/containers.py
@@ -802,6 +802,39 @@ def source(self):
return self.index_map["source"]


class GaltAutocorrelation(FreqContainer, TODContainer):
_axes = ("freq", "pol", "time")
Contributor: To support common axes like freq and time, there are base classes in draco that you should inherit from: draco.core.containers.FreqContainer and draco.core.containers.TODContainer.

Contributor: Suggested change:

-    _axes = ("freq", "pol", "time")
+    _axes = ("pol",)

If you inherit from the base classes, you no longer need to specify the axes they cover; a new instance will have all the unique axes declared anywhere in the class hierarchy.
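The axis-merging behaviour the reviewer describes can be illustrated with a toy class hierarchy (pure Python stand-ins, not the actual draco implementation, whose internals are assumed here):

```python
# Toy stand-ins for draco.core.containers.FreqContainer / TODContainer.
class FreqContainer:
    _axes = ("freq",)

class TODContainer:
    _axes = ("time",)

class GaltAutocorrelation(FreqContainer, TODContainer):
    _axes = ("pol",)  # only the axis not covered by the base classes

def all_axes(cls):
    """Collect the unique axes declared anywhere in the class hierarchy."""
    axes = []
    for klass in cls.__mro__:
        for ax in getattr(klass, "_axes", ()):
            if ax not in axes:
                axes.append(ax)
    return tuple(axes)
```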


_dataset_spec = {
"auto": {
"axes": ["freq", "pol", "time"],
"dtype": np.complex64,
Contributor: Since this container is explicitly for autos, this probably doesn't need to be complex.

Contributor (author): Agreed, but in principle we also want the cross-correlation of the two 26m polarisations in addition to the 26m XX and YY autos, and that product has a phase, so I set the container to be complex.
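The distinction can be seen with toy voltage streams: a true autocorrelation is purely real (a power), while the YX cross-pol product generally retains a phase, which is the reason given for keeping the dataset complex:

```python
import numpy as np

rng = np.random.default_rng(0)
# Toy complex voltage streams for the two Galt polarisations.
y = rng.standard_normal(1024) + 1j * rng.standard_normal(1024)
x = rng.standard_normal(1024) + 1j * rng.standard_normal(1024)

yy = np.mean(y * np.conj(y))  # autocorrelation: imaginary part is zero
yx = np.mean(y * np.conj(x))  # cross-pol product: complex, carries phase

assert np.isclose(yy.imag, 0.0)
```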

"initialise": True,
"distributed": True,
"distributed_axis": "freq",
},
"weight": {
"axes": ["freq", "pol", "time"],
"dtype": np.float64,
"initialise": True,
"distributed": True,
"distributed_axis": "freq",
},
}

@property
def auto(self):
return self.datasets["auto"]

@property
def weight(self):
return self.datasets["weight"]

@property
def fpga_count(self):
return self.index_map["time"]["fpga_count"]


def make_empty_corrdata(
freq=None,
input=None,
Expand Down
2 changes: 1 addition & 1 deletion ch_pipeline/core/io.py
@@ -40,7 +40,6 @@

from draco.core import task, io


class LoadCorrDataFiles(task.SingleTask):
"""Load data from files passed into the setup routine.

@@ -473,3 +472,4 @@ def next(self, files):
)

return sorted(list(new_files))

Contributor: I think these new lines can be removed so that this PR does not modify the io module.