feat: add 4 new cybersecurity skills - UEBA insider threat, BeyondCorp zero trust, Linux kernel rootkits, CobaltStrike beacon hunting

This commit is contained in:
mukul975
2026-03-11 00:48:50 +01:00
parent 6b32dc4da2
commit 0c26c1eb87
16 changed files with 1371 additions and 0 deletions
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,42 @@
---
name: analyzing-linux-kernel-rootkits
description: Detect kernel-level rootkits in Linux memory dumps using Volatility3 linux plugins (check_syscall, lsmod, hidden_modules), rkhunter system scanning, and /proc vs /sys discrepancy analysis to identify hooked syscalls, hidden kernel modules, and tampered system structures.
domain: cybersecurity
subdomain: digital-forensics
tags: [rootkit, linux, kernel, volatility3, memory-forensics, malware-analysis, rkhunter, forensics]
version: "1.0"
author: mahipal
license: Apache-2.0
---
# Analyzing Linux Kernel Rootkits
## Overview
Linux kernel rootkits operate at ring 0, modifying kernel data structures to hide processes, files, network connections, and kernel modules from userspace tools. Detection requires either memory forensics (analyzing physical memory dumps with Volatility3) or cross-view analysis (comparing /proc, /sys, and kernel data structures for inconsistencies). This skill covers using Volatility3 Linux plugins to detect syscall table hooks, hidden kernel modules, and modified function pointers, supplemented by live system scanning with rkhunter and chkrootkit.
## Prerequisites
- Volatility3 installed (pip install volatility3)
- Linux memory dump (acquired via LiME, AVML, or /proc/kcore)
- Volatility3 Linux symbol table (ISF) matching the target kernel version
- rkhunter and chkrootkit for live system scanning
- Reference known-good kernel image for comparison
## Steps
### Step 1: Acquire Memory Dump
Capture Linux physical memory using LiME kernel module or AVML for cloud instances.
### Step 2: Analyze with Volatility3
Run linux.check_syscall, linux.lsmod, linux.hidden_modules, and linux.check_idt plugins to detect rootkit artifacts.
### Step 3: Cross-View Analysis
Compare module lists from /proc/modules, lsmod, and /sys/module to identify modules hidden from one view but present in another.
### Step 4: Live System Scanning
Run rkhunter and chkrootkit to detect known rootkit signatures, suspicious files, and modified system binaries.
## Expected Output
JSON report containing detected syscall hooks, hidden kernel modules, modified IDT entries, suspicious /proc discrepancies, and rkhunter findings.
@@ -0,0 +1,92 @@
# API Reference: Analyzing Linux Kernel Rootkits
## Volatility3 Linux Plugins
```bash
# Check syscall table for hooks
vol -f memory.lime linux.check_syscall.Check_syscall
# List loaded kernel modules
vol -f memory.lime linux.lsmod.Lsmod
# Detect hidden kernel modules
vol -f memory.lime linux.hidden_modules.Hidden_modules
# Check IDT for hooks
vol -f memory.lime linux.check_idt.Check_idt
# List processes (detect hidden)
vol -f memory.lime linux.pslist.PsList
vol -f memory.lime linux.pstree.PsTree
# Check for modified cred structures
vol -f memory.lime linux.check_creds.Check_creds
# Network connections
vol -f memory.lime linux.sockstat.Sockstat
# JSON output
vol -f memory.lime linux.check_syscall.Check_syscall -r json > syscalls.json
```
## Memory Acquisition Tools
| Tool | Command | Use Case |
|------|---------|----------|
| LiME | `insmod lime.ko "path=/tmp/mem.lime format=lime"` | Linux kernel module |
| AVML | `avml /tmp/memory.raw` | Azure/cloud instances |
| /proc/kcore | `dd if=/proc/kcore of=mem.raw` | Quick (partial) dump |
## Volatility3 Symbol Tables (ISF)
```bash
# Generate ISF from running kernel
vol -f memory.lime banners.Banners
# Download matching ISF from:
# https://github.com/volatilityfoundation/volatility3#symbol-tables
```
## rkhunter Commands
```bash
# Full system scan
rkhunter --check --skip-keypress --report-warnings-only
# Update signatures
rkhunter --update
# Check specific tests
rkhunter --check --enable rootkits,trojans,os_specific
# Output to log file
rkhunter --check --logfile /var/log/rkhunter.log
```
## Known Linux Rootkits Detected
| Rootkit | Technique | Volatility Plugin |
|---------|-----------|-------------------|
| Diamorphine | Hidden module + syscall hook | check_syscall, hidden_modules |
| Reptile | Syscall hook + port knocking | check_syscall |
| KBeast | Syscall hook + /proc hiding | check_syscall, hidden_modules |
| Adore-ng | VFS hook + hidden files | lsmod, check_syscall |
| Jynx2 | LD_PRELOAD userspace | pslist (parent check) |
## Cross-View Detection
```bash
# Compare /proc/modules vs /sys/module
diff <(cat /proc/modules | awk '{print $1}' | sort) \
<(ls /sys/module/ | sort)
# Check for hidden processes
diff <(ls /proc/ | grep -E '^[0-9]+$' | sort -n) \
<(ps -eo pid --no-headers | sort -n)
```
### References
- Volatility3 Linux Plugins: https://volatility3.readthedocs.io/en/latest/volatility3.plugins.linux.html
- LiME: https://github.com/504ensicsLabs/LiME
- rkhunter: http://rkhunter.sourceforge.net/
- MITRE T1014 Rootkit: https://attack.mitre.org/techniques/T1014/
@@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""Linux Kernel Rootkit Detection Agent - analyzes memory dumps with Volatility3 and live system with rkhunter."""
import json
import argparse
import logging
import subprocess
import os
from collections import defaultdict
from datetime import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
def run_vol3_plugin(memory_dump, plugin, isf_url=None):
"""Run a Volatility3 Linux plugin and return parsed output."""
cmd = ["vol", "-f", memory_dump, plugin, "-r", "json"]
if isf_url:
cmd.extend(["--isf", isf_url])
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
try:
return json.loads(result.stdout) if result.stdout else []
except json.JSONDecodeError:
logger.error("Volatility3 %s output parse failed", plugin)
return []
def check_syscall_hooks(memory_dump, isf_url=None):
"""Detect hooked system calls using linux.check_syscall."""
results = run_vol3_plugin(memory_dump, "linux.check_syscall.Check_syscall", isf_url)
hooked = []
for entry in results:
row = entry.get("__children", [entry]) if isinstance(entry, dict) else [entry]
for item in row:
symbol = item.get("Symbol", item.get("symbol", ""))
module = item.get("Module", item.get("module", ""))
if module and module != "kernel":
hooked.append({
"syscall_number": item.get("Index", item.get("index", "")),
"expected_handler": symbol,
"actual_module": module,
"severity": "critical",
"indicator": "syscall_hook",
})
return hooked
def detect_hidden_modules(memory_dump, isf_url=None):
"""Detect hidden kernel modules using cross-view analysis."""
lsmod_results = run_vol3_plugin(memory_dump, "linux.lsmod.Lsmod", isf_url)
hidden_results = run_vol3_plugin(memory_dump, "linux.hidden_modules.Hidden_modules", isf_url)
lsmod_names = set()
for entry in lsmod_results:
name = entry.get("Name", entry.get("name", ""))
if name:
lsmod_names.add(name)
hidden = []
for entry in hidden_results:
name = entry.get("Name", entry.get("name", ""))
if name:
hidden.append({
"module_name": name,
"in_lsmod": name in lsmod_names,
"severity": "critical",
"indicator": "hidden_kernel_module",
"detail": f"Module '{name}' hidden from standard listing",
})
return hidden
def check_idt_hooks(memory_dump, isf_url=None):
"""Check Interrupt Descriptor Table for hooks."""
results = run_vol3_plugin(memory_dump, "linux.check_idt.Check_idt", isf_url)
hooked = []
for entry in results:
module = entry.get("Module", entry.get("module", ""))
if module and module != "kernel":
hooked.append({
"interrupt": entry.get("Index", ""),
"handler_module": module,
"severity": "critical",
"indicator": "idt_hook",
})
return hooked
def run_rkhunter():
"""Run rkhunter rootkit scanner on live system."""
cmd = ["rkhunter", "--check", "--skip-keypress", "--report-warnings-only", "--nocolors"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
findings = []
for line in result.stdout.split("\n"):
line = line.strip()
if "Warning:" in line or "[ Warning ]" in line:
findings.append({
"tool": "rkhunter",
"finding": line.replace("Warning:", "").strip(),
"severity": "high",
})
return findings
def check_proc_sys_discrepancy():
"""Compare /proc/modules with /sys/module for hidden modules."""
findings = []
proc_modules = set()
sys_modules = set()
try:
with open("/proc/modules") as f:
for line in f:
proc_modules.add(line.split()[0])
except (FileNotFoundError, PermissionError):
return findings
try:
sys_modules = set(os.listdir("/sys/module"))
except (FileNotFoundError, PermissionError):
return findings
only_in_sys = sys_modules - proc_modules
for mod in only_in_sys:
if not os.path.exists(f"/sys/module/{mod}/initstate"):
continue
findings.append({
"module": mod, "indicator": "proc_sys_discrepancy",
"severity": "high",
"detail": f"Module '{mod}' in /sys/module but missing from /proc/modules",
})
return findings
def generate_report(syscall_hooks, hidden_mods, idt_hooks, rkhunter_findings, proc_findings, source):
all_findings = syscall_hooks + hidden_mods + idt_hooks + rkhunter_findings + proc_findings
critical = sum(1 for f in all_findings if f.get("severity") == "critical")
return {
"timestamp": datetime.utcnow().isoformat(),
"analysis_source": source,
"syscall_hooks": syscall_hooks,
"hidden_modules": hidden_mods,
"idt_hooks": idt_hooks,
"rkhunter_warnings": rkhunter_findings,
"proc_sys_discrepancies": proc_findings,
"total_findings": len(all_findings),
"critical_findings": critical,
"rootkit_detected": critical > 0,
}
def main():
parser = argparse.ArgumentParser(description="Linux Kernel Rootkit Detection Agent")
parser.add_argument("--memory-dump", help="Path to Linux memory dump for Volatility3 analysis")
parser.add_argument("--isf-url", help="Volatility3 ISF symbol table URL")
parser.add_argument("--live-scan", action="store_true", help="Run rkhunter + /proc analysis on live system")
parser.add_argument("--output", default="rootkit_detection_report.json")
args = parser.parse_args()
syscall_hooks, hidden_mods, idt_hooks = [], [], []
rkhunter_findings, proc_findings = [], []
source = "none"
if args.memory_dump:
source = f"memory_dump:{args.memory_dump}"
syscall_hooks = check_syscall_hooks(args.memory_dump, args.isf_url)
hidden_mods = detect_hidden_modules(args.memory_dump, args.isf_url)
idt_hooks = check_idt_hooks(args.memory_dump, args.isf_url)
if args.live_scan:
source = "live_system" if source == "none" else source + "+live_system"
rkhunter_findings = run_rkhunter()
proc_findings = check_proc_sys_discrepancy()
report = generate_report(syscall_hooks, hidden_mods, idt_hooks, rkhunter_findings, proc_findings, source)
with open(args.output, "w") as f:
json.dump(report, f, indent=2, default=str)
logger.info("Rootkit scan: %d findings (%d critical), rootkit detected: %s",
report["total_findings"], report["critical_findings"], report["rootkit_detected"])
print(json.dumps(report, indent=2, default=str))
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,42 @@
---
name: detecting-insider-threat-with-ueba
description: Implement User and Entity Behavior Analytics using Elasticsearch/OpenSearch to build behavioral baselines, calculate anomaly scores, perform peer group analysis, and detect insider threat indicators such as data exfiltration, privilege abuse, and unauthorized access patterns.
domain: cybersecurity
subdomain: threat-detection
tags: [ueba, insider-threat, anomaly-detection, elasticsearch, behavior-analytics, machine-learning, siem]
version: "1.0"
author: mahipal
license: Apache-2.0
---
# Detecting Insider Threat with UEBA
## Overview
User and Entity Behavior Analytics (UEBA) moves beyond static rule-based detection to model normal behavior for users, hosts, and applications, then flag statistically significant deviations that may indicate insider threats. Using Elasticsearch as the analytics backend, this skill covers building behavioral baselines from authentication logs, file access events, and network activity, computing risk scores using statistical deviation and peer group comparison, and correlating multiple low-confidence indicators into high-confidence insider threat alerts.
## Prerequisites
- Elasticsearch 8.x or OpenSearch 2.x cluster with security audit data
- Log sources: Active Directory authentication, VPN, DLP, file server access, email
- Python 3.9+ with elasticsearch client library
- Baseline period of 30+ days of normal user activity data
- Defined peer groups based on department, role, or job function
## Steps
### Step 1: Ingest and Normalize Activity Logs
Configure log pipelines to ingest authentication, file access, email, and network logs into Elasticsearch with a unified user identity field.
### Step 2: Build Behavioral Baselines
Calculate per-user baselines for login times, data volume, application usage, and access patterns over a rolling 30-day window using Elasticsearch aggregations.
### Step 3: Calculate Anomaly Scores
Compare current activity against baselines using z-score deviation and peer group comparison to generate per-user risk scores.
### Step 4: Correlate and Alert
Combine multiple anomalous indicators (unusual hours + large downloads + new system access) into composite risk scores that trigger SOC investigation workflows.
## Expected Output
JSON report containing per-user risk scores, anomalous activity details, peer group deviations, and recommended investigation actions.
@@ -0,0 +1,75 @@
# API Reference: Detecting Insider Threat with UEBA
## Elasticsearch Aggregation Queries
### Per-User Daily Activity Baseline
```json
{
"aggs": {
"users": {
"terms": {"field": "user.name", "size": 5000},
"aggs": {
"daily_events": {"date_histogram": {"field": "@timestamp", "calendar_interval": "day"}},
"unique_hosts": {"cardinality": {"field": "host.name"}},
"data_volume": {"sum": {"field": "bytes_transferred"}}
}
}
}
}
```
### Anomaly Detection (Z-Score > 3)
```python
from elasticsearch import Elasticsearch
es = Elasticsearch(["https://localhost:9200"], api_key="base64key")
result = es.search(index="logs-*", body=query)
z_score = (current - baseline_avg) / baseline_std
```
## Insider Threat Indicators
| Indicator | Detection Method | Severity |
|-----------|-----------------|----------|
| Activity spike | Z-score > 3 standard deviations | High |
| Data exfiltration | Volume > 5x daily average | Critical |
| New host access | Unique hosts > 2x baseline | High |
| Off-hours activity | Login outside 06:00-22:00 | Medium |
| Peer group outlier | Activity > 3x peer average | Medium |
| Privilege escalation | New admin role assignment | Critical |
| Resignation + download | HR flag + high data volume | Critical |
## Elasticsearch Python Client
```bash
pip install elasticsearch>=8.0
```
| Method | Description |
|--------|-------------|
| `es.search(index, body)` | Execute aggregation query |
| `es.indices.get_alias("logs-*")` | List matching indices |
| `es.count(index)` | Get document count |
## Risk Scoring Model
| Score Range | Risk Level | Action |
|-------------|------------|--------|
| 0 - 30 | Low | No action |
| 31 - 60 | Medium | Monitor |
| 61 - 80 | High | SOC investigation |
| 81 - 100 | Critical | Immediate response |
## MITRE ATT&CK Insider Techniques
| Technique | ID | UEBA Detection |
|-----------|----|----------------|
| Data from Local System | T1005 | Volume anomaly on file servers |
| Exfiltration Over Web Service | T1567 | Cloud upload volume spike |
| Account Manipulation | T1098 | Unusual privilege changes |
| Valid Accounts | T1078 | Off-hours or location anomaly |
### References
- Elasticsearch Python Client: https://elasticsearch-py.readthedocs.io/
- MITRE Insider Threat: https://attack.mitre.org/techniques/T1078/
- NIST SP 800-53 AC-2: https://csf.tools/reference/nist-sp-800-53/r5/ac/ac-2/
@@ -0,0 +1,188 @@
#!/usr/bin/env python3
"""UEBA Insider Threat Agent - builds behavioral baselines and scores anomalies using Elasticsearch."""
import json
import argparse
import logging
import math
from collections import defaultdict
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
def connect_es(hosts, api_key=None):
"""Connect to Elasticsearch cluster."""
kwargs = {"hosts": hosts, "verify_certs": False, "request_timeout": 30}
if api_key:
kwargs["api_key"] = api_key
return Elasticsearch(**kwargs)
def build_user_baseline(es, index, user_field, hours=720):
"""Build 30-day behavioral baseline per user using ES aggregations."""
since = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
query = {
"size": 0,
"query": {"range": {"@timestamp": {"gte": since}}},
"aggs": {
"users": {
"terms": {"field": user_field, "size": 5000},
"aggs": {
"login_hours": {"histogram": {"field": "hour_of_day", "interval": 1}},
"daily_events": {"date_histogram": {"field": "@timestamp", "calendar_interval": "day"}},
"unique_hosts": {"cardinality": {"field": "host.name"}},
"data_volume": {"sum": {"field": "bytes_transferred"}},
"unique_apps": {"cardinality": {"field": "application.name"}},
}
}
}
}
result = es.search(index=index, body=query)
baselines = {}
for bucket in result["aggregations"]["users"]["buckets"]:
user = bucket["key"]
daily_counts = [d["doc_count"] for d in bucket["daily_events"]["buckets"]]
avg_daily = sum(daily_counts) / max(len(daily_counts), 1)
std_daily = math.sqrt(sum((x - avg_daily) ** 2 for x in daily_counts) / max(len(daily_counts), 1))
baselines[user] = {
"avg_daily_events": round(avg_daily, 1),
"std_daily_events": round(std_daily, 1),
"unique_hosts": bucket["unique_hosts"]["value"],
"total_data_volume": bucket["data_volume"]["value"],
"total_events": bucket["doc_count"],
}
return baselines
def score_current_activity(es, index, user_field, baselines, hours=24):
"""Score current activity against baselines to find anomalies."""
since = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
query = {
"size": 0,
"query": {"range": {"@timestamp": {"gte": since}}},
"aggs": {
"users": {
"terms": {"field": user_field, "size": 5000},
"aggs": {
"unique_hosts": {"cardinality": {"field": "host.name"}},
"data_volume": {"sum": {"field": "bytes_transferred"}},
"unique_apps": {"cardinality": {"field": "application.name"}},
}
}
}
}
result = es.search(index=index, body=query)
anomalies = []
for bucket in result["aggregations"]["users"]["buckets"]:
user = bucket["key"]
baseline = baselines.get(user)
if not baseline:
anomalies.append({
"user": user, "indicator": "new_user",
"severity": "medium", "detail": "No baseline exists for this user",
"risk_score": 50,
})
continue
current_events = bucket["doc_count"]
avg = baseline["avg_daily_events"]
std = baseline["std_daily_events"]
z_score = (current_events - avg) / max(std, 1)
if z_score > 3:
anomalies.append({
"user": user, "indicator": "activity_spike",
"severity": "high", "z_score": round(z_score, 2),
"current": current_events, "baseline_avg": avg,
"risk_score": min(int(z_score * 15), 100),
"detail": f"Event count {current_events} is {z_score:.1f} std devs above baseline",
})
current_hosts = bucket["unique_hosts"]["value"]
if current_hosts > baseline["unique_hosts"] * 2:
anomalies.append({
"user": user, "indicator": "new_host_access",
"severity": "high",
"current_hosts": current_hosts,
"baseline_hosts": baseline["unique_hosts"],
"risk_score": 70,
"detail": f"Accessed {current_hosts} hosts vs baseline {baseline['unique_hosts']}",
})
current_volume = bucket["data_volume"]["value"]
daily_avg_volume = baseline["total_data_volume"] / 30
if current_volume > daily_avg_volume * 5 and current_volume > 100_000_000:
anomalies.append({
"user": user, "indicator": "data_exfiltration",
"severity": "critical",
"current_bytes": current_volume,
"baseline_daily_avg": round(daily_avg_volume),
"risk_score": 90,
"detail": f"Transferred {current_volume / 1e6:.0f}MB vs daily avg {daily_avg_volume / 1e6:.1f}MB",
})
return sorted(anomalies, key=lambda x: x.get("risk_score", 0), reverse=True)
def peer_group_analysis(baselines, peer_groups):
"""Compare user activity against peer group averages."""
findings = []
group_stats = defaultdict(list)
for user, baseline in baselines.items():
group = peer_groups.get(user, "default")
group_stats[group].append(baseline["avg_daily_events"])
group_avgs = {g: sum(v) / len(v) for g, v in group_stats.items()}
for user, baseline in baselines.items():
group = peer_groups.get(user, "default")
group_avg = group_avgs.get(group, 0)
if group_avg > 0 and baseline["avg_daily_events"] > group_avg * 3:
findings.append({
"user": user, "peer_group": group,
"user_avg": baseline["avg_daily_events"],
"group_avg": round(group_avg, 1),
"deviation_factor": round(baseline["avg_daily_events"] / group_avg, 1),
"severity": "medium",
})
return findings
def generate_report(anomalies, peer_findings, baselines):
critical = sum(1 for a in anomalies if a.get("severity") == "critical")
return {
"timestamp": datetime.utcnow().isoformat(),
"users_baselined": len(baselines),
"anomalies_detected": len(anomalies),
"critical_anomalies": critical,
"top_risk_users": anomalies[:15],
"peer_group_outliers": peer_findings[:10],
"risk_level": "critical" if critical > 0 else "high" if anomalies else "low",
}
def main():
parser = argparse.ArgumentParser(description="UEBA Insider Threat Detection Agent")
parser.add_argument("--es-hosts", default="https://localhost:9200", help="Elasticsearch hosts")
parser.add_argument("--api-key", help="Elasticsearch API key")
parser.add_argument("--index", default="logs-*", help="Log index pattern")
parser.add_argument("--user-field", default="user.name", help="User identity field")
parser.add_argument("--peer-groups", help="JSON file mapping users to peer groups")
parser.add_argument("--lookback", type=int, default=24, help="Anomaly lookback hours")
parser.add_argument("--output", default="ueba_insider_threat_report.json")
args = parser.parse_args()
es = connect_es(args.es_hosts.split(","), args.api_key)
baselines = build_user_baseline(es, args.index, args.user_field)
anomalies = score_current_activity(es, args.index, args.user_field, baselines, args.lookback)
peer_groups = {}
if args.peer_groups:
with open(args.peer_groups) as f:
peer_groups = json.load(f)
peer_findings = peer_group_analysis(baselines, peer_groups)
report = generate_report(anomalies, peer_findings, baselines)
with open(args.output, "w") as f:
json.dump(report, f, indent=2, default=str)
logger.info("UEBA: %d users baselined, %d anomalies (%d critical)",
len(baselines), len(anomalies), report["critical_anomalies"])
print(json.dumps(report, indent=2, default=str))
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,43 @@
---
name: hunting-for-cobalt-strike-beacons
description: Detect Cobalt Strike beacon network activity using default TLS certificate signatures (serial 8BB00EE), JA3/JA3S/JARM fingerprints, HTTP C2 profile pattern matching, beacon jitter analysis, and named pipe detection via Zeek, Suricata, and Python PCAP analysis.
domain: cybersecurity
subdomain: threat-hunting
tags: [cobalt-strike, beacon, threat-hunting, c2, zeek, suricata, ja3, jarm, network-forensics]
version: "1.0"
author: mahipal
license: Apache-2.0
---
# Hunting for Cobalt Strike Beacons
## Overview
Cobalt Strike is the most prevalent command-and-control framework used by both red teams and threat actors. Beacon, its primary payload, communicates with team servers using configurable HTTP/HTTPS/DNS profiles that can mimic legitimate traffic. However, default configurations and behavioral patterns remain detectable through TLS certificate analysis (default serial 8BB00EE), JA3/JA3S fingerprinting, beacon interval jitter analysis, and HTTP malleable profile pattern matching. This skill covers building detection capabilities using Zeek network logs, Suricata IDS rules, and Python-based PCAP analysis to identify beacon callbacks in network traffic.
## Prerequisites
- Zeek 6.0+ with JA3 and HASSH packages installed
- Suricata 7.0+ with Emerging Threats ruleset
- Python 3.9+ with scapy and dpkt libraries
- Network traffic captures (PCAP) or live Zeek logs
- RITA (Real Intelligence Threat Analytics) for beacon scoring
- Threat intelligence feeds with known Cobalt Strike IOCs
## Steps
### Step 1: TLS Certificate Analysis
Detect default Cobalt Strike certificates using JA3S fingerprints, certificate serial numbers, and JARM fingerprints in Zeek ssl.log.
### Step 2: Beacon Interval Analysis
Analyze connection timing patterns to identify regular callback intervals with configurable jitter, characteristic of beacon behavior.
### Step 3: HTTP Profile Detection
Match HTTP request patterns (URI paths, headers, user-agents) against known malleable C2 profiles.
### Step 4: Correlate and Score
Combine multiple indicators (TLS + timing + HTTP profile) into a composite beacon confidence score.
## Expected Output
JSON report containing detected beacon candidates with confidence scores, TLS fingerprints, timing analysis, HTTP profile matches, and recommended response actions.
@@ -0,0 +1,139 @@
# API Reference: Hunting for Cobalt Strike Beacons
## Cobalt Strike Default TLS Indicators
| Indicator | Value | Detection Confidence |
|-----------|-------|---------------------|
| Default cert serial | `8BB00EE` | 95% (unmodified teamserver) |
| Default cert issuer | `Major Cobalt Strike` | 95% |
| JA3S hash (Java TLS) | `ae4edc6faf64d08308082ad26be60767` | 80% |
| JA3S hash (alt) | `a0e9f5d64349fb13191bc781f81f42e1` | 80% |
| JARM fingerprint | `07d14d16d21d21d07c42d41d00041d24a458a375eef0c576d23a7bab9a9fb1` | 90% |
## Zeek Log Fields for Detection
### ssl.log Key Fields
| Field Index | Name | Use |
|-------------|------|-----|
| 0 | ts | Connection timestamp |
| 2 | id.orig_h | Source IP |
| 4 | id.resp_h | Destination IP (C2 server) |
| 5 | id.resp_p | Destination port |
| 20 | cert_chain_fps | Certificate serial number |
| 21 | ja3s | JA3S server fingerprint hash |
### conn.log Beacon Timing Fields
| Field Index | Name | Use |
|-------------|------|-----|
| 0 | ts | Connection epoch timestamp |
| 2 | id.orig_h | Beaconing host |
| 4 | id.resp_h | C2 destination |
| 5 | id.resp_p | C2 port |
| 8 | duration | Session length |
| 9 | orig_bytes | Bytes sent (check size) |
| 10 | resp_bytes | Bytes received (check size) |
## RITA Beacon Analysis
```bash
# Import Zeek logs into RITA
rita import /opt/zeek/logs/current rita_dataset
# Show beaconing connections ranked by score
rita show-beacons rita_dataset --human-readable
# Show long connections (persistent C2)
rita show-long-connections rita_dataset
# Export beacon results as CSV
rita show-beacons rita_dataset -H > beacons.csv
# Show DNS tunneling (alternate C2 channel)
rita show-exploded-dns rita_dataset
```
## Suricata Detection Rules
```yaml
# Detect default Cobalt Strike TLS certificate
alert tls any any -> any any (msg:"ET MALWARE Cobalt Strike Default Certificate"; \
tls.cert_serial; content:"8BB00EE"; sid:2029560; rev:3;)
# Detect known Cobalt Strike JA3S
alert tls any any -> any any (msg:"ET MALWARE Cobalt Strike JA3S"; \
ja3s.hash; content:"ae4edc6faf64d08308082ad26be60767"; sid:2029561; rev:2;)
# Detect Cobalt Strike default HTTP beacon URI
alert http any any -> any any (msg:"ET MALWARE CobaltStrike Beacon URI"; \
content:"GET"; http_method; pcre:"/^\/[a-zA-Z]{4}$/U"; sid:2029562; rev:1;)
# Detect Cobalt Strike named pipe (SMB beacon)
alert smb any any -> any any (msg:"ET MALWARE CobaltStrike Named Pipe"; \
content:"|MSRPC|"; content:"\\\\pipe\\\\"; content:"MSSE-"; sid:2029563; rev:1;)
```
## Malleable C2 Profile HTTP Indicators
| Pattern | URI Regex | Context |
|---------|-----------|---------|
| Default GET | `^/[a-zA-Z]{4}$` | 4-char alpha URI (e.g., /aGth) |
| submit.php | `^/submit\.php\?id=\d+$` | POST callback with numeric ID |
| Pixel tracking | `^/pixel\.(gif\|png)$` | Fake tracking pixel |
| UTM beacon | `^/__utm\.gif$` | Mimics Google Analytics |
| RSS feed | `^/updates\.(rss\|json)$` | Fake feed endpoint |
| JS beacon | `^/visit\.js$` | Fake JavaScript resource |
## Default User-Agent Strings
```
Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)
Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)
Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64; Trident/6.0)
```
## Beacon Timing Analysis Formula
```python
# Jitter percentage calculation
intervals = [t[i+1] - t[i] for i in range(len(t) - 1)]
avg = sum(intervals) / len(intervals)
std = sqrt(sum((x - avg)**2 for x in intervals) / len(intervals))
jitter_pct = (std / avg) * 100
# Beacon score (0-100, higher = more likely beacon)
beacon_score = max(0, 1 - (jitter_pct / 100)) * 100
# Score >= 85 = critical, >= 60 = high suspicion
```
## JARM Scanner CLI
```bash
# Scan single host for JARM fingerprint
python3 jarm.py -p 443 suspicious-host.example.com
# Known Cobalt Strike JARM
# 07d14d16d21d21d07c42d41d00041d24a458a375eef0c576d23a7bab9a9fb1
# Compare against threat intel JARM database
python3 jarm.py -p 8443 10.0.0.50 | grep -f cs_jarm_list.txt
```
## MITRE ATT&CK Mapping
| Technique | ID | Beacon Indicator |
|-----------|----|-----------------|
| Application Layer Protocol | T1071.001 | HTTP/HTTPS beaconing pattern |
| Encrypted Channel | T1573.002 | Default TLS cert / JA3S match |
| Non-Standard Port | T1571 | HTTPS on 8080, 8443, 444 |
| Ingress Tool Transfer | T1105 | Large resp_bytes in beacon |
| Proxy | T1090 | Redirector infrastructure |
### References
- JARM Scanner: https://github.com/salesforce/jarm
- RITA: https://github.com/activecm/rita
- JA3/JA3S: https://github.com/salesforce/ja3
- Cobalt Strike Detection: https://thedfirreport.com
- MITRE T1071.001: https://attack.mitre.org/techniques/T1071/001/
@@ -0,0 +1,190 @@
#!/usr/bin/env python3
"""Cobalt Strike Beacon Hunter - detects beacon signatures in network traffic and Zeek logs."""
import json
import argparse
import logging
import os
import re
import math
from collections import defaultdict
from datetime import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
CS_DEFAULT_CERT_SERIAL = "8bb00ee"
CS_KNOWN_JA3S = [
"ae4edc6faf64d08308082ad26be60767",
"a0e9f5d64349fb13191bc781f81f42e1",
]
CS_KNOWN_JARM = "07d14d16d21d21d07c42d41d00041d24a458a375eef0c576d23a7bab9a9fb1"
def parse_zeek_ssl_log(ssl_log_path):
"""Parse Zeek ssl.log for default Cobalt Strike certificates."""
findings = []
try:
with open(ssl_log_path) as f:
for line in f:
if line.startswith("#"):
continue
fields = line.strip().split("\t")
if len(fields) < 22:
continue
ts, uid, src_ip, src_port, dst_ip, dst_port = fields[0], fields[1], fields[2], fields[3], fields[4], fields[5]
serial = fields[20] if len(fields) > 20 else ""
ja3s = fields[21] if len(fields) > 21 else ""
if serial.lower().replace(":", "") == CS_DEFAULT_CERT_SERIAL:
findings.append({
"indicator": "default_cs_certificate",
"src_ip": src_ip, "dst_ip": dst_ip, "dst_port": dst_port,
"cert_serial": serial, "timestamp": ts,
"severity": "critical", "confidence": 95,
})
if ja3s in CS_KNOWN_JA3S:
findings.append({
"indicator": "known_cs_ja3s",
"src_ip": src_ip, "dst_ip": dst_ip, "dst_port": dst_port,
"ja3s_hash": ja3s, "timestamp": ts,
"severity": "high", "confidence": 80,
})
except FileNotFoundError:
logger.warning("Zeek ssl.log not found: %s", ssl_log_path)
return findings
def analyze_beacon_timing(conn_log_path, min_connections=20, max_jitter_pct=25):
"""Analyze connection timing for beacon-like regular intervals."""
connections = defaultdict(list)
try:
with open(conn_log_path) as f:
for line in f:
if line.startswith("#"):
continue
fields = line.strip().split("\t")
if len(fields) < 7:
continue
ts = float(fields[0])
src_ip, dst_ip, dst_port = fields[2], fields[4], fields[5]
key = (src_ip, dst_ip, dst_port)
connections[key].append(ts)
except (FileNotFoundError, ValueError):
logger.warning("Zeek conn.log parse failed: %s", conn_log_path)
return []
beacons = []
for (src, dst, port), timestamps in connections.items():
if len(timestamps) < min_connections:
continue
timestamps.sort()
intervals = [timestamps[i + 1] - timestamps[i] for i in range(len(timestamps) - 1)]
if not intervals:
continue
avg_interval = sum(intervals) / len(intervals)
if avg_interval < 1:
continue
std_interval = math.sqrt(sum((x - avg_interval) ** 2 for x in intervals) / len(intervals))
jitter_pct = (std_interval / avg_interval) * 100 if avg_interval > 0 else 100
if jitter_pct <= max_jitter_pct:
beacon_score = round(max(0, 1 - (jitter_pct / 100)) * 100, 1)
if beacon_score >= 60:
beacons.append({
"indicator": "beacon_timing",
"src_ip": src, "dst_ip": dst, "dst_port": port,
"connections": len(timestamps),
"avg_interval_sec": round(avg_interval, 1),
"jitter_pct": round(jitter_pct, 1),
"beacon_score": beacon_score,
"severity": "critical" if beacon_score >= 85 else "high",
"confidence": int(beacon_score),
})
return sorted(beacons, key=lambda x: x["beacon_score"], reverse=True)
def check_http_profiles(http_log_path):
"""Detect known Cobalt Strike HTTP malleable C2 profile patterns."""
cs_uri_patterns = [
r"^/[a-zA-Z]{4}$", r"^/submit\.php\?id=\d+$", r"^/pixel\.(gif|png)$",
r"^/__utm\.gif$", r"^/updates\.(rss|json)$", r"^/visit\.js$",
]
cs_ua_patterns = [
"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)",
]
findings = []
try:
with open(http_log_path) as f:
for line in f:
if line.startswith("#"):
continue
fields = line.strip().split("\t")
if len(fields) < 12:
continue
src_ip, dst_ip = fields[2], fields[4]
uri = fields[9] if len(fields) > 9 else ""
user_agent = fields[12] if len(fields) > 12 else ""
for pattern in cs_uri_patterns:
if re.match(pattern, uri):
findings.append({
"indicator": "cs_http_profile",
"src_ip": src_ip, "dst_ip": dst_ip,
"uri": uri, "user_agent": user_agent[:100],
"matched_pattern": pattern,
"severity": "high", "confidence": 60,
})
break
if user_agent in cs_ua_patterns:
findings.append({
"indicator": "cs_default_user_agent",
"src_ip": src_ip, "dst_ip": dst_ip,
"user_agent": user_agent,
"severity": "high", "confidence": 70,
})
except FileNotFoundError:
logger.warning("Zeek http.log not found: %s", http_log_path)
return findings
def generate_report(tls_findings, beacon_findings, http_findings):
all_findings = tls_findings + beacon_findings + http_findings
critical = sum(1 for f in all_findings if f.get("severity") == "critical")
by_dst = defaultdict(int)
for f in all_findings:
by_dst[f.get("dst_ip", "")] += 1
return {
"timestamp": datetime.utcnow().isoformat(),
"tls_certificate_hits": len(tls_findings),
"beacon_timing_detections": len(beacon_findings),
"http_profile_matches": len(http_findings),
"total_indicators": len(all_findings),
"critical_indicators": critical,
"top_suspect_destinations": dict(sorted(by_dst.items(), key=lambda x: x[1], reverse=True)[:10]),
"findings": all_findings[:30],
"cobalt_strike_likely": critical > 0,
}
def main():
parser = argparse.ArgumentParser(description="Cobalt Strike Beacon Hunting Agent")
parser.add_argument("--zeek-dir", required=True, help="Directory containing Zeek log files")
parser.add_argument("--min-connections", type=int, default=20, help="Minimum connections for beacon analysis")
parser.add_argument("--max-jitter", type=int, default=25, help="Maximum jitter percentage for beacon scoring")
parser.add_argument("--output", default="cobalt_strike_hunt_report.json")
args = parser.parse_args()
ssl_log = os.path.join(args.zeek_dir, "ssl.log")
conn_log = os.path.join(args.zeek_dir, "conn.log")
http_log = os.path.join(args.zeek_dir, "http.log")
tls_findings = parse_zeek_ssl_log(ssl_log)
beacon_findings = analyze_beacon_timing(conn_log, args.min_connections, args.max_jitter)
http_findings = check_http_profiles(http_log)
report = generate_report(tls_findings, beacon_findings, http_findings)
with open(args.output, "w") as f:
json.dump(report, f, indent=2, default=str)
logger.info("CS Hunt: %d TLS hits, %d beacons, %d HTTP matches, CS likely: %s",
len(tls_findings), len(beacon_findings), len(http_findings), report["cobalt_strike_likely"])
print(json.dumps(report, indent=2, default=str))
if __name__ == "__main__":
main()
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Mahipal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,43 @@
---
name: implementing-zero-trust-with-beyondcorp
description: Deploy Google BeyondCorp Enterprise zero trust access controls using Identity-Aware Proxy (IAP), context-aware access policies, device trust validation, and Access Context Manager to enforce identity and posture-based access to GCP resources and internal applications.
domain: cybersecurity
subdomain: zero-trust
tags: [zero-trust, beyondcorp, google-cloud, iap, context-aware-access, device-trust, identity]
version: "1.0"
author: mahipal
license: Apache-2.0
---
# Implementing Zero Trust with BeyondCorp
## Overview
Google BeyondCorp Enterprise implements the zero trust security model by eliminating the concept of a trusted network perimeter. Instead of relying on VPNs and network location, BeyondCorp authenticates and authorizes every request based on user identity, device posture, and contextual attributes. Identity-Aware Proxy (IAP) serves as the enforcement point, intercepting all requests to protected resources and evaluating them against Access Context Manager policies. This skill covers configuring IAP for web applications, defining access levels based on device trust and network attributes, and auditing access policies for compliance.
## Prerequisites
- Google Cloud project with BeyondCorp Enterprise license
- IAP API enabled (iap.googleapis.com)
- Access Context Manager API enabled (accesscontextmanager.googleapis.com)
- GCP resources to protect (Compute Engine, App Engine, or GKE services)
- Endpoint Verification deployed on managed devices
- Python 3.9+ with google-cloud-iap library
## Steps
### Step 1: Enable IAP on Target Resources
Configure Identity-Aware Proxy on Compute Engine, App Engine, or HTTPS load balancer backends.
### Step 2: Define Access Levels
Create Access Context Manager access levels based on IP ranges, device attributes (OS version, encryption, screen lock), and geographic location.
### Step 3: Bind Access Policies
Apply access levels as IAP conditions to enforce context-aware access decisions on protected resources.
### Step 4: Audit and Monitor
Query IAP audit logs, verify policy enforcement, and identify gaps in zero trust coverage.
## Expected Output
JSON report containing IAP-protected resources, access level definitions, policy binding audit results, and zero trust coverage metrics.
@@ -0,0 +1,92 @@
# API Reference: Implementing Zero Trust with BeyondCorp
## gcloud IAP Commands
```bash
# Enable IAP on backend service
gcloud iap web enable --resource-type=backend-services \
--service=my-backend --project=my-project
# Get IAP IAM policy
gcloud iap web get-iam-policy --project=my-project
# Grant IAP access with access level condition
gcloud iap web add-iam-policy-binding --project=my-project \
--member="group:team@example.com" \
--role="roles/iap.httpsResourceAccessor" \
--condition="expression=accessPolicies/123/accessLevels/corp_device,title=CorpDevice"
# Enable required APIs
gcloud services enable iap.googleapis.com
gcloud services enable accesscontextmanager.googleapis.com
gcloud services enable beyondcorp.googleapis.com
```
## Access Context Manager Commands
```bash
# Create access policy
gcloud access-context-manager policies create --organization=ORG_ID --title="Corp Policy"
# Create access level (device + IP)
gcloud access-context-manager levels create corp_trusted \
--policy=POLICY_ID --title="Corporate Trusted" \
--basic-level-spec=level_spec.yaml
# List access levels
gcloud access-context-manager levels list --policy=POLICY_ID --format=json
```
## Access Level Spec (YAML)
```yaml
conditions:
- ipSubnetworks:
- "10.0.0.0/8"
- "172.16.0.0/12"
devicePolicy:
requireScreenlock: true
osConstraints:
- osType: DESKTOP_WINDOWS
minimumVersion: "10.0.19041"
- osType: DESKTOP_MAC
minimumVersion: "12.0.0"
allowedEncryptionStatuses:
- ENCRYPTED
regions:
- "US"
- "GB"
```
## IAP Roles
| Role | Description |
|------|-------------|
| roles/iap.httpsResourceAccessor | Access IAP-protected resources |
| roles/iap.admin | Full IAP administration |
| roles/iap.settingsAdmin | Modify IAP settings |
| roles/iap.tunnelResourceAccessor | Access via IAP TCP tunneling |
## Python SDK
```python
from google.cloud import iap_v1
client = iap_v1.IdentityAwareProxyAdminServiceClient()
# List tunnel destinations
request = iap_v1.ListTunnelDestGroupsRequest(parent=f"projects/{project}/iap_tunnel/locations/-")
```
## Audit Log Query (Cloud Logging)
```
resource.type="gce_backend_service"
logName="projects/PROJECT/logs/cloudaudit.googleapis.com%2Fdata_access"
protoPayload.methodName="AuthorizeUser"
protoPayload.authenticationInfo.principalEmail!=""
```
### References
- BeyondCorp Enterprise: https://cloud.google.com/beyondcorp
- IAP Concepts: https://cloud.google.com/iap/docs/concepts-overview
- Access Context Manager: https://cloud.google.com/access-context-manager/docs
@@ -0,0 +1,164 @@
#!/usr/bin/env python3
"""BeyondCorp Zero Trust Agent - audits IAP configuration, access levels, and policy bindings."""
import json
import argparse
import logging
import subprocess
from collections import defaultdict
from datetime import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
def gcloud_json(args_list):
"""Execute gcloud command and return JSON output."""
cmd = ["gcloud"] + args_list + ["--format=json"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
return json.loads(result.stdout) if result.returncode == 0 and result.stdout else []
def list_iap_protected_resources(project):
"""List resources protected by Identity-Aware Proxy."""
backends = gcloud_json(["compute", "backend-services", "list", "--project", project])
protected = []
for backend in backends:
iap = backend.get("iap", {})
protected.append({
"name": backend.get("name", ""),
"iap_enabled": iap.get("enabled", False),
"oauth2_client_id": bool(iap.get("oauth2ClientId", "")),
})
return protected
def get_access_levels(policy_name):
"""Retrieve Access Context Manager access levels."""
levels = gcloud_json(["access-context-manager", "levels", "list", "--policy", policy_name])
parsed = []
for level in levels:
basic = level.get("basic", {})
conditions = basic.get("conditions", [])
parsed.append({
"name": level.get("name", "").split("/")[-1],
"title": level.get("title", ""),
"combining_function": basic.get("combiningFunction", "AND"),
"condition_count": len(conditions),
"has_ip_restriction": any(c.get("ipSubnetworks") for c in conditions),
"has_device_policy": any(c.get("devicePolicy") for c in conditions),
"has_region_restriction": any(c.get("regions") for c in conditions),
})
return parsed
def audit_iap_iam_bindings(project):
"""Audit IAM bindings on IAP-protected resources."""
findings = []
cmd = ["gcloud", "iap", "web", "get-iam-policy", "--project", project, "--format=json"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return [{"issue": "Cannot retrieve IAP IAM policy", "severity": "high"}]
policy = json.loads(result.stdout) if result.stdout else {}
for binding in policy.get("bindings", []):
role = binding.get("role", "")
members = binding.get("members", [])
condition = binding.get("condition")
if role == "roles/iap.httpsResourceAccessor":
if "allUsers" in members or "allAuthenticatedUsers" in members:
findings.append({
"role": role, "issue": "Public access via allUsers/allAuthenticatedUsers",
"severity": "critical",
"recommendation": "Restrict to specific user/group identities",
})
if not condition:
findings.append({
"role": role, "members": members[:5],
"issue": "IAP binding without access level condition",
"severity": "high",
"recommendation": "Add access level condition for context-aware enforcement",
})
return findings
def audit_access_level_strength(access_levels):
"""Audit access levels for security strength."""
findings = []
for level in access_levels:
if not level["has_device_policy"]:
findings.append({
"access_level": level["name"],
"issue": "No device policy requirement",
"severity": "medium",
"recommendation": "Add device trust requirements (encryption, screen lock, OS version)",
})
if not level["has_ip_restriction"] and not level["has_region_restriction"]:
findings.append({
"access_level": level["name"],
"issue": "No network or geographic restriction",
"severity": "medium",
"recommendation": "Consider adding corporate IP range or geo restrictions",
})
if level["condition_count"] == 0:
findings.append({
"access_level": level["name"],
"issue": "Empty access level with no conditions",
"severity": "high",
})
return findings
def check_endpoint_verification(project):
"""Check Endpoint Verification deployment status."""
devices = gcloud_json(["endpoint-verification", "list", "--project", project])
total = len(devices)
compliant = sum(1 for d in devices if d.get("complianceState") == "COMPLIANT")
return {
"total_devices": total,
"compliant": compliant,
"non_compliant": total - compliant,
"compliance_rate": round(compliant / max(total, 1) * 100, 1),
}
def generate_report(protected, access_levels, iam_findings, level_findings, endpoint_status):
iap_enabled = sum(1 for r in protected if r["iap_enabled"])
all_findings = iam_findings + level_findings
return {
"timestamp": datetime.utcnow().isoformat(),
"framework": "BeyondCorp Enterprise / Zero Trust",
"iap_protected_resources": len(protected),
"iap_enabled_count": iap_enabled,
"iap_coverage": round(iap_enabled / max(len(protected), 1) * 100, 1),
"access_levels_defined": len(access_levels),
"access_level_details": access_levels,
"endpoint_verification": endpoint_status,
"iam_findings": iam_findings,
"access_level_findings": level_findings,
"total_findings": len(all_findings),
"critical_findings": sum(1 for f in all_findings if f.get("severity") == "critical"),
}
def main():
parser = argparse.ArgumentParser(description="BeyondCorp Zero Trust Audit Agent")
parser.add_argument("--project", required=True, help="GCP project ID")
parser.add_argument("--access-policy", required=True, help="Access Context Manager policy ID")
parser.add_argument("--output", default="beyondcorp_audit_report.json")
args = parser.parse_args()
protected = list_iap_protected_resources(args.project)
access_levels = get_access_levels(args.access_policy)
iam_findings = audit_iap_iam_bindings(args.project)
level_findings = audit_access_level_strength(access_levels)
endpoint_status = check_endpoint_verification(args.project)
report = generate_report(protected, access_levels, iam_findings, level_findings, endpoint_status)
with open(args.output, "w") as f:
json.dump(report, f, indent=2, default=str)
logger.info("BeyondCorp: %.1f%% IAP coverage, %d access levels, %d findings",
report["iap_coverage"], report["access_levels_defined"], report["total_findings"])
print(json.dumps(report, indent=2, default=str))
if __name__ == "__main__":
main()