Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple geoip and ASN lookups from Shodan results #78

Merged
merged 3 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified imgs/example-ip.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ classifiers = [
"Topic :: Security",
]
dependencies = [
"pydantic~=2.7.0",
"pydantic~=2.7.2",
"python-dotenv~=1.0.1",
"requests~=2.31.0",
"requests~=2.32.3",
"rich~=13.7.1",
"shodan~=1.31.0",
]
Expand Down Expand Up @@ -65,7 +65,7 @@ dependencies = [
"mypy",
"pytest",
"pytest-cov",
"types-requests~=2.31.0",
"types-requests~=2.32.0",
]
[tool.hatch.envs.default.scripts]
typecheck = "mypy -p {args:wtfis}"
Expand Down
18 changes: 12 additions & 6 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,12 @@ def test_handler_domain_1(self, fake_load_dotenv_1):
assert entity.console == console
assert entity.progress == progress
assert isinstance(entity._vt, VTClient)
assert isinstance(entity._enricher, IpWhoisClient)
assert isinstance(entity._geoasn, IpWhoisClient)
assert isinstance(entity._whois, PTClient)
assert entity._shodan is None
assert entity._greynoise is None
assert entity._urlhaus is None
assert entity._abuseipdb is None
unset_env_vars()

@patch("sys.argv", ["main", "www.example[.]com", "-s", "-g", "-u", "-m", "5"])
Expand All @@ -423,8 +425,9 @@ def test_handler_domain_2(self, fake_load_dotenv_1):
progress = simulate_progress(console),
entity = generate_entity_handler(parse_args(), console, progress)
assert entity.max_resolutions == 5
assert isinstance(entity._enricher, ShodanClient)
assert isinstance(entity._geoasn, IpWhoisClient)
assert isinstance(entity._whois, PTClient)
assert isinstance(entity._shodan, ShodanClient)
assert isinstance(entity._greynoise, GreynoiseClient)
assert isinstance(entity._urlhaus, UrlHausClient)
unset_env_vars()
Expand Down Expand Up @@ -464,7 +467,7 @@ def test_handler_ip_1(self, fake_load_dotenv_1):
assert entity.console == console
assert entity.progress == progress
assert isinstance(entity._vt, VTClient)
assert isinstance(entity._enricher, IpWhoisClient)
assert isinstance(entity._geoasn, IpWhoisClient)
assert isinstance(entity._whois, PTClient)
assert entity._greynoise is None
assert entity._urlhaus is None
Expand All @@ -478,8 +481,9 @@ def test_handler_ip_2(self, fake_load_dotenv_1):
console = Console()
progress = simulate_progress(console),
entity = generate_entity_handler(parse_args(), console, progress)
assert isinstance(entity._enricher, ShodanClient)
assert isinstance(entity._geoasn, IpWhoisClient)
assert isinstance(entity._whois, PTClient)
assert isinstance(entity._shodan, ShodanClient)
assert isinstance(entity._greynoise, GreynoiseClient)
assert isinstance(entity._urlhaus, UrlHausClient)
assert isinstance(entity._abuseipdb, AbuseIpDbClient)
Expand All @@ -496,8 +500,9 @@ def test_view_domain_1(self, m_domain_view, test_data):
console=MagicMock(),
progress=MagicMock(),
vt_client=MagicMock(),
ip_enricher_client=MagicMock(),
ip_geoasn_client=MagicMock(),
whois_client=MagicMock(),
shodan_client=MagicMock(),
greynoise_client=MagicMock(),
abuseipdb_client=MagicMock(),
urlhaus_client=MagicMock(),
Expand All @@ -516,8 +521,9 @@ def test_view_ip_1(self, m_ip_view, test_data):
console=MagicMock(),
progress=MagicMock(),
vt_client=MagicMock(),
ip_enricher_client=MagicMock(),
ip_geoasn_client=MagicMock(),
whois_client=MagicMock(),
shodan_client=MagicMock(),
greynoise_client=MagicMock(),
abuseipdb_client=MagicMock(),
urlhaus_client=MagicMock(),
Expand Down
6 changes: 4 additions & 2 deletions tests/test_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
import pytest
from unittest.mock import MagicMock, patch

from shodan import APIError

