diff --git a/api_app/analyzers_manager/migrations/0148_analyzer_config_phunter.py b/api_app/analyzers_manager/migrations/0148_analyzer_config_phunter.py new file mode 100644 index 0000000000..a300bf0a59 --- /dev/null +++ b/api_app/analyzers_manager/migrations/0148_analyzer_config_phunter.py @@ -0,0 +1,128 @@ +from django.db import migrations +from django.db.models.fields.related_descriptors import ( + ForwardManyToOneDescriptor, + ForwardOneToOneDescriptor, + ManyToManyDescriptor, + ReverseManyToOneDescriptor, + ReverseOneToOneDescriptor, +) + +plugin = { + "python_module": { + "health_check_schedule": None, + "update_schedule": None, + "module": "phunter.phunter.Phunter", + "base_path": "api_app.analyzers_manager.observable_analyzers", + }, + "name": "Phunter", + "description": "[Phunter](https://github.com/N0rz3/Phunter) is a very useful tool for finding information about a phone number.", + "disabled": False, + "soft_time_limit": 60, + "routing_key": "default", + "health_check_status": True, + "type": "observable", + "docker_based": False, + "maximum_tlp": "CLEAR", + "observable_supported": ["generic"], + "supported_filetypes": [], + "run_hash": False, + "run_hash_type": "", + "not_supported_filetypes": [], + "mapping_data_model": {}, + "model": "analyzers_manager.AnalyzerConfig", +} + +params = [] + +values = [] + + +def _get_real_obj(Model, field, value): + def _get_obj(Model, other_model, value): + if isinstance(value, dict): + real_vals = {} + for key, real_val in value.items(): + real_vals[key] = _get_real_obj(other_model, key, real_val) + value = other_model.objects.get_or_create(**real_vals)[0] + # it is just the primary key serialized + else: + if isinstance(value, int): + if Model.__name__ == "PluginConfig": + value = other_model.objects.get(name=plugin["name"]) + else: + value = other_model.objects.get(pk=value) + else: + value = other_model.objects.get(name=value) + return value + + if ( + type(getattr(Model, field)) + in [ + ForwardManyToOneDescriptor, + ReverseManyToOneDescriptor, + ReverseOneToOneDescriptor, + ForwardOneToOneDescriptor, + ] + and value + ): + other_model = getattr(Model, field).get_queryset().model + value = _get_obj(Model, other_model, value) + elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value: + other_model = getattr(Model, field).rel.model + value = [_get_obj(Model, other_model, val) for val in value] + return value + + +def _create_object(Model, data): + mtm, no_mtm = {}, {} + for field, value in data.items(): + value = _get_real_obj(Model, field, value) + if type(getattr(Model, field)) is ManyToManyDescriptor: + mtm[field] = value + else: + no_mtm[field] = value + try: + o = Model.objects.get(**no_mtm) + except Model.DoesNotExist: + o = Model(**no_mtm) + o.full_clean() + o.save() + for field, value in mtm.items(): + attribute = getattr(o, field) + if value is not None: + attribute.set(value) + return False + return True + + +def migrate(apps, schema_editor): + Parameter = apps.get_model("api_app", "Parameter") + PluginConfig = apps.get_model("api_app", "PluginConfig") + python_path = plugin.pop("model") + Model = apps.get_model(*python_path.split(".")) + if not Model.objects.filter(name=plugin["name"]).exists(): + exists = _create_object(Model, plugin) + if not exists: + for param in params: + _create_object(Parameter, param) + for value in values: + _create_object(PluginConfig, value) + + +def reverse_migrate(apps, schema_editor): + python_path = plugin.pop("model") + Model = apps.get_model(*python_path.split(".")) + Model.objects.get(name=plugin["name"]).delete() + + +class Migration(migrations.Migration): + atomic = False + dependencies = [ + ("api_app", "0065_job_mpnodesearch"), + ( + "analyzers_manager", + "0147_alter_analyzer_config_feodo_yaraify_urlhaus_yaraify_scan", + ), + ] + + operations = [migrations.RunPython(migrate, reverse_migrate)] diff --git a/api_app/analyzers_manager/observable_analyzers/phunter/__init__.py b/api_app/analyzers_manager/observable_analyzers/phunter/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api_app/analyzers_manager/observable_analyzers/phunter/phunter.py b/api_app/analyzers_manager/observable_analyzers/phunter/phunter.py new file mode 100644 index 0000000000..b663238f26 --- /dev/null +++ b/api_app/analyzers_manager/observable_analyzers/phunter/phunter.py @@ -0,0 +1,62 @@ +# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl +# See the file 'LICENSE' for copying permission. + +import logging + +from api_app.analyzers_manager.classes import ObservableAnalyzer +from api_app.analyzers_manager.observable_analyzers.phunter.phunter_base import ( + PhunterBase, +) +from tests.mock_utils import if_mock_connections, patch + +logger = logging.getLogger(__name__) + + +class Phunter(ObservableAnalyzer): + """ + This analyzer is a wrapper for the Phunter project. + """ + + @classmethod + def update(cls) -> bool: + pass + + def run(self): + logger.info(f"Running Phunter Analyzer for {self.observable_name}") + + phunter = PhunterBase() + + results = phunter.phunt(phone_number=self.observable_name) + + return results + + @classmethod + def _monkeypatch(cls): + patches = [ + if_mock_connections( + patch.object( + Phunter, + "phunt", + return_value={ + "Valid": "true", + "Operator": "Not found", + "Possible": "true", + "Line Type": "Not found", + "Spamcalls": "true", + "Free Lookup": { + "Owner": "Not found", + "Carrier": "Not found", + "Country": "United States", + "Location": "Not found", + "National": "(833) 371-2570", + "Line Type": "TOLL FREE", + "Local Time": "02:43:29", + "Views count": "34", + "International": "+1 833-371-2570", + }, + "Phone Number": "+18333712570", + }, + ), + ) + ] + return super()._monkeypatch(patches=patches) diff --git a/api_app/analyzers_manager/observable_analyzers/phunter/phunter_base.py b/api_app/analyzers_manager/observable_analyzers/phunter/phunter_base.py new file mode 100644 index 0000000000..d9a84f4cd7 --- /dev/null +++ b/api_app/analyzers_manager/observable_analyzers/phunter/phunter_base.py @@ -0,0 +1,76 @@ +import phonenumbers +import requests +from bs4 import BeautifulSoup +from phonenumbers import carrier + + +class PhunterBase: + def __init__(self): + pass + + def phunt(self, phone_number: str) -> dict: + + # General Information + + parsed = phonenumbers.parse(phone_number) + + possible = phonenumbers.is_possible_number(parsed) + + valid = phonenumbers.is_valid_number(parsed) + + operator = carrier.name_for_number(parsed, "en") + if operator != "": + operator = operator + else: + operator = "Not found" + + line = phonenumbers.number_type(parsed) + if line == phonenumbers.PhoneNumberType.FIXED_LINE: + ligne = "Fixed" + elif line == phonenumbers.PhoneNumberType.MOBILE: + ligne = "Mobile" + else: + ligne = "Not found" + + # Free Lookup + + user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:124.0) Gecko/20100101 Firefox/124.0" + + free_lookup_url = f"https://free-lookup.net/{phone_number.replace('+', '')}" + + r = requests.get(free_lookup_url, headers={"user-agent": user_agent}) + + html_body = BeautifulSoup(r.text, "html.parser") + list_info = html_body.findChild("ul", class_="report-summary__list").findAll( + "div" + ) + + info_dict = { + k.text.strip(): info.text.strip() if info.text.strip() else "Not found" + for _, (k, info) in enumerate(zip(list_info[::2], list_info[1::2])) + } + + # Spamcalls + + spammer = False + + spamcalls_url = f"https://spamcalls.net/en/number/{phone_number}" + + r = requests.get(spamcalls_url, headers={"user-agent": user_agent}) + + if r.status_code == 200: + spammer = True + else: + spammer = False + + result = { + "Phone Number": phone_number, + "Possible": possible, + "Valid": valid, + "Operator": operator, + "Line Type": ligne, + "Free Lookup": info_dict, + "Spamcalls": spammer, + } + + return result diff --git a/requirements/project-requirements.txt b/requirements/project-requirements.txt index 6f0a5296e2..4f21efb0fd 100644 --- a/requirements/project-requirements.txt +++ b/requirements/project-requirements.txt @@ -88,6 +88,7 @@ docxpy==0.8.5 pylnk3==0.4.2 androguard==3.4.0a1 # version >=4.x of androguard raises a dependency conflict with quark-engine==25.1.1 wad==0.4.6 +phonenumbers==8.13.53 # this is required because XLMMacroDeobfuscator does not pin the following packages pyxlsb2==0.0.8