diff --git a/ch_pipeline/analysis/beam.py b/ch_pipeline/analysis/beam.py index 571f6112..754e8701 100644 --- a/ch_pipeline/analysis/beam.py +++ b/ch_pipeline/analysis/beam.py @@ -1302,6 +1302,7 @@ def unwrap_lha(lsa, src_ra): Hour angle. """ # ensure monotonic + lsa[lsa > 180] -= 360.0 start_lsa = lsa[0] lsa -= start_lsa lsa[lsa < 0] += 360.0 diff --git a/ch_pipeline/analysis/calibration.py b/ch_pipeline/analysis/calibration.py index 8f45495a..a991da70 100644 --- a/ch_pipeline/analysis/calibration.py +++ b/ch_pipeline/analysis/calibration.py @@ -16,6 +16,7 @@ from ch_util import fluxcat from ch_util import finder from ch_util import rfi +from ch_util import layout from draco.core import task from draco.util import _fast_tools @@ -2364,3 +2365,75 @@ def _calculate_uv(freq, prod, inputmap): uv = dist[:, np.newaxis, :] / lmbda[np.newaxis, :, np.newaxis] return uv + +class ExtractGaltAutoCorrelation(task.SingleTask): + """Extract the autocorrelations of the Galt telescope from a holography acquisition.""" + + def process(self, data, input_map): + """Extract the Galt autocorrelations and write them to disk. + + Parameters + ---------- + data: TimeStream + A TimeStream container holding a raw holography acquisition. + + Returns + ------- + autocorrelation: containers.GaltAutocorrelation + A GaltAutocorrelation container holding the extracted Galt autos + as a function of frequency, polarization product, and time. + """ + # Redistribute over freq + data.redistribute("freq") + + # Locate the holographic indices + galt_inputs = np.array(tools.get_holographic_index(input_map)) + + # Get the product map and inputs + prodmap = data.prod + ina, inb = prodmap["input_a"], prodmap["input_b"] + + # Locate the Galt autocorrelations and cross-pol correlation + flag_cp1 = np.where((ina == galt_inputs[0]) & (inb == galt_inputs[0]), 1, 0) + flag_xp = np.where((ina == galt_inputs[0]) & (inb == galt_inputs[1]), 1, 0) + flag_cp2 = np.where((ina == galt_inputs[1]) & (inb == galt_inputs[1]), 1, 0) + + auto_flag = (flag_cp1 + flag_xp + flag_cp2).astype(bool) + + # Construct the prod axis of the output container, and the corresponding pols + galt_auto_prod = prodmap[auto_flag] + + galt_auto_prod_pol = np.array( + [input_map[aa].pol + input_map[bb].pol for (aa, bb) in galt_auto_prod] + ) + + # Dereference beam and weight datasets + beam = data.vis[:].local_array + weight = data.weight[:].local_array + + # Load only the data corresponding to the Galt inputs + galt_auto = beam[:, auto_flag, :] + galt_weight = weight[:, auto_flag, :] + + # Initialize the auto container + autocorrelation = containers.TimeStream( + attrs_from=data, + axes_from=data, + stack=galt_auto_prod, + input=galt_inputs, + prod=galt_auto_prod, + comm=data.comm, + distributed=data.distributed, + ) + + # Redistribute output container over frequency + autocorrelation.redistribute("freq") + + autocorrelation.vis[:].local_array[:] = galt_auto + autocorrelation.weight[:].local_array[:] = galt_weight + + # Save attributes describing the polarizations of the saved products + autocorrelation.attrs["prod_pol"] = galt_auto_prod_pol + autocorrelation.attrs["pol"] = np.array([input_map[ii].pol for ii in galt_inputs]) + + return autocorrelation diff --git a/ch_pipeline/core/io.py b/ch_pipeline/core/io.py index 52a6ea85..1a3dfb4e 100644 --- a/ch_pipeline/core/io.py +++ b/ch_pipeline/core/io.py @@ -40,7 +40,6 @@ from draco.core import task, io - class LoadCorrDataFiles(task.SingleTask): """Load data from files passed into the setup routine.