diff --git a/skills/configuring-windows-event-logging-for-detection/SKILL.md b/skills/configuring-windows-event-logging-for-detection/SKILL.md index 05455bf5..d4699648 100644 --- a/skills/configuring-windows-event-logging-for-detection/SKILL.md +++ b/skills/configuring-windows-event-logging-for-detection/SKILL.md @@ -25,6 +25,13 @@ Use this skill when: **Do not use** for Sysmon configuration (separate skill) or Linux audit logging. +## Prerequisites + +- Windows Server or Windows 10/11 systems with Group Policy management access +- Active Directory environment with Group Policy Object (GPO) creation privileges +- SIEM platform configured to receive Windows Event Log forwarding +- Understanding of Windows security event IDs and audit categories + ## Workflow ### Step 1: Configure Advanced Audit Policy via GPO diff --git a/skills/implementing-memory-protection-with-dep-aslr/SKILL.md b/skills/implementing-memory-protection-with-dep-aslr/SKILL.md index c1247566..9e12fcd9 100644 --- a/skills/implementing-memory-protection-with-dep-aslr/SKILL.md +++ b/skills/implementing-memory-protection-with-dep-aslr/SKILL.md @@ -19,6 +19,13 @@ license: Apache-2.0 Use this skill when hardening endpoints against memory-based exploits by configuring DEP, ASLR, CFG, and Windows Exploit Protection system-wide and per-application mitigations. +## Prerequisites + +- Windows 10/11 or Windows Server 2016+ with administrative privileges +- Group Policy management access for enterprise-wide deployment +- Understanding of memory corruption attack techniques (buffer overflow, ROP chains) +- Test environment for validating application compatibility with exploit mitigations + ## Workflow ### Step 1: Configure System-Level Mitigations diff --git a/skills/implementing-privileged-access-management-with-cyberark/scripts/agent.py b/skills/implementing-privileged-access-management-with-cyberark/scripts/agent.py index e0ba5e81..061117f7 100644 --- a/skills/implementing-privileged-access-management-with-cyberark/scripts/agent.py +++ b/skills/implementing-privileged-access-management-with-cyberark/scripts/agent.py @@ -1,61 +1,252 @@ #!/usr/bin/env python3 -"""CyberArk PAM configuration audit.""" -import argparse, json +"""CyberArk PAM configuration audit agent. + +Audits CyberArk Privileged Access Management via the REST API to +verify safe configurations, privileged account inventory, platform +assignments, and password rotation compliance. +""" +import argparse +import json +import os +import sys from datetime import datetime, timezone + try: import requests except ImportError: - requests = None + print("[!] 'requests' required: pip install requests", file=sys.stderr) + sys.exit(1) -def audit_config(target, token): - findings = [] - if not requests: return [{"error": "requests required"}] - headers = {"Authorization": f"Bearer {token}"} - try: - resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10) - if resp.status_code == 200: - data = resp.json() - if not data.get("enabled", True): - findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"}) - elif resp.status_code == 401: - findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"}) - except requests.RequestException as e: - findings.append({"error": str(e)}) - return findings -def check_compliance(target, token): +def get_cyberark_config(): + """Return CyberArk PVWA URL.""" + pvwa_url = os.environ.get("CYBERARK_PVWA_URL", "").rstrip("/") + if not pvwa_url: + print("[!] Set CYBERARK_PVWA_URL env var", file=sys.stderr) + sys.exit(1) + return pvwa_url + + +def authenticate(pvwa_url, username, password, auth_type="CyberArk"): + """Authenticate and get session token.""" + url = f"{pvwa_url}/PasswordVault/API/Auth/{auth_type}/Logon" + resp = requests.post(url, json={"username": username, "password": password}, + verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true", timeout=30) # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments + resp.raise_for_status() + token = resp.json().strip('"') + print(f"[+] Authenticated as {username}") + return token + + +def api_call(pvwa_url, token, endpoint, method="GET", data=None, params=None): + """Make authenticated API call.""" + url = f"{pvwa_url}/PasswordVault/API{endpoint}" + headers = {"Authorization": token, "Content-Type": "application/json"} + if method == "POST": + resp = requests.post(url, headers=headers, json=data, params=params, + verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true", timeout=30) # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments + else: + resp = requests.get(url, headers=headers, params=params, + verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true", timeout=30) # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments + resp.raise_for_status() + return resp.json() + + +def audit_safes(pvwa_url, token): + """Audit safe configurations.""" findings = [] - if not requests: return [] - headers = {"Authorization": f"Bearer {token}"} + print("[*] Auditing safes...") + data = api_call(pvwa_url, token, "/Safes", params={"limit": 1000}) + safes = data.get("value", data.get("Safes", [])) + + for safe in safes: + name = safe.get("safeName", safe.get("SafeName", "")) + member_count = safe.get("numberOfMembers", safe.get("NumberOfMembers", 0)) + days_retention = safe.get("numberOfDaysRetention", safe.get("NumberOfDaysRetention", 0)) + versions = safe.get("numberOfVersionsRetention", safe.get("NumberOfVersionsRetention", 0)) + + if member_count == 0: + findings.append({ + "safe": name, "check": "Empty safe (no members)", + "severity": "MEDIUM", "detail": "Safe has no members assigned", + }) + if days_retention == 0 and versions == 0: + findings.append({ + "safe": name, "check": "No retention policy", + "severity": "HIGH", + "detail": "No password history retention configured", + }) + + print(f"[+] Audited {len(safes)} safes") + return findings, safes + + +def audit_accounts(pvwa_url, token, safe_name=None): + """Audit privileged accounts.""" + findings = [] + print("[*] Auditing privileged accounts...") + params = {"limit": 1000} + if safe_name: + params["filter"] = f"safeName eq {safe_name}" + data = api_call(pvwa_url, token, "/Accounts", params=params) + accounts = data.get("value", []) + + now = datetime.now(timezone.utc) + for acct in accounts: + acct_name = acct.get("name", "") + platform = acct.get("platformId", "") + safe = acct.get("safeName", "") + secret_mgmt = acct.get("secretManagement", {}) + last_modified = secret_mgmt.get("lastModifiedTime", 0) + auto_mgmt = secret_mgmt.get("automaticManagementEnabled", False) + status = secret_mgmt.get("status", "") + + if not auto_mgmt: + findings.append({ + "account": acct_name, "safe": safe, "platform": platform, + "check": "Automatic password management disabled", + "severity": "HIGH", + "detail": "Password not managed by CyberArk CPM", + }) + + if last_modified: + last_mod_dt = datetime.fromtimestamp(last_modified, tz=timezone.utc) + age_days = (now - last_mod_dt).days + if age_days > 90: + findings.append({ + "account": acct_name, "safe": safe, + "check": "Password age exceeds 90 days", + "severity": "HIGH", + "detail": f"Last rotated {age_days} days ago", + }) + + if status and status != "success": + findings.append({ + "account": acct_name, "safe": safe, + "check": f"CPM status: {status}", + "severity": "CRITICAL" if "fail" in status.lower() else "MEDIUM", + "detail": f"Password management status: {status}", + }) + + print(f"[+] Audited {len(accounts)} accounts") + return findings, accounts + + +def audit_platforms(pvwa_url, token): + """Audit platform configurations.""" + findings = [] + print("[*] Auditing platforms...") + data = api_call(pvwa_url, token, "/Platforms") + platforms = data.get("Platforms", data.get("value", [])) + + for plat in platforms: + name = plat.get("Name", plat.get("PlatformID", "")) + active = plat.get("Active", True) + if not active: + continue + priv_session = plat.get("PrivilegedSessionManagement", {}) + if not priv_session.get("PSMServerId"): + findings.append({ + "platform": name, + "check": "No PSM configured", + "severity": "MEDIUM", + "detail": "Platform missing privileged session management", + }) + + print(f"[+] Audited {len(platforms)} platforms") + return findings, platforms + + +def logoff(pvwa_url, token): + """End the CyberArk session.""" try: - resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10) - if resp.status_code == 200: - for item in resp.json().get("checks", []): - if item.get("status") != "PASS": - findings.append({"check": item.get("name"), "status": item.get("status"), - "severity": item.get("severity", "MEDIUM")}) + requests.post(f"{pvwa_url}/PasswordVault/API/Auth/Logoff", + headers={"Authorization": token}, + verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true", timeout=10) # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments + print("[+] Session ended") except requests.RequestException: pass - return findings + + +def format_summary(safe_findings, acct_findings, plat_findings, safes, accounts): + """Print audit summary.""" + all_findings = safe_findings + acct_findings + plat_findings + print(f"\n{'='*60}") + print(f" CyberArk PAM Audit Report") + print(f"{'='*60}") + print(f" Safes : {len(safes)}") + print(f" Accounts : {len(accounts)}") + print(f" Findings : {len(all_findings)}") + + severity_counts = {} + for f in all_findings: + sev = f.get("severity", "INFO") + severity_counts[sev] = severity_counts.get(sev, 0) + 1 + + print(f"\n By Severity:") + for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]: + count = severity_counts.get(sev, 0) + if count: + print(f" {sev:10s}: {count}") + + if all_findings: + print(f"\n Top Issues:") + for f in all_findings[:15]: + if f["severity"] in ("CRITICAL", "HIGH"): + print(f" [{f['severity']:8s}] {f['check']}: " + f"{f.get('account', f.get('safe', f.get('platform', '')))}") + + return severity_counts + def main(): - p = argparse.ArgumentParser(description="CyberArk PAM configuration audit") - p.add_argument("--target", required=True, help="Target URL") - p.add_argument("--token", required=True, help="API token") - p.add_argument("--output", "-o", help="Output JSON report") - p.add_argument("--verbose", "-v", action="store_true") - a = p.parse_args() - print("[*] CyberArk PAM configuration audit") - report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []} - report["findings"].extend(audit_config(a.target, a.token)) - report["findings"].extend(check_compliance(a.target, a.token)) - high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL")) - report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW" - print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}") - if a.output: - with open(a.output, "w") as f: json.dump(report, f, indent=2) - else: + parser = argparse.ArgumentParser(description="CyberArk PAM configuration audit agent") + parser.add_argument("--pvwa-url", help="PVWA URL (or CYBERARK_PVWA_URL env)") + parser.add_argument("--username", required=True, help="CyberArk username") + parser.add_argument("--password", required=True, help="CyberArk password") + parser.add_argument("--auth-type", default="CyberArk", + choices=["CyberArk", "LDAP", "RADIUS", "Windows"]) + parser.add_argument("--safe", help="Audit specific safe only") + parser.add_argument("--output", "-o", help="Output JSON report") + parser.add_argument("--verbose", "-v", action="store_true") + args = parser.parse_args() + + if args.pvwa_url: + os.environ["CYBERARK_PVWA_URL"] = args.pvwa_url + pvwa_url = get_cyberark_config() + + token = authenticate(pvwa_url, args.username, args.password, args.auth_type) + try: + safe_findings, safes = audit_safes(pvwa_url, token) + acct_findings, accounts = audit_accounts(pvwa_url, token, args.safe) + plat_findings, platforms = audit_platforms(pvwa_url, token) + finally: + logoff(pvwa_url, token) + + severity_counts = format_summary(safe_findings, acct_findings, plat_findings, safes, accounts) + + report = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "tool": "CyberArk PAM Audit", + "safes_count": len(safes), + "accounts_count": len(accounts), + "findings": safe_findings + acct_findings + plat_findings, + "severity_counts": severity_counts, + "risk_level": ( + "CRITICAL" if severity_counts.get("CRITICAL", 0) > 0 + else "HIGH" if severity_counts.get("HIGH", 0) > 0 + else "MEDIUM" if severity_counts.get("MEDIUM", 0) > 0 + else "LOW" + ), + } + + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(f"\n[+] Report saved to {args.output}") + elif args.verbose: print(json.dumps(report, indent=2)) + if __name__ == "__main__": main() diff --git a/skills/implementing-proofpoint-email-security-gateway/scripts/agent.py b/skills/implementing-proofpoint-email-security-gateway/scripts/agent.py index 37b6f96e..f8dfcad2 100644 --- a/skills/implementing-proofpoint-email-security-gateway/scripts/agent.py +++ b/skills/implementing-proofpoint-email-security-gateway/scripts/agent.py @@ -1,61 +1,210 @@ #!/usr/bin/env python3 -"""Proofpoint email security gateway audit.""" -import argparse, json -from datetime import datetime, timezone +"""Proofpoint email security gateway audit agent. + +Audits Proofpoint TAP (Targeted Attack Protection) via the SIEM API +to retrieve blocked threats, clicked URLs, delivered messages, and +campaign attribution data for email security monitoring. +""" +import argparse +import json +import os +import sys +from datetime import datetime, timezone, timedelta + try: import requests + from requests.auth import HTTPBasicAuth except ImportError: - requests = None + print("[!] 'requests' required: pip install requests", file=sys.stderr) + sys.exit(1) -def audit_config(target, token): +PROOFPOINT_BASE = "https://tap-api-v2.proofpoint.com" + + +def get_pp_config(): + """Return Proofpoint TAP API credentials.""" + principal = os.environ.get("PROOFPOINT_PRINCIPAL", "") + secret = os.environ.get("PROOFPOINT_SECRET", "") + if not principal or not secret: + print("[!] Set PROOFPOINT_PRINCIPAL and PROOFPOINT_SECRET env vars", file=sys.stderr) + sys.exit(1) + return principal, secret + + +def pp_api(endpoint, principal, secret, params=None): + """Make authenticated Proofpoint TAP API call.""" + url = f"{PROOFPOINT_BASE}{endpoint}" + resp = requests.get(url, auth=HTTPBasicAuth(principal, secret), + params=params, timeout=30) + resp.raise_for_status() + return resp.json() + + +def get_blocked_clicks(principal, secret, hours=24): + """Get URLs that were blocked when clicked.""" + print(f"[*] Fetching blocked clicks (last {hours}h)...") + since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%SZ") + data = pp_api("/v2/siem/clicks/blocked", principal, secret, + params={"sinceTime": since, "format": "json"}) + clicks = data.get("clicksBlocked", []) + print(f"[+] {len(clicks)} blocked clicks") + return clicks + + +def get_blocked_messages(principal, secret, hours=24): + """Get messages that were blocked.""" + print(f"[*] Fetching blocked messages (last {hours}h)...") + since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%SZ") + data = pp_api("/v2/siem/messages/blocked", principal, secret, + params={"sinceTime": since, "format": "json"}) + messages = data.get("messagesBlocked", []) + print(f"[+] {len(messages)} blocked messages") + return messages + + +def get_delivered_threats(principal, secret, hours=24): + """Get threats that were delivered (missed by filters).""" + print(f"[*] Fetching delivered threats (last {hours}h)...") + since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%SZ") + data = pp_api("/v2/siem/messages/delivered", principal, secret, + params={"sinceTime": since, "format": "json"}) + messages = data.get("messagesDelivered", []) + threats = [m for m in messages if m.get("threatsInfoMap")] + print(f"[+] {len(messages)} delivered, {len(threats)} with threats") + return threats + + +def get_permitted_clicks(principal, secret, hours=24): + """Get URLs that were permitted when clicked (potential misses).""" + print(f"[*] Fetching permitted clicks (last {hours}h)...") + since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%SZ") + data = pp_api("/v2/siem/clicks/permitted", principal, secret, + params={"sinceTime": since, "format": "json"}) + clicks = data.get("clicksPermitted", []) + print(f"[+] {len(clicks)} permitted clicks") + return clicks + + +def analyze_threats(blocked_msgs, delivered_threats, blocked_clicks, permitted_clicks): + """Analyze threat data for security insights.""" findings = [] - if not requests: return [{"error": "requests required"}] - headers = {"Authorization": f"Bearer {token}"} - try: - resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10) - if resp.status_code == 200: - data = resp.json() - if not data.get("enabled", True): - findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"}) - elif resp.status_code == 401: - findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"}) - except requests.RequestException as e: - findings.append({"error": str(e)}) + + # Delivered threats are highest priority + for msg in delivered_threats: + for threat_info in msg.get("threatsInfoMap", []): + findings.append({ + "type": "delivered_threat", + "severity": "CRITICAL", + "threat_type": threat_info.get("threatType", ""), + "classification": threat_info.get("classification", ""), + "threat_url": threat_info.get("threat", "")[:100], + "recipient": msg.get("recipient", [""])[0] if msg.get("recipient") else "", + "sender": msg.get("sender", ""), + "subject": msg.get("subject", "")[:80], + "timestamp": msg.get("messageTime", ""), + }) + + # Summarize blocked activity + threat_types = {} + for msg in blocked_msgs: + for t in msg.get("threatsInfoMap", []): + tt = t.get("threatType", "unknown") + threat_types[tt] = threat_types.get(tt, 0) + 1 + + if threat_types: + findings.append({ + "type": "blocked_summary", + "severity": "INFO", + "detail": f"Blocked threats by type: {json.dumps(threat_types)}", + "total_blocked": len(blocked_msgs), + }) + + # Permitted clicks on potentially malicious URLs + for click in permitted_clicks: + if click.get("threatStatus") == "active": + findings.append({ + "type": "permitted_malicious_click", + "severity": "HIGH", + "url": click.get("url", "")[:100], + "user": click.get("recipient", [""])[0] if click.get("recipient") else "", + "click_time": click.get("clickTime", ""), + }) + return findings -def check_compliance(target, token): - findings = [] - if not requests: return [] - headers = {"Authorization": f"Bearer {token}"} - try: - resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10) - if resp.status_code == 200: - for item in resp.json().get("checks", []): - if item.get("status") != "PASS": - findings.append({"check": item.get("name"), "status": item.get("status"), - "severity": item.get("severity", "MEDIUM")}) - except requests.RequestException: - pass - return findings + +def format_summary(findings, blocked_msgs, delivered, blocked_clicks, permitted_clicks): + """Print email security summary.""" + print(f"\n{'='*60}") + print(f" Proofpoint Email Security Report") + print(f"{'='*60}") + print(f" Blocked Messages : {len(blocked_msgs)}") + print(f" Delivered Threats : {len(delivered)}") + print(f" Blocked Clicks : {len(blocked_clicks)}") + print(f" Permitted Clicks : {len(permitted_clicks)}") + print(f" Security Findings : {len(findings)}") + + critical = [f for f in findings if f["severity"] == "CRITICAL"] + if critical: + print(f"\n CRITICAL - Threats That Bypassed Filters ({len(critical)}):") + for f in critical[:10]: + print(f" {f.get('threat_type', 'N/A'):15s} | " + f"To: {f.get('recipient', 'N/A'):30s} | " + f"{f.get('subject', '')[:40]}") + + severity_counts = {} + for f in findings: + sev = f.get("severity", "INFO") + severity_counts[sev] = severity_counts.get(sev, 0) + 1 + return severity_counts + def main(): - p = argparse.ArgumentParser(description="Proofpoint email security gateway audit") - p.add_argument("--target", required=True, help="Target URL") - p.add_argument("--token", required=True, help="API token") - p.add_argument("--output", "-o", help="Output JSON report") - p.add_argument("--verbose", "-v", action="store_true") - a = p.parse_args() - print("[*] Proofpoint email security gateway audit") - report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []} - report["findings"].extend(audit_config(a.target, a.token)) - report["findings"].extend(check_compliance(a.target, a.token)) - high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL")) - report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW" - print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}") - if a.output: - with open(a.output, "w") as f: json.dump(report, f, indent=2) - else: + parser = argparse.ArgumentParser(description="Proofpoint email security audit agent") + parser.add_argument("--principal", help="TAP API principal (or PROOFPOINT_PRINCIPAL env)") + parser.add_argument("--secret", help="TAP API secret (or PROOFPOINT_SECRET env)") + parser.add_argument("--hours", type=int, default=24, help="Hours to look back (default: 24)") + parser.add_argument("--output", "-o", help="Output JSON report") + parser.add_argument("--verbose", "-v", action="store_true") + args = parser.parse_args() + + if args.principal: + os.environ["PROOFPOINT_PRINCIPAL"] = args.principal + if args.secret: + os.environ["PROOFPOINT_SECRET"] = args.secret + + principal, secret = get_pp_config() + + blocked_msgs = get_blocked_messages(principal, secret, args.hours) + delivered = get_delivered_threats(principal, secret, args.hours) + blocked_clicks = get_blocked_clicks(principal, secret, args.hours) + permitted_clicks = get_permitted_clicks(principal, secret, args.hours) + + findings = analyze_threats(blocked_msgs, delivered, blocked_clicks, permitted_clicks) + severity_counts = format_summary(findings, blocked_msgs, delivered, blocked_clicks, permitted_clicks) + + report = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "tool": "Proofpoint TAP", + "period_hours": args.hours, + "blocked_messages": len(blocked_msgs), + "delivered_threats": len(delivered), + "findings": findings, + "severity_counts": severity_counts, + "risk_level": ( + "CRITICAL" if severity_counts.get("CRITICAL", 0) > 0 + else "HIGH" if severity_counts.get("HIGH", 0) > 0 + else "LOW" + ), + } + + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(f"\n[+] Report saved to {args.output}") + elif args.verbose: print(json.dumps(report, indent=2)) + if __name__ == "__main__": main() diff --git a/skills/implementing-purdue-model-network-segmentation/scripts/agent.py b/skills/implementing-purdue-model-network-segmentation/scripts/agent.py index 6cc27a87..51512583 100644 --- a/skills/implementing-purdue-model-network-segmentation/scripts/agent.py +++ b/skills/implementing-purdue-model-network-segmentation/scripts/agent.py @@ -1,61 +1,213 @@ #!/usr/bin/env python3 -"""Purdue model OT network segmentation audit.""" -import argparse, json +"""Purdue model OT network segmentation audit agent. + +Audits OT/ICS network segmentation against the Purdue Enterprise +Reference Architecture by testing connectivity between network zones, +verifying firewall rules, and mapping discovered hosts to Purdue levels. +""" +import argparse +import json +import os +import socket +import subprocess +import sys from datetime import datetime, timezone -try: - import requests -except ImportError: - requests = None -def audit_config(target, token): + +PURDUE_LEVELS = { + 0: {"name": "Process", "description": "Sensors, actuators, field devices"}, + 1: {"name": "Basic Control", "description": "PLCs, RTUs, safety systems"}, + 2: {"name": "Area Supervisory", "description": "HMIs, SCADA, historian"}, + 3: {"name": "Site Operations", "description": "Patch mgmt, AV, file servers"}, + 3.5: {"name": "DMZ", "description": "Industrial DMZ between IT and OT"}, + 4: {"name": "Site Business", "description": "ERP, email, corporate apps"}, + 5: {"name": "Enterprise", "description": "Internet, cloud, remote access"}, +} + +PROHIBITED_FLOWS = [ + {"from_level": 5, "to_level": 0, "description": "Internet to Process (critical violation)"}, + {"from_level": 5, "to_level": 1, "description": "Internet to Basic Control"}, + {"from_level": 5, "to_level": 2, "description": "Internet to SCADA"}, + {"from_level": 4, "to_level": 0, "description": "Corporate to Process"}, + {"from_level": 4, "to_level": 1, "description": "Corporate to PLC/RTU"}, +] + + +def test_connectivity(source_ip, target_ip, ports, timeout=3): + """Test TCP connectivity between two hosts on specified ports.""" + results = [] + for port in ports: + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + result = sock.connect_ex((target_ip, port)) + sock.close() + reachable = result == 0 + results.append({ + "target": target_ip, + "port": port, + "reachable": reachable, + }) + except (socket.error, OSError): + results.append({"target": target_ip, "port": port, "reachable": False}) + return results + + +def audit_zone_separation(zone_map): + """Audit network segmentation between Purdue zones.""" findings = [] - if not requests: return [{"error": "requests required"}] - headers = {"Authorization": f"Bearer {token}"} - try: - resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10) - if resp.status_code == 200: - data = resp.json() - if not data.get("enabled", True): - findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"}) - elif resp.status_code == 401: - findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"}) - except requests.RequestException as e: - findings.append({"error": str(e)}) + print("[*] Auditing zone separation...") + + for flow in PROHIBITED_FLOWS: + from_level = flow["from_level"] + to_level = flow["to_level"] + from_hosts = zone_map.get(str(from_level), []) + to_hosts = zone_map.get(str(to_level), []) + + for src in from_hosts[:3]: + for dst in to_hosts[:3]: + common_ports = [22, 80, 443, 502, 102, 44818, 47808, 20000] + results = test_connectivity(src, dst, common_ports) + open_ports = [r for r in results if r["reachable"]] + if open_ports: + findings.append({ + "check": f"Zone {from_level} -> Zone {to_level}", + "severity": "CRITICAL", + "source": src, + "destination": dst, + "open_ports": [r["port"] for r in open_ports], + "detail": flow["description"], + "recommendation": "Block traffic between these zones via firewall", + }) + + if not findings: + findings.append({ + "check": "Prohibited zone flows", + "severity": "INFO", + "detail": "No prohibited cross-zone connectivity detected", + }) + return findings -def check_compliance(target, token): + +def audit_ot_protocols(target_ips): + """Check for exposed OT/ICS protocols on target hosts.""" findings = [] - if not requests: return [] - headers = {"Authorization": f"Bearer {token}"} - try: - resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10) - if resp.status_code == 200: - for item in resp.json().get("checks", []): - if item.get("status") != "PASS": - findings.append({"check": item.get("name"), "status": item.get("status"), - "severity": item.get("severity", "MEDIUM")}) - except requests.RequestException: - pass + ot_ports = { + 502: "Modbus TCP", + 102: "S7comm (Siemens)", + 44818: "EtherNet/IP", + 47808: "BACnet", + 20000: "DNP3", + 4840: "OPC UA", + 2222: "EtherCAT", + 1911: "Niagara Fox", + 9600: "OMRON FINS", + } + + print(f"[*] Scanning {len(target_ips)} hosts for exposed OT protocols...") + for ip in target_ips: + for port, protocol in ot_ports.items(): + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(2) + result = sock.connect_ex((ip, port)) + sock.close() + if result == 0: + findings.append({ + "check": f"Exposed OT protocol: {protocol}", + "severity": "HIGH", + "host": ip, + "port": port, + "protocol": protocol, + "detail": f"{protocol} on {ip}:{port} is accessible", + }) + except (socket.error, OSError): + pass + + print(f"[+] Found {len(findings)} exposed OT protocols") return findings + +def load_zone_map(config_path): + """Load zone-to-host mapping from config file.""" + with open(config_path, "r") as f: + return json.load(f) + + +def format_summary(zone_findings, protocol_findings, zone_map): + """Print audit summary.""" + all_findings = zone_findings + protocol_findings + print(f"\n{'='*60}") + print(f" Purdue Model Network Segmentation Audit") + print(f"{'='*60}") + + for level, info in sorted(PURDUE_LEVELS.items()): + host_count = len(zone_map.get(str(level), [])) + print(f" Level {level}: {info['name']:20s} ({host_count} hosts) - {info['description']}") + + print(f"\n Zone Separation Findings : {len(zone_findings)}") + print(f" Protocol Exposure Findings: {len(protocol_findings)}") + + severity_counts = {} + for f in all_findings: + sev = f.get("severity", "INFO") + severity_counts[sev] = severity_counts.get(sev, 0) + 1 + + if all_findings: + print(f"\n Critical/High Issues:") + for f in all_findings: + if f["severity"] in ("CRITICAL", "HIGH"): + print(f" [{f['severity']:8s}] {f['check']}: {f.get('detail', '')[:50]}") + + return severity_counts + + def main(): - p = argparse.ArgumentParser(description="Purdue model OT network segmentation audit") - p.add_argument("--target", required=True, help="Target URL") - p.add_argument("--token", required=True, help="API token") - p.add_argument("--output", "-o", help="Output JSON report") - p.add_argument("--verbose", "-v", action="store_true") - a = p.parse_args() - print("[*] Purdue model OT network segmentation audit") - report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []} - report["findings"].extend(audit_config(a.target, a.token)) - report["findings"].extend(check_compliance(a.target, a.token)) - high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL")) - report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW" - print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}") - if a.output: - with open(a.output, "w") as f: json.dump(report, f, indent=2) - else: + parser = argparse.ArgumentParser( + description="Purdue model OT network segmentation audit agent" + ) + parser.add_argument("--zone-map", required=True, + help="JSON file mapping Purdue levels to host IPs") + parser.add_argument("--scan-protocols", action="store_true", + help="Scan for exposed OT protocols") + parser.add_argument("--output", "-o", help="Output JSON report") + parser.add_argument("--verbose", "-v", action="store_true") + args = parser.parse_args() + + zone_map = load_zone_map(args.zone_map) + zone_findings = audit_zone_separation(zone_map) + + protocol_findings = [] + if args.scan_protocols: + all_hosts = [] + for level_hosts in zone_map.values(): + all_hosts.extend(level_hosts) + protocol_findings = audit_ot_protocols(list(set(all_hosts))) + + severity_counts = format_summary(zone_findings, protocol_findings, zone_map) + + report = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "tool": "Purdue Model Audit", + "zone_map": zone_map, + "zone_findings": zone_findings, + "protocol_findings": protocol_findings, + "severity_counts": severity_counts, + "risk_level": ( + "CRITICAL" if severity_counts.get("CRITICAL", 0) > 0 + else "HIGH" if severity_counts.get("HIGH", 0) > 0 + else "LOW" + ), + } + + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(f"\n[+] Report saved to {args.output}") + elif args.verbose: print(json.dumps(report, indent=2)) + if __name__ == "__main__": main() diff --git a/skills/implementing-scim-provisioning-with-okta/scripts/agent.py b/skills/implementing-scim-provisioning-with-okta/scripts/agent.py index a31bc59d..249e2a9a 100644 --- a/skills/implementing-scim-provisioning-with-okta/scripts/agent.py +++ b/skills/implementing-scim-provisioning-with-okta/scripts/agent.py @@ -1,61 +1,254 @@ #!/usr/bin/env python3 -"""Okta SCIM provisioning audit.""" -import argparse, json +"""Okta SCIM provisioning audit agent. + +Audits Okta SCIM provisioning configuration by querying the Okta API +for provisioned applications, user assignments, group memberships, and +deprovisioning status. Identifies orphaned accounts, mismatched +assignments, and provisioning failures. +""" +import argparse +import json +import os +import sys from datetime import datetime, timezone + try: import requests except ImportError: - requests = None + print("[!] 'requests' required: pip install requests", file=sys.stderr) + sys.exit(1) -def audit_config(target, token): + +def get_okta_config(): + """Return Okta org URL and API token.""" + org_url = os.environ.get("OKTA_ORG_URL", "").rstrip("/") + api_token = os.environ.get("OKTA_API_TOKEN", "") + if not org_url or not api_token: + print("[!] Set OKTA_ORG_URL and OKTA_API_TOKEN env vars", file=sys.stderr) + sys.exit(1) + return org_url, api_token + + +def okta_api(org_url, token, endpoint, params=None): + """Make authenticated Okta API call with pagination.""" + url = f"{org_url}/api/v1{endpoint}" + headers = {"Authorization": f"SSWS {token}", "Accept": "application/json"} + all_results = [] + while url: + resp = requests.get(url, headers=headers, params=params, timeout=30) + resp.raise_for_status() + all_results.extend(resp.json()) + links = resp.links + url = links.get("next", {}).get("url") + params = None # Pagination URL includes params + return all_results + + +def list_provisioning_apps(org_url, token): + """List applications with SCIM provisioning enabled.""" + print("[*] Fetching provisioning-enabled applications...") + apps = okta_api(org_url, token, "/apps", params={"limit": 200}) + scim_apps = [] + for app in apps: + features = app.get("features", []) + if any(f in features for f in [ + "PUSH_NEW_USERS", "PUSH_USER_DEACTIVATION", + "IMPORT_NEW_USERS", "PUSH_PROFILE_UPDATES" + ]): + scim_apps.append({ + "id": app.get("id"), + "name": app.get("name", ""), + "label": app.get("label", ""), + "status": app.get("status", ""), + "features": features, + "sign_on_mode": app.get("signOnMode", ""), + "created": app.get("created", ""), + }) + print(f"[+] Found {len(scim_apps)} SCIM-enabled apps") + return scim_apps + + +def audit_app_assignments(org_url, token, app_id, app_label): + """Audit user assignments for a provisioning app.""" findings = [] - if not requests: return [{"error": "requests required"}] - headers = {"Authorization": f"Bearer {token}"} - try: - resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10) - if resp.status_code == 200: - data = resp.json() - if not data.get("enabled", True): - findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"}) - elif resp.status_code == 401: - findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"}) - except requests.RequestException as e: - findings.append({"error": str(e)}) + print(f"[*] Auditing assignments for: {app_label}") + users = okta_api(org_url, token, f"/apps/{app_id}/users", params={"limit": 200}) + + status_counts = {} + for user in users: + status = user.get("status", "unknown") + status_counts[status] = status_counts.get(status, 0) + 1 + scope = user.get("scope", "") + sync_state = user.get("syncState", "") + + if status == "PROVISIONED" and sync_state == "ERROR": + findings.append({ + "app": app_label, + "user": user.get("credentials", {}).get("userName", "unknown"), + "check": "Provisioning sync error", + "severity": "HIGH", + "detail": f"User sync state: ERROR (status: {status})", + }) + if status == "DEPROVISIONED": + findings.append({ + "app": app_label, + "user": user.get("credentials", {}).get("userName", "unknown"), + "check": "Deprovisioned user still assigned", + "severity": "MEDIUM", + "detail": "User is deprovisioned but assignment exists", + }) + + findings.append({ + "app": app_label, + "check": "Assignment summary", + "severity": "INFO", + "detail": f"Total: {len(users)}, Status: {json.dumps(status_counts)}", + }) + return findings, users + + +def audit_group_assignments(org_url, token, app_id, app_label): + """Audit group-based provisioning assignments.""" + findings = [] + groups = okta_api(org_url, token, f"/apps/{app_id}/groups", params={"limit": 200}) + if not groups: + findings.append({ + "app": app_label, + "check": "Group assignments", + "severity": "MEDIUM", + "detail": "No group assignments found (user-level only)", + }) + else: + for group in groups: + group_id = group.get("id", "") + priority = group.get("priority", 0) + findings.append({ + "app": app_label, + "check": f"Group assignment: {group_id}", + "severity": "INFO", + "detail": f"Priority: {priority}", + }) return findings -def check_compliance(target, token): + +def check_deprovisioning(org_url, token): + """Check for deactivated Okta users that still have active app assignments.""" findings = [] - if not requests: return [] - headers = {"Authorization": f"Bearer {token}"} - try: - resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10) - if resp.status_code == 200: - for item in resp.json().get("checks", []): - if item.get("status") != "PASS": - findings.append({"check": item.get("name"), "status": item.get("status"), - "severity": item.get("severity", "MEDIUM")}) - except requests.RequestException: - pass + print("[*] Checking for orphaned provisioning assignments...") + deactivated = okta_api(org_url, token, "/users", + params={"filter": 'status eq "DEPROVISIONED"', "limit": 200}) + for user in deactivated[:50]: # Check first 50 + user_id = user.get("id") + login = user.get("profile", {}).get("login", "unknown") + try: + apps = okta_api(org_url, token, f"/users/{user_id}/appLinks") + if apps: + findings.append({ + "check": "Orphaned app assignment", + "user": login, + "severity": "HIGH", + "detail": f"Deactivated user has {len(apps)} active app link(s)", + "apps": [a.get("appName", "") for a in apps[:5]], + }) + except requests.RequestException: + pass + + if not findings: + findings.append({ + "check": "Deprovisioning audit", + "severity": "INFO", + "detail": f"No orphaned assignments found ({len(deactivated)} deactivated users checked)", + }) return findings + +def format_summary(scim_apps, all_findings): + """Print audit summary.""" + print(f"\n{'='*60}") + print(f" Okta SCIM Provisioning Audit Report") + print(f"{'='*60}") + print(f" SCIM Apps : {len(scim_apps)}") + print(f" Findings : {len(all_findings)}") + + severity_counts = {} + for f in all_findings: + sev = f.get("severity", "INFO") + severity_counts[sev] = severity_counts.get(sev, 0) + 1 + + print(f"\n By Severity:") + for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]: + count = severity_counts.get(sev, 0) + if count: + print(f" {sev:10s}: {count}") + + if scim_apps: + print(f"\n Provisioning-Enabled Apps:") + for app in scim_apps: + print(f" {app['label']:30s} | {app['status']:10s} | " + f"Features: {', '.join(app['features'][:3])}") + + issues = [f for f in all_findings if f["severity"] in ("CRITICAL", "HIGH")] + if issues: + print(f"\n Issues Requiring Attention:") + for f in issues[:15]: + print(f" [{f['severity']:8s}] {f['check']}: {f.get('detail', '')[:50]}") + + return severity_counts + + def main(): - p = argparse.ArgumentParser(description="Okta SCIM provisioning audit") - p.add_argument("--target", required=True, help="Target URL") - p.add_argument("--token", required=True, help="API token") - p.add_argument("--output", "-o", help="Output JSON report") - p.add_argument("--verbose", "-v", action="store_true") - a = p.parse_args() - print("[*] Okta SCIM provisioning audit") - report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []} - report["findings"].extend(audit_config(a.target, a.token)) - report["findings"].extend(check_compliance(a.target, a.token)) - high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL")) - report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW" - print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}") - if a.output: - with open(a.output, "w") as f: json.dump(report, f, indent=2) - else: + parser = argparse.ArgumentParser(description="Okta SCIM provisioning audit agent") + parser.add_argument("--org-url", help="Okta org URL (or OKTA_ORG_URL env)") + parser.add_argument("--token", help="API token (or OKTA_API_TOKEN env)") + parser.add_argument("--app-id", help="Audit a specific app ID") + parser.add_argument("--skip-deprovisioning", action="store_true") + parser.add_argument("--output", "-o", help="Output JSON report") + parser.add_argument("--verbose", "-v", action="store_true") + args = parser.parse_args() + + if args.org_url: + os.environ["OKTA_ORG_URL"] = args.org_url + if args.token: + os.environ["OKTA_API_TOKEN"] = args.token + + org_url, token = get_okta_config() + all_findings = [] + + scim_apps = list_provisioning_apps(org_url, token) + for app in scim_apps: + if args.app_id and app["id"] != args.app_id: + continue + findings, _ = audit_app_assignments(org_url, token, app["id"], app["label"]) + all_findings.extend(findings) + group_findings = audit_group_assignments(org_url, token, app["id"], app["label"]) + all_findings.extend(group_findings) + + if not args.skip_deprovisioning: + all_findings.extend(check_deprovisioning(org_url, token)) + + severity_counts = format_summary(scim_apps, all_findings) + + report = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "tool": "Okta SCIM Audit", + "scim_apps": scim_apps, + "findings": all_findings, + "severity_counts": severity_counts, + "risk_level": ( + "CRITICAL" if severity_counts.get("CRITICAL", 0) > 0 + else "HIGH" if severity_counts.get("HIGH", 0) > 0 + else "MEDIUM" if severity_counts.get("MEDIUM", 0) > 0 + else "LOW" + ), + } + + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(f"\n[+] Report saved to {args.output}") + elif args.verbose: print(json.dumps(report, indent=2)) + if __name__ == "__main__": main() diff --git a/skills/implementing-soar-playbook-with-palo-alto-xsoar/scripts/agent.py b/skills/implementing-soar-playbook-with-palo-alto-xsoar/scripts/agent.py index 49b9cd4c..9026907f 100644 --- a/skills/implementing-soar-playbook-with-palo-alto-xsoar/scripts/agent.py +++ b/skills/implementing-soar-playbook-with-palo-alto-xsoar/scripts/agent.py @@ -1,61 +1,195 @@ #!/usr/bin/env python3 -"""Cortex XSOAR playbook audit.""" -import argparse, json +"""Cortex XSOAR playbook management agent. + +Interfaces with the Cortex XSOAR (Demisto) API to manage and audit +security playbooks, automation scripts, incidents, and integrations. +Supports listing playbooks, checking incident statistics, and +verifying integration health. +""" +import argparse +import json +import os +import sys from datetime import datetime, timezone + try: import requests except ImportError: - requests = None + print("[!] 'requests' required: pip install requests", file=sys.stderr) + sys.exit(1) -def audit_config(target, token): - findings = [] - if not requests: return [{"error": "requests required"}] - headers = {"Authorization": f"Bearer {token}"} - try: - resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10) - if resp.status_code == 200: - data = resp.json() - if not data.get("enabled", True): - findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"}) - elif resp.status_code == 401: - findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"}) - except requests.RequestException as e: - findings.append({"error": str(e)}) - return findings -def check_compliance(target, token): - findings = [] - if not requests: return [] - headers = {"Authorization": f"Bearer {token}"} +def get_xsoar_config(): + """Return XSOAR server URL and API key.""" + server = os.environ.get("XSOAR_URL", "").rstrip("/") + api_key = os.environ.get("XSOAR_API_KEY", "") + if not server or not api_key: + print("[!] Set XSOAR_URL and XSOAR_API_KEY env vars", file=sys.stderr) + sys.exit(1) + return server, api_key + + +def xsoar_api(server, api_key, endpoint, method="POST", data=None): + """Make authenticated XSOAR API call.""" + url = f"{server}{endpoint}" + headers = {"Authorization": api_key, "Content-Type": "application/json", + "Accept": "application/json"} + if method == "GET": + resp = requests.get(url, headers=headers, verify=False, timeout=30) + else: + resp = requests.post(url, headers=headers, json=data or {}, verify=False, timeout=30) + resp.raise_for_status() + return resp.json() + + +def list_playbooks(server, api_key, query=""): + """List all playbooks.""" + print("[*] Fetching playbooks...") + data = {"query": query, "page": 0, "size": 500} + result = xsoar_api(server, api_key, "/playbook/search", data=data) + playbooks = result.get("playbooks", []) + print(f"[+] Found {len(playbooks)} playbooks") + return [{ + "id": pb.get("id", ""), + "name": pb.get("name", ""), + "version": pb.get("version", 0), + "deprecated": pb.get("deprecated", False), + "hidden": pb.get("hidden", False), + "system": pb.get("system", False), + "modified": pb.get("modified", ""), + } for pb in playbooks] + + +def get_incident_stats(server, api_key, days=30): + """Get incident statistics.""" + print(f"[*] Fetching incident statistics (last {days}d)...") + data = {"size": 0, "filter": {"period": {"by": "day", "fromValue": days}}} + result = xsoar_api(server, api_key, "/incidents/search", data=data) + total = result.get("total", 0) + print(f"[+] {total} incidents in last {days} days") + + # Get status breakdown + statuses = {} + data_with_agg = {"size": 0, "filter": {"period": {"by": "day", "fromValue": days}}, + "aggregations": [{"field": "status", "type": "terms"}]} try: - resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10) - if resp.status_code == 200: - for item in resp.json().get("checks", []): - if item.get("status") != "PASS": - findings.append({"check": item.get("name"), "status": item.get("status"), - "severity": item.get("severity", "MEDIUM")}) - except requests.RequestException: + agg_result = xsoar_api(server, api_key, "/incidents/search", data=data_with_agg) + for bucket in agg_result.get("aggregations", {}).get("status", {}).get("buckets", []): + statuses[bucket.get("key", "unknown")] = bucket.get("doc_count", 0) + except (requests.RequestException, KeyError): pass + + return {"total": total, "period_days": days, "by_status": statuses} + + +def list_integrations(server, api_key): + """List configured integrations and their health.""" + print("[*] Fetching integrations...") + result = xsoar_api(server, api_key, "/settings/integration/search", + data={"size": 500}) + instances = result.get("instances", []) + integrations = [] + for inst in instances: + integrations.append({ + "name": inst.get("name", ""), + "brand": inst.get("brand", ""), + "enabled": inst.get("enabled", ""), + "is_long_running": inst.get("isLongRunning", False), + "configured": inst.get("configurationStatus", ""), + }) + print(f"[+] Found {len(integrations)} integration instances") + return integrations + + +def audit_playbook_health(playbooks, integrations): + """Audit playbooks for common issues.""" + findings = [] + deprecated = [pb for pb in playbooks if pb.get("deprecated")] + if deprecated: + findings.append({ + "check": "Deprecated playbooks in use", + "severity": "MEDIUM", + "count": len(deprecated), + "detail": ", ".join(pb["name"] for pb in deprecated[:5]), + }) + + disabled_integrations = [i for i in integrations if i.get("enabled") == "false"] + if disabled_integrations: + findings.append({ + "check": "Disabled integrations", + "severity": "HIGH", + "count": len(disabled_integrations), + "detail": ", ".join(i["name"] for i in disabled_integrations[:5]), + }) + return findings + +def format_summary(playbooks, incident_stats, integrations, findings): + """Print XSOAR audit summary.""" + print(f"\n{'='*60}") + print(f" Cortex XSOAR Playbook Audit Report") + print(f"{'='*60}") + print(f" Playbooks : {len(playbooks)}") + print(f" Integrations : {len(integrations)}") + print(f" Incidents : {incident_stats.get('total', 0)} (last {incident_stats.get('period_days', 30)}d)") + print(f" Findings : {len(findings)}") + + if incident_stats.get("by_status"): + print(f"\n Incidents by Status:") + for status, count in incident_stats["by_status"].items(): + print(f" {status:15s}: {count}") + + enabled_count = sum(1 for i in integrations if i.get("enabled") != "false") + print(f"\n Integrations: {enabled_count} enabled, {len(integrations) - enabled_count} disabled") + + severity_counts = {} + for f in findings: + sev = f.get("severity", "INFO") + severity_counts[sev] = severity_counts.get(sev, 0) + 1 + return severity_counts + + def main(): - p = argparse.ArgumentParser(description="Cortex XSOAR playbook audit") - p.add_argument("--target", required=True, help="Target URL") - p.add_argument("--token", required=True, help="API token") - p.add_argument("--output", "-o", help="Output JSON report") - p.add_argument("--verbose", "-v", action="store_true") - a = p.parse_args() - print("[*] Cortex XSOAR playbook audit") - report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []} - report["findings"].extend(audit_config(a.target, a.token)) - report["findings"].extend(check_compliance(a.target, a.token)) - high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL")) - report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW" - print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}") - if a.output: - with open(a.output, "w") as f: json.dump(report, f, indent=2) - else: + parser = argparse.ArgumentParser(description="Cortex XSOAR playbook audit agent") + parser.add_argument("--url", help="XSOAR URL (or XSOAR_URL env)") + parser.add_argument("--api-key", help="API key (or XSOAR_API_KEY env)") + parser.add_argument("--days", type=int, default=30, help="Incident stats period") + parser.add_argument("--output", "-o", help="Output JSON report") + parser.add_argument("--verbose", "-v", action="store_true") + args = parser.parse_args() + + if args.url: + os.environ["XSOAR_URL"] = args.url + if args.api_key: + os.environ["XSOAR_API_KEY"] = args.api_key + + server, api_key = get_xsoar_config() + + playbooks = list_playbooks(server, api_key) + incident_stats = get_incident_stats(server, api_key, args.days) + integrations = list_integrations(server, api_key) + findings = audit_playbook_health(playbooks, integrations) + + severity_counts = format_summary(playbooks, incident_stats, integrations, findings) + + report = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "tool": "Cortex XSOAR Audit", + "playbooks": playbooks, + "incident_stats": incident_stats, + "integrations": integrations, + "findings": findings, + "severity_counts": severity_counts, + } + + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(f"\n[+] Report saved to {args.output}") + elif args.verbose: print(json.dumps(report, indent=2)) + if __name__ == "__main__": main() diff --git a/skills/implementing-taxii-server-with-opentaxii/scripts/agent.py b/skills/implementing-taxii-server-with-opentaxii/scripts/agent.py index a46fede4..d88fcc0f 100644 --- a/skills/implementing-taxii-server-with-opentaxii/scripts/agent.py +++ b/skills/implementing-taxii-server-with-opentaxii/scripts/agent.py @@ -1,61 +1,255 @@ #!/usr/bin/env python3 -"""OpenTAXII server configuration audit.""" -import argparse, json +"""OpenTAXII server configuration and health audit agent. + +Audits an OpenTAXII server instance by checking service discovery, +collection availability, content block statistics, and API health. +Supports both TAXII 1.1 and 2.0/2.1 endpoints. +""" +import argparse +import json +import os +import sys from datetime import datetime, timezone + try: import requests except ImportError: - requests = None + print("[!] 'requests' required: pip install requests", file=sys.stderr) + sys.exit(1) -def audit_config(target, token): +try: + from taxii2client.v20 import Server as Server20 + from taxii2client.v21 import Server as Server21 + HAS_TAXII_CLIENT = True +except ImportError: + HAS_TAXII_CLIENT = False + + +def check_taxii1_discovery(base_url, username=None, password=None): + """Check TAXII 1.1 discovery service.""" findings = [] - if not requests: return [{"error": "requests required"}] - headers = {"Authorization": f"Bearer {token}"} + discovery_url = f"{base_url}/services/discovery" + print(f"[*] Checking TAXII 1.1 discovery: {discovery_url}") + + headers = {"Content-Type": "application/xml", + "X-TAXII-Content-Type": "urn:taxii.mitre.org:message:xml:1.1", + "X-TAXII-Protocol": "urn:taxii.mitre.org:protocol:http:1.0"} + discovery_xml = ( + '' + ) + + auth = (username, password) if username else None try: - resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10) + resp = requests.post(discovery_url, data=discovery_xml, headers=headers, + auth=auth, timeout=15) if resp.status_code == 200: - data = resp.json() - if not data.get("enabled", True): - findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"}) - elif resp.status_code == 401: - findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"}) + findings.append({ + "check": "TAXII 1.1 Discovery", + "status": "PASS", + "severity": "INFO", + "detail": f"Discovery service responding ({len(resp.content)} bytes)", + }) + else: + findings.append({ + "check": "TAXII 1.1 Discovery", + "status": "FAIL", + "severity": "HIGH", + "detail": f"HTTP {resp.status_code}", + }) except requests.RequestException as e: - findings.append({"error": str(e)}) + findings.append({ + "check": "TAXII 1.1 Discovery", + "status": "FAIL", + "severity": "HIGH", + "detail": str(e)[:100], + }) + return findings -def check_compliance(target, token): + +def check_taxii2_discovery(base_url, username=None, password=None, version="2.1"): + """Check TAXII 2.0/2.1 discovery and collections.""" findings = [] - if not requests: return [] - headers = {"Authorization": f"Bearer {token}"} + print(f"[*] Checking TAXII {version} discovery: {base_url}") + + if HAS_TAXII_CLIENT: + try: + kwargs = {} + if username and password: + kwargs["user"] = username + kwargs["password"] = password + if version == "2.0": + server = Server20(base_url, **kwargs) + else: + server = Server21(base_url, **kwargs) + + findings.append({ + "check": f"TAXII {version} Discovery", + "status": "PASS", + "severity": "INFO", + "detail": f"Server: {server.title or 'Untitled'}", + }) + + for api_root in server.api_roots: + collections = list(api_root.collections) + findings.append({ + "check": f"API Root: {api_root.title or api_root.url}", + "status": "PASS", + "severity": "INFO", + "detail": f"{len(collections)} collections", + "collections": [{ + "id": c.id, + "title": c.title, + "can_read": getattr(c, "can_read", True), + "can_write": getattr(c, "can_write", False), + } for c in collections], + }) + + except Exception as e: + findings.append({ + "check": f"TAXII {version} Discovery", + "status": "FAIL", + "severity": "HIGH", + "detail": str(e)[:100], + }) + else: + # Fallback to raw HTTP + auth = (username, password) if username else None + headers = {"Accept": "application/taxii+json;version=2.1"} + try: + resp = requests.get(base_url, headers=headers, auth=auth, timeout=15) + if resp.status_code == 200: + data = resp.json() + findings.append({ + "check": f"TAXII {version} Discovery", + "status": "PASS", + "severity": "INFO", + "detail": f"Title: {data.get('title', 'N/A')}, " + f"API Roots: {len(data.get('api_roots', []))}", + }) + else: + findings.append({ + "check": f"TAXII {version} Discovery", + "status": "FAIL", + "severity": "HIGH", + "detail": f"HTTP {resp.status_code}", + }) + except requests.RequestException as e: + findings.append({ + "check": f"TAXII {version} Discovery", + "status": "FAIL", + "severity": "HIGH", + "detail": str(e)[:100], + }) + + return findings + + +def check_server_health(base_url): + """Check basic server health.""" + findings = [] + print(f"[*] Checking server health: {base_url}") + + # Check TLS + if base_url.startswith("https://"): + findings.append({ + "check": "TLS enabled", + "status": "PASS", + "severity": "INFO", + }) + else: + findings.append({ + "check": "TLS enabled", + "status": "FAIL", + "severity": "HIGH", + "detail": "TAXII server not using HTTPS", + }) + + # Check response time try: - resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10) - if resp.status_code == 200: - for item in resp.json().get("checks", []): - if item.get("status") != "PASS": - findings.append({"check": item.get("name"), "status": item.get("status"), - "severity": item.get("severity", "MEDIUM")}) + resp = requests.get(base_url, timeout=10) + response_time = resp.elapsed.total_seconds() + if response_time > 5: + findings.append({ + "check": "Response time", + "status": "WARN", + "severity": "MEDIUM", + "detail": f"{response_time:.2f}s (slow)", + }) + else: + findings.append({ + "check": "Response time", + "status": "PASS", + "severity": "INFO", + "detail": f"{response_time:.2f}s", + }) except requests.RequestException: pass + return findings + +def format_summary(all_findings, base_url): + """Print audit summary.""" + print(f"\n{'='*60}") + print(f" OpenTAXII Server Audit Report") + print(f"{'='*60}") + print(f" Server : {base_url}") + print(f" Findings : {len(all_findings)}") + + pass_count = sum(1 for f in all_findings if f["status"] == "PASS") + fail_count = sum(1 for f in all_findings if f["status"] == "FAIL") + print(f" Passed : {pass_count}") + print(f" Failed : {fail_count}") + + for f in all_findings: + icon = "OK" if f["status"] == "PASS" else "!!" if f["status"] == "FAIL" else "~~" + print(f" [{icon}] {f['check']}: {f.get('detail', '')[:50]}") + + severity_counts = {} + for f in all_findings: + sev = f.get("severity", "INFO") + severity_counts[sev] = severity_counts.get(sev, 0) + 1 + return severity_counts + + def main(): - p = argparse.ArgumentParser(description="OpenTAXII server configuration audit") - p.add_argument("--target", required=True, help="Target URL") - p.add_argument("--token", required=True, help="API token") - p.add_argument("--output", "-o", help="Output JSON report") - p.add_argument("--verbose", "-v", action="store_true") - a = p.parse_args() - print("[*] OpenTAXII server configuration audit") - report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []} - report["findings"].extend(audit_config(a.target, a.token)) - report["findings"].extend(check_compliance(a.target, a.token)) - high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL")) - report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW" - print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}") - if a.output: - with open(a.output, "w") as f: json.dump(report, f, indent=2) + parser = argparse.ArgumentParser(description="OpenTAXII server audit agent") + parser.add_argument("--url", required=True, help="TAXII server base URL") + parser.add_argument("--username", help="Authentication username") + parser.add_argument("--password", help="Authentication password") + parser.add_argument("--version", choices=["1.1", "2.0", "2.1"], default="2.1") + parser.add_argument("--output", "-o", help="Output JSON report") + parser.add_argument("--verbose", "-v", action="store_true") + args = parser.parse_args() + + all_findings = [] + all_findings.extend(check_server_health(args.url)) + + if args.version == "1.1": + all_findings.extend(check_taxii1_discovery(args.url, args.username, args.password)) else: + all_findings.extend(check_taxii2_discovery(args.url, args.username, args.password, args.version)) + + severity_counts = format_summary(all_findings, args.url) + + report = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "tool": "OpenTAXII Audit", + "server": args.url, + "version": args.version, + "findings": all_findings, + "severity_counts": severity_counts, + } + + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(f"\n[+] Report saved to {args.output}") + elif args.verbose: print(json.dumps(report, indent=2)) + if __name__ == "__main__": main() diff --git a/skills/implementing-threat-intelligence-lifecycle-management/scripts/agent.py b/skills/implementing-threat-intelligence-lifecycle-management/scripts/agent.py index b045e6a6..bd774324 100644 --- a/skills/implementing-threat-intelligence-lifecycle-management/scripts/agent.py +++ b/skills/implementing-threat-intelligence-lifecycle-management/scripts/agent.py @@ -1,61 +1,265 @@ #!/usr/bin/env python3 -"""Threat intelligence lifecycle audit.""" -import argparse, json -from datetime import datetime, timezone +"""Threat intelligence lifecycle management agent. + +Manages the threat intelligence lifecycle: collection from feeds, +processing/normalization of IOCs, analysis/enrichment via VirusTotal +and AbuseIPDB, dissemination to SIEM/firewalls, and tracking of +IOC aging and confidence scoring. +""" +import argparse +import csv +import hashlib +import json +import os +import re +import sys +from datetime import datetime, timezone, timedelta + try: import requests except ImportError: requests = None -def audit_config(target, token): - findings = [] - if not requests: return [{"error": "requests required"}] - headers = {"Authorization": f"Bearer {token}"} - try: - resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10) - if resp.status_code == 200: - data = resp.json() - if not data.get("enabled", True): - findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"}) - elif resp.status_code == 401: - findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"}) - except requests.RequestException as e: - findings.append({"error": str(e)}) - return findings -def check_compliance(target, token): - findings = [] - if not requests: return [] - headers = {"Authorization": f"Bearer {token}"} +IOC_PATTERNS = { + "ipv4": re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'), + "domain": re.compile(r'\b(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z]{2,}\b', re.I), + "md5": re.compile(r'\b[a-fA-F0-9]{32}\b'), + "sha1": re.compile(r'\b[a-fA-F0-9]{40}\b'), + "sha256": re.compile(r'\b[a-fA-F0-9]{64}\b'), + "url": re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+'), + "email": re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'), +} + + +def extract_iocs(text): + """Extract IOCs from unstructured text.""" + iocs = {} + for ioc_type, pattern in IOC_PATTERNS.items(): + matches = set(pattern.findall(text)) + # Filter out private IPs for ipv4 + if ioc_type == "ipv4": + matches = {ip for ip in matches + if not ip.startswith(("10.", "192.168.", "127.", "0.")) + and not ip.startswith("172.") or not (16 <= int(ip.split(".")[1]) <= 31)} + if matches: + iocs[ioc_type] = sorted(matches) + return iocs + + +def load_ioc_feed(source): + """Load IOCs from a file (JSON, CSV, or plain text).""" + ext = os.path.splitext(source)[1].lower() + iocs = [] + + if ext == ".json": + with open(source, "r") as f: + data = json.load(f) + if isinstance(data, list): + iocs = data + elif isinstance(data, dict): + iocs = data.get("indicators", data.get("iocs", data.get("data", []))) + elif ext == ".csv": + with open(source, "r", newline="") as f: + reader = csv.DictReader(f) + iocs = list(reader) + else: + with open(source, "r") as f: + text = f.read() + extracted = extract_iocs(text) + for ioc_type, values in extracted.items(): + for v in values: + iocs.append({"type": ioc_type, "value": v, "source": source}) + + return iocs + + +def normalize_ioc(ioc): + """Normalize IOC into standard format.""" + if isinstance(ioc, str): + for ioc_type, pattern in IOC_PATTERNS.items(): + if pattern.fullmatch(ioc): + return {"type": ioc_type, "value": ioc.lower().strip()} + return {"type": "unknown", "value": ioc.strip()} + + return { + "type": (ioc.get("type") or ioc.get("indicator_type") or "unknown").lower(), + "value": (ioc.get("value") or ioc.get("indicator") or "").lower().strip(), + "source": ioc.get("source", ""), + "confidence": ioc.get("confidence", 50), + "first_seen": ioc.get("first_seen", ""), + "last_seen": ioc.get("last_seen", ""), + "tags": ioc.get("tags", []), + "description": ioc.get("description", ""), + } + + +def enrich_ioc_virustotal(ioc_value, ioc_type, api_key): + """Enrich IOC via VirusTotal API v3.""" + if not requests or not api_key: + return {} + + headers = {"x-apikey": api_key} + base = "https://www.virustotal.com/api/v3" + + if ioc_type in ("md5", "sha1", "sha256"): + url = f"{base}/files/{ioc_value}" + elif ioc_type == "domain": + url = f"{base}/domains/{ioc_value}" + elif ioc_type == "ipv4": + url = f"{base}/ip_addresses/{ioc_value}" + elif ioc_type == "url": + url_id = hashlib.sha256(ioc_value.encode()).hexdigest() + url = f"{base}/urls/{url_id}" + else: + return {} + try: - resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10) + resp = requests.get(url, headers=headers, timeout=15) if resp.status_code == 200: - for item in resp.json().get("checks", []): - if item.get("status") != "PASS": - findings.append({"check": item.get("name"), "status": item.get("status"), - "severity": item.get("severity", "MEDIUM")}) + data = resp.json().get("data", {}).get("attributes", {}) + stats = data.get("last_analysis_stats", {}) + return { + "malicious": stats.get("malicious", 0), + "suspicious": stats.get("suspicious", 0), + "harmless": stats.get("harmless", 0), + "undetected": stats.get("undetected", 0), + "reputation": data.get("reputation", 0), + "source": "virustotal", + } except requests.RequestException: pass - return findings + return {} + + +def calculate_confidence(ioc, enrichment=None): + """Calculate confidence score for an IOC (0-100).""" + score = ioc.get("confidence", 50) + + # Boost for VT detections + if enrichment: + malicious = enrichment.get("malicious", 0) + if malicious > 10: + score = min(score + 30, 100) + elif malicious > 5: + score = min(score + 20, 100) + elif malicious > 0: + score = min(score + 10, 100) + elif enrichment.get("harmless", 0) > 20: + score = max(score - 20, 0) + + # Decay based on age + first_seen = ioc.get("first_seen", "") + if first_seen: + try: + if "T" in first_seen: + seen_dt = datetime.fromisoformat(first_seen.replace("Z", "+00:00")) + else: + seen_dt = datetime.strptime(first_seen[:10], "%Y-%m-%d").replace(tzinfo=timezone.utc) + age_days = (datetime.now(timezone.utc) - seen_dt).days + if age_days > 180: + score = max(score - 20, 0) + elif age_days > 90: + score = max(score - 10, 0) + except (ValueError, TypeError): + pass + + return min(max(score, 0), 100) + + +def format_summary(iocs, enriched_count): + """Print lifecycle report.""" + print(f"\n{'='*60}") + print(f" Threat Intelligence Lifecycle Report") + print(f"{'='*60}") + print(f" Total IOCs : {len(iocs)}") + print(f" Enriched : {enriched_count}") + + by_type = {} + for ioc in iocs: + t = ioc.get("type", "unknown") + by_type[t] = by_type.get(t, 0) + 1 + print(f"\n By Type:") + for t, count in sorted(by_type.items(), key=lambda x: -x[1]): + print(f" {t:12s}: {count}") + + high_conf = [i for i in iocs if i.get("confidence", 0) >= 80] + med_conf = [i for i in iocs if 50 <= i.get("confidence", 0) < 80] + low_conf = [i for i in iocs if i.get("confidence", 0) < 50] + print(f"\n By Confidence:") + print(f" High (>=80) : {len(high_conf)}") + print(f" Medium : {len(med_conf)}") + print(f" Low (<50) : {len(low_conf)}") + + if high_conf: + print(f"\n High-Confidence IOCs:") + for i in high_conf[:15]: + print(f" [{i['type']:8s}] {i['value'][:50]:50s} (confidence: {i.get('confidence', 0)})") + def main(): - p = argparse.ArgumentParser(description="Threat intelligence lifecycle audit") - p.add_argument("--target", required=True, help="Target URL") - p.add_argument("--token", required=True, help="API token") - p.add_argument("--output", "-o", help="Output JSON report") - p.add_argument("--verbose", "-v", action="store_true") - a = p.parse_args() - print("[*] Threat intelligence lifecycle audit") - report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []} - report["findings"].extend(audit_config(a.target, a.token)) - report["findings"].extend(check_compliance(a.target, a.token)) - high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL")) - report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW" - print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}") - if a.output: - with open(a.output, "w") as f: json.dump(report, f, indent=2) + parser = argparse.ArgumentParser(description="Threat intelligence lifecycle management agent") + parser.add_argument("--source", required=True, help="IOC source file (JSON/CSV/text)") + parser.add_argument("--vt-key", help="VirusTotal API key (or VT_API_KEY env)") + parser.add_argument("--enrich", action="store_true", help="Enrich IOCs via VirusTotal") + parser.add_argument("--min-confidence", type=int, default=0, help="Min confidence to include") + parser.add_argument("--output", "-o", help="Output JSON report") + parser.add_argument("--verbose", "-v", action="store_true") + args = parser.parse_args() + + vt_key = args.vt_key or os.environ.get("VT_API_KEY", "") + + raw_iocs = load_ioc_feed(args.source) + print(f"[*] Loaded {len(raw_iocs)} raw IOCs from {args.source}") + + iocs = [normalize_ioc(ioc) for ioc in raw_iocs] + iocs = [i for i in iocs if i.get("value")] + + # Deduplicate + seen = set() + unique_iocs = [] + for ioc in iocs: + key = f"{ioc['type']}:{ioc['value']}" + if key not in seen: + seen.add(key) + unique_iocs.append(ioc) + iocs = unique_iocs + print(f"[*] {len(iocs)} unique IOCs after dedup") + + enriched_count = 0 + if args.enrich and vt_key: + print(f"[*] Enriching IOCs via VirusTotal...") + for ioc in iocs[:100]: # Rate limit + enrichment = enrich_ioc_virustotal(ioc["value"], ioc["type"], vt_key) + if enrichment: + ioc["enrichment"] = enrichment + enriched_count += 1 + ioc["confidence"] = calculate_confidence(ioc, enrichment) else: + for ioc in iocs: + ioc["confidence"] = calculate_confidence(ioc) + + iocs = [i for i in iocs if i.get("confidence", 0) >= args.min_confidence] + iocs.sort(key=lambda x: -x.get("confidence", 0)) + + format_summary(iocs, enriched_count) + + report = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "tool": "TI Lifecycle Manager", + "source": args.source, + "total_iocs": len(iocs), + "enriched": enriched_count, + "iocs": iocs, + } + + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(f"\n[+] Report saved to {args.output}") + elif args.verbose: print(json.dumps(report, indent=2)) + if __name__ == "__main__": main() diff --git a/skills/performing-brand-monitoring-for-impersonation/scripts/agent.py b/skills/performing-brand-monitoring-for-impersonation/scripts/agent.py index d14ba1fb..cbd6b278 100644 --- a/skills/performing-brand-monitoring-for-impersonation/scripts/agent.py +++ b/skills/performing-brand-monitoring-for-impersonation/scripts/agent.py @@ -1,60 +1,185 @@ #!/usr/bin/env python3 -"""Brand impersonation monitoring agent.""" -import argparse, json +"""Brand impersonation monitoring agent. + +Monitors for brand impersonation by checking Certificate Transparency +logs for suspicious domain registrations, performing DNS lookups for +typosquatting domains, and scanning social media profile names. Uses +crt.sh API and DNS resolution to identify potential phishing domains. +""" +import argparse +import json +import os +import socket +import sys from datetime import datetime, timezone + try: import requests except ImportError: - requests = None + print("[!] 'requests' required: pip install requests", file=sys.stderr) + sys.exit(1) -def run_scan(target, token=None): + +def generate_typosquat_variants(domain): + """Generate common typosquatting domain variants.""" + name, tld = domain.rsplit(".", 1) if "." in domain else (domain, "com") + variants = set() + chars = "abcdefghijklmnopqrstuvwxyz0123456789" + + # Character omission + for i in range(len(name)): + variants.add(f"{name[:i]}{name[i+1:]}.{tld}") + # Character swap + for i in range(len(name) - 1): + swapped = list(name) + swapped[i], swapped[i+1] = swapped[i+1], swapped[i] + variants.add(f"{''.join(swapped)}.{tld}") + # Character replacement (adjacent keys) + adjacent = {"a": "sq", "e": "wr", "i": "uo", "o": "ip", "u": "yi", + "s": "ad", "n": "bm", "r": "et", "t": "ry", "l": "kp"} + for i, c in enumerate(name): + for adj in adjacent.get(c, ""): + variants.add(f"{name[:i]}{adj}{name[i+1:]}.{tld}") + # Character doubling + for i in range(len(name)): + variants.add(f"{name[:i]}{name[i]}{name[i:]}.{tld}") + # Homoglyph substitution + homoglyphs = {"o": "0", "l": "1", "i": "1", "e": "3", "a": "4", "s": "5"} + for i, c in enumerate(name): + if c in homoglyphs: + variants.add(f"{name[:i]}{homoglyphs[c]}{name[i+1:]}.{tld}") + # TLD variants + for alt_tld in ["com", "net", "org", "io", "co", "info", "biz", "xyz"]: + if alt_tld != tld: + variants.add(f"{name}.{alt_tld}") + # Prefix/suffix + for prefix in ["my", "the", "get", "go", "login", "secure", "account"]: + variants.add(f"{prefix}{name}.{tld}") + variants.add(f"{name}{prefix}.{tld}") + # Hyphen insertion + for i in range(1, len(name)): + variants.add(f"{name[:i]}-{name[i:]}.{tld}") + + variants.discard(domain) + return sorted(variants) + + +def check_domain_resolution(domains, max_check=200): + """Check which typosquat domains actually resolve.""" + resolved = [] + checked = 0 + for domain in domains[:max_check]: + checked += 1 + try: + ip = socket.gethostbyname(domain) + resolved.append({ + "domain": domain, + "ip": ip, + "resolves": True, + "severity": "HIGH", + }) + except socket.gaierror: + pass + print(f"[+] Checked {checked} domains, {len(resolved)} resolve to an IP") + return resolved + + +def search_certificate_transparency(domain): + """Search crt.sh for certificates containing the brand name.""" + print(f"[*] Searching Certificate Transparency for: {domain}") findings = [] - if not requests: return [{"error": "requests required"}] - headers = {"Authorization": f"Bearer {token}"} if token else {} try: - resp = requests.get(f"{target}", headers=headers, timeout=15) + resp = requests.get( + f"https://crt.sh/?q=%25{domain}%25&output=json", + timeout=30, + ) if resp.status_code == 200: - findings.append({"check": "Target Accessible", "status": "OK", "severity": "INFO"}) - else: - findings.append({"check": "Target Access", "status": f"HTTP {resp.status_code}", "severity": "MEDIUM"}) + certs = resp.json() + seen_names = set() + for cert in certs: + common_name = cert.get("common_name", "") + if common_name not in seen_names and domain not in common_name.split(".")[-2:]: + seen_names.add(common_name) + findings.append({ + "type": "ct_log", + "common_name": common_name, + "issuer": cert.get("issuer_name", ""), + "not_before": cert.get("not_before", ""), + "not_after": cert.get("not_after", ""), + "severity": "MEDIUM", + }) + print(f"[+] Found {len(findings)} suspicious certificates") except requests.RequestException as e: - findings.append({"error": str(e)}) + print(f"[!] CT search error: {e}") return findings -def analyze_results(target, token=None): - findings = [] - if not requests: return [] - headers = {"Authorization": f"Bearer {token}"} if token else {} - try: - resp = requests.get(f"{target}/api/v1/results", headers=headers, timeout=15) - if resp.status_code == 200: - data = resp.json() - for item in data.get("findings", data.get("results", [])): - severity = item.get("severity", item.get("risk", "MEDIUM")) - findings.append({"check": item.get("name", item.get("title", "unknown")), - "severity": severity.upper() if isinstance(severity, str) else "MEDIUM"}) - except requests.RequestException: - pass - return findings + +def format_summary(domain, variants_count, resolved, ct_findings): + """Print monitoring summary.""" + print(f"\n{'='*60}") + print(f" Brand Impersonation Monitoring Report") + print(f"{'='*60}") + print(f" Brand Domain : {domain}") + print(f" Typosquat Variants : {variants_count}") + print(f" Resolved Domains : {len(resolved)}") + print(f" CT Log Matches : {len(ct_findings)}") + + if resolved: + print(f"\n Active Typosquat Domains (resolving):") + for r in resolved[:20]: + print(f" [{r['severity']:6s}] {r['domain']:40s} -> {r['ip']}") + + if ct_findings: + print(f"\n Certificate Transparency Findings:") + for f in ct_findings[:15]: + print(f" {f['common_name']:40s} (issued: {f['not_before'][:10]})") + def main(): - p = argparse.ArgumentParser(description="Brand impersonation monitoring agent") - p.add_argument("--target", required=True, help="Target URL or IP") - p.add_argument("--token", help="API token") - p.add_argument("--output", "-o", help="Output JSON report") - p.add_argument("--verbose", "-v", action="store_true") - a = p.parse_args() - print("[*] Brand impersonation monitoring agent") - report = {"timestamp": datetime.now(timezone.utc).isoformat(), "target": a.target, "findings": []} - report["findings"].extend(run_scan(a.target, a.token)) - report["findings"].extend(analyze_results(a.target, a.token)) - high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL")) - report["risk_level"] = "CRITICAL" if high > 2 else "HIGH" if high else "MEDIUM" if report["findings"] else "LOW" - print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}") - if a.output: - with open(a.output, "w") as f: json.dump(report, f, indent=2) - else: + parser = argparse.ArgumentParser(description="Brand impersonation monitoring agent") + parser.add_argument("--domain", required=True, help="Brand domain to monitor (e.g., example.com)") + parser.add_argument("--max-check", type=int, default=200, help="Max domains to DNS-check") + parser.add_argument("--skip-ct", action="store_true", help="Skip Certificate Transparency search") + parser.add_argument("--skip-dns", action="store_true", help="Skip DNS resolution check") + parser.add_argument("--output", "-o", help="Output JSON report") + parser.add_argument("--verbose", "-v", action="store_true") + args = parser.parse_args() + + variants = generate_typosquat_variants(args.domain) + print(f"[*] Generated {len(variants)} typosquat variants for {args.domain}") + + resolved = [] + if not args.skip_dns: + resolved = check_domain_resolution(variants, args.max_check) + + ct_findings = [] + if not args.skip_ct: + ct_findings = search_certificate_transparency(args.domain.split(".")[0]) + + format_summary(args.domain, len(variants), resolved, ct_findings) + + report = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "tool": "Brand Monitor", + "domain": args.domain, + "variants_generated": len(variants), + "resolved_domains": resolved, + "ct_findings": ct_findings, + "risk_level": ( + "CRITICAL" if len(resolved) > 10 + else "HIGH" if resolved + else "MEDIUM" if ct_findings + else "LOW" + ), + } + + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(f"\n[+] Report saved to {args.output}") + elif args.verbose: print(json.dumps(report, indent=2)) + if __name__ == "__main__": main() diff --git a/skills/performing-dark-web-monitoring-for-threats/scripts/agent.py b/skills/performing-dark-web-monitoring-for-threats/scripts/agent.py index 6cbc2f18..649c3121 100644 --- a/skills/performing-dark-web-monitoring-for-threats/scripts/agent.py +++ b/skills/performing-dark-web-monitoring-for-threats/scripts/agent.py @@ -1,60 +1,249 @@ #!/usr/bin/env python3 -"""Dark web threat monitoring agent.""" -import argparse, json +"""Dark web threat monitoring agent. + +Monitors for organization-specific threats on the dark web by checking +breach databases (Have I Been Pwned API), paste sites, and public +threat intelligence feeds for leaked credentials, exposed data, and +mentions of organizational domains. +""" +import argparse +import hashlib +import json +import os +import sys +import time from datetime import datetime, timezone + try: import requests except ImportError: - requests = None + print("[!] 'requests' required: pip install requests", file=sys.stderr) + sys.exit(1) -def run_scan(target, token=None): + +def check_hibp_breaches(domain, api_key=None): + """Check Have I Been Pwned for breaches involving a domain.""" findings = [] - if not requests: return [{"error": "requests required"}] - headers = {"Authorization": f"Bearer {token}"} if token else {} + print(f"[*] Checking HIBP breaches for domain: {domain}") + headers = {"user-agent": "dark-web-monitor-agent"} + if api_key: + headers["hibp-api-key"] = api_key try: - resp = requests.get(f"{target}", headers=headers, timeout=15) - if resp.status_code == 200: - findings.append({"check": "Target Accessible", "status": "OK", "severity": "INFO"}) - else: - findings.append({"check": "Target Access", "status": f"HTTP {resp.status_code}", "severity": "MEDIUM"}) + resp = requests.get( + f"https://haveibeenpwned.com/api/v3/breaches", + headers=headers, timeout=15, + ) + resp.raise_for_status() + breaches = resp.json() + domain_breaches = [b for b in breaches if domain.lower() in b.get("Domain", "").lower()] + for breach in domain_breaches: + findings.append({ + "type": "breach", + "source": "HIBP", + "name": breach.get("Name", ""), + "domain": breach.get("Domain", ""), + "breach_date": breach.get("BreachDate", ""), + "added_date": breach.get("AddedDate", ""), + "pwn_count": breach.get("PwnCount", 0), + "data_classes": breach.get("DataClasses", []), + "is_verified": breach.get("IsVerified", False), + "severity": "CRITICAL" if breach.get("PwnCount", 0) > 10000 else "HIGH", + }) + print(f"[+] Found {len(domain_breaches)} breaches for {domain}") except requests.RequestException as e: - findings.append({"error": str(e)}) + print(f"[!] HIBP API error: {e}") return findings -def analyze_results(target, token=None): + +def check_hibp_email(email, api_key): + """Check if a specific email appears in known breaches.""" + if not api_key: + return [] findings = [] - if not requests: return [] - headers = {"Authorization": f"Bearer {token}"} if token else {} + headers = {"hibp-api-key": api_key, "user-agent": "dark-web-monitor-agent"} try: - resp = requests.get(f"{target}/api/v1/results", headers=headers, timeout=15) + resp = requests.get( + f"https://haveibeenpwned.com/api/v3/breachedaccount/{email}", + headers=headers, params={"truncateResponse": "false"}, timeout=15, + ) if resp.status_code == 200: - data = resp.json() - for item in data.get("findings", data.get("results", [])): - severity = item.get("severity", item.get("risk", "MEDIUM")) - findings.append({"check": item.get("name", item.get("title", "unknown")), - "severity": severity.upper() if isinstance(severity, str) else "MEDIUM"}) + breaches = resp.json() + for breach in breaches: + findings.append({ + "type": "email_breach", + "email": email, + "breach": breach.get("Name", ""), + "breach_date": breach.get("BreachDate", ""), + "data_classes": breach.get("DataClasses", []), + "severity": "HIGH", + }) + elif resp.status_code == 404: + pass # Not found in any breaches + time.sleep(1.5) # HIBP rate limit except requests.RequestException: pass return findings + +def check_hibp_password(password): + """Check if a password appears in known breaches using k-anonymity.""" + sha1 = hashlib.sha1(password.encode("utf-8")).hexdigest().upper() + prefix = sha1[:5] + suffix = sha1[5:] + try: + resp = requests.get( + f"https://api.pwnedpasswords.com/range/{prefix}", + timeout=10, + ) + resp.raise_for_status() + for line in resp.text.splitlines(): + parts = line.split(":") + if parts[0] == suffix: + count = int(parts[1]) + return {"compromised": True, "count": count, "severity": "CRITICAL"} + return {"compromised": False, "count": 0, "severity": "INFO"} + except requests.RequestException: + return {"compromised": None, "error": "API unavailable"} + + +def check_paste_dumps(domain, api_key=None): + """Check for organization mentions in paste sites via HIBP.""" + findings = [] + if not api_key: + return findings + # HIBP paste API requires per-email queries + return findings + + +def search_threat_intel_feeds(domain): + """Search public threat intelligence for domain mentions.""" + findings = [] + print(f"[*] Checking public threat intelligence for: {domain}") + + # Check URLhaus for malicious URLs from domain + try: + resp = requests.post( + "https://urlhaus-api.abuse.ch/v1/host/", + data={"host": domain}, timeout=15, + ) + if resp.status_code == 200: + data = resp.json() + if data.get("query_status") == "ok": + urls = data.get("urls", []) + if urls: + findings.append({ + "type": "malicious_urls", + "source": "URLhaus", + "domain": domain, + "count": len(urls), + "severity": "HIGH", + "detail": f"{len(urls)} malicious URLs associated with domain", + "samples": [u.get("url", "")[:80] for u in urls[:5]], + }) + except requests.RequestException: + pass + + # Check AbuseIPDB + abuse_key = os.environ.get("ABUSEIPDB_KEY", "") + if abuse_key: + try: + resp = requests.get( + "https://api.abuseipdb.com/api/v2/check-block", + headers={"Key": abuse_key, "Accept": "application/json"}, + params={"network": domain}, timeout=15, + ) + except requests.RequestException: + pass + + return findings + + +def format_summary(all_findings, domain): + """Print monitoring summary.""" + print(f"\n{'='*60}") + print(f" Dark Web Threat Monitoring Report") + print(f"{'='*60}") + print(f" Target Domain: {domain}") + print(f" Total Findings: {len(all_findings)}") + + by_type = {} + for f in all_findings: + t = f.get("type", "unknown") + by_type[t] = by_type.get(t, 0) + 1 + + if by_type: + print(f"\n By Type:") + for t, count in by_type.items(): + print(f" {t:20s}: {count}") + + breaches = [f for f in all_findings if f["type"] == "breach"] + if breaches: + print(f"\n Known Breaches ({len(breaches)}):") + for b in breaches: + print(f" [{b['severity']:8s}] {b['name']:25s} | " + f"Date: {b['breach_date']} | Records: {b.get('pwn_count', 'N/A')}") + if b.get("data_classes"): + print(f" Data: {', '.join(b['data_classes'][:5])}") + + severity_counts = {} + for f in all_findings: + sev = f.get("severity", "INFO") + severity_counts[sev] = severity_counts.get(sev, 0) + 1 + return severity_counts + + def main(): - p = argparse.ArgumentParser(description="Dark web threat monitoring agent") - p.add_argument("--target", required=True, help="Target URL or IP") - p.add_argument("--token", help="API token") - p.add_argument("--output", "-o", help="Output JSON report") - p.add_argument("--verbose", "-v", action="store_true") - a = p.parse_args() - print("[*] Dark web threat monitoring agent") - report = {"timestamp": datetime.now(timezone.utc).isoformat(), "target": a.target, "findings": []} - report["findings"].extend(run_scan(a.target, a.token)) - report["findings"].extend(analyze_results(a.target, a.token)) - high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL")) - report["risk_level"] = "CRITICAL" if high > 2 else "HIGH" if high else "MEDIUM" if report["findings"] else "LOW" - print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}") - if a.output: - with open(a.output, "w") as f: json.dump(report, f, indent=2) - else: + parser = argparse.ArgumentParser(description="Dark web threat monitoring agent") + parser.add_argument("--domain", required=True, help="Organization domain to monitor") + parser.add_argument("--emails", nargs="+", help="Specific emails to check") + parser.add_argument("--hibp-key", help="HIBP API key (or HIBP_API_KEY env)") + parser.add_argument("--check-passwords", nargs="+", help="Check passwords against breach DB") + parser.add_argument("--output", "-o", help="Output JSON report") + parser.add_argument("--verbose", "-v", action="store_true") + args = parser.parse_args() + + hibp_key = args.hibp_key or os.environ.get("HIBP_API_KEY", "") + all_findings = [] + + all_findings.extend(check_hibp_breaches(args.domain, hibp_key)) + + if args.emails and hibp_key: + for email in args.emails: + all_findings.extend(check_hibp_email(email, hibp_key)) + + if args.check_passwords: + for pwd in args.check_passwords: + result = check_hibp_password(pwd) + if result.get("compromised"): + all_findings.append({ + "type": "compromised_password", + "severity": "CRITICAL", + "detail": f"Password found in {result['count']} breaches", + }) + + all_findings.extend(search_threat_intel_feeds(args.domain)) + severity_counts = format_summary(all_findings, args.domain) + + report = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "tool": "Dark Web Monitor", + "domain": args.domain, + "findings": all_findings, + "severity_counts": severity_counts, + "risk_level": ( + "CRITICAL" if severity_counts.get("CRITICAL", 0) > 0 + else "HIGH" if severity_counts.get("HIGH", 0) > 0 + else "MEDIUM" if all_findings else "LOW" + ), + } + + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(f"\n[+] Report saved to {args.output}") + elif args.verbose: print(json.dumps(report, indent=2)) + if __name__ == "__main__": main()