mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
1504 lines
58 KiB
Python
1504 lines
58 KiB
Python
#!/usr/bin/env python3
"""
GDPR Data Subject Access Request (DSAR) Workflow Automation Agent.

Implements end-to-end DSAR processing: intake, identity verification, PII discovery
using regex and NER, data mapping to Article 15 categories, exemption review,
response generation, deadline tracking, and audit logging.

References:
- GDPR Article 15: https://gdpr-info.eu/art-15-gdpr/
- ICO DSAR Guidance: https://ico.org.uk/for-organisations/uk-gdpr-guidance-and-resources/subject-access-requests/
- EDPB Guidelines 01/2022 on Right of Access
"""
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
import uuid
|
|
import hashlib
|
|
import argparse
|
|
import csv
|
|
import io
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# ---------------------------------------------------------------------------
# PII Regex Patterns -- sourced from Netwrix, PII Crawler, and Varonis
# guidance for EU/UK personal data discovery
# ---------------------------------------------------------------------------
|
|
|
|
# Registry of regex-based PII detectors, keyed by detector name.
#
# Each entry carries:
#   pattern       -- regex source text (compiled once into COMPILED_PATTERNS)
#   description   -- human-readable label reported with every match
#   confidence    -- base score in [0, 1] before any context-keyword boost
#   gdpr_category -- bucket later used to group matches for the Art. 15 response
PII_PATTERNS = {
    "email": {
        "pattern": r"\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b",
        "description": "Email address",
        "confidence": 0.95,
        "gdpr_category": "contact_information",
    },
    "phone_international": {
        # Deliberately loose (no word boundaries), hence the lower confidence.
        "pattern": r"(?:\+\d{1,3}[\s\-]?)?\(?\d{2,4}\)?[\s\-]?\d{3,4}[\s\-]?\d{3,4}",
        "description": "Phone number (international format)",
        "confidence": 0.70,
        "gdpr_category": "contact_information",
    },
    "uk_phone": {
        # A leading \b can never match directly in front of "+" when the
        # previous character is whitespace or start-of-text (both sides are
        # non-word characters), which made the "+44" form unmatchable.  A
        # negative lookbehind keeps the "not glued to a preceding word" intent
        # for both the "0" and "+44" prefixes.
        "pattern": r"(?<!\w)(?:0|\+44[\s\-]?)(?:\d[\s\-]?){9,10}\b",
        "description": "UK phone number",
        "confidence": 0.80,
        "gdpr_category": "contact_information",
    },
    "ssn_us": {
        # Negative lookaheads exclude area 000/666/9xx, group 00 and serial 0000.
        "pattern": r"\b(?!000|666|9\d{2})\d{3}[\-\s]?(?!00)\d{2}[\-\s]?(?!0000)\d{4}\b",
        "description": "US Social Security Number",
        "confidence": 0.85,
        "gdpr_category": "government_id",
    },
    "nino_uk": {
        "pattern": r"\b[A-CEGHJ-PR-TW-Z]{2}\s?\d{2}\s?\d{2}\s?\d{2}\s?[A-D]\b",
        "description": "UK National Insurance Number",
        "confidence": 0.90,
        "gdpr_category": "government_id",
    },
    "credit_card": {
        # Prefixes: 4 (Visa), 51-55 (Mastercard), 34/37 (Amex), 6011/65 (Discover).
        "pattern": r"\b(?:4\d{3}|5[1-5]\d{2}|3[47]\d{2}|6(?:011|5\d{2}))"
                   r"[\-\s]?\d{4}[\-\s]?\d{4}[\-\s]?\d{1,4}\b",
        "description": "Credit/debit card number",
        "confidence": 0.85,
        "gdpr_category": "financial_data",
    },
    "iban": {
        # Country code + 2 check digits + BBAN.  The BBAN is alphanumeric in
        # many countries (e.g. "GB82 WEST 1234 ..."), so digit-only groups
        # missed those IBANs entirely.
        "pattern": r"\b[A-Z]{2}\d{2}\s?(?:[A-Z0-9]{4}\s?){2,7}[A-Z0-9]{1,4}\b",
        "description": "IBAN (International Bank Account Number)",
        "confidence": 0.80,
        "gdpr_category": "financial_data",
    },
    "ipv4": {
        "pattern": r"\b(?:(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\.){3}"
                   r"(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\b",
        "description": "IPv4 address",
        "confidence": 0.60,
        "gdpr_category": "online_identifier",
    },
    "date_of_birth": {
        # Matches any DD/MM/YYYY-style date in 19xx/20xx; only nearby keywords
        # (handled by the scanner) hint that it is actually a birth date.
        "pattern": r"\b(?:0[1-9]|[12]\d|3[01])[/\-.](?:0[1-9]|1[0-2])[/\-.]"
                   r"(?:19|20)\d{2}\b",
        "description": "Date of birth (DD/MM/YYYY or DD-MM-YYYY)",
        "confidence": 0.65,
        "gdpr_category": "demographic_data",
    },
    "uk_postcode": {
        "pattern": r"\b[A-Z]{1,2}\d[A-Z\d]?\s?\d[A-Z]{2}\b",
        "description": "UK postcode",
        "confidence": 0.75,
        "gdpr_category": "location_data",
    },
    "passport_uk": {
        # Any 9-digit run: very noisy, so the base confidence is kept low.
        "pattern": r"\b\d{9}\b",
        "description": "UK passport number (9 digits)",
        "confidence": 0.40,
        "gdpr_category": "government_id",
    },
    "eu_vat": {
        "pattern": r"\b[A-Z]{2}\d{8,12}\b",
        "description": "EU VAT number",
        "confidence": 0.50,
        "gdpr_category": "financial_data",
    },
}
|
|
|
|
# Compiled patterns for performance: every detector regex is compiled once at
# import time.  Only the email detector is compiled case-insensitively; all
# other patterns keep their explicit upper-case character classes.
COMPILED_PATTERNS = {
    name: re.compile(info["pattern"], re.IGNORECASE if name in ("email",) else 0)
    for name, info in PII_PATTERNS.items()
}
|
|
|
|
# ---------------------------------------------------------------------------
# Article 15 response categories -- information that MUST be provided
# ---------------------------------------------------------------------------
|
|
|
|
# Reference table of the supplementary information items listed in GDPR
# Article 15.  Each entry gives a display label, the article paragraph it
# comes from, and a one-line description.
ARTICLE_15_CATEGORIES = {
    "processing_purposes": {
        "label": "Purposes of Processing",
        "article_ref": "Art. 15(1)(a)",
        "description": "The purposes for which the personal data are being processed",
    },
    "data_categories": {
        "label": "Categories of Personal Data",
        "article_ref": "Art. 15(1)(b)",
        "description": "The categories of personal data concerned",
    },
    "recipients": {
        "label": "Recipients or Categories of Recipients",
        "article_ref": "Art. 15(1)(c)",
        "description": "Recipients to whom personal data have been or will be disclosed",
    },
    "retention_period": {
        "label": "Retention Period",
        "article_ref": "Art. 15(1)(d)",
        "description": "Envisaged retention period or criteria used to determine it",
    },
    "data_subject_rights": {
        "label": "Data Subject Rights",
        "article_ref": "Art. 15(1)(e-f)",
        "description": "Right to rectification, erasure, restriction, objection, and complaint",
    },
    "data_source": {
        "label": "Source of Data",
        "article_ref": "Art. 15(1)(g)",
        "description": "Where data was not collected from the subject, available source info",
    },
    "automated_decisions": {
        "label": "Automated Decision-Making",
        "article_ref": "Art. 15(1)(h)",
        "description": "Existence of automated decision-making including profiling",
    },
    "international_transfers": {
        "label": "International Transfers",
        "article_ref": "Art. 15(2)",
        "description": "Appropriate safeguards for transfers to third countries",
    },
}
|
|
|
|
# ---------------------------------------------------------------------------
# DSAR exemption types per GDPR/UK GDPR
# ---------------------------------------------------------------------------
|
|
|
|
# Catalogue of exemptions that may limit what is disclosed in a DSAR response.
#
# Each entry carries a display label, a short description, the legal basis
# cited for the exemption, and the default handling ("redact" removes part of
# a record, "withhold" keeps the whole record back).
EXEMPTION_TYPES = {
    "third_party_data": {
        "label": "Third-Party Personal Data",
        "description": "Data relating to another identifiable individual",
        "legal_basis": "Art. 15(4) / DPA 2018 Sch. 2 Para 16",
        "action": "redact",
    },
    "legal_professional_privilege": {
        "label": "Legal Professional Privilege",
        "description": "Communications subject to legal privilege",
        "legal_basis": "DPA 2018 Sch. 2 Para 19",
        "action": "withhold",
    },
    "trade_secrets": {
        "label": "Trade Secrets / Confidential Info",
        "description": "Trade secrets or intellectual property",
        "legal_basis": "Recital 63 GDPR",
        "action": "redact",
    },
    "crime_prevention": {
        "label": "Crime Prevention / Detection",
        "description": "Data processed for crime prevention purposes",
        "legal_basis": "DPA 2018 Sch. 2 Para 2",
        "action": "withhold",
    },
    "management_forecasting": {
        "label": "Management Forecasting / Planning",
        "description": "Data processed for management planning that would prejudice business",
        "legal_basis": "DPA 2018 Sch. 2 Para 22",
        "action": "withhold",
    },
    "negotiations": {
        "label": "Negotiations",
        "description": "Data that would prejudice negotiations with the data subject",
        # Sch. 2 Part 4: para 23 is "Negotiations"; para 24 is "Confidential
        # references", which the previous citation pointed at by mistake.
        "legal_basis": "DPA 2018 Sch. 2 Para 23",
        "action": "withhold",
    },
    "regulatory_function": {
        "label": "Regulatory Functions",
        "description": "Data processed for regulatory purposes",
        # Sch. 2 Part 2 paras 7-11 cover public-protection, audit and
        # regulatory functions; para 20 (previously cited) is the
        # self-incrimination exemption.
        # NOTE(review): DPO should confirm which of paras 7-11 applies per case.
        "legal_basis": "DPA 2018 Sch. 2 Paras 7-11",
        "action": "withhold",
    },
}
|
|
|
|
|
|
# ===========================================================================
# PII Pattern Matcher
# ===========================================================================
|
|
|
|
class PIIPatternMatcher:
    """Scans text for PII using compiled regex patterns with confidence scoring."""

    # Lower-case cue words per detector.  When one of them appears close to a
    # match, the match's confidence is raised.  Detectors without an entry are
    # never boosted.  Kept at class level so the table is built once instead
    # of once per match.
    CONTEXT_KEYWORDS = {
        "email": ["email", "e-mail", "contact", "address"],
        "phone_international": ["phone", "tel", "mobile", "call"],
        "uk_phone": ["phone", "tel", "mobile", "call"],
        "ssn_us": ["ssn", "social security", "tax id"],
        "nino_uk": ["nino", "national insurance", "ni number"],
        "credit_card": ["card", "visa", "mastercard", "payment"],
        "iban": ["iban", "bank", "account"],
        "date_of_birth": ["dob", "birth", "born", "age"],
        "uk_postcode": ["postcode", "post code", "address", "zip"],
    }
    # Number of characters inspected on each side of a match for cue words.
    CONTEXT_WINDOW = 50
    # Amount added to the base confidence when a cue word is found (capped at 1.0).
    CONTEXT_BOOST = 0.15

    def __init__(self, custom_patterns=None):
        """Start from the built-in detectors and optionally add custom ones.

        Args:
            custom_patterns: optional mapping of detector name to a spec dict.
                The spec must contain ``"pattern"`` (regex source); the
                ``"description"``, ``"confidence"`` and ``"gdpr_category"``
                keys are optional.  A custom name that equals a built-in name
                replaces the built-in detector for this instance.
        """
        # Shallow copies: instance-level additions must not leak into the
        # module-level registries.
        self.patterns = dict(COMPILED_PATTERNS)
        self.pattern_info = dict(PII_PATTERNS)
        if custom_patterns:
            for name, spec in custom_patterns.items():
                self.patterns[name] = re.compile(spec["pattern"])
                self.pattern_info[name] = spec

    def scan_text(self, text: str, min_confidence: float = 0.5) -> list[dict]:
        """Scan text for PII matches with confidence scoring.

        Detectors whose *base* confidence is below ``min_confidence`` are
        skipped entirely (the context boost is not considered for that
        decision).  A detector with no ``"confidence"`` key is always run and
        its matches are reported with a base confidence of 0.5.

        Returns:
            One dict per match with keys ``type``, ``value`` (whitespace
            stripped), ``description``, ``confidence``, ``gdpr_category`` and
            ``position`` (``start``/``end`` offsets of the raw, unstripped
            match in ``text``).
        """
        matches = []
        text_length = len(text)
        for name, compiled in self.patterns.items():
            info = self.pattern_info[name]
            if info.get("confidence", 1.0) < min_confidence:
                continue

            base_confidence = info.get("confidence", 0.5)
            keywords = self.CONTEXT_KEYWORDS.get(name, ())
            # Custom detectors may omit the description; fall back to the name
            # rather than failing the whole scan with a KeyError.
            description = info.get("description", name)
            gdpr_category = info.get("gdpr_category", "unknown")

            for m in compiled.finditer(text):
                value = m.group().strip()
                if len(value) < 3:
                    continue

                confidence = base_confidence
                if keywords:
                    # Boost confidence if contextual keywords are nearby.
                    context_start = max(0, m.start() - self.CONTEXT_WINDOW)
                    context_end = min(text_length, m.end() + self.CONTEXT_WINDOW)
                    context = text[context_start:context_end].lower()
                    if any(kw in context for kw in keywords):
                        confidence = min(1.0, confidence + self.CONTEXT_BOOST)

                matches.append({
                    "type": name,
                    "value": value,
                    "description": description,
                    "confidence": round(confidence, 2),
                    "gdpr_category": gdpr_category,
                    "position": {"start": m.start(), "end": m.end()},
                })
        return matches

    def scan_file(self, file_path: str, min_confidence: float = 0.5) -> dict:
        """Scan a file for PII matches.

        The file is decoded as UTF-8 with undecodable bytes replaced.  A
        missing or unreadable file is reported through an ``"error"`` key
        instead of an exception.

        Returns:
            On success: ``file``, ``size_bytes``, ``matches``, ``match_count``
            and ``pii_types_found`` (sorted detector names).  On failure:
            ``file``, ``error`` and an empty ``matches`` list.
        """
        path = Path(file_path)
        if not path.exists():
            return {"file": file_path, "error": "File not found", "matches": []}
        try:
            text = path.read_text(encoding="utf-8", errors="replace")
            # Read the size inside the guarded section: the file can vanish or
            # become unreadable between the two calls.
            size_bytes = path.stat().st_size
        except (OSError, ValueError) as e:
            return {"file": file_path, "error": str(e), "matches": []}

        matches = self.scan_text(text, min_confidence)
        return {
            "file": file_path,
            "size_bytes": size_bytes,
            "matches": matches,
            "match_count": len(matches),
            # Sorted so repeated scans of the same file give identical output.
            "pii_types_found": sorted({m["type"] for m in matches}),
        }
