mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-26 19:54:37 +03:00
25e0bc60e8
Pre-deployment security audit skill for Solidity contracts in Foundry projects. Complements analyzing-ethereum-smart-contract-vulnerabilities (which it is based on) with a dev-side, Foundry-first workflow and full key-hygiene coverage. Layers four independent techniques: - Static analysis: Slither (90+ detectors) + Aderyn (Cyfrin) - Symbolic execution: Mythril (optional) - Property-based testing: forge fuzz + invariant tests (handler pattern) - Manual review checklist + secrets/keystore audit Includes scripts/agent.py (orchestrator aggregating Slither/Aderyn/Mythril/forge test + coverage + private-key scan into a JSON report with a PASS/FAIL deploy gate) and three references (tool cheat-sheets, SWC vulnerability checklist, secure deployment & key hygiene with cast keystore / multisig). Passes tools/validate-skill.py. Slither, Aderyn, forge test/coverage parsing and the gate logic were verified end-to-end against a reentrancy-vulnerable contract. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
320 lines
14 KiB
Python
320 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""Foundry Smart Contract Security Agent.
|
|
|
|
Pre-deployment audit orchestrator for a Foundry project. Runs static analysis
|
|
(Slither, Aderyn), optional symbolic execution (Mythril), Foundry tests/coverage,
|
|
and a key-leak scan, then aggregates everything into a single JSON report with a
|
|
PASS/FAIL deploy gate.
|
|
|
|
Design constraints (mirrors the upstream repo's style, hardened):
|
|
- subprocess always called with an ARGUMENT LIST, never shell=True
|
|
- no outbound network calls; every tool runs locally on local source
|
|
- every external tool guarded by timeout and graceful degradation if absent
|
|
- read-only with respect to the project (only writes the report file)
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
import shutil
|
|
import argparse
|
|
import logging
|
|
import subprocess
|
|
from collections import defaultdict
|
|
from datetime import datetime, timezone
|
|
|
|
# Process-wide logging: INFO and above, formatted as "<time> [<LEVEL>] <message>".
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

# SWC identifier -> human-readable title. analyze_mythril() looks findings up
# here and falls back to the title reported by the tool for ids not listed.
SWC_REGISTRY = {
    "SWC-101": "Integer Overflow and Underflow",
    "SWC-104": "Unchecked Call Return Value",
    "SWC-105": "Unprotected Ether Withdrawal",
    "SWC-106": "Unprotected SELFDESTRUCT",
    "SWC-107": "Reentrancy",
    "SWC-110": "Assert Violation",
    "SWC-112": "Delegatecall to Untrusted Callee",
    "SWC-113": "DoS with Failed Call",
    "SWC-114": "Transaction Order Dependence (front-running)",
    "SWC-115": "Authorization through tx.origin",
    "SWC-116": "Block values as a proxy for time",
    "SWC-120": "Weak Sources of Randomness",
    "SWC-128": "DoS with Block Gas Limit",
}

# Sort rank per severity label (lower rank sorts first). The analyze_* helpers
# use 9 as the rank for any label that is not listed here.
SEVERITY_RANK = {"critical": 0, "high": 1, "medium": 2, "low": 3, "informational": 4, "optimization": 5}

# Directories that are dependencies / build output, not the audited code.
# scan_key_leaks() prunes these names while walking the project tree.
SKIP_DIRS = {"lib", "out", "cache", "node_modules", ".git", "broadcast", "artifacts"}

# A raw 32-byte hex private key (with or without 0x). High-precision signal: a
# 64-hex literal in source is almost always a key. Broader secret detection
# (mnemonics, API tokens, generic secrets) is intentionally delegated to gitleaks
# (see references/secure-deployment-and-keys.md) rather than reinvented noisily here.
# NOTE(review): the pattern matches ANY standalone run of exactly 64 hex digits,
# so bytes32 constants and hashes match too - treat hits as candidates.
PRIVKEY_RE = re.compile(r"\b(0x)?[0-9a-fA-F]{64}\b")
|
|
|
|
|
|
def _which(tool):
|
|
return shutil.which(tool) is not None
|
|
|
|
|
|
def _run(cmd, timeout):
|
|
"""Run a command (list args), return (returncode, stdout, stderr). Never raises."""
|
|
try:
|
|
p = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
|
return p.returncode, p.stdout, p.stderr
|
|
except subprocess.TimeoutExpired:
|
|
logger.warning("Timeout: %s", " ".join(cmd))
|
|
return 124, "", "timeout"
|
|
except FileNotFoundError:
|
|
return 127, "", "not found"
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Static analysis
|
|
# --------------------------------------------------------------------------- #
|
|
def run_slither(project):
    """Run Slither on *project* and return its decoded JSON report.

    Returns ``None`` when the ``slither`` executable is not available, and an
    empty dict when Slither wrote nothing to stdout or its output is not valid
    JSON.
    """
    if not _which("slither"):
        logger.warning("slither not installed - skipping static analysis")
        return None

    # "--json -" sends the report to stdout. The exit status is deliberately
    # ignored: Slither exits non-zero when it finds issues.
    _, stdout, stderr = _run(["slither", project, "--json", "-"], timeout=300)
    if stdout:
        try:
            report = json.loads(stdout)
        except json.JSONDecodeError:
            logger.error("slither JSON parse failed")
            report = {}
        return report

    logger.error("slither produced no JSON (%s)", stderr.strip()[:200])
    return {}
|
|
|
|
|
|
def analyze_slither(slither_output):
    """Summarise a decoded Slither report.

    Args:
        slither_output: Dict as returned by ``run_slither`` (``None`` and ``{}``
            are treated as "no detectors").

    Returns:
        Dict with ``total``, ``by_severity`` (count per lower-cased impact),
        ``top_detectors`` (the 15 most frequent checks) and ``findings``
        (one record per detector result, ordered by SEVERITY_RANK).
    """
    detectors = (slither_output or {}).get("results", {}).get("detectors", [])
    severity_counts = defaultdict(int)
    detector_counts = defaultdict(int)
    records = []

    for entry in detectors:
        impact = entry.get("impact", "informational").lower()
        severity_counts[impact] += 1
        check = entry.get("check", "unknown")
        detector_counts[check] += 1

        # Location comes from the first element only; its first mapped line
        # (or 0 when none is listed) is used as the line number.
        elements = entry.get("elements", [])
        if elements:
            mapping = elements[0].get("source_mapping", {})
            first_line = (mapping.get("lines") or [0])[0]
            where = f"{mapping.get('filename_short', '')}:L{first_line}"
        else:
            where = ""

        records.append({
            "source": "slither",
            "detector": check,
            "severity": impact,
            "confidence": entry.get("confidence", ""),
            "location": where,
            "description": (entry.get("description", "") or "").strip()[:240],
        })

    # Most frequent detectors first; ties keep first-seen order (stable sort).
    ranked = sorted(detector_counts.items(), key=lambda pair: pair[1], reverse=True)
    records.sort(key=lambda rec: SEVERITY_RANK.get(rec["severity"], 9))
    return {
        "total": len(records),
        "by_severity": dict(severity_counts),
        "top_detectors": dict(ranked[:15]),
        "findings": records,
    }
|
|
|
|
|
|
def run_aderyn(project):
    """Run Aderyn on *project* and return its decoded JSON report.

    The report is written to a throwaway temporary directory rather than into
    the project: this keeps the audit read-only with respect to the project and
    guarantees that a stale report from an earlier run is never read when the
    current run fails to produce one.

    Returns:
        ``None`` when the ``aderyn`` executable is not available, the decoded
        report dict on success, or ``{}`` when no readable JSON report exists.
    """
    import tempfile  # local import so the file's import block is left untouched

    if not _which("aderyn"):
        logger.info("aderyn not installed - skipping (recommended: cargo install aderyn)")
        return None

    with tempfile.TemporaryDirectory(prefix="aderyn-") as scratch:
        report = os.path.join(scratch, "aderyn-report.json")
        # The exit status is not inspected; a missing/unparsable report file is
        # the failure signal.
        _run(["aderyn", project, "-o", report], timeout=300)
        try:
            with open(report, encoding="utf-8") as fh:
                return json.load(fh)
        except (OSError, json.JSONDecodeError):
            logger.error("aderyn report not readable")
            return {}
|
|
|
|
|
|
def analyze_aderyn(aderyn_output):
    """Summarise a decoded Aderyn report.

    Args:
        aderyn_output: Dict as returned by ``run_aderyn``; any falsy value
            yields an empty summary.

    Returns:
        Dict with ``total``, ``by_severity`` and ``findings`` (ordered by
        SEVERITY_RANK). Issues under ``high_issues`` are labelled "high" and
        those under ``low_issues`` are labelled "low".
    """
    if not aderyn_output:
        return {"total": 0, "by_severity": {}, "findings": []}

    severity_counts = defaultdict(int)
    records = []
    for report_key, level in (("high_issues", "high"), ("low_issues", "low")):
        section = aderyn_output.get(report_key, {}) or {}
        if not isinstance(section, dict):
            continue
        for entry in section.get("issues", []):
            severity_counts[level] += 1
            # Only the first reported instance is used for the location.
            first = (entry.get("instances") or [{}])[0]
            where = f"{first.get('contract_path', '')}:L{first.get('line_no', 0)}"
            records.append({
                "source": "aderyn",
                "detector": entry.get("title", ""),
                "severity": level,
                "location": where,
                "description": (entry.get("description", "") or "").strip()[:240],
            })

    records.sort(key=lambda rec: SEVERITY_RANK.get(rec["severity"], 9))
    return {
        "total": len(records),
        "by_severity": dict(severity_counts),
        "findings": records,
    }
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Symbolic execution (optional)
|
|
# --------------------------------------------------------------------------- #
|
|
def run_mythril(target, timeout):
    """Run ``myth analyze`` on *target* and return its decoded JSON output.

    Args:
        target: Value passed to ``myth analyze`` as the thing to analyse.
        timeout: Seconds passed to Mythril as ``--execution-timeout``.

    Returns:
        ``None`` when the ``myth`` executable is not available, the decoded
        JSON on success, or ``{}`` when stdout is empty or not valid JSON.
    """
    if not _which("myth"):
        logger.info("mythril not installed - skipping symbolic execution")
        return None

    command = ["myth", "analyze", target, "--execution-timeout", str(timeout), "-o", "json"]
    # The process gets 60 seconds on top of Mythril's own execution timeout.
    _, stdout, _ = _run(command, timeout=timeout + 60)
    if stdout:
        try:
            return json.loads(stdout)
        except json.JSONDecodeError:
            logger.error("mythril JSON parse failed")
    return {}
|
|
|
|
|
|
def analyze_mythril(mythril_output):
    """Summarise decoded Mythril output.

    Args:
        mythril_output: Dict as returned by ``run_mythril`` (``None`` and ``{}``
            are treated as "no issues").

    Returns:
        Dict with ``total``, ``by_swc`` (count per "SWC-<id>" label, or
        "unknown" when the issue carries no id) and ``findings`` in the order
        the issues were reported.
    """
    swc_counts = defaultdict(int)
    records = []
    for item in (mythril_output or {}).get("issues", []):
        raw_id = item.get("swc-id")
        label = f"SWC-{raw_id}" if raw_id else "unknown"
        swc_counts[label] += 1
        records.append({
            "source": "mythril",
            "swc_id": label,
            # Prefer the registry title; otherwise use the title from the issue.
            "swc_title": SWC_REGISTRY.get(label, item.get("title", "")),
            "severity": item.get("severity", "Medium").lower(),
            "location": f"{item.get('contract', '')}:L{item.get('lineno', 0)}",
            "description": (item.get("description", "") or "").strip()[:240],
        })
    return {"total": len(records), "by_swc": dict(swc_counts), "findings": records}
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Foundry tests + coverage
|
|
# --------------------------------------------------------------------------- #
|
|
def _forge_test_counts(text):
    """Extract ``(passed, failed)`` totals from ``forge test`` console output."""
    # Per-suite result lines pair the counters with semicolons
    # ("N passed; M failed; ..."). Summing only these avoids also counting the
    # final run summary, which repeats the failure total in "M failed" form.
    suites = re.findall(r"(\d+)\s+passed;\s+(\d+)\s+failed", text)
    if suites:
        return sum(int(p) for p, _ in suites), sum(int(f) for _, f in suites)
    # Fallback: the single overall summary ("N tests passed, M failed, ...").
    overall = re.search(r"(\d+)\s+tests?\s+passed,\s+(\d+)\s+failed", text)
    if overall:
        return int(overall.group(1)), int(overall.group(2))
    return 0, 0


def run_forge_tests(project):
    """Run ``forge test`` against *project* and summarise the outcome.

    Args:
        project: Foundry project root; passed to forge via ``--root`` so the
            tests of the audited project run regardless of the caller's
            working directory.

    Returns:
        ``{"available": False}`` when forge is not installed, otherwise a dict
        with ``available``, ``exit_code``, ``passed``, ``failed`` and
        ``all_passed`` (True only when forge exited with status 0).
    """
    if not _which("forge"):
        logger.warning("forge not installed - skipping tests")
        return {"available": False}
    rc, out, err = _run(["forge", "test", "--root", project], timeout=900)
    passed, failed = _forge_test_counts(out + err)
    return {"available": True, "exit_code": rc, "passed": passed,
            "failed": failed, "all_passed": rc == 0}
|
|
|
|
|
|
def run_forge_coverage(project):
    """Run ``forge coverage`` against *project* and extract the total line coverage.

    Args:
        project: Foundry project root; passed to forge via ``--root`` so the
            audited project is measured regardless of the caller's working
            directory.

    Returns:
        ``{"available": False}`` when forge is not installed, otherwise a dict
        with ``available``, ``exit_code`` and ``lines_pct`` (first percentage
        of the summary table's "Total" row, or ``None`` when no such row was
        printed).
    """
    if not _which("forge"):
        return {"available": False}
    rc, out, err = _run(
        ["forge", "coverage", "--root", project, "--report", "summary"], timeout=1200
    )
    m = re.search(r"^\|\s*Total\s*\|\s*([\d.]+)%", out, re.M)
    lines_pct = float(m.group(1)) if m else None
    if lines_pct is None:
        # Make the missing figure visible instead of failing silently.
        logger.warning("forge coverage printed no Total row (exit code %s)", rc)
    return {"available": True, "exit_code": rc, "lines_pct": lines_pct}
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Key hygiene
|
|
# --------------------------------------------------------------------------- #
|
|
def scan_key_leaks(project):
    """Heuristically scan the project tree for raw 64-hex private-key candidates.

    Walks *project* (pruning SKIP_DIRS), reads files with a source/config
    extension or a name starting with ".env", and records at most one hit per
    file for the first PRIVKEY_RE match that is neither all zeros nor preceded
    (within 80 characters) by the text "test test test".

    Returns:
        Dict with ``leaked_secret_candidates`` (number of files flagged),
        ``hits`` (at most the first 20 records) and an explanatory ``note``.
    """
    scanned_suffixes = (".sol", ".env", ".json", ".js", ".ts", ".toml", ".txt", ".md", ".sh", ".yaml", ".yml")
    flagged = []

    for folder, subdirs, filenames in os.walk(project):
        # In-place assignment so os.walk does not descend into skipped dirs.
        subdirs[:] = [name for name in subdirs if name not in SKIP_DIRS]
        for filename in filenames:
            # Source/config files plus all dotenv variants (.env, .env.local, ...).
            wanted = filename.endswith(scanned_suffixes) or filename.startswith(".env")
            if not wanted:
                continue
            full_path = os.path.join(folder, filename)
            try:
                with open(full_path, encoding="utf-8", errors="ignore") as handle:
                    text = handle.read()
            except OSError:
                continue

            for match in PRIVKEY_RE.finditer(text):
                digits = match.group(0).lower().removeprefix("0x")
                preceding = text[max(0, match.start() - 80):match.start()]
                # Ignore the all-zero value and matches right after the test mnemonic text.
                if digits != "0" * 64 and "test test test" not in preceding:
                    flagged.append({"file": os.path.relpath(full_path, project), "type": "possible_private_key"})
                    break  # one hit per file is enough

    return {
        "leaked_secret_candidates": len(flagged),
        "hits": flagged[:20],
        "note": "high-precision private-key scan only; run gitleaks for full secret coverage",
    }
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Aggregation + gate
|
|
# --------------------------------------------------------------------------- #
|
|
def deduplicate(*finding_lists):
    """Merge finding lists, keeping the first finding seen per identity.

    Two findings are the same when they share ``location`` and ``detector``
    (``swc_id`` stands in when a finding has no ``detector`` key). Order of
    first appearance is preserved.
    """
    unique = {}
    for findings in finding_lists:
        for finding in findings:
            identity = (
                finding.get("location", ""),
                finding.get("detector", finding.get("swc_id", "")),
            )
            # setdefault leaves an earlier finding with the same identity in place.
            unique.setdefault(identity, finding)
    return list(unique.values())
|
|
|
|
|
|
def build_report(project, slither, aderyn, mythril, tests, coverage, keys, min_coverage):
    """Aggregate all stage results into the final report and compute the deploy gate.

    Args:
        project: Project path (reported as an absolute path).
        slither, aderyn, mythril: Summaries from the matching ``analyze_*`` helper.
        tests: Result of ``run_forge_tests``.
        coverage: Result of ``run_forge_coverage``.
        keys: Result of ``scan_key_leaks``.
        min_coverage: Minimum line-coverage percentage required for PASS.

    Returns:
        JSON-serialisable dict. ``deploy_gate`` is "FAIL" when there is any
        high/critical finding, forge tests ran and did not all pass, any
        leaked-secret candidate exists, or a measured line coverage is below
        *min_coverage*; otherwise "PASS". ``findings`` holds at most 40 entries.
    """
    combined = deduplicate(slither["findings"], aderyn["findings"], mythril["findings"])
    # Order across tools by severity BEFORE truncating to 40 below; otherwise a
    # long tail of low-severity findings from the first tool could push another
    # tool's high-severity findings out of the report. The sort is stable, so
    # each tool's internal order is kept within a severity level.
    combined.sort(key=lambda f: SEVERITY_RANK.get(f.get("severity"), 9))
    crit_high = sum(1 for f in combined if f.get("severity") in ("critical", "high"))

    gate_fail = []
    if crit_high > 0:
        gate_fail.append(f"{crit_high} high/critical static finding(s)")
    if tests.get("available") and not tests.get("all_passed"):
        failed = tests.get("failed")
        if failed:
            gate_fail.append(f"{failed} failing test(s)")
        else:
            # forge failed without any parsed failure count (e.g. a build
            # error); "0 failing test(s)" would be misleading here.
            gate_fail.append(
                f"forge test exited with code {tests.get('exit_code', '?')} (no failing tests parsed)"
            )
    if keys["leaked_secret_candidates"] > 0:
        gate_fail.append(f"{keys['leaked_secret_candidates']} possible leaked secret(s)")
    cov = coverage.get("lines_pct")
    if cov is not None and cov < min_coverage:
        gate_fail.append(f"line coverage {cov}% < {min_coverage}% threshold")

    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "project": os.path.abspath(project),
        "static_analysis": {
            "slither": {"total": slither["total"], "by_severity": slither["by_severity"],
                        "top_detectors": slither["top_detectors"]},
            "aderyn": {"total": aderyn["total"], "by_severity": aderyn["by_severity"]},
        },
        "symbolic_execution": {"mythril": {"total": mythril["total"], "by_swc": mythril.get("by_swc", {})}},
        "testing": {"forge_test": tests, "coverage": coverage},
        "key_hygiene": keys,
        "combined_findings": len(combined),
        "critical_high_findings": crit_high,
        "deploy_gate": "PASS" if not gate_fail else "FAIL",
        "gate_failures": gate_fail,
        "findings": combined[:40],
    }
