Add skill: detecting-lateral-movement-with-zeek (fixes #5) (#29)

This commit is contained in:
Julio César Suástegui
2026-03-27 03:24:16 -06:00
committed by GitHub
parent 9314565dd9
commit 97c213f9a4
4 changed files with 378 additions and 10 deletions
@@ -3,11 +3,11 @@ name: detecting-lateral-movement-with-zeek
description: >
Detect lateral movement in network traffic using Zeek (formerly Bro) log
analysis. Parses conn.log, smb_mapping.log, smb_files.log, dce_rpc.log,
kerberos.log, and ntlm.log to identify SMB file transfers, Pass-the-Hash
activity, remote service execution, and anomalous internal connections.
kerberos.log, and ntlm.log to identify SMB file transfers, NTLM account
spray activity, remote service execution, and anomalous internal connections.
domain: cybersecurity
subdomain: network-security
tags: [zeek, lateral-movement, smb, dce-rpc, pass-the-hash, network-forensics]
tags: [zeek, lateral-movement, smb, dce-rpc, ntlm-spray, network-forensics]
version: "1.0"
author: mahipal
license: Apache-2.0
@@ -16,14 +16,14 @@ license: Apache-2.0
# Detecting Lateral Movement with Zeek
Analyze Zeek network logs to identify lateral movement techniques including
SMB admin share access, DCE/RPC remote service creation, Pass-the-Hash via
NTLM, Kerberos ticket anomalies, and large internal data transfers indicative
SMB admin share access, DCE/RPC remote service creation, NTLM account spray,
Kerberos ticket anomalies, and large internal data transfers indicative
of staging or exfiltration between hosts.
## When to Use
- Hunting for lateral movement after an initial compromise indicator is found on one endpoint
- Investigating suspected Pass-the-Hash or Pass-the-Ticket attacks across the internal network
- Investigating suspected NTLM account spray or Pass-the-Ticket attacks across the internal network
- Monitoring SMB traffic for unauthorized file transfers to admin shares (C$, ADMIN$, IPC$)
- Detecting remote service execution via DCE/RPC (PsExec, schtasks, WMI lateral patterns)
- Building alerting rules for internal network anomalies in a Zeek-based NSMP deployment
@@ -37,7 +37,7 @@ of staging or exfiltration between hosts.
- Zeek SMB analyzer enabled (loaded by default: `@load base/protocols/smb`)
- Zeek DCE/RPC analyzer enabled (`@load base/protocols/dce-rpc`)
- Zeek Kerberos analyzer enabled (`@load base/protocols/krb`)
- Python 3.8+ with `pandas` for log analysis
- Python 3.8+ (standard library only)
- Access to Zeek log directory (default: `/opt/zeek/logs/current/`)
- Familiarity with Zeek TSV log format (fields separated by `\t`, header lines prefixed with `#`)
@@ -97,6 +97,24 @@ zeek-cut ts id.orig_h id.resp_h action path name size \
| grep -i 'SMB::FILE_WRITE'
```
Deploy the following Zeek script to generate `notice.log` alerts on admin share access:
```zeek
@load base/protocols/smb
@load base/frameworks/notice
redef enum Notice::Type += {
Admin_Share_Access
};
event smb1_tree_connect_andx_request(c: connection, hdr: SMB1::Header, path: string, service: string) {
if ( /\$/ in path )
NOTICE([$note=Admin_Share_Access,
$msg=fmt("Admin share access: %s -> %s (%s)", c$id$orig_h, c$id$resp_h, path),
$conn=c]);
}
```
### Step 4: Detect DCE/RPC Remote Service Operations
Monitor for remote service creation and scheduled task registration via DCE/RPC:
@@ -108,9 +126,12 @@ zeek-cut ts id.orig_h id.resp_h endpoint operation \
| grep -iE '(svcctl|atsvc|ITaskSchedulerService)'
```
### Step 5: Detect Pass-the-Hash via NTLM
### Step 5: Detect NTLM Account Spray
Analyze ntlm.log for authentication anomalies indicating credential reuse:
Analyze ntlm.log for authentication anomalies indicating credential reuse.
Zeek's ntlm.log does not expose password hashes, so this detection identifies
a single account authenticating to many hosts in a short window — the network
signature of credential spraying tools like CrackMapExec:
```bash
# Extract NTLM authentications
@@ -121,6 +142,39 @@ zeek-cut ts id.orig_h id.resp_h username domainname server_nb_computer_name succ
zeek-cut ts id.orig_h id.resp_h username success \
< /opt/zeek/logs/current/ntlm.log \
| awk '$5 == "F"'
# Sort by timestamp for timeline analysis
zeek-cut ts id.orig_h id.resp_h username success \
< /opt/zeek/logs/current/ntlm.log \
| sort -k1,1
```
Deploy the following Zeek script to generate `notice.log` alerts when a single
account touches more hosts than the threshold in a rolling window:
```zeek
@load base/protocols/ntlm
@load base/frameworks/notice
redef enum Notice::Type += {
NTLM_Account_Spray
};
global ntlm_tracker: table[string] of set[addr] &create_expire=5min;
const spray_threshold = 3 &redef;
event ntlm_log(rec: NTLM::Info) {
if ( ! rec?$username || rec$username == "-" )
return;
if ( rec$username !in ntlm_tracker )
ntlm_tracker[rec$username] = set();
add ntlm_tracker[rec$username][rec$id$resp_h];
if ( |ntlm_tracker[rec$username]| >= spray_threshold )
NOTICE([$note=NTLM_Account_Spray,
$msg=fmt("NTLM account spray: %s -> %d hosts", rec$username, |ntlm_tracker[rec$username]|),
$sub=rec$username,
$conn=rec$id]);
}
```
### Step 6: Run the Automated Analysis Agent
@@ -137,6 +191,6 @@ python3 agent.py /opt/zeek/logs/2026-03-18/ # Analyze a specific date
- Confirm conn.log captures internal SMB (port 445) and DCE/RPC (port 135) connections with correct field parsing
- Verify smb_mapping.log correctly logs admin share paths (C$, ADMIN$, IPC$)
- Test with a known PsExec execution in a lab: expect to see SMB FILE_WRITE of the service binary followed by DCE/RPC svcctl CreateService
- Validate NTLM log parsing by performing a test authentication and confirming username, domain, and success fields are captured
- Validate NTLM log parsing by performing a test authentication and confirming username, domain, and success fields are captured; verify the NTLM Account Spray Zeek script generates a `notice.log` entry when the spray threshold is exceeded
- Cross-reference Zeek alerts with Sysmon Event ID 1 (Process Creation) on the target host to confirm end-to-end detection
- Verify the agent correctly handles both TSV and JSON Zeek log formats
@@ -0,0 +1,20 @@
# Standards & References
## MITRE ATT&CK — Lateral Movement (TA0008)
- **T1021.001** Remote Desktop Protocol
- **T1021.002** SMB/Windows Admin Shares
- **T1021.003** DCOM
- **T1021.006** Windows Remote Management
- **T1550.002** Pass the Hash
- **T1570** Lateral Tool Transfer
- **T1210** Exploitation of Remote Services
## Zeek Documentation
- [Zeek SMB Analyzer](https://docs.zeek.org/en/current/scripts/base/protocols/smb/)
- [Zeek DCE-RPC Analyzer](https://docs.zeek.org/en/current/scripts/base/protocols/dce-rpc/)
- [Zeek NTLM Analyzer](https://docs.zeek.org/en/current/scripts/base/protocols/ntlm/)
- [zeek-cut Reference](https://docs.zeek.org/en/current/auxil/zeek-cut/)
## Detection References
- SANS: Detecting Lateral Movement with Zeek
- Red Canary Threat Detection Report — Lateral Movement chapter
@@ -0,0 +1,145 @@
# Detection Workflow — Lateral Movement with Zeek
## Overview
This document describes the end-to-end workflow for detecting lateral movement using Zeek network logs, from data collection through investigation and response.
## Workflow Stages
### Stage 1: Data Collection
```
Network Traffic (Span/TAP)
Zeek Sensor
├── conn.log (all connections)
├── smb_mapping.log (SMB share access)
├── dce_rpc.log (DCE/RPC calls)
├── ntlm.log (NTLM authentication)
├── files.log (file transfers)
└── notice.log (Zeek-generated alerts)
```
**Requirements:**
- Zeek deployed on network tap/span port covering internal segments
- Protocol analyzers loaded: SMB, DCE/RPC, NTLM, RDP
- Log rotation configured (recommended: daily rotation, 90-day retention)
### Stage 2: Detection Rules
Apply detection logic via Zeek scripts and/or post-processing:
| Detection | Input Logs | Method |
|---|---|---|
| Admin Share Access | smb_mapping.log | Pattern match on `C$`, `ADMIN$`, `IPC$` |
| PsExec Execution | dce_rpc.log | Match `svcctl` endpoint + `CreateServiceW` |
| RDP Pivoting | conn.log | Graph analysis: host is both RDP client and server |
| NTLM Account Spray | ntlm.log | Same user from N+ distinct sources in time window |
| DCSync | dce_rpc.log | `drsuapi` endpoint + opnum 3 from non-DC |
| Tool Transfer | files.log | PE MIME type between internal hosts |
### Stage 3: Alert Triage
```
Detection Fires
┌─────────────────┐
│ Initial Triage │
│ │
│ 1. Is source a │
│ known admin │──Yes──▶ Log & reduce priority
│ workstation? │
│ │
│ 2. Is activity │
│ during change │──Yes──▶ Verify change ticket
│ window? │
│ │
│ 3. Multiple │
│ indicators? │──Yes──▶ ESCALATE immediately
└─────────────────┘
No match
Standard investigation
```
### Stage 4: Investigation
For each confirmed alert, follow the investigation checklist (see `assets/template.md`):
1. **Identify the source host**
- Query `conn.log` for all connections from the source in the alert timeframe
- Check `ntlm.log` for authentication patterns
- Look for preceding inbound connections (initial access vector)
2. **Map the movement chain**
```bash
# Build connection graph for suspect host
cat conn.log | zeek-cut id.orig_h id.resp_h id.resp_p | \
awk '$1 == "SUSPECT_IP" || $2 == "SUSPECT_IP"' | sort -u
```
3. **Identify transferred payloads**
```bash
# Find files transferred by suspect
cat files.log | zeek-cut tx_hosts rx_hosts filename mime_type total_bytes | \
grep "SUSPECT_IP"
```
4. **Check authentication anomalies**
```bash
# NTLM auth from suspect host
cat ntlm.log | zeek-cut ts id.orig_h username domainname success | \
grep "SUSPECT_IP"
```
5. **Timeline reconstruction**
- Correlate all log entries by timestamp
- Build a chronological sequence of events
- Identify initial compromise, lateral movement, and objectives
### Stage 5: Response
| Finding | Response Action |
|---|---|
| Confirmed lateral movement | Isolate affected hosts from network |
| NTLM Account Spray detected | Force password reset for compromised accounts |
| DCSync detected | Rotate krbtgt and affected credentials, audit DC access |
| Tool transfer identified | Extract and analyze transferred files |
| RDP pivot chain | Disable RDP on non-essential hosts, enforce NLA |
### Stage 6: Post-Incident
1. **Update baselines** — Add legitimate admin share usage to allowlists
2. **Tune detections** — Adjust thresholds based on false positive analysis
3. **Document findings** — Update incident report with Zeek evidence
4. **Improve coverage** — Deploy additional Zeek scripts for newly discovered TTPs
## Automation Integration
### SIEM Forwarding
```bash
# Forward Zeek logs to SIEM via syslog
# Add to local.zeek:
@load policy/tuning/json-logs.zeek
# Configure rsyslog/filebeat to ship JSON logs to SIEM
```
### SOAR Playbook Triggers
- Admin share access from non-admin workstation → Auto-isolate + ticket
- DCSync from non-DC → Emergency alert + auto-isolate
- NTLM Account Spray threshold exceeded → Auto-disable account + alert
## Continuous Improvement
- Review detection efficacy monthly
- Test with red team exercises quarterly
- Update MITRE ATT&CK mappings as new sub-techniques emerge
- Correlate Zeek findings with endpoint telemetry (EDR) for higher fidelity
@@ -0,0 +1,149 @@
#!/usr/bin/env python3
"""Parse Zeek logs to detect lateral movement indicators.
Usage:
python process.py smb_mapping <log_file> [--internal-nets 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16]
python process.py conn <log_file>
python process.py ntlm <log_file> [--window 300]
python process.py dce_rpc <log_file> [--dc-ips 10.0.1.1,10.0.1.2]
"""
import csv
import sys
import ipaddress
from collections import defaultdict
DEFAULT_INTERNAL = ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"]
def is_internal(ip_str, networks):
"""Check if IP is in internal networks."""
try:
ip = ipaddress.ip_address(ip_str)
return any(ip in net for net in networks)
except ValueError:
return False
def parse_internal_nets(nets_str):
"""Parse comma-separated CIDR networks."""
if not nets_str:
return [ipaddress.ip_network(n) for n in DEFAULT_INTERNAL]
return [ipaddress.ip_network(n.strip()) for n in nets_str.split(",")]
def parse_zeek_log(filepath):
"""Parse a Zeek TSV log file, skipping comment lines."""
rows = []
fields = []
with open(filepath) as f:
for line in f:
if line.startswith('#fields'):
fields = line.strip().split('\t')[1:]
elif not line.startswith('#'):
values = line.strip().split('\t')
if fields and len(values) == len(fields):
rows.append(dict(zip(fields, values)))
return rows
def detect_admin_shares(log_file, internal_nets):
"""Detect admin share access — only between internal hosts."""
networks = parse_internal_nets(internal_nets)
entries = parse_zeek_log(log_file)
for entry in entries:
src = entry.get('id.orig_h', '')
dst = entry.get('id.resp_h', '')
share = entry.get('path', '') or entry.get('share_type', '')
if not (is_internal(src, networks) and is_internal(dst, networks)):
continue
share_upper = share.upper()
if any(s in share_upper for s in ['ADMIN$', 'C$', 'IPC$']):
severity = "HIGH" if 'ADMIN$' in share_upper or 'C$' in share_upper else "MEDIUM"
print(f"[{severity}] ADMIN SHARE: {entry.get('ts', '')} {src} -> {dst} ({share})")
def detect_rdp_pivots(log_file, window_minutes=10):
"""Detect RDP pivot chains from conn.log."""
entries = parse_zeek_log(log_file)
rdp_sessions = [(float(e.get('ts', 0)), e.get('id.orig_h', ''), e.get('id.resp_h', ''))
for e in entries if e.get('id.resp_p') == '3389']
rdp_sessions.sort()
# Find chains: A->B then B->C within window
dst_arrivals = defaultdict(list)
for ts, src, dst in rdp_sessions:
dst_arrivals[dst].append((ts, src))
for ts, src, dst in rdp_sessions:
for arrival_ts, arrival_src in dst_arrivals.get(src, []):
if 0 < (ts - arrival_ts) < window_minutes * 60:
print(f"[HIGH] RDP PIVOT: {arrival_src} -> {src} -> {dst} (delta: {int(ts - arrival_ts)}s)")
def detect_ntlm_spray(log_file, window_seconds=300, threshold=3):
"""Detect NTLM account spray via time-windowed burst analysis."""
entries = parse_zeek_log(log_file)
user_events = defaultdict(list)
for entry in entries:
user = entry.get('username', '')
dst = entry.get('id.resp_h', '')
ts = float(entry.get('ts', 0))
if user and user != '-':
user_events[user].append((ts, dst))
for user, events in user_events.items():
events.sort()
# Sliding window analysis
for i, (ts_start, _) in enumerate(events):
window_hosts = set()
for j in range(i, len(events)):
ts_j, dst_j = events[j]
if ts_j - ts_start > window_seconds:
break
window_hosts.add(dst_j)
if len(window_hosts) >= threshold:
print(f"[CRITICAL] NTLM ACCOUNT SPRAY: {user} authenticated to {len(window_hosts)} "
f"hosts within {window_seconds}s: {', '.join(sorted(window_hosts))}")
break # One alert per user
def detect_dcsync(log_file, dc_ips=None):
"""Detect DCSync attacks via DRS replication calls — requires DC IPs."""
if not dc_ips:
print("[WARN] DCSync detection skipped: --dc-ips not provided. "
"Specify domain controller IPs to enable this detector.")
return
dc_set = set(dc_ips.split(","))
entries = parse_zeek_log(log_file)
for entry in entries:
src = entry.get('id.orig_h', '')
dst = entry.get('id.resp_h', '')
operation = entry.get('operation', '')
if dst in dc_set and src not in dc_set:
if 'DrsReplicaAdd' in operation or 'DrsGetNCChanges' in operation:
print(f"[CRITICAL] DCSYNC: {src} -> {dst} ({operation})")
if __name__ == "__main__":
if len(sys.argv) < 3:
print(__doc__)
sys.exit(1)
log_type, log_file = sys.argv[1], sys.argv[2]
# Parse optional args
args = {sys.argv[i]: sys.argv[i+1] for i in range(3, len(sys.argv)-1, 2) if sys.argv[i].startswith('--')}
if log_type == "smb_mapping":
detect_admin_shares(log_file, args.get('--internal-nets'))
elif log_type == "conn":
detect_rdp_pivots(log_file, int(args.get('--window', 10)))
elif log_type == "ntlm":
detect_ntlm_spray(log_file, int(args.get('--window', 300)), int(args.get('--threshold', 3)))
elif log_type == "dce_rpc":
detect_dcsync(log_file, args.get('--dc-ips'))
else:
print(f"Unknown log type: {log_type}")
sys.exit(1)