Skip to content

Commit

Permalink
MVCCAdapter.load: Raise ReadConflictError only if pack is running sim…
Browse files Browse the repository at this point in the history
…ultaneously

Currently when load(oid) finds that the object was deleted, it raises
ReadConflictError - not POSKeyError - because a pack could be running
simultaneously and the deletion could result from the pack. In that case
we want corresponding transaction to be retried - not failed - via
raising ConflictError subclass for backward-compatibility reason.
However from semantic point of view, it is more logically correct to
raise POSKeyError, when an object is found to be deleted or
not-yet-created, and raise ReadConflictError only if a pack was actually
running simultaneously, and the deletion could result from that pack.

-> Fix MVCCAdapter.load to do this - now it raises ReadConflictError
only if MVCCAdapterInstance view appears before storage packtime, which
indicates that there could indeed be conflict in between read access and
pack removing the object.

To detect if pack was running and beyond MVCCAdapterInstance view, we
need to teach storage drivers to provide way to known what was the last
pack time/transaction. Add optional IStorageLastPack interface with
.lastPack() method to do so. If a storage does not implement lastPack,
we take conservative approach and raise ReadConflictError
unconditionally as before.

Add/adapt corresponding tests.

Teach FileStorage, MappingStorage and DemoStorage to implement the new
interface.

NOTE: storages that implement IMVCCStorage natively already raise
POSKeyError - not ReadConflictError - when load(oid) finds deleted
object. This is so because IMVCCStorages natively provide isolation, via
e.g. RDBMS in case of RelStorage. The isolation property provided by
RDBMS guarantees that connection view of the database is not affected by
other changes - e.g. pack - until connection's transaction is complete.

/cc @jimfulton
  • Loading branch information