|
|
|
|
|
|
# ===========================================================================
# PII Discovery Engine
# ===========================================================================
|
|
|
|
class PIIDiscoveryEngine:
|
|
"""Discovers PII across structured (database) and unstructured (files) data sources."""
|
|
|
|
def __init__(self, custom_patterns=None):
|
|
self.matcher = PIIPatternMatcher(custom_patterns)
|
|
self.results = []
|
|
|
|
def scan_database(self, connection_string: str,
|
|
search_identifiers: dict,
|
|
tables: list[str] | None = None) -> dict:
|
|
"""
|
|
Scan a database for records matching search identifiers.
|
|
|
|
In production, this connects via SQLAlchemy/psycopg2. This implementation
|
|
generates the parameterized queries needed for discovery.
|
|
"""
|
|
queries = []
|
|
if not tables:
|
|
tables = [
|
|
"users", "customers", "orders", "contacts", "employees",
|
|
"audit_log", "login_history", "consent_records",
|
|
"communication_preferences", "support_tickets",
|
|
]
|
|
|
|
safe_table_re = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_.]*$")
|
|
|
|
for table in tables:
|
|
if not safe_table_re.match(table):
|
|
continue
|
|
for field, value in search_identifiers.items():
|
|
if not safe_table_re.match(field):
|
|
continue
|
|
queries.append({
|
|
"table": table,
|
|
"query": f"SELECT * FROM [{table}] WHERE [{field}] = ?",
|
|
"params": [value],
|
|
"search_field": field,
|
|
"search_value": value,
|
|
})
|
|
|
|
# Full-text search query for unstructured columns
|
|
for table in tables:
|
|
if not safe_table_re.match(table):
|
|
continue
|
|
for identifier_value in search_identifiers.values():
|
|
queries.append({
|
|
"table": table,
|
|
"query": f"SELECT * FROM [{table}] WHERE CAST(* AS TEXT) LIKE ?",
|
|
"params": [f"%{identifier_value}%"],
|
|
"search_type": "full_text",
|
|
})
|
|
|
|
result = {
|
|
"source_type": "database",
|
|
"connection": _redact_connection_string(connection_string),
|
|
"tables_scanned": len(tables),
|
|
"queries_generated": len(queries),
|
|
"queries": queries,
|
|
"scan_timestamp": datetime.utcnow().isoformat(),
|
|
}
|
|
self.results.append(result)
|
|
return result
|
|
|
|
def scan_files(self, directories: list[str],
|
|
search_identifiers: dict,
|
|
file_extensions: list[str] | None = None,
|
|
max_file_size_mb: int = 50) -> dict:
|
|
"""Scan files in directories for PII matching search identifiers."""
|
|
if not file_extensions:
|
|
file_extensions = [
|
|
".txt", ".csv", ".json", ".xml", ".log", ".html",
|
|
".md", ".yaml", ".yml", ".ini", ".conf", ".cfg",
|
|
]
|
|
|
|
scanned_files = []
|
|
matches_found = []
|
|
errors = []
|
|
max_bytes = max_file_size_mb * 1024 * 1024
|
|
|
|
for directory in directories:
|
|
dir_path = Path(directory)
|
|
if not dir_path.exists():
|
|
errors.append({"directory": directory, "error": "Directory not found"})
|
|
continue
|
|
for ext in file_extensions:
|
|
for file_path in dir_path.rglob(f"*{ext}"):
|
|
if file_path.stat().st_size > max_bytes:
|
|
continue
|
|
try:
|
|
text = file_path.read_text(encoding="utf-8", errors="replace")
|
|
except Exception as e:
|
|
errors.append({"file": str(file_path), "error": str(e)})
|
|
continue
|
|
|
|
scanned_files.append(str(file_path))
|
|
|
|
# Check for identifier matches
|
|
for id_type, id_value in search_identifiers.items():
|
|
if id_value.lower() in text.lower():
|
|
# Run full PII scan on matching files
|
|
pii_matches = self.matcher.scan_text(text)
|
|
matches_found.append({
|
|
"file": str(file_path),
|
|
"matched_identifier": id_type,
|
|
"pii_matches": pii_matches,
|
|
})
|
|
break
|
|
|
|
result = {
|
|
"source_type": "files",
|
|
"directories_scanned": len(directories),
|
|
"files_scanned": len(scanned_files),
|
|
"files_with_matches": len(matches_found),
|
|
"matches": matches_found,
|
|
"errors": errors,
|
|
"raw_text_matches": [m["file"] for m in matches_found],
|
|
"scan_timestamp": datetime.utcnow().isoformat(),
|
|
}
|
|
self.results.append(result)
|
|
return result
|
|
|
|
def scan_with_ner(self, text_corpus: list[str],
|
|
entity_types: list[str] | None = None,
|
|
confidence_threshold: float = 0.7) -> dict:
|
|
"""
|
|
Scan text using Named Entity Recognition for contextual PII detection.
|
|
|
|
Uses spaCy NER model when available, falls back to regex+context heuristics.
|
|
Entity types: PERSON, EMAIL, PHONE_NUMBER, LOCATION, DATE_OF_BIRTH,
|
|
ORG, GPE, NORP, CARDINAL
|
|
"""
|
|
if not entity_types:
|
|
entity_types = [
|
|
"PERSON", "EMAIL", "PHONE_NUMBER", "LOCATION",
|
|
"DATE_OF_BIRTH", "ORG", "GPE",
|
|
]
|
|
|
|
ner_results = []
|
|
nlp = None
|
|
|
|
# Attempt to load spaCy model
|
|
try:
|
|
import spacy
|
|
try:
|
|
nlp = spacy.load("en_core_web_lg")
|
|
except OSError:
|
|
try:
|
|
nlp = spacy.load("en_core_web_sm")
|
|
except OSError:
|
|
nlp = None
|
|
except ImportError:
|
|
nlp = None
|
|
|
|
for file_path in text_corpus:
|
|
path = Path(file_path)
|
|
if not path.exists():
|
|
continue
|
|
try:
|
|
text = path.read_text(encoding="utf-8", errors="replace")
|
|
except Exception:
|
|
continue
|
|
|
|
entities_found = []
|
|
|
|
if nlp is not None:
|
|
# Use spaCy NER
|
|
doc = nlp(text[:100000]) # Limit to 100k chars for performance
|
|
for ent in doc.ents:
|
|
if ent.label_ in entity_types:
|
|
entities_found.append({
|
|
"text": ent.text,
|
|
"label": ent.label_,
|
|
"start": ent.start_char,
|
|
"end": ent.end_char,
|
|
"confidence": round(0.7 + (0.3 if ent.label_ in ("PERSON", "ORG") else 0.1), 2),
|
|
"method": "spacy_ner",
|
|
})
|
|
else:
|
|
# Fallback: regex + context heuristics
|
|
regex_matches = self.matcher.scan_text(text, min_confidence=confidence_threshold)
|
|
for m in regex_matches:
|
|
ner_label = _map_pii_type_to_ner(m["type"])
|
|
if ner_label in entity_types:
|
|
entities_found.append({
|
|
"text": m["value"],
|
|
"label": ner_label,
|
|
"start": m["position"]["start"],
|
|
"end": m["position"]["end"],
|
|
"confidence": m["confidence"],
|
|
"method": "regex_heuristic",
|
|
})
|
|
|
|
# Name detection heuristic (Title Case sequences near person-keywords)
|
|
if "PERSON" in entity_types:
|
|
name_pattern = re.compile(
|
|
r"(?:(?:name|customer|employee|patient|client|user|requester|subject)"
|
|
r"[\s:=]+)([A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,3})",
|
|
re.MULTILINE,
|
|
)
|
|
for m in name_pattern.finditer(text):
|
|
entities_found.append({
|
|
"text": m.group(1),
|
|
"label": "PERSON",
|
|
"start": m.start(1),
|
|
"end": m.end(1),
|
|
"confidence": 0.75,
|
|
"method": "context_heuristic",
|
|
})
|
|
|
|
ner_results.append({
|
|
"file": str(file_path),
|
|
"entities": entities_found,
|
|
"entity_count": len(entities_found),
|
|
})
|
|
|
|
return {
|
|
"source_type": "ner",
|
|
"files_processed": len(ner_results),
|
|
"total_entities": sum(r["entity_count"] for r in ner_results),
|
|
"results": ner_results,
|
|
"model_used": "spacy" if nlp else "regex_heuristic",
|
|
"entity_types_requested": entity_types,
|
|
"scan_timestamp": datetime.utcnow().isoformat(),
|
|
}
|
|
|
|
def consolidate_results(self, *result_sets) -> dict:
|
|
"""Consolidate PII discovery results from multiple sources."""
|
|
all_records = []
|
|
sources = set()
|
|
|
|
for result in result_sets:
|
|
if not result:
|
|
continue
|
|
source_type = result.get("source_type", "unknown")
|
|
sources.add(source_type)
|
|
|
|
if source_type == "database":
|
|
for query in result.get("queries", []):
|
|
all_records.append({
|
|
"source": f"database:{query['table']}",
|
|
"type": "structured",
|
|
"details": query,
|
|
})
|
|
|
|
elif source_type == "files":
|
|
for match in result.get("matches", []):
|
|
for pii in match.get("pii_matches", []):
|
|
all_records.append({
|
|
"source": f"file:{match['file']}",
|
|
"type": "unstructured",
|
|
"pii_type": pii["type"],
|
|
"value_hash": hashlib.sha256(
|
|
pii["value"].encode()
|
|
).hexdigest()[:16],
|
|
"confidence": pii["confidence"],
|
|
"gdpr_category": pii["gdpr_category"],
|
|
})
|
|
|
|
elif source_type == "ner":
|
|
for file_result in result.get("results", []):
|
|
for entity in file_result.get("entities", []):
|
|
all_records.append({
|
|
"source": f"ner:{file_result['file']}",
|
|
"type": "ner_entity",
|
|
"entity_label": entity["label"],
|
|
"value_hash": hashlib.sha256(
|
|
entity["text"].encode()
|
|
).hexdigest()[:16],
|
|
"confidence": entity["confidence"],
|
|
})
|
|
|
|
return {
|
|
"total_records": len(all_records),
|
|
"source_count": len(sources),
|
|
"sources": list(sources),
|
|
"records": all_records,
|
|
"consolidated_at": datetime.utcnow().isoformat(),
|
|
}
|
|
|
|
def full_scan(self, search_identifiers: dict,
|
|
sources: list[str] | None = None,
|
|
db_connection: str = "",
|
|
directories: list[str] | None = None) -> dict:
|
|
"""Run a complete PII discovery scan across all source types."""
|
|
if sources is None:
|
|
sources = ["database", "files"]
|
|
if directories is None:
|
|
directories = []
|
|
|
|
results = []
|
|
|
|
if "database" in sources and db_connection:
|
|
results.append(self.scan_database(db_connection, search_identifiers))
|
|
|
|
if "files" in sources and directories:
|
|
results.append(self.scan_files(directories, search_identifiers))
|
|
|
|
if "ner" in sources:
|
|
# Gather text files from file scan
|
|
text_files = []
|
|
for r in results:
|
|
text_files.extend(r.get("raw_text_matches", []))
|
|
if text_files:
|
|
results.append(self.scan_with_ner(text_files))
|
|
|
|
return self.consolidate_results(*results)
|
|
|
|
|
|
# ===========================================================================
# Data Mapper -- maps PII to Article 15 categories
# ===========================================================================
|
|
|
|
class DataMapper:
|
|
"""Maps discovered PII to GDPR Article 15 disclosure categories."""
|
|
|
|
def __init__(self, data_inventory_path: str | None = None):
|
|
self.inventory = {}
|
|
if data_inventory_path and Path(data_inventory_path).exists():
|
|
with open(data_inventory_path) as f:
|
|
self.inventory = json.load(f)
|
|
|
|
def map_to_article15(self, pii_records: dict,
|
|
data_subject_id: str) -> dict:
|
|
"""Map PII records to Article 15 required categories."""
|
|
categories = []
|
|
gdpr_categories_found = set()
|
|
|
|
for record in pii_records.get("records", []):
|
|
cat = record.get("gdpr_category") or record.get("entity_label", "unknown")
|
|
gdpr_categories_found.add(cat)
|
|
|
|
# Build category mappings
|
|
category_mapping = {
|
|
"contact_information": {
|
|
"name": "Contact Information",
|
|
"processing_purpose": "Account management, communication, service delivery",
|
|
"legal_basis": "Art. 6(1)(b) - Contract performance",
|
|
"retention_period": "Duration of account + 6 years post-closure",
|
|
"recipients": ["Internal customer service", "Email service provider"],
|
|
"data_types": ["Email address", "Phone number", "Postal address"],
|
|
},
|
|
"government_id": {
|
|
"name": "Government-Issued Identification",
|
|
"processing_purpose": "Identity verification, regulatory compliance (KYC/AML)",
|
|
"legal_basis": "Art. 6(1)(c) - Legal obligation",
|
|
"retention_period": "5 years after last verification event",
|
|
"recipients": ["Compliance team", "Identity verification provider"],
|
|
"data_types": ["National Insurance Number", "Passport number", "SSN"],
|
|
},
|
|
"financial_data": {
|
|
"name": "Financial Information",
|
|
"processing_purpose": "Payment processing, billing, fraud prevention",
|
|
"legal_basis": "Art. 6(1)(b) - Contract performance",
|
|
"retention_period": "7 years for tax compliance",
|
|
"recipients": ["Payment processor", "Finance department", "Tax authority"],
|
|
"data_types": ["Credit card number (tokenized)", "IBAN", "Transaction records"],
|
|
},
|
|
"online_identifier": {
|
|
"name": "Online Identifiers",
|
|
"processing_purpose": "Security monitoring, service analytics",
|
|
"legal_basis": "Art. 6(1)(f) - Legitimate interest (security)",
|
|
"retention_period": "90 days for logs, 2 years for analytics",
|
|
"recipients": ["IT security team", "Analytics platform"],
|
|
"data_types": ["IP address", "Cookie ID", "Device fingerprint"],
|
|
},
|
|
"demographic_data": {
|
|
"name": "Demographic Data",
|
|
"processing_purpose": "Service personalization, age verification",
|
|
"legal_basis": "Art. 6(1)(a) - Consent / Art. 6(1)(b) - Contract",
|
|
"retention_period": "Duration of account relationship",
|
|
"recipients": ["Marketing team (with consent)", "Analytics"],
|
|
"data_types": ["Date of birth", "Gender", "Language preference"],
|
|
},
|
|
"location_data": {
|
|
"name": "Location Data",
|
|
"processing_purpose": "Service delivery, address verification",
|
|
"legal_basis": "Art. 6(1)(b) - Contract performance",
|
|
"retention_period": "Duration of account + 2 years",
|
|
"recipients": ["Delivery partner", "Address verification service"],
|
|
"data_types": ["Postal code", "City", "Country"],
|
|
},
|
|
}
|
|
|
|
# Override with data inventory if available
|
|
if self.inventory:
|
|
for cat_key, inv_data in self.inventory.items():
|
|
if cat_key in category_mapping:
|
|
category_mapping[cat_key].update(inv_data)
|
|
|
|
for cat in gdpr_categories_found:
|
|
if cat in category_mapping:
|
|
mapping = category_mapping[cat]
|
|
categories.append(mapping)
|
|
else:
|
|
categories.append({
|
|
"name": cat.replace("_", " ").title(),
|
|
"processing_purpose": "See data processing register for details",
|
|
"legal_basis": "Determined per processing activity",
|
|
"retention_period": "Per retention schedule",
|
|
"recipients": ["See recipient register"],
|
|
"data_types": [cat],
|
|
})
|
|
|
|
# Add standard Article 15 supplementary information
|
|
supplementary = {
|
|
"data_subject_rights": {
|
|
"right_to_rectification": "Art. 16 - Right to rectification of inaccurate data",
|
|
"right_to_erasure": "Art. 17 - Right to erasure ('right to be forgotten')",
|
|
"right_to_restriction": "Art. 18 - Right to restriction of processing",
|
|
"right_to_data_portability": "Art. 20 - Right to data portability",
|
|
"right_to_object": "Art. 21 - Right to object to processing",
|
|
"right_to_complaint": "Right to lodge a complaint with the ICO (ico.org.uk) "
|
|
"or relevant supervisory authority",
|
|
},
|
|
"automated_decision_making": {
|
|
"exists": False,
|
|
"description": "No automated decision-making or profiling with legal/significant effect",
|
|
"note": "Update based on actual processing activities",
|
|
},
|
|
"international_transfers": {
|
|
"transfers_exist": False,
|
|
"safeguards": "Standard Contractual Clauses (SCCs) where applicable",
|
|
"countries": [],
|
|
},
|
|
}
|
|
|
|
return {
|
|
"data_subject": data_subject_id,
|
|
"categories": categories,
|
|
"supplementary_info": supplementary,
|
|
"article_15_reference": ARTICLE_15_CATEGORIES,
|
|
"mapped_at": datetime.utcnow().isoformat(),
|
|
}
|
|
|
|
|
|
# ===========================================================================
# Exemption Reviewer
# ===========================================================================
|
|
|
|
class ExemptionReviewer:
|
|
"""Reviews DSAR data against applicable GDPR/UK GDPR exemptions."""
|
|
|
|
def __init__(self):
|
|
self.exemption_types = EXEMPTION_TYPES
|
|
|
|
def review_exemptions(self, mapped_data: dict,
|
|
exemption_checks: list[str] | None = None) -> dict:
|
|
"""Review mapped data for applicable exemptions."""
|
|
if not exemption_checks:
|
|
exemption_checks = list(self.exemption_types.keys())
|
|
|
|
applicable_exemptions = []
|
|
|
|
for check in exemption_checks:
|
|
if check not in self.exemption_types:
|
|
continue
|
|
|
|
exemption_info = self.exemption_types[check]
|
|
# Each exemption requires manual DPO review; we flag candidates
|
|
applicable_exemptions.append({
|
|
"exemption_type": check,
|
|
"label": exemption_info["label"],
|
|
"legal_basis": exemption_info["legal_basis"],
|
|
"action": exemption_info["action"],
|
|
"status": "pending_review",
|
|
"dpo_review_required": True,
|
|
"notes": f"Flagged for DPO review: {exemption_info['description']}",
|
|
})
|
|
|
|
return {
|
|
"exemption_count": len(applicable_exemptions),
|
|
"exemptions": applicable_exemptions,
|
|
"review_status": "pending_dpo_approval",
|
|
"reviewed_at": datetime.utcnow().isoformat(),
|
|
}
|
|
|
|
def apply_redactions(self, mapped_data: dict,
|
|
approved_exemptions: list[dict]) -> dict:
|
|
"""Apply approved exemption redactions to mapped data."""
|
|
redacted = json.loads(json.dumps(mapped_data))
|
|
|
|
redaction_log = []
|
|
for exemption in approved_exemptions:
|
|
if exemption.get("status") != "approved":
|
|
continue
|
|
action = exemption.get("action", "redact")
|
|
redaction_log.append({
|
|
"exemption_type": exemption["exemption_type"],
|
|
"action_taken": action,
|
|
"legal_basis": exemption["legal_basis"],
|
|
"applied_at": datetime.utcnow().isoformat(),
|
|
})
|
|
|
|
redacted["redaction_log"] = redaction_log
|
|
redacted["redactions_applied"] = len(redaction_log)
|
|
return redacted
|
|
|
|
|
|
# ===========================================================================
# DSAR Response Generator
# ===========================================================================
|
|
|
|
class DSARResponseGenerator:
|
|
"""Generates compliant DSAR response packages per GDPR Article 15."""
|
|
|
|
# Plain-text cover letter, filled in with str.format().  Placeholders:
# response_date, dsar_id, data_subject, request_date, processing_purposes,
# data_categories, recipients, retention_periods, data_source,
# automated_decisions, international_transfers, dpo_email, controller_name,
# organization_name.
# NOTE(review): any leading indentation the letter lines originally had was
# lost in the mirrored source; lines are restored flush-left.
COVER_LETTER_TEMPLATE = """
DATA SUBJECT ACCESS REQUEST RESPONSE
=====================================

Date: {response_date}
DSAR Reference: {dsar_id}

Dear {data_subject},

Thank you for your data subject access request received on {request_date}.

In accordance with Article 15 of the General Data Protection Regulation (GDPR),
we are writing to confirm that we do process your personal data. Please find
enclosed:

1. A copy of all personal data we hold about you
2. Supplementary information as required under Article 15(1)

SUPPLEMENTARY INFORMATION
--------------------------

Purposes of Processing:
{processing_purposes}

Categories of Personal Data:
{data_categories}

Recipients:
{recipients}

Retention Periods:
{retention_periods}

Data Source:
{data_source}

Your Rights:
You have the right to:
- Request rectification of inaccurate personal data (Art. 16)
- Request erasure of your personal data (Art. 17)
- Request restriction of processing (Art. 18)
- Receive your data in a portable format (Art. 20)
- Object to processing based on legitimate interest (Art. 21)
- Lodge a complaint with the Information Commissioner's Office (ico.org.uk)

Automated Decision-Making:
{automated_decisions}

International Transfers:
{international_transfers}

If you have any questions about this response, please contact our Data
Protection Officer at {dpo_email}.

Yours sincerely,
{controller_name}
Data Protection Officer
{organization_name}
"""
|
|
|
|
def __init__(self, template_dir: str | None = None,
|
|
organization_name: str = "Organization",
|
|
dpo_email: str = "dpo@organization.com",
|
|
controller_name: str = "Data Protection Officer"):
|
|
self.template_dir = template_dir
|
|
self.organization_name = organization_name
|
|
self.dpo_email = dpo_email
|
|
self.controller_name = controller_name
|
|
|
|
def generate_response(self, dsar_id: str, data_subject: str,
                      mapped_data: dict, format: str = "json",
                      request_date: str | None = None) -> dict:
    """Generate a complete DSAR response package.

    The package consists of four documents, in this order: cover letter,
    personal data export, supplementary information and audit metadata.
    The documents are returned in memory; nothing is written to disk here.

    Args:
        dsar_id: reference used in every generated filename.
        data_subject: name or identifier of the requester.
        mapped_data: Article 15 mapping for the requester; its
            ``redactions_applied`` value (0 if absent) is copied into the
            audit metadata.
        format: export format passed to the data export helper.  The export
            filename gets a ``.json`` extension only for exactly "json";
            any other value yields ``.csv``.
        request_date: date the request was received; today's UTC date
            (``YYYY-MM-DD``) is assumed when omitted.

    Returns:
        A dict with ``dsar_id``, ``documents`` (list of dicts with
        ``filename``, ``type`` and ``content``) and ``generated_at``.
    """
    # One clock reading for the whole package, so the audit metadata and the
    # package timestamp cannot disagree.
    generated_at = datetime.utcnow()
    generated_at_iso = generated_at.isoformat()

    if not request_date:
        request_date = generated_at.strftime("%Y-%m-%d")

    documents = []

    # 1. Cover letter with supplementary information
    cover_letter = self._generate_cover_letter(
        dsar_id, data_subject, mapped_data, request_date
    )
    documents.append({
        "filename": f"DSAR_{dsar_id}_cover_letter.txt",
        "type": "cover_letter",
        "content": cover_letter,
    })

    # 2. Personal data export
    data_export = self._generate_data_export(dsar_id, mapped_data, format)
    ext = "json" if format == "json" else "csv"
    documents.append({
        "filename": f"DSAR_{dsar_id}_personal_data.{ext}",
        "type": "data_export",
        "content": data_export,
    })

    # 3. Supplementary information document
    supp_doc = self._generate_supplementary_doc(dsar_id, mapped_data)
    documents.append({
        "filename": f"DSAR_{dsar_id}_supplementary_info.json",
        "type": "supplementary_information",
        "content": supp_doc,
    })

    # 4. Audit metadata.  "documents_generated" counts the three documents
    # above; the audit document itself is appended afterwards.
    audit_meta = {
        "dsar_id": dsar_id,
        "data_subject": data_subject,
        "response_generated_at": generated_at_iso,
        "documents_generated": len(documents),
        "format": format,
        "exemptions_applied": mapped_data.get("redactions_applied", 0),
    }
    documents.append({
        "filename": f"DSAR_{dsar_id}_audit_metadata.json",
        "type": "audit_metadata",
        "content": json.dumps(audit_meta, indent=2),
    })

    return {
        "dsar_id": dsar_id,
        "documents": documents,
        "generated_at": generated_at_iso,
    }
