diff --git a/tests/conftest.py b/tests/conftest.py index 2522539..b96f4d9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,9 +8,9 @@ from wtfis.models.abuseipdb import AbuseIpDb from wtfis.models.greynoise import GreynoiseIp from wtfis.models.ipwhois import IpWhois +from wtfis.models.r7insight import Rapid7Insight from wtfis.models.shodan import ShodanIp from wtfis.models.urlhaus import UrlHaus -from wtfis.models.r7insight import Rapid7Insight class TestTheme: @@ -78,6 +78,7 @@ def urlhaus_get_host(entity, pool) -> UrlHaus: """Mock replacement for UrlHausClient()._get_host()""" return UrlHaus.model_validate(pool[entity]) + def rapid7_get_host(entity, pool) -> UrlHaus: """Mock replacement for UrlHausClient()._get_host()""" return Rapid7Insight.model_validate(pool[entity]) @@ -130,6 +131,7 @@ def mock_shodan_get_ip(): def mock_urlhaus_get(): return urlhaus_get_host + @pytest.fixture(scope="module") def mock_rapid7_get(): return rapid7_get_host diff --git a/tests/test_cli.py b/tests/test_cli.py index 530a204..9d9fe26 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -142,6 +142,7 @@ def fake_load_dotenv_ip2whois(tmp_path): } return fake_load_dotenv(tmp_path, fake_env_vars) + @pytest.fixture() def fake_load_dotenv_rapid7(tmp_path): fake_env_vars = { @@ -504,6 +505,7 @@ def test_handler_domain_1(self, fake_load_dotenv_1): assert entity._greynoise is None assert entity._urlhaus is None assert entity._abuseipdb is None + assert entity._rapid7insight is None unset_env_vars() @patch("sys.argv", ["main", "www.example[.]com", "-s", "-g", "-u", "-m", "5"]) @@ -597,6 +599,7 @@ def test_view_domain_1(self, m_domain_view, test_data): greynoise_client=MagicMock(), abuseipdb_client=MagicMock(), urlhaus_client=MagicMock(), + rapid7insight_client=MagicMock(), ) entity.vt_info = Domain.model_validate( json.loads(test_data("vt_domain_gist.json")) @@ -620,6 +623,7 @@ def test_view_ip_1(self, m_ip_view, test_data): greynoise_client=MagicMock(), abuseipdb_client=MagicMock(), urlhaus_client=MagicMock(), + rapid7insight_client=MagicMock(), ) entity.vt_info = IpAddress.model_validate( json.loads(test_data("vt_ip_1.1.1.1.json")) diff --git a/tests/test_handlers.py b/tests/test_handlers.py index 373aad1..e092d92 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -10,6 +10,7 @@ from wtfis.clients.greynoise import GreynoiseClient from wtfis.clients.ipwhois import IpWhoisClient from wtfis.clients.passivetotal import PTClient +from wtfis.clients.r7insight import Rapid7InsightClient from wtfis.clients.shodan import ShodanClient from wtfis.clients.urlhaus import UrlHausClient from wtfis.clients.virustotal import VTClient @@ -32,6 +33,7 @@ def generate_domain_handler(max_resolutions=3): greynoise_client=GreynoiseClient("dummykey"), abuseipdb_client=AbuseIpDbClient("dummykey"), urlhaus_client=UrlHausClient(), + rapid7insight_client=Rapid7InsightClient("dummyuser", "dummykey"), max_resolutions=max_resolutions, ) @@ -48,6 +50,7 @@ def generate_ip_handler(): greynoise_client=GreynoiseClient("dummykey"), abuseipdb_client=AbuseIpDbClient("dummykey"), urlhaus_client=UrlHausClient(), + rapid7insight_client=Rapid7InsightClient("dummyuser", "dummykey"), ) diff --git a/tests/test_ui_domain_view.py b/tests/test_ui_domain_view.py index f382511..ac2a675 100644 --- a/tests/test_ui_domain_view.py +++ b/tests/test_ui_domain_view.py @@ -17,6 +17,7 @@ from wtfis.models.ip2whois import Whois as Ip2Whois from wtfis.models.ipwhois import IpWhoisMap from wtfis.models.passivetotal import Whois as PTWhois +from wtfis.models.r7insight import Rapid7InsightMap from wtfis.models.shodan import ShodanIpMap from wtfis.models.urlhaus import UrlHausMap from wtfis.models.virustotal import Domain, Resolutions @@ -48,6 +49,7 @@ def view01(test_data, mock_ipwhois_get): greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=UrlHausMap.model_validate({}), + rapid7Insight=Rapid7InsightMap.model_validate({}), ) @@ -69,6 +71,7 @@ def view02(test_data): greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), + rapid7Insight=MagicMock(), max_resolutions=1, ) @@ -86,6 +89,7 @@ def view03(test_data): greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), + rapid7Insight=MagicMock(), ) @@ -105,6 +109,7 @@ def view04(test_data): greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), + rapid7Insight=MagicMock(), ) @@ -122,6 +127,7 @@ def view05(test_data): greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), + rapid7Insight=MagicMock(), ) @@ -138,6 +144,7 @@ def view06(test_data): greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), + rapid7Insight=MagicMock(), ) @@ -172,6 +179,7 @@ def view07(test_data, mock_ipwhois_get, mock_shodan_get_ip): greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), + rapid7Insight=MagicMock(), ) @@ -199,6 +207,7 @@ def view08(test_data, mock_shodan_get_ip): greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), + rapid7Insight=MagicMock(), max_resolutions=1, ) @@ -242,6 +251,7 @@ def view09(test_data, mock_shodan_get_ip, mock_greynoise_get, mock_abuseipdb_get greynoise=greynoise_enrich, abuseipdb=abuseipdb_enrich, urlhaus=MagicMock(), + rapid7Insight=MagicMock(), max_resolutions=1, ) @@ -259,6 +269,7 @@ def view10(test_data): greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), + rapid7Insight=MagicMock(), ) @@ -286,6 +297,7 @@ def view11(test_data, mock_shodan_get_ip): greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), + rapid7Insight=MagicMock(), ) @@ -304,6 +316,7 @@ def view12(test_data): greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), + rapid7Insight=MagicMock(), ) @@ -320,6 +333,7 @@ def view13(test_data): greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), + rapid7Insight=MagicMock(), ) @@ -354,6 +368,7 @@ def view14(test_data, mock_ipwhois_get, mock_urlhaus_get): greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=urlhaus_enrich, + rapid7Insight=Rapid7InsightMap.model_validate({}), ) diff --git a/tests/test_ui_ip_view.py b/tests/test_ui_ip_view.py index 6db0a55..4816b98 100644 --- a/tests/test_ui_ip_view.py +++ b/tests/test_ui_ip_view.py @@ -77,6 +77,7 @@ def view01( greynoise=greynoise_enrich, abuseipdb=abuseipdb_enrich, urlhaus=urlhaus_enrich, + rapid7insight=MagicMock(), ) @@ -114,6 +115,7 @@ def view02(test_data, mock_ipwhois_get, mock_shodan_get_ip, mock_greynoise_get): greynoise=greynoise_enrich, abuseipdb=MagicMock(), urlhaus=UrlHausMap.model_validate({}), + rapid7insight=MagicMock(), ) @@ -129,6 +131,7 @@ def view03(test_data): greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), + rapid7insight=MagicMock(), ) @@ -149,6 +152,7 @@ def view04(test_data): greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=MagicMock(), urlhaus=UrlHausMap.model_validate({}), + rapid7insight=MagicMock(), ) @@ -172,6 +176,7 @@ def view05(test_data, mock_greynoise_get): greynoise=greynoise_enrich, abuseipdb=MagicMock(), urlhaus=UrlHausMap.model_validate({}), + rapid7insight=MagicMock(), ) @@ -196,6 +201,7 @@ def view06(test_data, mock_greynoise_get): greynoise=greynoise_enrich, abuseipdb=MagicMock(), urlhaus=MagicMock(), + rapid7insight=MagicMock(), ) @@ -219,6 +225,7 @@ def view07(test_data, mock_abuseipdb_get): greynoise=MagicMock(), abuseipdb=abuseipdb_enrich, urlhaus=MagicMock(), + rapid7insight=MagicMock(), ) @@ -242,6 +249,7 @@ def view08(test_data, mock_abuseipdb_get): greynoise=MagicMock(), abuseipdb=abuseipdb_enrich, urlhaus=MagicMock(), + rapid7insight=MagicMock(), ) diff --git a/wtfis/clients/r7insight.py b/wtfis/clients/r7insight.py index ba7c695..ac53d7f 100644 --- a/wtfis/clients/r7insight.py +++ b/wtfis/clients/r7insight.py @@ -1,13 +1,16 @@ -from typing import Optional - -from requests.exceptions import HTTPError from requests.auth import HTTPBasicAuth -from wtfis.clients.base import BaseIpEnricherClient, BaseRequestsClient, BaseDomainEnricherClient +from wtfis.clients.base import ( + BaseDomainEnricherClient, + BaseIpEnricherClient, + BaseRequestsClient, +) from wtfis.models.r7insight import Rapid7Insight, Rapid7InsightMap -class Rapid7InsightClient(BaseRequestsClient, BaseDomainEnricherClient, BaseIpEnricherClient): +class Rapid7InsightClient( + BaseRequestsClient, BaseDomainEnricherClient, BaseIpEnricherClient +): """ Rapid7 Insight client """ @@ -20,20 +23,14 @@ def __init__(self, user_id: str, api_key: str) -> None: self.api_key = api_key self.s.auth = HTTPBasicAuth(self.user_id, self.api_key) - @property def name(self) -> str: return "Rapid7 Insight" def _get_host(self, host: str) -> Rapid7Insight: - try: - return Rapid7Insight.model_validate( - self._get(f"v3/iocs/ioc-by-value?iocValue={host}") - ) - except HTTPError as e: - if e.response.status_code == 404: - return None - raise + return Rapid7Insight.model_validate( + self._get(f"v3/iocs/ioc-by-value?iocValue={host}") + ) def _enrich(self, *entities: str) -> Rapid7InsightMap: """Method is the same whether input is a domain or IP""" diff --git a/wtfis/handlers/base.py b/wtfis/handlers/base.py index d0a8e28..73d47df 100644 --- a/wtfis/handlers/base.py +++ b/wtfis/handlers/base.py @@ -15,20 +15,20 @@ from wtfis.clients.abuseipdb import AbuseIpDbClient from wtfis.clients.greynoise import GreynoiseClient +from wtfis.clients.r7insight import Rapid7InsightClient from wtfis.clients.shodan import ShodanClient from wtfis.clients.types import IpGeoAsnClientType, IpWhoisClientType from wtfis.clients.urlhaus import UrlHausClient from wtfis.clients.virustotal import VTClient -from wtfis.clients.r7insight import Rapid7InsightClient from wtfis.models.abuseipdb import AbuseIpDbMap from wtfis.models.base import WhoisBase from wtfis.models.greynoise import GreynoiseIpMap from wtfis.models.ipwhois import IpWhoisMap +from wtfis.models.r7insight import Rapid7InsightMap from wtfis.models.shodan import ShodanIpMap from wtfis.models.types import IpGeoAsnMapType from wtfis.models.urlhaus import UrlHausMap from wtfis.models.virustotal import Domain, IpAddress -from wtfis.models.r7insight import Rapid7InsightMap from wtfis.ui.theme import Theme from wtfis.utils import error_and_exit, refang diff --git a/wtfis/handlers/domain.py b/wtfis/handlers/domain.py index df9a6f5..b02efd0 100644 --- a/wtfis/handlers/domain.py +++ b/wtfis/handlers/domain.py @@ -10,11 +10,11 @@ from wtfis.clients.abuseipdb import AbuseIpDbClient from wtfis.clients.greynoise import GreynoiseClient +from wtfis.clients.r7insight import Rapid7InsightClient from wtfis.clients.shodan import ShodanClient from wtfis.clients.types import IpGeoAsnClientType, IpWhoisClientType from wtfis.clients.urlhaus import UrlHausClient from wtfis.clients.virustotal import VTClient -from wtfis.clients.r7insight import Rapid7InsightClient from wtfis.handlers.base import ( BaseHandler, common_exception_handler, @@ -124,7 +124,9 @@ def fetch_data(self): f"Fetching IP data from {self._rapid7insight.name}" ) self.progress.update(task_r, advance=50) - self._fetch_rapid7insight(*self.resolutions.ip_list(self.max_resolutions)) + self._fetch_rapid7insight( + *self.resolutions.ip_list(self.max_resolutions) + ) self.progress.update(task_r, completed=100) if self._urlhaus: diff --git a/wtfis/main.py b/wtfis/main.py index 351c855..136c89d 100644 --- a/wtfis/main.py +++ b/wtfis/main.py @@ -13,10 +13,10 @@ from wtfis.clients.ip2whois import Ip2WhoisClient from wtfis.clients.ipwhois import IpWhoisClient from wtfis.clients.passivetotal import PTClient +from wtfis.clients.r7insight import Rapid7InsightClient from wtfis.clients.shodan import ShodanClient from wtfis.clients.urlhaus import UrlHausClient from wtfis.clients.virustotal import VTClient -from wtfis.clients.r7insight import Rapid7InsightClient from wtfis.exceptions import WtfisException from wtfis.handlers.base import BaseHandler from wtfis.handlers.domain import DomainHandler @@ -79,7 +79,10 @@ def parse_args() -> Namespace: action="store_true", ) parser.add_argument( - "-r", "--use-rapid7", help="Enable Rapid7 Insight for IPs and Domains", action="store_true" + "-r", + "--use-rapid7", + help="Enable Rapid7 Insight for IPs and Domains", + action="store_true", ) parser.add_argument( "-n", "--no-color", help="Show output without colors", action="store_true" @@ -178,7 +181,11 @@ def generate_entity_handler( # Rapid7 Insights client (optional) rapid7insight_client = ( - Rapid7InsightClient(os.environ["RAPID7_ACCOUNT_ID"], os.environ["RAPID7_API_KEY"]) if args.use_rapid7 else None + Rapid7InsightClient( + os.environ["RAPID7_ACCOUNT_ID"], os.environ["RAPID7_API_KEY"] + ) + if args.use_rapid7 + else None ) # Domain / FQDN handler diff --git a/wtfis/models/r7insight.py b/wtfis/models/r7insight.py index 7ac6e36..cfad112 100644 --- a/wtfis/models/r7insight.py +++ b/wtfis/models/r7insight.py @@ -10,6 +10,7 @@ class Rapid7InsightFeed(BaseModel): name: str confidenceLevel: int + class Rapid7Insight(BaseModel): value: str type: str diff --git a/wtfis/models/types.py b/wtfis/models/types.py index 64a4005..0f6e76b 100644 --- a/wtfis/models/types.py +++ b/wtfis/models/types.py @@ -7,9 +7,9 @@ from wtfis.models.abuseipdb import AbuseIpDbMap from wtfis.models.greynoise import GreynoiseIpMap from wtfis.models.ipwhois import IpWhois, IpWhoisMap +from wtfis.models.r7insight import Rapid7InsightMap from wtfis.models.shodan import ShodanIpMap from wtfis.models.urlhaus import UrlHausMap -from wtfis.models.r7insight import Rapid7InsightMap # IP enrichment map types IpEnrichmentType = Union[ @@ -22,10 +22,7 @@ ] # Domain/FQDN enrichment map types -DomainEnrichmentType = Union[ - UrlHausMap, - Rapid7InsightMap - ] +DomainEnrichmentType = Union[UrlHausMap, Rapid7InsightMap] # IP geolocation and ASN types IpGeoAsnType = Union[IpWhois,] diff --git a/wtfis/ui/base.py b/wtfis/ui/base.py index de9d3b4..2892a69 100644 --- a/wtfis/ui/base.py +++ b/wtfis/ui/base.py @@ -10,11 +10,11 @@ from wtfis.models.abuseipdb import AbuseIpDb, AbuseIpDbMap from wtfis.models.base import WhoisBase from wtfis.models.greynoise import GreynoiseIp, GreynoiseIpMap +from wtfis.models.r7insight import Rapid7Insight, Rapid7InsightMap from wtfis.models.shodan import ShodanIp, ShodanIpMap from wtfis.models.types import IpGeoAsnMapType, IpGeoAsnType from wtfis.models.urlhaus import UrlHaus, UrlHausMap from wtfis.models.virustotal import LastAnalysisStats, PopularityRanks -from wtfis.models.r7insight import Rapid7Insight, Rapid7InsightMap from wtfis.ui.theme import Theme from wtfis.utils import Timestamp, is_ip, smart_join @@ -301,14 +301,13 @@ def _gen_abuseipdb_tuple(self, ip: AbuseIpDb) -> Tuple[Text, Text]: text.append(f" ({ip.total_reports} reports)", style=self.theme.table_value) return title, text - def _gen_rapid7_tuple(self, ip: Rapid7Insight) -> Tuple[Text, Text]: # # Title # - title = "Rapid7 Insight" + title = Text("Rapid7 Insight:") # # Content @@ -318,9 +317,11 @@ def _gen_rapid7_tuple(self, ip: Rapid7Insight) -> Tuple[Text, Text]: return title, text if ip.severity == "High": - style = self.theme.info + style = self.theme.error elif ip.severity == "Medium": style = self.theme.warn + elif ip.severity == "Low": + style = self.theme.info else: style = self.theme.error @@ -332,19 +333,22 @@ def _gen_rapid7_tuple(self, ip: Rapid7Insight) -> Tuple[Text, Text]: ) if len(ip.relatedMalware) > 0: - text.append(f"\nrelated malware:", style=self.theme.tags) + text.append("\nrelated malware:", style=self.theme.tags) for malware in ip.relatedMalware: - text.append(f" {malware}", style=self.theme.table_value) + text.append(f" {malware},", style=self.theme.table_value) + text = text[:-1] if len(ip.relatedCampaigns) > 0: - text.append(f"\nrelated campaigns:", style=self.theme.tags) + text.append("\nrelated campaigns:", style=self.theme.tags) for campaign in ip.relatedCampaigns: - text.append(f" {campaign}", style=self.theme.table_value) + text.append(f" {campaign},", style=self.theme.table_value) + text = text[:-1] if len(ip.reportedFeeds) > 0: - text.append(f"\nreported feeds:", style=self.theme.tags) + text.append("\nreported feeds:", style=self.theme.tags) for feed in ip.reportedFeeds: - text.append(f" {feed.name}", style=self.theme.table_value) + text.append(f" {feed.name},", style=self.theme.table_value) + text = text[:-1] return title, text @@ -378,9 +382,13 @@ def _get_abuseipdb_enrichment(self, ip: str) -> Optional[AbuseIpDb]: def _get_urlhaus_enrichment(self, entity: str) -> Optional[UrlHaus]: return self.urlhaus.root[entity] if entity in self.urlhaus.root.keys() else None - + def _get_rapid7insight_enrichment(self, entity: str) -> Optional[Rapid7Insight]: - return self.rapid7insight.root[entity] if entity in self.rapid7insight.root.keys() else None + return ( + self.rapid7insight.root[entity] + if entity in self.rapid7insight.root.keys() + else None + ) def _gen_vt_section(self) -> RenderableType: """Virustotal section. Applies to both domain and IP views""" diff --git a/wtfis/ui/view.py b/wtfis/ui/view.py index 23c1ee4..9f5438e 100644 --- a/wtfis/ui/view.py +++ b/wtfis/ui/view.py @@ -9,11 +9,11 @@ from wtfis.models.abuseipdb import AbuseIpDbMap from wtfis.models.base import WhoisBase from wtfis.models.greynoise import GreynoiseIpMap +from wtfis.models.r7insight import Rapid7InsightMap from wtfis.models.shodan import ShodanIpMap from wtfis.models.types import IpGeoAsnMapType from wtfis.models.urlhaus import UrlHausMap from wtfis.models.virustotal import Domain, IpAddress, Resolutions -from wtfis.models.r7insight import Rapid7InsightMap from wtfis.ui.base import BaseView from wtfis.utils import Timestamp, smart_join @@ -38,7 +38,15 @@ def __init__( max_resolutions: int = 3, ) -> None: super().__init__( - console, entity, geoasn, whois, shodan, greynoise, abuseipdb, urlhaus, rapid7Insight + console, + entity, + geoasn, + whois, + shodan, + greynoise, + abuseipdb, + urlhaus, + rapid7Insight, ) self.resolutions = resolutions @@ -211,7 +219,15 @@ def __init__( rapid7insight: Rapid7InsightMap, ) -> None: super().__init__( - console, entity, geoasn, whois, shodan, greynoise, abuseipdb, urlhaus, rapid7insight + console, + entity, + geoasn, + whois, + shodan, + greynoise, + abuseipdb, + urlhaus, + rapid7insight, ) def ip_panel(self) -> Panel: