mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-11 21:54:56 +03:00
feat: add 5 new cybersecurity skills - secrets scanning CI/CD, Bluetooth assessment, DNS exfil Zeek, SOAR phishing, AD ACL abuse
This commit is contained in:
@@ -0,0 +1,8 @@
|
||||
MIT License
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
@@ -0,0 +1,64 @@
|
||||
---
|
||||
name: analyzing-active-directory-acl-abuse
|
||||
description: Detect dangerous ACL misconfigurations in Active Directory using ldap3 to identify GenericAll, WriteDACL, and WriteOwner abuse paths
|
||||
domain: cybersecurity
|
||||
subdomain: identity-security
|
||||
tags: [active-directory, acl-abuse, ldap, privilege-escalation]
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
Active Directory Access Control Lists (ACLs) define permissions on AD objects through Discretionary Access Control Lists (DACLs) containing Access Control Entries (ACEs). Misconfigured ACEs can grant non-privileged users dangerous permissions such as GenericAll (full control), WriteDACL (modify permissions), WriteOwner (take ownership), and GenericWrite (modify attributes) on sensitive objects like Domain Admins groups, domain controllers, or GPOs.
|
||||
|
||||
This skill uses the ldap3 Python library to connect to a Domain Controller, query objects with their nTSecurityDescriptor attribute, parse the binary security descriptor into SDDL (Security Descriptor Definition Language) format, and identify ACEs that grant dangerous permissions to non-administrative principals. These misconfigurations are the basis for ACL-based attack paths discovered by tools like BloodHound.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Python 3.9 or later with ldap3 library (`pip install ldap3`)
|
||||
- Domain user credentials with read access to AD objects
|
||||
- Network connectivity to Domain Controller on port 389 (LDAP) or 636 (LDAPS)
|
||||
- Understanding of Active Directory security model and SDDL format
|
||||
|
||||
## Steps
|
||||
|
||||
1. **Connect to Domain Controller**: Establish an LDAP connection using ldap3 with NTLM or simple authentication. Use LDAPS (port 636) for encrypted connections in production.
|
||||
|
||||
2. **Query target objects**: Search the target OU or entire domain for objects including users, groups, computers, and OUs. Request the `nTSecurityDescriptor`, `distinguishedName`, `objectClass`, and `sAMAccountName` attributes.
|
||||
|
||||
3. **Parse security descriptors**: Convert the binary nTSecurityDescriptor into its SDDL string representation. Parse each ACE in the DACL to extract the trustee SID, access mask, and ACE type (allow/deny).
|
||||
|
||||
4. **Resolve SIDs to principals**: Map security identifiers (SIDs) to human-readable account names using LDAP lookups against the domain. Identify well-known SIDs for built-in groups.
|
||||
|
||||
5. **Check for dangerous permissions**: Compare each ACE's access mask against dangerous permission bitmasks: GenericAll (0x10000000), WriteDACL (0x00040000), WriteOwner (0x00080000), GenericWrite (0x40000000), and WriteProperty for specific extended rights.
|
||||
|
||||
6. **Filter non-admin trustees**: Exclude expected administrative trustees (Domain Admins, Enterprise Admins, SYSTEM, Administrators) and flag ACEs where non-privileged users or groups hold dangerous permissions.
|
||||
|
||||
7. **Map attack paths**: For each finding, document the potential attack chain (e.g., GenericAll on user allows password reset, WriteDACL on group allows adding self to group).
|
||||
|
||||
8. **Generate remediation report**: Output a JSON report with all dangerous ACEs, affected objects, non-admin trustees, and recommended remediation steps.
|
||||
|
||||
## Expected Output
|
||||
|
||||
```json
|
||||
{
|
||||
"domain": "corp.example.com",
|
||||
"objects_scanned": 1247,
|
||||
"dangerous_aces_found": 8,
|
||||
"findings": [
|
||||
{
|
||||
"severity": "critical",
|
||||
"target_object": "CN=Domain Admins,CN=Users,DC=corp,DC=example,DC=com",
|
||||
"target_type": "group",
|
||||
"trustee": "CORP\\helpdesk-team",
|
||||
"permission": "GenericAll",
|
||||
"access_mask": "0x10000000",
|
||||
"ace_type": "ACCESS_ALLOWED",
|
||||
"attack_path": "GenericAll on Domain Admins group allows adding arbitrary members",
|
||||
"remediation": "Remove GenericAll ACE for helpdesk-team on Domain Admins"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
@@ -0,0 +1,94 @@
|
||||
# Active Directory ACL Abuse API Reference
|
||||
|
||||
## ldap3 Python Connection
|
||||
|
||||
```python
|
||||
from ldap3 import Server, Connection, ALL, NTLM, SUBTREE
|
||||
|
||||
server = Server("192.168.1.10", get_info=ALL, use_ssl=False)
|
||||
conn = Connection(server, user="DOMAIN\\user", password="pass",
|
||||
authentication=NTLM, auto_bind=True)
|
||||
|
||||
# Search with nTSecurityDescriptor
|
||||
conn.search(
|
||||
"DC=corp,DC=example,DC=com",
|
||||
"(objectClass=group)",
|
||||
search_scope=SUBTREE,
|
||||
attributes=["distinguishedName", "sAMAccountName",
|
||||
"objectClass", "nTSecurityDescriptor"],
|
||||
)
|
||||
```
|
||||
|
||||
## SDDL ACE Format
|
||||
|
||||
```
|
||||
ACE String: (ace_type;ace_flags;rights;object_guid;inherit_guid;trustee_sid)
|
||||
Example: (A;;GA;;;S-1-5-21-xxx-512)
|
||||
```
|
||||
|
||||
| Component | Description |
|
||||
|-----------|-------------|
|
||||
| `A` | Access Allowed |
|
||||
| `D` | Access Denied |
|
||||
| `OA` | Object Access Allowed |
|
||||
| `GA` | Generic All |
|
||||
| `GW` | Generic Write |
|
||||
| `WD` | Write DACL |
|
||||
| `WO` | Write Owner |
|
||||
|
||||
## Dangerous Permission Bitmasks
|
||||
|
||||
| Permission | Hex Mask | Risk |
|
||||
|-----------|----------|------|
|
||||
| GenericAll | `0x10000000` | Full control over object |
|
||||
| GenericWrite | `0x40000000` | Modify all writable attributes |
|
||||
| WriteDACL | `0x00040000` | Modify object permissions |
|
||||
| WriteOwner | `0x00080000` | Take object ownership |
|
||||
| WriteProperty | `0x00000020` | Write specific properties |
|
||||
| ExtendedRight | `0x00000100` | Extended rights (password reset, etc.) |
|
||||
| Self | `0x00000008` | Self-membership modification |
|
||||
| Delete | `0x00010000` | Delete the object |
|
||||
|
||||
## BloodHound Cypher Queries for ACL Paths
|
||||
|
||||
```cypher
|
||||
-- Find all users with GenericAll on Domain Admins
|
||||
MATCH p=(n:User)-[r:GenericAll]->(g:Group {name:"DOMAIN ADMINS@CORP.COM"})
|
||||
RETURN p
|
||||
|
||||
-- Find WriteDACL paths from non-admins to high-value targets
|
||||
MATCH (n:User {admincount:false})
|
||||
MATCH p=allShortestPaths((n)-[r:WriteDacl|WriteOwner|GenericAll*1..]->(m:Group))
|
||||
WHERE m.highvalue = true
|
||||
RETURN p
|
||||
|
||||
-- Find GenericWrite on computers for RBCD attacks
|
||||
MATCH p=(n:User)-[r:GenericWrite]->(c:Computer)
|
||||
WHERE NOT n.admincount
|
||||
RETURN n.name, c.name
|
||||
|
||||
-- Enumerate all outbound ACL edges for a principal
|
||||
MATCH p=(n {name:"HELPDESK@CORP.COM"})-[r:GenericAll|GenericWrite|WriteDacl|WriteOwner|Owns]->(m)
|
||||
RETURN type(r), m.name, labels(m)
|
||||
|
||||
-- Find shortest ACL abuse path to Domain Admin
|
||||
MATCH (n:User {name:"JSMITH@CORP.COM"})
|
||||
MATCH (da:Group {name:"DOMAIN ADMINS@CORP.COM"})
|
||||
MATCH p=shortestPath((n)-[r:MemberOf|GenericAll|GenericWrite|WriteDacl|WriteOwner|Owns|ForceChangePassword*1..]->(da))
|
||||
RETURN p
|
||||
```
|
||||
|
||||
## PowerView Commands for ACL Enumeration
|
||||
|
||||
```powershell
|
||||
# Get ACL for Domain Admins group
|
||||
Get-DomainObjectAcl -Identity "Domain Admins" -ResolveGUIDs
|
||||
|
||||
# Find interesting ACEs for non-admin users
|
||||
Find-InterestingDomainAcl -ResolveGUIDs | Where-Object {
|
||||
$_.ActiveDirectoryRights -match "GenericAll|WriteDacl|WriteOwner"
|
||||
}
|
||||
|
||||
# Get ACL for specific OU
|
||||
Get-DomainObjectAcl -SearchBase "OU=Servers,DC=corp,DC=com" -ResolveGUIDs
|
||||
```
|
||||
@@ -0,0 +1,261 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Active Directory ACL abuse detection using ldap3 to find dangerous permissions."""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import struct
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
|
||||
from ldap3 import Server, Connection, ALL, NTLM, SUBTREE
|
||||
from ldap3.protocol.formatters.formatters import format_sid
|
||||
|
||||
|
||||
DANGEROUS_MASKS = {
|
||||
"GenericAll": 0x10000000,
|
||||
"GenericWrite": 0x40000000,
|
||||
"WriteDACL": 0x00040000,
|
||||
"WriteOwner": 0x00080000,
|
||||
"WriteProperty": 0x00000020,
|
||||
"Self": 0x00000008,
|
||||
"ExtendedRight": 0x00000100,
|
||||
"DeleteChild": 0x00000002,
|
||||
"Delete": 0x00010000,
|
||||
}
|
||||
|
||||
ADMIN_SIDS = {
|
||||
"S-1-5-18",
|
||||
"S-1-5-32-544",
|
||||
"S-1-5-9",
|
||||
}
|
||||
|
||||
ADMIN_RID_SUFFIXES = {
|
||||
"-500",
|
||||
"-512",
|
||||
"-516",
|
||||
"-518",
|
||||
"-519",
|
||||
"-498",
|
||||
}
|
||||
|
||||
ATTACK_PATHS = {
|
||||
"GenericAll": {
|
||||
"user": "Full control allows password reset, Kerberoasting via SPN, or shadow credential attack",
|
||||
"group": "Full control allows adding arbitrary members to the group",
|
||||
"computer": "Full control allows resource-based constrained delegation attack",
|
||||
"organizationalUnit": "Full control allows linking malicious GPO or moving objects",
|
||||
},
|
||||
"WriteDACL": {
|
||||
"user": "Can modify DACL to grant self GenericAll, then reset password",
|
||||
"group": "Can modify DACL to grant self write membership, then add self",
|
||||
"computer": "Can modify DACL to grant self full control on machine account",
|
||||
"organizationalUnit": "Can modify DACL to gain control over OU child objects",
|
||||
},
|
||||
"WriteOwner": {
|
||||
"user": "Can take ownership then modify DACL to escalate privileges",
|
||||
"group": "Can take ownership of group then modify membership",
|
||||
"computer": "Can take ownership then configure delegation abuse",
|
||||
"organizationalUnit": "Can take ownership then control OU policies",
|
||||
},
|
||||
"GenericWrite": {
|
||||
"user": "Can write scriptPath for logon script execution or modify SPN for Kerberoasting",
|
||||
"group": "Can modify group attributes including membership",
|
||||
"computer": "Can write msDS-AllowedToActOnBehalfOfOtherIdentity for RBCD attack",
|
||||
"organizationalUnit": "Can modify OU attributes and link GPO",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def is_admin_sid(sid: str, domain_sid: str) -> bool:
|
||||
if sid in ADMIN_SIDS:
|
||||
return True
|
||||
for suffix in ADMIN_RID_SUFFIXES:
|
||||
if sid == domain_sid + suffix:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def parse_sid(raw: bytes) -> str:
|
||||
if len(raw) < 8:
|
||||
return ""
|
||||
revision = raw[0]
|
||||
sub_auth_count = raw[1]
|
||||
authority = int.from_bytes(raw[2:8], byteorder="big")
|
||||
subs = []
|
||||
for i in range(sub_auth_count):
|
||||
offset = 8 + i * 4
|
||||
if offset + 4 > len(raw):
|
||||
break
|
||||
subs.append(struct.unpack("<I", raw[offset:offset + 4])[0])
|
||||
return f"S-{revision}-{authority}-" + "-".join(str(s) for s in subs)
|
||||
|
||||
|
||||
def parse_acl(descriptor_bytes: bytes) -> list:
|
||||
aces = []
|
||||
if len(descriptor_bytes) < 20:
|
||||
return aces
|
||||
revision = descriptor_bytes[0]
|
||||
control = struct.unpack("<H", descriptor_bytes[2:4])[0]
|
||||
dacl_offset = struct.unpack("<I", descriptor_bytes[16:20])[0]
|
||||
if dacl_offset == 0 or dacl_offset >= len(descriptor_bytes):
|
||||
return aces
|
||||
dacl = descriptor_bytes[dacl_offset:]
|
||||
if len(dacl) < 8:
|
||||
return aces
|
||||
acl_size = struct.unpack("<H", dacl[2:4])[0]
|
||||
ace_count = struct.unpack("<H", dacl[4:6])[0]
|
||||
offset = 8
|
||||
for _ in range(ace_count):
|
||||
if offset + 4 > len(dacl):
|
||||
break
|
||||
ace_type = dacl[offset]
|
||||
ace_flags = dacl[offset + 1]
|
||||
ace_size = struct.unpack("<H", dacl[offset + 2:offset + 4])[0]
|
||||
if ace_size < 4 or offset + ace_size > len(dacl):
|
||||
break
|
||||
if ace_type in (0x00, 0x05):
|
||||
if offset + 8 <= len(dacl):
|
||||
access_mask = struct.unpack("<I", dacl[offset + 4:offset + 8])[0]
|
||||
sid_offset = offset + 8
|
||||
if ace_type == 0x05:
|
||||
sid_offset = offset + 8 + 32
|
||||
if sid_offset < offset + ace_size:
|
||||
sid_bytes = dacl[sid_offset:offset + ace_size]
|
||||
sid_str = parse_sid(sid_bytes)
|
||||
matched_perms = []
|
||||
for perm_name, mask_val in DANGEROUS_MASKS.items():
|
||||
if access_mask & mask_val:
|
||||
matched_perms.append(perm_name)
|
||||
if matched_perms:
|
||||
aces.append({
|
||||
"ace_type": "ACCESS_ALLOWED" if ace_type in (0x00, 0x05) else "OTHER",
|
||||
"access_mask": f"0x{access_mask:08x}",
|
||||
"trustee_sid": sid_str,
|
||||
"permissions": matched_perms,
|
||||
})
|
||||
offset += ace_size
|
||||
return aces
|
||||
|
||||
|
||||
def resolve_sid(conn: Connection, base_dn: str, sid: str) -> str:
|
||||
try:
|
||||
conn.search(base_dn, f"(objectSid={sid})", attributes=["sAMAccountName", "cn"])
|
||||
if conn.entries:
|
||||
entry = conn.entries[0]
|
||||
return str(entry.sAMAccountName) if hasattr(entry, "sAMAccountName") else str(entry.cn)
|
||||
except Exception:
|
||||
pass
|
||||
return sid
|
||||
|
||||
|
||||
def get_domain_sid(conn: Connection, base_dn: str) -> str:
|
||||
conn.search(base_dn, "(objectClass=domain)", attributes=["objectSid"])
|
||||
if conn.entries:
|
||||
raw = conn.entries[0].objectSid.raw_values[0]
|
||||
return parse_sid(raw)
|
||||
return ""
|
||||
|
||||
|
||||
def analyze_acls(dc_ip: str, domain: str, username: str, password: str,
|
||||
target_ou: str) -> dict:
|
||||
server = Server(dc_ip, get_info=ALL, use_ssl=False)
|
||||
domain_parts = domain.split(".")
|
||||
base_dn = ",".join(f"DC={p}" for p in domain_parts)
|
||||
search_base = target_ou if target_ou else base_dn
|
||||
ntlm_user = f"{domain}\\{username}"
|
||||
|
||||
conn = Connection(server, user=ntlm_user, password=password,
|
||||
authentication=NTLM, auto_bind=True)
|
||||
domain_sid = get_domain_sid(conn, base_dn)
|
||||
|
||||
conn.search(
|
||||
search_base,
|
||||
"(|(objectClass=user)(objectClass=group)(objectClass=computer)(objectClass=organizationalUnit))",
|
||||
search_scope=SUBTREE,
|
||||
attributes=["distinguishedName", "sAMAccountName", "objectClass", "nTSecurityDescriptor"],
|
||||
)
|
||||
|
||||
findings = []
|
||||
objects_scanned = 0
|
||||
sid_cache = {}
|
||||
|
||||
for entry in conn.entries:
|
||||
objects_scanned += 1
|
||||
dn = str(entry.distinguishedName)
|
||||
obj_classes = [str(c) for c in entry.objectClass.values] if hasattr(entry, "objectClass") else []
|
||||
obj_type = "unknown"
|
||||
for oc in obj_classes:
|
||||
if oc.lower() in ("user", "group", "computer", "organizationalunit"):
|
||||
obj_type = oc.lower()
|
||||
break
|
||||
|
||||
if not hasattr(entry, "nTSecurityDescriptor"):
|
||||
continue
|
||||
raw_sd = entry.nTSecurityDescriptor.raw_values
|
||||
if not raw_sd:
|
||||
continue
|
||||
sd_bytes = raw_sd[0]
|
||||
aces = parse_acl(sd_bytes)
|
||||
|
||||
for ace in aces:
|
||||
trustee_sid = ace["trustee_sid"]
|
||||
if is_admin_sid(trustee_sid, domain_sid):
|
||||
continue
|
||||
if trustee_sid not in sid_cache:
|
||||
sid_cache[trustee_sid] = resolve_sid(conn, base_dn, trustee_sid)
|
||||
trustee_name = sid_cache[trustee_sid]
|
||||
|
||||
for perm in ace["permissions"]:
|
||||
if perm in ("Delete", "DeleteChild", "Self", "WriteProperty", "ExtendedRight"):
|
||||
severity = "medium"
|
||||
else:
|
||||
severity = "critical"
|
||||
attack = ATTACK_PATHS.get(perm, {}).get(obj_type,
|
||||
f"{perm} on {obj_type} may allow privilege escalation")
|
||||
findings.append({
|
||||
"severity": severity,
|
||||
"target_object": dn,
|
||||
"target_type": obj_type,
|
||||
"trustee": trustee_name,
|
||||
"trustee_sid": trustee_sid,
|
||||
"permission": perm,
|
||||
"access_mask": ace["access_mask"],
|
||||
"ace_type": ace["ace_type"],
|
||||
"attack_path": attack,
|
||||
"remediation": f"Remove {perm} ACE for {trustee_name} on {dn}",
|
||||
})
|
||||
|
||||
conn.unbind()
|
||||
findings.sort(key=lambda f: 0 if f["severity"] == "critical" else 1)
|
||||
return {
|
||||
"domain": domain,
|
||||
"domain_sid": domain_sid,
|
||||
"search_base": search_base,
|
||||
"objects_scanned": objects_scanned,
|
||||
"dangerous_aces_found": len(findings),
|
||||
"findings": findings,
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Active Directory ACL Abuse Analyzer")
|
||||
parser.add_argument("--dc-ip", required=True, help="Domain Controller IP address")
|
||||
parser.add_argument("--domain", required=True, help="AD domain name (e.g., corp.example.com)")
|
||||
parser.add_argument("--username", required=True, help="Domain username for LDAP bind")
|
||||
parser.add_argument("--password", required=True, help="Domain user password")
|
||||
parser.add_argument("--target-ou", default=None,
|
||||
help="Target OU distinguished name to scope the search")
|
||||
parser.add_argument("--output", default=None, help="Output JSON file path")
|
||||
args = parser.parse_args()
|
||||
|
||||
result = analyze_acls(args.dc_ip, args.domain, args.username,
|
||||
args.password, args.target_ou)
|
||||
report = json.dumps(result, indent=2)
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
f.write(report)
|
||||
print(report)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,8 @@
|
||||
MIT License
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
@@ -0,0 +1,65 @@
|
||||
---
|
||||
name: detecting-exfiltration-over-dns-with-zeek
|
||||
description: Detect DNS-based data exfiltration by analyzing Zeek dns.log for high-entropy subdomains and anomalous query patterns
|
||||
domain: cybersecurity
|
||||
subdomain: network-security
|
||||
tags: [dns-exfiltration, zeek, entropy-analysis, threat-hunting]
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
DNS tunneling and exfiltration is a technique used by attackers to bypass firewalls and DLP controls by encoding stolen data into DNS query subdomains. Legitimate DNS queries have predictable entropy and length patterns, while exfiltration queries contain encoded data with high Shannon entropy, unusually long subdomain labels, and high volumes of unique subdomains per parent domain.
|
||||
|
||||
This skill analyzes Zeek dns.log files (TSV format) to detect exfiltration indicators. The agent computes Shannon entropy for each subdomain component, identifies queries exceeding the 63-character DNS label limit, counts unique subdomains per parent domain, and flags domains that exceed configurable thresholds. These techniques detect tools like dnscat2, iodine, dns2tcp, and custom DNS tunneling implementations.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Python 3.9 or later with math and collections modules (stdlib)
|
||||
- Zeek dns.log files in TSV format with standard field headers
|
||||
- Network capture data processed by Zeek 5.0+ or later
|
||||
- Understanding of DNS protocol structure and query types
|
||||
|
||||
## Steps
|
||||
|
||||
1. **Parse Zeek dns.log headers**: Read the TSV file, extract the `#fields` header line to identify column positions for `ts`, `id.orig_h`, `query`, `qtype_name`, `rcode_name`, and `answers`.
|
||||
|
||||
2. **Extract and decompose queries**: For each DNS query, split the FQDN into subdomain labels and parent domain. Skip queries to known safe domains and internal zones.
|
||||
|
||||
3. **Compute Shannon entropy**: Calculate the information entropy of each subdomain label. Legitimate subdomains typically have entropy below 3.5, while encoded/encrypted data produces entropy above 4.0.
|
||||
|
||||
4. **Detect long labels**: Flag DNS labels exceeding 52 characters (approaching the 63-character maximum). Long labels are a strong indicator of data tunneling.
|
||||
|
||||
5. **Count unique subdomains per domain**: Track how many distinct subdomains each parent domain receives. Domains with more than 50 unique subdomains within the log window are suspicious.
|
||||
|
||||
6. **Identify query volume anomalies**: Calculate queries-per-minute per source IP per domain. Exfiltration tools generate sustained high-volume query streams that differ from normal browsing.
|
||||
|
||||
7. **Score and rank domains**: Combine entropy, label length, uniqueness count, and query volume into a composite risk score. Rank domains by score and output the top suspicious domains.
|
||||
|
||||
8. **Generate detection report**: Produce a JSON report with flagged domains, their evidence indicators, originating source IPs, and recommended response actions.
|
||||
|
||||
## Expected Output
|
||||
|
||||
```json
|
||||
{
|
||||
"analysis_summary": {
|
||||
"total_queries_analyzed": 145832,
|
||||
"unique_domains": 3421,
|
||||
"flagged_domains": 3,
|
||||
"entropy_threshold": 3.5
|
||||
},
|
||||
"flagged_domains": [
|
||||
{
|
||||
"domain": "data.evil-c2.com",
|
||||
"unique_subdomains": 892,
|
||||
"avg_entropy": 4.72,
|
||||
"max_label_length": 61,
|
||||
"source_ips": ["10.0.1.45"],
|
||||
"risk_score": 9.4,
|
||||
"indicators": ["high_entropy", "long_labels", "high_subdomain_count"]
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
@@ -0,0 +1,83 @@
|
||||
# DNS Exfiltration Detection Reference
|
||||
|
||||
## Zeek dns.log Field Reference (TSV Format)
|
||||
|
||||
| Field | Type | Description |
|
||||
|-------|------|-------------|
|
||||
| `ts` | time | Timestamp of the DNS request |
|
||||
| `uid` | string | Unique connection identifier |
|
||||
| `id.orig_h` | addr | Source IP address |
|
||||
| `id.orig_p` | port | Source port |
|
||||
| `id.resp_h` | addr | Destination IP (DNS server) |
|
||||
| `id.resp_p` | port | Destination port (usually 53) |
|
||||
| `proto` | enum | Transport protocol (udp/tcp) |
|
||||
| `trans_id` | count | DNS transaction ID |
|
||||
| `rtt` | interval | Round trip time |
|
||||
| `query` | string | The domain name queried |
|
||||
| `qclass` | count | Query class value |
|
||||
| `qclass_name` | string | Query class name (C_INTERNET) |
|
||||
| `qtype` | count | Query type value |
|
||||
| `qtype_name` | string | Query type name (A, AAAA, TXT, MX, CNAME, NULL) |
|
||||
| `rcode` | count | Response code value |
|
||||
| `rcode_name` | string | Response code name (NOERROR, NXDOMAIN, SERVFAIL) |
|
||||
| `AA` | bool | Authoritative Answer flag |
|
||||
| `TC` | bool | Truncation flag |
|
||||
| `RD` | bool | Recursion Desired flag |
|
||||
| `RA` | bool | Recursion Available flag |
|
||||
| `Z` | count | Reserved field |
|
||||
| `answers` | vector | Resource record answers |
|
||||
| `TTLs` | vector | TTL values for answer RRs |
|
||||
| `rejected` | bool | Whether query was rejected |
|
||||
|
||||
## zeek-cut Usage
|
||||
|
||||
```bash
|
||||
# Extract specific fields from dns.log
|
||||
cat dns.log | zeek-cut ts id.orig_h query qtype_name answers
|
||||
|
||||
# Filter TXT queries (common in DNS tunneling)
|
||||
cat dns.log | zeek-cut query qtype_name | grep TXT
|
||||
|
||||
# Count queries per domain
|
||||
cat dns.log | zeek-cut query | rev | cut -d. -f1-2 | rev | sort | uniq -c | sort -rn
|
||||
```
|
||||
|
||||
## RITA Beacon Detection
|
||||
|
||||
```bash
|
||||
# Import Zeek logs into RITA
|
||||
rita import /opt/zeek/logs/current rita-dataset
|
||||
|
||||
# Analyze for beaconing
|
||||
rita show-beacons rita-dataset
|
||||
|
||||
# Show DNS tunneling indicators
|
||||
rita show-dns rita-dataset
|
||||
|
||||
# HTML report
|
||||
rita html-report rita-dataset /var/www/html/rita-report
|
||||
```
|
||||
|
||||
## Suricata DNS Exfiltration Rules
|
||||
|
||||
```
|
||||
# Detect long DNS queries (potential tunneling)
|
||||
alert dns any any -> any any (msg:"Possible DNS tunneling - long query"; \
|
||||
dns.query; content:"|00|"; byte_test:1,>,50,0,relative; \
|
||||
sid:1000001; rev:1;)
|
||||
|
||||
# Detect TXT record queries to unusual domains
|
||||
alert dns any any -> any any (msg:"Suspicious DNS TXT query"; \
|
||||
dns_query; pcre:"/^[a-z0-9]{30,}\./i"; sid:1000002; rev:1;)
|
||||
```
|
||||
|
||||
## Splunk SPL for DNS Exfiltration
|
||||
|
||||
```spl
|
||||
index=zeek sourcetype=zeek_dns
|
||||
| eval subdomain_len=len(mvindex(split(query, "."), 0))
|
||||
| where subdomain_len > 50
|
||||
| stats count dc(query) as unique_queries by "id.orig_h" query
|
||||
| where unique_queries > 100
|
||||
| sort -unique_queries
|
||||
```
|
||||
@@ -0,0 +1,213 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Detect DNS exfiltration from Zeek dns.log by analyzing entropy and query patterns."""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import math
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
SAFE_DOMAINS = {
|
||||
"in-addr.arpa", "ip6.arpa", "local", "localhost",
|
||||
"google.com", "googleapis.com", "gstatic.com",
|
||||
"microsoft.com", "windows.net", "windowsupdate.com",
|
||||
"apple.com", "icloud.com", "akamai.net", "cloudflare.com",
|
||||
"amazonaws.com", "azure.com",
|
||||
}
|
||||
|
||||
|
||||
def shannon_entropy(data: str) -> float:
|
||||
if not data:
|
||||
return 0.0
|
||||
freq = defaultdict(int)
|
||||
for ch in data:
|
||||
freq[ch] += 1
|
||||
length = len(data)
|
||||
entropy = 0.0
|
||||
for count in freq.values():
|
||||
prob = count / length
|
||||
entropy -= prob * math.log2(prob)
|
||||
return round(entropy, 4)
|
||||
|
||||
|
||||
def parse_zeek_dns_log(log_path: str) -> list:
|
||||
records = []
|
||||
field_names = []
|
||||
separator = "\t"
|
||||
with open(log_path, "r", encoding="utf-8", errors="replace") as f:
|
||||
for line in f:
|
||||
line = line.rstrip("\n")
|
||||
if line.startswith("#separator"):
|
||||
sep_value = line.split(" ", 1)[1] if " " in line else "\\x09"
|
||||
if sep_value == "\\x09":
|
||||
separator = "\t"
|
||||
else:
|
||||
separator = sep_value
|
||||
continue
|
||||
if line.startswith("#fields"):
|
||||
field_names = line.split(separator)[1:] if separator == "\t" else line.split("\t")[1:]
|
||||
field_names = [f.strip() for f in field_names]
|
||||
continue
|
||||
if line.startswith("#"):
|
||||
continue
|
||||
if not field_names:
|
||||
continue
|
||||
values = line.split(separator)
|
||||
if len(values) < len(field_names):
|
||||
continue
|
||||
record = {}
|
||||
for i, name in enumerate(field_names):
|
||||
record[name] = values[i] if i < len(values) else "-"
|
||||
records.append(record)
|
||||
return records
|
||||
|
||||
|
||||
def extract_parent_domain(fqdn: str, levels: int = 2) -> str:
|
||||
parts = fqdn.rstrip(".").split(".")
|
||||
if len(parts) <= levels:
|
||||
return fqdn.rstrip(".")
|
||||
return ".".join(parts[-levels:])
|
||||
|
||||
|
||||
def extract_subdomain(fqdn: str, levels: int = 2) -> str:
|
||||
parts = fqdn.rstrip(".").split(".")
|
||||
if len(parts) <= levels:
|
||||
return ""
|
||||
return ".".join(parts[:-levels])
|
||||
|
||||
|
||||
def analyze_dns_log(log_path: str, entropy_threshold: float, subdomain_threshold: int,
|
||||
label_length_threshold: int) -> dict:
|
||||
records = parse_zeek_dns_log(log_path)
|
||||
domain_stats = defaultdict(lambda: {
|
||||
"subdomains": set(),
|
||||
"entropies": [],
|
||||
"max_label_len": 0,
|
||||
"source_ips": set(),
|
||||
"query_count": 0,
|
||||
"qtypes": defaultdict(int),
|
||||
"sample_queries": [],
|
||||
})
|
||||
|
||||
total_queries = 0
|
||||
for rec in records:
|
||||
query = rec.get("query", "-")
|
||||
if query == "-" or not query:
|
||||
continue
|
||||
total_queries += 1
|
||||
src_ip = rec.get("id.orig_h", "unknown")
|
||||
qtype = rec.get("qtype_name", "unknown")
|
||||
parent = extract_parent_domain(query)
|
||||
subdomain = extract_subdomain(query)
|
||||
|
||||
if parent.lower() in SAFE_DOMAINS:
|
||||
continue
|
||||
|
||||
stats = domain_stats[parent]
|
||||
stats["query_count"] += 1
|
||||
stats["source_ips"].add(src_ip)
|
||||
stats["qtypes"][qtype] += 1
|
||||
|
||||
if subdomain:
|
||||
stats["subdomains"].add(subdomain)
|
||||
ent = shannon_entropy(subdomain)
|
||||
stats["entropies"].append(ent)
|
||||
labels = subdomain.split(".")
|
||||
for label in labels:
|
||||
if len(label) > stats["max_label_len"]:
|
||||
stats["max_label_len"] = len(label)
|
||||
if len(stats["sample_queries"]) < 5:
|
||||
stats["sample_queries"].append(query)
|
||||
|
||||
flagged = []
|
||||
for domain, stats in domain_stats.items():
|
||||
indicators = []
|
||||
avg_entropy = 0.0
|
||||
if stats["entropies"]:
|
||||
avg_entropy = round(sum(stats["entropies"]) / len(stats["entropies"]), 4)
|
||||
unique_count = len(stats["subdomains"])
|
||||
max_label = stats["max_label_len"]
|
||||
|
||||
if avg_entropy >= entropy_threshold and unique_count >= 5:
|
||||
indicators.append("high_entropy")
|
||||
if max_label >= label_length_threshold:
|
||||
indicators.append("long_labels")
|
||||
if unique_count >= subdomain_threshold:
|
||||
indicators.append("high_subdomain_count")
|
||||
txt_ratio = stats["qtypes"].get("TXT", 0) / max(stats["query_count"], 1)
|
||||
if txt_ratio > 0.5 and stats["query_count"] > 20:
|
||||
indicators.append("high_txt_ratio")
|
||||
null_ratio = stats["qtypes"].get("NULL", 0) / max(stats["query_count"], 1)
|
||||
if null_ratio > 0.3:
|
||||
indicators.append("null_queries")
|
||||
|
||||
if not indicators:
|
||||
continue
|
||||
|
||||
risk_score = 0.0
|
||||
if "high_entropy" in indicators:
|
||||
risk_score += min(avg_entropy, 5.0)
|
||||
if "long_labels" in indicators:
|
||||
risk_score += min(max_label / 15.0, 3.0)
|
||||
if "high_subdomain_count" in indicators:
|
||||
risk_score += min(unique_count / 100.0, 3.0)
|
||||
if "high_txt_ratio" in indicators:
|
||||
risk_score += 1.5
|
||||
if "null_queries" in indicators:
|
||||
risk_score += 1.0
|
||||
risk_score = min(round(risk_score, 1), 10.0)
|
||||
|
||||
flagged.append({
|
||||
"domain": domain,
|
||||
"unique_subdomains": unique_count,
|
||||
"avg_entropy": avg_entropy,
|
||||
"max_label_length": max_label,
|
||||
"query_count": stats["query_count"],
|
||||
"source_ips": sorted(stats["source_ips"]),
|
||||
"qtypes": dict(stats["qtypes"]),
|
||||
"risk_score": risk_score,
|
||||
"indicators": indicators,
|
||||
"sample_queries": stats["sample_queries"],
|
||||
})
|
||||
|
||||
flagged.sort(key=lambda x: x["risk_score"], reverse=True)
|
||||
return {
|
||||
"analysis_summary": {
|
||||
"log_file": log_path,
|
||||
"total_queries_analyzed": total_queries,
|
||||
"unique_domains": len(domain_stats),
|
||||
"flagged_domains": len(flagged),
|
||||
"entropy_threshold": entropy_threshold,
|
||||
"subdomain_threshold": subdomain_threshold,
|
||||
"label_length_threshold": label_length_threshold,
|
||||
},
|
||||
"flagged_domains": flagged,
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="DNS Exfiltration Detection from Zeek dns.log")
|
||||
parser.add_argument("--log-file", required=True, help="Path to Zeek dns.log file")
|
||||
parser.add_argument("--entropy-threshold", type=float, default=3.5,
|
||||
help="Shannon entropy threshold for flagging (default: 3.5)")
|
||||
parser.add_argument("--subdomain-threshold", type=int, default=50,
|
||||
help="Unique subdomain count threshold (default: 50)")
|
||||
parser.add_argument("--label-length-threshold", type=int, default=52,
|
||||
help="DNS label length threshold for flagging (default: 52)")
|
||||
parser.add_argument("--output", type=str, default=None,
|
||||
help="Output JSON file path")
|
||||
args = parser.parse_args()
|
||||
|
||||
result = analyze_dns_log(args.log_file, args.entropy_threshold,
|
||||
args.subdomain_threshold, args.label_length_threshold)
|
||||
report = json.dumps(result, indent=2)
|
||||
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
f.write(report)
|
||||
print(report)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,8 @@
|
||||
MIT License
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
@@ -0,0 +1,67 @@
|
||||
---
|
||||
name: implementing-secrets-scanning-in-ci-cd
|
||||
description: Integrate gitleaks and trufflehog into CI/CD pipelines to detect leaked secrets before deployment
|
||||
domain: cybersecurity
|
||||
subdomain: devsecops
|
||||
tags: [secrets-scanning, gitleaks, trufflehog, ci-cd]
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
This skill covers implementing automated secrets scanning in CI/CD pipelines using gitleaks and trufflehog. It enables security teams to detect API keys, tokens, passwords, and other credentials that have been accidentally committed to source code repositories, providing a CI gate that blocks deployments containing high-severity findings.
|
||||
|
||||
Gitleaks scans git repositories and directories for hardcoded secrets using regex patterns and entropy analysis. TruffleHog performs filesystem and git history scans with optional secret verification against live services. Together they provide comprehensive coverage for secrets detection.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Python 3.9 or later
|
||||
- gitleaks v8.x installed and available on PATH
|
||||
- trufflehog v3.x installed and available on PATH
|
||||
- A git repository or directory to scan
|
||||
- Access to CI/CD platform (GitHub Actions, GitLab CI, Jenkins)
|
||||
|
||||
## Steps
|
||||
|
||||
1. **Install scanning tools**: Install gitleaks via package manager or binary download. Install trufflehog via `brew install trufflehog` or download from GitHub releases.
|
||||
|
||||
2. **Configure gitleaks**: Create a `.gitleaks.toml` configuration file in the repository root to define custom rules, allowlists, and path exclusions. Use `--config` flag to point to custom configs.
|
||||
|
||||
3. **Run gitleaks directory scan**: Execute `gitleaks dir --source . --report-format json --report-path gitleaks-report.json` to scan the working directory and generate a JSON report.
|
||||
|
||||
4. **Run trufflehog filesystem scan**: Execute `trufflehog filesystem /path/to/repo --json > trufflehog-report.json` to scan files and output JSON findings to a report file.
|
||||
|
||||
5. **Parse and filter findings**: Use the agent script to parse both JSON reports, filter findings by severity (critical, high, medium, low), and determine whether the CI pipeline should pass or fail.
|
||||
|
||||
6. **Integrate into CI pipeline**: Add the scanning step to your GitHub Actions workflow, GitLab CI config, or Jenkins pipeline as a pre-deployment gate. Use `--exit-code` flag in gitleaks to control pipeline behavior.
|
||||
|
||||
7. **Configure pre-commit hooks**: Set up gitleaks as a pre-commit hook using `gitleaks protect --staged` to catch secrets before they are committed.
|
||||
|
||||
8. **Review and triage findings**: Examine the JSON output for false positives, add legitimate entries to `.gitleaksignore`, and rotate any confirmed leaked credentials immediately.
|
||||
|
||||
## Expected Output
|
||||
|
||||
The agent script produces a JSON report containing:
|
||||
- Total findings count from each scanner
|
||||
- Findings grouped by severity level
|
||||
- Individual finding details including file path, line number, rule ID, and redacted secret
|
||||
- A CI gate verdict (pass/fail) based on the configured severity threshold
|
||||
- Execution metadata including scan duration and tool versions
|
||||
|
||||
```json
|
||||
{
|
||||
"scan_summary": {
|
||||
"tool": "both",
|
||||
"total_findings": 3,
|
||||
"critical": 1,
|
||||
"high": 1,
|
||||
"medium": 1,
|
||||
"low": 0,
|
||||
"ci_gate": "FAIL",
|
||||
"fail_reason": "Found 1 critical and 1 high severity findings"
|
||||
},
|
||||
"findings": [...]
|
||||
}
|
||||
```
|
||||
@@ -0,0 +1,97 @@
|
||||
# Secrets Scanning API Reference
|
||||
|
||||
## Gitleaks CLI
|
||||
|
||||
### Subcommands
|
||||
```bash
|
||||
gitleaks git # Scan git repositories
|
||||
gitleaks dir # Scan directories or files
|
||||
gitleaks stdin # Detect secrets from stdin
|
||||
```
|
||||
|
||||
### Key Flags
|
||||
| Flag | Description |
|
||||
|------|-------------|
|
||||
| `--source, -s` | Path to source directory or repository |
|
||||
| `--config, -c` | Path to .gitleaks.toml config file |
|
||||
| `--report-format, -f` | Output format: json, csv, junit, sarif, template |
|
||||
| `--report-path, -r` | File path to write report |
|
||||
| `--exit-code` | Exit code when leaks found (default: 1) |
|
||||
| `--redact` | Redact secrets from output (0-100%, default: 100) |
|
||||
| `--baseline-path, -b` | Path to baseline report to ignore known findings |
|
||||
| `--no-banner` | Suppress banner output |
|
||||
| `--verbose, -v` | Show verbose scan details |
|
||||
| `--log-level, -l` | Log level: trace, debug, info, warn, error, fatal |
|
||||
| `--max-target-megabytes` | Skip files larger than specified MB |
|
||||
| `--enable-rule` | Only enable specific rule IDs |
|
||||
| `--gitleaks-ignore-path, -i` | Path to .gitleaksignore file |
|
||||
|
||||
### Pre-commit Hook
|
||||
```yaml
|
||||
# .pre-commit-config.yaml
|
||||
repos:
|
||||
- repo: https://github.com/gitleaks/gitleaks
|
||||
rev: v8.21.2
|
||||
hooks:
|
||||
- id: gitleaks
|
||||
```
|
||||
|
||||
### Protect Staged Files
|
||||
```bash
|
||||
gitleaks protect --staged --report-format json --report-path staged-report.json
|
||||
```
|
||||
|
||||
## TruffleHog CLI
|
||||
|
||||
### Subcommands
|
||||
```bash
|
||||
trufflehog git <repo-url> # Scan git repository
|
||||
trufflehog filesystem <path> # Scan local filesystem
|
||||
trufflehog github --org=<org> # Scan GitHub org
|
||||
trufflehog gitlab --token=<token> # Scan GitLab instance
|
||||
trufflehog s3 --bucket=<name> # Scan S3 bucket
|
||||
trufflehog docker --image=<img> # Scan Docker image
|
||||
```
|
||||
|
||||
### Key Flags
|
||||
| Flag | Description |
|
||||
|------|-------------|
|
||||
| `--json, -j` | Output in JSON format (one object per line) |
|
||||
| `--no-verification` | Skip live secret verification |
|
||||
| `--concurrency` | Number of concurrent workers (default: 20) |
|
||||
| `--results` | Filter: verified, unknown, unverified, filtered_unverified |
|
||||
| `--force-skip-binaries` | Skip binary files during scan |
|
||||
| `--force-skip-archives` | Skip archive files during scan |
|
||||
| `--include-paths` | Path to file with include path patterns |
|
||||
| `--exclude-paths` | Path to file with exclude path patterns |
|
||||
| `--log-level` | Verbosity: 0 (info) to 5 (trace) |
|
||||
|
||||
## GitHub Secret Scanning API
|
||||
|
||||
### List Alerts
|
||||
```bash
|
||||
curl -H "Authorization: Bearer $TOKEN" \
|
||||
https://api.github.com/repos/{owner}/{repo}/secret-scanning/alerts
|
||||
```
|
||||
|
||||
### Update Alert State
|
||||
```bash
|
||||
curl -X PATCH \
|
||||
-H "Authorization: Bearer $TOKEN" \
|
||||
https://api.github.com/repos/{owner}/{repo}/secret-scanning/alerts/{alert_number} \
|
||||
-d '{"state": "resolved", "resolution": "revoked"}'
|
||||
```
|
||||
|
||||
## GitHub Actions Integration
|
||||
|
||||
```yaml
|
||||
- name: Gitleaks Scan
|
||||
uses: gitleaks/gitleaks-action@v2
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: TruffleHog Scan
|
||||
uses: trufflesecurity/trufflehog@main
|
||||
with:
|
||||
extra_args: --results verified,unknown
|
||||
```
|
||||
@@ -0,0 +1,173 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Secrets scanning CI/CD gate using gitleaks and trufflehog."""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
|
||||
SEVERITY_ORDER = {"critical": 4, "high": 3, "medium": 2, "low": 1, "info": 0}
|
||||
|
||||
GITLEAKS_SEVERITY_MAP = {
|
||||
"aws-access-key": "critical",
|
||||
"aws-secret-key": "critical",
|
||||
"github-pat": "critical",
|
||||
"private-key": "critical",
|
||||
"generic-api-key": "high",
|
||||
"slack-webhook": "high",
|
||||
"stripe-api-key": "critical",
|
||||
"twilio-api-key": "high",
|
||||
"sendgrid-api-key": "high",
|
||||
"npm-access-token": "high",
|
||||
"pypi-upload-token": "high",
|
||||
"gcp-api-key": "critical",
|
||||
"heroku-api-key": "high",
|
||||
"jwt": "medium",
|
||||
"password-in-url": "high",
|
||||
"generic-password": "medium",
|
||||
}
|
||||
|
||||
|
||||
def run_gitleaks(scan_path: str) -> list:
|
||||
report_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
|
||||
report_path = report_file.name
|
||||
report_file.close()
|
||||
cmd = [
|
||||
"gitleaks", "dir",
|
||||
"--source", scan_path,
|
||||
"--report-format", "json",
|
||||
"--report-path", report_path,
|
||||
"--exit-code", "0",
|
||||
"--no-banner",
|
||||
]
|
||||
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
|
||||
findings = []
|
||||
if os.path.exists(report_path) and os.path.getsize(report_path) > 0:
|
||||
with open(report_path, "r") as f:
|
||||
raw = json.load(f)
|
||||
for item in raw:
|
||||
rule_id = item.get("RuleID", "unknown")
|
||||
severity = GITLEAKS_SEVERITY_MAP.get(rule_id, "medium")
|
||||
secret_val = item.get("Secret", "")
|
||||
redacted = secret_val[:4] + "****" if len(secret_val) > 4 else "****"
|
||||
findings.append({
|
||||
"tool": "gitleaks",
|
||||
"rule_id": rule_id,
|
||||
"description": item.get("Description", ""),
|
||||
"file": item.get("File", ""),
|
||||
"line": item.get("StartLine", 0),
|
||||
"severity": severity,
|
||||
"redacted_secret": redacted,
|
||||
"commit": item.get("Commit", ""),
|
||||
"author": item.get("Author", ""),
|
||||
"entropy": item.get("Entropy", 0.0),
|
||||
})
|
||||
os.unlink(report_path)
|
||||
return findings
|
||||
|
||||
|
||||
def run_trufflehog(scan_path: str) -> list:
|
||||
cmd = [
|
||||
"trufflehog", "filesystem", scan_path,
|
||||
"--json",
|
||||
"--no-verification",
|
||||
"--concurrency", "10",
|
||||
]
|
||||
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
|
||||
findings = []
|
||||
for line in proc.stdout.strip().split("\n"):
|
||||
if not line.strip():
|
||||
continue
|
||||
item = json.loads(line)
|
||||
detector = item.get("DetectorName", item.get("DetectorType", "unknown"))
|
||||
source_meta = item.get("SourceMetadata", {})
|
||||
fs_data = source_meta.get("Data", {}).get("Filesystem", {})
|
||||
raw_secret = item.get("Raw", "")
|
||||
redacted = raw_secret[:4] + "****" if len(raw_secret) > 4 else "****"
|
||||
severity = "critical" if item.get("Verified", False) else "high"
|
||||
findings.append({
|
||||
"tool": "trufflehog",
|
||||
"rule_id": detector,
|
||||
"description": f"Detected {detector} secret",
|
||||
"file": fs_data.get("file", ""),
|
||||
"line": fs_data.get("line", 0),
|
||||
"severity": severity,
|
||||
"redacted_secret": redacted,
|
||||
"verified": item.get("Verified", False),
|
||||
"decoder_name": item.get("DecoderName", ""),
|
||||
})
|
||||
return findings
|
||||
|
||||
|
||||
def evaluate_gate(findings: list, fail_on_severity: str) -> dict:
|
||||
threshold = SEVERITY_ORDER.get(fail_on_severity, 2)
|
||||
counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0}
|
||||
for f in findings:
|
||||
sev = f.get("severity", "medium")
|
||||
counts[sev] = counts.get(sev, 0) + 1
|
||||
gate_failed = False
|
||||
fail_reasons = []
|
||||
for sev, count in counts.items():
|
||||
if count > 0 and SEVERITY_ORDER.get(sev, 0) >= threshold:
|
||||
gate_failed = True
|
||||
fail_reasons.append(f"Found {count} {sev} severity findings")
|
||||
return {
|
||||
"ci_gate": "FAIL" if gate_failed else "PASS",
|
||||
"fail_reason": "; ".join(fail_reasons) if fail_reasons else None,
|
||||
"total_findings": len(findings),
|
||||
**counts,
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Secrets scanning CI/CD gate")
|
||||
parser.add_argument("--path", required=True, help="Path to scan for secrets")
|
||||
parser.add_argument("--tool", choices=["gitleaks", "trufflehog", "both"],
|
||||
default="both", help="Scanner tool to use")
|
||||
parser.add_argument("--fail-on-severity", choices=["critical", "high", "medium", "low"],
|
||||
default="high", help="Minimum severity to fail CI gate")
|
||||
parser.add_argument("--output", default=None, help="Output JSON file path")
|
||||
args = parser.parse_args()
|
||||
|
||||
if not os.path.exists(args.path):
|
||||
print(json.dumps({"error": f"Path does not exist: {args.path}"}))
|
||||
sys.exit(1)
|
||||
|
||||
start_time = time.time()
|
||||
all_findings = []
|
||||
|
||||
if args.tool in ("gitleaks", "both"):
|
||||
all_findings.extend(run_gitleaks(args.path))
|
||||
if args.tool in ("trufflehog", "both"):
|
||||
all_findings.extend(run_trufflehog(args.path))
|
||||
|
||||
gate_result = evaluate_gate(all_findings, args.fail_on_severity)
|
||||
elapsed = round(time.time() - start_time, 2)
|
||||
|
||||
report = {
|
||||
"scan_summary": {
|
||||
"tool": args.tool,
|
||||
"scanned_path": args.path,
|
||||
"fail_on_severity": args.fail_on_severity,
|
||||
"scan_duration_seconds": elapsed,
|
||||
**gate_result,
|
||||
},
|
||||
"findings": all_findings,
|
||||
}
|
||||
|
||||
output_json = json.dumps(report, indent=2)
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
f.write(output_json)
|
||||
print(output_json)
|
||||
|
||||
if gate_result["ci_gate"] == "FAIL":
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,8 @@
|
||||
MIT License
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
@@ -0,0 +1,68 @@
|
||||
---
|
||||
name: implementing-soar-playbook-for-phishing
|
||||
description: Automate phishing incident response using Splunk SOAR REST API to create containers, add artifacts, and trigger playbooks
|
||||
domain: cybersecurity
|
||||
subdomain: security-operations
|
||||
tags: [soar, splunk-phantom, phishing, incident-response]
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
This skill implements a phishing incident response workflow using the Splunk SOAR (formerly Phantom) REST API. When a suspected phishing email is reported, the agent parses email headers and body, creates a SOAR container representing the incident, attaches artifacts containing indicators of compromise (sender address, URLs, IP addresses, file hashes), triggers an automated investigation playbook, and polls for action results.
|
||||
|
||||
Splunk SOAR orchestrates and automates security operations through playbooks that chain together investigative and response actions. The REST API at `/rest/container`, `/rest/artifact`, and `/rest/playbook_run` enables programmatic incident creation and automation triggering from external tools, email gateways, and SIEM alerts.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Python 3.9 or later with `requests` and `email` modules
|
||||
- Splunk SOAR instance (Cloud or On-Premises) with REST API access
|
||||
- SOAR API token with permissions to create containers and trigger playbooks
|
||||
- Network connectivity to SOAR instance on port 443
|
||||
- A configured phishing investigation playbook in SOAR
|
||||
|
||||
## Steps
|
||||
|
||||
1. **Parse the phishing email**: Read the email file (.eml format) and extract headers including From, To, Subject, Reply-To, Return-Path, Received, Message-ID, X-Mailer, and authentication results (SPF, DKIM, DMARC). Extract URLs and IP addresses from the email body.
|
||||
|
||||
2. **Authenticate to SOAR REST API**: Use the API token in the `ph-auth-token` header to authenticate all REST API requests to the SOAR instance.
|
||||
|
||||
3. **Create a container**: POST to `/rest/container` with the incident label, name, description, severity, and status. The container represents the phishing incident and receives a container ID in the response.
|
||||
|
||||
4. **Add email header artifacts**: POST to `/rest/artifact` with `container_id` and CEF (Common Event Format) fields containing sender address (`fromAddress`), recipient (`toAddress`), subject, originating IP (`sourceAddress`), and Message-ID. Set `run_automation` to False for all but the last artifact.
|
||||
|
||||
5. **Add URL artifacts**: For each URL extracted from the email body, create an artifact with CEF field `requestURL` and type `url`. These artifacts feed into URL reputation checks in the playbook.
|
||||
|
||||
6. **Trigger the playbook**: POST to `/rest/playbook_run` with the playbook ID or name and the container ID. This initiates the automated investigation workflow.
|
||||
|
||||
7. **Poll action results**: GET `/rest/action_run` filtered by container ID to monitor playbook progress. Poll until all actions reach a terminal state (success, failed, or cancelled).
|
||||
|
||||
8. **Compile response report**: Aggregate playbook action results into a summary report with verdicts from URL reputation, domain reputation, IP geolocation, and email header analysis.
|
||||
|
||||
## Expected Output
|
||||
|
||||
```json
|
||||
{
|
||||
"incident": {
|
||||
"container_id": 1542,
|
||||
"status": "new",
|
||||
"severity": "high",
|
||||
"artifacts_created": 5
|
||||
},
|
||||
"playbook": {
|
||||
"name": "phishing_investigate",
|
||||
"run_id": 892,
|
||||
"status": "success",
|
||||
"actions_completed": 8
|
||||
},
|
||||
"verdict": "malicious",
|
||||
"indicators": {
|
||||
"sender_domain_reputation": "malicious",
|
||||
"urls_flagged": 2,
|
||||
"spf_result": "fail",
|
||||
"dkim_result": "fail"
|
||||
}
|
||||
}
|
||||
```
|
||||
@@ -0,0 +1,122 @@
|
||||
# SOAR Phishing Playbook API Reference
|
||||
|
||||
## Splunk SOAR REST API
|
||||
|
||||
### Authentication
|
||||
All requests require the `ph-auth-token` header:
|
||||
```
|
||||
ph-auth-token: <your-api-token>
|
||||
```
|
||||
|
||||
### Create Container (Incident)
|
||||
```
|
||||
POST /rest/container
|
||||
```
|
||||
```json
|
||||
{
|
||||
"name": "Phishing: Suspicious invoice email",
|
||||
"description": "User reported phishing email",
|
||||
"label": "phishing",
|
||||
"severity": "high",
|
||||
"status": "new",
|
||||
"sensitivity": "amber",
|
||||
"owner_id": 1,
|
||||
"tags": ["phishing", "email"]
|
||||
}
|
||||
```
|
||||
Response: `{"success": true, "id": 1542}`
|
||||
|
||||
### Create Artifact
|
||||
```
|
||||
POST /rest/artifact
|
||||
```
|
||||
```json
|
||||
{
|
||||
"container_id": 1542,
|
||||
"name": "Sender Email",
|
||||
"label": "email",
|
||||
"type": "email",
|
||||
"severity": "high",
|
||||
"cef": {
|
||||
"fromAddress": "attacker@evil.com",
|
||||
"toAddress": "victim@company.com",
|
||||
"emailSubject": "Urgent Invoice #9921",
|
||||
"sourceAddress": "198.51.100.23",
|
||||
"requestURL": "https://evil-phish.com/login"
|
||||
},
|
||||
"run_automation": true
|
||||
}
|
||||
```
|
||||
Response: `{"success": true, "id": 8834}`
|
||||
|
||||
### Trigger Playbook
|
||||
```
|
||||
POST /rest/playbook_run
|
||||
```
|
||||
```json
|
||||
{
|
||||
"container_id": 1542,
|
||||
"playbook_id": "local/phishing_investigate",
|
||||
"scope": "new",
|
||||
"run": true
|
||||
}
|
||||
```
|
||||
|
||||
### List Action Runs
|
||||
```
|
||||
GET /rest/action_run?_filter_container=1542&page_size=100
|
||||
```
|
||||
|
||||
### Get Container Details
|
||||
```
|
||||
GET /rest/container/{container_id}
|
||||
GET /rest/container/{container_id}/artifacts
|
||||
GET /rest/container/{container_id}/actions
|
||||
```
|
||||
|
||||
### Update Container Status
|
||||
```
|
||||
POST /rest/container/{container_id}
|
||||
```
|
||||
```json
|
||||
{"status": "closed", "close_reason": "resolved"}
|
||||
```
|
||||
|
||||
## XSOAR (Cortex XSOAR) API Comparison
|
||||
|
||||
### Create Incident
|
||||
```
|
||||
POST /incident
|
||||
```
|
||||
```json
|
||||
{
|
||||
"name": "Phishing Report",
|
||||
"type": "Phishing",
|
||||
"severity": 3,
|
||||
"labels": [
|
||||
{"type": "Email/from", "value": "attacker@evil.com"},
|
||||
{"type": "Email/subject", "value": "Urgent Invoice"}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### Run Playbook on Incident
|
||||
```
|
||||
POST /incident/investigate
|
||||
```
|
||||
```json
|
||||
{"id": "1542", "playbookId": "phishing_investigation"}
|
||||
```
|
||||
|
||||
## Common Phishing Playbook Actions
|
||||
|
||||
| Action | App | Description |
|
||||
|--------|-----|-------------|
|
||||
| `url reputation` | VirusTotal | Check URL against VT database |
|
||||
| `domain reputation` | VirusTotal | Check sender domain reputation |
|
||||
| `ip reputation` | AbuseIPDB | Check originating IP reputation |
|
||||
| `whois domain` | WHOIS | Domain registration lookup |
|
||||
| `detonate url` | URLScan.io | Sandbox URL detonation |
|
||||
| `get email headers` | IMAP | Retrieve full email headers |
|
||||
| `block sender` | Exchange | Block sender at email gateway |
|
||||
| `quarantine email` | O365 | Remove email from all mailboxes |
|
||||
@@ -0,0 +1,245 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Splunk SOAR phishing playbook automation via REST API."""
|
||||
|
||||
import argparse
|
||||
import email
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
import requests
|
||||
from email import policy
|
||||
from email.parser import BytesParser
|
||||
|
||||
|
||||
URL_PATTERN = re.compile(r'https?://[^\s<>"\']+', re.IGNORECASE)
|
||||
IP_PATTERN = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
|
||||
|
||||
|
||||
class SOARClient:
|
||||
def __init__(self, base_url: str, token: str, verify_ssl: bool = True):
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update({
|
||||
"ph-auth-token": token,
|
||||
"Content-Type": "application/json",
|
||||
})
|
||||
self.session.verify = verify_ssl
|
||||
|
||||
def create_container(self, name: str, description: str, severity: str,
|
||||
label: str = "events") -> dict:
|
||||
payload = {
|
||||
"name": name,
|
||||
"description": description,
|
||||
"severity": severity,
|
||||
"label": label,
|
||||
"status": "new",
|
||||
"sensitivity": "amber",
|
||||
}
|
||||
resp = self.session.post(f"{self.base_url}/rest/container", json=payload)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
return {"container_id": data.get("id"), "success": data.get("success", False)}
|
||||
|
||||
def add_artifact(self, container_id: int, name: str, cef: dict,
|
||||
label: str = "event", severity: str = "medium",
|
||||
artifact_type: str = "network", run_automation: bool = False) -> dict:
|
||||
payload = {
|
||||
"container_id": container_id,
|
||||
"name": name,
|
||||
"label": label,
|
||||
"severity": severity,
|
||||
"type": artifact_type,
|
||||
"cef": cef,
|
||||
"run_automation": run_automation,
|
||||
}
|
||||
resp = self.session.post(f"{self.base_url}/rest/artifact", json=payload)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
return {"artifact_id": data.get("id"), "success": data.get("success", False)}
|
||||
|
||||
def trigger_playbook(self, playbook_name: str, container_id: int,
|
||||
scope: str = "new") -> dict:
|
||||
payload = {
|
||||
"container_id": container_id,
|
||||
"playbook_id": playbook_name,
|
||||
"scope": scope,
|
||||
"run": True,
|
||||
}
|
||||
resp = self.session.post(f"{self.base_url}/rest/playbook_run", json=payload)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
def get_action_runs(self, container_id: int) -> list:
|
||||
resp = self.session.get(
|
||||
f"{self.base_url}/rest/action_run",
|
||||
params={"_filter_container": container_id, "page_size": 100}
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json().get("data", [])
|
||||
|
||||
def poll_playbook(self, container_id: int, timeout: int = 300,
|
||||
interval: int = 10) -> list:
|
||||
terminal_states = {"success", "failed", "cancelled"}
|
||||
elapsed = 0
|
||||
while elapsed < timeout:
|
||||
runs = self.get_action_runs(container_id)
|
||||
if runs and all(r.get("status") in terminal_states for r in runs):
|
||||
return runs
|
||||
time.sleep(interval)
|
||||
elapsed += interval
|
||||
return self.get_action_runs(container_id)
|
||||
|
||||
|
||||
def parse_email_file(email_path: str) -> dict:
|
||||
with open(email_path, "rb") as f:
|
||||
msg = BytesParser(policy=policy.default).parse(f)
|
||||
|
||||
headers = {
|
||||
"from": msg.get("From", ""),
|
||||
"to": msg.get("To", ""),
|
||||
"subject": msg.get("Subject", ""),
|
||||
"reply_to": msg.get("Reply-To", ""),
|
||||
"return_path": msg.get("Return-Path", ""),
|
||||
"message_id": msg.get("Message-ID", ""),
|
||||
"date": msg.get("Date", ""),
|
||||
"x_mailer": msg.get("X-Mailer", ""),
|
||||
}
|
||||
|
||||
received_headers = msg.get_all("Received", [])
|
||||
auth_results = msg.get("Authentication-Results", "")
|
||||
spf_result = "none"
|
||||
dkim_result = "none"
|
||||
dmarc_result = "none"
|
||||
if "spf=pass" in auth_results.lower():
|
||||
spf_result = "pass"
|
||||
elif "spf=fail" in auth_results.lower():
|
||||
spf_result = "fail"
|
||||
if "dkim=pass" in auth_results.lower():
|
||||
dkim_result = "pass"
|
||||
elif "dkim=fail" in auth_results.lower():
|
||||
dkim_result = "fail"
|
||||
if "dmarc=pass" in auth_results.lower():
|
||||
dmarc_result = "pass"
|
||||
elif "dmarc=fail" in auth_results.lower():
|
||||
dmarc_result = "fail"
|
||||
|
||||
body_text = ""
|
||||
if msg.is_multipart():
|
||||
for part in msg.walk():
|
||||
if part.get_content_type() == "text/plain":
|
||||
body_text += part.get_content()
|
||||
elif part.get_content_type() == "text/html":
|
||||
body_text += part.get_content()
|
||||
else:
|
||||
body_text = msg.get_content()
|
||||
|
||||
urls = list(set(URL_PATTERN.findall(body_text)))
|
||||
originating_ips = []
|
||||
for recv in received_headers:
|
||||
originating_ips.extend(IP_PATTERN.findall(recv))
|
||||
originating_ips = list(set(originating_ips))
|
||||
|
||||
return {
|
||||
"headers": headers,
|
||||
"received_count": len(received_headers),
|
||||
"auth": {"spf": spf_result, "dkim": dkim_result, "dmarc": dmarc_result},
|
||||
"urls": urls,
|
||||
"originating_ips": originating_ips,
|
||||
}
|
||||
|
||||
|
||||
def run_phishing_workflow(args) -> dict:
|
||||
email_data = parse_email_file(args.email_file)
|
||||
client = SOARClient(args.soar_url, args.token, verify_ssl=not args.no_verify)
|
||||
|
||||
sender = email_data["headers"]["from"]
|
||||
subject = email_data["headers"]["subject"]
|
||||
severity = "high" if email_data["auth"]["spf"] == "fail" else "medium"
|
||||
|
||||
container = client.create_container(
|
||||
name=f"Phishing Report: {subject[:80]}",
|
||||
description=f"Reported phishing email from {sender}",
|
||||
severity=severity,
|
||||
label="phishing",
|
||||
)
|
||||
cid = container["container_id"]
|
||||
|
||||
artifacts_created = 0
|
||||
client.add_artifact(cid, "Email Headers", {
|
||||
"fromAddress": sender,
|
||||
"toAddress": email_data["headers"]["to"],
|
||||
"emailSubject": subject,
|
||||
"emailMessageId": email_data["headers"]["message_id"],
|
||||
"emailReplyTo": email_data["headers"]["reply_to"],
|
||||
"emailReturnPath": email_data["headers"]["return_path"],
|
||||
}, label="email", artifact_type="email", severity=severity)
|
||||
artifacts_created += 1
|
||||
|
||||
for ip in email_data["originating_ips"]:
|
||||
client.add_artifact(cid, f"Originating IP: {ip}", {
|
||||
"sourceAddress": ip,
|
||||
}, label="email", artifact_type="ip", severity="medium")
|
||||
artifacts_created += 1
|
||||
|
||||
url_list = email_data["urls"]
|
||||
for i, url in enumerate(url_list):
|
||||
is_last = (i == len(url_list) - 1) and not args.playbook
|
||||
client.add_artifact(cid, f"Embedded URL: {url[:60]}", {
|
||||
"requestURL": url,
|
||||
}, label="email", artifact_type="url", severity="high",
|
||||
run_automation=is_last)
|
||||
artifacts_created += 1
|
||||
|
||||
playbook_result = None
|
||||
if args.playbook:
|
||||
playbook_result = client.trigger_playbook(args.playbook, cid)
|
||||
action_runs = client.poll_playbook(cid, timeout=args.poll_timeout)
|
||||
playbook_result["action_runs"] = len(action_runs)
|
||||
playbook_result["actions_completed"] = sum(
|
||||
1 for r in action_runs if r.get("status") == "success"
|
||||
)
|
||||
|
||||
return {
|
||||
"incident": {
|
||||
"container_id": cid,
|
||||
"status": "new",
|
||||
"severity": severity,
|
||||
"artifacts_created": artifacts_created,
|
||||
},
|
||||
"email_analysis": {
|
||||
"sender": sender,
|
||||
"subject": subject,
|
||||
"urls_found": len(url_list),
|
||||
"originating_ips": email_data["originating_ips"],
|
||||
"auth": email_data["auth"],
|
||||
},
|
||||
"playbook": playbook_result,
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="SOAR Phishing Playbook Automation")
|
||||
parser.add_argument("--soar-url", required=True, help="Splunk SOAR base URL")
|
||||
parser.add_argument("--token", required=True, help="SOAR API auth token")
|
||||
parser.add_argument("--email-file", required=True, help="Path to .eml phishing email file")
|
||||
parser.add_argument("--playbook", default=None,
|
||||
help="Playbook name or ID to trigger")
|
||||
parser.add_argument("--poll-timeout", type=int, default=300,
|
||||
help="Max seconds to poll for playbook completion")
|
||||
parser.add_argument("--no-verify", action="store_true",
|
||||
help="Disable SSL certificate verification")
|
||||
parser.add_argument("--output", default=None, help="Output JSON file path")
|
||||
args = parser.parse_args()
|
||||
|
||||
result = run_phishing_workflow(args)
|
||||
report = json.dumps(result, indent=2)
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
f.write(report)
|
||||
print(report)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,8 @@
|
||||
MIT License
|
||||
Copyright (c) 2025 Mahipal
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
@@ -0,0 +1,66 @@
|
||||
---
|
||||
name: performing-bluetooth-security-assessment
|
||||
description: Assess Bluetooth Low Energy device security by scanning, enumerating GATT services, and detecting vulnerabilities
|
||||
domain: cybersecurity
|
||||
subdomain: wireless-security
|
||||
tags: [bluetooth, ble, gatt, wireless-security]
|
||||
version: "1.0"
|
||||
author: mahipal
|
||||
license: Apache-2.0
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
This skill covers performing Bluetooth Low Energy (BLE) security assessments using the Python bleak library. BLE devices are ubiquitous in IoT, healthcare, fitness, and smart home applications, and many ship with weak or absent security controls. This assessment identifies unencrypted GATT characteristics, devices broadcasting sensitive data, known vulnerable device fingerprints, and improperly secured pairing configurations.
|
||||
|
||||
The agent uses bleak's asyncio API to discover nearby BLE devices, connect to target devices, enumerate all GATT services and characteristics, and analyze security properties of each characteristic. It flags characteristics that allow unauthenticated read/write access to sensitive data and identifies devices matching known vulnerable profiles.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Python 3.9 or later
|
||||
- bleak library (`pip install bleak`)
|
||||
- Bluetooth adapter supporting BLE (Bluetooth 4.0+)
|
||||
- Linux: BlueZ 5.43+ with D-Bus permissions
|
||||
- Windows: Windows 10 version 1709+ with Bluetooth support
|
||||
- macOS: macOS 10.15+ with CoreBluetooth
|
||||
|
||||
## Steps
|
||||
|
||||
1. **Scan for BLE devices**: Use BleakScanner to discover all advertising BLE devices within range. Capture device name, address (MAC), RSSI signal strength, and advertised service UUIDs.
|
||||
|
||||
2. **Identify target devices**: Filter discovered devices by name pattern, address, or minimum signal strength. Flag devices broadcasting default or well-known vulnerable names.
|
||||
|
||||
3. **Connect and enumerate GATT services**: Use BleakClient to connect to the target device and iterate over all GATT services. For each service, record the UUID, description, and contained characteristics.
|
||||
|
||||
4. **Analyze characteristic properties**: For each characteristic, examine its properties (read, write, write-without-response, notify, indicate). Flag characteristics that expose read or write access without requiring authentication or encryption.
|
||||
|
||||
5. **Check for known vulnerable UUIDs**: Compare discovered service and characteristic UUIDs against a database of known vulnerable or sensitive services (Heart Rate, Blood Pressure, Device Information, Battery Level) that should require encryption.
|
||||
|
||||
6. **Detect unencrypted data exposure**: Attempt to read characteristics that should be protected. Successful unauthenticated reads of sensitive data indicate missing security controls.
|
||||
|
||||
7. **Generate security report**: Compile all findings into a structured JSON report with severity classifications and remediation recommendations.
|
||||
|
||||
## Expected Output
|
||||
|
||||
```json
|
||||
{
|
||||
"assessment_type": "ble_security_audit",
|
||||
"target_device": {
|
||||
"name": "SmartBand-XR",
|
||||
"address": "AA:BB:CC:DD:EE:FF",
|
||||
"rssi": -42
|
||||
},
|
||||
"services_found": 5,
|
||||
"characteristics_found": 18,
|
||||
"findings": [
|
||||
{
|
||||
"severity": "high",
|
||||
"finding": "Heart Rate Measurement readable without encryption",
|
||||
"uuid": "00002a37-0000-1000-8000-00805f9b34fb",
|
||||
"properties": ["read", "notify"],
|
||||
"remediation": "Enable encryption requirement on characteristic"
|
||||
}
|
||||
],
|
||||
"risk_score": 7.5
|
||||
}
|
||||
```
|
||||
@@ -0,0 +1,85 @@
|
||||
# BLE Security Assessment API Reference
|
||||
|
||||
## Bleak Python Library (v0.21+)
|
||||
|
||||
### Device Discovery
|
||||
```python
|
||||
from bleak import BleakScanner
|
||||
|
||||
# Scan with advertisement data
|
||||
devices = await BleakScanner.discover(timeout=10.0, return_adv=True)
|
||||
# Returns: dict[str, tuple[BLEDevice, AdvertisementData]]
|
||||
|
||||
# Find specific device
|
||||
device = await BleakScanner.find_device_by_name("DeviceName", timeout=10.0)
|
||||
device = await BleakScanner.find_device_by_address("AA:BB:CC:DD:EE:FF", timeout=10.0)
|
||||
```
|
||||
|
||||
### GATT Client Operations
|
||||
```python
|
||||
from bleak import BleakClient
|
||||
|
||||
async with BleakClient(address, timeout=15.0) as client:
|
||||
# Enumerate services
|
||||
for service in client.services:
|
||||
print(service.uuid, service.description)
|
||||
for char in service.characteristics:
|
||||
print(char.uuid, char.properties, char.descriptors)
|
||||
|
||||
# Read characteristic
|
||||
value = await client.read_gatt_char("00002a19-0000-1000-8000-00805f9b34fb")
|
||||
|
||||
# Write characteristic
|
||||
await client.write_gatt_char(char_uuid, bytearray([0x01, 0x02]))
|
||||
|
||||
# Subscribe to notifications
|
||||
await client.start_notify(char_uuid, callback)
|
||||
await client.stop_notify(char_uuid)
|
||||
```
|
||||
|
||||
## Common GATT Service UUIDs
|
||||
|
||||
| UUID (16-bit) | Service Name |
|
||||
|---------------|-------------|
|
||||
| `0x180D` | Heart Rate |
|
||||
| `0x1810` | Blood Pressure |
|
||||
| `0x1808` | Glucose |
|
||||
| `0x180F` | Battery Service |
|
||||
| `0x180A` | Device Information |
|
||||
| `0x1812` | Human Interface Device |
|
||||
| `0x1811` | Alert Notification |
|
||||
| `0x1802` | Immediate Alert |
|
||||
| `0x1803` | Link Loss |
|
||||
|
||||
## BLE Security Modes
|
||||
|
||||
| Mode | Level | Description |
|
||||
|------|-------|-------------|
|
||||
| LE Security Mode 1 | Level 1 | No security (no auth, no encryption) |
|
||||
| LE Security Mode 1 | Level 2 | Unauthenticated pairing with encryption |
|
||||
| LE Security Mode 1 | Level 3 | Authenticated pairing with encryption |
|
||||
| LE Security Mode 1 | Level 4 | Authenticated LE Secure Connections |
|
||||
| LE Security Mode 2 | Level 1 | Unauthenticated data signing |
|
||||
| LE Security Mode 2 | Level 2 | Authenticated data signing |
|
||||
|
||||
## Linux BlueZ Commands
|
||||
|
||||
```bash
|
||||
# Scan for BLE devices
|
||||
sudo hcitool lescan
|
||||
|
||||
# Device info
|
||||
sudo hcitool leinfo AA:BB:CC:DD:EE:FF
|
||||
|
||||
# Interactive GATT tool
|
||||
gatttool -b AA:BB:CC:DD:EE:FF -I
|
||||
> connect
|
||||
> primary # List services
|
||||
> characteristics # List characteristics
|
||||
> char-read-hnd 0x000e
|
||||
|
||||
# btmgmt commands
|
||||
sudo btmgmt info
|
||||
sudo btmgmt find -l
|
||||
sudo btmgmt pair -c 3 -t 0 AA:BB:CC:DD:EE:FF
|
||||
```
|
||||
@@ -0,0 +1,187 @@
|
||||
#!/usr/bin/env python3
|
||||
"""BLE security assessment using bleak for device scanning and GATT enumeration."""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
|
||||
from bleak import BleakClient, BleakScanner
|
||||
from bleak.backends.characteristic import BleakGATTCharacteristic
|
||||
|
||||
|
||||
SENSITIVE_SERVICE_UUIDS = {
|
||||
"0000180d-0000-1000-8000-00805f9b34fb": "Heart Rate",
|
||||
"00001810-0000-1000-8000-00805f9b34fb": "Blood Pressure",
|
||||
"00001808-0000-1000-8000-00805f9b34fb": "Glucose",
|
||||
"00001809-0000-1000-8000-00805f9b34fb": "Health Thermometer",
|
||||
"0000180f-0000-1000-8000-00805f9b34fb": "Battery Service",
|
||||
"0000180a-0000-1000-8000-00805f9b34fb": "Device Information",
|
||||
"00001812-0000-1000-8000-00805f9b34fb": "Human Interface Device",
|
||||
"00001802-0000-1000-8000-00805f9b34fb": "Immediate Alert",
|
||||
}
|
||||
|
||||
SENSITIVE_CHAR_UUIDS = {
|
||||
"00002a37-0000-1000-8000-00805f9b34fb": "Heart Rate Measurement",
|
||||
"00002a35-0000-1000-8000-00805f9b34fb": "Blood Pressure Measurement",
|
||||
"00002a18-0000-1000-8000-00805f9b34fb": "Glucose Measurement",
|
||||
"00002a1c-0000-1000-8000-00805f9b34fb": "Temperature Measurement",
|
||||
"00002a19-0000-1000-8000-00805f9b34fb": "Battery Level",
|
||||
"00002a29-0000-1000-8000-00805f9b34fb": "Manufacturer Name",
|
||||
"00002a26-0000-1000-8000-00805f9b34fb": "Firmware Revision",
|
||||
"00002a28-0000-1000-8000-00805f9b34fb": "Software Revision",
|
||||
"00002a25-0000-1000-8000-00805f9b34fb": "Serial Number",
|
||||
}
|
||||
|
||||
VULNERABLE_DEVICE_PATTERNS = [
|
||||
"ITAG", "SmartLock", "BLE_Door", "FitBand", "iTag",
|
||||
"CC2541", "HM-10", "JDY-08", "AT-09", "MLT-BT05",
|
||||
]
|
||||
|
||||
|
||||
async def scan_devices(scan_time: float) -> list:
|
||||
devices = await BleakScanner.discover(timeout=scan_time, return_adv=True)
|
||||
results = []
|
||||
for addr, (device, adv_data) in devices.items():
|
||||
name = adv_data.local_name or device.name or "Unknown"
|
||||
vuln_match = None
|
||||
for pattern in VULNERABLE_DEVICE_PATTERNS:
|
||||
if pattern.lower() in name.lower():
|
||||
vuln_match = pattern
|
||||
break
|
||||
results.append({
|
||||
"address": str(addr),
|
||||
"name": name,
|
||||
"rssi": adv_data.rssi,
|
||||
"service_uuids": [str(u) for u in (adv_data.service_uuids or [])],
|
||||
"manufacturer_data": {str(k): v.hex() for k, v in (adv_data.manufacturer_data or {}).items()},
|
||||
"known_vulnerable_pattern": vuln_match,
|
||||
})
|
||||
results.sort(key=lambda d: d["rssi"], reverse=True)
|
||||
return results
|
||||
|
||||
|
||||
async def enumerate_gatt(device_address: str) -> dict:
|
||||
findings = []
|
||||
services_info = []
|
||||
total_chars = 0
|
||||
async with BleakClient(device_address, timeout=15.0) as client:
|
||||
if not client.is_connected:
|
||||
return {"error": f"Failed to connect to {device_address}"}
|
||||
for service in client.services:
|
||||
svc_uuid = str(service.uuid)
|
||||
svc_name = SENSITIVE_SERVICE_UUIDS.get(svc_uuid, service.description or "Unknown")
|
||||
is_sensitive_svc = svc_uuid in SENSITIVE_SERVICE_UUIDS
|
||||
chars_info = []
|
||||
for char in service.characteristics:
|
||||
total_chars += 1
|
||||
char_uuid = str(char.uuid)
|
||||
props = char.properties
|
||||
char_name = SENSITIVE_CHAR_UUIDS.get(char_uuid, char.description or "Unknown")
|
||||
is_sensitive_char = char_uuid in SENSITIVE_CHAR_UUIDS
|
||||
char_entry = {
|
||||
"uuid": char_uuid,
|
||||
"name": char_name,
|
||||
"properties": list(props),
|
||||
"handle": char.handle,
|
||||
}
|
||||
if is_sensitive_char and ("read" in props):
|
||||
findings.append({
|
||||
"severity": "high",
|
||||
"finding": f"{char_name} readable without encryption",
|
||||
"uuid": char_uuid,
|
||||
"service": svc_name,
|
||||
"properties": list(props),
|
||||
"remediation": "Enable encryption requirement on characteristic",
|
||||
})
|
||||
if "write-without-response" in props and is_sensitive_svc:
|
||||
findings.append({
|
||||
"severity": "critical",
|
||||
"finding": f"{char_name} writable without response in sensitive service",
|
||||
"uuid": char_uuid,
|
||||
"service": svc_name,
|
||||
"properties": list(props),
|
||||
"remediation": "Remove write-without-response or require authenticated pairing",
|
||||
})
|
||||
if "write" in props and not is_sensitive_svc:
|
||||
findings.append({
|
||||
"severity": "medium",
|
||||
"finding": f"{char_name} writable without known authentication",
|
||||
"uuid": char_uuid,
|
||||
"service": svc_name,
|
||||
"properties": list(props),
|
||||
"remediation": "Verify write access requires bonded connection",
|
||||
})
|
||||
chars_info.append(char_entry)
|
||||
services_info.append({
|
||||
"uuid": svc_uuid,
|
||||
"name": svc_name,
|
||||
"sensitive": is_sensitive_svc,
|
||||
"characteristics": chars_info,
|
||||
})
|
||||
severity_weights = {"critical": 10, "high": 7, "medium": 4, "low": 1}
|
||||
risk_total = sum(severity_weights.get(f["severity"], 0) for f in findings)
|
||||
risk_score = min(10.0, round(risk_total / max(len(findings), 1), 1))
|
||||
return {
|
||||
"services_found": len(services_info),
|
||||
"characteristics_found": total_chars,
|
||||
"services": services_info,
|
||||
"findings": findings,
|
||||
"risk_score": risk_score,
|
||||
}
|
||||
|
||||
|
||||
async def run_audit(device_address: str, scan_time: float) -> dict:
|
||||
scan_results = await scan_devices(scan_time)
|
||||
target = None
|
||||
for dev in scan_results:
|
||||
if dev["address"].upper() == device_address.upper():
|
||||
target = dev
|
||||
break
|
||||
if not target:
|
||||
return {"error": f"Device {device_address} not found in scan", "scanned_devices": len(scan_results)}
|
||||
gatt_result = await enumerate_gatt(device_address)
|
||||
return {
|
||||
"assessment_type": "ble_security_audit",
|
||||
"target_device": target,
|
||||
**gatt_result,
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="BLE Security Assessment Tool")
|
||||
parser.add_argument("--action", choices=["scan", "enumerate", "audit"],
|
||||
required=True, help="Action to perform")
|
||||
parser.add_argument("--scan-time", type=float, default=10.0,
|
||||
help="BLE scan duration in seconds")
|
||||
parser.add_argument("--device-address", type=str, default=None,
|
||||
help="Target BLE device address (MAC or UUID)")
|
||||
parser.add_argument("--output", type=str, default=None,
|
||||
help="Output JSON file path")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.action in ("enumerate", "audit") and not args.device_address:
|
||||
print(json.dumps({"error": "Device address required for enumerate/audit"}))
|
||||
sys.exit(1)
|
||||
|
||||
start = time.time()
|
||||
if args.action == "scan":
|
||||
result = asyncio.run(scan_devices(args.scan_time))
|
||||
output = {"action": "scan", "devices_found": len(result), "devices": result}
|
||||
elif args.action == "enumerate":
|
||||
result = asyncio.run(enumerate_gatt(args.device_address))
|
||||
output = {"action": "enumerate", "target": args.device_address, **result}
|
||||
elif args.action == "audit":
|
||||
output = asyncio.run(run_audit(args.device_address, args.scan_time))
|
||||
|
||||
output["elapsed_seconds"] = round(time.time() - start, 2)
|
||||
report = json.dumps(output, indent=2)
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
f.write(report)
|
||||
print(report)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user