mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
c47eed6a64
- Fix 25 shell=True subprocess calls with list-based commands - Fix 49 verify=False in defensive skills (env-var override) - Add timeout to 231 HTTP/subprocess/socket calls - Fix 6 SQL injection patterns with whitelist validation - Replace 8 __import__() with standard imports - Remove 701 unused imports across 442 files - Add authorized-testing disclaimers to all offensive skills - Complete 11 incomplete skill directories - Expand 10 stub SKILL.md files with full content - Fix 2 YAML parse errors in frontmatter - Fix 5 pre-existing syntax errors - Convert 22 hardcoded paths/ports to environment variables - Back up 21 redundant skill pairs to .bak - Fix 2 global declaration errors - 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE) - 0 compile errors across all 724 agent.py files
163 lines
5.6 KiB
Python
163 lines
5.6 KiB
Python
#!/usr/bin/env python3
|
|
# For authorized penetration testing and educational environments only.
|
|
# Usage against targets without prior mutual consent is illegal.
|
|
# It is the end user's responsibility to obey all applicable local, state and federal laws.
|
|
"""Agent for detecting excessive data exposure (OWASP API3) in API responses."""
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
from datetime import datetime, timezone
|
|
|
|
try:
|
|
import requests
|
|
HAS_REQUESTS = True
|
|
except ImportError:
|
|
HAS_REQUESTS = False
|
|
|
|
SENSITIVE_FIELDS = [
|
|
"password", "passwd", "secret", "token", "api_key", "apikey",
|
|
"ssn", "social_security", "credit_card", "card_number", "cvv",
|
|
"private_key", "secret_key", "access_key", "session_id",
|
|
"internal_id", "salary", "bank_account", "routing_number",
|
|
"date_of_birth", "dob", "national_id", "passport",
|
|
]
|
|
|
|
PII_PATTERNS = {
|
|
"email": r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
|
|
"phone": r'\+?1?\d{10,15}',
|
|
"ssn": r'\d{3}-\d{2}-\d{4}',
|
|
"credit_card": r'\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}',
|
|
"ip_address": r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
|
|
}
|
|
|
|
|
|
def analyze_response(response_json, endpoint=""):
|
|
"""Analyze API response for excessive data exposure."""
|
|
findings = []
|
|
|
|
def check_fields(obj, path=""):
|
|
if isinstance(obj, dict):
|
|
for key, value in obj.items():
|
|
current_path = f"{path}.{key}" if path else key
|
|
key_lower = key.lower()
|
|
for sf in SENSITIVE_FIELDS:
|
|
if sf in key_lower:
|
|
findings.append({
|
|
"field": current_path,
|
|
"matched_pattern": sf,
|
|
"value_type": type(value).__name__,
|
|
"value_preview": str(value)[:20] + "..." if len(str(value)) > 20 else str(value),
|
|
"severity": "HIGH",
|
|
})
|
|
break
|
|
check_fields(value, current_path)
|
|
elif isinstance(obj, list):
|
|
for i, item in enumerate(obj[:5]):
|
|
check_fields(item, f"{path}[{i}]")
|
|
|
|
check_fields(response_json)
|
|
|
|
response_str = json.dumps(response_json)
|
|
for pattern_name, pattern in PII_PATTERNS.items():
|
|
matches = re.findall(pattern, response_str)
|
|
if matches:
|
|
findings.append({
|
|
"type": "pii_exposure",
|
|
"pattern": pattern_name,
|
|
"match_count": len(matches),
|
|
"samples": matches[:3],
|
|
"severity": "HIGH",
|
|
})
|
|
return findings
|
|
|
|
|
|
def test_endpoint(url, headers=None):
|
|
"""Fetch API endpoint and analyze for data exposure."""
|
|
if not HAS_REQUESTS:
|
|
return {"error": "requests library not available"}
|
|
try:
|
|
resp = requests.get(url, headers=headers, timeout=15, verify=False)
|
|
data = resp.json()
|
|
field_count = count_fields(data)
|
|
findings = analyze_response(data, url)
|
|
return {
|
|
"endpoint": url,
|
|
"status_code": resp.status_code,
|
|
"total_fields": field_count,
|
|
"sensitive_fields": len(findings),
|
|
"findings": findings,
|
|
}
|
|
except (requests.RequestException, json.JSONDecodeError) as e:
|
|
return {"endpoint": url, "error": str(e)[:200]}
|
|
|
|
|
|
def count_fields(obj, count=0):
|
|
"""Count total fields in a JSON response."""
|
|
if isinstance(obj, dict):
|
|
count += len(obj)
|
|
for v in obj.values():
|
|
count = count_fields(v, count)
|
|
elif isinstance(obj, list):
|
|
for item in obj[:10]:
|
|
count = count_fields(item, count)
|
|
return count
|
|
|
|
|
|
def compare_with_spec(response_json, spec_fields):
|
|
"""Compare response fields against expected OpenAPI spec fields."""
|
|
actual = set()
|
|
|
|
def extract_keys(obj, prefix=""):
|
|
if isinstance(obj, dict):
|
|
for k in obj:
|
|
path = f"{prefix}.{k}" if prefix else k
|
|
actual.add(path)
|
|
extract_keys(obj[k], path)
|
|
|
|
extract_keys(response_json)
|
|
extra = actual - set(spec_fields)
|
|
return list(extra)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Detect excessive data exposure in API responses"
|
|
)
|
|
parser.add_argument("--url", help="API endpoint to test")
|
|
parser.add_argument("--json-file", help="JSON response file to analyze")
|
|
parser.add_argument("--token", help="Bearer token for auth")
|
|
parser.add_argument("--output", "-o", help="Output JSON report")
|
|
args = parser.parse_args()
|
|
|
|
print("[*] Excessive Data Exposure Detection Agent (OWASP API3)")
|
|
report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []}
|
|
|
|
if args.url:
|
|
headers = {"Authorization": f"Bearer {args.token}"} if args.token else {}
|
|
result = test_endpoint(args.url, headers)
|
|
report["findings"].append(result)
|
|
print(f"[*] {args.url}: {result.get('sensitive_fields', 0)} sensitive fields found")
|
|
|
|
if args.json_file:
|
|
with open(args.json_file, "r") as f:
|
|
data = json.load(f)
|
|
findings = analyze_response(data, args.json_file)
|
|
report["findings"].extend(findings)
|
|
print(f"[*] File analysis: {len(findings)} sensitive fields found")
|
|
|
|
report["risk_level"] = "HIGH" if any(
|
|
f.get("sensitive_fields", 0) > 0 or f.get("severity") == "HIGH" for f in report["findings"]
|
|
) else "LOW"
|
|
|
|
if args.output:
|
|
with open(args.output, "w") as f:
|
|
json.dump(report, f, indent=2)
|
|
print(f"[*] Report saved to {args.output}")
|
|
else:
|
|
print(json.dumps(report, indent=2))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|