From 4ffb72580181f3c8c4638360e7a37ac641141d85 Mon Sep 17 00:00:00 2001 From: Morten Brekkevold Date: Thu, 18 Jul 2024 15:58:41 +0200 Subject: [PATCH 1/4] Translate InetAddress and IpAddress as ip_address PySNMP doesn't necessarily pretty-print InetAddress and IpAddress value types as anything useful for us (This are textual conventions of an OctetString, and PySNMP tends to just raw-dog the octet string into something like a hexadecimal string representation of the content). If the value type is `InetAddress` or `IpAddress` we can expect `ipaddress.ip_address()` to properly parse the raw bytes into an IPv4Address or IPv6Address object. --- src/zino/snmp.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/zino/snmp.py b/src/zino/snmp.py index bcad2c872..c7cea0682 100644 --- a/src/zino/snmp.py +++ b/src/zino/snmp.py @@ -5,6 +5,7 @@ import threading from collections import defaultdict from dataclasses import dataclass +from ipaddress import ip_address from typing import Any, NamedTuple, Sequence, Tuple, Union from pyasn1.type import univ @@ -509,7 +510,7 @@ def _mib_value_to_python(value: SupportedTypes) -> Union[str, int, OID]: value = int(value) if not value.namedValues else value.prettyPrint() elif isinstance(value, univ.OctetString): if type(value).__name__ in ("InetAddress", "IpAddress"): - value = value.prettyPrint() + value = ip_address(bytes(value)) else: value = str(value) elif isinstance(value, (ObjectIdentity, univ.ObjectIdentifier)): From ac5a73ffbb2ab41357d55ae9c72f592af11ef014 Mon Sep 17 00:00:00 2001 From: Morten Brekkevold Date: Thu, 18 Jul 2024 16:01:00 +0200 Subject: [PATCH 2/4] Remove unnecessary usage of `parse_ip()`. `InetAddress` and `IpAddress` MIB values are now expected to be returned as IPv4Address or IPv6Address objects from the SNMP layer. --- src/zino/tasks/bfdtask.py | 14 ++++---------- src/zino/tasks/bgpstatemonitortask.py | 16 ++-------------- tests/tasks/test_bfdtask.py | 2 +- 3 files changed, 7 insertions(+), 25 deletions(-) diff --git a/src/zino/tasks/bfdtask.py b/src/zino/tasks/bfdtask.py index 4e2e239e9..88ad09231 100644 --- a/src/zino/tasks/bfdtask.py +++ b/src/zino/tasks/bfdtask.py @@ -4,9 +4,9 @@ from zino.oid import OID from zino.scheduler import get_scheduler from zino.snmp import SparseWalkResponse -from zino.statemodels import BFDEvent, BFDSessState, BFDState, Port +from zino.statemodels import BFDEvent, BFDSessState, BFDState, IPAddress, Port from zino.tasks.task import Task -from zino.utils import parse_ip, reverse_dns +from zino.utils import reverse_dns _log = logging.getLogger(__name__) @@ -142,19 +142,13 @@ async def _get_single_row(self, session_index: int, *variables: Sequence[str]) - row = {var.object: val for var, val in response} return {OID((session_index,)): row} - def _parse_row(self, index: OID, state: str, discr: int, addr: str) -> BFDState: - try: - ipaddr = parse_ip(addr) - except ValueError as e: - _log.error(f"Error converting addr {addr} to an IP address on device {self.device.name}: {e}") - ipaddr = None - + def _parse_row(self, index: OID, state: str, discr: int, addr: IPAddress) -> BFDState: # convert from OID object to int session_index = int(index[0]) bfd_state = BFDState( session_state=BFDSessState(state), session_index=session_index, session_discr=discr, - session_addr=ipaddr, + session_addr=addr, ) return bfd_state diff --git a/src/zino/tasks/bgpstatemonitortask.py b/src/zino/tasks/bgpstatemonitortask.py index 61be283eb..5e144cc13 100644 --- a/src/zino/tasks/bgpstatemonitortask.py +++ b/src/zino/tasks/bgpstatemonitortask.py @@ -13,7 +13,6 @@ IPAddress, ) from zino.tasks.task import Task -from zino.utils import parse_ip _logger = logging.getLogger(__name__) @@ -159,7 +158,7 @@ async def _get_cisco_bgp_info(self) -> Optional[list[BaseBGPRow]]: return None for oid, result in cisco_bgp_info.items(): - result["cbgpPeer2RemoteAddr"] = oid + result["cbgpPeer2RemoteAddr"] = ip_address(oid) cisco_bgp_info = self._transform_variables_from_specific_to_general( bgp_info=cisco_bgp_info, bgp_style=BGPStyle.CISCO @@ -227,18 +226,7 @@ def _transform_variables_from_specific_to_general( _logger.info(f"router {self.device.name} misses BGP variables ({missing_variables})") return None - results = list() - - # Fix up peer remote address and transform to BaseBGPRow - for result in generalized_bgp_info.values(): - try: - result["peer_remote_address"] = parse_ip(result["peer_remote_address"]) - except ValueError: - _logger.debug(f"{self.device_state.name}: Invalid peer_remote_address {result['peer_remote_address']}") - else: - results.append(BaseBGPRow(**result)) - - return results + return [BaseBGPRow(**result) for result in generalized_bgp_info.values()] def _update_single_bgp_entry(self, data: BaseBGPRow, local_as: int, uptime: int): if data.peer_remote_address in BUGGY_REMOTE_ADDRESSES: diff --git a/tests/tasks/test_bfdtask.py b/tests/tasks/test_bfdtask.py index e955459cd..a62989763 100644 --- a/tests/tasks/test_bfdtask.py +++ b/tests/tasks/test_bfdtask.py @@ -16,7 +16,7 @@ def test_parse_row_creates_correct_bfd_state(self, task, bfd_state): OID(f".{bfd_state.session_index}"), "up", bfd_state.session_discr, - "0x7f000001", + IPv4Address("127.0.0.1"), ) assert state == bfd_state From 83eeb3e25adb433b873b307aa9babb3a2c8da7d0 Mon Sep 17 00:00:00 2001 From: Morten Brekkevold Date: Thu, 18 Jul 2024 16:04:56 +0200 Subject: [PATCH 3/4] Fix incorrect list of BUGGY_REMOTE_ADDRESSES The BGP state monitor task would test for membership in this list using IPv4Address or IPv6Address objects, but the list contained raw strings, so the tests would never match. --- src/zino/tasks/bgpstatemonitortask.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zino/tasks/bgpstatemonitortask.py b/src/zino/tasks/bgpstatemonitortask.py index 5e144cc13..af5a9d5e5 100644 --- a/src/zino/tasks/bgpstatemonitortask.py +++ b/src/zino/tasks/bgpstatemonitortask.py @@ -46,9 +46,9 @@ } BUGGY_REMOTE_ADDRESSES = [ # Bug in JunOS -- info from IPv6 BGP sessions spill over - "0.0.0.0", + ip_address("0.0.0.0"), # Bug in earlier Cisco IOS, info from elsewhere (IPv6?) spills over - "32.1.7.0", + ip_address("32.1.7.0"), ] From ee31ed70a257756b262f8703aee066fcbaeff3af Mon Sep 17 00:00:00 2001 From: Johanna England Date: Mon, 22 Jul 2024 14:59:10 +0200 Subject: [PATCH 4/4] Remove obsolete parse_ip function --- src/zino/utils.py | 26 -------------------------- tests/utils_test.py | 43 +------------------------------------------ 2 files changed, 1 insertion(+), 68 deletions(-) diff --git a/src/zino/utils.py b/src/zino/utils.py index 66a82e4b1..06317761b 100644 --- a/src/zino/utils.py +++ b/src/zino/utils.py @@ -3,36 +3,10 @@ import os import stat from functools import wraps -from ipaddress import ip_address from time import time from typing import Optional, Union import aiodns -from pyasn1.type.univ import OctetString - -from zino.statemodels import IPAddress - - -def parse_ip(ip: str) -> IPAddress: - """Parses IPv4 and IPv6 addresses in different formats""" - try: - return ip_address(ip) - except ValueError: - if ip.startswith("0x"): - return _parse_hexa_string_ip(ip) - if ":" in ip: - return _parse_colon_separated_ip(ip) - raise ValueError(f"Input {ip} could not be converted to IP address.") - - -def _parse_hexa_string_ip(ip: str) -> IPAddress: - """Parses IP addresses formatted as hexastrings, e.g. 0x7f000001""" - return ip_address(bytes(OctetString(hexValue=ip[2:]))) - - -def _parse_colon_separated_ip(ip: str) -> IPAddress: - """Parses IP addresses formatted with a colon symbol separating every octet, e.g. 7F:00:00:01""" - return ip_address(bytes(OctetString(hexValue=ip.replace(":", "")))) async def reverse_dns(ip: str) -> Optional[str]: diff --git a/tests/utils_test.py b/tests/utils_test.py index ae9bbf1d8..a0d134b79 100644 --- a/tests/utils_test.py +++ b/tests/utils_test.py @@ -1,53 +1,12 @@ import logging import os import stat -from ipaddress import IPv4Address, IPv6Address from unittest.mock import AsyncMock, MagicMock import aiodns import pytest -from zino.utils import file_is_world_readable, log_time_spent, parse_ip, reverse_dns - - -class TestParseIP: - def test_should_parse_normal_ipv4_string_correctly(self): - ip_string = "127.0.0.1" - ip = parse_ip(ip_string) - assert ip == IPv4Address(ip_string) - - def test_should_parse_normal_ipv6_string_correctly(self): - ip_string = "13c7:db1c:4430:c826:6333:aed0:e605:3a3b" - ip = parse_ip(ip_string) - assert ip == IPv6Address(ip_string) - - def test_should_parse_hexastring_ipv4_correctly(self): - ip = parse_ip("0x7f000001") - assert ip == IPv4Address("127.0.0.1") - - def test_should_parse_hexastring_ipv6_correctly(self): - ip = parse_ip("0x13c7db1c4430c8266333aed0e6053a3b") - assert ip == IPv6Address("13c7:db1c:4430:c826:6333:aed0:e605:3a3b") - - def test_should_parse_colon_separated_ipv4_octets_correctly(self): - ip = parse_ip("7f:00:00:01") - assert ip == IPv4Address("127.0.0.1") - - def test_should_parse_colon_separated_ipv6_octets_correctly(self): - ip = parse_ip("13:c7:db:1c:44:30:c8:26:63:33:ae:d0:e6:05:3a:3b") - assert ip == IPv6Address("13c7:db1c:4430:c826:6333:aed0:e605:3a3b") - - def test_should_raise_valueerror_if_invalid_ip_format(self): - with pytest.raises(ValueError): - parse_ip("invalidformat") - - def test_should_raise_error_if_invalid_hexastring(self): - with pytest.raises(ValueError): - parse_ip("0xinvalidstring") - - def test_should_raise_error_if_invalid_colon_separated_string(self): - with pytest.raises(ValueError): - parse_ip(":thisis:just:randomstuff") +from zino.utils import file_is_world_readable, log_time_spent, reverse_dns class TestReverseDNS: