diff --git a/skills/analyzing-powershell-empire-artifacts/LICENSE b/skills/analyzing-powershell-empire-artifacts/LICENSE new file mode 100644 index 00000000..e0c9ede6 --- /dev/null +++ b/skills/analyzing-powershell-empire-artifacts/LICENSE @@ -0,0 +1,17 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +Copyright 2025 Mahipal + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/skills/analyzing-powershell-empire-artifacts/SKILL.md b/skills/analyzing-powershell-empire-artifacts/SKILL.md new file mode 100644 index 00000000..fd36c21f --- /dev/null +++ b/skills/analyzing-powershell-empire-artifacts/SKILL.md @@ -0,0 +1,34 @@ +--- +name: analyzing-powershell-empire-artifacts +description: Detect PowerShell Empire framework artifacts in Windows event logs by identifying Base64 encoded launcher patterns, default user agents, staging URL structures, stager IOCs, and known Empire module signatures in Script Block Logging events. +domain: cybersecurity +subdomain: threat-hunting +tags: [PowerShell-Empire, threat-hunting, Script-Block-Logging, base64, stager, C2, MITRE-ATT&CK, T1059.001, forensics] +version: "1.0" +author: mahipal +license: Apache-2.0 +--- + +# Analyzing PowerShell Empire Artifacts + +## Overview + +PowerShell Empire is a post-exploitation framework consisting of listeners, stagers, and agents. Its artifacts leave detectable traces in Windows event logs, particularly PowerShell Script Block Logging (Event ID 4104) and Module Logging (Event ID 4103). This skill analyzes event logs for Empire's default launcher string (`powershell -noP -sta -w 1 -enc`), Base64 encoded payloads containing `System.Net.WebClient` and `FromBase64String`, known module invocations (Invoke-Mimikatz, Invoke-Kerberoast, Invoke-TokenManipulation), and staging URL patterns. + +## Prerequisites + +- Python 3.9+ with access to Windows Event Log or exported EVTX files +- PowerShell Script Block Logging (Event ID 4104) enabled via Group Policy +- Module Logging (Event ID 4103) enabled for comprehensive coverage + +## Key Detection Patterns + +1. **Default launcher** — `powershell -noP -sta -w 1 -enc` followed by Base64 blob +2. **Stager indicators** — `System.Net.WebClient`, `DownloadData`, `DownloadString`, `FromBase64String` +3. **Module signatures** — Invoke-Mimikatz, Invoke-Kerberoast, Invoke-TokenManipulation, Invoke-PSInject, Invoke-DCOM +4. **User agent strings** — default Empire user agents in HTTP listener configuration +5. **Staging URLs** — `/login/process.php`, `/admin/get.php` and similar default URI patterns + +## Output + +JSON report with matched IOCs, decoded Base64 payloads, timeline of suspicious events, MITRE ATT&CK technique mappings, and severity scores. diff --git a/skills/analyzing-powershell-empire-artifacts/references/api-reference.md b/skills/analyzing-powershell-empire-artifacts/references/api-reference.md new file mode 100644 index 00000000..29aa10c2 --- /dev/null +++ b/skills/analyzing-powershell-empire-artifacts/references/api-reference.md @@ -0,0 +1,101 @@ +# PowerShell Empire Artifact Detection Reference + +## Enable Script Block Logging (GPO) + +``` +Computer Configuration > Administrative Templates > Windows Components > +Windows PowerShell > Turn on PowerShell Script Block Logging: Enabled +``` + +Registry: `HKLM\SOFTWARE\Policies\Microsoft\Windows\PowerShell\ScriptBlockLogging` +- `EnableScriptBlockLogging` = 1 + +## Enable Module Logging (GPO) + +``` +Computer Configuration > Administrative Templates > Windows Components > +Windows PowerShell > Turn on Module Logging: Enabled +Module Names: * +``` + +## Key Event IDs + +| Event ID | Log | Description | +|----------|-----|-------------| +| 4104 | Microsoft-Windows-PowerShell/Operational | Script Block Logging — captures executed script text | +| 4103 | Microsoft-Windows-PowerShell/Operational | Module Logging — captures pipeline execution details | +| 4688 | Security | Process Creation — captures command line arguments | +| 800 | Windows PowerShell | Pipeline execution (legacy) | + +## Default Empire Launcher Pattern + +``` +powershell -noP -sta -w 1 -enc +``` + +### Launcher Flags + +| Flag | Meaning | +|------|---------| +| `-noP` | No profile — skips PowerShell profile scripts | +| `-sta` | Single-threaded apartment | +| `-w 1` | Window style hidden | +| `-enc` | Encoded command (Base64 UTF-16LE) | + +## Empire Stager IOC Patterns + +| Pattern | Context | +|---------|---------| +| `System.Net.WebClient` | Downloads stager payload from listener | +| `.DownloadString()` | Fetches PowerShell script from C2 | +| `.DownloadData()` | Fetches binary data from C2 | +| `[System.Convert]::FromBase64String` | Decodes embedded payload | +| `IEX()` / `Invoke-Expression` | Executes downloaded script | +| `New-Object System.Net.WebClient` | Creates web client for download | + +## Empire Module Signatures + +| Module | MITRE | Description | +|--------|-------|-------------| +| `Invoke-Mimikatz` | T1003.001 | Credential dumping via Mimikatz | +| `Invoke-Kerberoast` | T1558.003 | Service ticket requests for offline cracking | +| `Invoke-TokenManipulation` | T1134 | Access token manipulation | +| `Invoke-PSInject` | T1055.012 | Process hollowing injection | +| `Invoke-DCOM` | T1021.003 | Lateral movement via DCOM | +| `Invoke-SMBExec` | T1021.002 | SMB-based lateral movement | +| `Invoke-WMIExec` | T1047 | WMI-based execution | +| `Invoke-RunAs` | T1134.002 | Create process with alternate token | +| `Invoke-SessionGopher` | T1552.001 | Extract saved session credentials | +| `Install-SSP` | T1547.005 | Security Support Provider persistence | +| `New-GPOImmediateTask` | T1484.001 | GPO abuse for execution | + +## Default Empire Staging URIs + +``` +/login/process.php +/admin/get.php +/admin/news.php +/news.php +/login/process.jsp +``` + +## Splunk Detection Query + +```spl +index=wineventlog source="WinEventLog:Microsoft-Windows-PowerShell/Operational" EventCode=4104 +| where match(ScriptBlockText, "(?i)system\.net\.webclient") AND match(ScriptBlockText, "(?i)frombase64string") +| stats count by Computer, UserID, ScriptBlockText +``` + +## Elastic KQL Detection + +``` +event.code: "4104" AND powershell.file.script_block_text: (*System.Net.WebClient* AND *FromBase64String*) +``` + +## MITRE ATT&CK Mapping + +- **T1059.001** — Command and Scripting Interpreter: PowerShell +- **T1071.001** — Application Layer Protocol: Web Protocols +- **T1027** — Obfuscated Files or Information +- **T1105** — Ingress Tool Transfer diff --git a/skills/analyzing-powershell-empire-artifacts/scripts/agent.py b/skills/analyzing-powershell-empire-artifacts/scripts/agent.py new file mode 100644 index 00000000..b540e84e --- /dev/null +++ b/skills/analyzing-powershell-empire-artifacts/scripts/agent.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python3 +"""Detect PowerShell Empire framework artifacts in Windows event logs.""" + +import argparse +import base64 +import json +import re +import subprocess +import sys +from datetime import datetime, timezone + + +EMPIRE_LAUNCHER_PATTERN = re.compile( + r"powershell\s+-noP\s+-sta\s+-w\s+1\s+-enc\s+", re.IGNORECASE +) + +EMPIRE_STAGER_PATTERNS = [ + re.compile(r"System\.Net\.WebClient", re.IGNORECASE), + re.compile(r"\.DownloadString\(", re.IGNORECASE), + re.compile(r"\.DownloadData\(", re.IGNORECASE), + re.compile(r"FromBase64String", re.IGNORECASE), + re.compile(r"IEX\s*\(", re.IGNORECASE), + re.compile(r"Invoke-Expression", re.IGNORECASE), + re.compile(r"New-Object\s+System\.Net\.WebClient", re.IGNORECASE), + re.compile(r"\[System\.Convert\]::FromBase64String", re.IGNORECASE), +] + +EMPIRE_MODULE_SIGNATURES = [ + "Invoke-Mimikatz", + "Invoke-Kerberoast", + "Invoke-TokenManipulation", + "Invoke-PSInject", + "Invoke-DCOM", + "Invoke-RunAs", + "Invoke-PSRemoting", + "Invoke-SessionGopher", + "Invoke-ReflectivePEInjection", + "Install-SSP", + "New-GPOImmediateTask", + "Get-Keystrokes", + "Get-Screenshot", + "Get-ClipboardContents", + "Invoke-Portscan", + "Invoke-SMBExec", + "Invoke-WMIExec", +] + +EMPIRE_DEFAULT_URIS = [ + "/login/process.php", + "/admin/get.php", + "/admin/news.php", + "/news.php", + "/login/process.jsp", +] + +EMPIRE_DEFAULT_USER_AGENTS = [ + "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko", + "Mozilla/5.0 (compatible, MSIE 11, Windows NT 6.3; Trident/7.0; rv:11.0) like Gecko", +] + + +def query_event_log(event_id, log_name="Microsoft-Windows-PowerShell/Operational", max_events=1000): + """Query Windows Event Log for specific event ID using wevtutil.""" + cmd = [ + "wevtutil", "qe", log_name, + "/q:*[System[(EventID={})]]".format(event_id), + "/c:{}".format(max_events), + "/f:xml", "/rd:true" + ] + try: + proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60) + return proc.stdout + except FileNotFoundError: + return "" + except subprocess.TimeoutExpired: + return "" + + +def parse_script_block_events(xml_data): + """Parse Event ID 4104 Script Block Logging events from XML.""" + events = [] + if not xml_data: + return events + + blocks = re.split(r"([^<]+)", block) + script_match = re.search(r"([^<]+)", block) + sid_match = re.search(r"= 2: + findings.append({ + "indicator": "empire_stager_patterns", + "severity": "High", + "description": f"Multiple Empire stager patterns detected: {', '.join(matched_stagers[:5])}", + "matched_count": len(matched_stagers), + "mitre": "T1059.001" + }) + + # Check for known Empire module signatures + for module in EMPIRE_MODULE_SIGNATURES: + if module.lower() in script_text.lower(): + findings.append({ + "indicator": "empire_module", + "severity": "Critical", + "module_name": module, + "description": f"Empire post-exploitation module detected: {module}", + "mitre": "T1059.001" + }) + + # Check for Empire default URIs + for uri in EMPIRE_DEFAULT_URIS: + if uri in script_text: + findings.append({ + "indicator": "empire_staging_uri", + "severity": "High", + "uri": uri, + "description": f"Default Empire staging URI detected: {uri}", + "mitre": "T1071.001" + }) + + # Check for Empire user agents + for ua in EMPIRE_DEFAULT_USER_AGENTS: + if ua in script_text: + findings.append({ + "indicator": "empire_default_useragent", + "severity": "Medium", + "user_agent": ua, + "description": "Default Empire HTTP listener user agent detected", + "mitre": "T1071.001" + }) + + # Check for encoded command patterns + b64_blocks = re.findall(r"[A-Za-z0-9+/]{100,}={0,2}", script_text) + for b64 in b64_blocks[:5]: + decoded = decode_base64_payload(b64) + if decoded and any(p.search(decoded) for p in EMPIRE_STAGER_PATTERNS): + findings.append({ + "indicator": "encoded_empire_payload", + "severity": "Critical", + "description": "Base64 encoded payload contains Empire stager patterns", + "decoded_preview": decoded[:300], + "mitre": "T1027" + }) + + return findings + + +def scan_event_logs(max_events=1000): + """Scan PowerShell event logs for Empire artifacts.""" + results = { + "scan_time": datetime.now(timezone.utc).isoformat(), + "events_analyzed": 0, + "suspicious_events": [], + "summary": { + "total_findings": 0, + "critical": 0, + "high": 0, + "medium": 0 + } + } + + # Query Event ID 4104 (Script Block Logging) + xml_data = query_event_log(4104, "Microsoft-Windows-PowerShell/Operational", max_events) + events = parse_script_block_events(xml_data) + results["events_analyzed"] = len(events) + + for event in events: + findings = analyze_script_block(event["script_block"]) + if findings: + results["suspicious_events"].append({ + "timestamp": event["timestamp"], + "computer": event["computer"], + "user_sid": event["user_sid"], + "findings": findings, + "script_preview": event["script_block"][:200] + }) + for f in findings: + results["summary"]["total_findings"] += 1 + sev = f.get("severity", "").lower() + if sev in results["summary"]: + results["summary"][sev] += 1 + + # Also check Event ID 4103 (Module Logging) + xml_4103 = query_event_log(4103, "Microsoft-Windows-PowerShell/Operational", max_events) + events_4103 = parse_script_block_events(xml_4103) + for event in events_4103: + for module in EMPIRE_MODULE_SIGNATURES: + if module.lower() in event.get("script_block", "").lower(): + results["suspicious_events"].append({ + "timestamp": event["timestamp"], + "computer": event["computer"], + "event_id": 4103, + "findings": [{ + "indicator": "empire_module_in_module_log", + "severity": "Critical", + "module_name": module, + "mitre": "T1059.001" + }] + }) + results["summary"]["total_findings"] += 1 + results["summary"]["critical"] += 1 + + return results + + +def analyze_script_file(filepath): + """Analyze a PowerShell script file or exported log for Empire artifacts.""" + with open(filepath, "r", encoding="utf-8", errors="replace") as f: + content = f.read() + + findings = analyze_script_block(content) + return { + "file": filepath, + "scan_time": datetime.now(timezone.utc).isoformat(), + "findings": findings, + "finding_count": len(findings) + } + + +def main(): + parser = argparse.ArgumentParser( + description="Detect PowerShell Empire artifacts in event logs and scripts" + ) + subparsers = parser.add_subparsers(dest="command", help="Analysis mode") + + scan_parser = subparsers.add_parser("scan-logs", help="Scan Windows event logs for Empire IOCs") + scan_parser.add_argument("--max-events", type=int, default=1000, help="Max events to query (default: 1000)") + + file_parser = subparsers.add_parser("analyze-file", help="Analyze a PowerShell script or log file") + file_parser.add_argument("file", help="Path to script or log file") + + decode_parser = subparsers.add_parser("decode", help="Decode Base64 encoded PowerShell payload") + decode_parser.add_argument("payload", help="Base64 encoded string") + + args = parser.parse_args() + + if args.command == "scan-logs": + result = scan_event_logs(args.max_events) + elif args.command == "analyze-file": + result = analyze_script_file(args.file) + elif args.command == "decode": + decoded = decode_base64_payload(args.payload) + result = { + "encoded": args.payload[:100] + "..." if len(args.payload) > 100 else args.payload, + "decoded": decoded, + "empire_indicators": analyze_script_block(decoded) if decoded else [] + } + else: + parser.print_help() + sys.exit(0) + + print(json.dumps(result, indent=2, default=str)) + + +if __name__ == "__main__": + main() diff --git a/skills/detecting-azure-storage-account-misconfigurations/LICENSE b/skills/detecting-azure-storage-account-misconfigurations/LICENSE new file mode 100644 index 00000000..e0c9ede6 --- /dev/null +++ b/skills/detecting-azure-storage-account-misconfigurations/LICENSE @@ -0,0 +1,17 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +Copyright 2025 Mahipal + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/skills/detecting-azure-storage-account-misconfigurations/SKILL.md b/skills/detecting-azure-storage-account-misconfigurations/SKILL.md new file mode 100644 index 00000000..bdbf9e35 --- /dev/null +++ b/skills/detecting-azure-storage-account-misconfigurations/SKILL.md @@ -0,0 +1,35 @@ +--- +name: detecting-azure-storage-account-misconfigurations +description: Audit Azure Blob and ADLS storage accounts for public access exposure, weak or long-lived SAS tokens, missing encryption at rest, disabled HTTPS-only traffic, and outdated TLS versions using the azure-mgmt-storage Python SDK. +domain: cybersecurity +subdomain: cloud-security +tags: [Azure, storage-accounts, blob-storage, ADLS, SAS-tokens, encryption, public-access, cloud-misconfiguration, azure-mgmt-storage] +version: "1.0" +author: mahipal +license: Apache-2.0 +--- + +# Detecting Azure Storage Account Misconfigurations + +## Overview + +Azure Storage accounts are a frequent target for attackers due to misconfigured public access, long-lived SAS tokens, missing encryption, and outdated TLS versions. This skill uses the azure-mgmt-storage Python SDK with StorageManagementClient to enumerate all storage accounts in a subscription, inspect their security properties, list blob containers for public access settings, and generate a risk-scored audit report identifying critical misconfigurations. + +## Prerequisites + +- Python 3.9+ with `azure-mgmt-storage`, `azure-identity` +- Azure service principal with Reader role on target subscription +- Environment variables: AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET, AZURE_SUBSCRIPTION_ID + +## Key Detection Areas + +1. **Public blob access** — `allow_blob_public_access` enabled on storage account or individual containers set to Blob/Container access level +2. **HTTPS enforcement** — `enable_https_traffic_only` disabled, allowing unencrypted HTTP traffic +3. **Minimum TLS version** — accounts accepting TLS 1.0 or TLS 1.1 instead of minimum TLS 1.2 +4. **Encryption at rest** — storage service encryption not enabled or missing customer-managed keys +5. **Network rules** — default action set to Allow instead of Deny, exposing storage to all networks +6. **SAS token risks** — account-level SAS with overly broad permissions or excessive lifetime + +## Output + +JSON report with per-account findings, severity ratings (Critical/High/Medium/Low), and remediation recommendations aligned with CIS Azure Benchmark controls. diff --git a/skills/detecting-azure-storage-account-misconfigurations/references/api-reference.md b/skills/detecting-azure-storage-account-misconfigurations/references/api-reference.md new file mode 100644 index 00000000..d6d8d566 --- /dev/null +++ b/skills/detecting-azure-storage-account-misconfigurations/references/api-reference.md @@ -0,0 +1,106 @@ +# Azure Storage Account Misconfiguration Detection Reference + +## SDK Installation + +```bash +pip install azure-mgmt-storage azure-identity +``` + +## StorageManagementClient Initialization + +```python +from azure.identity import DefaultAzureCredential +from azure.mgmt.storage import StorageManagementClient + +client = StorageManagementClient( + credential=DefaultAzureCredential(), + subscription_id="" +) +``` + +## Key Operations + +### List All Storage Accounts + +```python +for account in client.storage_accounts.list(): + print(account.name, account.location, account.kind) +``` + +### Get Storage Account Properties + +```python +account = client.storage_accounts.get_properties( + resource_group_name="myResourceGroup", + account_name="mystorageaccount" +) +``` + +### List Blob Containers + +```python +containers = client.blob_containers.list( + resource_group_name="myResourceGroup", + account_name="mystorageaccount" +) +for container in containers: + print(container.name, container.public_access) +``` + +## Security Properties to Audit + +| Property | Secure Value | Risk if Misconfigured | +|----------|-------------|----------------------| +| `allow_blob_public_access` | `False` | Critical — data exposed to internet | +| `enable_https_traffic_only` | `True` | High — credentials sent in cleartext | +| `minimum_tls_version` | `TLS1_2` | High — vulnerable to downgrade attacks | +| `encryption.services.blob.enabled` | `True` | High — data at rest unencrypted | +| `encryption.key_source` | `Microsoft.Keyvault` | Low — Microsoft-managed keys less controlled | +| `network_rule_set.default_action` | `Deny` | High — storage open to all networks | +| `encryption.require_infrastructure_encryption` | `True` | Low — no double encryption | + +## Container Public Access Levels + +| Level | Description | Risk | +|-------|-------------|------| +| `None` | Private, no public access | Safe | +| `Blob` | Anonymous read for blobs only | High | +| `Container` | Anonymous read for container and blobs | Critical | + +## Azure CLI Equivalents + +```bash +# List storage accounts +az storage account list --query "[].{name:name, publicAccess:allowBlobPublicAccess, httpsOnly:enableHttpsTrafficOnly, minTls:minimumTlsVersion}" -o table + +# Check specific account +az storage account show -n mystorageaccount -g myResourceGroup + +# List containers with access level +az storage container list --account-name mystorageaccount --query "[].{name:name, publicAccess:properties.publicAccess}" -o table + +# Disable public blob access +az storage account update -n mystorageaccount -g myResourceGroup --allow-blob-public-access false + +# Set minimum TLS +az storage account update -n mystorageaccount -g myResourceGroup --min-tls-version TLS1_2 +``` + +## CIS Azure Benchmark Controls + +| Control | Description | +|---------|-------------| +| 3.1 | Ensure 'Secure transfer required' is enabled | +| 3.7 | Ensure default network access rule is set to deny | +| 3.8 | Ensure 'Trusted Microsoft Services' is enabled | +| 3.10 | Ensure storage logging is enabled for Blob service | +| 3.12 | Ensure storage account access keys are periodically regenerated | + +## Environment Variables + +| Variable | Description | +|----------|-------------| +| `AZURE_SUBSCRIPTION_ID` | Target Azure subscription | +| `AZURE_CLIENT_ID` | Service principal application ID | +| `AZURE_TENANT_ID` | Azure AD tenant ID | +| `AZURE_CLIENT_SECRET` | Service principal secret | diff --git a/skills/detecting-azure-storage-account-misconfigurations/scripts/agent.py b/skills/detecting-azure-storage-account-misconfigurations/scripts/agent.py new file mode 100644 index 00000000..51439559 --- /dev/null +++ b/skills/detecting-azure-storage-account-misconfigurations/scripts/agent.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python3 +"""Audit Azure Storage accounts for security misconfigurations using azure-mgmt-storage SDK.""" + +import argparse +import json +import os +import sys +from datetime import datetime + + +def get_storage_client(): + """Initialize StorageManagementClient with DefaultAzureCredential.""" + try: + from azure.identity import DefaultAzureCredential + from azure.mgmt.storage import StorageManagementClient + except ImportError: + print("Install required packages: pip install azure-mgmt-storage azure-identity", file=sys.stderr) + sys.exit(1) + + subscription_id = os.environ.get("AZURE_SUBSCRIPTION_ID") + if not subscription_id: + print("Set AZURE_SUBSCRIPTION_ID environment variable", file=sys.stderr) + sys.exit(1) + + credential = DefaultAzureCredential() + return StorageManagementClient(credential=credential, subscription_id=subscription_id) + + +def audit_storage_account(client, account): + """Audit a single storage account for misconfigurations.""" + findings = [] + resource_group = account.id.split("/")[4] + account_name = account.name + + # Check public blob access + if account.allow_blob_public_access is True: + findings.append({ + "check": "public_blob_access", + "severity": "Critical", + "message": f"Storage account '{account_name}' allows public blob access", + "remediation": "Set allow_blob_public_access to false on the storage account" + }) + + # Check HTTPS-only enforcement + if account.enable_https_traffic_only is False: + findings.append({ + "check": "https_enforcement", + "severity": "High", + "message": f"Storage account '{account_name}' allows HTTP traffic", + "remediation": "Enable 'Secure transfer required' in storage account settings" + }) + + # Check minimum TLS version + min_tls = getattr(account, "minimum_tls_version", None) + if min_tls and min_tls in ("TLS1_0", "TLS1_1"): + findings.append({ + "check": "minimum_tls_version", + "severity": "High", + "message": f"Storage account '{account_name}' allows {min_tls} (should be TLS1_2)", + "remediation": "Set minimum TLS version to TLS1_2" + }) + + # Check encryption at rest + encryption = account.encryption + if encryption: + if not getattr(encryption.services, "blob", None) or not encryption.services.blob.enabled: + findings.append({ + "check": "blob_encryption", + "severity": "High", + "message": f"Storage account '{account_name}' does not have blob encryption enabled", + "remediation": "Enable Azure Storage Service Encryption for blobs" + }) + if not getattr(encryption.services, "file", None) or not encryption.services.file.enabled: + findings.append({ + "check": "file_encryption", + "severity": "Medium", + "message": f"Storage account '{account_name}' does not have file encryption enabled", + "remediation": "Enable Azure Storage Service Encryption for files" + }) + else: + findings.append({ + "check": "encryption_missing", + "severity": "Critical", + "message": f"Storage account '{account_name}' has no encryption configuration", + "remediation": "Enable Azure Storage Service Encryption" + }) + + # Check network rules - default action + network_rules = account.network_rule_set + if network_rules and network_rules.default_action == "Allow": + findings.append({ + "check": "network_default_allow", + "severity": "High", + "message": f"Storage account '{account_name}' allows access from all networks", + "remediation": "Set network default action to Deny and add specific virtual network/IP rules" + }) + + # Check infrastructure encryption (double encryption) + if encryption and not getattr(encryption, "require_infrastructure_encryption", False): + findings.append({ + "check": "infrastructure_encryption", + "severity": "Low", + "message": f"Storage account '{account_name}' does not use infrastructure encryption (double encryption)", + "remediation": "Enable infrastructure encryption for additional protection" + }) + + # Check key source - prefer customer-managed keys + if encryption and getattr(encryption, "key_source", None) == "Microsoft.Storage": + findings.append({ + "check": "customer_managed_keys", + "severity": "Low", + "message": f"Storage account '{account_name}' uses Microsoft-managed keys instead of customer-managed keys", + "remediation": "Configure customer-managed keys via Azure Key Vault for enhanced control" + }) + + return { + "account_name": account_name, + "resource_group": resource_group, + "location": account.location, + "sku": account.sku.name if account.sku else "unknown", + "kind": account.kind, + "findings": findings, + "finding_count": len(findings) + } + + +def audit_blob_containers(client, account): + """Check blob containers for individual public access settings.""" + resource_group = account.id.split("/")[4] + container_findings = [] + + try: + containers = client.blob_containers.list( + resource_group_name=resource_group, + account_name=account.name + ) + for container in containers: + public_access = getattr(container, "public_access", None) + if public_access and public_access != "None": + container_findings.append({ + "container_name": container.name, + "public_access_level": str(public_access), + "severity": "Critical", + "message": f"Container '{container.name}' has public access level: {public_access}", + "remediation": "Set container public access level to 'None' (private)" + }) + except Exception as e: + container_findings.append({ + "error": f"Could not list containers for {account.name}: {str(e)}" + }) + + return container_findings + + +def run_audit(args): + """Run the full storage account audit.""" + client = get_storage_client() + results = { + "scan_time": datetime.utcnow().isoformat() + "Z", + "subscription_id": os.environ.get("AZURE_SUBSCRIPTION_ID", ""), + "accounts": [], + "summary": { + "total_accounts": 0, + "accounts_with_findings": 0, + "critical": 0, + "high": 0, + "medium": 0, + "low": 0 + } + } + + accounts = list(client.storage_accounts.list()) + results["summary"]["total_accounts"] = len(accounts) + + for account in accounts: + account_result = audit_storage_account(client, account) + + if args.check_containers: + container_findings = audit_blob_containers(client, account) + account_result["container_findings"] = container_findings + + results["accounts"].append(account_result) + + if account_result["finding_count"] > 0: + results["summary"]["accounts_with_findings"] += 1 + + for finding in account_result["findings"]: + severity = finding.get("severity", "").lower() + if severity in results["summary"]: + results["summary"][severity] += 1 + + return results + + +def main(): + parser = argparse.ArgumentParser( + description="Audit Azure Storage accounts for security misconfigurations" + ) + parser.add_argument( + "--check-containers", action="store_true", + help="Also check individual blob container public access settings" + ) + parser.add_argument( + "--output", "-o", default="-", + help="Output file path (default: stdout)" + ) + parser.add_argument( + "--severity-filter", choices=["critical", "high", "medium", "low"], + help="Only show findings at or above this severity level" + ) + args = parser.parse_args() + + results = run_audit(args) + + if args.severity_filter: + severity_order = {"critical": 4, "high": 3, "medium": 2, "low": 1} + min_severity = severity_order[args.severity_filter] + for account in results["accounts"]: + account["findings"] = [ + f for f in account["findings"] + if severity_order.get(f.get("severity", "").lower(), 0) >= min_severity + ] + account["finding_count"] = len(account["findings"]) + + output_json = json.dumps(results, indent=2) + + if args.output == "-": + print(output_json) + else: + with open(args.output, "w") as f: + f.write(output_json) + print(f"Audit report written to {args.output}", file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/skills/hunting-for-ntlm-relay-attacks/LICENSE b/skills/hunting-for-ntlm-relay-attacks/LICENSE new file mode 100644 index 00000000..e0c9ede6 --- /dev/null +++ b/skills/hunting-for-ntlm-relay-attacks/LICENSE @@ -0,0 +1,17 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +Copyright 2025 Mahipal + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/skills/hunting-for-ntlm-relay-attacks/SKILL.md b/skills/hunting-for-ntlm-relay-attacks/SKILL.md new file mode 100644 index 00000000..cefc2b63 --- /dev/null +++ b/skills/hunting-for-ntlm-relay-attacks/SKILL.md @@ -0,0 +1,35 @@ +--- +name: hunting-for-ntlm-relay-attacks +description: Detect NTLM relay attacks by analyzing Windows Event 4624 logon type 3 with NTLMSSP authentication, identifying IP-to-hostname mismatches, Responder traffic signatures, SMB signing status, and suspicious authentication patterns across the domain. +domain: cybersecurity +subdomain: threat-hunting +tags: [NTLM-relay, Windows-events, Event-4624, NTLMSSP, Responder, SMB-signing, credential-access, T1557.001, Active-Directory] +version: "1.0" +author: mahipal +license: Apache-2.0 +--- + +# Hunting for NTLM Relay Attacks + +## Overview + +NTLM relay attacks intercept and forward NTLM authentication messages to gain unauthorized access to network resources. Attackers use tools like Responder for LLMNR/NBT-NS poisoning and ntlmrelayx for credential relay. This skill detects relay activity by querying Windows Security Event 4624 (successful logon) for type 3 network logons with NTLMSSP authentication, identifying mismatches between WorkstationName and source IpAddress, detecting rapid multi-host authentication from single accounts, and auditing SMB signing configuration across domain hosts. + +## Prerequisites + +- Python 3.9+ with Windows Event Log access or exported logs +- Windows Security audit logging enabled (Event ID 4624, 4625, 5145) +- Network access for SMB signing status checks + +## Key Detection Areas + +1. **IP-hostname mismatch** — WorkstationName in Event 4624 does not resolve to the source IpAddress +2. **NTLMSSP authentication** — logon events using NTLM instead of Kerberos from domain-joined hosts +3. **Machine account relay** — computer accounts (ending in $) authenticating from unexpected IPs +4. **Rapid authentication** — single account authenticating to multiple hosts within seconds +5. **Named pipe access** — Event 5145 showing access to Spoolss, lsarpc, netlogon, samr pipes +6. **SMB signing disabled** — hosts not enforcing SMB signing, enabling relay attacks + +## Output + +JSON report with suspected relay events, IP-hostname correlation anomalies, SMB signing audit results, and MITRE ATT&CK mapping to T1557.001. diff --git a/skills/hunting-for-ntlm-relay-attacks/references/api-reference.md b/skills/hunting-for-ntlm-relay-attacks/references/api-reference.md new file mode 100644 index 00000000..0d0780d1 --- /dev/null +++ b/skills/hunting-for-ntlm-relay-attacks/references/api-reference.md @@ -0,0 +1,113 @@ +# NTLM Relay Attack Detection Reference + +## Windows Event IDs + +| Event ID | Log | Description | +|----------|-----|-------------| +| 4624 | Security | Successful logon — primary relay detection event | +| 4625 | Security | Failed logon — may indicate relay attempts | +| 5145 | Security | Network share object access — named pipe monitoring | +| 4776 | Security | NTLM credential validation | + +## Event 4624 Fields for Relay Detection + +| Field | Suspicious Value | Significance | +|-------|-----------------|--------------| +| LogonType | 3 (Network) | Relay always produces network logon | +| AuthenticationPackageName | NTLMSSP | NTLM used instead of Kerberos | +| LmPackageName | NTLM V1 | Downgraded to NTLMv1 (very suspicious) | +| WorkstationName | Mismatch with IpAddress | Key relay indicator | +| TargetUserSid | S-1-0-0 (NULL SID) | Unauthenticated relay attempt | +| LogonGuid | {00000000-...} | Empty GUID indicates relay | +| ImpersonationLevel | Impersonation | Relay uses impersonation | + +## Suspicious Named Pipes (Event 5145) + +| Pipe Name | Service | Relay Target | +|-----------|---------|-------------| +| `spoolss` | Print Spooler | PrinterBug/SpoolSample | +| `lsarpc` | LSA | PetitPotam, DFSCoerce | +| `netlogon` | Netlogon | ZeroLogon relay | +| `samr` | SAM | User enumeration | +| `efsrpc` | EFS | PetitPotam | +| `netdfs` | DFS | DFSCoerce | +| `srvsvc` | Server Service | General relay | + +## Splunk Detection Query + +```spl +index=wineventlog EventCode=4624 Logon_Type=3 Authentication_Package=NTLM +| eval hostname_ip_match=if(Workstation_Name==src_ip OR isnull(Workstation_Name), "match", "mismatch") +| where hostname_ip_match="mismatch" +| stats count values(src_ip) as source_ips values(Workstation_Name) as workstations by Account_Name, Computer +| where count > 3 +``` + +## Elastic EQL Detection (NTLM Relay Against Computer Account) + +```eql +sequence by winlog.computer_name with maxspan=5s + [any where event.code == "5145" and + winlog.event_data.RelativeTargetName in ("spoolss","netdfs","lsarpc","samr","efsrpc","netlogon") and + winlog.event_data.SubjectUserName != winlog.computer_name] + [authentication where event.code in ("4624","4625") and + winlog.event_data.AuthenticationPackageName == "NTLM" and + winlog.event_data.LogonType == "3" and + winlog.event_data.TargetUserName : "*$"] +``` + +## PowerShell Detection + +```powershell +# Query NTLM type 3 logons +Get-WinEvent -FilterHashtable @{LogName='Security'; Id=4624} | + Where-Object { + $_.Properties[8].Value -eq 3 -and + $_.Properties[14].Value -match 'NTLM' + } | Select-Object TimeCreated, + @{N='User';E={$_.Properties[5].Value}}, + @{N='Workstation';E={$_.Properties[11].Value}}, + @{N='SourceIP';E={$_.Properties[18].Value}}, + @{N='AuthPkg';E={$_.Properties[14].Value}} + +# Check SMB signing +Get-SmbServerConfiguration | Select-Object RequireSecuritySignature, EnableSecuritySignature +``` + +## SMB Signing Enforcement + +```powershell +# Enable SMB signing (require on server) +Set-SmbServerConfiguration -RequireSecuritySignature $true -Force + +# Group Policy path +# Computer Configuration > Policies > Windows Settings > Security Settings > +# Local Policies > Security Options > +# Microsoft network server: Digitally sign communications (always): Enabled +``` + +## Common Relay Tools (Detection Signatures) + +| Tool | Network Signature | +|------|------------------| +| Responder | LLMNR/NBT-NS responses from non-authoritative source | +| ntlmrelayx | Rapid sequential NTLM auth from single source IP | +| PetitPotam | EFS RPC calls to \\attacker\share via lsarpc pipe | +| PrinterBug | RPC call to spoolss pipe targeting attacker listener | +| mitm6 | DHCPv6 responses with rogue DNS server | + +## MITRE ATT&CK Mapping + +- **T1557.001** — Adversary-in-the-Middle: LLMNR/NBT-NS Poisoning and SMB Relay +- **T1187** — Forced Authentication +- **T1003.001** — OS Credential Dumping: LSASS Memory +- **TA0006** — Credential Access (Tactic) + +## Response Checklist + +1. Enable SMB signing on all domain hosts via GPO +2. Disable LLMNR: `Set-DnsClientGlobalSetting -SuffixSearchList @("")` +3. Disable NBT-NS in network adapter advanced settings +4. Enable Extended Protection for Authentication (EPA) +5. Enforce NTLMv2 and deny NTLMv1: `LmCompatibilityLevel = 5` +6. Deploy SMB signing GPO: `RequireSecuritySignature = 1` diff --git a/skills/hunting-for-ntlm-relay-attacks/scripts/agent.py b/skills/hunting-for-ntlm-relay-attacks/scripts/agent.py new file mode 100644 index 00000000..1b3c1ba6 --- /dev/null +++ b/skills/hunting-for-ntlm-relay-attacks/scripts/agent.py @@ -0,0 +1,342 @@ +#!/usr/bin/env python3 +"""Detect NTLM relay attacks via Windows Event 4624 analysis, IP-hostname correlation, and SMB signing audit.""" + +import argparse +import json +import re +import subprocess +import sys +from collections import defaultdict +from datetime import datetime, timezone + + +def query_security_log(event_id, max_events=2000): + """Query Windows Security event log for specific event ID using wevtutil.""" + cmd = [ + "wevtutil", "qe", "Security", + "/q:*[System[(EventID={})]]".format(event_id), + "/c:{}".format(max_events), + "/f:xml", "/rd:true" + ] + try: + proc = subprocess.run(cmd, capture_output=True, text=True, timeout=120) + return proc.stdout + except (FileNotFoundError, subprocess.TimeoutExpired): + return "" + + +def parse_4624_events(xml_data): + """Parse Event 4624 logon events from XML output.""" + events = [] + if not xml_data: + return events + + blocks = re.split(r"([^<]+)", block) + + data_fields = {} + for m in re.finditer(r"([^<]*)", block): + data_fields[m.group(1)] = m.group(2) + + logon_type = data_fields.get("LogonType", "") + if logon_type != "3": + continue + + auth_package = data_fields.get("AuthenticationPackageName", "") + if "NTLM" not in auth_package.upper(): + continue + + events.append({ + "timestamp": time_match.group(1) if time_match else "", + "computer": computer_match.group(1) if computer_match else "", + "target_username": data_fields.get("TargetUserName", ""), + "target_domain": data_fields.get("TargetDomainName", ""), + "logon_type": logon_type, + "auth_package": auth_package, + "lm_package": data_fields.get("LmPackageName", ""), + "workstation_name": data_fields.get("WorkstationName", ""), + "source_ip": data_fields.get("IpAddress", ""), + "source_port": data_fields.get("IpPort", ""), + "logon_process": data_fields.get("LogonProcessName", ""), + "target_sid": data_fields.get("TargetUserSid", ""), + "logon_guid": data_fields.get("LogonGuid", ""), + "impersonation_level": data_fields.get("ImpersonationLevel", "") + }) + + return events + + +def parse_5145_events(xml_data): + """Parse Event 5145 network share access events for named pipe monitoring.""" + events = [] + if not xml_data: + return events + + suspicious_pipes = [ + "spoolss", "netdfs", "lsarpc", "lsass", "netlogon", "samr", + "efsrpc", "fssagentrpc", "eventlog", "winreg", "srvsvc", + "dnsserver", "dhcpserver", "winspipe" + ] + + blocks = re.split(r"([^<]*)", block): + data_fields[m.group(1)] = m.group(2) + + share_name = data_fields.get("ShareName", "").lower() + relative_target = data_fields.get("RelativeTargetName", "").lower() + + if any(pipe in relative_target for pipe in suspicious_pipes): + events.append({ + "timestamp": time_match.group(1) if time_match else "", + "subject_username": data_fields.get("SubjectUserName", ""), + "subject_domain": data_fields.get("SubjectDomainName", ""), + "source_ip": data_fields.get("IpAddress", ""), + "share_name": share_name, + "pipe_name": relative_target + }) + + return events + + +def detect_ip_hostname_mismatch(events): + """Detect when WorkstationName IP doesn't match source IpAddress.""" + findings = [] + hostname_ip_map = defaultdict(set) + + # Build hostname-to-IP baseline + for event in events: + hostname = event.get("workstation_name", "").upper() + ip = event.get("source_ip", "") + if hostname and ip and ip != "-" and ip != "::1" and ip != "127.0.0.1": + hostname_ip_map[hostname].add(ip) + + # Detect hostnames authenticating from multiple IPs + for hostname, ips in hostname_ip_map.items(): + if len(ips) > 2: + findings.append({ + "check": "multiple_source_ips", + "severity": "High", + "hostname": hostname, + "source_ips": list(ips), + "ip_count": len(ips), + "description": f"Workstation '{hostname}' authenticating via NTLM from {len(ips)} different IPs — possible relay", + "mitre": "T1557.001" + }) + + return findings + + +def detect_machine_account_relay(events): + """Detect machine accounts authenticating from unexpected IPs.""" + findings = [] + machine_events = [e for e in events if e["target_username"].endswith("$")] + machine_ip_map = defaultdict(set) + + for event in machine_events: + machine_ip_map[event["target_username"]].add(event["source_ip"]) + + for machine, ips in machine_ip_map.items(): + if len(ips) > 1: + findings.append({ + "check": "machine_account_multi_ip", + "severity": "Critical", + "machine_account": machine, + "source_ips": list(ips), + "description": f"Machine account '{machine}' authenticated from {len(ips)} different IPs", + "mitre": "T1557.001" + }) + + return findings + + +def detect_rapid_authentication(events, window_seconds=5, threshold=5): + """Detect rapid multi-host authentication from a single account.""" + findings = [] + user_events = defaultdict(list) + + for event in events: + user_key = f"{event['target_domain']}\\{event['target_username']}" + user_events[user_key].append(event) + + for user, user_evts in user_events.items(): + if user.endswith("$"): + continue + sorted_evts = sorted(user_evts, key=lambda x: x.get("timestamp", "")) + unique_targets = set() + window_start = 0 + + for i, evt in enumerate(sorted_evts): + unique_targets.add(evt.get("computer", "")) + if len(unique_targets) >= threshold: + findings.append({ + "check": "rapid_multi_host_auth", + "severity": "Critical", + "username": user, + "target_count": len(unique_targets), + "targets": list(unique_targets)[:10], + "description": f"User '{user}' authenticated to {len(unique_targets)} hosts in rapid succession", + "mitre": "T1557.001" + }) + break + + return findings + + +def detect_null_sid_logons(events): + """Detect logon events with NULL SID and missing LogonGUID — relay indicator.""" + findings = [] + null_events = [] + + for event in events: + sid = event.get("target_sid", "") + guid = event.get("logon_guid", "") + if (sid == "S-1-0-0" or sid == "NULL SID" or not sid) and (not guid or guid == "{00000000-0000-0000-0000-000000000000}"): + null_events.append(event) + + if null_events: + findings.append({ + "check": "null_sid_logon", + "severity": "High", + "event_count": len(null_events), + "description": f"{len(null_events)} NTLM logon events with NULL SID and empty LogonGUID detected", + "sample_events": null_events[:5], + "mitre": "T1557.001" + }) + + return findings + + +def check_smb_signing(hosts=None): + """Check SMB signing status on specified hosts using PowerShell.""" + if not hosts: + # Get domain computers from AD + cmd = [ + "powershell", "-NoProfile", "-Command", + "Get-ADComputer -Filter * -Property DNSHostName | " + "Select-Object -ExpandProperty DNSHostName | " + "ConvertTo-Json" + ] + else: + cmd = ["powershell", "-NoProfile", "-Command", + f"'{','.join(hosts)}' -split ',' | ConvertTo-Json"] + + try: + proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + if proc.returncode != 0: + return [{"error": "Failed to enumerate hosts", "stderr": proc.stderr.strip()}] + + host_list = json.loads(proc.stdout) if proc.stdout.strip() else [] + if isinstance(host_list, str): + host_list = [host_list] + except Exception as e: + return [{"error": str(e)}] + + results = [] + for host in host_list[:50]: + check_cmd = [ + "powershell", "-NoProfile", "-Command", + f"try {{ $smb = Get-SmbServerConfiguration -CimSession '{host}' " + f"-ErrorAction Stop; " + f"@{{ Host='{host}'; RequireSecuritySignature=$smb.RequireSecuritySignature; " + f"EnableSecuritySignature=$smb.EnableSecuritySignature }} | ConvertTo-Json " + f"}} catch {{ @{{ Host='{host}'; Error=$_.Exception.Message }} | ConvertTo-Json }}" + ] + try: + proc = subprocess.run(check_cmd, capture_output=True, text=True, timeout=10) + if proc.stdout.strip(): + result = json.loads(proc.stdout) + if not result.get("RequireSecuritySignature", True): + result["severity"] = "High" + result["message"] = f"SMB signing not required on {host} — vulnerable to relay" + results.append(result) + except Exception: + results.append({"host": host, "error": "Connection failed"}) + + return results + + +def run_hunt(args): + """Run full NTLM relay detection hunt.""" + results = { + "scan_time": datetime.now(timezone.utc).isoformat(), + "events_analyzed": 0, + "findings": [], + "summary": {"total_findings": 0, "critical": 0, "high": 0, "medium": 0} + } + + # Parse Event 4624 NTLM logon events + xml_4624 = query_security_log(4624, args.max_events) + events = parse_4624_events(xml_4624) + results["events_analyzed"] = len(events) + + # Run all detection checks + results["findings"].extend(detect_ip_hostname_mismatch(events)) + results["findings"].extend(detect_machine_account_relay(events)) + results["findings"].extend(detect_rapid_authentication(events)) + results["findings"].extend(detect_null_sid_logons(events)) + + # Parse Event 5145 for named pipe access + if args.check_pipes: + xml_5145 = query_security_log(5145, args.max_events) + pipe_events = parse_5145_events(xml_5145) + if pipe_events: + results["findings"].append({ + "check": "suspicious_pipe_access", + "severity": "Medium", + "event_count": len(pipe_events), + "description": f"{len(pipe_events)} accesses to sensitive named pipes detected", + "sample_events": pipe_events[:5], + "mitre": "T1557.001" + }) + + # Check SMB signing if requested + if args.check_smb_signing: + smb_results = check_smb_signing(args.hosts.split(",") if args.hosts else None) + unsigned = [r for r in smb_results if r.get("severity") == "High"] + if unsigned: + results["findings"].append({ + "check": "smb_signing_disabled", + "severity": "High", + "hosts_without_signing": len(unsigned), + "details": unsigned[:10], + "description": f"{len(unsigned)} hosts do not require SMB signing", + "mitre": "T1557.001" + }) + + # Calculate summary + for f in results["findings"]: + results["summary"]["total_findings"] += 1 + sev = f.get("severity", "").lower() + if sev in results["summary"]: + results["summary"][sev] += 1 + + return results + + +def main(): + parser = argparse.ArgumentParser(description="Hunt for NTLM relay attacks in Windows event logs") + parser.add_argument("--max-events", type=int, default=2000, help="Max events to query (default: 2000)") + parser.add_argument("--check-pipes", action="store_true", help="Also check Event 5145 for named pipe access") + parser.add_argument("--check-smb-signing", action="store_true", help="Audit SMB signing on domain hosts") + parser.add_argument("--hosts", help="Comma-separated list of hosts to check SMB signing") + parser.add_argument("--output", "-o", default="-", help="Output file (default: stdout)") + args = parser.parse_args() + + results = run_hunt(args) + output = json.dumps(results, indent=2, default=str) + + if args.output == "-": + print(output) + else: + with open(args.output, "w") as f: + f.write(output) + print(f"Hunt report written to {args.output}", file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/skills/implementing-privileged-identity-management-with-azure/LICENSE b/skills/implementing-privileged-identity-management-with-azure/LICENSE new file mode 100644 index 00000000..e0c9ede6 --- /dev/null +++ b/skills/implementing-privileged-identity-management-with-azure/LICENSE @@ -0,0 +1,17 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +Copyright 2025 Mahipal + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/skills/implementing-privileged-identity-management-with-azure/SKILL.md b/skills/implementing-privileged-identity-management-with-azure/SKILL.md new file mode 100644 index 00000000..d6dca126 --- /dev/null +++ b/skills/implementing-privileged-identity-management-with-azure/SKILL.md @@ -0,0 +1,34 @@ +--- +name: implementing-privileged-identity-management-with-azure +description: Configure Azure AD Privileged Identity Management (PIM) using Microsoft Graph API to manage eligible role assignments, just-in-time activation, access reviews, and role management policies for zero-trust privileged access. +domain: cybersecurity +subdomain: identity-access-management +tags: [Azure-AD, PIM, privileged-access, just-in-time, eligible-roles, Microsoft-Graph, zero-trust, access-reviews, Entra-ID] +version: "1.0" +author: mahipal +license: Apache-2.0 +--- + +# Implementing Privileged Identity Management with Azure + +## Overview + +Azure AD Privileged Identity Management (PIM) enforces just-in-time privileged access by converting permanent role assignments to eligible assignments that require activation. This skill uses the Microsoft Graph API to enumerate active and eligible role assignments, create eligibility schedule requests, configure role management policies (MFA requirements, approval workflows, maximum activation duration), audit PIM activation logs, and identify over-privileged permanent assignments that should be converted to eligible. + +## Prerequisites + +- Python 3.9+ with `msal`, `requests` +- Azure AD application registration with `RoleManagement.ReadWrite.Directory`, `RoleEligibilitySchedule.ReadWrite.Directory` permissions +- Microsoft Entra ID P2 or Microsoft Entra ID Governance license + +## Key Operations + +1. **List eligible assignments** — GET /roleManagement/directory/roleEligibilityScheduleInstances +2. **Create eligibility requests** — POST /roleManagement/directory/roleEligibilityScheduleRequests +3. **Activate eligible role** — POST /roleManagement/directory/roleAssignmentScheduleRequests with action=selfActivate +4. **Audit role activations** — GET /auditLogs/directoryAudits filtered by PIM activities +5. **Review role policies** — GET /policies/roleManagementPolicies to check MFA/approval requirements + +## Output + +JSON audit report with permanent vs. eligible assignment counts, over-privileged accounts, policy compliance status, and recent activation history. diff --git a/skills/implementing-privileged-identity-management-with-azure/references/api-reference.md b/skills/implementing-privileged-identity-management-with-azure/references/api-reference.md new file mode 100644 index 00000000..a088af55 --- /dev/null +++ b/skills/implementing-privileged-identity-management-with-azure/references/api-reference.md @@ -0,0 +1,130 @@ +# Azure AD PIM Microsoft Graph API Reference + +## Authentication + +```python +import msal + +app = msal.ConfidentialClientApplication( + client_id="", + authority="https://login.microsoftonline.com/", + client_credential="" +) +token = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"]) +``` + +## Required API Permissions + +| Permission | Type | Description | +|-----------|------|-------------| +| `RoleManagement.ReadWrite.Directory` | Application | Manage role assignments | +| `RoleEligibilitySchedule.ReadWrite.Directory` | Application | Manage eligible assignments | +| `RoleAssignmentSchedule.ReadWrite.Directory` | Application | Manage active assignments | +| `AuditLog.Read.All` | Application | Read PIM audit logs | +| `Policy.Read.All` | Application | Read role management policies | + +## PIM API Endpoints + +### List Eligible Role Assignments + +``` +GET /roleManagement/directory/roleEligibilityScheduleInstances +``` + +### Create Eligible Assignment + +``` +POST /roleManagement/directory/roleEligibilityScheduleRequests +{ + "action": "adminAssign", + "justification": "Business need for temporary access", + "roleDefinitionId": "", + "directoryScopeId": "/", + "principalId": "", + "scheduleInfo": { + "startDateTime": "2025-03-01T00:00:00Z", + "expiration": { + "type": "afterDuration", + "duration": "PT8H" + } + } +} +``` + +### Activate Eligible Role (JIT) + +``` +POST /roleManagement/directory/roleAssignmentScheduleRequests +{ + "action": "selfActivate", + "justification": "Need Global Admin for security investigation", + "roleDefinitionId": "", + "directoryScopeId": "/", + "principalId": "me", + "scheduleInfo": { + "startDateTime": "2025-03-01T12:00:00Z", + "expiration": { + "type": "afterDuration", + "duration": "PT1H" + } + } +} +``` + +### List Active Role Assignments + +``` +GET /roleManagement/directory/roleAssignmentScheduleInstances +``` + +### List Role Definitions + +``` +GET /roleManagement/directory/roleDefinitions +``` + +### Query PIM Audit Logs + +``` +GET /auditLogs/directoryAudits?$filter=activityDisplayName eq 'Add member to role completed (PIM activation)' and activityDateTime ge 2025-03-01T00:00:00Z +``` + +### Get Role Management Policies + +``` +GET /policies/roleManagementPolicies +``` + +## Key Role Definition IDs + +| Role | ID | +|------|-----| +| Global Administrator | `62e90394-69f5-4237-9190-012177145e10` | +| Security Administrator | `194ae4cb-b126-40b2-bd5b-6091b380977d` | +| User Administrator | `fe930be7-5e62-47db-91af-98c3a49a38b1` | +| Exchange Administrator | `29232cdf-9323-42fd-ade2-1d097af3e4de` | +| Privileged Role Administrator | `e8611ab8-c189-46e8-94e1-60213ab1f814` | + +## Schedule Action Types + +| Action | Description | +|--------|-------------| +| `adminAssign` | Admin assigns active or eligible role | +| `adminRemove` | Admin removes role assignment | +| `adminUpdate` | Admin updates existing assignment | +| `adminExtend` | Admin extends expiring assignment | +| `adminRenew` | Admin renews expired assignment | +| `selfActivate` | User activates eligible role | +| `selfDeactivate` | User deactivates active role | +| `selfExtend` | User requests extension | +| `selfRenew` | User requests renewal | + +## Azure CLI Equivalent + +```bash +# List PIM eligible assignments +az rest --method GET --url "https://graph.microsoft.com/v1.0/roleManagement/directory/roleEligibilityScheduleInstances" + +# List active assignments +az rest --method GET --url "https://graph.microsoft.com/v1.0/roleManagement/directory/roleAssignmentScheduleInstances" +``` diff --git a/skills/implementing-privileged-identity-management-with-azure/scripts/agent.py b/skills/implementing-privileged-identity-management-with-azure/scripts/agent.py new file mode 100644 index 00000000..69041207 --- /dev/null +++ b/skills/implementing-privileged-identity-management-with-azure/scripts/agent.py @@ -0,0 +1,301 @@ +#!/usr/bin/env python3 +"""Manage Azure AD PIM: eligible role assignments, JIT activation, access reviews via Microsoft Graph API.""" + +import argparse +import json +import sys +from datetime import datetime, timezone + + +def get_graph_token(tenant_id, client_id, client_secret): + """Acquire OAuth2 token for Microsoft Graph API using client credentials flow.""" + try: + import msal + except ImportError: + print("Install required package: pip install msal", file=sys.stderr) + sys.exit(1) + + app = msal.ConfidentialClientApplication( + client_id, + authority=f"https://login.microsoftonline.com/{tenant_id}", + client_credential=client_secret + ) + result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"]) + if "access_token" not in result: + print(f"Token acquisition failed: {result.get('error_description', 'Unknown error')}", file=sys.stderr) + sys.exit(1) + return result["access_token"] + + +def graph_request(token, method, endpoint, body=None): + """Make authenticated request to Microsoft Graph API.""" + import requests + + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json" + } + url = f"https://graph.microsoft.com/v1.0{endpoint}" + + if method == "GET": + resp = requests.get(url, headers=headers, timeout=30) + elif method == "POST": + resp = requests.post(url, headers=headers, json=body, timeout=30) + elif method == "PATCH": + resp = requests.patch(url, headers=headers, json=body, timeout=30) + else: + raise ValueError(f"Unsupported method: {method}") + + if resp.status_code >= 400: + return {"error": resp.status_code, "message": resp.text} + return resp.json() if resp.text else {} + + +def list_eligible_assignments(token): + """List all eligible role assignments via PIM.""" + results = [] + endpoint = "/roleManagement/directory/roleEligibilityScheduleInstances" + response = graph_request(token, "GET", endpoint) + + if "error" in response: + return [response] + + for item in response.get("value", []): + results.append({ + "id": item.get("id"), + "principal_id": item.get("principalId"), + "role_definition_id": item.get("roleDefinitionId"), + "directory_scope_id": item.get("directoryScopeId"), + "start_date_time": item.get("startDateTime"), + "end_date_time": item.get("endDateTime"), + "assignment_type": item.get("assignmentType"), + "member_type": item.get("memberType") + }) + return results + + +def list_active_assignments(token): + """List all active (permanent and temporary) role assignments.""" + results = [] + endpoint = "/roleManagement/directory/roleAssignmentScheduleInstances" + response = graph_request(token, "GET", endpoint) + + if "error" in response: + return [response] + + for item in response.get("value", []): + results.append({ + "id": item.get("id"), + "principal_id": item.get("principalId"), + "role_definition_id": item.get("roleDefinitionId"), + "directory_scope_id": item.get("directoryScopeId"), + "start_date_time": item.get("startDateTime"), + "end_date_time": item.get("endDateTime"), + "assignment_type": item.get("assignmentType"), + "member_type": item.get("memberType") + }) + return results + + +def create_eligible_assignment(token, principal_id, role_definition_id, justification, duration_hours=8): + """Create an eligible role assignment via PIM eligibility schedule request.""" + body = { + "action": "adminAssign", + "justification": justification, + "roleDefinitionId": role_definition_id, + "directoryScopeId": "/", + "principalId": principal_id, + "scheduleInfo": { + "startDateTime": datetime.now(timezone.utc).isoformat(), + "expiration": { + "type": "afterDuration", + "duration": f"PT{duration_hours}H" + } + } + } + endpoint = "/roleManagement/directory/roleEligibilityScheduleRequests" + return graph_request(token, "POST", endpoint, body) + + +def activate_eligible_role(token, role_definition_id, justification, duration_hours=1): + """Activate an eligible role assignment (self-activate JIT access).""" + body = { + "action": "selfActivate", + "justification": justification, + "roleDefinitionId": role_definition_id, + "directoryScopeId": "/", + "principalId": "me", + "scheduleInfo": { + "startDateTime": datetime.now(timezone.utc).isoformat(), + "expiration": { + "type": "afterDuration", + "duration": f"PT{duration_hours}H" + } + } + } + endpoint = "/roleManagement/directory/roleAssignmentScheduleRequests" + return graph_request(token, "POST", endpoint, body) + + +def list_role_definitions(token): + """List all Microsoft Entra role definitions.""" + endpoint = "/roleManagement/directory/roleDefinitions" + response = graph_request(token, "GET", endpoint) + if "error" in response: + return [response] + return [ + { + "id": r.get("id"), + "display_name": r.get("displayName"), + "is_built_in": r.get("isBuiltIn"), + "is_enabled": r.get("isEnabled") + } + for r in response.get("value", []) + ] + + +def audit_pim_activations(token, days=7): + """Query directory audit logs for PIM role activation events.""" + from datetime import timedelta + start_date = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ") + endpoint = ( + f"/auditLogs/directoryAudits?" + f"$filter=activityDisplayName eq 'Add member to role completed (PIM activation)' " + f"and activityDateTime ge {start_date}" + ) + response = graph_request(token, "GET", endpoint) + if "error" in response: + return [response] + + activations = [] + for entry in response.get("value", []): + activations.append({ + "activity": entry.get("activityDisplayName"), + "timestamp": entry.get("activityDateTime"), + "initiated_by": entry.get("initiatedBy", {}).get("user", {}).get("userPrincipalName"), + "target_resources": [ + {"display_name": t.get("displayName"), "type": t.get("type")} + for t in entry.get("targetResources", []) + ], + "result": entry.get("result") + }) + return activations + + +def get_role_management_policies(token): + """Retrieve role management policies to check MFA/approval requirements.""" + endpoint = "/policies/roleManagementPolicies" + response = graph_request(token, "GET", endpoint) + if "error" in response: + return [response] + + policies = [] + for policy in response.get("value", []): + policies.append({ + "id": policy.get("id"), + "display_name": policy.get("displayName"), + "scope_id": policy.get("scopeId"), + "scope_type": policy.get("scopeType"), + "last_modified": policy.get("lastModifiedDateTime") + }) + return policies + + +def generate_audit_report(token): + """Generate comprehensive PIM audit report.""" + eligible = list_eligible_assignments(token) + active = list_active_assignments(token) + roles = list_role_definitions(token) + + permanent_active = [a for a in active if not a.get("end_date_time")] + temporary_active = [a for a in active if a.get("end_date_time")] + + report = { + "scan_time": datetime.now(timezone.utc).isoformat(), + "summary": { + "total_role_definitions": len(roles), + "eligible_assignments": len(eligible), + "active_assignments": len(active), + "permanent_active_assignments": len(permanent_active), + "temporary_active_assignments": len(temporary_active) + }, + "findings": [], + "eligible_assignments": eligible, + "permanent_active_assignments": permanent_active + } + + if len(permanent_active) > 0: + report["findings"].append({ + "severity": "High", + "check": "permanent_privileged_assignments", + "message": f"{len(permanent_active)} permanent active role assignments found — consider converting to eligible", + "count": len(permanent_active) + }) + + if len(eligible) == 0 and len(active) > 0: + report["findings"].append({ + "severity": "High", + "check": "no_eligible_assignments", + "message": "No eligible (JIT) assignments configured — all access is permanent" + }) + + return report + + +def main(): + parser = argparse.ArgumentParser(description="Azure AD PIM management via Microsoft Graph API") + parser.add_argument("--tenant-id", required=True, help="Azure AD tenant ID") + parser.add_argument("--client-id", required=True, help="Application (client) ID") + parser.add_argument("--client-secret", required=True, help="Client secret") + + subparsers = parser.add_subparsers(dest="command", help="PIM operation") + + subparsers.add_parser("list-eligible", help="List eligible role assignments") + subparsers.add_parser("list-active", help="List active role assignments") + subparsers.add_parser("list-roles", help="List role definitions") + subparsers.add_parser("audit-report", help="Generate PIM audit report") + + audit_parser = subparsers.add_parser("audit-activations", help="Query PIM activation logs") + audit_parser.add_argument("--days", type=int, default=7, help="Look back N days (default: 7)") + + create_parser = subparsers.add_parser("create-eligible", help="Create eligible assignment") + create_parser.add_argument("--principal-id", required=True, help="User/group object ID") + create_parser.add_argument("--role-id", required=True, help="Role definition ID") + create_parser.add_argument("--justification", required=True, help="Business justification") + create_parser.add_argument("--duration", type=int, default=8, help="Duration in hours (default: 8)") + + activate_parser = subparsers.add_parser("activate", help="Activate eligible role (JIT)") + activate_parser.add_argument("--role-id", required=True, help="Role definition ID") + activate_parser.add_argument("--justification", required=True, help="Activation justification") + activate_parser.add_argument("--duration", type=int, default=1, help="Duration in hours (default: 1)") + + subparsers.add_parser("policies", help="List role management policies") + + args = parser.parse_args() + token = get_graph_token(args.tenant_id, args.client_id, args.client_secret) + + if args.command == "list-eligible": + result = list_eligible_assignments(token) + elif args.command == "list-active": + result = list_active_assignments(token) + elif args.command == "list-roles": + result = list_role_definitions(token) + elif args.command == "audit-report": + result = generate_audit_report(token) + elif args.command == "audit-activations": + result = audit_pim_activations(token, args.days) + elif args.command == "create-eligible": + result = create_eligible_assignment(token, args.principal_id, args.role_id, args.justification, args.duration) + elif args.command == "activate": + result = activate_eligible_role(token, args.role_id, args.justification, args.duration) + elif args.command == "policies": + result = get_role_management_policies(token) + else: + parser.print_help() + sys.exit(0) + + print(json.dumps(result, indent=2, default=str)) + + +if __name__ == "__main__": + main() diff --git a/skills/performing-supply-chain-attack-simulation/LICENSE b/skills/performing-supply-chain-attack-simulation/LICENSE new file mode 100644 index 00000000..e0c9ede6 --- /dev/null +++ b/skills/performing-supply-chain-attack-simulation/LICENSE @@ -0,0 +1,17 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +Copyright 2025 Mahipal + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/skills/performing-supply-chain-attack-simulation/SKILL.md b/skills/performing-supply-chain-attack-simulation/SKILL.md new file mode 100644 index 00000000..ebefd81f --- /dev/null +++ b/skills/performing-supply-chain-attack-simulation/SKILL.md @@ -0,0 +1,34 @@ +--- +name: performing-supply-chain-attack-simulation +description: Simulate and detect software supply chain attacks including typosquatting detection via Levenshtein distance, dependency confusion testing against private registries, package hash verification with pip, and known vulnerability scanning with pip-audit. +domain: cybersecurity +subdomain: application-security +tags: [supply-chain, typosquatting, dependency-confusion, package-verification, pip-audit, PyPI, software-composition-analysis] +version: "1.0" +author: mahipal +license: Apache-2.0 +--- + +# Performing Supply Chain Attack Simulation + +## Overview + +Software supply chain attacks exploit trust in package registries through typosquatting (registering names similar to popular packages), dependency confusion (publishing higher-version public packages matching private names), and compromised package distribution. This skill detects these attack vectors by computing Levenshtein distance between package names and popular PyPI packages, verifying package integrity via SHA-256 hash comparison, scanning for known CVEs with pip-audit, and testing dependency resolution order for confusion vulnerabilities. + +## Prerequisites + +- Python 3.9+ with `pip-audit`, `Levenshtein`, `requests` +- Access to PyPI JSON API (https://pypi.org/pypi/{package}/json) +- Network access for package metadata retrieval + +## Key Detection Areas + +1. **Typosquatting** — compare package names against top PyPI packages using edit distance thresholds +2. **Dependency confusion** — check if internal package names exist on public PyPI with higher version numbers +3. **Hash verification** — download packages and verify SHA-256 digests match published hashes +4. **Vulnerability scanning** — audit installed packages against OSV and PyPA advisory databases +5. **Metadata anomalies** — flag packages with suspicious author emails, missing homepages, or very recent first upload dates + +## Output + +JSON report with risk scores per package, detected attack vectors, hash verification results, and CVE findings. diff --git a/skills/performing-supply-chain-attack-simulation/references/api-reference.md b/skills/performing-supply-chain-attack-simulation/references/api-reference.md new file mode 100644 index 00000000..229ba355 --- /dev/null +++ b/skills/performing-supply-chain-attack-simulation/references/api-reference.md @@ -0,0 +1,123 @@ +# Supply Chain Attack Simulation Reference + +## Tool Installation + +```bash +pip install pip-audit python-Levenshtein requests +``` + +## PyPI JSON API + +```bash +# Get package metadata +curl https://pypi.org/pypi/{package_name}/json + +# Get specific version +curl https://pypi.org/pypi/{package_name}/{version}/json +``` + +### Response Structure + +```json +{ + "info": { + "name": "requests", + "version": "2.31.0", + "author": "Kenneth Reitz", + "author_email": "me@kennethreitz.org", + "home_page": "https://requests.readthedocs.io", + "summary": "Python HTTP for Humans." + }, + "urls": [ + { + "filename": "requests-2.31.0.tar.gz", + "packagetype": "sdist", + "digests": { + "sha256": "942c5a758f98d790eaed1a29cb6eefc7f0edf3fcb0fce8aea3fbd5951d bfcfeb" + } + } + ] +} +``` + +## pip-audit CLI + +```bash +# Audit current environment +pip-audit + +# JSON output +pip-audit --format json + +# Audit requirements file +pip-audit -r requirements.txt + +# Hash-checking mode (pinned deps only) +pip-audit --require-hashes -r requirements.txt + +# Fix vulnerabilities automatically +pip-audit --fix +``` + +## pip Hash Verification + +```bash +# Download with hash verification +pip download --no-deps --require-hashes -r requirements.txt + +# requirements.txt with hashes +requests==2.31.0 \ + --hash=sha256:942c5a758f98d790eaed1a29cb6eefc7f0edf3fcb0fce8aea3fbd5951dbfcfeb +``` + +## pypi-scan Typosquatting Detection + +```bash +# Install +pip install pypi-scan + +# Scan for typosquatting of a package +pypi-scan --package requests + +# Scan with custom edit distance +pypi-scan --package numpy --edit-distance 2 +``` + +## Levenshtein Distance Examples + +| Package | Target | Distance | Risk | +|---------|--------|----------|------| +| `reqeusts` | `requests` | 1 | High | +| `requets` | `requests` | 1 | High | +| `request` | `requests` | 1 | High | +| `numpys` | `numpy` | 1 | High | +| `pandsa` | `pandas` | 1 | High | +| `flaask` | `flask` | 1 | High | + +## Dependency Confusion Test Structure + +```json +[ + {"name": "my-internal-lib", "version": "1.2.0"}, + {"name": "company-utils", "version": "0.5.3"}, + {"name": "private-auth-sdk", "version": "2.1.0"} +] +``` + +## MITRE ATT&CK Mapping + +| Technique | ID | Description | +|-----------|-----|-------------| +| Supply Chain Compromise | T1195.001 | Compromise software dependencies | +| Trusted Developer Utilities | T1127 | Abuse package manager trust | +| Ingress Tool Transfer | T1105 | Download malicious packages | + +## Metadata Anomaly Indicators + +| Indicator | Risk | Description | +|-----------|------|-------------| +| Disposable email author | High | Author uses throwaway email service | +| No homepage/repo URL | Medium | Package has no verifiable source | +| No author info | Medium | Anonymous package publication | +| Very short description | Low | Minimal package documentation | +| Recent first upload + popular name variant | Critical | Likely typosquatting attempt | diff --git a/skills/performing-supply-chain-attack-simulation/scripts/agent.py b/skills/performing-supply-chain-attack-simulation/scripts/agent.py new file mode 100644 index 00000000..acfe6ccb --- /dev/null +++ b/skills/performing-supply-chain-attack-simulation/scripts/agent.py @@ -0,0 +1,265 @@ +#!/usr/bin/env python3 +"""Simulate and detect software supply chain attacks: typosquatting, dependency confusion, hash verification.""" + +import argparse +import hashlib +import json +import subprocess +import sys +from datetime import datetime, timezone + + +def get_levenshtein_distance(s1, s2): + """Compute Levenshtein edit distance between two strings.""" + if len(s1) < len(s2): + return get_levenshtein_distance(s2, s1) + if len(s2) == 0: + return len(s1) + prev_row = range(len(s2) + 1) + for i, c1 in enumerate(s1): + curr_row = [i + 1] + for j, c2 in enumerate(s2): + insertions = prev_row[j + 1] + 1 + deletions = curr_row[j] + 1 + substitutions = prev_row[j] + (c1 != c2) + curr_row.append(min(insertions, deletions, substitutions)) + prev_row = curr_row + return prev_row[-1] + + +TOP_PYPI_PACKAGES = [ + "requests", "numpy", "pandas", "flask", "django", "boto3", "scipy", + "tensorflow", "torch", "scikit-learn", "pillow", "matplotlib", + "cryptography", "pyyaml", "sqlalchemy", "celery", "redis", "psycopg2", + "paramiko", "beautifulsoup4", "selenium", "pytest", "setuptools", + "urllib3", "certifi", "idna", "charset-normalizer", "pip", "wheel", + "packaging", "six", "python-dateutil", "jinja2", "markupsafe", + "pydantic", "fastapi", "uvicorn", "httpx", "aiohttp", "grpcio" +] + + +def check_typosquatting(package_name, threshold=2): + """Check if package name is suspiciously similar to popular packages.""" + matches = [] + for popular in TOP_PYPI_PACKAGES: + if package_name == popular: + continue + distance = get_levenshtein_distance(package_name.lower(), popular.lower()) + if 0 < distance <= threshold: + matches.append({ + "popular_package": popular, + "edit_distance": distance, + "risk": "High" if distance == 1 else "Medium" + }) + return matches + + +def query_pypi_metadata(package_name): + """Fetch package metadata from PyPI JSON API.""" + try: + import requests + resp = requests.get( + f"https://pypi.org/pypi/{package_name}/json", + timeout=10 + ) + if resp.status_code == 200: + return resp.json() + return None + except Exception: + return None + + +def check_dependency_confusion(private_packages): + """Check if private package names exist on public PyPI.""" + findings = [] + for pkg_info in private_packages: + name = pkg_info["name"] + internal_version = pkg_info.get("version", "0.0.0") + metadata = query_pypi_metadata(name) + if metadata: + public_version = metadata.get("info", {}).get("version", "0.0.0") + findings.append({ + "package": name, + "internal_version": internal_version, + "public_version": public_version, + "risk": "Critical", + "message": f"Private package '{name}' exists on public PyPI as version {public_version}", + "attack_vector": "dependency_confusion" + }) + else: + findings.append({ + "package": name, + "internal_version": internal_version, + "risk": "Info", + "message": f"Private package '{name}' not found on public PyPI (safe)" + }) + return findings + + +def verify_package_hash(package_name, expected_hash=None): + """Download package and verify SHA-256 hash against PyPI published digests.""" + metadata = query_pypi_metadata(package_name) + if not metadata: + return {"package": package_name, "status": "error", "message": "Package not found on PyPI"} + + releases = metadata.get("urls", []) + if not releases: + return {"package": package_name, "status": "error", "message": "No release files found"} + + sdist = None + for release in releases: + if release.get("packagetype") == "sdist": + sdist = release + break + if not sdist: + sdist = releases[0] + + published_sha256 = sdist.get("digests", {}).get("sha256", "") + result = { + "package": package_name, + "version": metadata["info"]["version"], + "filename": sdist["filename"], + "published_sha256": published_sha256, + "packagetype": sdist["packagetype"] + } + + if expected_hash: + if expected_hash == published_sha256: + result["status"] = "verified" + result["message"] = "Hash matches expected value" + else: + result["status"] = "mismatch" + result["risk"] = "Critical" + result["message"] = "Hash does NOT match expected value — possible tampering" + result["expected_hash"] = expected_hash + else: + result["status"] = "retrieved" + result["message"] = "Published hash retrieved for manual verification" + + return result + + +def run_pip_audit(): + """Run pip-audit to scan installed packages for known vulnerabilities.""" + try: + proc = subprocess.run( + ["pip-audit", "--format", "json", "--progress-spinner", "off"], + capture_output=True, text=True, timeout=120 + ) + if proc.returncode == 0 or proc.stdout: + return json.loads(proc.stdout) if proc.stdout.strip() else [] + return [{"error": proc.stderr.strip()}] + except FileNotFoundError: + return [{"error": "pip-audit not installed. Run: pip install pip-audit"}] + except subprocess.TimeoutExpired: + return [{"error": "pip-audit timed out after 120 seconds"}] + except json.JSONDecodeError: + return [{"error": "Failed to parse pip-audit output"}] + + +def analyze_metadata_anomalies(package_name): + """Detect suspicious metadata patterns in a PyPI package.""" + metadata = query_pypi_metadata(package_name) + if not metadata: + return {"package": package_name, "status": "not_found"} + + info = metadata["info"] + anomalies = [] + + if not info.get("home_page") and not info.get("project_url"): + anomalies.append({ + "check": "missing_homepage", + "severity": "Medium", + "message": "Package has no homepage or project URL" + }) + + if not info.get("author") and not info.get("author_email"): + anomalies.append({ + "check": "missing_author", + "severity": "Medium", + "message": "Package has no author information" + }) + + if info.get("author_email") and any( + domain in info["author_email"] + for domain in ["mailinator.com", "guerrillamail.com", "tempmail.com", "throwaway.email"] + ): + anomalies.append({ + "check": "disposable_email", + "severity": "High", + "message": f"Author uses disposable email: {info['author_email']}" + }) + + summary = info.get("summary", "") + if not summary or len(summary) < 10: + anomalies.append({ + "check": "missing_description", + "severity": "Low", + "message": "Package has no meaningful description" + }) + + return { + "package": package_name, + "version": info.get("version"), + "author": info.get("author"), + "author_email": info.get("author_email"), + "anomalies": anomalies, + "anomaly_count": len(anomalies) + } + + +def main(): + parser = argparse.ArgumentParser( + description="Simulate and detect software supply chain attacks" + ) + subparsers = parser.add_subparsers(dest="command", help="Attack simulation type") + + typo_parser = subparsers.add_parser("typosquat", help="Check for typosquatting") + typo_parser.add_argument("packages", nargs="+", help="Package names to check") + typo_parser.add_argument("--threshold", type=int, default=2, help="Max edit distance (default: 2)") + + confusion_parser = subparsers.add_parser("confusion", help="Test dependency confusion") + confusion_parser.add_argument("--packages", required=True, help="JSON file with private packages [{name, version}]") + + hash_parser = subparsers.add_parser("verify-hash", help="Verify package hash") + hash_parser.add_argument("package", help="Package name") + hash_parser.add_argument("--expected-hash", help="Expected SHA-256 hash to compare") + + subparsers.add_parser("audit", help="Run pip-audit vulnerability scan") + + meta_parser = subparsers.add_parser("metadata", help="Check metadata anomalies") + meta_parser.add_argument("packages", nargs="+", help="Package names to analyze") + + args = parser.parse_args() + + if args.command == "typosquat": + results = [] + for pkg in args.packages: + matches = check_typosquatting(pkg, args.threshold) + results.append({"package": pkg, "typosquat_matches": matches, "is_suspicious": len(matches) > 0}) + print(json.dumps({"scan_type": "typosquatting", "results": results, "timestamp": datetime.now(timezone.utc).isoformat()}, indent=2)) + + elif args.command == "confusion": + with open(args.packages) as f: + private_pkgs = json.load(f) + results = check_dependency_confusion(private_pkgs) + print(json.dumps({"scan_type": "dependency_confusion", "results": results, "timestamp": datetime.now(timezone.utc).isoformat()}, indent=2)) + + elif args.command == "verify-hash": + result = verify_package_hash(args.package, args.expected_hash) + print(json.dumps({"scan_type": "hash_verification", "result": result, "timestamp": datetime.now(timezone.utc).isoformat()}, indent=2)) + + elif args.command == "audit": + results = run_pip_audit() + print(json.dumps({"scan_type": "vulnerability_audit", "results": results, "timestamp": datetime.now(timezone.utc).isoformat()}, indent=2)) + + elif args.command == "metadata": + results = [analyze_metadata_anomalies(pkg) for pkg in args.packages] + print(json.dumps({"scan_type": "metadata_analysis", "results": results, "timestamp": datetime.now(timezone.utc).isoformat()}, indent=2)) + + else: + parser.print_help() + + +if __name__ == "__main__": + main()