#!/usr/bin/env python3 # For authorized testing in lab/CTF environments only """HTTP request smuggling detection agent using raw socket and requests.""" import argparse import json import logging import socket import ssl import sys import time from urllib.parse import urlparse from typing import Optional try: import requests except ImportError: sys.exit("requests is required: pip install requests") logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") logger = logging.getLogger(__name__) def identify_architecture(url: str) -> dict: """Identify the front-end/back-end HTTP architecture from response headers.""" resp = requests.get(url, timeout=10, allow_redirects=False) headers = dict(resp.headers) arch = { "url": url, "server": headers.get("Server", "unknown"), "via": headers.get("Via", ""), "x_served_by": headers.get("X-Served-By", ""), "x_cache": headers.get("X-Cache", ""), "cf_ray": headers.get("CF-Ray", ""), "http_version": f"HTTP/{resp.raw.version / 10:.1f}" if hasattr(resp.raw, "version") else "unknown", } if arch["cf_ray"]: arch["cdn"] = "Cloudflare" elif "cloudfront" in headers.get("X-Amz-Cf-Id", "").lower(): arch["cdn"] = "AWS CloudFront" elif arch["x_cache"]: arch["cdn"] = "Varnish/CDN" logger.info("Architecture: server=%s, cdn=%s", arch["server"], arch.get("cdn", "none")) return arch def send_raw_request(host: str, port: int, request_bytes: bytes, use_ssl: bool = True, timeout: float = 10.0) -> tuple: """Send a raw HTTP request and measure response time.""" sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) if use_ssl: context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE sock = context.wrap_socket(sock, server_hostname=host) start = time.time() try: sock.connect((host, port)) sock.sendall(request_bytes) response = b"" while True: try: chunk = sock.recv(4096) if not chunk: break response += chunk except socket.timeout: break except Exception as exc: elapsed = time.time() - start return b"", elapsed, str(exc) finally: sock.close() elapsed = time.time() - start return response, elapsed, None def test_clte_detection(host: str, port: int, use_ssl: bool = True) -> dict: """Test for CL.TE smuggling via time-based detection.""" probe = ( f"POST / HTTP/1.1\r\n" f"Host: {host}\r\n" f"Content-Length: 4\r\n" f"Transfer-Encoding: chunked\r\n" f"\r\n" f"1\r\nA\r\nX" ).encode() response, elapsed, error = send_raw_request(host, port, probe, use_ssl, timeout=15) vulnerable = elapsed > 5.0 and not error result = { "test": "CL.TE", "response_time": round(elapsed, 2), "likely_vulnerable": vulnerable, "error": error, } logger.info("CL.TE test: %.2fs response (vulnerable=%s)", elapsed, vulnerable) return result def test_tecl_detection(host: str, port: int, use_ssl: bool = True) -> dict: """Test for TE.CL smuggling via differential response.""" probe = ( f"POST / HTTP/1.1\r\n" f"Host: {host}\r\n" f"Content-Length: 6\r\n" f"Transfer-Encoding: chunked\r\n" f"\r\n" f"0\r\n\r\nX" ).encode() response, elapsed, error = send_raw_request(host, port, probe, use_ssl, timeout=15) status = "" if response: first_line = response.split(b"\r\n", 1)[0].decode(errors="ignore") status = first_line vulnerable = elapsed > 5.0 and not error result = { "test": "TE.CL", "response_time": round(elapsed, 2), "response_status": status, "likely_vulnerable": vulnerable, "error": error, } logger.info("TE.CL test: %.2fs (vulnerable=%s)", elapsed, vulnerable) return result def test_te_te_detection(host: str, port: int, use_ssl: bool = True) -> dict: """Test for TE.TE smuggling with obfuscated Transfer-Encoding headers.""" obfuscations = [ "Transfer-Encoding: xchunked", "Transfer-Encoding : chunked", "Transfer-Encoding: chunked\r\nTransfer-Encoding: x", "Transfer-Encoding:\tchunked", "X: x\r\nTransfer-Encoding: chunked", ] results = [] for obf in obfuscations: probe = ( f"POST / HTTP/1.1\r\n" f"Host: {host}\r\n" f"Content-Length: 4\r\n" f"{obf}\r\n" f"\r\n" f"1\r\nA\r\nX" ).encode() response, elapsed, error = send_raw_request(host, port, probe, use_ssl, timeout=10) results.append({ "obfuscation": obf.replace("\r\n", " | "), "response_time": round(elapsed, 2), "suspicious": elapsed > 5.0, }) return {"test": "TE.TE", "obfuscation_results": results} def run_assessment(url: str) -> dict: """Run the full HTTP request smuggling assessment.""" parsed = urlparse(url) host = parsed.hostname use_ssl = parsed.scheme == "https" port = parsed.port or (443 if use_ssl else 80) arch = identify_architecture(url) clte = test_clte_detection(host, port, use_ssl) tecl = test_tecl_detection(host, port, use_ssl) tete = test_te_te_detection(host, port, use_ssl) return { "target": url, "architecture": arch, "tests": {"CL.TE": clte, "TE.CL": tecl, "TE.TE": tete}, "summary": { "clte_vulnerable": clte["likely_vulnerable"], "tecl_vulnerable": tecl["likely_vulnerable"], "any_suspicious": any(r["suspicious"] for r in tete["obfuscation_results"]), }, } def main(): parser = argparse.ArgumentParser(description="HTTP Request Smuggling Detection Agent") parser.add_argument("--url", required=True, help="Target URL") parser.add_argument("--output", default="smuggling_report.json") args = parser.parse_args() report = run_assessment(args.url) with open(args.output, "w") as f: json.dump(report, f, indent=2) logger.info("Report saved to %s", args.output) print(json.dumps(report, indent=2)) if __name__ == "__main__": main()