Skip to content

MaksimShevtsov/fastapi-request-pipeline

Repository files navigation

fastapi-request-pipeline

CI codecov PyPI Version Python 3.11+ License Code style: ruff

A composable, type-safe request processing pipeline for FastAPI. Build clean, maintainable APIs with reusable components for authentication, permissions, rate limiting, and more.

Features

  • Composable Architecture - Build complex request flows from simple, reusable FlowComponent instances
  • Type-Safe - Full type hints with strict mypy checking
  • Built-in Components - Authentication (JWT, API Key, Cookie), permissions, rate limiting, pagination, filters
  • Flow Composition - Layer and merge flows at app, router, and route levels
  • OpenAPI Integration - Automatic OpenAPI schema enrichment with security requirements
  • Debug Mode - Detailed execution traces for development
  • Extensible - Easy to create custom FlowComponent subclasses
  • Production Ready - Comprehensive test suite and production usage

Quick Start

pip install fastapi-request-pipeline
from fastapi import FastAPI, Depends
from fastapi_request_pipeline import (
    Flow,
    JWTAuthentication,
    HasRole,
    RateLimit,
    RequestContext,
    flow_dependency,
    enrich_openapi,
)

app = FastAPI()

# Define your flow
async def decode_jwt(token: str) -> dict:
    # Your JWT decoding logic
    return {"sub": "user123", "roles": ["admin"]}

admin_flow = Flow(
    JWTAuthentication(decode=decode_jwt),
    HasRole("admin"),
    RateLimit(rate=100, window_seconds=60),
)

# Use as dependency
@app.get("/admin/dashboard")
async def admin_dashboard(
    ctx: RequestContext = Depends(flow_dependency(admin_flow))
):
    return {"user": ctx.user, "message": "Welcome, admin!"}

# Enrich OpenAPI schema
enrich_openapi(app)

Core Concepts

Flow

A Flow is an ordered container of FlowComponent instances that process requests sequentially. FlowComponent instances are automatically sorted by ComponentCategory to ensure proper execution order (authentication → permissions → feature flags → throttling → filters → pagination → custom).

flow = Flow(
    JWTAuthentication(decode=decode_jwt),
    HasPermission("posts:write"),
    RateLimit(rate=100, window_seconds=60),
)

Built-in FlowComponent Classes

Built-in FlowComponent classes:

Authentication

  • JWTAuthentication - Bearer token authentication
  • APIKeyAuthentication - API key in headers
  • CookieAuthentication - Session cookies
  • AllowAnonymous - Skip authentication

Permissions

  • Authenticated - Require authenticated user
  • HasRole - Role-based access control
  • HasPermission - Permission-based access control

Rate Limiting

  • RateLimit - Configurable rate limiting with pluggable backends
  • InMemoryThrottleBackend - Default in-memory backend
  • Custom backends (e.g., Redis) via ThrottleBackend protocol

Filters & Pagination

  • QueryFilter - Extract selected query parameters into RequestContext.state
  • LimitOffset - Offset-based pagination

Flow Composition

Compose flows at different levels with merge_flows():

from fastapi_request_pipeline import merge_flows, OverrideFlow, DisableFlow

# Application-wide defaults
app_flow = Flow(
    JWTAuthentication(decode=decode_jwt),
    RateLimit(rate=1000, window_seconds=3600)
)

# Router-specific additions
admin_flow = Flow(HasRole("admin"))

# Route-specific overrides
public_flow = Flow(OverrideFlow(AllowAnonymous()))

# Merge with last-writer-wins per category
final_flow = merge_flows(app_flow, admin_flow, public_flow)

Custom FlowComponent Subclasses

Create custom FlowComponent subclasses by extending the base class:

from fastapi_request_pipeline import FlowComponent, ComponentCategory

class AuditLog(FlowComponent):
    category = ComponentCategory.CUSTOM

    async def resolve(self, ctx: RequestContext) -> None:
        await log_request(ctx.user, ctx.request.url.path)

Documentation

Requirements

  • Python 3.11+
  • FastAPI 0.100+

Development

# Install with dev dependencies
uv sync --extra dev

# Run tests
uv run pytest

# Run tests with coverage
uv run pytest --cov

# Lint
uv run ruff check .

# Format
uv run ruff format .

# Type check
uv run mypy --strict src/

Why Use This Library?

Instead of this:

@app.get("/posts")
async def get_posts(request: Request):
    # Authentication
    token = request.headers.get("Authorization")
    if not token or not token.startswith("Bearer "):
        raise HTTPException(401)
    user = decode_jwt(token[7:])

    # Permission check
    if "posts:read" not in user.permissions:
        raise HTTPException(403)

    # Rate limiting
    if not check_rate_limit(user.id):
        raise HTTPException(429)

    # Pagination
    limit = int(request.query_params.get("limit", 20))
    offset = int(request.query_params.get("offset", 0))

    # Business logic
    return get_posts_from_db(limit, offset)

Write this:

posts_flow = Flow(
    JWTAuthentication(decode=decode_jwt),
    HasPermission("posts:read"),
    RateLimit(rate=100, window_seconds=60),
    LimitOffset(default_limit=20),
)

@app.get("/posts")
async def get_posts(ctx: RequestContext = Depends(flow_dependency(posts_flow))):
    pagination = ctx.state["pagination"]
    return get_posts_from_db(pagination["limit"], pagination["offset"])

Benefits:

  • Separation of concerns - Security logic separated from business logic
  • Reusability - Define flows once, use across multiple endpoints
  • Composability - Mix and match FlowComponent instances, override at any level
  • Maintainability - Changes to auth/permissions in one place
  • Type safety - Full type hints and IDE support
  • Testability - FlowComponent instances are easy to unit test
  • Documentation - OpenAPI schema automatically reflects security requirements

License

MIT

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors