Skip to content

Commit

Permalink
feat(quarterstack): update config for pipeline revision 8
Browse files Browse the repository at this point in the history
  • Loading branch information
ljgray committed Sep 18, 2024
1 parent 79bb3d8 commit 90535a6
Showing 1 changed file with 140 additions and 55 deletions.
195 changes: 140 additions & 55 deletions ch_pipeline/processing/quarterstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,40 +70,64 @@
level_rank0: DEBUG
level_all: WARNING
# Test the MPI environment so that the pipeline fails
# early if there are issues
- type: draco.core.misc.CheckMPIEnvironment
params:
timeout: 420
# Aggressivly try to establish a database connection
- type: ch_pipeline.core.dataquery.ConnectDatabase
params:
timeout: 5
ntries: 5
# Load the telescope manager object
- type: draco.core.io.LoadProductManager
out: manager
params:
product_directory: "{product_path}"
# Load each Sidereal Stream which will go into this stack
- type: draco.core.io.LoadBasicCont
out: sstream
params:
files: *days
selections:
freq_range: [{freq[0]:d}, {freq[1]:d}]
# Mask out daytime data
- type: ch_pipeline.analysis.flagging.MaskDay
in: sstream
out: sstream_mask
# Mask out the moon when it can affect the data
- type: ch_pipeline.analysis.flagging.MaskMoon
in: sstream_mask
out: sstream_mask2
# Flag data based on database flags
- type: ch_pipeline.analysis.flagging.DataFlagger
in: sstream_mask2
out: sstream_mask3
params:
flag_type:
- acjump_sd
- rain1mm_sd
- srs/bad_ringmap_broadband
- bad_calibration_gains
- bad_calibration_fpga_restart
- bad_calibration_acquisition_restart
- snow
- decorrelated_cylinder
# Flag periods of rainfall which could affect data
- type: ch_pipeline.analysis.flagging.FlagRainfall
in: sstream_mask3
out: sstream_mask4
params:
accumulation_time: 30.0
threshold: 1.0
# Load gain errors as a function of time
- type: ch_pipeline.core.io.LoadSetupFile
out: gain_err
Expand All @@ -116,7 +140,7 @@
# Apply a mask that removes frequencies and times that suffer from gain errors
- type: ch_pipeline.analysis.calibration.FlagNarrowbandGainError
requires: gain_err
in: sstream_mask3
in: sstream_mask4
out: mask_gain_err
params:
transition: 600.0
Expand All @@ -125,38 +149,13 @@
save: false
- type: draco.analysis.flagging.ApplyRFIMask
in: [sstream_mask3, mask_gain_err]
out: sstream_mask4
# Flag out low weight samples to remove transient RFI artifacts at the edges of
# flagged regions
- type: draco.analysis.flagging.ThresholdVisWeightBaseline
requires: manager
in: sstream_mask4
out: full_tvwb_mask
params:
relative_threshold: 0.5
ignore_absolute_threshold: -1
average_type: "mean"
pols_to_flag: "all"
# Apply the tvwb mask. This will modify the data inplace.
- type: draco.analysis.flagging.ApplyBaselineMask
in: [sstream_mask4, full_tvwb_mask]
in: [sstream_mask4, mask_gain_err]
out: sstream_mask5
- type: draco.analysis.flagging.RFIMask
in: sstream_mask5
out: rfi_mask
params:
stack_ind: 66
- type: draco.analysis.flagging.ApplyRFIMask
in: [sstream_mask5, rfi_mask]
out: sstream_mask6
# Calculate a median in RA over a specified RA window. This acts
# as an estimation of the cross-talk for this stack
- type: ch_pipeline.analysis.sidereal.SiderealMean
in: sstream_mask6
in: sstream_mask5
out: med
params:
mask_ra: [[{ra_range[0]:.2f}, {ra_range[1]:.2f}]]
Expand All @@ -165,41 +164,68 @@
inverse_variance: false
- type: ch_pipeline.analysis.sidereal.ChangeSiderealMean
in: [sstream_mask6, med]
out: sstream_mask7
in: [sstream_mask5, med]
out: sstream_mask6
# Update the stack with each sidereal stream. This is effectively
# a weighted average
- type: draco.analysis.sidereal.SiderealStacker
in: sstream_mask7
out: sstack_stack
in: sstream_mask6
out: sstack
params:
tag: {tag}
# Apply a gradient rebin correction. If the input dataset was gridded
# using a method other than rebinning, this will be a no-op
- type: draco.analysis.sidereal.RebinGradientCorrection
requires: sstack
in: sstack
out: sstack_grad_fix
# Precision truncate the sidereal stack data
- type: draco.core.io.Truncate
in: sstack_stack
in: sstack_grad_fix
out: sstack_trunc
params:
dataset:
vis:
weight_dataset: vis_weight
variance_increase: 1.0e-4
variance_increase: 1.0e-5
vis_weight: 1.0e-6
# Save the sstack out to a zarr zip file
- type: draco.core.io.SaveZarrZip
in: sstack_trunc
out: sstack_zip_handle
params:
compression:
vis:
chunks: [16, 512, 512]
vis_weight:
chunks: [16, 512, 512]
save: true
output_name: "sstack.zarr.zip"
remove: true
# Save the sstack out to a zarr zip file
- type: draco.core.io.SaveZarrZip
in: sstack_trunc
out: zip_handle
# Block until the stack file is written out. Otherwise,
# masking steps can end up getting applied before saving.
- type: draco.core.misc.WaitUntil
requires: sstack_zip_handle
in: sstack_grad_fix
out: sstack2
# Make a vis grid
- type: draco.analysis.ringmapmaker.MakeVisGrid
requires: manager
in: sstack2
params:
output_name: "sstack.zarr.zip"
centered: true
save: true
output_name: "vis_grid.h5
- type: draco.analysis.ringmapmaker.RingMapMaker
requires: manager
in: sstack_trunc
in: sstack2
out: ringmap
params:
single_beam: true
Expand All @@ -216,38 +242,97 @@
dataset:
map:
weight_dataset: weight
variance_increase: 1.0e-4
variance_increase: 1.0e-5
weight: 1.0e-6
# Save the ringmap out to a ZarrZip file
- type: draco.core.io.SaveZarrZip
in: ringmap_trunc
out: ringmap_zip_handle
params:
compression:
map:
chunks: [1, 1, 16, 512, 512]
weight:
chunks: [1, 16, 512, 512]
dirty_beam:
chunks: [1, 1, 16, 512, 512]
save: true
output_name: "ringmap.zarr.zip"
remove: true
# Save the ringmap out to a ZarrZip file
- type: draco.core.io.SaveZarrZip
in: ringmap_trunc
out: zip_handle
# Mask out the bright sources so we can see the high delay structure more easily
- type: ch_pipeline.analysis.flagging.MaskSource
in: sstack2
out: sstack_flag_src
params:
output_name: "ringmap.zarr.zip"
source: ["CAS_A", "CYG_A", "TAU_A", "VIR_A"]
# Try and derive an optimal time-freq factorizable mask that covers the
# existing masked entries
- type: draco.analysis.flagging.MaskFreq
in: sstack_flag_src
out: factmask
params:
factorize: true
save: true
output_name: "fact_mask.h5"
# Apply the RFI mask. This will modify the data in place.
- type: draco.analysis.flagging.ApplyTimeFreqMask
in: [sstack_flag_src, factmask]
out: sstack_factmask
# Estimate the delay spectrum
- type: draco.analysis.delay.DelaySpectrumEstimator
# Estimate the delay power spectrum
- type: draco.analysis.delay.DelayPowerSpectrumStokesIEstimator
requires: manager
in: sstack_stack
in: sstack_factmask
params:
freq_frac: 0.01
time_frac: 0.01
remove_mean: true
freq_zero: 800.0
nfreq: {nfreq_delay}
nsamp: 100
maxpost: true
maxpost_tol: 1.0e-4
complex_timedomain: true
save: true
output_name: "delayspectrum.h5"
# Apply delay filter to stream
- type: draco.analysis.delay.DelayFilter
requires: manager
in: sstack_factmask
out: sstack_dfilter
params:
delay_cut: 0.1
za_cut: 1.0
window: true
# Estimate the high-pass filtered delay power spectrum
- type: draco.analysis.delay.DelayPowerSpectrumStokesIEstimator
requires: manager
in: sstack_dfilter
params:
freq_frac: 0.01
time_frac: 0.01
remove_mean: true
freq_zero: 800.0
nfreq: {nfreq_delay}
nsamp: 40
nsamp: 100
maxpost: true
maxpost_tol: 1.0e-4
complex_timedomain: true
save: true
output_name: "dspec.h5"
output_name: "delayspectrum_hpf.h5"
# Wait for the Zipping to finish
- type: draco.core.io.WaitZarrZip
in: zip_handle
in: sstack_zip_handle
- type: draco.core.io.WaitZarrZip
in: ringmap_zip_handle
"""


Expand Down

0 comments on commit 90535a6

Please sign in to comment.