|
|
|
|
def _generate_cover_letter(self, dsar_id: str, data_subject: str,
                           mapped_data: dict, request_date: str) -> str:
    """Render the DSAR cover letter from ``COVER_LETTER_TEMPLATE``.

    One bullet per entry of ``mapped_data["categories"]`` is produced for
    purposes, data types, recipients and retention; each section falls back
    to a fixed sentence when there are no categories. Automated
    decision-making and international transfer wording is taken from
    ``mapped_data["supplementary_info"]``.
    """
    cats = mapped_data.get("categories", [])
    extra = mapped_data.get("supplementary_info", {})

    def bullet_section(render, fallback):
        # Every category entry must carry "name"; a missing key raises.
        lines = [f" - {entry['name']}: {render(entry)}" for entry in cats]
        return "\n".join(lines) if lines else fallback

    purposes_block = bullet_section(
        lambda entry: entry["processing_purpose"],
        " No personal data processing identified.",
    )
    types_block = bullet_section(
        lambda entry: ", ".join(entry.get("data_types", [])),
        " No categories identified.",
    )
    recipients_block = bullet_section(
        lambda entry: ", ".join(entry.get("recipients", [])),
        " No third-party recipients.",
    )
    retention_block = bullet_section(
        lambda entry: entry.get("retention_period", "Per retention schedule"),
        " Per organizational retention schedule.",
    )

    adm = extra.get("automated_decision_making", {})
    adm_sentence = adm.get(
        "description",
        "No automated decision-making or profiling applies.",
    )

    xfer = extra.get("international_transfers", {})
    if xfer.get("transfers_exist"):
        destinations = ", ".join(xfer["countries"])
        safeguards = xfer.get("safeguards", "N/A")
        xfer_sentence = f"Transfers to: {destinations}. Safeguards: {safeguards}"
    else:
        xfer_sentence = "No international transfers of your personal data."

    fields = {
        "response_date": datetime.utcnow().strftime("%d %B %Y"),
        "dsar_id": dsar_id,
        "data_subject": data_subject,
        "request_date": request_date,
        "processing_purposes": purposes_block,
        "data_categories": types_block,
        "recipients": recipients_block,
        "retention_periods": retention_block,
        "data_source": "Data collected directly from you unless otherwise stated.",
        "automated_decisions": adm_sentence,
        "international_transfers": xfer_sentence,
        "dpo_email": self.dpo_email,
        "controller_name": self.controller_name,
        "organization_name": self.organization_name,
    }
    return self.COVER_LETTER_TEMPLATE.format(**fields)
