diff --git a/skills/analyzing-linux-audit-logs-for-intrusion/LICENSE b/skills/analyzing-linux-audit-logs-for-intrusion/LICENSE new file mode 100644 index 00000000..09d37ad6 --- /dev/null +++ b/skills/analyzing-linux-audit-logs-for-intrusion/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Mahipal + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/skills/analyzing-linux-audit-logs-for-intrusion/SKILL.md b/skills/analyzing-linux-audit-logs-for-intrusion/SKILL.md new file mode 100644 index 00000000..7b83d664 --- /dev/null +++ b/skills/analyzing-linux-audit-logs-for-intrusion/SKILL.md @@ -0,0 +1,18 @@ +--- +name: analyzing-linux-audit-logs-for-intrusion +description: > + Parse and analyze Linux auditd logs to detect intrusion indicators + including unauthorized file access, privilege escalation, syscall + anomalies, and suspicious process execution using ausearch and Python. +domain: cybersecurity +subdomain: log-analysis +tags: [auditd, linux-forensics, syscall-monitoring, intrusion-detection] +version: "1.0" +author: mahipal +license: Apache-2.0 +--- + +# Analyzing Linux Audit Logs for Intrusion + +Parse auditd logs to detect file access violations, privilege escalation, +suspicious syscalls, and unauthorized process execution. diff --git a/skills/analyzing-linux-audit-logs-for-intrusion/references/api-reference.md b/skills/analyzing-linux-audit-logs-for-intrusion/references/api-reference.md new file mode 100644 index 00000000..ea9f41a1 --- /dev/null +++ b/skills/analyzing-linux-audit-logs-for-intrusion/references/api-reference.md @@ -0,0 +1,89 @@ +# API Reference: Analyzing Linux Audit Logs for Intrusion + +## Audit Log Location +``` +/var/log/audit/audit.log +``` + +## ausearch CLI +```bash +# Search by key +ausearch -k file_access + +# Search by message type +ausearch -m EXECVE + +# Failed events only +ausearch --success no + +# By user +ausearch -ua 1000 + +# CSV output for Python processing +ausearch --format csv > audit_events.csv + +# By time range +ausearch --start today --end now +ausearch --start 01/15/2025 00:00:00 --end 01/16/2025 00:00:00 +``` + +## aureport CLI +```bash +# Summary report +aureport --summary + +# Authentication report +aureport -au + +# Failed events +aureport --failed + +# Executable report +aureport -x + +# File access report +aureport -f + +# Anomaly report +aureport --anomaly +``` + +## Audit Rules (auditctl) +```bash +# Monitor sensitive files +auditctl -w /etc/passwd -p rwxa -k passwd_access +auditctl -w /etc/shadow -p rwxa -k shadow_access +auditctl -w /etc/sudoers -p rwxa -k sudoers_access + +# Monitor privilege escalation +auditctl -a always,exit -F arch=b64 -S execve -F euid=0 -F uid!=0 -k priv_esc + +# Monitor module loading +auditctl -a always,exit -F arch=b64 -S init_module -S finit_module -k modules + +# Monitor network connections +auditctl -a always,exit -F arch=b64 -S connect -k network_connect +``` + +## Audit Log Fields +| Field | Description | +|-------|------------| +| type | Event type (SYSCALL, PATH, EXECVE, USER_CMD) | +| msg | audit(timestamp:event_id) | +| syscall | System call number | +| uid/euid | User ID / Effective UID | +| comm | Command name | +| exe | Executable path | +| key | Audit rule key | +| success | yes/no | +| name | File path (in PATH records) | + +## Suspicious Syscalls +| Syscall | Concern | +|---------|---------| +| execve | Program execution | +| ptrace | Process debugging/injection | +| init_module | Kernel rootkit loading | +| connect | Outbound connection | +| setuid | Privilege change | +| open_by_handle_at | Container escape | diff --git a/skills/analyzing-linux-audit-logs-for-intrusion/scripts/agent.py b/skills/analyzing-linux-audit-logs-for-intrusion/scripts/agent.py new file mode 100644 index 00000000..62e66024 --- /dev/null +++ b/skills/analyzing-linux-audit-logs-for-intrusion/scripts/agent.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +"""Linux audit log analysis agent for intrusion detection. + +Parses /var/log/audit/audit.log entries to detect privilege escalation, +unauthorized file access, suspicious syscalls, and process execution anomalies. +""" + +import argparse +import json +import os +import re +import sys +import datetime +import collections +import subprocess + + +SUSPICIOUS_SYSCALLS = { + "execve": "Program execution", + "connect": "Network connection", + "bind": "Port binding", + "ptrace": "Process tracing/debugging", + "init_module": "Kernel module loading", + "finit_module": "Kernel module loading", + "delete_module": "Kernel module unloading", + "mount": "Filesystem mount", + "umount2": "Filesystem unmount", + "setuid": "UID change", + "setgid": "GID change", + "sethostname": "Hostname change", + "open_by_handle_at": "File open by handle (container escape)", +} + +SENSITIVE_PATHS = [ + "/etc/passwd", "/etc/shadow", "/etc/sudoers", + "/etc/ssh/sshd_config", "/root/.ssh/authorized_keys", + "/etc/crontab", "/var/spool/cron", +] + +SUSPICIOUS_COMMANDS = [ + "curl", "wget", "nc", "ncat", "nmap", "tcpdump", + "python", "perl", "ruby", "gcc", "cc", "make", + "useradd", "usermod", "groupadd", "visudo", + "iptables", "ip6tables", "nft", +] + + +def parse_audit_log(log_path, max_lines=50000): + """Parse raw audit.log file into structured events.""" + events = [] + current = {} + try: + with open(log_path, "r") as f: + for i, line in enumerate(f): + if i >= max_lines: + break + match = re.match( + r"type=(\S+)\s+msg=audit\((\d+\.\d+):(\d+)\):\s*(.*)", line + ) + if not match: + continue + event_type = match.group(1) + timestamp = float(match.group(2)) + event_id = match.group(3) + data_str = match.group(4) + fields = dict(re.findall(r'(\w+)=("[^"]*"|\S+)', data_str)) + for k, v in fields.items(): + fields[k] = v.strip('"') + event = { + "type": event_type, + "timestamp": datetime.datetime.fromtimestamp(timestamp).isoformat(), + "event_id": event_id, + **fields, + } + events.append(event) + except FileNotFoundError: + return {"error": f"Log file not found: {log_path}"} + return events + + +def detect_privilege_escalation(events): + """Detect privilege escalation indicators in audit events.""" + findings = [] + for e in events: + if e.get("type") == "SYSCALL" and e.get("syscall_name") in ("setuid", "setgid", "execve"): + if e.get("uid") != "0" and e.get("euid") == "0": + findings.append({ + "type": "privilege_escalation", + "detail": f"UID {e.get('uid')} escalated to eUID 0", + "command": e.get("comm", ""), + "exe": e.get("exe", ""), + "timestamp": e.get("timestamp"), + "severity": "CRITICAL", + }) + if e.get("type") == "USER_CMD" and "sudo" in e.get("cmd", "").lower(): + findings.append({ + "type": "sudo_usage", + "user": e.get("acct", e.get("uid", "")), + "command": e.get("cmd", ""), + "timestamp": e.get("timestamp"), + "severity": "MEDIUM", + }) + return findings + + +def detect_file_access(events): + """Detect access to sensitive files.""" + findings = [] + for e in events: + if e.get("type") in ("PATH", "SYSCALL"): + path = e.get("name", e.get("exe", "")) + for sensitive in SENSITIVE_PATHS: + if sensitive in path: + findings.append({ + "type": "sensitive_file_access", + "path": path, + "syscall": e.get("syscall_name", e.get("syscall", "")), + "user": e.get("uid", ""), + "timestamp": e.get("timestamp"), + "severity": "HIGH", + }) + break + return findings + + +def detect_suspicious_commands(events): + """Detect execution of suspicious commands.""" + findings = [] + for e in events: + if e.get("type") in ("EXECVE", "SYSCALL"): + comm = e.get("comm", "").lower() + exe = e.get("exe", "").lower() + for cmd in SUSPICIOUS_COMMANDS: + if cmd in comm or cmd in exe: + findings.append({ + "type": "suspicious_command", + "command": comm, + "exe": exe, + "user": e.get("uid", ""), + "timestamp": e.get("timestamp"), + "severity": "MEDIUM", + }) + break + return findings + + +def run_ausearch(key=None, message_type=None, success=None): + """Run ausearch command and return results.""" + cmd = ["ausearch"] + if key: + cmd.extend(["-k", key]) + if message_type: + cmd.extend(["-m", message_type]) + if success is not None: + cmd.extend(["--success", "yes" if success else "no"]) + cmd.extend(["--format", "csv"]) + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + return {"output": result.stdout[:5000], "exit_code": result.returncode} + except (FileNotFoundError, subprocess.TimeoutExpired) as e: + return {"error": str(e)} + + +def generate_summary(events, findings): + """Generate audit log analysis summary.""" + event_types = collections.Counter(e.get("type") for e in events) + finding_types = collections.Counter(f.get("type") for f in findings) + severity_counts = collections.Counter(f.get("severity") for f in findings) + return { + "total_events": len(events), + "event_types": dict(event_types.most_common(10)), + "total_findings": len(findings), + "finding_types": dict(finding_types), + "by_severity": dict(severity_counts), + } + + +def main(): + parser = argparse.ArgumentParser(description="Linux audit log intrusion detection agent") + parser.add_argument("log_file", nargs="?", default="/var/log/audit/audit.log", + help="Path to audit.log (default: /var/log/audit/audit.log)") + parser.add_argument("--max-lines", type=int, default=50000, help="Max log lines to parse") + parser.add_argument("--ausearch-key", help="Run ausearch with this key") + parser.add_argument("--output", "-o", help="Output JSON report path") + args = parser.parse_args() + + print("[*] Linux Audit Log Intrusion Detection Agent") + + if args.ausearch_key: + result = run_ausearch(key=args.ausearch_key) + print(json.dumps(result, indent=2)) + sys.exit(0) + + events = parse_audit_log(args.log_file, args.max_lines) + if isinstance(events, dict) and "error" in events: + print(f"[!] {events['error']}") + print("[DEMO] Specify a valid audit.log path or run on a Linux system") + print(json.dumps({"demo": True, "monitored_syscalls": len(SUSPICIOUS_SYSCALLS)}, indent=2)) + sys.exit(0) + + findings = [] + findings.extend(detect_privilege_escalation(events)) + findings.extend(detect_file_access(events)) + findings.extend(detect_suspicious_commands(events)) + + summary = generate_summary(events, findings) + print(f"[*] Events parsed: {summary['total_events']}") + print(f"[*] Findings: {summary['total_findings']}") + print(f" By severity: {summary['by_severity']}") + for f in findings[:15]: + print(f" [{f['severity']}] {f['type']}: {f.get('command', f.get('path', ''))}") + + if args.output: + report = {"summary": summary, "findings": findings} + with open(args.output, "w") as f: + json.dump(report, f, indent=2, default=str) + + print(json.dumps(summary, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/skills/detecting-oauth-token-theft/LICENSE b/skills/detecting-oauth-token-theft/LICENSE new file mode 100644 index 00000000..09d37ad6 --- /dev/null +++ b/skills/detecting-oauth-token-theft/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Mahipal + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/skills/detecting-oauth-token-theft/SKILL.md b/skills/detecting-oauth-token-theft/SKILL.md new file mode 100644 index 00000000..e4ede345 --- /dev/null +++ b/skills/detecting-oauth-token-theft/SKILL.md @@ -0,0 +1,18 @@ +--- +name: detecting-oauth-token-theft +description: > + Detect OAuth access token theft and misuse by analyzing sign-in logs for + impossible travel, new device patterns, token replay from unusual IPs, + and anomalous scope requests via Microsoft Graph and Okta APIs. +domain: cybersecurity +subdomain: identity-security +tags: [oauth, token-theft, identity-attacks, impossible-travel] +version: "1.0" +author: mahipal +license: Apache-2.0 +--- + +# Detecting OAuth Token Theft + +Analyze OAuth sign-in telemetry for indicators of token theft including +impossible travel, device fingerprint changes, and token replay attacks. diff --git a/skills/detecting-oauth-token-theft/references/api-reference.md b/skills/detecting-oauth-token-theft/references/api-reference.md new file mode 100644 index 00000000..b7cabfd1 --- /dev/null +++ b/skills/detecting-oauth-token-theft/references/api-reference.md @@ -0,0 +1,51 @@ +# API Reference: Detecting OAuth Token Theft + +## Microsoft Graph Sign-In Logs +```bash +# Query sign-in logs +curl -H "Authorization: Bearer $MS_TOKEN" \ + "https://graph.microsoft.com/v1.0/auditLogs/signIns?\$filter=createdDateTime ge 2025-01-01&\$top=100" +``` + +### Sign-In Event Fields +| Field | Description | +|-------|------------| +| userPrincipalName | User email/UPN | +| ipAddress | Source IP address | +| location.city | Geo city | +| location.geoCoordinates | Lat/lon | +| deviceDetail.deviceId | Device identifier | +| resourceDisplayName | Target resource | +| status.errorCode | 0 = success | +| riskState | none, confirmedCompromised, remediated | + +## Okta System Log API +```bash +# Query events +curl -H "Authorization: SSWS $OKTA_TOKEN" \ + "https://your-org.okta.com/api/v1/logs?filter=eventType eq \"user.session.start\"&since=2025-01-01" +``` + +## Detection Logic +| Detection | Method | +|-----------|--------| +| Impossible travel | Haversine distance / time > 900 km/h | +| Token replay | Same user, 3+ IPs within 5 min window | +| New device | Device ID not in known device inventory | +| Suspicious scopes | 2+ sensitive OAuth scopes requested | + +## Sensitive OAuth Scopes (Microsoft) +| Scope | Risk | +|-------|------| +| Mail.ReadWrite | Email access | +| Mail.Send | Send-as capability | +| Files.ReadWrite.All | Full file access | +| Directory.ReadWrite.All | AD modification | +| Application.ReadWrite.All | App registration | + +## MITRE ATT&CK Mapping +| Technique | Description | +|-----------|------------| +| T1528 | Steal Application Access Token | +| T1550.001 | Application Access Token reuse | +| T1078.004 | Cloud Accounts | diff --git a/skills/detecting-oauth-token-theft/scripts/agent.py b/skills/detecting-oauth-token-theft/scripts/agent.py new file mode 100644 index 00000000..c267db7c --- /dev/null +++ b/skills/detecting-oauth-token-theft/scripts/agent.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +"""OAuth token theft detection agent. + +Analyzes sign-in logs for impossible travel, new device sign-ins, +token replay from unusual IPs, and anomalous scope requests. +""" + +import argparse +import json +import math +import sys +import datetime +import collections + +try: + import requests + HAS_REQUESTS = True +except ImportError: + HAS_REQUESTS = False + + +EARTH_RADIUS_KM = 6371 + + +def haversine(lat1, lon1, lat2, lon2): + """Calculate great-circle distance between two points in km.""" + lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2]) + dlat = lat2 - lat1 + dlon = lon2 - lon1 + a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2 + return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a)) + + +def detect_impossible_travel(sign_ins, max_speed_kmh=900): + """Detect impossible travel based on geo and time between logins.""" + alerts = [] + by_user = collections.defaultdict(list) + for event in sign_ins: + by_user[event.get("user", "")].append(event) + + for user, events in by_user.items(): + sorted_events = sorted(events, key=lambda e: e.get("timestamp", "")) + for i in range(1, len(sorted_events)): + prev, curr = sorted_events[i - 1], sorted_events[i] + if not all(k in prev for k in ("lat", "lon")) or not all(k in curr for k in ("lat", "lon")): + continue + dist = haversine(prev["lat"], prev["lon"], curr["lat"], curr["lon"]) + try: + t1 = datetime.datetime.fromisoformat(prev["timestamp"].replace("Z", "+00:00")) + t2 = datetime.datetime.fromisoformat(curr["timestamp"].replace("Z", "+00:00")) + hours = max((t2 - t1).total_seconds() / 3600, 0.001) + except (ValueError, KeyError): + continue + speed = dist / hours + if speed > max_speed_kmh and dist > 100: + alerts.append({ + "type": "impossible_travel", + "user": user, + "from_ip": prev.get("ip", ""), + "to_ip": curr.get("ip", ""), + "distance_km": round(dist, 1), + "time_hours": round(hours, 2), + "speed_kmh": round(speed, 1), + "severity": "HIGH", + }) + return alerts + + +def detect_token_replay(sign_ins): + """Detect token replay from multiple IPs in short timeframe.""" + alerts = [] + by_user = collections.defaultdict(list) + for event in sign_ins: + by_user[event.get("user", "")].append(event) + + for user, events in by_user.items(): + sorted_events = sorted(events, key=lambda e: e.get("timestamp", "")) + window = [] + for event in sorted_events: + try: + ts = datetime.datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00")) + except (ValueError, KeyError): + continue + window = [e for e in window + if (ts - datetime.datetime.fromisoformat( + e["timestamp"].replace("Z", "+00:00"))).total_seconds() < 300] + window.append(event) + unique_ips = set(e.get("ip") for e in window if e.get("ip")) + if len(unique_ips) >= 3: + alerts.append({ + "type": "token_replay", + "user": user, + "ips": list(unique_ips), + "window_seconds": 300, + "severity": "CRITICAL", + }) + return alerts + + +def detect_new_device(sign_ins, known_devices=None): + """Detect sign-ins from previously unseen devices.""" + known = set(known_devices or []) + alerts = [] + for event in sign_ins: + device_id = event.get("device_id", event.get("user_agent", "")) + if device_id and device_id not in known: + alerts.append({ + "type": "new_device", + "user": event.get("user", ""), + "device": device_id, + "ip": event.get("ip", ""), + "timestamp": event.get("timestamp", ""), + "severity": "MEDIUM", + }) + known.add(device_id) + return alerts + + +def detect_suspicious_scopes(sign_ins): + """Detect OAuth requests with overly broad or sensitive scopes.""" + sensitive_scopes = { + "Mail.ReadWrite", "Mail.Send", "Files.ReadWrite.All", + "Directory.ReadWrite.All", "User.ReadWrite.All", + "Application.ReadWrite.All", "RoleManagement.ReadWrite.Directory", + } + alerts = [] + for event in sign_ins: + scopes = set(event.get("scopes", [])) + dangerous = scopes & sensitive_scopes + if len(dangerous) >= 2: + alerts.append({ + "type": "suspicious_scopes", + "user": event.get("user", ""), + "scopes": list(dangerous), + "app": event.get("app_name", ""), + "severity": "HIGH", + }) + return alerts + + +def main(): + parser = argparse.ArgumentParser(description="OAuth token theft detection agent") + parser.add_argument("--log-file", help="JSON file with sign-in events") + parser.add_argument("--max-speed", type=int, default=900, help="Max travel speed km/h (default: 900)") + parser.add_argument("--output", "-o", help="Output JSON report path") + args = parser.parse_args() + + print("[*] OAuth Token Theft Detection Agent") + report = {"timestamp": datetime.datetime.utcnow().isoformat() + "Z", "alerts": []} + + if args.log_file: + with open(args.log_file) as f: + sign_ins = json.load(f) + else: + sign_ins = [ + {"user": "alice@corp.com", "ip": "203.0.113.10", "lat": 40.7128, "lon": -74.0060, + "timestamp": "2025-06-15T10:00:00Z", "device_id": "device-A"}, + {"user": "alice@corp.com", "ip": "198.51.100.50", "lat": 51.5074, "lon": -0.1278, + "timestamp": "2025-06-15T10:30:00Z", "device_id": "device-B"}, + {"user": "bob@corp.com", "ip": "10.0.0.1", "lat": 37.7749, "lon": -122.4194, + "timestamp": "2025-06-15T09:00:00Z", "device_id": "device-C", + "scopes": ["Mail.ReadWrite", "Mail.Send", "Files.ReadWrite.All"]}, + ] + print("[DEMO] Using sample sign-in events") + + report["alerts"].extend(detect_impossible_travel(sign_ins, args.max_speed)) + report["alerts"].extend(detect_token_replay(sign_ins)) + report["alerts"].extend(detect_new_device(sign_ins)) + report["alerts"].extend(detect_suspicious_scopes(sign_ins)) + + by_type = collections.Counter(a["type"] for a in report["alerts"]) + print(f"[*] Total alerts: {len(report['alerts'])}") + for alert_type, count in by_type.items(): + print(f" {alert_type}: {count}") + for a in report["alerts"]: + print(f" [{a['severity']}] {a['type']}: {a.get('user', '')} - {a.get('distance_km', a.get('ips', a.get('device', '')))}") + + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(json.dumps({"total_alerts": len(report["alerts"]), "by_type": dict(by_type)}, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/skills/hunting-for-defense-evasion-via-timestomping/LICENSE b/skills/hunting-for-defense-evasion-via-timestomping/LICENSE new file mode 100644 index 00000000..09d37ad6 --- /dev/null +++ b/skills/hunting-for-defense-evasion-via-timestomping/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Mahipal + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/skills/hunting-for-defense-evasion-via-timestomping/SKILL.md b/skills/hunting-for-defense-evasion-via-timestomping/SKILL.md new file mode 100644 index 00000000..61ed63ed --- /dev/null +++ b/skills/hunting-for-defense-evasion-via-timestomping/SKILL.md @@ -0,0 +1,19 @@ +--- +name: hunting-for-defense-evasion-via-timestomping +description: > + Detect NTFS timestamp manipulation (MITRE T1070.006) by comparing + $STANDARD_INFORMATION vs $FILE_NAME timestamps in the MFT. Uses + analyzeMFT and Python to identify files with anomalous temporal + patterns indicating anti-forensic timestomping activity. +domain: cybersecurity +subdomain: threat-hunting +tags: [timestomping, ntfs-forensics, mft-analysis, defense-evasion] +version: "1.0" +author: mahipal +license: Apache-2.0 +--- + +# Hunting for Defense Evasion via Timestomping + +Detect timestamp manipulation by analyzing NTFS MFT entries for +discrepancies between $STANDARD_INFORMATION and $FILE_NAME attributes. diff --git a/skills/hunting-for-defense-evasion-via-timestomping/references/api-reference.md b/skills/hunting-for-defense-evasion-via-timestomping/references/api-reference.md new file mode 100644 index 00000000..e0dfecd7 --- /dev/null +++ b/skills/hunting-for-defense-evasion-via-timestomping/references/api-reference.md @@ -0,0 +1,67 @@ +# API Reference: Hunting for Timestomping (T1070.006) + +## NTFS Timestamp Attributes +| Attribute | Modifiable By | Updated On | +|-----------|--------------|------------| +| $STANDARD_INFORMATION | User-level APIs (SetFileTime) | Create, modify, access, MFT change | +| $FILE_NAME | Windows kernel only | File create, rename, move | + +## Detection Logic +| Indicator | Description | +|-----------|------------| +| SI < FN Created | $SI creation before $FN creation (most reliable) | +| Zero nanoseconds | .0000000 in timestamp (tool artifacts) | +| Future timestamp | Date beyond current time | +| Pre-OS timestamp | $SI before OS install but $FN after | +| Round seconds | No fractional seconds (unusual for NTFS) | + +## analyzeMFT (Python) +```bash +pip install analyzemft + +# Parse MFT to CSV +analyzeMFT.py -f /path/to/$MFT -o mft_output.csv + +# With body file output (for timeline) +analyzeMFT.py -f $MFT -o mft.csv -b body.txt +``` + +## MFTECmd (Eric Zimmerman) +```bash +# Parse MFT to CSV +MFTECmd.exe -f C:\evidence\$MFT --csv C:\output\ + +# With $J (USN Journal) +MFTECmd.exe -f $MFT --csv output\ --json output\ +``` + +### CSV Columns +| Column | Description | +|--------|------------| +| Record Number | MFT entry number | +| Filename | File name | +| SI Created/Modified/Accessed | $STANDARD_INFORMATION timestamps | +| FN Created/Modified/Accessed | $FILE_NAME timestamps | +| In Use | Active record flag | + +## USN Journal Analysis +```bash +# Parse USN Journal for corroboration +MFTECmd.exe -f $J --csv output\ + +# fsutil on live system +fsutil usn readjournal C: csv > usn_journal.csv +``` + +## Timestomping Tools (for detection awareness) +| Tool | Method | +|------|--------| +| timestomp (Metasploit) | SetFileTime API | +| PowerShell Set-ItemProperty | .NET DateTime | +| NirSoft BulkFileChanger | Batch timestamp edit | +| $STANDARD_INFORMATION patch | Direct MFT edit | + +## MITRE ATT&CK +- **T1070.006** - Indicator Removal: Timestomp +- **Tactic**: Defense Evasion +- **Platforms**: Windows diff --git a/skills/hunting-for-defense-evasion-via-timestomping/scripts/agent.py b/skills/hunting-for-defense-evasion-via-timestomping/scripts/agent.py new file mode 100644 index 00000000..d2aac81d --- /dev/null +++ b/skills/hunting-for-defense-evasion-via-timestomping/scripts/agent.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +"""Timestomping detection agent for NTFS MFT analysis. + +Detects MITRE T1070.006 (Timestomping) by comparing $STANDARD_INFORMATION +and $FILE_NAME timestamps in NTFS Master File Table entries. Identifies +anomalous nanosecond patterns and temporal inconsistencies. +""" + +import argparse +import csv +import json +import os +import re +import sys +import datetime + + +TIMESTOMP_INDICATORS = { + "zero_nanoseconds": "Nanosecond field is exactly 0000000 (common in timestomping tools)", + "si_before_fn": "$STANDARD_INFORMATION created before $FILE_NAME created", + "future_timestamp": "Timestamp is in the future", + "pre_os_timestamp": "Timestamp predates the operating system install", + "round_seconds": "Timestamp has perfectly round seconds (no fractional component)", +} + + +def parse_mft_csv(csv_path): + """Parse analyzeMFT CSV output for timestamp analysis.""" + entries = [] + try: + with open(csv_path, "r", encoding="utf-8", errors="replace") as f: + reader = csv.DictReader(f) + for row in reader: + entry = { + "record_number": row.get("Record Number", ""), + "filename": row.get("Filename", row.get("Good", "")), + "si_created": row.get("SI Created", row.get("STD_INFO Creation date", "")), + "si_modified": row.get("SI Modified", row.get("STD_INFO Modification date", "")), + "si_accessed": row.get("SI Accessed", row.get("STD_INFO Access date", "")), + "si_entry_modified": row.get("SI Entry Modified", row.get("STD_INFO Entry date", "")), + "fn_created": row.get("FN Created", row.get("FN Creation date", "")), + "fn_modified": row.get("FN Modified", row.get("FN Modification date", "")), + "fn_accessed": row.get("FN Accessed", row.get("FN Access date", "")), + "fn_entry_modified": row.get("FN Entry Modified", row.get("FN Entry date", "")), + "in_use": row.get("Active", row.get("In Use", "")).lower() in ("true", "1", "yes"), + } + if entry["filename"]: + entries.append(entry) + except FileNotFoundError: + return {"error": f"File not found: {csv_path}"} + return entries + + +def parse_timestamp(ts_str): + """Parse various timestamp formats to datetime.""" + if not ts_str or ts_str in ("", "NoFNDate", "N/A"): + return None + formats = [ + "%Y-%m-%d %H:%M:%S.%f", + "%Y-%m-%d %H:%M:%S", + "%m/%d/%Y %H:%M:%S", + "%Y-%m-%dT%H:%M:%S.%f", + "%Y-%m-%dT%H:%M:%S", + ] + for fmt in formats: + try: + return datetime.datetime.strptime(ts_str.strip(), fmt) + except ValueError: + continue + return None + + +def detect_timestomping(entries, os_install_date=None): + """Analyze MFT entries for timestomping indicators.""" + if os_install_date is None: + os_install_date = datetime.datetime(2020, 1, 1) + now = datetime.datetime.now() + findings = [] + + for entry in entries: + if isinstance(entry, dict) and "error" in entry: + continue + reasons = [] + si_created = parse_timestamp(entry.get("si_created", "")) + fn_created = parse_timestamp(entry.get("fn_created", "")) + + # Check zero nanoseconds + si_str = entry.get("si_created", "") + if ".0000000" in si_str or (si_str and re.search(r"\.\d{6}0$", si_str)): + reasons.append("zero_nanoseconds") + + # Check SI before FN (most reliable indicator) + if si_created and fn_created: + if si_created < fn_created - datetime.timedelta(seconds=2): + reasons.append("si_before_fn") + + # Check future timestamps + if si_created and si_created > now + datetime.timedelta(days=1): + reasons.append("future_timestamp") + + # Check pre-OS timestamps + if si_created and si_created < os_install_date: + if fn_created and fn_created >= os_install_date: + reasons.append("pre_os_timestamp") + + # Check perfectly round timestamps + if si_created and si_created.microsecond == 0: + si_mod = parse_timestamp(entry.get("si_modified", "")) + if si_mod and si_mod.microsecond == 0: + reasons.append("round_seconds") + + if reasons: + findings.append({ + "filename": entry.get("filename", ""), + "record_number": entry.get("record_number", ""), + "si_created": entry.get("si_created", ""), + "fn_created": entry.get("fn_created", ""), + "indicators": reasons, + "descriptions": [TIMESTOMP_INDICATORS.get(r, r) for r in reasons], + "confidence": "HIGH" if "si_before_fn" in reasons else "MEDIUM", + "mitre": "T1070.006", + }) + + return findings + + +def generate_report(entries, findings): + """Generate timestomping analysis report.""" + return { + "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "total_mft_entries": len(entries) if isinstance(entries, list) else 0, + "total_findings": len(findings), + "high_confidence": sum(1 for f in findings if f.get("confidence") == "HIGH"), + "medium_confidence": sum(1 for f in findings if f.get("confidence") == "MEDIUM"), + "indicator_counts": dict(collections.Counter( + ind for f in findings for ind in f.get("indicators", []) + )) if findings else {}, + "mitre_technique": "T1070.006 - Indicator Removal: Timestomp", + } + + +# Need collections for generate_report +import collections + + +def main(): + parser = argparse.ArgumentParser( + description="NTFS timestomping detection via MFT analysis (MITRE T1070.006)" + ) + parser.add_argument("mft_csv", nargs="?", help="Path to analyzeMFT CSV output") + parser.add_argument("--os-install", help="OS install date (YYYY-MM-DD) for baseline") + parser.add_argument("--high-only", action="store_true", help="Show only HIGH confidence findings") + parser.add_argument("--output", "-o", help="Output JSON report path") + args = parser.parse_args() + + print("[*] Timestomping Detection Agent (MITRE T1070.006)") + print("[*] Compares $STANDARD_INFORMATION vs $FILE_NAME timestamps") + + if not args.mft_csv: + print("\n[DEMO] Usage:") + print(" 1. Extract MFT: ftkimager /path/to/image mft_output") + print(" 2. Parse MFT: analyzeMFT.py -f $MFT -o mft.csv") + print(" 3. Detect: python agent.py mft.csv [--os-install 2022-01-15]") + print("\n Indicators detected:") + for name, desc in TIMESTOMP_INDICATORS.items(): + print(f" - {name}: {desc}") + print(json.dumps({"demo": True, "indicators": len(TIMESTOMP_INDICATORS)}, indent=2)) + sys.exit(0) + + os_date = None + if args.os_install: + try: + os_date = datetime.datetime.strptime(args.os_install, "%Y-%m-%d") + except ValueError: + print(f"[!] Invalid date format: {args.os_install}") + + entries = parse_mft_csv(args.mft_csv) + if isinstance(entries, dict) and "error" in entries: + print(f"[!] {entries['error']}") + sys.exit(1) + + findings = detect_timestomping(entries, os_date) + if args.high_only: + findings = [f for f in findings if f.get("confidence") == "HIGH"] + + report = generate_report(entries, findings) + print(f"[*] MFT entries analyzed: {report['total_mft_entries']}") + print(f"[*] Timestomping findings: {report['total_findings']}") + print(f" HIGH confidence: {report['high_confidence']}") + print(f" MEDIUM confidence: {report['medium_confidence']}") + + for f in findings[:20]: + print(f" [{f['confidence']}] {f['filename']}") + for desc in f["descriptions"]: + print(f" - {desc}") + + if args.output: + full_report = {"summary": report, "findings": findings} + with open(args.output, "w") as f: + json.dump(full_report, f, indent=2) + + print(json.dumps(report, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/skills/implementing-security-information-sharing-with-stix2/LICENSE b/skills/implementing-security-information-sharing-with-stix2/LICENSE new file mode 100644 index 00000000..09d37ad6 --- /dev/null +++ b/skills/implementing-security-information-sharing-with-stix2/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Mahipal + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/skills/implementing-security-information-sharing-with-stix2/SKILL.md b/skills/implementing-security-information-sharing-with-stix2/SKILL.md new file mode 100644 index 00000000..37b53be4 --- /dev/null +++ b/skills/implementing-security-information-sharing-with-stix2/SKILL.md @@ -0,0 +1,18 @@ +--- +name: implementing-security-information-sharing-with-stix2 +description: > + Create, validate, and share STIX 2.1 threat intelligence objects using + the stix2 Python library. Covers indicators, malware, campaigns, + relationships, bundles, and TAXII 2.1 publishing. +domain: cybersecurity +subdomain: threat-intelligence +tags: [stix, taxii, threat-sharing, intelligence-exchange] +version: "1.0" +author: mahipal +license: Apache-2.0 +--- + +# Implementing Security Information Sharing with STIX 2.1 + +Build and share structured threat intelligence using STIX 2.1 objects +with the stix2 Python library and TAXII 2.1 transport protocol. diff --git a/skills/implementing-security-information-sharing-with-stix2/references/api-reference.md b/skills/implementing-security-information-sharing-with-stix2/references/api-reference.md new file mode 100644 index 00000000..095c5a6d --- /dev/null +++ b/skills/implementing-security-information-sharing-with-stix2/references/api-reference.md @@ -0,0 +1,77 @@ +# API Reference: Security Information Sharing with STIX 2.1 + +## stix2 Python Library +```bash +pip install stix2 taxii2-client +``` + +### Create Objects +```python +from stix2 import Indicator, Malware, Relationship, Bundle, Identity + +identity = Identity(name="My SOC", identity_class="organization") + +indicator = Indicator( + name="Malicious IP", + pattern="[ipv4-addr:value = '198.51.100.42']", + pattern_type="stix", + valid_from="2025-01-01T00:00:00Z", + created_by_ref=identity.id, +) + +malware = Malware(name="EvilRAT", malware_types=["trojan"], is_family=True) + +rel = Relationship(source_ref=indicator.id, target_ref=malware.id, + relationship_type="indicates") + +bundle = Bundle(objects=[identity, indicator, malware, rel]) +print(bundle.serialize(pretty=True)) +``` + +### Validate and Parse +```python +import stix2 + +parsed = stix2.parse(json_string, allow_custom=True) +print(parsed.type, len(parsed.objects)) +``` + +## STIX 2.1 Object Types +| Type | Description | +|------|------------| +| indicator | IOC with STIX pattern | +| malware | Malware family/sample | +| campaign | Named threat campaign | +| threat-actor | Threat group | +| attack-pattern | TTP (ATT&CK technique) | +| relationship | Link between objects | +| sighting | Observation of indicator | +| identity | Organization/individual | + +## TAXII 2.1 Publishing +```python +from taxii2client.v21 import Collection + +collection = Collection( + "https://taxii.server.com/taxii2/collections/abc-123/", + user="api_user", password="api_pass" +) +collection.add_objects(bundle.serialize()) +``` + +## TLP Marking Definitions +| TLP | stix2 Constant | +|-----|---------------| +| TLP:CLEAR | stix2.TLP_WHITE | +| TLP:GREEN | stix2.TLP_GREEN | +| TLP:AMBER | stix2.TLP_AMBER | +| TLP:RED | stix2.TLP_RED | + +## STIX Pattern Examples +| Type | Pattern | +|------|---------| +| IPv4 | `[ipv4-addr:value = '1.2.3.4']` | +| Domain | `[domain-name:value = 'evil.com']` | +| SHA-256 | `[file:hashes.'SHA-256' = 'abc...']` | +| URL | `[url:value = 'https://evil.com/mal']` | +| Email | `[email-addr:value = 'bad@evil.com']` | diff --git a/skills/implementing-security-information-sharing-with-stix2/scripts/agent.py b/skills/implementing-security-information-sharing-with-stix2/scripts/agent.py new file mode 100644 index 00000000..f77505fd --- /dev/null +++ b/skills/implementing-security-information-sharing-with-stix2/scripts/agent.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +"""STIX 2.1 threat intelligence sharing agent. + +Creates, validates, and exports STIX 2.1 objects including indicators, +malware, campaigns, and relationships using the stix2 Python library. +""" + +import argparse +import json +import sys +import datetime +import uuid + +try: + import stix2 + from stix2 import Indicator, Malware, Campaign, Relationship, Bundle + from stix2 import ThreatActor, Identity, Sighting, AttackPattern + HAS_STIX2 = True +except ImportError: + HAS_STIX2 = False + +try: + from taxii2client.v21 import Collection, Server + HAS_TAXII = True +except ImportError: + HAS_TAXII = False + + +IDENTITY = None +if HAS_STIX2: + IDENTITY = Identity( + id="identity--f165a29e-a997-5f8a-a63b-4b72b9f2f963", + name="Security Operations Center", + identity_class="organization", + ) + + +def create_indicator(value, indicator_type="ipv4-addr", confidence=80, tlp="TLP:AMBER"): + """Create a STIX 2.1 Indicator object.""" + if not HAS_STIX2: + return {"error": "stix2 not installed. pip install stix2"} + pattern_map = { + "ipv4-addr": f"[ipv4-addr:value = '{value}']", + "domain-name": f"[domain-name:value = '{value}']", + "url": f"[url:value = '{value}']", + "file-sha256": f"[file:hashes.'SHA-256' = '{value}']", + "file-md5": f"[file:hashes.MD5 = '{value}']", + "email-addr": f"[email-addr:value = '{value}']", + } + pattern = pattern_map.get(indicator_type, f"[ipv4-addr:value = '{value}']") + marking = stix2.TLP_AMBER if tlp == "TLP:AMBER" else stix2.TLP_GREEN + return Indicator( + name=f"Malicious {indicator_type}: {value}", + pattern=pattern, + pattern_type="stix", + valid_from=datetime.datetime.now(datetime.timezone.utc), + confidence=confidence, + created_by_ref=IDENTITY.id, + object_marking_refs=[marking], + ) + + +def create_malware(name, malware_types=None, is_family=True, description=""): + """Create a STIX 2.1 Malware object.""" + if not HAS_STIX2: + return {"error": "stix2 not installed"} + return Malware( + name=name, + malware_types=malware_types or ["ransomware"], + is_family=is_family, + description=description or f"Malware family: {name}", + created_by_ref=IDENTITY.id, + ) + + +def create_campaign(name, description="", first_seen=None): + """Create a STIX 2.1 Campaign object.""" + if not HAS_STIX2: + return {"error": "stix2 not installed"} + kwargs = {"name": name, "description": description or f"Campaign: {name}", + "created_by_ref": IDENTITY.id} + if first_seen: + kwargs["first_seen"] = first_seen + return Campaign(**kwargs) + + +def create_relationship(source, target, relationship_type="indicates"): + """Create a STIX 2.1 Relationship.""" + if not HAS_STIX2: + return {"error": "stix2 not installed"} + return Relationship( + source_ref=source.id if hasattr(source, "id") else source, + target_ref=target.id if hasattr(target, "id") else target, + relationship_type=relationship_type, + created_by_ref=IDENTITY.id, + ) + + +def build_threat_report(indicators, malware_obj=None, campaign_obj=None): + """Build a STIX 2.1 Bundle with all objects and relationships.""" + if not HAS_STIX2: + return {"error": "stix2 not installed"} + objects = [IDENTITY] + list(indicators) + relationships = [] + + if malware_obj: + objects.append(malware_obj) + for ind in indicators: + rel = create_relationship(ind, malware_obj, "indicates") + relationships.append(rel) + + if campaign_obj: + objects.append(campaign_obj) + if malware_obj: + rel = create_relationship(campaign_obj, malware_obj, "uses") + relationships.append(rel) + + objects.extend(relationships) + bundle = Bundle(objects=objects) + return bundle + + +def publish_to_taxii(bundle, collection_url, username=None, password=None): + """Publish STIX bundle to TAXII 2.1 collection.""" + if not HAS_TAXII: + return {"error": "taxii2-client not installed. pip install taxii2-client"} + try: + collection = Collection(collection_url, user=username, password=password) + collection.add_objects(bundle.serialize()) + return {"status": "published", "collection": collection_url, + "object_count": len(bundle.objects)} + except Exception as e: + return {"error": str(e)} + + +def validate_bundle(bundle_json): + """Validate a STIX 2.1 bundle.""" + if not HAS_STIX2: + return {"error": "stix2 not installed"} + try: + parsed = stix2.parse(bundle_json, allow_custom=True) + return {"valid": True, "type": parsed.type, + "object_count": len(parsed.objects) if hasattr(parsed, "objects") else 1} + except Exception as e: + return {"valid": False, "error": str(e)} + + +def main(): + parser = argparse.ArgumentParser(description="STIX 2.1 threat intelligence sharing agent") + parser.add_argument("--create-indicator", help="Create indicator from value (e.g. 198.51.100.42)") + parser.add_argument("--type", default="ipv4-addr", help="Indicator type (default: ipv4-addr)") + parser.add_argument("--malware", help="Create malware object with this name") + parser.add_argument("--campaign", help="Create campaign object with this name") + parser.add_argument("--validate", help="Validate a STIX JSON file") + parser.add_argument("--output", "-o", help="Output STIX bundle JSON path") + args = parser.parse_args() + + print("[*] STIX 2.1 Threat Intelligence Sharing Agent") + print(f" stix2 available: {HAS_STIX2}") + print(f" taxii2-client available: {HAS_TAXII}") + + if args.validate: + with open(args.validate) as f: + result = validate_bundle(f.read()) + print(json.dumps(result, indent=2)) + sys.exit(0) + + if not HAS_STIX2: + print("[!] Install stix2: pip install stix2") + sys.exit(1) + + indicators = [] + if args.create_indicator: + ind = create_indicator(args.create_indicator, args.type) + indicators.append(ind) + print(f"[+] Created indicator: {ind.name}") + else: + demo_iocs = [("198.51.100.42", "ipv4-addr"), ("evil.example.com", "domain-name"), + ("a" * 64, "file-sha256")] + for val, itype in demo_iocs: + indicators.append(create_indicator(val, itype)) + print(f"[DEMO] Created {len(indicators)} sample indicators") + + malware_obj = create_malware(args.malware) if args.malware else create_malware("DemoRAT", ["trojan"]) + campaign_obj = create_campaign(args.campaign) if args.campaign else None + bundle = build_threat_report(indicators, malware_obj, campaign_obj) + + print(f"\n[*] Bundle: {bundle.id}") + print(f" Objects: {len(bundle.objects)}") + for obj in bundle.objects: + print(f" - {obj.type}: {getattr(obj, 'name', getattr(obj, 'id', ''))}") + + if args.output: + with open(args.output, "w") as f: + f.write(bundle.serialize(pretty=True)) + print(f"[*] Bundle saved to {args.output}") + + print(json.dumps({"objects": len(bundle.objects), "stix_version": "2.1"}, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/skills/performing-binary-exploitation-analysis/LICENSE b/skills/performing-binary-exploitation-analysis/LICENSE new file mode 100644 index 00000000..09d37ad6 --- /dev/null +++ b/skills/performing-binary-exploitation-analysis/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Mahipal + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/skills/performing-binary-exploitation-analysis/SKILL.md b/skills/performing-binary-exploitation-analysis/SKILL.md new file mode 100644 index 00000000..b7523db5 --- /dev/null +++ b/skills/performing-binary-exploitation-analysis/SKILL.md @@ -0,0 +1,21 @@ +--- +name: performing-binary-exploitation-analysis +description: > + Analyze binary exploitation techniques including buffer overflows and + ROP chains using pwntools Python library. Covers checksec analysis, + gadget discovery with ROPgadget, and exploit development for CTF and + authorized security assessments. +domain: cybersecurity +subdomain: offensive-security +tags: [binary-exploitation, pwntools, rop-chains, buffer-overflow] +version: "1.0" +author: mahipal +license: Apache-2.0 +--- + +# Performing Binary Exploitation Analysis + +# For authorized security testing and CTF challenges only + +Analyze ELF binaries for exploitation vectors using checksec, ROPgadget, +and pwntools for buffer overflow and ROP chain development. diff --git a/skills/performing-binary-exploitation-analysis/references/api-reference.md b/skills/performing-binary-exploitation-analysis/references/api-reference.md new file mode 100644 index 00000000..760383b6 --- /dev/null +++ b/skills/performing-binary-exploitation-analysis/references/api-reference.md @@ -0,0 +1,74 @@ +# API Reference: Binary Exploitation Analysis + +## pwntools (Python) +```bash +pip install pwntools +``` + +### ELF Analysis +```python +from pwn import ELF, ROP, context + +elf = ELF('./vulnerable_binary') +print(elf.checksec()) # Security mitigations +print(hex(elf.sym['main'])) # Symbol address +print(hex(elf.plt['system'])) # PLT entry +print(hex(elf.got['puts'])) # GOT entry + +# ROP gadget discovery +rop = ROP(elf) +pop_rdi = rop.find_gadget(['pop rdi', 'ret'])[0] +ret = rop.find_gadget(['ret'])[0] +``` + +### Exploit Template +```python +from pwn import * + +context.binary = elf = ELF('./vuln') +p = process('./vuln') # or remote('host', port) +payload = flat(b'A' * offset, pop_rdi, next(elf.search(b'/bin/sh')), elf.plt['system']) +p.sendline(payload) +p.interactive() +``` + +## checksec CLI +```bash +checksec --file ./binary +checksec --file ./binary --output json +``` + +### Output Fields +| Field | Values | Impact | +|-------|--------|--------| +| NX | Enabled/Disabled | No shellcode on stack | +| PIE | Enabled/Disabled | Randomized addresses | +| Canary | Found/Not found | Stack smash detection | +| RELRO | Full/Partial/None | GOT write protection | + +## ROPgadget CLI +```bash +# Find all gadgets +ROPgadget --binary ./vuln + +# Search specific gadget +ROPgadget --binary ./vuln --only "pop|ret" + +# Generate ROP chain +ROPgadget --binary ./vuln --ropchain +``` + +## Dangerous Functions +| Function | Risk | +|----------|------| +| gets() | Unbounded stdin read | +| strcpy() | No length check | +| sprintf() | No length check | +| scanf() | Possible overflow | + +## MITRE ATT&CK +| Technique | Description | +|-----------|------------| +| T1203 | Exploitation for Client Execution | +| T1068 | Exploitation for Privilege Escalation | +| T1211 | Exploitation for Defense Evasion | diff --git a/skills/performing-binary-exploitation-analysis/scripts/agent.py b/skills/performing-binary-exploitation-analysis/scripts/agent.py new file mode 100644 index 00000000..c2f8df4e --- /dev/null +++ b/skills/performing-binary-exploitation-analysis/scripts/agent.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +"""Binary exploitation analysis agent. + +# For authorized security testing and CTF challenges only + +Analyzes ELF binaries for security mitigations, discovers ROP gadgets, +and assists exploit development using pwntools and checksec. +""" + +import argparse +import json +import os +import struct +import subprocess +import sys +import datetime + +try: + from pwn import ELF, ROP, context + HAS_PWNTOOLS = True +except ImportError: + HAS_PWNTOOLS = False + + +def run_checksec(binary_path): + """Analyze binary security mitigations using checksec.""" + if HAS_PWNTOOLS: + try: + elf = ELF(binary_path, checksec=False) + return { + "arch": elf.arch, + "bits": elf.bits, + "endian": elf.endian, + "nx": elf.nx, + "pie": elf.pie, + "canary": elf.canary, + "relro": "Full" if elf.relro == "Full" else ("Partial" if elf.relro else "None"), + "stripped": not elf.sym, + "static": elf.statically_linked, + } + except Exception as e: + return {"error": str(e)} + try: + result = subprocess.run(["checksec", "--file", binary_path, "--output", "json"], + capture_output=True, text=True, timeout=10) + if result.stdout: + return json.loads(result.stdout) + except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError): + pass + return {"error": "Neither pwntools nor checksec available"} + + +def find_rop_gadgets(binary_path, max_gadgets=20): + """Find ROP gadgets using pwntools or ROPgadget.""" + if HAS_PWNTOOLS: + try: + elf = ELF(binary_path, checksec=False) + rop = ROP(elf) + gadgets = [] + for gadget in rop.gadgets.values(): + if len(gadgets) >= max_gadgets: + break + gadgets.append({ + "address": hex(gadget.address), + "insns": "; ".join(gadget.insns), + }) + return gadgets + except Exception as e: + return [{"error": str(e)}] + try: + result = subprocess.run( + ["ROPgadget", "--binary", binary_path, "--count", str(max_gadgets)], + capture_output=True, text=True, timeout=30 + ) + gadgets = [] + for line in result.stdout.splitlines(): + if " : " in line: + parts = line.split(" : ", 1) + gadgets.append({"address": parts[0].strip(), "insns": parts[1].strip()}) + return gadgets[:max_gadgets] + except (FileNotFoundError, subprocess.TimeoutExpired): + return [{"error": "Neither pwntools ROP nor ROPgadget available"}] + + +def find_useful_functions(binary_path): + """Find useful functions for exploitation (system, exec, write, etc.).""" + if not HAS_PWNTOOLS: + return {"error": "pwntools not available"} + try: + elf = ELF(binary_path, checksec=False) + interesting = ["system", "execve", "exec", "popen", "gets", "strcpy", + "sprintf", "read", "write", "puts", "printf", "mprotect"] + found = {} + for func in interesting: + addr = elf.sym.get(func) or elf.plt.get(func) + if addr: + found[func] = hex(addr) + got_entries = {} + for name in ["system", "printf", "puts", "__libc_start_main"]: + if name in elf.got: + got_entries[name] = hex(elf.got[name]) + return {"functions": found, "got_entries": got_entries} + except Exception as e: + return {"error": str(e)} + + +def find_vulnerable_functions(binary_path): + """Identify potentially vulnerable functions in the binary.""" + dangerous = {"gets": "Unbounded read - guaranteed buffer overflow", + "strcpy": "No length check - possible overflow", + "strcat": "No length check - possible overflow", + "sprintf": "No length check - possible overflow", + "scanf": "Possible format string / overflow", + "vsprintf": "No length check - possible overflow"} + if not HAS_PWNTOOLS: + return {"error": "pwntools not available"} + try: + elf = ELF(binary_path, checksec=False) + found = [] + for func, reason in dangerous.items(): + if func in elf.plt or func in elf.sym: + found.append({"function": func, "reason": reason, + "address": hex(elf.plt.get(func, elf.sym.get(func, 0)))}) + return found + except Exception as e: + return [{"error": str(e)}] + + +def analyze_binary(binary_path): + """Full binary exploitation analysis.""" + report = { + "binary": binary_path, + "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "checksec": run_checksec(binary_path), + "dangerous_functions": find_vulnerable_functions(binary_path), + "useful_functions": find_useful_functions(binary_path), + "rop_gadgets": find_rop_gadgets(binary_path, max_gadgets=15), + } + mitigations = report["checksec"] + if isinstance(mitigations, dict) and "error" not in mitigations: + report["exploit_difficulty"] = "HARD" if all([ + mitigations.get("nx"), mitigations.get("pie"), + mitigations.get("canary"), mitigations.get("relro") == "Full" + ]) else "MEDIUM" if mitigations.get("nx") else "EASY" + return report + + +def main(): + parser = argparse.ArgumentParser( + description="Binary exploitation analysis agent (authorized testing only)" + ) + parser.add_argument("binary", nargs="?", help="Path to ELF binary") + parser.add_argument("--checksec-only", action="store_true", help="Only run checksec") + parser.add_argument("--gadgets", type=int, default=15, help="Max ROP gadgets to find") + parser.add_argument("--output", "-o", help="Output JSON report path") + args = parser.parse_args() + + print("[*] Binary Exploitation Analysis Agent") + print("[*] For authorized security testing and CTF challenges only") + print(f" pwntools available: {HAS_PWNTOOLS}") + + if not args.binary: + print("\nUsage: python agent.py /path/to/binary [--checksec-only] [--gadgets 20]") + print(" Analyzes: mitigations, dangerous functions, ROP gadgets, GOT entries") + print(json.dumps({"demo": True, "pwntools": HAS_PWNTOOLS}, indent=2)) + sys.exit(0) + + if args.checksec_only: + result = run_checksec(args.binary) + print(json.dumps(result, indent=2)) + sys.exit(0) + + report = analyze_binary(args.binary) + checksec = report.get("checksec", {}) + if isinstance(checksec, dict) and "error" not in checksec: + print(f"\n[*] Architecture: {checksec.get('arch')} ({checksec.get('bits')}-bit)") + print(f" NX: {checksec.get('nx')} | PIE: {checksec.get('pie')} | " + f"Canary: {checksec.get('canary')} | RELRO: {checksec.get('relro')}") + print(f" Exploit difficulty: {report.get('exploit_difficulty', '?')}") + + dangerous = report.get("dangerous_functions", []) + if isinstance(dangerous, list) and dangerous: + print(f"\n[!] Dangerous functions found: {len(dangerous)}") + for d in dangerous: + if "error" not in d: + print(f" {d['function']} @ {d['address']}: {d['reason']}") + + gadgets = report.get("rop_gadgets", []) + if gadgets and "error" not in gadgets[0]: + print(f"\n[*] ROP gadgets found: {len(gadgets)}") + for g in gadgets[:5]: + print(f" {g['address']}: {g['insns']}") + + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(json.dumps({"difficulty": report.get("exploit_difficulty", "unknown"), + "gadgets": len(gadgets)}, indent=2)) + + +if __name__ == "__main__": + main()