diff --git a/skills/implementing-rapid7-insightvm-for-scanning/scripts/agent.py b/skills/implementing-rapid7-insightvm-for-scanning/scripts/agent.py index a3e52810..494e8e5d 100644 --- a/skills/implementing-rapid7-insightvm-for-scanning/scripts/agent.py +++ b/skills/implementing-rapid7-insightvm-for-scanning/scripts/agent.py @@ -1,61 +1,244 @@ #!/usr/bin/env python3 -"""Rapid7 InsightVM scanning configuration audit.""" -import argparse, json +"""Rapid7 InsightVM vulnerability scanning agent. + +Interfaces with the InsightVM (Nexpose) REST API to manage scan +configurations, launch scans, retrieve vulnerability results, and +generate remediation reports. Supports site management, asset +discovery, and vulnerability prioritization. +""" +import argparse +import json +import os +import sys +import time from datetime import datetime, timezone + try: import requests + from requests.auth import HTTPBasicAuth except ImportError: - requests = None + print("[!] 'requests' required: pip install requests", file=sys.stderr) + sys.exit(1) -def audit_config(target, token): - findings = [] - if not requests: return [{"error": "requests required"}] - headers = {"Authorization": f"Bearer {token}"} - try: - resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10) - if resp.status_code == 200: - data = resp.json() - if not data.get("enabled", True): - findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"}) - elif resp.status_code == 401: - findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"}) - except requests.RequestException as e: - findings.append({"error": str(e)}) - return findings -def check_compliance(target, token): - findings = [] - if not requests: return [] - headers = {"Authorization": f"Bearer {token}"} - try: - resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10) - if resp.status_code == 200: - for item in resp.json().get("checks", []): - if item.get("status") != "PASS": - findings.append({"check": item.get("name"), "status": item.get("status"), - "severity": item.get("severity", "MEDIUM")}) - except requests.RequestException: - pass - return findings +def get_insightvm_config(): + """Return InsightVM API connection config.""" + host = os.environ.get("INSIGHTVM_HOST", "localhost") + port = os.environ.get("INSIGHTVM_PORT", "3780") + user = os.environ.get("INSIGHTVM_USER", "") + password = os.environ.get("INSIGHTVM_PASSWORD", "") + return f"https://{host}:{port}", user, password + + +def api_call(base_url, endpoint, user, password, method="GET", + data=None, params=None): + """Make authenticated API call to InsightVM.""" + url = f"{base_url}/api/3{endpoint}" + auth = HTTPBasicAuth(user, password) + headers = {"Content-Type": "application/json", "Accept": "application/json"} + if method == "POST": + resp = requests.post(url, auth=auth, headers=headers, json=data, + params=params, verify=False, timeout=60) + elif method == "PUT": + resp = requests.put(url, auth=auth, headers=headers, json=data, + verify=False, timeout=60) + else: + resp = requests.get(url, auth=auth, headers=headers, params=params, + verify=False, timeout=60) + resp.raise_for_status() + return resp.json() + + +def list_sites(base_url, user, password): + """List all scan sites.""" + print("[*] Listing sites...") + data = api_call(base_url, "/sites", user, password, params={"size": 500}) + sites = [] + for site in data.get("resources", []): + sites.append({ + "id": site.get("id"), + "name": site.get("name", ""), + "description": site.get("description", ""), + "type": site.get("type", ""), + "assets": site.get("assets", 0), + "last_scan_time": site.get("lastScanTime", ""), + "risk_score": site.get("riskScore", 0), + }) + print(f"[+] Found {len(sites)} sites") + return sites + + +def get_site_vulnerabilities(base_url, user, password, site_id): + """Get vulnerabilities for a specific site.""" + print(f"[*] Fetching vulnerabilities for site {site_id}...") + vulns = [] + page = 0 + while True: + data = api_call(base_url, f"/sites/{site_id}/vulnerabilities", + user, password, params={"page": page, "size": 100}) + resources = data.get("resources", []) + if not resources: + break + for v in resources: + vulns.append({ + "id": v.get("id", ""), + "title": v.get("title", ""), + "severity": v.get("severity", ""), + "cvss_v3_score": v.get("cvss", {}).get("v3", {}).get("score", 0), + "risk_score": v.get("riskScore", 0), + "instances": v.get("instances", 0), + "status": v.get("status", ""), + }) + page += 1 + total_pages = data.get("page", {}).get("totalPages", 1) + if page >= total_pages: + break + + print(f"[+] Retrieved {len(vulns)} vulnerabilities") + return vulns + + +def launch_scan(base_url, user, password, site_id, scan_name=None): + """Launch a scan on a site.""" + if not scan_name: + scan_name = f"agent-scan-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}" + print(f"[*] Launching scan '{scan_name}' on site {site_id}...") + data = api_call(base_url, f"/sites/{site_id}/scans", user, password, + method="POST", data={"name": scan_name}) + scan_id = data.get("id") + print(f"[+] Scan started, ID: {scan_id}") + return scan_id + + +def poll_scan_status(base_url, user, password, scan_id, max_wait=1800): + """Poll scan status until completion.""" + print(f"[*] Waiting for scan {scan_id} to complete...") + elapsed = 0 + interval = 30 + while elapsed < max_wait: + data = api_call(base_url, f"/scans/{scan_id}", user, password) + status = data.get("status", "unknown") + if status in ("finished", "stopped", "error"): + print(f"[+] Scan {status}") + return status, data + print(f" Status: {status} ({elapsed}s)") + time.sleep(interval) + elapsed += interval + print("[!] Scan timed out") + return "timeout", {} + + +def get_scan_report(base_url, user, password, scan_id): + """Get scan results summary.""" + data = api_call(base_url, f"/scans/{scan_id}", user, password) + return { + "scan_id": scan_id, + "status": data.get("status", ""), + "start_time": data.get("startTime", ""), + "end_time": data.get("endTime", ""), + "duration": data.get("duration", ""), + "assets_discovered": data.get("assets", 0), + "vulnerabilities": data.get("vulnerabilities", {}), + } + + +def format_summary(sites, vulns=None, scan_report=None): + """Print summary.""" + print(f"\n{'='*60}") + print(f" Rapid7 InsightVM Report") + print(f"{'='*60}") + + if sites: + print(f"\n Sites ({len(sites)}):") + for s in sites: + print(f" {s['name']:30s} | Assets: {s['assets']:5d} | " + f"Risk: {s['risk_score']:8.1f}") + + if vulns: + severity_counts = {} + for v in vulns: + sev = v.get("severity", "unknown") + severity_counts[sev] = severity_counts.get(sev, 0) + 1 + print(f"\n Vulnerabilities ({len(vulns)}):") + for sev in sorted(severity_counts.keys()): + print(f" {sev:15s}: {severity_counts[sev]}") + + if scan_report: + print(f"\n Scan Report:") + print(f" Status : {scan_report.get('status', 'N/A')}") + print(f" Assets : {scan_report.get('assets_discovered', 0)}") + def main(): - p = argparse.ArgumentParser(description="Rapid7 InsightVM scanning configuration audit") - p.add_argument("--target", required=True, help="Target URL") - p.add_argument("--token", required=True, help="API token") - p.add_argument("--output", "-o", help="Output JSON report") - p.add_argument("--verbose", "-v", action="store_true") - a = p.parse_args() - print("[*] Rapid7 InsightVM scanning configuration audit") - report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []} - report["findings"].extend(audit_config(a.target, a.token)) - report["findings"].extend(check_compliance(a.target, a.token)) - high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL")) - report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW" - print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}") - if a.output: - with open(a.output, "w") as f: json.dump(report, f, indent=2) - else: + parser = argparse.ArgumentParser(description="Rapid7 InsightVM scanning agent") + sub = parser.add_subparsers(dest="command") + + sub.add_parser("list-sites", help="List scan sites") + + p_vulns = sub.add_parser("get-vulns", help="Get site vulnerabilities") + p_vulns.add_argument("--site-id", required=True, type=int) + + p_scan = sub.add_parser("scan", help="Launch a scan") + p_scan.add_argument("--site-id", required=True, type=int) + p_scan.add_argument("--wait", action="store_true", help="Wait for completion") + p_scan.add_argument("--max-wait", type=int, default=1800) + + parser.add_argument("--host", help="InsightVM host (or INSIGHTVM_HOST env)") + parser.add_argument("--user", help="Username (or INSIGHTVM_USER env)") + parser.add_argument("--password", help="Password (or INSIGHTVM_PASSWORD env)") + parser.add_argument("--output", "-o", help="Output JSON report") + parser.add_argument("--verbose", "-v", action="store_true") + args = parser.parse_args() + + if not args.command: + parser.print_help() + sys.exit(1) + + if args.host: + os.environ["INSIGHTVM_HOST"] = args.host + if args.user: + os.environ["INSIGHTVM_USER"] = args.user + if args.password: + os.environ["INSIGHTVM_PASSWORD"] = args.password + + base_url, user, password = get_insightvm_config() + if not user or not password: + print("[!] Set INSIGHTVM_USER and INSIGHTVM_PASSWORD", file=sys.stderr) + sys.exit(1) + + result = {} + if args.command == "list-sites": + sites = list_sites(base_url, user, password) + format_summary(sites) + result = {"sites": sites} + elif args.command == "get-vulns": + vulns = get_site_vulnerabilities(base_url, user, password, args.site_id) + format_summary([], vulns) + result = {"site_id": args.site_id, "vulnerabilities": vulns} + elif args.command == "scan": + scan_id = launch_scan(base_url, user, password, args.site_id) + if args.wait: + status, data = poll_scan_status(base_url, user, password, scan_id, args.max_wait) + scan_report = get_scan_report(base_url, user, password, scan_id) + format_summary([], scan_report=scan_report) + result = {"scan_id": scan_id, "report": scan_report} + else: + result = {"scan_id": scan_id, "status": "launched"} + + report = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "tool": "Rapid7 InsightVM", + "command": args.command, + "result": result, + } + + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(f"\n[+] Report saved to {args.output}") + elif args.verbose: print(json.dumps(report, indent=2)) + if __name__ == "__main__": main() diff --git a/skills/performing-credential-access-with-lazagne/scripts/agent.py b/skills/performing-credential-access-with-lazagne/scripts/agent.py index 4677eb79..116a140a 100644 --- a/skills/performing-credential-access-with-lazagne/scripts/agent.py +++ b/skills/performing-credential-access-with-lazagne/scripts/agent.py @@ -1,60 +1,275 @@ #!/usr/bin/env python3 -"""LaZagne credential access detection agent.""" -import argparse, json +"""LaZagne credential access detection agent. + +Detects evidence of LaZagne credential harvesting tool execution on +endpoints by scanning for process artifacts, file system indicators, +and Windows event log entries associated with credential dumping. +Used for defensive detection and incident response, not offensive use. +""" +import argparse +import json +import os +import subprocess +import sys from datetime import datetime, timezone -try: - import requests -except ImportError: - requests = None -def run_scan(target, token=None): + +LAZAGNE_INDICATORS = { + "file_paths": [ + "lazagne.exe", "LaZagne.exe", "laZagne.py", + "lazagne_output.txt", "credentials.txt", + ], + "process_names": [ + "lazagne.exe", "python.exe lazagne", + ], + "registry_keys": [ + r"SOFTWARE\Microsoft\Windows\CurrentVersion\Run", + ], + "event_ids": { + 4688: "Process creation (look for lazagne.exe or suspicious python)", + 4663: "File access audit (credential store access)", + 4656: "Handle request to credential objects", + }, + "credential_stores": [ + # Windows + r"%APPDATA%\Mozilla\Firefox\Profiles", + r"%LOCALAPPDATA%\Google\Chrome\User Data\Default\Login Data", + r"%APPDATA%\Opera Software\Opera Stable\Login Data", + # Linux + os.path.expanduser("~/.mozilla/firefox"), + os.path.expanduser("~/.config/google-chrome/Default/Login Data"), + "/etc/shadow", + os.path.expanduser("~/.ssh"), + ], +} + + +def scan_filesystem_indicators(search_paths=None): + """Scan file system for LaZagne artifacts.""" findings = [] - if not requests: return [{"error": "requests required"}] - headers = {"Authorization": f"Bearer {token}"} if token else {} - try: - resp = requests.get(f"{target}", headers=headers, timeout=15) - if resp.status_code == 200: - findings.append({"check": "Target Accessible", "status": "OK", "severity": "INFO"}) + if search_paths is None: + if sys.platform == "win32": + search_paths = [ + os.environ.get("TEMP", "C:\\Temp"), + os.environ.get("USERPROFILE", "C:\\Users\\Default"), + "C:\\Windows\\Temp", + ] else: - findings.append({"check": "Target Access", "status": f"HTTP {resp.status_code}", "severity": "MEDIUM"}) - except requests.RequestException as e: - findings.append({"error": str(e)}) + search_paths = ["/tmp", "/var/tmp", os.path.expanduser("~")] + + print("[*] Scanning filesystem for LaZagne indicators...") + for search_path in search_paths: + if not os.path.isdir(search_path): + continue + for root, dirs, files in os.walk(search_path): + for fname in files: + fname_lower = fname.lower() + for indicator in LAZAGNE_INDICATORS["file_paths"]: + if indicator.lower() in fname_lower: + full_path = os.path.join(root, fname) + stat = os.stat(full_path) + findings.append({ + "type": "file_indicator", + "path": full_path, + "indicator": indicator, + "size": stat.st_size, + "modified": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(), + "severity": "CRITICAL", + "description": f"LaZagne artifact found: {fname}", + }) + # Limit directory depth to avoid slow scans + if root.count(os.sep) - search_path.count(os.sep) > 3: + dirs.clear() + return findings -def analyze_results(target, token=None): + +def check_windows_event_logs(): + """Check Windows Security event logs for LaZagne-related activity.""" findings = [] - if not requests: return [] - headers = {"Authorization": f"Bearer {token}"} if token else {} - try: - resp = requests.get(f"{target}/api/v1/results", headers=headers, timeout=15) - if resp.status_code == 200: - data = resp.json() - for item in data.get("findings", data.get("results", [])): - severity = item.get("severity", item.get("risk", "MEDIUM")) - findings.append({"check": item.get("name", item.get("title", "unknown")), - "severity": severity.upper() if isinstance(severity, str) else "MEDIUM"}) - except requests.RequestException: - pass + if sys.platform != "win32": + return findings + + print("[*] Checking Windows Security event logs...") + + # Check for process creation events matching LaZagne + ps_script = """ + Get-WinEvent -FilterHashtable @{ + LogName='Security'; Id=4688; StartTime=(Get-Date).AddDays(-7) + } -MaxEvents 1000 -ErrorAction SilentlyContinue | + Where-Object { + $_.Properties[5].Value -match 'lazagne|LaZagne' -or + $_.Properties[8].Value -match 'lazagne|LaZagne' + } | + Select-Object TimeCreated, @{N='ProcessName';E={$_.Properties[5].Value}}, + @{N='CommandLine';E={$_.Properties[8].Value}}, + @{N='User';E={$_.Properties[1].Value}} | + ConvertTo-Json + """ + result = subprocess.run( + ["powershell", "-Command", ps_script], + capture_output=True, text=True, timeout=60, + ) + if result.returncode == 0 and result.stdout.strip(): + try: + events = json.loads(result.stdout) + if isinstance(events, dict): + events = [events] + for evt in events: + findings.append({ + "type": "event_log", + "event_id": 4688, + "time": str(evt.get("TimeCreated", "")), + "process": evt.get("ProcessName", ""), + "command_line": evt.get("CommandLine", "")[:200], + "user": evt.get("User", ""), + "severity": "CRITICAL", + "description": "LaZagne process execution detected in event logs", + }) + except json.JSONDecodeError: + pass + + # Check for credential file access (Event ID 4663) + ps_script2 = """ + Get-WinEvent -FilterHashtable @{ + LogName='Security'; Id=4663; StartTime=(Get-Date).AddDays(-7) + } -MaxEvents 500 -ErrorAction SilentlyContinue | + Where-Object { + $_.Properties[6].Value -match 'Login Data|logins.json|Credentials|vault' + } | + Select-Object TimeCreated, @{N='ObjectName';E={$_.Properties[6].Value}}, + @{N='ProcessName';E={$_.Properties[11].Value}} | + ConvertTo-Json + """ + result = subprocess.run( + ["powershell", "-Command", ps_script2], + capture_output=True, text=True, timeout=60, + ) + if result.returncode == 0 and result.stdout.strip(): + try: + events = json.loads(result.stdout) + if isinstance(events, dict): + events = [events] + for evt in events: + findings.append({ + "type": "credential_access", + "event_id": 4663, + "time": str(evt.get("TimeCreated", "")), + "object": evt.get("ObjectName", ""), + "process": evt.get("ProcessName", ""), + "severity": "HIGH", + "description": "Credential store accessed by process", + }) + except json.JSONDecodeError: + pass + return findings + +def check_credential_store_integrity(): + """Check if credential stores have been recently accessed unusually.""" + findings = [] + print("[*] Checking credential store integrity...") + + stores_to_check = [] + if sys.platform == "win32": + appdata = os.environ.get("APPDATA", "") + localappdata = os.environ.get("LOCALAPPDATA", "") + stores_to_check = [ + (os.path.join(localappdata, "Google", "Chrome", "User Data", "Default", "Login Data"), "Chrome"), + (os.path.join(appdata, "Mozilla", "Firefox"), "Firefox"), + ] + else: + stores_to_check = [ + (os.path.expanduser("~/.config/google-chrome/Default/Login Data"), "Chrome"), + (os.path.expanduser("~/.mozilla/firefox"), "Firefox"), + (os.path.expanduser("~/.ssh"), "SSH Keys"), + ] + + for path, store_name in stores_to_check: + if os.path.exists(path): + if os.path.isfile(path): + stat = os.stat(path) + access_time = datetime.fromtimestamp(stat.st_atime, tz=timezone.utc) + mod_time = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc) + findings.append({ + "type": "credential_store", + "store": store_name, + "path": path, + "last_accessed": access_time.isoformat(), + "last_modified": mod_time.isoformat(), + "severity": "INFO", + }) + + return findings + + +def format_summary(all_findings): + """Print detection summary.""" + print(f"\n{'='*60}") + print(f" LaZagne Credential Access Detection Report") + print(f"{'='*60}") + + file_findings = [f for f in all_findings if f["type"] == "file_indicator"] + event_findings = [f for f in all_findings if f["type"] in ("event_log", "credential_access")] + store_findings = [f for f in all_findings if f["type"] == "credential_store"] + + print(f" File Indicators : {len(file_findings)}") + print(f" Event Log Hits : {len(event_findings)}") + print(f" Credential Stores: {len(store_findings)}") + + critical = [f for f in all_findings if f.get("severity") == "CRITICAL"] + if critical: + print(f"\n CRITICAL FINDINGS ({len(critical)}):") + for f in critical: + print(f" [{f['type']}] {f.get('description', f.get('path', ''))}") + + severity_counts = {} + for f in all_findings: + sev = f.get("severity", "INFO") + severity_counts[sev] = severity_counts.get(sev, 0) + 1 + return severity_counts + + def main(): - p = argparse.ArgumentParser(description="LaZagne credential access detection agent") - p.add_argument("--target", required=True, help="Target URL or IP") - p.add_argument("--token", help="API token") - p.add_argument("--output", "-o", help="Output JSON report") - p.add_argument("--verbose", "-v", action="store_true") - a = p.parse_args() - print("[*] LaZagne credential access detection agent") - report = {"timestamp": datetime.now(timezone.utc).isoformat(), "target": a.target, "findings": []} - report["findings"].extend(run_scan(a.target, a.token)) - report["findings"].extend(analyze_results(a.target, a.token)) - high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL")) - report["risk_level"] = "CRITICAL" if high > 2 else "HIGH" if high else "MEDIUM" if report["findings"] else "LOW" - print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}") - if a.output: - with open(a.output, "w") as f: json.dump(report, f, indent=2) - else: + parser = argparse.ArgumentParser( + description="LaZagne credential access detection agent (defensive use)" + ) + parser.add_argument("--search-paths", nargs="+", help="Paths to scan for artifacts") + parser.add_argument("--skip-events", action="store_true", help="Skip Windows event log check") + parser.add_argument("--skip-stores", action="store_true", help="Skip credential store check") + parser.add_argument("--output", "-o", help="Output JSON report") + parser.add_argument("--verbose", "-v", action="store_true") + args = parser.parse_args() + + all_findings = [] + all_findings.extend(scan_filesystem_indicators(args.search_paths)) + if not args.skip_events: + all_findings.extend(check_windows_event_logs()) + if not args.skip_stores: + all_findings.extend(check_credential_store_integrity()) + + severity_counts = format_summary(all_findings) + + report = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "tool": "LaZagne Detection", + "findings": all_findings, + "severity_counts": severity_counts, + "risk_level": ( + "CRITICAL" if severity_counts.get("CRITICAL", 0) > 0 + else "HIGH" if severity_counts.get("HIGH", 0) > 0 + else "LOW" + ), + } + + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(f"\n[+] Report saved to {args.output}") + elif args.verbose: print(json.dumps(report, indent=2)) + if __name__ == "__main__": main()