diff --git a/ch_pipeline/processing/quarterstack.py b/ch_pipeline/processing/quarterstack.py index eb7a209b..3c450ef1 100644 --- a/ch_pipeline/processing/quarterstack.py +++ b/ch_pipeline/processing/quarterstack.py @@ -70,11 +70,25 @@ level_rank0: DEBUG level_all: WARNING + # Test the MPI environment so that the pipeline fails + # early if there are issues + - type: draco.core.misc.CheckMPIEnvironment + params: + timeout: 420 + + # Aggressivly try to establish a database connection + - type: ch_pipeline.core.dataquery.ConnectDatabase + params: + timeout: 5 + ntries: 5 + + # Load the telescope manager object - type: draco.core.io.LoadProductManager out: manager params: product_directory: "{product_path}" + # Load each Sidereal Stream which will go into this stack - type: draco.core.io.LoadBasicCont out: sstream params: @@ -82,21 +96,23 @@ selections: freq_range: [{freq[0]:d}, {freq[1]:d}] + # Mask out daytime data - type: ch_pipeline.analysis.flagging.MaskDay in: sstream out: sstream_mask + # Mask out the moon when it can affect the data - type: ch_pipeline.analysis.flagging.MaskMoon in: sstream_mask out: sstream_mask2 + # Flag data based on database flags - type: ch_pipeline.analysis.flagging.DataFlagger in: sstream_mask2 out: sstream_mask3 params: flag_type: - acjump_sd - - rain1mm_sd - srs/bad_ringmap_broadband - bad_calibration_gains - bad_calibration_fpga_restart @@ -104,6 +120,14 @@ - snow - decorrelated_cylinder + # Flag periods of rainfall which could affect data + - type: ch_pipeline.analysis.flagging.FlagRainfall + in: sstream_mask3 + out: sstream_mask4 + params: + accumulation_time: 30.0 + threshold: 1.0 + # Load gain errors as a function of time - type: ch_pipeline.core.io.LoadSetupFile out: gain_err @@ -116,7 +140,7 @@ # Apply a mask that removes frequencies and times that suffer from gain errors - type: ch_pipeline.analysis.calibration.FlagNarrowbandGainError requires: gain_err - in: sstream_mask3 + in: sstream_mask4 out: mask_gain_err params: transition: 600.0 @@ -125,38 +149,13 @@ save: false - type: draco.analysis.flagging.ApplyRFIMask - in: [sstream_mask3, mask_gain_err] - out: sstream_mask4 - - # Flag out low weight samples to remove transient RFI artifacts at the edges of - # flagged regions - - type: draco.analysis.flagging.ThresholdVisWeightBaseline - requires: manager - in: sstream_mask4 - out: full_tvwb_mask - params: - relative_threshold: 0.5 - ignore_absolute_threshold: -1 - average_type: "mean" - pols_to_flag: "all" - - # Apply the tvwb mask. This will modify the data inplace. - - type: draco.analysis.flagging.ApplyBaselineMask - in: [sstream_mask4, full_tvwb_mask] + in: [sstream_mask4, mask_gain_err] out: sstream_mask5 - - type: draco.analysis.flagging.RFIMask - in: sstream_mask5 - out: rfi_mask - params: - stack_ind: 66 - - - type: draco.analysis.flagging.ApplyRFIMask - in: [sstream_mask5, rfi_mask] - out: sstream_mask6 - + # Calculate a median in RA over a specified RA window. This acts + # as an estimation of the cross-talk for this stack - type: ch_pipeline.analysis.sidereal.SiderealMean - in: sstream_mask6 + in: sstream_mask5 out: med params: mask_ra: [[{ra_range[0]:.2f}, {ra_range[1]:.2f}]] @@ -165,18 +164,26 @@ inverse_variance: false - type: ch_pipeline.analysis.sidereal.ChangeSiderealMean - in: [sstream_mask6, med] - out: sstream_mask7 + in: [sstream_mask5, med] + out: sstream_mask6 + # Update the stack with each sidereal stream. This is effectively + # a weighted average - type: draco.analysis.sidereal.SiderealStacker - in: sstream_mask7 - out: sstack_stack + in: sstream_mask6 + out: sstack params: tag: {tag} + # Apply a gradient rebin correction + - type: draco.analysis.sidereal.RebinGradientCorrection + requires: sstack + in: sstack + out: sstack_grad_fix + # Precision truncate the sidereal stack data - type: draco.core.io.Truncate - in: sstack_stack + in: sstack_grad_fix out: sstack_trunc params: dataset: @@ -193,13 +200,20 @@ # Save the sstack out to a zarr zip file - type: draco.core.io.SaveZarrZip in: sstack_trunc - out: zip_handle + out: sstack_zip_handle params: output_name: "sstack.zarr.zip" + # Block until the stack file is written out. Otherwise, + # masking steps can end up getting applied before saving. + - type: draco.core.misc.WaitUntil + requires: sstack_zip_handle + in: sstack_grad_fix + out: sstack2 + - type: draco.analysis.ringmapmaker.RingMapMaker requires: manager - in: sstack_trunc + in: sstack2 out: ringmap params: single_beam: true @@ -229,25 +243,82 @@ # Save the ringmap out to a ZarrZip file - type: draco.core.io.SaveZarrZip in: ringmap_trunc - out: zip_handle + out: ringmap_zip_handle params: output_name: "ringmap.zarr.zip" - # Estimate the delay spectrum - - type: draco.analysis.delay.DelaySpectrumEstimator + # Mask out the bright sources so we can see the high delay structure more easily + - type: ch_pipeline.analysis.flagging.MaskSource + in: sstack2 + out: sstack_flag_src + params: + source: ["CAS_A", "CYG_A", "TAU_A", "VIR_A"] + + # Try and derive an optimal time-freq factorizable mask that covers the + # existing masked entries + - type: draco.analysis.flagging.MaskFreq + in: sstack_flag_src + out: factmask + params: + factorize: true + save: true + output_name: "fact_mask.h5" + + # Apply the RFI mask. This will modify the data in place. + - type: draco.analysis.flagging.ApplyTimeFreqMask + in: [sstack_flag_src, factmask] + out: sstack_factmask + + # Estimate the delay power spectrum + - type: draco.analysis.delay.DelayPowerSpectrumStokesIEstimator requires: manager - in: sstack_stack + in: sstack_factmask params: + freq_frac: 0.01 + time_frac: 0.01 + remove_mean: true freq_zero: 800.0 + nfreq: {nfreq_delay} + nsamp: 100 + maxpost: true + maxpost_tol: 1.0e-4 complex_timedomain: true + save: true + output_name: "delayspectrum.h5" + + # Apply delay filter to stream + - type: draco.analysis.delay.DelayFilter + requires: manager + in: sstack_factmask + out: sstack_dfilter + params: + delay_cut: 0.1 + za_cut: 1.0 + window: true + + # Estimate the high-pass filtered delay power spectrum + - type: draco.analysis.delay.DelayPowerSpectrumStokesIEstimator + requires: manager + in: sstack_dfilter + params: + freq_frac: 0.01 + time_frac: 0.01 + remove_mean: true + freq_zero: 800.0 nfreq: {nfreq_delay} - nsamp: 40 + nsamp: 100 + maxpost: true + maxpost_tol: 1.0e-4 + complex_timedomain: true save: true - output_name: "dspec.h5" + output_name: "delayspectrum.h5" # Wait for the Zipping to finish - type: draco.core.io.WaitZarrZip - in: zip_handle + in: sstack_zip_handle + + - type: draco.core.io.WaitZarrZip + in: ringmap_zip_handle """