navytux committed Jul 26, 2020
1 parent 0d499a3 commit e21642f
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 18 deletions.
6 changes: 6 additions & 0 deletions src/ZODB/DemoStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,20 @@ def __init__(self, name=None, base=None, changes=None,
self._temporary_changes = True
changes = ZODB.MappingStorage.MappingStorage()
zope.interface.alsoProvides(self, ZODB.interfaces.IBlobStorage)
zope.interface.alsoProvides(self, ZODB.interfaces.IStorageLastPack)
if close_changes_on_close is None:
close_changes_on_close = False
else:
if ZODB.interfaces.IBlobStorage.providedBy(changes):
zope.interface.alsoProvides(self, ZODB.interfaces.IBlobStorage)
if ZODB.interfaces.IStorageLastPack.providedBy(changes):
zope.interface.alsoProvides(self, ZODB.interfaces.IStorageLastPack)
if close_changes_on_close is None:
close_changes_on_close = True

if ZODB.interfaces.IStorageLastPack.providedBy(self):
self.lastPack = changes.lastPack

self.changes = changes
self.close_changes_on_close = close_changes_on_close

Expand Down
15 changes: 15 additions & 0 deletions src/ZODB/FileStorage/FileStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from ZODB.interfaces import IStorageIteration
from ZODB.interfaces import IStorageRestoreable
from ZODB.interfaces import IStorageUndoable
from ZODB.interfaces import IStorageLastPack
from ZODB.POSException import ConflictError
from ZODB.POSException import MultipleUndoErrors
from ZODB.POSException import POSKeyError
Expand Down Expand Up @@ -133,6 +134,7 @@ def __init__(self, afile):
IStorageCurrentRecordIteration,
IExternalGC,
IStorage,
IStorageLastPack,
)
class FileStorage(
FileStorageFormatter,
Expand All @@ -145,6 +147,11 @@ class FileStorage(

# Set True while a pack is in progress; undo is blocked for the duration.
_pack_is_in_progress = False
# last tid used as pack cut-point
# TODO save/load _lastPack with index - for it not to go to z64 on next
# file reopen. Currently lastPack is used only to detect race of load
# wrt simultaneous pack, so not persisting lastPack is practically ok.
_lastPack = z64

def __init__(self, file_name, create=False, read_only=False, stop=None,
quota=None, pack_gc=True, pack_keep_old=True, packer=None,
Expand Down Expand Up @@ -1183,6 +1190,11 @@ def packer(storage, referencesf, stop, gc):
finally:
p.close()

def lastPack(self):
"""lastPack implements IStorageLastPack."""
with self._lock:
return self._lastPack

def pack(self, t, referencesf, gc=None):
"""Copy data from the current database file to a packed file
Expand All @@ -1207,6 +1219,7 @@ def pack(self, t, referencesf, gc=None):
if self._pack_is_in_progress:
raise FileStorageError('Already packing')
self._pack_is_in_progress = True
stop = min(stop, self._ltid)

if gc is None:
gc = self._pack_gc
Expand Down Expand Up @@ -1245,6 +1258,8 @@ def pack(self, t, referencesf, gc=None):
self._file = open(self._file_name, 'r+b')
self._initIndex(index, self._tindex)
self._pos = opos
if stop > self._lastPack:
self._lastPack = stop

# We're basically done. Now we need to deal with removed
# blobs and removing the .old file (see further down).
Expand Down
18 changes: 18 additions & 0 deletions src/ZODB/MappingStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
@zope.interface.implementer(
ZODB.interfaces.IStorage,
ZODB.interfaces.IStorageIteration,
ZODB.interfaces.IStorageLastPack,
)
class MappingStorage(object):
"""In-memory storage implementation
Expand Down Expand Up @@ -186,13 +187,30 @@ def new_oid(self):
self._oid += 1
return ZODB.utils.p64(self._oid)

# ZODB.interfaces.IStorageLastPack
@ZODB.utils.locked(opened)
def lastPack(self):
packed = self._last_pack
if packed is None:
return ZODB.utils.z64
else:
return packed

# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def pack(self, t, referencesf, gc=True):
if not self._data:
return

stop = ZODB.TimeStamp.TimeStamp(*time.gmtime(t)[:5]+(t%60,)).raw()
# clip stop to last committed transaction.
# don't use ._ltid as head - for MVCCMappingStorage ._ltid is specific
# to current storage instance, not whole storage history.
head = ZODB.utils.z64
if self._transactions:
head = self._transactions.maxKey()
stop = min(stop, head)

if self._last_pack is not None and self._last_pack >= stop:
if self._last_pack == stop:
return
Expand Down
31 changes: 31 additions & 0 deletions src/ZODB/interfaces.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) Zope Corporation and Contributors.
Expand Down Expand Up @@ -749,6 +750,8 @@ def pack(pack_time, referencesf):
extract object references from database records. This is
needed to determine which objects are referenced from object
revisions.
See also: IStorageLastPack.
"""

def registerDB(wrapper):
Expand Down Expand Up @@ -876,6 +879,34 @@ def prefetch(oids, tid):
more than once.
"""

class IStorageLastPack(IStorage):

def lastPack(): # -> pack-cut-point (tid)
"""lastPack returns ID of the last transaction used as pack cut point.
For a database view with at ≥ lastPack, the storage guarantees to
persist all objects revisions to represent such view. For a database
view with at < lastPack, the storage does not provide such guarantee.
In particular pack can remove objects revisions that were non-current as
of lastPack database view at pack time.
Similarly, the storage gurantees to persist all transactions in
[lastPack, lastTransaction] range, while for [0, lastPack) range there
is no such guarantee. In particular pack can remove transactions with
only non-current objects revisions as of lastPack database view.
lastPack is non-decreasing - it can only grow, or stay equal over time.
lastPack is always ≤ IStorage.lastTransaction.
lastPack is related to pack_time passed to IStorage.pack - internally
that time is converted to transaction ID format after clipping into
valid range and looking up nearby transaction.
lastPack value cannot be cached - for client/storage case the call has
to perform round-trip and synchronize with the server.
"""


class IMultiCommitStorage(IStorage):
"""A multi-commit storage can commit multiple transactions at once.
Expand Down
36 changes: 22 additions & 14 deletions src/ZODB/mvccadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,12 @@ def load(self, oid):
r = self._storage.loadBefore(oid, self._start)
if r is None:
# object was deleted or not-yet-created.
# raise ReadConflictError - not - POSKeyError due to backward
# compatibility: a pack(t+δ) could be running simultaneously to our
# transaction that observes database as of t state. Such pack,
# because it packs the storage from a "future-to-us" point of view,
# can remove object revisions that we can try to load, for example:
# raise POSKeyError, or ReadConflictError, if the deletion is
# potentially due to simultaneous pack: a pack(t+δ) could be
# running simultaneously to our transaction that observes database
# as of t state. Such pack, because it packs the storage from a
# "future-to-us" point of view, can remove object revisions that we
# can try to load, for example:
#
# txn1 <-- t
# obj.revA
Expand All @@ -168,15 +169,22 @@ def load(self, oid):
#
# for such case we want user transaction to be restarted - not
# failed - by raising ConflictError subclass.
#
# XXX we don't detect for pack to be actually running - just assume
# the worst. It would be good if storage could provide information
# whether pack is/was actually running and its details, take that
# into account, and raise ReadConflictError only in the presence of
# database being simultaneously updated from back of its log.
raise POSException.ReadConflictError(
"load %s @%s: object deleted, likely by simultaneous pack" %
(oid_repr(oid), tid_repr(p64(u64(self._start)-1))))
if hasattr(self._storage, 'pack'):
lastPack = getattr(self._storage, 'lastPack', None)
if lastPack is not None:
packed = lastPack()
packConflict = (p64(u64(self._start)-1) < packed)
else:
packConflict = True # no lastPack information - assume the worst

if packConflict:
# database simultaneously packed from back of its log over our view of it
raise POSException.ReadConflictError(
"load %s @%s: object deleted, likely by simultaneous pack" %
(oid_repr(oid), tid_repr(p64(u64(self._start)-1))))

# no simultaneous pack detected, or lastPack was before our view of the database
raise POSException.POSKeyError(oid)

return r[:2]

Expand Down
1 change: 1 addition & 0 deletions src/ZODB/tests/MVCCMappingStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def new_instance(self):
inst._commit_lock = self._commit_lock
inst.new_oid = self.new_oid
inst.pack = self.pack
inst.lastPack = self.lastPack
inst.loadBefore = self.loadBefore
inst._ltid = self._ltid
inst._main_lock = self._lock
Expand Down
108 changes: 107 additions & 1 deletion src/ZODB/tests/PackableStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

import doctest
import time
import warnings
import gc

from persistent import Persistent
from persistent.mapping import PersistentMapping
from ZODB import DB
from ZODB.POSException import ConflictError, StorageError
from ZODB.POSException import ConflictError, StorageError, POSKeyError, \
ReadConflictError
from ZODB.serialize import referencesf
from ZODB.tests.MinPO import MinPO
from ZODB.tests.MTStorage import TestThread
Expand Down Expand Up @@ -528,6 +531,109 @@ def checkPackOnlyOneObject(self):
eq(pobj.getoid(), oid2)
eq(pobj.value, 11)

def checkPackVSConnectionGet(self):
# verify behaviour of Connection.get vs simultaneous pack:
#
# For deleted objects, in normal circumstances, get should raise POSKeyError.
# However when a pack was run simultaneously with packtime going after
# connection view of the database, for storages that do not implement
# IMVCCStorage natively, get should raise ReadConflictError instead.
#
# IMVCCStorage storages are not affected, since they natively provide
# isolation, via e.g. RDBMS in case of RelStorage. The isolation
# property provided by RDBMS guarantees that connection view of the
# database is not affected by other changes - e.g. pack - until
# connection's transaction is complete.
db = DB(self._storage)
eq = self.assertEqual
raises = self.assertRaises

# connA is main connection through which database changes are made
# @at0 - start
tmA = transaction.TransactionManager()
connA = db.open(transaction_manager=tmA)
rootA = connA.root()
rootA[0] = None
tmA.commit()
at0 = rootA._p_serial

# conn0 is "current" db connection that observes database as of @at0 state
tm0 = transaction.TransactionManager()
conn0 = db.open(transaction_manager=tm0)

# @at1 - new object is added to database and linked to root
rootA[0] = objA = MinPO(1)
tmA.commit()
oid = objA._p_oid
at1 = objA._p_serial

# conn1 is "current" db connection that observes database as of @at1 state
tm1 = transaction.TransactionManager()
conn1 = db.open(transaction_manager=tm1)

# @at2 - object value is modified
objA.value = 2
tmA.commit()
at2 = objA._p_serial

# ---- before pack ----

# conn0.get(oid) -> POSKeyError (as of @at0 object was not yet created)
errGetNoObject = POSKeyError
if (not ZODB.interfaces.IMVCCStorage.providedBy(self._storage)) and \
(not ZODB.interfaces.IStorageLastPack.providedBy(self._storage)):
warnings.warn("FIXME %s does not implement lastPack" %
type(self._storage), DeprecationWarning)
errGetNoObject = ReadConflictError
raises(errGetNoObject, conn0.get, oid)

# conn1.get(oid) -> obj(1)
obj1 = conn1.get(oid)
eq(obj1._p_oid, oid)
eq(obj1.value, 1)

# --- after pack to latest db head ----
db.pack(time.time()+1)

# IMVCCStorage - as before
#
# !IMVCCStorage:
# conn0.get(oid) -> ReadConflictError
# conn1.get(oid) -> ReadConflictError
#
# ( conn1: the pack removes obj@at1 revision, which results in conn1
# finding the object in non-existent/deleted state, traditionally this
# is reported as ReadConflictError for conn1's transaction to be
# restarted.
#
# conn0: obj@at0 never existed, but after pack@at2 it is
# indistinguishable whether obj@at0 revision was removed, or it never
# existed -> ReadConflictError too, similarly to conn1 )
conn0.cacheMinimize()
conn1.cacheMinimize()
del obj1
gc.collect()
if ZODB.interfaces.IMVCCStorage.providedBy(self._storage):
raises(POSKeyError, conn0.get, oid)
obj1 = conn1.get(oid)
eq(obj1._p_oid, oid)
eq(obj1.value, 1)

else:
# !IMVCCStorage
raises(ReadConflictError, conn0.get, oid)
raises(ReadConflictError, conn1.get, oid)

# connA stays ok
connA.cacheMinimize()
objA_ = connA.get(oid)
self.assertIs(objA_, objA)
eq(objA_.value, 2)

# end
self._sanity_check()
db.close()

class PackableStorageWithOptionalGC(PackableStorage):

def checkPackAllRevisionsNoGC(self):
Expand Down
14 changes: 14 additions & 0 deletions src/ZODB/tests/testConnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,20 @@ def doctest_get(self):
Traceback (most recent call last):
...
POSKeyError: 0x01
A request for an object that doesn't exist yet as of connection view of
the database will raise a POSKeyError too.
>>> tm2 = transaction.TransactionManager()
>>> cn2 = db.open(transaction_manager=tm2)
>>> root2 = cn2.root()
>>> obj2 = Persistent()
>>> root2[2] = obj2
>>> tm2.commit()
>>> cn.get(obj2._p_oid) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
"""

def doctest_close(self):
Expand Down
6 changes: 3 additions & 3 deletions src/ZODB/tests/testmvcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@
We'll reuse the code from the example above, except that there will
only be a single revision of "b." As a result, the attempt to
activate "b" will result in a ReadConflictError.
activate "b" will result in a POSKeyError.
>>> ts = TestStorage()
>>> db = DB(ts)
Expand All @@ -413,7 +413,7 @@
>>> r1["b"]._p_activate() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
ReadConflictError: ...
POSKeyError: ...
>>> db.close()
"""
Expand All @@ -427,7 +427,7 @@
(re.compile("b('.*?')"), r"\1"),
# Python 3 adds module name to exceptions.
(re.compile("ZODB.POSException.ConflictError"), r"ConflictError"),
(re.compile("ZODB.POSException.ReadConflictError"), r"ReadConflictError"),
(re.compile("ZODB.POSException.POSKeyError"), r"POSKeyError"),
])

def test_suite():
Expand Down

0 comments on commit e21642f

Please sign in to comment.