-
Notifications
You must be signed in to change notification settings - Fork 28
db_migration: Migrated TaskStateInDb.items field pickle to json #296
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,220 @@ | ||||||||||||||||||
| """item pending | ||||||||||||||||||
|
|
||||||||||||||||||
| Revision ID: 925cf8989fbc | ||||||||||||||||||
| Revises: a986c03d9ba3 | ||||||||||||||||||
| Create Date: 2026-04-12 18:47:43.218344 | ||||||||||||||||||
|
|
||||||||||||||||||
| README: | ||||||||||||||||||
| Historically, task state items were stored as binary (pickle) blobs in the database. | ||||||||||||||||||
| This approach has proven to be brittle and difficult to maintain. In particular, | ||||||||||||||||||
| changes and upgrades in beets break deserialization, requiring manual | ||||||||||||||||||
| intervention to recover or migrate data. | ||||||||||||||||||
|
|
||||||||||||||||||
| For the unpickling to work, we would rely on beets class definitions – which are likely | ||||||||||||||||||
| to change over time. Thus, we have a custom unpickler, and mocked beets classes, which | ||||||||||||||||||
| will give the right structures, even past beets 2.5.1. Beware, this also holds for | ||||||||||||||||||
| our own classes (like BeetsItemType) which we will need to make copies of once we | ||||||||||||||||||
| change them. | ||||||||||||||||||
| """ | ||||||||||||||||||
|
|
||||||||||||||||||
| from __future__ import annotations | ||||||||||||||||||
| from collections.abc import Sequence | ||||||||||||||||||
| from datetime import datetime | ||||||||||||||||||
| import io | ||||||||||||||||||
| import pickle | ||||||||||||||||||
| from uuid import uuid4 | ||||||||||||||||||
|
|
||||||||||||||||||
| import sqlalchemy as sa | ||||||||||||||||||
| from beets_flask import log | ||||||||||||||||||
| from beets_flask.database.models.pending import BeetsItemType | ||||||||||||||||||
| from alembic import op | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
# Revision identifiers, used by Alembic.
# `revision` is this migration's own id; `down_revision` is the revision it
# revises. No branch labels or cross-revision dependencies are declared.
revision: str = "925cf8989fbc"
down_revision: str | Sequence[str] | None = "a986c03d9ba3"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| def upgrade() -> None: | ||||||||||||||||||
| """Upgrade schema.""" | ||||||||||||||||||
| op.create_table( | ||||||||||||||||||
| "task_pending_items", | ||||||||||||||||||
| sa.Column("id", sa.String(), nullable=False), | ||||||||||||||||||
| sa.Column("task_id", sa.String(), nullable=False), | ||||||||||||||||||
| sa.Column("item", BeetsItemType(), nullable=False), | ||||||||||||||||||
| sa.PrimaryKeyConstraint("id"), | ||||||||||||||||||
| sa.Column("created_at", sa.DateTime(), nullable=False), | ||||||||||||||||||
| sa.Column("updated_at", sa.DateTime(), nullable=False), | ||||||||||||||||||
| sa.ForeignKeyConstraint( | ||||||||||||||||||
| ["task_id"], | ||||||||||||||||||
| ["task.id"], | ||||||||||||||||||
| ), | ||||||||||||||||||
| ) | ||||||||||||||||||
| op.create_index( | ||||||||||||||||||
| op.f("ix_task_pending_items_created_at"), | ||||||||||||||||||
| "task_pending_items", | ||||||||||||||||||
| ["created_at"], | ||||||||||||||||||
| unique=False, | ||||||||||||||||||
| ) | ||||||||||||||||||
|
||||||||||||||||||
| ) | |
| ) | |
| op.create_index( | |
| op.f("ix_task_pending_items_task_id"), | |
| "task_pending_items", | |
| ["task_id"], | |
| unique=False, | |
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import base64 | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| from sqlalchemy import JSON, ForeignKey | ||
| from sqlalchemy.orm import Mapped, mapped_column, relationship | ||
| from sqlalchemy.types import TypeDecorator | ||
|
|
||
| from beets_flask.importer.types import BeetsItem | ||
|
|
||
| from .base import Base | ||
|
|
||
| if TYPE_CHECKING: | ||
| from .states import TaskStateInDb | ||
|
|
||
|
|
||
class BeetsItemType(TypeDecorator[BeetsItem]):
    """Column type that stores a Beets item as a JSON document.

    Only the item's fixed and flex field values are written. The item is kept
    in this lightweight serialized form; querying individual item fields is
    not implemented yet. Little transformation is needed because Beets items
    are already shaped for database persistence.

    Notes
    -----
    - Items are stored here before Beets writes them to the beets db, so that
      an import process can be resumed safely.
    - Beets itself keeps the two kinds of values apart:
        - fixed fields are stored as standard columns
        - flex fields are stored in a separate linked table
    - JSON is used instead of pickle for portability and migration stability
      (no functions or runtime state are serialized).
    - ``bytes`` values are wrapped as
      ``{"__type__": "bytes", "data": <base64 text>}`` when written and
      unwrapped again when read.
    """

    impl = JSON
    cache_ok = True

    @classmethod
    def _encode(cls, v):
        """Return ``v`` with every ``bytes`` value replaced by a tagged dict.

        Dicts and lists are walked recursively; dict keys are coerced to
        ``str``. Any other value is returned unchanged.
        """
        if isinstance(v, bytes):
            payload = base64.b64encode(v).decode("ascii")
            return {"__type__": "bytes", "data": payload}

        if isinstance(v, dict):
            return {str(key): cls._encode(inner) for key, inner in v.items()}

        if isinstance(v, list):
            return list(map(cls._encode, v))

        return v

    @classmethod
    def _decode(cls, v):
        """Undo ``_encode``: turn tagged dicts back into ``bytes``.

        Dicts and lists are walked recursively. Any other value is returned
        unchanged.
        """
        if isinstance(v, list):
            return [cls._decode(entry) for entry in v]

        if not isinstance(v, dict):
            return v

        # A dict tagged as bytes carries the base64 payload under "data".
        if v.get("__type__") == "bytes":
            return base64.b64decode(v["data"])

        return {key: cls._decode(inner) for key, inner in v.items()}

    def process_bind_param(self, value: BeetsItem | None, dialect):
        """Transform from live object into serialized json in database."""
        # None (and any falsy item) is stored as NULL.
        if not value:
            return None

        encode = self._encode
        fixed = {name: encode(raw) for name, raw in value._values_fixed.items()}
        flex = {name: encode(raw) for name, raw in value._values_flex.items()}
        return {"fixed_values": fixed, "flex_values": flex}

    def process_result_value(self, value, dialect):
        """Transform from serialized json in database to live object."""
        if value is None:
            return None

        # A missing section is treated as empty.
        decoded = {
            section: {
                name: self._decode(raw)
                for name, raw in value.get(section, {}).items()
            }
            for section in ("fixed_values", "flex_values")
        }
        return BeetsItem._awaken(
            fixed_values=decoded["fixed_values"],
            flex_values=decoded["flex_values"],
        )
|
|
||
|
|
||
class TaskPendingItem(Base):
    """Row of the ``task_pending_items`` table: one Beets item attached to a task."""

    __tablename__ = "task_pending_items"

    # Foreign key to the owning row in the ``task`` table.
    task_id: Mapped[str] = mapped_column(ForeignKey("task.id"))
    # ORM link to the owning task; paired with ``TaskStateInDb.pending_items``.
    task: Mapped[TaskStateInDb] = relationship(back_populates="pending_items")
    # The Beets item itself, stored through the ``BeetsItemType`` column type.
    item: Mapped[BeetsItem] = mapped_column(BeetsItemType())

    def __init__(self, item: BeetsItem, id: str | None = None) -> None:
        """Create a pending-item row holding ``item``.

        Parameters
        ----------
        item
            The Beets item to persist.
        id
            Optional identifier, forwarded unchanged to ``Base.__init__``.
            NOTE(review): presumably ``Base`` generates an id when this is
            ``None`` -- confirm against ``Base``.
        """
        super().__init__(id)
        self.item = item
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This migration imports `BeetsItemType` from the application package. Alembic migrations should be self-contained because future refactors to `BeetsItemType` (or its imports) can break running historical migrations on a fresh database. Consider copying the minimal `BeetsItemType` implementation into this migration file (or using a plain `sa.JSON()` column here and handling serialization manually during data migration).