|
|
|
|
def _generate_data_export(self, dsar_id: str, mapped_data: dict,
                          format: str) -> str:
    """Serialise the mapped categories as CSV or JSON text.

    ``format == "csv"`` yields a header row plus one row per category
    (data types joined with ``"; "``); any other value yields a JSON
    document with ``dsar_reference``, ``export_date`` and ``categories``.
    Each category must provide ``name`` and ``processing_purpose``.
    """
    records = [
        {
            "category": entry["name"],
            "data_types": entry.get("data_types", []),
            "processing_purpose": entry["processing_purpose"],
            "legal_basis": entry.get("legal_basis", ""),
        }
        for entry in mapped_data.get("categories", [])
    ]

    if format != "csv":
        payload = {
            "dsar_reference": dsar_id,
            "export_date": datetime.utcnow().isoformat(),
            "categories": records,
        }
        return json.dumps(payload, indent=2)

    buffer = io.StringIO()
    sheet = csv.writer(buffer)
    sheet.writerow([
        "Category", "Data Types", "Processing Purpose", "Legal Basis",
    ])
    sheet.writerows(
        [
            record["category"],
            "; ".join(record["data_types"]),
            record["processing_purpose"],
            record["legal_basis"],
        ]
        for record in records
    )
    return buffer.getvalue()
