feat: add 5 cybersecurity skills - CloudTrail anomalies, SSL/TLS assessment, Wazuh detection, Prefetch analysis, WMI lateral movement

This commit is contained in:
mukul975
2026-03-11 00:44:42 +01:00
parent 1ba371d7f7
commit 724fda0883
20 changed files with 1593 additions and 0 deletions
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,39 @@
---
name: analyzing-windows-prefetch-with-python
description: Parse Windows Prefetch files using the windowsprefetch Python library to reconstruct application execution history, detect renamed or masquerading binaries, and identify suspicious program execution patterns.
domain: cybersecurity
subdomain: digital-forensics
tags: [digital-forensics, windows, prefetch, execution-history, incident-response, malware-analysis]
version: "1.0"
author: mahipal
license: MIT
---
# Analyzing Windows Prefetch with Python
## Overview
Windows Prefetch files (.pf) record application execution data including executable names, run counts, timestamps, loaded DLLs, and accessed directories. This skill covers parsing Prefetch files using the windowsprefetch Python library to reconstruct execution timelines, detect renamed or masquerading binaries by comparing executable names with loaded resources, and identifying suspicious programs that may indicate malware execution or lateral movement.
## Prerequisites
- Python 3.9+ with `windowsprefetch` library (pip install windowsprefetch)
- Windows Prefetch files from C:\Windows\Prefetch\ (versions 17-30 supported)
- Understanding of Windows Prefetch file naming conventions (EXECUTABLE-HASH.pf)
## Steps
### Step 1: Collect Prefetch Files
Gather .pf files from target system's C:\Windows\Prefetch\ directory.
### Step 2: Parse Execution History
Extract executable name, run count, last execution timestamps, and volume information.
### Step 3: Detect Suspicious Execution
Flag known attack tools (mimikatz, psexec, etc.), renamed binaries, and unusual execution patterns.
### Step 4: Build Execution Timeline
Reconstruct chronological execution timeline from all Prefetch files.
## Expected Output
JSON report with execution history, suspicious executables, renamed binary indicators, and timeline reconstruction.
@@ -0,0 +1,60 @@
# API Reference: Analyzing Windows Prefetch with Python
## windowsprefetch Library
```python
import windowsprefetch
pf = windowsprefetch.Prefetch("CMD.EXE-1234ABCD.pf")
print(pf.executableName) # CMD.EXE
print(pf.runCount) # 42
print(pf.lastRunTime) # 2025-01-15 10:30:22
print(pf.timestamps) # List of up to 8 execution times
print(pf.resources) # List of loaded files/DLLs
print(pf.volumes) # Volume info (name, serial, creation)
```
Install: `pip install windowsprefetch`
## Prefetch File Versions
| Version | Windows | Max Timestamps |
|---------|---------|----------------|
| 17 | XP/2003 | 1 |
| 23 | Vista/7 | 1 |
| 26 | 8/8.1 | 8 |
| 30 | 10/11 | 8 (compressed) |
## File Naming Convention
Format: `EXECUTABLE-XXXXXXXX.pf`
- EXECUTABLE: uppercase executable name
- XXXXXXXX: hash of file path (allows multiple entries per executable)
## Suspicious Executables to Flag
| Category | Examples |
|----------|---------|
| Credential tools | mimikatz, rubeus, lazagne, secretsdump |
| Lateral movement | psexec, psexesvc, wmiexec |
| C2 agents | beacon, meterpreter, covenant, empire |
| LOLBins | certutil, mshta, regsvr32, rundll32, bitsadmin |
| Recon | sharphound, bloodhound, nmap |
## Prefetch Directory Location
```
C:\Windows\Prefetch\
```
Requires admin privileges to read. Enable via:
```
reg query "HKLM\SYSTEM\CurrentControlSet\Control\Session Manager\Memory Management\PrefetchParameters"
```
## References
- windowsprefetch PyPI: https://pypi.org/project/windowsprefetch/
- Windows Prefetch Parser: https://github.com/PoorBillionaire/Windows-Prefetch-Parser
- libscca/pyscca: https://github.com/libyal/libscca
- SANS Prefetch Analysis: https://www.sans.org/blog/a-prescription-for-windows-prefetch-analysis
@@ -0,0 +1,188 @@
#!/usr/bin/env python3
"""Agent for analyzing Windows Prefetch files with Python.
Parses Prefetch (.pf) files to reconstruct execution history,
detect renamed/masquerading binaries, and identify suspicious
tool execution using the windowsprefetch library.
"""
import argparse
import hashlib
import json
import os
from datetime import datetime
from pathlib import Path
try:
import windowsprefetch
except ImportError:
windowsprefetch = None
SUSPICIOUS_EXECUTABLES = {
"mimikatz", "psexec", "psexesvc", "procdump", "lazagne",
"rubeus", "sharphound", "bloodhound", "cobalt", "beacon",
"meterpreter", "powersploit", "empire", "covenant",
"secretsdump", "wce", "fgdump", "pwdump", "gsecdump",
"certutil", "bitsadmin", "mshta", "regsvr32", "rundll32",
"wscript", "cscript", "msiexec", "installutil",
}
LOLBINS = {
"certutil.exe", "bitsadmin.exe", "mshta.exe", "regsvr32.exe",
"rundll32.exe", "wscript.exe", "cscript.exe", "msiexec.exe",
"installutil.exe", "regasm.exe", "regsvcs.exe", "msconfig.exe",
"esentutl.exe", "expand.exe", "extrac32.exe", "findstr.exe",
"hh.exe", "ie4uinit.exe", "makecab.exe", "replace.exe",
}
class PrefetchAnalyzer:
"""Analyzes Windows Prefetch files for forensic investigation."""
def __init__(self, output_dir="./prefetch_analysis"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.findings = []
self.executions = []
def parse_prefetch_file(self, pf_path):
"""Parse a single Prefetch file and extract execution data."""
if windowsprefetch is None:
raise RuntimeError("windowsprefetch not installed: pip install windowsprefetch")
try:
pf = windowsprefetch.Prefetch(pf_path)
except Exception:
return None
timestamps = []
if hasattr(pf, "lastRunTime"):
timestamps.append(str(pf.lastRunTime))
if hasattr(pf, "timestamps"):
timestamps.extend([str(t) for t in pf.timestamps])
resources = []
if hasattr(pf, "resources"):
resources = pf.resources if isinstance(pf.resources, list) else []
elif hasattr(pf, "filenames"):
resources = pf.filenames if isinstance(pf.filenames, list) else []
volumes = []
if hasattr(pf, "volumes"):
for v in pf.volumes:
volumes.append({
"name": getattr(v, "name", str(v)),
"serial": getattr(v, "serialNumber", ""),
})
entry = {
"file": str(pf_path),
"executable": pf.executableName if hasattr(pf, "executableName") else Path(pf_path).stem,
"run_count": pf.runCount if hasattr(pf, "runCount") else 0,
"last_run_time": timestamps[0] if timestamps else "",
"all_timestamps": timestamps,
"pf_hash": Path(pf_path).stem.split("-")[-1] if "-" in Path(pf_path).stem else "",
"resources_count": len(resources),
"volumes": volumes,
"file_size": os.path.getsize(pf_path),
"file_sha256": self._hash_file(pf_path),
}
self.executions.append(entry)
return entry
def _hash_file(self, path):
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
def parse_directory(self, prefetch_dir):
"""Parse all .pf files in a directory."""
pf_dir = Path(prefetch_dir)
pf_files = sorted(pf_dir.glob("*.pf"), key=lambda p: p.stat().st_mtime, reverse=True)
for pf_file in pf_files:
self.parse_prefetch_file(str(pf_file))
return len(pf_files)
def detect_suspicious(self):
"""Flag known attack tools and LOLBins."""
for entry in self.executions:
exe = entry["executable"].lower()
exe_base = exe.replace(".exe", "")
if exe_base in SUSPICIOUS_EXECUTABLES:
self.findings.append({
"severity": "critical", "type": "Attack Tool Executed",
"detail": f"{entry['executable']} run {entry['run_count']} times, "
f"last: {entry['last_run_time']}",
})
elif exe in LOLBINS:
if entry["run_count"] > 10:
self.findings.append({
"severity": "medium", "type": "LOLBin High Usage",
"detail": f"{entry['executable']} run {entry['run_count']} times",
})
def detect_renamed_binaries(self):
"""Detect potential binary renaming/masquerading."""
for entry in self.executions:
exe = entry["executable"].upper()
pf_name = Path(entry["file"]).stem.upper()
expected_prefix = exe.replace(".EXE", "")
if not pf_name.startswith(expected_prefix):
self.findings.append({
"severity": "high", "type": "Possible Renamed Binary",
"detail": f"PF name '{pf_name}' does not match executable '{exe}'",
})
def build_timeline(self):
"""Build chronological execution timeline."""
timeline = []
for entry in self.executions:
for ts in entry["all_timestamps"]:
if ts:
timeline.append({
"timestamp": ts,
"executable": entry["executable"],
"run_count": entry["run_count"],
})
timeline.sort(key=lambda x: x["timestamp"], reverse=True)
return timeline[:100]
def generate_report(self, prefetch_dir):
count = self.parse_directory(prefetch_dir)
self.detect_suspicious()
self.detect_renamed_binaries()
timeline = self.build_timeline()
report = {
"report_date": datetime.utcnow().isoformat(),
"prefetch_dir": str(prefetch_dir),
"total_prefetch_files": count,
"total_unique_executables": len(self.executions),
"execution_history": self.executions,
"execution_timeline": timeline[:50],
"findings": self.findings,
"total_findings": len(self.findings),
}
out = self.output_dir / "prefetch_analysis_report.json"
with open(out, "w") as f:
json.dump(report, f, indent=2, default=str)
print(json.dumps(report, indent=2, default=str))
return report
def main():
parser = argparse.ArgumentParser(
description="Analyze Windows Prefetch files for execution forensics"
)
parser.add_argument("prefetch_dir", help="Path to directory containing .pf files")
parser.add_argument("--output-dir", default="./prefetch_analysis")
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
analyzer = PrefetchAnalyzer(output_dir=args.output_dir)
analyzer.generate_report(args.prefetch_dir)
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,40 @@
---
name: detecting-aws-cloudtrail-anomalies
description: Detect unusual API call patterns in AWS CloudTrail logs using boto3, statistical baselining, and behavioral analysis to identify credential compromise, privilege escalation, and unauthorized resource access.
domain: cybersecurity
subdomain: cloud-security
tags: [cloud-security, aws, cloudtrail, anomaly-detection, threat-detection, boto3]
version: "1.0"
author: mahipal
license: MIT
---
# Detecting AWS CloudTrail Anomalies
## Overview
AWS CloudTrail records API calls across AWS services. This skill covers querying CloudTrail events with boto3's `lookup_events` API, building statistical baselines of normal API activity, detecting anomalies such as unusual event sources, geographic anomalies, high-frequency API calls, and first-time API usage patterns that indicate compromised credentials or insider threats.
## Prerequisites
- Python 3.9+ with `boto3` library
- AWS credentials with CloudTrail read permissions (cloudtrail:LookupEvents)
- Understanding of AWS IAM and common API patterns
- CloudTrail enabled in target AWS account (management events at minimum)
## Steps
### Step 1: Query CloudTrail Events
Use boto3 CloudTrail client's lookup_events to retrieve recent API activity with pagination.
### Step 2: Build Activity Baseline
Aggregate events by user, source IP, event source, and event name to establish normal behavior patterns.
### Step 3: Detect Anomalies
Flag unusual patterns: new event sources per user, first-time API calls, geographic IP changes, high error rates, and sensitive API usage (IAM, KMS, S3 policy changes).
### Step 4: Generate Detection Report
Produce a JSON report with anomaly scores, top suspicious users, and recommended investigation actions.
## Expected Output
JSON report with event statistics, baseline deviations, anomalous users/IPs, sensitive API calls, and error rate analysis.
@@ -0,0 +1,59 @@
# API Reference: Detecting AWS CloudTrail Anomalies
## boto3 CloudTrail API
```python
import boto3
client = boto3.client("cloudtrail", region_name="us-east-1")
# Paginated event lookup
paginator = client.get_paginator("lookup_events")
pages = paginator.paginate(
StartTime=datetime(2025, 1, 1),
EndTime=datetime.utcnow(),
LookupAttributes=[{"AttributeKey": "EventName", "AttributeValue": "ConsoleLogin"}],
PaginationConfig={"MaxItems": 500, "PageSize": 50},
)
for page in pages:
for event in page["Events"]:
ct = json.loads(event["CloudTrailEvent"])
print(ct["sourceIPAddress"], event["EventName"])
```
## CloudTrail Event Fields
| Field | Location | Description |
|-------|----------|-------------|
| EventName | Event | API action name |
| EventSource | Event | AWS service (e.g. iam.amazonaws.com) |
| Username | Event | IAM user or assumed role |
| sourceIPAddress | CloudTrailEvent JSON | Caller IP address |
| errorCode | CloudTrailEvent JSON | Error type if failed |
| userAgent | CloudTrailEvent JSON | Client SDK/browser |
| awsRegion | CloudTrailEvent JSON | Region of API call |
## Sensitive API Calls to Monitor
| Event Name | Risk | Reason |
|------------|------|--------|
| StopLogging | Critical | Disabling CloudTrail |
| DeleteTrail | Critical | Removing audit trail |
| CreateAccessKey | High | New credentials for user |
| AttachUserPolicy | High | Privilege escalation |
| PutBucketPolicy | High | S3 access change |
| ConsoleLogin | Medium | Interactive access |
| RunInstances | Medium | Resource creation |
| AssumeRole | Medium | Role switching |
## Rate Limits
- lookup_events: 2 requests/second per account per region
- Maximum lookback: 90 days
- Max results per page: 50 events
## References
- boto3 CloudTrail: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudtrail.html
- CloudTrail Insights: https://docs.aws.amazon.com/awscloudtrail/latest/userguide/logging-insights-events-with-cloudtrail.html
- LookupEvents API: https://docs.aws.amazon.com/awscloudtrail/latest/APIReference/API_LookupEvents.html
@@ -0,0 +1,183 @@
#!/usr/bin/env python3
"""Agent for detecting anomalies in AWS CloudTrail logs.
Queries CloudTrail events via boto3, builds behavioral baselines,
and detects unusual API patterns indicating credential compromise,
privilege escalation, or unauthorized access.
"""
import argparse
import json
import os
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from pathlib import Path
try:
import boto3
except ImportError:
boto3 = None
SENSITIVE_EVENTS = {
"CreateUser", "CreateAccessKey", "AttachUserPolicy", "AttachRolePolicy",
"PutUserPolicy", "PutRolePolicy", "CreateRole", "AssumeRole",
"ConsoleLogin", "PutBucketPolicy", "PutBucketAcl",
"CreateKeyPair", "RunInstances", "StopLogging", "DeleteTrail",
"DisableKey", "ScheduleKeyDeletion", "CreateGrante",
"AuthorizeSecurityGroupIngress", "ModifyInstanceAttribute",
}
ERROR_INDICATORS = {"AccessDenied", "UnauthorizedAccess", "Client.UnauthorizedAccess"}
class CloudTrailAnomalyDetector:
"""Detects anomalies in AWS CloudTrail API activity."""
def __init__(self, profile=None, region=None, lookback_hours=24,
output_dir="./cloudtrail_anomalies"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.lookback_hours = lookback_hours
self.findings = []
self.client = None
if boto3:
session = boto3.Session(profile_name=profile, region_name=region or "us-east-1")
self.client = session.client("cloudtrail")
def fetch_events(self, max_results=1000):
"""Fetch CloudTrail events using lookup_events with pagination."""
if not self.client:
return []
start_time = datetime.utcnow() - timedelta(hours=self.lookback_hours)
events = []
paginator = self.client.get_paginator("lookup_events")
page_iter = paginator.paginate(
StartTime=start_time,
EndTime=datetime.utcnow(),
PaginationConfig={"MaxItems": max_results, "PageSize": 50},
)
for page in page_iter:
for event in page.get("Events", []):
ct_event = json.loads(event.get("CloudTrailEvent", "{}"))
events.append({
"event_name": event.get("EventName", ""),
"event_source": event.get("EventSource", ""),
"event_time": event.get("EventTime", "").isoformat()
if hasattr(event.get("EventTime", ""), "isoformat")
else str(event.get("EventTime", "")),
"username": event.get("Username", ""),
"source_ip": ct_event.get("sourceIPAddress", ""),
"user_agent": ct_event.get("userAgent", ""),
"error_code": ct_event.get("errorCode", ""),
"error_message": ct_event.get("errorMessage", ""),
"aws_region": ct_event.get("awsRegion", ""),
"read_only": event.get("ReadOnly", ""),
})
return events
def build_baseline(self, events):
"""Build behavioral baseline from events."""
user_events = defaultdict(list)
user_ips = defaultdict(set)
user_sources = defaultdict(set)
for e in events:
user = e["username"]
user_events[user].append(e["event_name"])
user_ips[user].add(e["source_ip"])
user_sources[user].add(e["event_source"])
return {
"user_event_counts": {u: len(evts) for u, evts in user_events.items()},
"user_unique_ips": {u: len(ips) for u, ips in user_ips.items()},
"user_unique_sources": {u: len(srcs) for u, srcs in user_sources.items()},
}
def detect_anomalies(self, events):
"""Detect anomalous patterns in CloudTrail events."""
user_events = defaultdict(list)
for e in events:
user_events[e["username"]].append(e)
sensitive_calls = [e for e in events if e["event_name"] in SENSITIVE_EVENTS]
for e in sensitive_calls:
self.findings.append({
"severity": "high", "type": "Sensitive API Call",
"detail": f"{e['username']} called {e['event_name']} from {e['source_ip']}",
})
error_events = [e for e in events if e["error_code"] in ERROR_INDICATORS]
error_by_user = Counter(e["username"] for e in error_events)
for user, count in error_by_user.items():
if count >= 5:
self.findings.append({
"severity": "high", "type": "High Access Denied Rate",
"detail": f"{user} received {count} AccessDenied errors",
})
for user, evts in user_events.items():
ips = {e["source_ip"] for e in evts}
if len(ips) >= 5:
self.findings.append({
"severity": "medium", "type": "Multiple Source IPs",
"detail": f"{user} accessed from {len(ips)} distinct IPs",
})
trail_tampering = [e for e in events
if e["event_name"] in ("StopLogging", "DeleteTrail", "UpdateTrail")]
for e in trail_tampering:
self.findings.append({
"severity": "critical", "type": "CloudTrail Tampering",
"detail": f"{e['username']} called {e['event_name']}",
})
return {
"sensitive_api_calls": len(sensitive_calls),
"access_denied_events": len(error_events),
"trail_tampering_events": len(trail_tampering),
}
def generate_report(self):
events = self.fetch_events()
baseline = self.build_baseline(events)
anomalies = self.detect_anomalies(events)
event_name_counts = Counter(e["event_name"] for e in events).most_common(20)
source_counts = Counter(e["event_source"] for e in events).most_common(10)
report = {
"report_date": datetime.utcnow().isoformat(),
"lookback_hours": self.lookback_hours,
"total_events": len(events),
"top_event_names": event_name_counts,
"top_event_sources": source_counts,
"baseline": baseline,
"anomaly_summary": anomalies,
"findings": self.findings,
"total_findings": len(self.findings),
}
out = self.output_dir / "cloudtrail_anomaly_report.json"
with open(out, "w") as f:
json.dump(report, f, indent=2, default=str)
print(json.dumps(report, indent=2, default=str))
return report
def main():
parser = argparse.ArgumentParser(
description="Detect anomalies in AWS CloudTrail API activity"
)
parser.add_argument("--profile", default=None, help="AWS CLI profile name")
parser.add_argument("--region", default="us-east-1", help="AWS region")
parser.add_argument("--hours", type=int, default=24, help="Lookback window in hours")
parser.add_argument("--output-dir", default="./cloudtrail_anomalies")
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
detector = CloudTrailAnomalyDetector(
profile=args.profile, region=args.region,
lookback_hours=args.hours, output_dir=args.output_dir,
)
detector.generate_report()
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,40 @@
---
name: hunting-for-lateral-movement-via-wmi
description: Detect WMI-based lateral movement by analyzing Windows Event ID 4688 process creation and Sysmon Event ID 1 for WmiPrvSE.exe child process patterns, remote process execution, and WMI event subscription persistence.
domain: cybersecurity
subdomain: threat-hunting
tags: [threat-hunting, lateral-movement, wmi, sysmon, mitre-attack, process-creation]
version: "1.0"
author: mahipal
license: MIT
---
# Hunting for Lateral Movement via WMI
## Overview
Windows Management Instrumentation (WMI) is commonly abused for lateral movement via `wmic process call create` or Win32_Process.Create() to execute commands on remote hosts. Detection focuses on identifying WmiPrvSE.exe spawning child processes (cmd.exe, powershell.exe) in Windows Security Event ID 4688 and Sysmon Event ID 1 logs, along with WMI-Activity/Operational events (5857, 5860, 5861) for event subscription persistence.
## Prerequisites
- Windows Security Event Logs with Process Creation auditing enabled (Event 4688 with command line)
- Sysmon installed with Event ID 1 (Process Creation) configured
- Python 3.9+ with `python-evtx`, `lxml` libraries
- Understanding of WMI architecture and WmiPrvSE.exe behavior
## Steps
### Step 1: Parse Process Creation Events
Extract Event ID 4688 and Sysmon Event 1 entries from EVTX files.
### Step 2: Detect WmiPrvSE Child Processes
Flag processes where ParentImage/ParentProcessName is WmiPrvSE.exe, indicating remote WMI execution.
### Step 3: Analyze Command Line Patterns
Identify suspicious command lines matching WMI lateral movement patterns (cmd.exe /q /c, output redirection to admin$ share).
### Step 4: Check WMI Event Subscriptions
Parse WMI-Activity/Operational log for event consumer creation indicating persistence.
## Expected Output
JSON report with WMI-spawned processes, suspicious command lines, WMI event subscription alerts, and timeline of lateral movement activity.
@@ -0,0 +1,78 @@
# API Reference: Hunting for Lateral Movement via WMI
## Detection Event IDs
| Source | Event ID | Description |
|--------|----------|-------------|
| Security | 4688 | Process creation (enable command line auditing) |
| Sysmon | 1 | Process creation with full details |
| WMI-Activity | 5857 | WMI provider loaded |
| WMI-Activity | 5860 | WMI temporary event consumer |
| WMI-Activity | 5861 | WMI permanent event consumer |
## WMI Lateral Movement Process Chain
```
Source Host: Destination Host:
wmic.exe --> WmiPrvSE.exe
process call create -> cmd.exe /q /c <command>
-> 1> \\127.0.0.1\admin$\__<timestamp> 2>&1
```
## Key Detection Patterns
| Pattern | Indicator | MITRE |
|---------|-----------|-------|
| WmiPrvSE -> cmd.exe | Remote command execution | T1047 |
| WmiPrvSE -> powershell.exe | Remote PowerShell via WMI | T1047 |
| cmd.exe /q /c ... admin$ | WMI output redirection | T1047 |
| Event 5861 consumer | WMI event subscription persistence | T1546.003 |
| wmic process call create | Direct WMI process creation | T1047 |
## Suspicious Child Processes of WmiPrvSE.exe
| Process | Risk Level | Context |
|---------|------------|---------|
| cmd.exe | High | Command execution |
| powershell.exe | High | Script execution |
| mshta.exe | Critical | HTA script execution |
| cscript.exe | High | VBScript/JScript |
| regsvr32.exe | High | COM object registration |
| rundll32.exe | High | DLL execution |
## Command Line Regex Patterns
```python
# WMI remote execution via cmd
r"cmd\.exe\s+/[qQ]\s+/[cC]"
# Output to admin$ share
r"\\\\127\.0\.0\.1\\admin\$\\__\d+"
# WMIC process creation
r"wmic\s+.*process\s+call\s+create"
```
## Sysmon Event 1 Key Fields
| Field | Description |
|-------|-------------|
| Image | Full path of created process |
| ParentImage | Full path of parent process |
| CommandLine | Process command line arguments |
| User | Account that created the process |
| ProcessGuid | Unique process identifier |
| ParentProcessGuid | Parent process identifier |
## WMI-Activity Log Location
```
%SystemRoot%\System32\winevt\Logs\Microsoft-Windows-WMI-Activity%4Operational.evtx
```
## References
- MITRE T1047 (WMI): https://attack.mitre.org/techniques/T1047/
- MITRE T1546.003 (WMI Event Subscription): https://attack.mitre.org/techniques/T1546/003/
- Detecting WMI Lateral Movement: https://imphash.medium.com/detecting-lateral-movement-101-part-2
- JPCERT Lateral Movement: https://www.jpcert.or.jp/english/pub/sr/20170612ac-ir_research_en.pdf
@@ -0,0 +1,209 @@
#!/usr/bin/env python3
"""Agent for hunting lateral movement via WMI.
Detects WMI-based lateral movement by parsing Windows Event ID
4688 and Sysmon Event 1 for WmiPrvSE.exe child process patterns,
suspicious command lines, and WMI event subscription persistence.
"""
# For authorized threat hunting and blue team use only
import argparse
import json
import os
import re
import xml.etree.ElementTree as ET
from collections import Counter
from datetime import datetime
from pathlib import Path
try:
import Evtx.Evtx as evtx
except ImportError:
evtx = None
NS = "{http://schemas.microsoft.com/win/2004/08/events/event}"
WMI_PARENT_NAMES = {"wmiprvse.exe", "wmiprvse"}
SUSPICIOUS_CHILDREN = {"cmd.exe", "powershell.exe", "pwsh.exe", "mshta.exe",
"cscript.exe", "wscript.exe", "regsvr32.exe", "rundll32.exe"}
WMI_CMD_PATTERNS = [
re.compile(r"cmd\.exe\s+/[qQ]\s+/[cC]", re.IGNORECASE),
re.compile(r"\\\\127\.0\.0\.1\\admin\$\\__\d+", re.IGNORECASE),
re.compile(r"wmic\s+.*process\s+call\s+create", re.IGNORECASE),
re.compile(r"Win32_Process.*Create", re.IGNORECASE),
]
WMI_ACTIVITY_IDS = {"5857", "5860", "5861"}
def _parse_event(xml_str):
"""Extract event fields from EVTX XML record."""
root = ET.fromstring(xml_str)
sys_node = root.find(f"{NS}System")
event_id = sys_node.find(f"{NS}EventID").text if sys_node is not None else None
tc = sys_node.find(f"{NS}TimeCreated") if sys_node is not None else None
timestamp = tc.get("SystemTime", "") if tc is not None else ""
channel = ""
ch_node = sys_node.find(f"{NS}Channel") if sys_node is not None else None
if ch_node is not None:
channel = ch_node.text or ""
data = {}
for d in root.iter(f"{NS}Data"):
name = d.get("Name", "")
data[name] = d.text or ""
return event_id, timestamp, channel, data
class WMILateralMovementHunter:
"""Hunts for WMI-based lateral movement in Windows Event Logs."""
def __init__(self, output_dir="./wmi_lateral_hunt"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.findings = []
self.wmi_processes = []
self.wmi_subscriptions = []
def parse_evtx(self, evtx_path):
"""Parse EVTX file for WMI lateral movement indicators."""
if evtx is None:
raise RuntimeError("python-evtx required: pip install python-evtx")
with evtx.Evtx(evtx_path) as log:
for record in log.records():
try:
xml_str = record.xml()
except Exception:
continue
event_id, ts, channel, data = _parse_event(xml_str)
if event_id == "4688":
self._check_4688(ts, data)
elif event_id == "1" and "sysmon" in channel.lower():
self._check_sysmon1(ts, data)
elif event_id in WMI_ACTIVITY_IDS:
self._check_wmi_activity(event_id, ts, data)
def _check_4688(self, ts, data):
"""Check Security Event 4688 for WmiPrvSE child processes."""
parent = data.get("ParentProcessName", "").lower()
new_proc = data.get("NewProcessName", "").lower()
cmdline = data.get("CommandLine", "")
parent_base = Path(parent).name if parent else ""
if parent_base in WMI_PARENT_NAMES:
child_base = Path(new_proc).name if new_proc else ""
entry = {
"timestamp": ts,
"source": "Event4688",
"parent": parent,
"child": new_proc,
"command_line": cmdline[:500],
"user": data.get("SubjectUserName", ""),
"suspicious": child_base in SUSPICIOUS_CHILDREN,
}
self.wmi_processes.append(entry)
severity = "high" if entry["suspicious"] else "medium"
self.findings.append({
"severity": severity,
"type": "WMI Process Spawn",
"detail": f"WmiPrvSE spawned {child_base}: {cmdline[:100]}",
"mitre": "T1047",
})
for pattern in WMI_CMD_PATTERNS:
if pattern.search(cmdline):
self.findings.append({
"severity": "critical",
"type": "WMI Lateral Movement Command",
"detail": f"Pattern match in: {cmdline[:150]}",
"mitre": "T1047",
})
break
def _check_sysmon1(self, ts, data):
"""Check Sysmon Event ID 1 for WmiPrvSE child processes."""
parent_image = data.get("ParentImage", "").lower()
image = data.get("Image", "").lower()
cmdline = data.get("CommandLine", "")
parent_base = Path(parent_image).name if parent_image else ""
if parent_base in WMI_PARENT_NAMES:
child_base = Path(image).name if image else ""
entry = {
"timestamp": ts,
"source": "Sysmon1",
"parent_image": parent_image,
"image": image,
"command_line": cmdline[:500],
"user": data.get("User", ""),
"parent_guid": data.get("ParentProcessGuid", ""),
"process_guid": data.get("ProcessGuid", ""),
"suspicious": child_base in SUSPICIOUS_CHILDREN,
}
self.wmi_processes.append(entry)
if entry["suspicious"]:
self.findings.append({
"severity": "high",
"type": "Sysmon WMI Process Spawn",
"detail": f"WmiPrvSE -> {child_base} by {entry['user']}",
"mitre": "T1047",
})
def _check_wmi_activity(self, event_id, ts, data):
"""Check WMI-Activity/Operational events for persistence."""
entry = {
"timestamp": ts,
"event_id": event_id,
"operation": data.get("Operation", ""),
"consumer": data.get("Consumer", data.get("CONSUMER", "")),
"possible_cause": data.get("PossibleCause", ""),
}
self.wmi_subscriptions.append(entry)
if event_id in ("5860", "5861"):
self.findings.append({
"severity": "high",
"type": "WMI Event Subscription",
"detail": f"Event consumer created/modified: {entry['consumer'][:100]}",
"mitre": "T1546.003",
})
def generate_report(self, evtx_files):
for path in evtx_files:
self.parse_evtx(path)
child_counts = Counter(
Path(p["child"] if "child" in p else p.get("image", "")).name
for p in self.wmi_processes
)
report = {
"report_date": datetime.utcnow().isoformat(),
"evtx_files_parsed": [str(f) for f in evtx_files],
"total_wmi_spawned_processes": len(self.wmi_processes),
"wmi_child_process_counts": dict(child_counts),
"wmi_processes": self.wmi_processes[:50],
"wmi_event_subscriptions": self.wmi_subscriptions,
"findings": self.findings,
"total_findings": len(self.findings),
}
out = self.output_dir / "wmi_lateral_movement_report.json"
with open(out, "w") as f:
json.dump(report, f, indent=2, default=str)
print(json.dumps(report, indent=2, default=str))
return report
def main():
parser = argparse.ArgumentParser(
description="Hunt for WMI-based lateral movement in Windows Event Logs"
)
parser.add_argument("evtx_files", nargs="+",
help="Path(s) to EVTX files (Security, Sysmon, WMI-Activity)")
parser.add_argument("--output-dir", default="./wmi_lateral_hunt")
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
hunter = WMILateralMovementHunter(output_dir=args.output_dir)
hunter.generate_report(args.evtx_files)
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,40 @@
---
name: implementing-endpoint-detection-with-wazuh
description: Deploy and configure Wazuh SIEM/XDR for endpoint detection including agent management, custom decoder and rule XML creation, alert querying via the Wazuh REST API, and automated response actions.
domain: cybersecurity
subdomain: security-operations
tags: [siem, xdr, wazuh, endpoint-detection, custom-rules, incident-response]
version: "1.0"
author: mahipal
license: MIT
---
# Implementing Endpoint Detection with Wazuh
## Overview
Wazuh is an open-source SIEM and XDR platform for endpoint monitoring, threat detection, and compliance. This skill covers managing agents via the Wazuh REST API, creating custom decoders and rules in XML for organization-specific detections, querying alerts, and testing rule logic using the logtest endpoint.
## Prerequisites
- Wazuh Manager 4.x deployed with API enabled
- Python 3.9+ with `requests` library
- API credentials (username/password for JWT authentication)
- Understanding of Wazuh decoder and rule XML syntax
## Steps
### Step 1: Authenticate to Wazuh API
Obtain JWT token via POST to /security/user/authenticate.
### Step 2: List and Monitor Agents
Query agent status, versions, and last keep-alive via /agents endpoint.
### Step 3: Query Security Alerts
Search alerts by rule ID, severity, agent, or time range.
### Step 4: Test Custom Rules with Logtest
Use the /logtest endpoint to validate decoder and rule logic against sample log lines.
## Expected Output
JSON report with agent inventory, alert statistics, rule coverage, and logtest validation results.
@@ -0,0 +1,72 @@
# API Reference: Implementing Endpoint Detection with Wazuh
## Wazuh REST API Endpoints
| Endpoint | Method | Purpose |
|----------|--------|---------|
| /security/user/authenticate | POST | Get JWT token (Basic Auth) |
| /agents | GET | List agents with status |
| /agents/summary/status | GET | Agent status summary |
| /alerts | GET | Query security alerts |
| /rules | GET | List detection rules |
| /logtest | PUT | Test log against decoders/rules |
| /manager/configuration | GET | Manager configuration |
| /agents/{id}/restart | PUT | Restart specific agent |
## Authentication
```python
import requests
from requests.auth import HTTPBasicAuth
resp = requests.post(
"https://wazuh:55000/security/user/authenticate",
auth=HTTPBasicAuth("wazuh-wui", "password"),
verify=False,
)
token = resp.json()["data"]["token"]
headers = {"Authorization": f"Bearer {token}"}
```
## Custom Rule XML Syntax
```xml
<group name="custom_rules,">
<rule id="100001" level="12">
<if_sid>5716</if_sid>
<srcip>!192.168.1.0/24</srcip>
<description>SSH login from external IP</description>
<mitre><id>T1078</id></mitre>
</rule>
</group>
```
Location: `/var/ossec/etc/rules/local_rules.xml`
## Custom Decoder XML
```xml
<decoder name="custom_app">
<program_name>myapp</program_name>
<regex>^(\S+) (\S+) (\S+)</regex>
<order>srcip,user,action</order>
</decoder>
```
Location: `/var/ossec/etc/decoders/local_decoder.xml`
## Alert Query Parameters
| Parameter | Example | Description |
|-----------|---------|-------------|
| limit | 20 | Max results |
| sort | -timestamp | Sort descending |
| q | rule.level>=10 | Filter by level |
| search | brute force | Text search |
| select | rule.id,agent.name | Field selection |
## References
- Wazuh API Docs: https://documentation.wazuh.com/current/user-manual/api/
- Wazuh Rules Syntax: https://documentation.wazuh.com/current/user-manual/ruleset/ruleset-xml-syntax/rules.html
- Wazuh Custom Rules: https://documentation.wazuh.com/current/user-manual/ruleset/rules/custom.html
@@ -0,0 +1,192 @@
#!/usr/bin/env python3
"""Agent for implementing endpoint detection with Wazuh.
Manages Wazuh agents, queries alerts, tests custom decoder/rule
logic via logtest, and audits endpoint coverage using the
Wazuh REST API.
"""
import argparse
import json
import os
from datetime import datetime
from pathlib import Path
try:
import requests
from requests.auth import HTTPBasicAuth
except ImportError:
requests = None
class WazuhDetectionAgent:
"""Manages Wazuh endpoint detection via its REST API."""
def __init__(self, wazuh_url, username, password,
output_dir="./wazuh_detection"):
self.base_url = wazuh_url.rstrip("/")
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.findings = []
self.token = self._authenticate(username, password)
def _authenticate(self, username, password):
"""Authenticate and obtain JWT token."""
if not requests:
return None
try:
resp = requests.post(
f"{self.base_url}/security/user/authenticate",
auth=HTTPBasicAuth(username, password),
verify=False, timeout=10,
)
if resp.status_code == 200:
return resp.json().get("data", {}).get("token")
except requests.RequestException:
pass
return None
def _api(self, method, path, params=None, data=None):
if not requests or not self.token:
return None
try:
resp = requests.request(
method, f"{self.base_url}{path}",
headers={"Authorization": f"Bearer {self.token}"},
params=params, json=data, verify=False, timeout=15,
)
return resp.json() if resp.status_code == 200 else None
except requests.RequestException:
return None
def list_agents(self):
"""List all registered Wazuh agents with status."""
data = self._api("GET", "/agents", params={"limit": 500, "select": "id,name,status,os.name,os.version,version,lastKeepAlive,ip"})
if not data:
return []
agents = data.get("data", {}).get("affected_items", [])
disconnected = [a for a in agents if a.get("status") == "disconnected"]
if disconnected:
self.findings.append({
"severity": "medium", "type": "Disconnected Agents",
"detail": f"{len(disconnected)} agents disconnected",
})
return agents
def get_agent_summary(self):
"""Get agent status summary counts."""
data = self._api("GET", "/agents/summary/status")
if data:
return data.get("data", {})
return {}
def query_alerts(self, limit=20, level_min=10):
"""Query recent high-severity alerts."""
data = self._api("GET", "/alerts", params={
"limit": limit, "sort": "-timestamp",
"q": f"rule.level>={level_min}",
})
if not data:
return []
alerts = data.get("data", {}).get("affected_items", [])
return [{
"id": a.get("id"),
"timestamp": a.get("timestamp"),
"rule_id": a.get("rule", {}).get("id"),
"rule_description": a.get("rule", {}).get("description"),
"rule_level": a.get("rule", {}).get("level"),
"agent_name": a.get("agent", {}).get("name"),
"agent_id": a.get("agent", {}).get("id"),
} for a in alerts]
def get_rules_summary(self):
"""Get summary of active detection rules."""
data = self._api("GET", "/rules", params={"limit": 500, "select": "id,description,level,groups"})
if not data:
return {"total": 0}
rules = data.get("data", {}).get("affected_items", [])
total = data.get("data", {}).get("total_affected_items", len(rules))
levels = {}
for r in rules:
lvl = str(r.get("level", 0))
levels[lvl] = levels.get(lvl, 0) + 1
return {"total_rules": total, "by_level": levels}
def test_logtest(self, log_line, log_format="syslog"):
"""Test a log line against Wazuh decoders and rules."""
data = self._api("PUT", "/logtest", data={
"log_format": log_format,
"location": "test",
"event": log_line,
})
if not data:
return {"error": "Logtest failed"}
result = data.get("data", {})
return {
"decoder": result.get("decoder", {}).get("name"),
"rule_id": result.get("rule", {}).get("id"),
"rule_description": result.get("rule", {}).get("description"),
"rule_level": result.get("rule", {}).get("level"),
"output": result.get("output"),
}
def check_vulnerability_detection(self):
"""Check if vulnerability detection module is enabled."""
data = self._api("GET", "/manager/configuration",
params={"section": "vulnerability-detector"})
if data:
config = data.get("data", {}).get("affected_items", [])
if config:
enabled = config[0].get("vulnerability-detector", {}).get("enabled", "no")
if enabled != "yes":
self.findings.append({
"severity": "medium", "type": "Vuln Detection Disabled",
"detail": "Vulnerability detection module is not enabled",
})
return {"enabled": enabled == "yes"}
return {"enabled": False}
def generate_report(self):
agents = self.list_agents()
summary = self.get_agent_summary()
alerts = self.query_alerts()
rules = self.get_rules_summary()
vuln_det = self.check_vulnerability_detection()
report = {
"report_date": datetime.utcnow().isoformat(),
"wazuh_url": self.base_url,
"agent_summary": summary,
"agents": agents[:50],
"total_agents": len(agents),
"recent_critical_alerts": alerts,
"rules_summary": rules,
"vulnerability_detection": vuln_det,
"findings": self.findings,
"total_findings": len(self.findings),
}
out = self.output_dir / "wazuh_detection_report.json"
with open(out, "w") as f:
json.dump(report, f, indent=2, default=str)
print(json.dumps(report, indent=2, default=str))
return report
def main():
parser = argparse.ArgumentParser(
description="Manage Wazuh endpoint detection and query alerts"
)
parser.add_argument("wazuh_url", help="Wazuh API URL (e.g. https://wazuh:55000)")
parser.add_argument("--username", default="wazuh-wui", help="API username")
parser.add_argument("--password", required=True, help="API password")
parser.add_argument("--output-dir", default="./wazuh_detection")
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
agent = WazuhDetectionAgent(args.wazuh_url, args.username, args.password,
output_dir=args.output_dir)
agent.generate_report()
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,39 @@
---
name: performing-ssl-tls-security-assessment
description: Assess SSL/TLS server configurations using the sslyze Python library to evaluate cipher suites, certificate chains, protocol versions, HSTS headers, and known vulnerabilities like Heartbleed and ROBOT.
domain: cybersecurity
subdomain: network-security
tags: [network-security, ssl, tls, sslyze, certificate, cipher-suites, vulnerability-assessment]
version: "1.0"
author: mahipal
license: MIT
---
# Performing SSL/TLS Security Assessment
## Overview
Assess SSL/TLS server configurations using sslyze, a fast Python-based scanning library. This skill covers evaluating supported protocol versions (SSLv2/3, TLS 1.0-1.3), cipher suite strength, certificate chain validation, HSTS enforcement, OCSP stapling, and scanning for known vulnerabilities including Heartbleed, ROBOT, and session renegotiation weaknesses.
## Prerequisites
- Python 3.9+ with `sslyze` library (pip install sslyze)
- Network access to target HTTPS servers on port 443
- Understanding of TLS protocol versions and cipher suite classifications
## Steps
### Step 1: Configure Server Scan
Create ServerScanRequest with ServerNetworkLocation specifying target hostname and port.
### Step 2: Execute TLS Scan
Use sslyze Scanner to queue and execute scans for all TLS check commands concurrently.
### Step 3: Analyze Results
Evaluate accepted cipher suites, certificate validity, protocol versions, and vulnerability scan results.
### Step 4: Generate Security Report
Produce a JSON report with compliance findings and remediation recommendations.
## Expected Output
JSON report with supported protocols, accepted cipher suites, certificate details, vulnerability results (Heartbleed, ROBOT), and HSTS status.
@@ -0,0 +1,62 @@
# API Reference: Performing SSL/TLS Security Assessment
## sslyze Python API
```python
from sslyze import Scanner, ServerScanRequest, ServerNetworkLocation
location = ServerNetworkLocation(hostname="example.com", port=443)
request = ServerScanRequest(server_location=location)
scanner = Scanner()
scanner.queue_scans([request])
for result in scanner.get_results():
scan = result.scan_result
# Access individual scan command results
tls12 = scan.tls_1_2_cipher_suites
cert = scan.certificate_info
heartbleed = scan.heartbleed
```
Install: `pip install sslyze`
## Scan Command Attributes
| Attribute | Check |
|-----------|-------|
| ssl_2_0_cipher_suites | SSLv2 support (must be disabled) |
| ssl_3_0_cipher_suites | SSLv3 support (must be disabled) |
| tls_1_0_cipher_suites | TLS 1.0 (deprecated) |
| tls_1_1_cipher_suites | TLS 1.1 (deprecated) |
| tls_1_2_cipher_suites | TLS 1.2 (current) |
| tls_1_3_cipher_suites | TLS 1.3 (recommended) |
| certificate_info | Certificate chain validation |
| heartbleed | CVE-2014-0160 Heartbleed |
| robot | ROBOT RSA oracle attack |
| openssl_ccs_injection | CVE-2014-0224 |
| session_renegotiation | Client-initiated renego |
## Weak Cipher Suite Keywords
| Keyword | Risk | Description |
|---------|------|-------------|
| RC4 | High | Broken stream cipher |
| DES / 3DES | High | Weak block cipher |
| NULL | Critical | No encryption |
| EXPORT | Critical | Weak export-grade cipher |
| anon | Critical | No authentication |
## sslyze CLI
```bash
sslyze example.com --regular
sslyze example.com --certinfo --tlsv1_2 --heartbleed --robot
sslyze example.com --json_out results.json
```
## References
- sslyze GitHub: https://github.com/nabla-c0d3/sslyze
- sslyze Docs: https://nabla-c0d3.github.io/sslyze/documentation/
- sslyze PyPI: https://pypi.org/project/sslyze/
- Mozilla TLS Config: https://wiki.mozilla.org/Security/Server_Side_TLS
@@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""Agent for performing SSL/TLS security assessment using sslyze.
Scans TLS server configurations to evaluate cipher suites,
protocol versions, certificate chains, HSTS, and known
vulnerabilities like Heartbleed and ROBOT.
"""
import argparse
import json
import os
from datetime import datetime
from pathlib import Path
try:
from sslyze import (
Scanner,
ServerScanRequest,
ServerNetworkLocation,
ScanCommand,
ScanCommandAttemptStatusEnum,
)
except ImportError:
Scanner = None
WEAK_CIPHERS_KEYWORDS = ["RC4", "DES", "3DES", "NULL", "EXPORT", "anon"]
class SSLTLSAssessmentAgent:
"""Assesses SSL/TLS configurations using sslyze."""
def __init__(self, output_dir="./ssl_tls_assessment"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.findings = []
def scan_server(self, hostname, port=443):
"""Run full sslyze scan against a target server."""
if Scanner is None:
return {"error": "sslyze not installed: pip install sslyze"}
location = ServerNetworkLocation(hostname=hostname, port=port)
scan_request = ServerScanRequest(server_location=location)
scanner = Scanner()
scanner.queue_scans([scan_request])
for result in scanner.get_results():
return self._process_result(result, hostname, port)
return {"error": "No scan results returned"}
def _process_result(self, result, hostname, port):
"""Process sslyze ServerScanResult into structured findings."""
report = {"hostname": hostname, "port": port, "protocols": {},
"cipher_suites": {}, "certificate": {}, "vulnerabilities": {}}
protocol_checks = [
("ssl_2_0_cipher_suites", "SSLv2"),
("ssl_3_0_cipher_suites", "SSLv3"),
("tls_1_0_cipher_suites", "TLS 1.0"),
("tls_1_1_cipher_suites", "TLS 1.1"),
("tls_1_2_cipher_suites", "TLS 1.2"),
("tls_1_3_cipher_suites", "TLS 1.3"),
]
scan = result.scan_result
for attr, proto_name in protocol_checks:
attempt = getattr(scan, attr, None)
if attempt and attempt.status == ScanCommandAttemptStatusEnum.COMPLETED:
ciphers = attempt.result
accepted = [c.cipher_suite.name for c in ciphers.accepted_cipher_suites]
report["protocols"][proto_name] = len(accepted) > 0
report["cipher_suites"][proto_name] = accepted
if proto_name in ("SSLv2", "SSLv3"):
if accepted:
self.findings.append({
"severity": "critical", "type": "Deprecated Protocol",
"detail": f"{proto_name} enabled with {len(accepted)} cipher suites",
})
elif proto_name in ("TLS 1.0", "TLS 1.1"):
if accepted:
self.findings.append({
"severity": "high", "type": "Legacy Protocol",
"detail": f"{proto_name} still enabled",
})
for cipher_name in accepted:
if any(weak in cipher_name for weak in WEAK_CIPHERS_KEYWORDS):
self.findings.append({
"severity": "high", "type": "Weak Cipher Suite",
"detail": f"{cipher_name} accepted on {proto_name}",
})
cert_attempt = getattr(scan, "certificate_info", None)
if cert_attempt and cert_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED:
cert_result = cert_attempt.result
for deployment in cert_result.certificate_deployments:
leaf = deployment.received_certificate_chain[0]
report["certificate"] = {
"subject": leaf.subject.rfc4514_string(),
"issuer": leaf.issuer.rfc4514_string(),
"not_before": leaf.not_valid_before_utc.isoformat(),
"not_after": leaf.not_valid_after_utc.isoformat(),
"serial": str(leaf.serial_number),
"signature_algorithm": leaf.signature_hash_algorithm.name
if leaf.signature_hash_algorithm else "unknown",
"chain_valid": deployment.verified_certificate_chain is not None,
"ocsp_stapling": deployment.ocsp_response is not None,
}
if leaf.not_valid_after_utc < datetime.utcnow():
self.findings.append({
"severity": "critical", "type": "Expired Certificate",
"detail": f"Certificate expired on {leaf.not_valid_after_utc}",
})
if leaf.signature_hash_algorithm and leaf.signature_hash_algorithm.name == "sha1":
self.findings.append({
"severity": "high", "type": "SHA-1 Certificate",
"detail": "Certificate uses SHA-1 signature",
})
vuln_checks = [
("heartbleed", "Heartbleed", "is_vulnerable_to_heartbleed"),
("openssl_ccs_injection", "CCS Injection", "is_vulnerable_to_ccs_injection"),
]
for attr, name, field in vuln_checks:
attempt = getattr(scan, attr, None)
if attempt and attempt.status == ScanCommandAttemptStatusEnum.COMPLETED:
vulnerable = getattr(attempt.result, field, False)
report["vulnerabilities"][name] = vulnerable
if vulnerable:
self.findings.append({
"severity": "critical", "type": f"{name} Vulnerable",
"detail": f"Server is vulnerable to {name}",
})
robot_attempt = getattr(scan, "robot", None)
if robot_attempt and robot_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED:
robot_result = robot_attempt.result
is_vuln = "VULNERABLE" in str(robot_result.robot_result)
report["vulnerabilities"]["ROBOT"] = is_vuln
if is_vuln:
self.findings.append({
"severity": "critical", "type": "ROBOT Vulnerable",
"detail": "Server is vulnerable to ROBOT attack",
})
return report
def generate_report(self, targets):
"""Scan multiple targets and generate consolidated report."""
results = []
for target in targets:
parts = target.split(":")
hostname = parts[0]
port = int(parts[1]) if len(parts) > 1 else 443
results.append(self.scan_server(hostname, port))
report = {
"report_date": datetime.utcnow().isoformat(),
"targets_scanned": len(targets),
"scan_results": results,
"findings": self.findings,
"total_findings": len(self.findings),
}
out = self.output_dir / "ssl_tls_assessment_report.json"
with open(out, "w") as f:
json.dump(report, f, indent=2, default=str)
print(json.dumps(report, indent=2, default=str))
return report
def main():
parser = argparse.ArgumentParser(
description="Assess SSL/TLS server configurations using sslyze"
)
parser.add_argument("targets", nargs="+", help="Target host:port (e.g. example.com:443)")
parser.add_argument("--output-dir", default="./ssl_tls_assessment")
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
agent = SSLTLSAssessmentAgent(output_dir=args.output_dir)
agent.generate_report(args.targets)
if __name__ == "__main__":
main()