from wtfis.clients.abuseipdb import AbuseIpDbClient
from wtfis.clients.base import requests
from wtfis.clients.greynoise import GreynoiseClient
from wtfis.clients.ip2whois import Ip2WhoisClient
from wtfis.clients.ipwhois import IpWhoisClient
from wtfis.clients.passivetotal import PTClient
from wtfis.clients.shodan import APIError, Shodan, ShodanClient
from wtfis.clients.shodan import Shodan, ShodanClient
from wtfis.clients.urlhaus import UrlHausClient
from wtfis.clients.virustotal import VTClient
from wtfis.models.ipwhois import IpWhoisMap
Expand Down Expand Up @@ -172,7 +174,7 @@ def test_get_ip_apierror_invalid_key(self, mock_shodan_host, shodan_client):
shodan_client.enrich_ips("thisdoesntmatter")

assert e.type == APIError
assert str(e.value) == "Invalid Shodan API key"
assert str(e.value) == "Invalid API key"

@patch.object(Shodan, "host")
def test_get_ip_apierror_other(self, mock_shodan_host, shodan_client):
Expand Down
60 changes: 44 additions & 16 deletions tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from wtfis.clients.greynoise import GreynoiseClient
from wtfis.clients.ipwhois import IpWhoisClient
from wtfis.clients.passivetotal import PTClient
from wtfis.clients.shodan import ShodanClient
from wtfis.clients.urlhaus import UrlHausClient
from wtfis.clients.virustotal import VTClient
from wtfis.handlers.domain import DomainHandler
Expand All @@ -25,8 +26,9 @@ def generate_domain_handler(max_resolutions=3):
console=Console(),
progress=MagicMock(),
vt_client=VTClient("dummykey"),
ip_enricher_client=IpWhoisClient(),
ip_geoasn_client=IpWhoisClient(),
whois_client=PTClient("dummyuser", "dummykey"),
shodan_client=ShodanClient("dummykey"),
greynoise_client=GreynoiseClient("dummykey"),
abuseipdb_client=AbuseIpDbClient("dummykey"),
urlhaus_client=UrlHausClient(),
Expand All @@ -40,8 +42,9 @@ def generate_ip_handler():
console=Console(),
progress=MagicMock(),
vt_client=VTClient("dummykey"),
ip_enricher_client=IpWhoisClient(),
ip_geoasn_client=IpWhoisClient(),
whois_client=PTClient("dummyuser", "dummykey"),
shodan_client=ShodanClient("dummykey"),
greynoise_client=GreynoiseClient("dummykey"),
abuseipdb_client=AbuseIpDbClient("dummykey"),
urlhaus_client=UrlHausClient(),
Expand Down Expand Up @@ -70,16 +73,18 @@ def test_fetch_data_1(self, domain_handler, test_data):

handler._fetch_vt_domain = MagicMock()
handler._fetch_vt_resolutions = MagicMock()
handler._fetch_ip_enrichments = MagicMock()
handler._fetch_geoasn = MagicMock()
handler._fetch_whois = MagicMock()
handler._fetch_shodan = MagicMock()
handler._fetch_greynoise = MagicMock()
handler._fetch_urlhaus = MagicMock()

handler.fetch_data()
handler._fetch_vt_domain.assert_called_once()
handler._fetch_vt_resolutions.assert_called_once()
handler._fetch_ip_enrichments.assert_called_once()
handler._fetch_geoasn.assert_called_once()
handler._fetch_whois.assert_called_once()
handler._fetch_shodan.assert_called_once()
handler._fetch_greynoise.assert_called_once()
handler._fetch_urlhaus.assert_called_once()

Expand All @@ -88,21 +93,23 @@ def test_fetch_data_2(self, domain_handler):
handler = domain_handler(0)
handler._fetch_vt_domain = MagicMock()
handler._fetch_vt_resolutions = MagicMock()
handler._fetch_ip_enrichments = MagicMock()
handler._fetch_geoasn = MagicMock()
handler._fetch_whois = MagicMock()
handler._fetch_shodan = MagicMock()
handler._fetch_greynoise = MagicMock()
handler._fetch_urlhaus = MagicMock()

handler.fetch_data()
handler._fetch_vt_domain.assert_called_once()
handler._fetch_vt_resolutions.assert_not_called()
handler._fetch_ip_enrichments.assert_not_called()
handler._fetch_geoasn.assert_not_called()
handler._fetch_whois.assert_called_once()
handler._fetch_shodan.assert_not_called()
handler._fetch_greynoise.assert_not_called()
handler._fetch_urlhaus.assert_called_once()