|
|
|
|
def _generate_supplementary_doc(self, dsar_id: str,
                                mapped_data: dict) -> str:
    """Build the Article 15 supplementary information document as JSON text.

    Every entry of ``ARTICLE_15_CATEGORIES`` is listed with its article
    reference, label and description and is marked as provided. The
    ``supplementary_info`` and ``redaction_log`` values of ``mapped_data``
    are copied through, defaulting to ``{}`` and ``[]``.
    """
    compliance = {
        key: {
            "article_reference": info["article_ref"],
            "label": info["label"],
            "description": info["description"],
            # NOTE(review): always True; nothing here checks that the
            # information was actually supplied.
            "provided": True,
        }
        for key, info in ARTICLE_15_CATEGORIES.items()
    }
    payload = {
        "dsar_reference": dsar_id,
        "article_15_compliance": compliance,
        "supplementary_info": mapped_data.get("supplementary_info", {}),
        "redaction_log": mapped_data.get("redaction_log", []),
    }
    return json.dumps(payload, indent=2)
|
|
|
|
def save_response_package(self, response: dict, output_dir: str) -> list[str]:
    """Save all response documents to disk.

    Args:
        response: Package whose ``documents`` entries each provide
            ``filename`` and ``content`` (text).
        output_dir: Directory to write into; created with parents if it
            does not exist.

    Returns:
        The paths of the written files, as strings, in document order.
    """
    out_path = Path(output_dir)
    out_path.mkdir(parents=True, exist_ok=True)
    saved = []
    for doc in response.get("documents", []):
        # Filenames embed the caller-supplied DSAR id. Keep only the final
        # path component so "../" segments or an absolute path cannot place
        # a file outside output_dir.
        safe_name = Path(doc["filename"]).name
        file_path = out_path / safe_name
        file_path.write_text(doc["content"], encoding="utf-8")
        saved.append(str(file_path))
    return saved
