mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
Add 5 new cybersecurity skills batch 2 - oauth token theft, binary exploitation, STIX2 sharing, linux audit logs, timestomping detection
This commit is contained in:
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -0,0 +1,18 @@
|
||||
---
|
||||
name: analyzing-linux-audit-logs-for-intrusion
|
||||
description: >
|
||||
Parse and analyze Linux auditd logs to detect intrusion indicators
|
||||
including unauthorized file access, privilege escalation, syscall
|
||||
anomalies, and suspicious process execution using ausearch and Python.
|
||||
domain: cybersecurity
|
||||
subdomain: log-analysis
|
||||
tags: [auditd, linux-forensics, syscall-monitoring, intrusion-detection]
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
# Analyzing Linux Audit Logs for Intrusion
|
||||
|
||||
Parse auditd logs to detect file access violations, privilege escalation,
|
||||
suspicious syscalls, and unauthorized process execution.
|
||||
@@ -0,0 +1,89 @@
|
||||
# API Reference: Analyzing Linux Audit Logs for Intrusion
|
||||
|
||||
## Audit Log Location
|
||||
```
|
||||
/var/log/audit/audit.log
|
||||
```
|
||||
|
||||
## ausearch CLI
|
||||
```bash
|
||||
# Search by key
|
||||
ausearch -k file_access
|
||||
|
||||
# Search by message type
|
||||
ausearch -m EXECVE
|
||||
|
||||
# Failed events only
|
||||
ausearch --success no
|
||||
|
||||
# By user
|
||||
ausearch -ua 1000
|
||||
|
||||
# CSV output for Python processing
|
||||
ausearch --format csv > audit_events.csv
|
||||
|
||||
# By time range
|
||||
ausearch --start today --end now
|
||||
ausearch --start 01/15/2025 00:00:00 --end 01/16/2025 00:00:00
|
||||
```
|
||||
|
||||
## aureport CLI
|
||||
```bash
|
||||
# Summary report
|
||||
aureport --summary
|
||||
|
||||
# Authentication report
|
||||
aureport -au
|
||||
|
||||
# Failed events
|
||||
aureport --failed
|
||||
|
||||
# Executable report
|
||||
aureport -x
|
||||
|
||||
# File access report
|
||||
aureport -f
|
||||
|
||||
# Anomaly report
|
||||
aureport --anomaly
|
||||
```
|
||||
|
||||
## Audit Rules (auditctl)
|
||||
```bash
|
||||
# Monitor sensitive files
|
||||
auditctl -w /etc/passwd -p rwxa -k passwd_access
|
||||
auditctl -w /etc/shadow -p rwxa -k shadow_access
|
||||
auditctl -w /etc/sudoers -p rwxa -k sudoers_access
|
||||
|
||||
# Monitor privilege escalation
|
||||
auditctl -a always,exit -F arch=b64 -S execve -F euid=0 -F uid!=0 -k priv_esc
|
||||
|
||||
# Monitor module loading
|
||||
auditctl -a always,exit -F arch=b64 -S init_module -S finit_module -k modules
|
||||
|
||||
# Monitor network connections
|
||||
auditctl -a always,exit -F arch=b64 -S connect -k network_connect
|
||||
```
|
||||
|
||||
## Audit Log Fields
|
||||
| Field | Description |
|
||||
|-------|------------|
|
||||
| type | Event type (SYSCALL, PATH, EXECVE, USER_CMD) |
|
||||
| msg | audit(timestamp:event_id) |
|
||||
| syscall | System call number |
|
||||
| uid/euid | User ID / Effective UID |
|
||||
| comm | Command name |
|
||||
| exe | Executable path |
|
||||
| key | Audit rule key |
|
||||
| success | yes/no |
|
||||
| name | File path (in PATH records) |
|
||||
|
||||
## Suspicious Syscalls
|
||||
| Syscall | Concern |
|
||||
|---------|---------|
|
||||
| execve | Program execution |
|
||||
| ptrace | Process debugging/injection |
|
||||
| init_module | Kernel rootkit loading |
|
||||
| connect | Outbound connection |
|
||||
| setuid | Privilege change |
|
||||
| open_by_handle_at | Container escape |
|
||||
@@ -0,0 +1,222 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Linux audit log analysis agent for intrusion detection.
|
||||
|
||||
Parses /var/log/audit/audit.log entries to detect privilege escalation,
|
||||
unauthorized file access, suspicious syscalls, and process execution anomalies.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import datetime
|
||||
import collections
|
||||
import subprocess
|
||||
|
||||
|
||||
SUSPICIOUS_SYSCALLS = {
|
||||
"execve": "Program execution",
|
||||
"connect": "Network connection",
|
||||
"bind": "Port binding",
|
||||
"ptrace": "Process tracing/debugging",
|
||||
"init_module": "Kernel module loading",
|
||||
"finit_module": "Kernel module loading",
|
||||
"delete_module": "Kernel module unloading",
|
||||
"mount": "Filesystem mount",
|
||||
"umount2": "Filesystem unmount",
|
||||
"setuid": "UID change",
|
||||
"setgid": "GID change",
|
||||
"sethostname": "Hostname change",
|
||||
"open_by_handle_at": "File open by handle (container escape)",
|
||||
}
|
||||
|
||||
SENSITIVE_PATHS = [
|
||||
"/etc/passwd", "/etc/shadow", "/etc/sudoers",
|
||||
"/etc/ssh/sshd_config", "/root/.ssh/authorized_keys",
|
||||
"/etc/crontab", "/var/spool/cron",
|
||||
]
|
||||
|
||||
SUSPICIOUS_COMMANDS = [
|
||||
"curl", "wget", "nc", "ncat", "nmap", "tcpdump",
|
||||
"python", "perl", "ruby", "gcc", "cc", "make",
|
||||
"useradd", "usermod", "groupadd", "visudo",
|
||||
"iptables", "ip6tables", "nft",
|
||||
]
|
||||
|
||||
|
||||
def parse_audit_log(log_path, max_lines=50000):
|
||||
"""Parse raw audit.log file into structured events."""
|
||||
events = []
|
||||
current = {}
|
||||
try:
|
||||
with open(log_path, "r") as f:
|
||||
for i, line in enumerate(f):
|
||||
if i >= max_lines:
|
||||
break
|
||||
match = re.match(
|
||||
r"type=(\S+)\s+msg=audit\((\d+\.\d+):(\d+)\):\s*(.*)", line
|
||||
)
|
||||
if not match:
|
||||
continue
|
||||
event_type = match.group(1)
|
||||
timestamp = float(match.group(2))
|
||||
event_id = match.group(3)
|
||||
data_str = match.group(4)
|
||||
fields = dict(re.findall(r'(\w+)=("[^"]*"|\S+)', data_str))
|
||||
for k, v in fields.items():
|
||||
fields[k] = v.strip('"')
|
||||
event = {
|
||||
"type": event_type,
|
||||
"timestamp": datetime.datetime.fromtimestamp(timestamp).isoformat(),
|
||||
"event_id": event_id,
|
||||
**fields,
|
||||
}
|
||||
events.append(event)
|
||||
except FileNotFoundError:
|
||||
return {"error": f"Log file not found: {log_path}"}
|
||||
return events
|
||||
|
||||
|
||||
def detect_privilege_escalation(events):
|
||||
"""Detect privilege escalation indicators in audit events."""
|
||||
findings = []
|
||||
for e in events:
|
||||
if e.get("type") == "SYSCALL" and e.get("syscall_name") in ("setuid", "setgid", "execve"):
|
||||
if e.get("uid") != "0" and e.get("euid") == "0":
|
||||
findings.append({
|
||||
"type": "privilege_escalation",
|
||||
"detail": f"UID {e.get('uid')} escalated to eUID 0",
|
||||
"command": e.get("comm", ""),
|
||||
"exe": e.get("exe", ""),
|
||||
"timestamp": e.get("timestamp"),
|
||||
"severity": "CRITICAL",
|
||||
})
|
||||
if e.get("type") == "USER_CMD" and "sudo" in e.get("cmd", "").lower():
|
||||
findings.append({
|
||||
"type": "sudo_usage",
|
||||
"user": e.get("acct", e.get("uid", "")),
|
||||
"command": e.get("cmd", ""),
|
||||
"timestamp": e.get("timestamp"),
|
||||
"severity": "MEDIUM",
|
||||
})
|
||||
return findings
|
||||
|
||||
|
||||
def detect_file_access(events):
|
||||
"""Detect access to sensitive files."""
|
||||
findings = []
|
||||
for e in events:
|
||||
if e.get("type") in ("PATH", "SYSCALL"):
|
||||
path = e.get("name", e.get("exe", ""))
|
||||
for sensitive in SENSITIVE_PATHS:
|
||||
if sensitive in path:
|
||||
findings.append({
|
||||
"type": "sensitive_file_access",
|
||||
"path": path,
|
||||
"syscall": e.get("syscall_name", e.get("syscall", "")),
|
||||
"user": e.get("uid", ""),
|
||||
"timestamp": e.get("timestamp"),
|
||||
"severity": "HIGH",
|
||||
})
|
||||
break
|
||||
return findings
|
||||
|
||||
|
||||
def detect_suspicious_commands(events):
|
||||
"""Detect execution of suspicious commands."""
|
||||
findings = []
|
||||
for e in events:
|
||||
if e.get("type") in ("EXECVE", "SYSCALL"):
|
||||
comm = e.get("comm", "").lower()
|
||||
exe = e.get("exe", "").lower()
|
||||
for cmd in SUSPICIOUS_COMMANDS:
|
||||
if cmd in comm or cmd in exe:
|
||||
findings.append({
|
||||
"type": "suspicious_command",
|
||||
"command": comm,
|
||||
"exe": exe,
|
||||
"user": e.get("uid", ""),
|
||||
"timestamp": e.get("timestamp"),
|
||||
"severity": "MEDIUM",
|
||||
})
|
||||
break
|
||||
return findings
|
||||
|
||||
|
||||
def run_ausearch(key=None, message_type=None, success=None):
|
||||
"""Run ausearch command and return results."""
|
||||
cmd = ["ausearch"]
|
||||
if key:
|
||||
cmd.extend(["-k", key])
|
||||
if message_type:
|
||||
cmd.extend(["-m", message_type])
|
||||
if success is not None:
|
||||
cmd.extend(["--success", "yes" if success else "no"])
|
||||
cmd.extend(["--format", "csv"])
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
return {"output": result.stdout[:5000], "exit_code": result.returncode}
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired) as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
|
||||
def generate_summary(events, findings):
|
||||
"""Generate audit log analysis summary."""
|
||||
event_types = collections.Counter(e.get("type") for e in events)
|
||||
finding_types = collections.Counter(f.get("type") for f in findings)
|
||||
severity_counts = collections.Counter(f.get("severity") for f in findings)
|
||||
return {
|
||||
"total_events": len(events),
|
||||
"event_types": dict(event_types.most_common(10)),
|
||||
"total_findings": len(findings),
|
||||
"finding_types": dict(finding_types),
|
||||
"by_severity": dict(severity_counts),
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Linux audit log intrusion detection agent")
|
||||
parser.add_argument("log_file", nargs="?", default="/var/log/audit/audit.log",
|
||||
help="Path to audit.log (default: /var/log/audit/audit.log)")
|
||||
parser.add_argument("--max-lines", type=int, default=50000, help="Max log lines to parse")
|
||||
parser.add_argument("--ausearch-key", help="Run ausearch with this key")
|
||||
parser.add_argument("--output", "-o", help="Output JSON report path")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("[*] Linux Audit Log Intrusion Detection Agent")
|
||||
|
||||
if args.ausearch_key:
|
||||
result = run_ausearch(key=args.ausearch_key)
|
||||
print(json.dumps(result, indent=2))
|
||||
sys.exit(0)
|
||||
|
||||
events = parse_audit_log(args.log_file, args.max_lines)
|
||||
if isinstance(events, dict) and "error" in events:
|
||||
print(f"[!] {events['error']}")
|
||||
print("[DEMO] Specify a valid audit.log path or run on a Linux system")
|
||||
print(json.dumps({"demo": True, "monitored_syscalls": len(SUSPICIOUS_SYSCALLS)}, indent=2))
|
||||
sys.exit(0)
|
||||
|
||||
findings = []
|
||||
findings.extend(detect_privilege_escalation(events))
|
||||
findings.extend(detect_file_access(events))
|
||||
findings.extend(detect_suspicious_commands(events))
|
||||
|
||||
summary = generate_summary(events, findings)
|
||||
print(f"[*] Events parsed: {summary['total_events']}")
|
||||
print(f"[*] Findings: {summary['total_findings']}")
|
||||
print(f" By severity: {summary['by_severity']}")
|
||||
for f in findings[:15]:
|
||||
print(f" [{f['severity']}] {f['type']}: {f.get('command', f.get('path', ''))}")
|
||||
|
||||
if args.output:
|
||||
report = {"summary": summary, "findings": findings}
|
||||
with open(args.output, "w") as f:
|
||||
json.dump(report, f, indent=2, default=str)
|
||||
|
||||
print(json.dumps(summary, indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -0,0 +1,18 @@
|
||||
---
|
||||
name: detecting-oauth-token-theft
|
||||
description: >
|
||||
Detect OAuth access token theft and misuse by analyzing sign-in logs for
|
||||
impossible travel, new device patterns, token replay from unusual IPs,
|
||||
and anomalous scope requests via Microsoft Graph and Okta APIs.
|
||||
domain: cybersecurity
|
||||
subdomain: identity-security
|
||||
tags: [oauth, token-theft, identity-attacks, impossible-travel]
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
# Detecting OAuth Token Theft
|
||||
|
||||
Analyze OAuth sign-in telemetry for indicators of token theft including
|
||||
impossible travel, device fingerprint changes, and token replay attacks.
|
||||
@@ -0,0 +1,51 @@
|
||||
# API Reference: Detecting OAuth Token Theft
|
||||
|
||||
## Microsoft Graph Sign-In Logs
|
||||
```bash
|
||||
# Query sign-in logs
|
||||
curl -H "Authorization: Bearer $MS_TOKEN" \
|
||||
"https://graph.microsoft.com/v1.0/auditLogs/signIns?\$filter=createdDateTime ge 2025-01-01&\$top=100"
|
||||
```
|
||||
|
||||
### Sign-In Event Fields
|
||||
| Field | Description |
|
||||
|-------|------------|
|
||||
| userPrincipalName | User email/UPN |
|
||||
| ipAddress | Source IP address |
|
||||
| location.city | Geo city |
|
||||
| location.geoCoordinates | Lat/lon |
|
||||
| deviceDetail.deviceId | Device identifier |
|
||||
| resourceDisplayName | Target resource |
|
||||
| status.errorCode | 0 = success |
|
||||
| riskState | none, confirmedCompromised, remediated |
|
||||
|
||||
## Okta System Log API
|
||||
```bash
|
||||
# Query events
|
||||
curl -H "Authorization: SSWS $OKTA_TOKEN" \
|
||||
"https://your-org.okta.com/api/v1/logs?filter=eventType eq \"user.session.start\"&since=2025-01-01"
|
||||
```
|
||||
|
||||
## Detection Logic
|
||||
| Detection | Method |
|
||||
|-----------|--------|
|
||||
| Impossible travel | Haversine distance / time > 900 km/h |
|
||||
| Token replay | Same user, 3+ IPs within 5 min window |
|
||||
| New device | Device ID not in known device inventory |
|
||||
| Suspicious scopes | 2+ sensitive OAuth scopes requested |
|
||||
|
||||
## Sensitive OAuth Scopes (Microsoft)
|
||||
| Scope | Risk |
|
||||
|-------|------|
|
||||
| Mail.ReadWrite | Email access |
|
||||
| Mail.Send | Send-as capability |
|
||||
| Files.ReadWrite.All | Full file access |
|
||||
| Directory.ReadWrite.All | AD modification |
|
||||
| Application.ReadWrite.All | App registration |
|
||||
|
||||
## MITRE ATT&CK Mapping
|
||||
| Technique | Description |
|
||||
|-----------|------------|
|
||||
| T1528 | Steal Application Access Token |
|
||||
| T1550.001 | Application Access Token reuse |
|
||||
| T1078.004 | Cloud Accounts |
|
||||
@@ -0,0 +1,185 @@
|
||||
#!/usr/bin/env python3
|
||||
"""OAuth token theft detection agent.
|
||||
|
||||
Analyzes sign-in logs for impossible travel, new device sign-ins,
|
||||
token replay from unusual IPs, and anomalous scope requests.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import math
|
||||
import sys
|
||||
import datetime
|
||||
import collections
|
||||
|
||||
try:
|
||||
import requests
|
||||
HAS_REQUESTS = True
|
||||
except ImportError:
|
||||
HAS_REQUESTS = False
|
||||
|
||||
|
||||
EARTH_RADIUS_KM = 6371
|
||||
|
||||
|
||||
def haversine(lat1, lon1, lat2, lon2):
|
||||
"""Calculate great-circle distance between two points in km."""
|
||||
lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
|
||||
dlat = lat2 - lat1
|
||||
dlon = lon2 - lon1
|
||||
a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
|
||||
return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))
|
||||
|
||||
|
||||
def detect_impossible_travel(sign_ins, max_speed_kmh=900):
|
||||
"""Detect impossible travel based on geo and time between logins."""
|
||||
alerts = []
|
||||
by_user = collections.defaultdict(list)
|
||||
for event in sign_ins:
|
||||
by_user[event.get("user", "")].append(event)
|
||||
|
||||
for user, events in by_user.items():
|
||||
sorted_events = sorted(events, key=lambda e: e.get("timestamp", ""))
|
||||
for i in range(1, len(sorted_events)):
|
||||
prev, curr = sorted_events[i - 1], sorted_events[i]
|
||||
if not all(k in prev for k in ("lat", "lon")) or not all(k in curr for k in ("lat", "lon")):
|
||||
continue
|
||||
dist = haversine(prev["lat"], prev["lon"], curr["lat"], curr["lon"])
|
||||
try:
|
||||
t1 = datetime.datetime.fromisoformat(prev["timestamp"].replace("Z", "+00:00"))
|
||||
t2 = datetime.datetime.fromisoformat(curr["timestamp"].replace("Z", "+00:00"))
|
||||
hours = max((t2 - t1).total_seconds() / 3600, 0.001)
|
||||
except (ValueError, KeyError):
|
||||
continue
|
||||
speed = dist / hours
|
||||
if speed > max_speed_kmh and dist > 100:
|
||||
alerts.append({
|
||||
"type": "impossible_travel",
|
||||
"user": user,
|
||||
"from_ip": prev.get("ip", ""),
|
||||
"to_ip": curr.get("ip", ""),
|
||||
"distance_km": round(dist, 1),
|
||||
"time_hours": round(hours, 2),
|
||||
"speed_kmh": round(speed, 1),
|
||||
"severity": "HIGH",
|
||||
})
|
||||
return alerts
|
||||
|
||||
|
||||
def detect_token_replay(sign_ins):
|
||||
"""Detect token replay from multiple IPs in short timeframe."""
|
||||
alerts = []
|
||||
by_user = collections.defaultdict(list)
|
||||
for event in sign_ins:
|
||||
by_user[event.get("user", "")].append(event)
|
||||
|
||||
for user, events in by_user.items():
|
||||
sorted_events = sorted(events, key=lambda e: e.get("timestamp", ""))
|
||||
window = []
|
||||
for event in sorted_events:
|
||||
try:
|
||||
ts = datetime.datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
|
||||
except (ValueError, KeyError):
|
||||
continue
|
||||
window = [e for e in window
|
||||
if (ts - datetime.datetime.fromisoformat(
|
||||
e["timestamp"].replace("Z", "+00:00"))).total_seconds() < 300]
|
||||
window.append(event)
|
||||
unique_ips = set(e.get("ip") for e in window if e.get("ip"))
|
||||
if len(unique_ips) >= 3:
|
||||
alerts.append({
|
||||
"type": "token_replay",
|
||||
"user": user,
|
||||
"ips": list(unique_ips),
|
||||
"window_seconds": 300,
|
||||
"severity": "CRITICAL",
|
||||
})
|
||||
return alerts
|
||||
|
||||
|
||||
def detect_new_device(sign_ins, known_devices=None):
|
||||
"""Detect sign-ins from previously unseen devices."""
|
||||
known = set(known_devices or [])
|
||||
alerts = []
|
||||
for event in sign_ins:
|
||||
device_id = event.get("device_id", event.get("user_agent", ""))
|
||||
if device_id and device_id not in known:
|
||||
alerts.append({
|
||||
"type": "new_device",
|
||||
"user": event.get("user", ""),
|
||||
"device": device_id,
|
||||
"ip": event.get("ip", ""),
|
||||
"timestamp": event.get("timestamp", ""),
|
||||
"severity": "MEDIUM",
|
||||
})
|
||||
known.add(device_id)
|
||||
return alerts
|
||||
|
||||
|
||||
def detect_suspicious_scopes(sign_ins):
|
||||
"""Detect OAuth requests with overly broad or sensitive scopes."""
|
||||
sensitive_scopes = {
|
||||
"Mail.ReadWrite", "Mail.Send", "Files.ReadWrite.All",
|
||||
"Directory.ReadWrite.All", "User.ReadWrite.All",
|
||||
"Application.ReadWrite.All", "RoleManagement.ReadWrite.Directory",
|
||||
}
|
||||
alerts = []
|
||||
for event in sign_ins:
|
||||
scopes = set(event.get("scopes", []))
|
||||
dangerous = scopes & sensitive_scopes
|
||||
if len(dangerous) >= 2:
|
||||
alerts.append({
|
||||
"type": "suspicious_scopes",
|
||||
"user": event.get("user", ""),
|
||||
"scopes": list(dangerous),
|
||||
"app": event.get("app_name", ""),
|
||||
"severity": "HIGH",
|
||||
})
|
||||
return alerts
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="OAuth token theft detection agent")
|
||||
parser.add_argument("--log-file", help="JSON file with sign-in events")
|
||||
parser.add_argument("--max-speed", type=int, default=900, help="Max travel speed km/h (default: 900)")
|
||||
parser.add_argument("--output", "-o", help="Output JSON report path")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("[*] OAuth Token Theft Detection Agent")
|
||||
report = {"timestamp": datetime.datetime.utcnow().isoformat() + "Z", "alerts": []}
|
||||
|
||||
if args.log_file:
|
||||
with open(args.log_file) as f:
|
||||
sign_ins = json.load(f)
|
||||
else:
|
||||
sign_ins = [
|
||||
{"user": "alice@corp.com", "ip": "203.0.113.10", "lat": 40.7128, "lon": -74.0060,
|
||||
"timestamp": "2025-06-15T10:00:00Z", "device_id": "device-A"},
|
||||
{"user": "alice@corp.com", "ip": "198.51.100.50", "lat": 51.5074, "lon": -0.1278,
|
||||
"timestamp": "2025-06-15T10:30:00Z", "device_id": "device-B"},
|
||||
{"user": "bob@corp.com", "ip": "10.0.0.1", "lat": 37.7749, "lon": -122.4194,
|
||||
"timestamp": "2025-06-15T09:00:00Z", "device_id": "device-C",
|
||||
"scopes": ["Mail.ReadWrite", "Mail.Send", "Files.ReadWrite.All"]},
|
||||
]
|
||||
print("[DEMO] Using sample sign-in events")
|
||||
|
||||
report["alerts"].extend(detect_impossible_travel(sign_ins, args.max_speed))
|
||||
report["alerts"].extend(detect_token_replay(sign_ins))
|
||||
report["alerts"].extend(detect_new_device(sign_ins))
|
||||
report["alerts"].extend(detect_suspicious_scopes(sign_ins))
|
||||
|
||||
by_type = collections.Counter(a["type"] for a in report["alerts"])
|
||||
print(f"[*] Total alerts: {len(report['alerts'])}")
|
||||
for alert_type, count in by_type.items():
|
||||
print(f" {alert_type}: {count}")
|
||||
for a in report["alerts"]:
|
||||
print(f" [{a['severity']}] {a['type']}: {a.get('user', '')} - {a.get('distance_km', a.get('ips', a.get('device', '')))}")
|
||||
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
json.dump(report, f, indent=2)
|
||||
print(json.dumps({"total_alerts": len(report["alerts"]), "by_type": dict(by_type)}, indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -0,0 +1,19 @@
|
||||
---
|
||||
name: hunting-for-defense-evasion-via-timestomping
|
||||
description: >
|
||||
Detect NTFS timestamp manipulation (MITRE T1070.006) by comparing
|
||||
$STANDARD_INFORMATION vs $FILE_NAME timestamps in the MFT. Uses
|
||||
analyzeMFT and Python to identify files with anomalous temporal
|
||||
patterns indicating anti-forensic timestomping activity.
|
||||
domain: cybersecurity
|
||||
subdomain: threat-hunting
|
||||
tags: [timestomping, ntfs-forensics, mft-analysis, defense-evasion]
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
# Hunting for Defense Evasion via Timestomping
|
||||
|
||||
Detect timestamp manipulation by analyzing NTFS MFT entries for
|
||||
discrepancies between $STANDARD_INFORMATION and $FILE_NAME attributes.
|
||||
@@ -0,0 +1,67 @@
|
||||
# API Reference: Hunting for Timestomping (T1070.006)
|
||||
|
||||
## NTFS Timestamp Attributes
|
||||
| Attribute | Modifiable By | Updated On |
|
||||
|-----------|--------------|------------|
|
||||
| $STANDARD_INFORMATION | User-level APIs (SetFileTime) | Create, modify, access, MFT change |
|
||||
| $FILE_NAME | Windows kernel only | File create, rename, move |
|
||||
|
||||
## Detection Logic
|
||||
| Indicator | Description |
|
||||
|-----------|------------|
|
||||
| SI < FN Created | $SI creation before $FN creation (most reliable) |
|
||||
| Zero nanoseconds | .0000000 in timestamp (tool artifacts) |
|
||||
| Future timestamp | Date beyond current time |
|
||||
| Pre-OS timestamp | $SI before OS install but $FN after |
|
||||
| Round seconds | No fractional seconds (unusual for NTFS) |
|
||||
|
||||
## analyzeMFT (Python)
|
||||
```bash
|
||||
pip install analyzemft
|
||||
|
||||
# Parse MFT to CSV
|
||||
analyzeMFT.py -f /path/to/$MFT -o mft_output.csv
|
||||
|
||||
# With body file output (for timeline)
|
||||
analyzeMFT.py -f $MFT -o mft.csv -b body.txt
|
||||
```
|
||||
|
||||
## MFTECmd (Eric Zimmerman)
|
||||
```bash
|
||||
# Parse MFT to CSV
|
||||
MFTECmd.exe -f C:\evidence\$MFT --csv C:\output\
|
||||
|
||||
# With $J (USN Journal)
|
||||
MFTECmd.exe -f $MFT --csv output\ --json output\
|
||||
```
|
||||
|
||||
### CSV Columns
|
||||
| Column | Description |
|
||||
|--------|------------|
|
||||
| Record Number | MFT entry number |
|
||||
| Filename | File name |
|
||||
| SI Created/Modified/Accessed | $STANDARD_INFORMATION timestamps |
|
||||
| FN Created/Modified/Accessed | $FILE_NAME timestamps |
|
||||
| In Use | Active record flag |
|
||||
|
||||
## USN Journal Analysis
|
||||
```bash
|
||||
# Parse USN Journal for corroboration
|
||||
MFTECmd.exe -f $J --csv output\
|
||||
|
||||
# fsutil on live system
|
||||
fsutil usn readjournal C: csv > usn_journal.csv
|
||||
```
|
||||
|
||||
## Timestomping Tools (for detection awareness)
|
||||
| Tool | Method |
|
||||
|------|--------|
|
||||
| timestomp (Metasploit) | SetFileTime API |
|
||||
| PowerShell Set-ItemProperty | .NET DateTime |
|
||||
| NirSoft BulkFileChanger | Batch timestamp edit |
|
||||
| $STANDARD_INFORMATION patch | Direct MFT edit |
|
||||
|
||||
## MITRE ATT&CK
|
||||
- **T1070.006** - Indicator Removal: Timestomp
|
||||
- **Tactic**: Defense Evasion
|
||||
- **Platforms**: Windows
|
||||
@@ -0,0 +1,206 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Timestomping detection agent for NTFS MFT analysis.
|
||||
|
||||
Detects MITRE T1070.006 (Timestomping) by comparing $STANDARD_INFORMATION
|
||||
and $FILE_NAME timestamps in NTFS Master File Table entries. Identifies
|
||||
anomalous nanosecond patterns and temporal inconsistencies.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import datetime
|
||||
|
||||
|
||||
TIMESTOMP_INDICATORS = {
|
||||
"zero_nanoseconds": "Nanosecond field is exactly 0000000 (common in timestomping tools)",
|
||||
"si_before_fn": "$STANDARD_INFORMATION created before $FILE_NAME created",
|
||||
"future_timestamp": "Timestamp is in the future",
|
||||
"pre_os_timestamp": "Timestamp predates the operating system install",
|
||||
"round_seconds": "Timestamp has perfectly round seconds (no fractional component)",
|
||||
}
|
||||
|
||||
|
||||
def parse_mft_csv(csv_path):
|
||||
"""Parse analyzeMFT CSV output for timestamp analysis."""
|
||||
entries = []
|
||||
try:
|
||||
with open(csv_path, "r", encoding="utf-8", errors="replace") as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
entry = {
|
||||
"record_number": row.get("Record Number", ""),
|
||||
"filename": row.get("Filename", row.get("Good", "")),
|
||||
"si_created": row.get("SI Created", row.get("STD_INFO Creation date", "")),
|
||||
"si_modified": row.get("SI Modified", row.get("STD_INFO Modification date", "")),
|
||||
"si_accessed": row.get("SI Accessed", row.get("STD_INFO Access date", "")),
|
||||
"si_entry_modified": row.get("SI Entry Modified", row.get("STD_INFO Entry date", "")),
|
||||
"fn_created": row.get("FN Created", row.get("FN Creation date", "")),
|
||||
"fn_modified": row.get("FN Modified", row.get("FN Modification date", "")),
|
||||
"fn_accessed": row.get("FN Accessed", row.get("FN Access date", "")),
|
||||
"fn_entry_modified": row.get("FN Entry Modified", row.get("FN Entry date", "")),
|
||||
"in_use": row.get("Active", row.get("In Use", "")).lower() in ("true", "1", "yes"),
|
||||
}
|
||||
if entry["filename"]:
|
||||
entries.append(entry)
|
||||
except FileNotFoundError:
|
||||
return {"error": f"File not found: {csv_path}"}
|
||||
return entries
|
||||
|
||||
|
||||
def parse_timestamp(ts_str):
|
||||
"""Parse various timestamp formats to datetime."""
|
||||
if not ts_str or ts_str in ("", "NoFNDate", "N/A"):
|
||||
return None
|
||||
formats = [
|
||||
"%Y-%m-%d %H:%M:%S.%f",
|
||||
"%Y-%m-%d %H:%M:%S",
|
||||
"%m/%d/%Y %H:%M:%S",
|
||||
"%Y-%m-%dT%H:%M:%S.%f",
|
||||
"%Y-%m-%dT%H:%M:%S",
|
||||
]
|
||||
for fmt in formats:
|
||||
try:
|
||||
return datetime.datetime.strptime(ts_str.strip(), fmt)
|
||||
except ValueError:
|
||||
continue
|
||||
return None
|
||||
|
||||
|
||||
def detect_timestomping(entries, os_install_date=None):
|
||||
"""Analyze MFT entries for timestomping indicators."""
|
||||
if os_install_date is None:
|
||||
os_install_date = datetime.datetime(2020, 1, 1)
|
||||
now = datetime.datetime.now()
|
||||
findings = []
|
||||
|
||||
for entry in entries:
|
||||
if isinstance(entry, dict) and "error" in entry:
|
||||
continue
|
||||
reasons = []
|
||||
si_created = parse_timestamp(entry.get("si_created", ""))
|
||||
fn_created = parse_timestamp(entry.get("fn_created", ""))
|
||||
|
||||
# Check zero nanoseconds
|
||||
si_str = entry.get("si_created", "")
|
||||
if ".0000000" in si_str or (si_str and re.search(r"\.\d{6}0$", si_str)):
|
||||
reasons.append("zero_nanoseconds")
|
||||
|
||||
# Check SI before FN (most reliable indicator)
|
||||
if si_created and fn_created:
|
||||
if si_created < fn_created - datetime.timedelta(seconds=2):
|
||||
reasons.append("si_before_fn")
|
||||
|
||||
# Check future timestamps
|
||||
if si_created and si_created > now + datetime.timedelta(days=1):
|
||||
reasons.append("future_timestamp")
|
||||
|
||||
# Check pre-OS timestamps
|
||||
if si_created and si_created < os_install_date:
|
||||
if fn_created and fn_created >= os_install_date:
|
||||
reasons.append("pre_os_timestamp")
|
||||
|
||||
# Check perfectly round timestamps
|
||||
if si_created and si_created.microsecond == 0:
|
||||
si_mod = parse_timestamp(entry.get("si_modified", ""))
|
||||
if si_mod and si_mod.microsecond == 0:
|
||||
reasons.append("round_seconds")
|
||||
|
||||
if reasons:
|
||||
findings.append({
|
||||
"filename": entry.get("filename", ""),
|
||||
"record_number": entry.get("record_number", ""),
|
||||
"si_created": entry.get("si_created", ""),
|
||||
"fn_created": entry.get("fn_created", ""),
|
||||
"indicators": reasons,
|
||||
"descriptions": [TIMESTOMP_INDICATORS.get(r, r) for r in reasons],
|
||||
"confidence": "HIGH" if "si_before_fn" in reasons else "MEDIUM",
|
||||
"mitre": "T1070.006",
|
||||
})
|
||||
|
||||
return findings
|
||||
|
||||
|
||||
def generate_report(entries, findings):
|
||||
"""Generate timestomping analysis report."""
|
||||
return {
|
||||
"timestamp": datetime.datetime.utcnow().isoformat() + "Z",
|
||||
"total_mft_entries": len(entries) if isinstance(entries, list) else 0,
|
||||
"total_findings": len(findings),
|
||||
"high_confidence": sum(1 for f in findings if f.get("confidence") == "HIGH"),
|
||||
"medium_confidence": sum(1 for f in findings if f.get("confidence") == "MEDIUM"),
|
||||
"indicator_counts": dict(collections.Counter(
|
||||
ind for f in findings for ind in f.get("indicators", [])
|
||||
)) if findings else {},
|
||||
"mitre_technique": "T1070.006 - Indicator Removal: Timestomp",
|
||||
}
|
||||
|
||||
|
||||
# Need collections for generate_report
|
||||
import collections
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="NTFS timestomping detection via MFT analysis (MITRE T1070.006)"
|
||||
)
|
||||
parser.add_argument("mft_csv", nargs="?", help="Path to analyzeMFT CSV output")
|
||||
parser.add_argument("--os-install", help="OS install date (YYYY-MM-DD) for baseline")
|
||||
parser.add_argument("--high-only", action="store_true", help="Show only HIGH confidence findings")
|
||||
parser.add_argument("--output", "-o", help="Output JSON report path")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("[*] Timestomping Detection Agent (MITRE T1070.006)")
|
||||
print("[*] Compares $STANDARD_INFORMATION vs $FILE_NAME timestamps")
|
||||
|
||||
if not args.mft_csv:
|
||||
print("\n[DEMO] Usage:")
|
||||
print(" 1. Extract MFT: ftkimager /path/to/image mft_output")
|
||||
print(" 2. Parse MFT: analyzeMFT.py -f $MFT -o mft.csv")
|
||||
print(" 3. Detect: python agent.py mft.csv [--os-install 2022-01-15]")
|
||||
print("\n Indicators detected:")
|
||||
for name, desc in TIMESTOMP_INDICATORS.items():
|
||||
print(f" - {name}: {desc}")
|
||||
print(json.dumps({"demo": True, "indicators": len(TIMESTOMP_INDICATORS)}, indent=2))
|
||||
sys.exit(0)
|
||||
|
||||
os_date = None
|
||||
if args.os_install:
|
||||
try:
|
||||
os_date = datetime.datetime.strptime(args.os_install, "%Y-%m-%d")
|
||||
except ValueError:
|
||||
print(f"[!] Invalid date format: {args.os_install}")
|
||||
|
||||
entries = parse_mft_csv(args.mft_csv)
|
||||
if isinstance(entries, dict) and "error" in entries:
|
||||
print(f"[!] {entries['error']}")
|
||||
sys.exit(1)
|
||||
|
||||
findings = detect_timestomping(entries, os_date)
|
||||
if args.high_only:
|
||||
findings = [f for f in findings if f.get("confidence") == "HIGH"]
|
||||
|
||||
report = generate_report(entries, findings)
|
||||
print(f"[*] MFT entries analyzed: {report['total_mft_entries']}")
|
||||
print(f"[*] Timestomping findings: {report['total_findings']}")
|
||||
print(f" HIGH confidence: {report['high_confidence']}")
|
||||
print(f" MEDIUM confidence: {report['medium_confidence']}")
|
||||
|
||||
for f in findings[:20]:
|
||||
print(f" [{f['confidence']}] {f['filename']}")
|
||||
for desc in f["descriptions"]:
|
||||
print(f" - {desc}")
|
||||
|
||||
if args.output:
|
||||
full_report = {"summary": report, "findings": findings}
|
||||
with open(args.output, "w") as f:
|
||||
json.dump(full_report, f, indent=2)
|
||||
|
||||
print(json.dumps(report, indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -0,0 +1,18 @@
|
||||
---
|
||||
name: implementing-security-information-sharing-with-stix2
|
||||
description: >
|
||||
Create, validate, and share STIX 2.1 threat intelligence objects using
|
||||
the stix2 Python library. Covers indicators, malware, campaigns,
|
||||
relationships, bundles, and TAXII 2.1 publishing.
|
||||
domain: cybersecurity
|
||||
subdomain: threat-intelligence
|
||||
tags: [stix, taxii, threat-sharing, intelligence-exchange]
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
# Implementing Security Information Sharing with STIX 2.1
|
||||
|
||||
Build and share structured threat intelligence using STIX 2.1 objects
|
||||
with the stix2 Python library and TAXII 2.1 transport protocol.
|
||||
+77
@@ -0,0 +1,77 @@
|
||||
# API Reference: Security Information Sharing with STIX 2.1
|
||||
|
||||
## stix2 Python Library
|
||||
```bash
|
||||
pip install stix2 taxii2-client
|
||||
```
|
||||
|
||||
### Create Objects
|
||||
```python
|
||||
from stix2 import Indicator, Malware, Relationship, Bundle, Identity
|
||||
|
||||
identity = Identity(name="My SOC", identity_class="organization")
|
||||
|
||||
indicator = Indicator(
|
||||
name="Malicious IP",
|
||||
pattern="[ipv4-addr:value = '198.51.100.42']",
|
||||
pattern_type="stix",
|
||||
valid_from="2025-01-01T00:00:00Z",
|
||||
created_by_ref=identity.id,
|
||||
)
|
||||
|
||||
malware = Malware(name="EvilRAT", malware_types=["trojan"], is_family=True)
|
||||
|
||||
rel = Relationship(source_ref=indicator.id, target_ref=malware.id,
|
||||
relationship_type="indicates")
|
||||
|
||||
bundle = Bundle(objects=[identity, indicator, malware, rel])
|
||||
print(bundle.serialize(pretty=True))
|
||||
```
|
||||
|
||||
### Validate and Parse
|
||||
```python
|
||||
import stix2
|
||||
|
||||
parsed = stix2.parse(json_string, allow_custom=True)
|
||||
print(parsed.type, len(parsed.objects))
|
||||
```
|
||||
|
||||
## STIX 2.1 Object Types
|
||||
| Type | Description |
|
||||
|------|------------|
|
||||
| indicator | IOC with STIX pattern |
|
||||
| malware | Malware family/sample |
|
||||
| campaign | Named threat campaign |
|
||||
| threat-actor | Threat group |
|
||||
| attack-pattern | TTP (ATT&CK technique) |
|
||||
| relationship | Link between objects |
|
||||
| sighting | Observation of indicator |
|
||||
| identity | Organization/individual |
|
||||
|
||||
## TAXII 2.1 Publishing
|
||||
```python
|
||||
from taxii2client.v21 import Collection
|
||||
|
||||
collection = Collection(
|
||||
"https://taxii.server.com/taxii2/collections/abc-123/",
|
||||
user="api_user", password="api_pass"
|
||||
)
|
||||
collection.add_objects(bundle.serialize())
|
||||
```
|
||||
|
||||
## TLP Marking Definitions
|
||||
| TLP | stix2 Constant |
|
||||
|-----|---------------|
|
||||
| TLP:CLEAR | stix2.TLP_WHITE |
|
||||
| TLP:GREEN | stix2.TLP_GREEN |
|
||||
| TLP:AMBER | stix2.TLP_AMBER |
|
||||
| TLP:RED | stix2.TLP_RED |
|
||||
|
||||
## STIX Pattern Examples
|
||||
| Type | Pattern |
|
||||
|------|---------|
|
||||
| IPv4 | `[ipv4-addr:value = '1.2.3.4']` |
|
||||
| Domain | `[domain-name:value = 'evil.com']` |
|
||||
| SHA-256 | `[file:hashes.'SHA-256' = 'abc...']` |
|
||||
| URL | `[url:value = 'https://evil.com/mal']` |
|
||||
| Email | `[email-addr:value = 'bad@evil.com']` |
|
||||
@@ -0,0 +1,202 @@
|
||||
#!/usr/bin/env python3
|
||||
"""STIX 2.1 threat intelligence sharing agent.
|
||||
|
||||
Creates, validates, and exports STIX 2.1 objects including indicators,
|
||||
malware, campaigns, and relationships using the stix2 Python library.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
try:
|
||||
import stix2
|
||||
from stix2 import Indicator, Malware, Campaign, Relationship, Bundle
|
||||
from stix2 import ThreatActor, Identity, Sighting, AttackPattern
|
||||
HAS_STIX2 = True
|
||||
except ImportError:
|
||||
HAS_STIX2 = False
|
||||
|
||||
try:
|
||||
from taxii2client.v21 import Collection, Server
|
||||
HAS_TAXII = True
|
||||
except ImportError:
|
||||
HAS_TAXII = False
|
||||
|
||||
|
||||
IDENTITY = None
|
||||
if HAS_STIX2:
|
||||
IDENTITY = Identity(
|
||||
id="identity--f165a29e-a997-5f8a-a63b-4b72b9f2f963",
|
||||
name="Security Operations Center",
|
||||
identity_class="organization",
|
||||
)
|
||||
|
||||
|
||||
def create_indicator(value, indicator_type="ipv4-addr", confidence=80, tlp="TLP:AMBER"):
|
||||
"""Create a STIX 2.1 Indicator object."""
|
||||
if not HAS_STIX2:
|
||||
return {"error": "stix2 not installed. pip install stix2"}
|
||||
pattern_map = {
|
||||
"ipv4-addr": f"[ipv4-addr:value = '{value}']",
|
||||
"domain-name": f"[domain-name:value = '{value}']",
|
||||
"url": f"[url:value = '{value}']",
|
||||
"file-sha256": f"[file:hashes.'SHA-256' = '{value}']",
|
||||
"file-md5": f"[file:hashes.MD5 = '{value}']",
|
||||
"email-addr": f"[email-addr:value = '{value}']",
|
||||
}
|
||||
pattern = pattern_map.get(indicator_type, f"[ipv4-addr:value = '{value}']")
|
||||
marking = stix2.TLP_AMBER if tlp == "TLP:AMBER" else stix2.TLP_GREEN
|
||||
return Indicator(
|
||||
name=f"Malicious {indicator_type}: {value}",
|
||||
pattern=pattern,
|
||||
pattern_type="stix",
|
||||
valid_from=datetime.datetime.now(datetime.timezone.utc),
|
||||
confidence=confidence,
|
||||
created_by_ref=IDENTITY.id,
|
||||
object_marking_refs=[marking],
|
||||
)
|
||||
|
||||
|
||||
def create_malware(name, malware_types=None, is_family=True, description=""):
|
||||
"""Create a STIX 2.1 Malware object."""
|
||||
if not HAS_STIX2:
|
||||
return {"error": "stix2 not installed"}
|
||||
return Malware(
|
||||
name=name,
|
||||
malware_types=malware_types or ["ransomware"],
|
||||
is_family=is_family,
|
||||
description=description or f"Malware family: {name}",
|
||||
created_by_ref=IDENTITY.id,
|
||||
)
|
||||
|
||||
|
||||
def create_campaign(name, description="", first_seen=None):
|
||||
"""Create a STIX 2.1 Campaign object."""
|
||||
if not HAS_STIX2:
|
||||
return {"error": "stix2 not installed"}
|
||||
kwargs = {"name": name, "description": description or f"Campaign: {name}",
|
||||
"created_by_ref": IDENTITY.id}
|
||||
if first_seen:
|
||||
kwargs["first_seen"] = first_seen
|
||||
return Campaign(**kwargs)
|
||||
|
||||
|
||||
def create_relationship(source, target, relationship_type="indicates"):
|
||||
"""Create a STIX 2.1 Relationship."""
|
||||
if not HAS_STIX2:
|
||||
return {"error": "stix2 not installed"}
|
||||
return Relationship(
|
||||
source_ref=source.id if hasattr(source, "id") else source,
|
||||
target_ref=target.id if hasattr(target, "id") else target,
|
||||
relationship_type=relationship_type,
|
||||
created_by_ref=IDENTITY.id,
|
||||
)
|
||||
|
||||
|
||||
def build_threat_report(indicators, malware_obj=None, campaign_obj=None):
|
||||
"""Build a STIX 2.1 Bundle with all objects and relationships."""
|
||||
if not HAS_STIX2:
|
||||
return {"error": "stix2 not installed"}
|
||||
objects = [IDENTITY] + list(indicators)
|
||||
relationships = []
|
||||
|
||||
if malware_obj:
|
||||
objects.append(malware_obj)
|
||||
for ind in indicators:
|
||||
rel = create_relationship(ind, malware_obj, "indicates")
|
||||
relationships.append(rel)
|
||||
|
||||
if campaign_obj:
|
||||
objects.append(campaign_obj)
|
||||
if malware_obj:
|
||||
rel = create_relationship(campaign_obj, malware_obj, "uses")
|
||||
relationships.append(rel)
|
||||
|
||||
objects.extend(relationships)
|
||||
bundle = Bundle(objects=objects)
|
||||
return bundle
|
||||
|
||||
|
||||
def publish_to_taxii(bundle, collection_url, username=None, password=None):
|
||||
"""Publish STIX bundle to TAXII 2.1 collection."""
|
||||
if not HAS_TAXII:
|
||||
return {"error": "taxii2-client not installed. pip install taxii2-client"}
|
||||
try:
|
||||
collection = Collection(collection_url, user=username, password=password)
|
||||
collection.add_objects(bundle.serialize())
|
||||
return {"status": "published", "collection": collection_url,
|
||||
"object_count": len(bundle.objects)}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
|
||||
def validate_bundle(bundle_json):
|
||||
"""Validate a STIX 2.1 bundle."""
|
||||
if not HAS_STIX2:
|
||||
return {"error": "stix2 not installed"}
|
||||
try:
|
||||
parsed = stix2.parse(bundle_json, allow_custom=True)
|
||||
return {"valid": True, "type": parsed.type,
|
||||
"object_count": len(parsed.objects) if hasattr(parsed, "objects") else 1}
|
||||
except Exception as e:
|
||||
return {"valid": False, "error": str(e)}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="STIX 2.1 threat intelligence sharing agent")
|
||||
parser.add_argument("--create-indicator", help="Create indicator from value (e.g. 198.51.100.42)")
|
||||
parser.add_argument("--type", default="ipv4-addr", help="Indicator type (default: ipv4-addr)")
|
||||
parser.add_argument("--malware", help="Create malware object with this name")
|
||||
parser.add_argument("--campaign", help="Create campaign object with this name")
|
||||
parser.add_argument("--validate", help="Validate a STIX JSON file")
|
||||
parser.add_argument("--output", "-o", help="Output STIX bundle JSON path")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("[*] STIX 2.1 Threat Intelligence Sharing Agent")
|
||||
print(f" stix2 available: {HAS_STIX2}")
|
||||
print(f" taxii2-client available: {HAS_TAXII}")
|
||||
|
||||
if args.validate:
|
||||
with open(args.validate) as f:
|
||||
result = validate_bundle(f.read())
|
||||
print(json.dumps(result, indent=2))
|
||||
sys.exit(0)
|
||||
|
||||
if not HAS_STIX2:
|
||||
print("[!] Install stix2: pip install stix2")
|
||||
sys.exit(1)
|
||||
|
||||
indicators = []
|
||||
if args.create_indicator:
|
||||
ind = create_indicator(args.create_indicator, args.type)
|
||||
indicators.append(ind)
|
||||
print(f"[+] Created indicator: {ind.name}")
|
||||
else:
|
||||
demo_iocs = [("198.51.100.42", "ipv4-addr"), ("evil.example.com", "domain-name"),
|
||||
("a" * 64, "file-sha256")]
|
||||
for val, itype in demo_iocs:
|
||||
indicators.append(create_indicator(val, itype))
|
||||
print(f"[DEMO] Created {len(indicators)} sample indicators")
|
||||
|
||||
malware_obj = create_malware(args.malware) if args.malware else create_malware("DemoRAT", ["trojan"])
|
||||
campaign_obj = create_campaign(args.campaign) if args.campaign else None
|
||||
bundle = build_threat_report(indicators, malware_obj, campaign_obj)
|
||||
|
||||
print(f"\n[*] Bundle: {bundle.id}")
|
||||
print(f" Objects: {len(bundle.objects)}")
|
||||
for obj in bundle.objects:
|
||||
print(f" - {obj.type}: {getattr(obj, 'name', getattr(obj, 'id', ''))}")
|
||||
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
f.write(bundle.serialize(pretty=True))
|
||||
print(f"[*] Bundle saved to {args.output}")
|
||||
|
||||
print(json.dumps({"objects": len(bundle.objects), "stix_version": "2.1"}, indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -0,0 +1,21 @@
|
||||
---
|
||||
name: performing-binary-exploitation-analysis
|
||||
description: >
|
||||
Analyze binary exploitation techniques including buffer overflows and
|
||||
ROP chains using pwntools Python library. Covers checksec analysis,
|
||||
gadget discovery with ROPgadget, and exploit development for CTF and
|
||||
authorized security assessments.
|
||||
domain: cybersecurity
|
||||
subdomain: offensive-security
|
||||
tags: [binary-exploitation, pwntools, rop-chains, buffer-overflow]
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
# Performing Binary Exploitation Analysis
|
||||
|
||||
# For authorized security testing and CTF challenges only
|
||||
|
||||
Analyze ELF binaries for exploitation vectors using checksec, ROPgadget,
|
||||
and pwntools for buffer overflow and ROP chain development.
|
||||
@@ -0,0 +1,74 @@
|
||||
# API Reference: Binary Exploitation Analysis
|
||||
|
||||
## pwntools (Python)
|
||||
```bash
|
||||
pip install pwntools
|
||||
```
|
||||
|
||||
### ELF Analysis
|
||||
```python
|
||||
from pwn import ELF, ROP, context
|
||||
|
||||
elf = ELF('./vulnerable_binary')
|
||||
print(elf.checksec()) # Security mitigations
|
||||
print(hex(elf.sym['main'])) # Symbol address
|
||||
print(hex(elf.plt['system'])) # PLT entry
|
||||
print(hex(elf.got['puts'])) # GOT entry
|
||||
|
||||
# ROP gadget discovery
|
||||
rop = ROP(elf)
|
||||
pop_rdi = rop.find_gadget(['pop rdi', 'ret'])[0]
|
||||
ret = rop.find_gadget(['ret'])[0]
|
||||
```
|
||||
|
||||
### Exploit Template
|
||||
```python
|
||||
from pwn import *
|
||||
|
||||
context.binary = elf = ELF('./vuln')
|
||||
p = process('./vuln') # or remote('host', port)
|
||||
payload = flat(b'A' * offset, pop_rdi, next(elf.search(b'/bin/sh')), elf.plt['system'])
|
||||
p.sendline(payload)
|
||||
p.interactive()
|
||||
```
|
||||
|
||||
## checksec CLI
|
||||
```bash
|
||||
checksec --file ./binary
|
||||
checksec --file ./binary --output json
|
||||
```
|
||||
|
||||
### Output Fields
|
||||
| Field | Values | Impact |
|
||||
|-------|--------|--------|
|
||||
| NX | Enabled/Disabled | No shellcode on stack |
|
||||
| PIE | Enabled/Disabled | Randomized addresses |
|
||||
| Canary | Found/Not found | Stack smash detection |
|
||||
| RELRO | Full/Partial/None | GOT write protection |
|
||||
|
||||
## ROPgadget CLI
|
||||
```bash
|
||||
# Find all gadgets
|
||||
ROPgadget --binary ./vuln
|
||||
|
||||
# Search specific gadget
|
||||
ROPgadget --binary ./vuln --only "pop|ret"
|
||||
|
||||
# Generate ROP chain
|
||||
ROPgadget --binary ./vuln --ropchain
|
||||
```
|
||||
|
||||
## Dangerous Functions
|
||||
| Function | Risk |
|
||||
|----------|------|
|
||||
| gets() | Unbounded stdin read |
|
||||
| strcpy() | No length check |
|
||||
| sprintf() | No length check |
|
||||
| scanf() | Possible overflow |
|
||||
|
||||
## MITRE ATT&CK
|
||||
| Technique | Description |
|
||||
|-----------|------------|
|
||||
| T1203 | Exploitation for Client Execution |
|
||||
| T1068 | Exploitation for Privilege Escalation |
|
||||
| T1211 | Exploitation for Defense Evasion |
|
||||
@@ -0,0 +1,202 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Binary exploitation analysis agent.
|
||||
|
||||
# For authorized security testing and CTF challenges only
|
||||
|
||||
Analyzes ELF binaries for security mitigations, discovers ROP gadgets,
|
||||
and assists exploit development using pwntools and checksec.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import struct
|
||||
import subprocess
|
||||
import sys
|
||||
import datetime
|
||||
|
||||
try:
|
||||
from pwn import ELF, ROP, context
|
||||
HAS_PWNTOOLS = True
|
||||
except ImportError:
|
||||
HAS_PWNTOOLS = False
|
||||
|
||||
|
||||
def run_checksec(binary_path):
|
||||
"""Analyze binary security mitigations using checksec."""
|
||||
if HAS_PWNTOOLS:
|
||||
try:
|
||||
elf = ELF(binary_path, checksec=False)
|
||||
return {
|
||||
"arch": elf.arch,
|
||||
"bits": elf.bits,
|
||||
"endian": elf.endian,
|
||||
"nx": elf.nx,
|
||||
"pie": elf.pie,
|
||||
"canary": elf.canary,
|
||||
"relro": "Full" if elf.relro == "Full" else ("Partial" if elf.relro else "None"),
|
||||
"stripped": not elf.sym,
|
||||
"static": elf.statically_linked,
|
||||
}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
try:
|
||||
result = subprocess.run(["checksec", "--file", binary_path, "--output", "json"],
|
||||
capture_output=True, text=True, timeout=10)
|
||||
if result.stdout:
|
||||
return json.loads(result.stdout)
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError):
|
||||
pass
|
||||
return {"error": "Neither pwntools nor checksec available"}
|
||||
|
||||
|
||||
def find_rop_gadgets(binary_path, max_gadgets=20):
|
||||
"""Find ROP gadgets using pwntools or ROPgadget."""
|
||||
if HAS_PWNTOOLS:
|
||||
try:
|
||||
elf = ELF(binary_path, checksec=False)
|
||||
rop = ROP(elf)
|
||||
gadgets = []
|
||||
for gadget in rop.gadgets.values():
|
||||
if len(gadgets) >= max_gadgets:
|
||||
break
|
||||
gadgets.append({
|
||||
"address": hex(gadget.address),
|
||||
"insns": "; ".join(gadget.insns),
|
||||
})
|
||||
return gadgets
|
||||
except Exception as e:
|
||||
return [{"error": str(e)}]
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["ROPgadget", "--binary", binary_path, "--count", str(max_gadgets)],
|
||||
capture_output=True, text=True, timeout=30
|
||||
)
|
||||
gadgets = []
|
||||
for line in result.stdout.splitlines():
|
||||
if " : " in line:
|
||||
parts = line.split(" : ", 1)
|
||||
gadgets.append({"address": parts[0].strip(), "insns": parts[1].strip()})
|
||||
return gadgets[:max_gadgets]
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
return [{"error": "Neither pwntools ROP nor ROPgadget available"}]
|
||||
|
||||
|
||||
def find_useful_functions(binary_path):
|
||||
"""Find useful functions for exploitation (system, exec, write, etc.)."""
|
||||
if not HAS_PWNTOOLS:
|
||||
return {"error": "pwntools not available"}
|
||||
try:
|
||||
elf = ELF(binary_path, checksec=False)
|
||||
interesting = ["system", "execve", "exec", "popen", "gets", "strcpy",
|
||||
"sprintf", "read", "write", "puts", "printf", "mprotect"]
|
||||
found = {}
|
||||
for func in interesting:
|
||||
addr = elf.sym.get(func) or elf.plt.get(func)
|
||||
if addr:
|
||||
found[func] = hex(addr)
|
||||
got_entries = {}
|
||||
for name in ["system", "printf", "puts", "__libc_start_main"]:
|
||||
if name in elf.got:
|
||||
got_entries[name] = hex(elf.got[name])
|
||||
return {"functions": found, "got_entries": got_entries}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
|
||||
def find_vulnerable_functions(binary_path):
|
||||
"""Identify potentially vulnerable functions in the binary."""
|
||||
dangerous = {"gets": "Unbounded read - guaranteed buffer overflow",
|
||||
"strcpy": "No length check - possible overflow",
|
||||
"strcat": "No length check - possible overflow",
|
||||
"sprintf": "No length check - possible overflow",
|
||||
"scanf": "Possible format string / overflow",
|
||||
"vsprintf": "No length check - possible overflow"}
|
||||
if not HAS_PWNTOOLS:
|
||||
return {"error": "pwntools not available"}
|
||||
try:
|
||||
elf = ELF(binary_path, checksec=False)
|
||||
found = []
|
||||
for func, reason in dangerous.items():
|
||||
if func in elf.plt or func in elf.sym:
|
||||
found.append({"function": func, "reason": reason,
|
||||
"address": hex(elf.plt.get(func, elf.sym.get(func, 0)))})
|
||||
return found
|
||||
except Exception as e:
|
||||
return [{"error": str(e)}]
|
||||
|
||||
|
||||
def analyze_binary(binary_path):
|
||||
"""Full binary exploitation analysis."""
|
||||
report = {
|
||||
"binary": binary_path,
|
||||
"timestamp": datetime.datetime.utcnow().isoformat() + "Z",
|
||||
"checksec": run_checksec(binary_path),
|
||||
"dangerous_functions": find_vulnerable_functions(binary_path),
|
||||
"useful_functions": find_useful_functions(binary_path),
|
||||
"rop_gadgets": find_rop_gadgets(binary_path, max_gadgets=15),
|
||||
}
|
||||
mitigations = report["checksec"]
|
||||
if isinstance(mitigations, dict) and "error" not in mitigations:
|
||||
report["exploit_difficulty"] = "HARD" if all([
|
||||
mitigations.get("nx"), mitigations.get("pie"),
|
||||
mitigations.get("canary"), mitigations.get("relro") == "Full"
|
||||
]) else "MEDIUM" if mitigations.get("nx") else "EASY"
|
||||
return report
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Binary exploitation analysis agent (authorized testing only)"
|
||||
)
|
||||
parser.add_argument("binary", nargs="?", help="Path to ELF binary")
|
||||
parser.add_argument("--checksec-only", action="store_true", help="Only run checksec")
|
||||
parser.add_argument("--gadgets", type=int, default=15, help="Max ROP gadgets to find")
|
||||
parser.add_argument("--output", "-o", help="Output JSON report path")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("[*] Binary Exploitation Analysis Agent")
|
||||
print("[*] For authorized security testing and CTF challenges only")
|
||||
print(f" pwntools available: {HAS_PWNTOOLS}")
|
||||
|
||||
if not args.binary:
|
||||
print("\nUsage: python agent.py /path/to/binary [--checksec-only] [--gadgets 20]")
|
||||
print(" Analyzes: mitigations, dangerous functions, ROP gadgets, GOT entries")
|
||||
print(json.dumps({"demo": True, "pwntools": HAS_PWNTOOLS}, indent=2))
|
||||
sys.exit(0)
|
||||
|
||||
if args.checksec_only:
|
||||
result = run_checksec(args.binary)
|
||||
print(json.dumps(result, indent=2))
|
||||
sys.exit(0)
|
||||
|
||||
report = analyze_binary(args.binary)
|
||||
checksec = report.get("checksec", {})
|
||||
if isinstance(checksec, dict) and "error" not in checksec:
|
||||
print(f"\n[*] Architecture: {checksec.get('arch')} ({checksec.get('bits')}-bit)")
|
||||
print(f" NX: {checksec.get('nx')} | PIE: {checksec.get('pie')} | "
|
||||
f"Canary: {checksec.get('canary')} | RELRO: {checksec.get('relro')}")
|
||||
print(f" Exploit difficulty: {report.get('exploit_difficulty', '?')}")
|
||||
|
||||
dangerous = report.get("dangerous_functions", [])
|
||||
if isinstance(dangerous, list) and dangerous:
|
||||
print(f"\n[!] Dangerous functions found: {len(dangerous)}")
|
||||
for d in dangerous:
|
||||
if "error" not in d:
|
||||
print(f" {d['function']} @ {d['address']}: {d['reason']}")
|
||||
|
||||
gadgets = report.get("rop_gadgets", [])
|
||||
if gadgets and "error" not in gadgets[0]:
|
||||
print(f"\n[*] ROP gadgets found: {len(gadgets)}")
|
||||
for g in gadgets[:5]:
|
||||
print(f" {g['address']}: {g['insns']}")
|
||||
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
json.dump(report, f, indent=2)
|
||||
print(json.dumps({"difficulty": report.get("exploit_difficulty", "unknown"),
|
||||
"gadgets": len(gadgets)}, indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user