mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
189 lines
7.0 KiB
Python
189 lines
7.0 KiB
Python
#!/usr/bin/env python3
|
|
"""Agent for analyzing Windows Prefetch files with Python.
|
|
|
|
Parses Prefetch (.pf) files to reconstruct execution history,
|
|
detect renamed/masquerading binaries, and identify suspicious
|
|
tool execution using the windowsprefetch library.
|
|
"""
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import windowsprefetch
|
|
except ImportError:
|
|
windowsprefetch = None
|
|
|
|
SUSPICIOUS_EXECUTABLES = {
|
|
"mimikatz", "psexec", "psexesvc", "procdump", "lazagne",
|
|
"rubeus", "sharphound", "bloodhound", "cobalt", "beacon",
|
|
"meterpreter", "powersploit", "empire", "covenant",
|
|
"secretsdump", "wce", "fgdump", "pwdump", "gsecdump",
|
|
"certutil", "bitsadmin", "mshta", "regsvr32", "rundll32",
|
|
"wscript", "cscript", "msiexec", "installutil",
|
|
}
|
|
|
|
LOLBINS = {
|
|
"certutil.exe", "bitsadmin.exe", "mshta.exe", "regsvr32.exe",
|
|
"rundll32.exe", "wscript.exe", "cscript.exe", "msiexec.exe",
|
|
"installutil.exe", "regasm.exe", "regsvcs.exe", "msconfig.exe",
|
|
"esentutl.exe", "expand.exe", "extrac32.exe", "findstr.exe",
|
|
"hh.exe", "ie4uinit.exe", "makecab.exe", "replace.exe",
|
|
}
|
|
|
|
|
|
class PrefetchAnalyzer:
|
|
"""Analyzes Windows Prefetch files for forensic investigation."""
|
|
|
|
def __init__(self, output_dir="./prefetch_analysis"):
|
|
self.output_dir = Path(output_dir)
|
|
self.output_dir.mkdir(parents=True, exist_ok=True)
|
|
self.findings = []
|
|
self.executions = []
|
|
|
|
def parse_prefetch_file(self, pf_path):
|
|
"""Parse a single Prefetch file and extract execution data."""
|
|
if windowsprefetch is None:
|
|
raise RuntimeError("windowsprefetch not installed: pip install windowsprefetch")
|
|
try:
|
|
pf = windowsprefetch.Prefetch(pf_path)
|
|
except Exception:
|
|
return None
|
|
|
|
timestamps = []
|
|
if hasattr(pf, "lastRunTime"):
|
|
timestamps.append(str(pf.lastRunTime))
|
|
if hasattr(pf, "timestamps"):
|
|
timestamps.extend([str(t) for t in pf.timestamps])
|
|
|
|
resources = []
|
|
if hasattr(pf, "resources"):
|
|
resources = pf.resources if isinstance(pf.resources, list) else []
|
|
elif hasattr(pf, "filenames"):
|
|
resources = pf.filenames if isinstance(pf.filenames, list) else []
|
|
|
|
volumes = []
|
|
if hasattr(pf, "volumes"):
|
|
for v in pf.volumes:
|
|
volumes.append({
|
|
"name": getattr(v, "name", str(v)),
|
|
"serial": getattr(v, "serialNumber", ""),
|
|
})
|
|
|
|
entry = {
|
|
"file": str(pf_path),
|
|
"executable": pf.executableName if hasattr(pf, "executableName") else Path(pf_path).stem,
|
|
"run_count": pf.runCount if hasattr(pf, "runCount") else 0,
|
|
"last_run_time": timestamps[0] if timestamps else "",
|
|
"all_timestamps": timestamps,
|
|
"pf_hash": Path(pf_path).stem.split("-")[-1] if "-" in Path(pf_path).stem else "",
|
|
"resources_count": len(resources),
|
|
"volumes": volumes,
|
|
"file_size": os.path.getsize(pf_path),
|
|
"file_sha256": self._hash_file(pf_path),
|
|
}
|
|
self.executions.append(entry)
|
|
return entry
|
|
|
|
def _hash_file(self, path):
|
|
h = hashlib.sha256()
|
|
with open(path, "rb") as f:
|
|
for chunk in iter(lambda: f.read(8192), b""):
|
|
h.update(chunk)
|
|
return h.hexdigest()
|
|
|
|
def parse_directory(self, prefetch_dir):
|
|
"""Parse all .pf files in a directory."""
|
|
pf_dir = Path(prefetch_dir)
|
|
pf_files = sorted(pf_dir.glob("*.pf"), key=lambda p: p.stat().st_mtime, reverse=True)
|
|
for pf_file in pf_files:
|
|
self.parse_prefetch_file(str(pf_file))
|
|
return len(pf_files)
|
|
|
|
def detect_suspicious(self):
|
|
"""Flag known attack tools and LOLBins."""
|
|
for entry in self.executions:
|
|
exe = entry["executable"].lower()
|
|
exe_base = exe.replace(".exe", "")
|
|
if exe_base in SUSPICIOUS_EXECUTABLES:
|
|
self.findings.append({
|
|
"severity": "critical", "type": "Attack Tool Executed",
|
|
"detail": f"{entry['executable']} run {entry['run_count']} times, "
|
|
f"last: {entry['last_run_time']}",
|
|
})
|
|
elif exe in LOLBINS:
|
|
if entry["run_count"] > 10:
|
|
self.findings.append({
|
|
"severity": "medium", "type": "LOLBin High Usage",
|
|
"detail": f"{entry['executable']} run {entry['run_count']} times",
|
|
})
|
|
|
|
def detect_renamed_binaries(self):
|
|
"""Detect potential binary renaming/masquerading."""
|
|
for entry in self.executions:
|
|
exe = entry["executable"].upper()
|
|
pf_name = Path(entry["file"]).stem.upper()
|
|
expected_prefix = exe.replace(".EXE", "")
|
|
if not pf_name.startswith(expected_prefix):
|
|
self.findings.append({
|
|
"severity": "high", "type": "Possible Renamed Binary",
|
|
"detail": f"PF name '{pf_name}' does not match executable '{exe}'",
|
|
})
|
|
|
|
def build_timeline(self):
|
|
"""Build chronological execution timeline."""
|
|
timeline = []
|
|
for entry in self.executions:
|
|
for ts in entry["all_timestamps"]:
|
|
if ts:
|
|
timeline.append({
|
|
"timestamp": ts,
|
|
"executable": entry["executable"],
|
|
"run_count": entry["run_count"],
|
|
})
|
|
timeline.sort(key=lambda x: x["timestamp"], reverse=True)
|
|
return timeline[:100]
|
|
|
|
def generate_report(self, prefetch_dir):
|
|
count = self.parse_directory(prefetch_dir)
|
|
self.detect_suspicious()
|
|
self.detect_renamed_binaries()
|
|
timeline = self.build_timeline()
|
|
|
|
report = {
|
|
"report_date": datetime.utcnow().isoformat(),
|
|
"prefetch_dir": str(prefetch_dir),
|
|
"total_prefetch_files": count,
|
|
"total_unique_executables": len(self.executions),
|
|
"execution_history": self.executions,
|
|
"execution_timeline": timeline[:50],
|
|
"findings": self.findings,
|
|
"total_findings": len(self.findings),
|
|
}
|
|
out = self.output_dir / "prefetch_analysis_report.json"
|
|
with open(out, "w") as f:
|
|
json.dump(report, f, indent=2, default=str)
|
|
print(json.dumps(report, indent=2, default=str))
|
|
return report
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Analyze Windows Prefetch files for execution forensics"
|
|
)
|
|
parser.add_argument("prefetch_dir", help="Path to directory containing .pf files")
|
|
parser.add_argument("--output-dir", default="./prefetch_analysis")
|
|
args = parser.parse_args()
|
|
|
|
os.makedirs(args.output_dir, exist_ok=True)
|
|
analyzer = PrefetchAnalyzer(output_dir=args.output_dir)
|
|
analyzer.generate_report(args.prefetch_dir)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|