mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
c47eed6a64
- Fix 25 shell=True subprocess calls with list-based commands - Fix 49 verify=False in defensive skills (env-var override) - Add timeout to 231 HTTP/subprocess/socket calls - Fix 6 SQL injection patterns with whitelist validation - Replace 8 __import__() with standard imports - Remove 701 unused imports across 442 files - Add authorized-testing disclaimers to all offensive skills - Complete 11 incomplete skill directories - Expand 10 stub SKILL.md files with full content - Fix 2 YAML parse errors in frontmatter - Fix 5 pre-existing syntax errors - Convert 22 hardcoded paths/ports to environment variables - Back up 21 redundant skill pairs to .bak - Fix 2 global declaration errors - 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE) - 0 compile errors across all 724 agent.py files
163 lines
5.9 KiB
Python
163 lines
5.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Agent for detecting suspicious PowerShell execution patterns."""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
SUSPICIOUS_CMDLETS = [
|
|
"Invoke-Expression", "IEX", "Invoke-WebRequest", "Invoke-RestMethod",
|
|
"Start-Process", "New-Object Net.WebClient", "DownloadString",
|
|
"DownloadFile", "System.Reflection.Assembly", "FromBase64String",
|
|
"Invoke-Mimikatz", "Invoke-Shellcode", "Invoke-DllInjection",
|
|
"Invoke-ReflectivePEInjection", "Get-Keystrokes", "Get-GPPPassword",
|
|
"Invoke-CredentialInjection", "Invoke-TokenManipulation",
|
|
"Add-Exfiltration", "Get-TimedScreenshot",
|
|
]
|
|
|
|
OBFUSCATION_PATTERNS = [
|
|
(r'\-[eE][nN][cC]\s', "Encoded command (-enc)"),
|
|
(r'[Ff][Rr][Oo][Mm][Bb][Aa][Ss][Ee]64', "Base64 decoding"),
|
|
(r'\$\{[^}]+\}', "Variable obfuscation ${...}"),
|
|
(r"'[^']*'\s*\+\s*'[^']*'", "String concatenation obfuscation"),
|
|
(r'\-[Ww]indow[Ss]tyle\s+[Hh]idden', "Hidden window execution"),
|
|
(r'\-[Nn]o[Pp]rofile', "NoProfile flag"),
|
|
(r'\-[Ee]xecution[Pp]olicy\s+[Bb]ypass', "Execution policy bypass"),
|
|
(r'[Ss]et-[Mm]pPreference.*-[Dd]isable', "Defender bypass attempt"),
|
|
(r'[Aa][Mm][Ss][Ii]', "AMSI reference"),
|
|
]
|
|
|
|
|
|
def parse_script_block_logs():
|
|
"""Parse PowerShell script block logging events (Event ID 4104)."""
|
|
events = []
|
|
if sys.platform != "win32":
|
|
return events
|
|
ps_cmd = (
|
|
"Get-WinEvent -FilterHashtable @{LogName='Microsoft-Windows-PowerShell/Operational';"
|
|
"Id=4104} -MaxEvents 200 | Select-Object TimeCreated,"
|
|
"@{N='ScriptBlock';E={$_.Properties[2].Value}},"
|
|
"@{N='Path';E={$_.Properties[4].Value}} | ConvertTo-Json -Depth 3"
|
|
)
|
|
try:
|
|
result = subprocess.check_output(
|
|
["powershell", "-NoProfile", "-Command", ps_cmd],
|
|
text=True, errors="replace", timeout=30
|
|
)
|
|
data = json.loads(result) if result.strip() else []
|
|
return data if isinstance(data, list) else [data]
|
|
except (subprocess.SubprocessError, json.JSONDecodeError):
|
|
return []
|
|
|
|
|
|
def analyze_script_content(script_text):
|
|
"""Analyze a PowerShell script for suspicious patterns."""
|
|
findings = []
|
|
if not script_text:
|
|
return findings
|
|
|
|
for cmdlet in SUSPICIOUS_CMDLETS:
|
|
if cmdlet.lower() in script_text.lower():
|
|
findings.append({"type": "suspicious_cmdlet", "cmdlet": cmdlet})
|
|
|
|
for pattern, desc in OBFUSCATION_PATTERNS:
|
|
if re.search(pattern, script_text):
|
|
findings.append({"type": "obfuscation", "pattern": desc})
|
|
|
|
b64_match = re.findall(r'[A-Za-z0-9+/]{40,}={0,2}', script_text)
|
|
for b64 in b64_match[:3]:
|
|
try:
|
|
import base64
|
|
decoded = base64.b64decode(b64).decode("utf-8", errors="replace")
|
|
if any(c.lower() in decoded.lower() for c in SUSPICIOUS_CMDLETS[:10]):
|
|
findings.append({"type": "encoded_payload", "preview": decoded[:100]})
|
|
except Exception:
|
|
pass
|
|
|
|
return findings
|
|
|
|
|
|
def analyze_log_file(log_path):
|
|
"""Analyze a text file containing PowerShell commands."""
|
|
findings = []
|
|
try:
|
|
with open(log_path, "r", errors="replace") as f:
|
|
content = f.read()
|
|
results = analyze_script_content(content)
|
|
if results:
|
|
findings.append({
|
|
"file": log_path,
|
|
"indicators": results,
|
|
"indicator_count": len(results),
|
|
})
|
|
except FileNotFoundError:
|
|
print(f"[!] File not found: {log_path}")
|
|
return findings
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Detect suspicious PowerShell execution patterns"
|
|
)
|
|
parser.add_argument("--event-logs", action="store_true",
|
|
help="Parse Windows PowerShell event logs")
|
|
parser.add_argument("--script", help="Analyze a PowerShell script file")
|
|
parser.add_argument("--log-dir", help="Directory of PS log files to scan")
|
|
parser.add_argument("--output", "-o", help="Output JSON report")
|
|
parser.add_argument("--verbose", "-v", action="store_true")
|
|
args = parser.parse_args()
|
|
|
|
print("[*] Suspicious PowerShell Execution Detection Agent")
|
|
report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []}
|
|
|
|
if args.event_logs:
|
|
events = parse_script_block_logs()
|
|
for evt in events:
|
|
script = evt.get("ScriptBlock", "")
|
|
indicators = analyze_script_content(script)
|
|
if indicators:
|
|
report["findings"].append({
|
|
"source": "event_log",
|
|
"time": evt.get("TimeCreated", ""),
|
|
"path": evt.get("Path", ""),
|
|
"indicators": indicators,
|
|
"preview": script[:200] if args.verbose else "",
|
|
})
|
|
print(f"[*] Analyzed {len(events)} script block events")
|
|
|
|
if args.script:
|
|
findings = analyze_log_file(args.script)
|
|
report["findings"].extend(findings)
|
|
|
|
if args.log_dir and os.path.isdir(args.log_dir):
|
|
for root, _, files in os.walk(args.log_dir):
|
|
for f in files:
|
|
if f.lower().endswith((".ps1", ".psm1", ".psd1", ".log", ".txt")):
|
|
findings = analyze_log_file(os.path.join(root, f))
|
|
report["findings"].extend(findings)
|
|
|
|
report["total_suspicious"] = len(report["findings"])
|
|
report["risk_level"] = (
|
|
"CRITICAL" if len(report["findings"]) >= 10
|
|
else "HIGH" if len(report["findings"]) >= 5
|
|
else "MEDIUM" if report["findings"]
|
|
else "LOW"
|
|
)
|
|
print(f"[*] Suspicious findings: {len(report['findings'])}")
|
|
|
|
if args.output:
|
|
with open(args.output, "w") as f:
|
|
json.dump(report, f, indent=2)
|
|
print(f"[*] Report saved to {args.output}")
|
|
else:
|
|
print(json.dumps(report, indent=2))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|