Конвертер принимает уже спарсенные товары из receiver, нормализует поля и готовит запись для catalog.
- Общий
BaseParserHandler(мастер-класс) с единым контрактом нормализации. - Реестр обработчиков
HandlerRegistryдля выбора модуля поparser_name. - Отдельные модули
parsers/fixprice,parsers/chizhik,parsers/perekrestokс parser-specific title обработчиками. - Пайплайн
ConverterPipeline:- обработчик парсера,
- резолв canonical product id (
plu/sku/source_id + parser), - persistent image dedup,
- backfill
NULLполей по ближайшей версии товара во времени.
converter/
core/
base.py # мастер-класс обработчика
models.py # raw/normalized dataclass-модели
ports.py # интерфейсы receiver/catalog/storage
registry.py # реестр обработчиков
services.py # identity, image dedup, null-backfill
parsers/
fixprice/
handler.py # обработчик Fix Price
title_parser.py
normalizers.py
patterns.py
chizhik/
handler.py # обработчик Чижик
title_parser.py
patterns.py
perekrestok/
handler.py # обработчик Перекрёсток
title_parser.py
sync.py # сервис batch-sync receiver -> catalog
daemon.py # очередь + HTTP trigger API
pipeline.py # title/category/geo/composition normalization
catalog теперь хранит данные не только в projection-таблице, а в нормализованной структуре с историей:
catalog_product_snapshots- append-only история версий товара (каждый проход sync добавляет snapshot, без перетирания прошлого).catalog_product_sources- состояние источника(parser_name, source_id)и ссылка на последний snapshot.catalog_settlements- справочник населенных пунктов/регионов/стран.catalog_settlement_geodata- история геоточек (lat/lon) по settlement.catalog_categories- справочник категорий (uid/title/depth/parent).catalog_product_category_links- связи snapshot -> category.catalog_products- текущая проекция (read-model) для быстрых чтений.catalog_product_assets/catalog_snapshot_assets- массивные поля товара (image urls, duplicates, fingerprints) в нормализованном виде.catalog_product_payload_nodes/catalog_snapshot_payload_nodes- полный payload источника как дерево узлов (без JSON-колонок).
Для title в БД хранится единое поле title_normalized_no_stopwords; поля
title_normalized и title_original_no_stopwords в catalog_products и
catalog_product_snapshots не сохраняются.
Converter сохраняет расширенный product-контракт без потерь: в snapshots/current projection
пишутся цены (price/discount_price/loyal_price/price_unit), product-флаги и producer/rating,
оригинальный и нормализованный состав (composition_original / composition_normalized),
а полный источник из receiver (product/artifact/admin/categories/images/meta/wholesale/category-links)
сохраняется в реляционных таблицах catalog_*_payload_nodes (без JSON-колонок).
Политика обновления:
- история не удаляется и не перезаписывается (
append-only snapshots); - справочники (
settlements/categories/geodata) пополняются и дополняются; catalog_productsобновляется неразрушительно:NULL/пустыеновые значения не затирают заполненные старые.
Поддержан паттерн вида:
Название, Бренд(опц), floatXfloat[ Xfloat ] см ИЛИ float (г/кг/мл/л), int(кол-во, опц), в ассортименте
Из title формируются:
title_originaltitle_normalizedtitle_original_no_stopwordstitle_normalized_no_stopwordsunit,available_count,package_quantity,package_unit
Unit guide:
Chocolate 200 g->unit=PCE,available_count=15,package_quantity=0.2,package_unit=KGMMilk 1 L->unit=PCE,available_count=10,package_quantity=1,package_unit=LTRPotatoes by weight->unit=KGM,available_count=None,package_quantity=NoneWater vending->unit=LTR,available_count=None,package_quantity=None
python3 example_fixprice_title_parser.pypython3 -m unittest discover -s tests -p 'test_*.py' -vЕсть адаптер под SQLite-базу receiver:
converter.adapters.ReceiverSQLiteRepository- поддерживает только актуальную схему
receiver(run_artifacts.parser_nameобязателен). - если обязательной колонки нет, адаптер падает с ошибкой несовместимой схемы.
Есть sink под SQLite-базу catalog:
converter.adapters.CatalogSQLiteRepository- выполняет
upsertнормализованных товаров; - хранит persistent
canonical_product_idmap, image fingerprints и sync cursor.
Полный sync receiver -> catalog (SQLite):
python3 sync_receiver_to_catalog.py \
--receiver-db ../receiver/data/receiver.db \
--catalog-db ./data/catalog.db \
--parser-name fixprice \
--batch-size 250Полный sync receiver -> catalog (MySQL):
pip install sqlalchemy pymysql pymorphy3 razdel stop-words
python3 sync_receiver_to_catalog.py \
--receiver-db 'mysql+pymysql://user:pass@127.0.0.1:3306/receiver' \
--catalog-db 'mysql+pymysql://user:pass@127.0.0.1:3306/catalog' \
--parser-name fixprice \
--batch-size 250Конвертер может удалять duplicate image URLs сразу в момент upsert_many:
CONVERTER_STORAGE_BASE_URL(илиSTORAGE_BASE_URL) — базовый URL storage.CONVERTER_STORAGE_API_TOKEN(илиSTORAGE_API_TOKEN) — токенBearer.CONVERTER_STORAGE_DELETE_TIMEOUT_SEC— timeoutDELETEзапроса (по умолчанию10).CONVERTER_STORAGE_DELETE_STRICT— если1/true, ошибка удаления прерывает обработку.
Удаление выполняется только для URL текущего storage origin и путей /images/<name>.
Запуск daemon-процесса (очередь задач + HTTP API):
python3 converter_daemon.py \
--host 127.0.0.1 \
--port 8090 \
--receiver-db ../receiver/data/receiver.db \
--catalog-db ./data/catalog.db \
--parser-name fixprice \
--batch-size 250 \
--max-queue-size 100HTTP точки:
GET /health— состояние воркера и очереди.POST /trigger— поставить sync-задачу в очередь.
Пример trigger-запроса:
curl -X POST http://127.0.0.1:8090/trigger \
-H 'Content-Type: application/json' \
-d '{"parser_name":"fixprice","run_id":"<receiver-run-id>","source":"receiver"}'Дедупликация очереди выполняется по ключу (receiver_db, catalog_db, parser_name):
пока задача с тем же ключом в pending/active, повторный trigger не создаст дубль.
В receiver добавлен post-run hook: при status=success он отправляет trigger в converter daemon.
Переменные окружения receiver:
CONVERTER_TRIGGER_URL— напримерhttp://127.0.0.1:8090/trigger.CONVERTER_TRIGGER_TOKEN— bearer token (если у daemon задан--auth-token).CONVERTER_TRIGGER_TIMEOUT_SEC— timeout запроса (по умолчанию3).CONVERTER_TRIGGER_RECEIVER_DB— опционально переопределяетreceiver_dbв payload.CONVERTER_TRIGGER_CATALOG_DB— опционально переопределяетcatalog_dbв payload.CONVERTER_TRIGGER_BATCH_SIZE— опционально переопределяет batch size.CONVERTER_TRIGGER_MAX_BATCHES— опционально ограничивает число batch-ов за один trigger.
- Создать папку
converter/parsers/<parser_name>/. - Реализовать
<ParserName>Handler(BaseParserHandler). - Зарегистрировать в
converter/parsers/__init__.py. - При необходимости добавить parser-specific normalizers/patterns.
../dataclass../storage../receiver