Skip to content

Commit

Permalink
Merge pull request #55 from padraic-shafer/event-page
Browse files Browse the repository at this point in the history
Use bulk write operation to insert Event/Datum Page into database
  • Loading branch information
tacaswell authored Jan 22, 2025
2 parents 14333df + fe61e2d commit e1493d8
Showing 1 changed file with 62 additions and 22 deletions.
84 changes: 62 additions & 22 deletions suitcase/mongo_normalized/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import functools

import event_model
import pymongo
from ._version import get_versions
Expand Down Expand Up @@ -156,10 +158,7 @@ def _insert(self, name, doc):
) from err
else:
doc.pop("_id")
if name == "datum":
id_name = "datum_id"
else:
id_name = "uid"
id_name = unique_id_name(name)
existing = self._collections[name].find_one(
{id_name: doc[id_name]}, {"_id": False}
)
Expand All @@ -171,6 +170,32 @@ def _insert(self, name, doc):
f"Existing document:\n{existing}\nNew document:\n{doc}"
) from err

def _insert_many(self, name, docs):
"""Bulk write the documents for fewer network requests."""
try:
self._collections[name].insert_many(docs)
except pymongo.errors.BulkWriteError as err:
DUPLICATE_KEY_ERROR = 11000
duplicate_docs = (
error["op"] for error in err.details["writeErrors"]
if error["code"] == DUPLICATE_KEY_ERROR
)
id_name = unique_id_name(name)
duplicate_ids = set(
doc[id_name] for doc in duplicate_docs
)
if duplicate_ids:
error_message = "\n".join((
"A document with the same unique id as this one "
f"already exists in the database. Document:\n{doc}"
for doc in docs
if doc[id_name] in sorted(duplicate_ids)
))
raise DuplicateUniqueID(error_message) from err
else:
# For example, a "write concern" error
raise err

def update(self, name, doc):
"""
Update documents. Currently only 'start' documents are supported.
Expand Down Expand Up @@ -259,29 +284,33 @@ def event(self, doc):
self._insert("event", doc)

def event_page(self, doc):
# Unpack an EventPage into Events and do the actual insert inside
# the `event` method. (This is the oppose what DocumentRouter does by
# default.)

event_method = self.event # Avoid attribute lookup in hot loop.
filled_events = []

for event_doc in event_model.unpack_event_page(doc):
filled_events.append(event_method(event_doc))
"""Unpack an EventPage into Events and do a bulk insert."""
event_docs = tuple(
event_doc for event_doc in event_model.unpack_event_page(doc)
)
try:
self._insert_many("event", event_docs)
except DuplicateUniqueID:
# Bulk writing the Events failed; retry inserting each Event
event_method = self.event # Avoid attribute lookup in hot loop.
for event_doc in event_docs:
event_method(event_doc)

    def datum(self, doc):
        """Insert a single Datum document (delegates to ``_insert``)."""
        self._insert("datum", doc)

def datum_page(self, doc):
# Unpack an DatumPage into Datum and do the actual insert inside
# the `datum` method. (This is the oppose what DocumentRouter does by
# default.)

datum_method = self.datum # Avoid attribute lookup in hot loop.
filled_datums = []

for datum_doc in event_model.unpack_datum_page(doc):
filled_datums.append(datum_method(datum_doc))
"""Unpack a DatumPage into 'Datum's and do a bulk insert."""
datum_docs = tuple(
datum_doc for datum_doc in event_model.unpack_datum_page(doc)
)
try:
self._insert_many("datum", datum_docs)
except DuplicateUniqueID:
# Bulk writing the 'Datum's failed; retry inserting each Datum
datum_method = self.datum # Avoid attribute lookup in hot loop.
for datum_doc in datum_docs:
datum_method(datum_doc)

    def stop(self, doc):
        """Insert a RunStop document (delegates to ``_insert``)."""
        self._insert("stop", doc)
Expand All @@ -295,6 +324,17 @@ def __repr__(self):
)


@functools.lru_cache(maxsize=16)
def unique_id_name(name: str) -> str:
    """Return the name of the unique id field for a named document type.

    Datum documents are keyed by 'datum_id'; every other document type
    is keyed by 'uid'.
    """
    return "datum_id" if name == "datum" else "uid"


def _get_database(uri, tls):
if not pymongo.uri_parser.parse_uri(uri)["database"]:
raise ValueError(
Expand Down

0 comments on commit e1493d8

Please sign in to comment.