Skip to content

Commit

Permalink
Merge pull request #50 from CybercentreCanada/malware_config
Browse files Browse the repository at this point in the history
Add the ability to score the `malware_config` section of a file, if i…
  • Loading branch information
cccs-rs authored Jan 29, 2025
2 parents 515f3f5 + 8daf0e8 commit 18fb9ac
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 8 deletions.
6 changes: 6 additions & 0 deletions service_manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ heuristics:
filetype: "*"
description: "VT has crowdsourced AI analysis of the file"

- heur_id: 1002
name: Malware Configuration Extracted
score: 1000
filetype: "*"
description: "VT has extracted malware configuration"

docker_config:
image: ${REGISTRY}cccs/assemblyline-service-virustotal:$SERVICE_TAG
cpu_cores: 0.25
Expand Down
46 changes: 45 additions & 1 deletion virustotal/reports/common/info.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import json
import regex

from assemblyline_v4_service.common.result import BODY_FORMAT, ResultSection
from typing import Any

from assemblyline.odm import IP_ONLY_REGEX, DOMAIN_ONLY_REGEX, FULL_URI
from assemblyline_v4_service.common.result import BODY_FORMAT, ResultSection, ResultJSONSection, Heuristic


# Modeling output after PDFId service
Expand Down Expand Up @@ -103,6 +107,46 @@ def pe_section(info={}, exiftool={}, signature={}):

return main_section

def malware_config_section(malware_config={}):
tags = {}
heur = None
def tag_output(output: Any, tags: dict = {}):
def tag_string(value):
if regex.search(IP_ONLY_REGEX, value):
tags.setdefault("network.static.ip", []).append(value)
elif regex.search(DOMAIN_ONLY_REGEX, value):
tags.setdefault("network.static.domain", []).append(value)
elif regex.search(FULL_URI, value):
tags.setdefault("network.static.uri", []).append(value)

if isinstance(output, dict):
# Iterate over values of dictionary
for value in output.values():
if isinstance(value, dict):
tag_output(value, tags)
elif isinstance(value, list):
[tag_output(v, tags) for v in value]
elif isinstance(value, str):
tag_string(value)

elif isinstance(output, str):
tag_string(output)

# Add tags for attribution
for k, v in malware_config.items():
if "campaign" in k:
tags.setdefault("attribution.campaign", []).append(v)
elif "family" in k:
tags.setdefault("attribution.family", []).append(v)
elif "domain" in k:
tags.setdefault("attribution.network", []).append(v)

if not heur and ("campaign" in k or "family" in k):
heur = Heuristic(1002)

# Tag anything resembling an IP, domain, or URI
tag_output(malware_config, tags)
return ResultJSONSection("Malware Configuration", section_body=malware_config, tags=tags, heuristic=heur)

# Modeling output after YARA service
def yara_section(rule_matches=[]):
Expand Down
9 changes: 2 additions & 7 deletions virustotal/reports/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,8 @@ def v3(doc, file_name, av_processor: AVResultsProcessor):
main_section.add_subsection(info_section)

# Malware Config
for k, v in attributes.get("malware_config", {}).items():
if "campaign" in k:
main_section.add_tag("attribution.campaign", v)
elif "family" in k:
main_section.add_tag("attribution.family", v)
elif "domain" in k:
main_section.add_tag("attribution.network", v)
if attributes.get("malware_config"):
infected_section.add_subsection(info.malware_config_section(attributes["malware_config"]))

infected_section, no_av_section = av_processor.get_av_results(attributes["last_analysis_results"])
if infected_section.subsections:
Expand Down

0 comments on commit 18fb9ac

Please sign in to comment.