diff --git a/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py b/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py new file mode 100644 index 00000000..e0cadd1c --- /dev/null +++ b/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py @@ -0,0 +1,220 @@ +"""item pending + +Revision ID: 925cf8989fbc +Revises: a986c03d9ba3 +Create Date: 2026-04-12 18:47:43.218344 + +README: +Historically, task state items were stored as binary (pickle) blobs in the database. +This approach has proven to be brittle and difficult to maintain. In particular, +changes and upgrades in beets break deserialization, requiring manual +intervention to recover or migrate data. + +For the unpickling to work, we would rely on beets class definitions – which are likely +to change over time. Thus, we have a custom unpickler, and mocked beets classes, which +will give the right structures, even past beets 2.5.1. Beware, this also holds for +our own classes (like BeetsItemType) which we will need to make copies of once we +change them. +""" + +from __future__ import annotations +from collections.abc import Sequence +from datetime import datetime +import io +import pickle +from uuid import uuid4 + +import sqlalchemy as sa +from beets_flask import log +from beets_flask.database.models.pending import BeetsItemType +from alembic import op + + +# revision identifiers, used by Alembic. +revision: str = "925cf8989fbc" +down_revision: str | Sequence[str] | None = "a986c03d9ba3" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Upgrade schema.""" + op.create_table( + "task_pending_items", + sa.Column("id", sa.String(), nullable=False), + sa.Column("task_id", sa.String(), nullable=False), + sa.Column("item", BeetsItemType(), nullable=False), + sa.PrimaryKeyConstraint("id"), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint( + ["task_id"], + ["task.id"], + ), + ) + op.create_index( + op.f("ix_task_pending_items_created_at"), + "task_pending_items", + ["created_at"], + unique=False, + ) + + migrate_data() + + op.drop_column("task", "items") + + +def downgrade() -> None: + """Downgrade schema.""" + op.add_column("task", sa.Column("items", sa.BLOB(), nullable=False)) + op.drop_table("task_pending_items") + + +def migrate_data(): + conn = op.get_bind() + meta = sa.MetaData() + + task_pending_items = sa.Table("task_pending_items", meta, autoload_with=conn) + + result = conn.execute(sa.text("SELECT id, items FROM task WHERE items IS NOT NULL")) + for row in result: + task_id = row[0] + items_blob = row[1] + + try: + items = load_items(items_blob) + except Exception as e: + log.error(f"Failed to unpickle task {task_id}: {e}") + continue + + rows = [] + now = datetime.utcnow() + for stub in items: + rows.append( + { + "id": str(uuid4()), + "created_at": now, + "updated_at": now, + "task_id": task_id, + "item": { + "fixed_values": BeetsItemType._encode( + dict(stub._values_fixed.items()) + ), + "flex_values": BeetsItemType._encode( + dict(stub._values_flex.items()) + ), + }, + } + ) + + if rows: + conn.execute( + task_pending_items.insert(), + rows, + ) + + +def load_items(blob: bytes) -> list[ModelStub]: + return ItemsUnpickler(io.BytesIO(blob)).load() + + +# --------------------------- Mocked Beets Classes --------------------------- # + + +class ModelStub: + def __init__(self, *args, **kwargs): + self._values_fixed = {} + self._values_flex = {} + self._db = None + + def __setstate__(self, state): + self.__dict__.update(state) + self._model_cls = None # must be reattached externally + + def __getstate__(self): + return self.__dict__ + + # --- mimic beets resolution --- + def __getitem__(self, key): + if "_values_fixed" in self.__dict__ and key in self._values_fixed: + return self._values_fixed[key] + + if "_values_flex" in self.__dict__ and key in self._values_flex: + return self._values_flex[key] + + if key in self.__dict__: + return self.__dict__[key] + + raise KeyError(key) + + def __getattr__(self, name): + try: + return self[name] + except KeyError: + raise AttributeError(name) + + def __setattr__(self, name, value): + # keep internal structure intact + if name in ("_values_fixed", "_values_flex", "_db"): + self.__dict__[name] = value + else: + self.__dict__[name] = value + + +class LazyConvertDictStub: + def __init__(self, *args, **kwargs): + self._data = {} + self._converted = {} + self.model_cls = None # Don't enforce model_cls, keep it flexible + + def __setstate__(self, state): + self.__dict__.update(state) + self.model_cls = None + + def __getstate__(self): + return self.__dict__ + + def __getitem__(self, key): + if key in self._converted: + return self._converted[key] + if key in self._data: + return self._data[key] + raise KeyError(key) + + def __setitem__(self, key, value): + self._converted[key] = value + + def __contains__(self, key): + return key in self._converted or key in self._data + + def keys(self): + return list(self._converted.keys()) + list(self._data.keys()) + + def __iter__(self): + return iter(self.keys()) + + def items(self): + for key in self: + yield key, self[key] + + +class ItemsUnpickler(pickle.Unpickler): + CLASS_MAP = { + ("beets.dbcore.db", "LazyConvertDict"): LazyConvertDictStub, + ("beets.library", "Item"): ModelStub, + ("beets.library.models", "Item"): ModelStub, + } + + def find_class(self, module, name): + """Override the find_class method to redirect Distance class references.""" + key = (module, name) + if key not in self.CLASS_MAP: + log.warning( + "Unknown class not in migration map during item unpickling: %s.%s", + module, + name, + ) + raise pickle.UnpicklingError( + f"Unknown class not in migration map: {module}.{name}" + ) + return self.CLASS_MAP[key] diff --git a/backend/beets_flask/database/models/pending.py b/backend/beets_flask/database/models/pending.py new file mode 100644 index 00000000..f972511a --- /dev/null +++ b/backend/beets_flask/database/models/pending.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +import base64 +from typing import TYPE_CHECKING + +from sqlalchemy import JSON, ForeignKey +from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.types import TypeDecorator + +from beets_flask.importer.types import BeetsItem + +from .base import Base + +if TYPE_CHECKING: + from .states import TaskStateInDb + + +class BeetsItemType(TypeDecorator[BeetsItem]): + """Serializer and Deserializer for Beets Item class. + + This component serializes Beets items by storing their fixed and flex fields + as JSON. Full object queryability is not implemented yet; instead, items are + preserved in a lightweight serialized form. + + The transformation is intentionally minimal because Beets items are already + structured for database persistence. + + + Notes + ----- + - Items are persisted here before Beets writes them to the beets db, we need this + to allow import processes to be resumed safely. + - In Beets, fixed and flex values are stored differently: + - fixed fields are stored as standard columns + - flex fields are stored in a separate linked table + - Pickling is avoided to ensure portability and migration stability (no + functions or runtime state are serialized). + """ + + impl = JSON + cache_ok = True + + @classmethod + def _encode(cls, v): + if isinstance(v, bytes): + return { + "__type__": "bytes", + "data": base64.b64encode(v).decode("ascii"), + } + + if isinstance(v, dict): + return {str(k): cls._encode(val) for k, val in v.items()} + + if isinstance(v, list): + return [cls._encode(x) for x in v] + + return v + + @classmethod + def _decode(cls, v): + if isinstance(v, dict): + if v.get("__type__") == "bytes": + return base64.b64decode(v["data"]) + return {k: cls._decode(val) for k, val in v.items()} + + if isinstance(v, list): + return [cls._decode(x) for x in v] + + return v + + def process_bind_param(self, value: BeetsItem | None, dialect): + """Transform from live object into serialized json in database.""" + if value is None or not value: + return None + + return { + "fixed_values": { + k: self._encode(v) for k, v in value._values_fixed.items() + }, + "flex_values": {k: self._encode(v) for k, v in value._values_flex.items()}, + } + + def process_result_value(self, value, dialect): + """Transform from serialized json in database to live object.""" + if value is None: + return None + + return BeetsItem._awaken( + fixed_values={ + k: self._decode(v) for k, v in value.get("fixed_values", {}).items() + }, + flex_values={ + k: self._decode(v) for k, v in value.get("flex_values", {}).items() + }, + ) + + +class TaskPendingItem(Base): + __tablename__ = "task_pending_items" + + task_id: Mapped[str] = mapped_column(ForeignKey("task.id")) + task: Mapped[TaskStateInDb] = relationship(back_populates="pending_items") + item: Mapped[BeetsItem] = mapped_column(BeetsItemType()) + + def __init__(self, item: BeetsItem, id: str | None = None): + super().__init__(id) + self.item = item diff --git a/backend/beets_flask/database/models/states.py b/backend/beets_flask/database/models/states.py index 2e8e5712..fa49e047 100644 --- a/backend/beets_flask/database/models/states.py +++ b/backend/beets_flask/database/models/states.py @@ -21,7 +21,6 @@ from beets.autotag import AlbumMatch from beets.autotag.distance import Distance from beets.importer import Action, ImportTask -from beets.library.models import Item as LibraryItem from sqlalchemy import ( ForeignKey, UniqueConstraint, @@ -45,10 +44,12 @@ SessionState, TaskState, ) -from beets_flask.importer.types import BeetsAlbumMatch, BeetsTrackMatch +from beets_flask.importer.types import BeetsAlbumMatch, BeetsItem, BeetsTrackMatch from beets_flask.logger import log from beets_flask.server.exceptions import SerializedException +from .pending import TaskPendingItem + class FolderInDb(Base): """Represents a folder on disk, to keep track of changes. @@ -363,7 +364,10 @@ class TaskStateInDb(Base): old_paths: Mapped[bytes | None] # old_paths contain original file paths, but are only set when files are moved. # (which breaks some deep links that before were identical to paths, but no more!) - items: Mapped[bytes] + pending_items: Mapped[list[TaskPendingItem]] = relationship( + back_populates="task", + cascade="all, delete-orphan", + ) choice_flag: Mapped[Action | None] # To allow for continue we need to store the current artist and album @@ -374,13 +378,21 @@ class TaskStateInDb(Base): progress: Mapped[Progress] + @property + def items(self) -> list[BeetsItem]: + return [row.item for row in self.pending_items] + + @items.setter + def items(self, value: list[BeetsItem]): + self.pending_items = [TaskPendingItem(item=v) for v in value] + def __init__( self, id: str | None = None, toppath: bytes | None = None, paths: list[bytes] = [], old_paths: list[bytes] | None = None, - items: list[LibraryItem] = [], + items: list[BeetsItem] = [], candidates: list[CandidateStateInDb] = [], chosen_candidate_id: str | None = None, progress: Progress = Progress.NOT_STARTED, @@ -393,12 +405,7 @@ def __init__( self.paths = pickle.dumps(paths) self.old_paths = pickle.dumps(old_paths) if old_paths else None - for item in items: - # Remove db from all items as it can't be pickled - item._db = None - item._Item__album = None - - self.items = pickle.dumps(items) + self.items = items self.candidates = candidates self.chosen_candidate_id = chosen_candidate_id self.progress = progress @@ -418,7 +425,7 @@ def from_live_state(cls, state: TaskState) -> TaskStateInDb: id=state.id, toppath=str(state.toppath).encode("utf-8") if state.toppath else None, paths=state.task.paths, - items=state.task.items, + items=state.items, candidates=[ CandidateStateInDb.from_live_state(c) for c in state.candidate_states ], @@ -438,7 +445,7 @@ def to_live_state(self, session_state: SessionState | None = None) -> TaskState: beets_task = ImportTask( toppath=self.toppath, paths=pickle.loads(self.paths), - items=pickle.loads(self.items), + items=self.items, ) beets_task.choice_flag = self.choice_flag beets_task.cur_artist = self.cur_artist