From 4ec8031095fe5521f59315363be486bacae47f4d Mon Sep 17 00:00:00 2001 From: Zafer Balkan Date: Sat, 9 Mar 2024 23:10:51 +0200 Subject: [PATCH] Support for abuseIPDB (#70) * Implemented abuseIPDB client for IP addresses --- README.md | 8 +++++++ tests/test_cli.py | 30 ++++++++++--------------- tests/test_clients.py | 12 ++++++++++ tests/test_handlers.py | 6 ++++- tests/test_ui_domain_view.py | 19 ++++++++++++++-- tests/test_ui_ip_view.py | 7 ++++++ wtfis/clients/abuseipdb.py | 43 ++++++++++++++++++++++++++++++++++++ wtfis/handlers/base.py | 17 +++++++++++--- wtfis/handlers/domain.py | 15 +++++++++++-- wtfis/handlers/ip.py | 6 +++++ wtfis/main.py | 16 ++++++++++++++ wtfis/models/abuseipdb.py | 31 ++++++++++++++++++++++++++ wtfis/types.py | 3 ++- wtfis/ui/base.py | 37 +++++++++++++++++++++++++++++++ wtfis/ui/view.py | 15 +++++++++++-- 15 files changed, 236 insertions(+), 29 deletions(-) create mode 100644 wtfis/clients/abuseipdb.py create mode 100644 wtfis/models/abuseipdb.py diff --git a/README.md b/README.md index 2f42404..72262bf 100644 --- a/README.md +++ b/README.md @@ -192,6 +192,14 @@ Use the `-u` or `--use-urlhaus` flag to enable URLhaus enrichment for hostnames, The `Malware URLs` field name is a hyperlink (if terminal-supported) that takes you to the specific URLhaus database page for your query. +### AbuseIPDB enrichment + +Use the `-a` or `--use-abuseipdb` flag to enable AbuseIPDB enrichment for hostnames, domains and IPs. + +![image](https://github.com/zbalkan/wtfis/assets/39981909/0d48cfe4-7a99-47ae-980f-47839f4f0a96) + +The `AbuseIPDB` field name is a hyperlink (if terminal-supported) that takes you to the specific AbuseIPDB database page for your query. + ### Display options For FQDN and domain lookups, you can increase or decrease the maximum number of displayed IP resolutions with `-m NUMBER` or `--max-resolutions=NUMBER`. The upper limit is 10. If you don't need resolutions at all, set the number to `0`. diff --git a/tests/test_cli.py b/tests/test_cli.py index 00b8c20..50e2fc4 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,18 +1,13 @@ import json import os +from pathlib import Path +from unittest.mock import MagicMock, patch + import pytest from dotenv import load_dotenv -from pathlib import Path from rich.console import Console -from rich.progress import ( - BarColumn, - Progress, - SpinnerColumn, - TaskProgressColumn, - TimeElapsedColumn, - TextColumn, -) -from unittest.mock import patch, MagicMock +from rich.progress import (BarColumn, Progress, SpinnerColumn, + TaskProgressColumn, TextColumn, TimeElapsedColumn) from wtfis.clients.greynoise import GreynoiseClient from wtfis.clients.ip2whois import Ip2WhoisClient @@ -23,17 +18,11 @@ from wtfis.clients.virustotal import VTClient from wtfis.handlers.domain import DomainHandler from wtfis.handlers.ip import IpAddressHandler -from wtfis.main import ( - generate_entity_handler, - generate_view, - main, - parse_args, - parse_env, -) +from wtfis.main import (generate_entity_handler, generate_view, main, + parse_args, parse_env) from wtfis.models.virustotal import Domain, IpAddress from wtfis.ui.view import DomainView, IpAddressView - POSSIBLE_ENV_VARS = [ "VT_API_KEY", "PT_API_KEY", @@ -41,6 +30,7 @@ "IP2WHOIS_API_KEY", "SHODAN_API_KEY", "GREYNOISE_API_KEY", + "ABUSEIPDB_API_KEY", "WTFIS_DEFAULTS", ] @@ -84,6 +74,7 @@ def fake_load_dotenv_1(tmp_path): "IP2WHOIS_API_KEY": "alice", "SHODAN_API_KEY": "hunter2", "GREYNOISE_API_KEY": "upupdowndown", + "ABUSEIPDB_API_KEY": "dummy", } return fake_load_dotenv(tmp_path, fake_env_vars) @@ -274,6 +265,7 @@ def test_env_file(self, fake_load_dotenv_1): assert os.environ["IP2WHOIS_API_KEY"] == "alice" assert os.environ["SHODAN_API_KEY"] == "hunter2" assert os.environ["GREYNOISE_API_KEY"] == "upupdowndown" + assert os.environ["ABUSEIPDB_API_KEY"] == "dummy" unset_env_vars() @patch("wtfis.main.load_dotenv", MagicMock()) @@ -463,6 +455,7 @@ def test_view_domain_1(self, m_domain_view, test_data): ip_enricher_client=MagicMock(), whois_client=MagicMock(), greynoise_client=MagicMock(), + abuseipdb_client=MagicMock(), urlhaus_client=MagicMock(), ) entity.vt_info = Domain.model_validate(json.loads(test_data("vt_domain_gist.json"))) @@ -482,6 +475,7 @@ def test_view_ip_1(self, m_ip_view, test_data): ip_enricher_client=MagicMock(), whois_client=MagicMock(), greynoise_client=MagicMock(), + abuseipdb_client=MagicMock(), urlhaus_client=MagicMock(), ) entity.vt_info = IpAddress.model_validate(json.loads(test_data("vt_ip_1.1.1.1.json"))) diff --git a/tests/test_clients.py b/tests/test_clients.py index 8d27e6b..871e128 100644 --- a/tests/test_clients.py +++ b/tests/test_clients.py @@ -1,6 +1,7 @@ import json import pytest from unittest.mock import MagicMock, patch +from wtfis.clients.abuseipdb import AbuseIpDbClient from wtfis.clients.base import requests from wtfis.clients.greynoise import GreynoiseClient @@ -13,6 +14,11 @@ from wtfis.models.ipwhois import IpWhoisMap +@pytest.fixture() +def abuseipdb_client(): + return AbuseIpDbClient("dummykey") + + @pytest.fixture() def greynoise_client(): return GreynoiseClient("dummykey") @@ -96,6 +102,12 @@ def test_init(self, greynoise_client): assert greynoise_client.api_key == "dummykey" +class TestAbuseIPDBClient: + def test_init(self, abuseipdb_client): + assert abuseipdb_client.name == "AbuseIPDB" + assert abuseipdb_client.api_key == "dummykey" + + class TestIpWhoisClient: def test_init(self, ipwhois_client): assert ipwhois_client.name == "IPWhois" diff --git a/tests/test_handlers.py b/tests/test_handlers.py index 0fefbb5..0af4d81 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -1,9 +1,11 @@ import json +from unittest.mock import MagicMock, patch + import pytest from requests.exceptions import ConnectionError from rich.console import Console -from unittest.mock import MagicMock, patch +from wtfis.clients.abuseipdb import AbuseIpDbClient from wtfis.clients.base import requests from wtfis.clients.greynoise import GreynoiseClient from wtfis.clients.ipwhois import IpWhoisClient @@ -26,6 +28,7 @@ def generate_domain_handler(max_resolutions=3): ip_enricher_client=IpWhoisClient(), whois_client=PTClient("dummyuser", "dummykey"), greynoise_client=GreynoiseClient("dummykey"), + abuseipdb_client=AbuseIpDbClient("dummykey"), urlhaus_client=UrlHausClient(), max_resolutions=max_resolutions, ) @@ -40,6 +43,7 @@ def generate_ip_handler(): ip_enricher_client=IpWhoisClient(), whois_client=PTClient("dummyuser", "dummykey"), greynoise_client=GreynoiseClient("dummykey"), + abuseipdb_client=AbuseIpDbClient("dummykey"), urlhaus_client=UrlHausClient(), ) diff --git a/tests/test_ui_domain_view.py b/tests/test_ui_domain_view.py index ccdf265..487b01f 100644 --- a/tests/test_ui_domain_view.py +++ b/tests/test_ui_domain_view.py @@ -1,15 +1,16 @@ -import pytest import json +from unittest.mock import MagicMock +import pytest from rich.console import Console from rich.panel import Panel from rich.table import Table from rich.text import Span, Text -from unittest.mock import MagicMock from wtfis.clients.greynoise import GreynoiseClient from wtfis.clients.ipwhois import IpWhoisClient from wtfis.clients.shodan import ShodanClient +from wtfis.models.abuseipdb import AbuseIpDbMap from wtfis.clients.urlhaus import UrlHausClient from wtfis.models.greynoise import GreynoiseIpMap from wtfis.models.ip2whois import Whois as Ip2Whois @@ -41,6 +42,7 @@ def view01(test_data, mock_ipwhois_get): whois=PTWhois.model_validate(json.loads(test_data("pt_whois_gist.json"))), ip_enrich=ip_enrich, greynoise=GreynoiseIpMap.model_validate({}), + abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=UrlHausMap.model_validate({}), ) @@ -58,6 +60,7 @@ def view02(test_data): whois=VTWhois.model_validate(json.loads(test_data("vt_whois_gist.json"))), ip_enrich=IpWhoisMap.model_validate({}), greynoise=GreynoiseIpMap.model_validate({}), + abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), max_resolutions=1, ) @@ -73,6 +76,7 @@ def view03(test_data): whois=VTWhois.model_validate(json.loads(test_data("vt_whois_bbc.json"))), ip_enrich=MagicMock(), greynoise=MagicMock(), + abuseipdb=MagicMock(), urlhaus=MagicMock(), ) @@ -90,6 +94,7 @@ def view04(test_data): whois=MagicMock(), ip_enrich=MagicMock(), greynoise=MagicMock(), + abuseipdb=MagicMock(), urlhaus=MagicMock(), ) @@ -104,6 +109,7 @@ def view05(test_data): whois=MagicMock(), ip_enrich=MagicMock(), greynoise=MagicMock(), + abuseipdb=MagicMock(), urlhaus=MagicMock(), ) @@ -118,6 +124,7 @@ def view06(test_data): whois=VTWhois.model_validate(json.loads(test_data("vt_whois_example_2.json"))), ip_enrich=MagicMock(), greynoise=MagicMock(), + abuseipdb=MagicMock(), urlhaus=MagicMock(), ) @@ -139,6 +146,7 @@ def view07(test_data, mock_shodan_get_ip): whois=MagicMock(), ip_enrich=ip_enrich, greynoise=GreynoiseIpMap.model_validate({}), + abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), ) @@ -160,6 +168,7 @@ def view08(test_data, mock_shodan_get_ip): whois=MagicMock(), ip_enrich=ip_enrich, greynoise=GreynoiseIpMap.model_validate({}), + abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), max_resolutions=1, ) @@ -187,6 +196,7 @@ def view09(test_data, mock_shodan_get_ip, mock_greynoise_get): whois=MagicMock(), ip_enrich=ip_enrich, greynoise=greynoise_enrich, + abuseipdb=MagicMock(), urlhaus=MagicMock(), max_resolutions=1, ) @@ -202,6 +212,7 @@ def view10(test_data): whois=VTWhois.model_validate(json.loads(test_data("vt_whois_foo.json"))), ip_enrich=MagicMock(), greynoise=GreynoiseIpMap.model_validate({}), + abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), ) @@ -223,6 +234,7 @@ def view11(test_data, mock_shodan_get_ip): whois=MagicMock(), ip_enrich=ip_enrich, greynoise=GreynoiseIpMap.model_validate({}), + abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), ) @@ -237,6 +249,7 @@ def view12(test_data): whois=Ip2Whois.model_validate(json.loads(test_data("ip2whois_whois_hotmail.json"))), ip_enrich=MagicMock(), greynoise=MagicMock(), + abuseipdb=MagicMock(), urlhaus=MagicMock(), ) @@ -251,6 +264,7 @@ def view13(test_data): whois=Ip2Whois.model_validate(json.loads(test_data("ip2whois_whois_bbc.json"))), ip_enrich=MagicMock(), greynoise=MagicMock(), + abuseipdb=MagicMock(), urlhaus=MagicMock(), ) @@ -277,6 +291,7 @@ def view14(test_data, mock_ipwhois_get, mock_urlhaus_get): whois=PTWhois.model_validate(json.loads(test_data("pt_whois_gist.json"))), ip_enrich=ip_enrich, greynoise=GreynoiseIpMap.model_validate({}), + abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=urlhaus_enrich, ) diff --git a/tests/test_ui_ip_view.py b/tests/test_ui_ip_view.py index 75121c2..2bbf5b2 100644 --- a/tests/test_ui_ip_view.py +++ b/tests/test_ui_ip_view.py @@ -6,6 +6,7 @@ from rich.table import Table from rich.text import Span, Text from unittest.mock import MagicMock +from wtfis.clients.abuseipdb import AbuseIpDbClient from wtfis.clients.greynoise import GreynoiseClient from wtfis.clients.ipwhois import IpWhoisClient @@ -47,6 +48,7 @@ def view01(test_data, mock_ipwhois_get, mock_greynoise_get, mock_urlhaus_get): whois=PTWhois.model_validate(json.loads(test_data("pt_whois_1.1.1.1.json"))), ip_enrich=ip_enrich, greynoise=greynoise_enrich, + abuseipdb=MagicMock(), urlhaus=urlhaus_enrich, ) @@ -71,6 +73,7 @@ def view02(test_data, mock_shodan_get_ip, mock_greynoise_get): whois=MagicMock(), ip_enrich=ip_enrich, greynoise=greynoise_enrich, + abuseipdb=MagicMock(), urlhaus=UrlHausMap.model_validate({}), ) @@ -84,6 +87,7 @@ def view03(test_data): whois=VTWhois.model_validate(json.loads(test_data("vt_whois_1.1.1.1.json"))), ip_enrich=MagicMock(), greynoise=MagicMock(), + abuseipdb=MagicMock(), urlhaus=MagicMock(), ) @@ -100,6 +104,7 @@ def view04(test_data): whois=MagicMock(), ip_enrich=IpWhoisMap.model_validate({}), greynoise=GreynoiseIpMap.model_validate({}), + abuseipdb=MagicMock(), urlhaus=UrlHausMap.model_validate({}), ) @@ -119,6 +124,7 @@ def view05(test_data, mock_greynoise_get): whois=MagicMock(), ip_enrich=IpWhoisMap.model_validate({}), greynoise=greynoise_enrich, + abuseipdb=MagicMock(), urlhaus=UrlHausMap.model_validate({}), ) @@ -138,6 +144,7 @@ def view06(test_data, mock_greynoise_get): whois=MagicMock(), ip_enrich=IpWhoisMap.model_validate({}), greynoise=greynoise_enrich, + abuseipdb=MagicMock(), urlhaus=MagicMock(), ) diff --git a/wtfis/clients/abuseipdb.py b/wtfis/clients/abuseipdb.py new file mode 100644 index 0000000..effd069 --- /dev/null +++ b/wtfis/clients/abuseipdb.py @@ -0,0 +1,43 @@ +from typing import Optional + +from requests.exceptions import HTTPError + +from wtfis.clients.base import BaseIpEnricherClient, BaseRequestsClient +from wtfis.models.abuseipdb import AbuseIpDb, AbuseIpDbMap + + +class AbuseIpDbClient(BaseRequestsClient, BaseIpEnricherClient): + """ + AbuseIPDB client + """ + baseurl = "https://api.abuseipdb.com/api/v2/check" + + def __init__(self, api_key: str) -> None: + super().__init__() + self.api_key = api_key + + @property + def name(self) -> str: + return "AbuseIPDB" + + def _get_ip(self, ip: str) -> Optional[AbuseIpDb]: + # Let a 404 or invalid IP pass + try: + params = {"ipAddress": ip, "maxAgeInDays": "90"} + headers = {"key": self.api_key, "Accept": "application/json"} + + response = self._get(request="", headers=headers, params=params) + + return AbuseIpDb.model_validate(response["data"]) + except HTTPError as e: + if e.response.status_code == 404: + return None + raise + + def enrich_ips(self, *ips: str) -> AbuseIpDbMap: + abuseipdb_map = {} + for ip in ips: + ip_data = self._get_ip(ip) + if ip_data: + abuseipdb_map[ip_data.ip_address] = ip_data + return AbuseIpDbMap.model_validate(abuseipdb_map) diff --git a/wtfis/handlers/base.py b/wtfis/handlers/base.py index 59a3c35..c962800 100644 --- a/wtfis/handlers/base.py +++ b/wtfis/handlers/base.py @@ -13,6 +13,7 @@ from shodan.exception import APIError from typing import Callable, List, Optional, Union +from wtfis.clients.abuseipdb import AbuseIpDbClient from wtfis.clients.greynoise import GreynoiseClient from wtfis.clients.ip2whois import Ip2WhoisClient from wtfis.clients.ipwhois import IpWhoisClient @@ -20,6 +21,7 @@ from wtfis.clients.shodan import ShodanClient from wtfis.clients.urlhaus import UrlHausClient from wtfis.clients.virustotal import VTClient +from wtfis.models.abuseipdb import AbuseIpDbMap from wtfis.models.common import WhoisBase from wtfis.models.greynoise import GreynoiseIpMap from wtfis.models.ipwhois import IpWhoisMap @@ -70,6 +72,7 @@ def __init__( ip_enricher_client: Union[IpWhoisClient, ShodanClient], whois_client: Union[Ip2WhoisClient, PTClient, VTClient], greynoise_client: Optional[GreynoiseClient], + abuseipdb_client: Optional[AbuseIpDbClient], urlhaus_client: Optional[UrlHausClient], ): # Process-specific @@ -82,14 +85,16 @@ def __init__( self._enricher = ip_enricher_client self._whois = whois_client self._greynoise = greynoise_client + self._abuseipdb = abuseipdb_client self._urlhaus = urlhaus_client # Dataset containers - self.vt_info: Union[Domain, IpAddress] + self.vt_info: Union[Domain, IpAddress] self.ip_enrich: Union[IpWhoisMap, ShodanIpMap] = IpWhoisMap.empty() - self.whois: WhoisBase + self.whois: WhoisBase self.greynoise: GreynoiseIpMap = GreynoiseIpMap.empty() - self.urlhaus: UrlHausMap = UrlHausMap.empty() + self.abuseipdb: AbuseIpDbMap = AbuseIpDbMap.empty() + self.urlhaus: UrlHausMap = UrlHausMap.empty() # Warning messages container self.warnings: List[str] = [] @@ -110,6 +115,12 @@ def _fetch_greynoise(self, *ips: str) -> None: if self._greynoise: self.greynoise = self._greynoise.enrich_ips(*ips) + @common_exception_handler + @failopen_exception_handler("_abuseipdb") + def _fetch_abuseipdb(self, *ips: str) -> None: + if self._abuseipdb: + self.abuseipdb = self._abuseipdb.enrich_ips(*ips) + @common_exception_handler def _fetch_whois(self) -> None: # Let continue if rate limited diff --git a/wtfis/handlers/domain.py b/wtfis/handlers/domain.py index f49ce41..5f8db3c 100644 --- a/wtfis/handlers/domain.py +++ b/wtfis/handlers/domain.py @@ -1,11 +1,13 @@ """ Logic handler for domain and hostname inputs """ +from typing import Optional, Union + from requests.exceptions import HTTPError from rich.console import Console from rich.progress import Progress -from typing import Optional, Union +from wtfis.clients.abuseipdb import AbuseIpDbClient from wtfis.clients.greynoise import GreynoiseClient from wtfis.clients.ip2whois import Ip2WhoisClient from wtfis.clients.ipwhois import IpWhoisClient @@ -13,11 +15,13 @@ from wtfis.clients.shodan import ShodanClient from wtfis.clients.urlhaus import UrlHausClient from wtfis.clients.virustotal import VTClient + from wtfis.handlers.base import ( BaseHandler, common_exception_handler, failopen_exception_handler, ) + from wtfis.models.virustotal import Resolutions @@ -31,11 +35,12 @@ def __init__( ip_enricher_client: Union[IpWhoisClient, ShodanClient], whois_client: Union[Ip2WhoisClient, PTClient, VTClient], greynoise_client: Optional[GreynoiseClient], + abuseipdb_client: Optional[AbuseIpDbClient], urlhaus_client: Optional[UrlHausClient], max_resolutions: int = 0, ): super().__init__(entity, console, progress, vt_client, ip_enricher_client, - whois_client, greynoise_client, urlhaus_client) + whois_client, greynoise_client, abuseipdb_client, urlhaus_client) # Extended attributes self.max_resolutions = max_resolutions @@ -83,6 +88,12 @@ def fetch_data(self): self._fetch_greynoise(*self.resolutions.ip_list(self.max_resolutions)) self.progress.update(task_g, completed=100) + if self._abuseipdb: + task_g = self.progress.add_task(f"Fetching IP enrichments from {self._abuseipdb.name}") + self.progress.update(task_g, advance=50) + self._fetch_abuseipdb(*self.resolutions.ip_list(self.max_resolutions)) + self.progress.update(task_g, completed=100) + if self._urlhaus: task_u = self.progress.add_task(f"Fetching domain enrichments from {self._urlhaus.name}") self.progress.update(task_u, advance=50) diff --git a/wtfis/handlers/ip.py b/wtfis/handlers/ip.py index 232713c..39841ff 100644 --- a/wtfis/handlers/ip.py +++ b/wtfis/handlers/ip.py @@ -42,6 +42,12 @@ def fetch_data(self): self._fetch_greynoise(self.entity) self.progress.update(task_g, completed=100) + if self._abuseipdb: + task_g = self.progress.add_task(f"Fetching IP enrichments from {self._abuseipdb.name}") + self.progress.update(task_g, advance=50) + self._fetch_abuseipdb(self.entity) + self.progress.update(task_g, completed=100) + task_w = self.progress.add_task(f"Fetching IP whois from {self._whois.name}") self.progress.update(task_w, advance=50) self._fetch_whois() diff --git a/wtfis/main.py b/wtfis/main.py index 0c862ea..beaad17 100644 --- a/wtfis/main.py +++ b/wtfis/main.py @@ -7,6 +7,7 @@ from rich.console import Console from rich.progress import Progress from typing import Union +from wtfis.clients.abuseipdb import AbuseIpDbClient from wtfis.clients.greynoise import GreynoiseClient from wtfis.clients.ip2whois import Ip2WhoisClient @@ -56,6 +57,7 @@ def parse_args() -> Namespace: ) parser.add_argument("-s", "--use-shodan", help="Use Shodan to enrich IPs", action="store_true") parser.add_argument("-g", "--use-greynoise", help="Enable Greynoise for IPs", action="store_true") + parser.add_argument("-a", "--use-abuseipdb", help="Enable AbuseIPDB for IPs", action="store_true") parser.add_argument("-u", "--use-urlhaus", help="Enable URLhaus for IPs and domains", action="store_true") parser.add_argument("-n", "--no-color", help="Show output without colors", action="store_true") parser.add_argument("-1", "--one-column", help="Display results in one column", action="store_true") @@ -74,6 +76,8 @@ def parse_args() -> Namespace: parsed.use_shodan = not parsed.use_shodan elif option in ("-g", "--use-greynoise"): parsed.use_greynoise = not parsed.use_greynoise + elif option in ("-a", "--use-abuseipdb"): + parsed.use_abuseipdb = not parsed.use_abuseipdb elif option in ("-u", "--use-urlhaus"): parsed.use_urlhaus = not parsed.use_urlhaus elif option in ("-n", "--no-color"): @@ -88,6 +92,8 @@ def parse_args() -> Namespace: argparse.ArgumentParser().error("SHODAN_API_KEY is not set") if parsed.use_greynoise and not os.environ.get("GREYNOISE_API_KEY"): argparse.ArgumentParser().error("GREYNOISE_API_KEY is not set") + if parsed.use_abuseipdb and not os.environ.get("ABUSEIPDB_API_KEY"): + argparse.ArgumentParser().error("ABUSEIPDB_API_KEY is not set") if is_ip(parsed.entity) and parsed.max_resolutions != DEFAULT_MAX_RESOLUTIONS: argparse.ArgumentParser().error("--max-resolutions is not applicable to IPs") @@ -130,6 +136,12 @@ def generate_entity_handler( else None ) + # AbuseIPDB client (optional) + abuseipdb_client = ( + AbuseIpDbClient(os.environ["ABUSEIPDB_API_KEY"]) + if args.use_abuseipdb else None + ) + # URLhaus client (optional) urlhaus_client = ( UrlHausClient() @@ -147,6 +159,7 @@ def generate_entity_handler( ip_enricher_client=enricher_client, whois_client=whois_client, greynoise_client=greynoise_client, + abuseipdb_client=abuseipdb_client, urlhaus_client=urlhaus_client, max_resolutions=args.max_resolutions, ) @@ -160,6 +173,7 @@ def generate_entity_handler( ip_enricher_client=enricher_client, whois_client=whois_client, greynoise_client=greynoise_client, + abuseipdb_client=abuseipdb_client, urlhaus_client=urlhaus_client, ) @@ -180,6 +194,7 @@ def generate_view( entity.whois, entity.ip_enrich, entity.greynoise, + entity.abuseipdb, entity.urlhaus, max_resolutions=args.max_resolutions, ) @@ -190,6 +205,7 @@ def generate_view( entity.whois, entity.ip_enrich, entity.greynoise, + entity.abuseipdb, entity.urlhaus, ) else: diff --git a/wtfis/models/abuseipdb.py b/wtfis/models/abuseipdb.py new file mode 100644 index 0000000..ad78173 --- /dev/null +++ b/wtfis/models/abuseipdb.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from typing import Optional, List, Dict + +from pydantic import BaseModel, Field, RootModel + + +class AbuseIpDb(BaseModel): + ip_address: str = Field(alias="ipAddress") + is_public: Optional[bool] = Field(None, alias="isPublic") + ip_version: Optional[int] = Field(None, alias="numDistinctUsers") + is_whitelisted: Optional[bool] = Field(None, alias="ipVersion") + abuse_confidence_score: int = Field(alias="abuseConfidenceScore") + country_code: Optional[str] = Field(None, alias="countryCode") + country_name: Optional[str] = Field(None, alias="countryName") + usage_type: Optional[str] = Field(None, alias="usageType") + isp: str + domain: Optional[str] = None + hostnames: Optional[List[str]] = None + is_tor: Optional[bool] = Field(None, alias="isTor") + total_reports: Optional[int] = Field(None, alias="totalReports") + num_distinct_users: Optional[int] = Field(None, alias="numDistinctUsers") + last_reported_at: Optional[str] = Field(None, alias="lastReportedAt") + + +class AbuseIpDbMap(RootModel): + root: Dict[str, AbuseIpDb] + + @classmethod + def empty(cls) -> AbuseIpDbMap: + return cls.model_validate({}) diff --git a/wtfis/types.py b/wtfis/types.py index d8f7910..0ff68da 100644 --- a/wtfis/types.py +++ b/wtfis/types.py @@ -3,18 +3,19 @@ """ from typing import Union +from wtfis.models.abuseipdb import AbuseIpDbMap from wtfis.models.greynoise import GreynoiseIpMap from wtfis.models.ipwhois import IpWhoisMap from wtfis.models.shodan import ShodanIpMap from wtfis.models.urlhaus import UrlHausMap - # IP enrichment types IpEnrichmentType = Union[ GreynoiseIpMap, IpWhoisMap, ShodanIpMap, UrlHausMap, + AbuseIpDbMap, ] # Domain/FQDN enrichment types diff --git a/wtfis/ui/base.py b/wtfis/ui/base.py index 4746d41..d445ad3 100644 --- a/wtfis/ui/base.py +++ b/wtfis/ui/base.py @@ -10,6 +10,7 @@ from rich.table import Table from rich.text import Text from typing import Any, Generator, List, Optional, Tuple, Union +from wtfis.models.abuseipdb import AbuseIpDb, AbuseIpDbMap from wtfis.models.common import WhoisBase from wtfis.models.greynoise import GreynoiseIp, GreynoiseIpMap @@ -40,6 +41,7 @@ def __init__( whois: Optional[WhoisBase], ip_enrich: Union[IpWhoisMap, ShodanIpMap], greynoise: GreynoiseIpMap, + abuseipdb: AbuseIpDbMap, urlhaus: UrlHausMap, ) -> None: self.console = console @@ -47,6 +49,7 @@ def __init__( self.whois = whois self.ip_enrich = ip_enrich self.greynoise = greynoise + self.abuseipdb = abuseipdb self.urlhaus = urlhaus self.theme = Theme() @@ -251,6 +254,32 @@ def _gen_greynoise_tuple(self, ip: GreynoiseIp) -> Tuple[Text, Text]: return title, text + def _gen_abuseipdb_tuple(self, ip: AbuseIpDb) -> Tuple[Text, Text]: + + # + # Title + # + title = self._gen_linked_field_name("AbuseIPDB", hyperlink=f"https://www.abuseipdb.com/check/{ip.ip_address}") + + # + # Content + # + + text = Text() + + score_message = f"{str(ip.abuse_confidence_score)} abuse confidence score" + abuseipdb_text: Text + if ip.abuse_confidence_score == 0: + abuseipdb_text = Text(score_message, style=self.theme.info) + elif ip.abuse_confidence_score <= 30: + abuseipdb_text = Text(score_message, style=self.theme.warn) + else: + abuseipdb_text = Text(score_message, style=self.theme.error) + + text.append(abuseipdb_text) + + return title, text + def _gen_asn_text( self, asn: Optional[str], @@ -272,6 +301,9 @@ def _get_ip_enrichment(self, ip: str) -> Optional[Union[IpWhois, ShodanIp]]: def _get_greynoise_enrichment(self, ip: str) -> Optional[GreynoiseIp]: return self.greynoise.root[ip] if ip in self.greynoise.root.keys() else None + def _get_abuseipdb_enrichment(self, ip: str) -> Optional[AbuseIpDb]: + return self.abuseipdb.root[ip] if ip in self.abuseipdb.root.keys() else None + def _get_urlhaus_enrichment(self, entity: str) -> Optional[UrlHaus]: return self.urlhaus.root[entity] if entity in self.urlhaus.root.keys() else None @@ -438,6 +470,11 @@ def _gen_ip_other_section(self) -> Optional[RenderableType]: if greynoise: data.append(self._gen_greynoise_tuple(greynoise)) + # abuseIPDB + abuseipdb = self._get_abuseipdb_enrichment(ip=self.entity.data.id_) + if abuseipdb: + data.append(self._gen_abuseipdb_tuple(abuseipdb)) + if data: return self._gen_section( self._gen_table(*data), diff --git a/wtfis/ui/view.py b/wtfis/ui/view.py index f8ea65d..e1811c1 100644 --- a/wtfis/ui/view.py +++ b/wtfis/ui/view.py @@ -8,6 +8,7 @@ from rich.panel import Panel from rich.text import Text from typing import List, Optional, Tuple, Union +from wtfis.models.abuseipdb import AbuseIpDbMap from wtfis.models.common import WhoisBase from wtfis.models.greynoise import GreynoiseIpMap @@ -27,6 +28,7 @@ class DomainView(BaseView): """ Handler for FQDN and domain lookup output """ + def __init__( self, console: Console, @@ -35,10 +37,12 @@ def __init__( whois: WhoisBase, ip_enrich: Union[IpWhoisMap, ShodanIpMap], greynoise: GreynoiseIpMap, + abuseipdb: AbuseIpDbMap, urlhaus: UrlHausMap, max_resolutions: int = 3, ) -> None: - super().__init__(console, entity, whois, ip_enrich, greynoise, urlhaus) + super().__init__(console, entity, whois, ip_enrich, greynoise, abuseipdb, urlhaus) + self.resolutions = resolutions self.max_resolutions = max_resolutions @@ -120,6 +124,11 @@ def resolutions_panel(self) -> Optional[Panel]: if greynoise: data += [self._gen_greynoise_tuple(greynoise)] + # abuseIPDB + abuseipdb = self._get_abuseipdb_enrichment(attributes.ip_address) + if abuseipdb: + data += [self._gen_abuseipdb_tuple(abuseipdb)] + # Include a disclaimer if last seen is older than 1 year # Note: Disabled for now because I originally understood that the resolution date was the last time # the domain was resolved, but it may actually be he first time the IP itself was seen with the domain. @@ -176,6 +185,7 @@ class IpAddressView(BaseView): """ Handler for IP Address lookup output """ + def __init__( self, console: Console, @@ -183,9 +193,10 @@ def __init__( whois: WhoisBase, ip_enrich: Union[IpWhoisMap, ShodanIpMap], greynoise: GreynoiseIpMap, + abuseipdb: AbuseIpDbMap, urlhaus: UrlHausMap, ) -> None: - super().__init__(console, entity, whois, ip_enrich, greynoise, urlhaus) + super().__init__(console, entity, whois, ip_enrich, greynoise, abuseipdb, urlhaus) def ip_panel(self) -> Panel: content = [self._gen_vt_section()] # VT section