|
6 | 6 | from fastapi import FastAPI |
7 | 7 | from fastapi.templating import Jinja2Templates |
8 | 8 | from fastapi.middleware.cors import CORSMiddleware |
| 9 | +from slowapi.errors import RateLimitExceeded |
| 10 | +from slowapi import Limiter, _rate_limit_exceeded_handler |
| 11 | +from slowapi.util import get_remote_address |
9 | 12 | from .utils import generate_uuid |
10 | 13 |
|
| 14 | +limiter = Limiter(key_func=get_remote_address) |
11 | 15 | app = FastAPI(title="paste.py 🐍") |
| 16 | +app.state.limiter = limiter |
| 17 | +app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) |
12 | 18 |
|
13 | 19 | origins = ["*"] |
14 | 20 |
|
|
24 | 30 |
|
25 | 31 | BASE_DIR = Path(__file__).resolve().parent |
26 | 32 |
|
27 | | -templates = Jinja2Templates(directory=str(Path(BASE_DIR, 'templates'))) |
| 33 | +templates = Jinja2Templates(directory=str(Path(BASE_DIR, "templates"))) |
28 | 34 |
|
29 | 35 |
|
30 | 36 | @app.post("/file") |
31 | | -def post_as_a_file(file: UploadFile = File(...)): |
| 37 | +@limiter.limit("100/minute") |
| 38 | +async def post_as_a_file(request: Request, file: UploadFile = File(...)): |
32 | 39 | try: |
33 | 40 | uuid = generate_uuid() |
34 | 41 | if uuid in large_uuid_storage: |
35 | 42 | uuid = generate_uuid() |
36 | 43 | path = f"data/{uuid}" |
37 | | - with open(path, 'wb') as f: |
| 44 | + with open(path, "wb") as f: |
38 | 45 | shutil.copyfileobj(file.file, f) |
39 | 46 | large_uuid_storage.append(uuid) |
40 | 47 | print(large_uuid_storage) |
41 | 48 | except Exception: |
42 | 49 | # return {"message": "There was an error uploading the file"} |
43 | | - raise HTTPException(detail="There was an error uploading the file", |
44 | | - status_code=status.HTTP_403_FORBIDDEN) |
| 50 | + raise HTTPException( |
| 51 | + detail="There was an error uploading the file", |
| 52 | + status_code=status.HTTP_403_FORBIDDEN, |
| 53 | + ) |
45 | 54 | finally: |
46 | 55 | file.file.close() |
47 | 56 |
|
48 | 57 | return PlainTextResponse(uuid, status_code=status.HTTP_201_CREATED) |
49 | 58 |
|
50 | 59 |
|
51 | 60 | @app.get("/paste/{uuid}") |
52 | | -def post_as_a_text(uuid): |
| 61 | +async def post_as_a_text(uuid): |
53 | 62 | path = f"data/{uuid}" |
54 | 63 | try: |
55 | | - with open(path, 'rb') as f: |
| 64 | + with open(path, "rb") as f: |
56 | 65 | return PlainTextResponse(f.read()) |
57 | 66 | except Exception as e: |
58 | 67 | print(e) |
59 | | - raise HTTPException(detail="404: The Requested Resource is not found", |
60 | | - status_code=status.HTTP_404_NOT_FOUND) |
| 68 | + raise HTTPException( |
| 69 | + detail="404: The Requested Resource is not found", |
| 70 | + status_code=status.HTTP_404_NOT_FOUND, |
| 71 | + ) |
61 | 72 |
|
62 | 73 |
|
63 | 74 | @app.get("/", response_class=HTMLResponse) |
64 | | -def indexpage(request: Request): |
| 75 | +async def indexpage(request: Request): |
65 | 76 | return templates.TemplateResponse("index.html", {"request": request}) |
66 | 77 |
|
67 | 78 |
|
68 | 79 | @app.delete("/paste/{uuid}", response_class=PlainTextResponse) |
69 | | -def delete_paste(uuid): |
| 80 | +async def delete_paste(uuid): |
70 | 81 | path = f"data/{uuid}" |
71 | 82 | try: |
72 | 83 | os.remove(path) |
73 | 84 | return PlainTextResponse(f"File successfully deleted {uuid}") |
74 | 85 | except FileNotFoundError: |
75 | | - raise HTTPException(detail="File Not Found", |
76 | | - status_code=status.HTTP_404_NOT_FOUND) |
| 86 | + raise HTTPException( |
| 87 | + detail="File Not Found", status_code=status.HTTP_404_NOT_FOUND |
| 88 | + ) |
77 | 89 | except Exception as e: |
78 | 90 | raise HTTPException( |
79 | | - detail=f"The exception is {e}", status_code=status.HTTP_409_CONFLICT) |
| 91 | + detail=f"The exception is {e}", status_code=status.HTTP_409_CONFLICT |
| 92 | + ) |
80 | 93 |
|
81 | 94 |
|
82 | 95 | @app.get("/web", response_class=HTMLResponse) |
83 | | -def web(request: Request): |
| 96 | +async def web(request: Request): |
84 | 97 | return templates.TemplateResponse("web.html", {"request": request}) |
85 | 98 |
|
86 | 99 |
|
87 | 100 | @app.post("/web", response_class=PlainTextResponse) |
88 | | -def web_post(content: str = Form(...)): |
89 | | - # print(content) |
90 | | - # return PlainTextResponse(content=content) |
| 101 | +@limiter.limit("100/minute") |
| 102 | +async def web_post(request: Request, content: str = Form(...)): |
91 | 103 | try: |
92 | 104 | file_content = content.encode() |
93 | 105 | uuid = generate_uuid() |
94 | 106 | if uuid in large_uuid_storage: |
95 | 107 | uuid = generate_uuid() |
96 | 108 | path = f"data/{uuid}" |
97 | | - with open(path, 'wb') as f: |
| 109 | + with open(path, "wb") as f: |
98 | 110 | f.write(file_content) |
99 | 111 | large_uuid_storage.append(uuid) |
100 | 112 | except Exception as e: |
101 | | - # return {"message": "There was an error uploading the file"} |
102 | 113 | print(e) |
103 | | - raise HTTPException(detail="There was an error uploading the file", |
104 | | - status_code=status.HTTP_403_FORBIDDEN) |
| 114 | + raise HTTPException( |
| 115 | + detail="There was an error uploading the file", |
| 116 | + status_code=status.HTTP_403_FORBIDDEN, |
| 117 | + ) |
105 | 118 |
|
106 | | - return RedirectResponse(f"http://paste.fosscu.org/paste/{uuid}", status_code=status.HTTP_303_SEE_OTHER) |
| 119 | + return RedirectResponse( |
| 120 | + f"http://paste.fosscu.org/paste/{uuid}", status_code=status.HTTP_303_SEE_OTHER |
| 121 | + ) |
107 | 122 |
|
108 | 123 |
|
109 | 124 | @app.get("/health", status_code=status.HTTP_200_OK) |
110 | | -def health() -> dict[str, str]: |
| 125 | +async def health() -> dict[str, str]: |
111 | 126 | return {"status": "ok"} |
0 commit comments