From 34676fc03faf8dcd36fc64fcbb68438d99e79918 Mon Sep 17 00:00:00 2001 From: "William (Lindy) Lindstrom" Date: Mon, 30 Mar 2026 13:31:43 -0700 Subject: [PATCH 01/14] add master envelope encryption key TOMTOOKIT_FIELD_ENCRYPTION_KEY is used as the master encryption key to encrypt/decrypt each User's encrypted_data_encryption_key, which is saved in the Profile model. --- tom_base/settings.py | 25 +++++++++++++++------ tom_setup/management/commands/tom_setup.py | 8 +++++++ tom_setup/templates/tom_setup/settings.tmpl | 7 ++++++ 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/tom_base/settings.py b/tom_base/settings.py index 487fdda77..cfcecf8cb 100644 --- a/tom_base/settings.py +++ b/tom_base/settings.py @@ -1,13 +1,17 @@ -""" -Django settings for tom_base project. +"""Django settings for the tom_base repository itself. + +THIS IS NOT YOUR TOM's `settings.py`. -Generated by 'django-admin startproject' using Django 2.0.6. +This file is used when running commands directly from the tom_base repo — +for example, ``python manage.py test`` within the tom_base repo. It is NOT +the settings file that individual TOM projects use. -For more information on this file, see -https://docs.djangoproject.com/en/2.0/topics/settings/ +Each TOM project gets its own standalone ``settings.py``, generated by +``tom_setup`` from the template ``tom_setup/templates/tom_setup/settings.tmpl``. +That project-level settings file is what a TOM runs in production. -For the full list of settings and their values, see -https://docs.djangoproject.com/en/2.0/ref/settings/ +This file exists only so that the tom_base repo has a working +Django configuration for development, testing, and CI. """ import logging.config import os @@ -27,6 +31,13 @@ # SECURITY WARNING: don't run with debug turned on in production! DEBUG = True +# Encryption key for protecting sensitive user data (API keys, credentials) at rest. +# This is a Fernet key — a 44-character URL-safe base64 string encoding 32 random bytes. +# Treat this like SECRET_KEY. See the TOM Toolkit encryption documentation. +TOMTOOLKIT_FIELD_ENCRYPTION_KEY = os.getenv( + 'TOMTOOLKIT_FIELD_ENCRYPTION_KEY', + 'UlUYyKsGzQVwjpTbvhtgCihKaj07H1voc-V4pmb7NN4=') # 44-char URL-safe base64 string + ALLOWED_HOSTS = [''] # Application definition diff --git a/tom_setup/management/commands/tom_setup.py b/tom_setup/management/commands/tom_setup.py index afec74fd3..23b8a5317 100644 --- a/tom_setup/management/commands/tom_setup.py +++ b/tom_setup/management/commands/tom_setup.py @@ -7,6 +7,8 @@ from django.core.management.utils import get_random_secret_key from django.utils import timezone +from cryptography.fernet import Fernet + BASE_DIR = settings.BASE_DIR @@ -212,6 +214,11 @@ def generate_secret_key(self): self.context['SECRET_KEY'] = get_random_secret_key() self.ok() + def generate_field_encryption_key(self): + self.status('Generating field encryption key... ') + self.context['TOMTOOLKIT_FIELD_ENCRYPTION_KEY'] = Fernet.generate_key().decode() + self.ok() + def generate_config(self): self.status('Generating settings.py... ') template = get_template('tom_setup/settings.tmpl') @@ -285,6 +292,7 @@ def handle(self, *args, **options): self.create_custom_code_app() self.create_project_dirs() self.generate_secret_key() + self.generate_field_encryption_key() self.get_target_type() self.get_hint_preference() self.generate_config() diff --git a/tom_setup/templates/tom_setup/settings.tmpl b/tom_setup/templates/tom_setup/settings.tmpl index 0ba2b73b6..2ff4a5aec 100644 --- a/tom_setup/templates/tom_setup/settings.tmpl +++ b/tom_setup/templates/tom_setup/settings.tmpl @@ -25,6 +25,13 @@ BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = '{{ SECRET_KEY }}' +# Encryption key for protecting sensitive user data (API keys, credentials) at rest. +# This is a Fernet key — a 44-character URL-safe base64 string encoding 32 random bytes. +# Treat this like SECRET_KEY. See the TOM Toolkit encryption documentation. +TOMTOOLKIT_FIELD_ENCRYPTION_KEY = os.getenv( + 'TOMTOOLKIT_FIELD_ENCRYPTION_KEY', + '{{ TOMTOOLKIT_FIELD_ENCRYPTION_KEY }}') # 44-char URL-safe base64 string + # SECURITY WARNING: don't run with debug turned on in production! DEBUG = True From 8f7a55bfb7ed8d441483a36797549cf387c08fea Mon Sep 17 00:00:00 2001 From: "William (Lindy) Lindstrom" Date: Mon, 30 Mar 2026 14:47:47 -0700 Subject: [PATCH 02/14] add encrypted_dek field to user's Profile This is the user-specific, envelope-encrypted (by the master cipher), Data Encryption Field (DEK). It is created with the user's Profile and encrypted with the master cipher (created with the master TOMTOOLKIT_FIELD_ENCRYPTION_KEY). If that master key changes, then each user's Profile.encrypted_dek must be re-encrypted, but that user's encrypted-data itself doesn't have to change. --- .../migrations/0003_profile_encrypted_dek.py | 18 +++++++++ tom_common/models.py | 38 ++++++++++++++++++- tom_common/signals.py | 33 +++++++++++----- 3 files changed, 79 insertions(+), 10 deletions(-) create mode 100644 tom_common/migrations/0003_profile_encrypted_dek.py diff --git a/tom_common/migrations/0003_profile_encrypted_dek.py b/tom_common/migrations/0003_profile_encrypted_dek.py new file mode 100644 index 000000000..ea5b9f893 --- /dev/null +++ b/tom_common/migrations/0003_profile_encrypted_dek.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.12 on 2026-03-27 22:29 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('tom_common', '0002_usersession'), + ] + + operations = [ + migrations.AddField( + model_name='profile', + name='encrypted_dek', + field=models.BinaryField(blank=True, null=True), + ), + ] diff --git a/tom_common/models.py b/tom_common/models.py index 8091ed2f4..c2ed78655 100644 --- a/tom_common/models.py +++ b/tom_common/models.py @@ -1,3 +1,32 @@ +"""Models for TOM Toolkit's user profiles and encrypted field storage. + +Encryption Architecture +----------------------- +TOM Toolkit uses envelope encryption to protect sensitive user data (API keys, +observatory credentials) at rest in the database. The scheme has two layers: + +1. A server-side **master key** (``TOMTOOLKIT_FIELD_ENCRYPTION_KEY``) is stored in the + environment, never in the database. It is a Fernet key used to wrap + (encrypt) per-user keys. + +2. Each user has a random **Data Encryption Key (DEK)** that encrypts their + actual data. The DEK is stored on the user's ``Profile`` as ``wrapped_dek`` + — encrypted by the master key. To use it, we unwrap (decrypt) it with the + master key, create a Fernet cipher, and use that cipher to encrypt or + decrypt individual fields. + +This means database access alone cannot decrypt user data — an attacker also +needs the master key from the server environment. See +``docs/design/encryption_architecture_redesign.md`` for the full design. + +Plugin developers use ``EncryptedProperty`` descriptors and +``EncryptableModelMixin`` to add encrypted fields to their models, and the +helper functions in ``session_utils`` to read/write those fields. The +encryption plumbing is handled transparently. +""" + +from __future__ import annotations + import logging from django.conf import settings from django.db import models @@ -10,13 +39,20 @@ class Profile(models.Model): - """Profile model for a TOMToolkit User""" + """Profile model for a TOMToolkit User. + """ user = models.OneToOneField(User, on_delete=models.CASCADE) affiliation = models.CharField(max_length=100, null=True, blank=True) def __str__(self): return f'{self.user.username} Profile' + # The user's Data Encryption Key (DEK), wrapped (encrypted) by the master key. + # This is a Fernet-encrypted blob of the user's random DEK. It can only be + # unwrapped using TOMTOOLKIT_FIELD_ENCRYPTION_KEY from the server environment. + # Null means no DEK has been generated yet (e.g., pre-existing users before + # this feature was added). + encrypted_dek = models.BinaryField(null=True, blank=True) class UserSession(models.Model): """Mapping model to associate the User and their Sessions diff --git a/tom_common/signals.py b/tom_common/signals.py index 108720412..23b4a50a9 100644 --- a/tom_common/signals.py +++ b/tom_common/signals.py @@ -23,17 +23,32 @@ # while get_user_model() is valid after INSTALLED_APPS are loaded. -# Signal: Create a Profile for the User when the User instance is created +# Signal: Create a Profile (with a wrapped DEK) for the User when the User instance is created @receiver(post_save, sender=User) -def save_profile_on_user_post_save(sender, instance, **kwargs): - """When a user is saved, save their profile.""" - # Take advantage of the fact that logging in updates a user's last_login field - # to create a profile for users that don't have one. +def save_profile_on_user_post_save(sender, instance, created, **kwargs) -> None: + """When a user is saved, ensure their Profile exists and has a wrapped DEK. + + On first save (user creation), creates a new Profile and generates a + wrapped Data Encryption Key (DEK) for the user. The DEK is a random + Fernet key encrypted by the server-side master key — see + ``session_utils.create_encrypted_data_encryption_key()`` for details. + + On subsequent saves, just saves the existing Profile (e.g., to propagate + any changes from inline formsets). + """ try: - instance.profile.save() - except User.profile.RelatedObjectDoesNotExist: # type: ignore - logger.info(f'No Profile found for {instance}. Creating Profile.') - Profile.objects.create(user=instance) + profile = instance.profile + # If the Profile exists but has no DEK (e.g., it was created before + # the encryption system was added), generate one now. + if not profile.encrypted_dek: + profile.encrypted_dek = session_utils.create_encrypted_dek() + profile.save() + except User.profile.RelatedObjectDoesNotExist: # type: ignore[attr-defined] + logger.info(f'No Profile found for {instance}. Creating Profile with encryption key.') + Profile.objects.create( + user=instance, + encrypted_dek=session_utils.create_encrypted_dek(), + ) # Signal: Create a DRF token for the User when the User instance is created From 5de019c4da934d52797bf853e4582897f7e99a28 Mon Sep 17 00:00:00 2001 From: "William (Lindy) Lindstrom" Date: Mon, 30 Mar 2026 14:55:25 -0700 Subject: [PATCH 03/14] the UserSession model is not necessary any more the (way better) envelope encryption scheme doesn't save anything to the session --- .../migrations/0004_delete_usersession.py | 16 + tom_common/models.py | 141 ++----- tom_common/session_utils.py | 384 ++++++------------ tom_common/signals.py | 124 +----- tom_common/views.py | 59 +-- 5 files changed, 192 insertions(+), 532 deletions(-) create mode 100644 tom_common/migrations/0004_delete_usersession.py diff --git a/tom_common/migrations/0004_delete_usersession.py b/tom_common/migrations/0004_delete_usersession.py new file mode 100644 index 000000000..c0fcc8f3e --- /dev/null +++ b/tom_common/migrations/0004_delete_usersession.py @@ -0,0 +1,16 @@ +# Generated by Django 5.2.12 on 2026-03-27 22:29 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('tom_common', '0003_profile_encrypted_dek'), + ] + + operations = [ + migrations.DeleteModel( + name='UserSession', + ), + ] diff --git a/tom_common/models.py b/tom_common/models.py index c2ed78655..20e3c0641 100644 --- a/tom_common/models.py +++ b/tom_common/models.py @@ -28,11 +28,11 @@ from __future__ import annotations import logging + +from cryptography.fernet import Fernet from django.conf import settings from django.db import models from django.contrib.auth.models import User -from django.contrib.sessions.models import Session -from cryptography.fernet import Fernet logger = logging.getLogger(__name__) @@ -44,9 +44,6 @@ class Profile(models.Model): user = models.OneToOneField(User, on_delete=models.CASCADE) affiliation = models.CharField(max_length=100, null=True, blank=True) - def __str__(self): - return f'{self.user.username} Profile' - # The user's Data Encryption Key (DEK), wrapped (encrypted) by the master key. # This is a Fernet-encrypted blob of the user's random DEK. It can only be # unwrapped using TOMTOOLKIT_FIELD_ENCRYPTION_KEY from the server environment. @@ -54,46 +51,39 @@ def __str__(self): # this feature was added). encrypted_dek = models.BinaryField(null=True, blank=True) -class UserSession(models.Model): - """Mapping model to associate the User and their Sessions - - An instance of this model is created whenever we receive the user_logged_in - signal (see signals.py). Upon receiving user_logged_out, we delete all instances - of UserSession for the specific User logging out. + def __str__(self) -> str: + return f'{self.user.username} Profile' - This allows us to manage the User's encrypted data in their app profiles, - should they change their password (see signals.py). - """ - # if either of the referenced objects are deleted, delete this object (CASCADE). - user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE) - session = models.ForeignKey(Session, on_delete=models.CASCADE) - def __str__(self): - return f'UserSession for {self.user.username} with Session key {self.session.session_key}' +class EncryptedProperty: + """A Python descriptor that provides transparent encryption and decryption + for a model field. + This descriptor works with ``EncryptableModelMixin`` and the helper + functions in ``session_utils``. It expects a Fernet cipher to be + temporarily attached to the model instance as ``_cipher`` before the + property is read or written. The cipher is created from the user's + unwrapped DEK by the helper functions and removed immediately after use. -class EncryptedProperty: - """ - A Python descriptor that provides transparent encryption and decryption for a - model field. + The ``_cipher`` attachment pattern exists because Python descriptors cannot + accept extra arguments — the cipher must be passed through the instance. + Direct access without a cipher raises ``AttributeError`` to prevent + accidental plaintext reads of encrypted data. - This descriptor is used in conjunction with the EncryptableModelMixin. It - requires a cipher to be temporarily attached to the model instance as `_cipher` - before accessing the property. + Usage:: - Usage: class MyModel(EncryptableModelMixin, models.Model): _my_secret_encrypted = models.BinaryField(null=True) my_secret = EncryptedProperty('_my_secret_encrypted') """ def __init__(self, db_field_name: str): self.db_field_name = db_field_name - self.property_name = None # Set by __set_name__ + self.property_name: str | None = None # Set by __set_name__ - def __set_name__(self, owner, name): + def __set_name__(self, owner: type, name: str) -> None: self.property_name = name - def __get__(self, instance, owner): + def __get__(self, instance: models.Model | None, owner: type) -> str | EncryptedProperty: if instance is None: return self @@ -109,16 +99,15 @@ def __get__(self, instance, owner): if not encrypted_value: return '' - # Handle bytes (sqlite3) vs memoryview (postgresql) + # Handle bytes (sqlite3) vs memoryview (postgresql). + # PostgreSQL/psycopg returns memoryview for BinaryFields; + # SQLite returns bytes. Fernet.decrypt() needs bytes. if isinstance(encrypted_value, memoryview): - # postgresql/psycopg uses a memoryview object for BinaryFields. - # Sqlite3 uses bytes. When needed, convert to the encrypted_value - # to bytes before we decrypt and decode it. encrypted_value = encrypted_value.tobytes() return cipher.decrypt(encrypted_value).decode() - def __set__(self, instance, value: str): + def __set__(self, instance: models.Model, value: str) -> None: cipher = getattr(instance, '_cipher', None) if not isinstance(cipher, Fernet): raise AttributeError( @@ -135,84 +124,32 @@ def __set__(self, instance, value: str): class EncryptableModelMixin(models.Model): - """ - A mixin for models that use EncryptedProperty to handle sensitive data. + """A mixin for models that use ``EncryptedProperty`` to handle sensitive data. - Provides a generic re-encryption mechanism for all encrypted properties - in the model. + Any model that stores encrypted fields should inherit from this mixin. + It provides: + + - A standardized ``user`` OneToOneField so that utility functions can + always find the user associated with an encryptable model instance. + - A ``clear_encrypted_fields()`` method to null out all encrypted fields + (used when a user's DEK must be regenerated). + + Subclasses should not redefine the ``user`` field. """ - # By defining the user relationship here, we ensure that any model using this - # mixin has a standardized way to associate with a user. This removes - # ambiguity and the need for assumptions in utility functions that need to - # find the user associated with an encryptable model instance. - # Subclasses should not redefine this field. user = models.OneToOneField(settings.AUTH_USER_MODEL, on_delete=models.CASCADE) - def reencrypt_model_fields(self, decoding_cipher: Fernet, encoding_cipher: Fernet) -> None: - """Re-encrypts all fields managed by an EncryptedProperty descriptor. - - Re-encryption means decypting to plaintext with the old cipher based on the old - password and re-encrypting the plaintext with the new cipher based on the new - password. - - The `EncryptableModelMixin` and the `EncyptedProperty` descriptor work together - to access the `Model`'s encytped `BinaryField`s (for setting, getting, and - re-encrypting, which involves both). - - The `EncryptedProperty` descriptor uses the `_cipher` attribute on the encyrpted - `BinaryField`-containing `Model` and this method sets and resets `_cipher` in the - process of re-encrypting: First, `Model._cipher` is the `decoding_cipher` to get the - plaintext value from the encrypted `BinaryField`. Second, `Model._cipher` is reset - to the `encoding_cipher` to encrypt the plaintext value and save it in the - `BinaryField`. Third, the `_cipher` attribute is removed from the `Model` until - the next time it's needed, when it's attached again. - - So, to re-encrpyt, for each of the Model's encrypted `BinaryField`s, we need to: - 1. Use the `decoding_cipher` to get the `plaintext` of the value stored in the - BinaryField. `self._cipher` is set to the `decoding_cipher` for this purpose - and the `EncyptedProperty` descriptor handles the getting. - 2. Reset `self._cipher` to be the `encoding_cipher` and have the `EncyptedProperty` - descriptor handle the encryption and setting. - 3. Remove the `_cipher` attribute from the Model. - """ - model_save_needed = False - for attr_name in dir(self.__class__): - attr = getattr(self.__class__, attr_name) - if isinstance(attr, EncryptedProperty): - try: - # Set decoding cipher and get plaintext - self._cipher = decoding_cipher - plaintext = getattr(self, attr_name) - - if plaintext: - # Set encoding cipher and set new value - self._cipher = encoding_cipher - setattr(self, attr_name, plaintext) - model_save_needed = True - except Exception as e: - logger.error(f"Error re-encrypting property {attr_name} for {self.__class__.__name__}" - f" instance {getattr(self, 'pk', 'UnknownPK')}: {e}") - finally: - # Clean up the temporary cipher - if hasattr(self, '_cipher'): - del self._cipher - if model_save_needed: - self.save() - def clear_encrypted_fields(self) -> None: - """ - Clears all fields managed by an EncryptedProperty descriptor. + """Clear all fields managed by an ``EncryptedProperty`` descriptor. - This is a destructive operation used when re-encryption is not possible, - e.g., when a user's password is reset by an admin and the old - decryption key is unavailable. It sets the value of each encrypted - field to None. + Sets each encrypted BinaryField to None and saves the model. This is a + destructive operation — the encrypted data is permanently lost. """ model_save_needed = False for attr_name in dir(self.__class__): attr = getattr(self.__class__, attr_name) if isinstance(attr, EncryptedProperty): - # Directly set the underlying db field to None + # Set the underlying BinaryField directly to None, bypassing + # the descriptor (which would require a cipher). setattr(self, attr.db_field_name, None) model_save_needed = True logger.info(f"Cleared encrypted property '{attr_name}' for {self.__class__.__name__} " diff --git a/tom_common/session_utils.py b/tom_common/session_utils.py index 7e87f41d2..d7a3ec42b 100644 --- a/tom_common/session_utils.py +++ b/tom_common/session_utils.py @@ -1,25 +1,49 @@ -import base64 +"""Utilities for encrypting and decrypting sensitive user data at rest. + +This module implements the "read/write time" portion of TOM Toolkit's envelope +encryption scheme. The full architecture is documented in +``docs/design/encryption_architecture_redesign.md``; here is a brief summary +of how the pieces fit together: + +**Master key** (``TOMTOOLKIT_FIELD_ENCRYPTION_KEY`` in settings / environment): + A Fernet key that never touches the database. It wraps (encrypts) each + user's Data Encryption Key so that database access alone cannot reveal + user data. + +**Per-user DEK** (``Profile.wrapped_dek``): + A random Fernet key generated when the user is created. Stored in the + database encrypted by the master key. To use it, we unwrap it with the + master key, build a Fernet cipher, and attach it briefly to the model + instance that holds the encrypted field. + +**EncryptedProperty / EncryptableModelMixin** (in ``models.py``): + The descriptor and mixin that plugin models use to declare encrypted + fields. They expect a ``_cipher`` attribute on the model instance — + this module's helper functions manage that lifecycle. + +Typical call from a view or API endpoint:: + + from tom_common.session_utils import get_encrypted_field, set_encrypted_field + + api_key = get_encrypted_field(user, eso_profile, 'api_key') + set_encrypted_field(user, eso_profile, 'api_key', new_value) + eso_profile.save() +""" + +from __future__ import annotations + import logging from typing import Optional, TypeVar from cryptography.fernet import Fernet -from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.backends import default_backend -from django.apps import AppConfig, apps +from django.conf import settings from django.db import models -from django.contrib.auth.models import User -from django.contrib.sessions.models import Session -from django.contrib.sessions.backends.db import SessionStore -from tom_common.models import EncryptableModelMixin, UserSession +from tom_common.models import Profile -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -# Constant for storing the cipher encryption key in the session -SESSION_KEY_FOR_CIPHER_ENCRYPTION_KEY = 'key' +logger = logging.getLogger(__name__) # A generic TypeVar for a Django models.Model subclass instance. # The `bound=models.Model` constraint ensures that any @@ -27,113 +51,107 @@ ModelType = TypeVar('ModelType', bound=models.Model) -def create_cipher_encryption_key(user: User, password: str) -> bytes: - """Creates a Fernet cipher encryption key derived from the user's password. +def _get_master_cipher() -> Fernet: + """Return a Fernet cipher built from the server-side master key. - This key is intended to be stored (e.g., in the session) and used to - instantiate Fernet ciphers for encrypting and decrypting sensitive data - associated with the user, such as API keys or external service credentials. + The master key (``TOMTOOLKIT_FIELD_ENCRYPTION_KEY``) lives in the server + environment, not in the database. It is used only to wrap and unwrap + per-user DEKs — never to encrypt user data directly. - The key derivation process uses PBKDF2HMAC with a salt generated from - the user's username, making the key unique per user and password. - - Args: - user: The Django User object. - password: The user's plaintext password. - - Returns: - A URL-safe base64-encoded Fernet encryption key as bytes. - - See Also: - https://cryptography.io/en/latest/fernet/#using-passwords-with-fernet + Raises: + django.core.exceptions.ImproperlyConfigured: If the setting is missing + or empty. """ + key = getattr(settings, 'TOMTOOLKIT_FIELD_ENCRYPTION_KEY', '') + if not key: + from django.core.exceptions import ImproperlyConfigured + raise ImproperlyConfigured( + "TOMTOOLKIT_FIELD_ENCRYPTION_KEY is not set. This setting is required for " + "encrypting sensitive user data at rest. Generate one with:\n" + " python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\"\n" + "Then add it to your environment or settings.py." + ) + # The key may be a string (from os.getenv) or bytes; Fernet accepts both. + return Fernet(key) - # Generate a salt from hash and username - salt = hashes.Hash(hashes.SHA256(), backend=default_backend()) - salt.update(user.username.encode()) - # Derive encryption_key using PBKDF2-HMAC and the newly generated salt - kdf = PBKDF2HMAC( # key derivation function - algorithm=hashes.SHA256(), - length=32, - salt=salt.finalize()[:16], # only finalize once; returns bytes; use 16 bytes - iterations=1_000_000, # Django recommendation of jan-2025 - backend=default_backend(), - ) - encryption_key: bytes = base64.urlsafe_b64encode(kdf.derive(password.encode())) - return encryption_key +def create_encrypted_dek() -> bytes: + """Generate a new random DEK and return it wrapped (encrypted) by the master key. + This is called once per user, at user-creation time (see ``signals.py``). + The returned bytes are stored in ``Profile.wrapped_dek``. -def save_key_to_session_store(key: bytes, session_store: SessionStore) -> None: - """Saves the provided encryption key to the given Django session store. + We use ``Fernet.generate_key()`` rather than ``os.urandom()`` because + Fernet keys have a specific format (URL-safe base64-encoded 32 bytes) + and ``generate_key()`` guarantees that format. - The key is first base64 encoded and converted to a string before being - stored in the session under a predefined session key. - - Args: - key: The encryption key (bytes) to be saved. - session_store: The Django SessionStore instance where the key will be saved. + Returns: + The DEK encrypted by the master key, as bytes suitable for a BinaryField. """ - try: - assert isinstance(session_store, SessionStore), \ - f"session_store is not a SessionStore; it's a {type(session_store)}" - except AssertionError as e: - logger.error(str(e)) - - # The key is bytes, but session values must be JSON-serializable. - # A Fernet key is already base64-encoded, so we just decode it to a string for storage. - session_store[SESSION_KEY_FOR_CIPHER_ENCRYPTION_KEY] = key.decode('utf-8') - session_store.save() # we might be accessing the session before it's saved (in the middleware?) + # Generate a fresh random Fernet key for this user + dek: bytes = Fernet.generate_key() + # Wrap (encrypt) the DEK with the master key so it can be stored safely + # in the database. The master key is the only thing that can unwrap it. + master_cipher = _get_master_cipher() + wrapped_dek: bytes = master_cipher.encrypt(dek) + return wrapped_dek -def get_key_from_session_model(session: Session) -> bytes: - """Extracts and decodes the encryption key from a Django Session object. - - Retrieves the base64 encoded key string from the session, decodes it - from base64, and returns the raw bytes of the encryption key. +def _unwrap_dek(wrapped_dek: bytes) -> bytes: + """Unwrap (decrypt) a user's DEK using the master key. Args: - session: The Django Session object from which to extract the key. + wrapped_dek: The encrypted DEK from ``Profile.wrapped_dek``. Returns: - The encryption key as bytes. + The plaintext DEK (a valid Fernet key as bytes). """ + # Handle memoryview from PostgreSQL BinaryField + if isinstance(wrapped_dek, memoryview): + wrapped_dek = wrapped_dek.tobytes() + + master_cipher = _get_master_cipher() + return master_cipher.decrypt(wrapped_dek) - logger.debug(f"Extracting key from Session model: {type(session)} = {session} - {session.get_decoded()}") - key_as_str: str = session.get_decoded()[SESSION_KEY_FOR_CIPHER_ENCRYPTION_KEY] # type: ignore - # The key was stored as a string, so we encode it back to bytes. - return key_as_str.encode('utf-8') +def _get_cipher_for_user(user) -> Fernet: + """Build a Fernet cipher from a user's unwrapped DEK. -def get_key_from_session_store(session_store: SessionStore) -> bytes: - """Extracts the encryption key from a Django SessionStore instance. + This fetches the user's ``Profile.wrapped_dek``, unwraps it with the master + key, and returns a Fernet cipher ready to encrypt or decrypt the user's + data fields. - Use the dictionary-like API that the SessionStore provides to retreive - the encryption key. + The unwrapped DEK exists only in memory for the duration of this call and + the subsequent encrypt/decrypt operation. It is never persisted in + plaintext. Args: - session_store: The Django SessionStore instance. + user: A Django User instance. Returns: - The encryption key as bytes. + A Fernet cipher built from the user's DEK. + + Raises: + Profile.DoesNotExist: If the user has no Profile. + ValueError: If the user's Profile has no wrapped DEK. """ - if not isinstance(session_store, SessionStore): - # manual type checking - raise TypeError(f"Expected a SessionStore object, but got {type(session_store)}") + profile = Profile.objects.get(user=user) + if not profile.encrypted_dek: + raise ValueError(f"User {user.username} has no encryption key (wrapped_dek is empty). " + f"This may indicate the user was created before encryption was configured.") - key_as_str: str = session_store[SESSION_KEY_FOR_CIPHER_ENCRYPTION_KEY] - return key_as_str.encode('utf-8') + dek: bytes = _unwrap_dek(profile.encrypted_dek) + return Fernet(dek) -def get_encrypted_field(user: User, - model_instance: ModelType, # type: ignore +def get_encrypted_field(user, + model_instance: ModelType, field_name: str) -> Optional[str]: - """ - Helper function to safely get the decrypted value of an EncryptedProperty. + """Safely get the decrypted value of an EncryptedProperty. - This function encapsulates the logic of fetching the user's session key, - creating a cipher, attaching it to the model instance, reading the - decrypted value, and cleaning up. + Fetches the user's DEK from their Profile, unwraps it with the master key, + creates a Fernet cipher, and uses the ``EncryptedProperty`` descriptor to + decrypt the field value. Args: user: The User object associated with the encrypted data. @@ -142,26 +160,18 @@ def get_encrypted_field(user: User, Returns: The decrypted string value, or None if decryption fails for any reason - (e.g., no active session, key not found). + (e.g., no Profile, no DEK, corrupted data). """ try: - # Get the current Session from the UserSession - # A user can be logged in from multiple browsers, resulting in multiple - # UserSession objects. Since the encryption key is derived from the - # password and is the same for all sessions, we can safely take the first one. - user_session = UserSession.objects.filter(user=user).first() - if not user_session: - raise UserSession.DoesNotExist(f"No active session found for user {user.username}") - - session: Session = user_session.session - cipher_key: bytes = get_key_from_session_model(session) - cipher: Fernet = Fernet(cipher_key) - - # Attach the cipher, get the value, and then clean up - model_instance._cipher = cipher # type: ignore + cipher = _get_cipher_for_user(user) + # Attach the cipher so the EncryptedProperty descriptor can use it, + # read the decrypted value, then clean up. The cipher is attached to + # the model instance (not the user) because the descriptor's __get__ + # method receives the instance it's defined on. + model_instance._cipher = cipher # type: ignore[attr-defined] decrypted_value = getattr(model_instance, field_name) return decrypted_value - except (UserSession.DoesNotExist, KeyError) as e: + except (Profile.DoesNotExist, ValueError) as e: logger.warning(f"Could not get encryption key for user {user.username} to access " f"'{field_name}': {e}") return None @@ -170,24 +180,23 @@ def get_encrypted_field(user: User, f"for user {user.username}: {e}") return None finally: - # Ensure the temporary cipher is always removed from the instance + # Always remove the temporary cipher from the instance to avoid + # accidental reuse or leaking the key in memory longer than needed. if hasattr(model_instance, '_cipher'): - del model_instance._cipher # type: ignore + del model_instance._cipher # type: ignore[attr-defined] -def set_encrypted_field(user: User, - model_instance: ModelType, # type: ignore +def set_encrypted_field(user, + model_instance: ModelType, field_name: str, value: str) -> bool: - """ - Helper function to safely set the value of an EncryptedProperty. + """Safely set the value of an EncryptedProperty. - This function encapsulates the logic of fetching the user's session key, - creating a cipher, attaching it to the model instance, setting the new - encrypted value, and cleaning up. + Fetches the user's DEK, creates a cipher, and uses the + ``EncryptedProperty`` descriptor to encrypt and store the value. Note: This function does NOT save the instance. The caller is responsible - for calling `instance.save()` after the field has been set. + for calling ``instance.save()`` after the field has been set. Args: user: The User object associated with the encrypted data. @@ -199,20 +208,11 @@ def set_encrypted_field(user: User, True if the field was set successfully, False otherwise. """ try: - # Get the current Session from the UserSession - user_session = UserSession.objects.filter(user=user).first() # see comment above - if not user_session: - raise UserSession.DoesNotExist(f"No active session found for user {user.username}") - - session: Session = user_session.session - cipher_key: bytes = get_key_from_session_model(session) - cipher = Fernet(cipher_key) - - # Attach the cipher, set the value, and then clean up - model_instance._cipher = cipher # type: ignore + cipher = _get_cipher_for_user(user) + model_instance._cipher = cipher # type: ignore[attr-defined] setattr(model_instance, field_name, value) return True - except (UserSession.DoesNotExist, KeyError) as e: + except (Profile.DoesNotExist, ValueError) as e: logger.error(f"Could not get encryption key for user {user.username} to set " f"'{field_name}': {e}") return False @@ -221,131 +221,5 @@ def set_encrypted_field(user: User, f"for user {user.username}: {e}") return False finally: - # Ensure the temporary cipher is always removed from the instance if hasattr(model_instance, '_cipher'): - del model_instance._cipher # type: ignore - - -def reencrypt_data(user) -> None: - """Re-encrypts sensitive data for a user after a password change. - - If an Administrator is changing another user's password, and - the `user: User` is not logged-in, then they have no SessionStore, - and, thus, no encryption key is available. In that case, the User's - encrypted fields are cleared out because they are stale, having - been ecrypted with an encryption key derived from a password that - is no longer in use. - - Args: - user: The Django User object whose password has changed. - """ - logger.debug("Re-encrypting sensitive data...") - - # Get the current Session from the UserSession - user_session = UserSession.objects.filter(user=user.id).first() # see comment above - - if not user_session: - logger.warning(f"User {user.username} is not logged in. Cannot re-encrypt sensitive data. " - f"Clearing all encrypted fields instead.") - # Loop through all the installed apps and ask them to clear their encrypted profile fields - for app_config in apps.get_app_configs(): - clear_encrypted_fields_for_user(app_config, user) # type: ignore - return - - session: Session = user_session.session - # Get the current encryption_key from the Session - current_encryption_key: bytes = get_key_from_session_model(session) - # Generate a decoding Fernet cipher with the current encryption key - decoding_cipher = Fernet(current_encryption_key) - - # Get the new raw password from the User instance - new_raw_password = user._password # CAUTION: this is implemenation dependent (using _) - # Generate a new encryption_key with the new raw password - new_encryption_key: bytes = create_cipher_encryption_key(user, new_raw_password) - # Generate a new encoding Fernet cipher with the new encryption key - encoding_cipher = Fernet(new_encryption_key) - - # Save the new encryption key in the User's Session - session_store: SessionStore = SessionStore(session_key=session.session_key) - save_key_to_session_store(new_encryption_key, session_store) - # also, attach the new encryption key to the User instance so it can be inserted - # into the Session before we call update_session_auth_hash in - # tom_common.views.UserUpdateView.form_valid() - user._temp_new_fernet_key = new_encryption_key - - # Loop through all the installed apps and ask them to reencrypt their encrypted profile fields - for app_config in apps.get_app_configs(): - try: - reencrypt_encypted_fields_for_user(app_config, user, decoding_cipher, encoding_cipher) # type: ignore - except AttributeError: - logger.debug(f'App: {app_config.name} does not have a reencrypt_app_fields method.') - continue - - -def reencrypt_encypted_fields_for_user(app_config: AppConfig, user: 'User', - decoding_cipher: Fernet, encoding_cipher: Fernet): - """ - Automatically finds models in the app_config that inherit from EncryptableModelMixin - and attempts to re-encrypt their fields for the given user. - - :param app_config: The AppConfig instance of the plugin app. - :param user: The User whose data needs re-encryption. - :param decoding_cipher: Fernet cipher to decrypt existing data. - :param encoding_cipher: Fernet cipher to encrypt new data. - """ - for model_class in app_config.get_models(): - if issubclass(model_class, EncryptableModelMixin): - logger.debug(f"Found EncryptableModelMixin subclass: {model_class.__name__} in app {app_config.name}") - # The EncryptableModelMixin guarantees a 'user' field, which is a OneToOneField. - try: - encryptable_model_instance = model_class.objects.get(user=user) - # instance of the Model which is a subclass of EncryptableModelMixin - encryptable_model_instance.reencrypt_model_fields(decoding_cipher, encoding_cipher) # re-entrpt here - except model_class.DoesNotExist: - logger.info(f"No {model_class.__name__} instance found for user {user.username}.") - except model_class.MultipleObjectsReturned: - # This should not be reached if the mixin correctly enforces a OneToOneField. - # It's kept here as a safeguard against unexpected configurations. - logger.error(f"Multiple {model_class.__name__} instances found for user {user.username}. " - f"This is unexpected for an EncryptableModelMixin. Re-encrypting all found.") - instances = model_class.objects.filter(user=user) - for encryptable_model_instance in instances: - encryptable_model_instance.reencrypt_model_fields(decoding_cipher, encoding_cipher) - except Exception as e: - logger.error(f"Error processing model {model_class.__name__} for re-encryption for " - f"user {user.username}: {e}") - - -def clear_encrypted_fields_for_user(app_config: AppConfig, user: 'User',) -> None: - """ - Finds models in an app that are Encryptable and clears their encrypted fields for the given user. - - This is a destructive operation used when a user's password is reset without - them being logged in, making the old decryption key unavailable. This happens, - for example, when an adminitrator resets their password. - - :param app_config: The AppConfig instance of the plugin app. - :param user: The User whose data needs to be cleared. - """ - for model_class in app_config.get_models(): - if issubclass(model_class, EncryptableModelMixin): - logger.debug(f"Found EncryptableModelMixin subclass: {model_class.__name__} in " - f"app {app_config.name} for clearing.") - # The EncryptableModelMixin now guarantees a 'user' field, which is a OneToOneField. - try: - encryptable_model_instance = model_class.objects.get(user=user) - # instance of the Model which is a subclass of EncryptableModelMixin - encryptable_model_instance.clear_encrypted_fields() # do the clearing of the fields here - except model_class.DoesNotExist: - logger.info(f"No {model_class.__name__} instance found for user {user.username} to clear.") - except model_class.MultipleObjectsReturned: - # This should not be reached if the mixin correctly enforces a OneToOneField. - # It's kept here as a safeguard against unexpected configurations. - logger.error(f"Multiple {model_class.__name__} instances found for user {user.username}. " - f"This is unexpected for an EncryptableModelMixin. Clearing all found.") - instances = model_class.objects.filter(user=user) - for encryptable_model_instance in instances: - encryptable_model_instance.clear_encrypted_fields() - except Exception as e: - logger.error(f"Error clearing encrypted fields for model {model_class.__name__} for " - f"user {user.username}: {e}") + del model_instance._cipher # type: ignore[attr-defined] diff --git a/tom_common/signals.py b/tom_common/signals.py index 23b4a50a9..de37f8417 100644 --- a/tom_common/signals.py +++ b/tom_common/signals.py @@ -1,16 +1,13 @@ import logging from django.conf import settings -from django.contrib.auth import get_user_model from django.contrib.auth.models import User -from django.contrib.auth.signals import user_logged_in, user_logged_out -from django.contrib.sessions.models import Session -from django.db.models.signals import post_save, pre_save +from django.db.models.signals import post_save from django.dispatch import receiver from rest_framework.authtoken.models import Token -from tom_common.models import Profile, UserSession +from tom_common.models import Profile from tom_common import session_utils logger = logging.getLogger(__name__) @@ -64,120 +61,3 @@ def create_auth_token_on_user_post_save(sender, instance=None, created=False, ** """ if created: Token.objects.create(user=instance) - - -# Signal: Create UserSession on login -@receiver(user_logged_in) -def create_user_session_on_user_logged_in(sender, request, user, **kwargs) -> None: - """Whenever a user logs in, create a UserSession instance to associate - the User with the new Session. - """ - logger.debug(f"User {user.username} has logged in. request: {request}") - logger.debug(f"Request session: {type(request.session)} = {request.session}") - - # the request.session is a SessionStore object, we need the Session - # and we can get it using the session_key - try: - session: Session = Session.objects.get(pk=request.session.session_key) - except Session.DoesNotExist: - # this request should have a sesssion: SessionStore object, and if it - # doesn't, it could be because the user was logged in as part of a test, but - # TODO: sort out whether the test code should be updated or ??? - logger.error(f"Session {request.session.session_key} does not exist.") - return - - logger.debug(f"Session: {type(session)} = {session}") - - user_session, created = UserSession.objects.get_or_create(user=user, session=session) - if created: - logger.debug(f"UserSession created: {user_session}") - else: - logger.debug(f"UserSession already exists: {user_session}") - - -# Signal: Delete UserSession on logout -@receiver(user_logged_out) -def delete_user_session_on_user_logged_out(sender, request, user, **kwargs) -> None: - """Whenever a user logs out, delete all their UserSession instances. - """ - user_sessions = UserSession.objects.filter(user=user) - for user_session in user_sessions: - user_session.session.delete() - # TODO: consider if the User has logged in from multiple browsers/devices - # (i.e. we want to delete all their sessions or just the one they logged out from) - # this could probably be done by filtering on the session_key of the request in - # addition to the user above. - - -# Signal: Set cipher on login -@receiver(user_logged_in) -def set_cipher_on_user_logged_in(sender, request, user, **kwargs) -> None: - """When the user logs in, capture their password and use it to - generate a cipher encryption key and save it in the User's Session. - """ - logger.debug(f"User {user.username} has logged in. request: {request}") - - password = request.POST.get("password") # Capture password from login - if password: - encryption_key: bytes = session_utils.create_cipher_encryption_key(user, password) - session_utils.save_key_to_session_store(encryption_key, request.session) - else: - logger.error(f'User {user.username} logged in without a password. Cannot create encryption key.') - - -# Signal: Clear cipher encryption key on logout -@receiver(user_logged_out) -def clear_encryption_key_on_user_logged_out(sender, request, user, **kwargs) -> None: - """Clear the cipher encryption key when a user logs out. - """ - if user: - logger.debug(f'User {user.username} has logged out. Deleting key from Session.' - f'sender: {sender}; request: {request}') - request.session.pop(session_utils.SESSION_KEY_FOR_CIPHER_ENCRYPTION_KEY, None) - - -# Signal: Update the User's sensitive data when the password changes -@receiver(pre_save, sender=get_user_model()) -def user_updated_on_user_pre_save(sender, **kwargs): - """When the User model is saved, detect if the password has changed. - - kwargs: - * signal: - * instance: - * raw: Boolean - * using: str - * update_fields: frozenset | NoneType - - If the User's password has changed, take the following actions: - - Current list of actions to be taken upon User password change: - * re-encrypt the user's sensitive data (see session_utils.reencrypt_data() function) - * - """ - logger.debug(f"kwargs: {kwargs}") - user = kwargs.get("instance", None) - - if user and not user.username == 'AnonymousUser' and not user.is_anonymous: - # user.password vs. user._password: - # the user.password field is used for authentication (via comparison to the (hashed) password - # being tested for validity). The _password field is the raw password and is what we need to - # create a new cipher for the User's sensitive data. - - # This Signal is called for ANY change to the User model, not just password changes. - # So, determine if the password has changed by comparing new and old (hashed) passwords. - # NOTE: the update_fields kwarg is a frozenset of changed updated fields, but it does not contain - # 'password' when the User is changing their password. So, compare new and old: - - new_hashed_password = user.password # from the not-yet-saved User instance - try: - old_hashed_password = User.objects.get(id=user.id).password # from the previously-saved User instance - except User.DoesNotExist: - old_hashed_password = None - - if new_hashed_password != old_hashed_password: - # New password detected - logger.debug(f'User {user.username} is changing their password.') - session_utils.reencrypt_data(user) # need new RAW password to re-create cipher and re-encrypt - else: - # No new password detected - logger.debug(f'User {user.username} is updating their profile without a password change.') diff --git a/tom_common/views.py b/tom_common/views.py index 47974f505..616f1e96f 100644 --- a/tom_common/views.py +++ b/tom_common/views.py @@ -13,7 +13,6 @@ from django.shortcuts import redirect from django.contrib.auth import update_session_auth_hash -from tom_common.models import UserSession from tom_common.forms import ChangeUserPasswordForm, CustomUserCreationForm, GroupForm from tom_common.mixins import SuperuserRequiredMixin @@ -225,64 +224,18 @@ def dispatch(self, *args, **kwargs): return super().dispatch(*args, **kwargs) def form_valid(self, form): - """ - Called after form is validated. Updates the session hash if the password was changed - to keep the user logged in, and ensures the UserSession is updated to the new session. + """Called after form is validated. - :param form: User creation form - :type form: django.forms.Form + If the password was changed, updates the session auth hash to keep the + user logged in (Django invalidates the session when the password hash + changes). Encryption keys are independent of the password, so no + re-encryption is needed. """ - self.object = form.save() # self.object is the user + self.object = form.save() - # if new password was provided, update the session hash and UserSession if form.cleaned_data.get("password1"): - from tom_common import session_utils - # But, before we update the session hash, we need to put the new Fernet key into - # the Session so that it gets copied over to the new Session when we update the - # session hash. (It was stashed in the User object when we called reencrypt_data). - - if hasattr(self.object, '_temp_new_fernet_key'): - new_fernet_key = self.object._temp_new_fernet_key - session_utils.save_key_to_session_store(new_fernet_key, self.request.session) - del self.object._temp_new_fernet_key # clean up that temporary attribute - - # now we're ready to update the session hash update_session_auth_hash(self.request, self.object) - # The old UserSession (if any) linked to the old Session would have been deleted by CASCADE. - # We need to create a new UserSession linking the User to the new Session. - new_session_key = self.request.session.session_key - if new_session_key: - try: - # Get the new Django Session object from the database - # Need to import Session model: from django.contrib.sessions.models import Session - from django.contrib.sessions.models import Session - - new_session = Session.objects.get(session_key=new_session_key) - - # Create a UserSession entry for the user and their new session. - # This mirrors the logic in the user_logged_in signal. - _, created = UserSession.objects.get_or_create( - user=self.object, - session=new_session - ) - if created: - logger.debug(f"Created UserSession for {self.object.username} with new session " - f"{new_session.session_key} after password change.") - # else: (not created) implies a UserSession for this user and this *new* session already existed, - # which would be unusual immediately after update_session_auth_hash. - except Session.DoesNotExist: - logger.error( - f"New session {new_session_key} not found in database for user {self.object.username} " - f"after password change. Cannot create UserSession." - ) - except Exception as e: - logger.error(f"Error creating UserSession for user {self.object.username} " - f"after password change: {e}") - else: - logger.error(f"No session key found in request for user {self.object.username} " - f"after password change. Cannot create UserSession.") - messages.success(self.request, 'Profile updated') return HttpResponseRedirect(self.get_success_url()) From 67ee2e14b9469da21717262b8e303bde984f1d82 Mon Sep 17 00:00:00 2001 From: "William (Lindy) Lindstrom" Date: Mon, 30 Mar 2026 15:14:23 -0700 Subject: [PATCH 04/14] use encrypt/decrypt terminology instead of wrap/unwrap (I gather that) the term of art in envelope encryption is to wrap(encrypt) and unwrap(decrypt) the secret encryption keys (with the TOMTOOLKIT_FIELD_ENCRYPTION_KEY-created cipher in our case). This commit removes that conceptual jargon in favor of what's literally going on: encrypting and decrypting. --- tom_common/models.py | 18 +- tom_common/session_utils.py | 53 +++-- tom_common/signals.py | 10 +- tom_common/tests.py | 413 ++++++++++++------------------------ 4 files changed, 176 insertions(+), 318 deletions(-) diff --git a/tom_common/models.py b/tom_common/models.py index 20e3c0641..efc9bcc1d 100644 --- a/tom_common/models.py +++ b/tom_common/models.py @@ -6,14 +6,14 @@ observatory credentials) at rest in the database. The scheme has two layers: 1. A server-side **master key** (``TOMTOOLKIT_FIELD_ENCRYPTION_KEY``) is stored in the - environment, never in the database. It is a Fernet key used to wrap - (encrypt) per-user keys. + environment, never in the database. It is a Fernet key used to encrypt + per-user keys. 2. Each user has a random **Data Encryption Key (DEK)** that encrypts their - actual data. The DEK is stored on the user's ``Profile`` as ``wrapped_dek`` - — encrypted by the master key. To use it, we unwrap (decrypt) it with the - master key, create a Fernet cipher, and use that cipher to encrypt or - decrypt individual fields. + actual data. The DEK is stored on the user's ``Profile`` as ``encrypted_dek`` + — encrypted by the master key. To use it, we decrypt it with the master + key, create a Fernet cipher, and use that cipher to encrypt or decrypt + individual fields. This means database access alone cannot decrypt user data — an attacker also needs the master key from the server environment. See @@ -44,9 +44,9 @@ class Profile(models.Model): user = models.OneToOneField(User, on_delete=models.CASCADE) affiliation = models.CharField(max_length=100, null=True, blank=True) - # The user's Data Encryption Key (DEK), wrapped (encrypted) by the master key. + # The user's Data Encryption Key (DEK), encrypted by the master key. # This is a Fernet-encrypted blob of the user's random DEK. It can only be - # unwrapped using TOMTOOLKIT_FIELD_ENCRYPTION_KEY from the server environment. + # decrypted using TOMTOOLKIT_FIELD_ENCRYPTION_KEY from the server environment. # Null means no DEK has been generated yet (e.g., pre-existing users before # this feature was added). encrypted_dek = models.BinaryField(null=True, blank=True) @@ -63,7 +63,7 @@ class EncryptedProperty: functions in ``session_utils``. It expects a Fernet cipher to be temporarily attached to the model instance as ``_cipher`` before the property is read or written. The cipher is created from the user's - unwrapped DEK by the helper functions and removed immediately after use. + decrypted DEK by the helper functions and removed immediately after use. The ``_cipher`` attachment pattern exists because Python descriptors cannot accept extra arguments — the cipher must be passed through the instance. diff --git a/tom_common/session_utils.py b/tom_common/session_utils.py index d7a3ec42b..bd6fdcdd2 100644 --- a/tom_common/session_utils.py +++ b/tom_common/session_utils.py @@ -6,13 +6,12 @@ of how the pieces fit together: **Master key** (``TOMTOOLKIT_FIELD_ENCRYPTION_KEY`` in settings / environment): - A Fernet key that never touches the database. It wraps (encrypts) each - user's Data Encryption Key so that database access alone cannot reveal - user data. + A Fernet key that never touches the database. It encrypts each user's + Data Encryption Key so that database access alone cannot reveal user data. -**Per-user DEK** (``Profile.wrapped_dek``): +**Per-user DEK** (``Profile.encrypted_dek``): A random Fernet key generated when the user is created. Stored in the - database encrypted by the master key. To use it, we unwrap it with the + database encrypted by the master key. To use it, we decrypt it with the master key, build a Fernet cipher, and attach it briefly to the model instance that holds the encrypted field. @@ -55,7 +54,7 @@ def _get_master_cipher() -> Fernet: """Return a Fernet cipher built from the server-side master key. The master key (``TOMTOOLKIT_FIELD_ENCRYPTION_KEY``) lives in the server - environment, not in the database. It is used only to wrap and unwrap + environment, not in the database. It is used only to encrypt and decrypt per-user DEKs — never to encrypt user data directly. Raises: @@ -76,10 +75,10 @@ def _get_master_cipher() -> Fernet: def create_encrypted_dek() -> bytes: - """Generate a new random DEK and return it wrapped (encrypted) by the master key. + """Generate a new random DEK and return it encrypted by the master key. This is called once per user, at user-creation time (see ``signals.py``). - The returned bytes are stored in ``Profile.wrapped_dek``. + The returned bytes are stored in ``Profile.encrypted_dek``. We use ``Fernet.generate_key()`` rather than ``os.urandom()`` because Fernet keys have a specific format (URL-safe base64-encoded 32 bytes) @@ -90,38 +89,38 @@ def create_encrypted_dek() -> bytes: """ # Generate a fresh random Fernet key for this user dek: bytes = Fernet.generate_key() - # Wrap (encrypt) the DEK with the master key so it can be stored safely - # in the database. The master key is the only thing that can unwrap it. + # Encrypt the DEK with the master key so it can be stored safely + # in the database. The master key is the only thing that can decrypt it. master_cipher = _get_master_cipher() - wrapped_dek: bytes = master_cipher.encrypt(dek) - return wrapped_dek + encrypted_dek: bytes = master_cipher.encrypt(dek) + return encrypted_dek -def _unwrap_dek(wrapped_dek: bytes) -> bytes: - """Unwrap (decrypt) a user's DEK using the master key. +def _decrypt_dek(encrypted_dek: bytes) -> bytes: + """Decrypt a user's DEK using the master key. Args: - wrapped_dek: The encrypted DEK from ``Profile.wrapped_dek``. + encrypted_dek: The encrypted DEK from ``Profile.encrypted_dek``. Returns: The plaintext DEK (a valid Fernet key as bytes). """ # Handle memoryview from PostgreSQL BinaryField - if isinstance(wrapped_dek, memoryview): - wrapped_dek = wrapped_dek.tobytes() + if isinstance(encrypted_dek, memoryview): + encrypted_dek = encrypted_dek.tobytes() master_cipher = _get_master_cipher() - return master_cipher.decrypt(wrapped_dek) + return master_cipher.decrypt(encrypted_dek) def _get_cipher_for_user(user) -> Fernet: - """Build a Fernet cipher from a user's unwrapped DEK. + """Build a Fernet cipher from a user's decrypted DEK. - This fetches the user's ``Profile.wrapped_dek``, unwraps it with the master - key, and returns a Fernet cipher ready to encrypt or decrypt the user's - data fields. + This fetches the user's ``Profile.encrypted_dek``, decrypts it with the + master key, and returns a Fernet cipher ready to encrypt or decrypt the + user's data fields. - The unwrapped DEK exists only in memory for the duration of this call and + The decrypted DEK exists only in memory for the duration of this call and the subsequent encrypt/decrypt operation. It is never persisted in plaintext. @@ -133,14 +132,14 @@ def _get_cipher_for_user(user) -> Fernet: Raises: Profile.DoesNotExist: If the user has no Profile. - ValueError: If the user's Profile has no wrapped DEK. + ValueError: If the user's Profile has no encrypted DEK. """ profile = Profile.objects.get(user=user) if not profile.encrypted_dek: - raise ValueError(f"User {user.username} has no encryption key (wrapped_dek is empty). " + raise ValueError(f"User {user.username} has no encryption key (encrypted_dek is empty). " f"This may indicate the user was created before encryption was configured.") - dek: bytes = _unwrap_dek(profile.encrypted_dek) + dek: bytes = _decrypt_dek(profile.encrypted_dek) return Fernet(dek) @@ -149,7 +148,7 @@ def get_encrypted_field(user, field_name: str) -> Optional[str]: """Safely get the decrypted value of an EncryptedProperty. - Fetches the user's DEK from their Profile, unwraps it with the master key, + Fetches the user's DEK from their Profile, decrypts it with the master key, creates a Fernet cipher, and uses the ``EncryptedProperty`` descriptor to decrypt the field value. diff --git a/tom_common/signals.py b/tom_common/signals.py index de37f8417..cd9e9c745 100644 --- a/tom_common/signals.py +++ b/tom_common/signals.py @@ -20,15 +20,15 @@ # while get_user_model() is valid after INSTALLED_APPS are loaded. -# Signal: Create a Profile (with a wrapped DEK) for the User when the User instance is created +# Signal: Create a Profile (with an encrypted DEK) for the User when the User instance is created @receiver(post_save, sender=User) def save_profile_on_user_post_save(sender, instance, created, **kwargs) -> None: - """When a user is saved, ensure their Profile exists and has a wrapped DEK. + """When a user is saved, ensure their Profile exists and has an encrypted DEK. - On first save (user creation), creates a new Profile and generates a - wrapped Data Encryption Key (DEK) for the user. The DEK is a random + On first save (user creation), creates a new Profile and generates an + encrypted Data Encryption Key (DEK) for the user. The DEK is a random Fernet key encrypted by the server-side master key — see - ``session_utils.create_encrypted_data_encryption_key()`` for details. + ``session_utils.create_encrypted_dek()`` for details. On subsequent saves, just saves the existing Profile (e.g., to propagate any changes from inline formsets). diff --git a/tom_common/tests.py b/tom_common/tests.py index 3f7d9527d..138e14879 100644 --- a/tom_common/tests.py +++ b/tom_common/tests.py @@ -1,25 +1,19 @@ -import datetime from http import HTTPStatus import tempfile import logging -from unittest.mock import MagicMock, patch from cryptography.fernet import Fernet from django.contrib.auth.models import User -from django.contrib.sessions.models import Session from django.contrib.sites.models import Site from django.urls import reverse from django_comments.models import Comment from django.core.paginator import Paginator -from django.db.models import QuerySet from django.test import TestCase, override_settings from django.test.runner import DiscoverRunner -from tom_common.models import UserSession -from tom_common import session_utils # noqa Import the whole module for patching -from tom_common.session_utils import (get_key_from_session_store, get_key_from_session_model, - SESSION_KEY_FOR_CIPHER_ENCRYPTION_KEY) +from tom_common.models import Profile +from tom_common import session_utils from tom_targets.tests.factories import SiderealTargetFactory from tom_common.templatetags.tom_common_extras import verbose_name, multiplyby, truncate_value_for_display from tom_common.templatetags.bootstrap4_overrides import bootstrap_pagination @@ -380,284 +374,149 @@ def test_nonexistent_custom_robots_txt(self): assert response.content.startswith(b"User-Agent: *\n") # known a priori from default robots.txt -class TestUserSession(TestCase): - """Test that a UserSession instance is created when a user logs in. - """ - def setUp(self): - # Create a user and log them in. - username = 'testuser' - password = 'testpassword' # noqa - self.user = User.objects.create_user(username=username, password=password) - # don't user client.force_login() here, because it matters how the user logs in - self.client.login(username=username, password=password) - - def test_user_session_created(self): - """The UserSession links the User to the User's SessionStore instance. - - Here we just test that the UserSession instance is created when the user logs in. - The UserSession instance is created in the `user_logged_in` signal receiver, - `tom_common.signals.create_user_session_on_login` - """ - # Check that a UserSession instance is created for the logged-in user. - user_sessions: QuerySet = UserSession.objects.filter(user=self.user) - self.assertEqual(user_sessions.count(), 1) # make sure there's only one - - user_session = user_sessions.first() - self.assertIsInstance(user_session, UserSession) # check that it's a UserSession instance - self.assertEqual(user_session.user, self.user) # that links to the correct User - - def test_user_session_deleted(self): - # Check that the UserSession instance is deleted when the user logs out. - self.client.logout() - user_sessions: QuerySet = UserSession.objects.filter(user=self.user) - self.assertEqual(user_sessions.count(), 0) # there should be none left - - def test_user_session_properties(self): - user_sessions: QuerySet = UserSession.objects.filter(user=self.user) - self.assertEqual(user_sessions.count(), 1) # make sure there's only one - - user_session: UserSession = user_sessions.first() - - session: Session = user_session.session - self.assertIsInstance(session, Session) # make sure it's a Session instance - - class TestEncryptionKeyManagement(TestCase): - def setUp(self): - # Create a user and log in. - username = 'testuser' - password = 'testpassword' - self.user = User.objects.create_user(username=username, password=password) - self.plaintext = f'this is a plaintext test message on {datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}' - - # Don't use client.login() here, because we need the request to go through the middleware - # in order to create a SessionStore instance. So, this doesn't work: - # self.client.login(username=username, password=password) # NOPE - - # Instead, we use the client.post() method to log in, which will create a SessionStore instance - # as the User is logged in. (it returns an HTTPResponse object, but we don't need it) - _ = self.client.post("/accounts/login/", {"username": username, "password": password}) - - def test_encryption_key_extraction(self): - """The UserSession.session field is a ForeignKey to the the Session model. - So, `user_session.session` should be an instance of the Session model. This is the - dictionary-like object saved in the SessionStore. We use it to hold the User's - encryption key. - - This test checks that after setUp, where we create a user and log them in: - 1. the UserSession instance is created and that there's only one - 2. the encryption key can be extracted from the session store. - 3. the encryption key is a bytes object that can be used to create a Fernet cipher - 4. the Fernet cipher can be used to encrypt and decrypt a plaintext message. - """ - user_sessions: QuerySet = UserSession.objects.filter(user=self.user) - self.assertEqual(user_sessions.count(), 1) # make sure there's only one + """Tests for the envelope encryption architecture. - user_session: UserSession = user_sessions.first() - session: Session = user_session.session - - # extract the encryption key from the session store - encryption_key: bytes = get_key_from_session_model(session) - self.assertIsInstance(encryption_key, bytes) # check that it's a bytes object - cipher = Fernet(encryption_key) # and we can use it to create a Fernet cipher - - # make sure the cipher works (i.e. the key is not weird). + Verifies that: + - Users get an encrypted DEK on creation (via signal) + - The DEK can be decrypted and used to encrypt/decrypt data + - Password changes do not affect the encryption key + - The master key (TOMTOOLKIT_FIELD_ENCRYPTION_KEY) is required for decryption + """ + def setUp(self): + self.user = User.objects.create_user( + username='testuser', password='testpassword', email='test@example.com' + ) + self.plaintext = 'this is a secret observatory API key' + + def test_profile_has_encrypted_dek_after_user_creation(self): + """When a user is created, the post_save signal should generate an + encrypted DEK and store it on their Profile.""" + profile = Profile.objects.get(user=self.user) + self.assertIsNotNone(profile.encrypted_dek) + # The encrypted DEK should be non-empty bytes + self.assertGreater(len(profile.encrypted_dek), 0) + + def test_decrypted_dek_is_valid_fernet_key(self): + """The decrypted DEK should be a valid Fernet key that can + encrypt and decrypt data.""" + profile = Profile.objects.get(user=self.user) + dek = session_utils._decrypt_dek(profile.encrypted_dek) + # Should not raise — a valid Fernet key + cipher = Fernet(dek) ciphertext = cipher.encrypt(self.plaintext.encode()) - decoded_ciphertext = cipher.decrypt(ciphertext).decode() - self.assertEqual(self.plaintext, decoded_ciphertext) - - def test_encryption_key_extraction_from_session_store(self): - """Test that get_key_from_session_store() and get_key_from_session_model() - return the same key. - - Use get_key_from_session_store() when you have a SessionStore instance, - probably from a decorated HTTPRequest. Use get_key_from_session_model() - only have a UserSession instance. - """ - # Get the UserSession instance for the logged-in user. - user_sessions: QuerySet = UserSession.objects.filter(user=self.user) - self.assertEqual(user_sessions.count(), 1) # make sure there's only one in the QuerySet - user_session: UserSession = user_sessions.first() - session: Session = user_session.session - self.assertIsInstance(session, Session) # make sure it's a Session instance - - # To demonstrate the difference between Session and SessionStore: - # Get the key from the Session model instance - key_from_session: bytes = get_key_from_session_model(session) - self.assertIsInstance(key_from_session, bytes) - - # Create a SessionStore... - from django.contrib.sessions.backends.db import SessionStore - # (the session has a session_key we can use to create a SessionStore instance) - session_store = SessionStore(session_key=session.session_key) - # ...and get the key from it - key_from_session_store: bytes = get_key_from_session_store(session_store) - self.assertEqual(key_from_session, key_from_session_store) # check that they are the same - - def test_encryption_key_update_upon_password_change(self): - """Test that the encryption key is updated when the user changes their password. - - The basic structure of this test is to: - 1. log in and make sure the UserSession, SessionStore, encryption_key, etc is working. - 2. change the user's password - 3. log out and back in again with the new password - 4. check that the encryption key in the new session store has changed - - Along the way, we check that the encryption keys (old and new) work and - a cipher created with them can be used to encrypt and decrypt a plaintext - message. - - This test does not test the re-encryption of the model fields up on - a password change. - """ - # Get the UserSession instance for the logged-in user. - user_sessions: QuerySet = UserSession.objects.filter(user=self.user) - self.assertEqual(user_sessions.count(), 1) - user_session: UserSession = user_sessions.first() - session: Session = user_session.session - self.assertIsInstance(session, Session) # make sure it's a Session instance - - # Get the key from the Session model instance - encryption_key: bytes = get_key_from_session_model(session) - self.assertIsInstance(encryption_key, bytes) - - cipher = Fernet(encryption_key) + decrypted = cipher.decrypt(ciphertext).decode() + self.assertEqual(self.plaintext, decrypted) - # Encrypt the plaintext message with the current key. - ciphertext = cipher.encrypt(self.plaintext.encode()) - decoded_ciphertext = cipher.decrypt(ciphertext).decode() - self.assertEqual(self.plaintext, decoded_ciphertext) # check that the key works - - # Change the user's password. - new_password = 'newpassword' # noqa - self.user.set_password(new_password) - self.user.save() # triggers pre_save signal on User model - - # check that the password was changed - self.assertFalse(self.user.check_password('testpassword')) - self.assertTrue(self.user.check_password(new_password)) - - # Log out and back in again to create a new session - self.client.post("/accounts/logout/") - response = self.client.post("/accounts/login/", - {"username": self.user.username, "password": new_password}) - logger.debug(f'login response: {response}') - - # Get the new UserSession instance for the logged-in user. - user_sessions: QuerySet = UserSession.objects.filter(user=self.user) - self.assertEqual(user_sessions.count(), 1) - user_session: UserSession = user_sessions.first() - session: Session = user_session.session - - # Extract the new encryption key from the session store. - new_encryption_key: bytes = get_key_from_session_model(session) - self.assertIsInstance(new_encryption_key, bytes) - - # The new encryption key should be different from the old one. - self.assertNotEqual(new_encryption_key, encryption_key) - - # Encrypt the plaintext message with the new key. - cipher = Fernet(new_encryption_key) + def test_each_user_gets_unique_dek(self): + """Two different users should have different DEKs.""" + other_user = User.objects.create_user( + username='otheruser', password='otherpassword', email='other@example.com' + ) + profile_1 = Profile.objects.get(user=self.user) + profile_2 = Profile.objects.get(user=other_user) + + dek_1 = session_utils._decrypt_dek(profile_1.encrypted_dek) + dek_2 = session_utils._decrypt_dek(profile_2.encrypted_dek) + self.assertNotEqual(dek_1, dek_2) + + def test_password_change_does_not_affect_dek(self): + """Changing a user's password should not change their DEK. + This is a key improvement over the old password-derived scheme.""" + profile = Profile.objects.get(user=self.user) + dek_before = session_utils._decrypt_dek(profile.encrypted_dek) + + # Change password + self.user.set_password('newpassword') + self.user.save() + + profile.refresh_from_db() + dek_after = session_utils._decrypt_dek(profile.encrypted_dek) + self.assertEqual(dek_before, dek_after) + + def test_admin_password_reset_does_not_affect_dek(self): + """When an admin resets another user's password, the user's DEK + should remain intact. In the old scheme, this would destroy all + encrypted data.""" + admin = User.objects.create_superuser( + username='admin', password='admin', email='admin@example.com' + ) + self.client.force_login(admin) + + profile = Profile.objects.get(user=self.user) + dek_before = session_utils._decrypt_dek(profile.encrypted_dek) + + # Admin changes the user's password via the admin view + change_url = reverse('admin-user-change-password', kwargs={'pk': self.user.id}) + self.client.post(change_url, { + 'password': 'admin_reset_password', + 'change_password_form': '1', + }) + + profile.refresh_from_db() + dek_after = session_utils._decrypt_dek(profile.encrypted_dek) + self.assertEqual(dek_before, dek_after) + + def test_get_cipher_for_user(self): + """The internal _get_cipher_for_user should return a working Fernet cipher.""" + cipher = session_utils._get_cipher_for_user(self.user) + self.assertIsInstance(cipher, Fernet) ciphertext = cipher.encrypt(self.plaintext.encode()) - decoded_ciphertext = cipher.decrypt(ciphertext).decode() - - # Check that the new key works. - self.assertEqual(self.plaintext, decoded_ciphertext) + decrypted = cipher.decrypt(ciphertext).decode() + self.assertEqual(self.plaintext, decrypted) + + def test_create_encrypted_dek_produces_valid_encrypted_key(self): + """create_encrypted_dek() should produce bytes that + can be decrypted to a valid Fernet key.""" + encrypted = session_utils.create_encrypted_dek() + self.assertIsInstance(encrypted, bytes) + dek = session_utils._decrypt_dek(encrypted) + # Should be usable as a Fernet key + Fernet(dek) + + def test_master_key_required_for_decryption(self): + """Decrypting with a different master key should fail, proving + that the encrypted DEK is bound to TOMTOOLKIT_FIELD_ENCRYPTION_KEY.""" + profile = Profile.objects.get(user=self.user) + wrong_key = Fernet.generate_key() + wrong_cipher = Fernet(wrong_key) + with self.assertRaises(Exception): + wrong_cipher.decrypt(profile.encrypted_dek) + + def test_preexisting_profile_gets_dek_on_next_save(self): + """A Profile that was created before the encryption system (no DEK) + should get a DEK on the next user save.""" + # Simulate a pre-existing profile with no DEK + profile = Profile.objects.get(user=self.user) + profile.encrypted_dek = None + profile.save() + + # Trigger the post_save signal by saving the user + self.user.save() + + profile.refresh_from_db() + self.assertIsNotNone(profile.encrypted_dek) + # Verify it's a valid encrypted DEK + dek = session_utils._decrypt_dek(profile.encrypted_dek) + Fernet(dek) class TestSignalHandlers(TestCase): + """Tests for signal handlers in signals.py.""" def setUp(self): self.username = 'signaltestuser' self.password = 'signaltestpass' - self.user = User.objects.create_user(username=self.username, password=self.password, email='signal@example.com') - - def test_create_user_session_on_user_logged_in(self): - # Initially, no UserSession for this user - self.assertFalse(UserSession.objects.filter(user=self.user).exists()) - # Log in the user - self.client.login(username=self.username, password=self.password) - # Check UserSession is created - self.assertTrue(UserSession.objects.filter(user=self.user).exists()) - user_session = UserSession.objects.get(user=self.user) - self.assertIsNotNone(user_session.session) - - def test_delete_user_session_on_user_logged_out(self): - # Log in the user to create a session - self.client.login(username=self.username, password=self.password) - self.assertTrue(UserSession.objects.filter(user=self.user).exists()) - session_key = self.client.session.session_key - self.assertTrue(Session.objects.filter(session_key=session_key).exists()) - - # Log out the user - self.client.logout() - - # Check UserSession is deleted - self.assertFalse(UserSession.objects.filter(user=self.user).exists()) - # Check the associated Session is also deleted - self.assertFalse(Session.objects.filter(session_key=session_key).exists()) - - def test_set_cipher_on_user_logged_in(self): - # Log in the user using client.post to simulate form submission with password - self.client.post(reverse('login'), {'username': self.username, 'password': self.password}) - - # Check that the encryption key is in the session - self.assertIn(SESSION_KEY_FOR_CIPHER_ENCRYPTION_KEY, self.client.session) - key_from_session_store = self.client.session[SESSION_KEY_FOR_CIPHER_ENCRYPTION_KEY] - self.assertIsInstance(key_from_session_store, str) - self.assertIsNotNone(key_from_session_store) - - # Verify the key can be used - try: - retrieved_fernet_key = get_key_from_session_store(self.client.session) - - # retrieved_fernet_key should now be the original Fernet key and usable by Fernet. - Fernet(retrieved_fernet_key) # This should not raise an error if the key is valid - except Exception as e: - self.fail(f"Encryption key from session is not a valid Fernet key: {e}") - - def test_set_cipher_on_user_logged_in_no_password_in_request(self): - # Simulate a login scenario where request.POST does not contain 'password' - # This can happen with force_login or other custom auth backends - # We expect an error to be logged, but the login should still proceed. - # The key won't be set in the session by this specific signal handler. - - # Use force_login which doesn't populate request.POST['password'] - self.client.force_login(self.user) + self.user = User.objects.create_user( + username=self.username, password=self.password, email='signal@example.com' + ) + + def test_profile_created_with_dek_on_user_creation(self): + """The post_save signal should create a Profile with an encrypted DEK + when a new user is created.""" + profile = Profile.objects.get(user=self.user) + self.assertIsNotNone(profile.encrypted_dek) - # Check that the encryption key is NOT in the session from this signal - # (it might be set by other mechanisms, but not by set_cipher_on_user_logged_in) - # We can't directly assert it's not there if other parts of the login process add it. - # Instead, we check that our logger.error was called. - with self.assertLogs('tom_common.signals', level='ERROR') as cm: - # Manually trigger the signal with a mock request that has no POST data - from django.contrib.auth.signals import user_logged_in - mock_request = MagicMock() - mock_request.POST = {} # No password - mock_request.session = self.client.session # Use the actual session object - user_logged_in.send(sender=self.user.__class__, request=mock_request, user=self.user) - self.assertIn(f'User {self.username} logged in without a password. Cannot create encryption key.', cm.output[0]) - # Key should not be in session from *this* signal call - self.assertNotIn(SESSION_KEY_FOR_CIPHER_ENCRYPTION_KEY, mock_request.session) - - def test_clear_encryption_key_on_user_logged_out(self): - # Log in and ensure key is set - self.client.post(reverse('login'), {'username': self.username, 'password': self.password}) - self.assertIn(SESSION_KEY_FOR_CIPHER_ENCRYPTION_KEY, self.client.session) - - # Log out - self.client.logout() - - # Check that the encryption key is removed from the session - self.assertNotIn(SESSION_KEY_FOR_CIPHER_ENCRYPTION_KEY, self.client.session) - - @patch('tom_common.signals.session_utils.reencrypt_data') - def test_user_updated_on_user_pre_save_password_changed(self, mock_reencrypt): - self.user.set_password('newpassword123') - self.user.save() # Triggers pre_save signal - mock_reencrypt.assert_called_once_with(self.user) - - @patch('tom_common.signals.session_utils.reencrypt_data') - def test_user_updated_on_user_pre_save_password_not_changed(self, mock_reencrypt): - self.user.first_name = 'Signal' - self.user.save() # Triggers pre_save signal - mock_reencrypt.assert_not_called() + def test_drf_token_created_on_user_creation(self): + """The post_save signal should create a DRF auth token for new users.""" + from rest_framework.authtoken.models import Token + self.assertTrue(Token.objects.filter(user=self.user).exists()) From bf02e1126b01d309e420877feccca520a23edde5 Mon Sep 17 00:00:00 2001 From: "William (Lindy) Lindstrom" Date: Mon, 30 Mar 2026 15:45:07 -0700 Subject: [PATCH 05/14] add management command to rotate field encryption key --- tom_common/management/__init__.py | 0 tom_common/management/commands/__init__.py | 0 .../commands/rotate_field_encryption_key.py | 73 +++++++++++++ tom_common/session_utils.py | 103 +++++++++++++++++- 4 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 tom_common/management/__init__.py create mode 100644 tom_common/management/commands/__init__.py create mode 100644 tom_common/management/commands/rotate_field_encryption_key.py diff --git a/tom_common/management/__init__.py b/tom_common/management/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tom_common/management/commands/__init__.py b/tom_common/management/commands/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tom_common/management/commands/rotate_field_encryption_key.py b/tom_common/management/commands/rotate_field_encryption_key.py new file mode 100644 index 000000000..39953372a --- /dev/null +++ b/tom_common/management/commands/rotate_field_encryption_key.py @@ -0,0 +1,73 @@ +"""Management command to rotate the TOMTOOLKIT_FIELD_ENCRYPTION_KEY. + +This is a thin CLI wrapper around ``session_utils.rotate_master_key()``. +See that function for the actual rotation logic. + +Usage: + 1. Generate a new Fernet key: + python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())" + 2. Run the rotation: + python manage.py rotate_field_encryption_key --new-key + 3. Update your environment / settings.py with the new key. + 4. Restart the server. +""" +from __future__ import annotations + +from django.core.management.base import BaseCommand, CommandError + +from tom_common.session_utils import rotate_master_key + + +class Command(BaseCommand): + help = ( + 'Re-encrypts all per-user Data Encryption Keys (DEKs) with a new master key. ' + 'Run this when rotating TOMTOOLKIT_FIELD_ENCRYPTION_KEY.' + ) + + def add_arguments(self, parser) -> None: + parser.add_argument( + '--new-key', + required=True, + help='The new Fernet master key (URL-safe base64-encoded, 32 bytes). ' + 'Generate with: python -c "from cryptography.fernet import Fernet; ' + 'print(Fernet.generate_key().decode())"', + ) + + def handle(self, *args, **options) -> None: + new_key: str = options['new_key'] + + try: + result = rotate_master_key(new_key) + except ValueError as e: + raise CommandError(str(e)) + except Exception as e: + raise CommandError(f"Cannot access current master key: {e}") + + if result.total == 0: + self.stdout.write(self.style.WARNING( + "No profiles with encryption keys found. Nothing to rotate." + )) + return + + self.stdout.write(f"Re-encrypting DEKs for {result.total} profile(s)...") + + if result.success_count: + self.stdout.write(self.style.SUCCESS( + f"Done. {result.success_count} re-encrypted successfully." + )) + + for error in result.errors: + self.stderr.write(self.style.ERROR( + f" FAILED: Profile pk={error.profile_pk} (user={error.username}) — {error.error}" + )) + + if result.error_count: + self.stdout.write(self.style.ERROR( + f"{result.error_count} failed — see errors above." + )) + + self.stdout.write("") + self.stdout.write(self.style.WARNING( + "IMPORTANT: Update TOMTOOLKIT_FIELD_ENCRYPTION_KEY in your environment / " + "settings.py with the new key, then restart the server." + )) diff --git a/tom_common/session_utils.py b/tom_common/session_utils.py index bd6fdcdd2..6693924b7 100644 --- a/tom_common/session_utils.py +++ b/tom_common/session_utils.py @@ -32,9 +32,10 @@ from __future__ import annotations import logging +from dataclasses import dataclass, field from typing import Optional, TypeVar -from cryptography.fernet import Fernet +from cryptography.fernet import Fernet, InvalidToken from django.conf import settings from django.db import models @@ -222,3 +223,103 @@ def set_encrypted_field(user, finally: if hasattr(model_instance, '_cipher'): del model_instance._cipher # type: ignore[attr-defined] + + +# --------------------------------------------------------------------------- +# Master key rotation +# --------------------------------------------------------------------------- + +@dataclass +class RotationError: + """Details about a single Profile that failed during key rotation.""" + profile_pk: int + username: str + error: str + + +@dataclass +class RotationResult: + """Result of a master key rotation operation. + + Attributes: + success_count: Number of Profiles whose DEKs were successfully re-encrypted. + errors: Per-profile details for any that failed. + """ + success_count: int = 0 + errors: list[RotationError] = field(default_factory=list) + + @property + def error_count(self) -> int: + return len(self.errors) + + @property + def total(self) -> int: + return self.success_count + self.error_count + + +def rotate_master_key(new_key: str) -> RotationResult: + """Re-encrypt all per-user DEKs with a new master key. + + Each Profile's ``encrypted_dek`` is decrypted with the current master key + (from ``TOMTOOLKIT_FIELD_ENCRYPTION_KEY``) and re-encrypted with + ``new_key``. The user Profile's plaintext DEK is unchanged — only its + encryption layer (i.e. `encrypted_dek`) is replaced. The actual encrypted + data is not touched. + + After this function completes successfully, the server's + ``TOMTOOLKIT_FIELD_ENCRYPTION_KEY`` must be updated to ``new_key`` and the + server restarted. Until that happens, the re-encrypted DEKs cannot be + decrypted. + + Args: + new_key: The new Fernet master key as a string (URL-safe base64, 44 chars). + + Returns: + A ``RotationResult`` with per-profile success/error details. + + Raises: + ValueError: If ``new_key`` is not a valid Fernet key. + django.core.exceptions.ImproperlyConfigured: If the current master key + is missing or empty. + """ + # Validate the new key before touching any data. + try: + new_master_cipher = Fernet(new_key.encode()) + except Exception as e: + raise ValueError(f"Invalid new key: {e}") from e + + # Build the old master cipher from current settings. + # Raises ImproperlyConfigured if missing — intentionally not caught here. + old_master_cipher = _get_master_cipher() + + profiles = Profile.objects.exclude(encrypted_dek=None) + result = RotationResult() + + for profile in profiles.iterator(): + try: + encrypted_dek = profile.encrypted_dek + # Handle memoryview from PostgreSQL + if isinstance(encrypted_dek, memoryview): + encrypted_dek = encrypted_dek.tobytes() + + # Decrypt with old key, re-encrypt with new key + plaintext_dek: bytes = old_master_cipher.decrypt(encrypted_dek) + new_encrypted_dek: bytes = new_master_cipher.encrypt(plaintext_dek) + + profile.encrypted_dek = new_encrypted_dek + profile.save(update_fields=['encrypted_dek']) + result.success_count += 1 + except InvalidToken: + result.errors.append(RotationError( + profile_pk=profile.pk, + username=profile.user.username, + error="could not decrypt with current master key", + )) + except Exception as e: + result.errors.append(RotationError( + profile_pk=profile.pk, + username=profile.user.username, + error=str(e), + )) + + return result From dfcfce61136d28dd340db969861743716ea60d8d Mon Sep 17 00:00:00 2001 From: "William (Lindy) Lindstrom" Date: Mon, 30 Mar 2026 16:17:30 -0700 Subject: [PATCH 06/14] add tests for TOMTOOLKIT_FIELD_ENCRYPTION_KEY rotation --- tom_common/tests.py | 137 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) diff --git a/tom_common/tests.py b/tom_common/tests.py index 138e14879..54cd2446a 100644 --- a/tom_common/tests.py +++ b/tom_common/tests.py @@ -501,6 +501,143 @@ def test_preexisting_profile_gets_dek_on_next_save(self): Fernet(dek) +class TestMasterKeyRotation(TestCase): + """Tests for ``session_utils.rotate_master_key()``. + + Verifies that master key rotation re-encrypts all per-user DEKs correctly, + preserves the plaintext DEK (so existing encrypted data remains readable), + and handles edge cases like invalid keys, missing DEKs, and corrupted DEKs. + """ + def setUp(self): + self.user = User.objects.create_user( + username='rotateuser', password='rotatepass', email='rotate@example.com' + ) + self.new_key: str = Fernet.generate_key().decode() + # Count profiles with DEKs at the start — other apps (e.g., guardian) + # may have created users with profiles during test setup. + self.baseline_dek_count = Profile.objects.exclude(encrypted_dek=None).count() + + def test_rotation_re_encrypts_deks(self): + """After rotation, the DEK on disk should be different (re-encrypted + with the new key) but the plaintext DEK should be the same.""" + profile = Profile.objects.get(user=self.user) + old_encrypted_dek = bytes(profile.encrypted_dek) + dek_before = session_utils._decrypt_dek(profile.encrypted_dek) + + result = session_utils.rotate_master_key(self.new_key) + + self.assertEqual(result.success_count, self.baseline_dek_count) + self.assertEqual(result.error_count, 0) + + profile.refresh_from_db() + # The encrypted representation should have changed + self.assertNotEqual(bytes(profile.encrypted_dek), old_encrypted_dek) + # But decrypting with the NEW key should yield the same plaintext DEK + new_master_cipher = Fernet(self.new_key) + dek_after = new_master_cipher.decrypt(profile.encrypted_dek) + self.assertEqual(dek_before, dek_after) + + def test_encrypted_data_survives_rotation(self): + """Data encrypted with a user's DEK before rotation should still be + decryptable after rotation, since the plaintext DEK is unchanged.""" + # Encrypt some data with the user's DEK + cipher_before = session_utils._get_cipher_for_user(self.user) + secret = b'my observatory API key' + ciphertext = cipher_before.encrypt(secret) + + # Rotate the master key + session_utils.rotate_master_key(self.new_key) + + # Decrypt the DEK with the new master key and verify the data + profile = Profile.objects.get(user=self.user) + new_master_cipher = Fernet(self.new_key) + dek = new_master_cipher.decrypt(profile.encrypted_dek) + cipher_after = Fernet(dek) + self.assertEqual(cipher_after.decrypt(ciphertext), secret) + + def test_rotation_handles_multiple_users(self): + """Rotation should re-encrypt DEKs for all users.""" + User.objects.create_user(username='user2', password='pass2', email='u2@example.com') + User.objects.create_user(username='user3', password='pass3', email='u3@example.com') + + result = session_utils.rotate_master_key(self.new_key) + + self.assertEqual(result.success_count, self.baseline_dek_count + 2) + self.assertEqual(result.error_count, 0) + self.assertEqual(result.total, self.baseline_dek_count + 2) + + def test_rotation_with_no_profiles(self): + """Rotation with no DEKs should return a zero-count result, not an error.""" + # Clear all DEKs + Profile.objects.update(encrypted_dek=None) + + result = session_utils.rotate_master_key(self.new_key) + + self.assertEqual(result.success_count, 0) + self.assertEqual(result.error_count, 0) + self.assertEqual(result.total, 0) + + def test_rotation_rejects_invalid_new_key(self): + """An invalid Fernet key should raise ValueError before any data is touched.""" + profile = Profile.objects.get(user=self.user) + encrypted_dek_before = bytes(profile.encrypted_dek) + + with self.assertRaises(ValueError): + session_utils.rotate_master_key('not-a-valid-fernet-key') + + # Verify nothing was modified + profile.refresh_from_db() + self.assertEqual(bytes(profile.encrypted_dek), encrypted_dek_before) + + def test_rotation_skips_profiles_without_dek(self): + """Profiles with encrypted_dek=None should be excluded from rotation.""" + # Create a second user and clear their DEK to simulate a pre-encryption user + user_no_dek = User.objects.create_user( + username='nodekuser', password='nodekpass', email='nodek@example.com' + ) + profile_no_dek = Profile.objects.get(user=user_no_dek) + profile_no_dek.encrypted_dek = None + profile_no_dek.save() + + result = session_utils.rotate_master_key(self.new_key) + + # The no-DEK profile should be excluded from rotation + self.assertEqual(result.success_count, self.baseline_dek_count) + self.assertEqual(result.error_count, 0) + + # The no-DEK profile should still be None + profile_no_dek.refresh_from_db() + self.assertIsNone(profile_no_dek.encrypted_dek) + + def test_rotation_partial_failure(self): + """If one profile's DEK can't be decrypted, rotation should still + succeed for the other profiles and report the failure.""" + # Create a second user with a valid DEK + user2 = User.objects.create_user( + username='user2', password='pass2', email='u2@example.com' + ) + + # Corrupt the first user's DEK by encrypting it with a different key + wrong_key_cipher = Fernet(Fernet.generate_key()) + profile = Profile.objects.get(user=self.user) + profile.encrypted_dek = wrong_key_cipher.encrypt(b'garbage') + profile.save() + + result = session_utils.rotate_master_key(self.new_key) + + # All profiles except the corrupted one should succeed + self.assertEqual(result.success_count, self.baseline_dek_count) + self.assertEqual(result.error_count, 1) + self.assertEqual(result.errors[0].username, 'rotateuser') + self.assertIn('current master key', result.errors[0].error) + + # Verify user2's DEK was actually rotated + profile2 = Profile.objects.get(user=user2) + new_master_cipher = Fernet(self.new_key) + dek = new_master_cipher.decrypt(profile2.encrypted_dek) + Fernet(dek) # Should not raise + + class TestSignalHandlers(TestCase): """Tests for signal handlers in signals.py.""" def setUp(self): From 6a1365988ed5b483a3265346e38231acc0862240 Mon Sep 17 00:00:00 2001 From: "William (Lindy) Lindstrom" Date: Wed, 1 Apr 2026 13:04:25 -0700 Subject: [PATCH 07/14] Update encrypted_dek field comment on Profile model Shorten the comment and add a note explaining that BinaryField is excluded by Django's model_to_dict(), which is why encrypted_dek intentionally does not appear on the user Profile card. --- tom_common/models.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tom_common/models.py b/tom_common/models.py index efc9bcc1d..5bc8fae46 100644 --- a/tom_common/models.py +++ b/tom_common/models.py @@ -44,11 +44,11 @@ class Profile(models.Model): user = models.OneToOneField(User, on_delete=models.CASCADE) affiliation = models.CharField(max_length=100, null=True, blank=True) - # The user's Data Encryption Key (DEK), encrypted by the master key. - # This is a Fernet-encrypted blob of the user's random DEK. It can only be - # decrypted using TOMTOOLKIT_FIELD_ENCRYPTION_KEY from the server environment. - # Null means no DEK has been generated yet (e.g., pre-existing users before - # this feature was added). + # The user's Data Encryption Key (DEK), encrypted by the master key + # (TOMTOOLKIT_DEK_ENCRYPTION_KEY). Generated on first user save; null for + # users created before this feature who haven't logged in yet. + # BinaryField is excluded by model_to_dict(), so this intentionally does + # not appear on the user Profile card. encrypted_dek = models.BinaryField(null=True, blank=True) def __str__(self) -> str: From 77680df12492cd6f5e445fcc8b149cdf37221bd9 Mon Sep 17 00:00:00 2001 From: "William (Lindy) Lindstrom" Date: Wed, 1 Apr 2026 13:06:08 -0700 Subject: [PATCH 08/14] Remove unneeded password-change warning from user update form This warning isn't needed with the new ecryption scheme. --- tom_common/templates/tom_common/create_user.html | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tom_common/templates/tom_common/create_user.html b/tom_common/templates/tom_common/create_user.html index 482b905ef..fbe1accbf 100644 --- a/tom_common/templates/tom_common/create_user.html +++ b/tom_common/templates/tom_common/create_user.html @@ -10,10 +10,6 @@ {% csrf_token %} {% bootstrap_form form %} {% bootstrap_formset form.user_profile_formset %} - {% if object.pk != current_user.pk %} -

WARNING: Changing the password for user {{ object.username }} will clear out all of - their saved external service API keys and passwords (if any).

- {% endif %} {% buttons %}