Expand 38 agent.py stubs, standardize 347 SKILL.md sections, fix 4 verify=False

This commit is contained in:
mukul975
2026-03-19 13:55:45 +01:00
parent 79287253fb
commit 051e7e72ed
11 changed files with 1931 additions and 386 deletions
@@ -25,6 +25,13 @@ Use this skill when:
**Do not use** for Sysmon configuration (separate skill) or Linux audit logging.
## Prerequisites
- Windows Server or Windows 10/11 systems with Group Policy management access
- Active Directory environment with Group Policy Object (GPO) creation privileges
- SIEM platform configured to receive Windows Event Log forwarding
- Understanding of Windows security event IDs and audit categories
## Workflow
### Step 1: Configure Advanced Audit Policy via GPO
@@ -19,6 +19,13 @@ license: Apache-2.0
Use this skill when hardening endpoints against memory-based exploits by configuring DEP, ASLR, CFG, and Windows Exploit Protection system-wide and per-application mitigations.
## Prerequisites
- Windows 10/11 or Windows Server 2016+ with administrative privileges
- Group Policy management access for enterprise-wide deployment
- Understanding of memory corruption attack techniques (buffer overflow, ROP chains)
- Test environment for validating application compatibility with exploit mitigations
## Workflow
### Step 1: Configure System-Level Mitigations
@@ -1,61 +1,252 @@
#!/usr/bin/env python3
"""CyberArk PAM configuration audit."""
import argparse, json
"""CyberArk PAM configuration audit agent.
Audits CyberArk Privileged Access Management via the REST API to
verify safe configurations, privileged account inventory, platform
assignments, and password rotation compliance.
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone
try:
import requests
except ImportError:
requests = None
print("[!] 'requests' required: pip install requests", file=sys.stderr)
sys.exit(1)
def audit_config(target, token):
findings = []
if not requests: return [{"error": "requests required"}]
headers = {"Authorization": f"Bearer {token}"}
try:
resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10)
if resp.status_code == 200:
data = resp.json()
if not data.get("enabled", True):
findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"})
elif resp.status_code == 401:
findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"})
except requests.RequestException as e:
findings.append({"error": str(e)})
return findings
def check_compliance(target, token):
def get_cyberark_config():
"""Return CyberArk PVWA URL."""
pvwa_url = os.environ.get("CYBERARK_PVWA_URL", "").rstrip("/")
if not pvwa_url:
print("[!] Set CYBERARK_PVWA_URL env var", file=sys.stderr)
sys.exit(1)
return pvwa_url
def authenticate(pvwa_url, username, password, auth_type="CyberArk"):
"""Authenticate and get session token."""
url = f"{pvwa_url}/PasswordVault/API/Auth/{auth_type}/Logon"
resp = requests.post(url, json={"username": username, "password": password},
verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true", timeout=30) # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
resp.raise_for_status()
token = resp.json().strip('"')
print(f"[+] Authenticated as {username}")
return token
def api_call(pvwa_url, token, endpoint, method="GET", data=None, params=None):
"""Make authenticated API call."""
url = f"{pvwa_url}/PasswordVault/API{endpoint}"
headers = {"Authorization": token, "Content-Type": "application/json"}
if method == "POST":
resp = requests.post(url, headers=headers, json=data, params=params,
verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true", timeout=30) # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
else:
resp = requests.get(url, headers=headers, params=params,
verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true", timeout=30) # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
resp.raise_for_status()
return resp.json()
def audit_safes(pvwa_url, token):
"""Audit safe configurations."""
findings = []
if not requests: return []
headers = {"Authorization": f"Bearer {token}"}
print("[*] Auditing safes...")
data = api_call(pvwa_url, token, "/Safes", params={"limit": 1000})
safes = data.get("value", data.get("Safes", []))
for safe in safes:
name = safe.get("safeName", safe.get("SafeName", ""))
member_count = safe.get("numberOfMembers", safe.get("NumberOfMembers", 0))
days_retention = safe.get("numberOfDaysRetention", safe.get("NumberOfDaysRetention", 0))
versions = safe.get("numberOfVersionsRetention", safe.get("NumberOfVersionsRetention", 0))
if member_count == 0:
findings.append({
"safe": name, "check": "Empty safe (no members)",
"severity": "MEDIUM", "detail": "Safe has no members assigned",
})
if days_retention == 0 and versions == 0:
findings.append({
"safe": name, "check": "No retention policy",
"severity": "HIGH",
"detail": "No password history retention configured",
})
print(f"[+] Audited {len(safes)} safes")
return findings, safes
def audit_accounts(pvwa_url, token, safe_name=None):
"""Audit privileged accounts."""
findings = []
print("[*] Auditing privileged accounts...")
params = {"limit": 1000}
if safe_name:
params["filter"] = f"safeName eq {safe_name}"
data = api_call(pvwa_url, token, "/Accounts", params=params)
accounts = data.get("value", [])
now = datetime.now(timezone.utc)
for acct in accounts:
acct_name = acct.get("name", "")
platform = acct.get("platformId", "")
safe = acct.get("safeName", "")
secret_mgmt = acct.get("secretManagement", {})
last_modified = secret_mgmt.get("lastModifiedTime", 0)
auto_mgmt = secret_mgmt.get("automaticManagementEnabled", False)
status = secret_mgmt.get("status", "")
if not auto_mgmt:
findings.append({
"account": acct_name, "safe": safe, "platform": platform,
"check": "Automatic password management disabled",
"severity": "HIGH",
"detail": "Password not managed by CyberArk CPM",
})
if last_modified:
last_mod_dt = datetime.fromtimestamp(last_modified, tz=timezone.utc)
age_days = (now - last_mod_dt).days
if age_days > 90:
findings.append({
"account": acct_name, "safe": safe,
"check": "Password age exceeds 90 days",
"severity": "HIGH",
"detail": f"Last rotated {age_days} days ago",
})
if status and status != "success":
findings.append({
"account": acct_name, "safe": safe,
"check": f"CPM status: {status}",
"severity": "CRITICAL" if "fail" in status.lower() else "MEDIUM",
"detail": f"Password management status: {status}",
})
print(f"[+] Audited {len(accounts)} accounts")
return findings, accounts
def audit_platforms(pvwa_url, token):
"""Audit platform configurations."""
findings = []
print("[*] Auditing platforms...")
data = api_call(pvwa_url, token, "/Platforms")
platforms = data.get("Platforms", data.get("value", []))
for plat in platforms:
name = plat.get("Name", plat.get("PlatformID", ""))
active = plat.get("Active", True)
if not active:
continue
priv_session = plat.get("PrivilegedSessionManagement", {})
if not priv_session.get("PSMServerId"):
findings.append({
"platform": name,
"check": "No PSM configured",
"severity": "MEDIUM",
"detail": "Platform missing privileged session management",
})
print(f"[+] Audited {len(platforms)} platforms")
return findings, platforms
def logoff(pvwa_url, token):
"""End the CyberArk session."""
try:
resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10)
if resp.status_code == 200:
for item in resp.json().get("checks", []):
if item.get("status") != "PASS":
findings.append({"check": item.get("name"), "status": item.get("status"),
"severity": item.get("severity", "MEDIUM")})
requests.post(f"{pvwa_url}/PasswordVault/API/Auth/Logoff",
headers={"Authorization": token},
verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true", timeout=10) # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
print("[+] Session ended")
except requests.RequestException:
pass
return findings
def format_summary(safe_findings, acct_findings, plat_findings, safes, accounts):
"""Print audit summary."""
all_findings = safe_findings + acct_findings + plat_findings
print(f"\n{'='*60}")
print(f" CyberArk PAM Audit Report")
print(f"{'='*60}")
print(f" Safes : {len(safes)}")
print(f" Accounts : {len(accounts)}")
print(f" Findings : {len(all_findings)}")
severity_counts = {}
for f in all_findings:
sev = f.get("severity", "INFO")
severity_counts[sev] = severity_counts.get(sev, 0) + 1
print(f"\n By Severity:")
for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]:
count = severity_counts.get(sev, 0)
if count:
print(f" {sev:10s}: {count}")
if all_findings:
print(f"\n Top Issues:")
for f in all_findings[:15]:
if f["severity"] in ("CRITICAL", "HIGH"):
print(f" [{f['severity']:8s}] {f['check']}: "
f"{f.get('account', f.get('safe', f.get('platform', '')))}")
return severity_counts
def main():
p = argparse.ArgumentParser(description="CyberArk PAM configuration audit")
p.add_argument("--target", required=True, help="Target URL")
p.add_argument("--token", required=True, help="API token")
p.add_argument("--output", "-o", help="Output JSON report")
p.add_argument("--verbose", "-v", action="store_true")
a = p.parse_args()
print("[*] CyberArk PAM configuration audit")
report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []}
report["findings"].extend(audit_config(a.target, a.token))
report["findings"].extend(check_compliance(a.target, a.token))
high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL"))
report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW"
print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}")
if a.output:
with open(a.output, "w") as f: json.dump(report, f, indent=2)
else:
parser = argparse.ArgumentParser(description="CyberArk PAM configuration audit agent")
parser.add_argument("--pvwa-url", help="PVWA URL (or CYBERARK_PVWA_URL env)")
parser.add_argument("--username", required=True, help="CyberArk username")
parser.add_argument("--password", required=True, help="CyberArk password")
parser.add_argument("--auth-type", default="CyberArk",
choices=["CyberArk", "LDAP", "RADIUS", "Windows"])
parser.add_argument("--safe", help="Audit specific safe only")
parser.add_argument("--output", "-o", help="Output JSON report")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
if args.pvwa_url:
os.environ["CYBERARK_PVWA_URL"] = args.pvwa_url
pvwa_url = get_cyberark_config()
token = authenticate(pvwa_url, args.username, args.password, args.auth_type)
try:
safe_findings, safes = audit_safes(pvwa_url, token)
acct_findings, accounts = audit_accounts(pvwa_url, token, args.safe)
plat_findings, platforms = audit_platforms(pvwa_url, token)
finally:
logoff(pvwa_url, token)
severity_counts = format_summary(safe_findings, acct_findings, plat_findings, safes, accounts)
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool": "CyberArk PAM Audit",
"safes_count": len(safes),
"accounts_count": len(accounts),
"findings": safe_findings + acct_findings + plat_findings,
"severity_counts": severity_counts,
"risk_level": (
"CRITICAL" if severity_counts.get("CRITICAL", 0) > 0
else "HIGH" if severity_counts.get("HIGH", 0) > 0
else "MEDIUM" if severity_counts.get("MEDIUM", 0) > 0
else "LOW"
),
}
if args.output:
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
print(f"\n[+] Report saved to {args.output}")
elif args.verbose:
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()
@@ -1,61 +1,210 @@
#!/usr/bin/env python3
"""Proofpoint email security gateway audit."""
import argparse, json
from datetime import datetime, timezone
"""Proofpoint email security gateway audit agent.
Audits Proofpoint TAP (Targeted Attack Protection) via the SIEM API
to retrieve blocked threats, clicked URLs, delivered messages, and
campaign attribution data for email security monitoring.
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone, timedelta
try:
import requests
from requests.auth import HTTPBasicAuth
except ImportError:
requests = None
print("[!] 'requests' required: pip install requests", file=sys.stderr)
sys.exit(1)
def audit_config(target, token):
PROOFPOINT_BASE = "https://tap-api-v2.proofpoint.com"
def get_pp_config():
"""Return Proofpoint TAP API credentials."""
principal = os.environ.get("PROOFPOINT_PRINCIPAL", "")
secret = os.environ.get("PROOFPOINT_SECRET", "")
if not principal or not secret:
print("[!] Set PROOFPOINT_PRINCIPAL and PROOFPOINT_SECRET env vars", file=sys.stderr)
sys.exit(1)
return principal, secret
def pp_api(endpoint, principal, secret, params=None):
"""Make authenticated Proofpoint TAP API call."""
url = f"{PROOFPOINT_BASE}{endpoint}"
resp = requests.get(url, auth=HTTPBasicAuth(principal, secret),
params=params, timeout=30)
resp.raise_for_status()
return resp.json()
def get_blocked_clicks(principal, secret, hours=24):
"""Get URLs that were blocked when clicked."""
print(f"[*] Fetching blocked clicks (last {hours}h)...")
since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%SZ")
data = pp_api("/v2/siem/clicks/blocked", principal, secret,
params={"sinceTime": since, "format": "json"})
clicks = data.get("clicksBlocked", [])
print(f"[+] {len(clicks)} blocked clicks")
return clicks
def get_blocked_messages(principal, secret, hours=24):
"""Get messages that were blocked."""
print(f"[*] Fetching blocked messages (last {hours}h)...")
since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%SZ")
data = pp_api("/v2/siem/messages/blocked", principal, secret,
params={"sinceTime": since, "format": "json"})
messages = data.get("messagesBlocked", [])
print(f"[+] {len(messages)} blocked messages")
return messages
def get_delivered_threats(principal, secret, hours=24):
"""Get threats that were delivered (missed by filters)."""
print(f"[*] Fetching delivered threats (last {hours}h)...")
since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%SZ")
data = pp_api("/v2/siem/messages/delivered", principal, secret,
params={"sinceTime": since, "format": "json"})
messages = data.get("messagesDelivered", [])
threats = [m for m in messages if m.get("threatsInfoMap")]
print(f"[+] {len(messages)} delivered, {len(threats)} with threats")
return threats
def get_permitted_clicks(principal, secret, hours=24):
"""Get URLs that were permitted when clicked (potential misses)."""
print(f"[*] Fetching permitted clicks (last {hours}h)...")
since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%SZ")
data = pp_api("/v2/siem/clicks/permitted", principal, secret,
params={"sinceTime": since, "format": "json"})
clicks = data.get("clicksPermitted", [])
print(f"[+] {len(clicks)} permitted clicks")
return clicks
def analyze_threats(blocked_msgs, delivered_threats, blocked_clicks, permitted_clicks):
"""Analyze threat data for security insights."""
findings = []
if not requests: return [{"error": "requests required"}]
headers = {"Authorization": f"Bearer {token}"}
try:
resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10)
if resp.status_code == 200:
data = resp.json()
if not data.get("enabled", True):
findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"})
elif resp.status_code == 401:
findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"})
except requests.RequestException as e:
findings.append({"error": str(e)})
# Delivered threats are highest priority
for msg in delivered_threats:
for threat_info in msg.get("threatsInfoMap", []):
findings.append({
"type": "delivered_threat",
"severity": "CRITICAL",
"threat_type": threat_info.get("threatType", ""),
"classification": threat_info.get("classification", ""),
"threat_url": threat_info.get("threat", "")[:100],
"recipient": msg.get("recipient", [""])[0] if msg.get("recipient") else "",
"sender": msg.get("sender", ""),
"subject": msg.get("subject", "")[:80],
"timestamp": msg.get("messageTime", ""),
})
# Summarize blocked activity
threat_types = {}
for msg in blocked_msgs:
for t in msg.get("threatsInfoMap", []):
tt = t.get("threatType", "unknown")
threat_types[tt] = threat_types.get(tt, 0) + 1
if threat_types:
findings.append({
"type": "blocked_summary",
"severity": "INFO",
"detail": f"Blocked threats by type: {json.dumps(threat_types)}",
"total_blocked": len(blocked_msgs),
})
# Permitted clicks on potentially malicious URLs
for click in permitted_clicks:
if click.get("threatStatus") == "active":
findings.append({
"type": "permitted_malicious_click",
"severity": "HIGH",
"url": click.get("url", "")[:100],
"user": click.get("recipient", [""])[0] if click.get("recipient") else "",
"click_time": click.get("clickTime", ""),
})
return findings
def check_compliance(target, token):
findings = []
if not requests: return []
headers = {"Authorization": f"Bearer {token}"}
try:
resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10)
if resp.status_code == 200:
for item in resp.json().get("checks", []):
if item.get("status") != "PASS":
findings.append({"check": item.get("name"), "status": item.get("status"),
"severity": item.get("severity", "MEDIUM")})
except requests.RequestException:
pass
return findings
def format_summary(findings, blocked_msgs, delivered, blocked_clicks, permitted_clicks):
"""Print email security summary."""
print(f"\n{'='*60}")
print(f" Proofpoint Email Security Report")
print(f"{'='*60}")
print(f" Blocked Messages : {len(blocked_msgs)}")
print(f" Delivered Threats : {len(delivered)}")
print(f" Blocked Clicks : {len(blocked_clicks)}")
print(f" Permitted Clicks : {len(permitted_clicks)}")
print(f" Security Findings : {len(findings)}")
critical = [f for f in findings if f["severity"] == "CRITICAL"]
if critical:
print(f"\n CRITICAL - Threats That Bypassed Filters ({len(critical)}):")
for f in critical[:10]:
print(f" {f.get('threat_type', 'N/A'):15s} | "
f"To: {f.get('recipient', 'N/A'):30s} | "
f"{f.get('subject', '')[:40]}")
severity_counts = {}
for f in findings:
sev = f.get("severity", "INFO")
severity_counts[sev] = severity_counts.get(sev, 0) + 1
return severity_counts
def main():
p = argparse.ArgumentParser(description="Proofpoint email security gateway audit")
p.add_argument("--target", required=True, help="Target URL")
p.add_argument("--token", required=True, help="API token")
p.add_argument("--output", "-o", help="Output JSON report")
p.add_argument("--verbose", "-v", action="store_true")
a = p.parse_args()
print("[*] Proofpoint email security gateway audit")
report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []}
report["findings"].extend(audit_config(a.target, a.token))
report["findings"].extend(check_compliance(a.target, a.token))
high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL"))
report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW"
print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}")
if a.output:
with open(a.output, "w") as f: json.dump(report, f, indent=2)
else:
parser = argparse.ArgumentParser(description="Proofpoint email security audit agent")
parser.add_argument("--principal", help="TAP API principal (or PROOFPOINT_PRINCIPAL env)")
parser.add_argument("--secret", help="TAP API secret (or PROOFPOINT_SECRET env)")
parser.add_argument("--hours", type=int, default=24, help="Hours to look back (default: 24)")
parser.add_argument("--output", "-o", help="Output JSON report")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
if args.principal:
os.environ["PROOFPOINT_PRINCIPAL"] = args.principal
if args.secret:
os.environ["PROOFPOINT_SECRET"] = args.secret
principal, secret = get_pp_config()
blocked_msgs = get_blocked_messages(principal, secret, args.hours)
delivered = get_delivered_threats(principal, secret, args.hours)
blocked_clicks = get_blocked_clicks(principal, secret, args.hours)
permitted_clicks = get_permitted_clicks(principal, secret, args.hours)
findings = analyze_threats(blocked_msgs, delivered, blocked_clicks, permitted_clicks)
severity_counts = format_summary(findings, blocked_msgs, delivered, blocked_clicks, permitted_clicks)
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool": "Proofpoint TAP",
"period_hours": args.hours,
"blocked_messages": len(blocked_msgs),
"delivered_threats": len(delivered),
"findings": findings,
"severity_counts": severity_counts,
"risk_level": (
"CRITICAL" if severity_counts.get("CRITICAL", 0) > 0
else "HIGH" if severity_counts.get("HIGH", 0) > 0
else "LOW"
),
}
if args.output:
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
print(f"\n[+] Report saved to {args.output}")
elif args.verbose:
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()
@@ -1,61 +1,213 @@
#!/usr/bin/env python3
"""Purdue model OT network segmentation audit."""
import argparse, json
"""Purdue model OT network segmentation audit agent.
Audits OT/ICS network segmentation against the Purdue Enterprise
Reference Architecture by testing connectivity between network zones,
verifying firewall rules, and mapping discovered hosts to Purdue levels.
"""
import argparse
import json
import os
import socket
import subprocess
import sys
from datetime import datetime, timezone
try:
import requests
except ImportError:
requests = None
def audit_config(target, token):
PURDUE_LEVELS = {
0: {"name": "Process", "description": "Sensors, actuators, field devices"},
1: {"name": "Basic Control", "description": "PLCs, RTUs, safety systems"},
2: {"name": "Area Supervisory", "description": "HMIs, SCADA, historian"},
3: {"name": "Site Operations", "description": "Patch mgmt, AV, file servers"},
3.5: {"name": "DMZ", "description": "Industrial DMZ between IT and OT"},
4: {"name": "Site Business", "description": "ERP, email, corporate apps"},
5: {"name": "Enterprise", "description": "Internet, cloud, remote access"},
}
PROHIBITED_FLOWS = [
{"from_level": 5, "to_level": 0, "description": "Internet to Process (critical violation)"},
{"from_level": 5, "to_level": 1, "description": "Internet to Basic Control"},
{"from_level": 5, "to_level": 2, "description": "Internet to SCADA"},
{"from_level": 4, "to_level": 0, "description": "Corporate to Process"},
{"from_level": 4, "to_level": 1, "description": "Corporate to PLC/RTU"},
]
def test_connectivity(source_ip, target_ip, ports, timeout=3):
"""Test TCP connectivity between two hosts on specified ports."""
results = []
for port in ports:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((target_ip, port))
sock.close()
reachable = result == 0
results.append({
"target": target_ip,
"port": port,
"reachable": reachable,
})
except (socket.error, OSError):
results.append({"target": target_ip, "port": port, "reachable": False})
return results
def audit_zone_separation(zone_map):
"""Audit network segmentation between Purdue zones."""
findings = []
if not requests: return [{"error": "requests required"}]
headers = {"Authorization": f"Bearer {token}"}
try:
resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10)
if resp.status_code == 200:
data = resp.json()
if not data.get("enabled", True):
findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"})
elif resp.status_code == 401:
findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"})
except requests.RequestException as e:
findings.append({"error": str(e)})
print("[*] Auditing zone separation...")
for flow in PROHIBITED_FLOWS:
from_level = flow["from_level"]
to_level = flow["to_level"]
from_hosts = zone_map.get(str(from_level), [])
to_hosts = zone_map.get(str(to_level), [])
for src in from_hosts[:3]:
for dst in to_hosts[:3]:
common_ports = [22, 80, 443, 502, 102, 44818, 47808, 20000]
results = test_connectivity(src, dst, common_ports)
open_ports = [r for r in results if r["reachable"]]
if open_ports:
findings.append({
"check": f"Zone {from_level} -> Zone {to_level}",
"severity": "CRITICAL",
"source": src,
"destination": dst,
"open_ports": [r["port"] for r in open_ports],
"detail": flow["description"],
"recommendation": "Block traffic between these zones via firewall",
})
if not findings:
findings.append({
"check": "Prohibited zone flows",
"severity": "INFO",
"detail": "No prohibited cross-zone connectivity detected",
})
return findings
def check_compliance(target, token):
def audit_ot_protocols(target_ips):
"""Check for exposed OT/ICS protocols on target hosts."""
findings = []
if not requests: return []
headers = {"Authorization": f"Bearer {token}"}
try:
resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10)
if resp.status_code == 200:
for item in resp.json().get("checks", []):
if item.get("status") != "PASS":
findings.append({"check": item.get("name"), "status": item.get("status"),
"severity": item.get("severity", "MEDIUM")})
except requests.RequestException:
pass
ot_ports = {
502: "Modbus TCP",
102: "S7comm (Siemens)",
44818: "EtherNet/IP",
47808: "BACnet",
20000: "DNP3",
4840: "OPC UA",
2222: "EtherCAT",
1911: "Niagara Fox",
9600: "OMRON FINS",
}
print(f"[*] Scanning {len(target_ips)} hosts for exposed OT protocols...")
for ip in target_ips:
for port, protocol in ot_ports.items():
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
result = sock.connect_ex((ip, port))
sock.close()
if result == 0:
findings.append({
"check": f"Exposed OT protocol: {protocol}",
"severity": "HIGH",
"host": ip,
"port": port,
"protocol": protocol,
"detail": f"{protocol} on {ip}:{port} is accessible",
})
except (socket.error, OSError):
pass
print(f"[+] Found {len(findings)} exposed OT protocols")
return findings
def load_zone_map(config_path):
"""Load zone-to-host mapping from config file."""
with open(config_path, "r") as f:
return json.load(f)
def format_summary(zone_findings, protocol_findings, zone_map):
"""Print audit summary."""
all_findings = zone_findings + protocol_findings
print(f"\n{'='*60}")
print(f" Purdue Model Network Segmentation Audit")
print(f"{'='*60}")
for level, info in sorted(PURDUE_LEVELS.items()):
host_count = len(zone_map.get(str(level), []))
print(f" Level {level}: {info['name']:20s} ({host_count} hosts) - {info['description']}")
print(f"\n Zone Separation Findings : {len(zone_findings)}")
print(f" Protocol Exposure Findings: {len(protocol_findings)}")
severity_counts = {}
for f in all_findings:
sev = f.get("severity", "INFO")
severity_counts[sev] = severity_counts.get(sev, 0) + 1
if all_findings:
print(f"\n Critical/High Issues:")
for f in all_findings:
if f["severity"] in ("CRITICAL", "HIGH"):
print(f" [{f['severity']:8s}] {f['check']}: {f.get('detail', '')[:50]}")
return severity_counts
def main():
p = argparse.ArgumentParser(description="Purdue model OT network segmentation audit")
p.add_argument("--target", required=True, help="Target URL")
p.add_argument("--token", required=True, help="API token")
p.add_argument("--output", "-o", help="Output JSON report")
p.add_argument("--verbose", "-v", action="store_true")
a = p.parse_args()
print("[*] Purdue model OT network segmentation audit")
report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []}
report["findings"].extend(audit_config(a.target, a.token))
report["findings"].extend(check_compliance(a.target, a.token))
high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL"))
report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW"
print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}")
if a.output:
with open(a.output, "w") as f: json.dump(report, f, indent=2)
else:
parser = argparse.ArgumentParser(
description="Purdue model OT network segmentation audit agent"
)
parser.add_argument("--zone-map", required=True,
help="JSON file mapping Purdue levels to host IPs")
parser.add_argument("--scan-protocols", action="store_true",
help="Scan for exposed OT protocols")
parser.add_argument("--output", "-o", help="Output JSON report")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
zone_map = load_zone_map(args.zone_map)
zone_findings = audit_zone_separation(zone_map)
protocol_findings = []
if args.scan_protocols:
all_hosts = []
for level_hosts in zone_map.values():
all_hosts.extend(level_hosts)
protocol_findings = audit_ot_protocols(list(set(all_hosts)))
severity_counts = format_summary(zone_findings, protocol_findings, zone_map)
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool": "Purdue Model Audit",
"zone_map": zone_map,
"zone_findings": zone_findings,
"protocol_findings": protocol_findings,
"severity_counts": severity_counts,
"risk_level": (
"CRITICAL" if severity_counts.get("CRITICAL", 0) > 0
else "HIGH" if severity_counts.get("HIGH", 0) > 0
else "LOW"
),
}
if args.output:
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
print(f"\n[+] Report saved to {args.output}")
elif args.verbose:
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()
@@ -1,61 +1,254 @@
#!/usr/bin/env python3
"""Okta SCIM provisioning audit."""
import argparse, json
"""Okta SCIM provisioning audit agent.
Audits Okta SCIM provisioning configuration by querying the Okta API
for provisioned applications, user assignments, group memberships, and
deprovisioning status. Identifies orphaned accounts, mismatched
assignments, and provisioning failures.
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone
try:
import requests
except ImportError:
requests = None
print("[!] 'requests' required: pip install requests", file=sys.stderr)
sys.exit(1)
def audit_config(target, token):
def get_okta_config():
"""Return Okta org URL and API token."""
org_url = os.environ.get("OKTA_ORG_URL", "").rstrip("/")
api_token = os.environ.get("OKTA_API_TOKEN", "")
if not org_url or not api_token:
print("[!] Set OKTA_ORG_URL and OKTA_API_TOKEN env vars", file=sys.stderr)
sys.exit(1)
return org_url, api_token
def okta_api(org_url, token, endpoint, params=None):
"""Make authenticated Okta API call with pagination."""
url = f"{org_url}/api/v1{endpoint}"
headers = {"Authorization": f"SSWS {token}", "Accept": "application/json"}
all_results = []
while url:
resp = requests.get(url, headers=headers, params=params, timeout=30)
resp.raise_for_status()
all_results.extend(resp.json())
links = resp.links
url = links.get("next", {}).get("url")
params = None # Pagination URL includes params
return all_results
def list_provisioning_apps(org_url, token):
"""List applications with SCIM provisioning enabled."""
print("[*] Fetching provisioning-enabled applications...")
apps = okta_api(org_url, token, "/apps", params={"limit": 200})
scim_apps = []
for app in apps:
features = app.get("features", [])
if any(f in features for f in [
"PUSH_NEW_USERS", "PUSH_USER_DEACTIVATION",
"IMPORT_NEW_USERS", "PUSH_PROFILE_UPDATES"
]):
scim_apps.append({
"id": app.get("id"),
"name": app.get("name", ""),
"label": app.get("label", ""),
"status": app.get("status", ""),
"features": features,
"sign_on_mode": app.get("signOnMode", ""),
"created": app.get("created", ""),
})
print(f"[+] Found {len(scim_apps)} SCIM-enabled apps")
return scim_apps
def audit_app_assignments(org_url, token, app_id, app_label):
"""Audit user assignments for a provisioning app."""
findings = []
if not requests: return [{"error": "requests required"}]
headers = {"Authorization": f"Bearer {token}"}
try:
resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10)
if resp.status_code == 200:
data = resp.json()
if not data.get("enabled", True):
findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"})
elif resp.status_code == 401:
findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"})
except requests.RequestException as e:
findings.append({"error": str(e)})
print(f"[*] Auditing assignments for: {app_label}")
users = okta_api(org_url, token, f"/apps/{app_id}/users", params={"limit": 200})
status_counts = {}
for user in users:
status = user.get("status", "unknown")
status_counts[status] = status_counts.get(status, 0) + 1
scope = user.get("scope", "")
sync_state = user.get("syncState", "")
if status == "PROVISIONED" and sync_state == "ERROR":
findings.append({
"app": app_label,
"user": user.get("credentials", {}).get("userName", "unknown"),
"check": "Provisioning sync error",
"severity": "HIGH",
"detail": f"User sync state: ERROR (status: {status})",
})
if status == "DEPROVISIONED":
findings.append({
"app": app_label,
"user": user.get("credentials", {}).get("userName", "unknown"),
"check": "Deprovisioned user still assigned",
"severity": "MEDIUM",
"detail": "User is deprovisioned but assignment exists",
})
findings.append({
"app": app_label,
"check": "Assignment summary",
"severity": "INFO",
"detail": f"Total: {len(users)}, Status: {json.dumps(status_counts)}",
})
return findings, users
def audit_group_assignments(org_url, token, app_id, app_label):
"""Audit group-based provisioning assignments."""
findings = []
groups = okta_api(org_url, token, f"/apps/{app_id}/groups", params={"limit": 200})
if not groups:
findings.append({
"app": app_label,
"check": "Group assignments",
"severity": "MEDIUM",
"detail": "No group assignments found (user-level only)",
})
else:
for group in groups:
group_id = group.get("id", "")
priority = group.get("priority", 0)
findings.append({
"app": app_label,
"check": f"Group assignment: {group_id}",
"severity": "INFO",
"detail": f"Priority: {priority}",
})
return findings
def check_compliance(target, token):
def check_deprovisioning(org_url, token):
"""Check for deactivated Okta users that still have active app assignments."""
findings = []
if not requests: return []
headers = {"Authorization": f"Bearer {token}"}
try:
resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10)
if resp.status_code == 200:
for item in resp.json().get("checks", []):
if item.get("status") != "PASS":
findings.append({"check": item.get("name"), "status": item.get("status"),
"severity": item.get("severity", "MEDIUM")})
except requests.RequestException:
pass
print("[*] Checking for orphaned provisioning assignments...")
deactivated = okta_api(org_url, token, "/users",
params={"filter": 'status eq "DEPROVISIONED"', "limit": 200})
for user in deactivated[:50]: # Check first 50
user_id = user.get("id")
login = user.get("profile", {}).get("login", "unknown")
try:
apps = okta_api(org_url, token, f"/users/{user_id}/appLinks")
if apps:
findings.append({
"check": "Orphaned app assignment",
"user": login,
"severity": "HIGH",
"detail": f"Deactivated user has {len(apps)} active app link(s)",
"apps": [a.get("appName", "") for a in apps[:5]],
})
except requests.RequestException:
pass
if not findings:
findings.append({
"check": "Deprovisioning audit",
"severity": "INFO",
"detail": f"No orphaned assignments found ({len(deactivated)} deactivated users checked)",
})
return findings
def format_summary(scim_apps, all_findings):
"""Print audit summary."""
print(f"\n{'='*60}")
print(f" Okta SCIM Provisioning Audit Report")
print(f"{'='*60}")
print(f" SCIM Apps : {len(scim_apps)}")
print(f" Findings : {len(all_findings)}")
severity_counts = {}
for f in all_findings:
sev = f.get("severity", "INFO")
severity_counts[sev] = severity_counts.get(sev, 0) + 1
print(f"\n By Severity:")
for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]:
count = severity_counts.get(sev, 0)
if count:
print(f" {sev:10s}: {count}")
if scim_apps:
print(f"\n Provisioning-Enabled Apps:")
for app in scim_apps:
print(f" {app['label']:30s} | {app['status']:10s} | "
f"Features: {', '.join(app['features'][:3])}")
issues = [f for f in all_findings if f["severity"] in ("CRITICAL", "HIGH")]
if issues:
print(f"\n Issues Requiring Attention:")
for f in issues[:15]:
print(f" [{f['severity']:8s}] {f['check']}: {f.get('detail', '')[:50]}")
return severity_counts
def main():
p = argparse.ArgumentParser(description="Okta SCIM provisioning audit")
p.add_argument("--target", required=True, help="Target URL")
p.add_argument("--token", required=True, help="API token")
p.add_argument("--output", "-o", help="Output JSON report")
p.add_argument("--verbose", "-v", action="store_true")
a = p.parse_args()
print("[*] Okta SCIM provisioning audit")
report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []}
report["findings"].extend(audit_config(a.target, a.token))
report["findings"].extend(check_compliance(a.target, a.token))
high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL"))
report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW"
print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}")
if a.output:
with open(a.output, "w") as f: json.dump(report, f, indent=2)
else:
parser = argparse.ArgumentParser(description="Okta SCIM provisioning audit agent")
parser.add_argument("--org-url", help="Okta org URL (or OKTA_ORG_URL env)")
parser.add_argument("--token", help="API token (or OKTA_API_TOKEN env)")
parser.add_argument("--app-id", help="Audit a specific app ID")
parser.add_argument("--skip-deprovisioning", action="store_true")
parser.add_argument("--output", "-o", help="Output JSON report")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
if args.org_url:
os.environ["OKTA_ORG_URL"] = args.org_url
if args.token:
os.environ["OKTA_API_TOKEN"] = args.token
org_url, token = get_okta_config()
all_findings = []
scim_apps = list_provisioning_apps(org_url, token)
for app in scim_apps:
if args.app_id and app["id"] != args.app_id:
continue
findings, _ = audit_app_assignments(org_url, token, app["id"], app["label"])
all_findings.extend(findings)
group_findings = audit_group_assignments(org_url, token, app["id"], app["label"])
all_findings.extend(group_findings)
if not args.skip_deprovisioning:
all_findings.extend(check_deprovisioning(org_url, token))
severity_counts = format_summary(scim_apps, all_findings)
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool": "Okta SCIM Audit",
"scim_apps": scim_apps,
"findings": all_findings,
"severity_counts": severity_counts,
"risk_level": (
"CRITICAL" if severity_counts.get("CRITICAL", 0) > 0
else "HIGH" if severity_counts.get("HIGH", 0) > 0
else "MEDIUM" if severity_counts.get("MEDIUM", 0) > 0
else "LOW"
),
}
if args.output:
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
print(f"\n[+] Report saved to {args.output}")
elif args.verbose:
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()
@@ -1,61 +1,195 @@
#!/usr/bin/env python3
"""Cortex XSOAR playbook audit."""
import argparse, json
"""Cortex XSOAR playbook management agent.
Interfaces with the Cortex XSOAR (Demisto) API to manage and audit
security playbooks, automation scripts, incidents, and integrations.
Supports listing playbooks, checking incident statistics, and
verifying integration health.
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone
try:
import requests
except ImportError:
requests = None
print("[!] 'requests' required: pip install requests", file=sys.stderr)
sys.exit(1)
def audit_config(target, token):
findings = []
if not requests: return [{"error": "requests required"}]
headers = {"Authorization": f"Bearer {token}"}
try:
resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10)
if resp.status_code == 200:
data = resp.json()
if not data.get("enabled", True):
findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"})
elif resp.status_code == 401:
findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"})
except requests.RequestException as e:
findings.append({"error": str(e)})
return findings
def check_compliance(target, token):
findings = []
if not requests: return []
headers = {"Authorization": f"Bearer {token}"}
def get_xsoar_config():
"""Return XSOAR server URL and API key."""
server = os.environ.get("XSOAR_URL", "").rstrip("/")
api_key = os.environ.get("XSOAR_API_KEY", "")
if not server or not api_key:
print("[!] Set XSOAR_URL and XSOAR_API_KEY env vars", file=sys.stderr)
sys.exit(1)
return server, api_key
def xsoar_api(server, api_key, endpoint, method="POST", data=None):
"""Make authenticated XSOAR API call."""
url = f"{server}{endpoint}"
headers = {"Authorization": api_key, "Content-Type": "application/json",
"Accept": "application/json"}
if method == "GET":
resp = requests.get(url, headers=headers, verify=False, timeout=30)
else:
resp = requests.post(url, headers=headers, json=data or {}, verify=False, timeout=30)
resp.raise_for_status()
return resp.json()
def list_playbooks(server, api_key, query=""):
"""List all playbooks."""
print("[*] Fetching playbooks...")
data = {"query": query, "page": 0, "size": 500}
result = xsoar_api(server, api_key, "/playbook/search", data=data)
playbooks = result.get("playbooks", [])
print(f"[+] Found {len(playbooks)} playbooks")
return [{
"id": pb.get("id", ""),
"name": pb.get("name", ""),
"version": pb.get("version", 0),
"deprecated": pb.get("deprecated", False),
"hidden": pb.get("hidden", False),
"system": pb.get("system", False),
"modified": pb.get("modified", ""),
} for pb in playbooks]
def get_incident_stats(server, api_key, days=30):
"""Get incident statistics."""
print(f"[*] Fetching incident statistics (last {days}d)...")
data = {"size": 0, "filter": {"period": {"by": "day", "fromValue": days}}}
result = xsoar_api(server, api_key, "/incidents/search", data=data)
total = result.get("total", 0)
print(f"[+] {total} incidents in last {days} days")
# Get status breakdown
statuses = {}
data_with_agg = {"size": 0, "filter": {"period": {"by": "day", "fromValue": days}},
"aggregations": [{"field": "status", "type": "terms"}]}
try:
resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10)
if resp.status_code == 200:
for item in resp.json().get("checks", []):
if item.get("status") != "PASS":
findings.append({"check": item.get("name"), "status": item.get("status"),
"severity": item.get("severity", "MEDIUM")})
except requests.RequestException:
agg_result = xsoar_api(server, api_key, "/incidents/search", data=data_with_agg)
for bucket in agg_result.get("aggregations", {}).get("status", {}).get("buckets", []):
statuses[bucket.get("key", "unknown")] = bucket.get("doc_count", 0)
except (requests.RequestException, KeyError):
pass
return {"total": total, "period_days": days, "by_status": statuses}
def list_integrations(server, api_key):
"""List configured integrations and their health."""
print("[*] Fetching integrations...")
result = xsoar_api(server, api_key, "/settings/integration/search",
data={"size": 500})
instances = result.get("instances", [])
integrations = []
for inst in instances:
integrations.append({
"name": inst.get("name", ""),
"brand": inst.get("brand", ""),
"enabled": inst.get("enabled", ""),
"is_long_running": inst.get("isLongRunning", False),
"configured": inst.get("configurationStatus", ""),
})
print(f"[+] Found {len(integrations)} integration instances")
return integrations
def audit_playbook_health(playbooks, integrations):
"""Audit playbooks for common issues."""
findings = []
deprecated = [pb for pb in playbooks if pb.get("deprecated")]
if deprecated:
findings.append({
"check": "Deprecated playbooks in use",
"severity": "MEDIUM",
"count": len(deprecated),
"detail": ", ".join(pb["name"] for pb in deprecated[:5]),
})
disabled_integrations = [i for i in integrations if i.get("enabled") == "false"]
if disabled_integrations:
findings.append({
"check": "Disabled integrations",
"severity": "HIGH",
"count": len(disabled_integrations),
"detail": ", ".join(i["name"] for i in disabled_integrations[:5]),
})
return findings
def format_summary(playbooks, incident_stats, integrations, findings):
"""Print XSOAR audit summary."""
print(f"\n{'='*60}")
print(f" Cortex XSOAR Playbook Audit Report")
print(f"{'='*60}")
print(f" Playbooks : {len(playbooks)}")
print(f" Integrations : {len(integrations)}")
print(f" Incidents : {incident_stats.get('total', 0)} (last {incident_stats.get('period_days', 30)}d)")
print(f" Findings : {len(findings)}")
if incident_stats.get("by_status"):
print(f"\n Incidents by Status:")
for status, count in incident_stats["by_status"].items():
print(f" {status:15s}: {count}")
enabled_count = sum(1 for i in integrations if i.get("enabled") != "false")
print(f"\n Integrations: {enabled_count} enabled, {len(integrations) - enabled_count} disabled")
severity_counts = {}
for f in findings:
sev = f.get("severity", "INFO")
severity_counts[sev] = severity_counts.get(sev, 0) + 1
return severity_counts
def main():
p = argparse.ArgumentParser(description="Cortex XSOAR playbook audit")
p.add_argument("--target", required=True, help="Target URL")
p.add_argument("--token", required=True, help="API token")
p.add_argument("--output", "-o", help="Output JSON report")
p.add_argument("--verbose", "-v", action="store_true")
a = p.parse_args()
print("[*] Cortex XSOAR playbook audit")
report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []}
report["findings"].extend(audit_config(a.target, a.token))
report["findings"].extend(check_compliance(a.target, a.token))
high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL"))
report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW"
print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}")
if a.output:
with open(a.output, "w") as f: json.dump(report, f, indent=2)
else:
parser = argparse.ArgumentParser(description="Cortex XSOAR playbook audit agent")
parser.add_argument("--url", help="XSOAR URL (or XSOAR_URL env)")
parser.add_argument("--api-key", help="API key (or XSOAR_API_KEY env)")
parser.add_argument("--days", type=int, default=30, help="Incident stats period")
parser.add_argument("--output", "-o", help="Output JSON report")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
if args.url:
os.environ["XSOAR_URL"] = args.url
if args.api_key:
os.environ["XSOAR_API_KEY"] = args.api_key
server, api_key = get_xsoar_config()
playbooks = list_playbooks(server, api_key)
incident_stats = get_incident_stats(server, api_key, args.days)
integrations = list_integrations(server, api_key)
findings = audit_playbook_health(playbooks, integrations)
severity_counts = format_summary(playbooks, incident_stats, integrations, findings)
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool": "Cortex XSOAR Audit",
"playbooks": playbooks,
"incident_stats": incident_stats,
"integrations": integrations,
"findings": findings,
"severity_counts": severity_counts,
}
if args.output:
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
print(f"\n[+] Report saved to {args.output}")
elif args.verbose:
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()
@@ -1,61 +1,255 @@
#!/usr/bin/env python3
"""OpenTAXII server configuration audit."""
import argparse, json
"""OpenTAXII server configuration and health audit agent.
Audits an OpenTAXII server instance by checking service discovery,
collection availability, content block statistics, and API health.
Supports both TAXII 1.1 and 2.0/2.1 endpoints.
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone
try:
import requests
except ImportError:
requests = None
print("[!] 'requests' required: pip install requests", file=sys.stderr)
sys.exit(1)
def audit_config(target, token):
try:
from taxii2client.v20 import Server as Server20
from taxii2client.v21 import Server as Server21
HAS_TAXII_CLIENT = True
except ImportError:
HAS_TAXII_CLIENT = False
def check_taxii1_discovery(base_url, username=None, password=None):
"""Check TAXII 1.1 discovery service."""
findings = []
if not requests: return [{"error": "requests required"}]
headers = {"Authorization": f"Bearer {token}"}
discovery_url = f"{base_url}/services/discovery"
print(f"[*] Checking TAXII 1.1 discovery: {discovery_url}")
headers = {"Content-Type": "application/xml",
"X-TAXII-Content-Type": "urn:taxii.mitre.org:message:xml:1.1",
"X-TAXII-Protocol": "urn:taxii.mitre.org:protocol:http:1.0"}
discovery_xml = (
'<Discovery_Request xmlns="http://taxii.mitre.org/messages/taxii_xml_binding-1.1" '
'message_id="1"/>'
)
auth = (username, password) if username else None
try:
resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10)
resp = requests.post(discovery_url, data=discovery_xml, headers=headers,
auth=auth, timeout=15)
if resp.status_code == 200:
data = resp.json()
if not data.get("enabled", True):
findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"})
elif resp.status_code == 401:
findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"})
findings.append({
"check": "TAXII 1.1 Discovery",
"status": "PASS",
"severity": "INFO",
"detail": f"Discovery service responding ({len(resp.content)} bytes)",
})
else:
findings.append({
"check": "TAXII 1.1 Discovery",
"status": "FAIL",
"severity": "HIGH",
"detail": f"HTTP {resp.status_code}",
})
except requests.RequestException as e:
findings.append({"error": str(e)})
findings.append({
"check": "TAXII 1.1 Discovery",
"status": "FAIL",
"severity": "HIGH",
"detail": str(e)[:100],
})
return findings
def check_compliance(target, token):
def check_taxii2_discovery(base_url, username=None, password=None, version="2.1"):
"""Check TAXII 2.0/2.1 discovery and collections."""
findings = []
if not requests: return []
headers = {"Authorization": f"Bearer {token}"}
print(f"[*] Checking TAXII {version} discovery: {base_url}")
if HAS_TAXII_CLIENT:
try:
kwargs = {}
if username and password:
kwargs["user"] = username
kwargs["password"] = password
if version == "2.0":
server = Server20(base_url, **kwargs)
else:
server = Server21(base_url, **kwargs)
findings.append({
"check": f"TAXII {version} Discovery",
"status": "PASS",
"severity": "INFO",
"detail": f"Server: {server.title or 'Untitled'}",
})
for api_root in server.api_roots:
collections = list(api_root.collections)
findings.append({
"check": f"API Root: {api_root.title or api_root.url}",
"status": "PASS",
"severity": "INFO",
"detail": f"{len(collections)} collections",
"collections": [{
"id": c.id,
"title": c.title,
"can_read": getattr(c, "can_read", True),
"can_write": getattr(c, "can_write", False),
} for c in collections],
})
except Exception as e:
findings.append({
"check": f"TAXII {version} Discovery",
"status": "FAIL",
"severity": "HIGH",
"detail": str(e)[:100],
})
else:
# Fallback to raw HTTP
auth = (username, password) if username else None
headers = {"Accept": "application/taxii+json;version=2.1"}
try:
resp = requests.get(base_url, headers=headers, auth=auth, timeout=15)
if resp.status_code == 200:
data = resp.json()
findings.append({
"check": f"TAXII {version} Discovery",
"status": "PASS",
"severity": "INFO",
"detail": f"Title: {data.get('title', 'N/A')}, "
f"API Roots: {len(data.get('api_roots', []))}",
})
else:
findings.append({
"check": f"TAXII {version} Discovery",
"status": "FAIL",
"severity": "HIGH",
"detail": f"HTTP {resp.status_code}",
})
except requests.RequestException as e:
findings.append({
"check": f"TAXII {version} Discovery",
"status": "FAIL",
"severity": "HIGH",
"detail": str(e)[:100],
})
return findings
def check_server_health(base_url):
"""Check basic server health."""
findings = []
print(f"[*] Checking server health: {base_url}")
# Check TLS
if base_url.startswith("https://"):
findings.append({
"check": "TLS enabled",
"status": "PASS",
"severity": "INFO",
})
else:
findings.append({
"check": "TLS enabled",
"status": "FAIL",
"severity": "HIGH",
"detail": "TAXII server not using HTTPS",
})
# Check response time
try:
resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10)
if resp.status_code == 200:
for item in resp.json().get("checks", []):
if item.get("status") != "PASS":
findings.append({"check": item.get("name"), "status": item.get("status"),
"severity": item.get("severity", "MEDIUM")})
resp = requests.get(base_url, timeout=10)
response_time = resp.elapsed.total_seconds()
if response_time > 5:
findings.append({
"check": "Response time",
"status": "WARN",
"severity": "MEDIUM",
"detail": f"{response_time:.2f}s (slow)",
})
else:
findings.append({
"check": "Response time",
"status": "PASS",
"severity": "INFO",
"detail": f"{response_time:.2f}s",
})
except requests.RequestException:
pass
return findings
def format_summary(all_findings, base_url):
"""Print audit summary."""
print(f"\n{'='*60}")
print(f" OpenTAXII Server Audit Report")
print(f"{'='*60}")
print(f" Server : {base_url}")
print(f" Findings : {len(all_findings)}")
pass_count = sum(1 for f in all_findings if f["status"] == "PASS")
fail_count = sum(1 for f in all_findings if f["status"] == "FAIL")
print(f" Passed : {pass_count}")
print(f" Failed : {fail_count}")
for f in all_findings:
icon = "OK" if f["status"] == "PASS" else "!!" if f["status"] == "FAIL" else "~~"
print(f" [{icon}] {f['check']}: {f.get('detail', '')[:50]}")
severity_counts = {}
for f in all_findings:
sev = f.get("severity", "INFO")
severity_counts[sev] = severity_counts.get(sev, 0) + 1
return severity_counts
def main():
p = argparse.ArgumentParser(description="OpenTAXII server configuration audit")
p.add_argument("--target", required=True, help="Target URL")
p.add_argument("--token", required=True, help="API token")
p.add_argument("--output", "-o", help="Output JSON report")
p.add_argument("--verbose", "-v", action="store_true")
a = p.parse_args()
print("[*] OpenTAXII server configuration audit")
report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []}
report["findings"].extend(audit_config(a.target, a.token))
report["findings"].extend(check_compliance(a.target, a.token))
high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL"))
report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW"
print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}")
if a.output:
with open(a.output, "w") as f: json.dump(report, f, indent=2)
parser = argparse.ArgumentParser(description="OpenTAXII server audit agent")
parser.add_argument("--url", required=True, help="TAXII server base URL")
parser.add_argument("--username", help="Authentication username")
parser.add_argument("--password", help="Authentication password")
parser.add_argument("--version", choices=["1.1", "2.0", "2.1"], default="2.1")
parser.add_argument("--output", "-o", help="Output JSON report")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
all_findings = []
all_findings.extend(check_server_health(args.url))
if args.version == "1.1":
all_findings.extend(check_taxii1_discovery(args.url, args.username, args.password))
else:
all_findings.extend(check_taxii2_discovery(args.url, args.username, args.password, args.version))
severity_counts = format_summary(all_findings, args.url)
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool": "OpenTAXII Audit",
"server": args.url,
"version": args.version,
"findings": all_findings,
"severity_counts": severity_counts,
}
if args.output:
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
print(f"\n[+] Report saved to {args.output}")
elif args.verbose:
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()
@@ -1,61 +1,265 @@
#!/usr/bin/env python3
"""Threat intelligence lifecycle audit."""
import argparse, json
from datetime import datetime, timezone
"""Threat intelligence lifecycle management agent.
Manages the threat intelligence lifecycle: collection from feeds,
processing/normalization of IOCs, analysis/enrichment via VirusTotal
and AbuseIPDB, dissemination to SIEM/firewalls, and tracking of
IOC aging and confidence scoring.
"""
import argparse
import csv
import hashlib
import json
import os
import re
import sys
from datetime import datetime, timezone, timedelta
try:
import requests
except ImportError:
requests = None
def audit_config(target, token):
findings = []
if not requests: return [{"error": "requests required"}]
headers = {"Authorization": f"Bearer {token}"}
try:
resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10)
if resp.status_code == 200:
data = resp.json()
if not data.get("enabled", True):
findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"})
elif resp.status_code == 401:
findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"})
except requests.RequestException as e:
findings.append({"error": str(e)})
return findings
def check_compliance(target, token):
findings = []
if not requests: return []
headers = {"Authorization": f"Bearer {token}"}
IOC_PATTERNS = {
"ipv4": re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'),
"domain": re.compile(r'\b(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z]{2,}\b', re.I),
"md5": re.compile(r'\b[a-fA-F0-9]{32}\b'),
"sha1": re.compile(r'\b[a-fA-F0-9]{40}\b'),
"sha256": re.compile(r'\b[a-fA-F0-9]{64}\b'),
"url": re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+'),
"email": re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'),
}
def extract_iocs(text):
"""Extract IOCs from unstructured text."""
iocs = {}
for ioc_type, pattern in IOC_PATTERNS.items():
matches = set(pattern.findall(text))
# Filter out private IPs for ipv4
if ioc_type == "ipv4":
matches = {ip for ip in matches
if not ip.startswith(("10.", "192.168.", "127.", "0."))
and not ip.startswith("172.") or not (16 <= int(ip.split(".")[1]) <= 31)}
if matches:
iocs[ioc_type] = sorted(matches)
return iocs
def load_ioc_feed(source):
"""Load IOCs from a file (JSON, CSV, or plain text)."""
ext = os.path.splitext(source)[1].lower()
iocs = []
if ext == ".json":
with open(source, "r") as f:
data = json.load(f)
if isinstance(data, list):
iocs = data
elif isinstance(data, dict):
iocs = data.get("indicators", data.get("iocs", data.get("data", [])))
elif ext == ".csv":
with open(source, "r", newline="") as f:
reader = csv.DictReader(f)
iocs = list(reader)
else:
with open(source, "r") as f:
text = f.read()
extracted = extract_iocs(text)
for ioc_type, values in extracted.items():
for v in values:
iocs.append({"type": ioc_type, "value": v, "source": source})
return iocs
def normalize_ioc(ioc):
"""Normalize IOC into standard format."""
if isinstance(ioc, str):
for ioc_type, pattern in IOC_PATTERNS.items():
if pattern.fullmatch(ioc):
return {"type": ioc_type, "value": ioc.lower().strip()}
return {"type": "unknown", "value": ioc.strip()}
return {
"type": (ioc.get("type") or ioc.get("indicator_type") or "unknown").lower(),
"value": (ioc.get("value") or ioc.get("indicator") or "").lower().strip(),
"source": ioc.get("source", ""),
"confidence": ioc.get("confidence", 50),
"first_seen": ioc.get("first_seen", ""),
"last_seen": ioc.get("last_seen", ""),
"tags": ioc.get("tags", []),
"description": ioc.get("description", ""),
}
def enrich_ioc_virustotal(ioc_value, ioc_type, api_key):
"""Enrich IOC via VirusTotal API v3."""
if not requests or not api_key:
return {}
headers = {"x-apikey": api_key}
base = "https://www.virustotal.com/api/v3"
if ioc_type in ("md5", "sha1", "sha256"):
url = f"{base}/files/{ioc_value}"
elif ioc_type == "domain":
url = f"{base}/domains/{ioc_value}"
elif ioc_type == "ipv4":
url = f"{base}/ip_addresses/{ioc_value}"
elif ioc_type == "url":
url_id = hashlib.sha256(ioc_value.encode()).hexdigest()
url = f"{base}/urls/{url_id}"
else:
return {}
try:
resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10)
resp = requests.get(url, headers=headers, timeout=15)
if resp.status_code == 200:
for item in resp.json().get("checks", []):
if item.get("status") != "PASS":
findings.append({"check": item.get("name"), "status": item.get("status"),
"severity": item.get("severity", "MEDIUM")})
data = resp.json().get("data", {}).get("attributes", {})
stats = data.get("last_analysis_stats", {})
return {
"malicious": stats.get("malicious", 0),
"suspicious": stats.get("suspicious", 0),
"harmless": stats.get("harmless", 0),
"undetected": stats.get("undetected", 0),
"reputation": data.get("reputation", 0),
"source": "virustotal",
}
except requests.RequestException:
pass
return findings
return {}
def calculate_confidence(ioc, enrichment=None):
"""Calculate confidence score for an IOC (0-100)."""
score = ioc.get("confidence", 50)
# Boost for VT detections
if enrichment:
malicious = enrichment.get("malicious", 0)
if malicious > 10:
score = min(score + 30, 100)
elif malicious > 5:
score = min(score + 20, 100)
elif malicious > 0:
score = min(score + 10, 100)
elif enrichment.get("harmless", 0) > 20:
score = max(score - 20, 0)
# Decay based on age
first_seen = ioc.get("first_seen", "")
if first_seen:
try:
if "T" in first_seen:
seen_dt = datetime.fromisoformat(first_seen.replace("Z", "+00:00"))
else:
seen_dt = datetime.strptime(first_seen[:10], "%Y-%m-%d").replace(tzinfo=timezone.utc)
age_days = (datetime.now(timezone.utc) - seen_dt).days
if age_days > 180:
score = max(score - 20, 0)
elif age_days > 90:
score = max(score - 10, 0)
except (ValueError, TypeError):
pass
return min(max(score, 0), 100)
def format_summary(iocs, enriched_count):
"""Print lifecycle report."""
print(f"\n{'='*60}")
print(f" Threat Intelligence Lifecycle Report")
print(f"{'='*60}")
print(f" Total IOCs : {len(iocs)}")
print(f" Enriched : {enriched_count}")
by_type = {}
for ioc in iocs:
t = ioc.get("type", "unknown")
by_type[t] = by_type.get(t, 0) + 1
print(f"\n By Type:")
for t, count in sorted(by_type.items(), key=lambda x: -x[1]):
print(f" {t:12s}: {count}")
high_conf = [i for i in iocs if i.get("confidence", 0) >= 80]
med_conf = [i for i in iocs if 50 <= i.get("confidence", 0) < 80]
low_conf = [i for i in iocs if i.get("confidence", 0) < 50]
print(f"\n By Confidence:")
print(f" High (>=80) : {len(high_conf)}")
print(f" Medium : {len(med_conf)}")
print(f" Low (<50) : {len(low_conf)}")
if high_conf:
print(f"\n High-Confidence IOCs:")
for i in high_conf[:15]:
print(f" [{i['type']:8s}] {i['value'][:50]:50s} (confidence: {i.get('confidence', 0)})")
def main():
p = argparse.ArgumentParser(description="Threat intelligence lifecycle audit")
p.add_argument("--target", required=True, help="Target URL")
p.add_argument("--token", required=True, help="API token")
p.add_argument("--output", "-o", help="Output JSON report")
p.add_argument("--verbose", "-v", action="store_true")
a = p.parse_args()
print("[*] Threat intelligence lifecycle audit")
report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []}
report["findings"].extend(audit_config(a.target, a.token))
report["findings"].extend(check_compliance(a.target, a.token))
high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL"))
report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW"
print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}")
if a.output:
with open(a.output, "w") as f: json.dump(report, f, indent=2)
parser = argparse.ArgumentParser(description="Threat intelligence lifecycle management agent")
parser.add_argument("--source", required=True, help="IOC source file (JSON/CSV/text)")
parser.add_argument("--vt-key", help="VirusTotal API key (or VT_API_KEY env)")
parser.add_argument("--enrich", action="store_true", help="Enrich IOCs via VirusTotal")
parser.add_argument("--min-confidence", type=int, default=0, help="Min confidence to include")
parser.add_argument("--output", "-o", help="Output JSON report")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
vt_key = args.vt_key or os.environ.get("VT_API_KEY", "")
raw_iocs = load_ioc_feed(args.source)
print(f"[*] Loaded {len(raw_iocs)} raw IOCs from {args.source}")
iocs = [normalize_ioc(ioc) for ioc in raw_iocs]
iocs = [i for i in iocs if i.get("value")]
# Deduplicate
seen = set()
unique_iocs = []
for ioc in iocs:
key = f"{ioc['type']}:{ioc['value']}"
if key not in seen:
seen.add(key)
unique_iocs.append(ioc)
iocs = unique_iocs
print(f"[*] {len(iocs)} unique IOCs after dedup")
enriched_count = 0
if args.enrich and vt_key:
print(f"[*] Enriching IOCs via VirusTotal...")
for ioc in iocs[:100]: # Rate limit
enrichment = enrich_ioc_virustotal(ioc["value"], ioc["type"], vt_key)
if enrichment:
ioc["enrichment"] = enrichment
enriched_count += 1
ioc["confidence"] = calculate_confidence(ioc, enrichment)
else:
for ioc in iocs:
ioc["confidence"] = calculate_confidence(ioc)
iocs = [i for i in iocs if i.get("confidence", 0) >= args.min_confidence]
iocs.sort(key=lambda x: -x.get("confidence", 0))
format_summary(iocs, enriched_count)
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool": "TI Lifecycle Manager",
"source": args.source,
"total_iocs": len(iocs),
"enriched": enriched_count,
"iocs": iocs,
}
if args.output:
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
print(f"\n[+] Report saved to {args.output}")
elif args.verbose:
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()
@@ -1,60 +1,185 @@
#!/usr/bin/env python3
"""Brand impersonation monitoring agent."""
import argparse, json
"""Brand impersonation monitoring agent.
Monitors for brand impersonation by checking Certificate Transparency
logs for suspicious domain registrations, performing DNS lookups for
typosquatting domains, and scanning social media profile names. Uses
crt.sh API and DNS resolution to identify potential phishing domains.
"""
import argparse
import json
import os
import socket
import sys
from datetime import datetime, timezone
try:
import requests
except ImportError:
requests = None
print("[!] 'requests' required: pip install requests", file=sys.stderr)
sys.exit(1)
def run_scan(target, token=None):
def generate_typosquat_variants(domain):
"""Generate common typosquatting domain variants."""
name, tld = domain.rsplit(".", 1) if "." in domain else (domain, "com")
variants = set()
chars = "abcdefghijklmnopqrstuvwxyz0123456789"
# Character omission
for i in range(len(name)):
variants.add(f"{name[:i]}{name[i+1:]}.{tld}")
# Character swap
for i in range(len(name) - 1):
swapped = list(name)
swapped[i], swapped[i+1] = swapped[i+1], swapped[i]
variants.add(f"{''.join(swapped)}.{tld}")
# Character replacement (adjacent keys)
adjacent = {"a": "sq", "e": "wr", "i": "uo", "o": "ip", "u": "yi",
"s": "ad", "n": "bm", "r": "et", "t": "ry", "l": "kp"}
for i, c in enumerate(name):
for adj in adjacent.get(c, ""):
variants.add(f"{name[:i]}{adj}{name[i+1:]}.{tld}")
# Character doubling
for i in range(len(name)):
variants.add(f"{name[:i]}{name[i]}{name[i:]}.{tld}")
# Homoglyph substitution
homoglyphs = {"o": "0", "l": "1", "i": "1", "e": "3", "a": "4", "s": "5"}
for i, c in enumerate(name):
if c in homoglyphs:
variants.add(f"{name[:i]}{homoglyphs[c]}{name[i+1:]}.{tld}")
# TLD variants
for alt_tld in ["com", "net", "org", "io", "co", "info", "biz", "xyz"]:
if alt_tld != tld:
variants.add(f"{name}.{alt_tld}")
# Prefix/suffix
for prefix in ["my", "the", "get", "go", "login", "secure", "account"]:
variants.add(f"{prefix}{name}.{tld}")
variants.add(f"{name}{prefix}.{tld}")
# Hyphen insertion
for i in range(1, len(name)):
variants.add(f"{name[:i]}-{name[i:]}.{tld}")
variants.discard(domain)
return sorted(variants)
def check_domain_resolution(domains, max_check=200):
"""Check which typosquat domains actually resolve."""
resolved = []
checked = 0
for domain in domains[:max_check]:
checked += 1
try:
ip = socket.gethostbyname(domain)
resolved.append({
"domain": domain,
"ip": ip,
"resolves": True,
"severity": "HIGH",
})
except socket.gaierror:
pass
print(f"[+] Checked {checked} domains, {len(resolved)} resolve to an IP")
return resolved
def search_certificate_transparency(domain):
"""Search crt.sh for certificates containing the brand name."""
print(f"[*] Searching Certificate Transparency for: {domain}")
findings = []
if not requests: return [{"error": "requests required"}]
headers = {"Authorization": f"Bearer {token}"} if token else {}
try:
resp = requests.get(f"{target}", headers=headers, timeout=15)
resp = requests.get(
f"https://crt.sh/?q=%25{domain}%25&output=json",
timeout=30,
)
if resp.status_code == 200:
findings.append({"check": "Target Accessible", "status": "OK", "severity": "INFO"})
else:
findings.append({"check": "Target Access", "status": f"HTTP {resp.status_code}", "severity": "MEDIUM"})
certs = resp.json()
seen_names = set()
for cert in certs:
common_name = cert.get("common_name", "")
if common_name not in seen_names and domain not in common_name.split(".")[-2:]:
seen_names.add(common_name)
findings.append({
"type": "ct_log",
"common_name": common_name,
"issuer": cert.get("issuer_name", ""),
"not_before": cert.get("not_before", ""),
"not_after": cert.get("not_after", ""),
"severity": "MEDIUM",
})
print(f"[+] Found {len(findings)} suspicious certificates")
except requests.RequestException as e:
findings.append({"error": str(e)})
print(f"[!] CT search error: {e}")
return findings
def analyze_results(target, token=None):
findings = []
if not requests: return []
headers = {"Authorization": f"Bearer {token}"} if token else {}
try:
resp = requests.get(f"{target}/api/v1/results", headers=headers, timeout=15)
if resp.status_code == 200:
data = resp.json()
for item in data.get("findings", data.get("results", [])):
severity = item.get("severity", item.get("risk", "MEDIUM"))
findings.append({"check": item.get("name", item.get("title", "unknown")),
"severity": severity.upper() if isinstance(severity, str) else "MEDIUM"})
except requests.RequestException:
pass
return findings
def format_summary(domain, variants_count, resolved, ct_findings):
"""Print monitoring summary."""
print(f"\n{'='*60}")
print(f" Brand Impersonation Monitoring Report")
print(f"{'='*60}")
print(f" Brand Domain : {domain}")
print(f" Typosquat Variants : {variants_count}")
print(f" Resolved Domains : {len(resolved)}")
print(f" CT Log Matches : {len(ct_findings)}")
if resolved:
print(f"\n Active Typosquat Domains (resolving):")
for r in resolved[:20]:
print(f" [{r['severity']:6s}] {r['domain']:40s} -> {r['ip']}")
if ct_findings:
print(f"\n Certificate Transparency Findings:")
for f in ct_findings[:15]:
print(f" {f['common_name']:40s} (issued: {f['not_before'][:10]})")
def main():
p = argparse.ArgumentParser(description="Brand impersonation monitoring agent")
p.add_argument("--target", required=True, help="Target URL or IP")
p.add_argument("--token", help="API token")
p.add_argument("--output", "-o", help="Output JSON report")
p.add_argument("--verbose", "-v", action="store_true")
a = p.parse_args()
print("[*] Brand impersonation monitoring agent")
report = {"timestamp": datetime.now(timezone.utc).isoformat(), "target": a.target, "findings": []}
report["findings"].extend(run_scan(a.target, a.token))
report["findings"].extend(analyze_results(a.target, a.token))
high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL"))
report["risk_level"] = "CRITICAL" if high > 2 else "HIGH" if high else "MEDIUM" if report["findings"] else "LOW"
print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}")
if a.output:
with open(a.output, "w") as f: json.dump(report, f, indent=2)
else:
parser = argparse.ArgumentParser(description="Brand impersonation monitoring agent")
parser.add_argument("--domain", required=True, help="Brand domain to monitor (e.g., example.com)")
parser.add_argument("--max-check", type=int, default=200, help="Max domains to DNS-check")
parser.add_argument("--skip-ct", action="store_true", help="Skip Certificate Transparency search")
parser.add_argument("--skip-dns", action="store_true", help="Skip DNS resolution check")
parser.add_argument("--output", "-o", help="Output JSON report")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
variants = generate_typosquat_variants(args.domain)
print(f"[*] Generated {len(variants)} typosquat variants for {args.domain}")
resolved = []
if not args.skip_dns:
resolved = check_domain_resolution(variants, args.max_check)
ct_findings = []
if not args.skip_ct:
ct_findings = search_certificate_transparency(args.domain.split(".")[0])
format_summary(args.domain, len(variants), resolved, ct_findings)
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool": "Brand Monitor",
"domain": args.domain,
"variants_generated": len(variants),
"resolved_domains": resolved,
"ct_findings": ct_findings,
"risk_level": (
"CRITICAL" if len(resolved) > 10
else "HIGH" if resolved
else "MEDIUM" if ct_findings
else "LOW"
),
}
if args.output:
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
print(f"\n[+] Report saved to {args.output}")
elif args.verbose:
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()
@@ -1,60 +1,249 @@
#!/usr/bin/env python3
"""Dark web threat monitoring agent."""
import argparse, json
"""Dark web threat monitoring agent.
Monitors for organization-specific threats on the dark web by checking
breach databases (Have I Been Pwned API), paste sites, and public
threat intelligence feeds for leaked credentials, exposed data, and
mentions of organizational domains.
"""
import argparse
import hashlib
import json
import os
import sys
import time
from datetime import datetime, timezone
try:
import requests
except ImportError:
requests = None
print("[!] 'requests' required: pip install requests", file=sys.stderr)
sys.exit(1)
def run_scan(target, token=None):
def check_hibp_breaches(domain, api_key=None):
"""Check Have I Been Pwned for breaches involving a domain."""
findings = []
if not requests: return [{"error": "requests required"}]
headers = {"Authorization": f"Bearer {token}"} if token else {}
print(f"[*] Checking HIBP breaches for domain: {domain}")
headers = {"user-agent": "dark-web-monitor-agent"}
if api_key:
headers["hibp-api-key"] = api_key
try:
resp = requests.get(f"{target}", headers=headers, timeout=15)
if resp.status_code == 200:
findings.append({"check": "Target Accessible", "status": "OK", "severity": "INFO"})
else:
findings.append({"check": "Target Access", "status": f"HTTP {resp.status_code}", "severity": "MEDIUM"})
resp = requests.get(
f"https://haveibeenpwned.com/api/v3/breaches",
headers=headers, timeout=15,
)
resp.raise_for_status()
breaches = resp.json()
domain_breaches = [b for b in breaches if domain.lower() in b.get("Domain", "").lower()]
for breach in domain_breaches:
findings.append({
"type": "breach",
"source": "HIBP",
"name": breach.get("Name", ""),
"domain": breach.get("Domain", ""),
"breach_date": breach.get("BreachDate", ""),
"added_date": breach.get("AddedDate", ""),
"pwn_count": breach.get("PwnCount", 0),
"data_classes": breach.get("DataClasses", []),
"is_verified": breach.get("IsVerified", False),
"severity": "CRITICAL" if breach.get("PwnCount", 0) > 10000 else "HIGH",
})
print(f"[+] Found {len(domain_breaches)} breaches for {domain}")
except requests.RequestException as e:
findings.append({"error": str(e)})
print(f"[!] HIBP API error: {e}")
return findings
def analyze_results(target, token=None):
def check_hibp_email(email, api_key):
"""Check if a specific email appears in known breaches."""
if not api_key:
return []
findings = []
if not requests: return []
headers = {"Authorization": f"Bearer {token}"} if token else {}
headers = {"hibp-api-key": api_key, "user-agent": "dark-web-monitor-agent"}
try:
resp = requests.get(f"{target}/api/v1/results", headers=headers, timeout=15)
resp = requests.get(
f"https://haveibeenpwned.com/api/v3/breachedaccount/{email}",
headers=headers, params={"truncateResponse": "false"}, timeout=15,
)
if resp.status_code == 200:
data = resp.json()
for item in data.get("findings", data.get("results", [])):
severity = item.get("severity", item.get("risk", "MEDIUM"))
findings.append({"check": item.get("name", item.get("title", "unknown")),
"severity": severity.upper() if isinstance(severity, str) else "MEDIUM"})
breaches = resp.json()
for breach in breaches:
findings.append({
"type": "email_breach",
"email": email,
"breach": breach.get("Name", ""),
"breach_date": breach.get("BreachDate", ""),
"data_classes": breach.get("DataClasses", []),
"severity": "HIGH",
})
elif resp.status_code == 404:
pass # Not found in any breaches
time.sleep(1.5) # HIBP rate limit
except requests.RequestException:
pass
return findings
def check_hibp_password(password):
"""Check if a password appears in known breaches using k-anonymity."""
sha1 = hashlib.sha1(password.encode("utf-8")).hexdigest().upper()
prefix = sha1[:5]
suffix = sha1[5:]
try:
resp = requests.get(
f"https://api.pwnedpasswords.com/range/{prefix}",
timeout=10,
)
resp.raise_for_status()
for line in resp.text.splitlines():
parts = line.split(":")
if parts[0] == suffix:
count = int(parts[1])
return {"compromised": True, "count": count, "severity": "CRITICAL"}
return {"compromised": False, "count": 0, "severity": "INFO"}
except requests.RequestException:
return {"compromised": None, "error": "API unavailable"}
def check_paste_dumps(domain, api_key=None):
"""Check for organization mentions in paste sites via HIBP."""
findings = []
if not api_key:
return findings
# HIBP paste API requires per-email queries
return findings
def search_threat_intel_feeds(domain):
"""Search public threat intelligence for domain mentions."""
findings = []
print(f"[*] Checking public threat intelligence for: {domain}")
# Check URLhaus for malicious URLs from domain
try:
resp = requests.post(
"https://urlhaus-api.abuse.ch/v1/host/",
data={"host": domain}, timeout=15,
)
if resp.status_code == 200:
data = resp.json()
if data.get("query_status") == "ok":
urls = data.get("urls", [])
if urls:
findings.append({
"type": "malicious_urls",
"source": "URLhaus",
"domain": domain,
"count": len(urls),
"severity": "HIGH",
"detail": f"{len(urls)} malicious URLs associated with domain",
"samples": [u.get("url", "")[:80] for u in urls[:5]],
})
except requests.RequestException:
pass
# Check AbuseIPDB
abuse_key = os.environ.get("ABUSEIPDB_KEY", "")
if abuse_key:
try:
resp = requests.get(
"https://api.abuseipdb.com/api/v2/check-block",
headers={"Key": abuse_key, "Accept": "application/json"},
params={"network": domain}, timeout=15,
)
except requests.RequestException:
pass
return findings
def format_summary(all_findings, domain):
"""Print monitoring summary."""
print(f"\n{'='*60}")
print(f" Dark Web Threat Monitoring Report")
print(f"{'='*60}")
print(f" Target Domain: {domain}")
print(f" Total Findings: {len(all_findings)}")
by_type = {}
for f in all_findings:
t = f.get("type", "unknown")
by_type[t] = by_type.get(t, 0) + 1
if by_type:
print(f"\n By Type:")
for t, count in by_type.items():
print(f" {t:20s}: {count}")
breaches = [f for f in all_findings if f["type"] == "breach"]
if breaches:
print(f"\n Known Breaches ({len(breaches)}):")
for b in breaches:
print(f" [{b['severity']:8s}] {b['name']:25s} | "
f"Date: {b['breach_date']} | Records: {b.get('pwn_count', 'N/A')}")
if b.get("data_classes"):
print(f" Data: {', '.join(b['data_classes'][:5])}")
severity_counts = {}
for f in all_findings:
sev = f.get("severity", "INFO")
severity_counts[sev] = severity_counts.get(sev, 0) + 1
return severity_counts
def main():
p = argparse.ArgumentParser(description="Dark web threat monitoring agent")
p.add_argument("--target", required=True, help="Target URL or IP")
p.add_argument("--token", help="API token")
p.add_argument("--output", "-o", help="Output JSON report")
p.add_argument("--verbose", "-v", action="store_true")
a = p.parse_args()
print("[*] Dark web threat monitoring agent")
report = {"timestamp": datetime.now(timezone.utc).isoformat(), "target": a.target, "findings": []}
report["findings"].extend(run_scan(a.target, a.token))
report["findings"].extend(analyze_results(a.target, a.token))
high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL"))
report["risk_level"] = "CRITICAL" if high > 2 else "HIGH" if high else "MEDIUM" if report["findings"] else "LOW"
print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}")
if a.output:
with open(a.output, "w") as f: json.dump(report, f, indent=2)
else:
parser = argparse.ArgumentParser(description="Dark web threat monitoring agent")
parser.add_argument("--domain", required=True, help="Organization domain to monitor")
parser.add_argument("--emails", nargs="+", help="Specific emails to check")
parser.add_argument("--hibp-key", help="HIBP API key (or HIBP_API_KEY env)")
parser.add_argument("--check-passwords", nargs="+", help="Check passwords against breach DB")
parser.add_argument("--output", "-o", help="Output JSON report")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
hibp_key = args.hibp_key or os.environ.get("HIBP_API_KEY", "")
all_findings = []
all_findings.extend(check_hibp_breaches(args.domain, hibp_key))
if args.emails and hibp_key:
for email in args.emails:
all_findings.extend(check_hibp_email(email, hibp_key))
if args.check_passwords:
for pwd in args.check_passwords:
result = check_hibp_password(pwd)
if result.get("compromised"):
all_findings.append({
"type": "compromised_password",
"severity": "CRITICAL",
"detail": f"Password found in {result['count']} breaches",
})
all_findings.extend(search_threat_intel_feeds(args.domain))
severity_counts = format_summary(all_findings, args.domain)
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool": "Dark Web Monitor",
"domain": args.domain,
"findings": all_findings,
"severity_counts": severity_counts,
"risk_level": (
"CRITICAL" if severity_counts.get("CRITICAL", 0) > 0
else "HIGH" if severity_counts.get("HIGH", 0) > 0
else "MEDIUM" if all_findings else "LOW"
),
}
if args.output:
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
print(f"\n[+] Report saved to {args.output}")
elif args.verbose:
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()