A composable, type-safe request processing pipeline for FastAPI. Build clean, maintainable APIs with reusable components for authentication, permissions, rate limiting, and more.
- Composable Architecture - Build complex request flows from simple, reusable FlowComponent instances
- Type-Safe - Full type hints with strict mypy checking
- Built-in Components - Authentication (JWT, API Key, Cookie), permissions, rate limiting, pagination, filters
- Flow Composition - Layer and merge flows at app, router, and route levels
- OpenAPI Integration - Automatic OpenAPI schema enrichment with security requirements
- Debug Mode - Detailed execution traces for development
- Extensible - Easy to create custom FlowComponent subclasses
- Production Ready - Comprehensive test suite and production usage

```bash
pip install fastapi-request-pipeline
```

```python
from fastapi import FastAPI, Depends
from fastapi_request_pipeline import (
    Flow,
    JWTAuthentication,
    HasRole,
    RateLimit,
    RequestContext,
    flow_dependency,
    enrich_openapi,
)

app = FastAPI()

# Define your flow
async def decode_jwt(token: str) -> dict:
    # Your JWT decoding logic
    return {"sub": "user123", "roles": ["admin"]}

admin_flow = Flow(
    JWTAuthentication(decode=decode_jwt),
    HasRole("admin"),
    RateLimit(rate=100, window_seconds=60),
)

# Use as dependency
@app.get("/admin/dashboard")
async def admin_dashboard(
    ctx: RequestContext = Depends(flow_dependency(admin_flow))
):
    return {"user": ctx.user, "message": "Welcome, admin!"}

# Enrich OpenAPI schema
enrich_openapi(app)
```

A Flow is an ordered container of FlowComponent instances that process requests sequentially. FlowComponent instances are automatically sorted by ComponentCategory to ensure proper execution order (authentication → permissions → feature flags → throttling → filters → pagination → custom).
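
To make the category ordering concrete, here is a minimal standalone sketch of the idea. It does not use the library: the `Category` enum, the `Component` dataclass, and `execution_order` are hypothetical stand-ins for ComponentCategory and FlowComponent, and the library's actual sorting logic may differ.

```python
from dataclasses import dataclass
from enum import IntEnum


class Category(IntEnum):
    """Hypothetical stand-in for ComponentCategory; lower values run first."""

    AUTHENTICATION = 0
    PERMISSIONS = 1
    FEATURE_FLAGS = 2
    THROTTLING = 3
    FILTERS = 4
    PAGINATION = 5
    CUSTOM = 6


@dataclass(frozen=True)
class Component:
    """Hypothetical stand-in for a FlowComponent: a name plus its category."""

    name: str
    category: Category


def execution_order(*components: Component) -> list[Component]:
    # sorted() is stable, so in this sketch components that share a
    # category keep the order in which they were passed in.
    return sorted(components, key=lambda c: c.category)


ordered = execution_order(
    Component("RateLimit", Category.THROTTLING),
    Component("HasPermission", Category.PERMISSIONS),
    Component("JWTAuthentication", Category.AUTHENTICATION),
)
print([c.name for c in ordered])
# → ['JWTAuthentication', 'HasPermission', 'RateLimit']
```

In this sketch the declaration order only matters as a tie-breaker within a category, which is why authentication runs first even though it was listed last.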
```python
flow = Flow(
    JWTAuthentication(decode=decode_jwt),
    HasPermission("posts:write"),
    RateLimit(rate=100, window_seconds=60),
)
```

Built-in FlowComponent classes:

Authentication
- JWTAuthentication - Bearer token authentication
- APIKeyAuthentication - API key in headers
- CookieAuthentication - Session cookies
- AllowAnonymous - Skip authentication

Permissions
- Authenticated - Require authenticated user
- HasRole - Role-based access control
- HasPermission - Permission-based access control

Rate Limiting
- RateLimit - Configurable rate limiting with pluggable backends
- InMemoryThrottleBackend - Default in-memory backend
- Custom backends (e.g., Redis) via ThrottleBackend protocol

Filters & Pagination
- QueryFilter - Extract selected query parameters into RequestContext.state
- LimitOffset - Offset-based pagination

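
As an illustration of what a pluggable rate-limiting backend could look like, the sketch below implements a fixed-window counter in memory. It is standalone and does not import the library. The `ThrottleBackendSketch` protocol, its `hit` method, and the explicit `now` parameter are assumptions made for this example; the real ThrottleBackend protocol may have a different (possibly async) signature, so check the API reference before writing a custom backend.

```python
from typing import Protocol


class ThrottleBackendSketch(Protocol):
    """Hypothetical interface; the library's ThrottleBackend may differ."""

    def hit(self, key: str, rate: int, window_seconds: int, now: float) -> bool:
        """Record one request and return True if it is within the limit."""
        ...


class FixedWindowBackend:
    """In-memory fixed-window counter, keyed by client key and window index."""

    def __init__(self) -> None:
        self._counts: dict[tuple[str, int], int] = {}

    def hit(self, key: str, rate: int, window_seconds: int, now: float) -> bool:
        # All timestamps inside the same window map to the same bucket.
        window = int(now // window_seconds)
        bucket = (key, window)
        self._counts[bucket] = self._counts.get(bucket, 0) + 1
        return self._counts[bucket] <= rate


backend: ThrottleBackendSketch = FixedWindowBackend()

# Three requests in the first window with a limit of two per minute.
results = [
    backend.hit("user123", rate=2, window_seconds=60, now=0.0) for _ in range(3)
]
# A request in the next window starts from a fresh counter.
next_window = backend.hit("user123", rate=2, window_seconds=60, now=60.0)
print(results, next_window)
# → [True, True, False] True
```

This sketch never evicts old buckets, so it is only suitable for demonstration. A shared backend such as Redis would typically store the same per-window counter under a key with an expiry so that limits hold across multiple worker processes.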
Compose flows at different levels with merge_flows():
```python
from fastapi_request_pipeline import merge_flows, OverrideFlow, DisableFlow

# Application-wide defaults
app_flow = Flow(
    JWTAuthentication(decode=decode_jwt),
    RateLimit(rate=1000, window_seconds=3600),
)

# Router-specific additions
admin_flow = Flow(HasRole("admin"))

# Route-specific overrides
public_flow = Flow(OverrideFlow(AllowAnonymous()))

# Merge with last-writer-wins per category
final_flow = merge_flows(app_flow, admin_flow, public_flow)
```

Create custom FlowComponent subclasses by extending the base class:
```python
from fastapi_request_pipeline import ComponentCategory, FlowComponent, RequestContext

class AuditLog(FlowComponent):
    category = ComponentCategory.CUSTOM

    async def resolve(self, ctx: RequestContext) -> None:
        # log_request is a placeholder for your own audit-logging coroutine
        await log_request(ctx.user, ctx.request.url.path)
```

Documentation:
- User Guide - Comprehensive guide with examples
- API Reference - Complete API documentation
- CI/CD and Release Guide - Versioning rules, GitHub Releases, TestPyPI/PyPI publish pipeline
- Examples - Working code examples

Requirements:
- Python 3.11+
- FastAPI 0.100+

```bash
# Install with dev dependencies
uv sync --extra dev

# Run tests
uv run pytest

# Run tests with coverage
uv run pytest --cov

# Lint
uv run ruff check .

# Format
uv run ruff format .

# Type check
uv run mypy --strict src/
```

Instead of this:
```python
@app.get("/posts")
async def get_posts(request: Request):
    # Authentication
    token = request.headers.get("Authorization")
    if not token or not token.startswith("Bearer "):
        raise HTTPException(401)
    user = decode_jwt(token[7:])

    # Permission check
    if "posts:read" not in user.permissions:
        raise HTTPException(403)

    # Rate limiting
    if not check_rate_limit(user.id):
        raise HTTPException(429)

    # Pagination
    limit = int(request.query_params.get("limit", 20))
    offset = int(request.query_params.get("offset", 0))

    # Business logic
    return get_posts_from_db(limit, offset)
```

Write this:
```python
posts_flow = Flow(
    JWTAuthentication(decode=decode_jwt),
    HasPermission("posts:read"),
    RateLimit(rate=100, window_seconds=60),
    LimitOffset(default_limit=20),
)

@app.get("/posts")
async def get_posts(ctx: RequestContext = Depends(flow_dependency(posts_flow))):
    pagination = ctx.state["pagination"]
    return get_posts_from_db(pagination["limit"], pagination["offset"])
```

Benefits:
- Separation of concerns - Security logic separated from business logic
- Reusability - Define flows once, use across multiple endpoints
- Composability - Mix and match FlowComponent instances, override at any level
- Maintainability - Changes to auth/permissions in one place
- Type safety - Full type hints and IDE support
- Testability - FlowComponent instances are easy to unit test
- Documentation - OpenAPI schema automatically reflects security requirements
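
The "override at any level" benefit rests on merge_flows() resolving conflicts with last-writer-wins per category. The standalone sketch below illustrates that rule only. It is not the library's implementation: `merge_by_category` is a hypothetical helper, each flow is simplified to a dict holding one component name per category, and the OverrideFlow and DisableFlow wrappers are not modelled.

```python
def merge_by_category(*flows: dict[str, str]) -> dict[str, str]:
    """Merge flows left to right.

    A later flow replaces an earlier flow's entry for the same category
    and leaves every other category untouched.
    """
    merged: dict[str, str] = {}
    for flow in flows:
        merged.update(flow)
    return merged


# Simplified stand-ins for app-, router-, and route-level flows.
app_level = {"authentication": "JWTAuthentication", "throttling": "RateLimit"}
router_level = {"permissions": "HasRole"}
route_level = {"authentication": "AllowAnonymous"}

final = merge_by_category(app_level, router_level, route_level)
print(final)
# → {'authentication': 'AllowAnonymous', 'throttling': 'RateLimit', 'permissions': 'HasRole'}
```

Here the route-level flow swaps out authentication while the app-level rate limit and the router-level role check carry through unchanged.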

License: MIT