Expand boilerplate agent.py stubs with real implementations (batch 2)

This commit is contained in:
mukul975
2026-03-19 13:44:30 +01:00
parent 3803da65d5
commit 7e2e6ad664
2 changed files with 491 additions and 93 deletions
@@ -1,61 +1,244 @@
#!/usr/bin/env python3
"""Rapid7 InsightVM scanning configuration audit."""
import argparse, json
"""Rapid7 InsightVM vulnerability scanning agent.
Interfaces with the InsightVM (Nexpose) REST API to manage scan
configurations, launch scans, retrieve vulnerability results, and
generate remediation reports. Supports site management, asset
discovery, and vulnerability prioritization.
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
try:
import requests
from requests.auth import HTTPBasicAuth
except ImportError:
requests = None
print("[!] 'requests' required: pip install requests", file=sys.stderr)
sys.exit(1)
def audit_config(target, token):
findings = []
if not requests: return [{"error": "requests required"}]
headers = {"Authorization": f"Bearer {token}"}
try:
resp = requests.get(f"{target}/api/v1/status", headers=headers, timeout=10)
if resp.status_code == 200:
data = resp.json()
if not data.get("enabled", True):
findings.append({"check": "Service Status", "status": "DISABLED", "severity": "CRITICAL"})
elif resp.status_code == 401:
findings.append({"check": "Authentication", "status": "UNAUTHORIZED", "severity": "HIGH"})
except requests.RequestException as e:
findings.append({"error": str(e)})
return findings
def check_compliance(target, token):
findings = []
if not requests: return []
headers = {"Authorization": f"Bearer {token}"}
try:
resp = requests.get(f"{target}/api/v1/compliance", headers=headers, timeout=10)
if resp.status_code == 200:
for item in resp.json().get("checks", []):
if item.get("status") != "PASS":
findings.append({"check": item.get("name"), "status": item.get("status"),
"severity": item.get("severity", "MEDIUM")})
except requests.RequestException:
pass
return findings
def get_insightvm_config():
"""Return InsightVM API connection config."""
host = os.environ.get("INSIGHTVM_HOST", "localhost")
port = os.environ.get("INSIGHTVM_PORT", "3780")
user = os.environ.get("INSIGHTVM_USER", "")
password = os.environ.get("INSIGHTVM_PASSWORD", "")
return f"https://{host}:{port}", user, password
def api_call(base_url, endpoint, user, password, method="GET",
data=None, params=None):
"""Make authenticated API call to InsightVM."""
url = f"{base_url}/api/3{endpoint}"
auth = HTTPBasicAuth(user, password)
headers = {"Content-Type": "application/json", "Accept": "application/json"}
if method == "POST":
resp = requests.post(url, auth=auth, headers=headers, json=data,
params=params, verify=False, timeout=60)
elif method == "PUT":
resp = requests.put(url, auth=auth, headers=headers, json=data,
verify=False, timeout=60)
else:
resp = requests.get(url, auth=auth, headers=headers, params=params,
verify=False, timeout=60)
resp.raise_for_status()
return resp.json()
def list_sites(base_url, user, password):
"""List all scan sites."""
print("[*] Listing sites...")
data = api_call(base_url, "/sites", user, password, params={"size": 500})
sites = []
for site in data.get("resources", []):
sites.append({
"id": site.get("id"),
"name": site.get("name", ""),
"description": site.get("description", ""),
"type": site.get("type", ""),
"assets": site.get("assets", 0),
"last_scan_time": site.get("lastScanTime", ""),
"risk_score": site.get("riskScore", 0),
})
print(f"[+] Found {len(sites)} sites")
return sites
def get_site_vulnerabilities(base_url, user, password, site_id):
"""Get vulnerabilities for a specific site."""
print(f"[*] Fetching vulnerabilities for site {site_id}...")
vulns = []
page = 0
while True:
data = api_call(base_url, f"/sites/{site_id}/vulnerabilities",
user, password, params={"page": page, "size": 100})
resources = data.get("resources", [])
if not resources:
break
for v in resources:
vulns.append({
"id": v.get("id", ""),
"title": v.get("title", ""),
"severity": v.get("severity", ""),
"cvss_v3_score": v.get("cvss", {}).get("v3", {}).get("score", 0),
"risk_score": v.get("riskScore", 0),
"instances": v.get("instances", 0),
"status": v.get("status", ""),
})
page += 1
total_pages = data.get("page", {}).get("totalPages", 1)
if page >= total_pages:
break
print(f"[+] Retrieved {len(vulns)} vulnerabilities")
return vulns
def launch_scan(base_url, user, password, site_id, scan_name=None):
"""Launch a scan on a site."""
if not scan_name:
scan_name = f"agent-scan-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}"
print(f"[*] Launching scan '{scan_name}' on site {site_id}...")
data = api_call(base_url, f"/sites/{site_id}/scans", user, password,
method="POST", data={"name": scan_name})
scan_id = data.get("id")
print(f"[+] Scan started, ID: {scan_id}")
return scan_id
def poll_scan_status(base_url, user, password, scan_id, max_wait=1800):
"""Poll scan status until completion."""
print(f"[*] Waiting for scan {scan_id} to complete...")
elapsed = 0
interval = 30
while elapsed < max_wait:
data = api_call(base_url, f"/scans/{scan_id}", user, password)
status = data.get("status", "unknown")
if status in ("finished", "stopped", "error"):
print(f"[+] Scan {status}")
return status, data
print(f" Status: {status} ({elapsed}s)")
time.sleep(interval)
elapsed += interval
print("[!] Scan timed out")
return "timeout", {}
def get_scan_report(base_url, user, password, scan_id):
"""Get scan results summary."""
data = api_call(base_url, f"/scans/{scan_id}", user, password)
return {
"scan_id": scan_id,
"status": data.get("status", ""),
"start_time": data.get("startTime", ""),
"end_time": data.get("endTime", ""),
"duration": data.get("duration", ""),
"assets_discovered": data.get("assets", 0),
"vulnerabilities": data.get("vulnerabilities", {}),
}
def format_summary(sites, vulns=None, scan_report=None):
"""Print summary."""
print(f"\n{'='*60}")
print(f" Rapid7 InsightVM Report")
print(f"{'='*60}")
if sites:
print(f"\n Sites ({len(sites)}):")
for s in sites:
print(f" {s['name']:30s} | Assets: {s['assets']:5d} | "
f"Risk: {s['risk_score']:8.1f}")
if vulns:
severity_counts = {}
for v in vulns:
sev = v.get("severity", "unknown")
severity_counts[sev] = severity_counts.get(sev, 0) + 1
print(f"\n Vulnerabilities ({len(vulns)}):")
for sev in sorted(severity_counts.keys()):
print(f" {sev:15s}: {severity_counts[sev]}")
if scan_report:
print(f"\n Scan Report:")
print(f" Status : {scan_report.get('status', 'N/A')}")
print(f" Assets : {scan_report.get('assets_discovered', 0)}")
def main():
p = argparse.ArgumentParser(description="Rapid7 InsightVM scanning configuration audit")
p.add_argument("--target", required=True, help="Target URL")
p.add_argument("--token", required=True, help="API token")
p.add_argument("--output", "-o", help="Output JSON report")
p.add_argument("--verbose", "-v", action="store_true")
a = p.parse_args()
print("[*] Rapid7 InsightVM scanning configuration audit")
report = {"timestamp": datetime.now(timezone.utc).isoformat(), "findings": []}
report["findings"].extend(audit_config(a.target, a.token))
report["findings"].extend(check_compliance(a.target, a.token))
high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL"))
report["risk_level"] = "HIGH" if high else "MEDIUM" if report["findings"] else "LOW"
print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}")
if a.output:
with open(a.output, "w") as f: json.dump(report, f, indent=2)
else:
parser = argparse.ArgumentParser(description="Rapid7 InsightVM scanning agent")
sub = parser.add_subparsers(dest="command")
sub.add_parser("list-sites", help="List scan sites")
p_vulns = sub.add_parser("get-vulns", help="Get site vulnerabilities")
p_vulns.add_argument("--site-id", required=True, type=int)
p_scan = sub.add_parser("scan", help="Launch a scan")
p_scan.add_argument("--site-id", required=True, type=int)
p_scan.add_argument("--wait", action="store_true", help="Wait for completion")
p_scan.add_argument("--max-wait", type=int, default=1800)
parser.add_argument("--host", help="InsightVM host (or INSIGHTVM_HOST env)")
parser.add_argument("--user", help="Username (or INSIGHTVM_USER env)")
parser.add_argument("--password", help="Password (or INSIGHTVM_PASSWORD env)")
parser.add_argument("--output", "-o", help="Output JSON report")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
if args.host:
os.environ["INSIGHTVM_HOST"] = args.host
if args.user:
os.environ["INSIGHTVM_USER"] = args.user
if args.password:
os.environ["INSIGHTVM_PASSWORD"] = args.password
base_url, user, password = get_insightvm_config()
if not user or not password:
print("[!] Set INSIGHTVM_USER and INSIGHTVM_PASSWORD", file=sys.stderr)
sys.exit(1)
result = {}
if args.command == "list-sites":
sites = list_sites(base_url, user, password)
format_summary(sites)
result = {"sites": sites}
elif args.command == "get-vulns":
vulns = get_site_vulnerabilities(base_url, user, password, args.site_id)
format_summary([], vulns)
result = {"site_id": args.site_id, "vulnerabilities": vulns}
elif args.command == "scan":
scan_id = launch_scan(base_url, user, password, args.site_id)
if args.wait:
status, data = poll_scan_status(base_url, user, password, scan_id, args.max_wait)
scan_report = get_scan_report(base_url, user, password, scan_id)
format_summary([], scan_report=scan_report)
result = {"scan_id": scan_id, "report": scan_report}
else:
result = {"scan_id": scan_id, "status": "launched"}
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool": "Rapid7 InsightVM",
"command": args.command,
"result": result,
}
if args.output:
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
print(f"\n[+] Report saved to {args.output}")
elif args.verbose:
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()
@@ -1,60 +1,275 @@
#!/usr/bin/env python3
"""LaZagne credential access detection agent."""
import argparse, json
"""LaZagne credential access detection agent.
Detects evidence of LaZagne credential harvesting tool execution on
endpoints by scanning for process artifacts, file system indicators,
and Windows event log entries associated with credential dumping.
Used for defensive detection and incident response, not offensive use.
"""
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
try:
import requests
except ImportError:
requests = None
def run_scan(target, token=None):
LAZAGNE_INDICATORS = {
"file_paths": [
"lazagne.exe", "LaZagne.exe", "laZagne.py",
"lazagne_output.txt", "credentials.txt",
],
"process_names": [
"lazagne.exe", "python.exe lazagne",
],
"registry_keys": [
r"SOFTWARE\Microsoft\Windows\CurrentVersion\Run",
],
"event_ids": {
4688: "Process creation (look for lazagne.exe or suspicious python)",
4663: "File access audit (credential store access)",
4656: "Handle request to credential objects",
},
"credential_stores": [
# Windows
r"%APPDATA%\Mozilla\Firefox\Profiles",
r"%LOCALAPPDATA%\Google\Chrome\User Data\Default\Login Data",
r"%APPDATA%\Opera Software\Opera Stable\Login Data",
# Linux
os.path.expanduser("~/.mozilla/firefox"),
os.path.expanduser("~/.config/google-chrome/Default/Login Data"),
"/etc/shadow",
os.path.expanduser("~/.ssh"),
],
}
def scan_filesystem_indicators(search_paths=None):
"""Scan file system for LaZagne artifacts."""
findings = []
if not requests: return [{"error": "requests required"}]
headers = {"Authorization": f"Bearer {token}"} if token else {}
try:
resp = requests.get(f"{target}", headers=headers, timeout=15)
if resp.status_code == 200:
findings.append({"check": "Target Accessible", "status": "OK", "severity": "INFO"})
if search_paths is None:
if sys.platform == "win32":
search_paths = [
os.environ.get("TEMP", "C:\\Temp"),
os.environ.get("USERPROFILE", "C:\\Users\\Default"),
"C:\\Windows\\Temp",
]
else:
findings.append({"check": "Target Access", "status": f"HTTP {resp.status_code}", "severity": "MEDIUM"})
except requests.RequestException as e:
findings.append({"error": str(e)})
search_paths = ["/tmp", "/var/tmp", os.path.expanduser("~")]
print("[*] Scanning filesystem for LaZagne indicators...")
for search_path in search_paths:
if not os.path.isdir(search_path):
continue
for root, dirs, files in os.walk(search_path):
for fname in files:
fname_lower = fname.lower()
for indicator in LAZAGNE_INDICATORS["file_paths"]:
if indicator.lower() in fname_lower:
full_path = os.path.join(root, fname)
stat = os.stat(full_path)
findings.append({
"type": "file_indicator",
"path": full_path,
"indicator": indicator,
"size": stat.st_size,
"modified": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(),
"severity": "CRITICAL",
"description": f"LaZagne artifact found: {fname}",
})
# Limit directory depth to avoid slow scans
if root.count(os.sep) - search_path.count(os.sep) > 3:
dirs.clear()
return findings
def analyze_results(target, token=None):
def check_windows_event_logs():
"""Check Windows Security event logs for LaZagne-related activity."""
findings = []
if not requests: return []
headers = {"Authorization": f"Bearer {token}"} if token else {}
try:
resp = requests.get(f"{target}/api/v1/results", headers=headers, timeout=15)
if resp.status_code == 200:
data = resp.json()
for item in data.get("findings", data.get("results", [])):
severity = item.get("severity", item.get("risk", "MEDIUM"))
findings.append({"check": item.get("name", item.get("title", "unknown")),
"severity": severity.upper() if isinstance(severity, str) else "MEDIUM"})
except requests.RequestException:
pass
if sys.platform != "win32":
return findings
print("[*] Checking Windows Security event logs...")
# Check for process creation events matching LaZagne
ps_script = """
Get-WinEvent -FilterHashtable @{
LogName='Security'; Id=4688; StartTime=(Get-Date).AddDays(-7)
} -MaxEvents 1000 -ErrorAction SilentlyContinue |
Where-Object {
$_.Properties[5].Value -match 'lazagne|LaZagne' -or
$_.Properties[8].Value -match 'lazagne|LaZagne'
} |
Select-Object TimeCreated, @{N='ProcessName';E={$_.Properties[5].Value}},
@{N='CommandLine';E={$_.Properties[8].Value}},
@{N='User';E={$_.Properties[1].Value}} |
ConvertTo-Json
"""
result = subprocess.run(
["powershell", "-Command", ps_script],
capture_output=True, text=True, timeout=60,
)
if result.returncode == 0 and result.stdout.strip():
try:
events = json.loads(result.stdout)
if isinstance(events, dict):
events = [events]
for evt in events:
findings.append({
"type": "event_log",
"event_id": 4688,
"time": str(evt.get("TimeCreated", "")),
"process": evt.get("ProcessName", ""),
"command_line": evt.get("CommandLine", "")[:200],
"user": evt.get("User", ""),
"severity": "CRITICAL",
"description": "LaZagne process execution detected in event logs",
})
except json.JSONDecodeError:
pass
# Check for credential file access (Event ID 4663)
ps_script2 = """
Get-WinEvent -FilterHashtable @{
LogName='Security'; Id=4663; StartTime=(Get-Date).AddDays(-7)
} -MaxEvents 500 -ErrorAction SilentlyContinue |
Where-Object {
$_.Properties[6].Value -match 'Login Data|logins.json|Credentials|vault'
} |
Select-Object TimeCreated, @{N='ObjectName';E={$_.Properties[6].Value}},
@{N='ProcessName';E={$_.Properties[11].Value}} |
ConvertTo-Json
"""
result = subprocess.run(
["powershell", "-Command", ps_script2],
capture_output=True, text=True, timeout=60,
)
if result.returncode == 0 and result.stdout.strip():
try:
events = json.loads(result.stdout)
if isinstance(events, dict):
events = [events]
for evt in events:
findings.append({
"type": "credential_access",
"event_id": 4663,
"time": str(evt.get("TimeCreated", "")),
"object": evt.get("ObjectName", ""),
"process": evt.get("ProcessName", ""),
"severity": "HIGH",
"description": "Credential store accessed by process",
})
except json.JSONDecodeError:
pass
return findings
def check_credential_store_integrity():
"""Check if credential stores have been recently accessed unusually."""
findings = []
print("[*] Checking credential store integrity...")
stores_to_check = []
if sys.platform == "win32":
appdata = os.environ.get("APPDATA", "")
localappdata = os.environ.get("LOCALAPPDATA", "")
stores_to_check = [
(os.path.join(localappdata, "Google", "Chrome", "User Data", "Default", "Login Data"), "Chrome"),
(os.path.join(appdata, "Mozilla", "Firefox"), "Firefox"),
]
else:
stores_to_check = [
(os.path.expanduser("~/.config/google-chrome/Default/Login Data"), "Chrome"),
(os.path.expanduser("~/.mozilla/firefox"), "Firefox"),
(os.path.expanduser("~/.ssh"), "SSH Keys"),
]
for path, store_name in stores_to_check:
if os.path.exists(path):
if os.path.isfile(path):
stat = os.stat(path)
access_time = datetime.fromtimestamp(stat.st_atime, tz=timezone.utc)
mod_time = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
findings.append({
"type": "credential_store",
"store": store_name,
"path": path,
"last_accessed": access_time.isoformat(),
"last_modified": mod_time.isoformat(),
"severity": "INFO",
})
return findings
def format_summary(all_findings):
"""Print detection summary."""
print(f"\n{'='*60}")
print(f" LaZagne Credential Access Detection Report")
print(f"{'='*60}")
file_findings = [f for f in all_findings if f["type"] == "file_indicator"]
event_findings = [f for f in all_findings if f["type"] in ("event_log", "credential_access")]
store_findings = [f for f in all_findings if f["type"] == "credential_store"]
print(f" File Indicators : {len(file_findings)}")
print(f" Event Log Hits : {len(event_findings)}")
print(f" Credential Stores: {len(store_findings)}")
critical = [f for f in all_findings if f.get("severity") == "CRITICAL"]
if critical:
print(f"\n CRITICAL FINDINGS ({len(critical)}):")
for f in critical:
print(f" [{f['type']}] {f.get('description', f.get('path', ''))}")
severity_counts = {}
for f in all_findings:
sev = f.get("severity", "INFO")
severity_counts[sev] = severity_counts.get(sev, 0) + 1
return severity_counts
def main():
p = argparse.ArgumentParser(description="LaZagne credential access detection agent")
p.add_argument("--target", required=True, help="Target URL or IP")
p.add_argument("--token", help="API token")
p.add_argument("--output", "-o", help="Output JSON report")
p.add_argument("--verbose", "-v", action="store_true")
a = p.parse_args()
print("[*] LaZagne credential access detection agent")
report = {"timestamp": datetime.now(timezone.utc).isoformat(), "target": a.target, "findings": []}
report["findings"].extend(run_scan(a.target, a.token))
report["findings"].extend(analyze_results(a.target, a.token))
high = sum(1 for f in report["findings"] if f.get("severity") in ("HIGH", "CRITICAL"))
report["risk_level"] = "CRITICAL" if high > 2 else "HIGH" if high else "MEDIUM" if report["findings"] else "LOW"
print(f"[*] {len(report['findings'])} findings, risk: {report['risk_level']}")
if a.output:
with open(a.output, "w") as f: json.dump(report, f, indent=2)
else:
parser = argparse.ArgumentParser(
description="LaZagne credential access detection agent (defensive use)"
)
parser.add_argument("--search-paths", nargs="+", help="Paths to scan for artifacts")
parser.add_argument("--skip-events", action="store_true", help="Skip Windows event log check")
parser.add_argument("--skip-stores", action="store_true", help="Skip credential store check")
parser.add_argument("--output", "-o", help="Output JSON report")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
all_findings = []
all_findings.extend(scan_filesystem_indicators(args.search_paths))
if not args.skip_events:
all_findings.extend(check_windows_event_logs())
if not args.skip_stores:
all_findings.extend(check_credential_store_integrity())
severity_counts = format_summary(all_findings)
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool": "LaZagne Detection",
"findings": all_findings,
"severity_counts": severity_counts,
"risk_level": (
"CRITICAL" if severity_counts.get("CRITICAL", 0) > 0
else "HIGH" if severity_counts.get("HIGH", 0) > 0
else "LOW"
),
}
if args.output:
with open(args.output, "w") as f:
json.dump(report, f, indent=2)
print(f"\n[+] Report saved to {args.output}")
elif args.verbose:
print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()