Skip to content

Commit

Permalink
refactor(anta): Update MultiProtocolCaps custom type to make it more …
Browse files Browse the repository at this point in the history
…restrictive (#1021)

* Issue_1015: Updated MultiProtocolCaps custom type to make it more restrictive

* Updated regex pattern as match, and example docstring

* Issue_1015: Added regex patterns to support eos format with spaces, hyphen, underscore

* Added unit testsfor all the capabilities

* Fix codespell ignore + add Literal

* Fix unit tests

---------

Co-authored-by: Carl Baillargeon <[email protected]>
  • Loading branch information
geetanjalimanegslab and carl-baillargeon authored Feb 6, 2025
1 parent aaf4b9b commit 6f56280
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 78 deletions.
91 changes: 68 additions & 23 deletions anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,6 @@
REGEXP_TYPE_HOSTNAME = r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$"
"""Match hostname like `my-hostname`, `my-hostname-1`, `my-hostname-1-2`."""

# Regexp BGP AFI/SAFI
REGEXP_BGP_L2VPN_AFI = r"\b(l2[\s\-]?vpn[\s\-]?evpn)\b"
"""Match L2VPN EVPN AFI."""
REGEXP_BGP_IPV4_MPLS_LABELS = r"\b(ipv4[\s\-]?mpls[\s\-]?label(s)?)\b"
"""Match IPv4 MPLS Labels."""
REGEX_BGP_IPV4_MPLS_VPN = r"\b(ipv4[\s\-]?mpls[\s\-]?vpn)\b"
"""Match IPv4 MPLS VPN."""
REGEX_BGP_IPV4_UNICAST = r"\b(ipv4[\s\-]?uni[\s\-]?cast)\b"
"""Match IPv4 Unicast."""


def aaa_group_prefix(v: str) -> str:
"""Prefix the AAA method with 'group' if it is known."""
Expand Down Expand Up @@ -78,26 +68,57 @@ def interface_case_sensitivity(v: str) -> str:
def bgp_multiprotocol_capabilities_abbreviations(value: str) -> str:
"""Abbreviations for different BGP multiprotocol capabilities.
Handles different separators (hyphen, underscore, space) and case sensitivity.
Examples
--------
- IPv4 Unicast
- L2vpnEVPN
- ipv4 MPLS Labels
- ipv4Mplsvpn
```python
>>> bgp_multiprotocol_capabilities_abbreviations("IPv4 Unicast")
'ipv4Unicast'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv4-Flow_Spec Vpn")
'ipv4FlowSpecVpn'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv6_labeled-unicast")
'ipv6MplsLabels'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv4_mpls_vpn")
'ipv4MplsVpn'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv4 mpls labels")
'ipv4MplsLabels'
>>> bgp_multiprotocol_capabilities_abbreviations("rt-membership")
'rtMembership'
>>> bgp_multiprotocol_capabilities_abbreviations("dynamic-path-selection")
'dps'
```
"""
patterns = {
REGEXP_BGP_L2VPN_AFI: "l2VpnEvpn",
REGEXP_BGP_IPV4_MPLS_LABELS: "ipv4MplsLabels",
REGEX_BGP_IPV4_MPLS_VPN: "ipv4MplsVpn",
REGEX_BGP_IPV4_UNICAST: "ipv4Unicast",
f"{r'dynamic[-_ ]?path[-_ ]?selection$'}": "dps",
f"{r'dps$'}": "dps",
f"{r'ipv4[-_ ]?unicast$'}": "ipv4Unicast",
f"{r'ipv6[-_ ]?unicast$'}": "ipv6Unicast",
f"{r'ipv4[-_ ]?multicast$'}": "ipv4Multicast",
f"{r'ipv6[-_ ]?multicast$'}": "ipv6Multicast",
f"{r'ipv4[-_ ]?labeled[-_ ]?Unicast$'}": "ipv4MplsLabels",
f"{r'ipv4[-_ ]?mpls[-_ ]?labels$'}": "ipv4MplsLabels",
f"{r'ipv6[-_ ]?labeled[-_ ]?Unicast$'}": "ipv6MplsLabels",
f"{r'ipv6[-_ ]?mpls[-_ ]?labels$'}": "ipv6MplsLabels",
f"{r'ipv4[-_ ]?sr[-_ ]?te$'}": "ipv4SrTe", # codespell:ignore
f"{r'ipv6[-_ ]?sr[-_ ]?te$'}": "ipv6SrTe", # codespell:ignore
f"{r'ipv4[-_ ]?mpls[-_ ]?vpn$'}": "ipv4MplsVpn",
f"{r'ipv6[-_ ]?mpls[-_ ]?vpn$'}": "ipv6MplsVpn",
f"{r'ipv4[-_ ]?Flow[-_ ]?spec$'}": "ipv4FlowSpec",
f"{r'ipv6[-_ ]?Flow[-_ ]?spec$'}": "ipv6FlowSpec",
f"{r'ipv4[-_ ]?Flow[-_ ]?spec[-_ ]?vpn$'}": "ipv4FlowSpecVpn",
f"{r'ipv6[-_ ]?Flow[-_ ]?spec[-_ ]?vpn$'}": "ipv6FlowSpecVpn",
f"{r'l2[-_ ]?vpn[-_ ]?vpls$'}": "l2VpnVpls",
f"{r'l2[-_ ]?vpn[-_ ]?evpn$'}": "l2VpnEvpn",
f"{r'link[-_ ]?state$'}": "linkState",
f"{r'rt[-_ ]?membership$'}": "rtMembership",
f"{r'ipv4[-_ ]?rt[-_ ]?membership$'}": "rtMembership",
f"{r'ipv4[-_ ]?mvpn$'}": "ipv4Mvpn",
}

for pattern, replacement in patterns.items():
match = re.search(pattern, value, re.IGNORECASE)
match = re.match(pattern, value, re.IGNORECASE)
if match:
return replacement

return value


Expand Down Expand Up @@ -145,7 +166,31 @@ def validate_regex(value: str) -> str:
EncryptionAlgorithm = Literal["RSA", "ECDSA"]
RsaKeySize = Literal[2048, 3072, 4096]
EcdsaKeySize = Literal[256, 384, 512]
MultiProtocolCaps = Annotated[str, BeforeValidator(bgp_multiprotocol_capabilities_abbreviations)]
MultiProtocolCaps = Annotated[
Literal[
"dps",
"ipv4Unicast",
"ipv6Unicast",
"ipv4Multicast",
"ipv6Multicast",
"ipv4MplsLabels",
"ipv6MplsLabels",
"ipv4SrTe",
"ipv6SrTe",
"ipv4MplsVpn",
"ipv6MplsVpn",
"ipv4FlowSpec",
"ipv6FlowSpec",
"ipv4FlowSpecVpn",
"ipv6FlowSpecVpn",
"l2VpnVpls",
"l2VpnEvpn",
"linkState",
"rtMembership",
"ipv4Mvpn",
],
BeforeValidator(bgp_multiprotocol_capabilities_abbreviations),
]
BfdInterval = Annotated[int, Field(ge=50, le=60000)]
BfdMultiplier = Annotated[int, Field(ge=3, le=50)]
ErrDisableReasons = Literal[
Expand Down
3 changes: 2 additions & 1 deletion anta/tests/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,8 @@ class VerifyBGPPeerMPCaps(AntaTest):
vrf: default
strict: False
capabilities:
- ipv4Unicast
- ipv4 labeled-Unicast
- ipv4MplsVpn
```
"""

Expand Down
3 changes: 2 additions & 1 deletion examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,8 @@ anta.tests.routing.bgp:
vrf: default
strict: False
capabilities:
- ipv4Unicast
- ipv4 labeled-Unicast
- ipv4MplsVpn
- VerifyBGPPeerRouteLimit:
# Verifies maximum routes and warning limit for BGP IPv4 peer(s).
bgp_peers:
Expand Down
14 changes: 7 additions & 7 deletions tests/units/anta_tests/routing/test_bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1384,12 +1384,12 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo
{
"peer_address": "172.30.11.1",
"vrf": "default",
"capabilities": ["Ipv4 Unicast", "ipv4 Mpls labels"],
"capabilities": ["Ipv4Unicast", "ipv4 Mpls labels"],
},
{
"peer_address": "172.30.11.10",
"vrf": "MGMT",
"capabilities": ["ipv4 Unicast", "ipv4 MplsVpn"],
"capabilities": ["ipv4_Unicast", "ipv4 MplsVpn"],
},
]
},
Expand Down Expand Up @@ -1441,12 +1441,12 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo
{
"peer_address": "172.30.11.10",
"vrf": "default",
"capabilities": ["ipv4Unicast", "L2 Vpn EVPN"],
"capabilities": ["ipv4Unicast", "l2-vpn-EVPN"],
},
{
"peer_address": "172.30.11.1",
"vrf": "MGMT",
"capabilities": ["ipv4Unicast", "L2 Vpn EVPN"],
"capabilities": ["ipv4Unicast", "l2vpnevpn"],
},
]
},
Expand Down Expand Up @@ -1575,7 +1575,7 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo
{
"peer_address": "172.30.11.10",
"vrf": "MGMT",
"capabilities": ["ipv4unicast", "ipv4 mplsvpn", "L2vpnEVPN"],
"capabilities": ["ipv4_unicast", "ipv4 mplsvpn", "L2vpnEVPN"],
},
{
"peer_address": "172.30.11.11",
Expand Down Expand Up @@ -1656,13 +1656,13 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo
"peer_address": "172.30.11.1",
"vrf": "default",
"strict": True,
"capabilities": ["Ipv4 Unicast", "ipv4 Mpls labels"],
"capabilities": ["Ipv4 Unicast", "ipv4MplsLabels"],
},
{
"peer_address": "172.30.11.10",
"vrf": "MGMT",
"strict": True,
"capabilities": ["ipv4 Unicast", "ipv4 MplsVpn"],
"capabilities": ["ipv4-Unicast", "ipv4MplsVpn"],
},
]
},
Expand Down
66 changes: 20 additions & 46 deletions tests/units/test_custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
import pytest

from anta.custom_types import (
REGEX_BGP_IPV4_MPLS_VPN,
REGEX_BGP_IPV4_UNICAST,
REGEX_TYPE_PORTCHANNEL,
REGEXP_BGP_IPV4_MPLS_LABELS,
REGEXP_BGP_L2VPN_AFI,
REGEXP_INTERFACE_ID,
REGEXP_PATH_MARKERS,
REGEXP_TYPE_EOS_INTERFACE,
Expand Down Expand Up @@ -50,40 +46,6 @@ def test_regexp_path_markers() -> None:
assert re.search(REGEXP_PATH_MARKERS, ".[]?<>") is None


def test_regexp_bgp_l2vpn_afi() -> None:
"""Test REGEXP_BGP_L2VPN_AFI."""
# Test strings that should match the pattern
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2vpn-evpn") is not None
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2 vpn evpn") is not None
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2-vpn evpn") is not None
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2vpn evpn") is not None
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2vpnevpn") is not None
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2 vpnevpn") is not None

# Test strings that should not match the pattern
assert re.search(REGEXP_BGP_L2VPN_AFI, "al2vpn evpn") is None
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2vpn-evpna") is None


def test_regexp_bgp_ipv4_mpls_labels() -> None:
"""Test REGEXP_BGP_IPV4_MPLS_LABELS."""
assert re.search(REGEXP_BGP_IPV4_MPLS_LABELS, "ipv4-mpls-label") is not None
assert re.search(REGEXP_BGP_IPV4_MPLS_LABELS, "ipv4 mpls labels") is not None
assert re.search(REGEXP_BGP_IPV4_MPLS_LABELS, "ipv4Mplslabel") is None


def test_regex_bgp_ipv4_mpls_vpn() -> None:
"""Test REGEX_BGP_IPV4_MPLS_VPN."""
assert re.search(REGEX_BGP_IPV4_MPLS_VPN, "ipv4-mpls-vpn") is not None
assert re.search(REGEX_BGP_IPV4_MPLS_VPN, "ipv4_mplsvpn") is None


def test_regex_bgp_ipv4_unicast() -> None:
"""Test REGEX_BGP_IPV4_UNICAST."""
assert re.search(REGEX_BGP_IPV4_UNICAST, "ipv4-uni-cast") is not None
assert re.search(REGEX_BGP_IPV4_UNICAST, "ipv4+unicast") is None


def test_regexp_type_interface_id() -> None:
"""Test REGEXP_INTERFACE_ID."""
intf_id_re = re.compile(f"{REGEXP_INTERFACE_ID}")
Expand Down Expand Up @@ -209,13 +171,29 @@ def test_interface_autocomplete_failure() -> None:
("str_input", "expected_output"),
[
pytest.param("L2VPNEVPN", "l2VpnEvpn", id="l2VpnEvpn"),
pytest.param("ipv4-mplsLabels", "ipv4MplsLabels", id="ipv4MplsLabels"),
pytest.param("IPv4 Labeled Unicast", "ipv4MplsLabels", id="ipv4MplsLabels"),
pytest.param("ipv4-mpls-vpn", "ipv4MplsVpn", id="ipv4MplsVpn"),
pytest.param("ipv4-unicast", "ipv4Unicast", id="ipv4Unicast"),
pytest.param("BLAH", "BLAH", id="unmatched"),
pytest.param("ipv4_unicast", "ipv4Unicast", id="ipv4Unicast"),
pytest.param("ipv4 Mvpn", "ipv4Mvpn", id="ipv4Mvpn"),
pytest.param("ipv4_Flow-Spec Vpn", "ipv4FlowSpecVpn", id="ipv4FlowSpecVpn"),
pytest.param("Dynamic-Path-Selection", "dps", id="dps"),
pytest.param("ipv6unicast", "ipv6Unicast", id="ipv6Unicast"),
pytest.param("IPv4-Multicast", "ipv4Multicast", id="ipv4Multicast"),
pytest.param("IPv6_multicast", "ipv6Multicast", id="ipv6Multicast"),
pytest.param("ipv6_Mpls-Labels", "ipv6MplsLabels", id="ipv6MplsLabels"),
pytest.param("IPv4_SR_TE", "ipv4SrTe", id="ipv4SrTe"),
pytest.param("iPv6-sR-tE", "ipv6SrTe", id="ipv6SrTe"),
pytest.param("ipv6_mpls-vpn", "ipv6MplsVpn", id="ipv6MplsVpn"),
pytest.param("IPv4 Flow-spec", "ipv4FlowSpec", id="ipv4FlowSpec"),
pytest.param("IPv6Flow_spec", "ipv6FlowSpec", id="ipv6FlowSpec"),
pytest.param("ipv6 Flow-Spec Vpn", "ipv6FlowSpecVpn", id="ipv6FlowSpecVpn"),
pytest.param("L2VPN VPLS", "l2VpnVpls", id="l2VpnVpls"),
pytest.param("link-state", "linkState", id="linkState"),
pytest.param("RT_Membership", "rtMembership", id="rtMembership"),
pytest.param("ipv4-RT_Membership", "rtMembership", id="rtMembership"),
],
)
def test_bgp_multiprotocol_capabilities_abbreviationsh(str_input: str, expected_output: str) -> None:
def test_bgp_multiprotocol_capabilities_abbreviations(str_input: str, expected_output: str) -> None:
"""Test bgp_multiprotocol_capabilities_abbreviations."""
assert bgp_multiprotocol_capabilities_abbreviations(str_input) == expected_output

Expand Down Expand Up @@ -257,11 +235,7 @@ def test_interface_case_sensitivity_uppercase() -> None:
@pytest.mark.parametrize(
"str_input",
[
REGEX_BGP_IPV4_MPLS_VPN,
REGEX_BGP_IPV4_UNICAST,
REGEX_TYPE_PORTCHANNEL,
REGEXP_BGP_IPV4_MPLS_LABELS,
REGEXP_BGP_L2VPN_AFI,
REGEXP_INTERFACE_ID,
REGEXP_PATH_MARKERS,
REGEXP_TYPE_EOS_INTERFACE,
Expand Down

0 comments on commit 6f56280

Please sign in to comment.