feat: add 5 new cybersecurity skills - WMI persistence, CloudTrail forensics, honeypots, PDF malware, DCSync detection

mukul975
2026-03-11 00:42:59 +01:00
parent 466f37bfb6
commit 679c98b339
20 changed files with 2112 additions and 0 deletions
@@ -0,0 +1,22 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,74 @@
---
name: analyzing-malicious-pdf-with-peepdf
description: Perform static analysis of malicious PDF documents using peepdf, pdfid, and pdf-parser to extract embedded JavaScript, shellcode, and suspicious objects.
domain: cybersecurity
subdomain: malware-analysis
tags: [malware-analysis, pdf, peepdf, pdfid, pdf-parser, static-analysis, reverse-engineering, dfir]
version: "1.0"
author: mahipal
license: MIT
---
# Analyzing Malicious PDF with peepdf
## When to Use
- When triaging suspicious PDF attachments from phishing emails
- During malware analysis of PDF-based exploit documents
- When extracting embedded JavaScript, shellcode, or executables from PDFs
- For forensic examination of weaponized document artifacts
- When building detection signatures for PDF-based threats
## Prerequisites
- Python 3.8+ with peepdf-3 installed (pip install peepdf-3)
- pdfid.py and pdf-parser.py from Didier Stevens suite
- Isolated analysis environment (VM or sandbox)
- Optional: PyV8 for JavaScript emulation within peepdf
- Optional: Pylibemu for shellcode analysis
## Workflow
1. **Triage with pdfid**: Scan PDF for suspicious keywords (/JS, /JavaScript, /OpenAction, /Launch, /EmbeddedFile).
2. **Interactive Analysis**: Open PDF in peepdf interactive mode to explore object structure.
3. **Identify Suspicious Objects**: Locate objects containing JavaScript, streams, or encoded data.
4. **Extract Content**: Dump suspicious streams and decode filters (FlateDecode, ASCIIHexDecode).
5. **Deobfuscate JavaScript**: Analyze extracted JS for shellcode, heap sprays, or exploit code.
6. **Check VirusTotal**: Use peepdf vtcheck to cross-reference file hash with AV detections.
7. **Generate IOCs**: Extract URLs, domains, hashes, and shellcode signatures.
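The triage step can be approximated without any external tool. The following is a minimal sketch, assuming the keyword names from step 1; `count_keywords` is a hypothetical helper and not how pdfid itself works (it ignores hex-escaped names such as `/J#53` and anything hidden inside compressed object streams).

```python
import re
from collections import Counter

# Keyword names taken from the workflow's triage step; illustrative, not exhaustive.
KEYWORDS = [b"/JS", b"/JavaScript", b"/OpenAction", b"/Launch", b"/EmbeddedFile"]


def count_keywords(pdf_bytes: bytes) -> Counter:
    """Count occurrences of each PDF name in raw bytes (hypothetical helper)."""
    counts = Counter()
    for kw in KEYWORDS:
        # A PDF name ends at whitespace or a delimiter, so /JS must not match /JSX.
        pattern = re.escape(kw) + rb"(?![A-Za-z0-9])"
        counts[kw.decode()] = len(re.findall(pattern, pdf_bytes))
    return counts


sample = (
    b"1 0 obj << /Type /Catalog /OpenAction 2 0 R >> endobj "
    b"2 0 obj << /S /JavaScript /JS (app.alert(1)) >> endobj"
)
print(dict(count_keywords(sample)))
```

A non-zero count is only a reason to look closer; benign PDFs with forms or links also contain some of these names.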
## Key Concepts
| Concept | Description |
|---------|-------------|
| /OpenAction | Automatic action executed when PDF is opened |
| /JavaScript /JS | Embedded JavaScript code in PDF objects |
| /Launch | Action that launches external applications |
| /EmbeddedFile | File embedded within the PDF structure |
| FlateDecode | zlib compression filter used to hide content |
| Object Streams | PDF objects stored in compressed streams |
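Because FlateDecode is plain zlib, a stream body that has already been carved out of the file can be inflated with the standard library. This is a sketch under that assumption; `inflate_stream` is a hypothetical name, and the sample stream is generated in place rather than taken from a real document.

```python
import zlib


def inflate_stream(raw: bytes) -> bytes:
    """Decompress the body of a FlateDecode stream; return b"" if it is not valid zlib."""
    decompressor = zlib.decompressobj()
    try:
        return decompressor.decompress(raw) + decompressor.flush()
    except zlib.error:
        # Malformed samples are common; report nothing instead of crashing.
        return b""


# Synthetic stream body standing in for bytes between "stream" and "endstream".
stream_body = zlib.compress(b"app.alert('demo');")
print(inflate_stream(stream_body))
```

Streams that chain several filters (for example ASCIIHexDecode followed by FlateDecode) need each filter undone in the order listed in the object's /Filter entry.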
## Tools & Systems
| Tool | Purpose |
|------|---------|
| peepdf / peepdf-3 | Interactive PDF analysis with JS emulation |
| pdfid.py | Quick triage scanning for suspicious keywords |
| pdf-parser.py | Deep object-level PDF parsing |
| VirusTotal | Hash lookup and AV detection cross-reference |
| CyberChef | Decode and transform extracted payloads |
## Output Format
```
Analysis Report: PDF-MAL-[DATE]-[SEQ]
File: [filename.pdf]
SHA-256: [hash]
Suspicious Keywords: [/JS, /OpenAction, etc.]
Objects with JavaScript: [Object IDs]
Extracted URLs: [List]
Shellcode Detected: [Yes/No]
Embedded Files: [Count and types]
VirusTotal Detections: [X/Y engines]
Risk Level: [Critical/High/Medium/Low]
```
@@ -0,0 +1,126 @@
# Malicious PDF Analysis Reference
## peepdf Installation
```bash
# Python 3 version
pip install peepdf-3
# From source
git clone https://github.com/jesparza/peepdf.git
cd peepdf && pip install -r requirements.txt
```
## peepdf CLI Usage
```bash
# Basic analysis (loose mode, force parsing)
peepdf -f -l malicious.pdf
# Interactive mode
peepdf -i malicious.pdf
# Batch script execution
peepdf -s commands.txt malicious.pdf
# JSON output
peepdf -j malicious.pdf
```
## peepdf Interactive Commands
| Command | Description |
|---------|-------------|
| `info` | Display document summary and suspicious elements |
| `tree` | Show object tree structure |
| `object <id>` | Display raw content of object |
| `stream <id>` | Decode and display stream content |
| `rawstream <id>` | Display raw (encoded) stream |
| `js_analyse <id>` | Analyze JavaScript in object |
| `js_eval <id>` | Evaluate JavaScript (requires PyV8) |
| `vtcheck` | Check file hash on VirusTotal |
| `extract uri` | Extract all URIs from document |
| `search <string>` | Search for string across objects |
| `offsets <id>` | Show byte offsets of object in file |
| `metadata` | Display document metadata |
## pdfid.py Usage
```bash
# Basic scan
pdfid.py malicious.pdf
# Display extra data such as dates (-e is --extra; disarming is -d)
pdfid.py -e malicious.pdf
# Scan directory
pdfid.py -r /samples/
```
### pdfid Suspicious Keywords
| Keyword | Risk | Significance |
|---------|------|-------------|
| /JS | High | JavaScript object reference |
| /JavaScript | High | JavaScript action |
| /OpenAction | High | Automatic execution on open |
| /AA | High | Additional actions trigger |
| /Launch | Critical | Launch external application |
| /EmbeddedFile | High | Embedded file (dropper) |
| /XFA | High | XML Forms Architecture (exploit surface) |
| /JBIG2Decode | Medium | Image decoder (CVE-2009-0658) |
| /AcroForm | Medium | Interactive form (potential exploit) |
| /ObjStm | Low | Object stream (can hide objects) |
| /URI | Low | External URL reference |
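One way to use the table is to reduce a set of pdfid counts to the highest rating present. The sketch below copies the ratings from the table; `highest_risk` and the `"None"` fallback are assumptions made for illustration.

```python
# Ratings copied from the keyword table above.
KEYWORD_RISK = {
    "/JS": "High", "/JavaScript": "High", "/OpenAction": "High", "/AA": "High",
    "/Launch": "Critical", "/EmbeddedFile": "High", "/XFA": "High",
    "/JBIG2Decode": "Medium", "/AcroForm": "Medium", "/ObjStm": "Low", "/URI": "Low",
}
RISK_ORDER = ["Low", "Medium", "High", "Critical"]


def highest_risk(keyword_counts: dict) -> str:
    """Return the highest table rating among keywords with a non-zero count."""
    present = [
        KEYWORD_RISK[kw]
        for kw, count in keyword_counts.items()
        if count > 0 and kw in KEYWORD_RISK
    ]
    if not present:
        return "None"
    return max(present, key=RISK_ORDER.index)


print(highest_risk({"/JS": 1, "/URI": 3, "/Launch": 0}))  # High
```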
## pdf-parser.py Usage
```bash
# Document statistics
pdf-parser.py --stats malicious.pdf
# Extract specific object
pdf-parser.py -o 10 malicious.pdf
# Extract and decode filters
pdf-parser.py -o 10 -f malicious.pdf
# Dump decoded stream to file
pdf-parser.py -o 10 -f -d extracted.bin malicious.pdf
# Search for keyword
pdf-parser.py --search "/JavaScript" malicious.pdf
# Search by type
pdf-parser.py --type "/Action" malicious.pdf
```
## Common CVEs in PDF Exploits
| CVE | Component | Description |
|-----|-----------|-------------|
| CVE-2009-0658 | JBIG2 | Buffer overflow in JBIG2 decoder |
| CVE-2009-4324 | Doc.media | Use-after-free via newplayer |
| CVE-2010-0188 | LibTIFF | TIFF image handling overflow |
| CVE-2013-0640 | XFA | Memory corruption in XFA |
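| CVE-2017-11882 | Microsoft Office Equation Editor | Stack buffer overflow in Office, not in a PDF reader; relevant when a PDF delivers or links to a malicious Office document |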
## Shellcode Detection Patterns
| Pattern | Indicator |
|---------|-----------|
| `%u9090%u9090` | NOP sled (Unicode) |
| `\x90\x90\x90` | NOP sled (hex) |
| `unescape()` | Shellcode decoding |
| `String.fromCharCode` | Character code assembly |
| `eval()` | Dynamic code execution |
| `new ActiveXObject` | COM object instantiation |
| `spray` variable name | Heap spray technique |
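The `%uXXXX` form is what JavaScript `unescape()` consumes, so converting it back to bytes is a common first step before disassembly. A minimal sketch, assuming each escape is a 16-bit value stored little-endian in memory; both function names are hypothetical.

```python
import re


def decode_percent_u(text: str) -> bytes:
    """Convert %uXXXX escapes to the little-endian bytes they occupy in memory."""
    out = bytearray()
    for hex_word in re.findall(r"%u([0-9a-fA-F]{4})", text):
        out += int(hex_word, 16).to_bytes(2, "little")
    return bytes(out)


def nop_ratio(data: bytes) -> float:
    """Fraction of bytes equal to 0x90, a rough NOP-sled indicator."""
    return data.count(0x90) / len(data) if data else 0.0


decoded = decode_percent_u("%u9090%u9090%u4142")
print(decoded, nop_ratio(decoded))
```

A high ratio suggests a sled, but 0x90 also appears in ordinary binary data, so treat it as a hint rather than proof.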
## VirusTotal Check via peepdf
```
PPDF> vtcheck
MD5: abc123...
Detections: 45/72
```
@@ -0,0 +1,219 @@
#!/usr/bin/env python3
"""Malicious PDF Analysis Agent - static analysis using peepdf, pdfid, and pdf-parser for threat detection."""
import json
import argparse
import logging
import subprocess
import hashlib
import os
import re
from datetime import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
SUSPICIOUS_KEYWORDS = [
    "/JS", "/JavaScript", "/OpenAction", "/AA", "/Launch", "/EmbeddedFile",
    "/RichMedia", "/XFA", "/AcroForm", "/JBIG2Decode", "/URI", "/SubmitForm",
    "/ImportData", "/Names", "/ObjStm",
]
HIGH_RISK_KEYWORDS = ["/JS", "/JavaScript", "/OpenAction", "/Launch", "/EmbeddedFile", "/XFA"]
def compute_hashes(filepath):
    """Compute MD5 and SHA-256 hashes of the PDF file."""
    md5 = hashlib.md5()
    sha256 = hashlib.sha256()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            md5.update(chunk)
            sha256.update(chunk)
    return {"md5": md5.hexdigest(), "sha256": sha256.hexdigest()}
def run_pdfid(filepath):
    """Run pdfid.py to triage PDF for suspicious keywords."""
    cmd = ["python3", "-m", "pdfid", filepath]
    alt_cmd = ["pdfid.py", filepath]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError:
        # A missing executable raises instead of returning a non-zero code.
        result = None
    if result is None or result.returncode != 0:
        try:
            result = subprocess.run(alt_cmd, capture_output=True, text=True)
        except FileNotFoundError:
            logger.warning("pdfid not found; skipping keyword triage")
            return {}
    keywords = {}
    for line in result.stdout.strip().split("\n"):
        line = line.strip()
        for kw in SUSPICIOUS_KEYWORDS:
            if kw.lower() in line.lower():
                parts = line.rsplit(None, 1)
                if len(parts) == 2:
                    try:
                        count = int(parts[1])
                        keywords[kw] = count
                    except ValueError:
                        pass
    return keywords
def run_peepdf_analysis(filepath):
    """Run peepdf for detailed PDF object analysis."""
    cmd = ["peepdf", "-f", "-l", filepath]
    alt_cmd = ["python3", "-m", "peepdf", "-f", "-l", filepath]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError:
        # No peepdf executable on PATH; fall through to the module invocation.
        result = None
    if result is None or result.returncode != 0:
        result = subprocess.run(alt_cmd, capture_output=True, text=True)
    analysis = {
        "versions": 0,
        "objects": 0,
        "streams": 0,
        "encoded_streams": 0,
        "suspicious_objects": [],
        "js_objects": [],
        "vulns": [],
        "urls": [],
        "raw_output": result.stdout[:2000],
    }
    for line in result.stdout.split("\n"):
        line = line.strip()
        # Assumes peepdf's summary prints the total on its own "Objects: N" line.
        objects_match = re.match(r"Objects:\s*(\d+)", line)
        if objects_match:
            analysis["objects"] = int(objects_match.group(1))
        if "suspicious" in line.lower():
            analysis["suspicious_objects"].append(line)
        if "/JS" in line or "/JavaScript" in line:
            # Object IDs are listed in brackets, e.g. "/JS (1): [12]";
            # the number in parentheses is a count, not an object ID.
            bracketed = re.search(r"\[([\d,\s]+)\]", line)
            if bracketed:
                for obj_id in re.findall(r"\d+", bracketed.group(1)):
                    if obj_id not in analysis["js_objects"]:
                        analysis["js_objects"].append(obj_id)
        if "CVE" in line.upper():
            cves = re.findall(r"CVE-\d{4}-\d{4,}", line, re.IGNORECASE)
            analysis["vulns"].extend(cves)
        urls = re.findall(r"https?://[^\s\"'<>]+", line)
        analysis["urls"].extend(urls)
    return analysis
def run_pdf_parser(filepath, object_id=None):
    """Run pdf-parser.py to extract specific objects."""
    if object_id:
        # -f applies the stream filters. -d is omitted: it expects a dump file
        # name and would otherwise consume (and overwrite) the sample path.
        cmd = ["pdf-parser.py", "-o", str(object_id), "-f", filepath]
    else:
        cmd = ["pdf-parser.py", "--stats", filepath]
    result = subprocess.run(cmd, capture_output=True, text=True)
    return result.stdout[:3000]
def extract_javascript(filepath, peepdf_analysis):
    """Extract JavaScript content from identified objects."""
    js_content = []
    for obj_id in peepdf_analysis.get("js_objects", []):
        cmd = ["pdf-parser.py", "-o", str(obj_id), "-f", "-w", filepath]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.stdout:
            js_content.append({
                "object_id": obj_id,
                "content_preview": result.stdout[:1000],
                "length": len(result.stdout),
            })
    return js_content
def detect_shellcode_patterns(content):
    """Detect common shellcode patterns in extracted content."""
    patterns = {
        "heap_spray": r"(%u[0-9a-fA-F]{4}){4,}",
        "nop_sled": r"(\\x90){8,}|(%u9090){4,}",
        "unescape_chain": r"unescape\s*\(",
        "shellcode_var": r"shellcode|payload|sc\s*=\s*[\"']",
        "fromcharcode": r"String\.fromCharCode",
        "eval_call": r"eval\s*\(",
        "activex": r"new\s+ActiveXObject",
    }
    detected = {}
    for name, pattern in patterns.items():
        matches = re.findall(pattern, content, re.IGNORECASE)
        if matches:
            detected[name] = len(matches)
    return detected
def calculate_risk_score(pdfid_results, peepdf_analysis, shellcode_patterns):
    """Calculate overall risk score for the PDF."""
    score = 0
    for kw, count in pdfid_results.items():
        if count > 0:
            if kw in HIGH_RISK_KEYWORDS:
                score += count * 20
            else:
                score += count * 5
    score += len(peepdf_analysis.get("vulns", [])) * 30
    score += len(peepdf_analysis.get("js_objects", [])) * 15
    score += sum(shellcode_patterns.values()) * 10
    risk_level = "critical" if score >= 80 else "high" if score >= 50 else "medium" if score >= 20 else "low"
    return {"score": min(score, 100), "risk_level": risk_level}
def generate_report(filepath, hashes, pdfid_results, peepdf_analysis, js_content, shellcode, risk):
    """Generate comprehensive PDF malware analysis report."""
    report = {
        "timestamp": datetime.utcnow().isoformat(),
        "file": os.path.basename(filepath),
        "file_size": os.path.getsize(filepath),
        "hashes": hashes,
        "risk_assessment": risk,
        "pdfid_keywords": pdfid_results,
        "suspicious_keyword_count": sum(1 for v in pdfid_results.values() if v > 0),
        "peepdf_analysis": {
            "objects": peepdf_analysis.get("objects", 0),
            "js_objects": peepdf_analysis.get("js_objects", []),
            "cve_references": peepdf_analysis.get("vulns", []),
            "extracted_urls": list(set(peepdf_analysis.get("urls", []))),
        },
        "javascript_content": js_content[:5],
        "shellcode_indicators": shellcode,
        "iocs": {
            "sha256": hashes["sha256"],
            "urls": list(set(peepdf_analysis.get("urls", []))),
            "cves": peepdf_analysis.get("vulns", []),
        },
    }
    return report
def main():
    parser = argparse.ArgumentParser(description="Malicious PDF Analysis Agent")
    parser.add_argument("file", help="Path to PDF file to analyze")
    parser.add_argument("--extract-js", action="store_true", help="Extract JavaScript objects")
    parser.add_argument("--output", default="pdf_analysis_report.json")
    args = parser.parse_args()
    if not os.path.exists(args.file):
        logger.error("File not found: %s", args.file)
        return
    logger.info("Analyzing: %s (%d bytes)", args.file, os.path.getsize(args.file))
    hashes = compute_hashes(args.file)
    logger.info("SHA-256: %s", hashes["sha256"])
    pdfid_results = run_pdfid(args.file)
    peepdf_analysis = run_peepdf_analysis(args.file)
    js_content = []
    shellcode = {}
    if args.extract_js or peepdf_analysis.get("js_objects"):
        js_content = extract_javascript(args.file, peepdf_analysis)
        all_js = " ".join(j["content_preview"] for j in js_content)
        shellcode = detect_shellcode_patterns(all_js)
    risk = calculate_risk_score(pdfid_results, peepdf_analysis, shellcode)
    report = generate_report(args.file, hashes, pdfid_results, peepdf_analysis, js_content, shellcode, risk)
    with open(args.output, "w") as f:
        json.dump(report, f, indent=2, default=str)
    logger.info("Risk: %s (score %d), %d suspicious keywords, %d JS objects, %d CVEs",
                risk["risk_level"], risk["score"], report["suspicious_keyword_count"],
                len(peepdf_analysis.get("js_objects", [])), len(peepdf_analysis.get("vulns", [])))
    print(json.dumps(report, indent=2, default=str))
if __name__ == "__main__":
    main()
@@ -0,0 +1,22 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,74 @@
---
name: detecting-wmi-persistence
description: Detect WMI event subscription persistence by analyzing Sysmon Event IDs 19, 20, and 21 for malicious EventFilter, EventConsumer, and FilterToConsumerBinding creation.
domain: cybersecurity
subdomain: threat-hunting
tags: [threat-hunting, wmi, persistence, sysmon, t1546.003, mitre-attack, windows, dfir]
version: "1.0"
author: mahipal
license: MIT
---
# Detecting WMI Persistence
## When to Use
- When hunting for WMI event subscription persistence (MITRE ATT&CK T1546.003)
- After detecting suspicious WMI activity in endpoint telemetry
- During incident response to identify attacker persistence mechanisms
- When Sysmon alerts trigger on Event IDs 19, 20, or 21
- During purple team exercises testing WMI-based persistence
## Prerequisites
- Sysmon v6.10+ deployed with WMI event logging enabled (Event IDs 19, 20, 21)
- Windows event log forwarding configured, including the Microsoft-Windows-Sysmon/Operational channel where Event IDs 19-21 are written
- SIEM with Sysmon data ingested (Splunk, Elastic, Sentinel)
- PowerShell access for WMI enumeration on endpoints
- Sysinternals Autoruns for manual WMI subscription review
## Workflow
1. **Collect Telemetry**: Parse Sysmon Event IDs 19 (WmiEventFilter), 20 (WmiEventConsumer), 21 (WmiEventConsumerToFilter).
2. **Identify Suspicious Consumers**: Flag CommandLineEventConsumer and ActiveScriptEventConsumer types executing code.
3. **Analyze Event Filters**: Examine WQL queries in EventFilters for process start triggers or timer-based execution.
4. **Correlate Bindings**: Match FilterToConsumerBindings linking suspicious filters to consumers.
5. **Check Persistence Locations**: Query WMI namespaces root\subscription and root\default for active subscriptions.
6. **Validate Findings**: Cross-reference with known-good WMI subscriptions (SCCM, AV products).
7. **Document and Remediate**: Remove malicious subscriptions and update detection rules.
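Step 4 (correlating bindings) can be sketched as a join between the binding references and the filter list. The example assumes bindings expose `Filter` and `Consumer` as WMI object-path strings of the form `Class.Name="..."`, optionally prefixed by a namespace; `parse_ref`, `correlate`, and the sample records are hypothetical.

```python
import re

SUSPICIOUS = {"CommandLineEventConsumer", "ActiveScriptEventConsumer"}
# Matches 'Class.Name="value"' at the start of the path or after a namespace prefix.
REF = re.compile(r'(?:^|:)(\w+)\.Name="([^"]+)"')


def parse_ref(path: str):
    """Split a WMI object path into (class name, instance name)."""
    match = REF.search(path)
    return (match.group(1), match.group(2)) if match else (None, None)


def correlate(filters, bindings):
    """Yield (filter name, WQL query, consumer class, consumer name) for suspicious bindings."""
    queries = {f["Name"]: f.get("Query", "") for f in filters}
    for binding in bindings:
        _, filter_name = parse_ref(binding["Filter"])
        consumer_class, consumer_name = parse_ref(binding["Consumer"])
        if consumer_class in SUSPICIOUS:
            yield filter_name, queries.get(filter_name, ""), consumer_class, consumer_name


filters = [{"Name": "MalFilter", "Query": "SELECT * FROM __InstanceCreationEvent WITHIN 60"}]
bindings = [
    {"Filter": '__EventFilter.Name="MalFilter"',
     "Consumer": 'CommandLineEventConsumer.Name="MalConsumer"'},
    {"Filter": '\\\\.\\root\\subscription:__EventFilter.Name="SCM Event Log Filter"',
     "Consumer": 'NTEventLogEventConsumer.Name="SCM Event Log Consumer"'},
]
for hit in correlate(filters, bindings):
    print(hit)
```

Joining on the binding rather than on the consumer alone keeps the triggering WQL query next to the payload, which is what step 6 needs when comparing against known-good subscriptions.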
## Key Concepts
| Concept | Description |
|---------|-------------|
| Sysmon Event 19 | WmiEventFilter creation detected |
| Sysmon Event 20 | WmiEventConsumer creation detected |
| Sysmon Event 21 | WmiEventConsumerToFilter binding detected |
| T1546.003 | Event Triggered Execution: WMI Event Subscription |
| CommandLineEventConsumer | Executes system commands when filter triggers |
| ActiveScriptEventConsumer | Runs VBScript/JScript when filter triggers |
## Tools & Systems
| Tool | Purpose |
|------|---------|
| Sysmon | Windows event monitoring for WMI activity |
| WMI Explorer | GUI tool for browsing WMI namespaces |
| Autoruns | Sysinternals tool listing persistence mechanisms |
| PowerShell Get-WmiObject | Enumerate WMI event subscriptions |
| Splunk | SIEM analysis of Sysmon WMI events |
| Velociraptor | Endpoint WMI artifact collection |
## Output Format
```
Hunt ID: TH-WMI-[DATE]-[SEQ]
Technique: T1546.003
Host: [Hostname]
Event Type: [EventFilter|EventConsumer|Binding]
Consumer Type: [CommandLine|ActiveScript]
WQL Query: [Filter query text]
Command: [Executed command or script]
Risk Level: [Critical/High/Medium/Low]
Recommended Action: [Remove subscription, investigate lateral movement]
```
@@ -0,0 +1,89 @@
# WMI Persistence Detection Reference
## Sysmon Event IDs
| Event ID | Type | Description |
|----------|------|-------------|
| 19 | WmiEventFilter | Logs WMI EventFilter creation with WQL query |
| 20 | WmiEventConsumer | Logs WMI EventConsumer creation (command/script) |
| 21 | WmiEventConsumerToFilter | Logs binding of EventFilter to EventConsumer |
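These events arrive as Windows event XML, so the named `Data` fields can be pulled out with the standard library. The sketch below uses a synthetic event whose field values follow this document's naming (real Sysmon output may differ); `parse_event` is a hypothetical helper. Note that ElementTree paths cannot select attributes directly, so `SystemTime` is read from the element.

```python
import xml.etree.ElementTree as ET

NS = {"e": "http://schemas.microsoft.com/win/2004/08/events/event"}

# Synthetic example event; values are illustrative only.
SAMPLE = """<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event">
  <System>
    <EventID>20</EventID>
    <TimeCreated SystemTime="2026-03-10T23:00:00.000Z"/>
    <Computer>WS01.example.local</Computer>
  </System>
  <EventData>
    <Data Name="Operation">Created</Data>
    <Data Name="Type">CommandLineEventConsumer</Data>
    <Data Name="Destination">powershell.exe -nop -w hidden</Data>
  </EventData>
</Event>"""


def parse_event(xml_text: str) -> dict:
    """Flatten one event into a dict of system fields plus named Data values."""
    root = ET.fromstring(xml_text)
    time_created = root.find("./e:System/e:TimeCreated", NS)
    record = {
        "event_id": int(root.findtext("./e:System/e:EventID", "0", NS)),
        "timestamp": time_created.get("SystemTime", "") if time_created is not None else "",
        "computer": root.findtext("./e:System/e:Computer", "", NS),
    }
    for data in root.findall("./e:EventData/e:Data", NS):
        record[data.get("Name", "")] = data.text or ""
    return record


print(parse_event(SAMPLE))
```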
## Sysmon Configuration
Enable WMI event logging in sysmonconfig.xml:
```xml
<RuleGroup groupRelation="or">
  <WmiEvent onmatch="include">
    <Operation condition="is">Created</Operation>
  </WmiEvent>
</RuleGroup>
```
Install: `sysmon64.exe -accepteula -i sysmonconfig.xml`
## PowerShell WMI Enumeration
```powershell
# List all EventFilters
Get-WmiObject -Namespace root\subscription -Class __EventFilter
# List all EventConsumers
Get-WmiObject -Namespace root\subscription -Class __EventConsumer
# List all Bindings
Get-WmiObject -Namespace root\subscription -Class __FilterToConsumerBinding
# Remove specific subscription
Get-WmiObject -Namespace root\subscription -Class __EventFilter -Filter "Name='MalFilter'" | Remove-WmiObject
Get-WmiObject -Namespace root\subscription -Class CommandLineEventConsumer -Filter "Name='MalConsumer'" | Remove-WmiObject
Get-WmiObject -Namespace root\subscription -Class __FilterToConsumerBinding | Where-Object {$_.Filter -like '*MalFilter*'} | Remove-WmiObject
```
## Suspicious Consumer Types
| Consumer Class | Risk | Description |
|---------------|------|-------------|
| CommandLineEventConsumer | Critical | Executes arbitrary system commands |
| ActiveScriptEventConsumer | Critical | Runs embedded VBScript or JScript |
| LogFileEventConsumer | Low | Writes to log file |
| NTEventLogEventConsumer | Low | Creates Windows event log entry |
| SMTPEventConsumer | Medium | Sends email notification |
## Splunk Detection Query
```spl
index=sysmon EventCode IN (19, 20, 21)
| eval event_type=case(EventCode=19, "EventFilter", EventCode=20, "EventConsumer", EventCode=21, "Binding")
| where Consumer_Type IN ("CommandLineEventConsumer", "ActiveScriptEventConsumer")
| stats count by Computer, event_type, Consumer_Type, Destination, User
| where count > 0
```
## Elastic Detection Rule
```json
{
  "rule": {
    "name": "WMI Persistence via Event Subscription",
    "query": "event.code:(\"19\" OR \"20\" OR \"21\") AND winlog.event_data.EventType:\"WmiConsumerEvent\" AND winlog.event_data.Type:(\"CommandLineEventConsumer\" OR \"ActiveScriptEventConsumer\")",
    "severity": "high",
    "risk_score": 73,
    "tags": ["ATT&CK T1546.003"]
  }
}
```
## MITRE ATT&CK Mapping
- **Technique**: T1546.003 - Event Triggered Execution: WMI Event Subscription
- **Tactic**: Persistence, Privilege Escalation
- **Data Sources**: WMI Objects (WMI Creation), Command Execution, Process Creation
## Autoruns WMI Tab
```cmd
autorunsc64.exe -accepteula -nobanner -a m -c
```
Output includes WMI subscriptions under "WMI" category with filter name, consumer, and command details.
@@ -0,0 +1,207 @@
#!/usr/bin/env python3
"""WMI Persistence Detection Agent - hunts for malicious WMI event subscriptions via Sysmon and WMI queries."""
import json
import argparse
import logging
import subprocess
import re
import xml.etree.ElementTree as ET
from collections import defaultdict
from datetime import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
SUSPICIOUS_CONSUMERS = ["CommandLineEventConsumer", "ActiveScriptEventConsumer"]
KNOWN_GOOD_FILTERS = [
    "SCM Event Log Filter",
    "BVTFilter",
    "TSLogonFilter",
]
def query_sysmon_wmi_events(evtx_path=None, hours_back=72):
    """Query Sysmon Event IDs 19, 20, 21 for WMI persistence."""
    events = []
    ns = {"s": "http://schemas.microsoft.com/win/2004/08/events/event"}
    for event_id in [19, 20, 21]:
        cmd = [
            "wevtutil", "qe", "Microsoft-Windows-Sysmon/Operational",
            "/q:*[System[EventID={}]]".format(event_id),
            "/f:xml", "/c:500",
        ]
        if evtx_path:
            cmd = ["wevtutil", "qe", evtx_path, "/lf:true",
                   "/q:*[System[EventID={}]]".format(event_id), "/f:xml", "/c:500"]
        result = subprocess.run(cmd, capture_output=True, text=True)
        for event_xml in re.findall(r"<Event.*?</Event>", result.stdout, re.DOTALL):
            try:
                root = ET.fromstring(event_xml)
                data = {}
                for el in root.findall(".//s:Data", ns):
                    data[el.get("Name", "")] = el.text or ""
                # ElementTree paths cannot select attributes ("/@SystemTime"),
                # so read the attribute from the TimeCreated element instead.
                time_created = root.find(".//s:TimeCreated", ns)
                events.append({
                    "event_id": event_id,
                    "timestamp": time_created.get("SystemTime", "") if time_created is not None else "",
                    "computer": root.findtext(".//s:Computer", "", ns),
                    "operation": data.get("Operation", ""),
                    "event_type": data.get("EventType", ""),
                    "consumer_type": data.get("Type", ""),
                    "name": data.get("Name", ""),
                    "destination": data.get("Destination", ""),
                    "query": data.get("Query", ""),
                    "user": data.get("User", ""),
                    "raw_data": data,
                })
            except ET.ParseError:
                continue
    logger.info("Parsed %d Sysmon WMI events (IDs 19/20/21)", len(events))
    return events
def enumerate_wmi_subscriptions():
    """Enumerate active WMI event subscriptions via PowerShell."""
    subscriptions = {"filters": [], "consumers": [], "bindings": []}
    ps_commands = {
        "filters": "Get-WmiObject -Namespace root\\subscription -Class __EventFilter | Select Name, Query, QueryLanguage | ConvertTo-Json",
        "consumers": "Get-WmiObject -Namespace root\\subscription -Class __EventConsumer | Select __CLASS, Name, CommandLineTemplate, ScriptText | ConvertTo-Json",
        "bindings": "Get-WmiObject -Namespace root\\subscription -Class __FilterToConsumerBinding | Select Filter, Consumer | ConvertTo-Json",
    }
    for category, ps_cmd in ps_commands.items():
        cmd = ["powershell", "-Command", ps_cmd]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.stdout.strip():
            try:
                data = json.loads(result.stdout)
                if isinstance(data, dict):
                    data = [data]
                subscriptions[category] = data
            except json.JSONDecodeError:
                pass
    return subscriptions
def analyze_suspicious_subscriptions(subscriptions):
    """Identify suspicious WMI subscriptions."""
    findings = []
    lolbins = ["powershell", "cmd.exe", "wscript", "cscript", "mshta", "certutil", "bitsadmin"]
    for consumer in subscriptions.get("consumers", []):
        consumer_class = consumer.get("__CLASS", "")
        name = consumer.get("Name", "")
        if consumer_class in SUSPICIOUS_CONSUMERS:
            # ConvertTo-Json emits null for properties a consumer class lacks,
            # so coerce missing values to empty strings before matching.
            cmd_template = consumer.get("CommandLineTemplate") or ""
            script_text = consumer.get("ScriptText") or ""
            payload = cmd_template or script_text
            # Code-executing consumers rate at least "high"; a known LOLBin
            # in the payload raises the finding to "critical".
            severity = "critical" if any(kw in payload.lower() for kw in lolbins) else "high"
            findings.append({
                "type": "suspicious_consumer",
                "consumer_class": consumer_class,
                "name": name,
                "payload": payload[:500],
                "severity": severity,
                "mitre_technique": "T1546.003",
            })
    for filt in subscriptions.get("filters", []):
        name = filt.get("Name", "")
        query = filt.get("Query") or ""
        if name not in KNOWN_GOOD_FILTERS:
            if any(kw in query.lower() for kw in ["win32_processstarttrace", "__instancecreationevent", "win32_logonsession"]):
                findings.append({
                    "type": "suspicious_filter",
                    "name": name,
                    "wql_query": query,
                    "severity": "high",
                    "mitre_technique": "T1546.003",
                })
    return findings
def analyze_sysmon_events(events):
    """Analyze Sysmon WMI events for suspicious patterns."""
    findings = []
    for event in events:
        eid = event["event_id"]
        if eid == 20 and event.get("consumer_type") in SUSPICIOUS_CONSUMERS:
            destination = event.get("destination", "")
            suspicious_cmds = ["powershell", "cmd.exe", "wscript", "mshta", "certutil", "regsvr32"]
            if any(cmd in destination.lower() for cmd in suspicious_cmds):
                findings.append({
                    "type": "sysmon_suspicious_consumer",
                    "event_id": eid,
                    "consumer_type": event["consumer_type"],
                    "destination": destination[:500],
                    "computer": event["computer"],
                    "timestamp": event["timestamp"],
                    "user": event["user"],
                    "severity": "critical",
                })
        if eid == 19:
            query = event.get("query", "")
            if "__instancecreationevent" in query.lower() or "win32_processstarttrace" in query.lower():
                findings.append({
                    "type": "sysmon_suspicious_filter",
                    "event_id": eid,
                    "wql_query": query,
                    "computer": event["computer"],
                    "timestamp": event["timestamp"],
                    "severity": "high",
                })
    return findings
def generate_report(sysmon_events, live_subscriptions, sysmon_findings, subscription_findings):
    """Generate comprehensive WMI persistence hunt report."""
    all_findings = sysmon_findings + subscription_findings
    report = {
        "timestamp": datetime.utcnow().isoformat(),
        "hunt_type": "WMI Event Subscription Persistence (T1546.003)",
        "sysmon_events_analyzed": len(sysmon_events),
        "live_subscriptions": {
            "filters": len(live_subscriptions.get("filters", [])),
            "consumers": len(live_subscriptions.get("consumers", [])),
            "bindings": len(live_subscriptions.get("bindings", [])),
        },
        "total_findings": len(all_findings),
        "critical_findings": sum(1 for f in all_findings if f.get("severity") == "critical"),
        "high_findings": sum(1 for f in all_findings if f.get("severity") == "high"),
        "findings": all_findings,
    }
    return report
def main():
    parser = argparse.ArgumentParser(description="WMI Persistence Detection Agent")
    parser.add_argument("--evtx", help="Path to exported Sysmon .evtx file (optional)")
    parser.add_argument("--skip-live", action="store_true", help="Skip live WMI enumeration")
    parser.add_argument("--output", default="wmi_persistence_report.json")
    args = parser.parse_args()
    sysmon_events = query_sysmon_wmi_events(args.evtx)
    sysmon_findings = analyze_sysmon_events(sysmon_events)
    live_subs = {}
    sub_findings = []
    if not args.skip_live:
        live_subs = enumerate_wmi_subscriptions()
        sub_findings = analyze_suspicious_subscriptions(live_subs)
    report = generate_report(sysmon_events, live_subs, sysmon_findings, sub_findings)
    with open(args.output, "w") as f:
        json.dump(report, f, indent=2, default=str)
    logger.info("WMI hunt: %d events, %d findings (%d critical)",
                len(sysmon_events), report["total_findings"], report["critical_findings"])
    print(json.dumps(report, indent=2, default=str))
if __name__ == "__main__":
    main()
@@ -0,0 +1,22 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,73 @@
---
name: hunting-for-dcsync-attacks
description: Detect DCSync attacks by analyzing Windows Event ID 4662 for unauthorized DS-Replication-Get-Changes requests from non-domain-controller accounts.
domain: cybersecurity
subdomain: threat-hunting
tags: [threat-hunting, dcsync, active-directory, credential-access, t1003.006, mimikatz, windows, dfir]
version: "1.0"
author: mahipal
license: MIT
---
# Hunting for DCSync Attacks
## When to Use
- When hunting for DCSync credential theft (MITRE ATT&CK T1003.006)
- After detecting Mimikatz or similar tools in the environment
- During incident response involving Active Directory compromise
- When monitoring for unauthorized domain replication requests
- During purple team exercises testing AD attack detection
## Prerequisites
- Windows Security Event Log forwarding enabled (Event ID 4662)
- Audit Directory Service Access enabled via Group Policy
- Domain Computers SACL configured on Domain Object for machine account detection
- SIEM with Windows event data ingested (Splunk, Elastic, Sentinel)
- Knowledge of legitimate domain controller accounts and replication partners
## Workflow
1. **Enable Auditing**: Ensure Audit Directory Service Access is enabled on domain controllers.
2. **Collect Events**: Gather Windows Event ID 4662 with AccessMask 0x100 (Control Access).
3. **Filter Replication GUIDs**: Search for DS-Replication-Get-Changes and DS-Replication-Get-Changes-All.
4. **Identify Non-DC Sources**: Flag events where SubjectUserName is not a domain controller machine account.
5. **Correlate with Network**: Cross-reference source IPs against known DC addresses.
6. **Validate Findings**: Exclude legitimate replication tools (Azure AD Connect, SCCM).
7. **Respond**: Disable compromised accounts, reset krbtgt, investigate lateral movement.
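Steps 2 to 4 amount to three checks on each 4662 record. A minimal sketch, assuming events have already been flattened into dicts keyed by the 4662 field names `AccessMask`, `Properties`, and `SubjectUserName`; the helper names and the sample allowlist are hypothetical.

```python
REPLICATION_GUIDS = {
    "1131f6aa-9c07-11d1-f79f-00c04fc2dcd2": "DS-Replication-Get-Changes",
    "1131f6ad-9c07-11d1-f79f-00c04fc2dcd2": "DS-Replication-Get-Changes-All",
}
CONTROL_ACCESS = 0x100


def replication_rights(event: dict) -> list:
    """Return the names of replication rights whose GUID appears in Properties."""
    props = event.get("Properties", "").lower()
    return [name for guid, name in REPLICATION_GUIDS.items() if guid in props]


def is_dcsync_candidate(event: dict, allowed_accounts: set) -> bool:
    """True when a non-allowlisted account exercised a replication right."""
    mask = int(event.get("AccessMask", "0x0"), 16)
    if not (mask & CONTROL_ACCESS):
        return False
    if not replication_rights(event):
        return False
    return event.get("SubjectUserName", "").lower() not in allowed_accounts


# Hypothetical allowlist: DC machine accounts plus approved sync service accounts.
allowed = {"dc01$", "dc02$"}
event = {
    "SubjectUserName": "jsmith",
    "AccessMask": "0x100",
    "Properties": "%%7688 {1131f6ad-9c07-11d1-f79f-00c04fc2dcd2} {19195a5b-6da0-11d0-afd3-00c04fd930c9}",
}
print(is_dcsync_candidate(event, allowed), replication_rights(event))
```

Legitimate replication tools from step 6 belong in the allowlist; anything else that passes all three checks is worth correlating with network data in step 5.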
## Key Concepts
| Concept | Description |
|---------|-------------|
| DCSync | Technique abusing AD replication protocol to extract password hashes |
| Event ID 4662 | Directory Service Access audit event |
| DS-Replication-Get-Changes | GUID 1131f6aa-9c07-11d1-f79f-00c04fc2dcd2 |
| DS-Replication-Get-Changes-All | GUID 1131f6ad-9c07-11d1-f79f-00c04fc2dcd2 |
| AccessMask 0x100 | Control Access right indicating extended rights verification |
| T1003.006 | OS Credential Dumping: DCSync |
## Tools & Systems
| Tool | Purpose |
|------|---------|
| Windows Event Viewer | Direct event log analysis |
| Splunk | SIEM correlation of Event 4662 |
| Elastic Security | Detection rules for DCSync patterns |
| Mimikatz lsadump::dcsync | Attack tool used to perform DCSync |
| Impacket secretsdump.py | Python-based DCSync implementation |
| BloodHound | Identify accounts with replication rights |
## Output Format
```
Hunt ID: TH-DCSYNC-[DATE]-[SEQ]
Technique: T1003.006
Domain Controller: [DC hostname]
Subject Account: [Account performing replication]
Source IP: [Non-DC IP address]
GUID Accessed: [Replication GUID]
Risk Level: [Critical/High/Medium/Low]
Recommended Action: [Disable account, reset krbtgt, investigate]
```
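
A finding can be rendered into the format above with a small helper. This is a minimal sketch: the function name, the `YYYYMMDD` date layout, and the three-digit sequence number are assumptions, since the template leaves `[DATE]` and `[SEQ]` unspecified.

```python
from datetime import date


def format_hunt_record(seq, dc_host, account, source_ip, guid, risk, action, day=None):
    """Render one DCSync finding in the hunt output format."""
    day = day or date.today()
    # Assumed ID layout: TH-DCSYNC-YYYYMMDD-NNN
    hunt_id = f"TH-DCSYNC-{day:%Y%m%d}-{seq:03d}"
    lines = [
        f"Hunt ID: {hunt_id}",
        "Technique: T1003.006",
        f"Domain Controller: {dc_host}",
        f"Subject Account: {account}",
        f"Source IP: {source_ip}",
        f"GUID Accessed: {guid}",
        f"Risk Level: {risk}",
        f"Recommended Action: {action}",
    ]
    return "\n".join(lines)


# Hypothetical values, for illustration only
record = format_hunt_record(
    1, "DC01", "CORP\\jsmith", "10.0.5.23",
    "1131f6ad-9c07-11d1-f79f-00c04fc2dcd2",
    "Critical", "Disable account, reset krbtgt, investigate",
    day=date(2026, 3, 11),
)
print(record)
```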
@@ -0,0 +1,100 @@
# DCSync Attack Detection Reference
## Windows Event ID 4662
Directory Service Access event logged when an object in Active Directory is accessed.
### Required Group Policy Configuration
```
Computer Configuration > Policies > Windows Settings > Security Settings >
Advanced Audit Policy Configuration > Audit Policies > DS Access >
Audit Directory Service Access: Success, Failure
```
### Required SACL Configuration
Add "Domain Computers" to the SACL on the domain root object to detect machine account DCSync.
## Key Detection GUIDs
| GUID | Right | Description |
|------|-------|-------------|
| 1131f6aa-9c07-11d1-f79f-00c04fc2dcd2 | DS-Replication-Get-Changes | Read replication changes |
| 1131f6ad-9c07-11d1-f79f-00c04fc2dcd2 | DS-Replication-Get-Changes-All | Read all replication changes (includes secrets) |
| 89e95b76-444d-4c62-991a-0facbeda640c | DS-Replication-Get-Changes-In-Filtered-Set | Filtered replication set |
### AccessMask Value
`0x100` (256 decimal) = Control Access - logged when access is allowed following extended rights verification.
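
Because `AccessMask` is a bit field, a string comparison against `"0x100"` can miss events where the value is zero-padded or combined with other bits. The sketch below parses the value and tests the Control Access bit instead. The helper name is hypothetical, and whether your log pipeline ever emits padded or combined masks is an assumption to verify against real events.

```python
CONTROL_ACCESS = 0x100  # 256 decimal


def has_control_access(access_mask):
    """Return True if the Control Access bit is set in an AccessMask string."""
    try:
        # Base 0 accepts both "0x100" and plain decimal "256"
        value = int(access_mask, 0)
    except (ValueError, TypeError):
        return False
    return bool(value & CONTROL_ACCESS)


for sample in ("0x100", "0x00000100", "256", "0x10", ""):
    print(repr(sample), has_control_access(sample))
```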
## Splunk Detection Query
```spl
index=wineventlog EventCode=4662
| where AccessMask="0x100"
| where match(Properties, "(?i)1131f6ad-9c07-11d1-f79f-00c04fc2dcd2") OR match(Properties, "(?i)1131f6aa-9c07-11d1-f79f-00c04fc2dcd2")
| where NOT match(SubjectUserName, "\\$$")
| eval is_dc=if(match(SubjectUserName, "(?i)(DC|AZUREADCONNECT)"), "legitimate", "suspicious")
| where is_dc="suspicious"
| stats count by SubjectUserName, SubjectDomainName, Computer, Properties
```
## Elastic KQL Detection
```
event.code: "4662" AND winlog.event_data.AccessMask: "0x100" AND
winlog.event_data.Properties: (*1131f6ad-9c07-11d1-f79f-00c04fc2dcd2* OR *1131f6aa-9c07-11d1-f79f-00c04fc2dcd2*)
AND NOT winlog.event_data.SubjectUserName: *$
```
## PowerShell Detection
```powershell
# Query Event 4662 for replication GUID access by non-machine accounts.
# Field order in the 4662 template: [1] SubjectUserName, [2] SubjectDomainName,
# [8] HandleId, [10] AccessMask, [11] Properties (holds the accessed GUIDs).
Get-WinEvent -FilterHashtable @{
    LogName='Security'; Id=4662
} | Where-Object {
    $_.Properties[11].Value -match '1131f6a[ad]-9c07-11d1-f79f-00c04fc2dcd2' -and
    $_.Properties[1].Value -notmatch '\$$'
} | Select-Object TimeCreated,
    @{N='Account';E={$_.Properties[1].Value}},
    @{N='Domain';E={$_.Properties[2].Value}}

# List accounts with replication rights (replace the DN with your domain's)
Import-Module ActiveDirectory
(Get-Acl "AD:\DC=domain,DC=local").Access |
    Where-Object { $_.ObjectType -in @(
        '1131f6ad-9c07-11d1-f79f-00c04fc2dcd2',
        '1131f6aa-9c07-11d1-f79f-00c04fc2dcd2'
    )} | Select-Object IdentityReference, ActiveDirectoryRights
```
## Attack Tools (for Detection Signatures)
```bash
# Mimikatz DCSync
lsadump::dcsync /domain:corp.local /user:krbtgt
# Impacket secretsdump.py
secretsdump.py -just-dc corp.local/admin:Password@dc01.corp.local
# Impacket - specific user
secretsdump.py -just-dc-user krbtgt corp.local/admin:Password@dc01.corp.local
```
## MITRE ATT&CK Mapping
- **Technique**: T1003.006 - OS Credential Dumping: DCSync
- **Tactic**: Credential Access
- **Platforms**: Windows
- **Data Sources**: Active Directory: Active Directory Object Access, Network Traffic
## Response Checklist
1. Disable compromised account immediately
2. Reset krbtgt password twice (12-hour interval between resets)
3. Revoke all Kerberos tickets (purge ticket cache)
4. Audit all accounts with replication rights on domain object
5. Review source host for additional compromise indicators
6. Check for persistence mechanisms (scheduled tasks, services, WMI)
@@ -0,0 +1,229 @@
#!/usr/bin/env python3
"""DCSync Detection Agent - hunts for unauthorized AD replication requests via Event ID 4662 analysis."""
import json
import argparse
import logging
import subprocess
import re
import xml.etree.ElementTree as ET
from collections import defaultdict
from datetime import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
REPLICATION_GUIDS = {
"1131f6aa-9c07-11d1-f79f-00c04fc2dcd2": "DS-Replication-Get-Changes",
"1131f6ad-9c07-11d1-f79f-00c04fc2dcd2": "DS-Replication-Get-Changes-All",
"89e95b76-444d-4c62-991a-0facbeda640c": "DS-Replication-Get-Changes-In-Filtered-Set",
}
DCSYNC_ACCESS_MASK = "0x100"


def get_domain_controllers():
    """Get list of legitimate domain controller machine accounts."""
    cmd = ["powershell", "-Command",
           "Get-ADDomainController -Filter * | Select-Object Name, IPv4Address | ConvertTo-Json"]
    result = subprocess.run(cmd, capture_output=True, text=True)
    dcs = []
    try:
        data = json.loads(result.stdout) if result.stdout else []
        # ConvertTo-Json emits a bare object, not a list, when only one DC exists
        if isinstance(data, dict):
            data = [data]
        for dc in data:
            dcs.append({
                "name": dc.get("Name", ""),
                "ip": dc.get("IPv4Address", ""),
                "machine_account": dc.get("Name", "") + "$",
            })
    except json.JSONDecodeError:
        # With an empty DC list every replication event looks suspicious, so say why
        logger.warning("Could not parse Get-ADDomainController output; DC list is empty")
    return dcs


def query_event_4662(evtx_path=None, max_events=5000):
    """Query Windows Event ID 4662 for directory service access events."""
    events = []
    if evtx_path:
        # /lf:true tells wevtutil that the first argument is a log file path
        cmd = ["wevtutil", "qe", evtx_path, "/lf:true",
               "/q:*[System[EventID=4662]]", "/f:xml", f"/c:{max_events}"]
    else:
        cmd = ["wevtutil", "qe", "Security",
               "/q:*[System[EventID=4662]]", "/f:xml", f"/c:{max_events}"]
    result = subprocess.run(cmd, capture_output=True, text=True)
    ns = {"s": "http://schemas.microsoft.com/win/2004/08/events/event"}
    # wevtutil prints concatenated <Event> elements with no single root, so split first
    for event_xml in re.findall(r"<Event.*?</Event>", result.stdout, re.DOTALL):
        try:
            root = ET.fromstring(event_xml)
        except ET.ParseError:
            continue
        # EventData fields keyed by their Name attribute
        data = {}
        for el in root.findall(".//s:Data", ns):
            data[el.get("Name", "")] = el.text or ""
        time_created = root.find(".//s:TimeCreated", ns)
        timestamp = time_created.get("SystemTime", "") if time_created is not None else ""
        events.append({
            "timestamp": timestamp,
            "computer": root.findtext(".//s:Computer", "", ns),
            "subject_user_name": data.get("SubjectUserName", ""),
            "subject_domain_name": data.get("SubjectDomainName", ""),
            "subject_logon_id": data.get("SubjectLogonId", ""),
            "object_server": data.get("ObjectServer", ""),
            "object_type": data.get("ObjectType", ""),
            "object_name": data.get("ObjectName", ""),
            "access_mask": data.get("AccessMask", ""),
            "properties": data.get("Properties", ""),
        })
    logger.info("Parsed %d Event 4662 entries", len(events))
    return events


def filter_replication_events(events):
    """Filter events for DS-Replication GUID access."""
    replication_events = []
    for event in events:
        properties = event.get("properties", "").lower()
        access_mask = event.get("access_mask", "")
        # An event that names several replication GUIDs yields one entry per GUID
        for guid, name in REPLICATION_GUIDS.items():
            if guid.lower() in properties and access_mask == DCSYNC_ACCESS_MASK:
                replication_events.append({
                    **event,
                    "replication_right": name,
                    "guid": guid,
                })
    return replication_events


def identify_dcsync_suspects(replication_events, dc_accounts):
    """Identify non-DC accounts performing replication requests."""
    # Build the set of DC machine account names (HOSTNAME$), lowercased
    dc_names = set()
    for dc in dc_accounts:
        if isinstance(dc, str):
            # --known-dcs may list plain hostnames instead of objects
            dc = {"name": dc}
        for value in (dc.get("machine_account", ""), dc.get("name", "")):
            if value:
                dc_names.add(value.lower().rstrip("$") + "$")
    # Substring match on service account names; an exact allowlist is safer,
    # because any account whose name contains one of these strings is skipped
    known_legitimate = {"azureadconnect", "sccm", "adconnect", "microsoftdirectorysync"}
    suspects = []
    legitimate = []
    for event in replication_events:
        account = event["subject_user_name"].lower()
        if account in dc_names:
            legitimate.append(event)
            continue
        if any(known in account for known in known_legitimate):
            legitimate.append(event)
            continue
        event["severity"] = "critical"
        event["mitre_technique"] = "T1003.006"
        event["indicator"] = "Non-DC account performing directory replication"
        suspects.append(event)
    return suspects, legitimate


def analyze_suspect_patterns(suspects):
    """Analyze patterns in suspected DCSync activity."""
    by_account = defaultdict(lambda: {"count": 0, "computers": set(), "guids": set(), "timestamps": []})
    for event in suspects:
        account = f"{event['subject_domain_name']}\\{event['subject_user_name']}"
        by_account[account]["count"] += 1
        by_account[account]["computers"].add(event["computer"])
        by_account[account]["guids"].add(event.get("replication_right", ""))
        by_account[account]["timestamps"].append(event["timestamp"])
    patterns = []
    for account, data in by_account.items():
        # Get-Changes plus Get-Changes-All together allow retrieval of password hashes
        has_both = ("DS-Replication-Get-Changes" in data["guids"]
                    and "DS-Replication-Get-Changes-All" in data["guids"])
        patterns.append({
            "account": account,
            "replication_requests": data["count"],
            "source_computers": list(data["computers"]),
            "replication_rights": list(data["guids"]),
            "has_full_dcsync_rights": has_both,
            "severity": "critical" if has_both else "high",
            "first_seen": min(data["timestamps"]) if data["timestamps"] else "",
            "last_seen": max(data["timestamps"]) if data["timestamps"] else "",
        })
    return sorted(patterns, key=lambda x: x["replication_requests"], reverse=True)


def check_replication_acls():
    """Check which accounts have replication rights on the domain object."""
    # Resolve the domain DN at run time instead of relying on a placeholder DN.
    # IdentityReference and the rights enum are converted to strings so that
    # ConvertTo-Json does not emit a nested object and an integer.
    cmd = ["powershell", "-Command",
           "Import-Module ActiveDirectory; "
           "(Get-Acl ('AD:\\' + (Get-ADDomain).DistinguishedName)).Access | "
           "Where-Object {$_.ObjectType -eq '1131f6ad-9c07-11d1-f79f-00c04fc2dcd2' -or "
           "$_.ObjectType -eq '1131f6aa-9c07-11d1-f79f-00c04fc2dcd2'} | "
           "Select-Object @{N='IdentityReference';E={$_.IdentityReference.Value}}, "
           "@{N='ActiveDirectoryRights';E={$_.ActiveDirectoryRights.ToString()}} | "
           "ConvertTo-Json"]
    result = subprocess.run(cmd, capture_output=True, text=True)
    try:
        acls = json.loads(result.stdout) if result.stdout else []
        if isinstance(acls, dict):
            acls = [acls]
        return [{"identity": a.get("IdentityReference", ""),
                 "rights": a.get("ActiveDirectoryRights", "")} for a in acls]
    except json.JSONDecodeError:
        return []


def generate_report(events, replication_events, suspects, legitimate, patterns, acls):
    """Generate DCSync hunt report."""
    report = {
        "timestamp": datetime.utcnow().isoformat(),
        "hunt_type": "DCSync Detection (T1003.006)",
        "events_analyzed": len(events),
        "replication_events": len(replication_events),
        "legitimate_replication": len(legitimate),
        "suspicious_replication": len(suspects),
        "severity": "critical" if suspects else "clear",
        "suspect_patterns": patterns,
        "accounts_with_replication_rights": acls,
        # Cap the detail section so the report stays readable
        "suspicious_events_detail": suspects[:20],
        "recommendations": [
            "Disable compromised accounts immediately",
            "Reset krbtgt password twice (with 12-hour interval)",
            "Audit all accounts with DS-Replication-Get-Changes rights",
            "Investigate source hosts for additional compromise indicators",
            "Review lateral movement from suspect accounts",
        ] if suspects else ["No DCSync activity detected - continue monitoring"],
    }
    return report


def main():
    parser = argparse.ArgumentParser(description="DCSync Attack Detection Agent")
    parser.add_argument("--evtx", help="Path to exported Security .evtx file")
    parser.add_argument("--max-events", type=int, default=5000, help="Max events to parse (default: 5000)")
    parser.add_argument("--skip-acl-check", action="store_true", help="Skip replication ACL enumeration")
    parser.add_argument("--known-dcs", help="JSON file with known DC hostnames")
    parser.add_argument("--output", default="dcsync_hunt_report.json")
    args = parser.parse_args()

    dc_accounts = get_domain_controllers()
    if args.known_dcs:
        with open(args.known_dcs) as f:
            extra_dcs = json.load(f)
        # Accept plain hostnames as well as {"name": ..., "ip": ...} objects
        for dc in extra_dcs:
            if isinstance(dc, str):
                dc = {"name": dc, "ip": ""}
            dc.setdefault("machine_account", dc.get("name", "") + "$")
            dc_accounts.append(dc)
    logger.info("Known DCs: %d", len(dc_accounts))

    events = query_event_4662(args.evtx, args.max_events)
    replication_events = filter_replication_events(events)
    suspects, legitimate = identify_dcsync_suspects(replication_events, dc_accounts)
    patterns = analyze_suspect_patterns(suspects)
    acls = []
    if not args.skip_acl_check:
        acls = check_replication_acls()

    report = generate_report(events, replication_events, suspects, legitimate, patterns, acls)
    with open(args.output, "w") as f:
        json.dump(report, f, indent=2, default=str)
    if suspects:
        logger.warning("ALERT: %d suspected DCSync events from %d accounts",
                       len(suspects), len(patterns))
    else:
        logger.info("No DCSync suspects found (%d legitimate replication events)", len(legitimate))
    print(json.dumps(report, indent=2, default=str))


if __name__ == "__main__":
    main()
@@ -0,0 +1,22 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,73 @@
---
name: implementing-network-deception-with-honeypots
description: Deploy and manage network honeypots using OpenCanary, T-Pot, or Cowrie to detect unauthorized access, lateral movement, and attacker reconnaissance.
domain: cybersecurity
subdomain: deception-technology
tags: [deception, honeypot, opencanary, cowrie, t-pot, detection, lateral-movement, network-security]
version: "1.0"
author: mahipal
license: MIT
---
# Implementing Network Deception with Honeypots
## When to Use
- When deploying deception technology to detect lateral movement
- To create early warning indicators for network intrusion
- During security architecture design to add detection depth
- When monitoring for unauthorized internal scanning or credential theft
- To gather threat intelligence on attacker techniques and tools
## Prerequisites
- Linux server or VM for honeypot deployment (Ubuntu 22.04+ recommended)
- Python 3.8+ with pip for OpenCanary installation
- Docker for T-Pot or containerized deployment
- Network segment with appropriate VLAN configuration
- SIEM integration for alert forwarding (syslog, webhook, or file-based)
- Firewall rules allowing inbound connections to honeypot services
## Workflow
1. **Plan Deployment**: Select honeypot types and network placement strategy.
2. **Install Honeypot**: Deploy OpenCanary, Cowrie, or T-Pot on dedicated host.
3. **Configure Services**: Enable emulated services (SSH, HTTP, SMB, FTP, RDP).
4. **Set Up Alerting**: Configure log forwarding to SIEM and alert channels.
5. **Deploy Canary Tokens**: Place credential files, shares, and DNS entries.
6. **Monitor Interactions**: Analyze honeypot logs for attacker activity.
7. **Tune and Maintain**: Update configurations based on detection results.
## Key Concepts
| Concept | Description |
|---------|-------------|
| OpenCanary | Lightweight Python honeypot with modular service emulation |
| Cowrie | Medium-interaction SSH/Telnet honeypot capturing commands |
| T-Pot | Multi-honeypot platform with ELK stack visualization |
| Canary Token | Tripwire credential or file that alerts when accessed |
| Low-Interaction | Emulates services at protocol level without full OS |
| High-Interaction | Full OS honeypot capturing complete attacker sessions |
## Tools & Systems
| Tool | Purpose |
|------|---------|
| OpenCanary | Modular honeypot daemon with service emulation |
| Cowrie | SSH/Telnet honeypot with session recording |
| T-Pot | All-in-one multi-honeypot platform |
| Dionaea | Malware-capturing honeypot for exploit detection |
| Splunk/Elastic | SIEM for honeypot alert aggregation |
## Output Format
```
Alert: HONEYPOT-[SERVICE]-[DATE]-[SEQ]
Honeypot: [Hostname/IP]
Service: [SSH/HTTP/SMB/FTP/RDP]
Source IP: [Attacker IP]
Interaction: [Login attempt/Port scan/File access]
Credentials Used: [Username:Password if applicable]
Commands Executed: [For SSH honeypots]
Risk Level: [Critical/High/Medium/Low]
```
@@ -0,0 +1,128 @@
# Network Deception with Honeypots Reference
## OpenCanary Installation
```bash
# Ubuntu/Debian
sudo apt-get install python3-dev python3-pip python3-virtualenv libssl-dev libpcap-dev
virtualenv canary-env && source canary-env/bin/activate
pip install opencanary
# Docker
docker pull thinkst/opencanary
docker run -d --network host -v /path/to/config:/etc/opencanaryd thinkst/opencanary
```
## OpenCanary CLI
```bash
# Generate default config
opencanaryd --copyconfig
# Start daemon
opencanaryd --start
# Stop daemon
opencanaryd --stop
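# Check whether the daemon is running (opencanaryd documents no status flag;
# it runs as a twistd process loading opencanary.tac)
pgrep -af opencanary.tac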
# Run in foreground (debug)
opencanaryd --dev
```
## Configuration File (`/etc/opencanaryd/opencanary.conf`)
```json
{
"device.node_id": "honeypot-dmz-01",
"ssh.enabled": true,
"ssh.port": 22,
"ssh.version": "SSH-2.0-OpenSSH_7.6p1 Ubuntu-4ubuntu0.3",
"http.enabled": true,
"http.port": 80,
"http.banner": "Apache/2.4.41 (Ubuntu)",
"http.skin": "nasLogin",
"smb.enabled": true,
"smb.filelist": [{"name": "passwords.xlsx", "type": "xlsx"}],
"ftp.enabled": true,
"ftp.port": 21,
"ftp.banner": "FTP server ready",
"mysql.enabled": true,
"mysql.port": 3306,
"rdp.enabled": true,
"rdp.port": 3389
}
```
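
Before starting the daemon it can help to confirm which services a config actually enables and whether two of them claim the same port. The sketch below works on the flat `service.enabled` / `service.port` key layout shown above; the helper name and the sample values are hypothetical.

```python
import json


def summarize_config(config):
    """Return enabled services and any port claimed by more than one of them."""
    suffix = ".enabled"
    enabled = sorted(
        key[: -len(suffix)]
        for key, value in config.items()
        if key.endswith(suffix) and value is True
    )
    ports = {}
    for service in enabled:
        port = config.get(f"{service}.port")
        if port is not None:
            ports.setdefault(port, []).append(service)
    # Keep only ports that more than one enabled service wants to bind
    conflicts = {port: names for port, names in ports.items() if len(names) > 1}
    return enabled, conflicts


# Hypothetical config with a deliberate clash on port 22
sample = json.loads(
    '{"ssh.enabled": true, "ssh.port": 22, "http.enabled": true, "http.port": 80,'
    ' "ftp.enabled": false, "ftp.port": 21, "telnet.enabled": true, "telnet.port": 22}'
)
enabled, conflicts = summarize_config(sample)
print(enabled, conflicts)
```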
## Available Service Modules
| Service | Config Key | Default Port | Interaction Level |
|---------|-----------|-------------|-------------------|
| SSH | ssh.enabled | 22 | Medium |
| HTTP | http.enabled | 80 | Low-Medium |
| FTP | ftp.enabled | 21 | Low |
| SMB | smb.enabled | 445 | Low |
| MySQL | mysql.enabled | 3306 | Low |
| RDP | rdp.enabled | 3389 | Low |
| Telnet | telnet.enabled | 23 | Low |
| SNMP | snmp.enabled | 161 | Low |
| Git | git.enabled | 9418 | Low |
| Redis | redis.enabled | 6379 | Low |
| VNC | vnc.enabled | 5000 | Low |
## Log Format (JSON, one per line)
```json
{
"dst_host": "10.0.0.50",
"dst_port": 22,
"src_host": "10.0.0.100",
"src_port": 45321,
"logtype": 4002,
"node_id": "honeypot-dmz-01",
"utc_time": "2025-03-01 14:30:00.123456",
"logdata": {"USERNAME": "admin", "PASSWORD": "password123"}
}
```
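
Each log line can be turned into a typed record with the standard library alone. This is a minimal sketch that assumes the field names and the `utc_time` layout shown in the sample above; the function name and the sample values (a Telnet login attempt) are hypothetical.

```python
import json
from datetime import datetime, timezone


def parse_canary_line(line):
    """Parse one OpenCanary JSON log line into a flat record."""
    record = json.loads(line)
    # utc_time is assumed to look like "2025-03-01 14:30:00.123456"
    when = datetime.strptime(record["utc_time"], "%Y-%m-%d %H:%M:%S.%f")
    logdata = record.get("logdata") or {}
    return {
        "when": when.replace(tzinfo=timezone.utc),
        "source": f'{record["src_host"]}:{record["src_port"]}',
        "target": f'{record["dst_host"]}:{record["dst_port"]}',
        "logtype": record.get("logtype"),
        "username": logdata.get("USERNAME"),
    }


sample_line = json.dumps({
    "dst_host": "10.0.0.50", "dst_port": 23,
    "src_host": "10.0.0.100", "src_port": 45321,
    "logtype": 6001, "node_id": "honeypot-dmz-01",
    "utc_time": "2025-03-01 14:30:00.123456",
    "logdata": {"USERNAME": "admin", "PASSWORD": "password123"},
})
parsed = parse_canary_line(sample_line)
print(parsed["when"].isoformat(), parsed["source"], parsed["username"])
```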
## Log Type Codes
| Code | Service | Event |
|------|---------|-------|
| 2000 | FTP | Login attempt |
| 3001 | HTTP | Login attempt (POST) |
| 4002 | SSH | Login attempt |
| 5000 | SMB | File open |
| 6001 | Telnet | Login attempt |
| 8001 | MySQL | Login attempt |
| 14001 | RDP | Login attempt |
## Cowrie SSH Honeypot
```bash
# Docker deployment
docker run -d -p 22:2222 cowrie/cowrie
# Session replay
bin/playlog log/tty/20250301-143000-abc123.log
```
## Syslog Forwarding
```json
{
"logger": {
"class": "PyLogger",
"kwargs": {
"handlers": {
"syslog": {
"class": "logging.handlers.SysLogHandler",
"address": ["siem.example.com", 514]
}
}
}
}
}
```
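
When configs are generated by a script, the syslog handler can be merged in programmatically. The sketch below returns a modified deep copy so the base config is left untouched. The helper name is hypothetical, and the handler layout simply mirrors the fragment above.

```python
import copy


def with_syslog_handler(config, host, port=514):
    """Return a copy of config whose logger section also forwards to syslog."""
    updated = copy.deepcopy(config)
    logger_cfg = updated.setdefault("logger", {"class": "PyLogger", "kwargs": {}})
    handlers = logger_cfg.setdefault("kwargs", {}).setdefault("handlers", {})
    handlers["syslog"] = {
        "class": "logging.handlers.SysLogHandler",
        "address": [host, port],
    }
    return updated


# Hypothetical base config with only a file handler
base = {
    "device.node_id": "honeypot-dmz-01",
    "logger": {"class": "PyLogger", "kwargs": {"handlers": {
        "file": {"class": "logging.FileHandler", "filename": "/var/tmp/opencanary.log"},
    }}},
}
forwarding = with_syslog_handler(base, "siem.example.com")
print(sorted(forwarding["logger"]["kwargs"]["handlers"]))
```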
@@ -0,0 +1,212 @@
#!/usr/bin/env python3
"""Honeypot Deployment Agent - deploys OpenCanary honeypots and analyzes interaction logs."""
import json
import argparse
import logging
import subprocess
import os
import re
from collections import defaultdict
from datetime import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
OPENCANARY_CONFIG_TEMPLATE = {
"device.node_id": "opencanary-001",
"ip.ignorelist": [],
"logtype.console.enabled": True,
"logger": {
"class": "PyLogger",
"kwargs": {
"formatters": {"plain": {"format": "%(message)s"}},
"handlers": {
"file": {
"class": "logging.FileHandler",
"filename": "/var/tmp/opencanary.log",
},
"console": {
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
},
},
},
},
"ftp.enabled": False,
"ftp.port": 21,
"ftp.banner": "FTP server ready",
"http.enabled": False,
"http.port": 80,
"http.banner": "Apache/2.4.41 (Ubuntu)",
"http.skin": "nasLogin",
"httpproxy.enabled": False,
"httpproxy.port": 8080,
"ssh.enabled": False,
"ssh.port": 22,
"ssh.version": "SSH-2.0-OpenSSH_7.6p1 Ubuntu-4ubuntu0.3",
"smb.enabled": False,
"smb.filelist": [{"name": "passwords.xlsx", "type": "xlsx"}, {"name": "backup-credentials.txt", "type": "txt"}],
"telnet.enabled": False,
"telnet.port": 23,
"telnet.banner": "Welcome to the management console",
"rdp.enabled": False,
"rdp.port": 3389,
"mysql.enabled": False,
"mysql.port": 3306,
"snmp.enabled": False,
"snmp.port": 161,
}


def generate_config(services, node_id="opencanary-001", log_path="/var/tmp/opencanary.log"):
    """Generate OpenCanary configuration with specified services enabled."""
    # dict.copy() is shallow, so editing the nested logger section would also
    # change the shared template; a JSON round trip gives an independent copy
    config = json.loads(json.dumps(OPENCANARY_CONFIG_TEMPLATE))
    config["device.node_id"] = node_id
    config["logger"]["kwargs"]["handlers"]["file"]["filename"] = log_path
    for service in services:
        key = f"{service}.enabled"
        if key in config:
            config[key] = True
    # Report only the services the template actually knows about
    enabled = [s for s in services if f"{s}.enabled" in config]
    logger.info("Generated config with %d services: %s", len(enabled), ", ".join(enabled))
    return config


def deploy_opencanary(config, config_path="/etc/opencanaryd/opencanary.conf"):
    """Deploy OpenCanary with generated configuration."""
    config_dir = os.path.dirname(config_path)
    if config_dir:
        # os.makedirs("") raises, so skip it for a bare file name
        os.makedirs(config_dir, exist_ok=True)
    with open(config_path, "w") as f:
        json.dump(config, f, indent=2)
    logger.info("Configuration written to %s", config_path)
    start_cmd = ["opencanaryd", "--start"]
    result = subprocess.run(start_cmd, capture_output=True, text=True)
    return {"config_path": config_path, "started": result.returncode == 0, "output": result.stdout[:200]}


def parse_opencanary_log(log_path="/var/tmp/opencanary.log"):
    """Parse OpenCanary JSON log file for interaction events."""
    events = []
    try:
        with open(log_path) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                    events.append({
                        "timestamp": event.get("utc_time", ""),
                        "dst_host": event.get("dst_host", ""),
                        "dst_port": event.get("dst_port", 0),
                        "src_host": event.get("src_host", ""),
                        "src_port": event.get("src_port", 0),
                        "logtype": event.get("logtype", 0),
                        "node_id": event.get("node_id", ""),
                        "logdata": event.get("logdata", {}),
                    })
                except json.JSONDecodeError:
                    # Skip lines that are not valid JSON (partial writes, banners)
                    continue
    except FileNotFoundError:
        logger.warning("Log file not found: %s", log_path)
    return events


def analyze_interactions(events):
    """Analyze honeypot interactions for threat intelligence."""
    by_source = defaultdict(lambda: {"count": 0, "services": set(), "credentials": []})
    by_service = defaultdict(int)
    credential_attempts = []
    # Codes follow the constants in opencanary/logger.py; verify them against
    # the installed version, since unknown codes fall back to "type_<code>"
    log_type_map = {
        2000: "ftp_login", 3001: "http_login", 4002: "ssh_login",
        5000: "smb_file_open", 6001: "telnet_login", 8001: "mysql_login",
        14001: "rdp_login",
    }
    for event in events:
        src = event["src_host"]
        service = log_type_map.get(event["logtype"], f"type_{event['logtype']}")
        by_source[src]["count"] += 1
        by_source[src]["services"].add(service)
        by_service[service] += 1
        logdata = event.get("logdata", {})
        username = logdata.get("USERNAME", logdata.get("username", ""))
        password = logdata.get("PASSWORD", logdata.get("password", ""))
        if username:
            cred = {"username": username, "password": password, "service": service, "source": src}
            credential_attempts.append(cred)
            by_source[src]["credentials"].append(cred)
    # Busiest sources first
    source_summary = {}
    for ip, data in sorted(by_source.items(), key=lambda x: x[1]["count"], reverse=True):
        source_summary[ip] = {
            "interaction_count": data["count"],
            "services_targeted": list(data["services"]),
            "credential_attempts": len(data["credentials"]),
        }
    return {
        "total_interactions": len(events),
        "unique_sources": len(by_source),
        "service_distribution": dict(sorted(by_service.items(), key=lambda x: x[1], reverse=True)),
        "top_sources": dict(list(source_summary.items())[:20]),
        "credential_attempts": len(credential_attempts),
        "unique_usernames": len(set(c["username"] for c in credential_attempts)),
        "top_credentials": credential_attempts[:20],
    }
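

def check_honeypot_status():
    """Check if OpenCanary daemon is running."""
    # opencanaryd documents no status flag. The daemon runs as a twistd process
    # loading opencanary.tac, so look for that in the process list instead;
    # pgrep exits 0 only when at least one process matches.
    cmd = ["pgrep", "-f", "opencanary.tac"]
    result = subprocess.run(cmd, capture_output=True, text=True)
    is_running = result.returncode == 0
    return {"running": is_running, "status_output": result.stdout.strip()[:200]}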


def generate_report(analysis, status, config):
    """Generate honeypot deployment and interaction report."""
    enabled_services = [k.replace(".enabled", "") for k, v in config.items() if k.endswith(".enabled") and v]
    report = {
        "timestamp": datetime.utcnow().isoformat(),
        "honeypot_type": "OpenCanary",
        "node_id": config.get("device.node_id", ""),
        "enabled_services": enabled_services,
        "daemon_status": status,
        "interaction_analysis": analysis,
    }
    return report


def main():
    parser = argparse.ArgumentParser(description="Honeypot Deployment and Analysis Agent")
    parser.add_argument("--action", choices=["deploy", "analyze", "status", "full"], default="analyze")
    parser.add_argument("--services", nargs="+", default=["ssh", "http", "smb", "ftp", "telnet"],
                        help="Services to enable (default: ssh http smb ftp telnet)")
    parser.add_argument("--node-id", default="opencanary-001", help="Honeypot node identifier")
    parser.add_argument("--log-path", default="/var/tmp/opencanary.log", help="OpenCanary log file path")
    parser.add_argument("--config-path", default="/etc/opencanaryd/opencanary.conf")
    parser.add_argument("--output", default="honeypot_report.json")
    args = parser.parse_args()

    config = generate_config(args.services, args.node_id, args.log_path)
    if args.action in ("deploy", "full"):
        deploy_result = deploy_opencanary(config, args.config_path)
        logger.info("Deployment: %s", "success" if deploy_result["started"] else "failed")

    status = check_honeypot_status()
    events = parse_opencanary_log(args.log_path)
    analysis = analyze_interactions(events)
    report = generate_report(analysis, status, config)
    with open(args.output, "w") as f:
        json.dump(report, f, indent=2, default=str)
    logger.info("Honeypot: %d interactions from %d sources, %d credential attempts",
                analysis["total_interactions"], analysis["unique_sources"],
                analysis["credential_attempts"])
    print(json.dumps(report, indent=2, default=str))


if __name__ == "__main__":
    main()
@@ -0,0 +1,22 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,73 @@
---
name: performing-cloud-forensics-with-aws-cloudtrail
description: Perform forensic investigation of AWS environments using CloudTrail logs to reconstruct attacker activity, identify compromised credentials, and analyze API call patterns.
domain: cybersecurity
subdomain: cloud-security
tags: [cloud-security, aws, cloudtrail, forensics, incident-response, dfir, boto3, s3]
version: "1.0"
author: mahipal
license: MIT
---
# Performing Cloud Forensics with AWS CloudTrail
## When to Use
- When investigating suspected AWS account compromise
- After detecting unauthorized API calls or credential exposure
- During incident response involving cloud infrastructure
- When analyzing S3 data exfiltration or IAM privilege escalation
- For post-incident forensic timeline reconstruction
## Prerequisites
- AWS account with CloudTrail enabled (management and data events)
- IAM permissions for cloudtrail:LookupEvents, s3:GetObject, athena:StartQueryExecution
- boto3 Python SDK installed
- CloudTrail logs delivered to S3 with optional Athena table configured
- AWS CLI configured with appropriate credentials
## Workflow
1. **Scope Investigation**: Identify timeframe, affected accounts, and compromised credentials.
2. **Query CloudTrail**: Use boto3 lookup_events or Athena to retrieve relevant API events.
3. **Filter by Indicators**: Search for suspicious user agents, source IPs, and event names.
4. **Reconstruct Timeline**: Build chronological sequence of attacker actions from API calls.
5. **Analyze Access Patterns**: Identify data access, IAM changes, and resource modifications.
6. **Identify Persistence**: Check for new IAM users, access keys, roles, or Lambda functions.
7. **Generate Report**: Produce forensic timeline with findings and remediation steps.
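
Step 4 can be sketched as a sort plus a per-source count over already parsed CloudTrail records. The field names (`eventTime`, `eventName`, `eventSource`, `sourceIPAddress`) are those of the CloudTrail record format; the function name and the sample records are hypothetical.

```python
from collections import Counter


def build_timeline(records):
    """Sort parsed CloudTrail records chronologically and summarize them."""
    # ISO 8601 UTC timestamps in the same layout sort correctly as strings
    ordered = sorted(records, key=lambda r: r["eventTime"])
    timeline = [
        f'{r["eventTime"]} {r.get("sourceIPAddress", "-")} '
        f'{r.get("eventSource", "-")}:{r["eventName"]}'
        for r in ordered
    ]
    by_ip = Counter(r.get("sourceIPAddress", "-") for r in ordered)
    return timeline, by_ip


# Hypothetical records, deliberately out of order
records = [
    {"eventTime": "2025-01-01T12:05:00Z", "eventName": "CreateAccessKey",
     "eventSource": "iam.amazonaws.com", "sourceIPAddress": "203.0.113.7"},
    {"eventTime": "2025-01-01T12:00:00Z", "eventName": "CreateUser",
     "eventSource": "iam.amazonaws.com", "sourceIPAddress": "203.0.113.7"},
    {"eventTime": "2025-01-01T12:10:00Z", "eventName": "GetObject",
     "eventSource": "s3.amazonaws.com", "sourceIPAddress": "198.51.100.4"},
]
timeline, by_ip = build_timeline(records)
print("\n".join(timeline))
```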
## Key Concepts
| Concept | Description |
|---------|-------------|
| LookupEvents | CloudTrail API to query management events (last 90 days) |
| Athena Queries | SQL queries against CloudTrail logs in S3 for historical analysis |
| User Agent Analysis | Identify tool signatures (AWS CLI, SDK, console, custom) |
| AccessKeyId | Track activity by specific IAM access key |
| EventName | AWS API action name (e.g., GetObject, CreateUser, AssumeRole) |
| sourceIPAddress | Origin IP of API call for geolocation analysis |
## Tools & Systems
| Tool | Purpose |
|------|---------|
| boto3 CloudTrail client | Programmatic CloudTrail event lookup |
| AWS Athena | SQL-based analysis of CloudTrail S3 logs |
| AWS CLI | Command-line CloudTrail queries |
| jq | JSON processing for CloudTrail event parsing |
| CloudTrail Lake | Advanced event data store with SQL query support |
## Output Format
```
Forensic Report: AWS-IR-[DATE]-[SEQ]
Account: [AWS Account ID]
Timeframe: [Start] to [End]
Compromised Credentials: [Access Key IDs]
Suspicious Events: [Count]
Source IPs: [List of attacker IPs]
Actions Taken: [API calls by attacker]
Data Accessed: [S3 objects, secrets, etc.]
Persistence Mechanisms: [New users, keys, roles]
```
@@ -0,0 +1,113 @@
# AWS CloudTrail Forensics API Reference
## boto3 CloudTrail Client
```python
import boto3
client = boto3.client("cloudtrail", region_name="us-east-1")
```
## lookup_events
```python
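from datetime import datetime

# Management events for one user within a one-day window
response = client.lookup_events(
    LookupAttributes=[
        {"AttributeKey": "Username", "AttributeValue": "compromised-user"},
    ],
    StartTime=datetime(2025, 1, 1),
    EndTime=datetime(2025, 1, 2),
    MaxResults=50,  # 50 is the per-call maximum; use the paginator for more
)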
```
### LookupAttributes Keys
| AttributeKey | Description |
|-------------|-------------|
| EventId | Unique event identifier |
| EventName | AWS API action (e.g., CreateUser, GetObject) |
| ReadOnly | true/false for read-only API calls |
| Username | IAM user or role session name |
| ResourceType | AWS resource type (e.g., AWS::S3::Object) |
| ResourceName | Name or ARN of the resource accessed |
| EventSource | AWS service (e.g., iam.amazonaws.com) |
| AccessKeyId | IAM access key used for the API call |
### Response Structure
```json
{
"Events": [
{
"EventId": "abc123",
"EventName": "CreateUser",
"EventTime": "2025-01-01T12:00:00Z",
"Username": "attacker",
"CloudTrailEvent": "{\"sourceIPAddress\":\"1.2.3.4\",\"userAgent\":\"aws-cli/2.0\",...}"
}
],
"NextToken": "..."
}
```
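
The `CloudTrailEvent` field is a JSON string, so the forensic details have to be decoded from it separately. A minimal sketch of that step follows; the function name and the sample event are hypothetical, while the nested field names come from the CloudTrail record format.

```python
import json


def extract_indicators(event):
    """Pull forensic fields from one lookup_events result entry."""
    detail = json.loads(event["CloudTrailEvent"])
    identity = detail.get("userIdentity", {})
    return {
        "event_name": event.get("EventName", detail.get("eventName", "")),
        "source_ip": detail.get("sourceIPAddress", ""),
        "user_agent": detail.get("userAgent", ""),
        "access_key_id": identity.get("accessKeyId", ""),
        "error_code": detail.get("errorCode", ""),
    }


# Hypothetical event shaped like the response structure above
sample_event = {
    "EventId": "abc123",
    "EventName": "CreateUser",
    "Username": "attacker",
    "CloudTrailEvent": json.dumps({
        "sourceIPAddress": "1.2.3.4",
        "userAgent": "aws-cli/2.0",
        "userIdentity": {"accessKeyId": "AKIAIOSFODNN7EXAMPLE"},
    }),
}
indicators = extract_indicators(sample_event)
print(indicators)
```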
## Paginator Usage
```python
import json

# start and end are datetime objects bounding the investigation window
paginator = client.get_paginator("lookup_events")
for page in paginator.paginate(
    LookupAttributes=[{"AttributeKey": "AccessKeyId", "AttributeValue": "AKIA..."}],
    StartTime=start, EndTime=end
):
    for event in page["Events"]:
        # CloudTrailEvent is a JSON string holding the full record
        ct = json.loads(event["CloudTrailEvent"])
        print(ct["sourceIPAddress"], ct["eventName"])
```
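
For long investigation windows it can help to page through the range in smaller slices, so that partial results can be saved as the lookup progresses. The helpers below are a sketch under that assumption: `time_windows` and `dedupe_by_event_id` are hypothetical names, the six-hour default is arbitrary, and deduplication is included because an event stamped exactly on a boundary may be returned by both neighbouring windows.

```python
from datetime import datetime, timedelta, timezone


def time_windows(start, end, step=timedelta(hours=6)):
    """Yield consecutive (window_start, window_end) pairs covering start..end."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    cursor = start
    while cursor < end:
        upper = min(cursor + step, end)
        yield cursor, upper
        cursor = upper


def dedupe_by_event_id(events):
    """Drop events whose EventId has already been seen, preserving order."""
    seen = set()
    for event in events:
        event_id = event.get("EventId")
        if event_id is not None and event_id in seen:
            continue
        seen.add(event_id)
        yield event
```

Each pair from `time_windows` can be passed as `StartTime` and `EndTime` to the paginator, with the combined results run through `dedupe_by_event_id`.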
## AWS CLI Equivalents
```bash
# Lookup events by username
aws cloudtrail lookup-events \
  --lookup-attributes AttributeKey=Username,AttributeValue=compromised-user \
  --start-time 2025-01-01T00:00:00Z \
  --output json

# Search by access key
aws cloudtrail lookup-events \
  --lookup-attributes AttributeKey=AccessKeyId,AttributeValue=AKIAEXAMPLE \
  --max-results 50
```
## Athena Query for S3 CloudTrail Logs
```sql
SELECT eventtime, eventname, useridentity.arn, sourceipaddress, useragent,
       requestparameters, responseelements, errorcode
FROM cloudtrail_logs
WHERE eventtime BETWEEN '2025-01-01' AND '2025-01-02'
  AND useridentity.accesskeyid = 'AKIAEXAMPLE'
ORDER BY eventtime;
```
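
When the same query is reused across incidents, it is safer to build it from validated inputs than to paste values into the SQL by hand. The following sketch only constructs the query text; it does not call Athena. The function name and both validation patterns are assumptions (the access key pattern is a loose sanity check, not the official format). In the table definition AWS documents for CloudTrail, `eventtime` is a string column, so the `BETWEEN` comparison is lexicographic and an upper bound of `'2025-01-02'` excludes events that occurred during 2 January.

```python
import re
from datetime import date


def build_access_key_query(access_key_id, start, end, table="cloudtrail_logs"):
    """Return the Athena SQL for one access key between two dates (date objects)."""
    # Reject anything that is not plain upper-case alphanumerics (assumed pattern).
    if not re.fullmatch(r"[A-Z0-9]{4,128}", access_key_id):
        raise ValueError("unexpected access key format")
    # Table names cannot be bound as parameters, so restrict them to identifiers.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table):
        raise ValueError("unexpected table name")
    return (
        "SELECT eventtime, eventname, useridentity.arn, sourceipaddress, useragent,\n"
        "       requestparameters, responseelements, errorcode\n"
        f"FROM {table}\n"
        f"WHERE eventtime BETWEEN '{start.isoformat()}' AND '{end.isoformat()}'\n"
        f"  AND useridentity.accesskeyid = '{access_key_id}'\n"
        "ORDER BY eventtime;"
    )
```

`build_access_key_query("AKIAEXAMPLE", date(2025, 1, 1), date(2025, 1, 2))` reproduces the query shown above.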
## Key Forensic Event Names
| Event Name | Service | Forensic Significance |
|-----------|---------|----------------------|
| CreateUser | IAM | Persistence - new user account |
| CreateAccessKey | IAM | Persistence - new credential |
| AssumeRole | STS | Lateral movement / privilege escalation |
| GetObject | S3 | Data exfiltration (data event; recorded only when S3 data event logging is enabled on the trail) |
| StopLogging | CloudTrail | Anti-forensics |
| PutBucketPolicy | S3 | Permission modification |
| RunInstances | EC2 | Cryptomining / C2 infrastructure |
| GetSecretValue | SecretsManager | Credential theft |
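
The table above maps naturally onto a lookup dictionary, which makes it easy to summarise a batch of events by forensic category. This is a sketch only: the category slugs and the `tally_categories` helper are names invented for illustration, and the mapping covers just the eight events listed.

```python
from collections import Counter

# Event name -> (service, category); categories paraphrase the table above.
FORENSIC_EVENTS = {
    "CreateUser": ("IAM", "persistence"),
    "CreateAccessKey": ("IAM", "persistence"),
    "AssumeRole": ("STS", "lateral-movement"),
    "GetObject": ("S3", "exfiltration"),
    "StopLogging": ("CloudTrail", "anti-forensics"),
    "PutBucketPolicy": ("S3", "permission-change"),
    "RunInstances": ("EC2", "resource-abuse"),
    "GetSecretValue": ("SecretsManager", "credential-theft"),
}


def tally_categories(event_names):
    """Count how many of the given event names fall into each category."""
    counts = Counter()
    for name in event_names:
        entry = FORENSIC_EVENTS.get(name)
        if entry is not None:
            counts[entry[1]] += 1
    return dict(counts)
```

Event names that are not in the mapping are ignored rather than counted as benign, so the tally is a lower bound on attacker activity.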
## Suspicious User Agents
| User Agent Pattern | Tool |
|-------------------|------|
| `Pacu/...` | AWS exploitation framework |
| `python-requests` | Custom Python scripts |
| `aws-cli/2.x` from unusual IP | CLI from attacker machine |
| `Scout Suite` | Cloud security assessment |
| `Prowler` | AWS security scanner |
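
A small classifier can apply the patterns above to the `userAgent` field of each event. The sketch below is an assumption-laden illustration: the regular expressions are loose approximations of how these tools identify themselves, `classify_user_agent` is a hypothetical helper, and `known_ips` stands for whatever allow-list of expected administrator addresses the investigator maintains.

```python
import re

# (pattern, label) pairs; the patterns are approximations, not official strings.
UA_PATTERNS = [
    (re.compile(r"\bpacu\b", re.IGNORECASE), "Pacu"),
    (re.compile(r"scout ?suite", re.IGNORECASE), "Scout Suite"),
    (re.compile(r"\bprowler\b", re.IGNORECASE), "Prowler"),
    (re.compile(r"python-requests", re.IGNORECASE), "python-requests"),
]


def classify_user_agent(user_agent, source_ip=None, known_ips=frozenset()):
    """Return a label for a suspicious user agent, or None if nothing matches."""
    ua = user_agent or ""
    for pattern, label in UA_PATTERNS:
        if pattern.search(ua):
            return label
    # The AWS CLI itself is normal; it is only notable from an unexpected address.
    if ua.startswith("aws-cli/") and source_ip and source_ip not in known_ips:
        return "aws-cli from unrecognised IP"
    return None
```

User agents are trivially spoofed, so a match is a lead to follow up rather than proof of which tool was used.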
@@ -0,0 +1,212 @@
#!/usr/bin/env python3
"""AWS CloudTrail Forensics Agent - investigates API activity for incident response using boto3."""

import json
import argparse
import logging
from collections import defaultdict
from datetime import datetime, timedelta

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

# boto3 is optional at import time so the module can still be loaded without it.
try:
    import boto3
    from botocore.exceptions import ClientError
    HAS_BOTO3 = True
except ImportError:
    HAS_BOTO3 = False

# API calls that commonly appear in account-compromise investigations.
SUSPICIOUS_EVENTS = [
    "CreateUser", "CreateAccessKey", "AttachUserPolicy", "AttachRolePolicy",
    "PutUserPolicy", "CreateRole", "AssumeRole", "CreateLoginProfile",
    "UpdateLoginProfile", "CreateFunction20150331", "UpdateFunctionCode20150331v2",
    "AuthorizeSecurityGroupIngress", "RunInstances", "StopLogging", "DeleteTrail",
    "PutBucketPolicy", "PutBucketAcl", "GetSecretValue", "GetParametersByPath",
]

# Subset of calls that create a new foothold (users, keys, roles, functions).
PERSISTENCE_EVENTS = [
    "CreateUser", "CreateAccessKey", "CreateRole", "CreateLoginProfile",
    "CreateFunction20150331", "CreateEventSourceMapping20150331",
]


def lookup_events(client, start_time, end_time, username=None, access_key_id=None, event_name=None):
    """Query CloudTrail using lookup_events with pagination."""
    kwargs = {
        "StartTime": start_time,
        "EndTime": end_time,
        "MaxResults": 50,
    }
    # lookup_events accepts only one lookup attribute per request, so the first
    # filter supplied is sent to the API and the rest are applied client-side.
    filters = [
        ("Username", username),
        ("AccessKeyId", access_key_id),
        ("EventName", event_name),
    ]
    filters = [(key, value) for key, value in filters if value]
    if filters:
        key, value = filters[0]
        kwargs["LookupAttributes"] = [{"AttributeKey": key, "AttributeValue": value}]
    remaining = dict(filters[1:])

    events = []
    paginator = client.get_paginator("lookup_events")
    for page in paginator.paginate(**kwargs):
        for event in page.get("Events", []):
            # Top-level event fields share their names with the attribute keys.
            if any(event.get(key) != value for key, value in remaining.items()):
                continue
            # CloudTrailEvent is a JSON document serialized as a string.
            ct_event = json.loads(event.get("CloudTrailEvent") or "{}")
            identity = ct_event.get("userIdentity") or {}
            events.append({
                "event_time": str(event.get("EventTime", "")),
                "event_name": event.get("EventName", ""),
                "event_source": ct_event.get("eventSource", ""),
                "username": event.get("Username", ""),
                "source_ip": ct_event.get("sourceIPAddress", ""),
                "user_agent": ct_event.get("userAgent", ""),
                "access_key_id": identity.get("accessKeyId", ""),
                "arn": identity.get("arn", ""),
                "error_code": ct_event.get("errorCode", ""),
                "error_message": ct_event.get("errorMessage", ""),
                # These fields are JSON null for many events; normalise to {}.
                "request_params": ct_event.get("requestParameters") or {},
                "response_elements": ct_event.get("responseElements") or {},
                "aws_region": ct_event.get("awsRegion", ""),
            })
    logger.info("Retrieved %d CloudTrail events", len(events))
    return events


def detect_suspicious_activity(events):
    """Flag events matching suspicious API calls."""
    suspicious = []
    for event in events:
        # Each event is reported once; a suspicious API call takes precedence
        # over the access-denied indicator when both apply.
        if event["event_name"] in SUSPICIOUS_EVENTS:
            indicator = "suspicious_api_call"
            severity = "high" if event["event_name"] in PERSISTENCE_EVENTS else "medium"
        elif event["error_code"] == "AccessDenied":
            indicator = "access_denied_enumeration"
            severity = "medium"
        else:
            continue
        # Copy so the caller's event dicts are not mutated.
        suspicious.append({**event, "indicator": indicator, "severity": severity})
    return suspicious


def detect_persistence(events):
    """Identify persistence mechanisms created by attacker."""
    persistence = []
    for event in events:
        # Only successful calls (no error code) leave a usable foothold.
        if event["event_name"] in PERSISTENCE_EVENTS and not event["error_code"]:
            details = {}
            # responseElements can be null in the raw event; fall back to {}.
            resp = event.get("response_elements") or {}
            if event["event_name"] == "CreateUser":
                details["created_user"] = resp.get("user", {}).get("userName", "")
            elif event["event_name"] == "CreateAccessKey":
                details["access_key_id"] = resp.get("accessKey", {}).get("accessKeyId", "")
                details["for_user"] = resp.get("accessKey", {}).get("userName", "")
            elif event["event_name"] == "CreateRole":
                details["role_name"] = resp.get("role", {}).get("roleName", "")
            persistence.append({**event, "persistence_details": details})
    return persistence


def analyze_source_ips(events):
    """Analyze source IP distribution for anomalies."""
    ip_activity = defaultdict(lambda: {"count": 0, "events": set(), "users": set()})
    for event in events:
        ip = event["source_ip"]
        if ip:
            ip_activity[ip]["count"] += 1
            ip_activity[ip]["events"].add(event["event_name"])
            ip_activity[ip]["users"].add(event["username"])
    result = {}
    for ip, data in ip_activity.items():
        result[ip] = {
            "request_count": data["count"],
            "unique_events": len(data["events"]),
            "unique_users": len(data["users"]),
            # Sorted so the truncated sample is stable between runs.
            "event_types": sorted(data["events"])[:10],
        }
    # Busiest source IPs first.
    return dict(sorted(result.items(), key=lambda x: x[1]["request_count"], reverse=True))


def analyze_user_agents(events):
    """Analyze user agents for tool identification."""
    ua_counts = defaultdict(int)
    for event in events:
        # The key is always present but may be empty, so fall back explicitly.
        ua = event.get("user_agent") or "unknown"
        ua_counts[ua] += 1
    # Broad substrings such as "boto" and "python" also match ordinary SDK and
    # CLI traffic, so treat hits as leads for review rather than verdicts.
    tools = ("pacu", "prowler", "scoutsuite", "scout suite", "boto", "python", "curl", "custom")
    suspicious_uas = {}
    for ua, count in ua_counts.items():
        if any(tool in ua.lower() for tool in tools):
            suspicious_uas[ua] = count
    return {
        "all_user_agents": dict(sorted(ua_counts.items(), key=lambda x: x[1], reverse=True)[:15]),
        "suspicious_user_agents": suspicious_uas,
    }


def build_timeline(events):
    """Build chronological attack timeline."""
    return sorted(
        [{"time": e["event_time"], "event": e["event_name"], "user": e["username"],
          "source_ip": e["source_ip"], "error": e.get("error_code", "")}
         for e in events],
        key=lambda x: x["time"]
    )


def generate_report(events, suspicious, persistence, ip_analysis, ua_analysis):
    """Generate forensic investigation report."""
    # Lists are truncated to keep the JSON report readable.
    report = {
        "timestamp": datetime.utcnow().isoformat(),
        "investigation_type": "AWS CloudTrail Forensic Analysis",
        "total_events_analyzed": len(events),
        "suspicious_events": len(suspicious),
        "persistence_mechanisms_found": len(persistence),
        "unique_source_ips": len(ip_analysis),
        "source_ip_analysis": dict(list(ip_analysis.items())[:10]),
        "user_agent_analysis": ua_analysis,
        "persistence_details": persistence[:10],
        "top_suspicious_events": suspicious[:20],
        "timeline": build_timeline(events)[:50],
    }
    return report


def main():
    parser = argparse.ArgumentParser(description="AWS CloudTrail Forensics Agent")
    parser.add_argument("--hours-back", type=int, default=24, help="Hours to look back (default: 24)")
    parser.add_argument("--username", help="Filter by IAM username")
    parser.add_argument("--access-key-id", help="Filter by access key ID")
    parser.add_argument("--event-name", help="Filter by specific event name")
    parser.add_argument("--region", default="us-east-1", help="AWS region (default: us-east-1)")
    parser.add_argument("--profile", help="AWS CLI profile name")
    parser.add_argument("--output", default="cloudtrail_forensics_report.json")
    args = parser.parse_args()

    if not HAS_BOTO3:
        logger.error("boto3 is required: pip install boto3")
        return

    # Use a named profile when given, otherwise the default credential chain.
    session_kwargs = {}
    if args.profile:
        session_kwargs["profile_name"] = args.profile
    session = boto3.Session(**session_kwargs)
    client = session.client("cloudtrail", region_name=args.region)

    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=args.hours_back)
    logger.info("Querying CloudTrail: %s to %s", start_time.isoformat(), end_time.isoformat())

    events = lookup_events(client, start_time, end_time, args.username, args.access_key_id, args.event_name)
    suspicious = detect_suspicious_activity(events)
    persistence = detect_persistence(events)
    ip_analysis = analyze_source_ips(events)
    ua_analysis = analyze_user_agents(events)

    report = generate_report(events, suspicious, persistence, ip_analysis, ua_analysis)
    with open(args.output, "w") as f:
        json.dump(report, f, indent=2, default=str)
    logger.info("Forensics: %d events, %d suspicious, %d persistence mechanisms",
                len(events), len(suspicious), len(persistence))
    print(json.dumps(report, indent=2, default=str))


if __name__ == "__main__":
    main()