From 13dbf6cec4735788a5f376999b9112debadd53a1 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sun, 12 Apr 2026 19:14:55 +0200 Subject: [PATCH 1/4] Migrated items in taskStateDB to proper dict schema. No more pickle shenanigans. --- ...26_04_12_1847-925cf8989fbc_item_pending.py | 208 ++++++++++++++++++ .../beets_flask/database/models/pending.py | 83 +++++++ backend/beets_flask/database/models/states.py | 30 ++- 3 files changed, 309 insertions(+), 12 deletions(-) create mode 100644 backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py create mode 100644 backend/beets_flask/database/models/pending.py diff --git a/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py b/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py new file mode 100644 index 00000000..ed3b0276 --- /dev/null +++ b/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py @@ -0,0 +1,208 @@ +"""item pending + +Revision ID: 925cf8989fbc +Revises: a986c03d9ba3 +Create Date: 2026-04-12 18:47:43.218344 + +README: +Historically, task state items were stored as binary (pickle) blobs in the database. +This approach has proven to be brittle and difficult to maintain. In particular, +changes and upgrades in beets break deserialization, requiring manual +intervention to recover or migrate data. +""" + +from collections.abc import Sequence +from datetime import datetime +import io +import pickle +from uuid import uuid4 + +import sqlalchemy as sa +from beets_flask import log +from beets_flask.database.models.pending import BeetsItemType +from alembic import op + + +# revision identifiers, used by Alembic. +revision: str = "925cf8989fbc" +down_revision: str | Sequence[str] | None = "a986c03d9ba3" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +class ModelStub: + def __init__(self, *args, **kwargs): + self._values_fixed = {} + self._values_flex = {} + self._db = None + + def __setstate__(self, state): + self.__dict__.update(state) + self._model_cls = None # must be reattached externally + + def __getstate__(self): + return self.__dict__ + + # --- mimic beets resolution --- + def __getitem__(self, key): + if "_values_fixed" in self.__dict__ and key in self._values_fixed: + return self._values_fixed[key] + + if "_values_flex" in self.__dict__ and key in self._values_flex: + return self._values_flex[key] + + if key in self.__dict__: + return self.__dict__[key] + + raise KeyError(key) + + def __getattr__(self, name): + try: + return self[name] + except KeyError: + raise AttributeError(name) + + def __setattr__(self, name, value): + # keep internal structure intact + if name in ("_values_fixed", "_values_flex", "_db"): + self.__dict__[name] = value + else: + self.__dict__[name] = value + + +class LazyConvertDictStub: + def __init__(self, *args, **kwargs): + self._data = {} + self._converted = {} + self.model_cls = None # Don't enforce model_cls, keep it flexible + + def __setstate__(self, state): + self.__dict__.update(state) + self.model_cls = None + + def __getstate__(self): + return self.__dict__ + + def __getitem__(self, key): + if key in self._converted: + return self._converted[key] + if key in self._data: + return self._data[key] + raise KeyError(key) + + def __setitem__(self, key, value): + self._converted[key] = value + + def __contains__(self, key): + return key in self._converted or key in self._data + + def keys(self): + return list(self._converted.keys()) + list(self._data.keys()) + + def __iter__(self): + return iter(self.keys()) + + def items(self): + for key in self: + yield key, self[key] + + +class ItemsUnpickler(pickle.Unpickler): + CLASS_MAP = { + ("beets.dbcore.db", "LazyConvertDict"): LazyConvertDictStub, + ("beets.library", "Item"): ModelStub, + ("beets.library.models", "Item"): ModelStub, + } + + def find_class(self, module, name): + """Override the find_class method to redirect Distance class references.""" + key = (module, name) + if key not in self.CLASS_MAP: + print(f"WARNING: Unknown class not in migration map: {module}.{name}") + return dict # Fallback for unknown classes + return self.CLASS_MAP[key] + + +def load_items(blob: bytes) -> list[ModelStub]: + return ItemsUnpickler(io.BytesIO(blob)).load() + + +def migrate_data(): + conn = op.get_bind() + meta = sa.MetaData() + + task_pending_items = sa.Table("task_pending_items", meta, autoload_with=conn) + + result = conn.execute(sa.text("SELECT id, items FROM task WHERE items IS NOT NULL")) + for row in result: + task_id = row[0] + items_blob = row[1] + + try: + items = load_items(items_blob) + except Exception as e: + log.error(f"Failed to unpickle task {task_id}: {e}") + continue + + rows = [] + now = datetime.utcnow() + for stub in items: + rows.append( + { + "id": str(uuid4()), + "created_at": now, + "updated_at": now, + "task_id": task_id, + "item": { + "fixed_values": BeetsItemType._encode( + dict(stub._values_fixed.items()) + ), + "flex_values": BeetsItemType._encode( + dict(stub._values_flex.items()) + ), + }, + } + ) + + if rows: + conn.execute( + task_pending_items.insert(), + rows, + ) + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "task_pending_items", + sa.Column("id", sa.String(), nullable=False), + sa.Column("task_id", sa.String(), nullable=False), + sa.Column("item", BeetsItemType(), nullable=False), + sa.PrimaryKeyConstraint("id"), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint( + ["task_id"], + ["task.id"], + ), + ) + op.create_index( + op.f("ix_task_pending_items_created_at"), + "task_pending_items", + ["created_at"], + unique=False, + ) + + migrate_data() + + op.drop_column("task", "items") + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("task", sa.Column("items", sa.BLOB(), nullable=False)) + op.drop_table("task_pending_items") + # ### end Alembic commands ### diff --git a/backend/beets_flask/database/models/pending.py b/backend/beets_flask/database/models/pending.py new file mode 100644 index 00000000..3cdea974 --- /dev/null +++ b/backend/beets_flask/database/models/pending.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +import base64 +from typing import TYPE_CHECKING + +from sqlalchemy import JSON, ForeignKey +from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.types import TypeDecorator + +from beets_flask.importer.types import BeetsItem + +from .base import Base + +if TYPE_CHECKING: + from .states import TaskStateInDb + + +class BeetsItemType(TypeDecorator[BeetsItem]): + impl = JSON + + @classmethod + def _encode(cls, v): + if isinstance(v, bytes): + return { + "__type__": "bytes", + "data": base64.b64encode(v).decode("ascii"), + } + + if isinstance(v, dict): + return {str(k): cls._encode(val) for k, val in v.items()} + + if isinstance(v, list): + return [cls._encode(x) for x in v] + + return v + + @classmethod + def _decode(cls, v): + if isinstance(v, dict): + if v.get("__type__") == "bytes": + return base64.b64decode(v["data"]) + return {k: cls._decode(val) for k, val in v.items()} + + if isinstance(v, list): + return [cls._decode(x) for x in v] + + return v + + def process_bind_param(self, value: BeetsItem | None, dialect): + if value is None or not value: + return None + + return { + "fixed_values": { + k: self._encode(v) for k, v in value._values_fixed.items() + }, + "flex_values": {k: self._encode(v) for k, v in value._values_flex.items()}, + } + + def process_result_value(self, value, dialect): + if value is None: + return None + + return BeetsItem._awaken( + fixed_values={ + k: self._decode(v) for k, v in value.get("fixed_values", {}).items() + }, + flex_values={ + k: self._decode(v) for k, v in value.get("flex_values", {}).items() + }, + ) + + +class TaskPendingItem(Base): + __tablename__ = "task_pending_items" + + task_id: Mapped[int] = mapped_column(ForeignKey("task.id")) + task: Mapped[TaskStateInDb] = relationship(back_populates="pending_items") + item: Mapped[BeetsItem] = mapped_column(BeetsItemType()) + + def __init__(self, item: BeetsItem, id: str | None = None): + super().__init__(id) + self.item = item diff --git a/backend/beets_flask/database/models/states.py b/backend/beets_flask/database/models/states.py index 2e8e5712..72a086b5 100644 --- a/backend/beets_flask/database/models/states.py +++ b/backend/beets_flask/database/models/states.py @@ -21,7 +21,6 @@ from beets.autotag import AlbumMatch from beets.autotag.distance import Distance from beets.importer import Action, ImportTask -from beets.library.models import Item as LibraryItem from sqlalchemy import ( ForeignKey, UniqueConstraint, @@ -45,10 +44,12 @@ SessionState, TaskState, ) -from beets_flask.importer.types import BeetsAlbumMatch, BeetsTrackMatch +from beets_flask.importer.types import BeetsAlbumMatch, BeetsItem, BeetsTrackMatch from beets_flask.logger import log from beets_flask.server.exceptions import SerializedException +from .pending import TaskPendingItem + class FolderInDb(Base): """Represents a folder on disk, to keep track of changes. @@ -363,7 +364,9 @@ class TaskStateInDb(Base): old_paths: Mapped[bytes | None] # old_paths contain original file paths, but are only set when files are moved. # (which breaks some deep links that before were identical to paths, but no more!) - items: Mapped[bytes] + pending_items: Mapped[list[TaskPendingItem]] = relationship( + cascade="all, delete-orphan" + ) choice_flag: Mapped[Action | None] # To allow for continue we need to store the current artist and album @@ -374,13 +377,21 @@ class TaskStateInDb(Base): progress: Mapped[Progress] + @property + def items(self) -> list[BeetsItem]: + return [row.item for row in self.pending_items] + + @items.setter + def items(self, value: list[BeetsItem]): + self.pending_items = [TaskPendingItem(item=v) for v in value] + def __init__( self, id: str | None = None, toppath: bytes | None = None, paths: list[bytes] = [], old_paths: list[bytes] | None = None, - items: list[LibraryItem] = [], + items: list[BeetsItem] = [], candidates: list[CandidateStateInDb] = [], chosen_candidate_id: str | None = None, progress: Progress = Progress.NOT_STARTED, @@ -393,12 +404,7 @@ def __init__( self.paths = pickle.dumps(paths) self.old_paths = pickle.dumps(old_paths) if old_paths else None - for item in items: - # Remove db from all items as it can't be pickled - item._db = None - item._Item__album = None - - self.items = pickle.dumps(items) + self.items = items self.candidates = candidates self.chosen_candidate_id = chosen_candidate_id self.progress = progress @@ -418,7 +424,7 @@ def from_live_state(cls, state: TaskState) -> TaskStateInDb: id=state.id, toppath=str(state.toppath).encode("utf-8") if state.toppath else None, paths=state.task.paths, - items=state.task.items, + items=state.items, candidates=[ CandidateStateInDb.from_live_state(c) for c in state.candidate_states ], @@ -438,7 +444,7 @@ def to_live_state(self, session_state: SessionState | None = None) -> TaskState: beets_task = ImportTask( toppath=self.toppath, paths=pickle.loads(self.paths), - items=pickle.loads(self.items), + items=self.items, ) beets_task.choice_flag = self.choice_flag beets_task.cur_artist = self.cur_artist From ecf3103c8000e5ac447a1063d7ff6c6362f1d4fc Mon Sep 17 00:00:00 2001 From: pSpitzner Date: Mon, 13 Apr 2026 19:52:35 +0200 Subject: [PATCH 2/4] Added some comments / restructured. --- ...26_04_12_1847-925cf8989fbc_item_pending.py | 175 +++++++++--------- .../beets_flask/database/models/pending.py | 19 ++ 2 files changed, 110 insertions(+), 84 deletions(-) diff --git a/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py b/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py index ed3b0276..c2869a1b 100644 --- a/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py +++ b/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py @@ -9,8 +9,15 @@ This approach has proven to be brittle and difficult to maintain. In particular, changes and upgrades in beets break deserialization, requiring manual intervention to recover or migrate data. + +For the unpickling to work, we would rely on beets class definitions – which are likely +to change over time. Thus, we have a custom unpickler, and mocked beets classes, which +will give the right structures, even past beets 2.5.1. Beware, this also holds for +our own classes (like BeetsItemType) which we will need to make copies of once we +change them. """ +from __future__ import annotations from collections.abc import Sequence from datetime import datetime import io @@ -30,6 +37,90 @@ depends_on: str | Sequence[str] | None = None +def upgrade() -> None: + """Upgrade schema.""" + op.create_table( + "task_pending_items", + sa.Column("id", sa.String(), nullable=False), + sa.Column("task_id", sa.String(), nullable=False), + sa.Column("item", BeetsItemType(), nullable=False), + sa.PrimaryKeyConstraint("id"), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint( + ["task_id"], + ["task.id"], + ), + ) + op.create_index( + op.f("ix_task_pending_items_created_at"), + "task_pending_items", + ["created_at"], + unique=False, + ) + + migrate_data() + + op.drop_column("task", "items") + + +def downgrade() -> None: + """Downgrade schema.""" + op.add_column("task", sa.Column("items", sa.BLOB(), nullable=False)) + op.drop_table("task_pending_items") + + +def migrate_data(): + conn = op.get_bind() + meta = sa.MetaData() + + task_pending_items = sa.Table("task_pending_items", meta, autoload_with=conn) + + result = conn.execute(sa.text("SELECT id, items FROM task WHERE items IS NOT NULL")) + for row in result: + task_id = row[0] + items_blob = row[1] + + try: + items = load_items(items_blob) + except Exception as e: + log.error(f"Failed to unpickle task {task_id}: {e}") + continue + + rows = [] + now = datetime.utcnow() + for stub in items: + rows.append( + { + "id": str(uuid4()), + "created_at": now, + "updated_at": now, + "task_id": task_id, + "item": { + "fixed_values": BeetsItemType._encode( + dict(stub._values_fixed.items()) + ), + "flex_values": BeetsItemType._encode( + dict(stub._values_flex.items()) + ), + }, + } + ) + + if rows: + conn.execute( + task_pending_items.insert(), + rows, + ) + + +def load_items(blob: bytes) -> list[ModelStub]: + return ItemsUnpickler(io.BytesIO(blob)).load() + + +# --------------------------- Mocked Beets Classes --------------------------- # + + class ModelStub: def __init__(self, *args, **kwargs): self._values_fixed = {} @@ -122,87 +213,3 @@ def find_class(self, module, name): return dict # Fallback for unknown classes return self.CLASS_MAP[key] - -def load_items(blob: bytes) -> list[ModelStub]: - return ItemsUnpickler(io.BytesIO(blob)).load() - - -def migrate_data(): - conn = op.get_bind() - meta = sa.MetaData() - - task_pending_items = sa.Table("task_pending_items", meta, autoload_with=conn) - - result = conn.execute(sa.text("SELECT id, items FROM task WHERE items IS NOT NULL")) - for row in result: - task_id = row[0] - items_blob = row[1] - - try: - items = load_items(items_blob) - except Exception as e: - log.error(f"Failed to unpickle task {task_id}: {e}") - continue - - rows = [] - now = datetime.utcnow() - for stub in items: - rows.append( - { - "id": str(uuid4()), - "created_at": now, - "updated_at": now, - "task_id": task_id, - "item": { - "fixed_values": BeetsItemType._encode( - dict(stub._values_fixed.items()) - ), - "flex_values": BeetsItemType._encode( - dict(stub._values_flex.items()) - ), - }, - } - ) - - if rows: - conn.execute( - task_pending_items.insert(), - rows, - ) - - -def upgrade() -> None: - """Upgrade schema.""" - # ### commands auto generated by Alembic - please adjust! ### - op.create_table( - "task_pending_items", - sa.Column("id", sa.String(), nullable=False), - sa.Column("task_id", sa.String(), nullable=False), - sa.Column("item", BeetsItemType(), nullable=False), - sa.PrimaryKeyConstraint("id"), - sa.Column("created_at", sa.DateTime(), nullable=False), - sa.Column("updated_at", sa.DateTime(), nullable=False), - sa.ForeignKeyConstraint( - ["task_id"], - ["task.id"], - ), - ) - op.create_index( - op.f("ix_task_pending_items_created_at"), - "task_pending_items", - ["created_at"], - unique=False, - ) - - migrate_data() - - op.drop_column("task", "items") - # ### end Alembic commands ### - - -def downgrade() -> None: - """Downgrade schema.""" - # ### commands auto generated by Alembic - please adjust! ### - op.add_column("task", sa.Column("items", sa.BLOB(), nullable=False)) - op.drop_table("task_pending_items") - # ### end Alembic commands ### diff --git a/backend/beets_flask/database/models/pending.py b/backend/beets_flask/database/models/pending.py index 3cdea974..6e395135 100644 --- a/backend/beets_flask/database/models/pending.py +++ b/backend/beets_flask/database/models/pending.py @@ -16,6 +16,23 @@ class BeetsItemType(TypeDecorator[BeetsItem]): + """ + We do not implement items in full detail yet, but keep them as a serialized json. + + The transformation is relatively short, because items in beets are already + in a format for (their) database. + + Notes: + - this type is relevant, because we want to save items __before__ beets does, so + that we can resume imports. + - in beets, the fixed_ and flex_values are computed at runtime and not stored + 1:1 in beets' internal library: + - fixed are their own columns + - flex are linked in an extra table + - we need them here, but not as pickle (no need for functions/classes, and including + functions etc makes migrations a lot harder) + """ + impl = JSON @classmethod @@ -47,6 +64,7 @@ def _decode(cls, v): return v def process_bind_param(self, value: BeetsItem | None, dialect): + """Transform from live object into serialized json in database.""" if value is None or not value: return None @@ -58,6 +76,7 @@ def process_bind_param(self, value: BeetsItem | None, dialect): } def process_result_value(self, value, dialect): + """Transform from serialized json in database to live object.""" if value is None: return None From 4b875fd1f0b1292453637fe496739257b8ddc012 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Tue, 14 Apr 2026 11:55:48 +0200 Subject: [PATCH 3/4] Enhanced comment. --- .../beets_flask/database/models/pending.py | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/backend/beets_flask/database/models/pending.py b/backend/beets_flask/database/models/pending.py index 6e395135..2830bc99 100644 --- a/backend/beets_flask/database/models/pending.py +++ b/backend/beets_flask/database/models/pending.py @@ -16,21 +16,25 @@ class BeetsItemType(TypeDecorator[BeetsItem]): - """ - We do not implement items in full detail yet, but keep them as a serialized json. - - The transformation is relatively short, because items in beets are already - in a format for (their) database. - - Notes: - - this type is relevant, because we want to save items __before__ beets does, so - that we can resume imports. - - in beets, the fixed_ and flex_values are computed at runtime and not stored - 1:1 in beets' internal library: - - fixed are their own columns - - flex are linked in an extra table - - we need them here, but not as pickle (no need for functions/classes, and including - functions etc makes migrations a lot harder) + """Serializer and Deserializer for Beets Item class. + + This component serializes Beets items by storing their fixed and flex fields + as JSON. Full object queryability is not implemented yet; instead, items are + preserved in a lightweight serialized form. + + The transformation is intentionally minimal because Beets items are already + structured for database persistence. + + + Notes + ----- + - Items are persisted here before Beets writes them to the beets db, we need this + to allow import processes to be resumed safely. + - In Beets, fixed and flex values are stored differently: + - fixed fields are stored as standard columns + - flex fields are stored in a separate linked table + - Pickling is avoided to ensure portability and migration stability (no + functions or runtime state are serialized). """ impl = JSON From 49396022d83709965c17d31dcac710c49e0127b3 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Thu, 16 Apr 2026 17:49:41 +0200 Subject: [PATCH 4/4] Minor fixes --- .../2026_04_12_1847-925cf8989fbc_item_pending.py | 11 ++++++++--- backend/beets_flask/database/models/pending.py | 3 ++- backend/beets_flask/database/models/states.py | 3 ++- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py b/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py index c2869a1b..e0cadd1c 100644 --- a/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py +++ b/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py @@ -209,7 +209,12 @@ def find_class(self, module, name): """Override the find_class method to redirect Distance class references.""" key = (module, name) if key not in self.CLASS_MAP: - print(f"WARNING: Unknown class not in migration map: {module}.{name}") - return dict # Fallback for unknown classes + log.warning( + "Unknown class not in migration map during item unpickling: %s.%s", + module, + name, + ) + raise pickle.UnpicklingError( + f"Unknown class not in migration map: {module}.{name}" + ) return self.CLASS_MAP[key] - diff --git a/backend/beets_flask/database/models/pending.py b/backend/beets_flask/database/models/pending.py index 2830bc99..f972511a 100644 --- a/backend/beets_flask/database/models/pending.py +++ b/backend/beets_flask/database/models/pending.py @@ -38,6 +38,7 @@ class BeetsItemType(TypeDecorator[BeetsItem]): """ impl = JSON + cache_ok = True @classmethod def _encode(cls, v): @@ -97,7 +98,7 @@ def process_result_value(self, value, dialect): class TaskPendingItem(Base): __tablename__ = "task_pending_items" - task_id: Mapped[int] = mapped_column(ForeignKey("task.id")) + task_id: Mapped[str] = mapped_column(ForeignKey("task.id")) task: Mapped[TaskStateInDb] = relationship(back_populates="pending_items") item: Mapped[BeetsItem] = mapped_column(BeetsItemType()) diff --git a/backend/beets_flask/database/models/states.py b/backend/beets_flask/database/models/states.py index 72a086b5..fa49e047 100644 --- a/backend/beets_flask/database/models/states.py +++ b/backend/beets_flask/database/models/states.py @@ -365,7 +365,8 @@ class TaskStateInDb(Base): # old_paths contain original file paths, but are only set when files are moved. # (which breaks some deep links that before were identical to paths, but no more!) pending_items: Mapped[list[TaskPendingItem]] = relationship( - cascade="all, delete-orphan" + back_populates="task", + cascade="all, delete-orphan", ) choice_flag: Mapped[Action | None]