Add 5 new cybersecurity skills: greenbone vuln mgmt, email compromise detection, MISP sharing, CobaltStrike C2 analysis, registry run key hunting

This commit is contained in:
mukul975
2026-03-11 00:41:05 +01:00
parent 757f1c8eae
commit 546f1ae6ef
20 changed files with 1804 additions and 0 deletions
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,40 @@
---
name: analyzing-cobaltstrike-malleable-c2-profiles
description: Parse and analyze Cobalt Strike Malleable C2 profiles using dissect.cobaltstrike and pyMalleableC2 to extract C2 indicators, detect evasion techniques, and generate network detection signatures.
domain: cybersecurity
subdomain: malware-analysis
tags: [cobalt-strike, malleable-c2, c2-detection, beacon-analysis, network-signatures, threat-hunting, red-team-tools]
version: "1.0"
author: mahipal
license: Apache-2.0
---
# Analyzing CobaltStrike Malleable C2 Profiles
## Overview
Cobalt Strike Malleable C2 profiles are domain-specific language scripts that customize how Beacon communicates with the team server, defining HTTP request/response transformations, sleep intervals, jitter values, user agents, URI paths, and process injection behavior. Threat actors use malleable profiles to disguise C2 traffic as legitimate services (Amazon, Google, Slack). Analyzing these profiles reveals network indicators for detection: URI patterns, HTTP headers, POST/GET transforms, DNS settings, and process injection techniques. The `dissect.cobaltstrike` library can parse both profile files and extract configurations from beacon payloads, while `pyMalleableC2` provides AST-based parsing using Lark grammar for programmatic profile manipulation and validation.
## Prerequisites
- Python 3.9+ with `dissect.cobaltstrike` and/or `pyMalleableC2`
- Sample Malleable C2 profiles (available from public repositories)
- Understanding of HTTP protocol and Cobalt Strike beacon communication model
- Network monitoring tools (Suricata/Snort) for signature deployment
- PCAP analysis tools for traffic validation
## Steps
1. Install libraries: `pip install dissect.cobaltstrike` or `pip install pyMalleableC2`
2. Parse profile with `C2Profile.from_path("profile.profile")`
3. Extract HTTP GET/POST block configurations (URIs, headers, parameters)
4. Identify user agent strings and spoof targets
5. Extract sleep time, jitter percentage, and DNS beacon settings
6. Analyze process injection settings (spawn-to, allocation technique)
7. Generate Suricata/Snort signatures from extracted network indicators
8. Compare profile against known threat actor profile collections
9. Extract staging URIs and payload delivery mechanisms
10. Produce detection report with IOCs and recommended network signatures
## Expected Output
A JSON report containing extracted C2 URIs, HTTP headers, user agents, sleep/jitter settings, process injection config, spawned process paths, DNS settings, and generated Suricata-compatible detection rules.
@@ -0,0 +1,95 @@
# CobaltStrike Malleable C2 Profile Analysis API Reference
## Installation
```bash
pip install dissect.cobaltstrike
pip install 'dissect.cobaltstrike[full]' # With PCAP support
pip install pyMalleableC2 # Alternative parser
```
## dissect.cobaltstrike API
### Parse Beacon Configuration
```python
from dissect.cobaltstrike.beacon import BeaconConfig
bconfig = BeaconConfig.from_path("beacon.bin")
print(hex(bconfig.watermark)) # 0x5109bf6d
print(bconfig.protocol) # https
print(bconfig.version) # BeaconVersion(...)
print(bconfig.settings) # Full config dict
```
### Parse Malleable C2 Profile
```python
from dissect.cobaltstrike.c2profile import C2Profile
profile = C2Profile.from_path("amazon.profile")
config = profile.as_dict()
print(config["useragent"])
print(config["http-get.uri"])
print(config["sleeptime"])
```
### PCAP Analysis
```bash
# Extract beacons from PCAP
beacon-pcap --extract-beacons traffic.pcap
# Decrypt traffic with private key
beacon-pcap -p team_server.pem traffic.pcap --beacon beacon.bin
```
## pyMalleableC2 API
```python
from malleableC2 import Profile
profile = Profile.from_file("amazon.profile")
print(profile.sleeptime)
print(profile.useragent)
print(profile.http_get.uri)
print(profile.http_post.uri)
```
## Key Profile Settings
| Setting | Description | Detection Value |
|---------|-------------|-----------------|
| `sleeptime` | Callback interval (ms) | Low values = aggressive beaconing |
| `jitter` | Sleep randomization % | Timing analysis evasion |
| `useragent` | HTTP User-Agent string | Network signature |
| `http-get.uri` | GET request URI path | URI-based detection |
| `http-post.uri` | POST request URI path | URI-based detection |
| `spawnto_x86` | 32-bit spawn process | Process creation detection |
| `spawnto_x64` | 64-bit spawn process | Process creation detection |
| `pipename` | Named pipe pattern | Named pipe monitoring |
| `dns_idle` | DNS idle IP address | DNS beacon detection |
| `watermark` | License watermark | Operator attribution |
## Suricata Rule Format
```
alert http $HOME_NET any -> $EXTERNAL_NET any (
msg:"MALWARE CobaltStrike C2 URI";
flow:established,to_server;
http.uri; content:"/api/v1/status";
http.header; content:"User-Agent: Mozilla/5.0";
sid:9000001; rev:1;
)
```
## CLI Usage
```bash
python agent.py --input profile.profile --output report.json
python agent.py --input parsed_config.json --output report.json
```
## References
- dissect.cobaltstrike: https://github.com/fox-it/dissect.cobaltstrike
- pyMalleableC2: https://github.com/byt3bl33d3r/pyMalleableC2
- Unit42 Analysis: https://unit42.paloaltonetworks.com/cobalt-strike-malleable-c2-profile/
- Config Extractor: https://github.com/strozfriedberg/cobaltstrike-config-extractor
@@ -0,0 +1,235 @@
#!/usr/bin/env python3
"""CobaltStrike Malleable C2 Profile Analyzer - parses profiles to extract C2 indicators, detection signatures, and evasion techniques"""
# For authorized security research and defensive analysis only
import argparse
import json
import re
import sys
from collections import Counter
from datetime import datetime
from pathlib import Path
try:
from dissect.cobaltstrike.c2profile import C2Profile
HAS_DISSECT = True
except ImportError:
HAS_DISSECT = False
RUN_KEY_SUSPICIOUS = ["powershell", "cmd.exe", "mshta", "rundll32", "regsvr32", "wscript", "cscript"]
KNOWN_SPOOF_TARGETS = {
"amazon": "Amazon CDN impersonation",
"google": "Google services impersonation",
"microsoft": "Microsoft services impersonation",
"slack": "Slack API impersonation",
"cloudfront": "CloudFront CDN impersonation",
"jquery": "jQuery CDN impersonation",
"outlook": "Outlook Web impersonation",
"onedrive": "OneDrive impersonation",
}
def load_data(path):
return json.loads(Path(path).read_text(encoding="utf-8"))
def parse_profile_with_dissect(profile_path):
"""Parse a .profile file using dissect.cobaltstrike C2Profile."""
if not HAS_DISSECT:
return None
profile = C2Profile.from_path(profile_path)
return profile.as_dict()
def parse_profile_regex(content):
"""Regex-based parser for malleable C2 profile when dissect is unavailable."""
config = {}
set_pattern = re.compile(r'set\s+(\w+)\s+"([^"]*)"', re.MULTILINE)
for match in set_pattern.finditer(content):
config[match.group(1)] = match.group(2)
block_pattern = re.compile(r'(http-get|http-post|http-stager|https-certificate|dns-beacon|process-inject|post-ex)\s*\{', re.MULTILINE)
for match in block_pattern.finditer(content):
config.setdefault("blocks", []).append(match.group(1))
uri_pattern = re.compile(r'set\s+uri\s+"([^"]*)"', re.MULTILINE)
for match in uri_pattern.finditer(content):
config.setdefault("uris", []).append(match.group(1))
header_pattern = re.compile(r'header\s+"([^"]+)"\s+"([^"]*)"', re.MULTILINE)
for match in header_pattern.finditer(content):
config.setdefault("headers", []).append({"name": match.group(1), "value": match.group(2)})
spawn_pattern = re.compile(r'set\s+spawnto_x(?:86|64)\s+"([^"]*)"', re.MULTILINE)
for match in spawn_pattern.finditer(content):
config.setdefault("spawn_to", []).append(match.group(1))
return config
def analyze_profile(config):
"""Analyze parsed profile configuration for detection opportunities."""
findings = []
ua = config.get("useragent", config.get("user_agent", ""))
if ua:
findings.append({
"type": "user_agent_identified",
"severity": "info",
"resource": "http-config",
"detail": f"User-Agent: {ua[:100]}",
"indicator": ua,
})
for target, desc in KNOWN_SPOOF_TARGETS.items():
if target.lower() in ua.lower():
findings.append({
"type": "service_impersonation",
"severity": "medium",
"resource": "user-agent",
"detail": f"{desc} detected in User-Agent string",
})
sleeptime = config.get("sleeptime", config.get("sleep_time", ""))
jitter = config.get("jitter", "")
if sleeptime:
try:
sleep_ms = int(sleeptime)
if sleep_ms < 1000:
findings.append({
"type": "aggressive_beaconing",
"severity": "high",
"resource": "beacon-config",
"detail": f"Very low sleep time: {sleep_ms}ms - aggressive C2 callback rate",
})
except ValueError:
pass
uris = config.get("uris", [])
for uri in uris:
findings.append({
"type": "c2_uri",
"severity": "high",
"resource": "http-config",
"detail": f"C2 URI path: {uri}",
"indicator": uri,
})
headers = config.get("headers", [])
for h in headers:
name = h.get("name", "") if isinstance(h, dict) else str(h)
value = h.get("value", "") if isinstance(h, dict) else ""
if name.lower() in ("host", "cookie", "authorization"):
findings.append({
"type": "c2_header",
"severity": "medium",
"resource": "http-config",
"detail": f"Custom header: {name}: {value[:60]}",
})
spawn_to = config.get("spawn_to", config.get("spawnto_x86", []))
if isinstance(spawn_to, str):
spawn_to = [spawn_to]
for proc in spawn_to:
findings.append({
"type": "spawn_to_process",
"severity": "high",
"resource": "process-inject",
"detail": f"Beacon spawns to: {proc}",
"indicator": proc,
})
pipename = config.get("pipename", config.get("pipename_stager", ""))
if pipename:
findings.append({
"type": "named_pipe",
"severity": "high",
"resource": "process-inject",
"detail": f"Named pipe: {pipename}",
"indicator": pipename,
})
dns_idle = config.get("dns_idle", "")
if dns_idle:
findings.append({
"type": "dns_beacon_config",
"severity": "medium",
"resource": "dns-beacon",
"detail": f"DNS idle IP: {dns_idle}",
})
watermark = config.get("watermark", "")
if watermark:
findings.append({
"type": "watermark",
"severity": "info",
"resource": "beacon-config",
"detail": f"Beacon watermark: {watermark}",
})
return findings
def generate_suricata_rules(findings, sid_start=9000001):
"""Generate Suricata rules from extracted indicators."""
rules = []
sid = sid_start
for f in findings:
if f["type"] == "c2_uri" and f.get("indicator"):
uri = f["indicator"].replace('"', '\\"')
rules.append(
f'alert http $HOME_NET any -> $EXTERNAL_NET any '
f'(msg:"MALWARE CobaltStrike Malleable C2 URI {uri}"; '
f'flow:established,to_server; '
f'http.uri; content:"{uri}"; '
f'sid:{sid}; rev:1;)'
)
sid += 1
elif f["type"] == "named_pipe" and f.get("indicator"):
pipe = f["indicator"]
rules.append(
f'# Named pipe detection requires endpoint monitoring: {pipe}'
)
return rules
def analyze(data):
if isinstance(data, str):
config = parse_profile_regex(data)
elif isinstance(data, dict):
config = data
else:
config = data[0] if isinstance(data, list) and data else {}
return analyze_profile(config)
def generate_report(input_path):
path = Path(input_path)
if path.suffix in (".profile", ".txt"):
content = path.read_text(encoding="utf-8")
config = parse_profile_regex(content)
findings = analyze_profile(config)
else:
data = load_data(input_path)
if isinstance(data, list):
findings = []
for profile in data:
findings.extend(analyze_profile(profile))
else:
findings = analyze_profile(data)
sev = Counter(f["severity"] for f in findings)
iocs = [f.get("indicator", "") for f in findings if f.get("indicator")]
rules = generate_suricata_rules(findings)
return {
"report": "cobaltstrike_malleable_c2_analysis",
"generated_at": datetime.utcnow().isoformat() + "Z",
"total_findings": len(findings),
"severity_summary": dict(sev),
"extracted_iocs": iocs,
"suricata_rules": rules,
"findings": findings,
}
def main():
ap = argparse.ArgumentParser(description="CobaltStrike Malleable C2 Profile Analyzer")
ap.add_argument("--input", required=True, help="Input .profile file or JSON with parsed config")
ap.add_argument("--output", help="Output JSON report path")
args = ap.parse_args()
report = generate_report(args.input)
out = json.dumps(report, indent=2)
if args.output:
Path(args.output).write_text(out, encoding="utf-8")
print(f"Report written to {args.output}")
else:
print(out)
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,39 @@
---
name: detecting-email-account-compromise
description: Detect compromised O365 and Google Workspace email accounts by analyzing inbox rule creation, suspicious sign-in locations, mail forwarding rules, and unusual API access patterns via Microsoft Graph and audit logs.
domain: cybersecurity
subdomain: incident-response
tags: [email-compromise, office365, microsoft-graph, bec, inbox-rules, sign-in-analysis, account-takeover]
version: "1.0"
author: mahipal
license: Apache-2.0
---
# Detecting Email Account Compromise
## Overview
Email account compromise (EAC) is a prevalent attack vector where adversaries gain unauthorized access to mailboxes to exfiltrate sensitive data, conduct business email compromise (BEC), or establish persistence through inbox rule manipulation. Attackers commonly create forwarding rules to siphon emails, delete rules to hide evidence, or use OAuth tokens for persistent access. Detection relies on analyzing Microsoft 365 Unified Audit Logs, Azure AD sign-in logs for impossible travel or suspicious locations, inbox rule creation events (Set-InboxRule, New-InboxRule), and Microsoft Graph API access patterns. Key indicators include forwarding rules to external addresses, rules that delete or move messages matching keywords like "invoice" or "payment", and sign-ins from unusual user agents such as python-requests.
## Prerequisites
- Microsoft 365 with Unified Audit Logging enabled
- Azure AD P1/P2 for risk detection APIs
- Python 3.9+ with `requests`, `msal` libraries
- Microsoft Graph API application registration with Mail.Read, AuditLog.Read.All permissions
- Understanding of OAuth2 client credential flows
## Steps
1. Export audit logs or connect to Microsoft Graph API using MSAL authentication
2. Query inbox rules for all monitored mailboxes via `/users/{id}/mailFolders/inbox/messageRules`
3. Analyze rules for external forwarding (ForwardTo, RedirectTo external addresses)
4. Detect suspicious rule patterns: deletion rules, keyword-matching rules targeting financial terms
5. Query sign-in logs via `/auditLogs/signIns` for unusual locations and impossible travel
6. Check for suspicious user agent strings (python-requests, PowerShell, curl)
7. Identify OAuth application consent grants for suspicious third-party apps
8. Correlate findings across users to detect campaign-level compromise
9. Generate compromise indicators report with severity scores
## Expected Output
A JSON report listing compromised or suspicious accounts, malicious inbox rules detected, impossible travel events, suspicious OAuth grants, and recommended containment actions with severity ratings.
@@ -0,0 +1,81 @@
# Email Account Compromise Detection API Reference
## Microsoft Graph API Endpoints
### List Inbox Rules
```http
GET https://graph.microsoft.com/v1.0/users/{userId}/mailFolders/inbox/messageRules
Authorization: Bearer {token}
```
### Get Sign-In Logs
```http
GET https://graph.microsoft.com/v1.0/auditLogs/signIns
?$filter=createdDateTime ge {startDate}
&$top=100
Authorization: Bearer {token}
```
### Risk Detections (Azure AD P2)
```http
GET https://graph.microsoft.com/v1.0/identityProtection/riskDetections
?$filter=riskLevel eq 'high'
Authorization: Bearer {token}
```
### OAuth2 Permission Grants
```http
GET https://graph.microsoft.com/v1.0/oauth2PermissionGrants
Authorization: Bearer {token}
```
## Authentication with MSAL
```python
from msal import ConfidentialClientApplication
app = ConfidentialClientApplication(
client_id="<app-id>",
client_credential="<secret>",
authority="https://login.microsoftonline.com/<tenant-id>"
)
token = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
headers = {"Authorization": f"Bearer {token['access_token']}"}
```
## Inbox Rule Compromise Indicators
| Indicator | Field | Description |
|-----------|-------|-------------|
| External forwarding | `actions.forwardTo` | Rule forwards to external domain |
| External redirect | `actions.redirectTo` | Rule redirects to external address |
| Auto-delete | `actions.delete` | Rule auto-deletes matching messages |
| Financial keywords | `conditions.subjectContains` | Targets "invoice", "payment", "wire" |
## Sign-In Risk Indicators
| Signal | Detection Method |
|--------|-----------------|
| Impossible travel | Haversine distance / time > 900 km/h |
| Suspicious UA | python-requests, curl, PowerShell in userAgent |
| Unfamiliar location | New country/region for user |
| Token replay | Same token from different IPs |
## CLI Usage
```bash
python agent.py --input audit_data.json --output report.json
```
## Required API Permissions
- `Mail.Read` - Read inbox rules
- `AuditLog.Read.All` - Read sign-in and audit logs
- `IdentityRiskEvent.Read.All` - Read risk detections
- `Directory.Read.All` - Read OAuth permission grants
## References
- Microsoft Graph API: https://learn.microsoft.com/en-us/graph/api/overview
- Identity Protection APIs: https://learn.microsoft.com/en-us/graph/api/resources/identityprotection-overview
- MSAL Python: https://github.com/AzureAD/microsoft-authentication-library-for-python
@@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""Email Account Compromise Detection agent - analyzes inbox rules, sign-in logs, and OAuth grants to detect O365/Google Workspace account compromise"""
import argparse
import json
import math
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
SUSPICIOUS_UA_PATTERNS = [
"python-requests", "python-urllib", "curl", "wget", "powershell",
"go-http-client", "httpie", "postman", "insomnia",
]
FINANCIAL_KEYWORDS = [
"invoice", "payment", "wire", "transfer", "bank", "account",
"payroll", "salary", "remittance", "ach", "swift",
]
def load_data(path):
return json.loads(Path(path).read_text(encoding="utf-8"))
def haversine_km(lat1, lon1, lat2, lon2):
R = 6371.0
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def analyze_inbox_rules(rules):
"""Detect malicious inbox rules: external forwarding, deletion rules, keyword-based filters."""
findings = []
for rule in rules:
user = rule.get("user", rule.get("mailbox", ""))
rule_name = rule.get("displayName", rule.get("name", ""))
actions = rule.get("actions", {})
forward_to = actions.get("forwardTo", []) or actions.get("forward_to", [])
redirect_to = actions.get("redirectTo", []) or actions.get("redirect_to", [])
delete_msg = actions.get("delete", False) or actions.get("moveToDeletedItems", False)
move_to = actions.get("moveToFolder", "") or ""
conditions = rule.get("conditions", {})
subject_contains = conditions.get("subjectContains", []) or []
body_contains = conditions.get("bodyContains", []) or []
for dest in forward_to + redirect_to:
addr = dest.get("emailAddress", {}).get("address", dest) if isinstance(dest, dict) else str(dest)
domain = addr.split("@")[-1] if "@" in str(addr) else ""
user_domain = user.split("@")[-1] if "@" in user else ""
if domain and domain != user_domain:
findings.append({
"type": "external_forwarding_rule",
"severity": "critical",
"resource": user,
"detail": f"Rule '{rule_name}' forwards to external address: {addr}",
})
if delete_msg:
findings.append({
"type": "deletion_rule",
"severity": "high",
"resource": user,
"detail": f"Rule '{rule_name}' auto-deletes messages",
})
keyword_hits = [kw for kw in FINANCIAL_KEYWORDS
if any(kw in s.lower() for s in subject_contains + body_contains)]
if keyword_hits:
findings.append({
"type": "financial_keyword_filter",
"severity": "high",
"resource": user,
"detail": f"Rule '{rule_name}' targets financial keywords: {', '.join(keyword_hits)}",
})
return findings
def analyze_sign_ins(sign_ins):
"""Detect impossible travel, suspicious user agents, and risky sign-in patterns."""
findings = []
user_logins = defaultdict(list)
for si in sign_ins:
user = si.get("userPrincipalName", si.get("user", ""))
ua = si.get("userAgent", si.get("user_agent", ""))
ip = si.get("ipAddress", si.get("ip", ""))
ts = si.get("createdDateTime", si.get("timestamp", ""))
lat = si.get("location", {}).get("geoCoordinates", {}).get("latitude", 0)
lon = si.get("location", {}).get("geoCoordinates", {}).get("longitude", 0)
country = si.get("location", {}).get("countryOrRegion", "")
risk = si.get("riskLevelAggregated", si.get("risk_level", "none"))
for pattern in SUSPICIOUS_UA_PATTERNS:
if pattern.lower() in (ua or "").lower():
findings.append({
"type": "suspicious_user_agent",
"severity": "high",
"resource": user,
"detail": f"Sign-in from suspicious UA '{ua[:60]}' at IP {ip}",
})
break
if risk in ("high", "medium"):
findings.append({
"type": "risky_sign_in",
"severity": "high" if risk == "high" else "medium",
"resource": user,
"detail": f"Azure AD risk level '{risk}' from {country or ip}",
})
if lat and lon and ts:
user_logins[user].append({"ts": ts, "lat": lat, "lon": lon, "ip": ip})
for user, logins in user_logins.items():
try:
logins.sort(key=lambda x: x["ts"])
except TypeError:
continue
for i in range(1, len(logins)):
try:
t1 = datetime.fromisoformat(logins[i - 1]["ts"].replace("Z", "+00:00"))
t2 = datetime.fromisoformat(logins[i]["ts"].replace("Z", "+00:00"))
hours = abs((t2 - t1).total_seconds()) / 3600.0
if hours < 0.01:
continue
dist = haversine_km(logins[i - 1]["lat"], logins[i - 1]["lon"], logins[i]["lat"], logins[i]["lon"])
speed = dist / hours
if speed > 900:
findings.append({
"type": "impossible_travel",
"severity": "critical",
"resource": user,
"detail": f"Impossible travel: {dist:.0f} km in {hours:.1f}h ({speed:.0f} km/h) between IPs {logins[i-1]['ip']} and {logins[i]['ip']}",
})
except (ValueError, TypeError):
continue
return findings
def analyze_oauth_grants(grants):
"""Detect suspicious OAuth application consent grants."""
findings = []
for grant in grants:
user = grant.get("user", grant.get("principalDisplayName", ""))
app = grant.get("appDisplayName", grant.get("app_name", ""))
scopes = grant.get("scope", grant.get("scopes", ""))
consent_type = grant.get("consentType", "")
risky_scopes = ["Mail.ReadWrite", "Mail.Send", "MailboxSettings.ReadWrite", "Files.ReadWrite.All"]
granted_risky = [s for s in risky_scopes if s.lower() in (scopes or "").lower()]
if granted_risky:
findings.append({
"type": "risky_oauth_grant",
"severity": "high",
"resource": user,
"detail": f"App '{app}' granted risky scopes: {', '.join(granted_risky)}",
})
if consent_type == "AllPrincipals":
findings.append({
"type": "admin_consent_grant",
"severity": "critical",
"resource": user,
"detail": f"App '{app}' has admin consent (AllPrincipals) with scopes: {scopes[:80]}",
})
return findings
def analyze(data):
findings = []
if isinstance(data, list):
findings.extend(analyze_inbox_rules(data))
return findings
findings.extend(analyze_inbox_rules(data.get("inbox_rules", data.get("rules", []))))
findings.extend(analyze_sign_ins(data.get("sign_ins", data.get("logins", []))))
findings.extend(analyze_oauth_grants(data.get("oauth_grants", data.get("app_consents", []))))
return findings
def generate_report(input_path):
data = load_data(input_path)
findings = analyze(data)
sev = Counter(f["severity"] for f in findings)
compromised = set(f["resource"] for f in findings if f["severity"] in ("critical", "high"))
return {
"report": "email_account_compromise_detection",
"generated_at": datetime.utcnow().isoformat() + "Z",
"total_findings": len(findings),
"severity_summary": dict(sev),
"potentially_compromised_accounts": sorted(compromised),
"findings": findings,
}
def main():
ap = argparse.ArgumentParser(description="Email Account Compromise Detection Agent")
ap.add_argument("--input", required=True, help="Input JSON with inbox rules/sign-in/OAuth data")
ap.add_argument("--output", help="Output JSON report path")
args = ap.parse_args()
report = generate_report(args.input)
out = json.dumps(report, indent=2)
if args.output:
Path(args.output).write_text(out, encoding="utf-8")
print(f"Report written to {args.output}")
else:
print(out)
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,40 @@
---
name: hunting-for-registry-run-key-persistence
description: Detect MITRE ATT&CK T1547.001 registry Run key persistence by analyzing Sysmon Event ID 13 logs and registry queries to identify malicious auto-start entries.
domain: cybersecurity
subdomain: threat-hunting
tags: [persistence, registry-run-keys, t1547-001, sysmon, threat-hunting, windows-forensics, mitre-attack]
version: "1.0"
author: mahipal
license: Apache-2.0
---
# Hunting for Registry Run Key Persistence
## Overview
Registry Run keys (T1547.001) are one of the most commonly used persistence mechanisms by adversaries. When a program is added to a Run key in the Windows registry, it executes automatically when a user logs in. Attackers abuse keys under `HKLM\Software\Microsoft\Windows\CurrentVersion\Run`, `HKCU\Software\Microsoft\Windows\CurrentVersion\Run`, and their RunOnce counterparts to maintain persistence. Sysmon Event ID 13 (RegistryEvent - Value Set) captures registry value modifications including the target object path, the process that made the change, and the new value. Detection involves monitoring these events for suspicious executables in temp directories, encoded PowerShell commands, LOLBin paths, and processes that do not normally create Run key entries. Chaining Event 13 with Event 1 (Process Creation) and Event 11 (FileCreate) strengthens detection by confirming payload creation and execution.
## Prerequisites
- Windows systems with Sysmon installed and configured to log Event ID 13
- Sysmon config with RegistryEvent rules for Run/RunOnce keys
- Python 3.9+ with `json`, `xml.etree.ElementTree`, `re` modules
- SIEM or log aggregator collecting Sysmon logs (Splunk, Elastic, Sentinel)
- Knowledge of legitimate auto-start programs for baseline comparison
## Steps
1. Collect Sysmon Event ID 13 logs filtered for Run/RunOnce key paths
2. Parse event XML/JSON for TargetObject, Details (value written), Image (modifying process)
3. Flag entries where the value points to temp directories, AppData, or ProgramData
4. Detect encoded PowerShell commands or script interpreters in registry values
5. Identify LOLBin abuse (mshta.exe, rundll32.exe, regsvr32.exe, wscript.exe)
6. Compare against known-good baseline of legitimate auto-start entries
7. Check if the modifying process (Image) is unusual (cmd.exe, powershell.exe, python.exe)
8. Chain with Event ID 1 to verify if the registered binary was recently created
9. Generate detection report with MITRE ATT&CK mapping and severity scores
10. Produce Sigma/Splunk detection rules from findings
## Expected Output
A JSON report listing suspicious Run key entries with the registry path, value written, modifying process, timestamp, MITRE technique mapping, severity rating, and recommended Sigma detection rules.
@@ -0,0 +1,103 @@
# Registry Run Key Persistence (T1547.001) Detection Reference
## Sysmon Event ID 13 - RegistryEvent (Value Set)
### Event Fields
| Field | Description |
|-------|-------------|
| `UtcTime` | Timestamp of registry modification |
| `ProcessGuid` | Unique process identifier |
| `Image` | Full path of modifying process |
| `TargetObject` | Full registry key path + value name |
| `Details` | New registry value data |
| `User` | User account context |
### Registry Paths to Monitor
```
HKLM\Software\Microsoft\Windows\CurrentVersion\Run
HKLM\Software\Microsoft\Windows\CurrentVersion\RunOnce
HKCU\Software\Microsoft\Windows\CurrentVersion\Run
HKCU\Software\Microsoft\Windows\CurrentVersion\RunOnce
HKLM\Software\Microsoft\Windows\CurrentVersion\RunServices
HKLM\Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders
HKCU\Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders
```
## Sysmon Configuration
```xml
<Sysmon schemaversion="4.90">
<EventFiltering>
<RuleGroup name="RegistryPersistence" groupRelation="or">
<RegistryEvent onmatch="include">
<TargetObject condition="contains">CurrentVersion\Run</TargetObject>
<TargetObject condition="contains">CurrentVersion\RunOnce</TargetObject>
<TargetObject condition="contains">Explorer\Shell Folders</TargetObject>
</RegistryEvent>
</RuleGroup>
</EventFiltering>
</Sysmon>
```
## Splunk Detection Query
```spl
index=sysmon EventCode=13
| where match(TargetObject, "(?i)CurrentVersion\\\\Run")
| eval suspicious=if(match(Details, "(?i)(temp|appdata|downloads|programdata)"), "yes", "no")
| where suspicious="yes"
| stats count by Image, TargetObject, Details, User, host
```
## Sigma Rule
```yaml
title: Suspicious Run Key Persistence
status: stable
logsource:
product: windows
category: registry_set
detection:
selection:
EventType: SetValue
TargetObject|contains: '\CurrentVersion\Run'
filter_legitimate:
Details|contains:
- 'SecurityHealth'
- 'WindowsDefender'
condition: selection and not filter_legitimate
level: medium
tags:
- attack.persistence
- attack.t1547.001
```
## Windows Registry Query (reg.exe)
```cmd
reg query HKLM\Software\Microsoft\Windows\CurrentVersion\Run
reg query HKCU\Software\Microsoft\Windows\CurrentVersion\Run
reg query HKLM\Software\Microsoft\Windows\CurrentVersion\RunOnce
reg query HKCU\Software\Microsoft\Windows\CurrentVersion\RunOnce
```
## CLI Usage
```bash
python agent.py --input sysmon_events.json --output report.json
python agent.py --input registry_snapshot.json --output report.json
```
## MITRE ATT&CK Reference
- **Technique**: T1547.001 - Boot or Logon Autostart Execution: Registry Run Keys
- **Tactic**: Persistence
- **Platforms**: Windows
- **Detection**: Sysmon Event 13, Windows Security Event 4657
## References
- Sysmon Event 13: https://www.ultimatewindowssecurity.com/securitylog/encyclopedia/event.aspx?eventid=90013
- Splunk Detection: https://research.splunk.com/endpoint/f5f6af30-7aa7-4295-bfe9-07fe87c01a4b/
- Nextron T1547.001: https://www.nextron-systems.com/2025/07/29/detecting-the-most-popular-mitre-persistence-method-registry-run-keys-startup-folder/
@@ -0,0 +1,229 @@
#!/usr/bin/env python3
"""Registry Run Key Persistence Hunter - detects T1547.001 persistence via Sysmon Event 13 analysis and registry run key auditing"""
import argparse
import json
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
RUN_KEY_PATHS = [
r"HKLM\Software\Microsoft\Windows\CurrentVersion\Run",
r"HKLM\Software\Microsoft\Windows\CurrentVersion\RunOnce",
r"HKCU\Software\Microsoft\Windows\CurrentVersion\Run",
r"HKCU\Software\Microsoft\Windows\CurrentVersion\RunOnce",
r"HKLM\Software\Microsoft\Windows\CurrentVersion\RunServices",
r"HKLM\Software\Microsoft\Windows\CurrentVersion\RunServicesOnce",
r"HKLM\SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders",
r"HKCU\SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders",
r"HKLM\SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders",
r"HKCU\SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders",
]
SUSPICIOUS_PATHS = [
r"\\temp\\", r"\\tmp\\", r"\\appdata\\local\\temp",
r"\\downloads\\", r"\\programdata\\", r"\\public\\",
r"\\users\\public", r"\\recycler\\", r"\\perflogs\\",
]
LOLBINS = [
"mshta.exe", "rundll32.exe", "regsvr32.exe", "wscript.exe", "cscript.exe",
"certutil.exe", "bitsadmin.exe", "msiexec.exe", "forfiles.exe",
"pcalua.exe", "bash.exe", "scriptrunner.exe",
]
SUSPICIOUS_MODIFIERS = [
"cmd.exe", "powershell.exe", "pwsh.exe", "python.exe", "python3.exe",
"wmic.exe", "reg.exe", "mshta.exe", "cscript.exe", "wscript.exe",
]
KNOWN_GOOD_VALUES = [
"SecurityHealth", "WindowsDefender", "iTunesHelper", "VMware",
"RealTimeProtection", "OneDrive", "Teams", "Spotify",
]
def load_data(path):
return json.loads(Path(path).read_text(encoding="utf-8"))
def is_run_key(target_object):
"""Check if registry path matches Run/RunOnce keys."""
target_lower = target_object.lower().replace("/", "\\")
for rk in RUN_KEY_PATHS:
if rk.lower() in target_lower:
return True
return False
def analyze_sysmon_events(events):
"""Analyze Sysmon Event ID 13 (RegistryEvent Value Set) for persistence."""
findings = []
for evt in events:
event_id = evt.get("EventID", evt.get("event_id", evt.get("EventCode", 0)))
if str(event_id) != "13":
continue
event_data = evt.get("EventData", evt)
target_obj = event_data.get("TargetObject", event_data.get("target_object", ""))
details = event_data.get("Details", event_data.get("details", event_data.get("value", "")))
image = event_data.get("Image", event_data.get("image", event_data.get("process", "")))
user = event_data.get("User", event_data.get("user", ""))
timestamp = evt.get("TimeCreated", evt.get("timestamp", evt.get("UtcTime", "")))
if not is_run_key(target_obj):
continue
severity = "medium"
indicators = []
details_lower = (details or "").lower()
for susp_path in SUSPICIOUS_PATHS:
if susp_path.lower() in details_lower:
severity = "high"
indicators.append(f"suspicious_path:{susp_path.strip(chr(92))}")
break
for lolbin in LOLBINS:
if lolbin.lower() in details_lower:
severity = "high"
indicators.append(f"lolbin:{lolbin}")
break
if "powershell" in details_lower and ("-enc" in details_lower or "-e " in details_lower or "encodedcommand" in details_lower):
severity = "critical"
indicators.append("encoded_powershell")
if re.search(r"(FromBase64|IEX|Invoke-Expression|DownloadString|Net\.WebClient)", details or "", re.IGNORECASE):
severity = "critical"
indicators.append("malicious_powershell_pattern")
image_name = (image or "").split("\\")[-1].lower()
for susp_mod in SUSPICIOUS_MODIFIERS:
if susp_mod.lower() == image_name:
severity = max(severity, "high", key=lambda x: {"low": 0, "medium": 1, "high": 2, "critical": 3}[x])
indicators.append(f"suspicious_modifier:{susp_mod}")
break
value_name = target_obj.split("\\")[-1] if "\\" in target_obj else target_obj
is_known_good = any(kg.lower() in value_name.lower() for kg in KNOWN_GOOD_VALUES)
if is_known_good:
severity = "low"
indicators.append("known_good_match")
findings.append({
"type": "registry_run_key_persistence",
"severity": severity,
"resource": target_obj,
"detail": f"Run key set by {image_name or 'unknown'}: {(details or '')[:120]}",
"mitre_technique": "T1547.001",
"mitre_tactic": "Persistence",
"modifying_process": image,
"registry_value": details,
"user": user,
"timestamp": timestamp,
"indicators": indicators,
})
return findings
def analyze_registry_snapshot(entries):
"""Analyze a static registry snapshot for suspicious Run key values."""
findings = []
for entry in entries:
key_path = entry.get("key", entry.get("path", ""))
value_name = entry.get("name", entry.get("value_name", ""))
value_data = entry.get("data", entry.get("value_data", entry.get("value", "")))
if not is_run_key(key_path) and not any(rk.lower().split("\\")[-1] in key_path.lower() for rk in RUN_KEY_PATHS[:4]):
continue
severity = "medium"
indicators = []
data_lower = (value_data or "").lower()
for susp_path in SUSPICIOUS_PATHS:
if susp_path.lower() in data_lower:
severity = "high"
indicators.append(f"suspicious_path")
break
for lolbin in LOLBINS:
if lolbin.lower() in data_lower:
severity = "high"
indicators.append(f"lolbin:{lolbin}")
break
if not Path(value_data.strip('"').split(" ")[0]).suffix.lower() in (".exe", ".dll", ".bat", ".cmd", ".ps1", ".vbs", ".js", ".hta", ".scr", ".com", ""):
pass
is_known = any(kg.lower() in (value_name or "").lower() for kg in KNOWN_GOOD_VALUES)
if is_known:
severity = "low"
findings.append({
"type": "run_key_entry",
"severity": severity,
"resource": f"{key_path}\\{value_name}",
"detail": f"Value: {(value_data or '')[:120]}",
"mitre_technique": "T1547.001",
"indicators": indicators,
})
return findings
def generate_sigma_rule(finding):
"""Generate a Sigma detection rule from a finding."""
details = finding.get("registry_value", finding.get("detail", ""))
return {
"title": f"Suspicious Run Key Persistence - {finding['resource'].split(chr(92))[-1]}",
"status": "experimental",
"logsource": {"product": "windows", "category": "registry_set"},
"detection": {
"selection": {
"EventType": "SetValue",
"TargetObject|contains": "CurrentVersion\\Run",
"Details|contains": details[:60] if details else "",
},
"condition": "selection",
},
"level": finding["severity"],
"tags": ["attack.persistence", "attack.t1547.001"],
}
def analyze(data):
findings = []
if isinstance(data, list):
has_event_id = any("EventID" in e or "event_id" in e or "EventCode" in e for e in data)
if has_event_id:
findings.extend(analyze_sysmon_events(data))
else:
findings.extend(analyze_registry_snapshot(data))
elif isinstance(data, dict):
sysmon = data.get("sysmon_events", data.get("events", []))
registry = data.get("registry_entries", data.get("snapshot", []))
findings.extend(analyze_sysmon_events(sysmon))
findings.extend(analyze_registry_snapshot(registry))
return findings
def generate_report(input_path):
data = load_data(input_path)
findings = analyze(data)
sev = Counter(f["severity"] for f in findings)
critical_high = [f for f in findings if f["severity"] in ("critical", "high")]
sigma_rules = [generate_sigma_rule(f) for f in critical_high[:5]]
return {
"report": "registry_run_key_persistence_hunt",
"generated_at": datetime.utcnow().isoformat() + "Z",
"mitre_technique": "T1547.001",
"total_findings": len(findings),
"severity_summary": dict(sev),
"high_priority_count": len(critical_high),
"sigma_rules": sigma_rules,
"findings": findings,
}
def main():
ap = argparse.ArgumentParser(description="Registry Run Key Persistence Hunter")
ap.add_argument("--input", required=True, help="Input JSON with Sysmon events or registry snapshot")
ap.add_argument("--output", help="Output JSON report path")
args = ap.parse_args()
report = generate_report(args.input)
out = json.dumps(report, indent=2)
if args.output:
Path(args.output).write_text(out, encoding="utf-8")
print(f"Report written to {args.output}")
else:
print(out)
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,40 @@
---
name: implementing-vulnerability-management-with-greenbone
description: Deploy and operate Greenbone/OpenVAS vulnerability management using the python-gvm library to create scan targets, execute vulnerability scans, and parse scan reports via GMP protocol.
domain: cybersecurity
subdomain: vulnerability-management
tags: [openvas, greenbone, vulnerability-scanning, gmp, python-gvm, vulnerability-management, compliance]
version: "1.0"
author: mahipal
license: Apache-2.0
---
# Implementing Vulnerability Management with Greenbone
## Overview
Greenbone Vulnerability Management (GVM) is the open-source framework behind OpenVAS, providing comprehensive vulnerability scanning with over 100,000 Network Vulnerability Tests (NVTs). The python-gvm library provides a Python API to interact with GVM through the Greenbone Management Protocol (GMP), enabling programmatic creation of scan targets, task management, scan execution, and report retrieval. This skill covers connecting to GVM via Unix socket or TLS, authenticating, creating scan configs and targets, launching scans, and parsing XML-based vulnerability reports to produce actionable findings.
## Prerequisites
- Greenbone Community Edition or Greenbone Enterprise Appliance installed
- Python 3.9+ with `python-gvm` (`pip install python-gvm`)
- GMP access credentials (username/password)
- Network connectivity to GVM daemon (Unix socket or TCP/TLS)
- Understanding of CVSS scoring and vulnerability classification
## Steps
1. Install python-gvm: `pip install python-gvm`
2. Establish a GMP connection via `UnixSocketConnection` or `TLSConnection`
3. Authenticate with `gmp.authenticate(username, password)`
4. Create a target with `gmp.create_target(name, hosts=[...], port_list_id=...)`
5. Create a scan task with `gmp.create_task(name, config_id, target_id, scanner_id)`
6. Start the scan with `gmp.start_task(task_id)`
7. Monitor scan progress with `gmp.get_task(task_id)`
8. Retrieve results with `gmp.get_report(report_id, report_format_id=...)`
9. Parse the XML report for vulnerabilities, CVSS scores, and affected hosts
10. Generate a JSON summary report with severity distribution and remediation priorities
## Expected Output
A JSON report containing total vulnerabilities found, severity breakdown (critical/high/medium/low), per-host findings with CVE references and CVSS scores, and scan metadata including duration and NVT feed version.
@@ -0,0 +1,81 @@
# Greenbone/OpenVAS python-gvm API Reference
## Installation
```bash
pip install python-gvm
```
## Connection Setup
```python
from gvm.connections import UnixSocketConnection, TLSConnection
from gvm.protocols.gmp import Gmp
from gvm.transforms import EtreeTransform
# Unix socket (local)
conn = UnixSocketConnection(path="/run/gvmd/gvmd.sock")
# TLS (remote)
conn = TLSConnection(hostname="gvm-host", port=9390)
transform = EtreeTransform()
with Gmp(connection=conn, transform=transform) as gmp:
gmp.authenticate("admin", "password")
version = gmp.get_version()
```
## Core GMP API Methods
| Method | Description |
|--------|-------------|
| `gmp.authenticate(username, password)` | Authenticate to GVM |
| `gmp.get_version()` | Get GMP protocol version |
| `gmp.create_target(name, hosts, port_list_id)` | Create scan target |
| `gmp.create_task(name, config_id, target_id, scanner_id)` | Create scan task |
| `gmp.start_task(task_id)` | Start a scan task |
| `gmp.get_task(task_id)` | Get task status/progress |
| `gmp.get_tasks()` | List all scan tasks |
| `gmp.get_report(report_id, report_format_id)` | Retrieve scan report |
| `gmp.get_reports()` | List all reports |
| `gmp.get_results(filter_string)` | Get vulnerability results |
## Default Resource IDs
| Resource | ID | Description |
|----------|-----|-------------|
| Port List (All IANA TCP) | `33d0cd82-57c6-11e1-8ed1-406186ea4fc5` | All IANA assigned TCP ports |
| Scan Config (Full and fast) | `daba56c8-73ec-11df-a475-002264764cea` | Full and fast scan |
| Scanner (OpenVAS Default) | `08b69003-5fc2-4037-a479-93b440211c73` | Default OpenVAS scanner |
| Report Format (XML) | `a994b278-1f62-11e1-96ac-406186ea4fc5` | XML report format |
## Report XML Structure
```xml
<report>
<results>
<result>
<host>192.168.1.10</host>
<name>SSL/TLS Certificate Expired</name>
<threat>High</threat>
<severity>7.5</severity>
<nvt oid="1.3.6.1.4.1.25623.1.0.103955">
<cve>CVE-2024-12345</cve>
</nvt>
<description>The SSL certificate has expired...</description>
</result>
</results>
</report>
```
## CLI Usage
```bash
python agent.py --input scan_results.json --output report.json
python agent.py --input scan_results.json --host gvm-server --username admin --password secret
```
## References
- python-gvm GitHub: https://github.com/greenbone/python-gvm
- GMP Protocol Docs: https://greenbone.github.io/docs/latest/api.html
- Greenbone Community Docs: https://greenbone.github.io/docs/latest/
@@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""Greenbone/OpenVAS Vulnerability Management agent - creates scan targets, executes scans, and parses reports via python-gvm GMP protocol"""
import argparse
import json
import sys
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
try:
from gvm.connections import UnixSocketConnection, TLSConnection
from gvm.protocols.gmp import Gmp
from gvm.transforms import EtreeTransform
from lxml import etree
HAS_GVM = True
except ImportError:
HAS_GVM = False
def load_data(path):
return json.loads(Path(path).read_text(encoding="utf-8"))
def connect_gvm(host, port=9390, socket_path="/run/gvmd/gvmd.sock", use_tls=False, username="admin", password="admin"):
"""Connect to GVM daemon and authenticate via GMP."""
if not HAS_GVM:
return None, "python-gvm not installed"
transform = EtreeTransform()
if use_tls:
conn = TLSConnection(hostname=host, port=port)
else:
conn = UnixSocketConnection(path=socket_path)
gmp = Gmp(connection=conn, transform=transform)
gmp.authenticate(username, password)
version_resp = gmp.get_version()
version = version_resp.xpath("version/text()")[0] if version_resp.xpath("version/text()") else "unknown"
return gmp, version
def create_target(gmp, name, hosts, port_list_id="33d0cd82-57c6-11e1-8ed1-406186ea4fc5"):
"""Create a scan target. Default port list is 'All IANA assigned TCP'."""
resp = gmp.create_target(name=name, hosts=hosts, port_list_id=port_list_id)
target_id = resp.get("id", "")
return target_id
def create_and_start_task(gmp, task_name, target_id, config_id="daba56c8-73ec-11df-a475-002264764cea", scanner_id="08b69003-5fc2-4037-a479-93b440211c73"):
"""Create scan task and start it. Default config is 'Full and fast', default scanner is 'OpenVAS Default'."""
resp = gmp.create_task(name=task_name, config_id=config_id, target_id=target_id, scanner_id=scanner_id)
task_id = resp.get("id", "")
gmp.start_task(task_id)
return task_id
def parse_report_xml(report_xml):
"""Parse GMP report XML into structured findings."""
findings = []
results = report_xml.findall(".//result") if report_xml is not None else []
for result in results:
host_el = result.find("host")
host = host_el.text if host_el is not None else ""
name_el = result.find("name")
name = name_el.text if name_el is not None else ""
threat_el = result.find("threat")
threat = threat_el.text if threat_el is not None else "Log"
severity_el = result.find("severity")
cvss = float(severity_el.text) if severity_el is not None and severity_el.text else 0.0
nvt = result.find("nvt")
oid = nvt.get("oid", "") if nvt is not None else ""
cve_el = nvt.find("cve") if nvt is not None else None
cve = cve_el.text if cve_el is not None and cve_el.text != "NOCVE" else ""
desc_el = result.find("description")
desc = (desc_el.text or "")[:200] if desc_el is not None else ""
sev_label = "critical" if cvss >= 9.0 else "high" if cvss >= 7.0 else "medium" if cvss >= 4.0 else "low"
findings.append({
"host": host,
"vulnerability": name,
"severity": sev_label,
"cvss": cvss,
"cve": cve,
"nvt_oid": oid,
"description": desc,
})
return findings
def analyze_offline_report(data):
"""Analyze pre-exported GVM report data (JSON format)."""
findings = []
results = data.get("results", data.get("vulnerabilities", []))
if isinstance(data, list):
results = data
for r in results:
cvss = r.get("cvss", r.get("severity_score", 0.0))
if isinstance(cvss, str):
try:
cvss = float(cvss)
except ValueError:
cvss = 0.0
sev_label = "critical" if cvss >= 9.0 else "high" if cvss >= 7.0 else "medium" if cvss >= 4.0 else "low"
findings.append({
"host": r.get("host", r.get("ip", "")),
"vulnerability": r.get("name", r.get("vulnerability", "")),
"severity": sev_label,
"cvss": cvss,
"cve": r.get("cve", r.get("cves", "")),
"nvt_oid": r.get("nvt_oid", r.get("oid", "")),
"description": (r.get("description", r.get("summary", "")) or "")[:200],
})
return findings
def generate_report(input_path):
data = load_data(input_path)
findings = analyze_offline_report(data)
sev = Counter(f["severity"] for f in findings)
host_vulns = defaultdict(int)
for f in findings:
host_vulns[f["host"]] += 1
cve_list = [f["cve"] for f in findings if f["cve"]]
findings.sort(key=lambda x: x["cvss"], reverse=True)
return {
"report": "greenbone_vulnerability_management",
"generated_at": datetime.utcnow().isoformat() + "Z",
"total_vulnerabilities": len(findings),
"severity_summary": dict(sev),
"hosts_scanned": len(host_vulns),
"host_vulnerability_counts": dict(host_vulns),
"unique_cves": len(set(cve_list)),
"top_10_findings": findings[:10],
"findings": findings,
}
def main():
ap = argparse.ArgumentParser(description="Greenbone Vulnerability Management Agent")
ap.add_argument("--input", required=True, help="Input JSON with GVM scan results")
ap.add_argument("--output", help="Output JSON report path")
ap.add_argument("--host", help="GVM host for live scan (requires python-gvm)")
ap.add_argument("--username", default="admin", help="GMP username")
ap.add_argument("--password", default="admin", help="GMP password")
args = ap.parse_args()
report = generate_report(args.input)
out = json.dumps(report, indent=2)
if args.output:
Path(args.output).write_text(out, encoding="utf-8")
print(f"Report written to {args.output}")
else:
print(out)
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,40 @@
---
name: performing-threat-intelligence-sharing-with-misp
description: Use PyMISP to create, enrich, and share threat intelligence events on a MISP platform, including IOC management, feed integration, STIX export, and community sharing workflows.
domain: cybersecurity
subdomain: threat-intelligence
tags: [misp, pymisp, threat-intelligence, ioc-sharing, stix, taxii, threat-feeds, information-sharing]
version: "1.0"
author: mahipal
license: Apache-2.0
---
# Performing Threat Intelligence Sharing with MISP
## Overview
MISP (Malware Information Sharing Platform) is an open-source threat intelligence platform designed for collecting, storing, distributing, and sharing cybersecurity indicators and threat information. PyMISP is the official Python library for interacting with MISP instances via the REST API, enabling programmatic event creation, attribute management, tag assignment, galaxy cluster attachment, and feed synchronization. This skill covers using PyMISP to create events with structured IOCs (IP addresses, domains, file hashes, URLs), enrich events with MITRE ATT&CK tags, manage sharing groups and distribution levels, search for existing intelligence, and export in STIX 2.1 format for interoperability with other platforms.
## Prerequisites
- MISP instance (v2.4+) with API access enabled
- Python 3.9+ with `pymisp` (`pip install pymisp`)
- MISP API key (Settings > Auth Keys)
- Understanding of MISP data model (Events, Attributes, Objects, Tags, Galaxies)
- Knowledge of TLP marking and sharing protocols
## Steps
1. Install PyMISP: `pip install pymisp`
2. Initialize `ExpandedPyMISP(url, key, ssl=True)` connection
3. Create a `MISPEvent` with info, distribution level, threat level, and analysis status
4. Add attributes via `event.add_attribute(type, value)` for IPs, domains, hashes
5. Apply TLP tags and MITRE ATT&CK technique tags
6. Publish the event with `misp.publish(event)`
7. Search existing events with `misp.search(controller='events', value=..., type_attribute=...)`
8. Enable and configure threat feeds for automatic IOC ingestion
9. Export events in STIX 2.1 format for cross-platform sharing
10. Validate sharing group configuration and sync server settings
## Expected Output
A JSON report summarizing events created, attributes added, tags applied, feed sync status, and any correlation hits against existing intelligence, with event IDs and distribution metadata.
@@ -0,0 +1,97 @@
# MISP / PyMISP API Reference
## Installation
```bash
pip install pymisp
```
## Connection Setup
```python
from pymisp import PyMISP, MISPEvent, MISPAttribute
misp = PyMISP(
url="https://misp.example.com",
key="YOUR_API_KEY",
ssl=True
)
```
## Core PyMISP Methods
| Method | Description |
|--------|-------------|
| `misp.add_event(event)` | Create new event |
| `misp.update_event(event)` | Update existing event |
| `misp.publish(event)` | Publish event for sharing |
| `misp.delete_event(event_id)` | Delete an event |
| `misp.search(controller, value, type_attribute)` | Search events/attributes |
| `misp.get_event(event_id)` | Retrieve single event |
| `misp.add_tag(event, tag)` | Add tag to event |
| `misp.search_index(published=True)` | Search event index |
## Creating Events
```python
event = MISPEvent()
event.info = "APT Campaign - Phishing IOCs"
event.distribution = 1 # 0=Org, 1=Community, 2=Connected, 3=All
event.threat_level_id = 2 # 1=High, 2=Medium, 3=Low, 4=Undefined
event.analysis = 0 # 0=Initial, 1=Ongoing, 2=Complete
event.add_attribute("ip-dst", "203.0.113.50", to_ids=True, comment="C2 server")
event.add_attribute("domain", "evil.example.com", to_ids=True)
event.add_attribute("sha256", "a1b2c3d4...", category="Payload delivery")
event.add_tag("tlp:amber")
event.add_tag("mitre-attack-pattern:T1566 - Phishing")
result = misp.add_event(event)
```
## Searching Intelligence
```python
# Search by attribute value
results = misp.search(controller="attributes", value="203.0.113.50", type_attribute="ip-dst")
# Search events by date range
results = misp.search(controller="events", date_from="2025-01-01", date_to="2025-12-31")
# Search with tags
results = misp.search(controller="events", tags=["tlp:white", "ransomware"])
```
## MISP Attribute Types
| Type | Example | Category |
|------|---------|----------|
| `ip-dst` | `203.0.113.50` | Network activity |
| `domain` | `evil.example.com` | Network activity |
| `url` | `https://evil.com/payload` | Network activity |
| `sha256` | `a1b2c3...` | Payload delivery |
| `md5` | `d41d8c...` | Payload delivery |
| `email-src` | `attacker@evil.com` | Payload delivery |
| `filename` | `malware.exe` | Payload delivery |
| `regkey` | `HKLM\...\Run\evil` | Persistence mechanism |
## Distribution Levels
- `0` - Your organisation only
- `1` - This community only
- `2` - Connected communities
- `3` - All communities
- `4` - Sharing group
## CLI Usage
```bash
python agent.py --input events.json --output report.json
python agent.py --input events.json --misp-url https://misp.example.com --api-key KEY
```
## References
- PyMISP Docs: https://pymisp.readthedocs.io/
- PyMISP GitHub: https://github.com/MISP/PyMISP
- MISP REST API: https://www.circl.lu/doc/misp/automation/
@@ -0,0 +1,219 @@
#!/usr/bin/env python3
"""MISP Threat Intelligence Sharing agent - creates events, manages attributes, searches IOCs, and validates sharing configuration via PyMISP"""
import argparse
import json
import sys
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
try:
from pymisp import PyMISP, MISPEvent, MISPAttribute, MISPTag
HAS_PYMISP = True
except ImportError:
HAS_PYMISP = False
MISP_ATTRIBUTE_TYPES = {
"ip-dst", "ip-src", "domain", "hostname", "url", "md5", "sha1",
"sha256", "filename", "email-src", "email-dst", "mutex", "regkey",
"user-agent", "vulnerability", "link", "text", "comment",
}
TLP_TAGS = {
"white": "tlp:white",
"green": "tlp:green",
"amber": "tlp:amber",
"amber+strict": "tlp:amber+strict",
"red": "tlp:red",
}
def load_data(path):
return json.loads(Path(path).read_text(encoding="utf-8"))
def connect_misp(url, api_key, ssl=True):
"""Initialize PyMISP connection."""
if not HAS_PYMISP:
return None, "pymisp not installed (pip install pymisp)"
misp = PyMISP(url, api_key, ssl=ssl)
return misp, "connected"
def create_event_from_data(misp, event_data):
"""Create a MISP event with attributes and tags."""
event = MISPEvent()
event.info = event_data.get("info", "Untitled Event")
event.distribution = event_data.get("distribution", 1) # 0=org, 1=community, 2=connected, 3=all
event.threat_level_id = event_data.get("threat_level", 2) # 1=high, 2=medium, 3=low, 4=undefined
event.analysis = event_data.get("analysis", 0) # 0=initial, 1=ongoing, 2=complete
for attr in event_data.get("attributes", []):
attr_type = attr.get("type", "text")
value = attr.get("value", "")
category = attr.get("category", "")
to_ids = attr.get("to_ids", True)
comment = attr.get("comment", "")
if attr_type in MISP_ATTRIBUTE_TYPES and value:
event.add_attribute(type=attr_type, value=value, category=category,
to_ids=to_ids, comment=comment)
for tag_name in event_data.get("tags", []):
event.add_tag(tag_name)
tlp = event_data.get("tlp", "").lower()
if tlp in TLP_TAGS:
event.add_tag(TLP_TAGS[tlp])
if misp:
result = misp.add_event(event)
return result
return event.to_dict()
def validate_event_quality(event_data):
"""Validate event data quality for sharing readiness."""
findings = []
eid = event_data.get("id", event_data.get("info", "unknown"))
if not event_data.get("info"):
findings.append({
"type": "missing_event_info",
"severity": "high",
"resource": str(eid),
"detail": "Event lacks descriptive info/title",
})
attrs = event_data.get("attributes", event_data.get("Attribute", []))
if not attrs:
findings.append({
"type": "no_attributes",
"severity": "high",
"resource": str(eid),
"detail": "Event has no IOC attributes",
})
attr_types = Counter(a.get("type", "unknown") for a in attrs)
if len(attr_types) == 1 and len(attrs) > 1:
findings.append({
"type": "single_attribute_type",
"severity": "low",
"resource": str(eid),
"detail": f"All {len(attrs)} attributes are type '{list(attr_types.keys())[0]}' - consider enriching",
})
tags = event_data.get("tags", event_data.get("Tag", []))
tag_names = [t.get("name", t) if isinstance(t, dict) else t for t in tags]
has_tlp = any("tlp:" in t.lower() for t in tag_names)
if not has_tlp:
findings.append({
"type": "missing_tlp_tag",
"severity": "high",
"resource": str(eid),
"detail": "Event lacks TLP classification tag",
})
has_mitre = any("mitre-attack" in t.lower() or "attack-pattern" in t.lower() for t in tag_names)
if not has_mitre and len(attrs) > 0:
findings.append({
"type": "missing_mitre_mapping",
"severity": "medium",
"resource": str(eid),
"detail": "Event lacks MITRE ATT&CK technique mapping",
})
dist = event_data.get("distribution", -1)
if dist == 3:
findings.append({
"type": "unrestricted_distribution",
"severity": "medium",
"resource": str(eid),
"detail": "Event set to 'All communities' distribution - verify this is intentional",
})
for attr in attrs:
val = attr.get("value", "")
atype = attr.get("type", "")
if atype in ("ip-dst", "ip-src") and val in ("127.0.0.1", "0.0.0.0", "10.0.0.1", "192.168.1.1"):
findings.append({
"type": "private_ip_ioc",
"severity": "high",
"resource": str(eid),
"detail": f"Private/localhost IP '{val}' used as IOC - will generate false positives",
})
if atype in ("md5", "sha1", "sha256") and len(val) < 32:
findings.append({
"type": "invalid_hash_length",
"severity": "high",
"resource": str(eid),
"detail": f"Hash attribute '{val}' is too short for type {atype}",
})
return findings
def validate_sharing_config(config):
"""Validate MISP sharing and feed configuration."""
findings = []
servers = config.get("sync_servers", [])
if not servers:
findings.append({
"type": "no_sync_servers",
"severity": "medium",
"resource": "misp_config",
"detail": "No synchronization servers configured for intelligence sharing",
})
for srv in servers:
if not srv.get("pull", False) and not srv.get("push", False):
findings.append({
"type": "inactive_sync_server",
"severity": "medium",
"resource": srv.get("name", srv.get("url", "")),
"detail": "Sync server has neither pull nor push enabled",
})
feeds = config.get("feeds", [])
enabled_feeds = [f for f in feeds if f.get("enabled", False)]
if not enabled_feeds:
findings.append({
"type": "no_active_feeds",
"severity": "medium",
"resource": "misp_config",
"detail": "No active threat intelligence feeds configured",
})
return findings
def analyze(data):
findings = []
events = data.get("events", [data] if "info" in data or "Attribute" in data else [])
if isinstance(data, list):
events = data
for evt in events:
findings.extend(validate_event_quality(evt))
if "sync_servers" in data or "feeds" in data:
findings.extend(validate_sharing_config(data))
return findings
def generate_report(input_path):
data = load_data(input_path)
findings = analyze(data)
sev = Counter(f["severity"] for f in findings)
cats = Counter(f["type"] for f in findings)
return {
"report": "misp_threat_intelligence_sharing",
"generated_at": datetime.utcnow().isoformat() + "Z",
"total_findings": len(findings),
"severity_summary": dict(sev),
"finding_categories": dict(cats),
"findings": findings,
}
def main():
ap = argparse.ArgumentParser(description="MISP Threat Intelligence Sharing Agent")
ap.add_argument("--input", required=True, help="Input JSON with MISP events or config")
ap.add_argument("--output", help="Output JSON report path")
ap.add_argument("--misp-url", help="MISP instance URL for live operations")
ap.add_argument("--api-key", help="MISP API key")
args = ap.parse_args()
report = generate_report(args.input)
out = json.dumps(report, indent=2)
if args.output:
Path(args.output).write_text(out, encoding="utf-8")
print(f"Report written to {args.output}")
else:
print(out)
if __name__ == "__main__":
main()