From abe888f9a05bdb0bb2375a5ee705503d85278c71 Mon Sep 17 00:00:00 2001 From: vitthalmagadum Date: Sat, 23 Nov 2024 23:56:30 -0500 Subject: [PATCH 1/7] Updated input model --- anta/input_models/security.py | 69 +++++++++++++++++++++++++++++++++++ anta/tests/security.py | 24 ++---------- docs/api/tests.security.md | 16 ++++++++ 3 files changed, 88 insertions(+), 21 deletions(-) create mode 100644 anta/input_models/security.py diff --git a/anta/input_models/security.py b/anta/input_models/security.py new file mode 100644 index 000000000..cb94714c3 --- /dev/null +++ b/anta/input_models/security.py @@ -0,0 +1,69 @@ +# Copyright (c) 2023-2024 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. +"""Module containing input models for security tests.""" + +from __future__ import annotations + +from ipaddress import IPv4Address +from typing import Any +from warnings import warn + +from pydantic import BaseModel, ConfigDict + + +class IPSecPeer(BaseModel): + """IPSec (Internet Protocol Security) model represents the details of an IPv4 security peer.""" + + model_config = ConfigDict(extra="forbid") + peer: IPv4Address + """The IPv4 address of the security peer.""" + vrf: str = "default" + """VRF context. Defaults to `default`.""" + connections: list[IPSecConn] | None = None + """A list of IPv4 security connections associated with the peer. Defaults to None.""" + + def __str__(self) -> str: + """Return a string representation of the IPSecPeer model. Used in failure messages. + + Examples + -------- + - Peer: 1.1.1.1 VRF: default + """ + return f"Peer: {self.peer} VRF: {self.vrf}" + +class IPSecConn(BaseModel): + """Details of an IPv4 security connection for a peer.""" + + model_config = ConfigDict(extra="forbid") + source_address: IPv4Address + """The IPv4 address of the source in the security connection.""" + destination_address: IPv4Address + """The IPv4 address of the destination in the security connection.""" + + def __str__(self) -> str: + """Return a string representation of the IPSecConn model. Used in failure messages. + + Examples + -------- + - Source: 1.1.1.1 Destination: 2.2.2.2 + """ + return f"Source: {self.source_address} Destination: {self.destination_address}" + + +class IPSecPeers(IPSecPeer): # pragma: no cover + """Alias for the IPSecPeer model to maintain backward compatibility. + + When initialized, it will emit a deprecation warning and call the IPSecPeers model. + + TODO: Remove this class in ANTA v2.0.0. + """ + + def __init__(self, **data: Any) -> None: # noqa: ANN401 + """Initialize the BgpAfi class, emitting a deprecation warning.""" + warn( + message="IPSecPeers model is deprecated and will be removed in ANTA v2.0.0. Use the IPSecPeer model instead.", + category=DeprecationWarning, + stacklevel=2, + ) + super().__init__(**data) diff --git a/anta/tests/security.py b/anta/tests/security.py index 13a48e577..bfcb50415 100644 --- a/anta/tests/security.py +++ b/anta/tests/security.py @@ -16,6 +16,7 @@ from anta.custom_types import EcdsaKeySize, EncryptionAlgorithm, PositiveInteger, RsaKeySize from anta.models import AntaCommand, AntaTemplate, AntaTest from anta.tools import get_failed_logs, get_item, get_value +from anta.input_models.security import IPSecPeer, IPSecPeers if TYPE_CHECKING: import sys @@ -726,28 +727,9 @@ class VerifySpecificIPSecConn(AntaTest): class Input(AntaTest.Input): """Input model for the VerifySpecificIPSecConn test.""" - ip_security_connections: list[IPSecPeers] + ip_security_connections: list[IPSecPeer] """List of IP4v security peers.""" - - class IPSecPeers(BaseModel): - """Details of IPv4 security peers.""" - - peer: IPv4Address - """IPv4 address of the peer.""" - - vrf: str = "default" - """Optional VRF for the IP security peer.""" - - connections: list[IPSecConn] | None = None - """Optional list of IPv4 security connections of a peer.""" - - class IPSecConn(BaseModel): - """Details of IPv4 security connections for a peer.""" - - source_address: IPv4Address - """Source IPv4 address of the connection.""" - destination_address: IPv4Address - """Destination IPv4 address of the connection.""" + IPSecPeers: ClassVar[type[IPSecPeers]] = IPSecPeers def render(self, template: AntaTemplate) -> list[AntaCommand]: """Render the template for each input IP Sec connection.""" diff --git a/docs/api/tests.security.md b/docs/api/tests.security.md index fe008ba66..b482b19cd 100644 --- a/docs/api/tests.security.md +++ b/docs/api/tests.security.md @@ -7,7 +7,10 @@ anta_title: ANTA catalog for security tests ~ that can be found in the LICENSE file. --> +# Tests + ::: anta.tests.security + options: show_root_heading: false show_root_toc_entry: false @@ -18,3 +21,16 @@ anta_title: ANTA catalog for security tests filters: - "!test" - "!render" + +# Input models + +::: anta.input_models.security + + options: + show_root_heading: false + show_root_toc_entry: false + show_bases: false + merge_init_into_class: false + anta_hide_test_module_description: true + show_labels: true + filters: ["!^__str__"] From caff862885a984e7dc357f98b8a6a25348a10da6 Mon Sep 17 00:00:00 2001 From: vitthalmagadum Date: Mon, 25 Nov 2024 05:53:36 -0500 Subject: [PATCH 2/7] Updated unit tests --- anta/input_models/security.py | 1 + anta/tests/security.py | 44 ++++++++++++------------- tests/units/anta_tests/test_security.py | 24 +++++--------- 3 files changed, 32 insertions(+), 37 deletions(-) diff --git a/anta/input_models/security.py b/anta/input_models/security.py index cb94714c3..b58b7e739 100644 --- a/anta/input_models/security.py +++ b/anta/input_models/security.py @@ -32,6 +32,7 @@ def __str__(self) -> str: """ return f"Peer: {self.peer} VRF: {self.vrf}" + class IPSecConn(BaseModel): """Details of an IPv4 security connection for a peer.""" diff --git a/anta/tests/security.py b/anta/tests/security.py index bfcb50415..f40f16468 100644 --- a/anta/tests/security.py +++ b/anta/tests/security.py @@ -8,15 +8,14 @@ # Mypy does not understand AntaTest.Input typing # mypy: disable-error-code=attr-defined from datetime import datetime, timezone -from ipaddress import IPv4Address from typing import TYPE_CHECKING, ClassVar, get_args from pydantic import BaseModel, Field, model_validator from anta.custom_types import EcdsaKeySize, EncryptionAlgorithm, PositiveInteger, RsaKeySize +from anta.input_models.security import IPSecPeer, IPSecPeers from anta.models import AntaCommand, AntaTemplate, AntaTest from anta.tools import get_failed_logs, get_item, get_value -from anta.input_models.security import IPSecPeer, IPSecPeers if TYPE_CHECKING: import sys @@ -693,15 +692,23 @@ def test(self) -> None: class VerifySpecificIPSecConn(AntaTest): - """Verifies the state of IPv4 security connections for a specified peer. + """Verifies the IPv4 security connections. + + This test performs the following checks for each specified address family: - It optionally allows for the verification of a specific path for a peer by providing source and destination addresses. - If these addresses are not provided, it will verify all paths for the specified peer. + 1. Validates that the VRF is configured. + 2. Checks for the presence of IPv4 security connections for the specified peer. + 3. For each relevant peer: + - If source and destination addresses are provided, Verifies the security connection for the specific path. + - If no addresses are provided, Verifies all security connections associated with the peer. + - Verifies that the connection is in the `Established` state. Expected Results ---------------- - * Success: The test passes if the IPv4 security connection for a peer is established in the specified VRF. - * Failure: The test fails if IPv4 security is not configured, a connection is not found for a peer, or the connection is not established in the specified VRF. + * Success: If all checks pass for all specified IPv4 security connections. + * Failure: If any of the following occur: + - No IPv4 security connections are found for the peer + - The security connection is not established for the specified path or any of the peer's connections when no path is specified. Examples -------- @@ -720,9 +727,9 @@ class VerifySpecificIPSecConn(AntaTest): ``` """ - description = "Verifies IPv4 security connections for a peer." + description = "Verifies the IPv4 security connections." categories: ClassVar[list[str]] = ["security"] - commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaTemplate(template="show ip security connection vrf {vrf} path peer {peer}")] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaTemplate(template="show ip security connection vrf {vrf} path peer {peer}", revision=2)] class Input(AntaTest.Input): """Input model for the VerifySpecificIPSecConn test.""" @@ -739,15 +746,15 @@ def render(self, template: AntaTemplate) -> list[AntaCommand]: def test(self) -> None: """Main test function for VerifySpecificIPSecConn.""" self.result.is_success() + for command_output, input_peer in zip(self.instance_commands, self.inputs.ip_security_connections): conn_output = command_output.json_output["connections"] - peer = command_output.params.peer - vrf = command_output.params.vrf conn_input = input_peer.connections + vrf = input_peer.vrf # Check if IPv4 security connection is configured if not conn_output: - self.result.is_failure(f"No IPv4 security connection configured for peer `{peer}`.") + self.result.is_failure(f"{input_peer} - Not configured") continue # If connection details are not provided then check all connections of a peer @@ -757,11 +764,7 @@ def test(self) -> None: if state != "Established": source = conn_data.get("saddr") destination = conn_data.get("daddr") - vrf = conn_data.get("tunnelNs") - self.result.is_failure( - f"Expected state of IPv4 security connection `source:{source} destination:{destination} vrf:{vrf}` for peer `{peer}` is `Established` " - f"but found `{state}` instead." - ) + self.result.is_failure(f"{input_peer} Source: {source} Destination: {destination} - Connection down; Expected: Established Actual: {state}") continue # Create a dictionary of existing connections for faster lookup @@ -777,13 +780,10 @@ def test(self) -> None: existing_state = existing_connections[(source_input, destination_input, vrf)] if existing_state != "Established": self.result.is_failure( - f"Expected state of IPv4 security connection `source:{source_input} destination:{destination_input} vrf:{vrf}` " - f"for peer `{peer}` is `Established` but found `{existing_state}` instead." + f"{input_peer} Source: {source_input} Destination: {destination_input} - Connection down; Expected: Established Actual: {existing_state}" ) else: - self.result.is_failure( - f"IPv4 security connection `source:{source_input} destination:{destination_input} vrf:{vrf}` for peer `{peer}` is not found." - ) + self.result.is_failure(f"{input_peer} Source: {source_input} Destination: {destination_input} - Connection not found.") class VerifyHardwareEntropy(AntaTest): diff --git a/tests/units/anta_tests/test_security.py b/tests/units/anta_tests/test_security.py index 0d4a478b0..2e9b92302 100644 --- a/tests/units/anta_tests/test_security.py +++ b/tests/units/anta_tests/test_security.py @@ -1079,7 +1079,7 @@ }, ] }, - "expected": {"result": "failure", "messages": ["No IPv4 security connection configured for peer `10.255.0.1`."]}, + "expected": {"result": "failure", "messages": ["Peer: 10.255.0.1 VRF: default - Not configured"]}, }, { "name": "failure-not-established", @@ -1127,14 +1127,10 @@ "expected": { "result": "failure", "messages": [ - "Expected state of IPv4 security connection `source:172.18.3.2 destination:172.18.2.2 vrf:default` for peer `10.255.0.1` is `Established` " - "but found `Idle` instead.", - "Expected state of IPv4 security connection `source:100.64.2.2 destination:100.64.1.2 vrf:default` for peer `10.255.0.1` is `Established` " - "but found `Idle` instead.", - "Expected state of IPv4 security connection `source:100.64.2.2 destination:100.64.1.2 vrf:MGMT` for peer `10.255.0.2` is `Established` " - "but found `Idle` instead.", - "Expected state of IPv4 security connection `source:172.18.2.2 destination:172.18.1.2 vrf:MGMT` for peer `10.255.0.2` is `Established` " - "but found `Idle` instead.", + "Peer: 10.255.0.1 VRF: default Source: 172.18.3.2 Destination: 172.18.2.2 - Connection down; Expected: Established Actual: Idle", + "Peer: 10.255.0.1 VRF: default Source: 100.64.2.2 Destination: 100.64.1.2 - Connection down; Expected: Established Actual: Idle", + "Peer: 10.255.0.2 VRF: MGMT Source: 100.64.2.2 Destination: 100.64.1.2 - Connection down; Expected: Established Actual: Idle", + "Peer: 10.255.0.2 VRF: MGMT Source: 172.18.2.2 Destination: 172.18.1.2 - Connection down; Expected: Established Actual: Idle", ], }, }, @@ -1194,12 +1190,10 @@ "expected": { "result": "failure", "messages": [ - "Expected state of IPv4 security connection `source:172.18.3.2 destination:172.18.2.2 vrf:default` for peer `10.255.0.1` is `Established` " - "but found `Idle` instead.", - "Expected state of IPv4 security connection `source:100.64.3.2 destination:100.64.2.2 vrf:default` for peer `10.255.0.1` is `Established` " - "but found `Idle` instead.", - "IPv4 security connection `source:100.64.4.2 destination:100.64.1.2 vrf:default` for peer `10.255.0.2` is not found.", - "IPv4 security connection `source:172.18.4.2 destination:172.18.1.2 vrf:default` for peer `10.255.0.2` is not found.", + "Peer: 10.255.0.1 VRF: default Source: 172.18.3.2 Destination: 172.18.2.2 - Connection down; Expected: Established Actual: Idle", + "Peer: 10.255.0.1 VRF: default Source: 100.64.3.2 Destination: 100.64.2.2 - Connection down; Expected: Established Actual: Idle", + "Peer: 10.255.0.2 VRF: default Source: 100.64.4.2 Destination: 100.64.1.2 - Connection not found.", + "Peer: 10.255.0.2 VRF: default Source: 172.18.4.2 Destination: 172.18.1.2 - Connection not found.", ], }, }, From b0cc060af7b564ed2818f276ba3e4407ee46a622 Mon Sep 17 00:00:00 2001 From: vitthalmagadum Date: Mon, 25 Nov 2024 06:10:13 -0500 Subject: [PATCH 3/7] Updated documentation --- anta/input_models/security.py | 6 +++--- docs/api/tests.security.md | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/anta/input_models/security.py b/anta/input_models/security.py index b58b7e739..f8b59545c 100644 --- a/anta/input_models/security.py +++ b/anta/input_models/security.py @@ -53,15 +53,15 @@ def __str__(self) -> str: class IPSecPeers(IPSecPeer): # pragma: no cover - """Alias for the IPSecPeer model to maintain backward compatibility. + """Alias for the IPSecPeers model to maintain backward compatibility. - When initialized, it will emit a deprecation warning and call the IPSecPeers model. + When initialized, it will emit a deprecation warning and call the IPSecPeer model. TODO: Remove this class in ANTA v2.0.0. """ def __init__(self, **data: Any) -> None: # noqa: ANN401 - """Initialize the BgpAfi class, emitting a deprecation warning.""" + """Initialize the IPSecPeer class, emitting a deprecation warning.""" warn( message="IPSecPeers model is deprecated and will be removed in ANTA v2.0.0. Use the IPSecPeer model instead.", category=DeprecationWarning, diff --git a/docs/api/tests.security.md b/docs/api/tests.security.md index b482b19cd..599783236 100644 --- a/docs/api/tests.security.md +++ b/docs/api/tests.security.md @@ -33,4 +33,6 @@ anta_title: ANTA catalog for security tests merge_init_into_class: false anta_hide_test_module_description: true show_labels: true - filters: ["!^__str__"] + filters: + - "!^__init__" + - "!^__str__" From eb584dc0f4744fe31302d281a5e3d2485dcac586 Mon Sep 17 00:00:00 2001 From: vitthalmagadum Date: Thu, 28 Nov 2024 01:50:03 -0500 Subject: [PATCH 4/7] Addressed review comments: updated failure msgs --- anta/tests/security.py | 11 ++++++----- tests/units/anta_tests/test_security.py | 12 ++++++------ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/anta/tests/security.py b/anta/tests/security.py index f40f16468..33805c4f8 100644 --- a/anta/tests/security.py +++ b/anta/tests/security.py @@ -727,7 +727,6 @@ class VerifySpecificIPSecConn(AntaTest): ``` """ - description = "Verifies the IPv4 security connections." categories: ClassVar[list[str]] = ["security"] commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaTemplate(template="show ip security connection vrf {vrf} path peer {peer}", revision=2)] @@ -737,6 +736,7 @@ class Input(AntaTest.Input): ip_security_connections: list[IPSecPeer] """List of IP4v security peers.""" IPSecPeers: ClassVar[type[IPSecPeers]] = IPSecPeers + """To maintain backward compatibility.""" def render(self, template: AntaTemplate) -> list[AntaCommand]: """Render the template for each input IP Sec connection.""" @@ -764,7 +764,9 @@ def test(self) -> None: if state != "Established": source = conn_data.get("saddr") destination = conn_data.get("daddr") - self.result.is_failure(f"{input_peer} Source: {source} Destination: {destination} - Connection down; Expected: Established Actual: {state}") + self.result.is_failure( + f"{input_peer} Source: {source} Destination: {destination} - Connection down - Expected: Established, Actual: {state}" + ) continue # Create a dictionary of existing connections for faster lookup @@ -779,9 +781,8 @@ def test(self) -> None: if (source_input, destination_input, vrf) in existing_connections: existing_state = existing_connections[(source_input, destination_input, vrf)] if existing_state != "Established": - self.result.is_failure( - f"{input_peer} Source: {source_input} Destination: {destination_input} - Connection down; Expected: Established Actual: {existing_state}" - ) + failure = f"Expected: Established, Actual: {existing_state}" + self.result.is_failure(f"{input_peer} Source: {source_input} Destination: {destination_input} - Connection down - {failure}") else: self.result.is_failure(f"{input_peer} Source: {source_input} Destination: {destination_input} - Connection not found.") diff --git a/tests/units/anta_tests/test_security.py b/tests/units/anta_tests/test_security.py index 2e9b92302..472eb7e18 100644 --- a/tests/units/anta_tests/test_security.py +++ b/tests/units/anta_tests/test_security.py @@ -1127,10 +1127,10 @@ "expected": { "result": "failure", "messages": [ - "Peer: 10.255.0.1 VRF: default Source: 172.18.3.2 Destination: 172.18.2.2 - Connection down; Expected: Established Actual: Idle", - "Peer: 10.255.0.1 VRF: default Source: 100.64.2.2 Destination: 100.64.1.2 - Connection down; Expected: Established Actual: Idle", - "Peer: 10.255.0.2 VRF: MGMT Source: 100.64.2.2 Destination: 100.64.1.2 - Connection down; Expected: Established Actual: Idle", - "Peer: 10.255.0.2 VRF: MGMT Source: 172.18.2.2 Destination: 172.18.1.2 - Connection down; Expected: Established Actual: Idle", + "Peer: 10.255.0.1 VRF: default Source: 172.18.3.2 Destination: 172.18.2.2 - Connection down - Expected: Established, Actual: Idle", + "Peer: 10.255.0.1 VRF: default Source: 100.64.2.2 Destination: 100.64.1.2 - Connection down - Expected: Established, Actual: Idle", + "Peer: 10.255.0.2 VRF: MGMT Source: 100.64.2.2 Destination: 100.64.1.2 - Connection down - Expected: Established, Actual: Idle", + "Peer: 10.255.0.2 VRF: MGMT Source: 172.18.2.2 Destination: 172.18.1.2 - Connection down - Expected: Established, Actual: Idle", ], }, }, @@ -1190,8 +1190,8 @@ "expected": { "result": "failure", "messages": [ - "Peer: 10.255.0.1 VRF: default Source: 172.18.3.2 Destination: 172.18.2.2 - Connection down; Expected: Established Actual: Idle", - "Peer: 10.255.0.1 VRF: default Source: 100.64.3.2 Destination: 100.64.2.2 - Connection down; Expected: Established Actual: Idle", + "Peer: 10.255.0.1 VRF: default Source: 172.18.3.2 Destination: 172.18.2.2 - Connection down - Expected: Established, Actual: Idle", + "Peer: 10.255.0.1 VRF: default Source: 100.64.3.2 Destination: 100.64.2.2 - Connection down - Expected: Established, Actual: Idle", "Peer: 10.255.0.2 VRF: default Source: 100.64.4.2 Destination: 100.64.1.2 - Connection not found.", "Peer: 10.255.0.2 VRF: default Source: 172.18.4.2 Destination: 172.18.1.2 - Connection not found.", ], From d2e0b3ba4bff418d9e5c98a3953dd267c3d124cd Mon Sep 17 00:00:00 2001 From: vitthalmagadum Date: Thu, 28 Nov 2024 22:33:31 -0500 Subject: [PATCH 5/7] Removed __str__ as redundant for now --- anta/input_models/security.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/anta/input_models/security.py b/anta/input_models/security.py index f8b59545c..373d89735 100644 --- a/anta/input_models/security.py +++ b/anta/input_models/security.py @@ -42,15 +42,6 @@ class IPSecConn(BaseModel): destination_address: IPv4Address """The IPv4 address of the destination in the security connection.""" - def __str__(self) -> str: - """Return a string representation of the IPSecConn model. Used in failure messages. - - Examples - -------- - - Source: 1.1.1.1 Destination: 2.2.2.2 - """ - return f"Source: {self.source_address} Destination: {self.destination_address}" - class IPSecPeers(IPSecPeer): # pragma: no cover """Alias for the IPSecPeers model to maintain backward compatibility. From 285438c0e45c17ca30b44ddc0ec0cf3a04831da0 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 4 Dec 2024 14:46:57 +0000 Subject: [PATCH 6/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- examples/tests.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/tests.yaml b/examples/tests.yaml index 273d20a5b..4f6147d29 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -640,7 +640,7 @@ anta.tests.security: - VerifySSHStatus: # Verifies if the SSHD agent is disabled in the default VRF. - VerifySpecificIPSecConn: - # Verifies IPv4 security connections for a peer. + # Verifies the IPv4 security connections. ip_security_connections: - peer: 10.255.0.1 - peer: 10.255.0.2 From 75826e7fa61861de7e4b2a8ecb503a0d880072dc Mon Sep 17 00:00:00 2001 From: Guillaume Mulocher Date: Wed, 4 Dec 2024 16:52:24 +0100 Subject: [PATCH 7/7] Apply suggestions from code review --- anta/tests/security.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/anta/tests/security.py b/anta/tests/security.py index 760fa17da..38bf2409e 100644 --- a/anta/tests/security.py +++ b/anta/tests/security.py @@ -694,21 +694,20 @@ def test(self) -> None: class VerifySpecificIPSecConn(AntaTest): """Verifies the IPv4 security connections. - This test performs the following checks for each specified address family: + This test performs the following checks for each peer: 1. Validates that the VRF is configured. 2. Checks for the presence of IPv4 security connections for the specified peer. 3. For each relevant peer: - - If source and destination addresses are provided, Verifies the security connection for the specific path. - - If no addresses are provided, Verifies all security connections associated with the peer. - - Verifies that the connection is in the `Established` state. + - If source and destination addresses are provided, verifies the security connection for the specific path exists and is `Established`. + - If no addresses are provided, verifies that all security connections associated with the peer are `Established`. Expected Results ---------------- * Success: If all checks pass for all specified IPv4 security connections. * Failure: If any of the following occur: - No IPv4 security connections are found for the peer - - The security connection is not established for the specified path or any of the peer's connections when no path is specified. + - The security connection is not established for the specified path or any of the peer connections is not established when no path is specified. Examples --------