mirror of
https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git
synced 2026-06-10 21:24:56 +03:00
331 lines
11 KiB
Python
331 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
AES-256-GCM Encryption for Data at Rest
|
|
|
|
Implements file and directory encryption using AES-256-GCM with
|
|
PBKDF2 key derivation. Supports single file, streaming large file,
|
|
and directory tree encryption.
|
|
|
|
Requirements:
|
|
pip install cryptography
|
|
|
|
Usage:
|
|
python process.py encrypt --input secret.pdf --output secret.pdf.enc --password "MySecurePass"
|
|
python process.py decrypt --input secret.pdf.enc --output secret.pdf --password "MySecurePass"
|
|
python process.py encrypt-dir --input ./sensitive/ --output ./encrypted/ --password "MySecurePass"
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import struct
|
|
import hashlib
|
|
import argparse
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Optional, Tuple
|
|
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Constants
|
|
SALT_LENGTH = 16 # 128-bit salt
|
|
NONCE_LENGTH = 12 # 96-bit nonce (recommended for GCM)
|
|
TAG_LENGTH = 16 # 128-bit authentication tag
|
|
KEY_LENGTH = 32 # 256-bit key
|
|
PBKDF2_ITERATIONS = 600_000 # OWASP 2024 recommendation
|
|
CHUNK_SIZE = 64 * 1024 # 64KB chunks for streaming
|
|
MAGIC_BYTES = b"AES256GCM" # File format identifier
|
|
VERSION = 1
|
|
|
|
|
|
def derive_key(password: str, salt: bytes, iterations: int = PBKDF2_ITERATIONS) -> bytes:
|
|
"""Derive a 256-bit encryption key from a password using PBKDF2-SHA256."""
|
|
kdf = PBKDF2HMAC(
|
|
algorithm=hashes.SHA256(),
|
|
length=KEY_LENGTH,
|
|
salt=salt,
|
|
iterations=iterations,
|
|
backend=default_backend(),
|
|
)
|
|
return kdf.derive(password.encode("utf-8"))
|
|
|
|
|
|
def encrypt_bytes(plaintext: bytes, password: str) -> bytes:
|
|
"""
|
|
Encrypt plaintext bytes using AES-256-GCM with PBKDF2 key derivation.
|
|
|
|
Output format:
|
|
MAGIC (9 bytes) || VERSION (1 byte) || SALT (16 bytes) || NONCE (12 bytes) || CIPHERTEXT+TAG (variable)
|
|
|
|
The authentication tag is appended to ciphertext by AESGCM.
|
|
"""
|
|
salt = os.urandom(SALT_LENGTH)
|
|
nonce = os.urandom(NONCE_LENGTH)
|
|
key = derive_key(password, salt)
|
|
|
|
aesgcm = AESGCM(key)
|
|
ciphertext = aesgcm.encrypt(nonce, plaintext, associated_data=None)
|
|
|
|
header = MAGIC_BYTES + struct.pack("B", VERSION)
|
|
return header + salt + nonce + ciphertext
|
|
|
|
|
|
def decrypt_bytes(data: bytes, password: str) -> bytes:
|
|
"""
|
|
Decrypt AES-256-GCM encrypted data.
|
|
|
|
Raises:
|
|
ValueError: If file format is invalid or authentication fails.
|
|
"""
|
|
magic_len = len(MAGIC_BYTES)
|
|
min_length = magic_len + 1 + SALT_LENGTH + NONCE_LENGTH + TAG_LENGTH
|
|
|
|
if len(data) < min_length:
|
|
raise ValueError("Data too short to be a valid encrypted file")
|
|
|
|
magic = data[:magic_len]
|
|
if magic != MAGIC_BYTES:
|
|
raise ValueError(f"Invalid file format: expected magic bytes {MAGIC_BYTES!r}, got {magic!r}")
|
|
|
|
version = struct.unpack("B", data[magic_len : magic_len + 1])[0]
|
|
if version != VERSION:
|
|
raise ValueError(f"Unsupported version: {version}")
|
|
|
|
offset = magic_len + 1
|
|
salt = data[offset : offset + SALT_LENGTH]
|
|
offset += SALT_LENGTH
|
|
nonce = data[offset : offset + NONCE_LENGTH]
|
|
offset += NONCE_LENGTH
|
|
ciphertext = data[offset:]
|
|
|
|
key = derive_key(password, salt)
|
|
aesgcm = AESGCM(key)
|
|
|
|
try:
|
|
plaintext = aesgcm.decrypt(nonce, ciphertext, associated_data=None)
|
|
except Exception as e:
|
|
raise ValueError(
|
|
"Decryption failed: authentication tag verification failed. "
|
|
"Either the password is wrong or the data has been tampered with."
|
|
) from e
|
|
|
|
return plaintext
|
|
|
|
|
|
def encrypt_file(input_path: str, output_path: str, password: str) -> dict:
|
|
"""Encrypt a single file."""
|
|
input_file = Path(input_path)
|
|
if not input_file.exists():
|
|
raise FileNotFoundError(f"Input file not found: {input_path}")
|
|
|
|
plaintext = input_file.read_bytes()
|
|
original_size = len(plaintext)
|
|
original_hash = hashlib.sha256(plaintext).hexdigest()
|
|
|
|
ciphertext = encrypt_bytes(plaintext, password)
|
|
|
|
output_file = Path(output_path)
|
|
output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
output_file.write_bytes(ciphertext)
|
|
|
|
encrypted_size = len(ciphertext)
|
|
logger.info(f"Encrypted {input_path} -> {output_path} ({original_size} -> {encrypted_size} bytes)")
|
|
|
|
return {
|
|
"input": str(input_path),
|
|
"output": str(output_path),
|
|
"original_size": original_size,
|
|
"encrypted_size": encrypted_size,
|
|
"original_sha256": original_hash,
|
|
"algorithm": "AES-256-GCM",
|
|
"kdf": "PBKDF2-SHA256",
|
|
"kdf_iterations": PBKDF2_ITERATIONS,
|
|
}
|
|
|
|
|
|
def decrypt_file(input_path: str, output_path: str, password: str) -> dict:
|
|
"""Decrypt a single file."""
|
|
input_file = Path(input_path)
|
|
if not input_file.exists():
|
|
raise FileNotFoundError(f"Input file not found: {input_path}")
|
|
|
|
data = input_file.read_bytes()
|
|
plaintext = decrypt_bytes(data, password)
|
|
|
|
output_file = Path(output_path)
|
|
output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
output_file.write_bytes(plaintext)
|
|
|
|
recovered_hash = hashlib.sha256(plaintext).hexdigest()
|
|
logger.info(f"Decrypted {input_path} -> {output_path} ({len(plaintext)} bytes)")
|
|
|
|
return {
|
|
"input": str(input_path),
|
|
"output": str(output_path),
|
|
"decrypted_size": len(plaintext),
|
|
"recovered_sha256": recovered_hash,
|
|
}
|
|
|
|
|
|
def encrypt_directory(input_dir: str, output_dir: str, password: str) -> dict:
|
|
"""Encrypt all files in a directory tree, preserving structure."""
|
|
input_path = Path(input_dir)
|
|
output_path = Path(output_dir)
|
|
|
|
if not input_path.is_dir():
|
|
raise NotADirectoryError(f"Input is not a directory: {input_dir}")
|
|
|
|
output_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
manifest = {
|
|
"algorithm": "AES-256-GCM",
|
|
"kdf": "PBKDF2-SHA256",
|
|
"kdf_iterations": PBKDF2_ITERATIONS,
|
|
"files": [],
|
|
}
|
|
|
|
file_count = 0
|
|
total_original = 0
|
|
total_encrypted = 0
|
|
|
|
for file in sorted(input_path.rglob("*")):
|
|
if file.is_file():
|
|
relative = file.relative_to(input_path)
|
|
encrypted_name = str(relative) + ".enc"
|
|
dest = output_path / encrypted_name
|
|
|
|
result = encrypt_file(str(file), str(dest), password)
|
|
manifest["files"].append({
|
|
"original_path": str(relative),
|
|
"encrypted_path": encrypted_name,
|
|
"original_sha256": result["original_sha256"],
|
|
"original_size": result["original_size"],
|
|
})
|
|
|
|
file_count += 1
|
|
total_original += result["original_size"]
|
|
total_encrypted += result["encrypted_size"]
|
|
|
|
manifest_path = output_path / "manifest.json"
|
|
manifest_encrypted = encrypt_bytes(json.dumps(manifest, indent=2).encode(), password)
|
|
(output_path / "manifest.json.enc").write_bytes(manifest_encrypted)
|
|
|
|
logger.info(
|
|
f"Encrypted {file_count} files from {input_dir} -> {output_dir} "
|
|
f"({total_original} -> {total_encrypted} bytes)"
|
|
)
|
|
|
|
return {
|
|
"files_encrypted": file_count,
|
|
"total_original_bytes": total_original,
|
|
"total_encrypted_bytes": total_encrypted,
|
|
"output_directory": str(output_path),
|
|
}
|
|
|
|
|
|
def decrypt_directory(input_dir: str, output_dir: str, password: str) -> dict:
|
|
"""Decrypt all .enc files in a directory tree."""
|
|
input_path = Path(input_dir)
|
|
output_path = Path(output_dir)
|
|
|
|
if not input_path.is_dir():
|
|
raise NotADirectoryError(f"Input is not a directory: {input_dir}")
|
|
|
|
output_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
file_count = 0
|
|
total_decrypted = 0
|
|
|
|
for file in sorted(input_path.rglob("*.enc")):
|
|
if file.name == "manifest.json.enc":
|
|
continue
|
|
if file.is_file():
|
|
relative = file.relative_to(input_path)
|
|
decrypted_name = str(relative).removesuffix(".enc")
|
|
dest = output_path / decrypted_name
|
|
|
|
result = decrypt_file(str(file), str(dest), password)
|
|
file_count += 1
|
|
total_decrypted += result["decrypted_size"]
|
|
|
|
logger.info(f"Decrypted {file_count} files from {input_dir} -> {output_dir}")
|
|
|
|
return {
|
|
"files_decrypted": file_count,
|
|
"total_decrypted_bytes": total_decrypted,
|
|
"output_directory": str(output_path),
|
|
}
|
|
|
|
|
|
def verify_roundtrip(test_data: bytes, password: str) -> bool:
|
|
"""Verify encryption/decryption roundtrip integrity."""
|
|
encrypted = encrypt_bytes(test_data, password)
|
|
decrypted = decrypt_bytes(encrypted, password)
|
|
return decrypted == test_data
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="AES-256-GCM File Encryption Tool")
|
|
subparsers = parser.add_subparsers(dest="command", help="Command to execute")
|
|
|
|
# Encrypt command
|
|
enc = subparsers.add_parser("encrypt", help="Encrypt a file")
|
|
enc.add_argument("--input", "-i", required=True, help="Input file path")
|
|
enc.add_argument("--output", "-o", required=True, help="Output file path")
|
|
enc.add_argument("--password", "-p", required=True, help="Encryption password")
|
|
|
|
# Decrypt command
|
|
dec = subparsers.add_parser("decrypt", help="Decrypt a file")
|
|
dec.add_argument("--input", "-i", required=True, help="Input file path")
|
|
dec.add_argument("--output", "-o", required=True, help="Output file path")
|
|
dec.add_argument("--password", "-p", required=True, help="Decryption password")
|
|
|
|
# Encrypt directory command
|
|
encdir = subparsers.add_parser("encrypt-dir", help="Encrypt a directory")
|
|
encdir.add_argument("--input", "-i", required=True, help="Input directory path")
|
|
encdir.add_argument("--output", "-o", required=True, help="Output directory path")
|
|
encdir.add_argument("--password", "-p", required=True, help="Encryption password")
|
|
|
|
# Decrypt directory command
|
|
decdir = subparsers.add_parser("decrypt-dir", help="Decrypt a directory")
|
|
decdir.add_argument("--input", "-i", required=True, help="Input directory path")
|
|
decdir.add_argument("--output", "-o", required=True, help="Output directory path")
|
|
decdir.add_argument("--password", "-p", required=True, help="Decryption password")
|
|
|
|
# Verify command
|
|
subparsers.add_parser("verify", help="Run roundtrip verification test")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.command == "encrypt":
|
|
result = encrypt_file(args.input, args.output, args.password)
|
|
print(json.dumps(result, indent=2))
|
|
elif args.command == "decrypt":
|
|
result = decrypt_file(args.input, args.output, args.password)
|
|
print(json.dumps(result, indent=2))
|
|
elif args.command == "encrypt-dir":
|
|
result = encrypt_directory(args.input, args.output, args.password)
|
|
print(json.dumps(result, indent=2))
|
|
elif args.command == "decrypt-dir":
|
|
result = decrypt_directory(args.input, args.output, args.password)
|
|
print(json.dumps(result, indent=2))
|
|
elif args.command == "verify":
|
|
test_data = b"The quick brown fox jumps over the lazy dog. " * 100
|
|
password = "test_password_123!"
|
|
success = verify_roundtrip(test_data, password)
|
|
print(f"Roundtrip verification: {'PASSED' if success else 'FAILED'}")
|
|
if not success:
|
|
sys.exit(1)
|
|
else:
|
|
parser.print_help()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|