Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 220 additions & 0 deletions backend/alembic/versions/2026_04_12_1847-925cf8989fbc_item_pending.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
"""item pending

Revision ID: 925cf8989fbc
Revises: a986c03d9ba3
Create Date: 2026-04-12 18:47:43.218344

README:
Historically, task state items were stored as binary (pickle) blobs in the database.
This approach has proven to be brittle and difficult to maintain. In particular,
changes and upgrades in beets break deserialization, requiring manual
intervention to recover or migrate data.

For the unpickling to work, we would rely on beets class definitions – which are likely
to change over time. Thus, we have a custom unpickler, and mocked beets classes, which
will give the right structures, even past beets 2.5.1. Beware, this also holds for
our own classes (like BeetsItemType) which we will need to make copies of once we
change them.
"""

from __future__ import annotations
from collections.abc import Sequence
from datetime import datetime
import io
import pickle
from uuid import uuid4

import sqlalchemy as sa
from beets_flask import log
from beets_flask.database.models.pending import BeetsItemType
from alembic import op


Comment on lines +29 to +32
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This migration imports BeetsItemType from the application package. Alembic migrations should be self-contained because future refactors to BeetsItemType (or its imports) can break running historical migrations on a fresh database. Consider copying the minimal BeetsItemType implementation into this migration file (or using a plain sa.JSON() column here and handling serialization manually during data migration).

Suggested change
from beets_flask.database.models.pending import BeetsItemType
from alembic import op
from alembic import op
class BeetsItemType(sa.TypeDecorator):
"""Migration-local copy of the pending item type.
Alembic migrations must remain self-contained so they can run even if
application code changes in the future. This minimal implementation keeps
the historical migration independent from the live application model code.
"""
impl = sa.JSON
cache_ok = True
@staticmethod
def _encode(value):
if value is None or isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, bytes):
return value.decode("utf-8", errors="replace")
if isinstance(value, dict):
return {
str(key): BeetsItemType._encode(item)
for key, item in value.items()
}
if isinstance(value, (list, tuple, set)):
return [BeetsItemType._encode(item) for item in value]
return str(value)
def process_bind_param(self, value, dialect):
return self._encode(value)
def process_result_value(self, value, dialect):
return value

Copilot uses AI. Check for mistakes.
# revision identifiers, used by Alembic.
revision: str = "925cf8989fbc"
down_revision: str | Sequence[str] | None = "a986c03d9ba3"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
"""Upgrade schema."""
op.create_table(
"task_pending_items",
sa.Column("id", sa.String(), nullable=False),
sa.Column("task_id", sa.String(), nullable=False),
sa.Column("item", BeetsItemType(), nullable=False),
sa.PrimaryKeyConstraint("id"),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["task_id"],
["task.id"],
),
)
op.create_index(
op.f("ix_task_pending_items_created_at"),
"task_pending_items",
["created_at"],
unique=False,
)
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

task_pending_items will be queried by task_id whenever loading TaskStateInDb.pending_items, but the migration only creates an index on created_at. On Postgres/SQLite this can lead to slow lookups as the table grows. Add an index on task_id (and optionally a composite (task_id, created_at) if you plan to order by creation time).

Suggested change
)
)
op.create_index(
op.f("ix_task_pending_items_task_id"),
"task_pending_items",
["task_id"],
unique=False,
)

Copilot uses AI. Check for mistakes.

migrate_data()

op.drop_column("task", "items")


def downgrade() -> None:
"""Downgrade schema."""
op.add_column("task", sa.Column("items", sa.BLOB(), nullable=False))
op.drop_table("task_pending_items")


def migrate_data():
conn = op.get_bind()
meta = sa.MetaData()

task_pending_items = sa.Table("task_pending_items", meta, autoload_with=conn)

result = conn.execute(sa.text("SELECT id, items FROM task WHERE items IS NOT NULL"))
for row in result:
task_id = row[0]
items_blob = row[1]

try:
items = load_items(items_blob)
except Exception as e:
log.error(f"Failed to unpickle task {task_id}: {e}")
continue

rows = []
now = datetime.utcnow()
for stub in items:
rows.append(
{
"id": str(uuid4()),
"created_at": now,
"updated_at": now,
"task_id": task_id,
"item": {
"fixed_values": BeetsItemType._encode(
dict(stub._values_fixed.items())
),
"flex_values": BeetsItemType._encode(
dict(stub._values_flex.items())
),
},
}
)

if rows:
conn.execute(
task_pending_items.insert(),
rows,
)


def load_items(blob: bytes) -> list[ModelStub]:
return ItemsUnpickler(io.BytesIO(blob)).load()


# --------------------------- Mocked Beets Classes --------------------------- #


class ModelStub:
def __init__(self, *args, **kwargs):
self._values_fixed = {}
self._values_flex = {}
self._db = None

def __setstate__(self, state):
self.__dict__.update(state)
self._model_cls = None # must be reattached externally

