Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

So.. I did some stuff #76

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
*.pyc
suspicious_domains.log
suspicious_domains_2024-01-24.log
143 changes: 64 additions & 79 deletions catch_phishing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python
# Copyright (c) 2017 @x0rz
#!/usr/bin/env python3
# Copyright (c) 2017 @x0rz modified in 2024 by (joel esler)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand All @@ -10,97 +10,80 @@
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
import re
from collections import Counter
import math

import certstream
import tqdm
import yaml
import time
import os
from Levenshtein import distance
from termcolor import colored, cprint
from termcolor import colored
from tld import get_tld

from confusables import unconfuse

certstream_url = 'wss://certstream.calidog.io'

log_suspicious = os.path.dirname(os.path.realpath(__file__))+'/suspicious_domains_'+time.strftime("%Y-%m-%d")+'.log'

suspicious_yaml = os.path.dirname(os.path.realpath(__file__))+'/suspicious.yaml'

external_yaml = os.path.dirname(os.path.realpath(__file__))+'/external.yaml'
domain_split_regex = re.compile("\W+")

certstream_url = 'wss://certstream.calidog.io'
log_suspicious = os.path.dirname(os.path.realpath(__file__)) + '/suspicious_domains_' + time.strftime("%Y-%m-%d") + '.log'
suspicious_yaml = os.path.dirname(os.path.realpath(__file__)) + '/suspicious.yaml'
external_yaml = os.path.dirname(os.path.realpath(__file__)) + '/external.yaml'
pbar = tqdm.tqdm(desc='certificate_update', unit='cert')

def entropy(string):
"""Calculates the Shannon entropy of a string"""
prob = [ float(string.count(c)) / len(string) for c in dict.fromkeys(list(string)) ]
entropy = - sum([ p * math.log(p) / math.log(2.0) for p in prob ])
return entropy

def score_domain(domain):
"""Score `domain`.

The higher the score, the more probable it is that `domain` is a phishing site.

Args:
domain (str): the domain to check.
prob = [float(count) / len(string) for count in Counter(string).values()]
return -sum(p * math.log(p) / math.log(2.0) for p in prob)

Returns:
int: the score of `domain`.
"""
def score_domain(domain, suspicious_tlds, suspicious_keywords):
"""Score `domain`."""
score = 0
for t in suspicious['tlds']:
if domain.endswith(t):
score += 20

# Remove initial '*.' for wildcard certificates bug
if domain.startswith('*.'):
domain = domain[2:]

# Removing TLD to catch inner TLD in subdomain (ie. paypal.com.domain.com)
try:
# Attempt to extract the TLD and process the domain
res = get_tld(domain, as_object=True, fail_silently=True, fix_protocol=True)
domain = '.'.join([res.subdomain, res.domain])
except Exception:
pass

# Higher entropy is kind of suspicious
score += int(round(entropy(domain)*10))
score += int(round(entropy(domain) * 10))
domain = unconfuse(domain)
words_in_domain = domain_split_regex.split(domain)

# Remove lookalike characters using list from http://www.unicode.org/reports/tr39
domain = unconfuse(domain)
if words_in_domain[0] in ['com', 'net', 'org', 'mil', 'gov']:
score += 10

words_in_domain = re.split("\W+", domain)
for t in suspicious_tlds:
if domain.endswith(t):
score += 20

# ie. detect fake .com (ie. *.com-account-management.info)
if words_in_domain[0] in ['com', 'net', 'org']:
score += 10
for word in words_in_domain:
if word in suspicious_keywords:
score += suspicious_keywords[word]

# Testing keywords
for word in suspicious['keywords']:
if word in domain:
score += suspicious['keywords'][word]
for key in [k for (k, s) in suspicious_keywords.items() if s >= 70]:
for word in [w for w in words_in_domain if w not in ['email', 'mail', 'cloud']]:
if distance(str(word), str(key)) == 1:
score += 70

# Testing Levenshtein distance for strong keywords (>= 70 points) (ie. paypol)
for key in [k for (k,s) in suspicious['keywords'].items() if s >= 70]:
# Removing too generic keywords (ie. mail.domain.com)
for word in [w for w in words_in_domain if w not in ['email', 'mail', 'cloud']]:
if distance(str(word), str(key)) == 1:
score += 70
if 'xn--' not in domain and domain.count('-') >= 4:
score += domain.count('-') * 3

# Lots of '-' (ie. www.paypal-datacenter.com-acccount-alert.com)
if 'xn--' not in domain and domain.count('-') >= 4:
score += domain.count('-') * 3
if domain.count('.') >= 3:
score += domain.count('.') * 3

# Deeply nested subdomains (ie. www.paypal.com.security.accountupdate.gq)
if domain.count('.') >= 3:
score += domain.count('.') * 3
except UnicodeError as e:
# Log the error and skip scoring for this domain
print(f"Error processing domain '{domain}': {e}")
return 0

return score


