mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
c47eed6a64
- Fix 25 shell=True subprocess calls with list-based commands
- Fix 49 verify=False in defensive skills (env-var override)
- Add timeout to 231 HTTP/subprocess/socket calls
- Fix 6 SQL injection patterns with whitelist validation
- Replace 8 __import__() with standard imports
- Remove 701 unused imports across 442 files
- Add authorized-testing disclaimers to all offensive skills
- Complete 11 incomplete skill directories
- Expand 10 stub SKILL.md files with full content
- Fix 2 YAML parse errors in frontmatter
- Fix 5 pre-existing syntax errors
- Convert 22 hardcoded paths/ports to environment variables
- Back up 21 redundant skill pairs to .bak
- Fix 2 global declaration errors
- 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE)
- 0 compile errors across all 724 agent.py files
150 lines
5.3 KiB
Python
150 lines
5.3 KiB
Python
#!/usr/bin/env python3
|
|
# For authorized testing in lab/CTF environments only
|
|
"""BGP hijacking assessment agent for monitoring route origin validation and RPKI status."""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import sys
|
|
from datetime import datetime
|
|
from typing import List
|
|
|
|
try:
|
|
import requests
|
|
except ImportError:
|
|
sys.exit("requests is required: pip install requests")
|
|
|
|
# Emit timestamped, level-tagged records at INFO and above for the whole process.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Root URL that every RIPEstat data call in this module is built from.
RIPESTAT_BASE = "https://stat.ripe.net/data"
|
|
|
|
|
|
def check_rpki_status(prefix: str, asn: int) -> dict:
    """Look up the RPKI validity of ``prefix`` when originated by ``asn``.

    Issues a single GET against the RIPEstat ``rpki-validation`` data call.

    Args:
        prefix: Prefix to validate; sent as the ``prefix`` query parameter.
        asn: Origin AS number; sent as ``AS<asn>`` in the ``resource`` parameter.

    Returns:
        A dict echoing ``prefix`` and ``asn`` together with the reported
        ``status`` (``"unknown"`` when the response has none) and the
        ``validating_roas`` list (empty when the response has none).

    Raises:
        requests.HTTPError: The service answered with an error status.
    """
    response = requests.get(
        f"{RIPESTAT_BASE}/rpki-validation/data.json",
        params={"resource": f"AS{asn}", "prefix": prefix},
        timeout=30,
    )
    response.raise_for_status()
    payload = response.json().get("data", {})

    # Built incrementally; key order matches the serialized report layout.
    summary = {"prefix": prefix, "asn": asn}
    summary["status"] = payload.get("status", "unknown")
    summary["validating_roas"] = payload.get("validating_roas", [])
    return summary
|
|
|
|
|
|
def get_announced_prefixes(asn: int) -> List[dict]:
    """Fetch the prefixes RIPEstat currently lists as announced by ``asn``.

    Args:
        asn: AS number; sent as ``AS<asn>`` in the ``resource`` parameter of
            the ``announced-prefixes`` data call.

    Returns:
        The ``data.prefixes`` list from the response, or an empty list when
        that part of the response is missing.

    Raises:
        requests.HTTPError: The service answered with an error status.
    """
    response = requests.get(
        f"{RIPESTAT_BASE}/announced-prefixes/data.json",
        params={"resource": f"AS{asn}"},
        timeout=30,
    )
    response.raise_for_status()

    body = response.json()
    announced = body.get("data", {}).get("prefixes", [])
    logger.info("AS%d announces %d prefixes", asn, len(announced))
    return announced
|
|
|
|
|
|
def get_routing_status(prefix: str) -> dict:
    """Summarise how ``prefix`` is currently seen via the RIPEstat routing-status call.

    Args:
        prefix: IPv4 or IPv6 prefix; sent as the ``resource`` query parameter.

    Returns:
        A dict with:

        * ``prefix`` - the prefix that was queried.
        * ``first_seen`` / ``last_seen`` - the ``time`` field of the matching
          response objects, or ``""`` when absent.
        * ``visibility`` - number of RIS peers that see the prefix
          (``ris_peers_seeing``) for the prefix's own address family, ``0``
          when absent.
        * ``origins`` - origin ASNs as strings, one per entry in the
          response's ``origins`` list.

    Raises:
        requests.HTTPError: The service answered with an error status.
    """
    url = f"{RIPESTAT_BASE}/routing-status/data.json"
    resp = requests.get(url, params={"resource": prefix}, timeout=30)
    resp.raise_for_status()

    # ``or {}`` also covers an explicit JSON null, which ``.get(key, {})``
    # would hand back as None and crash the chained lookups below.
    data = resp.json().get("data") or {}

    # Visibility is reported separately for v4 and v6; reading the v4 bucket
    # for an IPv6 prefix would always report it as unseen.
    family = "v6" if ":" in prefix else "v4"
    visibility = (data.get("visibility") or {}).get(family) or {}

    origins = []
    for entry in data.get("origins") or []:
        # The origin AS is published under "origin"; "asn" (the key this
        # function used to read) is kept as a fallback.
        origins.append(str(entry.get("origin", entry.get("asn", ""))))

    return {
        "prefix": prefix,
        "first_seen": (data.get("first_seen") or {}).get("time", ""),
        "last_seen": (data.get("last_seen") or {}).get("time", ""),
        # ``total_ris_peers`` is the size of the peer set, not how many of
        # those peers actually carry this prefix.
        "visibility": visibility.get("ris_peers_seeing", 0),
        "origins": origins,
    }