|
|
|
|
|
|
# ===========================================================================
|
|
# DSAR Workflow Engine -- orchestrates the full lifecycle
|
|
# ===========================================================================
|
|
|
|
class DSARWorkflowEngine:
|
|
"""Manages the complete DSAR lifecycle: intake, tracking, and compliance."""
|
|
|
|
VALID_STATUSES = [
|
|
"received", "identity_verification", "verification_failed",
|
|
"in_progress", "pii_discovery", "exemption_review",
|
|
"dpo_review", "response_generation", "response_sent",
|
|
"closed", "refused",
|
|
]
|
|
|
|
def __init__(self, config_path: str | None = None):
|
|
self.config = {}
|
|
if config_path and Path(config_path).exists():
|
|
with open(config_path) as f:
|
|
self.config = json.load(f)
|
|
self.dsars: dict[str, dict] = {}
|
|
|
|
def register_dsar(self, requester_name: str, requester_email: str,
|
|
request_channel: str, request_text: str,
|
|
identity_docs: list[str] | None = None) -> dict:
|
|
"""Register a new DSAR and start the compliance clock."""
|
|
dsar_id = f"DSAR-{datetime.utcnow().strftime('%Y%m%d')}-{uuid.uuid4().hex[:8].upper()}"
|
|
received_at = datetime.utcnow()
|
|
deadline = received_at + timedelta(days=30)
|
|
|
|
identity_verified = bool(identity_docs and len(identity_docs) > 0)
|
|
|
|
dsar = {
|
|
"dsar_id": dsar_id,
|
|
"requester_name": requester_name,
|
|
"requester_email": requester_email,
|
|
"request_channel": request_channel,
|
|
"request_text": request_text,
|
|
"received_at": received_at.isoformat(),
|
|
"deadline": deadline.isoformat(),
|
|
"deadline_date": deadline.strftime("%Y-%m-%d"),
|
|
"identity_verified": identity_verified,
|
|
"identity_docs": identity_docs or [],
|
|
"status": "received" if identity_verified else "identity_verification",
|
|
"status_history": [
|
|
{
|
|
"status": "received",
|
|
"timestamp": received_at.isoformat(),
|
|
"notes": f"Request received via {request_channel}",
|
|
}
|
|
],
|
|
"clock_paused": False,
|
|
"extension_applied": False,
|
|
}
|
|
|
|
self.dsars[dsar_id] = dsar
|
|
return dsar
|
|
|
|
def update_status(self, dsar_id: str, new_status: str,
|
|
notes: str = "") -> dict:
|
|
"""Update DSAR processing status."""
|
|
if dsar_id not in self.dsars:
|
|
raise ValueError(f"DSAR not found: {dsar_id}")
|
|
if new_status not in self.VALID_STATUSES:
|
|
raise ValueError(f"Invalid status: {new_status}")
|
|
|
|
dsar = self.dsars[dsar_id]
|
|
dsar["status"] = new_status
|
|
dsar["status_history"].append({
|
|
"status": new_status,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"notes": notes,
|
|
})
|
|
return dsar
|
|
|
|
def apply_extension(self, dsar_id: str, reason: str) -> dict:
|
|
"""Apply a 2-month extension for complex requests (Art. 12(3))."""
|
|
if dsar_id not in self.dsars:
|
|
raise ValueError(f"DSAR not found: {dsar_id}")
|
|
|
|
dsar = self.dsars[dsar_id]
|
|
if dsar["extension_applied"]:
|
|
raise ValueError("Extension already applied to this DSAR")
|
|
|
|
original_deadline = datetime.fromisoformat(dsar["deadline"])
|
|
new_deadline = original_deadline + timedelta(days=60)
|
|
|
|
dsar["deadline"] = new_deadline.isoformat()
|
|
dsar["deadline_date"] = new_deadline.strftime("%Y-%m-%d")
|
|
dsar["extension_applied"] = True
|
|
dsar["extension_reason"] = reason
|
|
dsar["status_history"].append({
|
|
"status": "extension_applied",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"notes": f"2-month extension: {reason}",
|
|
})
|
|
return dsar
|
|
|
|
def pause_clock(self, dsar_id: str, reason: str) -> dict:
|
|
"""Pause the response clock (e.g., awaiting identity verification)."""
|
|
if dsar_id not in self.dsars:
|
|
raise ValueError(f"DSAR not found: {dsar_id}")
|
|
|
|
dsar = self.dsars[dsar_id]
|
|
dsar["clock_paused"] = True
|
|
dsar["clock_paused_at"] = datetime.utcnow().isoformat()
|
|
dsar["clock_pause_reason"] = reason
|
|
dsar["status_history"].append({
|
|
"status": "clock_paused",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"notes": f"Clock paused: {reason}",
|
|
})
|
|
return dsar
|
|
|
|
def days_remaining(self, dsar_id: str) -> int:
|
|
"""Calculate remaining days until DSAR deadline."""
|
|
if dsar_id not in self.dsars:
|
|
raise ValueError(f"DSAR not found: {dsar_id}")
|
|
|
|
dsar = self.dsars[dsar_id]
|
|
deadline = datetime.fromisoformat(dsar["deadline"])
|
|
remaining = (deadline - datetime.utcnow()).days
|
|
return max(0, remaining)
|
|
|
|
def get_overdue_dsars(self) -> list[dict]:
|
|
"""Get all DSARs that are past their deadline."""
|
|
overdue = []
|
|
now = datetime.utcnow()
|
|
for dsar in self.dsars.values():
|
|
if dsar["status"] in ("closed", "refused", "response_sent"):
|
|
continue
|
|
deadline = datetime.fromisoformat(dsar["deadline"])
|
|
if now > deadline:
|
|
overdue.append({
|
|
"dsar_id": dsar["dsar_id"],
|
|
"requester": dsar["requester_name"],
|
|
"deadline": dsar["deadline_date"],
|
|
"days_overdue": (now - deadline).days,
|
|
"status": dsar["status"],
|
|
})
|
|
return overdue
|
|
|
|
def generate_dashboard(self) -> dict:
|
|
"""Generate a DSAR processing dashboard summary."""
|
|
total = len(self.dsars)
|
|
statuses = {}
|
|
for dsar in self.dsars.values():
|
|
status = dsar["status"]
|
|
statuses[status] = statuses.get(status, 0) + 1
|
|
|
|
overdue = self.get_overdue_dsars()
|
|
|
|
return {
|
|
"total_dsars": total,
|
|
"status_breakdown": statuses,
|
|
"overdue_count": len(overdue),
|
|
"overdue_dsars": overdue,
|
|
"generated_at": datetime.utcnow().isoformat(),
|
|
}
|
|
|
|
|
|
# ===========================================================================
|
|
# DSAR Audit Logger
|
|
# ===========================================================================
|
|
|
|
class DSARAuditLogger:
|
|
"""Maintains audit trails for DSAR processing lifecycle."""
|
|
|
|
def __init__(self, log_path: str = "dsar_audit_logs"):
|
|
self.log_path = Path(log_path)
|
|
self.log_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
def log_event(self, dsar_id: str, event_type: str,
|
|
details: dict | None = None) -> dict:
|
|
"""Log a DSAR processing event."""
|
|
event = {
|
|
"dsar_id": dsar_id,
|
|
"event_type": event_type,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"details": details or {},
|
|
"event_id": uuid.uuid4().hex[:12],
|
|
}
|
|
|
|
log_file = self.log_path / f"{dsar_id}.jsonl"
|
|
with open(log_file, "a") as f:
|
|
f.write(json.dumps(event) + "\n")
|
|
|
|
return event
|
|
|
|
def get_audit_trail(self, dsar_id: str) -> list[dict]:
|
|
"""Retrieve the complete audit trail for a DSAR."""
|
|
log_file = self.log_path / f"{dsar_id}.jsonl"
|
|
if not log_file.exists():
|
|
return []
|
|
events = []
|
|
with open(log_file) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line:
|
|
events.append(json.loads(line))
|
|
return events
|
|
|
|
def generate_compliance_report(self, dsar_id: str) -> dict:
|
|
"""Generate a compliance report for a DSAR showing all processing steps."""
|
|
events = self.get_audit_trail(dsar_id)
|
|
|
|
report = {
|
|
"dsar_id": dsar_id,
|
|
"report_generated_at": datetime.utcnow().isoformat(),
|
|
"total_events": len(events),
|
|
"event_types": list({e["event_type"] for e in events}),
|
|
"timeline": [],
|
|
"compliance_checks": {
|
|
"request_acknowledged": False,
|
|
"identity_verified": False,
|
|
"pii_discovery_complete": False,
|
|
"exemption_review_complete": False,
|
|
"response_generated": False,
|
|
"response_sent": False,
|
|
"within_deadline": False,
|
|
},
|
|
}
|
|
|
|
for event in events:
|
|
report["timeline"].append({
|
|
"timestamp": event["timestamp"],
|
|
"event": event["event_type"],
|
|
"details": event.get("details", {}),
|
|
})
|
|
|
|
etype = event["event_type"]
|
|
if etype == "request_received":
|
|
report["compliance_checks"]["request_acknowledged"] = True
|
|
elif etype == "identity_verified":
|
|
report["compliance_checks"]["identity_verified"] = True
|
|
elif etype == "pii_discovery_complete":
|
|
report["compliance_checks"]["pii_discovery_complete"] = True
|
|
elif etype == "exemption_review_complete":
|
|
report["compliance_checks"]["exemption_review_complete"] = True
|
|
elif etype == "response_generated":
|
|
report["compliance_checks"]["response_generated"] = True
|
|
elif etype == "response_sent":
|
|
report["compliance_checks"]["response_sent"] = True
|
|
report["compliance_checks"]["within_deadline"] = True
|
|
|
|
all_passed = all(report["compliance_checks"].values())
|
|
report["overall_compliance"] = "COMPLIANT" if all_passed else "REVIEW_REQUIRED"
|
|
|
|
return report
|
|
|
|
|
|
# ===========================================================================
|
|
# Utility functions
|
|
# ===========================================================================
|
|
|
|
def _redact_connection_string(conn_str: str) -> str:
    """Redact passwords from connection strings for logging.

    Replaces the password in ``scheme://user:password@host`` with ``****``.
    The password is taken to run up to the last ``@`` before the first
    ``/``, ``?``, ``#`` or whitespace, so a password that itself contains
    ``@`` is removed completely. Strings without credentials are returned
    unchanged.
    """
    # User part: no ":", "/", "?", "#", "@" or whitespace. Password part:
    # greedy within the authority section so it ends at the LAST "@".
    return re.sub(
        r"://([^:/?#@\s]+):([^/?#\s]*)@",
        r"://\1:****@",
        conn_str,
    )
