Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add a new disk ensurer that is easier to call and use it for GAM merging #783

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
55 changes: 55 additions & 0 deletions src/toil_vg/vg_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
from distutils.spawn import find_executable
import collections
import inspect
import socket
import uuid
import platform
Expand Down Expand Up @@ -914,6 +915,60 @@ def title_to_filename(kind, i, title, extension):
return ''.join(part_list)


def ensure_disk_bytes(job, job_function, required_disk_bytes):
    """
    Make sure the given running job is running with at least the given number
    of disk bytes.

    If so, returns None.

    If not, queues up the job_function again with the same arguments and the
    given disk limit, and returns a promise for the value the caller should
    return.

    Should be called directly by a Toil job function and not through an
    intermediate.

    Will cause problems if anyone has already added child jobs to the job being
    re-queued that expect to work with its return value.

    :param job: the currently running Toil job (first argument of the job function)
    :param job_function: the job function to re-queue if disk is insufficient;
        must be the function that is calling us
    :param required_disk_bytes: minimum number of disk bytes the job needs
    :return: None if disk is sufficient, otherwise a promise for the re-run's
        return value
    """

    if job.disk < required_disk_bytes:
        # We need more space

        # Grab our caller's stack frame, so we can recover the arguments the
        # job function was called with.
        job_frame = inspect.stack()[1][0]
        try:
            # Grab the arg names and the dict to look them up in
            (job_run_args, job_run_varargs, job_run_kwargs, job_locals) = inspect.getargvalues(job_frame)
            # Drop the job argument itself and look them all up in job_locals.
            # job_run_args[0]'s value should always be the job itself, and we don't need to pass that along.
            job_rerun_args = [job_locals[name] for name in job_run_args[1:]]
            if job_run_varargs is not None:
                # Append all the varargs arguments to the call.
                # Note: this must extend job_rerun_args (the values we will
                # pass along), not job_run_args (the list of argument names);
                # extending the latter would silently drop the varargs on
                # the re-run.
                job_rerun_args += job_locals[job_run_varargs]

            # Get the base kwargs that came into the job, or {} if there were none
            job_rerun_kwargs = {} if job_run_kwargs is None else dict(job_locals[job_run_kwargs])
            # Add the cores/memory/disk that Toil will pull out
            job_rerun_kwargs.update({"cores": job.cores, "memory": job.memory, "disk": required_disk_bytes})

            RealtimeLogger.info("Re-queueing {} because we only have {}/{} estimated necessary disk space.".format(
                job_function.__name__, job.disk, required_disk_bytes))
            # Queue the job again as a child with more disk.
            promise = job.addChildJobFn(job_function, *job_rerun_args, **job_rerun_kwargs).rv()

            return promise
        finally:
            # Explicitly drop the frame reference to break the reference
            # cycle between the frame and this function's locals.
            del job_frame

    else:
        # The job has sufficient disk
        RealtimeLogger.info("Job {} has {}/{} estimated necessary disk space.".format(
            job_function.__name__, job.disk, required_disk_bytes))

        return None

