Files
mukul975 c47eed6a64 Production hardening: security fixes, code quality, 724 skills complete
- Fix 25 shell=True subprocess calls with list-based commands
- Fix 49 verify=False in defensive skills (env-var override)
- Add timeout to 231 HTTP/subprocess/socket calls
- Fix 6 SQL injection patterns with whitelist validation
- Replace 8 __import__() with standard imports
- Remove 701 unused imports across 442 files
- Add authorized-testing disclaimers to all offensive skills
- Complete 11 incomplete skill directories
- Expand 10 stub SKILL.md files with full content
- Fix 2 YAML parse errors in frontmatter
- Fix 5 pre-existing syntax errors
- Convert 22 hardcoded paths/ports to environment variables
- Back up 21 redundant skill pairs to .bak
- Fix 2 global declaration errors
- 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE)
- 0 compile errors across all 724 agent.py files
2026-03-19 13:26:49 +01:00

150 lines
5.3 KiB
Python

#!/usr/bin/env python3
# For authorized testing in lab/CTF environments only
"""BGP hijacking assessment agent for monitoring route origin validation and RPKI status."""
import argparse
import json
import logging
import sys
from datetime import datetime
from typing import List
try:
import requests
except ImportError:
sys.exit("requests is required: pip install requests")
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
RIPESTAT_BASE = "https://stat.ripe.net/data"
def check_rpki_status(prefix: str, asn: int) -> dict:
"""Check RPKI validation status for a prefix-origin pair via RIPEstat."""
url = f"{RIPESTAT_BASE}/rpki-validation/data.json"
params = {"resource": f"AS{asn}", "prefix": prefix}
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status()
data = resp.json().get("data", {})
return {
"prefix": prefix,
"asn": asn,
"status": data.get("status", "unknown"),
"validating_roas": data.get("validating_roas", []),
}
def get_announced_prefixes(asn: int) -> List[dict]:
"""Get prefixes currently announced by an ASN via RIPEstat."""
url = f"{RIPESTAT_BASE}/announced-prefixes/data.json"
params = {"resource": f"AS{asn}"}
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status()
prefixes = resp.json().get("data", {}).get("prefixes", [])
logger.info("AS%d announces %d prefixes", asn, len(prefixes))
return prefixes
def get_routing_status(prefix: str) -> dict:
"""Get current routing status for a prefix via RIPEstat."""
url = f"{RIPESTAT_BASE}/routing-status/data.json"
params = {"resource": prefix}
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status()
data = resp.json().get("data", {})
return {
"prefix": prefix,
"first_seen": data.get("first_seen", {}).get("time", ""),
"last_seen": data.get("last_seen", {}).get("time", ""),
"visibility": data.get("visibility", {}).get("v4", {}).get("total_ris_peers", 0),
"origins": [str(o.get("asn", "")) for o in data.get("origins", [])],
}
def check_roas(prefix: str) -> List[dict]:
"""Query ROA (Route Origin Authorization) records for a prefix."""
url = f"{RIPESTAT_BASE}/rpki-validation/data.json"
params = {"resource": prefix}
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status()
return resp.json().get("data", {}).get("validating_roas", [])
def get_bgp_looking_glass(prefix: str) -> dict:
"""Query BGP looking glass for current route advertisements."""
url = f"{RIPESTAT_BASE}/looking-glass/data.json"
params = {"resource": prefix}
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status()
rrcs = resp.json().get("data", {}).get("rrcs", [])
as_paths = []
for rrc in rrcs:
for peer in rrc.get("peers", []):
as_paths.append({
"rrc": rrc.get("rrc", ""),
"peer_asn": peer.get("asn_origin", ""),
"as_path": peer.get("as_path", ""),
"prefix": peer.get("prefix", ""),
})
return {"prefix": prefix, "routes": as_paths[:20]}
def assess_hijack_resilience(asn: int) -> dict:
"""Run a full BGP hijacking resilience assessment for an organization's ASN."""
logger.info("Assessing BGP hijack resilience for AS%d", asn)
prefixes = get_announced_prefixes(asn)
results = []
for p in prefixes:
prefix = p.get("prefix", "")
rpki = check_rpki_status(prefix, asn)
routing = get_routing_status(prefix)
multi_origin = len(routing.get("origins", [])) > 1
results.append({
"prefix": prefix,
"rpki_status": rpki["status"],
"roas": rpki["validating_roas"],
"origins_count": len(routing.get("origins", [])),
"multi_origin_conflict": multi_origin,
"visibility_peers": routing["visibility"],
})
unprotected = [r for r in results if r["rpki_status"] != "valid"]
conflicts = [r for r in results if r["multi_origin_conflict"]]
return {
"assessment_date": datetime.utcnow().isoformat(),
"asn": asn,
"total_prefixes": len(prefixes),
"rpki_valid": len(results) - len(unprotected),
"rpki_unprotected": len(unprotected),
"multi_origin_conflicts": len(conflicts),
"prefix_details": results,
"risk_summary": [],
}
def main():
parser = argparse.ArgumentParser(description="BGP Hijacking Assessment Agent")
parser.add_argument("--asn", type=int, required=True, help="Target ASN number")
parser.add_argument("--prefix", help="Specific prefix to check")
parser.add_argument("--output", default="bgp_assessment.json")
args = parser.parse_args()
if args.prefix:
rpki = check_rpki_status(args.prefix, args.asn)
routing = get_routing_status(args.prefix)
lg = get_bgp_looking_glass(args.prefix)
report = {"rpki": rpki, "routing": routing, "looking_glass": lg}
else:
report = assess_hijack_resilience(args.asn)
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
logger.info("Report saved to %s", args.output)
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()