Skip to content

Commit

Permalink
Merge pull request #322 from arXiv/ARXIVCE-2416-break-into-smaller-qu…
Browse files Browse the repository at this point in the history
…eries

interactions with write database are chunked
  • Loading branch information
kyokukou authored Oct 1, 2024
2 parents a36062b + 20210bd commit 2da3be0
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 25 deletions.
4 changes: 2 additions & 2 deletions gcp/cloud_functions/aggregate_hourly_downloads/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ to install
and
` pip install -r src/requirements-dev.txt `

# enviroment variables
# enviroment variables for running locally
```
export ENVIRONMENT=DEVELOPMENT LOG_LEVEL=INFO DOWNLOAD_TABLE=arxiv-development.arxiv_stats.papers_downloaded_by_ip_recently WRITE_TABLE='sqlite:///../tests/output_test.db'
export ENVIRONMENT=DEVELOPMENT LOG_LEVEL=INFO DOWNLOAD_TABLE=arxiv-development.arxiv_stats.papers_downloaded_by_ip_recently WRITE_TABLE='sqlite:///../tests/output_test.db' LOG_LOCALLY=1
```

Expand Down
57 changes: 34 additions & 23 deletions gcp/cloud_functions/aggregate_hourly_downloads/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, aliased

MAX_QUERY_TO_WRITE=1000 #the latexmldb we write to has a stack size limit

#logging setup
if not(os.environ.get('LOG_LOCALLY')):
import google.cloud.logging
Expand Down Expand Up @@ -304,28 +306,34 @@ class HourlyDownloadData(Base):
(item.country, item.download_type, item.category, item.time)
for item in aggregated_data.keys()
]
existing_records = session.query(HourlyDownloadData).filter(
tuple_(
HourlyDownloadData.country,
HourlyDownloadData.download_type,
HourlyDownloadData.category,
HourlyDownloadData.start_dttm
).in_(keys_to_check)
).all()

#update existing records
keys_to_update=[
DownloadKey(
time=record.start_dttm,
country=record.country,
download_type=record.download_type,
archive=record.archive,
category_id=record.category
)
for record in existing_records
]
all_keys_to_update=[]
for i in range(0, len(keys_to_check), MAX_QUERY_TO_WRITE):
#query a section of data
existing_records = session.query(HourlyDownloadData).filter(
tuple_(
HourlyDownloadData.country,
HourlyDownloadData.download_type,
HourlyDownloadData.category,
HourlyDownloadData.start_dttm
).in_(keys_to_check[i:i+MAX_QUERY_TO_WRITE])
).all()

#record found records
keys_to_update=[
DownloadKey(
time=record.start_dttm,
country=record.country,
download_type=record.download_type,
archive=record.archive,
category_id=record.category
)
for record in existing_records
]
all_keys_to_update += keys_to_update

#create data to be updated vs inserted
update_data=[]
for key in keys_to_update:
for key in all_keys_to_update:
counts=aggregated_data[key]
entry={
'country': key.country,
Expand Down Expand Up @@ -354,8 +362,11 @@ class HourlyDownloadData(Base):
]

#add data
session.bulk_save_objects(data_to_insert)
session.bulk_update_mappings(HourlyDownloadData, update_data)
for i in range(0, len(data_to_insert), MAX_QUERY_TO_WRITE):
session.bulk_save_objects(data_to_insert[i:i+MAX_QUERY_TO_WRITE])
#update existing data
for i in range(0, len(update_data), MAX_QUERY_TO_WRITE):
session.bulk_update_mappings(HourlyDownloadData, update_data[i:i+MAX_QUERY_TO_WRITE])
session.commit()
session.close()
logging.info(f"added {len(data_to_insert)} rows, updated {len(update_data)} rows")
Expand Down

0 comments on commit 2da3be0

Please sign in to comment.