Skip to content

Commit

Permalink
feat(anta): Added testcase to verify the BGP Redistributed Routes (#993)
Browse files Browse the repository at this point in the history
  • Loading branch information
geetanjalimanegslab authored Feb 13, 2025
1 parent b652ed1 commit 6eabc52
Show file tree
Hide file tree
Showing 6 changed files with 688 additions and 11 deletions.
83 changes: 79 additions & 4 deletions anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
"""Match hostname like `my-hostname`, `my-hostname-1`, `my-hostname-1-2`."""


# Regular expression for BGP redistributed routes
REGEX_IPV4_UNICAST = r"ipv4[-_ ]?unicast$"
REGEX_IPV4_MULTICAST = r"ipv4[-_ ]?multicast$"
REGEX_IPV6_UNICAST = r"ipv6[-_ ]?unicast$"
REGEX_IPV6_MULTICAST = r"ipv6[-_ ]?multicast$"


def aaa_group_prefix(v: str) -> str:
"""Prefix the AAA method with 'group' if it is known."""
built_in_methods = ["local", "none", "logging"]
Expand Down Expand Up @@ -92,10 +99,10 @@ def bgp_multiprotocol_capabilities_abbreviations(value: str) -> str:
patterns = {
f"{r'dynamic[-_ ]?path[-_ ]?selection$'}": "dps",
f"{r'dps$'}": "dps",
f"{r'ipv4[-_ ]?unicast$'}": "ipv4Unicast",
f"{r'ipv6[-_ ]?unicast$'}": "ipv6Unicast",
f"{r'ipv4[-_ ]?multicast$'}": "ipv4Multicast",
f"{r'ipv6[-_ ]?multicast$'}": "ipv6Multicast",
f"{REGEX_IPV4_UNICAST}": "ipv4Unicast",
f"{REGEX_IPV6_UNICAST}": "ipv6Unicast",
f"{REGEX_IPV4_MULTICAST}": "ipv4Multicast",
f"{REGEX_IPV6_MULTICAST}": "ipv6Multicast",
f"{r'ipv4[-_ ]?labeled[-_ ]?Unicast$'}": "ipv4MplsLabels",
f"{r'ipv4[-_ ]?mpls[-_ ]?labels$'}": "ipv4MplsLabels",
f"{r'ipv6[-_ ]?labeled[-_ ]?Unicast$'}": "ipv6MplsLabels",
Expand Down Expand Up @@ -132,6 +139,54 @@ def validate_regex(value: str) -> str:
return value


def bgp_redistributed_route_proto_abbreviations(value: str) -> str:
"""Abbreviations for different BGP redistributed route protocols.
Handles different separators (hyphen, underscore, space) and case sensitivity.
Examples
--------
```python
>>> bgp_redistributed_route_proto_abbreviations("IPv4 Unicast")
'v4u'
>>> bgp_redistributed_route_proto_abbreviations("IPv4-multicast")
'v4m'
>>> bgp_redistributed_route_proto_abbreviations("IPv6_multicast")
'v6m'
>>> bgp_redistributed_route_proto_abbreviations("ipv6unicast")
'v6u'
```
"""
patterns = {REGEX_IPV4_UNICAST: "v4u", REGEX_IPV4_MULTICAST: "v4m", REGEX_IPV6_UNICAST: "v6u", REGEX_IPV6_MULTICAST: "v6m"}

for pattern, replacement in patterns.items():
match = re.match(pattern, value, re.IGNORECASE)
if match:
return replacement

return value


def update_bgp_redistributed_proto_user(value: str) -> str:
"""Update BGP redistributed route `User` proto with EOS SDK.
Examples
--------
```python
>>> update_bgp_redistributed_proto_user("User")
'EOS SDK'
>>> update_bgp_redistributed_proto_user("Bgp")
'Bgp'
>>> update_bgp_redistributed_proto_user("RIP")
'RIP'
```
"""
if value == "User":
value = "EOS SDK"

return value


# AntaTest.Input types
AAAAuthMethod = Annotated[str, AfterValidator(aaa_group_prefix)]
Vlan = Annotated[int, Field(ge=0, le=4094)]
Expand Down Expand Up @@ -319,3 +374,23 @@ def snmp_v3_prefix(auth_type: Literal["auth", "priv", "noauth"]) -> str:
"inVersionErrs", "inBadCommunityNames", "inBadCommunityUses", "inParseErrs", "outTooBigErrs", "outNoSuchNameErrs", "outBadValueErrs", "outGeneralErrs"
]
SnmpVersionV3AuthType = Annotated[Literal["auth", "priv", "noauth"], AfterValidator(snmp_v3_prefix)]
RedistributedProtocol = Annotated[
Literal[
"AttachedHost",
"Bgp",
"Connected",
"Dynamic",
"IS-IS",
"OSPF Internal",
"OSPF External",
"OSPF Nssa-External",
"OSPFv3 Internal",
"OSPFv3 External",
"OSPFv3 Nssa-External",
"RIP",
"Static",
"User",
],
AfterValidator(update_bgp_redistributed_proto_user),
]
RedistributedAfiSafi = Annotated[Literal["v4u", "v4m", "v6u", "v6m"], BeforeValidator(bgp_redistributed_route_proto_abbreviations)]
75 changes: 72 additions & 3 deletions anta/input_models/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pydantic import BaseModel, ConfigDict, Field, PositiveInt, model_validator
from pydantic_extra_types.mac_address import MacAddress

from anta.custom_types import Afi, BgpDropStats, BgpUpdateError, MultiProtocolCaps, Safi, Vni
from anta.custom_types import Afi, BgpDropStats, BgpUpdateError, MultiProtocolCaps, RedistributedAfiSafi, RedistributedProtocol, Safi, Vni

if TYPE_CHECKING:
import sys
Expand Down Expand Up @@ -68,8 +68,7 @@ class BgpAddressFamily(BaseModel):
check_peer_state: bool = False
"""Flag to check if the peers are established with negotiated AFI/SAFI. Defaults to `False`.
Can be enabled in the `VerifyBGPPeerCount` tests.
"""
Can be enabled in the `VerifyBGPPeerCount` tests."""

@model_validator(mode="after")
def validate_inputs(self) -> Self:
Expand Down Expand Up @@ -256,3 +255,73 @@ def __str__(self) -> str:
- Next-hop: 192.168.66.101 Origin: Igp
"""
return f"Next-hop: {self.nexthop} Origin: {self.origin}"


class BgpVrf(BaseModel):
"""Model representing a VRF in a BGP instance."""

vrf: str = "default"
"""VRF context."""
address_families: list[AddressFamilyConfig]
"""List of address family configuration."""

def __str__(self) -> str:
"""Return a human-readable string representation of the BgpVrf for reporting.
Examples
--------
- VRF: default
"""
return f"VRF: {self.vrf}"


class RedistributedRouteConfig(BaseModel):
"""Model representing a BGP redistributed route configuration."""

proto: RedistributedProtocol
"""The redistributed protocol."""
include_leaked: bool = False
"""Flag to include leaked routes of the redistributed protocol while redistributing."""
route_map: str | None = None
"""Optional route map applied to the redistribution."""

@model_validator(mode="after")
def validate_inputs(self) -> Self:
"""Validate that 'include_leaked' is not set when the redistributed protocol is AttachedHost, User, Dynamic, or RIP."""
if self.include_leaked and self.proto in ["AttachedHost", "EOS SDK", "Dynamic", "RIP"]:
msg = f"'include_leaked' field is not supported for redistributed protocol '{self.proto}'"
raise ValueError(msg)
return self

def __str__(self) -> str:
"""Return a human-readable string representation of the RedistributedRouteConfig for reporting.
Examples
--------
- Proto: Connected, Include Leaked: True, Route Map: RM-CONN-2-BGP
"""
base_string = f"Proto: {self.proto}"
if self.include_leaked:
base_string += f", Include Leaked: {self.include_leaked}"
if self.route_map:
base_string += f", Route Map: {self.route_map}"
return base_string


class AddressFamilyConfig(BaseModel):
"""Model representing a BGP address family configuration."""

afi_safi: RedistributedAfiSafi
"""AFI/SAFI abbreviation per EOS."""
redistributed_routes: list[RedistributedRouteConfig]
"""List of redistributed route configuration."""

def __str__(self) -> str:
"""Return a human-readable string representation of the AddressFamilyConfig for reporting.
Examples
--------
- AFI-SAFI: IPv4 Unicast
"""
mappings = {"v4u": "IPv4 Unicast", "v4m": "IPv4 Multicast", "v6u": "IPv6 Unicast", "v6m": "IPv6 Multicast"}
return f"AFI-SAFI: {mappings[self.afi_safi]}"
98 changes: 97 additions & 1 deletion anta/tests/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@

from pydantic import field_validator

from anta.input_models.routing.bgp import BgpAddressFamily, BgpAfi, BgpNeighbor, BgpPeer, BgpRoute, VxlanEndpoint
from anta.input_models.routing.bgp import BgpAddressFamily, BgpAfi, BgpNeighbor, BgpPeer, BgpRoute, BgpVrf, VxlanEndpoint
from anta.models import AntaCommand, AntaTemplate, AntaTest
from anta.tools import format_data, get_item, get_value

# Using a TypeVar for the BgpPeer model since mypy thinks it's a ClassVar and not a valid type when used in field validators
T = TypeVar("T", bound=BgpPeer)

# TODO: Refactor to reduce the number of lines in this module later


def _check_bgp_neighbor_capability(capability_status: dict[str, bool]) -> bool:
"""Check if a BGP neighbor capability is advertised, received, and enabled.
Expand Down Expand Up @@ -1797,3 +1799,97 @@ def test(self) -> None:
# Verify BGP and RIB nexthops are same.
if len(bgp_nexthops) != len(route_entry["vias"]):
self.result.is_failure(f"{route} - Nexthops count mismatch - BGP: {len(bgp_nexthops)}, RIB: {len(route_entry['vias'])}")


class VerifyBGPRedistribution(AntaTest):
"""Verifies BGP redistribution.
This test performs the following checks for each specified VRF in the BGP instance:
1. Ensures that the expected address-family is configured on the device.
2. Confirms that the redistributed route protocol, include leaked and route map match the expected values.
Expected Results
----------------
* Success: If all of the following conditions are met:
- The expected address-family is configured on the device.
- The redistributed route protocol, include leaked and route map align with the expected values for the route.
* Failure: If any of the following occur:
- The expected address-family is not configured on device.
- The redistributed route protocol, include leaked or route map does not match the expected values.
Examples
--------
```yaml
anta.tests.routing:
bgp:
- VerifyBGPRedistribution:
vrfs:
- vrf: default
address_families:
- afi_safi: ipv4Unicast
redistributed_routes:
- proto: Connected
include_leaked: True
route_map: RM-CONN-2-BGP
- proto: Static
include_leaked: True
route_map: RM-CONN-2-BGP
- afi_safi: IPv6 Unicast
redistributed_routes:
- proto: User # Converted to EOS SDK
route_map: RM-CONN-2-BGP
- proto: Static
include_leaked: True
route_map: RM-CONN-2-BGP
```
"""

categories: ClassVar[list[str]] = ["bgp"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bgp instance vrf all", revision=4)]

class Input(AntaTest.Input):
"""Input model for the VerifyBGPRedistribution test."""

vrfs: list[BgpVrf]
"""List of VRFs in the BGP instance."""

def _validate_redistribute_route(self, vrf_data: str, addr_family: str, afi_safi_configs: list[dict[str, Any]], route_info: dict[str, Any]) -> list[Any]:
"""Validate the redstributed route details for a given address family."""
failure_msg = []
# If the redistributed route protocol does not match the expected value, test fails.
if not (actual_route := get_item(afi_safi_configs.get("redistributedRoutes"), "proto", route_info.proto)):
failure_msg.append(f"{vrf_data}, {addr_family}, Proto: {route_info.proto} - Not configured")
return failure_msg

# If includes leaked field applicable, and it does not matches the expected value, test fails.
if (act_include_leaked := actual_route.get("includeLeaked", False)) != route_info.include_leaked:
failure_msg.append(f"{vrf_data}, {addr_family}, {route_info} - Include leaked mismatch - Actual: {act_include_leaked}")

# If route map is required and it is not matching the expected value, test fails.
if all([route_info.route_map, (act_route_map := actual_route.get("routeMap", "Not Found")) != route_info.route_map]):
failure_msg.append(f"{vrf_data}, {addr_family}, {route_info} - Route map mismatch - Actual: {act_route_map}")
return failure_msg

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyBGPRedistribution."""
self.result.is_success()
command_output = self.instance_commands[0].json_output

for vrf_data in self.inputs.vrfs:
# If the specified VRF details are not found, test fails.
if not (instance_details := get_value(command_output, f"vrfs.{vrf_data.vrf}")):
self.result.is_failure(f"{vrf_data} - Not configured")
continue
for address_family in vrf_data.address_families:
# If the AFI-SAFI configuration details are not found, test fails.
if not (afi_safi_configs := get_value(instance_details, f"afiSafiConfig.{address_family.afi_safi}")):
self.result.is_failure(f"{vrf_data}, {address_family} - Not redistributed")
continue

for route_info in address_family.redistributed_routes:
failure_msg = self._validate_redistribute_route(str(vrf_data), str(address_family), afi_safi_configs, route_info)
for msg in failure_msg:
self.result.is_failure(msg)
20 changes: 20 additions & 0 deletions examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,26 @@ anta.tests.routing.bgp:
- VerifyBGPPeersHealthRibd:
# Verifies the health of all the BGP IPv4 peer(s).
check_tcp_queues: True
- VerifyBGPRedistribution:
# Verifies BGP redistribution.
vrfs:
- vrf: default
address_families:
- afi_safi: ipv4Unicast
redistributed_routes:
- proto: Connected
include_leaked: True
route_map: RM-CONN-2-BGP
- proto: Static
include_leaked: True
route_map: RM-CONN-2-BGP
- afi_safi: IPv6 Unicast
redistributed_routes:
- proto: User # Converted to EOS SDK
route_map: RM-CONN-2-BGP
- proto: Static
include_leaked: True
route_map: RM-CONN-2-BGP
- VerifyBGPRouteECMP:
# Verifies BGP IPv4 route ECMP paths.
route_entries:
Expand Down
Loading

0 comments on commit 6eabc52

Please sign in to comment.