utils: add Auditd #61

Merged · 2 commits · Jun 28, 2024
2 changes: 2 additions & 0 deletions pytest_mh/_private/fixtures.py
@@ -338,6 +338,8 @@ def _pytest_report_teststatus(
        for item in self.roles + self.hosts:
            result = mh_utility_pytest_report_teststatus(item, report, config)
            if result is not None:
                # Update the stored outcome, since the hook may have changed it.
                self.data.outcome = report.outcome
                return result

        return None
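The two added lines re-read report.outcome after the utility hooks have run. A minimal standalone sketch (illustrative names only, not pytest-mh internals) of why the re-read matters: a hook may mutate report.outcome, so a copy stored before the hooks ran would go stale.

from typing import Callable, Iterable

import pytest


def record_outcome(report: pytest.TestReport, hooks: Iterable[Callable[[pytest.TestReport], None]]) -> str:
    for hook in hooks:
        hook(report)           # a hook may set report.outcome = "failed"
    return report.outcome      # re-read after the hooks, mirroring the change above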
11 changes: 11 additions & 0 deletions pytest_mh/_private/plugin.py
@@ -50,6 +50,7 @@ def __init__(self, pytest_config: pytest.Config) -> None:
        self.current_mh: MultihostFixture | None = None
        self.current_topology: str | None = None
        self.required_hosts: list[MultihostHost] = []
        self.pytest_session: pytest.Session | None = None

        # CLI options
        self.mh_config: str = pytest_config.getoption("mh_config")
@@ -168,6 +169,8 @@ def pytest_sessionstart(self, session: pytest.Session) -> None:

        :meta private:
        """
        self.pytest_session = session

        # Calling the setup here instead of in constructor to allow running
        # pytest --help and other action-less parameters.
        self.setup()
@@ -435,9 +438,17 @@ def pytest_report_teststatus(
        if self.current_mh is None:
            return None

        # Store the current outcome in case it is changed by the hook
        original_outcome = report.outcome

        status = self.current_mh._pytest_report_teststatus(report, config)
        setattr(report, "_pytest_mh__teststatus", status)

        # If the hook changed the outcome to failed, count it towards failures.
        if original_outcome != report.outcome and report.failed:
            if self.pytest_session is not None:
                self.pytest_session.testsfailed += 1

        return status

    @pytest.hookimpl(hookwrapper=True)
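pytest decides its exit status from session.testsfailed, and an outcome that is only flipped inside pytest_report_teststatus is apparently not picked up by the normal counting, hence the manual increment above. A minimal, self-contained sketch of the same pattern outside pytest-mh (the downgrade predicate is a hypothetical stand-in): remember the session at session start so a later pytest_report_teststatus hook can keep session.testsfailed, and therefore the exit code, consistent when it downgrades a passed test.

import pytest


class DowngradePlugin:
    def __init__(self) -> None:
        self.session: pytest.Session | None = None

    def pytest_sessionstart(self, session: pytest.Session) -> None:
        # Remember the session so later hooks can update its failure counter.
        self.session = session

    def pytest_report_teststatus(self, report, config):
        if report.when != "call" or not report.passed:
            return None
        if not self._should_downgrade(report):  # hypothetical predicate
            return None
        report.outcome = "failed"
        if self.session is not None:
            self.session.testsfailed += 1
        # (category, short letter, verbose word) as pytest expects
        return ("DOWNGRADED", "D", "PASSED/DOWNGRADED")

    def _should_downgrade(self, report) -> bool:
        # Illustrative only; real code would inspect logs or report attributes.
        return False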
140 changes: 140 additions & 0 deletions pytest_mh/utils/auditd.py
@@ -0,0 +1,140 @@
from __future__ import annotations

import re
from typing import Literal

import pytest

from .. import MultihostHost, MultihostUtility
from ..ssh import SSHLog

__all__ = ["Auditd"]


class Auditd(MultihostUtility[MultihostHost]):
    """
    Auditd utilities.

    Collects audit logs and detects AVC denials.
    """

    def __init__(
        self,
        host: MultihostHost,
        *,
        avc_mode: Literal["fail", "warn", "ignore"],
        avc_filter: str | None = None,
    ) -> None:
        """
        ``avc_mode`` values:

        * ``ignore``: all failures are ignored
        * ``warn``: the test result category is set to "AVC DENIALS" and the
          test is marked as such in the test summary; however, the test outcome
          and the pytest exit code are kept intact
        * ``fail``: the test result category is set to "AVC DENIALS" and the
          test is marked as such in the test summary; if the test outcome is
          ``passed``, it is set to ``failed`` and pytest returns a non-zero
          exit code

        :param host: Multihost host.
        :type host: MultihostHost
        :param avc_mode: Action taken when an AVC denial is found in the audit logs.
        :type avc_mode: Literal["fail", "warn", "ignore"]
        :param avc_filter: Regular expression used to filter the AVC denials,
            defaults to None
        :type avc_filter: str | None, optional
        """
        super().__init__(host)

        self.avc_mode: Literal["fail", "warn", "ignore"] = avc_mode
        self.avc_filter: str | None = avc_filter

        self.artifacts: set[str] = {"/var/log/audit/audit.log"}
        self._backup: str | None = None

    def setup(self) -> None:
        """
        Create backup of audit logs and clear them for current test run.
        """
        super().setup()

        result = self.host.ssh.run(
            """
            set -e
            tmp=`mktemp -d`
            cp -r --archive /var/log/audit "$tmp"
            truncate --size 0 /var/log/audit/audit.log*
            echo $tmp
            """,
            log_level=SSHLog.Error,
        )

        self._backup = result.stdout.strip()

    def teardown(self) -> None:
        """
        Restore previous audit logs from backup and remove the backup.
        """
        if self._backup is not None:
            self.host.ssh.run(
                f"""
                set -e

                for f in "{self._backup}"/audit/audit.log*; do
                    name=`basename "$f"`
                    cat "$f" > "/var/log/audit/$name"
                done

                rm -fr "{self._backup}"
                """,
                log_level=SSHLog.Error,
            )

        return super().teardown()

    def pytest_report_teststatus(
        self, report: pytest.CollectReport | pytest.TestReport, config: pytest.Config
    ) -> tuple[str, str, str | tuple[str, dict[str, bool]]] | None:
        """
        Report an AVC denial error if one is found and it matches the requested filter.

        :param report: Pytest report
        :type report: pytest.CollectReport | pytest.TestReport
        :param config: Pytest config
        :type config: pytest.Config
        :return: Pytest test status
        :rtype: tuple[str, str, str | tuple[str, dict[str, bool]]] | None
        """
        if report.when != "call":
            return None

        if self.avc_mode == "ignore" or report.outcome == "skipped":
            return None

        self.logger.info("Checking for AVC denials")

        result = self.host.ssh.run(
            "ausearch --input-logs -m AVC,USER_AVC", raise_on_error=False, log_level=SSHLog.Silent
        )
        # ausearch exits with a non-zero code when no matching records are found
        if result.rc:
            return None

        records = result.stdout
        if not records:
            return None

        # Ignore the denials if no message matches the filter
        if self.avc_filter:
            match = re.search(self.avc_filter, records)
            if match is None:
                return None

        original_outcome = report.outcome

        # Fail the test if fail mode is selected
        if report.outcome == "passed" and self.avc_mode == "fail":
            report.outcome = "failed"

        # Count this test into the "AVC DENIALS" category in the final summary and
        # mark it with "A"/"AVC DENIAL" in the short/verbose listing.
        return ("AVC DENIALS", "A", f"{original_outcome.upper()}/AVC DENIAL")