Skip to content

Commit

Permalink
Add job progress tracking to dataset submit
Browse files: browse the repository at this point in the history
  • Loading branch information
bennybp committed Jan 16, 2025
1 parent 480ab4f commit 282cf1a
Showing 1 changed file with 25 additions and 0 deletions.
25 changes: 25 additions & 0 deletions qcfractal/qcfractal/components/dataset_socket.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -9,6 +9,7 @@
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import load_only, lazyload, joinedload, with_polymorphic
from sqlalchemy.orm.attributes import flag_modified
from qcfractal.db_socket.helpers import get_count

from qcfractal.components.dataset_db_models import (
BaseDatasetORM,
Expand Down Expand Up @@ -1206,6 +1207,7 @@ def submit(
owner_group: Optional[Union[int, str]],
find_existing: bool,
*,
job_progress: Optional[JobProgress] = None,
session: Optional[Session] = None,
) -> InsertCountsMetadata:
"""
Expand All @@ -1232,6 +1234,8 @@ def submit(
Group with additional permission for these records
find_existing
If True, search for existing records and return those. If False, always add new records
job_progress
Object used to track progress if this function is being run in a background job
session
An existing SQLAlchemy session to use. If None, one will be created. If an existing session
is used, it will be flushed (but not committed) before returning from this function.
Expand Down Expand Up @@ -1279,6 +1283,12 @@ def submit(
# Do all entries in batches using server-side cursors
stmt = select(self.entry_orm)
stmt = stmt.where(self.entry_orm.dataset_id == dataset_id)

# for progress tracking
if job_progress is not None:
total_records = len(ds_specs) * get_count(session, stmt)
records_done = 0

r = session.execute(stmt).scalars()

while entries_batch := r.fetchmany(500):
Expand Down Expand Up @@ -1310,8 +1320,18 @@ def submit(
n_inserted += batch_meta.n_inserted
n_existing += batch_meta.n_existing

if job_progress is not None:
job_progress.raise_if_cancelled()
records_done += len(entries_batch)
job_progress.update_progress(100 * (records_done * len(ds_specs)) / total_records)

else: # entry names were given

# for progress tracking
if job_progress is not None:
total_records = len(ds_specs) * len(entry_names)
records_done = 0

# For checking for missing entries
found_entries = []

Expand Down Expand Up @@ -1352,6 +1372,11 @@ def submit(
n_inserted += batch_meta.n_inserted
n_existing += batch_meta.n_existing

if job_progress is not None:
job_progress.raise_if_cancelled()
records_done += len(entries_names_batch)
job_progress.update_progress(100 * (records_done * len(ds_specs)) / total_records)

if entry_names is not None:
missing_entries = set(entry_names) - set(found_entries)
if missing_entries:
Expand Down

0 comments on commit 282cf1a

Please sign in to comment.