#!/usr/bin/env python3 # For authorized testing in lab/CTF environments only """IDOR vulnerability detection agent using requests with multi-session comparison.""" import argparse import json import logging import sys import hashlib from typing import List, Optional try: import requests except ImportError: sys.exit("requests is required: pip install requests") logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") logger = logging.getLogger(__name__) class IDORTester: """Tests for Insecure Direct Object Reference vulnerabilities.""" def __init__(self, base_url: str, user_a_token: str, user_b_token: str, verify_ssl: bool = False): self.base_url = base_url.rstrip("/") self.verify = verify_ssl self.session_a = requests.Session() self.session_a.headers.update({"Authorization": f"Bearer {user_a_token}"}) self.session_a.verify = verify_ssl self.session_b = requests.Session() self.session_b.headers.update({"Authorization": f"Bearer {user_b_token}"}) self.session_b.verify = verify_ssl self.findings = [] def _response_hash(self, resp: requests.Response) -> str: return hashlib.md5(resp.content).hexdigest() def test_horizontal_idor(self, endpoint_template: str, own_id: str, other_id: str, method: str = "GET") -> dict: """Test horizontal IDOR by accessing another user's resource.""" own_url = f"{self.base_url}{endpoint_template.replace('{id}', own_id)}" other_url = f"{self.base_url}{endpoint_template.replace('{id}', other_id)}" own_resp = self.session_a.request(method, own_url, timeout=10) other_resp = self.session_a.request(method, other_url, timeout=10) vulnerable = ( other_resp.status_code == 200 and own_resp.status_code == 200 and self._response_hash(other_resp) != self._response_hash(own_resp) ) result = { "type": "horizontal", "endpoint": endpoint_template, "method": method, "own_status": own_resp.status_code, "other_status": other_resp.status_code, "vulnerable": vulnerable, "own_content_length": len(own_resp.content), "other_content_length": len(other_resp.content), } if vulnerable: self.findings.append(result) logger.warning("IDOR FOUND: %s %s", method, endpoint_template) return result def test_vertical_idor(self, endpoint: str, method: str = "GET") -> dict: """Test vertical IDOR by accessing admin endpoints with regular user token.""" url = f"{self.base_url}{endpoint}" resp = self.session_a.request(method, url, timeout=10) vulnerable = resp.status_code == 200 result = { "type": "vertical", "endpoint": endpoint, "method": method, "status_code": resp.status_code, "vulnerable": vulnerable, "content_length": len(resp.content), } if vulnerable: self.findings.append(result) logger.warning("Vertical IDOR: %s %s (status=%d)", method, endpoint, resp.status_code) return result def test_id_enumeration(self, endpoint_template: str, id_range: range, method: str = "GET") -> dict: """Enumerate valid object IDs via response code analysis.""" valid_ids = [] for obj_id in id_range: url = f"{self.base_url}{endpoint_template.replace('{id}', str(obj_id))}" try: resp = self.session_a.request(method, url, timeout=5) if resp.status_code == 200: valid_ids.append(obj_id) except requests.RequestException: continue logger.info("Enumerated %d valid IDs in range %d-%d", len(valid_ids), id_range.start, id_range.stop) return { "endpoint": endpoint_template, "range_tested": f"{id_range.start}-{id_range.stop}", "valid_ids_found": len(valid_ids), "sample_ids": valid_ids[:10], } def test_write_idor(self, endpoint_template: str, other_id: str, payload: dict) -> dict: """Test write-based IDOR via PUT/PATCH with another user's ID.""" url = f"{self.base_url}{endpoint_template.replace('{id}', other_id)}" resp = self.session_a.put(url, json=payload, timeout=10) vulnerable = resp.status_code in (200, 201, 204) result = { "type": "write_idor", "endpoint": endpoint_template, "method": "PUT", "target_id": other_id, "status_code": resp.status_code, "vulnerable": vulnerable, } if vulnerable: self.findings.append(result) logger.warning("Write IDOR: PUT %s (status=%d)", endpoint_template, resp.status_code) return result def test_cross_session(self, endpoint_template: str, resource_id: str) -> dict: """Compare responses between two authenticated sessions for the same resource.""" url = f"{self.base_url}{endpoint_template.replace('{id}', resource_id)}" resp_a = self.session_a.get(url, timeout=10) resp_b = self.session_b.get(url, timeout=10) same_response = self._response_hash(resp_a) == self._response_hash(resp_b) result = { "endpoint": endpoint_template, "resource_id": resource_id, "user_a_status": resp_a.status_code, "user_b_status": resp_b.status_code, "same_response": same_response, "missing_authz": resp_a.status_code == 200 and resp_b.status_code == 200 and same_response, } return result def generate_report(self) -> dict: """Compile IDOR assessment results.""" return { "target": self.base_url, "total_findings": len(self.findings), "findings": self.findings, "severity": "High" if self.findings else "None", } def main(): parser = argparse.ArgumentParser(description="IDOR Vulnerability Testing Agent") parser.add_argument("--url", required=True, help="Base URL of the target API") parser.add_argument("--token-a", required=True, help="JWT token for User A") parser.add_argument("--token-b", required=True, help="JWT token for User B") parser.add_argument("--endpoints", nargs="+", default=["/api/v1/users/{id}/profile"]) parser.add_argument("--own-id", default="101", help="User A's resource ID") parser.add_argument("--other-id", default="102", help="User B's resource ID") parser.add_argument("--output", default="idor_report.json") args = parser.parse_args() tester = IDORTester(args.url, args.token_a, args.token_b) all_results = [] for ep in args.endpoints: result = tester.test_horizontal_idor(ep, args.own_id, args.other_id) all_results.append(result) report = tester.generate_report() report["test_details"] = all_results with open(args.output, "w") as f: json.dump(report, f, indent=2) logger.info("Report saved to %s", args.output) print(json.dumps(report, indent=2)) if __name__ == "__main__": main()