def is_subdomain(domain, base_domain):
    """Return True if `domain` equals `base_domain` or is a subdomain of it.

    Matching is done on whole DNS labels, so ``notexample.com`` is not
    treated as a subdomain of ``example.com``. The comparison ignores case,
    a trailing root dot (``example.com.``) and a leading dot on
    `base_domain` (``.example.com``).

    Args:
        domain (str): the domain to test.
        base_domain (str): the domain to test against.

    Returns:
        bool: True when `domain` is `base_domain` or ends with
        ``'.' + base_domain``; False otherwise, including when either
        argument is empty or is not a string.
    """
    # A malformed entry (None, a number, ...) must not raise; it simply
    # never matches.
    if not isinstance(domain, str) or not isinstance(base_domain, str):
        return False

    # DNS names are case-insensitive and may carry a trailing root dot.
    domain = domain.lower().rstrip('.')
    base_domain = base_domain.lower().strip('.')

    # An empty name must never match anything.
    if not domain or not base_domain:
        return False

    # The '.' prefix forces the match onto a label boundary.
    return domain == base_domain or domain.endswith('.' + base_domain)

def callback(message, context):
"""Callback handler for certstream events."""
if message['message_type'] == "heartbeat":
Expand All @@ -110,49 +93,51 @@ def callback(message, context):
all_domains = message['data']['leaf_cert']['all_domains']

for domain in all_domains:
domain_lower = domain.lower()

if any(is_subdomain(domain_lower, ignore_domain) for ignore_domain in ignore_list):
continue

pbar.update(1)
score = score_domain(domain.lower())
score = score_domain(domain_lower, suspicious_tlds_set, suspicious_keywords_set)

# If issued from a free CA = more suspicious
if "Let's Encrypt" == message['data']['leaf_cert']['issuer']['O']:
score += 10

if "ZeroSSL" == message['data']['leaf_cert']['issuer']['O']:
score += 10

if score >= 100:
tqdm.tqdm.write(
"[!] Suspicious: "
"{} (score={})".format(colored(domain, 'red', attrs=['underline', 'bold']), score))
tqdm.tqdm.write("[!] Suspicious: {} (score={})".format(colored(domain, 'red', attrs=['underline', 'bold']), score))
elif score >= 90:
tqdm.tqdm.write(
"[!] Suspicious: "
"{} (score={})".format(colored(domain, 'red', attrs=['underline']), score))
tqdm.tqdm.write("[!] Suspicious: {} (score={})".format(colored(domain, 'red', attrs=['underline']), score))
elif score >= 80:
tqdm.tqdm.write(
"[!] Likely : "
"{} (score={})".format(colored(domain, 'yellow', attrs=['underline']), score))
tqdm.tqdm.write("[!] Likely : {} (score={})".format(colored(domain, 'yellow', attrs=['underline']), score))
elif score >= 65:
tqdm.tqdm.write(
"[+] Potential : "
"{} (score={})".format(colored(domain, attrs=['underline']), score))
tqdm.tqdm.write("[+] Potential : {} (score={})".format(colored(domain, attrs=['underline']), score))

if score >= 75:
if score >= 100:
with open(log_suspicious, 'a') as f:
f.write("{}\n".format(domain))


if __name__ == '__main__':
with open(suspicious_yaml, 'r') as f:
suspicious = yaml.safe_load(f)

ignore_list = set(suspicious.get('ignore_domains', []))

with open(external_yaml, 'r') as f:
external = yaml.safe_load(f)

if external['override_suspicious.yaml'] is True:
suspicious = external
else:
if external['keywords'] is not None:
if external.get('override_suspicious.yaml', False) is False:
if external.get('keywords') is not None:
suspicious['keywords'].update(external['keywords'])

if external['tlds'] is not None:
if external.get('tlds') is not None:
suspicious['tlds'].update(external['tlds'])
if external.get('ignore_domains') is not None:
ignore_list.update(external['ignore_domains'])

suspicious_tlds_set = set(suspicious.get('tlds', []))
suspicious_keywords_set = {k: v for k, v in suspicious.get('keywords', {}).items()}

certstream.listen_for_events(callback, url=certstream_url)
10 changes: 9 additions & 1 deletion confusables.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,7 @@
u'\uABB6': 'k',
u'\u049B': 'k',
u'\u049F': 'k',
u'\u1E33': 'k',
u'\U00010320': 'l',
u'\U0001E8C7': 'l',
u'\U0001D7CF': 'l',
Expand Down Expand Up @@ -1823,8 +1824,14 @@
}

def unconfuse(domain):
# Correctly handle Punycode domains
if domain.startswith('xn--'):
domain = domain.encode('idna').decode('idna')
try:
domain = domain.encode('idna').decode('idna')
except UnicodeError as e:
print(f"Error decoding Punycode domain '{domain}': {e}")
return domain # Return the original domain on error

unconfused = ''
for i in range(len(domain)):
if domain[i] in confusables:
Expand All @@ -1834,3 +1841,4 @@ def unconfuse(domain):

return unconfused


Loading