def __getstate__(self):
return self.__dict__

# --- mimic beets resolution ---
def __getitem__(self, key):
if "_values_fixed" in self.__dict__ and key in self._values_fixed:
return self._values_fixed[key]

if "_values_flex" in self.__dict__ and key in self._values_flex:
return self._values_flex[key]

if key in self.__dict__:
return self.__dict__[key]

raise KeyError(key)

def __getattr__(self, name):
try:
return self[name]
except KeyError:
raise AttributeError(name)

def __setattr__(self, name, value):
# keep internal structure intact
if name in ("_values_fixed", "_values_flex", "_db"):
self.__dict__[name] = value
else:
self.__dict__[name] = value


class LazyConvertDictStub:
def __init__(self, *args, **kwargs):
self._data = {}
self._converted = {}
self.model_cls = None # Don't enforce model_cls, keep it flexible

def __setstate__(self, state):
self.__dict__.update(state)
self.model_cls = None

def __getstate__(self):
return self.__dict__

def __getitem__(self, key):
if key in self._converted:
return self._converted[key]
if key in self._data:
return self._data[key]
raise KeyError(key)

def __setitem__(self, key, value):
self._converted[key] = value

def __contains__(self, key):
return key in self._converted or key in self._data

def keys(self):
return list(self._converted.keys()) + list(self._data.keys())

def __iter__(self):
return iter(self.keys())

def items(self):
for key in self:
yield key, self[key]


class ItemsUnpickler(pickle.Unpickler):
CLASS_MAP = {
("beets.dbcore.db", "LazyConvertDict"): LazyConvertDictStub,
("beets.library", "Item"): ModelStub,
("beets.library.models", "Item"): ModelStub,
}

def find_class(self, module, name):
"""Override the find_class method to redirect Distance class references."""
key = (module, name)
if key not in self.CLASS_MAP:
log.warning(
"Unknown class not in migration map during item unpickling: %s.%s",
module,
name,
)
raise pickle.UnpicklingError(
f"Unknown class not in migration map: {module}.{name}"
)
return self.CLASS_MAP[key]
107 changes: 107 additions & 0 deletions backend/beets_flask/database/models/pending.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from __future__ import annotations

import base64
from typing import TYPE_CHECKING

from sqlalchemy import JSON, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.types import TypeDecorator

from beets_flask.importer.types import BeetsItem

from .base import Base

if TYPE_CHECKING:
from .states import TaskStateInDb


class BeetsItemType(TypeDecorator[BeetsItem]):
"""Serializer and Deserializer for Beets Item class.

This component serializes Beets items by storing their fixed and flex fields
as JSON. Full object queryability is not implemented yet; instead, items are
preserved in a lightweight serialized form.

The transformation is intentionally minimal because Beets items are already
structured for database persistence.


Notes
-----
- Items are persisted here before Beets writes them to the beets db, we need this
to allow import processes to be resumed safely.
- In Beets, fixed and flex values are stored differently:
- fixed fields are stored as standard columns
- flex fields are stored in a separate linked table
- Pickling is avoided to ensure portability and migration stability (no
functions or runtime state are serialized).
"""

impl = JSON
Comment thread
semohr marked this conversation as resolved.
cache_ok = True

@classmethod
def _encode(cls, v):
if isinstance(v, bytes):
return {
"__type__": "bytes",
"data": base64.b64encode(v).decode("ascii"),
}

if isinstance(v, dict):
return {str(k): cls._encode(val) for k, val in v.items()}

if isinstance(v, list):
return [cls._encode(x) for x in v]

return v

@classmethod
def _decode(cls, v):
if isinstance(v, dict):
if v.get("__type__") == "bytes":
return base64.b64decode(v["data"])
return {k: cls._decode(val) for k, val in v.items()}

if isinstance(v, list):
return [cls._decode(x) for x in v]

return v

def process_bind_param(self, value: BeetsItem | None, dialect):
"""Transform from live object into serialized json in database."""
if value is None or not value:
return None

return {
"fixed_values": {
k: self._encode(v) for k, v in value._values_fixed.items()
},
"flex_values": {k: self._encode(v) for k, v in value._values_flex.items()},
}

def process_result_value(self, value, dialect):
"""Transform from serialized json in database to live object."""
if value is None:
return None

return BeetsItem._awaken(
fixed_values={
k: self._decode(v) for k, v in value.get("fixed_values", {}).items()
},
flex_values={
k: self._decode(v) for k, v in value.get("flex_values", {}).items()
},
)


class TaskPendingItem(Base):
__tablename__ = "task_pending_items"

task_id: Mapped[str] = mapped_column(ForeignKey("task.id"))
task: Mapped[TaskStateInDb] = relationship(back_populates="pending_items")
item: Mapped[BeetsItem] = mapped_column(BeetsItemType())

def __init__(self, item: BeetsItem, id: str | None = None):
super().__init__(id)
self.item = item
Loading
Loading