mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
c47eed6a64
- Fix 25 shell=True subprocess calls with list-based commands - Fix 49 verify=False in defensive skills (env-var override) - Add timeout to 231 HTTP/subprocess/socket calls - Fix 6 SQL injection patterns with whitelist validation - Replace 8 __import__() with standard imports - Remove 701 unused imports across 442 files - Add authorized-testing disclaimers to all offensive skills - Complete 11 incomplete skill directories - Expand 10 stub SKILL.md files with full content - Fix 2 YAML parse errors in frontmatter - Fix 5 pre-existing syntax errors - Convert 22 hardcoded paths/ports to environment variables - Back up 21 redundant skill pairs to .bak - Fix 2 global declaration errors - 724/724 skills with full folder anatomy (SKILL.md + agent.py + api-reference.md + LICENSE) - 0 compile errors across all 724 agent.py files
176 lines
7.1 KiB
Python
176 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
|
"""Agent for auditing and securing AWS Lambda execution roles."""
|
|
|
|
import boto3
|
|
import json
|
|
import argparse
|
|
|
|
|
|
def list_lambda_roles(region="us-east-1"):
|
|
"""List all Lambda functions and their execution roles."""
|
|
lam = boto3.client("lambda", region_name=region)
|
|
iam = boto3.client("iam")
|
|
functions = []
|
|
paginator = lam.get_paginator("list_functions")
|
|
for page in paginator.paginate():
|
|
for func in page["Functions"]:
|
|
role_arn = func["Role"]
|
|
role_name = role_arn.split("/")[-1]
|
|
functions.append({
|
|
"function_name": func["FunctionName"],
|
|
"runtime": func.get("Runtime", "N/A"),
|
|
"role_name": role_name,
|
|
"role_arn": role_arn,
|
|
})
|
|
print(f" {func['FunctionName']} -> {role_name} ({func.get('Runtime', 'N/A')})")
|
|
print(f"\n[*] Total Lambda functions: {len(functions)}")
|
|
return functions
|
|
|
|
|
|
def audit_role_permissions(role_name):
|
|
"""Analyze attached and inline policies for a Lambda execution role."""
|
|
iam = boto3.client("iam")
|
|
findings = []
|
|
attached = iam.list_attached_role_policies(RoleName=role_name)["AttachedPolicies"]
|
|
broad_policies = ["AdministratorAccess", "PowerUserAccess", "AmazonS3FullAccess",
|
|
"AmazonDynamoDBFullAccess", "AmazonSQSFullAccess"]
|
|
for pol in attached:
|
|
if pol["PolicyName"] in broad_policies:
|
|
findings.append({
|
|
"type": "OVERPRIVILEGED_MANAGED_POLICY", "severity": "CRITICAL",
|
|
"role": role_name, "policy": pol["PolicyName"],
|
|
"detail": f"Broad managed policy '{pol['PolicyName']}' attached to Lambda role",
|
|
})
|
|
print(f" [!] CRITICAL: {role_name} has {pol['PolicyName']}")
|
|
|
|
inline_names = iam.list_role_policies(RoleName=role_name)["PolicyNames"]
|
|
for pol_name in inline_names:
|
|
doc = iam.get_role_policy(RoleName=role_name, PolicyName=pol_name)["PolicyDocument"]
|
|
for stmt in doc.get("Statement", []):
|
|
if stmt.get("Effect") != "Allow":
|
|
continue
|
|
actions = stmt.get("Action", [])
|
|
resources = stmt.get("Resource", [])
|
|
if isinstance(actions, str):
|
|
actions = [actions]
|
|
if isinstance(resources, str):
|
|
resources = [resources]
|
|
wildcard_actions = [a for a in actions if a.endswith(":*") or a == "*"]
|
|
if wildcard_actions and "*" in resources:
|
|
findings.append({
|
|
"type": "WILDCARD_POLICY", "severity": "HIGH",
|
|
"role": role_name, "policy": pol_name,
|
|
"detail": f"Wildcard actions {wildcard_actions} on Resource '*'",
|
|
})
|
|
print(f" [!] HIGH: {role_name}/{pol_name} has wildcard actions on *")
|
|
return findings
|
|
|
|
|
|
def check_permission_boundary(role_name):
|
|
"""Check if a Lambda execution role has a permission boundary."""
|
|
iam = boto3.client("iam")
|
|
role = iam.get_role(RoleName=role_name)["Role"]
|
|
boundary = role.get("PermissionsBoundary")
|
|
if boundary:
|
|
print(f" [OK] {role_name} has boundary: {boundary['PermissionsBoundaryArn']}")
|
|
return True
|
|
else:
|
|
print(f" [!] {role_name} has NO permission boundary")
|
|
return False
|
|
|
|
|
|
def check_trust_policy(role_name):
|
|
"""Validate the trust policy for confused deputy prevention."""
|
|
iam = boto3.client("iam")
|
|
role = iam.get_role(RoleName=role_name)["Role"]
|
|
trust_doc = role["AssumeRolePolicyDocument"]
|
|
findings = []
|
|
for stmt in trust_doc.get("Statement", []):
|
|
conditions = stmt.get("Condition", {})
|
|
principal = stmt.get("Principal", {})
|
|
service = principal.get("Service", "")
|
|
if service == "lambda.amazonaws.com" and not conditions:
|
|
findings.append({
|
|
"type": "MISSING_TRUST_CONDITION", "severity": "MEDIUM",
|
|
"role": role_name,
|
|
"detail": "Trust policy lacks aws:SourceAccount or aws:SourceArn condition",
|
|
})
|
|
print(f" [!] MEDIUM: {role_name} trust policy lacks confused deputy prevention")
|
|
return findings
|
|
|
|
|
|
def validate_with_access_analyzer(policy_document, region="us-east-1"):
|
|
"""Validate a policy document using IAM Access Analyzer."""
|
|
aa = boto3.client("accessanalyzer", region_name=region)
|
|
response = aa.validate_policy(
|
|
policyDocument=json.dumps(policy_document),
|
|
policyType="IDENTITY_POLICY",
|
|
)
|
|
findings = response.get("findings", [])
|
|
for f in findings:
|
|
print(f" [{f['findingType']}] {f['issueCode']}: {f.get('findingDetails', '')}")
|
|
return findings
|
|
|
|
|
|
def full_audit(region="us-east-1"):
|
|
"""Run a complete audit of all Lambda execution roles."""
|
|
print("[*] Starting Lambda execution role audit...")
|
|
functions = list_lambda_roles(region)
|
|
all_findings = []
|
|
audited_roles = set()
|
|
|
|
for func in functions:
|
|
role_name = func["role_name"]
|
|
if role_name in audited_roles:
|
|
continue
|
|
audited_roles.add(role_name)
|
|
print(f"\n[*] Auditing role: {role_name}")
|
|
all_findings.extend(audit_role_permissions(role_name))
|
|
all_findings.extend(check_trust_policy(role_name))
|
|
has_boundary = check_permission_boundary(role_name)
|
|
if not has_boundary:
|
|
all_findings.append({
|
|
"type": "NO_PERMISSION_BOUNDARY", "severity": "MEDIUM",
|
|
"role": role_name, "detail": "No permission boundary applied",
|
|
})
|
|
|
|
critical = sum(1 for f in all_findings if f["severity"] == "CRITICAL")
|
|
high = sum(1 for f in all_findings if f["severity"] == "HIGH")
|
|
medium = sum(1 for f in all_findings if f["severity"] == "MEDIUM")
|
|
print(f"\n[*] Audit complete: {len(all_findings)} findings "
|
|
f"(Critical: {critical}, High: {high}, Medium: {medium})")
|
|
return all_findings
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="AWS Lambda Execution Role Security Agent")
|
|
parser.add_argument("action", choices=["list", "audit-role", "full-audit", "check-boundary",
|
|
"check-trust", "validate-policy"])
|
|
parser.add_argument("--role", help="Role name to audit")
|
|
parser.add_argument("--region", default="us-east-1")
|
|
parser.add_argument("--policy-file", help="Policy JSON file to validate")
|
|
parser.add_argument("-o", "--output", default="lambda_role_audit.json")
|
|
args = parser.parse_args()
|
|
|
|
if args.action == "list":
|
|
list_lambda_roles(args.region)
|
|
elif args.action == "audit-role":
|
|
audit_role_permissions(args.role)
|
|
elif args.action == "full-audit":
|
|
findings = full_audit(args.region)
|
|
with open(args.output, "w") as f:
|
|
json.dump(findings, f, indent=2, default=str)
|
|
print(f"[*] Report saved to {args.output}")
|
|
elif args.action == "check-boundary":
|
|
check_permission_boundary(args.role)
|
|
elif args.action == "check-trust":
|
|
check_trust_policy(args.role)
|
|
elif args.action == "validate-policy":
|
|
with open(args.policy_file) as f:
|
|
doc = json.load(f)
|
|
validate_with_access_analyzer(doc, args.region)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|