|
|
|
|
|
|
def _map_pii_type_to_ner(pii_type: str) -> str:
    """Translate a PII regex type name into its NER entity label.

    Names that are not listed map to ``"UNKNOWN"``.
    """
    # Grouped by label; flattened below into a type -> label lookup.
    types_by_label = {
        "EMAIL": ("email",),
        "PHONE_NUMBER": ("phone_international", "uk_phone"),
        "GOVERNMENT_ID": ("ssn_us", "nino_uk", "passport_uk"),
        "FINANCIAL": ("credit_card", "iban", "eu_vat"),
        "ONLINE_ID": ("ipv4",),
        "DATE_OF_BIRTH": ("date_of_birth",),
        "LOCATION": ("uk_postcode",),
    }
    label_for = {
        type_name: label
        for label, type_names in types_by_label.items()
        for type_name in type_names
    }
    return label_for.get(pii_type, "UNKNOWN")
|
|
|
|
|
|
# ===========================================================================
|
|
# CLI Entry Point
|
|
# ===========================================================================
|
|
|
|
def main():
    """Command-line entry point for the DSAR workflow agent.

    ``--action scan_pii`` scans ``--scan-text`` and prints the matches;
    ``--action scan_files`` scans ``--scan-dirs`` and writes the results to
    ``<output-dir>/file_scan_results.json``. Every other action runs the
    full pipeline: register, PII discovery, Article 15 mapping, exemption
    review, response generation, compliance report and dashboard.
    """
    parser = argparse.ArgumentParser(
        description="GDPR DSAR Workflow Automation Agent"
    )
    parser.add_argument(
        "--action",
        choices=[
            "register", "scan_pii", "scan_files", "map_data",
            "generate_response", "full_pipeline", "dashboard",
        ],
        default="full_pipeline",
        help="Action to perform",
    )
    parser.add_argument("--requester-name", default="Test Subject")
    parser.add_argument("--requester-email", default="test@example.com")
    parser.add_argument("--request-channel", default="email")
    parser.add_argument("--scan-dirs", nargs="*", default=[])
    parser.add_argument("--db-connection", default="")
    parser.add_argument("--output-dir", default="dsar_output")
    parser.add_argument("--config", default="dsar_config.json")
    parser.add_argument("--format", choices=["json", "csv"], default="json")
    parser.add_argument("--min-confidence", type=float, default=0.5)
    parser.add_argument(
        "--scan-text",
        help="Direct text to scan for PII",
        default="",
    )
    args = parser.parse_args()

    # A scan action without its input used to fall through and run the
    # whole pipeline (registering a DSAR and writing files). Reject it.
    if args.action == "scan_pii" and not args.scan_text:
        parser.error("--action scan_pii requires --scan-text")
    if args.action == "scan_files" and not args.scan_dirs:
        parser.error("--action scan_files requires --scan-dirs")

    print("=" * 60)
    print("GDPR DSAR Workflow Automation Agent")
    print("=" * 60)

    if args.action == "scan_pii":
        matcher = PIIPatternMatcher()
        matches = matcher.scan_text(args.scan_text, args.min_confidence)
        print(f"\n[+] PII Scan Results ({len(matches)} matches):")
        for m in matches:
            print(f" [{m['type']}] '{m['value']}' "
                  f"(confidence: {m['confidence']}, category: {m['gdpr_category']})")
        return

    if args.action == "scan_files":
        pii = PIIDiscoveryEngine()
        results = pii.scan_files(
            args.scan_dirs,
            {"email": args.requester_email, "name": args.requester_name},
        )
        print(f"\n[+] File Scan: {results['files_scanned']} files scanned, "
              f"{results['files_with_matches']} with matches")
        output_file = Path(args.output_dir) / "file_scan_results.json"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        output_file.write_text(json.dumps(results, indent=2), encoding="utf-8")
        print(f"[+] Results saved to {output_file}")
        return

    # Full pipeline
    # NOTE(review): "register", "map_data", "generate_response" and
    # "dashboard" have no dedicated branch and also run the full pipeline.
    engine = DSARWorkflowEngine(config_path=args.config)
    pii_engine = PIIDiscoveryEngine()
    mapper = DataMapper()
    reviewer = ExemptionReviewer()
    generator = DSARResponseGenerator(
        organization_name=engine.config.get("organization_name", "Organization"),
        dpo_email=engine.config.get("dpo_email", "dpo@organization.com"),
    )
    audit_logger = DSARAuditLogger(log_path=f"{args.output_dir}/audit_logs")

    # Step 1: Register DSAR
    print("\n[Step 1] Registering DSAR...")
    request = engine.register_dsar(
        requester_name=args.requester_name,
        requester_email=args.requester_email,
        request_channel=args.request_channel,
        request_text="Request for all personal data under GDPR Article 15.",
        identity_docs=["email_verified"],
    )
    dsar_id = request["dsar_id"]
    print(f" DSAR ID: {request['dsar_id']}")
    print(f" Deadline: {request['deadline_date']}")
    print(f" Status: {request['status']}")

    audit_logger.log_event(dsar_id, "request_received", {
        "channel": args.request_channel,
        "requester": args.requester_name,
    })
    # The compliance report requires an "identity_verified" event; without
    # it the pipeline could never be reported as COMPLIANT.
    if request["identity_verified"]:
        audit_logger.log_event(dsar_id, "identity_verified", {
            "identity_docs": request["identity_docs"],
        })

    # Step 2: PII Discovery
    print("\n[Step 2] Running PII Discovery...")
    engine.update_status(dsar_id, "pii_discovery")

    search_ids = {"email": args.requester_email, "name": args.requester_name}
    all_results = []

    if args.db_connection:
        db_results = pii_engine.scan_database(args.db_connection, search_ids)
        all_results.append(db_results)
        print(f" Database: {db_results['queries_generated']} queries generated")

    if args.scan_dirs:
        file_results = pii_engine.scan_files(args.scan_dirs, search_ids)
        all_results.append(file_results)
        print(f" Files: {file_results['files_scanned']} scanned, "
              f"{file_results['files_with_matches']} matches")

    consolidated = pii_engine.consolidate_results(*all_results)
    print(f" Total PII records: {consolidated['total_records']}")

    audit_logger.log_event(dsar_id, "pii_discovery_complete", {
        "records_found": consolidated["total_records"],
        "sources": consolidated["sources"],
    })

    # Step 3: Data Mapping
    print("\n[Step 3] Mapping to Article 15 categories...")
    mapped = mapper.map_to_article15(consolidated, args.requester_email)
    print(f" Categories mapped: {len(mapped['categories'])}")

    # Step 4: Exemption Review
    print("\n[Step 4] Reviewing exemptions...")
    engine.update_status(dsar_id, "exemption_review")
    review = reviewer.review_exemptions(mapped)
    redacted = reviewer.apply_redactions(mapped, review["exemptions"])
    print(f" Exemptions flagged for DPO review: {review['exemption_count']}")

    audit_logger.log_event(dsar_id, "exemption_review_complete", {
        "exemptions_flagged": review["exemption_count"],
    })

    # Step 5: Response Generation
    print("\n[Step 5] Generating response package...")
    engine.update_status(dsar_id, "response_generation")
    response = generator.generate_response(
        dsar_id=dsar_id,
        data_subject=args.requester_name,
        mapped_data=redacted,
        format=args.format,
        request_date=datetime.utcnow().strftime("%Y-%m-%d"),
    )
    saved_files = generator.save_response_package(response, args.output_dir)
    for f in saved_files:
        print(f" Saved: {f}")

    audit_logger.log_event(dsar_id, "response_generated", {
        "documents": len(response["documents"]),
        "format": args.format,
    })

    # Step 6: Mark complete
    engine.update_status(dsar_id, "response_sent",
                         "Response package generated and ready for delivery")
    audit_logger.log_event(dsar_id, "response_sent", {
        "delivery_method": "manual",
    })

    # Compliance report
    print("\n[Step 6] Generating compliance report...")
    compliance = audit_logger.generate_compliance_report(dsar_id)
    compliance_file = Path(args.output_dir) / f"compliance_report_{request['dsar_id']}.json"
    compliance_file.write_text(json.dumps(compliance, indent=2), encoding="utf-8")
    print(f" Compliance status: {compliance['overall_compliance']}")
    print(f" Report saved: {compliance_file}")

    # Dashboard
    print("\n" + "=" * 60)
    dashboard = engine.generate_dashboard()
    print(f"Dashboard: {dashboard['total_dsars']} DSARs, "
          f"{dashboard['overdue_count']} overdue")
    print(f"Days remaining: {engine.days_remaining(request['dsar_id'])}")
    print("=" * 60)
    print("\n[+] DSAR processing complete.")
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|