Files
Anthropic-Cybersecurity-Skills/skills/implementing-aws-nitro-enclave-security/scripts/agent.py
T

511 lines
21 KiB
Python

#!/usr/bin/env python3
# For authorized cloud security assessments only
"""AWS Nitro Enclave Security Agent - Validates enclave attestation, audits KMS policies, and verifies enclave isolation."""
import argparse
import base64
import hashlib
import json
import logging
import socket
import struct
import sys
from datetime import datetime, timezone
try:
import boto3
from botocore.exceptions import ClientError
except ImportError:
print("ERROR: boto3 required. Install with: pip install boto3")
sys.exit(1)
try:
import cbor2
except ImportError:
cbor2 = None
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
def get_nitro_instances(ec2_client, region):
"""Find EC2 instances with Nitro Enclave support enabled."""
findings = []
paginator = ec2_client.get_paginator("describe_instances")
for page in paginator.paginate(
Filters=[{"Name": "enclave-options.enabled", "Values": ["true"]}]
):
for reservation in page["Reservations"]:
for instance in reservation["Instances"]:
instance_info = {
"instance_id": instance["InstanceId"],
"instance_type": instance["InstanceType"],
"state": instance["State"]["Name"],
"enclave_enabled": True,
"iam_role": None,
"launch_time": instance.get("LaunchTime", "").isoformat() if instance.get("LaunchTime") else None,
"region": region,
}
if instance.get("IamInstanceProfile"):
instance_info["iam_role"] = instance["IamInstanceProfile"]["Arn"]
findings.append(instance_info)
logger.info("Found %d Nitro Enclave-enabled instances in %s", len(findings), region)
return findings
def audit_kms_key_policy(kms_client, key_id):
"""Audit a KMS key policy for Nitro Enclave attestation conditions."""
result = {
"key_id": key_id,
"has_attestation_condition": False,
"pcr_conditions": [],
"image_sha_condition": False,
"allowed_principals": [],
"allowed_actions": [],
"issues": [],
}
try:
key_meta = kms_client.describe_key(KeyId=key_id)
result["key_arn"] = key_meta["KeyMetadata"]["Arn"]
result["key_state"] = key_meta["KeyMetadata"]["KeyState"]
result["key_usage"] = key_meta["KeyMetadata"]["KeyUsage"]
policy_json = kms_client.get_key_policy(KeyId=key_id, PolicyName="default")["Policy"]
policy = json.loads(policy_json)
for statement in policy.get("Statement", []):
principals = statement.get("Principal", {})
actions = statement.get("Action", [])
if isinstance(actions, str):
actions = [actions]
conditions = statement.get("Condition", {})
for action in actions:
if action not in result["allowed_actions"]:
result["allowed_actions"].append(action)
if isinstance(principals, dict) and "AWS" in principals:
aws_principals = principals["AWS"]
if isinstance(aws_principals, str):
aws_principals = [aws_principals]
result["allowed_principals"].extend(aws_principals)
# Check for attestation conditions
for operator_key, operator_conditions in conditions.items():
for cond_key, cond_value in operator_conditions.items():
if "RecipientAttestation" in cond_key:
result["has_attestation_condition"] = True
if "ImageSha384" in cond_key:
result["image_sha_condition"] = True
result["pcr_conditions"].append({
"type": "ImageSha384 (PCR0)",
"operator": operator_key,
"value": cond_value[:32] + "..." if len(str(cond_value)) > 32 else cond_value,
})
elif "PCR" in cond_key:
pcr_id = cond_key.split(":")[-1]
result["pcr_conditions"].append({
"type": pcr_id,
"operator": operator_key,
"value": cond_value[:32] + "..." if len(str(cond_value)) > 32 else cond_value,
})
# Check for missing attestation on decrypt actions
has_decrypt = any("Decrypt" in a or "GenerateDataKey" in a for a in actions)
if has_decrypt and not any("RecipientAttestation" in str(conditions)):
if statement.get("Effect") == "Allow":
result["issues"].append(
f"Statement '{statement.get('Sid', 'unnamed')}' allows Decrypt/GenerateDataKey "
f"without kms:RecipientAttestation condition - parent instance can decrypt directly"
)
if not result["has_attestation_condition"]:
result["issues"].append(
"KMS key policy has no RecipientAttestation conditions - "
"decryption is not restricted to verified enclaves"
)
except ClientError as e:
result["issues"].append(f"Error accessing key: {e.response['Error']['Message']}")
return result
def audit_iam_role_for_enclave(iam_client, role_name):
"""Check if an IAM role has appropriate permissions for enclave operations."""
result = {
"role_name": role_name,
"has_kms_permissions": False,
"kms_actions": [],
"has_ec2_enclave_permissions": False,
"overprivileged": False,
"issues": [],
}
try:
# Check attached policies
attached = iam_client.list_attached_role_policies(RoleName=role_name)
for policy in attached["AttachedPolicies"]:
if policy["PolicyName"] == "AdministratorAccess":
result["overprivileged"] = True
result["issues"].append(
"Role has AdministratorAccess - violates least privilege for enclave workloads"
)
policy_version = iam_client.get_policy(PolicyArn=policy["PolicyArn"])
version_id = policy_version["Policy"]["DefaultVersionId"]
policy_doc = iam_client.get_policy_version(
PolicyArn=policy["PolicyArn"], VersionId=version_id
)
for stmt in policy_doc["PolicyVersion"]["Document"].get("Statement", []):
actions = stmt.get("Action", [])
if isinstance(actions, str):
actions = [actions]
for action in actions:
if "kms:" in action:
result["has_kms_permissions"] = True
result["kms_actions"].append(action)
if action in ("kms:*", "*"):
result["overprivileged"] = True
result["issues"].append(
f"Role has wildcard KMS permissions ({action}) - should restrict to specific keys"
)
# Check inline policies
inline = iam_client.list_role_policies(RoleName=role_name)
for policy_name in inline["PolicyNames"]:
policy_doc = iam_client.get_role_policy(RoleName=role_name, PolicyName=policy_name)
for stmt in policy_doc["PolicyDocument"].get("Statement", []):
actions = stmt.get("Action", [])
if isinstance(actions, str):
actions = [actions]
resources = stmt.get("Resource", [])
if isinstance(resources, str):
resources = [resources]
for action in actions:
if "kms:" in action:
result["has_kms_permissions"] = True
result["kms_actions"].append(action)
if "*" in resources:
result["issues"].append(
f"Inline policy '{policy_name}' uses wildcard Resource - restrict to specific KMS key ARNs"
)
if not result["has_kms_permissions"]:
result["issues"].append("Role has no KMS permissions - cannot perform enclave-side decryption")
except ClientError as e:
result["issues"].append(f"Error auditing role: {e.response['Error']['Message']}")
return result
def check_enclave_allocator_config(instance_id, ssm_client):
"""Check enclave allocator configuration via SSM (if available)."""
result = {
"instance_id": instance_id,
"allocator_configured": False,
"memory_mib": None,
"cpu_count": None,
"issues": [],
}
try:
response = ssm_client.send_command(
InstanceIds=[instance_id],
DocumentName="AWS-RunShellScript",
Parameters={
"commands": ["cat /etc/nitro_enclaves/allocator.yaml 2>/dev/null || echo 'NOT_FOUND'"]
},
)
command_id = response["Command"]["CommandId"]
import time
time.sleep(3)
output = ssm_client.get_command_invocation(
CommandId=command_id, InstanceId=instance_id
)
stdout = output.get("StandardOutputContent", "")
if "NOT_FOUND" in stdout:
result["issues"].append("Allocator config not found at /etc/nitro_enclaves/allocator.yaml")
else:
result["allocator_configured"] = True
for line in stdout.splitlines():
line = line.strip()
if line.startswith("memory_mib:"):
result["memory_mib"] = int(line.split(":")[1].strip())
elif line.startswith("cpu_count:"):
result["cpu_count"] = int(line.split(":")[1].strip())
if result["memory_mib"] and result["memory_mib"] < 512:
result["issues"].append(
f"Allocated memory ({result['memory_mib']} MiB) is very low - may cause enclave launch failures"
)
if result["cpu_count"] and result["cpu_count"] < 2:
result["issues"].append(
f"Allocated CPUs ({result['cpu_count']}) is minimal - consider 2+ for production"
)
except ClientError as e:
result["issues"].append(f"SSM access failed: {e.response['Error']['Message']}")
return result
def validate_attestation_document_structure(attestation_b64):
"""Validate the structure of a base64-encoded attestation document."""
if cbor2 is None:
return {"error": "cbor2 package required for attestation validation. Install with: pip install cbor2"}
result = {
"valid_structure": False,
"pcrs": {},
"module_id": None,
"digest": None,
"timestamp": None,
"has_certificate": False,
"has_cabundle": False,
"has_public_key": False,
"issues": [],
}
try:
attestation_bytes = base64.b64decode(attestation_b64)
# COSE_Sign1 is a CBOR array: [protected, unprotected, payload, signature]
cose_structure = cbor2.loads(attestation_bytes)
if hasattr(cose_structure, "tag") and cose_structure.tag == 18:
cose_array = cose_structure.value
elif isinstance(cose_structure, list) and len(cose_structure) == 4:
cose_array = cose_structure
else:
result["issues"].append("Not a valid COSE_Sign1 structure")
return result
payload = cbor2.loads(cose_array[2])
result["module_id"] = payload.get("module_id")
result["digest"] = payload.get("digest")
result["timestamp"] = payload.get("timestamp")
if result["timestamp"]:
ts = datetime.fromtimestamp(result["timestamp"] / 1000, tz=timezone.utc)
result["timestamp_human"] = ts.isoformat()
pcrs = payload.get("pcrs", {})
for idx, value in pcrs.items():
result["pcrs"][f"PCR{idx}"] = value.hex() if isinstance(value, bytes) else str(value)
result["has_certificate"] = "certificate" in payload and payload["certificate"] is not None
result["has_cabundle"] = "cabundle" in payload and len(payload.get("cabundle", [])) > 0
result["has_public_key"] = "public_key" in payload and payload["public_key"] is not None
result["valid_structure"] = True
if not result["has_cabundle"]:
result["issues"].append("Missing CA bundle - cannot verify certificate chain to AWS root")
if not result["has_public_key"]:
result["issues"].append("No public key in attestation - KMS cannot encrypt response to enclave")
if "PCR0" not in result["pcrs"]:
result["issues"].append("PCR0 (image hash) not present in attestation document")
except Exception as e:
result["issues"].append(f"Attestation parsing error: {str(e)}")
return result
def audit_cloudtrail_enclave_events(cloudtrail_client, days_back=7):
"""Search CloudTrail for enclave-related security events."""
from datetime import timedelta
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=days_back)
events_of_interest = [
"RunInstances",
"TerminateInstances",
"ModifyInstanceAttribute",
]
kms_events = ["Decrypt", "GenerateDataKey", "GenerateDataKeyPair", "GenerateRandom"]
findings = []
# Check for instance launches with enclave options
for event_name in events_of_interest:
try:
response = cloudtrail_client.lookup_events(
LookupAttributes=[
{"AttributeKey": "EventName", "AttributeValue": event_name}
],
StartTime=start_time,
EndTime=end_time,
MaxResults=50,
)
for event in response.get("Events", []):
ct_event = json.loads(event.get("CloudTrailEvent", "{}"))
req_params = ct_event.get("requestParameters", {})
if event_name == "RunInstances":
enclave_opts = req_params.get("enclaveOptions", {})
if enclave_opts.get("enabled"):
findings.append({
"event": event_name,
"time": event["EventTime"].isoformat(),
"user": event.get("Username"),
"detail": "Enclave-enabled instance launched",
"source_ip": ct_event.get("sourceIPAddress"),
})
except ClientError:
continue
# Check for KMS calls with Recipient parameter (enclave attestation)
for event_name in kms_events:
try:
response = cloudtrail_client.lookup_events(
LookupAttributes=[
{"AttributeKey": "EventName", "AttributeValue": event_name}
],
StartTime=start_time,
EndTime=end_time,
MaxResults=50,
)
for event in response.get("Events", []):
ct_event = json.loads(event.get("CloudTrailEvent", "{}"))
req_params = ct_event.get("requestParameters", {})
if "recipient" in req_params or "Recipient" in req_params:
findings.append({
"event": event_name,
"time": event["EventTime"].isoformat(),
"user": event.get("Username"),
"detail": "KMS operation with enclave attestation document",
"key_id": req_params.get("keyId"),
"source_ip": ct_event.get("sourceIPAddress"),
})
except ClientError:
continue
logger.info("Found %d enclave-related CloudTrail events", len(findings))
return findings
def generate_report(instances, kms_audits, iam_audits, cloudtrail_events, attestation_results=None):
"""Generate comprehensive Nitro Enclave security assessment report."""
total_issues = 0
critical_issues = []
for audit in kms_audits:
total_issues += len(audit.get("issues", []))
if not audit.get("has_attestation_condition"):
critical_issues.append(f"KMS key {audit['key_id']} has no attestation conditions")
for audit in iam_audits:
total_issues += len(audit.get("issues", []))
if audit.get("overprivileged"):
critical_issues.append(f"IAM role {audit['role_name']} is overprivileged")
report = {
"report_type": "Nitro Enclave Security Assessment",
"generated_at": datetime.now(timezone.utc).isoformat(),
"summary": {
"enclave_instances": len(instances),
"kms_keys_audited": len(kms_audits),
"iam_roles_audited": len(iam_audits),
"cloudtrail_events": len(cloudtrail_events),
"total_issues": total_issues,
"critical_issues": len(critical_issues),
},
"critical_findings": critical_issues,
"instances": instances,
"kms_policy_audits": kms_audits,
"iam_role_audits": iam_audits,
"cloudtrail_events": cloudtrail_events,
}
if attestation_results:
report["attestation_validation"] = attestation_results
return report
def main():
parser = argparse.ArgumentParser(description="AWS Nitro Enclave Security Assessment Agent")
parser.add_argument("--region", default="us-east-1", help="AWS region")
parser.add_argument("--kms-key-ids", nargs="+", help="KMS key IDs to audit")
parser.add_argument("--iam-roles", nargs="+", help="IAM role names to audit for enclave permissions")
parser.add_argument("--attestation-doc", help="Base64-encoded attestation document to validate")
parser.add_argument("--cloudtrail-days", type=int, default=7, help="Days of CloudTrail history to search")
parser.add_argument("--output", default="nitro_enclave_security_report.json", help="Output report file")
args = parser.parse_args()
session = boto3.Session(region_name=args.region)
ec2_client = session.client("ec2")
kms_client = session.client("kms")
iam_client = session.client("iam")
cloudtrail_client = session.client("cloudtrail")
logger.info("Starting Nitro Enclave security assessment in %s", args.region)
# Step 1: Find enclave-enabled instances
instances = get_nitro_instances(ec2_client, args.region)
# Step 2: Audit KMS key policies
kms_audits = []
if args.kms_key_ids:
for key_id in args.kms_key_ids:
logger.info("Auditing KMS key: %s", key_id)
kms_audits.append(audit_kms_key_policy(kms_client, key_id))
else:
# Auto-discover KMS keys
try:
keys_response = kms_client.list_keys(Limit=100)
for key in keys_response.get("Keys", []):
audit = audit_kms_key_policy(kms_client, key["KeyId"])
if audit.get("has_attestation_condition") or audit.get("allowed_actions"):
kms_audits.append(audit)
except ClientError as e:
logger.warning("Cannot list KMS keys: %s", e)
# Step 3: Audit IAM roles
iam_audits = []
if args.iam_roles:
for role_name in args.iam_roles:
logger.info("Auditing IAM role: %s", role_name)
iam_audits.append(audit_iam_role_for_enclave(iam_client, role_name))
# Step 4: Search CloudTrail events
cloudtrail_events = audit_cloudtrail_enclave_events(cloudtrail_client, args.cloudtrail_days)
# Step 5: Validate attestation document if provided
attestation_results = None
if args.attestation_doc:
logger.info("Validating attestation document")
attestation_results = validate_attestation_document_structure(args.attestation_doc)
# Generate report
report = generate_report(instances, kms_audits, iam_audits, cloudtrail_events, attestation_results)
with open(args.output, "w") as f:
json.dump(report, f, indent=2, default=str)
logger.info("Report saved to %s", args.output)
# Print summary
summary = report["summary"]
logger.info(
"Assessment complete: %d instances, %d KMS keys, %d IAM roles, %d issues (%d critical)",
summary["enclave_instances"],
summary["kms_keys_audited"],
summary["iam_roles_audited"],
summary["total_issues"],
summary["critical_issues"],
)
if report["critical_findings"]:
logger.warning("CRITICAL FINDINGS:")
for finding in report["critical_findings"]:
logger.warning(" - %s", finding)
return 0 if summary["critical_issues"] == 0 else 1
if __name__ == "__main__":
sys.exit(main())