Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Very basic autotesting #28

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ build/
dist/
*.egg-info
notes.md
.tox/
6 changes: 5 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ This project targets Python 3 exclusively. Python 3.4 support is optional and ca

If you are planning on working on a "larger" issue or feature, please add yourself to the corresponding issue on GitHub or create a new one there - before you start working. This helps to reduce duplicate effort and allows to coordinate developers.

Everything is supposed to be tested. However, right now, *refuse* does not have a single test on its own. It can merely be tested through filesystems relying on it. This is currently done based on [LoggedFS-python](https://github.com/pleiszenburg/loggedfs-python) for x86_64 Linux. **The main objective therefore is to add a testing infrastructure.** New features are welcome, too, but tests come first. Tests based on Qemu are the likely way to go because Qemu can emulate all kinds of architectures on a single machine. Anything else that helps testing *refuse* is also highly welcome. Static code analysis comes to mind, for instance.
Everything is supposed to be tested. However, right now, *refuse* has a very limited test suite. It can merely be tested through filesystems relying on it. This is currently done based on [LoggedFS-python](https://github.com/pleiszenburg/loggedfs-python) for x86_64 Linux. **The main objective therefore is to add a testing infrastructure.** New features are welcome, too, but tests come first. Tests based on Qemu are the likely way to go because Qemu can emulate all kinds of architectures on a single machine. Anything else that helps testing *refuse* is also highly welcome. Static code analysis comes to mind, for instance.

## Verifying changes

tox -e py38 # to run tests
11 changes: 11 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
[metadata]
long_description = file: README.md
license_file = LICENSE

[tox]
envlist =
py36,py37,py38,
flake8,
isolated_build = true

[testenv]
commands = pytest {posargs}
deps =
pytest
Empty file added tests/__init__.py
Empty file.
Empty file added tests/high/__init__.py
Empty file.
34 changes: 34 additions & 0 deletions tests/high/test_mounted_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from unittest import mock
import os

from refuse.high import Operations

from tests.tools import fuse_high_mountpoint


def test_init_gets_called():
with mock.patch.object(Operations, 'init') as mocked:
mocked.return_value = None
with fuse_high_mountpoint():
mocked.assert_called_once_with('/')


def test_destroy_gets_called():
with mock.patch.object(Operations, 'destroy') as mocked:
mocked.return_value = None
with fuse_high_mountpoint():
mocked.assert_not_called()
mocked.assert_called_once_with('/')


def test_readdir_root():
with fuse_high_mountpoint() as mountpoint:
assert os.listdir(mountpoint) == []


def test_readdir_root_nonempty():
with fuse_high_mountpoint() as mountpoint:
with mock.patch.object(Operations, 'readdir') as mocked_readdir:
mocked_readdir.return_value = ['.', '..', 'omg_an_entry']
assert os.listdir(mountpoint) == ['omg_an_entry']
mocked_readdir.assert_called_once_with('/', 0)
Empty file added tests/low/__init__.py
Empty file.
21 changes: 21 additions & 0 deletions tests/low/test_mounted_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from unittest import mock

from refuse.low import FUSELL

from tests.tools import fuse_low_mountpoint


def test_init_gets_called():
with mock.patch.object(FUSELL, 'init') as mocked:
mocked.return_value = None
with fuse_low_mountpoint():
# TODO - verify parameters (second parameter `conn` is hard)
mocked.assert_called_once()


def test_destroy_gets_called():
with mock.patch.object(FUSELL, 'destroy') as mocked:
mocked.return_value = None
with fuse_low_mountpoint():
mocked.assert_not_called()
mocked.assert_called_once_with(None)
103 changes: 103 additions & 0 deletions tests/tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from contextlib import contextmanager
from threading import Thread
import subprocess
import tempfile
import time

from refuse.high import FUSE, Operations
from refuse.low import FUSELL


class MountTimeout(Exception):
pass


class FUSEThread(Thread):
def __init__(
self,
mountpoint,
fuse_class,
**fuse_kwargs,
):
super().__init__()
self.mountpoint = mountpoint
self.fuse_class = fuse_class
self.fuse_kwargs = fuse_kwargs

def run(self):
self.exc = None
try:
self.mount()
except Exception as e:
self.exc = e

def join(self):
super().join()
if self.exc:
raise self.exc

def mount(self):
self.fuse_class(
mountpoint=self.mountpoint,
**self.fuse_kwargs,
)

def unmount(self):
# fuse.fuse_exit() causes segafult
subprocess.run(
['fusermount', '-u', self.mountpoint, '-q'],
check=self.is_alive(), # this command has to succeed only if fuse thread is feeling good
)

def __enter__(self):
self.start()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.unmount()
self.join()


@contextmanager
def fuse_mountpoint(
*args,
mount_timeout=5.0, # seconds
**kwargs,
):
with tempfile.TemporaryDirectory() as mountpoint:
with FUSEThread(mountpoint, *args, **kwargs) as fuse_thread:

# dirty dirty active waiting for now
# no idea how to do it the clean way
waiting_start = time.monotonic()
while fuse_thread.is_alive() and not is_mountpoint_ready(mountpoint):
if time.monotonic() - waiting_start > mount_timeout:
raise MountTimeout()
time.sleep(0.01)

# do wrapped things
yield mountpoint


def fuse_high_mountpoint():
operations = Operations()
setattr(operations, 'use_ns', True)
return fuse_mountpoint(FUSE, operations=operations, foreground=True)


class FUSELLNS(FUSELL):
use_ns = True


def fuse_low_mountpoint():
return fuse_mountpoint(FUSELLNS)


def is_mountpoint_ready(mountpoint):
# AFAIK works only under linux. More platform-agnostic version may come later.
with open('/proc/mounts', 'rt') as mounts:
for mount in mounts:
typ, this_mountpoint, *_ = mount.split(' ')
if this_mountpoint == mountpoint:
return True
return False