From 6a4301d7fcbf8b0653a1ffc5300598c4dfdd4a42 Mon Sep 17 00:00:00 2001 From: joon Date: Sun, 2 Jun 2024 11:24:21 -0700 Subject: [PATCH] Decouple geoip and ASN lookups from Shodan results --- pyproject.toml | 2 +- wtfis/clients/base.py | 4 +- wtfis/clients/ipwhois.py | 9 +-- wtfis/clients/types.py | 22 ++++++++ wtfis/handlers/base.py | 31 ++++++---- wtfis/handlers/domain.py | 35 +++++++----- wtfis/handlers/ip.py | 20 ++++--- wtfis/main.py | 28 ++++++---- wtfis/models/{common.py => base.py} | 33 ++++++++++- wtfis/models/ip2whois.py | 2 +- wtfis/models/ipwhois.py | 48 ++++++---------- wtfis/models/passivetotal.py | 2 +- wtfis/models/shodan.py | 8 ++- wtfis/{ => models}/types.py | 22 ++++++-- wtfis/models/virustotal.py | 2 +- wtfis/ui/base.py | 87 ++++++++++++++++------------- wtfis/ui/view.py | 73 ++++++++++++------------ 17 files changed, 259 insertions(+), 169 deletions(-) create mode 100644 wtfis/clients/types.py rename wtfis/models/{common.py => base.py} (56%) rename wtfis/{ => models}/types.py (55%) diff --git a/pyproject.toml b/pyproject.toml index 7cf7ae1..574f5ca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ classifiers = [ "Topic :: Security", ] dependencies = [ - "pydantic~=2.7.0", + "pydantic~=2.7.2", "python-dotenv~=1.0.1", "requests~=2.31.0", "rich~=13.7.1", diff --git a/wtfis/clients/base.py b/wtfis/clients/base.py index b80e127..640a97a 100644 --- a/wtfis/clients/base.py +++ b/wtfis/clients/base.py @@ -4,8 +4,8 @@ from typing import Optional, Union -from wtfis.models.common import WhoisBase -from wtfis.types import DomainEnrichmentType, IpEnrichmentType +from wtfis.models.base import WhoisBase +from wtfis.models.types import DomainEnrichmentType, IpEnrichmentType class AbstractAttribute: diff --git a/wtfis/clients/ipwhois.py b/wtfis/clients/ipwhois.py index 26bfc6b..7a13115 100644 --- a/wtfis/clients/ipwhois.py +++ b/wtfis/clients/ipwhois.py @@ -1,5 +1,6 @@ from wtfis.clients.base import BaseIpEnricherClient, BaseRequestsClient -from wtfis.models.ipwhois import IpWhois, IpWhoisMap +from wtfis.models.ipwhois import IpWhoisMap +from wtfis.models.ipwhois import IpWhois from typing import Optional @@ -19,9 +20,9 @@ def _get_ipwhois(self, ip: str) -> Optional[IpWhois]: return IpWhois.model_validate(result) if result.get("success") is True else None def enrich_ips(self, *ips: str) -> IpWhoisMap: - ipwhois_map = {} + map_ = {} for ip in ips: ipwhois = self._get_ipwhois(ip) if ipwhois: - ipwhois_map[ipwhois.ip] = ipwhois - return IpWhoisMap.model_validate(ipwhois_map) + map_[ipwhois.ip] = ipwhois + return IpWhoisMap.model_validate(map_) diff --git a/wtfis/clients/types.py b/wtfis/clients/types.py new file mode 100644 index 0000000..9a72a3d --- /dev/null +++ b/wtfis/clients/types.py @@ -0,0 +1,22 @@ +""" +Type aliases +""" +from typing import Union + +from wtfis.clients.ip2whois import Ip2WhoisClient +from wtfis.clients.ipwhois import IpWhoisClient +from wtfis.clients.passivetotal import PTClient +from wtfis.clients.virustotal import VTClient + + +# IP geolocation and ASN client types +IpGeoAsnClientType = Union[ + IpWhoisClient, +] + +# IP whois client types +IpWhoisClientType = Union[ + Ip2WhoisClient, + PTClient, + VTClient, +] diff --git a/wtfis/handlers/base.py b/wtfis/handlers/base.py index c962800..e6c0c39 100644 --- a/wtfis/handlers/base.py +++ b/wtfis/handlers/base.py @@ -15,19 +15,17 @@ from wtfis.clients.abuseipdb import AbuseIpDbClient from wtfis.clients.greynoise import GreynoiseClient -from wtfis.clients.ip2whois import Ip2WhoisClient -from wtfis.clients.ipwhois import IpWhoisClient -from wtfis.clients.passivetotal import PTClient from wtfis.clients.shodan import ShodanClient from wtfis.clients.urlhaus import UrlHausClient from wtfis.clients.virustotal import VTClient from wtfis.models.abuseipdb import AbuseIpDbMap -from wtfis.models.common import WhoisBase +from wtfis.models.base import WhoisBase from wtfis.models.greynoise import GreynoiseIpMap -from wtfis.models.ipwhois import IpWhoisMap from wtfis.models.shodan import ShodanIpMap from wtfis.models.urlhaus import UrlHausMap +from wtfis.models.types import IpGeoAsnMapType from wtfis.models.virustotal import Domain, IpAddress +from wtfis.clients.types import IpGeoAsnClientType, IpWhoisClientType from wtfis.ui.theme import Theme from wtfis.utils import error_and_exit, refang @@ -69,8 +67,9 @@ def __init__( console: Console, progress: Progress, vt_client: VTClient, - ip_enricher_client: Union[IpWhoisClient, ShodanClient], - whois_client: Union[Ip2WhoisClient, PTClient, VTClient], + ip_geoasn_client: IpGeoAsnClientType, + whois_client: IpWhoisClientType, + shodan_client: Optional[ShodanClient], greynoise_client: Optional[GreynoiseClient], abuseipdb_client: Optional[AbuseIpDbClient], urlhaus_client: Optional[UrlHausClient], @@ -82,16 +81,18 @@ def __init__( # Clients self._vt = vt_client - self._enricher = ip_enricher_client + self._geoasn = ip_geoasn_client self._whois = whois_client + self._shodan = shodan_client self._greynoise = greynoise_client self._abuseipdb = abuseipdb_client self._urlhaus = urlhaus_client # Dataset containers self.vt_info: Union[Domain, IpAddress] - self.ip_enrich: Union[IpWhoisMap, ShodanIpMap] = IpWhoisMap.empty() + self.geoasn: IpGeoAsnMapType self.whois: WhoisBase + self.shodan: ShodanIpMap = ShodanIpMap.empty() self.greynoise: GreynoiseIpMap = GreynoiseIpMap.empty() self.abuseipdb: AbuseIpDbMap = AbuseIpDbMap.empty() self.urlhaus: UrlHausMap = UrlHausMap.empty() @@ -105,9 +106,15 @@ def fetch_data(self) -> None: return NotImplemented # type: ignore # pragma: no coverage @common_exception_handler - @failopen_exception_handler("_enricher") - def _fetch_ip_enrichments(self, *ips: str) -> None: - self.ip_enrich = self._enricher.enrich_ips(*ips) + @failopen_exception_handler("_geoasn") + def _fetch_geoasn(self, *ips: str) -> None: + self.geoasn = self._geoasn.enrich_ips(*ips) + + @common_exception_handler + @failopen_exception_handler("_shodan") + def _fetch_shodan(self, *ips: str) -> None: + if self._shodan: + self.shodan = self._shodan.enrich_ips(*ips) @common_exception_handler @failopen_exception_handler("_greynoise") diff --git a/wtfis/handlers/domain.py b/wtfis/handlers/domain.py index 5f8db3c..11431ed 100644 --- a/wtfis/handlers/domain.py +++ b/wtfis/handlers/domain.py @@ -1,7 +1,7 @@ """ Logic handler for domain and hostname inputs """ -from typing import Optional, Union +from typing import Optional from requests.exceptions import HTTPError from rich.console import Console @@ -9,9 +9,6 @@ from wtfis.clients.abuseipdb import AbuseIpDbClient from wtfis.clients.greynoise import GreynoiseClient -from wtfis.clients.ip2whois import Ip2WhoisClient -from wtfis.clients.ipwhois import IpWhoisClient -from wtfis.clients.passivetotal import PTClient from wtfis.clients.shodan import ShodanClient from wtfis.clients.urlhaus import UrlHausClient from wtfis.clients.virustotal import VTClient @@ -23,6 +20,7 @@ ) from wtfis.models.virustotal import Resolutions +from wtfis.clients.types import IpGeoAsnClientType, IpWhoisClientType class DomainHandler(BaseHandler): @@ -32,15 +30,16 @@ def __init__( console: Console, progress: Progress, vt_client: VTClient, - ip_enricher_client: Union[IpWhoisClient, ShodanClient], - whois_client: Union[Ip2WhoisClient, PTClient, VTClient], + ip_geoasn_client: IpGeoAsnClientType, + whois_client: IpWhoisClientType, + shodan_client: Optional[ShodanClient], greynoise_client: Optional[GreynoiseClient], abuseipdb_client: Optional[AbuseIpDbClient], urlhaus_client: Optional[UrlHausClient], max_resolutions: int = 0, ): - super().__init__(entity, console, progress, vt_client, ip_enricher_client, - whois_client, greynoise_client, abuseipdb_client, urlhaus_client) + super().__init__(entity, console, progress, vt_client, ip_geoasn_client, whois_client, + shodan_client, greynoise_client, abuseipdb_client, urlhaus_client) # Extended attributes self.max_resolutions = max_resolutions @@ -77,25 +76,31 @@ def fetch_data(self): self.progress.update(task_v, completed=100) if self.resolutions and self.resolutions.data: - task_r = self.progress.add_task(f"Fetching IP enrichments from {self._enricher.name}") - self.progress.update(task_r, advance=50) - self._fetch_ip_enrichments(*self.resolutions.ip_list(self.max_resolutions)) - self.progress.update(task_r, completed=100) + task_g = self.progress.add_task(f"Fetching IP location and ASN from {self._geoasn.name}") + self.progress.update(task_g, advance=50) + self._fetch_geoasn(*self.resolutions.ip_list(self.max_resolutions)) + self.progress.update(task_g, completed=100) + + if self._shodan: + task_s = self.progress.add_task(f"Fetching IP data from {self._shodan.name}") + self.progress.update(task_s, advance=50) + self._fetch_shodan(*self.resolutions.ip_list(self.max_resolutions)) + self.progress.update(task_s, completed=100) if self._greynoise: - task_g = self.progress.add_task(f"Fetching IP enrichments from {self._greynoise.name}") + task_g = self.progress.add_task(f"Fetching IP data from {self._greynoise.name}") self.progress.update(task_g, advance=50) self._fetch_greynoise(*self.resolutions.ip_list(self.max_resolutions)) self.progress.update(task_g, completed=100) if self._abuseipdb: - task_g = self.progress.add_task(f"Fetching IP enrichments from {self._abuseipdb.name}") + task_g = self.progress.add_task(f"Fetching IP data from {self._abuseipdb.name}") self.progress.update(task_g, advance=50) self._fetch_abuseipdb(*self.resolutions.ip_list(self.max_resolutions)) self.progress.update(task_g, completed=100) if self._urlhaus: - task_u = self.progress.add_task(f"Fetching domain enrichments from {self._urlhaus.name}") + task_u = self.progress.add_task(f"Fetching domain data from {self._urlhaus.name}") self.progress.update(task_u, advance=50) self._fetch_urlhaus() self.progress.update(task_u, completed=100) diff --git a/wtfis/handlers/ip.py b/wtfis/handlers/ip.py index 088268c..118948f 100644 --- a/wtfis/handlers/ip.py +++ b/wtfis/handlers/ip.py @@ -25,25 +25,31 @@ def fetch_data(self): self._fetch_vt_ip_address() self.progress.update(task_v, completed=100) - task_i = self.progress.add_task(f"Fetching IP enrichments from {self._enricher.name}") - self.progress.update(task_i, advance=50) - self._fetch_ip_enrichments(self.entity) - self.progress.update(task_i, completed=100) + task_g = self.progress.add_task(f"Fetching IP location and ASN from {self._geoasn.name}") + self.progress.update(task_g, advance=50) + self._fetch_geoasn(self.entity) + self.progress.update(task_g, completed=100) + + if self._shodan: + task_s = self.progress.add_task(f"Fetching IP data from {self._shodan.name}") + self.progress.update(task_s, advance=50) + self._fetch_shodan(self.entity) + self.progress.update(task_s, completed=100) if self._urlhaus: - task_u = self.progress.add_task(f"Fetching IP enrichments from {self._urlhaus.name}") + task_u = self.progress.add_task(f"Fetching IP data from {self._urlhaus.name}") self.progress.update(task_u, advance=50) self._fetch_urlhaus() self.progress.update(task_u, completed=100) if self._greynoise: - task_g = self.progress.add_task(f"Fetching IP enrichments from {self._greynoise.name}") + task_g = self.progress.add_task(f"Fetching IP data from {self._greynoise.name}") self.progress.update(task_g, advance=50) self._fetch_greynoise(self.entity) self.progress.update(task_g, completed=100) if self._abuseipdb: - task_a = self.progress.add_task(f"Fetching IP enrichments from {self._abuseipdb.name}") + task_a = self.progress.add_task(f"Fetching IP data from {self._abuseipdb.name}") self.progress.update(task_a, advance=50) self._fetch_abuseipdb(self.entity) self.progress.update(task_a, completed=100) diff --git a/wtfis/main.py b/wtfis/main.py index beaad17..51377cc 100644 --- a/wtfis/main.py +++ b/wtfis/main.py @@ -8,7 +8,6 @@ from rich.progress import Progress from typing import Union from wtfis.clients.abuseipdb import AbuseIpDbClient - from wtfis.clients.greynoise import GreynoiseClient from wtfis.clients.ip2whois import Ip2WhoisClient from wtfis.clients.ipwhois import IpWhoisClient @@ -108,12 +107,9 @@ def generate_entity_handler( # Virustotal client vt_client = VTClient(os.environ["VT_API_KEY"]) - # IP enrichment client selector - enricher_client: Union[IpWhoisClient, ShodanClient] = ( - ShodanClient(os.environ["SHODAN_API_KEY"]) - if args.use_shodan - else IpWhoisClient() - ) + # IP geolocation and ASN client selector + # TODO: add more options + ip_geoasn_client = IpWhoisClient() # Whois client selector # Order of use based on set envvars: @@ -129,6 +125,12 @@ def generate_entity_handler( else: whois_client = vt_client + shodan_client = ( + ShodanClient(os.environ["SHODAN_API_KEY"]) + if args.use_shodan + else None + ) + # Greynoise client (optional) greynoise_client = ( GreynoiseClient(os.environ["GREYNOISE_API_KEY"]) @@ -156,8 +158,9 @@ def generate_entity_handler( console=console, progress=progress, vt_client=vt_client, - ip_enricher_client=enricher_client, + ip_geoasn_client=ip_geoasn_client, whois_client=whois_client, + shodan_client=shodan_client, greynoise_client=greynoise_client, abuseipdb_client=abuseipdb_client, urlhaus_client=urlhaus_client, @@ -170,8 +173,9 @@ def generate_entity_handler( console=console, progress=progress, vt_client=vt_client, - ip_enricher_client=enricher_client, + ip_geoasn_client=ip_geoasn_client, whois_client=whois_client, + shodan_client=shodan_client, greynoise_client=greynoise_client, abuseipdb_client=abuseipdb_client, urlhaus_client=urlhaus_client, @@ -191,8 +195,9 @@ def generate_view( console, entity.vt_info, entity.resolutions, + entity.geoasn, entity.whois, - entity.ip_enrich, + entity.shodan, entity.greynoise, entity.abuseipdb, entity.urlhaus, @@ -202,8 +207,9 @@ def generate_view( view = IpAddressView( console, entity.vt_info, + entity.geoasn, entity.whois, - entity.ip_enrich, + entity.shodan, entity.greynoise, entity.abuseipdb, entity.urlhaus, diff --git a/wtfis/models/common.py b/wtfis/models/base.py similarity index 56% rename from wtfis/models/common.py rename to wtfis/models/base.py index 3ad9bdb..723cea1 100644 --- a/wtfis/models/common.py +++ b/wtfis/models/base.py @@ -1,9 +1,11 @@ +from __future__ import annotations + import abc import sys -from pydantic import BaseModel, BeforeValidator +from pydantic import BaseModel, BeforeValidator, ConfigDict, RootModel from pydantic.v1.validators import str_validator -from typing import List, Optional +from typing import List, Mapping, Optional if sys.version_info >= (3, 9): from typing import Annotated @@ -33,3 +35,30 @@ class WhoisBase(BaseModel, abc.ABC): date_changed: Optional[str] = None date_expires: Optional[str] = None dnssec: Optional[str] = None + + +class IpGeoAsnBase(BaseModel, abc.ABC): + """ Use to normalize IP geolocation and ASN fields """ + model_config = ConfigDict(coerce_numbers_to_str=True) + + ip: str + + # Geolocation + city: Optional[str] + continent: Optional[str] + country: Optional[str] + region: Optional[str] + + # ASN + asn: Optional[str] + org: Optional[str] + isp: Optional[str] + domain: Optional[str] + + +class IpGeoAsnMapBase(RootModel, abc.ABC): + root: Mapping[str, IpGeoAsnBase] + + @classmethod + def empty(cls) -> IpGeoAsnMapBase: + return cls.model_validate({}) diff --git a/wtfis/models/ip2whois.py b/wtfis/models/ip2whois.py index f7abb06..7a25fe9 100644 --- a/wtfis/models/ip2whois.py +++ b/wtfis/models/ip2whois.py @@ -1,7 +1,7 @@ from pydantic import Field, field_validator, model_validator from typing import List, Optional -from wtfis.models.common import WhoisBase +from wtfis.models.base import WhoisBase class Whois(WhoisBase): diff --git a/wtfis/models/ipwhois.py b/wtfis/models/ipwhois.py index c3897f7..bab1d88 100644 --- a/wtfis/models/ipwhois.py +++ b/wtfis/models/ipwhois.py @@ -1,45 +1,33 @@ +""" +ipwhois datamodels +API doc: https://ipwhois.io/documentation +""" from __future__ import annotations -from pydantic import BaseModel, Field, RootModel from typing import Dict -from wtfis.models.common import LaxStr +from pydantic import AliasPath, Field +from wtfis.models.base import IpGeoAsnBase, IpGeoAsnMapBase -class Flag(BaseModel): - img: str - emoji: str - emoji_unicode: str +class IpWhois(IpGeoAsnBase): + # Metadata + source: str = "IPWhois" -class Connection(BaseModel): - asn: LaxStr - org: str - isp: str - domain: str - - -class IpWhois(BaseModel): + # Results ip: str - success: bool - type_: str = Field(alias="type") - continent: str - continent_code: str - country: str - country_code: str - region: str - region_code: str city: str - is_eu: bool - postal: str - calling_code: str - capital: str - borders: str - flag: Flag - connection: Connection + region: str + country: str + continent: str + asn: str = Field(validation_alias=AliasPath("connection", "asn")) + org: str = Field(validation_alias=AliasPath("connection", "org")) + isp: str = Field(validation_alias=AliasPath("connection", "isp")) + domain: str = Field(validation_alias=AliasPath("connection", "domain")) -class IpWhoisMap(RootModel): +class IpWhoisMap(IpGeoAsnMapBase): root: Dict[str, IpWhois] @classmethod diff --git a/wtfis/models/passivetotal.py b/wtfis/models/passivetotal.py index 2fe59b9..717bb2f 100644 --- a/wtfis/models/passivetotal.py +++ b/wtfis/models/passivetotal.py @@ -1,7 +1,7 @@ from pydantic import Field, model_validator from typing import List, Optional -from wtfis.models.common import WhoisBase +from wtfis.models.base import WhoisBase class Whois(WhoisBase): diff --git a/wtfis/models/shodan.py b/wtfis/models/shodan.py index cd57696..0282720 100644 --- a/wtfis/models/shodan.py +++ b/wtfis/models/shodan.py @@ -1,8 +1,10 @@ +from __future__ import annotations + from collections import defaultdict, namedtuple from pydantic import BaseModel, RootModel from typing import Dict, List, Optional -from wtfis.models.common import LaxStr +from wtfis.models.base import LaxStr class PortData(BaseModel): @@ -43,3 +45,7 @@ def group_ports_by_product(self) -> dict: class ShodanIpMap(RootModel): root: Dict[str, ShodanIp] + + @classmethod + def empty(cls) -> ShodanIpMap: + return cls.model_validate({}) diff --git a/wtfis/types.py b/wtfis/models/types.py similarity index 55% rename from wtfis/types.py rename to wtfis/models/types.py index 0ff68da..73d2241 100644 --- a/wtfis/types.py +++ b/wtfis/models/types.py @@ -5,18 +5,30 @@ from wtfis.models.abuseipdb import AbuseIpDbMap from wtfis.models.greynoise import GreynoiseIpMap -from wtfis.models.ipwhois import IpWhoisMap +from wtfis.models.ipwhois import IpWhois, IpWhoisMap from wtfis.models.shodan import ShodanIpMap from wtfis.models.urlhaus import UrlHausMap -# IP enrichment types + +# IP enrichment map types IpEnrichmentType = Union[ + AbuseIpDbMap, GreynoiseIpMap, IpWhoisMap, ShodanIpMap, UrlHausMap, - AbuseIpDbMap, ] -# Domain/FQDN enrichment types -DomainEnrichmentType = UrlHausMap +# Domain/FQDN enrichment map types +DomainEnrichmentType = Union[ + UrlHausMap, +] + +# IP geolocation and ASN types +IpGeoAsnType = Union[ + IpWhois, +] + +IpGeoAsnMapType = Union[ + IpWhoisMap, +] diff --git a/wtfis/models/virustotal.py b/wtfis/models/virustotal.py index 527185a..d85fe8f 100644 --- a/wtfis/models/virustotal.py +++ b/wtfis/models/virustotal.py @@ -1,7 +1,7 @@ from pydantic import BaseModel, Field, RootModel, field_validator, model_validator from typing import Any, Dict, List, Optional -from wtfis.models.common import LaxStr, WhoisBase +from wtfis.models.base import LaxStr, WhoisBase class BaseData(BaseModel): diff --git a/wtfis/ui/base.py b/wtfis/ui/base.py index 893f957..2f844bc 100644 --- a/wtfis/ui/base.py +++ b/wtfis/ui/base.py @@ -11,11 +11,10 @@ from rich.text import Text from typing import Any, Generator, List, Optional, Tuple, Union from wtfis.models.abuseipdb import AbuseIpDb, AbuseIpDbMap - -from wtfis.models.common import WhoisBase +from wtfis.models.base import WhoisBase from wtfis.models.greynoise import GreynoiseIp, GreynoiseIpMap -from wtfis.models.ipwhois import IpWhois, IpWhoisMap from wtfis.models.shodan import ShodanIp, ShodanIpMap +from wtfis.models.types import IpGeoAsnMapType, IpGeoAsnType from wtfis.models.virustotal import ( LastAnalysisStats, PopularityRanks, @@ -38,16 +37,18 @@ def __init__( self, console: Console, entity: Any, + ip_geoasn: IpGeoAsnMapType, whois: Optional[WhoisBase], - ip_enrich: Union[IpWhoisMap, ShodanIpMap], + shodan: ShodanIpMap, greynoise: GreynoiseIpMap, abuseipdb: AbuseIpDbMap, urlhaus: UrlHausMap, ) -> None: self.console = console self.entity = entity + self.ip_geoasn = ip_geoasn self.whois = whois - self.ip_enrich = ip_enrich + self.shodan = shodan self.greynoise = greynoise self.abuseipdb = abuseipdb self.urlhaus = urlhaus @@ -298,8 +299,11 @@ def _gen_asn_text( .append(")")) return text - def _get_ip_enrichment(self, ip: str) -> Optional[Union[IpWhois, ShodanIp]]: - return self.ip_enrich.root[ip] if ip in self.ip_enrich.root.keys() else None + def _get_geoasn_enrichment(self, ip: str) -> Optional[IpGeoAsnType]: + return self.ip_geoasn.root[ip] if ip in self.ip_geoasn.root.keys() else None + + def _get_shodan_enrichment(self, ip: str) -> Optional[ShodanIp]: + return self.shodan.root[ip] if ip in self.shodan.root.keys() else None def _get_greynoise_enrichment(self, ip: str) -> Optional[GreynoiseIp]: return self.greynoise.root[ip] if ip in self.greynoise.root.keys() else None @@ -363,41 +367,48 @@ def _gen_vt_section(self) -> RenderableType: self._gen_heading_text("VirusTotal") ) - def _gen_ip_enrich_section(self) -> Optional[RenderableType]: - """ IP enrichment section. Applies to IP views only """ - enrich = self._get_ip_enrichment(self.entity.data.id_) + def _gen_ip_geoasn_section(self) -> Optional[RenderableType]: + """ IP location and ASN section. Applies to IP views only """ + enrich = self._get_geoasn_enrichment(self.entity.data.id_) data: List[Tuple[Union[str, Text], Union[RenderableType, None]]] = [] if enrich: - if isinstance(enrich, IpWhois): - # IPWhois - section_title = "IPwhois" - asn = self._gen_asn_text(enrich.connection.asn, enrich.connection.org) - data += [ - ("ASN:", asn), - ("ISP:", enrich.connection.isp), - ("Location:", smart_join(enrich.city, enrich.region, enrich.country)), - ] - else: - # Shodan - section_title = "Shodan" - asn = self._gen_asn_text(enrich.asn, enrich.org) - tags = smart_join(*enrich.tags, style=self.theme.tags) if enrich.tags else None - services_field = self._gen_linked_field_name( - "Services", - hyperlink=f"{self.shodan_gui_baseurl}/{self.entity.data.id_}" - ) - data += [ - ("ASN:", asn), - ("ISP:", enrich.isp), - ("Location:", smart_join(enrich.city, enrich.region_name, enrich.country_name)), - ("OS:", enrich.os), - (services_field, self._gen_shodan_services(enrich)), - ("Tags:", tags), - ("Last Scan:", Timestamp(f"{enrich.last_update}+00:00").render), # Timestamps are UTC - # (source: Google) - ] + section_title = enrich.source + asn = self._gen_asn_text(enrich.asn, enrich.org) + data += [ + ("ASN:", asn), + ("ISP:", enrich.isp), + ("Location:", smart_join(enrich.city, enrich.region, enrich.country)), + ] + return self._gen_section( + + self._gen_table(*data), + self._gen_heading_text(section_title) + ) + + return None # No enrichment data + + def _gen_shodan_section(self) -> Optional[RenderableType]: + """ Shodan section. Applies to IP views only """ + enrich = self._get_shodan_enrichment(self.entity.data.id_) + + data: List[Tuple[Union[str, Text], Union[RenderableType, None]]] = [] + + if enrich: + section_title = "Shodan" + tags = smart_join(*enrich.tags, style=self.theme.tags) if enrich.tags else None + services_field = self._gen_linked_field_name( + "Services", + hyperlink=f"{self.shodan_gui_baseurl}/{self.entity.data.id_}" + ) + data += [ + ("OS:", enrich.os), + (services_field, self._gen_shodan_services(enrich)), + ("Tags:", tags), + ("Last Scan:", Timestamp(f"{enrich.last_update}+00:00").render), # Timestamps are UTC + # (source: Google) + ] return self._gen_section( self._gen_table(*data), diff --git a/wtfis/ui/view.py b/wtfis/ui/view.py index d34653f..6bfb9c9 100644 --- a/wtfis/ui/view.py +++ b/wtfis/ui/view.py @@ -10,11 +10,11 @@ from typing import List, Optional, Tuple, Union from wtfis.models.abuseipdb import AbuseIpDbMap -from wtfis.models.common import WhoisBase +from wtfis.models.base import WhoisBase from wtfis.models.greynoise import GreynoiseIpMap -from wtfis.models.ipwhois import IpWhois, IpWhoisMap from wtfis.models.shodan import ShodanIpMap from wtfis.models.urlhaus import UrlHausMap +from wtfis.models.types import IpGeoAsnMapType from wtfis.models.virustotal import ( Domain, IpAddress, @@ -34,14 +34,15 @@ def __init__( console: Console, entity: Domain, resolutions: Optional[Resolutions], + ip_geoasn: IpGeoAsnMapType, whois: WhoisBase, - ip_enrich: Union[IpWhoisMap, ShodanIpMap], + shodan: ShodanIpMap, greynoise: GreynoiseIpMap, abuseipdb: AbuseIpDbMap, urlhaus: UrlHausMap, max_resolutions: int = 3, ) -> None: - super().__init__(console, entity, whois, ip_enrich, greynoise, abuseipdb, urlhaus) + super().__init__(console, entity, ip_geoasn, whois, shodan, greynoise, abuseipdb, urlhaus) self.resolutions = resolutions self.max_resolutions = max_resolutions @@ -87,40 +88,34 @@ def resolutions_panel(self) -> Optional[Panel]: ("Resolved:", Timestamp(attributes.date).render), ] - # IP Enrichment - enrich = self._get_ip_enrichment(attributes.ip_address) - - if enrich: - if isinstance(enrich, IpWhois): - # IPWhois - asn = self._gen_asn_text(enrich.connection.asn, enrich.connection.org) - data += [ - ("ASN:", asn), - ("ISP:", enrich.connection.isp), - ("Location:", smart_join(enrich.city, enrich.region, enrich.country)), - ] - else: - # Shodan - asn = self._gen_asn_text(enrich.asn, enrich.org) - tags = smart_join(*enrich.tags, style=self.theme.tags) if enrich.tags else None - services_field = self._gen_linked_field_name( - "Services", - hyperlink=f"{self.shodan_gui_baseurl}/{attributes.ip_address}" - ) - data += [ - ("ASN:", asn), - ("ISP:", enrich.isp), - ("Location:", smart_join(enrich.city, enrich.region_name, enrich.country_name)), - ("OS:", enrich.os), - (services_field, self._gen_shodan_services(enrich)), - ("Tags:", tags), - ("Last Scan:", Timestamp(f"{enrich.last_update}+00:00").render), # Timestamps are UTC - # (source: Google) - ] + # IP geolocation and ASN + geoasn = self._get_geoasn_enrichment(attributes.ip_address) + if geoasn: + asn = self._gen_asn_text(geoasn.asn, geoasn.org) + data += [ + ("ASN:", asn), + ("ISP:", geoasn.isp), + ("Location:", smart_join(geoasn.city, geoasn.region, geoasn.country)), + ] + + # Shodan + shodan = self._get_shodan_enrichment(attributes.ip_address) + if shodan: + tags = smart_join(*shodan.tags, style=self.theme.tags) if shodan.tags else None + services_field = self._gen_linked_field_name( + "Services", + hyperlink=f"{self.shodan_gui_baseurl}/{attributes.ip_address}" + ) + data += [ + ("OS:", shodan.os), + (services_field, self._gen_shodan_services(shodan)), + ("Tags:", tags), + # ("Last Scan:", Timestamp(f"{shodan.last_update}+00:00").render), # Timestamps are UTC + # # (source: Google) + ] # Greynoise greynoise = self._get_greynoise_enrichment(attributes.ip_address) - if greynoise: data += [self._gen_greynoise_tuple(greynoise)] @@ -190,18 +185,20 @@ def __init__( self, console: Console, entity: IpAddress, + ip_geoasn: IpGeoAsnMapType, whois: WhoisBase, - ip_enrich: Union[IpWhoisMap, ShodanIpMap], + shodan: ShodanIpMap, greynoise: GreynoiseIpMap, abuseipdb: AbuseIpDbMap, urlhaus: UrlHausMap, ) -> None: - super().__init__(console, entity, whois, ip_enrich, greynoise, abuseipdb, urlhaus) + super().__init__(console, entity, ip_geoasn, whois, shodan, greynoise, abuseipdb, urlhaus) def ip_panel(self) -> Panel: content = [self._gen_vt_section()] # VT section for section in ( - self._gen_ip_enrich_section(), # IP enrich section + self._gen_ip_geoasn_section(), # IP location and ASN section + self._gen_shodan_section(), # Shodan section self._gen_urlhaus_section(), # URLhaus section self._gen_ip_other_section(), # Other section ):