diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..94ec8c6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ + __pycache__/ + diff --git a/dns_monitor.py b/dns_monitor.py index 08d8192..df03501 100755 --- a/dns_monitor.py +++ b/dns_monitor.py @@ -1,42 +1,52 @@ #!/usr/bin/env python3 +# dns_monitor.py import dns.resolver import time import os import argparse -# Command-line arguments -parser = argparse.ArgumentParser(description='Monitor DNS TXT records.') -parser.add_argument('--hostname', type=str, default=os.getenv('HOSTNAME', 'futurestay.com')) -parser.add_argument('--substring', type=str, default=os.getenv('SUBSTRING', 'google-site-verification')) -parser.add_argument('--d', type=int, default=os.getenv('DELAY', 5)) -args = parser.parse_args() # Store the keys keys = set() -def monitor_dns(hostname, substring, d): +def fetch_txt_records(targethost): + return dns.resolver.resolve(targethost, 'TXT') + +def process_txt_records(records, substring): + new_keys = set() + found_new_key = False + for rdata in records: + for txt_string in rdata.strings: + txt_string = txt_string.decode() + if substring in txt_string: + key = txt_string.split('=')[-1] if '=' in txt_string else None + if key: + new_keys.add(key) + if key not in keys: + found_new_key = True + print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - Found new '{substring}' key '{key}' in TXT record") + return new_keys, found_new_key + +def monitor_dns(targethost, substring, d): global keys while True: try: - new_keys = set() - found_new_key = False - answers = dns.resolver.resolve(hostname, 'TXT') - for rdata in answers: - for txt_string in rdata.strings: - txt_string = txt_string.decode() - if substring in txt_string: - key = txt_string.split('=')[-1] if '=' in txt_string else None - if key: - new_keys.add(key) - if key not in keys: - found_new_key = True - print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - Found new '{substring}' key '{key}' in TXT record of {hostname}") + records = fetch_txt_records(targethost) + new_keys, found_new_key = process_txt_records(records, substring) keys = new_keys if not found_new_key: - print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - No new keys found in this loop for {hostname}") + print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - No new keys found in this loop for {targethost}") except Exception as e: print(f"Error occurred: {e}") time.sleep(d) -monitor_dns(args.hostname, args.substring, args.d) +if __name__ == "__main__": + # Command-line arguments + parser = argparse.ArgumentParser(description='Monitor DNS TXT records.') + parser.add_argument('--targethost', type=str, default=os.getenv('TARGETHOSTNAME', 'futurestay.com')) + parser.add_argument('--substring', type=str, default=os.getenv('SUBSTRING', 'google-site-verification')) + parser.add_argument('--d', type=int, default=os.getenv('DELAY', 5)) + args = parser.parse_args() + monitor_dns(args.targethost, args.substring, args.d) + diff --git a/test_dns_monitor.py b/test_dns_monitor.py new file mode 100755 index 0000000..7047ac5 --- /dev/null +++ b/test_dns_monitor.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 + +# test_dns_monitor.py +import unittest +from unittest.mock import patch, MagicMock +from dns_monitor import process_txt_records + +class TestDNSMonitor(unittest.TestCase): + + @patch('dns_monitor.fetch_txt_records') + def test_process_txt_records(self, mock_fetch_txt_records): + # Simulate multiple Rdata objects + mock_rdata1 = MagicMock() + mock_rdata1.strings = [b'google-site-verification=abc123'] + mock_rdata2 = MagicMock() + mock_rdata2.strings = [b'google-site-verification=def456'] + + # Return a list of mock Rdata objects + mock_fetch_txt_records.return_value = [mock_rdata1, mock_rdata2] + + new_keys, found_new_key = process_txt_records(mock_fetch_txt_records.return_value, 'google-site-verification') + + self.assertEqual(new_keys, {'abc123', 'def456'}) + self.assertTrue(found_new_key) + +if __name__ == "__main__": + unittest.main() +