diff --git a/imgs/example-ip.png b/imgs/example-ip.png index 453862c..8556ccf 100644 Binary files a/imgs/example-ip.png and b/imgs/example-ip.png differ diff --git a/pyproject.toml b/pyproject.toml index 7cf7ae1..ea7205d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,9 +29,9 @@ classifiers = [ "Topic :: Security", ] dependencies = [ - "pydantic~=2.7.0", + "pydantic~=2.7.2", "python-dotenv~=1.0.1", - "requests~=2.31.0", + "requests~=2.32.3", "rich~=13.7.1", "shodan~=1.31.0", ] @@ -65,7 +65,7 @@ dependencies = [ "mypy", "pytest", "pytest-cov", - "types-requests~=2.31.0", + "types-requests~=2.32.0", ] [tool.hatch.envs.default.scripts] typecheck = "mypy -p {args:wtfis}" diff --git a/tests/test_cli.py b/tests/test_cli.py index 1f34cd2..5b3cd5c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -408,10 +408,12 @@ def test_handler_domain_1(self, fake_load_dotenv_1): assert entity.console == console assert entity.progress == progress assert isinstance(entity._vt, VTClient) - assert isinstance(entity._enricher, IpWhoisClient) + assert isinstance(entity._geoasn, IpWhoisClient) assert isinstance(entity._whois, PTClient) + assert entity._shodan is None assert entity._greynoise is None assert entity._urlhaus is None + assert entity._abuseipdb is None unset_env_vars() @patch("sys.argv", ["main", "www.example[.]com", "-s", "-g", "-u", "-m", "5"]) @@ -423,8 +425,9 @@ def test_handler_domain_2(self, fake_load_dotenv_1): progress = simulate_progress(console), entity = generate_entity_handler(parse_args(), console, progress) assert entity.max_resolutions == 5 - assert isinstance(entity._enricher, ShodanClient) + assert isinstance(entity._geoasn, IpWhoisClient) assert isinstance(entity._whois, PTClient) + assert isinstance(entity._shodan, ShodanClient) assert isinstance(entity._greynoise, GreynoiseClient) assert isinstance(entity._urlhaus, UrlHausClient) unset_env_vars() @@ -464,7 +467,7 @@ def test_handler_ip_1(self, fake_load_dotenv_1): assert entity.console == console assert entity.progress == progress assert isinstance(entity._vt, VTClient) - assert isinstance(entity._enricher, IpWhoisClient) + assert isinstance(entity._geoasn, IpWhoisClient) assert isinstance(entity._whois, PTClient) assert entity._greynoise is None assert entity._urlhaus is None @@ -478,8 +481,9 @@ def test_handler_ip_2(self, fake_load_dotenv_1): console = Console() progress = simulate_progress(console), entity = generate_entity_handler(parse_args(), console, progress) - assert isinstance(entity._enricher, ShodanClient) + assert isinstance(entity._geoasn, IpWhoisClient) assert isinstance(entity._whois, PTClient) + assert isinstance(entity._shodan, ShodanClient) assert isinstance(entity._greynoise, GreynoiseClient) assert isinstance(entity._urlhaus, UrlHausClient) assert isinstance(entity._abuseipdb, AbuseIpDbClient) @@ -496,8 +500,9 @@ def test_view_domain_1(self, m_domain_view, test_data): console=MagicMock(), progress=MagicMock(), vt_client=MagicMock(), - ip_enricher_client=MagicMock(), + ip_geoasn_client=MagicMock(), whois_client=MagicMock(), + shodan_client=MagicMock(), greynoise_client=MagicMock(), abuseipdb_client=MagicMock(), urlhaus_client=MagicMock(), @@ -516,8 +521,9 @@ def test_view_ip_1(self, m_ip_view, test_data): console=MagicMock(), progress=MagicMock(), vt_client=MagicMock(), - ip_enricher_client=MagicMock(), + ip_geoasn_client=MagicMock(), whois_client=MagicMock(), + shodan_client=MagicMock(), greynoise_client=MagicMock(), abuseipdb_client=MagicMock(), urlhaus_client=MagicMock(), diff --git a/tests/test_clients.py b/tests/test_clients.py index 6078f4b..806dc34 100644 --- a/tests/test_clients.py +++ b/tests/test_clients.py @@ -2,13 +2,15 @@ import pytest from unittest.mock import MagicMock, patch +from shodan import APIError + from wtfis.clients.abuseipdb import AbuseIpDbClient from wtfis.clients.base import requests from wtfis.clients.greynoise import GreynoiseClient from wtfis.clients.ip2whois import Ip2WhoisClient from wtfis.clients.ipwhois import IpWhoisClient from wtfis.clients.passivetotal import PTClient -from wtfis.clients.shodan import APIError, Shodan, ShodanClient +from wtfis.clients.shodan import Shodan, ShodanClient from wtfis.clients.urlhaus import UrlHausClient from wtfis.clients.virustotal import VTClient from wtfis.models.ipwhois import IpWhoisMap @@ -172,7 +174,7 @@ def test_get_ip_apierror_invalid_key(self, mock_shodan_host, shodan_client): shodan_client.enrich_ips("thisdoesntmatter") assert e.type == APIError - assert str(e.value) == "Invalid Shodan API key" + assert str(e.value) == "Invalid API key" @patch.object(Shodan, "host") def test_get_ip_apierror_other(self, mock_shodan_host, shodan_client): diff --git a/tests/test_handlers.py b/tests/test_handlers.py index a408ec7..b7a0b9a 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -10,6 +10,7 @@ from wtfis.clients.greynoise import GreynoiseClient from wtfis.clients.ipwhois import IpWhoisClient from wtfis.clients.passivetotal import PTClient +from wtfis.clients.shodan import ShodanClient from wtfis.clients.urlhaus import UrlHausClient from wtfis.clients.virustotal import VTClient from wtfis.handlers.domain import DomainHandler @@ -25,8 +26,9 @@ def generate_domain_handler(max_resolutions=3): console=Console(), progress=MagicMock(), vt_client=VTClient("dummykey"), - ip_enricher_client=IpWhoisClient(), + ip_geoasn_client=IpWhoisClient(), whois_client=PTClient("dummyuser", "dummykey"), + shodan_client=ShodanClient("dummykey"), greynoise_client=GreynoiseClient("dummykey"), abuseipdb_client=AbuseIpDbClient("dummykey"), urlhaus_client=UrlHausClient(), @@ -40,8 +42,9 @@ def generate_ip_handler(): console=Console(), progress=MagicMock(), vt_client=VTClient("dummykey"), - ip_enricher_client=IpWhoisClient(), + ip_geoasn_client=IpWhoisClient(), whois_client=PTClient("dummyuser", "dummykey"), + shodan_client=ShodanClient("dummykey"), greynoise_client=GreynoiseClient("dummykey"), abuseipdb_client=AbuseIpDbClient("dummykey"), urlhaus_client=UrlHausClient(), @@ -70,16 +73,18 @@ def test_fetch_data_1(self, domain_handler, test_data): handler._fetch_vt_domain = MagicMock() handler._fetch_vt_resolutions = MagicMock() - handler._fetch_ip_enrichments = MagicMock() + handler._fetch_geoasn = MagicMock() handler._fetch_whois = MagicMock() + handler._fetch_shodan = MagicMock() handler._fetch_greynoise = MagicMock() handler._fetch_urlhaus = MagicMock() handler.fetch_data() handler._fetch_vt_domain.assert_called_once() handler._fetch_vt_resolutions.assert_called_once() - handler._fetch_ip_enrichments.assert_called_once() + handler._fetch_geoasn.assert_called_once() handler._fetch_whois.assert_called_once() + handler._fetch_shodan.assert_called_once() handler._fetch_greynoise.assert_called_once() handler._fetch_urlhaus.assert_called_once() @@ -88,21 +93,23 @@ def test_fetch_data_2(self, domain_handler): handler = domain_handler(0) handler._fetch_vt_domain = MagicMock() handler._fetch_vt_resolutions = MagicMock() - handler._fetch_ip_enrichments = MagicMock() + handler._fetch_geoasn = MagicMock() handler._fetch_whois = MagicMock() + handler._fetch_shodan = MagicMock() handler._fetch_greynoise = MagicMock() handler._fetch_urlhaus = MagicMock() handler.fetch_data() handler._fetch_vt_domain.assert_called_once() handler._fetch_vt_resolutions.assert_not_called() - handler._fetch_ip_enrichments.assert_not_called() + handler._fetch_geoasn.assert_not_called() handler._fetch_whois.assert_called_once() + handler._fetch_shodan.assert_not_called() handler._fetch_greynoise.assert_not_called() handler._fetch_urlhaus.assert_called_once() - assert handler.ip_enrich == IpWhoisMap.model_validate({}) - assert handler.ip_enrich.root == {} + assert handler.geoasn == IpWhoisMap.model_validate({}) + assert handler.geoasn.root == {} @patch.object(requests.Session, "get") def test_vt_http_error(self, mock_requests_get, domain_handler, capsys): @@ -173,7 +180,7 @@ def test_ipwhois_http_error(self, mock_requests_get, domain_handler, capsys, tes mock_resp.status_code = 500 mock_requests_get.return_value = mock_resp - handler._fetch_ip_enrichments(*["1.2.3.4", "1.2.3.5"]) + handler._fetch_geoasn(*["1.2.3.4", "1.2.3.5"]) assert handler.warnings[0].startswith("Could not fetch IPWhois: 500 Server Error:") handler.print_warnings() @@ -191,12 +198,12 @@ def test_ipwhois_validation_error(self, mock_requests_get, domain_handler, capsy mock_requests_get.return_value = mock_resp with pytest.raises(SystemExit) as e: - handler._fetch_ip_enrichments("1.2.3.4") + handler._fetch_geoasn("1.2.3.4") capture = capsys.readouterr() assert capture.err.startswith( - "Data model validation error: 16 validation errors for IpWhois\n" + "Data model validation error: 9 validation errors for IpWhois\n" ) assert e.type == SystemExit assert e.value.code == 1 @@ -374,16 +381,18 @@ def test_entity_refang(self, ip_handler): def test_fetch_data(self, ip_handler): handler = ip_handler() handler._fetch_vt_ip_address = MagicMock() - handler._fetch_ip_enrichments = MagicMock() + handler._fetch_geoasn = MagicMock() handler._fetch_whois = MagicMock() + handler._fetch_shodan = MagicMock() handler._fetch_greynoise = MagicMock() handler._fetch_urlhaus = MagicMock() handler._fetch_abuseipdb = MagicMock() handler.fetch_data() handler._fetch_vt_ip_address.assert_called_once() - handler._fetch_ip_enrichments.assert_called_once() + handler._fetch_geoasn.assert_called_once() handler._fetch_whois.assert_called_once() + handler._fetch_shodan.assert_called_once() handler._fetch_greynoise.assert_called_once() handler._fetch_urlhaus.assert_called_once() handler._fetch_abuseipdb.assert_called_once() @@ -438,7 +447,7 @@ def test_ipwhois_http_error(self, mock_requests_get, ip_handler, capsys): mock_resp.status_code = 502 mock_requests_get.return_value = mock_resp - handler._fetch_ip_enrichments("1.2.3.4") + handler._fetch_geoasn("1.2.3.4") assert handler.warnings[0].startswith("Could not fetch IPWhois: 502 Server Error:") handler.print_warnings() @@ -473,16 +482,35 @@ def test_ipwhois_validation_error(self, mock_requests_get, ip_handler, capsys): mock_requests_get.return_value = mock_resp with pytest.raises(SystemExit) as e: - handler._fetch_ip_enrichments("1.2.3.4") + handler._fetch_geoasn("1.2.3.4") capture = capsys.readouterr() assert capture.err.startswith( - "Data model validation error: 16 validation errors for IpWhois\n" + "Data model validation error: 9 validation errors for IpWhois\n" ) assert e.type == SystemExit assert e.value.code == 1 + @patch.object(requests.Session, "get") + def test_shodan_api_error(self, mock_requests_get, ip_handler, capsys): + """ + Test fail open behavior of Shodan on invalid API key + """ + handler = ip_handler() + mock_resp = requests.models.Response() + + mock_resp.status_code = 401 + mock_resp._content = b'' + mock_requests_get.return_value = mock_resp + + handler._fetch_shodan("1.2.3.4") + assert handler.warnings[0].startswith("Could not fetch Shodan: Invalid API key") + + handler.print_warnings() + capture = capsys.readouterr() + assert capture.out.startswith("WARN: Could not fetch Shodan: Invalid API key") + @patch.object(requests.Session, "get") def test_greynoise_http_error(self, mock_requests_get, ip_handler, capsys): """ diff --git a/tests/test_ui_domain_view.py b/tests/test_ui_domain_view.py index b082f17..6f15b16 100644 --- a/tests/test_ui_domain_view.py +++ b/tests/test_ui_domain_view.py @@ -17,6 +17,7 @@ from wtfis.models.ip2whois import Whois as Ip2Whois from wtfis.models.ipwhois import IpWhoisMap from wtfis.models.passivetotal import Whois as PTWhois +from wtfis.models.shodan import ShodanIpMap from wtfis.models.urlhaus import UrlHausMap from wtfis.models.virustotal import ( Domain, @@ -31,17 +32,18 @@ def view01(test_data, mock_ipwhois_get): """ gist.github.com with PT whois. Complete test of all panels. Also test print(). """ resolutions = Resolutions.model_validate(json.loads(test_data("vt_resolutions_gist.json"))) - ipwhois_pool = json.loads(test_data("ipwhois_gist.json")) - ipwhois_client = IpWhoisClient() - ipwhois_client._get_ipwhois = MagicMock(side_effect=lambda ip: mock_ipwhois_get(ip, ipwhois_pool)) - ip_enrich = ipwhois_client.enrich_ips(*resolutions.ip_list(3)) + geoasn_pool = json.loads(test_data("ipwhois_gist.json")) + geoasn_client = IpWhoisClient() + geoasn_client._get_ipwhois = MagicMock(side_effect=lambda ip: mock_ipwhois_get(ip, geoasn_pool)) + geoasn_enrich = geoasn_client.enrich_ips(*resolutions.ip_list(3)) return DomainView( console=Console(), entity=Domain.model_validate(json.loads(test_data("vt_domain_gist.json"))), resolutions=resolutions, + geoasn=geoasn_enrich, whois=PTWhois.model_validate(json.loads(test_data("pt_whois_gist.json"))), - ip_enrich=ip_enrich, + shodan=ShodanIpMap.model_validate({}), greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=UrlHausMap.model_validate({}), @@ -58,8 +60,9 @@ def view02(test_data): console=Console(), entity=MagicMock(), resolutions=Resolutions.model_validate(json.loads(test_data("vt_resolutions_gist.json"))), + geoasn=IpWhoisMap.model_validate({}), whois=VTWhois.model_validate(json.loads(test_data("vt_whois_gist.json"))), - ip_enrich=IpWhoisMap.model_validate({}), + shodan=ShodanIpMap.model_validate({}), greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), @@ -74,8 +77,9 @@ def view03(test_data): console=Console(), entity=MagicMock(), resolutions=MagicMock(), + geoasn=MagicMock(), whois=VTWhois.model_validate(json.loads(test_data("vt_whois_bbc.json"))), - ip_enrich=MagicMock(), + shodan=MagicMock(), greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), @@ -92,8 +96,9 @@ def view04(test_data): console=Console(), entity=Domain.model_validate(json.loads(test_data("vt_domain_google.json"))), resolutions=None, + geoasn=MagicMock(), whois=MagicMock(), - ip_enrich=MagicMock(), + shodan=MagicMock(), greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), @@ -107,8 +112,9 @@ def view05(test_data): console=Console(), entity=Domain.model_validate(json.loads(test_data("vt_domain_tucows.json"))), resolutions=MagicMock(), + geoasn=MagicMock(), whois=MagicMock(), - ip_enrich=MagicMock(), + shodan=MagicMock(), greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), @@ -122,8 +128,9 @@ def view06(test_data): console=Console(), entity=MagicMock(), resolutions=MagicMock(), + geoasn=MagicMock(), whois=VTWhois.model_validate(json.loads(test_data("vt_whois_example_2.json"))), - ip_enrich=MagicMock(), + shodan=MagicMock(), greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), @@ -131,21 +138,27 @@ def view06(test_data): @pytest.fixture() -def view07(test_data, mock_shodan_get_ip): - """ gist.github.com with Shodan. Only test resolution and IP enrich. """ +def view07(test_data, mock_ipwhois_get, mock_shodan_get_ip): + """ gist.github.com with Shodan. Only test resolution, geoasn and Shodan. """ resolutions = Resolutions.model_validate(json.loads(test_data("vt_resolutions_gist.json"))) + geoasn_pool = json.loads(test_data("ipwhois_gist.json")) + geoasn_client = IpWhoisClient() + geoasn_client._get_ipwhois = MagicMock(side_effect=lambda ip: mock_ipwhois_get(ip, geoasn_pool)) + geoasn_enrich = geoasn_client.enrich_ips(*resolutions.ip_list(3)) + shodan_pool = json.loads(test_data("shodan_gist.json")) shodan_client = ShodanClient(MagicMock()) shodan_client._get_ip = MagicMock(side_effect=lambda ip: mock_shodan_get_ip(ip, shodan_pool)) - ip_enrich = shodan_client.enrich_ips(*resolutions.ip_list(3)) + shodan_enrich = shodan_client.enrich_ips(*resolutions.ip_list(3)) return DomainView( console=Console(), entity=MagicMock(), resolutions=resolutions, + geoasn=geoasn_enrich, whois=MagicMock(), - ip_enrich=ip_enrich, + shodan=shodan_enrich, greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), @@ -154,20 +167,21 @@ def view07(test_data, mock_shodan_get_ip): @pytest.fixture() def view08(test_data, mock_shodan_get_ip): - """ www.wired.com with Shodan. Only test resolution and IP enrich. """ + """ www.wired.com with Shodan. Only test resolution and Shodan. """ resolutions = Resolutions.model_validate(json.loads(test_data("vt_resolutions_wired.json"))) shodan_pool = json.loads(test_data("shodan_wired.json")) shodan_client = ShodanClient(MagicMock()) shodan_client._get_ip = MagicMock(side_effect=lambda ip: mock_shodan_get_ip(ip, shodan_pool)) - ip_enrich = shodan_client.enrich_ips(*resolutions.ip_list(1)) + shodan_enrich = shodan_client.enrich_ips(*resolutions.ip_list(1)) return DomainView( console=Console(), entity=MagicMock(), resolutions=resolutions, + geoasn=IpWhoisMap.model_validate({}), whois=MagicMock(), - ip_enrich=ip_enrich, + shodan=shodan_enrich, greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), @@ -177,13 +191,13 @@ def view08(test_data, mock_shodan_get_ip): @pytest.fixture() def view09(test_data, mock_shodan_get_ip, mock_greynoise_get, mock_abuseipdb_get): - """ one.one.one.one with Shodan, Greynoise and AbuseIPDB. Only test resolution and IP enrich. """ + """ one.one.one.one with Shodan, Greynoise and AbuseIPDB. Only test mentioned services. """ resolutions = Resolutions.model_validate(json.loads(test_data("vt_resolutions_one.json"))) shodan_pool = json.loads(test_data("shodan_one.json")) shodan_client = ShodanClient(MagicMock()) shodan_client._get_ip = MagicMock(side_effect=lambda ip: mock_shodan_get_ip(ip, shodan_pool)) - ip_enrich = shodan_client.enrich_ips(*resolutions.ip_list(1)) + shodan_enrich = shodan_client.enrich_ips(*resolutions.ip_list(1)) greynoise_pool = json.loads(test_data("greynoise_one.json")) greynoise_client = GreynoiseClient("dummykey") @@ -199,8 +213,9 @@ def view09(test_data, mock_shodan_get_ip, mock_greynoise_get, mock_abuseipdb_get console=Console(), entity=MagicMock(), resolutions=resolutions, + geoasn=MagicMock(), whois=MagicMock(), - ip_enrich=ip_enrich, + shodan=shodan_enrich, greynoise=greynoise_enrich, abuseipdb=abuseipdb_enrich, urlhaus=MagicMock(), @@ -215,8 +230,9 @@ def view10(test_data): console=Console(), entity=MagicMock(), resolutions=MagicMock(), + geoasn=MagicMock(), whois=VTWhois.model_validate(json.loads(test_data("vt_whois_foo.json"))), - ip_enrich=MagicMock(), + shodan=MagicMock(), greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), @@ -225,20 +241,21 @@ def view10(test_data): @pytest.fixture() def view11(test_data, mock_shodan_get_ip): - """ gist.github.com with Shodan. Only test IP enrich. Test empty open ports. """ + """ gist.github.com with Shodan. Only test Shodan. Test empty open ports. """ resolutions = Resolutions.model_validate(json.loads(test_data("vt_resolutions_gist.json"))) shodan_pool = json.loads(test_data("shodan_gist_2.json")) shodan_client = ShodanClient(MagicMock()) shodan_client._get_ip = MagicMock(side_effect=lambda ip: mock_shodan_get_ip(ip, shodan_pool)) - ip_enrich = shodan_client.enrich_ips(*resolutions.ip_list(3)) + shodan_enrich = shodan_client.enrich_ips(*resolutions.ip_list(3)) return DomainView( console=Console(), entity=MagicMock(), resolutions=resolutions, + geoasn=MagicMock(), whois=MagicMock(), - ip_enrich=ip_enrich, + shodan=shodan_enrich, greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=MagicMock(), @@ -252,8 +269,9 @@ def view12(test_data): console=Console(), entity=MagicMock(), resolutions=MagicMock(), + geoasn=MagicMock(), whois=Ip2Whois.model_validate(json.loads(test_data("ip2whois_whois_hotmail.json"))), - ip_enrich=MagicMock(), + shodan=MagicMock(), greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), @@ -267,8 +285,9 @@ def view13(test_data): console=Console(), entity=MagicMock(), resolutions=MagicMock(), + geoasn=MagicMock(), whois=Ip2Whois.model_validate(json.loads(test_data("ip2whois_whois_bbc.json"))), - ip_enrich=MagicMock(), + shodan=MagicMock(), greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), @@ -280,10 +299,10 @@ def view14(test_data, mock_ipwhois_get, mock_urlhaus_get): """ Same as view01() but with Urlhaus enrichment. Test URLhaus only. """ resolutions = Resolutions.model_validate(json.loads(test_data("vt_resolutions_gist.json"))) - ipwhois_pool = json.loads(test_data("ipwhois_gist.json")) - ipwhois_client = IpWhoisClient() - ipwhois_client._get_ipwhois = MagicMock(side_effect=lambda ip: mock_ipwhois_get(ip, ipwhois_pool)) - ip_enrich = ipwhois_client.enrich_ips(*resolutions.ip_list(3)) + geoasn_pool = json.loads(test_data("ipwhois_gist.json")) + geoasn_client = IpWhoisClient() + geoasn_client._get_ipwhois = MagicMock(side_effect=lambda ip: mock_ipwhois_get(ip, geoasn_pool)) + geoasn_enrich = geoasn_client.enrich_ips(*resolutions.ip_list(3)) urlhaus_pool = json.loads(test_data("urlhaus_gist.json")) urlhaus_client = UrlHausClient() @@ -294,8 +313,9 @@ def view14(test_data, mock_ipwhois_get, mock_urlhaus_get): console=Console(), entity=Domain.model_validate(json.loads(test_data("vt_domain_gist.json"))), resolutions=resolutions, + geoasn=geoasn_enrich, whois=PTWhois.model_validate(json.loads(test_data("pt_whois_gist.json"))), - ip_enrich=ip_enrich, + shodan=ShodanIpMap.model_validate({}), greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=AbuseIpDbMap.model_validate({}), urlhaus=urlhaus_enrich, @@ -952,23 +972,20 @@ def test_resolutions_panel(self, view07, theme, display_timestamp): spans=[Span(0, 8, "link https://www.shodan.io/host/13.234.210.38")] ), "Tags:", - "Last Scan:", ] assert group[1].columns[1].style == theme.table_value assert group[1].columns[1].justify == "left" assert group[1].columns[1]._cells == [ Text("0/94 malicious", spans=[Span(0, 14, theme.info)]), display_timestamp("2022-08-06T14:56:20Z"), - Text( - "16509 (Amazon Data Services India)", - spans=[Span(7, 33, theme.asn_org)] - ), + Text("16509 (Amazon Data Services India)", spans=[Span(7, 33, "bright_white")]), "Amazon.com, Inc.", Text( - "Mumbai, India", + "Mumbai, Maharashtra, India", spans=[ Span(6, 8, "default"), - ] + Span(19, 21, "default") + ], ), Text( "22/tcp, 80/tcp, 443/tcp", @@ -989,7 +1006,6 @@ def test_resolutions_panel(self, view07, theme, display_timestamp): Span(0, 5, 'bright_white on black') ] ), - display_timestamp("2022-08-21T07:21:05Z") ] # Spacing @@ -1020,23 +1036,20 @@ def test_resolutions_panel(self, view07, theme, display_timestamp): "Services:", spans=[Span(0, 8, "link https://www.shodan.io/host/192.30.255.113")] ), - "Last Scan:", ] assert group[1].columns[1].style == theme.table_value assert group[1].columns[1].justify == "left" assert group[1].columns[1]._cells == [ Text("1/94 malicious", spans=[Span(0, 14, theme.error)]), display_timestamp("2022-06-21T18:10:54Z"), - Text( - "36459 (GitHub, Inc.)", - spans=[Span(7, 19, theme.asn_org)] - ), + Text("36459 (GitHub, Inc.)", spans=[Span(7, 19, "bright_white")]), "GitHub, Inc.", Text( - "Seattle, United States", + "Seattle, Washington, United States", spans=[ Span(7, 9, "default"), - ] + Span(19, 21, "default") + ], ), Text( "22/tcp, 80/tcp, 443/tcp", @@ -1051,7 +1064,6 @@ def test_resolutions_panel(self, view07, theme, display_timestamp): Span(19, 23, theme.transport) ] ), - display_timestamp("2022-08-21T22:33:53Z") ] # Spacing @@ -1086,23 +1098,20 @@ def test_resolutions_panel(self, view07, theme, display_timestamp): spans=[Span(0, 8, "link https://www.shodan.io/host/13.234.176.102")] ), "Tags:", - "Last Scan:", ] assert table.columns[1].style == theme.table_value assert table.columns[1].justify == "left" assert table.columns[1]._cells == [ Text("0/94 malicious", spans=[Span(0, 14, theme.info)]), display_timestamp("2015-08-17T07:11:53Z"), - Text( - "16509 (Amazon Data Services India)", - spans=[Span(7, 33, theme.asn_org)] - ), + Text("16509 (Amazon Data Services India)", spans=[Span(7, 33, "bright_white")]), "Amazon.com, Inc.", Text( - "Mumbai, India", + "Mumbai, Maharashtra, India", spans=[ Span(6, 8, "default"), - ] + Span(19, 21, "default") + ], ), Text( "22/tcp, 80/tcp, 443/tcp", @@ -1123,7 +1132,6 @@ def test_resolutions_panel(self, view07, theme, display_timestamp): Span(0, 5, 'bright_white on black') ] ), - display_timestamp("2022-08-21T02:13:35Z") ] # Old timestamp warning @@ -1167,32 +1175,17 @@ def test_resolutions_panel(self, view08, theme, display_timestamp): spans=[Span(0, 8, 'link https://virustotal.com/gui/ip-address/199.232.34.194')], ), "Resolved:", - "ASN:", - "ISP:", - "Location:", Text( "Services:", spans=[Span(0, 8, "link https://www.shodan.io/host/199.232.34.194")] ), "Tags:", - "Last Scan:", ] assert group[1].columns[1].style == theme.table_value assert group[1].columns[1].justify == "left" assert group[1].columns[1]._cells == [ Text("0/93 malicious", spans=[Span(0, 14, theme.info)]), display_timestamp("2022-06-03T22:32:19Z"), - Text( - "54113 (Fastly, Inc.)", - spans=[Span(7, 19, theme.asn_org)] - ), - "Fastly, Inc.", - Text( - "Atlanta, United States", - spans=[ - Span(7, 9, "default"), - ] - ), Text( "Varnish HTTP Cache (80/tcp)\nOther (443/tcp)", spans=[ @@ -1210,7 +1203,6 @@ def test_resolutions_panel(self, view08, theme, display_timestamp): Span(0, 3, 'bright_white on black'), ] ), - display_timestamp("2022-08-21T01:33:13Z") ] # Spacing @@ -1255,14 +1247,10 @@ def test_resolutions_panel(self, view09, theme, display_timestamp): spans=[Span(0, 8, 'link https://virustotal.com/gui/ip-address/1.0.0.1')], ), "Resolved:", - "ASN:", - "ISP:", - "Location:", Text( "Services:", spans=[Span(0, 8, "link https://www.shodan.io/host/1.0.0.1")] ), - "Last Scan:", Text( "GreyNoise:", spans=[Span(0, 9, "link https://viz.greynoise.io/riot/1.0.0.1")] @@ -1277,17 +1265,6 @@ def test_resolutions_panel(self, view09, theme, display_timestamp): assert table.columns[1]._cells == [ Text("2/94 malicious", spans=[Span(0, 14, theme.error)]), display_timestamp("2020-08-01T22:07:20Z"), - Text( - "13335 (APNIC and Cloudflare DNS Resolver project)", - spans=[Span(7, 48, theme.asn_org)], - ), - "Cloudflare, Inc.", - Text( - "Los Angeles, United States", - spans=[ - Span(11, 13, "default"), - ] - ), Text( ( "CloudFlare (80/tcp, 8080/tcp)\nOther (53/tcp, 53/udp, 443/tcp, 2082/tcp, " @@ -1323,7 +1300,6 @@ def test_resolutions_panel(self, view09, theme, display_timestamp): Span(96, 100, theme.transport), ] ), - display_timestamp("2022-08-22T02:35:34Z"), Text( "✓ riot ✗ noise ✓ benign", spans=[ @@ -1406,35 +1382,19 @@ def test_resolutions_panel(self, view11, theme, display_timestamp): spans=[Span(0, 8, 'link https://virustotal.com/gui/ip-address/13.234.210.38')], ), "Resolved:", - "ASN:", - "ISP:", - "Location:", "Tags:", - "Last Scan:", ] assert group[1].columns[1].style == theme.table_value assert group[1].columns[1].justify == "left" assert group[1].columns[1]._cells == [ Text("0/94 malicious", spans=[Span(0, 14, theme.info)]), display_timestamp("2022-08-06T14:56:20Z"), - Text( - "16509 (Amazon Data Services India)", - spans=[Span(7, 33, theme.asn_org)] - ), - "Amazon.com, Inc.", - Text( - "Mumbai, India", - spans=[ - Span(6, 8, "default"), - ] - ), Text( "cloud", spans=[ Span(0, 5, theme.tags) ] ), - display_timestamp("2022-08-21T07:21:05Z") ] diff --git a/tests/test_ui_ip_view.py b/tests/test_ui_ip_view.py index 7770c95..bd74e39 100644 --- a/tests/test_ui_ip_view.py +++ b/tests/test_ui_ip_view.py @@ -24,25 +24,30 @@ @pytest.fixture() -def view01(test_data, mock_abuseipdb_get, mock_ipwhois_get, mock_greynoise_get, mock_urlhaus_get): +def view01(test_data, mock_abuseipdb_get, mock_ipwhois_get, mock_shodan_get_ip, mock_greynoise_get, mock_urlhaus_get): """ 1.1.1.1 with PT whois. Complete test of all panels. Also test print(). """ ip = "1.1.1.1" - abuseipdb_pool = json.loads(test_data("abuseipdb_1.1.1.1_red.json")) - abuseipdb_client = AbuseIpDbClient("dummykey") - abuseipdb_client._get_ip = MagicMock(side_effect=lambda ip: mock_abuseipdb_get(ip, abuseipdb_pool)) - abuseipdb_enrich = abuseipdb_client.enrich_ips(ip) + geoasn_pool = json.loads(test_data("ipwhois_1.1.1.1.json")) + geoasn_client = IpWhoisClient() + geoasn_client._get_ipwhois = MagicMock(side_effect=lambda ip: mock_ipwhois_get(ip, geoasn_pool)) + geoasn_enrich = geoasn_client.enrich_ips(ip) - ipwhois_pool = json.loads(test_data("ipwhois_1.1.1.1.json")) - ipwhois_client = IpWhoisClient() - ipwhois_client._get_ipwhois = MagicMock(side_effect=lambda ip: mock_ipwhois_get(ip, ipwhois_pool)) - ip_enrich = ipwhois_client.enrich_ips(ip) + shodan_pool = json.loads(test_data("shodan_1.1.1.1.json")) + shodan_client = ShodanClient(MagicMock()) + shodan_client._get_ip = MagicMock(side_effect=lambda ip: mock_shodan_get_ip(ip, shodan_pool)) + shodan_enrich = shodan_client.enrich_ips(ip) greynoise_pool = json.loads(test_data("greynoise_1.1.1.1.json")) greynoise_client = GreynoiseClient("dummykey") greynoise_client._get_ip = MagicMock(side_effect=lambda ip: mock_greynoise_get(ip, greynoise_pool)) greynoise_enrich = greynoise_client.enrich_ips(ip) + abuseipdb_pool = json.loads(test_data("abuseipdb_1.1.1.1_red.json")) + abuseipdb_client = AbuseIpDbClient("dummykey") + abuseipdb_client._get_ip = MagicMock(side_effect=lambda ip: mock_abuseipdb_get(ip, abuseipdb_pool)) + abuseipdb_enrich = abuseipdb_client.enrich_ips(ip) + urlhaus_pool = json.loads(test_data("urlhaus_1.1.1.1.json")) urlhaus_client = UrlHausClient() urlhaus_client._get_host = MagicMock(side_effect=lambda ip: mock_urlhaus_get(ip, urlhaus_pool)) @@ -51,8 +56,9 @@ def view01(test_data, mock_abuseipdb_get, mock_ipwhois_get, mock_greynoise_get, return IpAddressView( console=Console(), entity=IpAddress.model_validate(json.loads(test_data("vt_ip_1.1.1.1.json"))), + geoasn=geoasn_enrich, whois=PTWhois.model_validate(json.loads(test_data("pt_whois_1.1.1.1.json"))), - ip_enrich=ip_enrich, + shodan=shodan_enrich, greynoise=greynoise_enrich, abuseipdb=abuseipdb_enrich, urlhaus=urlhaus_enrich, @@ -60,13 +66,18 @@ def view01(test_data, mock_abuseipdb_get, mock_ipwhois_get, mock_greynoise_get, @pytest.fixture() -def view02(test_data, mock_shodan_get_ip, mock_greynoise_get): +def view02(test_data, mock_ipwhois_get, mock_shodan_get_ip, mock_greynoise_get): """ 1.1.1.1 with Shodan and Greynoise. Test the whole IP panel. """ ip = "1.1.1.1" + geoasn_pool = json.loads(test_data("ipwhois_1.1.1.1.json")) + geoasn_client = IpWhoisClient() + geoasn_client._get_ipwhois = MagicMock(side_effect=lambda ip: mock_ipwhois_get(ip, geoasn_pool)) + geoasn_enrich = geoasn_client.enrich_ips(ip) + shodan_pool = json.loads(test_data("shodan_1.1.1.1.json")) shodan_client = ShodanClient(MagicMock()) shodan_client._get_ip = MagicMock(side_effect=lambda ip: mock_shodan_get_ip(ip, shodan_pool)) - ip_enrich = shodan_client.enrich_ips(ip) + shodan_enrich = shodan_client.enrich_ips(ip) greynoise_pool = json.loads(test_data("greynoise_1.1.1.1.json")) greynoise_client = GreynoiseClient("dummykey") @@ -76,8 +87,9 @@ def view02(test_data, mock_shodan_get_ip, mock_greynoise_get): return IpAddressView( console=Console(), entity=IpAddress.model_validate(json.loads(test_data("vt_ip_1.1.1.1.json"))), + geoasn=geoasn_enrich, whois=MagicMock(), - ip_enrich=ip_enrich, + shodan=shodan_enrich, greynoise=greynoise_enrich, abuseipdb=MagicMock(), urlhaus=UrlHausMap.model_validate({}), @@ -90,8 +102,9 @@ def view03(test_data): return IpAddressView( console=Console(), entity=MagicMock(), + geoasn=MagicMock(), whois=VTWhois.model_validate(json.loads(test_data("vt_whois_1.1.1.1.json"))), - ip_enrich=MagicMock(), + shodan=MagicMock(), greynoise=MagicMock(), abuseipdb=MagicMock(), urlhaus=MagicMock(), @@ -107,8 +120,9 @@ def view04(test_data): return IpAddressView( console=Console(), entity=IpAddress.model_validate(json.loads(test_data("vt_ip_142.251.220.110.json"))), + geoasn=IpWhoisMap.model_validate({}), whois=MagicMock(), - ip_enrich=IpWhoisMap.model_validate({}), + shodan=MagicMock(), greynoise=GreynoiseIpMap.model_validate({}), abuseipdb=MagicMock(), urlhaus=UrlHausMap.model_validate({}), @@ -127,8 +141,9 @@ def view05(test_data, mock_greynoise_get): return IpAddressView( console=Console(), entity=IpAddress.model_validate(json.loads(test_data("vt_ip_1.1.1.1.json"))), + geoasn=IpWhoisMap.model_validate({}), whois=MagicMock(), - ip_enrich=IpWhoisMap.model_validate({}), + shodan=MagicMock(), greynoise=greynoise_enrich, abuseipdb=MagicMock(), urlhaus=UrlHausMap.model_validate({}), @@ -147,8 +162,9 @@ def view06(test_data, mock_greynoise_get): return IpAddressView( console=Console(), entity=IpAddress.model_validate(json.loads(test_data("vt_ip_1.1.1.1.json"))), + geoasn=IpWhoisMap.model_validate({}), whois=MagicMock(), - ip_enrich=IpWhoisMap.model_validate({}), + shodan=MagicMock(), greynoise=greynoise_enrich, abuseipdb=MagicMock(), urlhaus=MagicMock(), @@ -167,8 +183,9 @@ def view07(test_data, mock_abuseipdb_get): return IpAddressView( console=Console(), entity=IpAddress.model_validate(json.loads(test_data("vt_ip_1.1.1.1.json"))), + geoasn=IpWhoisMap.model_validate({}), whois=MagicMock(), - ip_enrich=IpWhoisMap.model_validate({}), + shodan=MagicMock(), greynoise=MagicMock(), abuseipdb=abuseipdb_enrich, urlhaus=MagicMock(), @@ -187,8 +204,9 @@ def view08(test_data, mock_abuseipdb_get): return IpAddressView( console=Console(), entity=IpAddress.model_validate(json.loads(test_data("vt_ip_1.1.1.1.json"))), + geoasn=IpWhoisMap.model_validate({}), whois=MagicMock(), - ip_enrich=IpWhoisMap.model_validate({}), + shodan=MagicMock(), greynoise=MagicMock(), abuseipdb=abuseipdb_enrich, urlhaus=MagicMock(), @@ -204,9 +222,10 @@ def test_ip_panel(self, view01, theme, display_timestamp): # Sections vt_section = ip.renderable.renderables[0] - enrich_section = ip.renderable.renderables[2] - urlhaus_section = ip.renderable.renderables[4] - other_section = ip.renderable.renderables[6] + geoasn_section = ip.renderable.renderables[2] + shodan_section = ip.renderable.renderables[4] + urlhaus_section = ip.renderable.renderables[6] + other_section = ip.renderable.renderables[8] # Line breaks between sections assert ip.renderable.renderables[1] == "" @@ -255,15 +274,15 @@ def test_ip_panel(self, view01, theme, display_timestamp): ] # - # IP enrich section + # IP location and ASN section # # Heading - assert enrich_section.renderables[0] == Text("IPwhois") - assert enrich_section.renderables[0].style == theme.heading_h1 + assert geoasn_section.renderables[0] == Text("IPWhois") + assert geoasn_section.renderables[0].style == theme.heading_h1 # Table - table = enrich_section.renderables[1] + table = geoasn_section.renderables[1] assert type(table) is Table assert table.columns[0].style == theme.table_field assert table.columns[0].justify == "left" @@ -283,12 +302,86 @@ def test_ip_panel(self, view01, theme, display_timestamp): Text( "Sydney, New South Wales, Australia", spans=[ - Span(6, 8, 'default'), - Span(23, 25, 'default'), + Span(6, 8, "default"), + Span(23, 25, "default"), ] ), ] + # + # Shodan section + # + + # Heading + assert shodan_section.renderables[0] == Text("Shodan") + assert shodan_section.renderables[0].style == theme.heading_h1 + + # Table + table = shodan_section.renderables[1] + assert type(table) is Table + assert table.columns[0].style == theme.table_field + assert table.columns[0].justify == "left" + assert table.columns[0]._cells == [ + Text( + "Services:", + spans=[Span(0, 8, 'link https://www.shodan.io/host/1.1.1.1')], + ), + "Last Scan:", + ] + assert table.columns[1].style == theme.table_value + assert table.columns[1].justify == "left" + assert table.columns[1]._cells == [ + Text( + ( + "Cisco router tftpd (69/udp)\nCloudFlare (80/tcp, 8080/tcp, 8880/tcp)\n" + "DrayTek Vigor Router (443/tcp)\nOther (53/tcp, 53/udp, 161/udp, " + "2082/tcp, 2083/tcp, 2086/tcp, 2087/tcp, 8443/tcp)" + ), + spans=[ + Span(0, 18, theme.product), + Span(20, 22, theme.port), + Span(22, 26, theme.transport), + Span(28, 38, theme.product), + Span(40, 42, theme.port), + Span(42, 46, theme.transport), + Span(46, 48, "default"), + Span(48, 52, theme.port), + Span(52, 56, theme.transport), + Span(56, 58, "default"), + Span(58, 62, theme.port), + Span(62, 66, theme.transport), + Span(68, 88, theme.product), + Span(90, 93, theme.port), + Span(93, 97, theme.transport), + Span(99, 104, theme.product), + Span(106, 108, theme.port), + Span(108, 112, theme.transport), + Span(112, 114, "default"), + Span(114, 116, theme.port), + Span(116, 120, theme.transport), + Span(120, 122, "default"), + Span(122, 125, theme.port), + Span(125, 129, theme.transport), + Span(129, 131, "default"), + Span(131, 135, theme.port), + Span(135, 139, theme.transport), + Span(139, 141, "default"), + Span(141, 145, theme.port), + Span(145, 149, theme.transport), + Span(149, 151, "default"), + Span(151, 155, theme.port), + Span(155, 159, theme.transport), + Span(159, 161, "default"), + Span(161, 165, theme.port), + Span(165, 169, theme.transport), + Span(169, 171, "default"), + Span(171, 175, theme.port), + Span(175, 179, theme.transport), + ] + ), + display_timestamp("2022-09-04T01:03:56Z"), + ] + # # Urlhaus section # @@ -453,8 +546,9 @@ def test_ip_panel(self, view02, theme, display_timestamp): # Sections vt_section = ip.renderable.renderables[0] - enrich_section = ip.renderable.renderables[2] - other_section = ip.renderable.renderables[4] + geoasn_section = ip.renderable.renderables[2] + shodan_section = ip.renderable.renderables[4] + other_section = ip.renderable.renderables[6] # Line breaks between sections assert ip.renderable.renderables[1] == "" @@ -502,15 +596,15 @@ def test_ip_panel(self, view02, theme, display_timestamp): ] # - # IP enrich section + # IP location and ASN section # # Heading - assert enrich_section.renderables[0] == Text("Shodan") - assert enrich_section.renderables[0].style == theme.heading_h1 + assert geoasn_section.renderables[0] == Text("IPWhois") + assert geoasn_section.renderables[0].style == theme.heading_h1 # Table - table = enrich_section.renderables[1] + table = geoasn_section.renderables[1] assert type(table) is Table assert table.columns[0].style == theme.table_field assert table.columns[0].justify == "left" @@ -518,11 +612,6 @@ def test_ip_panel(self, view02, theme, display_timestamp): "ASN:", "ISP:", "Location:", - Text( - "Services:", - spans=[Span(0, 8, 'link https://www.shodan.io/host/1.1.1.1')], - ), - "Last Scan:", ] assert table.columns[1].style == theme.table_value assert table.columns[1].justify == "left" @@ -533,9 +622,37 @@ def test_ip_panel(self, view02, theme, display_timestamp): ), "Cloudflare, Inc.", Text( - "Los Angeles, United States", - spans=[Span(11, 13, "default")] + "Sydney, New South Wales, Australia", + spans=[ + Span(6, 8, "default"), + Span(23, 25, "default") + ], + ), + ] + + # + # Shodan section + # + + # Heading + assert shodan_section.renderables[0] == Text("Shodan") + assert shodan_section.renderables[0].style == theme.heading_h1 + + # Table + table = shodan_section.renderables[1] + assert type(table) is Table + assert table.columns[0].style == theme.table_field + assert table.columns[0].justify == "left" + assert table.columns[0]._cells == [ + Text( + "Services:", + spans=[Span(0, 8, 'link https://www.shodan.io/host/1.1.1.1')], ), + "Last Scan:", + ] + assert table.columns[1].style == theme.table_value + assert table.columns[1].justify == "left" + assert table.columns[1]._cells == [ Text( ( "Cisco router tftpd (69/udp)\nCloudFlare (80/tcp, 8080/tcp, 8880/tcp)\n" diff --git a/wtfis/clients/base.py b/wtfis/clients/base.py index b80e127..640a97a 100644 --- a/wtfis/clients/base.py +++ b/wtfis/clients/base.py @@ -4,8 +4,8 @@ from typing import Optional, Union -from wtfis.models.common import WhoisBase -from wtfis.types import DomainEnrichmentType, IpEnrichmentType +from wtfis.models.base import WhoisBase +from wtfis.models.types import DomainEnrichmentType, IpEnrichmentType class AbstractAttribute: diff --git a/wtfis/clients/ipwhois.py b/wtfis/clients/ipwhois.py index 26bfc6b..7a13115 100644 --- a/wtfis/clients/ipwhois.py +++ b/wtfis/clients/ipwhois.py @@ -1,5 +1,6 @@ from wtfis.clients.base import BaseIpEnricherClient, BaseRequestsClient -from wtfis.models.ipwhois import IpWhois, IpWhoisMap +from wtfis.models.ipwhois import IpWhoisMap +from wtfis.models.ipwhois import IpWhois from typing import Optional @@ -19,9 +20,9 @@ def _get_ipwhois(self, ip: str) -> Optional[IpWhois]: return IpWhois.model_validate(result) if result.get("success") is True else None def enrich_ips(self, *ips: str) -> IpWhoisMap: - ipwhois_map = {} + map_ = {} for ip in ips: ipwhois = self._get_ipwhois(ip) if ipwhois: - ipwhois_map[ipwhois.ip] = ipwhois - return IpWhoisMap.model_validate(ipwhois_map) + map_[ipwhois.ip] = ipwhois + return IpWhoisMap.model_validate(map_) diff --git a/wtfis/clients/shodan.py b/wtfis/clients/shodan.py index 4f784e9..2573c37 100644 --- a/wtfis/clients/shodan.py +++ b/wtfis/clients/shodan.py @@ -1,5 +1,4 @@ from shodan import Shodan -from shodan.exception import APIError from typing import Optional from wtfis.clients.base import BaseClient, BaseIpEnricherClient @@ -18,13 +17,7 @@ def name(self) -> str: return "Shodan" def _get_ip(self, ip: str) -> Optional[ShodanIp]: - try: - return ShodanIp.model_validate(self.s.host(ip, minify=False)) - except APIError as e: - if str(e) == "Invalid API key": - raise APIError("Invalid Shodan API key") - else: - raise + return ShodanIp.model_validate(self.s.host(ip, minify=False)) def enrich_ips(self, *ips: str) -> ShodanIpMap: shodan_map = {} diff --git a/wtfis/clients/types.py b/wtfis/clients/types.py new file mode 100644 index 0000000..9a72a3d --- /dev/null +++ b/wtfis/clients/types.py @@ -0,0 +1,22 @@ +""" +Type aliases +""" +from typing import Union + +from wtfis.clients.ip2whois import Ip2WhoisClient +from wtfis.clients.ipwhois import IpWhoisClient +from wtfis.clients.passivetotal import PTClient +from wtfis.clients.virustotal import VTClient + + +# IP geolocation and ASN client types +IpGeoAsnClientType = Union[ + IpWhoisClient, +] + +# IP whois client types +IpWhoisClientType = Union[ + Ip2WhoisClient, + PTClient, + VTClient, +] diff --git a/wtfis/handlers/base.py b/wtfis/handlers/base.py index c962800..f79765e 100644 --- a/wtfis/handlers/base.py +++ b/wtfis/handlers/base.py @@ -15,19 +15,18 @@ from wtfis.clients.abuseipdb import AbuseIpDbClient from wtfis.clients.greynoise import GreynoiseClient -from wtfis.clients.ip2whois import Ip2WhoisClient -from wtfis.clients.ipwhois import IpWhoisClient -from wtfis.clients.passivetotal import PTClient from wtfis.clients.shodan import ShodanClient from wtfis.clients.urlhaus import UrlHausClient from wtfis.clients.virustotal import VTClient from wtfis.models.abuseipdb import AbuseIpDbMap -from wtfis.models.common import WhoisBase +from wtfis.models.base import WhoisBase from wtfis.models.greynoise import GreynoiseIpMap from wtfis.models.ipwhois import IpWhoisMap from wtfis.models.shodan import ShodanIpMap from wtfis.models.urlhaus import UrlHausMap +from wtfis.models.types import IpGeoAsnMapType from wtfis.models.virustotal import Domain, IpAddress +from wtfis.clients.types import IpGeoAsnClientType, IpWhoisClientType from wtfis.ui.theme import Theme from wtfis.utils import error_and_exit, refang @@ -69,8 +68,9 @@ def __init__( console: Console, progress: Progress, vt_client: VTClient, - ip_enricher_client: Union[IpWhoisClient, ShodanClient], - whois_client: Union[Ip2WhoisClient, PTClient, VTClient], + ip_geoasn_client: IpGeoAsnClientType, + whois_client: IpWhoisClientType, + shodan_client: Optional[ShodanClient], greynoise_client: Optional[GreynoiseClient], abuseipdb_client: Optional[AbuseIpDbClient], urlhaus_client: Optional[UrlHausClient], @@ -82,16 +82,18 @@ def __init__( # Clients self._vt = vt_client - self._enricher = ip_enricher_client + self._geoasn = ip_geoasn_client self._whois = whois_client + self._shodan = shodan_client self._greynoise = greynoise_client self._abuseipdb = abuseipdb_client self._urlhaus = urlhaus_client # Dataset containers self.vt_info: Union[Domain, IpAddress] - self.ip_enrich: Union[IpWhoisMap, ShodanIpMap] = IpWhoisMap.empty() + self.geoasn: IpGeoAsnMapType = IpWhoisMap.empty() # Default to ipwhois self.whois: WhoisBase + self.shodan: ShodanIpMap = ShodanIpMap.empty() self.greynoise: GreynoiseIpMap = GreynoiseIpMap.empty() self.abuseipdb: AbuseIpDbMap = AbuseIpDbMap.empty() self.urlhaus: UrlHausMap = UrlHausMap.empty() @@ -105,9 +107,15 @@ def fetch_data(self) -> None: return NotImplemented # type: ignore # pragma: no coverage @common_exception_handler - @failopen_exception_handler("_enricher") - def _fetch_ip_enrichments(self, *ips: str) -> None: - self.ip_enrich = self._enricher.enrich_ips(*ips) + @failopen_exception_handler("_geoasn") + def _fetch_geoasn(self, *ips: str) -> None: + self.geoasn = self._geoasn.enrich_ips(*ips) + + @common_exception_handler + @failopen_exception_handler("_shodan") + def _fetch_shodan(self, *ips: str) -> None: + if self._shodan: + self.shodan = self._shodan.enrich_ips(*ips) @common_exception_handler @failopen_exception_handler("_greynoise") diff --git a/wtfis/handlers/domain.py b/wtfis/handlers/domain.py index 5f8db3c..11431ed 100644 --- a/wtfis/handlers/domain.py +++ b/wtfis/handlers/domain.py @@ -1,7 +1,7 @@ """ Logic handler for domain and hostname inputs """ -from typing import Optional, Union +from typing import Optional from requests.exceptions import HTTPError from rich.console import Console @@ -9,9 +9,6 @@ from wtfis.clients.abuseipdb import AbuseIpDbClient from wtfis.clients.greynoise import GreynoiseClient -from wtfis.clients.ip2whois import Ip2WhoisClient -from wtfis.clients.ipwhois import IpWhoisClient -from wtfis.clients.passivetotal import PTClient from wtfis.clients.shodan import ShodanClient from wtfis.clients.urlhaus import UrlHausClient from wtfis.clients.virustotal import VTClient @@ -23,6 +20,7 @@ ) from wtfis.models.virustotal import Resolutions +from wtfis.clients.types import IpGeoAsnClientType, IpWhoisClientType class DomainHandler(BaseHandler): @@ -32,15 +30,16 @@ def __init__( console: Console, progress: Progress, vt_client: VTClient, - ip_enricher_client: Union[IpWhoisClient, ShodanClient], - whois_client: Union[Ip2WhoisClient, PTClient, VTClient], + ip_geoasn_client: IpGeoAsnClientType, + whois_client: IpWhoisClientType, + shodan_client: Optional[ShodanClient], greynoise_client: Optional[GreynoiseClient], abuseipdb_client: Optional[AbuseIpDbClient], urlhaus_client: Optional[UrlHausClient], max_resolutions: int = 0, ): - super().__init__(entity, console, progress, vt_client, ip_enricher_client, - whois_client, greynoise_client, abuseipdb_client, urlhaus_client) + super().__init__(entity, console, progress, vt_client, ip_geoasn_client, whois_client, + shodan_client, greynoise_client, abuseipdb_client, urlhaus_client) # Extended attributes self.max_resolutions = max_resolutions @@ -77,25 +76,31 @@ def fetch_data(self): self.progress.update(task_v, completed=100) if self.resolutions and self.resolutions.data: - task_r = self.progress.add_task(f"Fetching IP enrichments from {self._enricher.name}") - self.progress.update(task_r, advance=50) - self._fetch_ip_enrichments(*self.resolutions.ip_list(self.max_resolutions)) - self.progress.update(task_r, completed=100) + task_g = self.progress.add_task(f"Fetching IP location and ASN from {self._geoasn.name}") + self.progress.update(task_g, advance=50) + self._fetch_geoasn(*self.resolutions.ip_list(self.max_resolutions)) + self.progress.update(task_g, completed=100) + + if self._shodan: + task_s = self.progress.add_task(f"Fetching IP data from {self._shodan.name}") + self.progress.update(task_s, advance=50) + self._fetch_shodan(*self.resolutions.ip_list(self.max_resolutions)) + self.progress.update(task_s, completed=100) if self._greynoise: - task_g = self.progress.add_task(f"Fetching IP enrichments from {self._greynoise.name}") + task_g = self.progress.add_task(f"Fetching IP data from {self._greynoise.name}") self.progress.update(task_g, advance=50) self._fetch_greynoise(*self.resolutions.ip_list(self.max_resolutions)) self.progress.update(task_g, completed=100) if self._abuseipdb: - task_g = self.progress.add_task(f"Fetching IP enrichments from {self._abuseipdb.name}") + task_g = self.progress.add_task(f"Fetching IP data from {self._abuseipdb.name}") self.progress.update(task_g, advance=50) self._fetch_abuseipdb(*self.resolutions.ip_list(self.max_resolutions)) self.progress.update(task_g, completed=100) if self._urlhaus: - task_u = self.progress.add_task(f"Fetching domain enrichments from {self._urlhaus.name}") + task_u = self.progress.add_task(f"Fetching domain data from {self._urlhaus.name}") self.progress.update(task_u, advance=50) self._fetch_urlhaus() self.progress.update(task_u, completed=100) diff --git a/wtfis/handlers/ip.py b/wtfis/handlers/ip.py index 088268c..118948f 100644 --- a/wtfis/handlers/ip.py +++ b/wtfis/handlers/ip.py @@ -25,25 +25,31 @@ def fetch_data(self): self._fetch_vt_ip_address() self.progress.update(task_v, completed=100) - task_i = self.progress.add_task(f"Fetching IP enrichments from {self._enricher.name}") - self.progress.update(task_i, advance=50) - self._fetch_ip_enrichments(self.entity) - self.progress.update(task_i, completed=100) + task_g = self.progress.add_task(f"Fetching IP location and ASN from {self._geoasn.name}") + self.progress.update(task_g, advance=50) + self._fetch_geoasn(self.entity) + self.progress.update(task_g, completed=100) + + if self._shodan: + task_s = self.progress.add_task(f"Fetching IP data from {self._shodan.name}") + self.progress.update(task_s, advance=50) + self._fetch_shodan(self.entity) + self.progress.update(task_s, completed=100) if self._urlhaus: - task_u = self.progress.add_task(f"Fetching IP enrichments from {self._urlhaus.name}") + task_u = self.progress.add_task(f"Fetching IP data from {self._urlhaus.name}") self.progress.update(task_u, advance=50) self._fetch_urlhaus() self.progress.update(task_u, completed=100) if self._greynoise: - task_g = self.progress.add_task(f"Fetching IP enrichments from {self._greynoise.name}") + task_g = self.progress.add_task(f"Fetching IP data from {self._greynoise.name}") self.progress.update(task_g, advance=50) self._fetch_greynoise(self.entity) self.progress.update(task_g, completed=100) if self._abuseipdb: - task_a = self.progress.add_task(f"Fetching IP enrichments from {self._abuseipdb.name}") + task_a = self.progress.add_task(f"Fetching IP data from {self._abuseipdb.name}") self.progress.update(task_a, advance=50) self._fetch_abuseipdb(self.entity) self.progress.update(task_a, completed=100) diff --git a/wtfis/main.py b/wtfis/main.py index beaad17..51377cc 100644 --- a/wtfis/main.py +++ b/wtfis/main.py @@ -8,7 +8,6 @@ from rich.progress import Progress from typing import Union from wtfis.clients.abuseipdb import AbuseIpDbClient - from wtfis.clients.greynoise import GreynoiseClient from wtfis.clients.ip2whois import Ip2WhoisClient from wtfis.clients.ipwhois import IpWhoisClient @@ -108,12 +107,9 @@ def generate_entity_handler( # Virustotal client vt_client = VTClient(os.environ["VT_API_KEY"]) - # IP enrichment client selector - enricher_client: Union[IpWhoisClient, ShodanClient] = ( - ShodanClient(os.environ["SHODAN_API_KEY"]) - if args.use_shodan - else IpWhoisClient() - ) + # IP geolocation and ASN client selector + # TODO: add more options + ip_geoasn_client = IpWhoisClient() # Whois client selector # Order of use based on set envvars: @@ -129,6 +125,12 @@ def generate_entity_handler( else: whois_client = vt_client + shodan_client = ( + ShodanClient(os.environ["SHODAN_API_KEY"]) + if args.use_shodan + else None + ) + # Greynoise client (optional) greynoise_client = ( GreynoiseClient(os.environ["GREYNOISE_API_KEY"]) @@ -156,8 +158,9 @@ def generate_entity_handler( console=console, progress=progress, vt_client=vt_client, - ip_enricher_client=enricher_client, + ip_geoasn_client=ip_geoasn_client, whois_client=whois_client, + shodan_client=shodan_client, greynoise_client=greynoise_client, abuseipdb_client=abuseipdb_client, urlhaus_client=urlhaus_client, @@ -170,8 +173,9 @@ def generate_entity_handler( console=console, progress=progress, vt_client=vt_client, - ip_enricher_client=enricher_client, + ip_geoasn_client=ip_geoasn_client, whois_client=whois_client, + shodan_client=shodan_client, greynoise_client=greynoise_client, abuseipdb_client=abuseipdb_client, urlhaus_client=urlhaus_client, @@ -191,8 +195,9 @@ def generate_view( console, entity.vt_info, entity.resolutions, + entity.geoasn, entity.whois, - entity.ip_enrich, + entity.shodan, entity.greynoise, entity.abuseipdb, entity.urlhaus, @@ -202,8 +207,9 @@ def generate_view( view = IpAddressView( console, entity.vt_info, + entity.geoasn, entity.whois, - entity.ip_enrich, + entity.shodan, entity.greynoise, entity.abuseipdb, entity.urlhaus, diff --git a/wtfis/models/common.py b/wtfis/models/base.py similarity index 56% rename from wtfis/models/common.py rename to wtfis/models/base.py index 3ad9bdb..1d36424 100644 --- a/wtfis/models/common.py +++ b/wtfis/models/base.py @@ -1,9 +1,11 @@ +from __future__ import annotations + import abc import sys -from pydantic import BaseModel, BeforeValidator +from pydantic import BaseModel, BeforeValidator, ConfigDict, RootModel from pydantic.v1.validators import str_validator -from typing import List, Optional +from typing import List, Mapping, Optional if sys.version_info >= (3, 9): from typing import Annotated @@ -33,3 +35,30 @@ class WhoisBase(BaseModel, abc.ABC): date_changed: Optional[str] = None date_expires: Optional[str] = None dnssec: Optional[str] = None + + +class IpGeoAsnBase(BaseModel, abc.ABC): + """ Use to normalize IP geolocation and ASN fields """ + model_config = ConfigDict(coerce_numbers_to_str=True) + + ip: str + + # Geolocation + city: Optional[str] + continent: Optional[str] + country: Optional[str] + region: Optional[str] + + # ASN + asn: Optional[str] + org: Optional[str] + isp: Optional[str] + domain: Optional[str] + + +class IpGeoAsnMapBase(RootModel, abc.ABC): + root: Mapping[str, IpGeoAsnBase] + + @classmethod + def empty(cls) -> IpGeoAsnMapBase: + return cls.model_validate({}) # pragma: no coverage diff --git a/wtfis/models/ip2whois.py b/wtfis/models/ip2whois.py index f7abb06..7a25fe9 100644 --- a/wtfis/models/ip2whois.py +++ b/wtfis/models/ip2whois.py @@ -1,7 +1,7 @@ from pydantic import Field, field_validator, model_validator from typing import List, Optional -from wtfis.models.common import WhoisBase +from wtfis.models.base import WhoisBase class Whois(WhoisBase): diff --git a/wtfis/models/ipwhois.py b/wtfis/models/ipwhois.py index c3897f7..bab1d88 100644 --- a/wtfis/models/ipwhois.py +++ b/wtfis/models/ipwhois.py @@ -1,45 +1,33 @@ +""" +ipwhois datamodels +API doc: https://ipwhois.io/documentation +""" from __future__ import annotations -from pydantic import BaseModel, Field, RootModel from typing import Dict -from wtfis.models.common import LaxStr +from pydantic import AliasPath, Field +from wtfis.models.base import IpGeoAsnBase, IpGeoAsnMapBase -class Flag(BaseModel): - img: str - emoji: str - emoji_unicode: str +class IpWhois(IpGeoAsnBase): + # Metadata + source: str = "IPWhois" -class Connection(BaseModel): - asn: LaxStr - org: str - isp: str - domain: str - - -class IpWhois(BaseModel): + # Results ip: str - success: bool - type_: str = Field(alias="type") - continent: str - continent_code: str - country: str - country_code: str - region: str - region_code: str city: str - is_eu: bool - postal: str - calling_code: str - capital: str - borders: str - flag: Flag - connection: Connection + region: str + country: str + continent: str + asn: str = Field(validation_alias=AliasPath("connection", "asn")) + org: str = Field(validation_alias=AliasPath("connection", "org")) + isp: str = Field(validation_alias=AliasPath("connection", "isp")) + domain: str = Field(validation_alias=AliasPath("connection", "domain")) -class IpWhoisMap(RootModel): +class IpWhoisMap(IpGeoAsnMapBase): root: Dict[str, IpWhois] @classmethod diff --git a/wtfis/models/passivetotal.py b/wtfis/models/passivetotal.py index 2fe59b9..717bb2f 100644 --- a/wtfis/models/passivetotal.py +++ b/wtfis/models/passivetotal.py @@ -1,7 +1,7 @@ from pydantic import Field, model_validator from typing import List, Optional -from wtfis.models.common import WhoisBase +from wtfis.models.base import WhoisBase class Whois(WhoisBase): diff --git a/wtfis/models/shodan.py b/wtfis/models/shodan.py index cd57696..0282720 100644 --- a/wtfis/models/shodan.py +++ b/wtfis/models/shodan.py @@ -1,8 +1,10 @@ +from __future__ import annotations + from collections import defaultdict, namedtuple from pydantic import BaseModel, RootModel from typing import Dict, List, Optional -from wtfis.models.common import LaxStr +from wtfis.models.base import LaxStr class PortData(BaseModel): @@ -43,3 +45,7 @@ def group_ports_by_product(self) -> dict: class ShodanIpMap(RootModel): root: Dict[str, ShodanIp] + + @classmethod + def empty(cls) -> ShodanIpMap: + return cls.model_validate({}) diff --git a/wtfis/types.py b/wtfis/models/types.py similarity index 55% rename from wtfis/types.py rename to wtfis/models/types.py index 0ff68da..73d2241 100644 --- a/wtfis/types.py +++ b/wtfis/models/types.py @@ -5,18 +5,30 @@ from wtfis.models.abuseipdb import AbuseIpDbMap from wtfis.models.greynoise import GreynoiseIpMap -from wtfis.models.ipwhois import IpWhoisMap +from wtfis.models.ipwhois import IpWhois, IpWhoisMap from wtfis.models.shodan import ShodanIpMap from wtfis.models.urlhaus import UrlHausMap -# IP enrichment types + +# IP enrichment map types IpEnrichmentType = Union[ + AbuseIpDbMap, GreynoiseIpMap, IpWhoisMap, ShodanIpMap, UrlHausMap, - AbuseIpDbMap, ] -# Domain/FQDN enrichment types -DomainEnrichmentType = UrlHausMap +# Domain/FQDN enrichment map types +DomainEnrichmentType = Union[ + UrlHausMap, +] + +# IP geolocation and ASN types +IpGeoAsnType = Union[ + IpWhois, +] + +IpGeoAsnMapType = Union[ + IpWhoisMap, +] diff --git a/wtfis/models/virustotal.py b/wtfis/models/virustotal.py index 527185a..d85fe8f 100644 --- a/wtfis/models/virustotal.py +++ b/wtfis/models/virustotal.py @@ -1,7 +1,7 @@ from pydantic import BaseModel, Field, RootModel, field_validator, model_validator from typing import Any, Dict, List, Optional -from wtfis.models.common import LaxStr, WhoisBase +from wtfis.models.base import LaxStr, WhoisBase class BaseData(BaseModel): diff --git a/wtfis/ui/base.py b/wtfis/ui/base.py index 893f957..4457cb2 100644 --- a/wtfis/ui/base.py +++ b/wtfis/ui/base.py @@ -11,11 +11,10 @@ from rich.text import Text from typing import Any, Generator, List, Optional, Tuple, Union from wtfis.models.abuseipdb import AbuseIpDb, AbuseIpDbMap - -from wtfis.models.common import WhoisBase +from wtfis.models.base import WhoisBase from wtfis.models.greynoise import GreynoiseIp, GreynoiseIpMap -from wtfis.models.ipwhois import IpWhois, IpWhoisMap from wtfis.models.shodan import ShodanIp, ShodanIpMap +from wtfis.models.types import IpGeoAsnMapType, IpGeoAsnType from wtfis.models.virustotal import ( LastAnalysisStats, PopularityRanks, @@ -38,16 +37,18 @@ def __init__( self, console: Console, entity: Any, + geoasn: IpGeoAsnMapType, whois: Optional[WhoisBase], - ip_enrich: Union[IpWhoisMap, ShodanIpMap], + shodan: ShodanIpMap, greynoise: GreynoiseIpMap, abuseipdb: AbuseIpDbMap, urlhaus: UrlHausMap, ) -> None: self.console = console self.entity = entity + self.geoasn = geoasn self.whois = whois - self.ip_enrich = ip_enrich + self.shodan = shodan self.greynoise = greynoise self.abuseipdb = abuseipdb self.urlhaus = urlhaus @@ -298,8 +299,11 @@ def _gen_asn_text( .append(")")) return text - def _get_ip_enrichment(self, ip: str) -> Optional[Union[IpWhois, ShodanIp]]: - return self.ip_enrich.root[ip] if ip in self.ip_enrich.root.keys() else None + def _get_geoasn_enrichment(self, ip: str) -> Optional[IpGeoAsnType]: + return self.geoasn.root[ip] if ip in self.geoasn.root.keys() else None + + def _get_shodan_enrichment(self, ip: str) -> Optional[ShodanIp]: + return self.shodan.root[ip] if ip in self.shodan.root.keys() else None def _get_greynoise_enrichment(self, ip: str) -> Optional[GreynoiseIp]: return self.greynoise.root[ip] if ip in self.greynoise.root.keys() else None @@ -363,41 +367,48 @@ def _gen_vt_section(self) -> RenderableType: self._gen_heading_text("VirusTotal") ) - def _gen_ip_enrich_section(self) -> Optional[RenderableType]: - """ IP enrichment section. Applies to IP views only """ - enrich = self._get_ip_enrichment(self.entity.data.id_) + def _gen_geoasn_section(self) -> Optional[RenderableType]: + """ IP location and ASN section. Applies to IP views only """ + enrich = self._get_geoasn_enrichment(self.entity.data.id_) data: List[Tuple[Union[str, Text], Union[RenderableType, None]]] = [] if enrich: - if isinstance(enrich, IpWhois): - # IPWhois - section_title = "IPwhois" - asn = self._gen_asn_text(enrich.connection.asn, enrich.connection.org) - data += [ - ("ASN:", asn), - ("ISP:", enrich.connection.isp), - ("Location:", smart_join(enrich.city, enrich.region, enrich.country)), - ] - else: - # Shodan - section_title = "Shodan" - asn = self._gen_asn_text(enrich.asn, enrich.org) - tags = smart_join(*enrich.tags, style=self.theme.tags) if enrich.tags else None - services_field = self._gen_linked_field_name( - "Services", - hyperlink=f"{self.shodan_gui_baseurl}/{self.entity.data.id_}" - ) - data += [ - ("ASN:", asn), - ("ISP:", enrich.isp), - ("Location:", smart_join(enrich.city, enrich.region_name, enrich.country_name)), - ("OS:", enrich.os), - (services_field, self._gen_shodan_services(enrich)), - ("Tags:", tags), - ("Last Scan:", Timestamp(f"{enrich.last_update}+00:00").render), # Timestamps are UTC - # (source: Google) - ] + section_title = enrich.source + asn = self._gen_asn_text(enrich.asn, enrich.org) + data += [ + ("ASN:", asn), + ("ISP:", enrich.isp), + ("Location:", smart_join(enrich.city, enrich.region, enrich.country)), + ] + return self._gen_section( + + self._gen_table(*data), + self._gen_heading_text(section_title) + ) + + return None # No enrichment data + + def _gen_shodan_section(self) -> Optional[RenderableType]: + """ Shodan section. Applies to IP views only """ + enrich = self._get_shodan_enrichment(self.entity.data.id_) + + data: List[Tuple[Union[str, Text], Union[RenderableType, None]]] = [] + + if enrich: + section_title = "Shodan" + tags = smart_join(*enrich.tags, style=self.theme.tags) if enrich.tags else None + services_field = self._gen_linked_field_name( + "Services", + hyperlink=f"{self.shodan_gui_baseurl}/{self.entity.data.id_}" + ) + data += [ + ("OS:", enrich.os), + (services_field, self._gen_shodan_services(enrich)), + ("Tags:", tags), + ("Last Scan:", Timestamp(f"{enrich.last_update}+00:00").render), # Timestamps are UTC + # (source: Google) + ] return self._gen_section( self._gen_table(*data), diff --git a/wtfis/ui/view.py b/wtfis/ui/view.py index d34653f..dcadcd8 100644 --- a/wtfis/ui/view.py +++ b/wtfis/ui/view.py @@ -10,11 +10,11 @@ from typing import List, Optional, Tuple, Union from wtfis.models.abuseipdb import AbuseIpDbMap -from wtfis.models.common import WhoisBase +from wtfis.models.base import WhoisBase from wtfis.models.greynoise import GreynoiseIpMap -from wtfis.models.ipwhois import IpWhois, IpWhoisMap from wtfis.models.shodan import ShodanIpMap from wtfis.models.urlhaus import UrlHausMap +from wtfis.models.types import IpGeoAsnMapType from wtfis.models.virustotal import ( Domain, IpAddress, @@ -34,14 +34,15 @@ def __init__( console: Console, entity: Domain, resolutions: Optional[Resolutions], + geoasn: IpGeoAsnMapType, whois: WhoisBase, - ip_enrich: Union[IpWhoisMap, ShodanIpMap], + shodan: ShodanIpMap, greynoise: GreynoiseIpMap, abuseipdb: AbuseIpDbMap, urlhaus: UrlHausMap, max_resolutions: int = 3, ) -> None: - super().__init__(console, entity, whois, ip_enrich, greynoise, abuseipdb, urlhaus) + super().__init__(console, entity, geoasn, whois, shodan, greynoise, abuseipdb, urlhaus) self.resolutions = resolutions self.max_resolutions = max_resolutions @@ -87,40 +88,34 @@ def resolutions_panel(self) -> Optional[Panel]: ("Resolved:", Timestamp(attributes.date).render), ] - # IP Enrichment - enrich = self._get_ip_enrichment(attributes.ip_address) - - if enrich: - if isinstance(enrich, IpWhois): - # IPWhois - asn = self._gen_asn_text(enrich.connection.asn, enrich.connection.org) - data += [ - ("ASN:", asn), - ("ISP:", enrich.connection.isp), - ("Location:", smart_join(enrich.city, enrich.region, enrich.country)), - ] - else: - # Shodan - asn = self._gen_asn_text(enrich.asn, enrich.org) - tags = smart_join(*enrich.tags, style=self.theme.tags) if enrich.tags else None - services_field = self._gen_linked_field_name( - "Services", - hyperlink=f"{self.shodan_gui_baseurl}/{attributes.ip_address}" - ) - data += [ - ("ASN:", asn), - ("ISP:", enrich.isp), - ("Location:", smart_join(enrich.city, enrich.region_name, enrich.country_name)), - ("OS:", enrich.os), - (services_field, self._gen_shodan_services(enrich)), - ("Tags:", tags), - ("Last Scan:", Timestamp(f"{enrich.last_update}+00:00").render), # Timestamps are UTC - # (source: Google) - ] + # IP geolocation and ASN + geoasn = self._get_geoasn_enrichment(attributes.ip_address) + if geoasn: + asn = self._gen_asn_text(geoasn.asn, geoasn.org) + data += [ + ("ASN:", asn), + ("ISP:", geoasn.isp), + ("Location:", smart_join(geoasn.city, geoasn.region, geoasn.country)), + ] + + # Shodan + shodan = self._get_shodan_enrichment(attributes.ip_address) + if shodan: + tags = smart_join(*shodan.tags, style=self.theme.tags) if shodan.tags else None + services_field = self._gen_linked_field_name( + "Services", + hyperlink=f"{self.shodan_gui_baseurl}/{attributes.ip_address}" + ) + data += [ + ("OS:", shodan.os), + (services_field, self._gen_shodan_services(shodan)), + ("Tags:", tags), + # ("Last Scan:", Timestamp(f"{shodan.last_update}+00:00").render), # Timestamps are UTC + # # (source: Google) + ] # Greynoise greynoise = self._get_greynoise_enrichment(attributes.ip_address) - if greynoise: data += [self._gen_greynoise_tuple(greynoise)] @@ -190,18 +185,20 @@ def __init__( self, console: Console, entity: IpAddress, + geoasn: IpGeoAsnMapType, whois: WhoisBase, - ip_enrich: Union[IpWhoisMap, ShodanIpMap], + shodan: ShodanIpMap, greynoise: GreynoiseIpMap, abuseipdb: AbuseIpDbMap, urlhaus: UrlHausMap, ) -> None: - super().__init__(console, entity, whois, ip_enrich, greynoise, abuseipdb, urlhaus) + super().__init__(console, entity, geoasn, whois, shodan, greynoise, abuseipdb, urlhaus) def ip_panel(self) -> Panel: content = [self._gen_vt_section()] # VT section for section in ( - self._gen_ip_enrich_section(), # IP enrich section + self._gen_geoasn_section(), # IP location and ASN section + self._gen_shodan_section(), # Shodan section self._gen_urlhaus_section(), # URLhaus section self._gen_ip_other_section(), # Other section ):