Skip to content

Commit

Permalink
Merge pull request #205 from desihub/more-cleanup
Browse files Browse the repository at this point in the history
fix miscellaneous remaining bugs
  • Loading branch information
moustakas authored Jan 5, 2025
2 parents 400245c + a499ec4 commit 17851bb
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 59 deletions.
101 changes: 75 additions & 26 deletions bin/mpi-fastspecfit
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,23 @@ def run_fastspecfit(args, comm=None, fastphot=False, specprod_dir=None, makeqa=F
# Split the MPI.COMM_WORLD communicator into subcommunicators (of size
# args.mp) so we can MPI-parallelize over objects.
allranks = np.arange(comm.size)
colors = np.arange(comm.size) // args.mp
if args.purempi:
colors = np.arange(comm.size) // args.mp
color = rank // args.mp
else:
colors = np.arange(comm.size)
color = rank

color = rank // args.mp
subcomm = comm.Split(color=color, key=rank)

if rank == 0:
subranks0 = allranks[::args.mp] # rank=0 in each subcomm
log.info(f'Rank {rank}: dividing filelist into {len(subranks0):,d} sub-communicator(s) ' + \
f'(size={comm.size:,d}, mp={args.mp}).')
if args.purempi:
subranks0 = allranks[::args.mp] # rank=0 in each subcomm
log.info(f'Rank {rank}: dividing filelist into {len(subranks0):,d} sub-communicator(s) ' + \
f'(size={comm.size:,d}, mp={args.mp}).')
else:
subranks0 = allranks
log.info(f'Rank {rank}: dividing filelist across {comm.size:,d} ranks.')
else:
subranks0 = None

Expand Down Expand Up @@ -169,7 +177,7 @@ def run_fastspecfit(args, comm=None, fastphot=False, specprod_dir=None, makeqa=F
if subcomm.rank == 0:
subranks = allranks[np.isin(colors, color)]
# process from smallest to largest
srt = np.argsort(ntargets)
srt = np.argsort(ntargets)#[::-1]
redrockfiles = redrockfiles[srt]
outfiles = outfiles[srt]
ntargets = ntargets[srt]
Expand All @@ -182,12 +190,22 @@ def run_fastspecfit(args, comm=None, fastphot=False, specprod_dir=None, makeqa=F
redrockfiles = subcomm.recv(source=0, tag=4)
outfiles = subcomm.recv(source=0, tag=5)
ntargets = subcomm.recv(source=0, tag=6)
else:
redrockfiles = all_redrockfiles
outfiles = all_outfiles
ntargets = all_ntargets
#print(f'Rank={comm.rank}, subrank={subcomm.rank}, redrockfiles={redrockfiles}, ntargets={ntargets}')

for redrockfile, outfile, ntarget in zip(redrockfiles, outfiles, ntargets):
if subcomm.rank == 0:
log.debug(f'Rank {rank} (subrank {subcomm.rank}) started ' + \
f'at {time.asctime()}')
if subcomm:
if subcomm.rank == 0:
if args.purempi:
log.debug(f'Rank {rank} (subrank {subcomm.rank}) started ' + \
f'at {time.asctime()}')
else:
log.debug(f'Rank {rank} started at {time.asctime()}')
elif rank == 0:
log.debug(f'Rank {rank} started at {time.asctime()}')

if args.makeqa:
from fastspecfit.qa import fastqa as fast
Expand All @@ -200,41 +218,68 @@ def run_fastspecfit(args, comm=None, fastphot=False, specprod_dir=None, makeqa=F
cmd, cmdargs, logfile = build_cmdargs(args, redrockfile, outfile, sample=sample,
fastphot=fastphot, input_redshifts=input_redshifts)

if subcomm.rank == 0:
log.info(f'Rank {rank} (nsubrank={subcomm.size}): ' + \
f'ntargets={ntarget}: {cmd} {cmdargs}')
if subcomm:
if subcomm.rank == 0:
if args.purempi:
log.info(f'Rank {rank} (nsubrank={subcomm.size}): ' + \
f'ntargets={ntarget}: {cmd} {cmdargs}')
else:
log.info(f'Rank {rank} ntargets={ntarget}: {cmd} {cmdargs}')
elif rank == 0:
log.info(f'Rank {rank}: ntargets={ntarget}: {cmd} {cmdargs}')

if args.dry_run:
continue

try:
if subcomm.rank == 0:
if subcomm:
if subcomm.rank == 0:
t1 = time.time()
outdir = os.path.dirname(logfile)
if not os.path.isdir(outdir):
os.makedirs(outdir, exist_ok=True)
elif rank == 0:
t1 = time.time()
outdir = os.path.dirname(logfile)
if not os.path.isdir(outdir):
os.makedirs(outdir, exist_ok=True)

if args.nolog:
err = fast(args=cmdargs.split(), comm=subcomm)
if args.purempi:
err = fast(args=cmdargs.split(), comm=subcomm)
else:
err = fast(args=cmdargs.split(), comm=None)
else:
with stdouterr_redirected(to=logfile, overwrite=args.overwrite, comm=subcomm):
#log.info(f'rank={comm.rank} subrank={subcomm.rank}')
err = fast(args=cmdargs.split(), comm=subcomm)

if subcomm.rank == 0:
dt1 = time.time() - t1
log.info(f' rank {rank} done in {dt1:.2f} sec')
if args.purempi:
err = fast(args=cmdargs.split(), comm=subcomm)
else:
err = fast(args=cmdargs.split(), comm=None)

if subcomm:
if subcomm.rank == 0:
log.info(f'Rank {rank} done in {time.time() - t1:.2f} sec')
if err != 0:
if not os.path.exists(outfile):
log.warning(f'Rank {rank} missing {outfile}')
raise IOError
elif rank == 0:
log.info(f'Rank {rank} done in {time.time() - t1:.2f} sec')
if err != 0:
if not os.path.exists(outfile):
log.warning(f' rank {rank} missing {outfile}')
log.warning(f'Rank {rank} missing {outfile}')
raise IOError

except:
log.warning(f' rank {rank} raised an exception')
log.warning(f'Rank {rank} raised an exception')
import traceback
traceback.print_exc()

if subcomm.rank == 0:
log.debug(f' rank {rank} is done')
if subcomm:
if subcomm.rank == 0:
log.debug(f'Rank {rank} is done')
elif rank == 0:
log.debug(f'Rank {rank} is done')

if comm:
comm.barrier()
Expand Down Expand Up @@ -270,7 +315,7 @@ def main():
parser.add_argument('--samplefile', default=None, type=str, help='Full path to sample (FITS) file with {survey,program,healpix,targetid}.')
parser.add_argument('--input-redshifts', action='store_true', help='Only used with --samplefile; if set, fit with redshift "Z" values.')
parser.add_argument('--input-seeds', type=str, default=None, help='Comma-separated list of input random-number seeds corresponding to the (required) --targetids input.')
parser.add_argument('--nmonte', type=int, default=50, help='Number of Monte Carlo realizations.')
parser.add_argument('--nmonte', type=int, default=10, help='Number of Monte Carlo realizations.')
parser.add_argument('--seed', type=int, default=1, help='Random seed for Monte Carlo reproducibility; ignored if --input-seeds is passed.')

parser.add_argument('--mp', type=int, default=1, help='Number of multiprocessing processes per MPI rank or node.')
Expand Down Expand Up @@ -303,6 +348,7 @@ def main():
parser.add_argument('--plan', action='store_true', help='Plan how many nodes to use and how to distribute the targets.')
parser.add_argument('--profile', action='store_true', help='Write out profiling / timing files..')
parser.add_argument('--nompi', action='store_true', help='Do not use MPI parallelism.')
parser.add_argument('--purempi', action='store_true', help='Use only MPI parallelism; no multiprocessing.')
parser.add_argument('--nolog', action='store_true', help='Do not write to the log file.')
parser.add_argument('--dry-run', action='store_true', help='Generate but do not run commands.')

Expand All @@ -318,13 +364,16 @@ def main():
else:
try:
from mpi4py import MPI
# needed when profiling; no effect otherwise
# https://docs.linaroforge.com/24.0.6/html/forge/map/python_profiling/profile_python_script.html
#MPI.Init_thread(MPI.THREAD_SINGLE)
comm = MPI.COMM_WORLD
except ImportError:
comm = None

if comm:
rank = comm.rank
if comm.size > 1 and args.mp > 1 and comm.size < args.mp:
if rank == 0 and args.purempi and comm.size > 1 and args.mp > 1 and comm.size < args.mp:
log.warning(f'Number of MPI tasks {comm.size} should be >{args.mp} for MPI parallelism.')
else:
rank = 0
Expand Down
2 changes: 2 additions & 0 deletions doc/changes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Change Log
3.1.1 (not released yet)
------------------------

* Miscellaneous bug fixes [`PR #205`_].
* Pure-MPI implementation; new Podman container; bug fixes [`PR #203`_].
* Updated algorithm for updating QSO redshifts [`PR #201`_].
* Progress toward pure-MPI production code [`PR #200`_].
Expand All @@ -16,6 +17,7 @@ Change Log
.. _`PR #200`: https://github.com/desihub/fastspecfit/pull/200
.. _`PR #201`: https://github.com/desihub/fastspecfit/pull/201
.. _`PR #203`: https://github.com/desihub/fastspecfit/pull/203
.. _`PR #205`: https://github.com/desihub/fastspecfit/pull/205

3.1.0 (2024-11-21)
------------------
Expand Down
31 changes: 14 additions & 17 deletions podman/Containerfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ RUN wget --no-check-certificate -nv https://www.mpich.org/static/downloads/$mpic

RUN /sbin/ldconfig

## Try to prevent MKL from throttling AMD
## https://gitlab.com/NERSC/python-benchmark/-/tree/main/amd
#COPY fakeintel.c /src/fakeintel.c
#RUN gcc -shared -fPIC -o /usr/local/lib/libfakeintel.so /src/fakeintel.c
#ENV LD_PRELOAD=/usr/local/lib/libfakeintel.so
# Try to prevent MKL from throttling AMD
# https://gitlab.com/NERSC/python-benchmark/-/tree/main/amd
COPY fakeintel.c /src/fakeintel.c
RUN gcc -shared -fPIC -o /usr/local/lib/libfakeintel.so /src/fakeintel.c
ENV LD_PRELOAD=/usr/local/lib/libfakeintel.so

# Install Miniconda
ENV CONDA_DIR=/opt/miniconda
Expand Down Expand Up @@ -73,6 +73,10 @@ RUN conda install -y -c conda-forge \
pytest \
"numpy<2.0" \
scipy \
libblas=*=*mkl \
mkl \
# mkl_fft \
# intel-cmplr-lib-rt \
astropy \
healpy \
fitsio \
Expand All @@ -81,13 +85,7 @@ RUN conda install -y -c conda-forge \
matplotlib \
ipython \
ipykernel \
h5py

RUN conda install -y -c conda-forge \
libblas=*=*mkl \
# also tried mkl_fft<1.3.9
# mkl_fft \
intel-cmplr-lib-rt \
h5py \
&& conda clean --all -y

# Need to install mpi4py from source to link it properly to MPICH.
Expand All @@ -100,7 +98,6 @@ ENV DESITARGET_VERSION=2.8.0
ENV DESISPEC_VERSION=0.68.1
ENV SPECLITE_VERSION=v0.20
ENV FASTSPECFIT_VERSION=main
#ENV FASTSPECFIT_VERSION=3.1.1

RUN pip install git+https://github.com/desihub/desiutil.git@${DESIUTIL_VERSION}#egg=desiutil
RUN pip install git+https://github.com/desihub/desimodel.git@${DESIMODEL_VERSION}#egg=desimodel
Expand All @@ -109,10 +106,10 @@ RUN pip install git+https://github.com/desihub/desispec.git@${DESISPEC_VERSION}#
RUN pip install git+https://github.com/desihub/speclite.git@${SPECLITE_VERSION}#egg=speclite
RUN pip install git+https://github.com/desihub/fastspecfit.git@${FASTSPECFIT_VERSION}#egg=fastspecfit

ENV DESI_SPECTRO_REDUX=/dvs_ro/cfs/cdirs/desi/spectro/redux
ENV DUST_DIR=/dvs_ro/cfs/cdirs/cosmo/data/dust/v0_1
ENV FPHOTO_DIR=/dvs_ro/cfs/cdirs/desi/external/legacysurvey/dr9
ENV FTEMPLATES_DIR=/dvs_ro/cfs/cdirs/desi/public/external/templates/fastspecfit
ENV DESI_SPECTRO_REDUX=/global/cfs/cdirs/desi/spectro/redux
ENV DUST_DIR=/global/cfs/cdirs/cosmo/data/dust/v0_1
ENV FPHOTO_DIR=/global/cfs/cdirs/desi/external/legacysurvey/dr9
ENV FTEMPLATES_DIR=/global/cfs/cdirs/desi/public/external/templates/fastspecfit

# Some environment variables to ensure good performance with MKL.
# https://www.diracprogram.org/doc/master/installation/mkl.html
Expand Down
2 changes: 1 addition & 1 deletion podman/Containerfile-mkl
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ RUN pip install git+https://github.com/desihub/desimodel.git@${DESIMODEL_VERSION
RUN pip install git+https://github.com/desihub/desitarget.git@${DESITARGET_VERSION}#egg=desitarget
RUN pip install git+https://github.com/desihub/desispec.git@${DESISPEC_VERSION}#egg=desispec
RUN pip install git+https://github.com/desihub/speclite.git@${SPECLITE_VERSION}#egg=speclite
RUN pip install git+https://github.com/desihub/fastspecfit.git@${FASTSPECFIT_VERSION}#egg=fastspecfit
RUN pip install git+https://github.com/desihub/fastspecfit.git@${FASTSPECFIT_VERSION}#egg=fastspecfit

ENV DESI_SPECTRO_REDUX=/dvs_ro/cfs/cdirs/desi/spectro/redux
ENV DUST_DIR=/dvs_ro/cfs/cdirs/cosmo/data/dust/v0_1
Expand Down
6 changes: 3 additions & 3 deletions py/fastspecfit/continuum.py
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@ def can_compute_vdisp(redshift, specwave, min_restrange=(3800., 4800.), fit_rest


def continuum_fastphot(redshift, objflam, objflamivar, CTools, uniqueid=0,
nmonte=50, rng=None, debug_plots=False):
nmonte=10, rng=None, debug_plots=False):
"""Model the broadband photometry.
"""
Expand Down Expand Up @@ -1207,7 +1207,7 @@ def _continuum_nominal_vdisp(CTools, templates, specflux, specwave,
return tauv, vdisp, coeff, contmodel, chi2


def continuum_fastspec(redshift, objflam, objflamivar, CTools, nmonte=50,
def continuum_fastspec(redshift, objflam, objflamivar, CTools, nmonte=10,
rng=None, uniqueid=0, no_smooth_continuum=False,
debug_plots=False):
"""Jointly model the spectroscopy and broadband photometry.
Expand Down Expand Up @@ -1542,7 +1542,7 @@ def do_fit_full(objflam, specflux):


def continuum_specfit(data, fastfit, specphot, templates, igm, phot,
nmonte=50, seed=1, constrain_age=False,
nmonte=10, seed=1, constrain_age=False,
no_smooth_continuum=False, fitstack=False,
fastphot=False, debug_plots=False):
"""Fit the non-negative stellar continuum of a single spectrum.
Expand Down
4 changes: 2 additions & 2 deletions py/fastspecfit/fastspecfit.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def parse(options=None, rank=0):
parser.add_argument('--input-redshifts', type=str, default=None, help='Comma-separated list of input redshifts corresponding to the (required) --targetids input.')
parser.add_argument('--input-seeds', type=str, default=None, help='Comma-separated list of input random-number seeds corresponding to the (required) --targetids input.')
parser.add_argument('--seed', type=int, default=1, help='Random seed for Monte Carlo reproducibility; ignored if --input-seeds is passed.')
parser.add_argument('--nmonte', type=int, default=50, help='Number of Monte Carlo realizations.')
parser.add_argument('--nmonte', type=int, default=10, help='Number of Monte Carlo realizations.')
parser.add_argument('--zmin', type=float, default=None, help='Override the default minimum redshift required for modeling.')
parser.add_argument('--no-broadlinefit', default=True, action='store_false', dest='broadlinefit',
help='Do not model broad Balmer and helium line-emission.')
Expand Down Expand Up @@ -70,7 +70,7 @@ def parse(options=None, rank=0):
def fastspec_one(iobj, data, meta, fastfit_dtype, specphot_dtype, broadlinefit=True,
fastphot=False, fitstack=False, constrain_age=False,
no_smooth_continuum=False, debug_plots=False, uncertainty_floor=0.01,
minsnr_balmer_broad=2.5, nmonte=50, seed=1):
minsnr_balmer_broad=2.5, nmonte=10, seed=1):
"""Run :func:`fastspec` on a single object.
"""
Expand Down
9 changes: 6 additions & 3 deletions py/fastspecfit/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,10 @@ def update_qso_redshifts(zb, meta, qnfile, mgiifile, fitindx, specprod):
desi_target, bgs_target, mws_target, scnd_target = surv_target
desi_mask, bgs_mask, mws_mask, scnd_mask = surv_mask
IQSO = meta[desi_target] & desi_mask['QSO'] != 0
IWISE_VAR_QSO = meta[scnd_target] & scnd_mask['WISE_VAR_QSO'] != 0
if 'WISE_VAR_QSO' in scnd_mask.names():
IWISE_VAR_QSO = meta[scnd_target] & scnd_mask['WISE_VAR_QSO'] != 0
else:
IWISE_VAR_QSO = np.zeros(len(meta), bool)
if np.sum(IQSO) > 0 or np.sum(IWISE_VAR_QSO) > 0:
qn = Table(fitsio.read(qnfile, 'QN_RR', rows=fitindx, columns=QNCOLS))
assert(np.all(qn['TARGETID'] == meta['TARGETID']))
Expand Down Expand Up @@ -1081,7 +1084,7 @@ def read(self, photometry, fastphot=False, constrain_age=False):

if hasattr(photometry, 'fiber_filters'):
for iband, filt in enumerate(photometry.fiber_filters[onephotsys].names):
mw_transmission_fiberflux[I, iband] = mwdust_transmission(ebv, filt)
mw_transmission_fiberflux[I, iband] = mwdust_transmission(ebv[I], filt)
else:
mw_transmission_fiberflux = None

Expand Down Expand Up @@ -1552,7 +1555,7 @@ def read_fastspecfit(fastfitfile, rows=None, metadata_columns=None, specphot_col
def write_fastspecfit(meta, specphot, fastfit, modelspectra=None, outfile=None,
specprod=None, coadd_type=None, fphotofile=None,
template_file=None, emlinesfile=None, fastphot=False,
inputz=False, inputseeds=None, nmonte=50, seed=1,
inputz=False, inputseeds=None, nmonte=10, seed=1,
uncertainty_floor=0.01, minsnr_balmer_broad=2.5,
no_smooth_continuum=False, ignore_photometry=False,
broadlinefit=True, use_quasarnet=True, constrain_age=False,
Expand Down
15 changes: 13 additions & 2 deletions py/fastspecfit/linemasker.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def _get_contpix(zlinewaves, sigmas, nsigma_factor=1., linemask=None, zlyawave=[
pix.update({'patch_contpix': {}, 'dropped': [], 'merged': [], 'merged_from': []})
patchids = list(patchMap.keys())
npatch = len(patchids)
patchmaplines = np.hstack([patchMap[key][0] for key in patchMap.keys()])

FACTOR_DEFAULT = 2.
FACTORS = [2., 3., 4.]
Expand All @@ -95,8 +96,18 @@ def _get_contpix(zlinewaves, sigmas, nsigma_factor=1., linemask=None, zlyawave=[
continue
I = _get_linepix(zlinewave, sigma)
if len(I) > 0:
linemask[I] = True
pix['linepix'][linename] = I
if patchMap is None:
linemask[I] = True
pix['linepix'][linename] = I
else:
# Extreme corner case: fully masked r-band camera results
# in hgamma "surviving" with a single pixel with ivar!=0,
# which raises an exception below because the line is
# dropped from patchMap. Example:
# loa/main/bright/9512/39632986815075191
if linename in patchmaplines:
linemask[I] = True
pix['linepix'][linename] = I

# skip contpix
if not get_contpix:
Expand Down
1 change: 0 additions & 1 deletion py/fastspecfit/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ def plan(comm=None, specprod=None, specprod_dir=None, coadd_type='healpix',
ntargets.extend(comm.recv(source=onerank))
if len(ntargets) > 0:
ntargets = np.hstack(ntargets)
comm.barrier()
else:
if mp > 1:
with multiprocessing.Pool(mp) as P:
Expand Down
Loading

0 comments on commit 17851bb

Please sign in to comment.