mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
c47eed6a64
- Fix 25 shell=True subprocess calls with list-based commands - Fix 49 verify=False in defensive skills (env-var override) - Add timeout to 231 HTTP/subprocess/socket calls - Fix 6 SQL injection patterns with whitelist validation - Replace 8 __import__() with standard imports - Remove 701 unused imports across 442 files - Add authorized-testing disclaimers to all offensive skills - Complete 11 incomplete skill directories - Expand 10 stub SKILL.md files with full content - Fix 2 YAML parse errors in frontmatter - Fix 5 pre-existing syntax errors - Convert 22 hardcoded paths/ports to environment variables - Back up 21 redundant skill pairs to .bak - Fix 2 global declaration errors - 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE) - 0 compile errors across all 724 agent.py files
164 lines
6.4 KiB
Python
164 lines
6.4 KiB
Python
#!/usr/bin/env python3
|
|
"""Second-Order SQL Injection agent — detects stored SQL injection payloads
|
|
by analyzing database content and tracing data flow from input to secondary
|
|
query execution points."""
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
from collections import Counter
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
|
|
SQL_INJECTION_PATTERNS = [
|
|
r"(?i)(\bunion\b\s+\bselect\b)",
|
|
r"(?i)(\bor\b\s+1\s*=\s*1)",
|
|
r"(?i)(\band\b\s+1\s*=\s*1)",
|
|
r"(?i)(;\s*drop\s+table\b)",
|
|
r"(?i)(;\s*delete\s+from\b)",
|
|
r"(?i)(;\s*update\b.*\bset\b)",
|
|
r"(?i)(;\s*insert\s+into\b)",
|
|
r"(?i)(--\s*$)",
|
|
r"(?i)(\bexec\b\s*\()",
|
|
r"(?i)(\bwaitfor\b\s+\bdelay\b)",
|
|
r"(?i)(\bsleep\b\s*\(\d+\))",
|
|
r"(?i)(\bconvert\b\s*\()",
|
|
r"(?i)(\bcast\b\s*\(.*\bas\b)",
|
|
r"(?i)(\bchar\b\s*\(\d+\))",
|
|
r"(?i)(\b0x[0-9a-f]+\b)",
|
|
r"(\x27|\x22)\s*(or|and|union)",
|
|
]
|
|
|
|
|
|
def scan_database_values(db_dump_path: str) -> list[dict]:
|
|
"""Scan a database dump (JSON format) for stored SQL injection payloads."""
|
|
data = json.loads(Path(db_dump_path).read_text(encoding="utf-8"))
|
|
findings = []
|
|
for table_name, rows in data.items():
|
|
for row_idx, row in enumerate(rows):
|
|
for col_name, value in row.items():
|
|
if not isinstance(value, str):
|
|
continue
|
|
for pattern in SQL_INJECTION_PATTERNS:
|
|
match = re.search(pattern, value)
|
|
if match:
|
|
findings.append({
|
|
"type": "stored_sqli_payload",
|
|
"severity": "critical",
|
|
"table": table_name,
|
|
"column": col_name,
|
|
"row_index": row_idx,
|
|
"matched_pattern": pattern,
|
|
"matched_text": match.group(0),
|
|
"value_preview": value[:200],
|
|
"detail": f"SQL injection payload in {table_name}.{col_name} row {row_idx}",
|
|
})
|
|
break
|
|
return findings
|
|
|
|
|
|
def scan_source_code(source_dir: str) -> list[dict]:
|
|
"""Scan source code for second-order SQL injection sinks (string concatenation with DB data)."""
|
|
dangerous_patterns = [
|
|
(r"(?i)cursor\.execute\s*\(\s*[\"'].*%s", "python_format_string"),
|
|
(r"(?i)cursor\.execute\s*\(\s*f[\"']", "python_fstring"),
|
|
(r'(?i)query\s*=\s*["\'].*\+\s*\w+', "string_concatenation"),
|
|
(r"(?i)\.format\s*\(.*\)\s*\)", "python_format"),
|
|
(r'(?i)\$\{.*\}\s*(?:FROM|WHERE|INSERT|UPDATE|DELETE)', "template_literal"),
|
|
(r'(?i)sprintf\s*\(\s*["\'].*(?:SELECT|INSERT|UPDATE|DELETE)', "sprintf_query"),
|
|
]
|
|
findings = []
|
|
src = Path(source_dir)
|
|
for ext in ("*.py", "*.php", "*.java", "*.js", "*.rb", "*.cs"):
|
|
for fpath in src.rglob(ext):
|
|
try:
|
|
content = fpath.read_text(encoding="utf-8", errors="ignore")
|
|
for line_no, line in enumerate(content.splitlines(), 1):
|
|
for pattern, pattern_name in dangerous_patterns:
|
|
if re.search(pattern, line):
|
|
findings.append({
|
|
"type": "second_order_sqli_sink",
|
|
"severity": "high",
|
|
"file": str(fpath),
|
|
"line": line_no,
|
|
"pattern": pattern_name,
|
|
"code_snippet": line.strip()[:200],
|
|
"detail": f"Potential second-order SQLi sink at {fpath.name}:{line_no}",
|
|
})
|
|
break
|
|
except OSError:
|
|
continue
|
|
return findings
|
|
|
|
|
|
def trace_data_flow(db_findings: list[dict], code_findings: list[dict]) -> list[dict]:
|
|
"""Correlate stored payloads with code sinks to identify complete attack paths."""
|
|
attack_paths = []
|
|
for db_f in db_findings:
|
|
table = db_f["table"]
|
|
column = db_f["column"]
|
|
for code_f in code_findings:
|
|
snippet = code_f.get("code_snippet", "").lower()
|
|
if table.lower() in snippet or column.lower() in snippet:
|
|
attack_paths.append({
|
|
"type": "confirmed_attack_path",
|
|
"severity": "critical",
|
|
"source": f"{table}.{column}",
|
|
"sink": f"{code_f['file']}:{code_f['line']}",
|
|
"detail": f"Stored payload in {table}.{column} flows to query at {code_f['file']}:{code_f['line']}",
|
|
})
|
|
return attack_paths
|
|
|
|
|
|
def generate_report(db_dump_path: str = None, source_dir: str = None) -> dict:
|
|
"""Run analysis and build consolidated report."""
|
|
findings = []
|
|
db_findings = []
|
|
code_findings = []
|
|
|
|
if db_dump_path:
|
|
db_findings = scan_database_values(db_dump_path)
|
|
findings.extend(db_findings)
|
|
if source_dir:
|
|
code_findings = scan_source_code(source_dir)
|
|
findings.extend(code_findings)
|
|
if db_findings and code_findings:
|
|
attack_paths = trace_data_flow(db_findings, code_findings)
|
|
findings.extend(attack_paths)
|
|
|
|
severity_counts = Counter(f.get("severity", "info") for f in findings)
|
|
return {
|
|
"report": "second_order_sql_injection",
|
|
"generated_at": datetime.utcnow().isoformat() + "Z",
|
|
"total_findings": len(findings),
|
|
"severity_summary": dict(severity_counts),
|
|
"stored_payloads": len(db_findings),
|
|
"code_sinks": len(code_findings),
|
|
"confirmed_attack_paths": len([f for f in findings if f["type"] == "confirmed_attack_path"]),
|
|
"findings": findings,
|
|
}
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Second-Order SQL Injection Agent")
|
|
parser.add_argument("--db-dump", help="JSON database dump file to scan for stored payloads")
|
|
parser.add_argument("--source", help="Source code directory to scan for injection sinks")
|
|
parser.add_argument("--output", help="Output JSON file path")
|
|
args = parser.parse_args()
|
|
|
|
if not args.db_dump and not args.source:
|
|
parser.error("At least one of --db-dump or --source is required")
|
|
|
|
report = generate_report(args.db_dump, args.source)
|
|
output = json.dumps(report, indent=2)
|
|
if args.output:
|
|
Path(args.output).write_text(output, encoding="utf-8")
|
|
print(f"Report written to {args.output}")
|
|
else:
|
|
print(output)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|