Status: ✅ Complete (100%)
Start Date: December 23, 2025
Completion Date: January 1, 2026
Scope: RSS Feed Content Aggregation with APScheduler Backend
Phase 2 implements the background scheduler engine that periodically fetches content from RSS feeds and stores posts in the database. This phase bridges CSC317 (Simulation and Modeling), CSC318 (Web Technology), CSC314 (Algorithms), and CSC315 (System Design) principles.
| Decision | Choice | Rationale |
|---|---|---|
| Scheduler Type | APScheduler (in-process) | CSC317: Simple load modeling; design allows Celery migration later |
| Initial Scope | RSS Feeds Only | CSC314: Start with simplest algorithm; expand after proving concept |
| Error Handling | Log and Skip | CSC315: Defer retry logic to Phase 3; keep MVP simple |
| Denormalization | Removed from ContentSource | CSC315: Query FetchJob on-demand for latest status |
┌─────────────────────────────────────────────────┐
│ Scheduler Service Layer │
│ (SchedulerService: orchestrates fetch cycles) │
└────────────────┬────────────────────────────────┘
│
┌────────────────▼────────────────────────────────┐
│ Business Service Layer │
│ (FetchService: RSS parsing, PostProcessor) │
└────────────────┬────────────────────────────────┘
│
┌────────────────▼────────────────────────────────┐
│ Repository Layer │
│ (FetchJobRepo, ContentSourceRepo, PostRepo) │
└────────────────┬────────────────────────────────┘
│
┌────────────────▼────────────────────────────────┐
│ ORM Model Layer │
│ (FetchJob, ContentSource, Post, User) │
└─────────────────────────────────────────────────┘
- SchedulerService – ONLY handles job scheduling and state machine
- FetchService – ONLY handles RSS parsing logic
- Repository Layer – ONLY handles database persistence
- Models – ONLY define schema and relationships
Purpose: Tracks every content fetch attempt with detailed audit trail.
Fields:
```python
from datetime import datetime

from sqlalchemy import DateTime, Enum, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship


class FetchJob(Base):
    __tablename__ = "fetch_job"

    id: Mapped[int] = mapped_column(primary_key=True)
    content_source_id: Mapped[int] = mapped_column(ForeignKey("content_sources.id"))
    status: Mapped[str] = mapped_column(Enum("QUEUED", "ONGOING", "FAILED", "COMPLETED"))
    started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
    ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
    error_code: Mapped[str | None]
    error_message: Mapped[str | None]
    posts_count: Mapped[int] = mapped_column(default=0)

    source: Mapped["ContentSource"] = relationship(back_populates="fetch_jobs")
```
Lifecycle (CSC317 - State Modeling):
User/Scheduler creates job
↓
QUEUED (job waiting to run)
↓
ONGOING (fetch in progress)
↓
┌───┴───┐
↓ ↓
COMPLETED FAILED
CSC317 Implication: Each state represents a distinct system condition. The status field is the guard preventing concurrent duplicate fetches.
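The lifecycle diagram above can be modeled as a small state machine. A minimal stdlib sketch (the transition table is read off the diagram; the project's actual enum lives in the codebase and may differ in naming):

```python
from enum import Enum

class FetchJobStatus(str, Enum):
    QUEUED = "queued"
    ONGOING = "ongoing"
    COMPLETED = "completed"
    FAILED = "failed"

# Valid transitions per the lifecycle diagram: QUEUED -> ONGOING -> {COMPLETED, FAILED}
TRANSITIONS: dict[FetchJobStatus, set[FetchJobStatus]] = {
    FetchJobStatus.QUEUED: {FetchJobStatus.ONGOING},
    FetchJobStatus.ONGOING: {FetchJobStatus.COMPLETED, FetchJobStatus.FAILED},
    FetchJobStatus.COMPLETED: set(),  # terminal state
    FetchJobStatus.FAILED: set(),     # terminal state
}

def can_transition(current: FetchJobStatus, new: FetchJobStatus) -> bool:
    """Guard checked before any status update (CSC317 state modeling)."""
    return new in TRANSITIONS[current]
```

Because COMPLETED and FAILED map to empty sets, a finished job can never be restarted by accident, which is exactly the guard property described above.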
Removed Fields (CSC315 - Query on Demand):
- `last_fetched_at` ❌
- `last_fetched_status` ❌
- `last_fetched_error_at` ❌
Rationale: These were denormalized copies. Now query the latest FetchJob instead (single source of truth).
Added Relationship:
```python
fetch_jobs: Mapped[list["FetchJob"]] = relationship(
    back_populates="source", cascade="all, delete-orphan"
)
```
Cascade Semantics (CSC315):
- When ContentSource is deleted → all FetchJobs deleted ✅
- When FetchJob is deleted → ContentSource NOT deleted ✅
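The same one-way delete behavior can be demonstrated in plain SQL. A stdlib SQLite sketch of `ON DELETE CASCADE` (SQLAlchemy's `all, delete-orphan` cascade operates at the ORM level rather than in the database, but the intended semantics match):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite disables FK enforcement by default
conn.execute("CREATE TABLE content_sources (id INTEGER PRIMARY KEY)")
conn.execute("""
    CREATE TABLE fetch_job (
        id INTEGER PRIMARY KEY,
        content_source_id INTEGER NOT NULL
            REFERENCES content_sources(id) ON DELETE CASCADE
    )
""")
conn.execute("INSERT INTO content_sources (id) VALUES (1)")
conn.execute("INSERT INTO fetch_job (content_source_id) VALUES (1), (1)")

# Deleting a child leaves the parent intact
conn.execute("DELETE FROM fetch_job WHERE id = 1")
parents = conn.execute("SELECT COUNT(*) FROM content_sources").fetchone()[0]

# Deleting the parent removes the remaining child automatically
conn.execute("DELETE FROM content_sources WHERE id = 1")
children = conn.execute("SELECT COUNT(*) FROM fetch_job").fetchone()[0]
```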
Location: app/repositories/fetch_job_repo.py
Design: Extends BaseRepo[FetchJob] to provide:
- Generic CRUD via inheritance
- Domain-specific methods for state transitions
Method Signatures:
```python
class FetchJobRepo(BaseRepo[FetchJob]):
    """Manages FetchJob persistence and state transitions."""

    async def queue_job(self, content_source_id: int) -> FetchJob | None:
        """Create a new QUEUED job for a source."""

    async def get_latest_for_source(self, content_source_id: int) -> FetchJob | None:
        """Get the most recent job for a source (for concurrency checking)."""

    async def mark_ongoing(self, job_id: int) -> FetchJob | None:
        """Transition QUEUED -> ONGOING."""

    async def mark_completed(self, job_id: int, posts_count: int) -> FetchJob | None:
        """Transition ONGOING -> COMPLETED with post count."""

    async def failed(
        self,
        job_id: int,
        error_code: str,
        error_message: str,
    ) -> FetchJob | None:
        """Transition ONGOING -> FAILED with error details."""

    async def get_status(self, job_id: int) -> str | None:
        """Get the status of a job."""
```
All six methods are implemented; bodies are omitted here for brevity.
Design Pattern (CSC315):
- State Transitions: Explicit methods (`mark_ongoing()`, `mark_completed()`) enforce valid state changes
- Return Type: Always returns `FetchJob | None` for observability (logging, debugging)
- Error Handling: Methods return `None` on failure; the repository doesn't raise exceptions
- `50dd2595d518` – Initial schema (users, posts, content_sources, subscriptions)
- `0ee9adaea81a` – Fix timezone support for all datetime columns
- `e07a9052ef24` – Add FetchJob model and remove denormalized fields from ContentSource
- Before Implementation: Run all pending migrations to sync database
- After Model Changes: Generate and review migrations before applying
- Validation: Alembic detects schema diffs automatically
- ✅ Separation of Concerns: Scheduler, Fetch, Repository layers are decoupled
- ✅ DTO Pattern: FetchJob model clean, FetchService returns PostCreate DTOs
- ✅ Database Normalization: Removed denormalized fields, query on-demand
- ✅ Cascade Semantics: Parent-child delete relationships properly configured
- ✅ Service Layer: SchedulerService and FetchService complete with clear boundaries
- ✅ State Machine Design: FetchJob status enum models job lifecycle
- ✅ Concurrency Prevention: Status field prevents duplicate concurrent fetches
- ⏳ Load Modeling: Scheduler interval design pending (determine fetch frequency)
- ⏳ Resource Prediction: Monitor posts_count to detect system overload
- ✅ Async Method Signatures: All repository methods are `async`
- ✅ Type Safety: Every parameter and return type explicitly annotated
- ✅ Non-blocking Design: FetchService uses `asyncio.wait_for()` for timeout management
- ✅ Timeout Handling: Per-source timeout prevents slow feeds from blocking the cycle
- ✅ APScheduler Integration: Background job execution without UI blocking
- ⏳ Task Priority: Determine which sources to fetch first
- ⏳ Query Optimization: Efficient latest-job lookup by `content_source_id`
- ⏳ Duplicate Detection: Prevent re-fetching same posts (external_id uniqueness)
- ⏳ Secure Credentials: Store API keys/RSS URLs securely
- ⏳ Audit Trail: FetchJob error logs provide accountability
| Component | Status | Notes |
|---|---|---|
| Models | ✅ 100% | FetchJob + ContentSource complete with timeout field |
| Migrations | ✅ 100% | 4 versions applied: initial, timezone fixes, FetchJob, timeout field |
| FetchJobRepository | ✅ 100% | All 6 methods implemented with null checks |
| FetchService | ✅ 100% | RSS parsing complete with asyncio.wait_for() timeout (CSC318) |
| SchedulerService | ✅ 100% | Full orchestration with error-tolerant loop (log-and-continue) |
| ContentScheduler | ✅ 100% | APScheduler AsyncIOScheduler wrapper with start/stop |
| FastAPI Integration | ✅ 100% | Modern lifespan context manager with proper dependency injection |
| Tests | ⏳ 0% | Unit tests pending for Phase 3 |
All core components are implemented and verified via the startup smoke checks below (unit tests are deferred to Phase 3):
- ✅ FetchJob state machine with all 4 states
- ✅ FetchJobRepository with 6 domain methods
- ✅ FetchService with asyncio.wait_for() timeout handling
- ✅ SchedulerService with error-tolerant orchestration
- ✅ ContentScheduler (APScheduler wrapper) with start/stop
- ✅ FastAPI integration via lifespan context manager
- ✅ All 4 database migrations applied successfully
- Server starts without errors: ✅
- Scheduler initializes on app startup: ✅
- Scheduler gracefully stops on app shutdown: ✅
- Dependencies properly injected: ✅
- Database session lifecycle correct: ✅
Next phase will focus on:
- Replace in-process APScheduler with Celery + Redis
- Add exponential backoff retry logic for failed sources
- Implement webhook notifications to subscribed users
- Scale to multiple scheduler workers
- Add comprehensive unit/integration tests
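The retry item above usually reduces to a delay schedule. A minimal sketch of capped exponential backoff, where the base delay and cap are assumed parameters, not values from the project:

```python
def backoff_delay(attempt: int, base_seconds: float = 60.0, cap_seconds: float = 3600.0) -> float:
    """Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped."""
    return min(base_seconds * 2 ** (attempt - 1), cap_seconds)
```

The cap keeps a chronically broken source from growing an unbounded wait, while healthy-but-flaky sources recover quickly after one or two doublings.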
Question: Shouldn't we cache last_fetched_at on ContentSource for fast queries?
Answer (CSC315):
- Single Source of Truth: FetchJob is the authoritative record
- No Sync Problems: Don't need to update two places when job completes
- Query Simplicity: One query to latest FetchJob tells us everything
- Trade-off: Slightly slower (one join), but vastly simpler code
Question: Why not have separate QUEUED, ONGOING, COMPLETED tables?
Answer (CSC315):
- State Machine Clarity: Single status field makes valid transitions explicit
- Schema Simplicity: One table vs. three
- Concurrency Safety: Status field acts as lock – if ONGOING, don't start another
Question: Doesn't returning the whole object waste bandwidth?
Answer (CSC315):
- Observability: Logging requires full job details (ID, timestamps, error codes)
- Caller Flexibility: Service layer can decide what to log/return
- Debugging: Easy to see exact state after transitions
Why AsyncIOScheduler over BackgroundScheduler?
- CSC318 requirement: Async/await compatibility
- BackgroundScheduler uses threading (blocks event loop)
- AsyncIOScheduler integrates directly with asyncio event loop
- Allows scheduler to pause/resume with FastAPI lifespan
Interval Trigger Choice (Not Cron):
- MVP simplicity: Just run every N minutes
- fetch_interval_minutes from dotenv Config
- Cron would be overengineering for MVP
- Can add cron support in Phase 3
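Reading the interval from the environment can be sketched with the stdlib alone. The variable name, default, and validation rules here are assumptions; the project reads the value through its dotenv-backed Config:

```python
import os

def fetch_interval_minutes(default: int = 15) -> int:
    """Interval for the APScheduler 'interval' trigger, taken from the environment."""
    raw = os.environ.get("FETCH_INTERVAL_MINUTES", "")
    try:
        minutes = int(raw)
    except ValueError:
        return default  # unset or malformed: fall back to the default
    return minutes if minutes > 0 else default

# Hypothetical wiring into the scheduler:
# scheduler.add_job(run_fetch_cycle, "interval", minutes=fetch_interval_minutes())
```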
Problem Solved:
- Initial: `yield app` caused "TypeError: 'FastAPI' object is not a mapping"
- Root cause: FastAPI expects `yield` (None) or a state dict, not the app object
- Also: `async with async_session_maker()` closed the session too early
Solution Implemented:
```python
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize all dependencies here
    session = await async_session_maker().__aenter__()

    # Create repos and services
    fetch_job_repo = FetchJobRepo(session=session)
    content_source_repo = ContentSourceRepo(session=session)
    # ... etc

    content_scheduler.start()
    yield  # Return None (not app)

    # Cleanup
    content_scheduler.stop()
    await session.close()
```
Key Differences:
- ✅ `yield` (not `yield app`)
- ✅ Session kept open for the entire app lifecycle (not inside a context manager)
- ✅ Session explicitly closed during shutdown
- ✅ Modern FastAPI pattern (replaces deprecated @app.on_event)
The session in lifespan is used by all repos during request handling:
- Repos are created once at startup (not per-request)
- Same session persists across all fetch cycles
- Session closed cleanly on app shutdown
Trade-off (CSC315):
- ✅ Simple: One session for entire app lifetime
- ✅ Efficient: No per-request session creation overhead
- ⚠️ Consider: for Phase 3, per-job sessions may be needed if fetch cycles run concurrently
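The per-job alternative flagged above can be sketched abstractly: each fetch cycle opens and closes its own session instead of sharing one for the app's lifetime. The `Session` class here is a stand-in for an SQLAlchemy `AsyncSession` factory, used only to show the lifecycle shape:

```python
import asyncio
from contextlib import asynccontextmanager

class Session:
    """Stand-in for an SQLAlchemy AsyncSession."""
    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True

@asynccontextmanager
async def per_cycle_session():
    session = Session()
    try:
        yield session
    finally:
        await session.close()  # always closed, even if the cycle raises

async def run_one_cycle() -> Session:
    async with per_cycle_session() as session:
        pass  # repos would be constructed around `session` here
    return session

cycle_session = asyncio.run(run_one_cycle())
```

The trade is the reverse of the current design: a little per-cycle setup cost in exchange for isolation between concurrent cycles.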
SchedulerService design follows error-tolerant pattern:
```python
async def run_fetch_cycle(self):
    for source in sources:
        try:
            ...  # fetch logic
        except Exception as e:
            logger.error(f"Failed to fetch {source.id}: {e}")
            # Mark job as FAILED, then continue to the next source (don't crash)
```
Rationale (CSC315):
- ✅ One failing source shouldn't block others
- ✅ Logs all errors for debugging
- ✅ User subscriptions unaffected by individual source failures
- ⏳ Phase 3: Add retry logic with exponential backoff
✅ All methods have explicit parameter and return types
✅ Using Python 3.10+ union syntax (X | None)
✅ FetchJobStatus is a proper Enum
✅ All database operations are async
✅ Methods use await correctly
⏳ Need to verify no blocking I/O in future service layers
✅ Clear method names: mark_ongoing(), mark_completed(), failed()
✅ Repository follows pattern of ContentSourceRepo
✅ Docstrings are concise and actionable
- Models: app/models/fetch_job.py, app/models/content_source.py
- Repository: app/repositories/fetch_job_repo.py
- Migrations: alembic/versions/e07a9052ef24_add_fetchjob_model_for_scheduler.py
- Base Repo Pattern: app/repositories/base_repo.py
CSC315: 4-Layer Architecture, Separation of Concerns, DTO Pattern
CSC317: State Machine Design, Load Modeling, Resource Prediction
CSC318: Async/Await, Type Safety, Non-blocking I/O
CSC314: Query Optimization, Task Prioritization
CSC316: Audit Trails, Secure Credential Handling
Last Updated: January 1, 2026 (Phase 2 Complete)
Phase Duration: 10 days (Dec 23 - Jan 1)
Next Review: Before Phase 3 (Distributed Scheduler with Celery)