
Multi-threaded / Concurrent Data Pipeline #19


Description

@W1ndrunn3rr

Refactor the Prefect pipeline for parallel document processing.

  • Replace the sequential `for page in pages` loop in pipeline.py with asyncio.gather() or Prefect's task.submit() (thread/process pool)
  • Process each page's extract → generate Cypher → populate chain in parallel, with a configurable concurrency limit
  • Add idempotency: track processed documents by storing a content hash in the DB
  • Integration test: mock 10 pages and assert concurrent writes to Neo4j (no schema-reflection race)
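A minimal sketch of the first two points, using asyncio with a semaphore as the concurrency limit. The stage functions (`extract`, `generate_cypher`, `populate`) are hypothetical stand-ins; the real names and signatures in pipeline.py may differ, and Prefect's `task.submit()` with a task runner is the alternative route:

```python
import asyncio

# Hypothetical per-page stages; the real pipeline.py functions may differ.
async def extract(page: str) -> str:
    return f"extracted:{page}"

async def generate_cypher(extracted: str) -> str:
    return f"CREATE (:Chunk {{text: '{extracted}'}})"

async def populate(cypher: str) -> str:
    return cypher  # stand-in for a Neo4j write

async def process_page(page: str, sem: asyncio.Semaphore) -> str:
    # The semaphore caps how many pages run the full
    # extract -> generate Cypher -> populate chain at once.
    async with sem:
        extracted = await extract(page)
        cypher = await generate_cypher(extracted)
        return await populate(cypher)

async def run_pipeline(pages: list[str], concurrency: int = 4) -> list[str]:
    # Replaces the sequential `for page in pages` loop.
    sem = asyncio.Semaphore(concurrency)
    return await asyncio.gather(*(process_page(p, sem) for p in pages))

results = asyncio.run(run_pipeline([f"page-{i}" for i in range(10)]))
```

`asyncio.gather` preserves input order, so `results` lines up with `pages` even though pages finish out of order.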
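The idempotency point could look like the following sketch. An in-memory set stands in for the DB table; in the real pipeline the hash would be written to and checked against the database so re-runs skip already-processed documents:

```python
import hashlib

processed: set[str] = set()  # stand-in for a hash column/table in the DB

def doc_hash(content: bytes) -> str:
    # Content-based hash, so renamed or re-uploaded copies still dedupe.
    return hashlib.sha256(content).hexdigest()

def should_process(content: bytes) -> bool:
    # Skip documents whose hash is already recorded, so a re-run
    # does not duplicate Cypher writes into Neo4j.
    h = doc_hash(content)
    if h in processed:
        return False
    processed.add(h)
    return True

assert should_process(b"doc-1") is True   # first run: process
assert should_process(b"doc-1") is False  # re-run: skipped
```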
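The integration-test bullet could be sketched like this: mock writes track how many are in flight so the test can assert the writes actually overlapped. The counters and `mock_write` are illustrative, not the real Neo4j client:

```python
import asyncio

peak = 0    # highest number of simultaneous in-flight writes observed
active = 0

async def mock_write(page: int) -> int:
    # Mocked Neo4j write; counts in-flight calls to prove concurrency.
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.01)  # simulated database round-trip
    active -= 1
    return page

async def test_concurrent_writes() -> None:
    results = await asyncio.gather(*(mock_write(i) for i in range(10)))
    assert sorted(results) == list(range(10))  # all 10 pages written
    assert peak >= 2  # writes overlapped rather than running serially

asyncio.run(test_concurrent_writes())
```

A real test would also assert that schema-reflection state is read once up front (or guarded by a lock) so concurrent writers cannot race on it.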

Labels

Data (Data related task), MLOPS (Mlops related task)