assert handler.ip_enrich == IpWhoisMap.model_validate({})
assert handler.ip_enrich.root == {}
assert handler.geoasn == IpWhoisMap.model_validate({})
assert handler.geoasn.root == {}

@patch.object(requests.Session, "get")
def test_vt_http_error(self, mock_requests_get, domain_handler, capsys):
Expand Down Expand Up @@ -173,7 +180,7 @@ def test_ipwhois_http_error(self, mock_requests_get, domain_handler, capsys, tes
mock_resp.status_code = 500
mock_requests_get.return_value = mock_resp

handler._fetch_ip_enrichments(*["1.2.3.4", "1.2.3.5"])
handler._fetch_geoasn(*["1.2.3.4", "1.2.3.5"])
assert handler.warnings[0].startswith("Could not fetch IPWhois: 500 Server Error:")

handler.print_warnings()
Expand All @@ -191,12 +198,12 @@ def test_ipwhois_validation_error(self, mock_requests_get, domain_handler, capsy
mock_requests_get.return_value = mock_resp

with pytest.raises(SystemExit) as e:
handler._fetch_ip_enrichments("1.2.3.4")
handler._fetch_geoasn("1.2.3.4")

capture = capsys.readouterr()

assert capture.err.startswith(
"Data model validation error: 16 validation errors for IpWhois\n"
"Data model validation error: 9 validation errors for IpWhois\n"
)
assert e.type == SystemExit
assert e.value.code == 1
Expand Down Expand Up @@ -374,16 +381,18 @@ def test_entity_refang(self, ip_handler):
def test_fetch_data(self, ip_handler):
handler = ip_handler()
handler._fetch_vt_ip_address = MagicMock()
handler._fetch_ip_enrichments = MagicMock()
handler._fetch_geoasn = MagicMock()
handler._fetch_whois = MagicMock()
handler._fetch_shodan = MagicMock()
handler._fetch_greynoise = MagicMock()
handler._fetch_urlhaus = MagicMock()
handler._fetch_abuseipdb = MagicMock()

handler.fetch_data()
handler._fetch_vt_ip_address.assert_called_once()
handler._fetch_ip_enrichments.assert_called_once()
handler._fetch_geoasn.assert_called_once()
handler._fetch_whois.assert_called_once()
handler._fetch_shodan.assert_called_once()
handler._fetch_greynoise.assert_called_once()
handler._fetch_urlhaus.assert_called_once()
handler._fetch_abuseipdb.assert_called_once()
Expand Down Expand Up @@ -438,7 +447,7 @@ def test_ipwhois_http_error(self, mock_requests_get, ip_handler, capsys):
mock_resp.status_code = 502
mock_requests_get.return_value = mock_resp

handler._fetch_ip_enrichments("1.2.3.4")
handler._fetch_geoasn("1.2.3.4")
assert handler.warnings[0].startswith("Could not fetch IPWhois: 502 Server Error:")

handler.print_warnings()
Expand Down Expand Up @@ -473,16 +482,35 @@ def test_ipwhois_validation_error(self, mock_requests_get, ip_handler, capsys):
mock_requests_get.return_value = mock_resp

with pytest.raises(SystemExit) as e:
handler._fetch_ip_enrichments("1.2.3.4")
handler._fetch_geoasn("1.2.3.4")

capture = capsys.readouterr()

assert capture.err.startswith(
"Data model validation error: 16 validation errors for IpWhois\n"
"Data model validation error: 9 validation errors for IpWhois\n"
)
assert e.type == SystemExit
assert e.value.code == 1

@patch.object(requests.Session, "get")
def test_shodan_api_error(self, mock_requests_get, ip_handler, capsys):
"""
Test fail open behavior of Shodan on invalid API key
"""
handler = ip_handler()
mock_resp = requests.models.Response()

mock_resp.status_code = 401
mock_resp._content = b'<html>'
mock_requests_get.return_value = mock_resp

handler._fetch_shodan("1.2.3.4")
assert handler.warnings[0].startswith("Could not fetch Shodan: Invalid API key")

handler.print_warnings()
capture = capsys.readouterr()
assert capture.out.startswith("WARN: Could not fetch Shodan: Invalid API key")

@patch.object(requests.Session, "get")
def test_greynoise_http_error(self, mock_requests_get, ip_handler, capsys):
"""
Expand Down
Loading
Loading