Skip to content

Commit

Permalink
Refactor: DNS monitor and add unit tests
Browse files Browse the repository at this point in the history
Details:

1. Refactored the dns_monitor.py script to better structure the code and improve readability. The main function is now split into three distinct functions: fetch_txt_records, process_txt_records, and monitor_dns. This allows for easier testing and understanding of each part of the process.

2. for testing to work - move the argument parsing into the if __name__ == "__main__": block so it's only executed when the script is run directly.

3. Added a unit test (test_dns_monitor.py) for the process_txt_records function, with a mock DNS resolver response. This ensures that our key processing logic works as expected.

4. Added a .gitignore file to ignore Python cache files, which are not necessary to track in version control.
  • Loading branch information
Aricg committed May 15, 2023
1 parent fddfdf9 commit 6d57951
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 22 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__pycache__/

54 changes: 32 additions & 22 deletions dns_monitor.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,52 @@
#!/usr/bin/env python3

# dns_monitor.py
import dns.resolver
import time
import os
import argparse

# Command-line arguments
parser = argparse.ArgumentParser(description='Monitor DNS TXT records.')
parser.add_argument('--hostname', type=str, default=os.getenv('HOSTNAME', 'futurestay.com'))
parser.add_argument('--substring', type=str, default=os.getenv('SUBSTRING', 'google-site-verification'))
parser.add_argument('--d', type=int, default=os.getenv('DELAY', 5))
args = parser.parse_args()

# Store the keys
keys = set()

def monitor_dns(hostname, substring, d):
def fetch_txt_records(targethost):
return dns.resolver.resolve(targethost, 'TXT')

def process_txt_records(records, substring):
new_keys = set()
found_new_key = False
for rdata in records:
for txt_string in rdata.strings:
txt_string = txt_string.decode()
if substring in txt_string:
key = txt_string.split('=')[-1] if '=' in txt_string else None
if key:
new_keys.add(key)
if key not in keys:
found_new_key = True
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - Found new '{substring}' key '{key}' in TXT record")
return new_keys, found_new_key

def monitor_dns(targethost, substring, d):
global keys
while True:
try:
new_keys = set()
found_new_key = False
answers = dns.resolver.resolve(hostname, 'TXT')
for rdata in answers:
for txt_string in rdata.strings:
txt_string = txt_string.decode()
if substring in txt_string:
key = txt_string.split('=')[-1] if '=' in txt_string else None
if key:
new_keys.add(key)
if key not in keys:
found_new_key = True
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - Found new '{substring}' key '{key}' in TXT record of {hostname}")
records = fetch_txt_records(targethost)
new_keys, found_new_key = process_txt_records(records, substring)
keys = new_keys
if not found_new_key:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - No new keys found in this loop for {hostname}")
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - No new keys found in this loop for {targethost}")
except Exception as e:
print(f"Error occurred: {e}")
time.sleep(d)

monitor_dns(args.hostname, args.substring, args.d)
if __name__ == "__main__":
# Command-line arguments
parser = argparse.ArgumentParser(description='Monitor DNS TXT records.')
parser.add_argument('--targethost', type=str, default=os.getenv('TARGETHOSTNAME', 'futurestay.com'))
parser.add_argument('--substring', type=str, default=os.getenv('SUBSTRING', 'google-site-verification'))
parser.add_argument('--d', type=int, default=os.getenv('DELAY', 5))
args = parser.parse_args()
monitor_dns(args.targethost, args.substring, args.d)

28 changes: 28 additions & 0 deletions test_dns_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env python3

# test_dns_monitor.py
import unittest
from unittest.mock import patch, MagicMock
from dns_monitor import process_txt_records

class TestDNSMonitor(unittest.TestCase):

@patch('dns_monitor.fetch_txt_records')
def test_process_txt_records(self, mock_fetch_txt_records):
# Simulate multiple Rdata objects
mock_rdata1 = MagicMock()
mock_rdata1.strings = [b'google-site-verification=abc123']
mock_rdata2 = MagicMock()
mock_rdata2.strings = [b'google-site-verification=def456']

# Return a list of mock Rdata objects
mock_fetch_txt_records.return_value = [mock_rdata1, mock_rdata2]

new_keys, found_new_key = process_txt_records(mock_fetch_txt_records.return_value, 'google-site-verification')

self.assertEqual(new_keys, {'abc123', 'def456'})
self.assertTrue(found_new_key)

if __name__ == "__main__":
unittest.main()

0 comments on commit 6d57951

Please sign in to comment.