Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
"""

from __future__ import annotations
import base64
from collections.abc import Sequence
from datetime import datetime
import io
import pickle
from typing import TypeVar
from uuid import uuid4

import sqlalchemy as sa
from beets_flask import log
from beets_flask.database.models.pending import BeetsItemType
from alembic import op


Expand All @@ -39,22 +40,43 @@

def upgrade() -> None:
"""Upgrade schema."""

op.create_table(
"task_pending_items",
"items",
sa.Column("id", sa.String(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.Column("flex_values", sa.JSON(), nullable=False),
sa.Column("fixed_values", sa.JSON(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_items_created_at"),
"items",
["created_at"],
unique=False,
)

op.create_table(
"tasks_items",
sa.Column("id", sa.String(), nullable=False),
sa.Column("task_id", sa.String(), nullable=False),
sa.Column("item", BeetsItemType(), nullable=False),
sa.Column("item_id", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("id"),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["task_id"],
["task.id"],
),
sa.ForeignKeyConstraint(
["item_id"],
["items.id"],
),
)
op.create_index(
op.f("ix_task_pending_items_created_at"),
"task_pending_items",
op.f("ix_tasks_items_created_at"),
"tasks_items",
["created_at"],
unique=False,
)
Expand All @@ -67,15 +89,35 @@ def upgrade() -> None:
def downgrade() -> None:
"""Downgrade schema."""
op.add_column("task", sa.Column("items", sa.BLOB(), nullable=False))
op.drop_table("task_pending_items")
op.drop_table("tasks_items")
op.drop_table("items")


T = TypeVar("T", bound=dict)


def _encode(v):
if isinstance(v, bytes):
return {
"__type__": "bytes",
"data": base64.b64encode(v).decode("ascii"),
}

if isinstance(v, dict):
return {str(k): _encode(val) for k, val in v.items()}

if isinstance(v, list):
return [_encode(x) for x in v]

return v


def migrate_data():
conn = op.get_bind()
meta = sa.MetaData()

task_pending_items = sa.Table("task_pending_items", meta, autoload_with=conn)

tasks_items_table = sa.Table("tasks_items", meta, autoload_with=conn)
items_table = sa.Table("items", meta, autoload_with=conn)
result = conn.execute(sa.text("SELECT id, items FROM task WHERE items IS NOT NULL"))
for row in result:
task_id = row[0]
Expand All @@ -87,30 +129,38 @@ def migrate_data():
log.error(f"Failed to unpickle task {task_id}: {e}")
continue

rows = []
item_rows = []
tasks_items_rows = []
now = datetime.utcnow()
for stub in items:
rows.append(
item_id = str(uuid4())
item_rows.append(
{
"id": item_id,
"created_at": now,
"updated_at": now,
"fixed_values": _encode(dict(stub._values_fixed.items())),
"flex_values": _encode(dict(stub._values_flex.items())),
}
)
tasks_items_rows.append(
{
"id": str(uuid4()),
"created_at": now,
"updated_at": now,
"task_id": task_id,
"item": {
"fixed_values": BeetsItemType._encode(
dict(stub._values_fixed.items())
),
"flex_values": BeetsItemType._encode(
dict(stub._values_flex.items())
),
},
"item_id": item_id,
}
)

if rows:
if item_rows and tasks_items_rows:
conn.execute(
items_table.insert(),
item_rows,
)
conn.execute(
task_pending_items.insert(),
rows,
tasks_items_table.insert(),
tasks_items_rows,
)


Expand Down
101 changes: 42 additions & 59 deletions backend/beets_flask/database/models/pending.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from __future__ import annotations

import base64
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from sqlalchemy import JSON, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.types import TypeDecorator

from beets_flask.importer.types import BeetsItem

Expand All @@ -15,30 +14,52 @@
from .states import TaskStateInDb


class BeetsItemType(TypeDecorator[BeetsItem]):
"""Serializer and Deserializer for Beets Item class.
class TasksItems(Base):
__tablename__ = "tasks_items"

This component serializes Beets items by storing their fixed and flex fields
as JSON. Full object queryability is not implemented yet; instead, items are
preserved in a lightweight serialized form.
task_id: Mapped[str] = mapped_column(ForeignKey("task.id"))
task: Mapped[TaskStateInDb] = relationship(back_populates="pending_items")
item_id: Mapped[str] = mapped_column(ForeignKey("items.id"))
item: Mapped[Item] = relationship()

