Skip to content

Commit

Permalink
Let cache lock fail after 24h and warn after 2s (#498)
Browse files Browse the repository at this point in the history
* Let cache lock fail after 24h and warn after 2s

* Try to escape messages under Windows

* Simplify warning message

* Simplify docstring

* Remove unneeded code

* Update docstrings

* Update docstring of FolderLock
  • Loading branch information
hagenw authored Feb 20, 2025
1 parent 3cc10aa commit 551b58f
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 28 deletions.
1 change: 1 addition & 0 deletions audb/core/define.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
"""

# Cache lock
TIMEOUT = 86400 # 24 h
CACHED_VERSIONS_TIMEOUT = 10 # Timeout to acquire access to cached versions
LOCK_FILE = ".lock"
TIMEOUT_MSG = "Lock could not be acquired. Timeout exceeded."
Expand Down
18 changes: 8 additions & 10 deletions audb/core/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ def load(
pickle_tables: bool = True,
cache_root: str = None,
num_workers: int | None = 1,
timeout: float = -1,
timeout: float = define.TIMEOUT,
verbose: bool = True,
) -> audformat.Database | None:
r"""Load database.
Expand Down Expand Up @@ -1093,10 +1093,9 @@ def load(
num_workers: number of parallel jobs or 1 for sequential
processing. If ``None`` will be set to the number of
processors on the machine multiplied by 5
timeout: maximum wait time if another thread or process is already
accessing the database. If timeout is reached, ``None`` is
returned. If timeout < 0 the method will block until the
database can be accessed
timeout: maximum time in seconds
before giving up acquiring a lock to the database cache folder.
``None`` is returned in this case
verbose: show debug messages
Returns:
Expand Down Expand Up @@ -1491,7 +1490,7 @@ def load_media(
sampling_rate: int = None,
cache_root: str = None,
num_workers: int | None = 1,
timeout: float = -1,
timeout: float = define.TIMEOUT,
verbose: bool = True,
) -> list | None:
r"""Load media file(s).
Expand Down Expand Up @@ -1525,10 +1524,9 @@ def load_media(
num_workers: number of parallel jobs or 1 for sequential
processing. If ``None`` will be set to the number of
processors on the machine multiplied by 5
timeout: maximum wait time if another thread or process is already
accessing the database. If timeout is reached, ``None`` is
returned. If timeout < 0 the method will block until the
database can be accessed
timeout: maximum time in seconds
before giving up acquiring a lock to the database cache folder.
``None`` is returned in this case
verbose: show debug messages
Returns:
Expand Down
55 changes: 45 additions & 10 deletions audb/core/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from collections.abc import Sequence
import types
import warnings

import filelock

Expand All @@ -15,7 +16,8 @@ def __init__(
self,
folders: str | Sequence[str],
*,
timeout: float = -1,
timeout: float = define.TIMEOUT,
warning_timeout: float = 2,
):
r"""Lock one or more folders.
Expand All @@ -25,27 +27,60 @@ def __init__(
Args:
folders: path to one or more folders that should be locked
timeout: maximum wait time if another thread or process
is already accessing one or more locks.
timeout: maximum time in seconds
before giving up acquiring a lock to the database cache folder.
If timeout is reached,
an exception is raised.
If timeout < 0 the method will block
until the resource can be accessed
an exception is raised
warning_timeout: time in seconds
after which a warning is shown to the user
that the lock could not yet get acquired
Raises:
:class:`filelock.Timeout`: if a timeout is reached
"""
folders = audeer.to_list(folders)
files = [audeer.path(folder, define.LOCK_FILE) for folder in folders]

self.locks = [filelock.SoftFileLock(file) for file in files]
# In the past we used ``-1`` as default value for timeout
# to wait infinitely until the lock is acquired.
if timeout < 0:
warnings.warn(
"'timeout' values <0 are no longer supported. "
f"Changing your provided value of {timeout} to {define.TIMEOUT}"
)
timeout = define.TIMEOUT

self.lock_files = [audeer.path(folder, define.LOCK_FILE) for folder in folders]
self.locks = [filelock.SoftFileLock(file) for file in self.lock_files]
self.timeout = timeout
self.warning_timeout = warning_timeout

def __enter__(self) -> "FolderLock":
r"""Acquire the lock(s)."""
for lock in self.locks:
lock.acquire(self.timeout)
for lock, lock_file in zip(self.locks, self.lock_files):
remaining_time = self.timeout
acquired = False
# First try to acquire lock in warning_timeout time
if self.warning_timeout < self.timeout:
try:
lock.acquire(timeout=self.warning_timeout)
acquired = True
except filelock.Timeout:
warnings.warn(
f"Lock could not be acquired immediately.\n"
"Another user might loading the same database,\n"
f"or the lock file '{lock_file}' is left from a failed job "
"and needs to be deleted manually.\n"
"You can check who created it when by running: "
f"'ls -lh {lock_file}' in bash.\n"
f"Still trying for {self.timeout - self.warning_timeout:.1f} "
"more seconds...\n"
)
remaining_time = self.timeout - self.warning_timeout

if not acquired:
lock.acquire(timeout=remaining_time)

return self

def __exit__(
Expand Down
10 changes: 5 additions & 5 deletions audb/core/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import audformat

from audb.core import define
from audb.core.api import dependencies
from audb.core.cache import database_cache_root
from audb.core.dependencies import error_message_missing_object
Expand Down Expand Up @@ -428,7 +429,7 @@ def stream(
full_path: bool = True,
cache_root: str = None,
num_workers: int | None = 1,
timeout: float = -1,
timeout: float = define.TIMEOUT,
verbose: bool = True,
) -> DatabaseIterator:
r"""Stream table and media files of a database.
Expand Down Expand Up @@ -488,10 +489,9 @@ def stream(
num_workers: number of parallel jobs or 1 for sequential
processing. If ``None`` will be set to the number of
processors on the machine multiplied by 5
timeout: maximum wait time if another thread or process is already
accessing the database. If timeout is reached, ``None`` is
returned. If timeout < 0 the method will block until the
database can be accessed
timeout: maximum time in seconds
before giving up acquiring a lock to the database cache folder.
``None`` is returned in this case
verbose: show debug messages
Returns:
Expand Down
24 changes: 24 additions & 0 deletions tests/test_lock.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import re
import threading
import time

import filelock
import pytest

import audeer

Expand Down Expand Up @@ -143,3 +145,25 @@ def test_lock(tmpdir):
num_workers=3,
)
assert result == [1, 0, 1]


def test_lock_warning_and_failure(tmpdir):
"""Test user warning and lock failure messages."""
# Create lock file to force failing acquiring of lock
lock_file = audeer.touch(tmpdir, ".lock")
lock_error = filelock.Timeout
lock_error_msg = f"The file lock '{lock_file}' could not be acquired."
warning_msg = (
f"Lock could not be acquired immediately.\n"
"Another user might loading the same database,\n"
f"or the lock file '{lock_file}' is left from a failed job "
"and needs to be deleted manually.\n"
"You can check who created it when by running: "
f"'ls -lh {lock_file}' in bash.\n"
f"Still trying for 0.1 "
"more seconds...\n"
)
with pytest.warns(UserWarning, match=re.escape(warning_msg)):
with pytest.raises(lock_error, match=re.escape(lock_error_msg)):
with FolderLock(tmpdir, warning_timeout=0.1, timeout=0.2):
pass
35 changes: 33 additions & 2 deletions tests/test_lock_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ def load_db(timeout):
@pytest.mark.parametrize(
"num_workers, timeout, expected",
[
(2, -1, 2),
(2, 9999, 2),
(2, 0, 1),
],
Expand Down Expand Up @@ -330,6 +329,39 @@ def test_lock_load_crash(set_repositories):
load_db(-1)


@pytest.mark.parametrize(
"set_repositories",
["slow-file-system"],
indirect=True,
)
@pytest.mark.parametrize(
"num_workers, timeout, expected",
[
(2, -1, 2),
],
)
def test_lock_load_deprecated_timeout(
set_repositories,
num_workers,
timeout,
expected,
):
"""Test timeout <0 argument."""
params = [([timeout], {})] * num_workers
msg = (
"'timeout' values <0 are no longer supported. "
f"Changing your provided value of {timeout} to {audb.core.define.TIMEOUT}"
)
with pytest.warns(UserWarning, match=msg):
result = audeer.run_tasks(
load_db,
params,
num_workers=num_workers,
)
result = [x for x in result if x is not None]
assert len(result) == expected


@pytest.mark.parametrize(
"set_repositories",
["file-system"],
Expand Down Expand Up @@ -501,7 +533,6 @@ def load_media(timeout):
@pytest.mark.parametrize(
"num_workers, timeout, expected",
[
(2, -1, 2),
(2, 9999, 2),
(2, 0, 1),
],
Expand Down
1 change: 0 additions & 1 deletion tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,5 @@ def test_database_iterator_error():
full_path=False,
cache_root=None,
num_workers=1,
timeout=-1,
verbose=False,
)

0 comments on commit 551b58f

Please sign in to comment.