mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
Add 5 new cybersecurity skills: golden ticket detection, traffic baselining, sandbox evasion analysis, domain fronting hunting, SpiderFoot OSINT
This commit is contained in:
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -0,0 +1,43 @@
|
||||
---
|
||||
name: analyzing-malware-sandbox-evasion-techniques
|
||||
description: Detect sandbox evasion techniques in malware samples by analyzing timing checks, VM artifact queries, user interaction detection, and sleep inflation patterns from Cuckoo/AnyRun behavioral reports
|
||||
domain: cybersecurity
|
||||
subdomain: malware-analysis
|
||||
tags:
|
||||
- sandbox-evasion
|
||||
- malware-analysis
|
||||
- cuckoo
|
||||
- anyrun
|
||||
- mitre-attack
|
||||
- virtualization-detection
|
||||
- behavioral-analysis
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
# Analyzing Malware Sandbox Evasion Techniques
|
||||
|
||||
## Overview
|
||||
|
||||
Sandbox evasion (MITRE ATT&CK T1497) allows malware to detect analysis environments and alter behavior to avoid detection. This skill analyzes behavioral reports from Cuckoo Sandbox and AnyRun for evasion indicators including timing-based checks (GetTickCount, QueryPerformanceCounter, sleep inflation), VM artifact detection (registry keys, MAC address prefixes, process names like vmtoolsd.exe), user interaction checks (mouse movement, keyboard input), and environment fingerprinting (disk size, CPU count, RAM). Detection rules flag samples exhibiting these behaviors for deeper manual analysis.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Cuckoo Sandbox 2.0+ or AnyRun account for behavioral analysis reports
|
||||
- Python 3.8+ with json library for report parsing
|
||||
- Behavioral report exports in JSON format
|
||||
|
||||
## Steps
|
||||
|
||||
1. Parse Cuckoo/AnyRun behavioral report JSON files
|
||||
2. Extract API call sequences for timing-related functions
|
||||
3. Identify VM artifact detection via registry queries and WMI calls
|
||||
4. Detect sleep inflation by comparing requested vs actual sleep durations
|
||||
5. Flag user interaction checks (GetCursorPos, GetAsyncKeyState patterns)
|
||||
6. Score evasion sophistication based on technique count and diversity
|
||||
7. Map detected techniques to MITRE ATT&CK T1497 sub-techniques
|
||||
|
||||
## Expected Output
|
||||
|
||||
JSON report listing detected evasion techniques with MITRE ATT&CK mapping, API call evidence, evasion sophistication score, and classification of evasion categories (timing, VM detection, user interaction, environment fingerprinting).
|
||||
@@ -0,0 +1,84 @@
|
||||
# Malware Sandbox Evasion Techniques API Reference
|
||||
|
||||
## MITRE ATT&CK T1497 Sub-techniques
|
||||
|
||||
| Sub-technique | ID | Evasion Method |
|
||||
|---|---|---|
|
||||
| System Checks | T1497.001 | VM artifacts, registry keys, MAC prefixes, process names |
|
||||
| User Activity Based Checks | T1497.002 | Mouse movement, keyboard input, foreground window |
|
||||
| Time Based Evasion | T1497.003 | GetTickCount, sleep inflation, RDTSC timing |
|
||||
|
||||
## Cuckoo Sandbox Report JSON Structure
|
||||
|
||||
### API Call Format
|
||||
```json
|
||||
{
|
||||
"behavior": {
|
||||
"processes": [
|
||||
{
|
||||
"process_name": "malware.exe",
|
||||
"pid": 1234,
|
||||
"calls": [
|
||||
{
|
||||
"api": "GetTickCount",
|
||||
"category": "system",
|
||||
"arguments": {},
|
||||
"return": "123456789"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Timing API Indicators
|
||||
|
||||
| API | Purpose | Evasion Use |
|
||||
|---|---|---|
|
||||
| GetTickCount / GetTickCount64 | System uptime in ms | Check if uptime < 20min (sandbox) |
|
||||
| QueryPerformanceCounter | High-res timer | Measure sleep accuracy |
|
||||
| GetSystemTimeAsFileTime | System time | Detect time acceleration |
|
||||
| NtQuerySystemTime | Kernel time query | Compare with user-mode time |
|
||||
| RDTSC | CPU timestamp counter | Detect VM overhead in timing |
|
||||
|
||||
## VM Artifact Indicators
|
||||
|
||||
### Registry Keys
|
||||
```
|
||||
HKLM\SOFTWARE\VMware, Inc.\VMware Tools
|
||||
HKLM\SOFTWARE\Oracle\VirtualBox Guest Additions
|
||||
HKLM\HARDWARE\ACPI\DSDT\VBOX__
|
||||
HKLM\SYSTEM\CurrentControlSet\Services\VBoxGuest
|
||||
```
|
||||
|
||||
### VM Process Names
|
||||
```
|
||||
vmtoolsd.exe, vmwaretray.exe # VMware
|
||||
vboxservice.exe, vboxtray.exe # VirtualBox
|
||||
qemu-ga.exe # QEMU
|
||||
prl_tools.exe # Parallels
|
||||
```
|
||||
|
||||
### VM MAC Address Prefixes
|
||||
```
|
||||
00:0C:29 VMware
|
||||
00:50:56 VMware
|
||||
08:00:27 VirtualBox
|
||||
00:1C:42 Parallels
|
||||
52:54:00 QEMU/KVM
|
||||
```
|
||||
|
||||
## AnyRun Report API
|
||||
|
||||
### Get Report
|
||||
```
|
||||
GET https://api.any.run/v1/analysis/{task_id}
|
||||
Authorization: API-Key <key>
|
||||
```
|
||||
|
||||
## CLI Usage
|
||||
```bash
|
||||
python agent.py --report cuckoo_report.json --output evasion_report.json
|
||||
python agent.py --report report.json --min-sleep-ms 30000
|
||||
```
|
||||
@@ -0,0 +1,205 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Analyze malware sandbox evasion techniques from Cuckoo/AnyRun behavioral reports."""
|
||||
|
||||
import json
|
||||
import argparse
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
|
||||
TIMING_APIS = {
|
||||
"GetTickCount", "GetTickCount64", "QueryPerformanceCounter",
|
||||
"QueryPerformanceFrequency", "GetSystemTimeAsFileTime", "NtQuerySystemTime",
|
||||
"timeGetTime", "GetLocalTime", "GetSystemTime",
|
||||
}
|
||||
|
||||
SLEEP_APIS = {"Sleep", "SleepEx", "NtDelayExecution", "WaitForSingleObject"}
|
||||
|
||||
VM_REGISTRY_KEYS = [
|
||||
"HKLM\\SOFTWARE\\VMware", "HKLM\\SOFTWARE\\Oracle\\VirtualBox",
|
||||
"HKLM\\HARDWARE\\ACPI\\DSDT\\VBOX", "HKLM\\SYSTEM\\CurrentControlSet\\Services\\VBoxGuest",
|
||||
"HKLM\\SOFTWARE\\Microsoft\\Virtual Machine\\Guest\\Parameters",
|
||||
"HKLM\\HARDWARE\\Description\\System\\SystemBiosVersion",
|
||||
]
|
||||
|
||||
VM_PROCESSES = {
|
||||
"vmtoolsd.exe", "vmwaretray.exe", "vboxservice.exe", "vboxtray.exe",
|
||||
"qemu-ga.exe", "vmusrvc.exe", "prl_tools.exe", "xenservice.exe",
|
||||
"windanr.exe", "vdagent.exe",
|
||||
}
|
||||
|
||||
VM_MAC_PREFIXES = ["00:0C:29", "00:50:56", "08:00:27", "00:1C:42", "00:16:3E", "52:54:00"]
|
||||
|
||||
USER_INTERACTION_APIS = {
|
||||
"GetCursorPos", "GetAsyncKeyState", "GetForegroundWindow",
|
||||
"GetLastInputInfo", "mouse_event", "keybd_event",
|
||||
}
|
||||
|
||||
WMI_EVASION_QUERIES = [
|
||||
"Win32_ComputerSystem", "Win32_BIOS", "Win32_DiskDrive",
|
||||
"Win32_PhysicalMemory", "Win32_Processor",
|
||||
]
|
||||
|
||||
|
||||
def parse_cuckoo_report(filepath):
|
||||
"""Parse a Cuckoo Sandbox behavioral report JSON."""
|
||||
with open(filepath) as f:
|
||||
report = json.load(f)
|
||||
behavior = report.get("behavior", {})
|
||||
api_calls = []
|
||||
for process in behavior.get("processes", []):
|
||||
for call in process.get("calls", []):
|
||||
api_calls.append({
|
||||
"api": call.get("api", ""),
|
||||
"category": call.get("category", ""),
|
||||
"arguments": call.get("arguments", {}),
|
||||
"return": call.get("return", ""),
|
||||
"process_name": process.get("process_name", ""),
|
||||
"pid": process.get("pid", 0),
|
||||
})
|
||||
return api_calls, report
|
||||
|
||||
|
||||
def detect_timing_checks(api_calls):
|
||||
"""Detect timing-based sandbox evasion via GetTickCount, QPC, etc."""
|
||||
findings = []
|
||||
timing_count = 0
|
||||
for call in api_calls:
|
||||
if call["api"] in TIMING_APIS:
|
||||
timing_count += 1
|
||||
if timing_count >= 3:
|
||||
findings.append({
|
||||
"technique": "Timing-Based Evasion",
|
||||
"mitre_id": "T1497.003",
|
||||
"api_count": timing_count,
|
||||
"apis_used": list({c["api"] for c in api_calls if c["api"] in TIMING_APIS}),
|
||||
"severity": "high",
|
||||
"description": f"{timing_count} timing API calls detected; malware may be measuring execution time to detect sandbox acceleration",
|
||||
})
|
||||
return findings
|
||||
|
||||
|
||||
def detect_sleep_inflation(api_calls, min_sleep_ms=60000):
|
||||
"""Detect sleep calls with long durations used to evade sandbox time limits."""
|
||||
findings = []
|
||||
for call in api_calls:
|
||||
if call["api"] not in SLEEP_APIS:
|
||||
continue
|
||||
ms = 0
|
||||
args = call.get("arguments", {})
|
||||
if isinstance(args, dict):
|
||||
ms = int(args.get("Milliseconds", args.get("milliseconds", 0)))
|
||||
elif isinstance(args, list):
|
||||
for a in args:
|
||||
if isinstance(a, dict) and a.get("name", "").lower() == "milliseconds":
|
||||
ms = int(a.get("value", 0))
|
||||
if ms >= min_sleep_ms:
|
||||
findings.append({
|
||||
"technique": "Sleep Inflation",
|
||||
"mitre_id": "T1497.003",
|
||||
"api": call["api"],
|
||||
"sleep_ms": ms,
|
||||
"sleep_seconds": ms / 1000,
|
||||
"process": call["process_name"],
|
||||
"severity": "high",
|
||||
"description": f"Sleep call of {ms / 1000:.0f}s detected; likely delaying execution to outlast sandbox analysis window",
|
||||
})
|
||||
return findings
|
||||
|
||||
|
||||
def detect_vm_artifact_checks(api_calls):
|
||||
"""Detect VM artifact queries (registry, processes, MAC addresses)."""
|
||||
findings = []
|
||||
for call in api_calls:
|
||||
args_str = json.dumps(call.get("arguments", "")).lower()
|
||||
for reg_key in VM_REGISTRY_KEYS:
|
||||
if reg_key.lower() in args_str:
|
||||
findings.append({
|
||||
"technique": "VM Registry Artifact Check",
|
||||
"mitre_id": "T1497.001",
|
||||
"registry_key": reg_key,
|
||||
"api": call["api"],
|
||||
"severity": "high",
|
||||
})
|
||||
break
|
||||
for wmi_query in WMI_EVASION_QUERIES:
|
||||
if wmi_query.lower() in args_str:
|
||||
findings.append({
|
||||
"technique": "WMI Environment Fingerprinting",
|
||||
"mitre_id": "T1497.001",
|
||||
"wmi_class": wmi_query,
|
||||
"api": call["api"],
|
||||
"severity": "medium",
|
||||
})
|
||||
break
|
||||
return findings
|
||||
|
||||
|
||||
def detect_user_interaction_checks(api_calls):
|
||||
"""Detect checks for user interaction (mouse, keyboard, foreground window)."""
|
||||
interaction_apis = [c for c in api_calls if c["api"] in USER_INTERACTION_APIS]
|
||||
if len(interaction_apis) >= 2:
|
||||
return [{
|
||||
"technique": "User Interaction Detection",
|
||||
"mitre_id": "T1497.002",
|
||||
"api_count": len(interaction_apis),
|
||||
"apis_used": list({c["api"] for c in interaction_apis}),
|
||||
"severity": "medium",
|
||||
"description": "Malware checks for user input to determine if running in automated sandbox",
|
||||
}]
|
||||
return []
|
||||
|
||||
|
||||
def score_evasion_sophistication(all_findings):
|
||||
"""Score evasion sophistication based on technique diversity."""
|
||||
technique_ids = {f["mitre_id"] for f in all_findings}
|
||||
categories = {f["technique"].split()[0] for f in all_findings}
|
||||
score = min(len(all_findings) * 10 + len(technique_ids) * 15 + len(categories) * 10, 100)
|
||||
level = "low" if score < 30 else "medium" if score < 60 else "high"
|
||||
return {"score": score, "level": level, "unique_techniques": len(technique_ids), "total_indicators": len(all_findings)}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Sandbox Evasion Technique Analyzer")
|
||||
parser.add_argument("--report", required=True, help="Path to Cuckoo/AnyRun behavioral report JSON")
|
||||
parser.add_argument("--min-sleep-ms", type=int, default=60000, help="Minimum sleep duration to flag (ms)")
|
||||
parser.add_argument("--output", default="evasion_analysis_report.json", help="Output report path")
|
||||
args = parser.parse_args()
|
||||
|
||||
api_calls, raw_report = parse_cuckoo_report(args.report)
|
||||
print(f"[+] Parsed {len(api_calls)} API calls from behavioral report")
|
||||
|
||||
timing = detect_timing_checks(api_calls)
|
||||
sleep = detect_sleep_inflation(api_calls, args.min_sleep_ms)
|
||||
vm_checks = detect_vm_artifact_checks(api_calls)
|
||||
user_checks = detect_user_interaction_checks(api_calls)
|
||||
|
||||
all_findings = timing + sleep + vm_checks + user_checks
|
||||
sophistication = score_evasion_sophistication(all_findings)
|
||||
|
||||
report = {
|
||||
"analysis_time": datetime.utcnow().isoformat() + "Z",
|
||||
"sample_sha256": raw_report.get("target", {}).get("file", {}).get("sha256", ""),
|
||||
"total_api_calls": len(api_calls),
|
||||
"evasion_findings": {
|
||||
"timing_checks": timing,
|
||||
"sleep_inflation": sleep,
|
||||
"vm_artifact_checks": vm_checks,
|
||||
"user_interaction_checks": user_checks,
|
||||
},
|
||||
"total_indicators": len(all_findings),
|
||||
"sophistication": sophistication,
|
||||
"mitre_techniques": ["T1497.001", "T1497.002", "T1497.003"],
|
||||
}
|
||||
|
||||
with open(args.output, "w") as f:
|
||||
json.dump(report, f, indent=2)
|
||||
print(f"[+] Timing checks: {len(timing)}")
|
||||
print(f"[+] Sleep inflation: {len(sleep)}")
|
||||
print(f"[+] VM artifact checks: {len(vm_checks)}")
|
||||
print(f"[+] User interaction checks: {len(user_checks)}")
|
||||
print(f"[+] Evasion sophistication: {sophistication['level']} ({sophistication['score']}/100)")
|
||||
print(f"[+] Report saved to {args.output}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -0,0 +1,44 @@
|
||||
---
|
||||
name: detecting-golden-ticket-forgery
|
||||
description: Detect Kerberos Golden Ticket forgery by analyzing Windows Event ID 4769 for RC4 encryption downgrades (0x17), abnormal ticket lifetimes, and krbtgt account anomalies in Splunk and Elastic SIEM
|
||||
domain: cybersecurity
|
||||
subdomain: threat-detection
|
||||
tags:
|
||||
- golden-ticket
|
||||
- kerberos
|
||||
- active-directory
|
||||
- mimikatz
|
||||
- splunk
|
||||
- credential-theft
|
||||
- windows-security
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
# Detecting Golden Ticket Forgery
|
||||
|
||||
## Overview
|
||||
|
||||
A Golden Ticket attack (MITRE ATT&CK T1558.001) involves forging a Kerberos Ticket Granting Ticket (TGT) using the krbtgt account NTLM hash, granting unrestricted access to any service in the Active Directory domain. This skill detects Golden Ticket usage by analyzing Event ID 4769 for RC4 encryption type (0x17) in environments enforcing AES, identifying tickets with abnormal lifetimes exceeding domain policy, correlating TGS requests with missing corresponding TGT requests (Event ID 4768), and detecting krbtgt password age anomalies.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Windows Domain Controller with Kerberos audit logging enabled
|
||||
- Splunk or Elastic SIEM ingesting Windows Security event logs
|
||||
- Python 3.8+ for offline event log analysis
|
||||
- Knowledge of domain Kerberos encryption policy (AES vs RC4)
|
||||
|
||||
## Steps
|
||||
|
||||
1. Audit domain Kerberos encryption policy to establish AES-only baseline
|
||||
2. Forward Event IDs 4768 and 4769 to SIEM platform
|
||||
3. Detect RC4 (0x17) encryption in TGS requests where AES is enforced
|
||||
4. Identify TGS requests without corresponding TGT requests (forged ticket indicator)
|
||||
5. Alert on ticket lifetimes exceeding MaxTicketAge domain policy
|
||||
6. Monitor krbtgt account password age and last reset date
|
||||
7. Correlate findings with host/user context for risk scoring
|
||||
|
||||
## Expected Output
|
||||
|
||||
JSON report with Golden Ticket indicators including RC4 downgrades, orphaned TGS requests, abnormal ticket lifetimes, and risk-scored alerts with MITRE ATT&CK technique mapping.
|
||||
@@ -0,0 +1,82 @@
|
||||
# Golden Ticket Forgery Detection API Reference
|
||||
|
||||
## Windows Security Event IDs
|
||||
|
||||
### Event ID 4768 - TGT Requested (AS-REQ)
|
||||
```
|
||||
Key Fields:
|
||||
TargetUserName - Account requesting TGT
|
||||
TargetDomainName - Domain of requesting account
|
||||
IpAddress - Source IP
|
||||
TicketEncryptionType - 0x12 (AES256), 0x11 (AES128), 0x17 (RC4)
|
||||
PreAuthType - 15 = PA-ENC-TIMESTAMP (normal)
|
||||
```
|
||||
|
||||
### Event ID 4769 - TGS Requested (TGS-REQ)
|
||||
```
|
||||
Key Fields:
|
||||
TargetUserName - Account using the ticket
|
||||
ServiceName - SPN of target service
|
||||
IpAddress - Source IP of requestor
|
||||
TicketEncryptionType - 0x17 = RC4 (Golden Ticket indicator)
|
||||
TicketOptions - Kerberos ticket flags
|
||||
LogonGuid - Correlate with Event 4624
|
||||
```
|
||||
|
||||
## Detection Indicators
|
||||
|
||||
| Indicator | Normal | Golden Ticket |
|
||||
|---|---|---|
|
||||
| TicketEncryptionType | 0x12 (AES256) | 0x17 (RC4-HMAC) |
|
||||
| TGT Lifetime | <= 10 hours | Often 10+ years |
|
||||
| TGS without TGT | Always preceded by 4768 | 4769 without 4768 |
|
||||
| Domain field | Matches domain | May be blank or incorrect |
|
||||
|
||||
## Splunk SPL Queries
|
||||
|
||||
### RC4 TGS Detection (Golden Ticket)
|
||||
```spl
|
||||
index=wineventlog sourcetype="WinEventLog:Security" EventCode=4769
|
||||
TicketEncryptionType=0x17
|
||||
ServiceName!="krbtgt"
|
||||
| stats count by TargetUserName, IpAddress, ServiceName
|
||||
| where count > 3
|
||||
| sort -count
|
||||
```
|
||||
|
||||
### Orphaned TGS (No Prior TGT)
|
||||
```spl
|
||||
index=wineventlog EventCode=4769
|
||||
| join type=left TargetUserName
|
||||
[search index=wineventlog EventCode=4768
|
||||
| dedup TargetUserName | fields TargetUserName]
|
||||
| where isnull(TargetUserName)
|
||||
| stats count by TargetUserName, IpAddress
|
||||
```
|
||||
|
||||
### krbtgt Service Anomaly
|
||||
```spl
|
||||
index=wineventlog EventCode=4769 ServiceName="krbtgt*"
|
||||
| table _time, TargetUserName, IpAddress, TicketEncryptionType
|
||||
```
|
||||
|
||||
## Elastic KQL
|
||||
|
||||
### RC4 Downgrade in Elastic
|
||||
```kql
|
||||
event.code: "4769" AND winlog.event_data.TicketEncryptionType: "0x17"
|
||||
AND NOT winlog.event_data.ServiceName: "krbtgt"
|
||||
```
|
||||
|
||||
## MITRE ATT&CK
|
||||
|
||||
| Technique | ID | Description |
|
||||
|---|---|---|
|
||||
| Steal or Forge Kerberos Tickets: Golden Ticket | T1558.001 | Forge TGT using krbtgt hash |
|
||||
|
||||
## CLI Usage
|
||||
```bash
|
||||
python agent.py --evtx-xml security_events.xml --output golden_ticket_report.json
|
||||
python agent.py --show-splunk
|
||||
python agent.py --evtx-xml events.xml --max-ticket-hours 8
|
||||
```
|
||||
@@ -0,0 +1,201 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Detect Kerberos Golden Ticket forgery via Windows Security event log analysis."""
|
||||
|
||||
import json
|
||||
import argparse
|
||||
import xml.etree.ElementTree as ET
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def parse_security_events(xml_path):
|
||||
"""Parse exported Windows Security event log XML for Kerberos events 4768/4769."""
|
||||
tree = ET.parse(xml_path)
|
||||
root = tree.getroot()
|
||||
ns = {"e": "http://schemas.microsoft.com/win/2004/08/events/event"}
|
||||
events = []
|
||||
for event_el in root.findall(".//e:Event", ns):
|
||||
sys_el = event_el.find("e:System", ns)
|
||||
event_id = int(sys_el.find("e:EventID", ns).text)
|
||||
if event_id not in (4768, 4769):
|
||||
continue
|
||||
time_created = sys_el.find("e:TimeCreated", ns).attrib.get("SystemTime", "")
|
||||
data_el = event_el.find("e:EventData", ns)
|
||||
fields = {}
|
||||
for d in data_el.findall("e:Data", ns):
|
||||
fields[d.attrib.get("Name", "")] = d.text or ""
|
||||
events.append({"event_id": event_id, "timestamp": time_created, **fields})
|
||||
return events
|
||||
|
||||
|
||||
def detect_rc4_in_aes_environment(events):
|
||||
"""Detect RC4 encryption (0x17) in TGS requests where AES should be enforced."""
|
||||
alerts = []
|
||||
for ev in events:
|
||||
if ev["event_id"] != 4769:
|
||||
continue
|
||||
enc_type = ev.get("TicketEncryptionType", "")
|
||||
if enc_type in ("0x17", "23"):
|
||||
alerts.append({
|
||||
"detection": "RC4 Encryption in TGS Request",
|
||||
"mitre_technique": "T1558.001",
|
||||
"timestamp": ev["timestamp"],
|
||||
"user": ev.get("TargetUserName", ""),
|
||||
"domain": ev.get("TargetDomainName", ""),
|
||||
"service": ev.get("ServiceName", ""),
|
||||
"ip_address": ev.get("IpAddress", ""),
|
||||
"encryption_type": enc_type,
|
||||
"severity": "critical",
|
||||
"description": "RC4 (0x17) encryption detected in TGS request; Golden Ticket indicator in AES-enforced environments",
|
||||
})
|
||||
return alerts
|
||||
|
||||
|
||||
def detect_orphaned_tgs(events):
|
||||
"""Detect TGS requests (4769) without preceding TGT request (4768) from same user."""
|
||||
tgt_users = set()
|
||||
for ev in events:
|
||||
if ev["event_id"] == 4768:
|
||||
tgt_users.add(f"{ev.get('TargetUserName', '')}@{ev.get('TargetDomainName', '')}")
|
||||
alerts = []
|
||||
tgs_without_tgt = defaultdict(list)
|
||||
for ev in events:
|
||||
if ev["event_id"] != 4769:
|
||||
continue
|
||||
user_key = f"{ev.get('TargetUserName', '')}@{ev.get('TargetDomainName', '')}"
|
||||
if user_key not in tgt_users and ev.get("TargetUserName", ""):
|
||||
tgs_without_tgt[user_key].append(ev)
|
||||
for user, tgs_events in tgs_without_tgt.items():
|
||||
alerts.append({
|
||||
"detection": "Orphaned TGS Request (No Preceding TGT)",
|
||||
"mitre_technique": "T1558.001",
|
||||
"user": user,
|
||||
"tgs_count": len(tgs_events),
|
||||
"services": list({e.get("ServiceName", "") for e in tgs_events}),
|
||||
"source_ips": list({e.get("IpAddress", "") for e in tgs_events}),
|
||||
"first_seen": tgs_events[0]["timestamp"],
|
||||
"severity": "critical",
|
||||
"description": "TGS requests without corresponding TGT; forged ticket likely",
|
||||
})
|
||||
return alerts
|
||||
|
||||
|
||||
def detect_abnormal_ticket_lifetime(events, max_lifetime_hours=10):
|
||||
"""Detect tickets with lifetime exceeding domain policy (default MaxTicketAge=10h)."""
|
||||
user_tgt_times = defaultdict(list)
|
||||
for ev in events:
|
||||
if ev["event_id"] == 4768 and ev.get("TargetUserName"):
|
||||
try:
|
||||
ts = datetime.fromisoformat(ev["timestamp"].replace("Z", "+00:00"))
|
||||
user_tgt_times[ev["TargetUserName"]].append(ts)
|
||||
except (ValueError, AttributeError):
|
||||
continue
|
||||
alerts = []
|
||||
for user, times in user_tgt_times.items():
|
||||
if len(times) < 2:
|
||||
continue
|
||||
times.sort()
|
||||
for i in range(1, len(times)):
|
||||
gap_hours = (times[i] - times[i - 1]).total_seconds() / 3600
|
||||
if gap_hours > max_lifetime_hours * 2:
|
||||
alerts.append({
|
||||
"detection": "Abnormal TGT Renewal Gap",
|
||||
"mitre_technique": "T1558.001",
|
||||
"user": user,
|
||||
"gap_hours": round(gap_hours, 2),
|
||||
"max_expected_hours": max_lifetime_hours,
|
||||
"severity": "high",
|
||||
"description": f"TGT renewal gap of {gap_hours:.1f}h exceeds 2x MaxTicketAge ({max_lifetime_hours}h)",
|
||||
})
|
||||
return alerts
|
||||
|
||||
|
||||
def detect_krbtgt_service_anomaly(events):
|
||||
"""Detect TGS requests targeting the krbtgt service (unusual and suspicious)."""
|
||||
alerts = []
|
||||
for ev in events:
|
||||
if ev["event_id"] == 4769 and ev.get("ServiceName", "").lower().startswith("krbtgt"):
|
||||
alerts.append({
|
||||
"detection": "TGS Request Targeting krbtgt Service",
|
||||
"mitre_technique": "T1558.001",
|
||||
"timestamp": ev["timestamp"],
|
||||
"user": ev.get("TargetUserName", ""),
|
||||
"service": ev.get("ServiceName", ""),
|
||||
"ip_address": ev.get("IpAddress", ""),
|
||||
"severity": "critical",
|
||||
"description": "Direct TGS request for krbtgt service is highly anomalous",
|
||||
})
|
||||
return alerts
|
||||
|
||||
|
||||
def generate_splunk_queries():
|
||||
"""Return Splunk SPL queries for Golden Ticket detection."""
|
||||
return {
|
||||
"rc4_downgrade": (
|
||||
'index=wineventlog sourcetype="WinEventLog:Security" EventCode=4769 '
|
||||
'TicketEncryptionType=0x17 ServiceName!="krbtgt" '
|
||||
'| stats count by TargetUserName, IpAddress, ServiceName'
|
||||
),
|
||||
"orphaned_tgs": (
|
||||
'index=wineventlog EventCode=4769 '
|
||||
'| join type=left TargetUserName [search index=wineventlog EventCode=4768 '
|
||||
'| rename TargetUserName as tgt_user | dedup tgt_user | fields tgt_user] '
|
||||
'| where isnull(tgt_user) | stats count by TargetUserName, IpAddress'
|
||||
),
|
||||
"krbtgt_tgs": (
|
||||
'index=wineventlog EventCode=4769 ServiceName="krbtgt*" '
|
||||
'| table _time, TargetUserName, IpAddress, ServiceName, TicketEncryptionType'
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Golden Ticket Forgery Detector")
|
||||
parser.add_argument("--evtx-xml", help="Path to exported Security event log XML")
|
||||
parser.add_argument("--max-ticket-hours", type=int, default=10, help="MaxTicketAge in hours (default: 10)")
|
||||
parser.add_argument("--output", default="golden_ticket_report.json", help="Output report path")
|
||||
parser.add_argument("--show-splunk", action="store_true", help="Print Splunk SPL queries")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.show_splunk:
|
||||
for name, spl in generate_splunk_queries().items():
|
||||
print(f"\n--- {name} ---\n{spl}")
|
||||
return
|
||||
|
||||
if not args.evtx_xml:
|
||||
print("[!] Provide --evtx-xml path or use --show-splunk for detection queries")
|
||||
return
|
||||
|
||||
events = parse_security_events(args.evtx_xml)
|
||||
print(f"[+] Parsed {len(events)} Kerberos events (4768/4769)")
|
||||
|
||||
rc4_alerts = detect_rc4_in_aes_environment(events)
|
||||
orphan_alerts = detect_orphaned_tgs(events)
|
||||
lifetime_alerts = detect_abnormal_ticket_lifetime(events, args.max_ticket_hours)
|
||||
krbtgt_alerts = detect_krbtgt_service_anomaly(events)
|
||||
|
||||
report = {
|
||||
"analysis_time": datetime.utcnow().isoformat() + "Z",
|
||||
"total_events": len(events),
|
||||
"detections": {
|
||||
"rc4_encryption_downgrade": rc4_alerts,
|
||||
"orphaned_tgs_requests": orphan_alerts,
|
||||
"abnormal_ticket_lifetime": lifetime_alerts,
|
||||
"krbtgt_service_anomaly": krbtgt_alerts,
|
||||
},
|
||||
"total_alerts": len(rc4_alerts) + len(orphan_alerts) + len(lifetime_alerts) + len(krbtgt_alerts),
|
||||
"mitre_techniques": ["T1558.001"],
|
||||
"splunk_queries": generate_splunk_queries(),
|
||||
}
|
||||
|
||||
with open(args.output, "w") as f:
|
||||
json.dump(report, f, indent=2)
|
||||
print(f"[+] RC4 downgrades: {len(rc4_alerts)}")
|
||||
print(f"[+] Orphaned TGS: {len(orphan_alerts)}")
|
||||
print(f"[+] Lifetime anomalies: {len(lifetime_alerts)}")
|
||||
print(f"[+] krbtgt anomalies: {len(krbtgt_alerts)}")
|
||||
print(f"[+] Report saved to {args.output}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -0,0 +1,44 @@
|
||||
---
|
||||
name: hunting-for-domain-fronting-c2-traffic
|
||||
description: Detect domain fronting C2 traffic by analyzing SNI vs HTTP Host header mismatches in proxy logs and TLS certificate discrepancies using pyOpenSSL for certificate inspection
|
||||
domain: cybersecurity
|
||||
subdomain: threat-hunting
|
||||
tags:
|
||||
- domain-fronting
|
||||
- c2-detection
|
||||
- tls-inspection
|
||||
- proxy-logs
|
||||
- pyopenssl
|
||||
- threat-hunting
|
||||
- network-security
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
# Hunting for Domain Fronting C2 Traffic
|
||||
|
||||
## Overview
|
||||
|
||||
Domain fronting (MITRE ATT&CK T1090.004) is a technique where attackers use different domain names in the TLS SNI field and the HTTP Host header to disguise C2 traffic behind legitimate CDN-hosted domains. This skill detects domain fronting by parsing proxy/web gateway logs for SNI-Host header mismatches, analyzing TLS certificates for CDN provider identification, flagging connections where the SNI points to a high-reputation domain but the Host header targets an attacker-controlled domain, and correlating with known CDN provider IP ranges.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Web proxy or secure web gateway logs with SNI and Host header fields
|
||||
- Python 3.8+ with pyOpenSSL and cryptography libraries
|
||||
- TLS inspection enabled on proxy for Host header visibility
|
||||
- CDN provider IP range lists (CloudFront, Azure CDN, Cloudflare)
|
||||
|
||||
## Steps
|
||||
|
||||
1. Parse proxy logs for connections with both SNI and Host header fields
|
||||
2. Compare SNI domain against HTTP Host header for mismatches
|
||||
3. Extract TLS certificate Subject and SAN fields using pyOpenSSL
|
||||
4. Identify CDN-hosted connections via certificate issuer and IP ranges
|
||||
5. Flag high-confidence domain fronting where SNI and Host differ on CDN IPs
|
||||
6. Score alerts based on domain reputation differential
|
||||
7. Generate detection report with network flow context
|
||||
|
||||
## Expected Output
|
||||
|
||||
JSON report containing detected domain fronting indicators with SNI-Host pairs, certificate details, CDN provider identification, confidence scores, and MITRE ATT&CK technique mapping.
|
||||
@@ -0,0 +1,78 @@
|
||||
# Domain Fronting C2 Traffic Detection API Reference
|
||||
|
||||
## Domain Fronting Mechanism
|
||||
|
||||
```
|
||||
TLS ClientHello: SNI = legitimate-cdn-domain.cloudfront.net
|
||||
HTTP Request: Host: attacker-c2-server.evil.com
|
||||
```
|
||||
|
||||
The CDN accepts the TLS connection based on SNI, then routes the HTTP request
|
||||
to the backend specified in the Host header. Network monitoring sees only the
|
||||
legitimate SNI domain.
|
||||
|
||||
## MITRE ATT&CK
|
||||
|
||||
| Technique | ID | Description |
|
||||
|---|---|---|
|
||||
| Proxy: Domain Fronting | T1090.004 | Route C2 through CDN using SNI/Host mismatch |
|
||||
|
||||
## CDN Provider Identification
|
||||
|
||||
### Certificate Issuers
|
||||
| CDN | Certificate CN Pattern |
|
||||
|---|---|
|
||||
| CloudFront | *.cloudfront.net |
|
||||
| Azure CDN | *.azureedge.net |
|
||||
| Cloudflare | sni.cloudflaressl.com |
|
||||
| Akamai | *.akamaiedge.net |
|
||||
| Fastly | *.fastly.net |
|
||||
|
||||
## Proxy Log Detection
|
||||
|
||||
### Squid Proxy Log Fields
|
||||
```
|
||||
timestamp src_ip CONNECT sni:443 -> status Host: host_header
|
||||
```
|
||||
|
||||
### Palo Alto Threat ID
|
||||
```
|
||||
Threat ID 86467: Domain fronting detected (SNI/Host mismatch)
|
||||
```
|
||||
|
||||
### Splunk Detection Query
|
||||
```spl
|
||||
index=proxy sourcetype=squid OR sourcetype=bluecoat
|
||||
| eval sni_root=mvindex(split(sni, "."), -2) + "." + mvindex(split(sni, "."), -1)
|
||||
| eval host_root=mvindex(split(host_header, "."), -2) + "." + mvindex(split(host_header, "."), -1)
|
||||
| where sni_root != host_root
|
||||
| stats count by sni, host_header, src_ip
|
||||
| sort -count
|
||||
```
|
||||
|
||||
## pyOpenSSL Certificate Inspection
|
||||
|
||||
```python
|
||||
from OpenSSL import crypto
|
||||
import ssl, socket
|
||||
|
||||
ctx = ssl.create_default_context()
|
||||
with ctx.wrap_socket(socket.socket(), server_hostname=hostname) as s:
|
||||
s.connect((hostname, 443))
|
||||
der_cert = s.getpeercert(True)
|
||||
|
||||
x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, der_cert)
|
||||
subject_cn = x509.get_subject().CN
|
||||
issuer_cn = x509.get_issuer().CN
|
||||
|
||||
for i in range(x509.get_extension_count()):
|
||||
ext = x509.get_extension(i)
|
||||
if ext.get_short_name() == b"subjectAltName":
|
||||
print(str(ext)) # DNS:*.cloudfront.net, DNS:cloudfront.net
|
||||
```
|
||||
|
||||
## CLI Usage
|
||||
```bash
|
||||
python agent.py --proxy-log squid_access.csv --output fronting_report.json
|
||||
python agent.py --proxy-log logs.csv --check-certs
|
||||
```
|
||||
@@ -0,0 +1,173 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Detect domain fronting C2 traffic via SNI/Host header mismatch and TLS certificate analysis."""
|
||||
|
||||
import json
|
||||
import csv
|
||||
import ssl
|
||||
import socket
|
||||
import argparse
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
|
||||
try:
|
||||
from OpenSSL import crypto
|
||||
HAS_PYOPENSSL = True
|
||||
except ImportError:
|
||||
HAS_PYOPENSSL = False
|
||||
|
||||
|
||||
CDN_PROVIDERS = {
|
||||
"cloudfront.net": "Amazon CloudFront",
|
||||
"azureedge.net": "Azure CDN",
|
||||
"cloudflare.com": "Cloudflare",
|
||||
"akamaiedge.net": "Akamai",
|
||||
"fastly.net": "Fastly",
|
||||
"googleapis.com": "Google Cloud CDN",
|
||||
"azurefd.net": "Azure Front Door",
|
||||
}
|
||||
|
||||
|
||||
def load_proxy_logs(filepath):
|
||||
"""Load proxy logs CSV with columns: timestamp, src_ip, sni, host_header, dst_ip, dst_port, method, url, status."""
|
||||
records = []
|
||||
with open(filepath, newline="", encoding="utf-8") as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
records.append({
|
||||
"timestamp": row.get("timestamp", ""),
|
||||
"src_ip": row.get("src_ip", row.get("c-ip", "")),
|
||||
"sni": row.get("sni", row.get("cs-ssl-sni", "")).lower().strip(),
|
||||
"host_header": row.get("host_header", row.get("cs-host", "")).lower().strip(),
|
||||
"dst_ip": row.get("dst_ip", row.get("s-ip", "")),
|
||||
"dst_port": int(row.get("dst_port", row.get("s-port", "443"))),
|
||||
"method": row.get("method", row.get("cs-method", "")),
|
||||
"url": row.get("url", row.get("cs-uri-stem", "")),
|
||||
"status": row.get("status", row.get("sc-status", "")),
|
||||
"bytes": int(row.get("bytes", row.get("sc-bytes", "0"))),
|
||||
})
|
||||
return records
|
||||
|
||||
|
||||
def extract_domain_root(domain):
|
||||
"""Extract root domain from FQDN (e.g., sub.example.com -> example.com)."""
|
||||
parts = domain.rstrip(".").split(".")
|
||||
return ".".join(parts[-2:]) if len(parts) >= 2 else domain
|
||||
|
||||
|
||||
def identify_cdn_provider(domain):
|
||||
"""Check if a domain belongs to a known CDN provider."""
|
||||
for cdn_suffix, provider in CDN_PROVIDERS.items():
|
||||
if domain.endswith(cdn_suffix):
|
||||
return provider
|
||||
return None
|
||||
|
||||
|
||||
def detect_sni_host_mismatch(records):
|
||||
"""Detect connections where SNI and Host header point to different domains."""
|
||||
alerts = []
|
||||
for rec in records:
|
||||
sni = rec["sni"]
|
||||
host = rec["host_header"]
|
||||
if not sni or not host:
|
||||
continue
|
||||
sni_root = extract_domain_root(sni)
|
||||
host_root = extract_domain_root(host)
|
||||
if sni_root != host_root:
|
||||
cdn = identify_cdn_provider(sni) or identify_cdn_provider(host)
|
||||
confidence = "high" if cdn else "medium"
|
||||
alerts.append({
|
||||
"detection": "SNI/Host Header Mismatch",
|
||||
"mitre_technique": "T1090.004",
|
||||
"timestamp": rec["timestamp"],
|
||||
"src_ip": rec["src_ip"],
|
||||
"sni": sni,
|
||||
"host_header": host,
|
||||
"sni_root": sni_root,
|
||||
"host_root": host_root,
|
||||
"cdn_provider": cdn,
|
||||
"dst_ip": rec["dst_ip"],
|
||||
"confidence": confidence,
|
||||
"severity": "critical" if cdn else "high",
|
||||
"description": f"Domain fronting: SNI={sni} but Host={host}",
|
||||
})
|
||||
return alerts
|
||||
|
||||
|
||||
def get_tls_certificate_info(hostname, port=443, timeout=5):
|
||||
"""Retrieve TLS certificate details for a given hostname using pyOpenSSL."""
|
||||
if not HAS_PYOPENSSL:
|
||||
return {"error": "pyOpenSSL not installed"}
|
||||
try:
|
||||
ctx = ssl.create_default_context()
|
||||
with ctx.wrap_socket(socket.socket(), server_hostname=hostname) as s:
|
||||
s.settimeout(timeout)
|
||||
s.connect((hostname, port))
|
||||
der_cert = s.getpeercert(True)
|
||||
x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, der_cert)
|
||||
subject = dict(x509.get_subject().get_components())
|
||||
issuer = dict(x509.get_issuer().get_components())
|
||||
san_list = []
|
||||
for i in range(x509.get_extension_count()):
|
||||
ext = x509.get_extension(i)
|
||||
if ext.get_short_name() == b"subjectAltName":
|
||||
san_list = [s.strip().replace("DNS:", "") for s in str(ext).split(",")]
|
||||
return {
|
||||
"subject_cn": subject.get(b"CN", b"").decode(),
|
||||
"issuer_cn": issuer.get(b"CN", b"").decode(),
|
||||
"issuer_o": issuer.get(b"O", b"").decode(),
|
||||
"san": san_list[:20],
|
||||
"not_before": str(x509.get_notBefore()),
|
||||
"not_after": str(x509.get_notAfter()),
|
||||
"serial": str(x509.get_serial_number()),
|
||||
}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
|
||||
def analyze_fronting_pairs(alerts):
|
||||
"""Aggregate and rank domain fronting pairs by frequency."""
|
||||
pair_counts = defaultdict(int)
|
||||
for a in alerts:
|
||||
pair_counts[(a["sni"], a["host_header"])] += 1
|
||||
ranked = sorted(pair_counts.items(), key=lambda x: -x[1])
|
||||
return [{"sni": p[0], "host": p[1], "count": c} for (p, c) in ranked[:20]]
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Domain Fronting C2 Traffic Hunter")
|
||||
parser.add_argument("--proxy-log", required=True, help="CSV proxy log with SNI and Host header fields")
|
||||
parser.add_argument("--check-certs", action="store_true", help="Fetch TLS certs for top fronting domains")
|
||||
parser.add_argument("--output", default="domain_fronting_report.json", help="Output report path")
|
||||
args = parser.parse_args()
|
||||
|
||||
records = load_proxy_logs(args.proxy_log)
|
||||
print(f"[+] Loaded {len(records)} proxy log entries")
|
||||
|
||||
alerts = detect_sni_host_mismatch(records)
|
||||
print(f"[+] Detected {len(alerts)} SNI/Host mismatches")
|
||||
|
||||
fronting_pairs = analyze_fronting_pairs(alerts)
|
||||
cert_info = {}
|
||||
if args.check_certs and fronting_pairs:
|
||||
for pair in fronting_pairs[:5]:
|
||||
cert_info[pair["sni"]] = get_tls_certificate_info(pair["sni"])
|
||||
|
||||
report = {
|
||||
"analysis_time": datetime.utcnow().isoformat() + "Z",
|
||||
"total_proxy_entries": len(records),
|
||||
"detections": alerts[:50],
|
||||
"total_mismatches": len(alerts),
|
||||
"fronting_pairs_ranked": fronting_pairs,
|
||||
"certificate_analysis": cert_info,
|
||||
"mitre_technique": "T1090.004",
|
||||
"cdn_involved": list({a["cdn_provider"] for a in alerts if a.get("cdn_provider")}),
|
||||
}
|
||||
|
||||
with open(args.output, "w") as f:
|
||||
json.dump(report, f, indent=2)
|
||||
print(f"[+] Top fronting pairs: {len(fronting_pairs)}")
|
||||
print(f"[+] Report saved to {args.output}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -0,0 +1,43 @@
|
||||
---
|
||||
name: implementing-network-traffic-baselining
|
||||
description: Build network traffic baselines from NetFlow/IPFIX data using Python pandas for statistical analysis, z-score anomaly detection, and hourly/daily traffic pattern profiling
|
||||
domain: cybersecurity
|
||||
subdomain: network-security
|
||||
tags:
|
||||
- netflow
|
||||
- ipfix
|
||||
- traffic-analysis
|
||||
- baselining
|
||||
- anomaly-detection
|
||||
- pandas
|
||||
- network-monitoring
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
# Implementing Network Traffic Baselining
|
||||
|
||||
## Overview
|
||||
|
||||
Network traffic baselining establishes normal communication patterns by analyzing historical NetFlow/IPFIX data to create statistical profiles of expected behavior. This skill uses Python pandas to compute hourly and daily traffic distributions, per-host byte/packet counts, protocol ratios, and top-N talker profiles. Anomalies are detected using z-score thresholds and IQR (interquartile range) outlier methods, enabling SOC analysts to identify deviations such as data exfiltration spikes, beaconing patterns, and unusual port usage.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- NetFlow v5/v9 or IPFIX flow data exported as CSV or JSON
|
||||
- Python 3.8+ with pandas and numpy libraries
|
||||
- Historical flow data (minimum 7 days recommended for baseline)
|
||||
|
||||
## Steps
|
||||
|
||||
1. Ingest NetFlow/IPFIX records from CSV or JSON exports
|
||||
2. Compute hourly and daily traffic volume distributions (bytes, packets, flows)
|
||||
3. Build per-source-IP baseline profiles with mean, median, standard deviation
|
||||
4. Calculate protocol and port distribution baselines
|
||||
5. Apply z-score anomaly detection to identify statistical outliers
|
||||
6. Flag flows exceeding IQR-based thresholds as potential anomalies
|
||||
7. Generate baseline report with anomaly alerts
|
||||
|
||||
## Expected Output
|
||||
|
||||
JSON report containing traffic baselines (hourly/daily profiles), per-host statistics, detected anomalies with z-scores, and top talker rankings with deviation indicators.
|
||||
@@ -0,0 +1,75 @@
|
||||
# Network Traffic Baselining API Reference
|
||||
|
||||
## NetFlow/IPFIX CSV Format
|
||||
|
||||
### Expected Columns
|
||||
```
|
||||
timestamp,src_ip,dst_ip,src_port,dst_port,protocol,bytes,packets
|
||||
2024-01-15T08:30:00Z,10.0.1.5,203.0.113.10,54321,443,6,15234,42
|
||||
```
|
||||
|
||||
### Alternative Column Names (auto-mapped)
|
||||
```
|
||||
ts -> timestamp sa -> src_ip da -> dst_ip
|
||||
sp -> src_port dp -> dst_port pr -> protocol
|
||||
ibyt -> bytes ipkt -> packets
|
||||
```
|
||||
|
||||
### Protocol Numbers
|
||||
| Number | Protocol |
|
||||
|--------|----------|
|
||||
| 1 | ICMP |
|
||||
| 6 | TCP |
|
||||
| 17 | UDP |
|
||||
|
||||
## Pandas Analysis Functions
|
||||
|
||||
### Hourly Aggregation
|
||||
```python
|
||||
df["hour"] = df["timestamp"].dt.hour
|
||||
hourly = df.groupby("hour").agg(
|
||||
total_bytes=("bytes", "sum"),
|
||||
total_packets=("packets", "sum"),
|
||||
flow_count=("bytes", "count"),
|
||||
)
|
||||
```
|
||||
|
||||
### Z-Score Anomaly Detection
|
||||
```python
|
||||
mean = host_stats["total_bytes"].mean()
|
||||
std = host_stats["total_bytes"].std()
|
||||
host_stats["zscore"] = (host_stats["total_bytes"] - mean) / std
|
||||
anomalies = host_stats[host_stats["zscore"].abs() >= 3.0]
|
||||
```
|
||||
|
||||
### IQR Outlier Detection
|
||||
```python
|
||||
q1 = series.quantile(0.25)
|
||||
q3 = series.quantile(0.75)
|
||||
iqr = q3 - q1
|
||||
outliers = series[(series < q1 - 1.5 * iqr) | (series > q3 + 1.5 * iqr)]
|
||||
```
|
||||
|
||||
## NetFlow Export Tools
|
||||
|
||||
### nfdump CSV Export
|
||||
```bash
|
||||
nfdump -r nfcapd.202401 -o csv > flows.csv
|
||||
```
|
||||
|
||||
### SiLK rwcut Export
|
||||
```bash
|
||||
rwcut --fields=sIP,dIP,sPort,dPort,protocol,bytes,packets,sTime flows.rw > flows.csv
|
||||
```
|
||||
|
||||
### Elastic NetFlow to CSV
|
||||
```json
|
||||
GET netflow-*/_search
|
||||
{ "size": 10000, "query": { "range": { "@timestamp": { "gte": "now-7d" } } } }
|
||||
```
|
||||
|
||||
## CLI Usage
|
||||
```bash
|
||||
python agent.py --netflow-csv flows.csv --output baseline.json
|
||||
python agent.py --netflow-csv flows.csv --zscore-threshold 2.5 --scan-threshold 30
|
||||
```
|
||||
@@ -0,0 +1,169 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Network traffic baselining agent using pandas for NetFlow/IPFIX statistical analysis."""
|
||||
|
||||
import json
|
||||
import math
|
||||
import argparse
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
|
||||
def load_netflow_csv(filepath):
|
||||
"""Load NetFlow/IPFIX records from CSV export."""
|
||||
df = pd.read_csv(filepath, parse_dates=["timestamp"])
|
||||
required = {"timestamp", "src_ip", "dst_ip", "src_port", "dst_port", "protocol", "bytes", "packets"}
|
||||
missing = required - set(df.columns)
|
||||
if missing:
|
||||
alt_map = {"ts": "timestamp", "sa": "src_ip", "da": "dst_ip", "sp": "src_port",
|
||||
"dp": "dst_port", "pr": "protocol", "ibyt": "bytes", "ipkt": "packets"}
|
||||
df.rename(columns={k: v for k, v in alt_map.items() if k in df.columns}, inplace=True)
|
||||
print(f"[+] Loaded {len(df)} flow records from {filepath}")
|
||||
return df
|
||||
|
||||
|
||||
def compute_hourly_baseline(df):
|
||||
"""Compute hourly traffic volume baseline."""
|
||||
df["hour"] = df["timestamp"].dt.hour
|
||||
hourly = df.groupby("hour").agg(
|
||||
total_bytes=("bytes", "sum"),
|
||||
total_packets=("packets", "sum"),
|
||||
flow_count=("bytes", "count"),
|
||||
).reset_index()
|
||||
hourly["bytes_mean"] = hourly["total_bytes"] / max(df["timestamp"].dt.date.nunique(), 1)
|
||||
hourly["bytes_std"] = df.groupby("hour")["bytes"].std().values
|
||||
return hourly.to_dict(orient="records")
|
||||
|
||||
|
||||
def compute_host_baselines(df):
|
||||
"""Compute per-source-IP traffic baselines."""
|
||||
host_stats = df.groupby("src_ip").agg(
|
||||
total_bytes=("bytes", "sum"),
|
||||
total_packets=("packets", "sum"),
|
||||
flow_count=("bytes", "count"),
|
||||
unique_dst_ips=("dst_ip", "nunique"),
|
||||
unique_dst_ports=("dst_port", "nunique"),
|
||||
mean_bytes_per_flow=("bytes", "mean"),
|
||||
std_bytes_per_flow=("bytes", "std"),
|
||||
).reset_index()
|
||||
host_stats = host_stats.fillna(0)
|
||||
return host_stats
|
||||
|
||||
|
||||
def compute_protocol_baseline(df):
|
||||
"""Compute protocol distribution baseline."""
|
||||
proto_map = {6: "TCP", 17: "UDP", 1: "ICMP"}
|
||||
df["proto_name"] = df["protocol"].map(lambda x: proto_map.get(x, str(x)))
|
||||
proto_stats = df.groupby("proto_name").agg(
|
||||
flow_count=("bytes", "count"),
|
||||
total_bytes=("bytes", "sum"),
|
||||
).reset_index()
|
||||
total = proto_stats["flow_count"].sum()
|
||||
proto_stats["percentage"] = (proto_stats["flow_count"] / total * 100).round(2)
|
||||
return proto_stats.to_dict(orient="records")
|
||||
|
||||
|
||||
def detect_zscore_anomalies(df, host_baselines, threshold=3.0):
|
||||
"""Detect anomalous hosts using z-score on bytes transferred."""
|
||||
mean_bytes = host_baselines["total_bytes"].mean()
|
||||
std_bytes = host_baselines["total_bytes"].std()
|
||||
if std_bytes == 0:
|
||||
return []
|
||||
host_baselines["zscore"] = ((host_baselines["total_bytes"] - mean_bytes) / std_bytes).round(4)
|
||||
anomalies = host_baselines[host_baselines["zscore"].abs() >= threshold]
|
||||
alerts = []
|
||||
for _, row in anomalies.iterrows():
|
||||
alerts.append({
|
||||
"detection": "Z-Score Traffic Anomaly",
|
||||
"src_ip": row["src_ip"],
|
||||
"total_bytes": int(row["total_bytes"]),
|
||||
"zscore": float(row["zscore"]),
|
||||
"threshold": threshold,
|
||||
"flow_count": int(row["flow_count"]),
|
||||
"unique_destinations": int(row["unique_dst_ips"]),
|
||||
"severity": "critical" if abs(row["zscore"]) >= 5.0 else "high",
|
||||
})
|
||||
return alerts
|
||||
|
||||
|
||||
def detect_iqr_anomalies(df, host_baselines):
|
||||
"""Detect outlier hosts using IQR method on bytes per flow."""
|
||||
q1 = host_baselines["mean_bytes_per_flow"].quantile(0.25)
|
||||
q3 = host_baselines["mean_bytes_per_flow"].quantile(0.75)
|
||||
iqr = q3 - q1
|
||||
lower = q1 - 1.5 * iqr
|
||||
upper = q3 + 1.5 * iqr
|
||||
outliers = host_baselines[
|
||||
(host_baselines["mean_bytes_per_flow"] < lower) | (host_baselines["mean_bytes_per_flow"] > upper)
|
||||
]
|
||||
alerts = []
|
||||
for _, row in outliers.iterrows():
|
||||
alerts.append({
|
||||
"detection": "IQR Bytes-Per-Flow Outlier",
|
||||
"src_ip": row["src_ip"],
|
||||
"mean_bytes_per_flow": round(float(row["mean_bytes_per_flow"]), 2),
|
||||
"iqr_lower": round(float(lower), 2),
|
||||
"iqr_upper": round(float(upper), 2),
|
||||
"severity": "medium",
|
||||
})
|
||||
return alerts
|
||||
|
||||
|
||||
def detect_port_scan_pattern(df, threshold=50):
|
||||
"""Detect hosts connecting to an unusually high number of unique ports."""
|
||||
port_counts = df.groupby("src_ip")["dst_port"].nunique().reset_index()
|
||||
port_counts.columns = ["src_ip", "unique_ports"]
|
||||
scanners = port_counts[port_counts["unique_ports"] >= threshold]
|
||||
return [{"detection": "Port Scan Pattern", "src_ip": row["src_ip"],
|
||||
"unique_ports": int(row["unique_ports"]), "severity": "high"}
|
||||
for _, row in scanners.iterrows()]
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Network Traffic Baselining Agent")
|
||||
parser.add_argument("--netflow-csv", required=True, help="Path to NetFlow/IPFIX CSV export")
|
||||
parser.add_argument("--zscore-threshold", type=float, default=3.0, help="Z-score anomaly threshold")
|
||||
parser.add_argument("--scan-threshold", type=int, default=50, help="Port scan unique ports threshold")
|
||||
parser.add_argument("--output", default="traffic_baseline_report.json", help="Output report path")
|
||||
args = parser.parse_args()
|
||||
|
||||
df = load_netflow_csv(args.netflow_csv)
|
||||
hourly = compute_hourly_baseline(df)
|
||||
host_baselines = compute_host_baselines(df)
|
||||
protocol = compute_protocol_baseline(df)
|
||||
|
||||
zscore_alerts = detect_zscore_anomalies(df, host_baselines, args.zscore_threshold)
|
||||
iqr_alerts = detect_iqr_anomalies(df, host_baselines)
|
||||
scan_alerts = detect_port_scan_pattern(df, args.scan_threshold)
|
||||
|
||||
top_talkers = host_baselines.nlargest(10, "total_bytes")[["src_ip", "total_bytes", "flow_count"]].to_dict(orient="records")
|
||||
|
||||
report = {
|
||||
"analysis_time": datetime.utcnow().isoformat() + "Z",
|
||||
"total_flows": len(df),
|
||||
"date_range": {"start": str(df["timestamp"].min()), "end": str(df["timestamp"].max())},
|
||||
"baselines": {
|
||||
"hourly_profile": hourly,
|
||||
"protocol_distribution": protocol,
|
||||
"top_talkers": top_talkers,
|
||||
},
|
||||
"anomalies": {
|
||||
"zscore_anomalies": zscore_alerts,
|
||||
"iqr_outliers": iqr_alerts,
|
||||
"port_scan_patterns": scan_alerts,
|
||||
},
|
||||
"total_anomalies": len(zscore_alerts) + len(iqr_alerts) + len(scan_alerts),
|
||||
}
|
||||
|
||||
with open(args.output, "w") as f:
|
||||
json.dump(report, f, indent=2, default=str)
|
||||
print(f"[+] Z-score anomalies: {len(zscore_alerts)}")
|
||||
print(f"[+] IQR outliers: {len(iqr_alerts)}")
|
||||
print(f"[+] Port scan patterns: {len(scan_alerts)}")
|
||||
print(f"[+] Report saved to {args.output}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -0,0 +1,43 @@
|
||||
---
|
||||
name: performing-osint-with-spiderfoot
|
||||
description: Automate OSINT collection using SpiderFoot REST API and CLI for target profiling, module-based reconnaissance, and structured result analysis across 200+ data sources
|
||||
domain: cybersecurity
|
||||
subdomain: threat-intelligence
|
||||
tags:
|
||||
- osint
|
||||
- spiderfoot
|
||||
- reconnaissance
|
||||
- threat-intelligence
|
||||
- attack-surface
|
||||
- target-profiling
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
# Performing OSINT with SpiderFoot
|
||||
|
||||
## Overview
|
||||
|
||||
SpiderFoot is an open-source OSINT automation tool with 200+ modules that integrates with data sources for threat intelligence and attack surface mapping. This skill uses the SpiderFoot REST API and CLI (sf.py/spiderfoot-cli) to create and manage scans, select modules by use case (footprint, investigate, passive), parse structured results for domains, IPs, email addresses, leaked credentials, and DNS records, and generate target intelligence profiles.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- SpiderFoot 4.0+ installed or SpiderFoot HX cloud account
|
||||
- Python 3.8+ with requests library
|
||||
- SpiderFoot server running on default port 5001
|
||||
- Optional: API keys for VirusTotal, Shodan, HaveIBeenPwned modules
|
||||
|
||||
## Steps
|
||||
|
||||
1. Connect to SpiderFoot REST API or use CLI interface
|
||||
2. Create a new scan with target specification (domain, IP, email, name)
|
||||
3. Select scan modules by use case (all, footprint, investigate, passive)
|
||||
4. Monitor scan progress via API polling
|
||||
5. Retrieve and parse scan results by data element type
|
||||
6. Extract key findings: subdomains, IPs, emails, leaked credentials
|
||||
7. Generate structured OSINT intelligence report
|
||||
|
||||
## Expected Output
|
||||
|
||||
JSON report containing OSINT findings organized by data type (domains, IPs, emails, credentials, DNS records), module source attribution, and target profile summary with risk indicators.
|
||||
@@ -0,0 +1,79 @@
|
||||
# SpiderFoot OSINT API Reference
|
||||
|
||||
## REST API Endpoints
|
||||
|
||||
### List Modules
|
||||
```
|
||||
GET /api/modules
|
||||
Response: [{"name": "sfp_dnsresolve", "descr": "...", "group": "Footprint", "provides": [...]}]
|
||||
```
|
||||
|
||||
### Start Scan
|
||||
```
|
||||
POST /api/startscan
|
||||
Content-Type: application/x-www-form-urlencoded
|
||||
|
||||
scanname=my-scan&scantarget=example.com&usecase=footprint
|
||||
Response: {"scanid": "abc123"}
|
||||
```
|
||||
|
||||
### Check Scan Status
|
||||
```
|
||||
GET /api/scanstatus/{scan_id}
|
||||
Response: {"status": "RUNNING"} # RUNNING, FINISHED, ABORTED, ERROR-FAILED
|
||||
```
|
||||
|
||||
### Get Scan Results
|
||||
```
|
||||
GET /api/scanresults/{scan_id}
|
||||
Response: [{"type": "INTERNET_NAME", "data": "sub.example.com", "module": "sfp_dnsresolve", "source": "example.com"}]
|
||||
```
|
||||
|
||||
### Delete Scan
|
||||
```
|
||||
GET /api/scandelete/{scan_id}
|
||||
```
|
||||
|
||||
### List Scans
|
||||
```
|
||||
GET /api/scanlist
|
||||
```
|
||||
|
||||
## Scan Use Cases
|
||||
| Use Case | Description |
|
||||
|---|---|
|
||||
| all | All modules (slowest, most comprehensive) |
|
||||
| footprint | Attack surface mapping: subdomains, IPs, ports |
|
||||
| investigate | Deep analysis: WHOIS, DNS, reputation checks |
|
||||
| passive | Passive only: no active probing of target |
|
||||
|
||||
## Data Element Types
|
||||
| Type | Description |
|
||||
|---|---|
|
||||
| INTERNET_NAME | Discovered domain/subdomain |
|
||||
| IP_ADDRESS | IP addresses |
|
||||
| EMAILADDR | Email addresses |
|
||||
| LEAKSITE_CONTENT | Leaked credentials/data |
|
||||
| DNS_TEXT | DNS TXT/MX/NS records |
|
||||
| LINKED_URL_INTERNAL | URLs on target domain |
|
||||
| CO_HOSTED_SITE | Sites sharing same IP |
|
||||
| AFFILIATE_INTERNET_NAME | Related domains |
|
||||
|
||||
## CLI Usage (sf.py)
|
||||
```bash
|
||||
# Start scan via CLI
|
||||
python sf.py -s example.com -t INTERNET_NAME,IP_ADDRESS -m sfp_dnsresolve,sfp_portscan_tcp
|
||||
|
||||
# Passive footprint
|
||||
python sf.py -s example.com -u passive
|
||||
|
||||
# List modules
|
||||
python sf.py -M
|
||||
```
|
||||
|
||||
## Agent CLI Usage
|
||||
```bash
|
||||
python agent.py --target example.com --use-case footprint --output report.json
|
||||
python agent.py --target 203.0.113.5 --use-case investigate --timeout 1200
|
||||
python agent.py --list-modules --server http://spiderfoot:5001
|
||||
```
|
||||
@@ -0,0 +1,176 @@
|
||||
#!/usr/bin/env python3
|
||||
"""OSINT automation agent using SpiderFoot REST API for target profiling and reconnaissance."""
|
||||
|
||||
import os
|
||||
import json
|
||||
import time
|
||||
import argparse
|
||||
from datetime import datetime
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
def get_sf_session(base_url):
|
||||
"""Create a requests session for SpiderFoot API."""
|
||||
session = requests.Session()
|
||||
session.headers.update({"Accept": "application/json"})
|
||||
session.base_url = base_url.rstrip("/")
|
||||
return session
|
||||
|
||||
|
||||
def list_modules(session):
|
||||
"""List available SpiderFoot modules."""
|
||||
resp = session.get(f"{session.base_url}/api/modules")
|
||||
resp.raise_for_status()
|
||||
modules = resp.json()
|
||||
return [{"name": m.get("name", ""), "descr": m.get("descr", ""),
|
||||
"group": m.get("group", ""), "provides": m.get("provides", [])}
|
||||
for m in modules]
|
||||
|
||||
|
||||
def list_scan_types(session):
|
||||
"""List available scan types (use cases)."""
|
||||
resp = session.get(f"{session.base_url}/api/scantypes")
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
|
||||
def start_scan(session, target, scan_name, use_case="all"):
|
||||
"""Start a new SpiderFoot scan via REST API."""
|
||||
data = {
|
||||
"scanname": scan_name,
|
||||
"scantarget": target,
|
||||
"usecase": use_case,
|
||||
}
|
||||
resp = session.post(f"{session.base_url}/api/startscan", data=data)
|
||||
resp.raise_for_status()
|
||||
result = resp.json()
|
||||
scan_id = result.get("scanid", result.get("id", ""))
|
||||
print(f"[+] Scan started: {scan_id} (target: {target}, use_case: {use_case})")
|
||||
return scan_id
|
||||
|
||||
|
||||
def get_scan_status(session, scan_id):
|
||||
"""Check scan status."""
|
||||
resp = session.get(f"{session.base_url}/api/scanstatus/{scan_id}")
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
|
||||
def wait_for_scan(session, scan_id, poll_interval=10, timeout=600):
|
||||
"""Poll scan status until completion or timeout."""
|
||||
elapsed = 0
|
||||
while elapsed < timeout:
|
||||
status = get_scan_status(session, scan_id)
|
||||
state = status.get("status", "")
|
||||
if state in ("FINISHED", "ABORTED", "ERROR-FAILED"):
|
||||
print(f"[+] Scan {scan_id} completed with status: {state}")
|
||||
return state
|
||||
print(f"[*] Scan status: {state} (elapsed: {elapsed}s)")
|
||||
time.sleep(poll_interval)
|
||||
elapsed += poll_interval
|
||||
print(f"[!] Scan timed out after {timeout}s")
|
||||
return "TIMEOUT"
|
||||
|
||||
|
||||
def get_scan_results(session, scan_id):
|
||||
"""Retrieve all results from a completed scan."""
|
||||
resp = session.get(f"{session.base_url}/api/scanresults/{scan_id}")
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
|
||||
def categorize_results(results):
|
||||
"""Categorize scan results by data element type."""
|
||||
categories = {
|
||||
"domains": [], "ips": [], "emails": [], "credentials": [],
|
||||
"dns_records": [], "urls": [], "hostnames": [], "other": [],
|
||||
}
|
||||
type_map = {
|
||||
"INTERNET_NAME": "domains", "IP_ADDRESS": "ips", "EMAILADDR": "emails",
|
||||
"LEAKSITE_CONTENT": "credentials", "DNS_TEXT": "dns_records",
|
||||
"LINKED_URL_INTERNAL": "urls", "LINKED_URL_EXTERNAL": "urls",
|
||||
"AFFILIATE_INTERNET_NAME": "hostnames", "CO_HOSTED_SITE": "hostnames",
|
||||
}
|
||||
for result in results:
|
||||
data_type = result.get("type", "")
|
||||
category = type_map.get(data_type, "other")
|
||||
entry = {
|
||||
"data": result.get("data", ""),
|
||||
"type": data_type,
|
||||
"module": result.get("module", ""),
|
||||
"source": result.get("source", ""),
|
||||
}
|
||||
categories[category].append(entry)
|
||||
return categories
|
||||
|
||||
|
||||
def generate_target_profile(target, categories):
|
||||
"""Generate structured OSINT profile from categorized results."""
|
||||
return {
|
||||
"target": target,
|
||||
"profile_time": datetime.utcnow().isoformat() + "Z",
|
||||
"summary": {
|
||||
"domains_found": len(categories["domains"]),
|
||||
"ips_found": len(categories["ips"]),
|
||||
"emails_found": len(categories["emails"]),
|
||||
"credentials_found": len(categories["credentials"]),
|
||||
"dns_records": len(categories["dns_records"]),
|
||||
"urls_found": len(categories["urls"]),
|
||||
"hostnames_found": len(categories["hostnames"]),
|
||||
"other_findings": len(categories["other"]),
|
||||
},
|
||||
"domains": [d["data"] for d in categories["domains"][:50]],
|
||||
"ips": [d["data"] for d in categories["ips"][:50]],
|
||||
"emails": [d["data"] for d in categories["emails"][:50]],
|
||||
"leaked_credentials": len(categories["credentials"]),
|
||||
"dns_records": [d["data"] for d in categories["dns_records"][:20]],
|
||||
"data_sources": list({r["module"] for cat in categories.values() for r in cat}),
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="SpiderFoot OSINT Agent")
|
||||
parser.add_argument("--target", required=True, help="Scan target (domain, IP, email, name)")
|
||||
parser.add_argument("--server", default=os.environ.get("SPIDERFOOT_URL", "http://127.0.0.1:5001"),
|
||||
help="SpiderFoot server URL")
|
||||
parser.add_argument("--use-case", choices=["all", "footprint", "investigate", "passive"],
|
||||
default="footprint", help="Scan use case")
|
||||
parser.add_argument("--scan-name", default="", help="Scan name (default: auto-generated)")
|
||||
parser.add_argument("--timeout", type=int, default=600, help="Scan timeout in seconds")
|
||||
parser.add_argument("--poll-interval", type=int, default=10, help="Status poll interval in seconds")
|
||||
parser.add_argument("--output", default="osint_report.json", help="Output report path")
|
||||
parser.add_argument("--list-modules", action="store_true", help="List available modules and exit")
|
||||
args = parser.parse_args()
|
||||
|
||||
session = get_sf_session(args.server)
|
||||
|
||||
if args.list_modules:
|
||||
modules = list_modules(session)
|
||||
for m in modules:
|
||||
print(f" {m['name']}: {m['descr']}")
|
||||
print(f"\n[+] Total modules: {len(modules)}")
|
||||
return
|
||||
|
||||
scan_name = args.scan_name or f"osint-{args.target}-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}"
|
||||
scan_id = start_scan(session, args.target, scan_name, args.use_case)
|
||||
final_status = wait_for_scan(session, scan_id, args.poll_interval, args.timeout)
|
||||
|
||||
results = get_scan_results(session, scan_id)
|
||||
print(f"[+] Retrieved {len(results)} results from scan")
|
||||
|
||||
categories = categorize_results(results)
|
||||
profile = generate_target_profile(args.target, categories)
|
||||
profile["scan_id"] = scan_id
|
||||
profile["scan_status"] = final_status
|
||||
profile["use_case"] = args.use_case
|
||||
|
||||
with open(args.output, "w") as f:
|
||||
json.dump(profile, f, indent=2)
|
||||
print(f"[+] Domains: {profile['summary']['domains_found']}, IPs: {profile['summary']['ips_found']}, "
|
||||
f"Emails: {profile['summary']['emails_found']}")
|
||||
print(f"[+] Report saved to {args.output}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user