def __init__(self, item: Item, id: str | None = None):
super().__init__(id)
self.item = item


class Item(Base):
__tablename__ = "items"

The transformation is intentionally minimal because Beets items are already
structured for database persistence.
# items table in beets db
fixed_values: Mapped[dict[str, Any]] = mapped_column(JSON)

# item_attributes table in beets db
flex_values: Mapped[dict[str, Any]] = mapped_column(JSON)

Notes
-----
- Items are persisted here before Beets writes them to the beets db, we need this
to allow import processes to be resumed safely.
- In Beets, fixed and flex values are stored differently:
- fixed fields are stored as standard columns
- flex fields are stored in a separate linked table
- Pickling is avoided to ensure portability and migration stability (no
functions or runtime state are serialized).
"""
def __init__(
self,
fixed_values: dict[str, Any],
flex_values: dict[str, Any],
id: str | None = None,
):
super().__init__(id)
self.fixed_values = fixed_values
self.flex_values = flex_values

# FIXME: Move to mapper layer after match migration!

impl = JSON
cache_ok = True
def to_beets(self):
return BeetsItem._awaken(
fixed_values={k: self._decode(v) for k, v in self.fixed_values.items()},
flex_values={k: self._decode(v) for k, v in self.flex_values.items()},
)

@classmethod
def from_beets(cls, obj: BeetsItem):
return cls(
fixed_values={k: cls._encode(v) for k, v in obj._values_fixed.items()},
flex_values={k: cls._encode(v) for k, v in obj._values_flex.items()},
)

@classmethod
def _encode(cls, v):
Expand Down Expand Up @@ -67,41 +88,3 @@ def _decode(cls, v):
return [cls._decode(x) for x in v]

return v

def process_bind_param(self, value: BeetsItem | None, dialect):
"""Transform from live object into serialized json in database."""
if value is None or not value:
return None

return {
"fixed_values": {
k: self._encode(v) for k, v in value._values_fixed.items()
},
"flex_values": {k: self._encode(v) for k, v in value._values_flex.items()},
}

def process_result_value(self, value, dialect):
"""Transform from serialized json in database to live object."""
if value is None:
return None

return BeetsItem._awaken(
fixed_values={
k: self._decode(v) for k, v in value.get("fixed_values", {}).items()
},
flex_values={
k: self._decode(v) for k, v in value.get("flex_values", {}).items()
},
)


class TaskPendingItem(Base):
__tablename__ = "task_pending_items"

task_id: Mapped[str] = mapped_column(ForeignKey("task.id"))
task: Mapped[TaskStateInDb] = relationship(back_populates="pending_items")
item: Mapped[BeetsItem] = mapped_column(BeetsItemType())

def __init__(self, item: BeetsItem, id: str | None = None):
super().__init__(id)
self.item = item
8 changes: 4 additions & 4 deletions backend/beets_flask/database/models/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from beets_flask.logger import log
from beets_flask.server.exceptions import SerializedException

from .pending import TaskPendingItem
from .pending import Item, TasksItems


class FolderInDb(Base):
Expand Down Expand Up @@ -364,7 +364,7 @@ class TaskStateInDb(Base):
old_paths: Mapped[bytes | None]
# old_paths contain original file paths, but are only set when files are moved.
# (which breaks some deep links that before were identical to paths, but no more!)
pending_items: Mapped[list[TaskPendingItem]] = relationship(
pending_items: Mapped[list[TasksItems]] = relationship(
back_populates="task",
cascade="all, delete-orphan",
)
Expand All @@ -380,11 +380,11 @@ class TaskStateInDb(Base):

@property
def items(self) -> list[BeetsItem]:
return [row.item for row in self.pending_items]
return [row.item.to_beets() for row in self.pending_items]

@items.setter
def items(self, value: list[BeetsItem]):
self.pending_items = [TaskPendingItem(item=v) for v in value]
self.pending_items = [TasksItems(item=Item.from_beets(v)) for v in value]

def __init__(
self,
Expand Down
Loading