mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
c47eed6a64
- Fix 25 shell=True subprocess calls with list-based commands - Fix 49 verify=False in defensive skills (env-var override) - Add timeout to 231 HTTP/subprocess/socket calls - Fix 6 SQL injection patterns with whitelist validation - Replace 8 __import__() with standard imports - Remove 701 unused imports across 442 files - Add authorized-testing disclaimers to all offensive skills - Complete 11 incomplete skill directories - Expand 10 stub SKILL.md files with full content - Fix 2 YAML parse errors in frontmatter - Fix 5 pre-existing syntax errors - Convert 22 hardcoded paths/ports to environment variables - Back up 21 redundant skill pairs to .bak - Fix 2 global declaration errors - 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE) - 0 compile errors across all 724 agent.py files
306 lines
11 KiB
Python
306 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""Kubernetes Pod Security Admission audit agent.
|
|
|
|
Audits Kubernetes namespaces and workloads for Pod Security Standards (PSS)
|
|
compliance using kubectl. Checks namespace labels for enforce/audit/warn
|
|
modes, analyzes pod specs against Baseline and Restricted profiles, and
|
|
reports violations.
|
|
"""
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
def run_kubectl(args_list, timeout=60):
|
|
"""Execute a kubectl command and return parsed JSON output."""
|
|
cmd = ["kubectl"] + args_list + ["-o", "json"]
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
|
if result.returncode != 0:
|
|
print(f"[!] kubectl error: {result.stderr[:200]}", file=sys.stderr)
|
|
return None
|
|
try:
|
|
return json.loads(result.stdout)
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
|
|
def get_namespaces():
|
|
"""Get all namespaces with their PSA labels."""
|
|
print("[*] Fetching namespaces...")
|
|
data = run_kubectl(["get", "namespaces"])
|
|
if not data:
|
|
return []
|
|
namespaces = []
|
|
for ns in data.get("items", []):
|
|
name = ns.get("metadata", {}).get("name", "")
|
|
labels = ns.get("metadata", {}).get("labels", {})
|
|
psa_enforce = labels.get("pod-security.kubernetes.io/enforce", "")
|
|
psa_audit = labels.get("pod-security.kubernetes.io/audit", "")
|
|
psa_warn = labels.get("pod-security.kubernetes.io/warn", "")
|
|
psa_enforce_version = labels.get("pod-security.kubernetes.io/enforce-version", "")
|
|
namespaces.append({
|
|
"name": name,
|
|
"enforce": psa_enforce,
|
|
"audit": psa_audit,
|
|
"warn": psa_warn,
|
|
"enforce_version": psa_enforce_version,
|
|
"has_psa": bool(psa_enforce or psa_audit or psa_warn),
|
|
})
|
|
print(f"[+] Found {len(namespaces)} namespaces")
|
|
return namespaces
|
|
|
|
|
|
def audit_namespace_psa(namespaces):
|
|
"""Audit PSA label configuration across namespaces."""
|
|
findings = []
|
|
system_ns = {"kube-system", "kube-public", "kube-node-lease", "default"}
|
|
|
|
for ns in namespaces:
|
|
name = ns["name"]
|
|
if name in system_ns:
|
|
continue
|
|
|
|
if not ns["has_psa"]:
|
|
findings.append({
|
|
"namespace": name,
|
|
"check": "No PSA labels configured",
|
|
"severity": "HIGH",
|
|
"recommendation": (
|
|
f"Apply PSA labels: kubectl label namespace {name} "
|
|
f"pod-security.kubernetes.io/enforce=baseline "
|
|
f"pod-security.kubernetes.io/warn=restricted"
|
|
),
|
|
})
|
|
elif ns["enforce"] == "privileged":
|
|
findings.append({
|
|
"namespace": name,
|
|
"check": "PSA enforce set to privileged (permissive)",
|
|
"severity": "HIGH",
|
|
"recommendation": "Upgrade to baseline or restricted enforcement",
|
|
})
|
|
elif ns["enforce"] == "baseline" and not ns["warn"]:
|
|
findings.append({
|
|
"namespace": name,
|
|
"check": "Enforce baseline but no warn=restricted",
|
|
"severity": "MEDIUM",
|
|
"recommendation": (
|
|
f"Add warn label: kubectl label namespace {name} "
|
|
f"pod-security.kubernetes.io/warn=restricted"
|
|
),
|
|
})
|
|
elif ns["enforce"] == "restricted":
|
|
findings.append({
|
|
"namespace": name,
|
|
"check": "PSA enforce=restricted (most secure)",
|
|
"severity": "INFO",
|
|
})
|
|
|
|
return findings
|
|
|
|
|
|
def audit_pod_security(namespace=None):
|
|
"""Audit pods for security spec violations against PSS profiles."""
|
|
findings = []
|
|
cmd = ["get", "pods", "--all-namespaces"] if not namespace else ["get", "pods", "-n", namespace]
|
|
data = run_kubectl(cmd)
|
|
if not data:
|
|
return findings
|
|
|
|
for pod in data.get("items", []):
|
|
pod_name = pod.get("metadata", {}).get("name", "")
|
|
pod_ns = pod.get("metadata", {}).get("namespace", "")
|
|
spec = pod.get("spec", {})
|
|
|
|
# Check host namespaces
|
|
if spec.get("hostNetwork"):
|
|
findings.append({
|
|
"namespace": pod_ns, "pod": pod_name,
|
|
"check": "hostNetwork enabled",
|
|
"severity": "CRITICAL", "profile": "baseline",
|
|
})
|
|
if spec.get("hostPID"):
|
|
findings.append({
|
|
"namespace": pod_ns, "pod": pod_name,
|
|
"check": "hostPID enabled",
|
|
"severity": "CRITICAL", "profile": "baseline",
|
|
})
|
|
if spec.get("hostIPC"):
|
|
findings.append({
|
|
"namespace": pod_ns, "pod": pod_name,
|
|
"check": "hostIPC enabled",
|
|
"severity": "HIGH", "profile": "baseline",
|
|
})
|
|
|
|
containers = spec.get("containers", []) + spec.get("initContainers", [])
|
|
for container in containers:
|
|
c_name = container.get("name", "")
|
|
sec_ctx = container.get("securityContext", {})
|
|
|
|
# Privileged container
|
|
if sec_ctx.get("privileged"):
|
|
findings.append({
|
|
"namespace": pod_ns, "pod": pod_name, "container": c_name,
|
|
"check": "Privileged container",
|
|
"severity": "CRITICAL", "profile": "baseline",
|
|
})
|
|
|
|
# Run as root
|
|
if sec_ctx.get("runAsUser") == 0:
|
|
findings.append({
|
|
"namespace": pod_ns, "pod": pod_name, "container": c_name,
|
|
"check": "Running as root (UID 0)",
|
|
"severity": "HIGH", "profile": "restricted",
|
|
})
|
|
|
|
# Missing runAsNonRoot
|
|
if not sec_ctx.get("runAsNonRoot"):
|
|
findings.append({
|
|
"namespace": pod_ns, "pod": pod_name, "container": c_name,
|
|
"check": "runAsNonRoot not set",
|
|
"severity": "MEDIUM", "profile": "restricted",
|
|
})
|
|
|
|
# Writable root filesystem
|
|
if not sec_ctx.get("readOnlyRootFilesystem"):
|
|
findings.append({
|
|
"namespace": pod_ns, "pod": pod_name, "container": c_name,
|
|
"check": "Root filesystem not read-only",
|
|
"severity": "MEDIUM", "profile": "restricted",
|
|
})
|
|
|
|
# Privilege escalation
|
|
if sec_ctx.get("allowPrivilegeEscalation", True):
|
|
findings.append({
|
|
"namespace": pod_ns, "pod": pod_name, "container": c_name,
|
|
"check": "allowPrivilegeEscalation not disabled",
|
|
"severity": "MEDIUM", "profile": "restricted",
|
|
})
|
|
|
|
# Dangerous capabilities
|
|
caps = sec_ctx.get("capabilities", {})
|
|
added = caps.get("add", [])
|
|
dangerous_caps = {"SYS_ADMIN", "NET_ADMIN", "SYS_PTRACE", "ALL",
|
|
"NET_RAW", "SYS_RAWIO", "SYS_MODULE"}
|
|
for cap in added:
|
|
if cap.upper() in dangerous_caps:
|
|
findings.append({
|
|
"namespace": pod_ns, "pod": pod_name, "container": c_name,
|
|
"check": f"Dangerous capability added: {cap}",
|
|
"severity": "CRITICAL" if cap.upper() in ("SYS_ADMIN", "ALL") else "HIGH",
|
|
"profile": "baseline",
|
|
})
|
|
|
|
# Drop ALL capabilities check (restricted)
|
|
dropped = [c.upper() for c in caps.get("drop", [])]
|
|
if "ALL" not in dropped:
|
|
findings.append({
|
|
"namespace": pod_ns, "pod": pod_name, "container": c_name,
|
|
"check": "Capabilities not dropping ALL",
|
|
"severity": "LOW", "profile": "restricted",
|
|
})
|
|
|
|
# Host ports
|
|
for port in container.get("ports", []):
|
|
if port.get("hostPort"):
|
|
findings.append({
|
|
"namespace": pod_ns, "pod": pod_name, "container": c_name,
|
|
"check": f"hostPort exposed: {port['hostPort']}",
|
|
"severity": "HIGH", "profile": "baseline",
|
|
})
|
|
|
|
return findings
|
|
|
|
|
|
def format_summary(ns_findings, pod_findings, namespaces):
|
|
"""Print audit summary."""
|
|
print(f"\n{'='*60}")
|
|
print(f" Pod Security Admission Audit Report")
|
|
print(f"{'='*60}")
|
|
print(f" Namespaces : {len(namespaces)}")
|
|
psa_configured = sum(1 for ns in namespaces if ns["has_psa"])
|
|
print(f" PSA Configured : {psa_configured}/{len(namespaces)}")
|
|
print(f" NS Findings : {len(ns_findings)}")
|
|
print(f" Pod Findings : {len(pod_findings)}")
|
|
|
|
all_findings = ns_findings + pod_findings
|
|
severity_counts = {}
|
|
for f in all_findings:
|
|
sev = f.get("severity", "INFO")
|
|
severity_counts[sev] = severity_counts.get(sev, 0) + 1
|
|
|
|
print(f"\n By Severity:")
|
|
for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]:
|
|
count = severity_counts.get(sev, 0)
|
|
if count > 0:
|
|
print(f" {sev:10s}: {count}")
|
|
|
|
print(f"\n Namespace PSA Status:")
|
|
for ns in namespaces:
|
|
if ns["name"] in ("kube-system", "kube-public", "kube-node-lease"):
|
|
continue
|
|
status = ns["enforce"] or "none"
|
|
print(f" {ns['name']:30s}: enforce={status:12s} audit={ns['audit'] or 'none':12s}")
|
|
|
|
if pod_findings:
|
|
print(f"\n Top Pod Violations:")
|
|
for f in pod_findings[:15]:
|
|
if f["severity"] in ("CRITICAL", "HIGH"):
|
|
print(f" [{f['severity']:8s}] {f.get('namespace', '')}/"
|
|
f"{f.get('pod', '')}: {f['check']}")
|
|
|
|
return severity_counts
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Kubernetes Pod Security Admission audit agent"
|
|
)
|
|
parser.add_argument("--namespace", "-n", help="Specific namespace to audit (default: all)")
|
|
parser.add_argument("--skip-pods", action="store_true", help="Only audit namespace labels")
|
|
parser.add_argument("--kubeconfig", help="Path to kubeconfig file")
|
|
parser.add_argument("--context", help="Kubernetes context to use")
|
|
parser.add_argument("--output", "-o", help="Output JSON report path")
|
|
parser.add_argument("--verbose", "-v", action="store_true")
|
|
args = parser.parse_args()
|
|
|
|
if args.kubeconfig:
|
|
os.environ["KUBECONFIG"] = args.kubeconfig
|
|
|
|
namespaces = get_namespaces()
|
|
ns_findings = audit_namespace_psa(namespaces)
|
|
|
|
pod_findings = []
|
|
if not args.skip_pods:
|
|
pod_findings = audit_pod_security(args.namespace)
|
|
|
|
severity_counts = format_summary(ns_findings, pod_findings, namespaces)
|
|
|
|
report = {
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"tool": "PSA Audit",
|
|
"namespaces": namespaces,
|
|
"namespace_findings": ns_findings,
|
|
"pod_findings": pod_findings,
|
|
"severity_counts": severity_counts,
|
|
"risk_level": (
|
|
"CRITICAL" if severity_counts.get("CRITICAL", 0) > 0
|
|
else "HIGH" if severity_counts.get("HIGH", 0) > 0
|
|
else "MEDIUM" if severity_counts.get("MEDIUM", 0) > 0
|
|
else "LOW"
|
|
),
|
|
}
|
|
|
|
if args.output:
|
|
with open(args.output, "w") as f:
|
|
json.dump(report, f, indent=2)
|
|
print(f"\n[+] Report saved to {args.output}")
|
|
elif args.verbose:
|
|
print(json.dumps(report, indent=2))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|