|
|
|
|
|
|
def _announced_origin_asns(prefix: str) -> List[int]:
    """Return the distinct origin AS numbers RIPEstat reports for ``prefix``."""
    resp = requests.get(
        f"{RIPESTAT_BASE}/routing-status/data.json",
        params={"resource": prefix},
        timeout=30,
    )
    resp.raise_for_status()
    data = resp.json().get("data") or {}

    asns = []
    for entry in data.get("origins") or []:
        text = str(entry.get("origin", "")).strip().upper()
        if text.startswith("AS"):
            text = text[2:]
        # Ignore anything that is not a plain AS number; keep first-seen order.
        if text.isdigit() and int(text) not in asns:
            asns.append(int(text))
    return asns


def check_roas(prefix: str) -> List[dict]:
    """Query ROA (Route Origin Authorization) records for a prefix.

    The RIPEstat ``rpki-validation`` call validates a (origin ASN, prefix)
    pair, so it cannot be asked about a prefix on its own. This function
    therefore first looks up which ASNs currently originate ``prefix`` and
    then collects the ROAs reported for each of those pairs.

    Args:
        prefix: Prefix whose ROAs should be listed.

    Returns:
        The ROA records reported for the prefix, de-duplicated, in the order
        they were first returned. The list is empty when no origin is
        currently announcing the prefix (a warning is logged in that case,
        because an empty result then does NOT prove that no ROA exists).

    Raises:
        requests.HTTPError: One of the RIPEstat calls answered with an error
            status.
    """
    origin_asns = _announced_origin_asns(prefix)
    if not origin_asns:
        logger.warning(
            "No announced origin found for %s; ROAs cannot be queried without an origin ASN",
            prefix,
        )

    roas = []
    for asn in origin_asns:
        for roa in check_rpki_status(prefix, asn)["validating_roas"]:
            # The same ROA is reported once per queried origin; keep one copy.
            if roa not in roas:
                roas.append(roa)
    return roas
|
|
|
|
|
|
def get_bgp_looking_glass(prefix: str) -> dict:
    """Collect route advertisements for ``prefix`` from the RIPEstat looking glass.

    Args:
        prefix: Prefix to look up; sent as the ``resource`` query parameter.

    Returns:
        ``{"prefix": prefix, "routes": [...]}`` where ``routes`` holds at most
        the first 20 peer entries found across all returned collectors, each
        reduced to ``rrc``, ``peer_asn``, ``as_path`` and ``prefix``.

    Raises:
        requests.HTTPError: The service answered with an error status.
    """
    response = requests.get(
        f"{RIPESTAT_BASE}/looking-glass/data.json",
        params={"resource": prefix},
        timeout=30,
    )
    response.raise_for_status()
    collectors = response.json().get("data", {}).get("rrcs", [])

    routes = [
        {
            "rrc": collector.get("rrc", ""),
            # NOTE(review): stored as "peer_asn" but read from "asn_origin" -
            # confirm which ASN report consumers expect here.
            "peer_asn": peer.get("asn_origin", ""),
            "as_path": peer.get("as_path", ""),
            "prefix": peer.get("prefix", ""),
        }
        for collector in collectors
        for peer in collector.get("peers", [])
    ]
    # Only the first 20 routes are reported.
    return {"prefix": prefix, "routes": routes[:20]}
|
|
|
|
|
|
def _summarise_risks(results: List[dict]) -> List[str]:
    """Turn per-prefix findings into short human-readable risk statements."""
    risks = []
    for entry in results:
        prefix = entry["prefix"]
        if entry["rpki_status"] != "valid":
            risks.append(
                f"{prefix}: RPKI status is '{entry['rpki_status']}' - "
                "announcement is not covered by a valid ROA"
            )
        if entry["multi_origin_conflict"]:
            risks.append(
                f"{prefix}: seen with {entry['origins_count']} origin ASNs - "
                "possible multi-origin conflict or hijack"
            )
    return risks


