From 3bcf099c7eaa8b1f513858f0f31c3032fc6237bb Mon Sep 17 00:00:00 2001 From: Alex Reda Date: Fri, 19 Aug 2022 10:35:40 -0700 Subject: [PATCH 1/4] feat(galt_auto_cont): Add container and task for saving raw Galt autos to disk. --- ch_pipeline/core/containers.py | 33 ++++++++++++++++++++++ ch_pipeline/core/io.py | 50 +++++++++++++++++++++++++++++++++- 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/ch_pipeline/core/containers.py b/ch_pipeline/core/containers.py index f4dae46a..6188cb33 100644 --- a/ch_pipeline/core/containers.py +++ b/ch_pipeline/core/containers.py @@ -802,6 +802,39 @@ def source(self): return self.index_map["source"] +class GaltAutocorrelation(ContainerBase): + _axes = ("freq", "pol", "time") + + _dataset_spec = { + "auto": { + "axes": ["freq", "pol", "time"], + "dtype": np.complex64, + "initialise": True, + "distributed": True, + "distributed_axis": "freq", + }, + "weight": { + "axes": ["freq", "pol", "time"], + "dtype": np.float64, + "initialise": True, + "distributed": True, + "distributed_axis": "freq", + }, + } + + @property + def auto(self): + return self.datasets["auto"] + + @property + def weight(self): + return self.datasets["weight"] + + @property + def fpga_count(self): + return self.index_map["time"]["fpga_count"] + + def make_empty_corrdata( freq=None, input=None, diff --git a/ch_pipeline/core/io.py b/ch_pipeline/core/io.py index 52a6ea85..cd4e972e 100644 --- a/ch_pipeline/core/io.py +++ b/ch_pipeline/core/io.py @@ -40,7 +40,6 @@ from draco.core import task, io - class LoadCorrDataFiles(task.SingleTask): """Load data from files passed into the setup routine. @@ -473,3 +472,52 @@ def next(self, files): ) return sorted(list(new_files)) + +class SaveGaltAutoCorrelation(task.SingleTask): + """Extract the autocorrelations of the Galt telescope from a holography acquisition.""" + + # YY, YX, XX + _galt_prods = [2450, 2746, 3568] + + def process(self, data): + """Extract the Galt autocorrelations and write them to disk. + Parameters + ---------- + data: TimeStream + A TimeStream container holding a raw holography acquisition. + Returns + ------- + autocorrelation: containers.GaltAutocorrelation + A GaltAutocorrelation container holding the extracted Galt autos + as a function of frequency, polarization product, and time. + """ + from .containers import GaltAutocorrelation + + # Redistribute over freq + data.redistribute("freq") + + # Dereference beam and weight datasets + beam = data.vis[:].view(np.ndarray) + weight = data.weight[:].view(np.ndarray) + + # Load only the data corresponding to the Galt inputs + galt_auto = beam[:, self._galt_prods, :] + galt_weight = weight[:, self._galt_prods, :] + + # Initialize the auto container + autocorrelation = GaltAutocorrelation( + pol=np.array([b"YY", b"YX", b"XX"]), + attrs_from=data, + freq=data.freq, + time=data.index_map["time"], + comm=data.comm, + distributed=data.distributed, + ) + + # Redistribute output container over frequency + autocorrelation.redistribute("freq") + + autocorrelation.auto[:] = galt_auto + autocorrelation.weight[:] = galt_weight + + return autocorrelation From c2e52c7c1d5f6292e48c9f45b9a0bb09793479b5 Mon Sep 17 00:00:00 2001 From: Alex Reda Date: Mon, 22 Aug 2022 13:30:00 -0700 Subject: [PATCH 2/4] refactor(calibration): Move task for saving 26m autos to calibration.py. --- ch_pipeline/analysis/calibration.py | 57 +++++++++++++++++++++++++++++ ch_pipeline/core/containers.py | 2 +- ch_pipeline/core/io.py | 48 ------------------------ 3 files changed, 58 insertions(+), 49 deletions(-) diff --git a/ch_pipeline/analysis/calibration.py b/ch_pipeline/analysis/calibration.py index 8f45495a..e3bd5dd1 100644 --- a/ch_pipeline/analysis/calibration.py +++ b/ch_pipeline/analysis/calibration.py @@ -2364,3 +2364,60 @@ def _calculate_uv(freq, prod, inputmap): uv = dist[:, np.newaxis, :] / lmbda[np.newaxis, :, np.newaxis] return uv + +class ExtractGaltAutoCorrelation(task.SingleTask): + """Extract the autocorrelations of the Galt telescope from a holography acquisition.""" + + _galt_inputs = [1225, 1521] + + def process(self, data): + """Extract the Galt autocorrelations and write them to disk. + Parameters + ---------- + data: TimeStream + A TimeStream container holding a raw holography acquisition. + Returns + ------- + autocorrelation: containers.GaltAutocorrelation + A GaltAutocorrelation container holding the extracted Galt autos + as a function of frequency, polarization product, and time. + """ + # Redistribute over freq + data.redistribute("freq") + + # Get the product map and inputs + prodmap = data.prod + ina, inb = prodmap["input_a"], prodmap["input_b"] + + # Locate the Galt autocorrelations and cross-pol correlation + flag_YY = np.where((ina == self._galt_inputs[0]) & (inb == self._galt_inputs[0]), 1, 0) + flag_YX = np.where((ina == self._galt_inputs[0]) & (inb == self._galt_inputs[1]), 1, 0) + flag_XX = np.where((ina == self._galt_inputs[1]) & (inb == self._galt_inputs[1]), 1, 0) + + auto_flag = (flag_YY + flag_YX + flag_XX).astype(bool) + + # Dereference beam and weight datasets + beam = data.vis[:].local_array + weight = data.weight[:].local_array + + # Load only the data corresponding to the Galt inputs + galt_auto = beam[:, auto_flag, :] + galt_weight = weight[:, auto_flag, :] + + # Initialize the auto container + autocorrelation = containers.GaltAutocorrelation( + pol=np.array([b"YY", b"YX", b"XX"]), + attrs_from=data, + freq=data.freq, + time=data.index_map["time"], + comm=data.comm, + distributed=data.distributed, + ) + + # Redistribute output container over frequency + autocorrelation.redistribute("freq") + + autocorrelation.auto[:].local_array[:] = galt_auto + autocorrelation.weight[:].local_array[:] = galt_weight + + return autocorrelation diff --git a/ch_pipeline/core/containers.py b/ch_pipeline/core/containers.py index 6188cb33..761f294f 100644 --- a/ch_pipeline/core/containers.py +++ b/ch_pipeline/core/containers.py @@ -802,7 +802,7 @@ def source(self): return self.index_map["source"] -class GaltAutocorrelation(ContainerBase): +class GaltAutocorrelation(FreqContainer, TODContainer): _axes = ("freq", "pol", "time") _dataset_spec = { diff --git a/ch_pipeline/core/io.py b/ch_pipeline/core/io.py index cd4e972e..622332b5 100644 --- a/ch_pipeline/core/io.py +++ b/ch_pipeline/core/io.py @@ -473,51 +473,3 @@ def next(self, files): return sorted(list(new_files)) -class SaveGaltAutoCorrelation(task.SingleTask): - """Extract the autocorrelations of the Galt telescope from a holography acquisition.""" - - # YY, YX, XX - _galt_prods = [2450, 2746, 3568] - - def process(self, data): - """Extract the Galt autocorrelations and write them to disk. - Parameters - ---------- - data: TimeStream - A TimeStream container holding a raw holography acquisition. - Returns - ------- - autocorrelation: containers.GaltAutocorrelation - A GaltAutocorrelation container holding the extracted Galt autos - as a function of frequency, polarization product, and time. - """ - from .containers import GaltAutocorrelation - - # Redistribute over freq - data.redistribute("freq") - - # Dereference beam and weight datasets - beam = data.vis[:].view(np.ndarray) - weight = data.weight[:].view(np.ndarray) - - # Load only the data corresponding to the Galt inputs - galt_auto = beam[:, self._galt_prods, :] - galt_weight = weight[:, self._galt_prods, :] - - # Initialize the auto container - autocorrelation = GaltAutocorrelation( - pol=np.array([b"YY", b"YX", b"XX"]), - attrs_from=data, - freq=data.freq, - time=data.index_map["time"], - comm=data.comm, - distributed=data.distributed, - ) - - # Redistribute output container over frequency - autocorrelation.redistribute("freq") - - autocorrelation.auto[:] = galt_auto - autocorrelation.weight[:] = galt_weight - - return autocorrelation From 9c80c0c7093d701bfeb180dc3014d8981a8b1cfe Mon Sep 17 00:00:00 2001 From: Alex Reda Date: Wed, 24 Aug 2022 08:56:05 -0700 Subject: [PATCH 3/4] refactor(ExtractGaltAutoCorrelation): Query for holographic indices through layout table. --- ch_pipeline/analysis/beam.py | 3 +++ ch_pipeline/analysis/calibration.py | 16 +++++++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/ch_pipeline/analysis/beam.py b/ch_pipeline/analysis/beam.py index 571f6112..149a438f 100644 --- a/ch_pipeline/analysis/beam.py +++ b/ch_pipeline/analysis/beam.py @@ -328,6 +328,8 @@ def process(self, data): # Convert input times to hour angle lha = unwrap_lha(self.observer.unix_to_lsa(data.time), ra) + self.log.warning(f"The starting and ending hour angles are {lha.min():.2f} and {lha.max():.2f}.") + # perform regridding success = 1 try: @@ -1302,6 +1304,7 @@ def unwrap_lha(lsa, src_ra): Hour angle. """ # ensure monotonic + lsa[lsa > 180] -= 360.0 start_lsa = lsa[0] lsa -= start_lsa lsa[lsa < 0] += 360.0 diff --git a/ch_pipeline/analysis/calibration.py b/ch_pipeline/analysis/calibration.py index e3bd5dd1..f1348ced 100644 --- a/ch_pipeline/analysis/calibration.py +++ b/ch_pipeline/analysis/calibration.py @@ -16,6 +16,7 @@ from ch_util import fluxcat from ch_util import finder from ch_util import rfi +from ch_util import layout from draco.core import task from draco.util import _fast_tools @@ -2368,8 +2369,6 @@ def _calculate_uv(freq, prod, inputmap): class ExtractGaltAutoCorrelation(task.SingleTask): """Extract the autocorrelations of the Galt telescope from a holography acquisition.""" - _galt_inputs = [1225, 1521] - def process(self, data): """Extract the Galt autocorrelations and write them to disk. Parameters @@ -2385,14 +2384,21 @@ def process(self, data): # Redistribute over freq data.redistribute("freq") + # Locate the holographic indices + layout_graph = layout.graph.from_db(data.time[0]) + + galt_inputs = get_holographic_index( + get_correlator_inputs(layout_graph, correlator="chime") + ) + # Get the product map and inputs prodmap = data.prod ina, inb = prodmap["input_a"], prodmap["input_b"] # Locate the Galt autocorrelations and cross-pol correlation - flag_YY = np.where((ina == self._galt_inputs[0]) & (inb == self._galt_inputs[0]), 1, 0) - flag_YX = np.where((ina == self._galt_inputs[0]) & (inb == self._galt_inputs[1]), 1, 0) - flag_XX = np.where((ina == self._galt_inputs[1]) & (inb == self._galt_inputs[1]), 1, 0) + flag_YY = np.where((ina == galt_inputs[0]) & (inb == galt_inputs[0]), 1, 0) + flag_YX = np.where((ina == galt_inputs[0]) & (inb == galt_inputs[1]), 1, 0) + flag_XX = np.where((ina == galt_inputs[1]) & (inb == galt_inputs[1]), 1, 0) auto_flag = (flag_YY + flag_YX + flag_XX).astype(bool) From 079d2655a83272c61e3bcd3fae024d1bc54ebfe5 Mon Sep 17 00:00:00 2001 From: Alex Reda Date: Tue, 20 Sep 2022 08:16:02 -0700 Subject: [PATCH 4/4] refactor(galt_auto_cont): Construct products and pol indirectly, use TimeStream container. --- ch_pipeline/analysis/beam.py | 2 -- ch_pipeline/analysis/calibration.py | 40 ++++++++++++++++++----------- ch_pipeline/core/containers.py | 33 ------------------------ ch_pipeline/core/io.py | 1 - 4 files changed, 25 insertions(+), 51 deletions(-) diff --git a/ch_pipeline/analysis/beam.py b/ch_pipeline/analysis/beam.py index 149a438f..754e8701 100644 --- a/ch_pipeline/analysis/beam.py +++ b/ch_pipeline/analysis/beam.py @@ -328,8 +328,6 @@ def process(self, data): # Convert input times to hour angle lha = unwrap_lha(self.observer.unix_to_lsa(data.time), ra) - self.log.warning(f"The starting and ending hour angles are {lha.min():.2f} and {lha.max():.2f}.") - # perform regridding success = 1 try: diff --git a/ch_pipeline/analysis/calibration.py b/ch_pipeline/analysis/calibration.py index f1348ced..a991da70 100644 --- a/ch_pipeline/analysis/calibration.py +++ b/ch_pipeline/analysis/calibration.py @@ -2369,12 +2369,14 @@ def _calculate_uv(freq, prod, inputmap): class ExtractGaltAutoCorrelation(task.SingleTask): """Extract the autocorrelations of the Galt telescope from a holography acquisition.""" - def process(self, data): + def process(self, data, input_map): """Extract the Galt autocorrelations and write them to disk. + Parameters ---------- data: TimeStream A TimeStream container holding a raw holography acquisition. + Returns ------- autocorrelation: containers.GaltAutocorrelation @@ -2385,22 +2387,25 @@ def process(self, data): data.redistribute("freq") # Locate the holographic indices - layout_graph = layout.graph.from_db(data.time[0]) - - galt_inputs = get_holographic_index( - get_correlator_inputs(layout_graph, correlator="chime") - ) + galt_inputs = np.array(tools.get_holographic_index(input_map)) # Get the product map and inputs prodmap = data.prod ina, inb = prodmap["input_a"], prodmap["input_b"] # Locate the Galt autocorrelations and cross-pol correlation - flag_YY = np.where((ina == galt_inputs[0]) & (inb == galt_inputs[0]), 1, 0) - flag_YX = np.where((ina == galt_inputs[0]) & (inb == galt_inputs[1]), 1, 0) - flag_XX = np.where((ina == galt_inputs[1]) & (inb == galt_inputs[1]), 1, 0) + flag_cp1 = np.where((ina == galt_inputs[0]) & (inb == galt_inputs[0]), 1, 0) + flag_xp = np.where((ina == galt_inputs[0]) & (inb == galt_inputs[1]), 1, 0) + flag_cp2 = np.where((ina == galt_inputs[1]) & (inb == galt_inputs[1]), 1, 0) - auto_flag = (flag_YY + flag_YX + flag_XX).astype(bool) + auto_flag = (flag_cp1 + flag_xp + flag_cp2).astype(bool) + + # Construct the prod axis of the output container, and the corresponding pols + galt_auto_prod = prodmap[auto_flag] + + galt_auto_prod_pol = np.array( + [input_map[aa].pol + input_map[bb].pol for (aa, bb) in galt_auto_prod] + ) # Dereference beam and weight datasets beam = data.vis[:].local_array @@ -2411,11 +2416,12 @@ def process(self, data): galt_weight = weight[:, auto_flag, :] # Initialize the auto container - autocorrelation = containers.GaltAutocorrelation( - pol=np.array([b"YY", b"YX", b"XX"]), + autocorrelation = containers.TimeStream( attrs_from=data, - freq=data.freq, - time=data.index_map["time"], + axes_from=data, + stack=galt_auto_prod, + input=galt_inputs, + prod=galt_auto_prod, comm=data.comm, distributed=data.distributed, ) @@ -2423,7 +2429,11 @@ def process(self, data): # Redistribute output container over frequency autocorrelation.redistribute("freq") - autocorrelation.auto[:].local_array[:] = galt_auto + autocorrelation.vis[:].local_array[:] = galt_auto autocorrelation.weight[:].local_array[:] = galt_weight + # Save attributes describing the polarizations of the saved products + autocorrelation.attrs["prod_pol"] = galt_auto_prod_pol + autocorrelation.attrs["pol"] = np.array([input_map[ii].pol for ii in galt_inputs]) + return autocorrelation diff --git a/ch_pipeline/core/containers.py b/ch_pipeline/core/containers.py index 761f294f..f4dae46a 100644 --- a/ch_pipeline/core/containers.py +++ b/ch_pipeline/core/containers.py @@ -802,39 +802,6 @@ def source(self): return self.index_map["source"] -class GaltAutocorrelation(FreqContainer, TODContainer): - _axes = ("freq", "pol", "time") - - _dataset_spec = { - "auto": { - "axes": ["freq", "pol", "time"], - "dtype": np.complex64, - "initialise": True, - "distributed": True, - "distributed_axis": "freq", - }, - "weight": { - "axes": ["freq", "pol", "time"], - "dtype": np.float64, - "initialise": True, - "distributed": True, - "distributed_axis": "freq", - }, - } - - @property - def auto(self): - return self.datasets["auto"] - - @property - def weight(self): - return self.datasets["weight"] - - @property - def fpga_count(self): - return self.index_map["time"]["fpga_count"] - - def make_empty_corrdata( freq=None, input=None, diff --git a/ch_pipeline/core/io.py b/ch_pipeline/core/io.py index 622332b5..1a3dfb4e 100644 --- a/ch_pipeline/core/io.py +++ b/ch_pipeline/core/io.py @@ -472,4 +472,3 @@ def next(self, files): ) return sorted(list(new_files)) -