def ensure_disk(job, job_fn, job_fn_args, job_fn_kwargs, file_id_list, factor=8, padding=1024 ** 3):
"""
Ensure that the currently running job has enough disk to load all the given
Expand Down
46 changes: 40 additions & 6 deletions src/toil_vg/vg_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from toil.realtimeLogger import RealtimeLogger
from toil_vg.vg_common import *
from toil_vg.context import Context, run_write_info_to_outstore
from toil_vg.vg_surject import *
from toil_vg.vg_surject import run_merge_bams, run_whole_surject

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -271,7 +271,13 @@ def run_split_reads(job, context, fastq, gam_input_reads, bam_input_reads, reads


def run_split_fastq(job, context, fastq, fastq_i, sample_fastq_id):


disk_required = job.fileStore.getGlobalFileSize(sample_fastq_id) * 2 + (2 * 1024**3)
requeued = ensure_disk_bytes(job, run_split_fastq, disk_required)
if requeued is not None:
# If not, requeue the job with more disk.
return requeued

RealtimeLogger.info("Starting fastq split")
start_time = timeit.default_timer()

Expand Down Expand Up @@ -323,6 +329,13 @@ def run_split_fastq(job, context, fastq, fastq_i, sample_fastq_id):
def run_split_gam_reads(job, context, gam_input_reads, gam_reads_file_id):
""" split up an input reads file in GAM format
"""

disk_required = job.fileStore.getGlobalFileSize(gam_reads_file_id) * 2 + (2 * 1024**3)
requeued = ensure_disk_bytes(job, run_split_gam_reads, disk_required)
if requeued is not None:
# If not, requeue the job with more disk.
return requeued

RealtimeLogger.info("Starting gam split")
start_time = timeit.default_timer()

Expand Down Expand Up @@ -359,6 +372,13 @@ def run_split_gam_reads(job, context, gam_input_reads, gam_reads_file_id):
def run_split_bam_reads(job, context, bam_input_reads, bam_reads_file_id):
""" split up an input reads file in BAM format
"""

disk_required = job.fileStore.getGlobalFileSize(bam_reads_file_id) * 2 + (2 * 1024**3)
requeued = ensure_disk_bytes(job, run_split_bam_reads, disk_required)
if requeued is not None:
# If not, requeue the job with more disk.
return requeued

RealtimeLogger.info("Starting bam split")
start_time = timeit.default_timer()

Expand Down Expand Up @@ -484,7 +504,8 @@ def run_chunk_alignment(job, context, gam_input_reads, bam_input_reads, sample_n
Takes a dict from index type to index file ID. Some indexes are extra and
specifying them will change mapping behavior.
"""


# TODO: Work out what indexes we will want and ensure_disk_bytes

RealtimeLogger.info("Starting {} alignment on {} chunk {}".format(mapper, sample_name, chunk_id))

Expand Down Expand Up @@ -758,12 +779,24 @@ def run_merge_gams(job, context, sample_name, id_ranges_file_id, gam_chunk_file_
total_running_time += float(running_time)

return chr_gam_ids, total_running_time

def run_merge_chrom_gam(job, context, sample_name, chr_name, chunk_file_ids):
"""
Make a chromosome gam by merging up a bunch of gam ids, one
for each shard.
Make a chromosome gam by merging up a list of gam ids, one
for each shard.
"""

RealtimeLogger.info('For chrom {}, merge files: {}'.format(chr_name, chunk_file_ids))
RealtimeLogger.info('Args: {} {} {} {} {}'.format(job, context, sample_name, chr_name, chunk_file_ids))

# Check disk requirements to make sure we have enough room for 2 copies of everything, plus a bit.
# Get file size in a way that is robust to strs
disk_required = sum((job.fileStore.getGlobalFileSize(gam) for gam in chunk_file_ids)) * 2 + (2 * 1024**3)
requeued = ensure_disk_bytes(job, run_merge_chrom_gam, disk_required)
if requeued is not None:
# If not, requeue the job with more disk.
return requeued

# Define work directory for docker calls
work_dir = job.fileStore.getLocalTempDir()

Expand All @@ -776,6 +809,7 @@ def run_merge_chrom_gam(job, context, sample_name, chr_name, chunk_file_ids):
with open(output_file, 'a') as merge_file:
for chunk_gam_id in chunk_file_ids:
tmp_gam_file = os.path.join(work_dir, 'tmp_{}.gam'.format(uuid4()))
RealtimeLogger.info('Download file ID {} to {}'.format(chunk_gam_id, tmp_gam_file))
job.fileStore.readGlobalFile(chunk_gam_id, tmp_gam_file)
with open(tmp_gam_file) as tmp_f:
shutil.copyfileobj(tmp_f, merge_file)
Expand Down