Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(galt_auto_cont): Add container and task for saving raw Galt autos. #109

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

sareda26
Copy link
Contributor

This PR creates a task for extracting the Galt autocorrelations from raw holography acquisitions, before regridding, and saving them into a new container for longterm storage without keeping raw acquisitions.

_dataset_spec = {
"auto": {
"axes": ["freq", "pol", "time"],
"dtype": np.complex64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this container is explicitly for autos, this probably doesn't need to be complex

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, but in principle we also want the cross-correlation of the two 26m polarisations in addition to 26m XX and YY, and that has a phase, so I set the container to be complex.

@@ -802,6 +802,39 @@ def source(self):
return self.index_map["source"]


class GaltAutocorrelation(ContainerBase):
_axes = ("freq", "pol", "time")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To support common axes like freq and time, there are base classes in draco that you should inherit from. draco.core.containers.FreqContainer and draco.core.containers.TODContainer.

"""Extract the autocorrelations of the Galt telescope from a holography acquisition."""

# YY, YX, XX
_galt_prods = [2450, 2746, 3568]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would probably be preferable to determine these from the index_map rather than hard code them. I don't think these indices would have changed, but in principle they are allowed to.

@@ -473,3 +472,52 @@ def next(self, files):
)

return sorted(list(new_files))

class SaveGaltAutoCorrelation(task.SingleTask):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this is a bit nit-picky, but I would suggest renaming this to something like ExtractGaltAutocorrelationand moving it out of io (since it only performs io if you choose to save the output, same as any task). I'm not sure where would be the best place for it... Maybe analysis.calibration since I assume this is meant for noise source analysis. analysis.beam could also work.

data.redistribute("freq")

# Dereference beam and weight datasets
beam = data.vis[:].view(np.ndarray)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the preferred way to this now is to use local_array

Suggested change
beam = data.vis[:].view(np.ndarray)
beam = data.vis[:].local_array

# Redistribute output container over frequency
autocorrelation.redistribute("freq")

autocorrelation.auto[:] = galt_auto
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure but I think this might trigger a warning with the new mpiarray... The alternative would be to use local_array again.

Suggested change
autocorrelation.auto[:] = galt_auto
autocorrelation.auto[:].local_array[:] = galt_auto

But if it works without generating warnings with the latest caput I guess it's fine.

@sareda26 sareda26 requested a review from newburgh August 22, 2022 20:19
Copy link
Contributor

@ssiegelx ssiegelx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Alex, looks great, just a few comments. The biggest one concerns the container type that we use to store the autocorrelations.

@@ -328,6 +328,8 @@ def process(self, data):
# Convert input times to hour angle
lha = unwrap_lha(self.observer.unix_to_lsa(data.time), ra)

self.log.warning(f"The starting and ending hour angles are {lha.min():.2f} and {lha.max():.2f}.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think unless there is something wrong with the starting and ending hour angles, this should use self.log.info instead of self.log.warning.

"""Extract the autocorrelations of the Galt telescope from a holography acquisition."""

def process(self, data):
"""Extract the Galt autocorrelations and write them to disk.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add blank lines between the summary and parameters section, and between the parameters and returns section.

# Locate the holographic indices
layout_graph = layout.graph.from_db(data.time[0])

galt_inputs = get_holographic_index(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the get_holographic_index method defined in this module. Does this need to be imported from somewhere?

Alternatively, you could use the core.dataquery.QueryInputs task to query the layout database for the input_map. This will most likely already have been done as part of the holography pipeline.

Then, you would provide that input_map as a parameter with def process(self, data, input_map):, and use ch_util.tools.is_holographic to extract the galt inputs:

galt_inputs = np.flatnonzero([tools.is_holographic(inp) for inp in input_map])

I think this is probably preferable, because QueryInputs does a bunch of other stuff. It ensures only a single node queries the layout database at one time. It also gives the option to cache the input_map from a previous query, since the input_map rarely changes and since the query itself is slow and prone to connection errors.

flag_YX = np.where((ina == galt_inputs[0]) & (inb == galt_inputs[1]), 1, 0)
flag_XX = np.where((ina == galt_inputs[1]) & (inb == galt_inputs[1]), 1, 0)

auto_flag = (flag_YY + flag_YX + flag_XX).astype(bool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the way this is done, there is no guarantee that the products picked out by auto_flag will have the polarisation order that you define when creating the output container (although in practice it may end up working out based on the way the CHIME holography acquisitions are ordered).

I would probably construct the polarisation axis as follows:

pol = np.array([input_map[aa].pol + input_map[bb].pol for aa, bb in prodmap[auto_flag]])

time=data.index_map["time"],
comm=data.comm,
distributed=data.distributed,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to use freq=data.index_map["freq"] here to retain both the centre and width of each frequency channel.

More generally, you can pass axes_from=data as a keyword and then only specify the axes that are not present in data or that you would like to overwrite (which in this case would be the pol axis).

comm=data.comm,
distributed=data.distributed,
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if instead of creating a separate GaltAutocorrelation container, we should use the existing draco.core.containers.TimeStream container with only 2 inputs. I think that has everything we need except for a way to specify the polarisations. The polarisations of the two inputs could be saved as an attribute and then for later tasks that use the autocorrelation data, you could derive the polarisation products as follows:

autocorrelation.attrs["pol"] = np.array([input_map[ii].pol for ii in galt_inputs])
pol_prod = [autocorrelation.attrs["pol"][aa] + autocorrelation.attrs["pol"][bb] for aa, bb in autocorrelation.prod]

The benefit of doing it this way is that we reduce the overall number of containers and we can apply other pipeline tasks that have been developed for the CHIME timestreams to the holography autocorrelation data without any modifications to those tasks.

@@ -802,6 +802,39 @@ def source(self):
return self.index_map["source"]


class GaltAutocorrelation(FreqContainer, TODContainer):
_axes = ("freq", "pol", "time")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_axes = ("freq", "pol", "time")
_axes = ("pol",)

If you inherit from the base classes, then you no longer need to specify the axes covered by those classes. A new instance will have all the unique axes contained in all subclasses.

@@ -473,3 +472,4 @@ def next(self, files):
)

return sorted(list(new_files))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these new lines can be be removed so that this PR does not modify the io module.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants