Fix review comments: correct AWS Detective API usage and forensic ordering

- Fix FilterCriteria to use singular Severity/Status with Value objects
  instead of invalid plural Severities/Statuses arrays (SKILL.md + process.py)
- Fix get_entity_history: rename to get_investigation_indicators, use
  investigation_id instead of entity_arn for InvestigationId parameter
- Replace invalid inv-* placeholders with 21-digit numeric IDs
- Fix Expected Output to match real API response structure (no embedded
  Indicators; document separate list-indicators call and indicator types)
- Fix CLI --filter-criteria example to use correct format
- Update process.py --severity to accept single value with validation
- Add --max-results validation (1-100 range)
- Add pagination via _collect_all_pages helper for all list API calls
- Reorder Response Actions checklist: evidence preservation before containment
- Reorder Phase 5 workflow: preserve evidence first when safe
This commit is contained in:
MAGI
2026-03-22 16:06:14 -06:00
committed by Julio César Suástegui
parent 41b828e758
commit a072845a3f
4 changed files with 54 additions and 40 deletions
@@ -46,7 +46,7 @@ aws detective list-graphs --output table
# Get entity profile for an IAM user
aws detective get-investigation \
--graph-arn arn:aws:detective:us-east-1:123456789012:graph:a1b2c3d4 \
--investigation-id inv-1234567890
--investigation-id 000000000000000000001
```
### Step 3: Search Entities Programmatically
@@ -65,29 +65,26 @@ def list_behavior_graphs():
response = detective.list_graphs()
return response.get('GraphList', [])
def get_entity_history(graph_arn, entity_arn, hours=24):
"""Get API call history for an entity."""
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=hours)
def get_investigation_indicators(graph_arn, investigation_id, max_results=50):
"""Get indicators for a specific investigation."""
response = detective.list_indicators(
GraphArn=graph_arn,
InvestigationId=entity_arn,
MaxResults=50
InvestigationId=investigation_id,
MaxResults=max_results
)
return response.get('Indicators', [])
def investigate_guardduty_findings(graph_arn):
"""List finding groups correlated by Detective."""
"""List high-severity investigations correlated by Detective."""
response = detective.list_investigations(
GraphArn=graph_arn,
FilterCriteria={
'Statuses': ['RUNNING', 'FAILED'],
'Severities': ['HIGH', 'CRITICAL']
'Severity': {'Value': 'CRITICAL'},
'Status': {'Value': 'RUNNING'}
},
MaxResults=20
)
for investigation in response.get('InvestigationDetails', []):
print(f"Investigation: {investigation['InvestigationId']}")
print(f" Entity: {investigation['EntityArn']}")
@@ -109,7 +106,7 @@ if __name__ == "__main__":
# List investigations with high severity
aws detective list-investigations \
--graph-arn arn:aws:detective:us-east-1:123456789012:graph:a1b2c3d4 \
--filter-criteria '{"Severities":["HIGH","CRITICAL"]}' \
--filter-criteria '{"Severity":{"Value":"HIGH"}}' \
--max-results 10
```
@@ -119,33 +116,32 @@ aws detective list-investigations \
# Get indicators for a specific investigation
aws detective list-indicators \
--graph-arn arn:aws:detective:us-east-1:123456789012:graph:a1b2c3d4 \
--investigation-id inv-1234567890 \
--investigation-id 000000000000000000001 \
--max-results 50
```
## Expected Output
The `list-investigations` command returns investigation metadata:
```json
{
"InvestigationDetails": [
{
"InvestigationId": "inv-0a1b2c3d4e5f",
"InvestigationId": "000000000000000000001",
"Severity": "CRITICAL",
"Status": "RUNNING",
"State": "ACTIVE",
"EntityArn": "arn:aws:iam::123456789012:user/suspicious-user",
"EntityType": "IAM_USER",
"CreatedTime": "2026-03-15T14:30:00Z",
"Indicators": {
"ImpossibleTravel": true,
"NewGeoLocation": "ru-central-1",
"UnusualApiCalls": ["CreateAccessKey", "AttachUserPolicy", "PutBucketPolicy"],
"RelatedFindings": 7
}
"CreatedTime": "2026-03-15T14:30:00Z"
}
]
}
```
Indicators are retrieved separately via `list-indicators` and include types such as `TTP_OBSERVED`, `IMPOSSIBLE_TRAVEL`, `FLAGGED_IP_ADDRESS`, `NEW_GEOLOCATION`, `NEW_ASO`, `NEW_USER_AGENT`, `RELATED_FINDING`, and `RELATED_FINDING_GROUP`.
## Verification
1. Confirm behavior graph has data: `aws detective list-graphs` returns non-empty list
@@ -27,8 +27,8 @@
- [ ] Initial access vector identified
## Response Actions
- [ ] Evidence preserved (or capture rationale if immediate containment required)
- [ ] Compromised credentials disabled
- [ ] Active sessions revoked
- [ ] Affected resources isolated
- [ ] Evidence preserved
- [ ] Stakeholders notified
@@ -24,7 +24,8 @@
4. Document indicators of compromise (IOCs)
## Phase 5: Response
1. Disable compromised credentials
2. Revoke active sessions
3. Isolate affected resources
4. Preserve evidence (CloudTrail logs, flow logs)
1. Preserve evidence (CloudTrail logs, flow logs, snapshots) when safe
2. Disable compromised credentials
3. Revoke active sessions
4. Isolate affected resources
5. If active impact is ongoing, contain first and document evidence trade-offs
@@ -13,11 +13,23 @@ import os
from datetime import datetime, timedelta
def _collect_all_pages(client_method, result_key, **kwargs):
"""Paginate through all pages of an AWS Detective API call."""
all_items = []
while True:
response = client_method(**kwargs)
all_items.extend(response.get(result_key, []))
next_token = response.get('NextToken')
if not next_token:
break
kwargs['NextToken'] = next_token
return all_items
def list_behavior_graphs(session):
"""List all Detective behavior graphs in the account."""
client = session.client('detective')
response = client.list_graphs()
graphs = response.get('GraphList', [])
graphs = _collect_all_pages(client.list_graphs, 'GraphList')
if not graphs:
print("[!] No behavior graphs found. Enable Detective first.")
@@ -33,13 +45,13 @@ def list_behavior_graphs(session):
return graphs
def list_investigations(session, graph_arn, severities=None, max_results=20):
def list_investigations(session, graph_arn, severity=None, max_results=20):
"""List investigations filtered by severity."""
client = session.client('detective')
filter_criteria = {}
if severities:
filter_criteria['Severities'] = severities
if severity:
filter_criteria['Severity'] = {'Value': severity}
kwargs = {
'GraphArn': graph_arn,
@@ -48,8 +60,9 @@ def list_investigations(session, graph_arn, severities=None, max_results=20):
if filter_criteria:
kwargs['FilterCriteria'] = filter_criteria
response = client.list_investigations(**kwargs)
investigations = response.get('InvestigationDetails', [])
investigations = _collect_all_pages(
client.list_investigations, 'InvestigationDetails', **kwargs
)
if not investigations:
print("[+] No investigations found matching criteria")
@@ -96,13 +109,12 @@ def list_indicators(session, graph_arn, investigation_id, max_results=50):
"""List indicators for a specific investigation."""
client = session.client('detective')
response = client.list_indicators(
indicators = _collect_all_pages(
client.list_indicators, 'Indicators',
GraphArn=graph_arn,
InvestigationId=investigation_id,
MaxResults=max_results,
)
indicators = response.get('Indicators', [])
if not indicators:
print("[+] No indicators found for this investigation")
return []
@@ -150,11 +162,13 @@ if __name__ == "__main__":
)
parser.add_argument(
"--severity",
nargs="+",
type=str,
default=None,
help="Severity filter (e.g. HIGH CRITICAL)",
choices=["INFORMATIONAL", "LOW", "MEDIUM", "HIGH", "CRITICAL"],
help="Severity filter (e.g. HIGH)",
)
parser.add_argument("--max-results", type=int, default=20)
parser.add_argument("--max-results", type=int, default=20,
help="Max results per API call (1-100)")
parser.add_argument("--region", default="us-east-1")
parser.add_argument("--profile", type=str, help="AWS profile name")
parser.add_argument(
@@ -162,6 +176,9 @@ if __name__ == "__main__":
)
args = parser.parse_args()
if args.max_results < 1 or args.max_results > 100:
parser.error("--max-results must be between 1 and 100")
kwargs = {"region_name": args.region}
if args.profile:
kwargs["profile_name"] = args.profile