mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
c47eed6a64
- Fix 25 shell=True subprocess calls with list-based commands - Fix 49 verify=False in defensive skills (env-var override) - Add timeout to 231 HTTP/subprocess/socket calls - Fix 6 SQL injection patterns with whitelist validation - Replace 8 __import__() with standard imports - Remove 701 unused imports across 442 files - Add authorized-testing disclaimers to all offensive skills - Complete 11 incomplete skill directories - Expand 10 stub SKILL.md files with full content - Fix 2 YAML parse errors in frontmatter - Fix 5 pre-existing syntax errors - Convert 22 hardcoded paths/ports to environment variables - Back up 21 redundant skill pairs to .bak - Fix 2 global declaration errors - 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE) - 0 compile errors across all 724 agent.py files
205 lines
7.9 KiB
Python
205 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Analyze malware sandbox evasion techniques from Cuckoo/AnyRun behavioral reports."""
|
|
|
|
import json
|
|
import argparse
|
|
from datetime import datetime
|
|
|
|
TIMING_APIS = {
|
|
"GetTickCount", "GetTickCount64", "QueryPerformanceCounter",
|
|
"QueryPerformanceFrequency", "GetSystemTimeAsFileTime", "NtQuerySystemTime",
|
|
"timeGetTime", "GetLocalTime", "GetSystemTime",
|
|
}
|
|
|
|
SLEEP_APIS = {"Sleep", "SleepEx", "NtDelayExecution", "WaitForSingleObject"}
|
|
|
|
VM_REGISTRY_KEYS = [
|
|
"HKLM\\SOFTWARE\\VMware", "HKLM\\SOFTWARE\\Oracle\\VirtualBox",
|
|
"HKLM\\HARDWARE\\ACPI\\DSDT\\VBOX", "HKLM\\SYSTEM\\CurrentControlSet\\Services\\VBoxGuest",
|
|
"HKLM\\SOFTWARE\\Microsoft\\Virtual Machine\\Guest\\Parameters",
|
|
"HKLM\\HARDWARE\\Description\\System\\SystemBiosVersion",
|
|
]
|
|
|
|
VM_PROCESSES = {
|
|
"vmtoolsd.exe", "vmwaretray.exe", "vboxservice.exe", "vboxtray.exe",
|
|
"qemu-ga.exe", "vmusrvc.exe", "prl_tools.exe", "xenservice.exe",
|
|
"windanr.exe", "vdagent.exe",
|
|
}
|
|
|
|
VM_MAC_PREFIXES = ["00:0C:29", "00:50:56", "08:00:27", "00:1C:42", "00:16:3E", "52:54:00"]
|
|
|
|
USER_INTERACTION_APIS = {
|
|
"GetCursorPos", "GetAsyncKeyState", "GetForegroundWindow",
|
|
"GetLastInputInfo", "mouse_event", "keybd_event",
|
|
}
|
|
|
|
WMI_EVASION_QUERIES = [
|
|
"Win32_ComputerSystem", "Win32_BIOS", "Win32_DiskDrive",
|
|
"Win32_PhysicalMemory", "Win32_Processor",
|
|
]
|
|
|
|
|
|
def parse_cuckoo_report(filepath):
|
|
"""Parse a Cuckoo Sandbox behavioral report JSON."""
|
|
with open(filepath) as f:
|
|
report = json.load(f)
|
|
behavior = report.get("behavior", {})
|
|
api_calls = []
|
|
for process in behavior.get("processes", []):
|
|
for call in process.get("calls", []):
|
|
api_calls.append({
|
|
"api": call.get("api", ""),
|
|
"category": call.get("category", ""),
|
|
"arguments": call.get("arguments", {}),
|
|
"return": call.get("return", ""),
|
|
"process_name": process.get("process_name", ""),
|
|
"pid": process.get("pid", 0),
|
|
})
|
|
return api_calls, report
|
|
|
|
|
|
def detect_timing_checks(api_calls):
|
|
"""Detect timing-based sandbox evasion via GetTickCount, QPC, etc."""
|
|
findings = []
|
|
timing_count = 0
|
|
for call in api_calls:
|
|
if call["api"] in TIMING_APIS:
|
|
timing_count += 1
|
|
if timing_count >= 3:
|
|
findings.append({
|
|
"technique": "Timing-Based Evasion",
|
|
"mitre_id": "T1497.003",
|
|
"api_count": timing_count,
|
|
"apis_used": list({c["api"] for c in api_calls if c["api"] in TIMING_APIS}),
|
|
"severity": "high",
|
|
"description": f"{timing_count} timing API calls detected; malware may be measuring execution time to detect sandbox acceleration",
|
|
})
|
|
return findings
|
|
|
|
|
|
def detect_sleep_inflation(api_calls, min_sleep_ms=60000):
|
|
"""Detect sleep calls with long durations used to evade sandbox time limits."""
|
|
findings = []
|
|
for call in api_calls:
|
|
if call["api"] not in SLEEP_APIS:
|
|
continue
|
|
ms = 0
|
|
args = call.get("arguments", {})
|
|
if isinstance(args, dict):
|
|
ms = int(args.get("Milliseconds", args.get("milliseconds", 0)))
|
|
elif isinstance(args, list):
|
|
for a in args:
|
|
if isinstance(a, dict) and a.get("name", "").lower() == "milliseconds":
|
|
ms = int(a.get("value", 0))
|
|
if ms >= min_sleep_ms:
|
|
findings.append({
|
|
"technique": "Sleep Inflation",
|
|
"mitre_id": "T1497.003",
|
|
"api": call["api"],
|
|
"sleep_ms": ms,
|
|
"sleep_seconds": ms / 1000,
|
|
"process": call["process_name"],
|
|
"severity": "high",
|
|
"description": f"Sleep call of {ms / 1000:.0f}s detected; likely delaying execution to outlast sandbox analysis window",
|
|
})
|
|
return findings
|
|
|
|
|
|
def detect_vm_artifact_checks(api_calls):
|
|
"""Detect VM artifact queries (registry, processes, MAC addresses)."""
|
|
findings = []
|
|
for call in api_calls:
|
|
args_str = json.dumps(call.get("arguments", "")).lower()
|
|
for reg_key in VM_REGISTRY_KEYS:
|
|
if reg_key.lower() in args_str:
|
|
findings.append({
|
|
"technique": "VM Registry Artifact Check",
|
|
"mitre_id": "T1497.001",
|
|
"registry_key": reg_key,
|
|
"api": call["api"],
|
|
"severity": "high",
|
|
})
|
|
break
|
|
for wmi_query in WMI_EVASION_QUERIES:
|
|
if wmi_query.lower() in args_str:
|
|
findings.append({
|
|
"technique": "WMI Environment Fingerprinting",
|
|
"mitre_id": "T1497.001",
|
|
"wmi_class": wmi_query,
|
|
"api": call["api"],
|
|
"severity": "medium",
|
|
})
|
|
break
|
|
return findings
|
|
|
|
|
|
def detect_user_interaction_checks(api_calls):
|
|
"""Detect checks for user interaction (mouse, keyboard, foreground window)."""
|
|
interaction_apis = [c for c in api_calls if c["api"] in USER_INTERACTION_APIS]
|
|
if len(interaction_apis) >= 2:
|
|
return [{
|
|
"technique": "User Interaction Detection",
|
|
"mitre_id": "T1497.002",
|
|
"api_count": len(interaction_apis),
|
|
"apis_used": list({c["api"] for c in interaction_apis}),
|
|
"severity": "medium",
|
|
"description": "Malware checks for user input to determine if running in automated sandbox",
|
|
}]
|
|
return []
|
|
|
|
|
|
def score_evasion_sophistication(all_findings):
|
|
"""Score evasion sophistication based on technique diversity."""
|
|
technique_ids = {f["mitre_id"] for f in all_findings}
|
|
categories = {f["technique"].split()[0] for f in all_findings}
|
|
score = min(len(all_findings) * 10 + len(technique_ids) * 15 + len(categories) * 10, 100)
|
|
level = "low" if score < 30 else "medium" if score < 60 else "high"
|
|
return {"score": score, "level": level, "unique_techniques": len(technique_ids), "total_indicators": len(all_findings)}
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Sandbox Evasion Technique Analyzer")
|
|
parser.add_argument("--report", required=True, help="Path to Cuckoo/AnyRun behavioral report JSON")
|
|
parser.add_argument("--min-sleep-ms", type=int, default=60000, help="Minimum sleep duration to flag (ms)")
|
|
parser.add_argument("--output", default="evasion_analysis_report.json", help="Output report path")
|
|
args = parser.parse_args()
|
|
|
|
api_calls, raw_report = parse_cuckoo_report(args.report)
|
|
print(f"[+] Parsed {len(api_calls)} API calls from behavioral report")
|
|
|
|
timing = detect_timing_checks(api_calls)
|
|
sleep = detect_sleep_inflation(api_calls, args.min_sleep_ms)
|
|
vm_checks = detect_vm_artifact_checks(api_calls)
|
|
user_checks = detect_user_interaction_checks(api_calls)
|
|
|
|
all_findings = timing + sleep + vm_checks + user_checks
|
|
sophistication = score_evasion_sophistication(all_findings)
|
|
|
|
report = {
|
|
"analysis_time": datetime.utcnow().isoformat() + "Z",
|
|
"sample_sha256": raw_report.get("target", {}).get("file", {}).get("sha256", ""),
|
|
"total_api_calls": len(api_calls),
|
|
"evasion_findings": {
|
|
"timing_checks": timing,
|
|
"sleep_inflation": sleep,
|
|
"vm_artifact_checks": vm_checks,
|
|
"user_interaction_checks": user_checks,
|
|
},
|
|
"total_indicators": len(all_findings),
|
|
"sophistication": sophistication,
|
|
"mitre_techniques": ["T1497.001", "T1497.002", "T1497.003"],
|
|
}
|
|
|
|
with open(args.output, "w") as f:
|
|
json.dump(report, f, indent=2)
|
|
print(f"[+] Timing checks: {len(timing)}")
|
|
print(f"[+] Sleep inflation: {len(sleep)}")
|
|
print(f"[+] VM artifact checks: {len(vm_checks)}")
|
|
print(f"[+] User interaction checks: {len(user_checks)}")
|
|
print(f"[+] Evasion sophistication: {sophistication['level']} ({sophistication['score']}/100)")
|
|
print(f"[+] Report saved to {args.output}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|