Merge pull request #1311 from yogeshojha/1292-bug-local-model-dont-use-fetch-gpt-vulnerability-details

Fix LLM/langchain issue when fetching the vulnerability report using a local LLM model. Fixes #1292: local model doesn't use "fetch GPT vulnerability details".
yogeshojha authored Jul 19, 2024
2 parents 519547a + 11ddd04 commit 49bbb75
Showing 9 changed files with 151 additions and 115 deletions.
2 changes: 1 addition & 1 deletion web/api/urls.py
@@ -156,7 +156,7 @@
         name='waf_detector'),
     path(
         'tools/gpt_vulnerability_report/',
-        GPTVulnerabilityReportGenerator.as_view(),
+        LLMVulnerabilityReportGenerator.as_view(),
         name='gpt_vulnerability_report_generator'),
     path(
         'tools/gpt_get_possible_attacks/',
13 changes: 7 additions & 6 deletions web/api/views.py
@@ -22,7 +22,7 @@
 from reNgine.common_func import *
 from reNgine.definitions import ABORTED_TASK
 from reNgine.tasks import *
-from reNgine.gpt import GPTAttackSuggestionGenerator
+from reNgine.llm import *
 from reNgine.utilities import is_safe_path
 from scanEngine.models import *
 from startScan.models import *
@@ -141,7 +141,7 @@ def get(self, request):
         tech_used = ''
         for tech in subdomain.technologies.all():
             tech_used += f'{tech.name}, '
-        input = f'''
+        llm_input = f'''
             Subdomain Name: {subdomain.name}
             Subdomain Page Title: {subdomain.page_title}
             Open Ports: {open_ports_str}
@@ -151,16 +151,17 @@ def get(self, request):
             Web Server: {subdomain.webserver}
             Page Content Length: {subdomain.content_length}
         '''
-        gpt = GPTAttackSuggestionGenerator()
-        response = gpt.get_attack_suggestion(input)
+        llm_input = re.sub(r'\t', '', llm_input)
+        gpt = LLMAttackSuggestionGenerator(logger)
+        response = gpt.get_attack_suggestion(llm_input)
         response['subdomain_name'] = subdomain.name
         if response.get('status'):
             subdomain.attack_surface = response.get('description')
             subdomain.save()
         return Response(response)
 
 
-class GPTVulnerabilityReportGenerator(APIView):
+class LLMVulnerabilityReportGenerator(APIView):
     def get(self, request):
         req = self.request
         vulnerability_id = req.query_params.get('id')
@@ -169,7 +170,7 @@
             return Response({
                 'status': False,
                 'error': 'Missing GET param Vulnerability `id`'
             })
-        task = gpt_vulnerability_description.apply_async(args=(vulnerability_id,))
+        task = llm_vulnerability_description.apply_async(args=(vulnerability_id,))
         response = task.wait()
         return Response(response)
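Because the renamed view still reads the vulnerability `id` from the query string, clients call the endpoint exactly as before. A minimal sketch of such a call, assuming an already-authenticated session against a hypothetical instance (the host, the /api/ prefix, and the id value are illustrative, not part of this diff):

    import requests

    # Hypothetical host; a real reNgine deployment requires an authenticated session.
    session = requests.Session()
    resp = session.get(
        'https://rengine.example.com/api/tools/gpt_vulnerability_report/',
        params={'id': 42},  # maps to req.query_params.get('id') in the view
    )
    print(resp.json())  # e.g. {'status': True, 'description': '...', ...}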
5 changes: 4 additions & 1 deletion web/celery-entrypoint.sh
@@ -159,6 +159,9 @@ echo 'alias httpx="/go/bin/httpx"' >> ~/.bashrc
 # TEMPORARY FIX, httpcore is causing issues with celery, removing it as temp fix
 python3 -m pip uninstall -y httpcore
 
+# TEMPORARY FIX FOR langchain
+pip install tenacity==8.2.2
+
 loglevel='info'
 if [ "$DEBUG" == "1" ]; then
     loglevel='debug'
@@ -183,7 +186,7 @@
 watchmedo auto-restart --recursive --pattern="*.py" --directory="/usr/src/app/reNgine/" -- celery -A reNgine.tasks worker --pool=gevent --concurrency=50 --loglevel=$loglevel -Q run_command_queue -n run_command_worker &
 watchmedo auto-restart --recursive --pattern="*.py" --directory="/usr/src/app/reNgine/" -- celery -A reNgine.tasks worker --pool=gevent --concurrency=10 --loglevel=$loglevel -Q query_reverse_whois_queue -n query_reverse_whois_worker &
 watchmedo auto-restart --recursive --pattern="*.py" --directory="/usr/src/app/reNgine/" -- celery -A reNgine.tasks worker --pool=gevent --concurrency=10 --loglevel=$loglevel -Q query_ip_history_queue -n query_ip_history_worker &
-watchmedo auto-restart --recursive --pattern="*.py" --directory="/usr/src/app/reNgine/" -- celery -A reNgine.tasks worker --pool=gevent --concurrency=30 --loglevel=$loglevel -Q gpt_queue -n gpt_worker &
+watchmedo auto-restart --recursive --pattern="*.py" --directory="/usr/src/app/reNgine/" -- celery -A reNgine.tasks worker --pool=gevent --concurrency=30 --loglevel=$loglevel -Q llm_queue -n llm_worker &
 watchmedo auto-restart --recursive --pattern="*.py" --directory="/usr/src/app/reNgine/" -- celery -A reNgine.tasks worker --pool=gevent --concurrency=10 --loglevel=$loglevel -Q dorking_queue -n dorking_worker &
 watchmedo auto-restart --recursive --pattern="*.py" --directory="/usr/src/app/reNgine/" -- celery -A reNgine.tasks worker --pool=gevent --concurrency=10 --loglevel=$loglevel -Q osint_discovery_queue -n osint_discovery_worker &
 watchmedo auto-restart --recursive --pattern="*.py" --directory="/usr/src/app/reNgine/" -- celery -A reNgine.tasks worker --pool=gevent --concurrency=10 --loglevel=$loglevel -Q h8mail_queue -n h8mail_worker &
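Renaming the queue only works if producers enqueue onto the same name. The matching change to web/reNgine/tasks.py is not visible in this view, but a sketch of the kind of routing involved looks like this (the config-based routing is an assumption based on the worker line above, not a copy of reNgine's code):

    from celery import Celery

    app = Celery('reNgine')

    # Route the renamed task to the queue that the llm_worker above consumes.
    # Routing-by-config is one option; apply_async(queue='llm_queue') is another.
    app.conf.task_routes = {
        'reNgine.tasks.llm_vulnerability_description': {'queue': 'llm_queue'},
    }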
90 changes: 54 additions & 36 deletions web/reNgine/common_func.py
@@ -35,38 +35,38 @@
 #------------------#
 # EngineType utils #
 #------------------#
 def dump_custom_scan_engines(results_dir):
-	"""Dump custom scan engines to YAML files.
-	Args:
-		results_dir (str): Results directory (will be created if non-existent).
-	"""
-	custom_engines = EngineType.objects.filter(default_engine=False)
-	if not os.path.exists(results_dir):
-		os.makedirs(results_dir, exist_ok=True)
-	for engine in custom_engines:
-		with open(os.path.join(results_dir, f"{engine.engine_name}.yaml"), 'w') as f:
-			f.write(engine.yaml_configuration)
+    """Dump custom scan engines to YAML files.
+    Args:
+        results_dir (str): Results directory (will be created if non-existent).
+    """
+    custom_engines = EngineType.objects.filter(default_engine=False)
+    if not os.path.exists(results_dir):
+        os.makedirs(results_dir, exist_ok=True)
+    for engine in custom_engines:
+        with open(os.path.join(results_dir, f"{engine.engine_name}.yaml"), 'w') as f:
+            f.write(engine.yaml_configuration)
 
 def load_custom_scan_engines(results_dir):
-	"""Load custom scan engines from YAML files. The filename without .yaml will
-	be used as the engine name.
-	Args:
-		results_dir (str): Results directory containing engines configs.
-	"""
-	config_paths = [
-		f for f in os.listdir(results_dir)
-		if os.path.isfile(os.path.join(results_dir, f)) and f.endswith('.yaml')
-	]
-	for path in config_paths:
-		engine_name = os.path.splitext(os.path.basename(path))[0]
-		full_path = os.path.join(results_dir, path)
-		with open(full_path, 'r') as f:
-			yaml_configuration = f.read()
-
-		engine, _ = EngineType.objects.get_or_create(engine_name=engine_name)
-		engine.yaml_configuration = yaml_configuration
-		engine.save()
+    """Load custom scan engines from YAML files. The filename without .yaml will
+    be used as the engine name.
+    Args:
+        results_dir (str): Results directory containing engines configs.
+    """
+    config_paths = [
+        f for f in os.listdir(results_dir)
+        if os.path.isfile(os.path.join(results_dir, f)) and f.endswith('.yaml')
+    ]
+    for path in config_paths:
+        engine_name = os.path.splitext(os.path.basename(path))[0]
+        full_path = os.path.join(results_dir, path)
+        with open(full_path, 'r') as f:
+            yaml_configuration = f.read()
+
+        engine, _ = EngineType.objects.get_or_create(engine_name=engine_name)
+        engine.yaml_configuration = yaml_configuration
+        engine.save()
 
 
 #--------------------------------#
@@ -1005,9 +1005,27 @@ def get_netlas_key():
     netlas_key = NetlasAPIKey.objects.all()
     return netlas_key[0] if netlas_key else None
 
-
-def extract_between(text, pattern):
-    match = pattern.search(text)
-    if match:
-        return match.group(1).strip()
-    return ""
+def parse_llm_vulnerability_report(report):
+    report = report.replace('**', '')
+    data = {}
+    sections = re.split(r'\n(?=(?:Description|Impact|Remediation|References):)', report.strip())
+
+    try:
+        for section in sections:
+            if not section.strip():
+                continue
+
+            section_title, content = re.split(r':\n', section.strip(), maxsplit=1)
+
+            if section_title == 'Description':
+                data['description'] = content.strip()
+            elif section_title == 'Impact':
+                data['impact'] = content.strip()
+            elif section_title == 'Remediation':
+                data['remediation'] = content.strip()
+            elif section_title == 'References':
+                data['references'] = [ref.strip() for ref in content.split('\n') if ref.strip()]
+    except Exception as e:
+        return data
+
+    return data
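To make the new parser's contract concrete, here is a hypothetical round trip; the sample report text is invented, while parse_llm_vulnerability_report is the function added above:

    sample_report = (
        "Description:\n"
        "Reflected XSS in the `q` parameter allows arbitrary script execution.\n"
        "\n"
        "Impact:\n"
        "Session hijacking for any user who follows a crafted link.\n"
        "\n"
        "Remediation:\n"
        "- Encode all user-controlled output\n"
        "- Deploy a strict Content-Security-Policy\n"
        "\n"
        "References:\n"
        "- https://owasp.org/www-community/attacks/xss/\n"
    )

    data = parse_llm_vulnerability_report(sample_report)
    # data['description'] == 'Reflected XSS in the `q` parameter allows arbitrary script execution.'
    # data['references']  == ['- https://owasp.org/www-community/attacks/xss/']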
46 changes: 25 additions & 21 deletions web/reNgine/definitions.py
@@ -470,7 +470,7 @@
 		'parameter_size': '~1.7T',
 	}
 },
-{
+	{
 	'name': 'gpt-4-turbo',
 	'model': 'gpt-4',
 	'modified_at': '',
@@ -485,26 +485,30 @@
 
 # GPT Vulnerability Report Generator
 VULNERABILITY_DESCRIPTION_SYSTEM_MESSAGE = """
-You are a highly skilled penetration tester who has recently completed a penetration testing.
-You will be given with a
-- Vulnerability title
-- Vulnerable URL
-- and some description about the vulnerability.
-Your job is to write a detailed technical penetration testing report based on the given Vulnerability details.
-The purpose of this report is to provide an in-depth analysis of the vulnerabilities discovered during the penetration testing engagement.
-The penetration testing report must contain all separated by \n\n
-- Vulnerability description
-	Include a detailed vulnerability description, include any known CVE IDs, any known existing vulnerabilities.
-- Impact
-	Include what this vulnerability can impact for web applications.
-- Remediation
-	Include steps to remediate this vulnerability. Separate each new remediations by - and a new line \n
-- References
-	Include any references URL about this vulnerability, any existing CVE ID, or news articles etc. Separate each new references by - and a new line \n. Only include http urls
-Do not write 'Penetration Testing Report:' on the title.
+You are an expert penetration tester who has just completed a comprehensive security assessment. Based on the provided vulnerability title, vulnerable URL, and vulnerability description, your task is to generate a detailed, technical penetration testing report in plain text format.
+Your task is to generate a detailed, technical penetration testing report. This report should offer an in-depth analysis of the discovered vulnerabilities, adhering to industry best practices and standards.
+The output should adhere to the following structure:
+Description:
+A comprehensive explanation of the vulnerability, including: Detailed technical analysis, Associated CVE IDs (if any), Related known vulnerabilities, Exploitation methods
+Impact:
+A thorough assessment of the vulnerability's potential impact on web applications, including: Data confidentiality breaches, System integrity compromises, Service availability disruptions, Potential for further exploitation
+Remediation:
+A prioritized list of specific, actionable steps to address the vulnerability, such as: Code modifications, Configuration changes, Security patch applications, Implementation of security controls
+References:
+Relevant, authoritative sources supporting your analysis, such as: Official CVE database entries, Vendor security advisories, Respected security research publications, Applicable industry standards or guidelines
+Ensure that:
+1. Each section (Description, Impact, Remediation, References) is separated by ONLY ONE blank line and no multiple new lines. The content must be immediately after the section title.
+2. Do not make title as bold, italic or underline. It must be Title ending with a colon. Example: Description:
+3. All URLs in the 'references' section begin with 'http://' or 'https://'.
+4. Remediation steps should be specific and actionable and should not contain any ambiguous or general recommendations.
+5. Refrain from including any personal opinions or subjective assessments in your report.
"""


76 changes: 38 additions & 38 deletions web/reNgine/gpt.py → web/reNgine/llm.py
@@ -1,19 +1,21 @@
 
 import openai
 import re
-from reNgine.common_func import get_open_ai_key, extract_between
+from reNgine.common_func import get_open_ai_key, parse_llm_vulnerability_report
 from reNgine.definitions import VULNERABILITY_DESCRIPTION_SYSTEM_MESSAGE, ATTACK_SUGGESTION_GPT_SYSTEM_PROMPT, OLLAMA_INSTANCE
 from langchain_community.llms import Ollama
 
 from dashboard.models import OllamaSettings
 
-class GPTVulnerabilityReportGenerator:
-
-    def __init__(self):
+class LLMVulnerabilityReportGenerator:
+
+    def __init__(self, logger):
         selected_model = OllamaSettings.objects.first()
         self.model_name = selected_model.selected_model if selected_model else 'gpt-3.5-turbo'
         self.use_ollama = selected_model.use_ollama if selected_model else False
         self.openai_api_key = None
         self.ollama = None
+        self.logger = logger
 
     def get_vulnerability_description(self, description):
@@ -29,27 +31,32 @@ def get_vulnerability_description(self, description):
             'references': (list) of urls
         }
         """
-        print(f"Generating Vulnerability Description for: {description}")
+        self.logger.info(f"Generating Vulnerability Description for: {description}")
         if self.use_ollama:
             prompt = VULNERABILITY_DESCRIPTION_SYSTEM_MESSAGE + "\nUser: " + description
-            self.ollama = Ollama(
+            prompt = re.sub(r'\t', '', prompt)
+            self.logger.info(f"Using Ollama for Vulnerability Description Generation")
+            llm = Ollama(
                 base_url=OLLAMA_INSTANCE,
                 model=self.model_name
             )
-            response_content = self.ollama(prompt)
+            response_content = llm.invoke(prompt)
+            # self.logger.info(response_content)
         else:
+            self.logger.info(f'Using OpenAI API for Vulnerability Description Generation')
            openai_api_key = get_open_ai_key()
            if not openai_api_key:
                return {
                    'status': False,
                    'error': 'OpenAI API Key not set'
                }
            try:
+                prompt = re.sub(r'\t', '', VULNERABILITY_DESCRIPTION_SYSTEM_MESSAGE)
                 openai.api_key = openai_api_key
                 gpt_response = openai.ChatCompletion.create(
                     model=self.model_name,
                     messages=[
-                        {'role': 'system', 'content': VULNERABILITY_DESCRIPTION_SYSTEM_MESSAGE},
+                        {'role': 'system', 'content': prompt},
                         {'role': 'user', 'content': description}
                     ]
                 )
@@ -60,69 +67,62 @@ def get_vulnerability_description(self, description):
                     'status': False,
                     'error': str(e)
                 }
-        vuln_description_pattern = re.compile(
-            r"[Vv]ulnerability [Dd]escription:(.*?)(?:\n\n[Ii]mpact:|$)",
-            re.DOTALL
-        )
-        impact_pattern = re.compile(
-            r"[Ii]mpact:(.*?)(?:\n\n[Rr]emediation:|$)",
-            re.DOTALL
-        )
-        remediation_pattern = re.compile(
-            r"[Rr]emediation:(.*?)(?:\n\n[Rr]eferences:|$)",
-            re.DOTALL
-        )
-
-        description_section = extract_between(response_content, vuln_description_pattern)
-        impact_section = extract_between(response_content, impact_pattern)
-        remediation_section = extract_between(response_content, remediation_pattern)
-        references_start_index = response_content.find("References:")
-        references_section = response_content[references_start_index + len("References:"):].strip()
-
+        response = parse_llm_vulnerability_report(response_content)
 
-        url_pattern = re.compile(r'https://\S+')
-        urls = url_pattern.findall(references_section)
+        if not response:
+            return {
+                'status': False,
+                'error': 'Failed to parse LLM response'
+            }
 
         return {
             'status': True,
-            'description': description_section,
-            'impact': impact_section,
-            'remediation': remediation_section,
-            'references': urls,
+            'description': response.get('description', ''),
+            'impact': response.get('impact', ''),
+            'remediation': response.get('remediation', ''),
+            'references': response.get('references', []),
         }
 
-class GPTAttackSuggestionGenerator:
-
-    def __init__(self):
+class LLMAttackSuggestionGenerator:
+
+    def __init__(self, logger):
         selected_model = OllamaSettings.objects.first()
         self.model_name = selected_model.selected_model if selected_model else 'gpt-3.5-turbo'
         self.use_ollama = selected_model.use_ollama if selected_model else False
         self.openai_api_key = None
         self.ollama = None
+        self.logger = logger
 
     def get_attack_suggestion(self, user_input):
         '''
         user_input (str): input for gpt
         '''
         if self.use_ollama:
+            self.logger.info(f"Using Ollama for Attack Suggestion Generation")
             prompt = ATTACK_SUGGESTION_GPT_SYSTEM_PROMPT + "\nUser: " + user_input
-            self.ollama = Ollama(
+            prompt = re.sub(r'\t', '', prompt)
+            llm = Ollama(
                 base_url=OLLAMA_INSTANCE,
                 model=self.model_name
             )
-            response_content = self.ollama(prompt)
+            response_content = llm.invoke(prompt)
+            self.logger.info(response_content)
         else:
+            self.logger.info(f'Using OpenAI API for Attack Suggestion Generation')
            openai_api_key = get_open_ai_key()
            if not openai_api_key:
                return {
                    'status': False,
                    'error': 'OpenAI API Key not set'
                }
            try:
+                prompt = re.sub(r'\t', '', ATTACK_SUGGESTION_GPT_SYSTEM_PROMPT)
                 openai.api_key = openai_api_key
                 gpt_response = openai.ChatCompletion.create(
                     model=self.model_name,
                     messages=[
-                        {'role': 'system', 'content': ATTACK_SUGGESTION_GPT_SYSTEM_PROMPT},
+                        {'role': 'system', 'content': prompt},
                         {'role': 'user', 'content': user_input}
                     ]
                 )
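Taken together, both renamed classes are constructed with a logger and decide internally whether to call Ollama or the OpenAI API, based on OllamaSettings. A minimal driving sketch (the logger setup and the input text are assumptions for illustration):

    import logging

    from reNgine.llm import LLMVulnerabilityReportGenerator

    logger = logging.getLogger(__name__)

    # Model choice (local Ollama vs. OpenAI) is read from OllamaSettings
    # inside the constructor; the caller only supplies a logger.
    generator = LLMVulnerabilityReportGenerator(logger)
    report = generator.get_vulnerability_description(
        'Vulnerability title: Reflected XSS\n'
        'Vulnerable URL: https://target.example/search?q=test\n'
        'Description: user input is reflected without encoding'
    )
    if report['status']:
        print(report['description'])
        print(report['references'])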