|
|
|
|
|
|
def main():
    """CLI entry point: run every audit stage, write the JSON report and print it.

    Returns:
        Process exit status: 1 when ``--fail-on-gate`` was given and the deploy
        gate is FAIL, otherwise 0. An invalid ``--project`` aborts through
        ``argparse`` (exit status 2) before any tool is run.
    """
    ap = argparse.ArgumentParser(description="Foundry Smart Contract Security Audit Agent")
    ap.add_argument("--project", default=".", help="Path to the Foundry project root")
    ap.add_argument("--mythril", metavar="FILE", help="Run Mythril symbolic execution on this .sol file")
    ap.add_argument("--mythril-timeout", type=int, default=300)
    ap.add_argument("--min-coverage", type=float, default=80.0, help="Min line coverage %% for PASS")
    ap.add_argument("--output", default="audit-report.json")
    ap.add_argument("--fail-on-gate", action="store_true",
                    help="Exit with status 1 when the deploy gate is FAIL (for CI pipelines)")
    args = ap.parse_args()

    # Without this check a mistyped path makes every stage come back empty and
    # the report would claim PASS for a project that was never audited.
    if not os.path.isdir(args.project):
        ap.error(f"--project is not a directory: {args.project}")

    logger.info("Auditing Foundry project: %s", os.path.abspath(args.project))
    slither = analyze_slither(run_slither(args.project))
    aderyn = analyze_aderyn(run_aderyn(args.project))
    mythril = analyze_mythril(run_mythril(args.mythril, args.mythril_timeout) if args.mythril else {})
    tests = run_forge_tests(args.project)
    coverage = run_forge_coverage(args.project)
    keys = scan_key_leaks(args.project)

    report = build_report(args.project, slither, aderyn, mythril, tests, coverage, keys, args.min_coverage)
    # Serialise once; the same text goes to the report file and to stdout.
    rendered = json.dumps(report, indent=2, default=str)
    with open(args.output, "w", encoding="utf-8") as fh:
        fh.write(rendered)
    logger.info("Audit: %d findings (%d high/critical) | gate=%s",
                report["combined_findings"], report["critical_high_findings"], report["deploy_gate"])
    if report["gate_failures"]:
        logger.warning("Gate failures: %s", "; ".join(report["gate_failures"]))
    print(rendered)

    # Opt-in so existing callers that rely on a zero exit status keep working.
    if args.fail_on_gate and report["deploy_gate"] == "FAIL":
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|