feat: add 5 new cybersecurity skills - RDP brute force, Covenant C2, Calico network policies, heap spray analysis, T1098 hunting

This commit is contained in:
mukul975
2026-03-11 00:37:15 +01:00
parent 46d4f941ab
commit 74f8c11642
20 changed files with 1643 additions and 0 deletions
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,40 @@
---
name: analyzing-heap-spray-exploitation
description: Detect and analyze heap spray attacks in memory dumps using Volatility3 plugins to identify NOP sled patterns, shellcode landing zones, and suspicious large allocations in process virtual address space.
domain: cybersecurity
subdomain: malware-analysis
tags: [malware-analysis, memory-forensics, heap-spray, volatility3, exploit-analysis]
version: "1.0"
author: mahipal
license: MIT
---
# Analyzing Heap Spray Exploitation
## Overview
Heap spraying is an exploitation technique that fills large regions of a process's heap with attacker-controlled data (typically NOP sleds followed by shellcode) to increase the reliability of code execution exploits. This skill covers detecting heap spray artifacts in memory dumps using Volatility3's malfind, vadinfo, and memmap plugins, identifying suspicious contiguous memory allocations, scanning for NOP sled patterns (0x90, 0x0c0c0c0c), and extracting embedded shellcode for analysis.
## Prerequisites
- Python 3.9+ with `volatility3` framework installed
- Memory dump file (.raw, .vmem, .dmp format)
- Understanding of virtual memory layout and VAD (Virtual Address Descriptor) trees
- Familiarity with common shellcode patterns and NOP sled encodings
## Steps
### Step 1: Identify Suspicious Processes
Use Volatility3 windows.malfind to scan for processes with executable injected memory regions.
### Step 2: Analyze VAD Entries
Examine VAD tree entries using windows.vadinfo for large contiguous allocations with RWX permissions.
### Step 3: Scan for NOP Sled Patterns
Search suspicious memory regions for NOP sled signatures (0x90 sequences, 0x0c0c0c0c patterns).
### Step 4: Extract and Analyze Shellcode
Dump suspicious memory regions and identify shellcode using byte pattern analysis.
## Expected Output
JSON report with suspicious processes, heap spray indicators, NOP sled locations, memory region sizes, and extracted shellcode hashes.
@@ -0,0 +1,55 @@
# API Reference: Analyzing Heap Spray Exploitation
## Volatility3 Plugins for Heap Spray Analysis
| Plugin | Command | Purpose |
|--------|---------|---------|
| malfind | `vol -f dump.raw windows.malfind` | Find injected executable memory regions |
| vadinfo | `vol -f dump.raw windows.vadinfo` | Virtual Address Descriptor details |
| memmap | `vol -f dump.raw windows.memmap --pid PID --dump` | Dump process memory to files |
| pslist | `vol -f dump.raw windows.pslist` | List running processes |
| handles | `vol -f dump.raw windows.handles --pid PID` | List process handles |
## Common Heap Spray NOP Sled Patterns
| Pattern | Hex | Description |
|---------|-----|-------------|
| x86 NOP | 0x90909090 | Classic NOP instruction |
| 0x0C landing | 0x0C0C0C0C | Common heap spray address target |
| 0x0D landing | 0x0D0D0D0D | Alternative spray address |
| 0x0A landing | 0x0A0A0A0A | Alternative spray address |
| 0x41 fill | 0x41414141 | "AAAA" padding fill |
## Shellcode Signatures
| Bytes | Mnemonic | Context |
|-------|----------|---------|
| FC E8 | CLD; CALL | Common shellcode prologue |
| 60 E8 | PUSHAD; CALL | Register-saving shellcode start |
| 31 C0 50 68 | XOR EAX; PUSH; PUSH | Stack setup for API call |
| E8 FF FF FF FF | CALL $+5 | Self-locating shellcode (GetPC) |
## Detection Thresholds
| Indicator | Threshold | Meaning |
|-----------|-----------|---------|
| Large allocation | >= 1 MB per region | Suspicious heap allocation |
| Total spray size | >= 50 MB per process | Strong heap spray indicator |
| NOP sled count | >= 20 repeated bytes | NOP sled detected |
| RWX permissions | PAGE_EXECUTE_READWRITE | Injected executable code |
## Install Volatility3
```bash
pip install volatility3
# Or from source:
git clone https://github.com/volatilityfoundation/volatility3.git
cd volatility3 && pip install -e .
```
## References
- Volatility3 GitHub: https://github.com/volatilityfoundation/volatility3
- Volatility3 malfind: https://volatility3.readthedocs.io/en/latest/
- Heap Spray Techniques: https://www.corelan.be/index.php/2011/12/31/exploit-writing-tutorial-part-11-heap-spraying-demystified/
- DFRWS 2025 Workshop: https://webdiis.unizar.es/~ricardo/dfrws-eu-25-workshop/
@@ -0,0 +1,219 @@
#!/usr/bin/env python3
"""Agent for analyzing heap spray exploitation in memory dumps.
Detects heap spray artifacts using Volatility3 by scanning for
NOP sled patterns, large contiguous allocations, and injected
executable regions in process virtual address space.
"""
# For authorized forensic analysis only
import argparse
import hashlib
import json
import os
import re
import subprocess
from collections import defaultdict
from datetime import datetime
from pathlib import Path
NOP_PATTERNS = {
"x86_nop": b"\x90" * 16,
"heap_spray_0c": b"\x0c" * 16,
"heap_spray_0d": b"\x0d" * 16,
"heap_spray_0a": b"\x0a" * 16,
"heap_spray_04": b"\x04" * 16,
"heap_spray_41": b"\x41" * 16,
}
SHELLCODE_MARKERS = [
b"\xfc\xe8", # CLD; CALL
b"\x60\xe8", # PUSHAD; CALL
b"\xeb\x10\x5a", # JMP SHORT; POP EDX
b"\x31\xc0\x50\x68", # XOR EAX; PUSH; PUSH
b"\xe8\xff\xff\xff\xff", # CALL $+5 (self-locating)
]
SUSPICIOUS_ALLOC_THRESHOLD = 0x100000 # 1 MB
class HeapSprayAnalyzer:
"""Detects heap spray exploitation artifacts in memory dumps."""
def __init__(self, memory_dump, output_dir="./heap_spray_analysis"):
self.memory_dump = memory_dump
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.findings = []
def _run_vol3(self, plugin, extra_args=None):
"""Run a Volatility3 plugin and return stdout."""
cmd = ["vol", "-f", self.memory_dump, plugin]
if extra_args:
cmd.extend(extra_args)
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
return result.stdout
except (FileNotFoundError, subprocess.TimeoutExpired):
return ""
def run_malfind(self):
"""Run windows.malfind to detect injected executable memory."""
output = self._run_vol3("windows.malfind")
entries = []
current = {}
for line in output.splitlines():
parts = line.split()
if len(parts) >= 6 and parts[0].isdigit():
if current:
entries.append(current)
current = {
"pid": int(parts[0]),
"process": parts[1],
"start_addr": parts[2],
"end_addr": parts[3],
"protection": parts[5] if len(parts) > 5 else "",
}
elif current and line.strip().startswith("0x"):
hex_match = re.findall(r"[0-9a-fA-F]{2}", line.split(" ")[0] if " " in line else line)
if "hex_bytes" not in current:
current["hex_bytes"] = ""
current["hex_bytes"] += "".join(hex_match)
if current:
entries.append(current)
return entries
def run_vadinfo(self):
"""Run windows.vadinfo to find large suspicious allocations."""
output = self._run_vol3("windows.vadinfo")
large_allocs = []
for line in output.splitlines():
parts = line.split()
if len(parts) >= 5 and parts[0].isdigit():
try:
pid = int(parts[0])
start = int(parts[2], 16) if parts[2].startswith("0x") else 0
end = int(parts[3], 16) if parts[3].startswith("0x") else 0
size = end - start
if size >= SUSPICIOUS_ALLOC_THRESHOLD:
large_allocs.append({
"pid": pid, "process": parts[1],
"start": hex(start), "end": hex(end),
"size_bytes": size, "size_mb": round(size / (1024 * 1024), 2),
})
except (ValueError, IndexError):
continue
return large_allocs
def scan_dump_for_patterns(self, dump_path):
"""Scan a memory dump file for NOP sled and shellcode patterns."""
matches = {"nop_sleds": [], "shellcode_markers": []}
try:
with open(dump_path, "rb") as f:
data = f.read()
except (FileNotFoundError, PermissionError):
return matches
for name, pattern in NOP_PATTERNS.items():
offset = 0
count = 0
while True:
idx = data.find(pattern, offset)
if idx == -1:
break
count += 1
offset = idx + len(pattern)
if count > 100:
break
if count > 0:
matches["nop_sleds"].append({"pattern": name, "occurrences": count})
for marker in SHELLCODE_MARKERS:
idx = data.find(marker)
if idx != -1:
context = data[idx:idx + 64]
matches["shellcode_markers"].append({
"offset": hex(idx),
"bytes": context.hex()[:128],
"sha256": hashlib.sha256(context).hexdigest(),
})
return matches
def dump_process_memory(self, pid):
"""Dump a process's memory using Volatility3 memmap."""
dump_dir = self.output_dir / f"pid_{pid}"
dump_dir.mkdir(exist_ok=True)
self._run_vol3("windows.memmap", ["--pid", str(pid), "--dump",
"--output-dir", str(dump_dir)])
dumps = list(dump_dir.glob("*.dmp"))
return [str(d) for d in dumps]
def analyze(self):
"""Run full heap spray analysis pipeline."""
malfind_results = self.run_malfind()
large_allocs = self.run_vadinfo()
spray_candidates = defaultdict(list)
for alloc in large_allocs:
spray_candidates[alloc["pid"]].append(alloc)
for pid, allocs in spray_candidates.items():
total_mb = sum(a["size_mb"] for a in allocs)
if total_mb > 50:
self.findings.append({
"severity": "high", "type": "Heap Spray Indicator",
"detail": f"PID {pid}: {total_mb:.1f} MB in {len(allocs)} large allocations",
})
for entry in malfind_results:
hex_bytes = entry.get("hex_bytes", "")
if hex_bytes.count("90") > 20 or hex_bytes.count("0c") > 20:
self.findings.append({
"severity": "critical", "type": "NOP Sled in Injected Region",
"detail": f"PID {entry['pid']} ({entry['process']}): "
f"NOP sled at {entry['start_addr']}",
})
return {
"malfind_entries": malfind_results,
"large_allocations": large_allocs,
"spray_candidate_pids": list(spray_candidates.keys()),
}
def generate_report(self):
analysis = self.analyze()
report = {
"report_date": datetime.utcnow().isoformat(),
"memory_dump": self.memory_dump,
"malfind_count": len(analysis["malfind_entries"]),
"large_allocation_count": len(analysis["large_allocations"]),
**analysis,
"findings": self.findings,
"total_findings": len(self.findings),
}
out = self.output_dir / "heap_spray_report.json"
with open(out, "w") as f:
json.dump(report, f, indent=2, default=str)
print(json.dumps(report, indent=2, default=str))
return report
def main():
parser = argparse.ArgumentParser(
description="Analyze memory dumps for heap spray exploitation artifacts"
)
parser.add_argument("memory_dump", help="Path to memory dump file (.raw, .vmem, .dmp)")
parser.add_argument("--output-dir", default="./heap_spray_analysis",
help="Output directory for report and dumps")
parser.add_argument("--alloc-threshold", type=int, default=0x100000,
help="Minimum allocation size in bytes to flag (default: 1MB)")
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
analyzer = HeapSprayAnalyzer(args.memory_dump, output_dir=args.output_dir)
analyzer.generate_report()
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,51 @@
---
name: detecting-rdp-brute-force-attacks
description: Detect RDP brute force attacks by analyzing Windows Security Event Logs for failed authentication patterns (Event ID 4625), successful logons after failures (Event ID 4624), NLA failures, and source IP frequency analysis.
domain: cybersecurity
subdomain: threat-detection
tags: [threat-detection, rdp, brute-force, windows-event-logs, blue-team, siem]
version: "1.0"
author: mahipal
license: MIT
---
# Detecting RDP Brute Force Attacks
## Overview
RDP brute force attacks target Windows Remote Desktop Protocol services by attempting rapid credential guessing against exposed RDP endpoints. Detection relies on analyzing Windows Security Event Logs for Event ID 4625 (failed logon with Logon Type 10 or 3) and correlating with Event ID 4624 (successful logon) to identify compromised accounts. This skill covers parsing EVTX files with python-evtx, identifying attack patterns through source IP frequency analysis, detecting NLA bypass attempts, and generating actionable detection reports.
## Prerequisites
- Python 3.9+ with `python-evtx`, `lxml` libraries
- Windows Security EVTX log files (exported from Event Viewer or collected via WEF)
- Understanding of Windows authentication Event IDs (4624, 4625, 4776)
- Familiarity with RDP Logon Types (Type 3 for NLA, Type 10 for RemoteInteractive)
## Steps
### Step 1: Export Security Event Logs
Export Windows Security logs to EVTX format using Event Viewer or wevtutil:
```
wevtutil epl Security C:\logs\security.evtx
```
### Step 2: Parse Failed Logon Events
Use python-evtx to parse Event ID 4625 entries, extracting source IP, target username, failure reason (Sub Status), and Logon Type fields.
### Step 3: Analyze Attack Patterns
Identify brute force patterns by:
- Counting failed logons per source IP within time windows
- Detecting username spray attacks (many usernames from one IP)
- Correlating 4625 failures with subsequent 4624 success from same IP
### Step 4: Generate Detection Report
Produce a JSON report with top attacking IPs, targeted accounts, time-based analysis, and compromise indicators.
## Expected Output
JSON report containing:
- Total failed logon events and unique source IPs
- Top attacking IPs ranked by failure count
- Targeted usernames and failure sub-status codes
- Successful logons following brute force attempts (potential compromises)
- Time-series analysis of attack intensity
@@ -0,0 +1,68 @@
# API Reference: Detecting RDP Brute Force Attacks
## Windows Security Event IDs
| Event ID | Description | Key Fields |
|----------|-------------|------------|
| 4625 | Failed logon attempt | TargetUserName, IpAddress, SubStatus, LogonType |
| 4624 | Successful logon | TargetUserName, IpAddress, LogonType |
| 4776 | NTLM credential validation | TargetUserName, Workstation, Status |
| 4771 | Kerberos pre-auth failed | TargetUserName, IpAddress, Status |
## Logon Types for RDP
| Type | Name | Context |
|------|------|---------|
| 3 | Network | RDP with NLA enabled (pre-auth) |
| 10 | RemoteInteractive | RDP session after NLA |
## Failure Sub-Status Codes
| Sub-Status | Meaning |
|------------|---------|
| 0xC0000064 | User does not exist |
| 0xC000006A | Wrong password |
| 0xC0000234 | Account locked out |
| 0xC0000072 | Account disabled |
| 0xC0000193 | Account expired |
| 0xC0000071 | Password expired |
## python-evtx Library Usage
```python
import Evtx.Evtx as evtx
with evtx.Evtx("Security.evtx") as log:
for record in log.records():
xml_str = record.xml()
```
Install: `pip install python-evtx lxml`
## wevtutil Export Commands
```bash
# Export Security log to EVTX
wevtutil epl Security C:\logs\security.evtx
# Query failed RDP logons
wevtutil qe Security /q:"*[System[(EventID=4625)] and EventData[Data[@Name='LogonType']='10']]" /f:text
# Count recent failed logons
wevtutil qe Security /q:"*[System[(EventID=4625)]]" /c:100 /rd:true /f:text
```
## Detection Thresholds
| Pattern | Threshold | Indicator |
|---------|-----------|-----------|
| Brute force | >10 failures/IP in 15 min | Single-target credential guessing |
| Password spray | >5 unique users/IP | Multi-user single-password attack |
| Compromise | 4625 followed by 4624 from same IP | Successful brute force |
## References
- Microsoft Event 4625: https://learn.microsoft.com/en-us/windows/security/threat-protection/auditing/event-4625
- Microsoft Event 4624: https://learn.microsoft.com/en-us/windows/security/threat-protection/auditing/event-4624
- python-evtx: https://github.com/williballenthin/python-evtx
- LogonTracer: https://github.com/JPCERTCC/LogonTracer
@@ -0,0 +1,171 @@
#!/usr/bin/env python3
"""Agent for detecting RDP brute force attacks from Windows Event Logs.
Parses EVTX files for Event ID 4625 (failed logon) and 4624
(successful logon) to identify brute force patterns, source IP
frequency, username spraying, and post-compromise indicators.
"""
# For authorized security monitoring and blue team use only
import argparse
import json
import os
import xml.etree.ElementTree as ET
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
try:
import Evtx.Evtx as evtx
except ImportError:
evtx = None
NS = "{http://schemas.microsoft.com/win/2004/08/events/event}"
FAILURE_SUBSTATUS = {
"0xc0000064": "user_does_not_exist",
"0xc000006a": "wrong_password",
"0xc0000072": "account_disabled",
"0xc000006f": "logon_outside_hours",
"0xc0000070": "workstation_restriction",
"0xc0000071": "password_expired",
"0xc0000234": "account_locked",
"0xc0000193": "account_expired",
}
RDP_LOGON_TYPES = {"3", "10"}
def _extract_event_data(xml_str):
"""Extract structured fields from a single EVTX record XML."""
root = ET.fromstring(xml_str)
sys_node = root.find(f"{NS}System")
event_id = sys_node.find(f"{NS}EventID").text if sys_node is not None else None
time_created = None
tc = sys_node.find(f"{NS}TimeCreated") if sys_node is not None else None
if tc is not None:
time_created = tc.get("SystemTime", "")
data = {}
for d in root.iter(f"{NS}Data"):
name = d.get("Name", "")
data[name] = d.text or ""
return event_id, time_created, data
class RDPBruteForceDetector:
"""Detects RDP brute force attacks from Windows Security EVTX logs."""
def __init__(self, evtx_path, threshold=10, time_window_minutes=15,
output_dir="./rdp_brute_force_report"):
self.evtx_path = evtx_path
self.threshold = threshold
self.time_window = time_window_minutes
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.failed_logons = []
self.successful_logons = []
def parse_events(self):
"""Parse EVTX file and extract logon events."""
if evtx is None:
raise RuntimeError("python-evtx not installed: pip install python-evtx")
with evtx.Evtx(self.evtx_path) as log:
for record in log.records():
try:
xml_str = record.xml()
except Exception:
continue
event_id, ts, data = _extract_event_data(xml_str)
logon_type = data.get("LogonType", "")
if event_id == "4625" and logon_type in RDP_LOGON_TYPES:
self.failed_logons.append({
"timestamp": ts,
"source_ip": data.get("IpAddress", "-"),
"target_user": data.get("TargetUserName", ""),
"target_domain": data.get("TargetDomainName", ""),
"sub_status": data.get("SubStatus", "").lower(),
"logon_type": logon_type,
})
elif event_id == "4624" and logon_type in RDP_LOGON_TYPES:
self.successful_logons.append({
"timestamp": ts,
"source_ip": data.get("IpAddress", "-"),
"target_user": data.get("TargetUserName", ""),
"logon_type": logon_type,
})
def analyze(self):
"""Analyze parsed events for brute force patterns."""
ip_counter = Counter(e["source_ip"] for e in self.failed_logons)
user_counter = Counter(e["target_user"] for e in self.failed_logons)
substatus_counter = Counter(
FAILURE_SUBSTATUS.get(e["sub_status"], e["sub_status"])
for e in self.failed_logons
)
brute_force_ips = {ip: cnt for ip, cnt in ip_counter.items()
if cnt >= self.threshold and ip != "-"}
success_ips = {e["source_ip"] for e in self.successful_logons}
compromised = []
for ip in brute_force_ips:
if ip in success_ips:
user_list = [e["target_user"] for e in self.successful_logons
if e["source_ip"] == ip]
compromised.append({"ip": ip, "users": list(set(user_list)),
"failed_attempts": brute_force_ips[ip]})
users_per_ip = defaultdict(set)
for e in self.failed_logons:
users_per_ip[e["source_ip"]].add(e["target_user"])
spray_ips = {ip: len(users) for ip, users in users_per_ip.items()
if len(users) >= 5}
return {
"total_failed_logons": len(self.failed_logons),
"total_successful_logons": len(self.successful_logons),
"unique_source_ips": len(ip_counter),
"top_attacking_ips": ip_counter.most_common(20),
"targeted_users": user_counter.most_common(20),
"failure_reasons": dict(substatus_counter),
"brute_force_ips": brute_force_ips,
"password_spray_ips": spray_ips,
"potential_compromises": compromised,
}
def generate_report(self):
"""Parse events and generate full detection report."""
self.parse_events()
analysis = self.analyze()
report = {
"report_date": datetime.utcnow().isoformat(),
"evtx_file": str(self.evtx_path),
"detection_threshold": self.threshold,
**analysis,
}
out = self.output_dir / "rdp_brute_force_report.json"
with open(out, "w") as f:
json.dump(report, f, indent=2, default=str)
print(json.dumps(report, indent=2, default=str))
return report
def main():
parser = argparse.ArgumentParser(
description="Detect RDP brute force attacks from Windows Security EVTX logs"
)
parser.add_argument("evtx_file", help="Path to Security.evtx log file")
parser.add_argument("--threshold", type=int, default=10,
help="Min failed logons per IP to flag as brute force (default: 10)")
parser.add_argument("--output-dir", default="./rdp_brute_force_report",
help="Output directory for report")
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
detector = RDPBruteForceDetector(args.evtx_file, args.threshold,
output_dir=args.output_dir)
detector.generate_report()
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,40 @@
---
name: hunting-for-t1098-account-manipulation
description: Hunt for MITRE ATT&CK T1098 account manipulation including shadow admin creation, SID history injection, group membership changes, and credential modifications using Windows Security Event Logs.
domain: cybersecurity
subdomain: threat-hunting
tags: [threat-hunting, mitre-attack, t1098, account-manipulation, active-directory, persistence]
version: "1.0"
author: mahipal
license: MIT
---
# Hunting for T1098 Account Manipulation
## Overview
MITRE ATT&CK T1098 (Account Manipulation) covers adversary actions to maintain or expand access to compromised accounts, including adding credentials, modifying group memberships, SID history injection, and creating shadow admin accounts. This skill covers detecting these techniques through Windows Security Event Log analysis (Event IDs 4738, 4728, 4732, 4756, 4670, 5136), correlating group membership changes with privilege escalation indicators, and identifying anomalous account modification patterns.
## Prerequisites
- Windows Security Event Logs (EVTX format) or SIEM access
- Python 3.9+ with `python-evtx`, `lxml` libraries
- Understanding of Active Directory group structure and SID architecture
- Familiarity with MITRE ATT&CK T1098 sub-techniques
## Steps
### Step 1: Parse Account Modification Events
Extract Event IDs 4738 (user account changed), 4728/4732/4756 (member added to security groups), and 5136 (directory service object modified).
### Step 2: Detect Privileged Group Changes
Flag additions to Domain Admins, Enterprise Admins, Schema Admins, Administrators, and Backup Operators groups.
### Step 3: Identify Shadow Admin Indicators
Detect accounts receiving AdminSDHolder protection, direct privilege assignment, or SID history injection.
### Step 4: Correlate with Attack Timeline
Cross-reference account changes with authentication events to identify initial compromise and persistence establishment.
## Expected Output
JSON report with detected account manipulation events, privileged group changes, shadow admin indicators, and timeline correlation.
@@ -0,0 +1,68 @@
# API Reference: Hunting for T1098 Account Manipulation
## Key Windows Security Event IDs
| Event ID | Description | T1098 Relevance |
|----------|-------------|-----------------|
| 4738 | User account changed | Account property modification |
| 4728 | Member added to global security group | Domain Admins, etc. |
| 4732 | Member added to local security group | Administrators, etc. |
| 4756 | Member added to universal security group | Enterprise Admins |
| 4729 | Member removed from global security group | Evidence cleanup |
| 4670 | Permissions on object changed | ACL manipulation |
| 5136 | Directory service object modified | SID History, SPNs |
| 4724 | Password reset attempted | Credential takeover |
| 4781 | Account name changed | Account rename evasion |
## Privileged Groups to Monitor
| Group | Risk | Impact |
|-------|------|--------|
| Domain Admins | Critical | Full domain control |
| Enterprise Admins | Critical | Forest-wide admin |
| Schema Admins | Critical | AD schema modification |
| Administrators | High | Local admin on DCs |
| Backup Operators | High | Read any file, backup SAM |
| DNS Admins | High | DLL injection on DCs |
| Account Operators | Medium | Create/modify accounts |
## Sensitive AD Attributes
| Attribute | Attack Technique |
|-----------|-----------------|
| SIDHistory | T1134.005 - SID History Injection |
| servicePrincipalName | T1558.003 - Kerberoasting setup |
| msDS-AllowedToDelegateTo | Constrained delegation abuse |
| msDS-AllowedToActOnBehalfOfOtherIdentity | RBCD attack |
| AdminCount | AdminSDHolder persistence |
| userAccountControl | Account flag manipulation |
## python-evtx Parsing
```python
import Evtx.Evtx as evtx
with evtx.Evtx("Security.evtx") as log:
for record in log.records():
xml_str = record.xml()
# Parse XML to extract EventID and Data fields
```
Install: `pip install python-evtx lxml`
## MITRE T1098 Sub-Techniques
| ID | Name | Key Indicator |
|----|------|---------------|
| T1098.001 | Additional Cloud Credentials | New keys/certs added |
| T1098.002 | Additional Email Delegate Access | Mailbox permission grants |
| T1098.003 | Additional Cloud Roles | Role assignment changes |
| T1098.004 | SSH Authorized Keys | authorized_keys modification |
| T1098.005 | Device Registration | Rogue device enrollment |
## References
- MITRE T1098: https://attack.mitre.org/techniques/T1098/
- CISA Eviction Guide: https://www.cisa.gov/eviction-strategies-tool/info-attack/T1098
- Windows Event Mapping: https://www.socinvestigation.com/mapping-mitre-attck-with-window-event-log-ids/
- python-evtx: https://github.com/williballenthin/python-evtx
@@ -0,0 +1,223 @@
#!/usr/bin/env python3
"""Agent for hunting MITRE ATT&CK T1098 account manipulation.
Detects shadow admin creation, SID history injection, privileged
group membership changes, and credential modifications by parsing
Windows Security Event Logs for key event IDs.
"""
# For authorized threat hunting and blue team use only
import argparse
import json
import os
import xml.etree.ElementTree as ET
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
try:
import Evtx.Evtx as evtx
except ImportError:
evtx = None
NS = "{http://schemas.microsoft.com/win/2004/08/events/event}"
T1098_EVENT_IDS = {
"4738": "User account changed",
"4728": "Member added to global security group",
"4732": "Member added to local security group",
"4756": "Member added to universal security group",
"4729": "Member removed from global security group",
"4733": "Member removed from local security group",
"4670": "Permissions on object changed",
"5136": "Directory service object modified",
"4724": "Password reset attempted",
"4723": "Password change attempted",
"4781": "Account name changed",
}
PRIVILEGED_GROUPS = {
"domain admins", "enterprise admins", "schema admins",
"administrators", "backup operators", "server operators",
"account operators", "print operators", "dns admins",
"group policy creator owners", "remote desktop users",
}
SENSITIVE_ATTRIBUTES = {
"sidhistory", "serviceprincipalname", "msds-allowedtodelegateto",
"msds-allowedtoactonbehalfofotheridentity", "admincount",
"useraccountcontrol", "primarygroupid",
}
def _parse_event(xml_str):
"""Extract event ID, timestamp, and data fields from event XML."""
root = ET.fromstring(xml_str)
sys_node = root.find(f"{NS}System")
event_id = sys_node.find(f"{NS}EventID").text if sys_node is not None else None
tc = sys_node.find(f"{NS}TimeCreated") if sys_node is not None else None
timestamp = tc.get("SystemTime", "") if tc is not None else ""
data = {}
for d in root.iter(f"{NS}Data"):
name = d.get("Name", "")
data[name] = d.text or ""
return event_id, timestamp, data
class T1098HuntingAgent:
"""Hunts for T1098 account manipulation in Windows Event Logs."""
def __init__(self, evtx_path, output_dir="./t1098_hunt"):
self.evtx_path = evtx_path
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.events = []
self.findings = []
def parse_events(self):
"""Parse EVTX for T1098-relevant event IDs."""
if evtx is None:
raise RuntimeError("python-evtx required: pip install python-evtx")
with evtx.Evtx(self.evtx_path) as log:
for record in log.records():
try:
xml_str = record.xml()
except Exception:
continue
event_id, ts, data = _parse_event(xml_str)
if event_id in T1098_EVENT_IDS:
self.events.append({
"event_id": event_id,
"description": T1098_EVENT_IDS[event_id],
"timestamp": ts,
"subject_user": data.get("SubjectUserName", ""),
"subject_domain": data.get("SubjectDomainName", ""),
"target_user": data.get("TargetUserName", data.get("MemberName", "")),
"target_domain": data.get("TargetDomainName", ""),
"group_name": data.get("TargetUserName", ""),
"member_sid": data.get("MemberSid", ""),
"attribute_name": data.get("AttributeLDAPDisplayName", ""),
"attribute_value": data.get("AttributeValue", "")[:200],
})
def detect_privileged_group_changes(self):
"""Detect additions to privileged security groups."""
group_add_ids = {"4728", "4732", "4756"}
alerts = []
for event in self.events:
if event["event_id"] in group_add_ids:
group = event["group_name"].lower()
if any(pg in group for pg in PRIVILEGED_GROUPS):
alerts.append(event)
self.findings.append({
"severity": "critical",
"type": "Privileged Group Addition",
"detail": f"{event['subject_user']} added {event['target_user']} "
f"to '{event['group_name']}' at {event['timestamp']}",
"mitre": "T1098.001",
})
return alerts
def detect_sid_history_injection(self):
"""Detect SID History modifications via directory service changes."""
alerts = []
for event in self.events:
if event["event_id"] == "5136":
attr = event["attribute_name"].lower()
if attr == "sidhistory":
alerts.append(event)
self.findings.append({
"severity": "critical",
"type": "SID History Injection",
"detail": f"SID History modified on {event['target_user']} "
f"by {event['subject_user']} at {event['timestamp']}",
"mitre": "T1134.005",
})
return alerts
def detect_sensitive_attribute_changes(self):
"""Detect changes to sensitive AD attributes."""
alerts = []
for event in self.events:
if event["event_id"] == "5136":
attr = event["attribute_name"].lower()
if attr in SENSITIVE_ATTRIBUTES:
alerts.append(event)
self.findings.append({
"severity": "high",
"type": "Sensitive Attribute Modified",
"detail": f"Attribute '{event['attribute_name']}' changed on "
f"{event['target_user']} by {event['subject_user']}",
"mitre": "T1098",
})
return alerts
def detect_shadow_admin(self):
"""Detect potential shadow admin creation patterns."""
alerts = []
admin_adds = [e for e in self.events
if e["event_id"] in ("4728", "4732", "4756") and
any(pg in e["group_name"].lower() for pg in PRIVILEGED_GROUPS)]
account_changes = [e for e in self.events if e["event_id"] == "4738"]
changed_accounts = {e["target_user"].lower() for e in account_changes}
for add_event in admin_adds:
target = add_event["target_user"].lower()
if target in changed_accounts:
alerts.append(add_event)
self.findings.append({
"severity": "critical",
"type": "Shadow Admin Indicator",
"detail": f"Account '{add_event['target_user']}' modified and then "
f"added to '{add_event['group_name']}'",
"mitre": "T1098",
})
return alerts
def generate_report(self):
self.parse_events()
priv_group = self.detect_privileged_group_changes()
sid_history = self.detect_sid_history_injection()
sensitive = self.detect_sensitive_attribute_changes()
shadow = self.detect_shadow_admin()
event_summary = Counter(e["event_id"] for e in self.events)
actor_summary = Counter(e["subject_user"] for e in self.events if e["subject_user"])
report = {
"report_date": datetime.utcnow().isoformat(),
"evtx_file": str(self.evtx_path),
"mitre_technique": "T1098 - Account Manipulation",
"total_t1098_events": len(self.events),
"event_id_summary": dict(event_summary),
"top_actors": actor_summary.most_common(10),
"privileged_group_changes": len(priv_group),
"sid_history_injections": len(sid_history),
"sensitive_attr_changes": len(sensitive),
"shadow_admin_indicators": len(shadow),
"findings": self.findings,
"total_findings": len(self.findings),
}
out = self.output_dir / "t1098_hunt_report.json"
with open(out, "w") as f:
json.dump(report, f, indent=2, default=str)
print(json.dumps(report, indent=2, default=str))
return report
def main():
parser = argparse.ArgumentParser(
description="Hunt for MITRE T1098 account manipulation in Windows Event Logs"
)
parser.add_argument("evtx_file", help="Path to Security.evtx log file")
parser.add_argument("--output-dir", default="./t1098_hunt",
help="Output directory for hunt report")
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
agent = T1098HuntingAgent(args.evtx_file, output_dir=args.output_dir)
agent.generate_report()
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,40 @@
---
name: implementing-container-network-policies-with-calico
description: Enforce Kubernetes network segmentation using Calico CNI network policies and global network policies to control pod-to-pod traffic, restrict egress, and implement zero-trust microsegmentation.
domain: cybersecurity
subdomain: container-security
tags: [container-security, kubernetes, calico, network-policy, microsegmentation, cni]
version: "1.0"
author: mahipal
license: MIT
---
# Implementing Container Network Policies with Calico
## Overview
Calico provides Kubernetes-native and extended network policy enforcement through its CNI plugin. This skill covers creating and auditing Calico NetworkPolicy and GlobalNetworkPolicy resources to implement pod-to-pod traffic control, namespace isolation, egress restrictions, and DNS-based policy rules using calicoctl and the Kubernetes API.
## Prerequisites
- Kubernetes cluster with Calico CNI installed
- Python 3.9+ with `kubernetes` client library
- calicoctl CLI tool installed and configured
- kubectl access with RBAC permissions for network policy management
## Steps
### Step 1: Audit Existing Network Policies
Use calicoctl and kubectl to inventory current network policies and identify unprotected namespaces.
### Step 2: Implement Default-Deny Policies
Create default-deny ingress and egress policies per namespace as a zero-trust baseline.
### Step 3: Create Workload-Specific Allow Rules
Define granular allow rules for legitimate pod-to-pod and pod-to-service communication.
### Step 4: Validate Policy Enforcement
Test connectivity between pods to verify policies are correctly enforced.
## Expected Output
JSON audit report listing all network policies, unprotected namespaces, policy rule counts, and connectivity test results.
@@ -0,0 +1,68 @@
# API Reference: Implementing Container Network Policies with Calico
## calicoctl Commands
```bash
# List network policies across all namespaces
calicoctl get networkpolicy --all-namespaces -o json
# List global network policies
calicoctl get globalnetworkpolicy -o json
# Check Calico node status
calicoctl node status
# Apply a Calico network policy
calicoctl apply -f policy.yaml
# Get workload endpoints
calicoctl get workloadendpoint -o wide
# Check IP pool configuration
calicoctl get ippool -o json
```
## Kubernetes NetworkPolicy vs Calico
| Feature | K8s NetworkPolicy | Calico NetworkPolicy | Calico GlobalNetworkPolicy |
|---------|-------------------|---------------------|-----------------------------|
| Scope | Namespace | Namespace | Cluster-wide |
| Selector | Pod labels | Pod + service account | All workloads + host endpoints |
| Rule types | Ingress, Egress | Ingress, Egress | Ingress, Egress |
| DNS policy | No | Yes | Yes |
| Order/Priority | No | Yes (order field) | Yes (order field) |
| CIDR ranges | Yes | Yes | Yes |
## Default-Deny Policy Template
```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-ingress
namespace: production
spec:
podSelector: {}
policyTypes:
- Ingress
```
## Python kubernetes Client
```python
from kubernetes import client, config
config.load_kube_config()
net_v1 = client.NetworkingV1Api()
policies = net_v1.list_network_policy_for_all_namespaces()
for p in policies.items:
print(p.metadata.name, p.metadata.namespace)
```
Install: `pip install kubernetes`
## References
- Calico Network Policy: https://docs.tigera.io/calico/latest/network-policy/get-started/calico-policy/calico-network-policy
- calicoctl Reference: https://docs.tigera.io/calico-enterprise/latest/reference/clis/calicoctl/overview
- K8s Network Policy: https://kubernetes.io/docs/concepts/services-networking/network-policies/
@@ -0,0 +1,192 @@
#!/usr/bin/env python3
"""Agent for implementing container network policies with Calico.
Audits Kubernetes network policies, identifies unprotected
namespaces, validates Calico policy enforcement, and generates
default-deny baseline policy manifests.
"""
import argparse
import json
import os
import subprocess
from datetime import datetime
from pathlib import Path
try:
from kubernetes import client, config
except ImportError:
client = None
config = None
DEFAULT_DENY_INGRESS = {
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicy",
"metadata": {"name": "default-deny-ingress"},
"spec": {"podSelector": {}, "policyTypes": ["Ingress"]},
}
DEFAULT_DENY_EGRESS = {
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicy",
"metadata": {"name": "default-deny-egress"},
"spec": {"podSelector": {}, "policyTypes": ["Egress"]},
}
class CalicoNetworkPolicyAgent:
"""Audits and manages Calico network policies on Kubernetes."""
def __init__(self, kubeconfig=None, output_dir="./calico_policy_audit"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.findings = []
self.v1 = None
self.net_v1 = None
if client and config:
try:
if kubeconfig:
config.load_kube_config(config_file=kubeconfig)
else:
config.load_kube_config()
self.v1 = client.CoreV1Api()
self.net_v1 = client.NetworkingV1Api()
except Exception:
pass
def _run_calicoctl(self, args):
"""Run calicoctl command and return parsed output."""
try:
result = subprocess.run(
["calicoctl"] + args, capture_output=True, text=True, timeout=30
)
return result.stdout, result.returncode
except FileNotFoundError:
return "", 1
except subprocess.TimeoutExpired:
return "", 1
def list_k8s_network_policies(self):
"""List all Kubernetes NetworkPolicy resources across namespaces."""
if not self.net_v1:
return []
policies = self.net_v1.list_network_policy_for_all_namespaces()
return [{"name": p.metadata.name, "namespace": p.metadata.namespace,
"pod_selector": p.spec.pod_selector.match_labels or {},
"policy_types": p.spec.policy_types or [],
"ingress_rules": len(p.spec.ingress or []),
"egress_rules": len(p.spec.egress or [])}
for p in policies.items]
def list_calico_policies(self):
"""List Calico-specific NetworkPolicy and GlobalNetworkPolicy resources."""
policies = []
stdout, rc = self._run_calicoctl(["get", "networkpolicy", "-o", "json", "--all-namespaces"])
if rc == 0 and stdout.strip():
try:
data = json.loads(stdout)
items = data.get("items", [data]) if "items" in data else [data]
for item in items:
meta = item.get("metadata", {})
policies.append({"kind": "NetworkPolicy", "name": meta.get("name"),
"namespace": meta.get("namespace")})
except json.JSONDecodeError:
pass
stdout, rc = self._run_calicoctl(["get", "globalnetworkpolicy", "-o", "json"])
if rc == 0 and stdout.strip():
try:
data = json.loads(stdout)
items = data.get("items", [data]) if "items" in data else [data]
for item in items:
policies.append({"kind": "GlobalNetworkPolicy",
"name": item.get("metadata", {}).get("name")})
except json.JSONDecodeError:
pass
return policies
def find_unprotected_namespaces(self):
"""Identify namespaces without any network policies."""
if not self.v1 or not self.net_v1:
return []
namespaces = [ns.metadata.name for ns in self.v1.list_namespace().items]
policies = self.net_v1.list_network_policy_for_all_namespaces()
protected = {p.metadata.namespace for p in policies.items}
system_ns = {"kube-system", "kube-public", "kube-node-lease", "calico-system"}
unprotected = [ns for ns in namespaces if ns not in protected and ns not in system_ns]
for ns in unprotected:
self.findings.append({"severity": "high", "type": "Unprotected Namespace",
"detail": f"Namespace '{ns}' has no network policies"})
return unprotected
def generate_default_deny(self, namespace):
"""Generate default-deny policy manifests for a namespace."""
ingress = {**DEFAULT_DENY_INGRESS, "metadata": {**DEFAULT_DENY_INGRESS["metadata"],
"namespace": namespace}}
egress = {**DEFAULT_DENY_EGRESS, "metadata": {**DEFAULT_DENY_EGRESS["metadata"],
"namespace": namespace}}
return {"namespace": namespace, "ingress_policy": ingress, "egress_policy": egress}
def check_calico_node_status(self):
"""Check Calico node status using calicoctl."""
stdout, rc = self._run_calicoctl(["node", "status"])
if rc == 0:
return {"status": "healthy", "output": stdout[:500]}
return {"status": "unavailable"}
def test_connectivity(self, src_pod, dst_pod, namespace, port=80):
"""Test pod-to-pod connectivity using kubectl exec."""
try:
result = subprocess.run(
["kubectl", "exec", src_pod, "-n", namespace, "--",
"wget", "--spider", "--timeout=3", f"http://{dst_pod}:{port}"],
capture_output=True, text=True, timeout=10,
)
return {"src": src_pod, "dst": dst_pod, "port": port,
"connected": result.returncode == 0}
except (FileNotFoundError, subprocess.TimeoutExpired):
return {"src": src_pod, "dst": dst_pod, "connected": None, "error": "test failed"}
def generate_report(self):
k8s_policies = self.list_k8s_network_policies()
calico_policies = self.list_calico_policies()
unprotected = self.find_unprotected_namespaces()
node_status = self.check_calico_node_status()
deny_manifests = [self.generate_default_deny(ns) for ns in unprotected[:5]]
report = {
"report_date": datetime.utcnow().isoformat(),
"k8s_network_policies": {"count": len(k8s_policies), "policies": k8s_policies},
"calico_policies": {"count": len(calico_policies), "policies": calico_policies},
"unprotected_namespaces": unprotected,
"calico_node_status": node_status,
"recommended_deny_policies": deny_manifests,
"findings": self.findings,
"total_findings": len(self.findings),
}
out = self.output_dir / "calico_policy_report.json"
with open(out, "w") as f:
json.dump(report, f, indent=2)
print(json.dumps(report, indent=2))
return report
def main():
parser = argparse.ArgumentParser(
description="Audit and manage Calico network policies on Kubernetes"
)
parser.add_argument("--kubeconfig", default=None,
help="Path to kubeconfig file")
parser.add_argument("--output-dir", default="./calico_policy_audit",
help="Output directory for report")
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
agent = CalicoNetworkPolicyAgent(kubeconfig=args.kubeconfig,
output_dir=args.output_dir)
agent.generate_report()
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,44 @@
---
name: performing-red-team-with-covenant
description: Conduct red team operations using the Covenant C2 framework for authorized adversary simulation, including listener setup, grunt deployment, task execution, and lateral movement tracking.
domain: cybersecurity
subdomain: red-team
tags: [red-team, c2, covenant, adversary-simulation, penetration-testing]
version: "1.0"
author: mahipal
license: MIT
---
# Performing Red Team Operations with Covenant C2
## Overview
Covenant is a collaborative .NET C2 framework for red teamers that provides a Swagger-documented REST API for managing listeners, launchers, grunts (agents), and tasks. This skill covers automating Covenant operations through its API for authorized red team engagements: creating HTTP/HTTPS listeners, generating binary and PowerShell launchers, deploying grunts, executing tasks on compromised hosts, and tracking lateral movement.
## Prerequisites
- Covenant C2 server deployed (Docker or .NET 6)
- Python 3.9+ with `requests` library
- Covenant API token (obtained via /api/users/login)
- Written authorization for red team engagement
- Isolated lab or authorized target environment
## Steps
### Step 1: Authenticate to Covenant API
Obtain a JWT token by posting credentials to /api/users/login endpoint.
### Step 2: Create Listener
Configure an HTTP or HTTPS listener with callback URLs and bind address.
### Step 3: Generate Launcher
Create a binary, PowerShell, or MSBuild launcher tied to the listener for grunt deployment.
### Step 4: Deploy and Manage Grunts
Monitor grunt callbacks, execute tasks, and collect output from compromised hosts.
### Step 5: Document Operations
Generate an operations report documenting all actions, timestamps, and findings.
## Expected Output
JSON report with listener configuration, active grunts, executed tasks, and task output for engagement documentation.
@@ -0,0 +1,66 @@
# API Reference: Performing Red Team with Covenant C2
## Covenant REST API Endpoints
| Endpoint | Method | Purpose |
|----------|--------|---------|
| /api/users/login | POST | Authenticate and get JWT token |
| /api/listeners | GET | List all listeners |
| /api/listeners/http | POST | Create HTTP listener |
| /api/grunts | GET | List all grunts (agents) |
| /api/grunts/{id}/interact | POST | Execute task on grunt |
| /api/grunttasks/{id} | GET | Get task output |
| /api/launchers/binary | PUT | Generate binary launcher |
| /api/launchers/powershell | PUT | Generate PowerShell launcher |
| /api/launchers/msbuild | PUT | Generate MSBuild launcher |
## Authentication
```json
POST /api/users/login
{"userName": "admin", "password": "pass"}
Response: {"covenantToken": "eyJhbGciOi..."}
Header: Authorization: Bearer <token>
```
## Listener Configuration
| Field | Type | Description |
|-------|------|-------------|
| name | string | Listener display name |
| bindAddress | string | IP to bind (0.0.0.0 for all) |
| bindPort | int | Port for grunt callbacks |
| connectAddresses | array | Callback addresses for grunts |
| listenerTypeId | int | 1=HTTP, 2=Bridge |
## Grunt Status Values
| Status | Description |
|--------|-------------|
| Uninitialized | Grunt created but not connected |
| Stage0 | Initial callback received |
| Stage1 | Key exchange in progress |
| Stage2 | Fully staged and active |
| Active | Connected and ready for tasks |
| Lost | Missed check-in threshold |
## Built-in Tasks
| Task | Description |
|------|-------------|
| WhoAmI | Current user identity |
| GetHostname | Target hostname |
| ListDirectory | Directory listing |
| Download | Retrieve file from target |
| Upload | Upload file to target |
| PowerShell | Execute PowerShell command |
| Assembly | Load and execute .NET assembly |
| Mimikatz | Credential extraction |
## References
- Covenant GitHub: https://github.com/cobbr/Covenant
- Covenant Wiki: https://github.com/cobbr/Covenant/wiki
- Covenant API Docs: Swagger UI at https://<host>:7443/swagger
- Netwrix Tutorial: https://netwrix.com/en/resources/blog/covenant-c2-tutorial/
@@ -0,0 +1,193 @@
#!/usr/bin/env python3
"""Agent for red team operations with Covenant C2 framework.
Automates Covenant C2 operations through its REST API: listener
management, launcher generation, grunt monitoring, and task
execution for authorized adversary simulation engagements.
"""
# For authorized red team engagements only
import argparse
import json
import os
from datetime import datetime
from pathlib import Path
try:
import requests
except ImportError:
requests = None
class CovenantC2Agent:
"""Manages Covenant C2 operations via its REST API."""
def __init__(self, covenant_url, username, password,
output_dir="./covenant_ops"):
self.base_url = covenant_url.rstrip("/")
self.token = None
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.operations_log = []
self._authenticate(username, password)
def _authenticate(self, username, password):
"""Authenticate and obtain JWT token from Covenant API."""
if not requests:
return
try:
resp = requests.post(
f"{self.base_url}/api/users/login",
json={"userName": username, "password": password},
verify=False, timeout=10,
)
if resp.status_code == 200:
data = resp.json()
self.token = data.get("covenantToken") or data.get("token")
self._log("authenticate", "success", {"user": username})
except requests.RequestException as e:
self._log("authenticate", "failed", {"error": str(e)})
def _api(self, method, path, data=None):
if not requests or not self.token:
return None
try:
resp = requests.request(
method, f"{self.base_url}/api{path}",
headers={"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"},
json=data, verify=False, timeout=15,
)
return resp
except requests.RequestException:
return None
def _log(self, action, status, details=None):
self.operations_log.append({
"timestamp": datetime.utcnow().isoformat(),
"action": action, "status": status,
"details": details or {},
})
def list_listeners(self):
"""List all configured listeners."""
resp = self._api("GET", "/listeners")
if resp and resp.status_code == 200:
listeners = resp.json()
self._log("list_listeners", "success", {"count": len(listeners)})
return [{"id": l["id"], "name": l["name"], "status": l["status"],
"bindAddress": l.get("bindAddress"),
"bindPort": l.get("bindPort"),
"listenerType": l.get("listenerType", {}).get("name")}
for l in listeners]
return []
def create_http_listener(self, name, bind_port=80, connect_addresses=None):
"""Create an HTTP listener for grunt callbacks."""
listener_data = {
"name": name,
"bindAddress": "0.0.0.0",
"bindPort": bind_port,
"connectAddresses": connect_addresses or ["0.0.0.0"],
"listenerTypeId": 1,
"status": "Active",
}
resp = self._api("POST", "/listeners/http", listener_data)
if resp and resp.status_code in (200, 201):
result = resp.json()
self._log("create_listener", "success",
{"name": name, "port": bind_port, "id": result.get("id")})
return result
self._log("create_listener", "failed",
{"status": resp.status_code if resp else 0})
return None
def list_grunts(self):
"""List all active grunts (agents)."""
resp = self._api("GET", "/grunts")
if resp and resp.status_code == 200:
grunts = resp.json()
self._log("list_grunts", "success", {"count": len(grunts)})
return [{"id": g["id"], "name": g["name"], "status": g["status"],
"hostname": g.get("hostname"), "userName": g.get("userName"),
"ipAddress": g.get("ipAddress"),
"operatingSystem": g.get("operatingSystem"),
"integrity": g.get("integrity"),
"lastCheckIn": g.get("lastCheckIn")}
for g in grunts]
return []
def create_launcher(self, listener_id, launcher_type="Binary"):
"""Generate a launcher payload for grunt deployment."""
resp = self._api("PUT", f"/launchers/{launcher_type.lower()}",
{"listenerId": listener_id})
if resp and resp.status_code == 200:
launcher = resp.json()
self._log("create_launcher", "success",
{"type": launcher_type, "listener_id": listener_id})
return {"type": launcher_type, "launcherString": launcher.get("launcherString", "")[:200]}
return None
def execute_task(self, grunt_id, task_name, parameters=None):
"""Assign and execute a task on a grunt."""
task_data = {
"gruntId": grunt_id,
"taskName": task_name,
"parameters": parameters or [],
}
resp = self._api("POST", f"/grunts/{grunt_id}/interact", task_data)
if resp and resp.status_code in (200, 201):
result = resp.json()
self._log("execute_task", "success",
{"grunt_id": grunt_id, "task": task_name})
return {"taskId": result.get("id"), "output": result.get("gruntTaskOutput", "")}
self._log("execute_task", "failed",
{"grunt_id": grunt_id, "task": task_name})
return None
def get_task_output(self, task_id):
"""Retrieve output from a completed task."""
resp = self._api("GET", f"/grunttasks/{task_id}")
if resp and resp.status_code == 200:
return resp.json().get("gruntTaskOutput", "")
return None
def generate_report(self):
"""Generate an operations report for engagement documentation."""
listeners = self.list_listeners()
grunts = self.list_grunts()
report = {
"report_date": datetime.utcnow().isoformat(),
"covenant_url": self.base_url,
"active_listeners": listeners,
"active_grunts": grunts,
"operations_log": self.operations_log,
"total_operations": len(self.operations_log),
}
out = self.output_dir / "covenant_ops_report.json"
with open(out, "w") as f:
json.dump(report, f, indent=2)
print(json.dumps(report, indent=2))
return report
def main():
parser = argparse.ArgumentParser(
description="Covenant C2 red team operations agent (authorized use only)"
)
parser.add_argument("covenant_url", help="Covenant server URL (e.g. https://10.0.0.5:7443)")
parser.add_argument("--username", default="admin", help="Covenant username")
parser.add_argument("--password", required=True, help="Covenant password")
parser.add_argument("--output-dir", default="./covenant_ops",
help="Output directory for ops report")
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
agent = CovenantC2Agent(args.covenant_url, args.username, args.password,
output_dir=args.output_dir)
agent.generate_report()
if __name__ == "__main__":
main()