Files
mukul975 c47eed6a64 Production hardening: security fixes, code quality, 724 skills complete
- Fix 25 shell=True subprocess calls with list-based commands
- Fix 49 verify=False in defensive skills (env-var override)
- Add timeout to 231 HTTP/subprocess/socket calls
- Fix 6 SQL injection patterns with whitelist validation
- Replace 8 __import__() with standard imports
- Remove 701 unused imports across 442 files
- Add authorized-testing disclaimers to all offensive skills
- Complete 11 incomplete skill directories
- Expand 10 stub SKILL.md files with full content
- Fix 2 YAML parse errors in frontmatter
- Fix 5 pre-existing syntax errors
- Convert 22 hardcoded paths/ports to environment variables
- Back up 21 redundant skill pairs to .bak
- Fix 2 global declaration errors
- 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE)
- 0 compile errors across all 724 agent.py files
2026-03-19 13:26:49 +01:00

205 lines
7.9 KiB
Python

#!/usr/bin/env python3
"""Analyze malware sandbox evasion techniques from Cuckoo/AnyRun behavioral reports."""
import json
import argparse
from datetime import datetime
TIMING_APIS = {
"GetTickCount", "GetTickCount64", "QueryPerformanceCounter",
"QueryPerformanceFrequency", "GetSystemTimeAsFileTime", "NtQuerySystemTime",
"timeGetTime", "GetLocalTime", "GetSystemTime",
}
SLEEP_APIS = {"Sleep", "SleepEx", "NtDelayExecution", "WaitForSingleObject"}
VM_REGISTRY_KEYS = [
"HKLM\\SOFTWARE\\VMware", "HKLM\\SOFTWARE\\Oracle\\VirtualBox",
"HKLM\\HARDWARE\\ACPI\\DSDT\\VBOX", "HKLM\\SYSTEM\\CurrentControlSet\\Services\\VBoxGuest",
"HKLM\\SOFTWARE\\Microsoft\\Virtual Machine\\Guest\\Parameters",
"HKLM\\HARDWARE\\Description\\System\\SystemBiosVersion",
]
VM_PROCESSES = {
"vmtoolsd.exe", "vmwaretray.exe", "vboxservice.exe", "vboxtray.exe",
"qemu-ga.exe", "vmusrvc.exe", "prl_tools.exe", "xenservice.exe",
"windanr.exe", "vdagent.exe",
}
VM_MAC_PREFIXES = ["00:0C:29", "00:50:56", "08:00:27", "00:1C:42", "00:16:3E", "52:54:00"]
USER_INTERACTION_APIS = {
"GetCursorPos", "GetAsyncKeyState", "GetForegroundWindow",
"GetLastInputInfo", "mouse_event", "keybd_event",
}
WMI_EVASION_QUERIES = [
"Win32_ComputerSystem", "Win32_BIOS", "Win32_DiskDrive",
"Win32_PhysicalMemory", "Win32_Processor",
]
def parse_cuckoo_report(filepath):
"""Parse a Cuckoo Sandbox behavioral report JSON."""
with open(filepath) as f:
report = json.load(f)
behavior = report.get("behavior", {})
api_calls = []
for process in behavior.get("processes", []):
for call in process.get("calls", []):
api_calls.append({
"api": call.get("api", ""),
"category": call.get("category", ""),
"arguments": call.get("arguments", {}),
"return": call.get("return", ""),
"process_name": process.get("process_name", ""),
"pid": process.get("pid", 0),
})
return api_calls, report
def detect_timing_checks(api_calls):
"""Detect timing-based sandbox evasion via GetTickCount, QPC, etc."""
findings = []
timing_count = 0
for call in api_calls:
if call["api"] in TIMING_APIS:
timing_count += 1
if timing_count >= 3:
findings.append({
"technique": "Timing-Based Evasion",
"mitre_id": "T1497.003",
"api_count": timing_count,
"apis_used": list({c["api"] for c in api_calls if c["api"] in TIMING_APIS}),
"severity": "high",
"description": f"{timing_count} timing API calls detected; malware may be measuring execution time to detect sandbox acceleration",
})
return findings
def detect_sleep_inflation(api_calls, min_sleep_ms=60000):
"""Detect sleep calls with long durations used to evade sandbox time limits."""
findings = []
for call in api_calls:
if call["api"] not in SLEEP_APIS:
continue
ms = 0
args = call.get("arguments", {})
if isinstance(args, dict):
ms = int(args.get("Milliseconds", args.get("milliseconds", 0)))
elif isinstance(args, list):
for a in args:
if isinstance(a, dict) and a.get("name", "").lower() == "milliseconds":
ms = int(a.get("value", 0))
if ms >= min_sleep_ms:
findings.append({
"technique": "Sleep Inflation",
"mitre_id": "T1497.003",
"api": call["api"],
"sleep_ms": ms,
"sleep_seconds": ms / 1000,
"process": call["process_name"],
"severity": "high",
"description": f"Sleep call of {ms / 1000:.0f}s detected; likely delaying execution to outlast sandbox analysis window",
})
return findings
def detect_vm_artifact_checks(api_calls):
"""Detect VM artifact queries (registry, processes, MAC addresses)."""
findings = []
for call in api_calls:
args_str = json.dumps(call.get("arguments", "")).lower()
for reg_key in VM_REGISTRY_KEYS:
if reg_key.lower() in args_str:
findings.append({
"technique": "VM Registry Artifact Check",
"mitre_id": "T1497.001",
"registry_key": reg_key,
"api": call["api"],
"severity": "high",
})
break
for wmi_query in WMI_EVASION_QUERIES:
if wmi_query.lower() in args_str:
findings.append({
"technique": "WMI Environment Fingerprinting",
"mitre_id": "T1497.001",
"wmi_class": wmi_query,
"api": call["api"],
"severity": "medium",
})
break
return findings
def detect_user_interaction_checks(api_calls):
"""Detect checks for user interaction (mouse, keyboard, foreground window)."""
interaction_apis = [c for c in api_calls if c["api"] in USER_INTERACTION_APIS]
if len(interaction_apis) >= 2:
return [{
"technique": "User Interaction Detection",
"mitre_id": "T1497.002",
"api_count": len(interaction_apis),
"apis_used": list({c["api"] for c in interaction_apis}),
"severity": "medium",
"description": "Malware checks for user input to determine if running in automated sandbox",
}]
return []
def score_evasion_sophistication(all_findings):
"""Score evasion sophistication based on technique diversity."""
technique_ids = {f["mitre_id"] for f in all_findings}
categories = {f["technique"].split()[0] for f in all_findings}
score = min(len(all_findings) * 10 + len(technique_ids) * 15 + len(categories) * 10, 100)
level = "low" if score < 30 else "medium" if score < 60 else "high"
return {"score": score, "level": level, "unique_techniques": len(technique_ids), "total_indicators": len(all_findings)}
def main():
parser = argparse.ArgumentParser(description="Sandbox Evasion Technique Analyzer")
parser.add_argument("--report", required=True, help="Path to Cuckoo/AnyRun behavioral report JSON")
parser.add_argument("--min-sleep-ms", type=int, default=60000, help="Minimum sleep duration to flag (ms)")
parser.add_argument("--output", default="evasion_analysis_report.json", help="Output report path")
args = parser.parse_args()
api_calls, raw_report = parse_cuckoo_report(args.report)
print(f"[+] Parsed {len(api_calls)} API calls from behavioral report")
timing = detect_timing_checks(api_calls)
sleep = detect_sleep_inflation(api_calls, args.min_sleep_ms)
vm_checks = detect_vm_artifact_checks(api_calls)
user_checks = detect_user_interaction_checks(api_calls)
all_findings = timing + sleep + vm_checks + user_checks
sophistication = score_evasion_sophistication(all_findings)
report = {
"analysis_time": datetime.utcnow().isoformat() + "Z",
"sample_sha256": raw_report.get("target", {}).get("file", {}).get("sha256", ""),
"total_api_calls": len(api_calls),
"evasion_findings": {
"timing_checks": timing,
"sleep_inflation": sleep,
"vm_artifact_checks": vm_checks,
"user_interaction_checks": user_checks,
},
"total_indicators": len(all_findings),
"sophistication": sophistication,
"mitre_techniques": ["T1497.001", "T1497.002", "T1497.003"],
}
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
print(f"[+] Timing checks: {len(timing)}")
print(f"[+] Sleep inflation: {len(sleep)}")
print(f"[+] VM artifact checks: {len(vm_checks)}")
print(f"[+] User interaction checks: {len(user_checks)}")
print(f"[+] Evasion sophistication: {sophistication['level']} ({sophistication['score']}/100)")
print(f"[+] Report saved to {args.output}")
if __name__ == "__main__":
main()