feat(quarterstack): update for daily pipeline rev_08 changes
ljgray committed Aug 7, 2024
1 parent 478b23d commit 8f8427f
Showing 1 changed file with 162 additions and 74 deletions.
236 changes: 162 additions & 74 deletions ch_pipeline/processing/quarterstack.py
@@ -70,40 +70,64 @@
level_rank0: DEBUG
level_all: WARNING
# Test the MPI environment so that the pipeline fails
# early if there are issues
- type: draco.core.misc.CheckMPIEnvironment
params:
timeout: 420
# Aggressively try to establish a database connection
- type: ch_pipeline.core.dataquery.ConnectDatabase
params:
timeout: 5
ntries: 5
# Load the telescope manager object
- type: draco.core.io.LoadProductManager
out: manager
params:
product_directory: "{product_path}"
# Load each Sidereal Stream which will go into this stack
- type: draco.core.io.LoadBasicCont
out: sstream
params:
files: *days
selections:
freq_range: [{freq[0]:d}, {freq[1]:d}]
# Mask out daytime data
- type: ch_pipeline.analysis.flagging.MaskDay
in: sstream
out: sstream_mask
# Mask out the moon when it can affect the data
- type: ch_pipeline.analysis.flagging.MaskMoon
in: sstream_mask
out: sstream_mask2
# Flag data based on database flags
- type: ch_pipeline.analysis.flagging.DataFlagger
in: sstream_mask2
out: sstream_mask3
params:
flag_type:
- acjump_sd
- rain1mm_sd
- srs/bad_ringmap_broadband
- bad_calibration_gains
- bad_calibration_fpga_restart
- bad_calibration_acquisition_restart
- snow
- decorrelated_cylinder
# Flag periods of rainfall which could affect data
- type: ch_pipeline.analysis.flagging.FlagRainfall
in: sstream_mask3
out: sstream_mask4
params:
accumulation_time: 30.0
threshold: 1.0
# Load gain errors as a function of time
- type: ch_pipeline.core.io.LoadSetupFile
out: gain_err
@@ -116,7 +140,7 @@
# Apply a mask that removes frequencies and times that suffer from gain errors
- type: ch_pipeline.analysis.calibration.FlagNarrowbandGainError
requires: gain_err
in: sstream_mask3
in: sstream_mask4
out: mask_gain_err
params:
transition: 600.0
@@ -125,38 +149,13 @@
save: false
- type: draco.analysis.flagging.ApplyRFIMask
in: [sstream_mask3, mask_gain_err]
out: sstream_mask4
# Flag out low weight samples to remove transient RFI artifacts at the edges of
# flagged regions
- type: draco.analysis.flagging.ThresholdVisWeightBaseline
requires: manager
in: sstream_mask4
out: full_tvwb_mask
params:
relative_threshold: 0.5
ignore_absolute_threshold: -1
average_type: "mean"
pols_to_flag: "all"
# Apply the tvwb mask. This will modify the data inplace.
- type: draco.analysis.flagging.ApplyBaselineMask
in: [sstream_mask4, full_tvwb_mask]
in: [sstream_mask4, mask_gain_err]
out: sstream_mask5
- type: draco.analysis.flagging.RFIMask
in: sstream_mask5
out: rfi_mask
params:
stack_ind: 66
- type: draco.analysis.flagging.ApplyRFIMask
in: [sstream_mask5, rfi_mask]
out: sstream_mask6
# Calculate a median in RA over a specified RA window. This acts
# as an estimation of the cross-talk for this stack
- type: ch_pipeline.analysis.sidereal.SiderealMean
in: sstream_mask6
in: sstream_mask5
out: med
params:
mask_ra: [[{ra_range[0]:.2f}, {ra_range[1]:.2f}]]
@@ -165,18 +164,26 @@
inverse_variance: false
- type: ch_pipeline.analysis.sidereal.ChangeSiderealMean
in: [sstream_mask6, med]
out: sstream_mask7
in: [sstream_mask5, med]
out: sstream_mask6
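As an aside on the cross-talk removal above, a minimal NumPy sketch of the idea of using a median over an RA window as a cross-talk estimate and subtracting it from the stream. The array shapes, names, and sign convention are assumptions for illustration, not the ch_pipeline implementation.

import numpy as np

def remove_ra_window_median(vis, ra, ra_window):
    # vis: complex visibilities with RA as the last axis; ra: RA of each sample (deg)
    # ra_window: (lo, hi) in degrees selecting the samples used for the estimate
    sel = (ra >= ra_window[0]) & (ra < ra_window[1])
    # A median over the selected RA samples serves as a per-baseline cross-talk estimate
    med = np.median(vis.real[..., sel], axis=-1) + 1j * np.median(vis.imag[..., sel], axis=-1)
    return vis - med[..., np.newaxis]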
# Update the stack with each sidereal stream. This is effectively
# a weighted average
- type: draco.analysis.sidereal.SiderealStacker
in: sstream_mask7
out: sstack_stack
in: sstream_mask6
out: sstack
params:
tag: {tag}
# Apply a gradient rebin correction
- type: draco.analysis.sidereal.RebinGradientCorrection
requires: sstack
in: sstack
out: sstack_grad_fix
# Precision truncate the sidereal stack data
- type: draco.core.io.Truncate
in: sstack_stack
in: sstack_grad_fix
out: sstack_trunc
params:
dataset:
@@ -193,13 +200,20 @@
# Save the sstack out to a zarr zip file
- type: draco.core.io.SaveZarrZip
in: sstack_trunc
out: zip_handle
out: sstack_zip_handle
params:
output_name: "sstack.zarr.zip"
# Block until the stack file is written out. Otherwise,
# masking steps can end up getting applied before saving.
- type: draco.core.misc.WaitUntil
requires: sstack_zip_handle
in: sstack_grad_fix
out: sstack2
- type: draco.analysis.ringmapmaker.RingMapMaker
requires: manager
in: sstack_trunc
in: sstack2
out: ringmap
params:
single_beam: true
@@ -229,25 +243,82 @@
# Save the ringmap out to a ZarrZip file
- type: draco.core.io.SaveZarrZip
in: ringmap_trunc
out: zip_handle
out: ringmap_zip_handle
params:
output_name: "ringmap.zarr.zip"
# Estimate the delay spectrum
- type: draco.analysis.delay.DelaySpectrumEstimator
# Mask out the bright sources so we can see the high delay structure more easily
- type: ch_pipeline.analysis.flagging.MaskSource
in: sstack2
out: sstack_flag_src
params:
source: ["CAS_A", "CYG_A", "TAU_A", "VIR_A"]
# Try and derive an optimal time-freq factorizable mask that covers the
# existing masked entries
- type: draco.analysis.flagging.MaskFreq
in: sstack_flag_src
out: factmask
params:
factorize: true
save: true
output_name: "fact_mask.h5"
# Apply the RFI mask. This will modify the data in place.
- type: draco.analysis.flagging.ApplyTimeFreqMask
in: [sstack_flag_src, factmask]
out: sstack_factmask
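For intuition on the factorizable mask above, a hedged sketch (a greedy approach chosen purely for illustration, not the draco MaskFreq implementation) of building a frequency x time mask whose combination covers every already-flagged sample:

import numpy as np

def factorize_mask(mask):
    # mask: 2D bool array (nfreq, ntime), True = flagged
    flag_f = np.zeros(mask.shape[0], dtype=bool)
    flag_t = np.zeros(mask.shape[1], dtype=bool)
    remaining = mask.copy()
    while remaining.any():
        f_counts = remaining.sum(axis=1)
        t_counts = remaining.sum(axis=0)
        if f_counts.max() >= t_counts.max():
            # Flag the frequency channel with the most uncovered flagged samples
            i = int(np.argmax(f_counts))
            flag_f[i] = True
            remaining[i, :] = False
        else:
            # Flag the time sample with the most uncovered flagged samples
            j = int(np.argmax(t_counts))
            flag_t[j] = True
            remaining[:, j] = False
    # A sample is masked if either its frequency or its time is flagged
    return flag_f[:, np.newaxis] | flag_t[np.newaxis, :]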
# Estimate the delay power spectrum
- type: draco.analysis.delay.DelayPowerSpectrumStokesIEstimator
requires: manager
in: sstack_stack
in: sstack_factmask
params:
freq_frac: 0.01
time_frac: 0.01
remove_mean: true
freq_zero: 800.0
nfreq: {nfreq_delay}
nsamp: 100
maxpost: true
maxpost_tol: 1.0e-4
complex_timedomain: true
save: true
output_name: "delayspectrum.h5"
# Apply delay filter to stream
- type: draco.analysis.delay.DelayFilter
requires: manager
in: sstack_factmask
out: sstack_dfilter
params:
delay_cut: 0.1
za_cut: 1.0
window: true
# Estimate the high-pass filtered delay power spectrum
- type: draco.analysis.delay.DelayPowerSpectrumStokesIEstimator
requires: manager
in: sstack_dfilter
params:
freq_frac: 0.01
time_frac: 0.01
remove_mean: true
freq_zero: 800.0
nfreq: {nfreq_delay}
nsamp: 40
nsamp: 100
maxpost: true
maxpost_tol: 1.0e-4
complex_timedomain: true
save: true
output_name: "dspec.h5"
output_name: "delayspectrum.h5"
# Wait for the Zipping to finish
- type: draco.core.io.WaitZarrZip
in: zip_handle
in: sstack_zip_handle
- type: draco.core.io.WaitZarrZip
in: ringmap_zip_handle
"""


@@ -282,15 +353,15 @@ class QuarterStackProcessing(base.ProcessingType):
default_params: ClassVar = {
# Daily processing revisions to use (later entries in this list take precedence
# over earlier ones)
"daily_revisions": ["rev_07"],
"daily_revisions": ["rev_08"],
# Usually the opinions are queried for each revision, this dictionary allows
# that to be overridden. Each `data_rev: opinion_rev` pair means that the
# opinions used to select days for `data_rev` will instead be taken from
# `opinion_rev`.
"opinion_overrides": {
"rev_03": "rev_02",
},
"daily_root": "/project/rpp-chime/chime/chime_processed/",
"daily_root": None,
# Frequencies to process
"freq": [0, 1024],
"nfreq_delay": 1025,
@@ -347,50 +418,64 @@ def _create_hook(self):
This tries to determine which days are good and bad, and partitions the
available good days into the individual stacks.
"""
# Request additional information from the user
daily_revs = input(
"Enter the daily revisions to include (<rev_ij>,<rev_ik>,...): "
)
if daily_revs:
daily_revs = re.compile(r"rev_[0-9]{2}").findall(daily_revs)
for d in daily_revs:
if d not in self.default_params["daily_revisions"]:
self.default_params["daily_revisions"].append(d)
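A quick usage sketch of the prompt parsing above (output shown as a comment):

import re

re.compile(r"rev_[0-9]{2}").findall("rev_07, rev_08 and rev_10")
# -> ['rev_07', 'rev_08', 'rev_10']
# Any "rev_NN" token in the reply is picked up regardless of separators and, if
# not already present, appended to default_params["daily_revisions"].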

days = {}

core.connect()

opinion_overrides = self.default_params.get("opinion_overrides", {})

# Go over each revision and construct the set of LSDs we should stack, and save
# the path to each.
# NOTE: later entries in `daily_revisions` will override LSDs found in earlier
# revisions.
# the path to each. Later entries in `daily_revisions` will override LSDs found
# in earlier revisions.
for rev in self.default_params["daily_revisions"]:
daily_path = (
self.root_path
if self.default_params["daily_root"] is None
else self.default_params["daily_root"]
)
daily_rev = daily.DailyProcessing(rev, root_path=daily_path)
try:
daily_rev = daily.DailyProcessing(rev, root_path=daily_path)
except Exception: # noqa: BLE001
# TODO: some sort of warning here
continue

# Get the revision used to determine the opinions. By default this is the
# revision itself, but it can be overridden
opinion_rev = opinion_overrides.get(rev, rev)

# Get all the bad days in this revision
revision = df.DataRevision.get(name=opinion_rev)
query = (
df.DataFlagOpinion.select(df.DataFlagOpinion.lsd)
.distinct()
.where(
df.DataFlagOpinion.revision == revision,
df.DataFlagOpinion.decision == "bad",
if opinion_rev is not None:
# Get all the bad days in this revision
revision = df.DataRevision.get(name=opinion_rev)
query = (
df.DataFlagOpinion.select(df.DataFlagOpinion.lsd)
.distinct()
.where(
df.DataFlagOpinion.revision == revision,
df.DataFlagOpinion.decision == "bad",
)
)
)
bad_days = [x[0] for x in query.tuples()]

# Get all the good days
query = (
df.DataFlagOpinion.select(df.DataFlagOpinion.lsd)
.distinct()
.where(
df.DataFlagOpinion.revision == revision,
df.DataFlagOpinion.decision == "good",
bad_days = [x[0] for x in query.tuples()]

# Get all the good days
query = (
df.DataFlagOpinion.select(df.DataFlagOpinion.lsd)
.distinct()
.where(
df.DataFlagOpinion.revision == revision,
df.DataFlagOpinion.decision == "good",
)
)
)
good_days = [x[0] for x in query.tuples()]
good_days = [x[0] for x in query.tuples()]

for d in daily_rev.ls():
try:
@@ -400,9 +485,12 @@ def _create_hook(self):
f'Could not parse string tag "{d}" into a valid LSD'
) from e

# Filter out known bad days here
if (lsd in bad_days) or (lsd not in good_days):
continue
# Filter out known bad days here. If `opinion_rev` is None,
# ignore opinions and automatically include all available days.
# This only happens when the opinion override is explicitly set to None.
if opinion_rev is not None:
if (lsd in bad_days) or (lsd not in good_days):
continue
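A condensed sketch of the keep/skip decision above (illustrative; the real loop also parses the LSD tag and records the day's path):

def keep_day(lsd, good_days, bad_days, opinion_rev):
    # With no opinion revision, every parsed day is included
    if opinion_rev is None:
        return True
    # Otherwise a day must be explicitly marked good and not marked bad
    return (lsd in good_days) and (lsd not in bad_days)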

# Insert the day and path into the dict, this will replace the entries
# from prior revisions