def assess_hijack_resilience(asn: int) -> dict:
    """Run a full BGP hijacking resilience assessment for an organization's ASN.

    For every prefix the ASN announces, the RPKI validation status and the
    current routing status are fetched and combined into one record.

    Args:
        asn: AS number to assess.

    Returns:
        A report dict with:

        * ``assessment_date`` - UTC timestamp in ISO-8601 form (with offset).
        * ``asn`` - the assessed AS number.
        * ``total_prefixes`` - number of entries returned for the ASN.
        * ``rpki_valid`` / ``rpki_unprotected`` - prefixes whose status is /
          is not ``"valid"``.
        * ``multi_origin_conflicts`` - prefixes seen with more than one origin.
        * ``prefix_details`` - the per-prefix records.
        * ``risk_summary`` - one sentence per finding in ``prefix_details``.

    Raises:
        requests.HTTPError: Any of the underlying RIPEstat calls failed.
    """
    # Local import: the module itself only imports ``datetime`` from datetime.
    from datetime import timezone

    logger.info("Assessing BGP hijack resilience for AS%d", asn)
    prefixes = get_announced_prefixes(asn)

    results = []
    for p in prefixes:
        prefix = p.get("prefix", "")
        if not prefix:
            # An entry without a prefix cannot be validated; querying the API
            # with an empty resource would only produce an error.
            logger.warning("Skipping announced-prefix entry without a prefix: %r", p)
            continue

        rpki = check_rpki_status(prefix, asn)
        routing = get_routing_status(prefix)
        origins_count = len(routing.get("origins", []))
        results.append({
            "prefix": prefix,
            "rpki_status": rpki["status"],
            "roas": rpki["validating_roas"],
            "origins_count": origins_count,
            "multi_origin_conflict": origins_count > 1,
            "visibility_peers": routing["visibility"],
        })

    unprotected = [r for r in results if r["rpki_status"] != "valid"]
    conflicts = [r for r in results if r["multi_origin_conflict"]]

    return {
        # Timezone-aware so the timestamp is unambiguous; utcnow() is
        # deprecated and returns a naive value.
        "assessment_date": datetime.now(timezone.utc).isoformat(),
        "asn": asn,
        "total_prefixes": len(prefixes),
        "rpki_valid": len(results) - len(unprotected),
        "rpki_unprotected": len(unprotected),
        "multi_origin_conflicts": len(conflicts),
        "prefix_details": results,
        "risk_summary": _summarise_risks(results),
    }
|
|
|
|
|
|
def _parse_asn(value: str) -> int:
    """argparse ``type`` callback: accept ``3333`` as well as ``AS3333``."""
    text = value.strip().upper()
    if text.startswith("AS"):
        text = text[2:]
    if not text.isdigit():
        raise argparse.ArgumentTypeError(f"invalid ASN: {value!r}")
    return int(text)


def main():
    """Command-line entry point.

    With ``--prefix`` a single prefix is checked (RPKI status, routing status
    and looking-glass routes); otherwise every prefix announced by ``--asn``
    is assessed. The resulting report is written to ``--output`` as JSON and
    also printed to stdout. A failed RIPEstat request is logged and ends the
    process with exit status 1.
    """
    parser = argparse.ArgumentParser(description="BGP Hijacking Assessment Agent")
    parser.add_argument("--asn", type=_parse_asn, required=True, help="Target ASN number")
    parser.add_argument("--prefix", help="Specific prefix to check")
    parser.add_argument(
        "--output",
        default="bgp_assessment.json",
        help="Path of the JSON report to write (default: %(default)s)",
    )
    args = parser.parse_args()

    try:
        if args.prefix:
            rpki = check_rpki_status(args.prefix, args.asn)
            routing = get_routing_status(args.prefix)
            lg = get_bgp_looking_glass(args.prefix)
            report = {"rpki": rpki, "routing": routing, "looking_glass": lg}
        else:
            report = assess_hijack_resilience(args.asn)
    except requests.RequestException as exc:
        # Timeouts, DNS failures and HTTP error statuses all end up here;
        # report them as one line instead of a traceback.
        logger.error("RIPEstat query failed: %s", exc)
        sys.exit(1)

    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2)
    logger.info("Report saved to %s", args.output)
    print(json.dumps(report, indent=2))


if __name__ == "__main__":
    main()
|