diff --git a/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py b/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py index e0cadd1c..fdbfbd17 100644 --- a/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py +++ b/backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py @@ -18,15 +18,16 @@ """ from __future__ import annotations +import base64 from collections.abc import Sequence from datetime import datetime import io import pickle +from typing import TypeVar from uuid import uuid4 import sqlalchemy as sa from beets_flask import log -from beets_flask.database.models.pending import BeetsItemType from alembic import op @@ -39,11 +40,28 @@ def upgrade() -> None: """Upgrade schema.""" + op.create_table( - "task_pending_items", + "items", + sa.Column("id", sa.String(), nullable=False), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.Column("flex_values", sa.JSON(), nullable=False), + sa.Column("fixed_values", sa.JSON(), nullable=False), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_items_created_at"), + "items", + ["created_at"], + unique=False, + ) + + op.create_table( + "tasks_items", sa.Column("id", sa.String(), nullable=False), sa.Column("task_id", sa.String(), nullable=False), - sa.Column("item", BeetsItemType(), nullable=False), + sa.Column("item_id", sa.String(), nullable=False), sa.PrimaryKeyConstraint("id"), sa.Column("created_at", sa.DateTime(), nullable=False), sa.Column("updated_at", sa.DateTime(), nullable=False), @@ -51,10 +69,14 @@ def upgrade() -> None: ["task_id"], ["task.id"], ), + sa.ForeignKeyConstraint( + ["item_id"], + ["items.id"], + ), ) op.create_index( - op.f("ix_task_pending_items_created_at"), - "task_pending_items", + op.f("ix_tasks_items_created_at"), + "tasks_items", ["created_at"], unique=False, ) @@ -67,15 +89,35 @@ def upgrade() -> None: def downgrade() -> None: """Downgrade schema.""" op.add_column("task", sa.Column("items", sa.BLOB(), nullable=False)) - op.drop_table("task_pending_items") + op.drop_table("tasks_items") + op.drop_table("items") + + +T = TypeVar("T", bound=dict) + + +def _encode(v): + if isinstance(v, bytes): + return { + "__type__": "bytes", + "data": base64.b64encode(v).decode("ascii"), + } + + if isinstance(v, dict): + return {str(k): _encode(val) for k, val in v.items()} + + if isinstance(v, list): + return [_encode(x) for x in v] + + return v def migrate_data(): conn = op.get_bind() meta = sa.MetaData() - task_pending_items = sa.Table("task_pending_items", meta, autoload_with=conn) - + tasks_items_table = sa.Table("tasks_items", meta, autoload_with=conn) + items_table = sa.Table("items", meta, autoload_with=conn) result = conn.execute(sa.text("SELECT id, items FROM task WHERE items IS NOT NULL")) for row in result: task_id = row[0] @@ -87,30 +129,38 @@ def migrate_data(): log.error(f"Failed to unpickle task {task_id}: {e}") continue - rows = [] + item_rows = [] + tasks_items_rows = [] now = datetime.utcnow() for stub in items: - rows.append( + item_id = str(uuid4()) + item_rows.append( + { + "id": item_id, + "created_at": now, + "updated_at": now, + "fixed_values": _encode(dict(stub._values_fixed.items())), + "flex_values": _encode(dict(stub._values_flex.items())), + } + ) + tasks_items_rows.append( { "id": str(uuid4()), "created_at": now, "updated_at": now, "task_id": task_id, - "item": { - "fixed_values": BeetsItemType._encode( - dict(stub._values_fixed.items()) - ), - "flex_values": BeetsItemType._encode( - dict(stub._values_flex.items()) - ), - }, + "item_id": item_id, } ) - if rows: + if item_rows and tasks_items_rows: + conn.execute( + items_table.insert(), + item_rows, + ) conn.execute( - task_pending_items.insert(), - rows, + tasks_items_table.insert(), + tasks_items_rows, ) diff --git a/backend/beets_flask/database/models/pending.py b/backend/beets_flask/database/models/pending.py index f972511a..4f2e0a63 100644 --- a/backend/beets_flask/database/models/pending.py +++ b/backend/beets_flask/database/models/pending.py @@ -1,11 +1,10 @@ from __future__ import annotations import base64 -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from sqlalchemy import JSON, ForeignKey from sqlalchemy.orm import Mapped, mapped_column, relationship -from sqlalchemy.types import TypeDecorator from beets_flask.importer.types import BeetsItem @@ -15,30 +14,52 @@ from .states import TaskStateInDb -class BeetsItemType(TypeDecorator[BeetsItem]): - """Serializer and Deserializer for Beets Item class. +class TasksItems(Base): + __tablename__ = "tasks_items" - This component serializes Beets items by storing their fixed and flex fields - as JSON. Full object queryability is not implemented yet; instead, items are - preserved in a lightweight serialized form. + task_id: Mapped[str] = mapped_column(ForeignKey("task.id")) + task: Mapped[TaskStateInDb] = relationship(back_populates="pending_items") + item_id: Mapped[str] = mapped_column(ForeignKey("items.id")) + item: Mapped[Item] = relationship() + + def __init__(self, item: Item, id: str | None = None): + super().__init__(id) + self.item = item + + +class Item(Base): + __tablename__ = "items" - The transformation is intentionally minimal because Beets items are already - structured for database persistence. + # items table in beets db + fixed_values: Mapped[dict[str, Any]] = mapped_column(JSON) + # item_attributes table in beets db + flex_values: Mapped[dict[str, Any]] = mapped_column(JSON) - Notes - ----- - - Items are persisted here before Beets writes them to the beets db, we need this - to allow import processes to be resumed safely. - - In Beets, fixed and flex values are stored differently: - - fixed fields are stored as standard columns - - flex fields are stored in a separate linked table - - Pickling is avoided to ensure portability and migration stability (no - functions or runtime state are serialized). - """ + def __init__( + self, + fixed_values: dict[str, Any], + flex_values: dict[str, Any], + id: str | None = None, + ): + super().__init__(id) + self.fixed_values = fixed_values + self.flex_values = flex_values + + # FIXME: Move to mapper layer after match migration! - impl = JSON - cache_ok = True + def to_beets(self): + return BeetsItem._awaken( + fixed_values={k: self._decode(v) for k, v in self.fixed_values.items()}, + flex_values={k: self._decode(v) for k, v in self.flex_values.items()}, + ) + + @classmethod + def from_beets(cls, obj: BeetsItem): + return cls( + fixed_values={k: cls._encode(v) for k, v in obj._values_fixed.items()}, + flex_values={k: cls._encode(v) for k, v in obj._values_flex.items()}, + ) @classmethod def _encode(cls, v): @@ -67,41 +88,3 @@ def _decode(cls, v): return [cls._decode(x) for x in v] return v - - def process_bind_param(self, value: BeetsItem | None, dialect): - """Transform from live object into serialized json in database.""" - if value is None or not value: - return None - - return { - "fixed_values": { - k: self._encode(v) for k, v in value._values_fixed.items() - }, - "flex_values": {k: self._encode(v) for k, v in value._values_flex.items()}, - } - - def process_result_value(self, value, dialect): - """Transform from serialized json in database to live object.""" - if value is None: - return None - - return BeetsItem._awaken( - fixed_values={ - k: self._decode(v) for k, v in value.get("fixed_values", {}).items() - }, - flex_values={ - k: self._decode(v) for k, v in value.get("flex_values", {}).items() - }, - ) - - -class TaskPendingItem(Base): - __tablename__ = "task_pending_items" - - task_id: Mapped[str] = mapped_column(ForeignKey("task.id")) - task: Mapped[TaskStateInDb] = relationship(back_populates="pending_items") - item: Mapped[BeetsItem] = mapped_column(BeetsItemType()) - - def __init__(self, item: BeetsItem, id: str | None = None): - super().__init__(id) - self.item = item diff --git a/backend/beets_flask/database/models/states.py b/backend/beets_flask/database/models/states.py index fa49e047..2fa045c3 100644 --- a/backend/beets_flask/database/models/states.py +++ b/backend/beets_flask/database/models/states.py @@ -48,7 +48,7 @@ from beets_flask.logger import log from beets_flask.server.exceptions import SerializedException -from .pending import TaskPendingItem +from .pending import Item, TasksItems class FolderInDb(Base): @@ -364,7 +364,7 @@ class TaskStateInDb(Base): old_paths: Mapped[bytes | None] # old_paths contain original file paths, but are only set when files are moved. # (which breaks some deep links that before were identical to paths, but no more!) - pending_items: Mapped[list[TaskPendingItem]] = relationship( + pending_items: Mapped[list[TasksItems]] = relationship( back_populates="task", cascade="all, delete-orphan", ) @@ -380,11 +380,11 @@ class TaskStateInDb(Base): @property def items(self) -> list[BeetsItem]: - return [row.item for row in self.pending_items] + return [row.item.to_beets() for row in self.pending_items] @items.setter def items(self, value: list[BeetsItem]): - self.pending_items = [TaskPendingItem(item=v) for v in value] + self.pending_items = [TasksItems(item=Item.from_beets(v)) for v in value] def __init__( self,